changeset 399:19705f4b8015

waitingCommandInMerge
author one
date Mon, 24 Nov 2008 23:11:51 +0900
parents 7de83b6a34e7
children 29f01a7ce71f
files Todo rep/ServerMainLoop.java rep/SessionManager.java rep/handler/Editor.java rep/handler/Forwarder.java rep/handler/REPNode.java test/sematest/TestEditor.java
diffstat 7 files changed, 97 insertions(+), 63 deletions(-) [+]
line wrap: on
line diff
--- a/Todo	Sun Nov 23 18:38:52 2008 +0900
+++ b/Todo	Mon Nov 24 23:11:51 2008 +0900
@@ -1,3 +1,8 @@
+Mon Nov 24 22:51:45 JST 2008
+
+watingCommandInMerge のqueueを一旦0にしてから、manageを
+呼ぶと、queueが既にあるのに、lockが外れた状態になってしまう。
+
 Wed Nov 19 19:21:47 JST 2008
 
 ACK base に書き換えるのは良いが、途中でjoinして
--- a/rep/ServerMainLoop.java	Sun Nov 23 18:38:52 2008 +0900
+++ b/rep/ServerMainLoop.java	Mon Nov 24 23:11:51 2008 +0900
@@ -6,7 +6,6 @@
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -28,7 +27,6 @@
 	public SessionManager manager;
 	protected SessionManagerGUI gui;
 	protected REPSelector<REPCommand> selector;
-	protected List<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>();
 	private BlockingQueue<SessionManagerEvent> waitingEventQueue 
 								= new LinkedBlockingQueue<SessionManagerEvent>();
 	public String myHost;
@@ -54,7 +52,7 @@
 	
 	public void mainLoop() throws IOException {
 		while(true){
-		    checkWaitingCommandInMerge();
+			manager.checkWaitingCommandInMerge();
 			if (checkInputEvent() ||
 			    checkWaitingWrite()) { 
 				   // try to do fair execution for waiting task
@@ -103,44 +101,8 @@
 		return false;
 	}
 
-	/**
-	 * Check waiting command in merge
-	 * @return true if there is a processed waiting command
-	 * @throws IOException
-	 */
-	public void checkWaitingCommandInMerge() {
-		List<PacketSet> w = waitingCommandInMerge;
-		waitingCommandInMerge = new LinkedList<PacketSet>();
-		for(PacketSet p: w) {
-			REPNode e = p.getEditor();
-			if(e.isMerging()) { // still merging do nothing
-				waitingCommandInMerge.add(p);
-			} else {
-				try {
-//					if (manager.sessionManage(e, p.command)) { // we don't need this
-//						assert false;
-//						return;
-//					}
-					e.manage(p.command);
-				} catch (Exception e1) {
-					// should be e.close()?
-					close(p.channel);
-				}		
-			}
-		}
-	}
 
-
-	public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) {
-		for(PacketSet p:waitingCommandInMerge) {
-			if (p.channel==c) {
-				return true;
-			}
-		}
-		return false;
-	}
-
-	private void close(REPSocketChannel<REPCommand> channel) {
+	public void close(REPSocketChannel<REPCommand> channel) {
 		REPSelectionKey<REPCommand>key = channel.keyFor1(selector);
 		REPNode handler = (REPNode)key.attachment();
 		key.cancel();
@@ -223,9 +185,6 @@
 		}
 	}
 
-	public void addWaitingCommand(PacketSet set) {
-		waitingCommandInMerge.add(set);
-	}
 
 	public void buttonPressed(SessionManagerEvent event) {
 		try {
--- a/rep/SessionManager.java	Sun Nov 23 18:38:52 2008 +0900
+++ b/rep/SessionManager.java	Mon Nov 24 23:11:51 2008 +0900
@@ -607,4 +607,11 @@
 		return smList.sessionManagerID();
 	}
 
+	public void checkWaitingCommandInMerge() {
+		for(REPNode e:editorList.values()) {
+			e.checkWaitingCommandInMerge();
+		}
+		
+	}
+
 }
--- a/rep/handler/Editor.java	Sun Nov 23 18:38:52 2008 +0900
+++ b/rep/handler/Editor.java	Mon Nov 24 23:11:51 2008 +0900
@@ -18,6 +18,7 @@
 	private Translator translator;
 	// REPCommands we are going to send to the next editor
 	private List<REPCommand> sentList = new LinkedList<REPCommand>();
+	protected List<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>();
 	private REPCommand quit2=null;
 	private boolean merging;
 	private REPCommand preMergeCommand;
@@ -74,8 +75,9 @@
 				if (waitingRequired(command)) return;
 				keep = new REPCommand(command);
 				sentList.add(keep);
-				ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
-				((Editor) next).checkReturnedCommand(command);
+				checkDouble(sentList);
+				next.forwardedCommandManage(command);
+				// ((Editor) next).checkReturnedCommand(command);
 			} else
 				next.send(command);
 		} else {
@@ -91,23 +93,40 @@
 		return;
 	}
 	
