changeset 460:ef70109af810

self writeQueue and waitingQueue
author one
date Fri, 24 Sep 2010 17:05:19 +0900
parents 66c4f6b29baf
children e7eeb8be0de1
files rep/ServerMainLoop.java rep/handler/Editor.java
diffstat 2 files changed, 52 insertions(+), 90 deletions(-) [+]
line wrap: on
line diff
--- a/rep/ServerMainLoop.java	Fri Sep 24 03:24:06 2010 +0900
+++ b/rep/ServerMainLoop.java	Fri Sep 24 17:05:19 2010 +0900
@@ -120,22 +120,6 @@
 		}
 		return false;
 	}
-
-	/**
-	 * Move all command to the editor from manager's writing queue to the editor's waiting queue
-	 * @param editor
-	 */
-	public void getWriteQueue(Editor editor) {
-		LinkedList<PacketSet> w = new LinkedList<PacketSet>();
-		for(PacketSet p:writeQueue) {
-			if (p.channel==editor) {
-				editor.waitingCommandInMerge.addLast(p);
-			} else {
-				w.addLast(p);
-			}
-		}
-		writeQueue = w;
-	}
 	/**
 	 * Debug message
 	 * @param p
--- a/rep/handler/Editor.java	Fri Sep 24 03:24:06 2010 +0900
+++ b/rep/handler/Editor.java	Fri Sep 24 17:05:19 2010 +0900
@@ -18,14 +18,15 @@
 	private Translator translator;
 	// REPCommands we are going to send to the next editor
 	private LinkedList<REPCommand> sentList = new LinkedList<REPCommand>();
-	public LinkedList<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>();
+	public LinkedList<REPCommand> waitingCommandInMerge= new LinkedList<REPCommand>();
 	private REPCommand quit2=null;
 	private REPCommand preMergeCommand;
 	private boolean merging;
 	private REPCommand mergeMark = new REPCommand(REP.SMCMD_START_MERGE, 0,0, 0, 0, "");
 	public static boolean noMergeMode=false;
 	static final boolean doOptimize = false;
-	private Forwarder toEditor; 
+	private Forwarder toEditor;
+	private LinkedList<REPCommand> writeQueue = new LinkedList<REPCommand>(); 
 
 	public Editor(SessionManager manager,int editorNo){
 		// no translator case
@@ -162,30 +163,56 @@
 //		assert(count==0);
 //	}
 
-	private boolean waitingRequired(REPCommand command, REPSocketChannel<REPCommand> channel) {
-		if (hasWaitingCommand()) {
-			// We cannot do this operation before watingCommandQueue.
-			addWaitingCommand(new PacketSet(this, new REPCommand(command)));
-			return true;
-		} else if (isMerging()) { 
-			addWaitingCommand(new PacketSet(this, new REPCommand(command)));
-			return true;
-		} 
-		//ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting");
-		return false;
+	/**
+	 * Sending to Editor and waiting Queue
+	 *                                  +--------+
+	 *      send() --> write() -> | Editor | -> handle() -> manager()
+	 *                                  +--------+
+	 *      waitingQueue      
+	 *                    writeQueue
+	 *                    
+	 *      send()  は、他のEditor Node から呼ばれる
+	 *      write()  は、内部で優先的に送信するのに用いる
+	 *          writeQueue は、waitingQueue よりも常に先に実行される必要がある
+
+	 *      Manageの送信キューはここでは使わない
+	 *            send()   manage
+	 */
+	@Override
+	public void send(REPCommand command) {
+		if (merging || isMerging() || waitingCommandInMerge.size()>0) {
+			waitingCommandInMerge.addLast(command);
+			ServerMainLoop.logger.writeLog("Editor eid:"+eid+" waitingCommandInMerge = "+waitingCommandInMerge);
+			return;
+		}
+		if (isMergeCommand(command)) {
+			merging = true;
+			ServerMainLoop.logger.writeLog("Editor"+eid+": merging=true (send)"+command);
+		}
+		writeQueue.add(command);
 	}
 
