changeset 487:455df381449a

separate blocking and merging
author one
date Thu, 21 Oct 2010 22:35:37 +0900
parents 877aacde8651
children c49a86a7ab8f
files Todo rep/handler/Editor.java
diffstat 2 files changed, 62 insertions(+), 24 deletions(-) [+]
line wrap: on
line diff
--- a/Todo	Thu Oct 21 20:30:52 2010 +0900
+++ b/Todo	Thu Oct 21 22:35:37 2010 +0900
@@ -42,6 +42,11 @@
 
 かな。で、preMergedCommand の送信タイミングは? 戻って来た順に出してしまって良い?
 
+ACKも入り込んでるのはおかしいんですが... あ、違うな。一周した自コマンドをblockしてない?
+いや、merging のflagが、ちょっと早い感じ。二つのflagを一緒にしてしまったので。
+
+あと、mergeのundoのunMergeCommandをendMerge 時にtruncateしきれてない。
+
 Wed Oct 20 20:35:53 JST 2010
 
     Editor1    Editor2    Editor3 
--- a/rep/handler/Editor.java	Thu Oct 21 20:30:52 2010 +0900
+++ b/rep/handler/Editor.java	Thu Oct 21 22:35:37 2010 +0900
@@ -31,7 +31,8 @@
 	private TreeSet<REPCommand> sortedEditCmds;
 	boolean mergeAgain;
 	public REPLogger logger = SessionManager.logger;
-	boolean merging = false;
+	private boolean blocking = false;
+	private boolean merging = false;
 	private LinkedList<REPCommand> writeQueue = new LinkedList<REPCommand>();
 	private REPCommand mergeMark =new REPCommand(REP.REPCMD_MERGE_MARK,0,0, REP.MERGE_EID.id, 0, "");
 	
@@ -245,13 +246,13 @@
 	 */
 	@Override
 	public void send(REPCommand command) {
-		if (merging || isMerging() || waitingCommandInMerge.size()>0) {
+		if (blocking || isMerging() || waitingCommandInMerge.size()>0) {
 			waitingCommandInMerge.addLast(command);
 			ServerMainLoop.logger.writeLog("Editor eid:"+eid+" waitingCommandInMerge = "+waitingCommandInMerge);
 			return;
 		}
 		if (isMergeCommand(command)) {
-			merging = true;
+			blocking = true;
 			ServerMainLoop.logger.writeLog("Editor"+eid+": merging=true (send)"+command);
 		}
 		writeQueue.add(command);
@@ -268,13 +269,13 @@
 			super.write(command);
 			return;
 		}
-		if (merging || isMerging()) return;
+		if (blocking || isMerging()) return;
 		if (waitingCommandInMerge.size()>0) {
 			REPCommand command = waitingCommandInMerge.pollFirst();
 			ServerMainLoop.logger.writeLog("Editor"+eid+": send waiting comand="+command);
 			super.write(command);
 			if (isMergeCommand(command)) {
-				merging = true;
+				blocking = true;
 			}
 		}
 	}
@@ -341,7 +342,8 @@
 	private boolean checkAck(REPCommand command) {
 		REPCommand prev = null;
 		try {
-			if(isMerging() || ackList.size()==0) throw new Exception();
+			if(mergeMode!=MergeMode.Direct && isMerging()) throw new Exception();
+			if(ackList.size()==0) throw new Exception();
 			prev=ackList.remove(0);		
 			if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) throw new Exception();
 		} catch (Exception n) {
@@ -349,6 +351,7 @@
 			String err = "Editor eid="+eid+" checkReturnedCommand() : command = " + command + " prev="+
 			(prev==null?"null":prev)+" ackList=";
 			err += ackList;
+			err += "merging="+isMerging();
 			ServerMainLoop.logger.writeLog(err);
 			assert(false);
 		}
@@ -378,6 +381,7 @@
 			if (isMerging()) return;
 			endMerge();
 			merging = false;
+			blocking = false;
 		}
 		if (quit_2!=null) checkQuit();
 	}