+	private void checkDouble(List<REPCommand> sentList) {
+		if (sentList.size()==0) return;
+		int count = 0;
+		REPCommand f = sentList.get(0); 
+		for(REPCommand c:sentList) {
+			if (c.eid==f.eid&&c.seq==f.seq) {
+				count++;
+			}
+		}
+		assert(count==1);
+	}
+
 	private boolean waitingRequired(REPCommand command) {
-		if (manager.hasWaitingCommand(channel)) {
+		if (hasWaitingCommand()) {
 			// We cannot do this operation before watingCommandQueue.
-			manager.addWaitingCommand(new PacketSet(channel, this, command));
+			addWaitingCommand(new PacketSet(channel, this, command));
 			return true;
 		} else if (isMerging()) { 
-			manager.addWaitingCommand(new PacketSet(getChannel(), this, new REPCommand(command)));
+			addWaitingCommand(new PacketSet(getChannel(), this, new REPCommand(command)));
 			return true;
 		} 
 		ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting");
 		return false;
 	}
 
+	public void addWaitingCommand(PacketSet set) {
+		waitingCommandInMerge.add(set);
+	}
+	
 	private void sendEditorCommand(REPCommand command) {
 		REPCommand keep = new REPCommand(command);
 		sentList.add(keep);
 		ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
+		checkDouble(sentList);
 		assert(sentList.size()<limit);
 		next.send(command);
 	}
@@ -122,6 +141,11 @@
 	 * @param command
 	 */
 	void checkReturnedCommand(REPCommand command) {
+		assert(!merging);
+		if (sentList.size()==0) {
+			ServerMainLoop.logger.writeLog("Editor eid="+eid+" looped command not registered: "+command);
+			assert(false);
+		}
 		REPCommand prev = sentList.remove(0);
 		ServerMainLoop.logger.writeLog("Editor eid="+eid+" remove sentList:"+(prev==null?"null":prev));
 		if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) {
@@ -201,6 +225,7 @@
 			sentList.add(keep);
 			ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
 			assert(sentList.size()<limit);
+			checkDouble(sentList);
 			next.send(keep);
 		} else {
 			next.send(preMergeCommand);
@@ -318,6 +343,7 @@
 	 * it is forwarded here.
 	 */
 	public void forwardedCommandManage(REPCommand command) {
+		if (waitingRequired(command)) return;
 		if (command.cmd==REP.SMCMD_QUIT_2) {
 			// we have to wait next editor's finishing before sending this.
 			// this is odd, but the editor itself does not know it's merging
@@ -326,22 +352,16 @@
 		} else if (command.eid==eid) {
 			// if we handle in editor.manage(), this editor cannot distinguish this
 			// and user input command from the editor.
-			REPCommand keep;
+//			REPCommand keep;
 			switch(command.cmd) {
-			case REPCMD_DELETE_ACK:
-			case REPCMD_INSERT_ACK:
-				checkReturnedCommand(command);
-				return ;
 			case REPCMD_INSERT:
-				keep = new REPCommand(command);
-				keep.cmd = REP.REPCMD_INSERT_ACK;
-				sentList.add(keep);
-				checkReturnedCommand(command);
-				return;
 			case REPCMD_DELETE:
-				keep = new REPCommand(command);
-				keep.cmd = REP.REPCMD_DELETE_ACK;
-				sentList.add(keep);
+//				keep = new REPCommand(command);
+//				sentList.add(keep);				
+//				ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
+//				checkDouble(sentList);
+			case REPCMD_INSERT_ACK:
+			case REPCMD_DELETE_ACK:
 				checkReturnedCommand(command);
 				return;
 			}
@@ -349,4 +369,32 @@
 		send(command);
 	}
 
+	/**
+	 * Check waiting command in merge
+	 * @return true if there is a processed waiting command
+	 * @throws IOException
+	 */
+	public void checkWaitingCommandInMerge() {
+		int count = waitingCommandInMerge.size();
+		if (count==0) return;
+		if (isMerging()) return;
+		while(count++>0) {
+			PacketSet p = waitingCommandInMerge.remove(0);
+			try {
+				//					if (manager.sessionManage(e, p.command)) { // we don't need this
+				//						assert false;
+				//						return;
+				//					}
+				manage(p.command);
+			} catch (Exception e1) {
+				// should be e.close()?
+				manager.close(p.channel);
+			}
+		}
+	}
+
+
+	public boolean hasWaitingCommand() {
+		return waitingCommandInMerge.size()>0;
+	}
 }
--- a/rep/handler/Forwarder.java	Sun Nov 23 18:38:52 2008 +0900
+++ b/rep/handler/Forwarder.java	Mon Nov 24 23:11:51 2008 +0900
@@ -133,6 +133,16 @@
 		}
 	}
 