-    public void addWaitingCommand(PacketSet set) {
-//		if (preMergeCommand!=null) {
-//			if (preMergeCommand.eid==set.command.eid
-//					&& preMergeCommand.seq==set.command.seq) {
-//				assert(false);
-//			}
-//		}
-    	waitingCommandInMerge.addLast(set);
-		ServerMainLoop.logger.writeLog("Editor eid:"+eid+" waitingCommandInMerge = "+waitingCommandInMerge);
+	/**
+	 * Check waiting command in merge 
+	 *     periodically called from manager    
+	 */
+	public void checkWaitingCommandInMerge() {
+		if (writeQueue.size()>0) {
+			REPCommand command =writeQueue.pollFirst(); 
+			ServerMainLoop.logger.writeLog("Editor"+eid+": write comand="+command);
+			super.write(command);
+			return;
+		}
+		if (translator==null || merging || isMerging()) return;
+		if (waitingCommandInMerge.size()>0) {
+			REPCommand command = waitingCommandInMerge.pollFirst();
+			ServerMainLoop.logger.writeLog("Editor"+eid+": send waiting comand="+command);
+			super.write(command);
+			if (isMergeCommand(command)) {
+				merging = true;
+			}
+		}
 	}
-
 	/**
 	 * 他のエディタへのコマンドの送信
 	 * @param command
@@ -327,7 +354,7 @@
 	}
 
 	private boolean checkQuit() {
-		if (quit2!=null && sentList.size()==1&&!isMerging() && !hasWaitingCommand()) {
+		if (quit2!=null && sentList.size()==1&&!isMerging() && waitingCommandInMerge.size()==0) {
 			sendToEditor(quit2);
 			quit2 = null;
 			return true;
@@ -404,24 +431,6 @@
 		return true;
 	}
 	
-	/**
-	 * write command to the editor
-	 *   called from another Editor instance such as next.send(command)
-	 */
-	@Override
-	public void write(REPCommand command) {
-		if (merging || waitingCommandInMerge.size()>0) {
-				addWaitingCommand(new PacketSet(this, new REPCommand(command)));
-				return;
-		}
-		if (!waitingRequired(command,channel)) {
-			if (isMergeCommand(command)) {
-				merging = true;
-				ServerMainLoop.logger.writeLog("Editor"+eid+": merging=true "+command);
-			}
-			super.write(command);
-		}
-	}
 	
 	private boolean isMergeCommand(REPCommand command) {
 		switch(command.cmd) {
@@ -434,7 +443,7 @@
 	}
 	
 	public void sendToEditor(REPCommand command) {
-		toEditor.send(command);
+		writeQueue.add(command);
 	}
 
 	@Override
@@ -480,36 +489,5 @@
 		send(command);
 	}
 
-	/**
-	 * Check waiting command in merge
-	 * @return true if there is a processed waiting command
-	 * @throws IOException
-	 */
-	public void checkWaitingCommandInMerge() {
-		if (translator==null||isMerging()) return;
-		while(waitingCommandInMerge.size()>0) {
-			if (merging||isMerging()) return;
-	    	// to preserve command order, move all elements from manager's writing queue
-	    	manager.getWriteQueue(this);
-			PacketSet p = waitingCommandInMerge.remove(0);
-			REPCommand command = p.command;
-			try {
-				ServerMainLoop.logger.writeLog("Editor"+eid+": resend after merge comand="+command);
-				toEditor.send(command);
-				if (isMergeCommand(command)) {
-					merging = true;
-					return;
-				}
-			} catch (Exception e1) {
-				assert false;
-				manager.close(p.channel.channel);
-				return;
-			}
-		}
-	}
 
-
-	public boolean hasWaitingCommand() {
-		return waitingCommandInMerge.size()>0;
-	}
 }