diff rep/handler/Editor.java @ 399:19705f4b8015

waitingCommandInMerge
author one
date Mon, 24 Nov 2008 23:11:51 +0900
parents 7de83b6a34e7
children 29f01a7ce71f
line wrap: on
line diff
--- a/rep/handler/Editor.java	Sun Nov 23 18:38:52 2008 +0900
+++ b/rep/handler/Editor.java	Mon Nov 24 23:11:51 2008 +0900
@@ -18,6 +18,7 @@
 	private Translator translator;
 	// REPCommands we are going to send to the next editor
 	private List<REPCommand> sentList = new LinkedList<REPCommand>();
+	protected List<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>();
 	private REPCommand quit2=null;
 	private boolean merging;
 	private REPCommand preMergeCommand;
@@ -74,8 +75,9 @@
 				if (waitingRequired(command)) return;
 				keep = new REPCommand(command);
 				sentList.add(keep);
-				ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
-				((Editor) next).checkReturnedCommand(command);
+				checkDouble(sentList);
+				next.forwardedCommandManage(command);
+				// ((Editor) next).checkReturnedCommand(command);
 			} else
 				next.send(command);
 		} else {
@@ -91,23 +93,40 @@
 		return;
 	}
 	
+	private void checkDouble(List<REPCommand> sentList) {
+		if (sentList.size()==0) return;
+		int count = 0;
+		REPCommand f = sentList.get(0); 
+		for(REPCommand c:sentList) {
+			if (c.eid==f.eid&&c.seq==f.seq) {
+				count++;
+			}
+		}
+		assert(count==1);
+	}
+
 	private boolean waitingRequired(REPCommand command) {
-		if (manager.hasWaitingCommand(channel)) {
+		if (hasWaitingCommand()) {
 			// We cannot do this operation before watingCommandQueue.
-			manager.addWaitingCommand(new PacketSet(channel, this, command));
+			addWaitingCommand(new PacketSet(channel, this, command));
 			return true;
 		} else if (isMerging()) { 
-			manager.addWaitingCommand(new PacketSet(getChannel(), this, new REPCommand(command)));
+			addWaitingCommand(new PacketSet(getChannel(), this, new REPCommand(command)));
 			return true;
 		} 
 		ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting");
 		return false;
 	}
 
+	public void addWaitingCommand(PacketSet set) {
+		waitingCommandInMerge.add(set);
+	}
+	
 	private void sendEditorCommand(REPCommand command) {
 		REPCommand keep = new REPCommand(command);
 		sentList.add(keep);
 		ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
+		checkDouble(sentList);
 		assert(sentList.size()<limit);
 		next.send(command);
 	}
@@ -122,6 +141,11 @@
 	 * @param command
 	 */
 	void checkReturnedCommand(REPCommand command) {
+		assert(!merging);
+		if (sentList.size()==0) {
+			ServerMainLoop.logger.writeLog("Editor eid="+eid+" looped command not registered: "+command);
+			assert(false);
+		}
 		REPCommand prev = sentList.remove(0);
 		ServerMainLoop.logger.writeLog("Editor eid="+eid+" remove sentList:"+(prev==null?"null":prev));
 		if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) {
@@ -201,6 +225,7 @@
 			sentList.add(keep);
 			ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
 			assert(sentList.size()<limit);
+			checkDouble(sentList);
 			next.send(keep);
 		} else {
 			next.send(preMergeCommand);
@@ -318,6 +343,7 @@
 	 * it is forwarded here.
 	 */
 	public void forwardedCommandManage(REPCommand command) {
+		if (waitingRequired(command)) return;
 		if (command.cmd==REP.SMCMD_QUIT_2) {
 			// we have to wait next editor's finishing before sending this.
 			// this is odd, but the editor itself does not know it's merging
@@ -326,22 +352,16 @@
 		} else if (command.eid==eid) {
 			// if we handle in editor.manage(), this editor cannot distinguish this
 			// and user input command from the editor.
-			REPCommand keep;
+//			REPCommand keep;
 			switch(command.cmd) {
-			case REPCMD_DELETE_ACK:
-			case REPCMD_INSERT_ACK:
-				checkReturnedCommand(command);
-				return ;
 			case REPCMD_INSERT:
-				keep = new REPCommand(command);
-				keep.cmd = REP.REPCMD_INSERT_ACK;
-				sentList.add(keep);
-				checkReturnedCommand(command);
-				return;
 			case REPCMD_DELETE:
-				keep = new REPCommand(command);
-				keep.cmd = REP.REPCMD_DELETE_ACK;
-				sentList.add(keep);
+//				keep = new REPCommand(command);
+//				sentList.add(keep);				
+//				ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
+//				checkDouble(sentList);
+			case REPCMD_INSERT_ACK:
+			case REPCMD_DELETE_ACK:
 				checkReturnedCommand(command);
 				return;
 			}
@@ -349,4 +369,32 @@
 		send(command);
 	}
 
+	/**
+	 * Check waiting command in merge
+	 * @return true if there is a processed waiting command
+	 * @throws IOException
+	 */
+	public void checkWaitingCommandInMerge() {
+		int count = waitingCommandInMerge.size();
+		if (count==0) return;
+		if (isMerging()) return;
+		while(count++>0) {
+			PacketSet p = waitingCommandInMerge.remove(0);
+			try {
+				//					if (manager.sessionManage(e, p.command)) { // we don't need this
+				//						assert false;
+				//						return;
+				//					}
+				manage(p.command);
+			} catch (Exception e1) {
+				// should be e.close()?
+				manager.close(p.channel);
+			}
+		}
+	}
+
+
+	public boolean hasWaitingCommand() {
+		return waitingCommandInMerge.size()>0;
+	}
 }