+	@Override
+	public void forwardedCommandManage(REPCommand command) {
+		
+	}
+
+	@Override
+	public void checkWaitingCommandInMerge() {
+		
+	}
+
 	
 	
 	
--- a/rep/handler/REPNode.java	Sun Nov 23 18:38:52 2008 +0900
+++ b/rep/handler/REPNode.java	Mon Nov 24 23:11:51 2008 +0900
@@ -156,5 +156,10 @@
 
 	public abstract void joinAck(REPCommand sendCommand, int sid) ;
 
+	public abstract void forwardedCommandManage(REPCommand command) ;
+
+	public abstract void checkWaitingCommandInMerge();
+		
+	
 	
 }
--- a/test/sematest/TestEditor.java	Sun Nov 23 18:38:52 2008 +0900
+++ b/test/sematest/TestEditor.java	Mon Nov 24 23:11:51 2008 +0900
@@ -61,13 +61,13 @@
 			this.master=true;
 			text = new Text(txts);
 			cmds.add(new REPCommand(REP.SMCMD_PUT,0,0,0,0,name+"-file"));
-			//cmds.add(new REPCommand(REP.REPCMD_INSERT,0,0,0,0,"m0"));
+			cmds.add(new REPCommand(REP.REPCMD_INSERT,0,0,0,0,"m0"));
 			cmds.add(new REPCommand(REP.REPCMD_DELETE,0,0,0,0,"m0"));
 			//cmds.add(new REPCommand(REP.SMCMD_QUIT,0,0,0,0,""));
 		} else {
 			text = new Text(new String[0]);
 			cmds.add(new REPCommand(REP.SMCMD_JOIN,0,0,0,0,name));
-			cmds.add(new REPCommand(REP.REPCMD_INSERT,0,0,0,0,"c0"));
+			//cmds.add(new REPCommand(REP.REPCMD_INSERT,0,0,0,0,"c0"));
 			//cmds.add(new REPCommand(REP.REPCMD_DELETE,0,0,0,0,"c0"));
 		}
 	}