@@ -413,10 +417,7 @@
 	 * truncate sentList and unMergedCmds.
 	 */
 	private void truncateSentList(REPCommand commit) {
-		if (merging) {
-			preMergeCommand = commit;
-			return;
-		}
+		if (merging) 	return;
 		LinkedList<REPCommand>u = new LinkedList<REPCommand>();
 		for(REPCommand command:unMergedCmds) {
 			if (command.cmd==REP.REPCMD_MERGE_MARK) continue;
@@ -441,7 +442,7 @@
 	 * @param commit
 	 */
 	public void truncateUnMergedCmds(REPCommand commit) {
-		// assert(!merging);  merging でもすり抜ける場合がある
+		assert(!merging);
 		LinkedList<REPCommand>u = new LinkedList<REPCommand>();
 		for(REPCommand command:unMergedCmds) {
 			if (command.cmd==REP.REPCMD_MERGE_MARK) continue;
@@ -452,7 +453,9 @@
 		boolean flag = false;
 		LinkedList<REPCommand>s = new LinkedList<REPCommand>();
 		for(REPCommand command:sentList) {
-			if (command.isSameSeq(commit)) flag = true;
+			if (command.isSameSeq(commit)) {
+				flag = true; continue;
+			}
 			if (flag) s.addLast(command);
 		}
 		if (flag) sentList = s;
@@ -650,29 +653,51 @@
 		logger.writeLog("sentList"+eid+":"+sentList);
 		boolean flag = true;
 		for( REPCommand cmd0 : sentList ) {
-			if (mergeMode==MergeMode.Direct) {
-				if (cmd0.cmd==REP.REPCMD_MERGE_MARK) { 
-					flag = false;
-				}
+			if (cmd0.cmd==REP.REPCMD_MERGE_MARK) { 
+				flag = false;
+			}
+			if (cmd0.cmd==REP.REPCMD_INSERT || cmd0.cmd==REP.REPCMD_DELETE) {
+				if (flag)	sortedEditCmds.add(cmd0);
+				else newSentList.add(cmd0);
 			}
+		}
+		output.addLast(mergeMark);
+		output.addAll(sortedEditCmds);
+		output.addAll(newSentList);
+		logger.writeLog("sortedMerge"+eid+":"+sentList);
+		// unMerged command のdeleteのundo string は、この時点で使えない。
+		// Editor 側から送り返して来たものを使う必要がある。
+		unMergedCmds.clear();
+		logger.writeLog("outputMerge"+eid+":"+output);
+		return optimizedSend(this,output);
+	}
+
+	public boolean mergeEarly(REPCommand prev){
+		logger.writeLog("beforeMerge"+eid+":"+unMergedCmds);
+		LinkedList<REPCommand> output = new LinkedList<REPCommand>();
+		LinkedList<REPCommand> newSentList = new LinkedList<REPCommand>();
+		// merge queue上にあるコマンドを全部undoコマンドするのと同時に
+		// sort したコマンド列を生成する
+		for( REPCommand cmd0 : unMergedCmds) {
+			output.addLast( createUndo(cmd0) );
+		}
+
+		sortedEditCmds = new TreeSet<REPCommand>(new REPCommandComparator(1));
+		logger.writeLog("sentList"+eid+":"+sentList);
+		boolean flag = true;
+		for( REPCommand cmd0 : sentList ) {
 			if (cmd0.cmd==REP.REPCMD_INSERT || cmd0.cmd==REP.REPCMD_DELETE) {
 				if (flag)	sortedEditCmds.add(cmd0);
 				else newSentList.add(cmd0);
 			}
 		}
 		output.addAll(sortedEditCmds);
-		if (mergeMode==MergeMode.Direct) {
-			output.addAll(newSentList);
-			output.remove(mergeMark);
-			sentList.remove(mergeMark);
-		} else
-			output.addLast(mergeMark);
+		output.addLast(mergeMark);
 		logger.writeLog("sortedMerge"+eid+":"+sortedEditCmds);
 		// unMerged command のdeleteのundo string は、この時点で使えない。
 		// Editor 側から送り返して来たものを使う必要がある。
 		unMergedCmds.clear();
-		if (mergeMode!=MergeMode.Direct)
-			sentList = newSentList;
+		sentList = newSentList;
 		logger.writeLog("outputMerge"+eid+":"+output);
 		return optimizedSend(this,output);
 	}
@@ -691,6 +716,7 @@
 		List<REPCommand> output1 = optimizer.optimize(output);
 		if (output1.size()==0) {
 			merging = false;
+			blocking = false;
 			return false;
 		}
 		for(REPCommand c:output1) {
@@ -757,13 +783,20 @@
 		//  previous merge command may be returned
 
 		if(sentMergedList.size()==0 && !mergeAgain) {
+			// this is duplicate...
 			merging=false;
+			blocking = false;
 		}
 		return mergeAgain;
 	}
 
 	public void getMergeAgain() {
 		if (sentMergedList.size()>0) return; //  wait for previous merge completion
+		if (mergeMode==MergeMode.Direct) {
+			logger.writeLog("MergeAgain "+eid);
+			merge(preMergeCommand);
+			return;
+		}
 		
 		LinkedList<REPCommand> returnCommand = new LinkedList<REPCommand>();
 		for(REPCommand command : unMergedCmds) {