diff rep/handler/Editor.java @ 468:b800b33c6988

check quit2 and ackList
author one
date Mon, 11 Oct 2010 19:57:34 +0900
parents b13926e43c28
children e252d092b720
line wrap: on
line diff
--- a/rep/handler/Editor.java	Mon Oct 11 12:54:55 2010 +0900
+++ b/rep/handler/Editor.java	Mon Oct 11 19:57:34 2010 +0900
@@ -1,20 +1,22 @@
 package rep.handler;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.TreeSet;
 
 import rep.REP;
 import rep.REPCommand;
 import rep.ServerMainLoop;
 import rep.SessionManager;
+import rep.channel.REPLogger;
 import rep.channel.REPSelectionKey;
 import rep.channel.REPSocketChannel;
 import rep.optimizers.*;
 
 public class Editor extends Forwarder {
 
-	private Translator translator;
 	// REPCommands we are going to send to the next editor
 	private LinkedList<REPCommand> sentList = new LinkedList<REPCommand>();
 	// Expected acknowledge list
@@ -22,6 +24,15 @@
 	public LinkedList<REPCommand> waitingCommandInMerge= new LinkedList<REPCommand>();
 	private REPCommand quit_2=null;
 	private REPCommand preMergeCommand;
+
+	public REPCommandOptimizer optimizer;
+	private LinkedList<REPCommand> unMergedCmds;
+	private LinkedList<REPCommand> sentMergedList;
+	private TreeSet<REPCommand> sortedEditCmds;
+	boolean mergeAgain;
+	public REPLogger logger = SessionManager.logger;
+	boolean merge_mode = false;
+	
 	private boolean merging;
 	public static boolean noMergeMode=false;
 	static final boolean doOptimize = false;
@@ -33,13 +44,20 @@
 		super(manager, null);
 	}
 
+	public Editor(int eid, REPCommandOptimizer optimizer) {
+		super(null, null);
+		this.optimizer = optimizer;
+	}
+
 	public Editor(int editorNo, SessionManager manager,REPSocketChannel<REPCommand> channel){
 		super(editorNo,manager,channel);
 		eid = editorNo;
-		REPCommandOptimizer optimizer;
 		if (doOptimize) optimizer = new DeleteInsertOptimizer(); //タカノがつくったおぷてぃまいざ
 		else            optimizer = new NullOptimizer();         //なにもしないけどOptimizer.
-		translator = new Translator(eid,optimizer);
+
+		unMergedCmds = new LinkedList<REPCommand>();
+		mergeAgain = false;
+		sentMergedList = new LinkedList<REPCommand>();
 	}
 	
 	/*
@@ -83,6 +101,7 @@
 		       
 	 */
 
+
 	public void translate(REPCommand command){				
 		switch(command.cmd) {
 		case REPCMD_INSERT_ACK:
@@ -114,9 +133,9 @@
 		case REPCMD_MERGE_MARK:
 			if (command.eid == REP.MERGE_EID.id){
 				//マージコマンドが返ってきた
-				if(translator.checkMergeConflict(command)){
+				if(checkMergeConflict(command)){
 					//マージ中にエディタからの割り込みがあった場合
-					translator.getMergeAgain(this);
+					getMergeAgain(this);
 				}
 				checkEndMerge();
 				return;
@@ -133,7 +152,7 @@
 			}
 
 			//他のエディタからの編集コマンド
-			translator.transReceiveCmd(next,command);
+			transReceiveCmd(next,command);
 
 			sendEditorCommand(command);
 			return;
@@ -145,7 +164,7 @@
 	private void userEditorCommand(REPCommand command) {
 		//エディタからの新たな編集コマンド
 		if (next==this) return; // singleton case
-		translator.transSendCmd(command);
+		transSendCmd(command);
 		sendEditorCommand(command);
 		return;
 	}
@@ -212,7 +231,7 @@
 			super.write(command);
 			return;
 		}
-		if (translator==null || merging || isMerging()) return;
+		if (merging || isMerging()) return;
 		if (waitingCommandInMerge.size()>0) {
 			REPCommand command = waitingCommandInMerge.pollFirst();
 			ServerMainLoop.logger.writeLog("Editor"+eid+": send waiting comand="+command);
@@ -278,7 +297,6 @@
 		// Session Manager 側で、このeditorへの他のeditorからの
 		// 入力を止めて、merge にそなえる。merge は、eidtor 側から
 		// ACKが来てから始まる。
-		translator.startMerge(command);
 	}
 	
 	/**
@@ -316,22 +334,15 @@
 	@Override
 	public void setEID(int eid) {
 		this.eid = eid;
-		if (translator!=null)
-			translator.setEid(eid);
 	}
 	
 	public String toString(){
 		return ("Editor eid="+eid+" sid="+sid+" " + host  + ":" + file);
 	}
 
-	public boolean isMerging() {
-		return translator.isMerging();
-	}
-
-
 	void checkEndMerge() {
 		if (merging) {
-			if (translator.isMerging()) return;
+			if (isMerging()) return;
 			endMerge();
 			merging = false;
 		}
@@ -340,7 +351,17 @@
 
 
 	private void endMerge() {
-		translator.endMerge();
+		sortedEditCmds = null;
+		LinkedList<REPCommand>u = new LinkedList<REPCommand>();
+		boolean flag=true;
+		for(REPCommand command:unMergedCmds) {
+			if (command.cmd==REP.REPCMD_MERGE_MARK) {
+				flag = false;
+			}
+			if (flag) u.addLast(command);
+		}
+		unMergedCmds = u;
+
 		REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,sid,eid,seq(),0,"");
 		sendToEditor(mergeEnd);
 		checkAck(preMergeCommand);
@@ -349,6 +370,7 @@
 				sendAck(preMergeCommand);
 			}
 		} else {
+			ServerMainLoop.logger.writeLog("Editor"+eid+": send preMergeCommand "+preMergeCommand);
 			next.send(preMergeCommand);
 		}
 		// sentList.clear();
@@ -371,7 +393,7 @@
 	}
 
 	private boolean checkQuit() {
-		if (quit_2!=null && sentList.size()==0&&!isMerging() && waitingCommandInMerge.size()==0) {
+		if (quit_2!=null && ackList.size()==0 && sentList.size()==0&&!isMerging() && waitingCommandInMerge.size()==0) {
 			sendToEditor(quit_2);
 			quit_2 = null;
 			return true;
@@ -401,8 +423,8 @@
 		case SMCMD_START_MERGE_ACK:
 		{
 			// マージの処理と次のエディタへコマンドを送信する処理
-			translator.mergeAck();
-			if (!translator.merge(this, preMergeCommand)) {
+			mergeAck();
+			if (!merge(this, preMergeCommand)) {
 				// nothing to do, send END_MERGE
 				checkEndMerge();
 			}
@@ -507,5 +529,192 @@
 		send(command);
 	}
 
+	/**
+	 * New command from an editor
+	 * The command is sent to the next editor
+	 * @param cmd
+	 * @return translated command.
+	 */
+	public REPCommand transSendCmd(REPCommand cmd){
+		assert(cmd.eid==eid);
+		unMergedCmds.addLast(cmd);
+		
+		//マージ中にユーザから割り込みがあった場合
+		if(isMerging()){
+			mergeAgain = true;
+		}
+		
+		return cmd;
+	}
+	
+	/**
+	 * My command is returned from the session ring, and START_MERGE_ACK
+	 * is returned. At this
+	 * stage my writeQueue is empty, our editor is waiting for me.
+	 * Start merge process.
+	 * @param cmd
+	 */
+	public boolean merge(Editor editor, REPCommand prev){
+		logger.writeLog("beforeMerge"+eid+":"+unMergedCmds);
+		LinkedList<REPCommand> output = new LinkedList<REPCommand>();
+		LinkedList<REPCommand> newSentList = new LinkedList<REPCommand>();
+		// merge queue上にあるコマンドを全部undoコマンドするのと同時に
+		// sort したコマンド列を生成する
+		for( REPCommand cmd0 : unMergedCmds) {
+			output.addLast( createUndo(cmd0) );
+		}
+
+		sortedEditCmds = new TreeSet<REPCommand>(new REPCommandComparator(1));
+		logger.writeLog("sentList"+eid+":"+editor.getSentList());
+		boolean flag = false;
+		for( REPCommand cmd0 : editor.getSentList()) {
+			if (cmd0.cmd==REP.REPCMD_INSERT || cmd0.cmd==REP.REPCMD_DELETE) {
+				if (flag) {
+					output.addLast(cmd0);
+					newSentList.addLast(cmd0);
+				} else {
+					sortedEditCmds.add(cmd0);
+				}
+			}
+			if (cmd0.sid==prev.sid && cmd0.eid==prev.eid && cmd0.seq==prev.seq) { 
+				flag = true;
+				output.addAll(sortedEditCmds);
+				output.addLast(new REPCommand(REP.REPCMD_MERGE_MARK,0, editor.getSID(), REP.MERGE_EID.id, editor.seq(), ""));
+			}
+		}
+		if (!flag) {
+			output.addAll(sortedEditCmds);
+			output.addLast(new REPCommand(REP.REPCMD_MERGE_MARK,0, editor.getSID(), REP.MERGE_EID.id, editor.seq(), ""));
+		}
+		logger.writeLog("sortedMerge"+eid+":"+sortedEditCmds);
+		// unMerged command のdeleteのundo string は、この時点で使えない。
+		// Editor 側から送り返して来たものを使う必要がある。
+		unMergedCmds.clear();
+		logger.writeLog("outputMerge"+eid+":"+output);
+		editor.setSentList(newSentList);
+		return optimizedSend(editor,output);
+	}
+
+	/**
+	 * Sent optimized merged command list
+	 * @param editor 
+	 * @param output
+	 * @return if any sent commands output 
+	 */
+	public boolean optimizedSend(REPNode editor, LinkedList<REPCommand> output) {
+		/*
+		 * Optimized send の場合は、unMergedCommand のつじつまを合わせる必要がある。
+		 */
+		sentMergedList.clear();
+		List<REPCommand> output1 = optimizer.optimize(output);
+		if (output1.size()==0) {
+			merge_mode = false;
+			return false;
+		}
+		for(REPCommand c:output1) {
+			REPCommand m = new REPCommand(c);
+			m.setEID(REP.MERGE_EID.id);
+			m.setSEQID(editor.seq());
+			sentMergedList.addLast(m);
+			editor.sendToEditor(m);
+		}
+		logger.writeLog("OptimizedOutputMerge"+eid+":"+sentMergedList);
+		merge_mode = true;
+		return true;
+	}
+	
+	private REPCommand createUndo(REPCommand cmd){
+		REPCommand retCmd = new REPCommand(cmd);
+		if (cmd.cmd==REP.REPCMD_INSERT) {
+			retCmd.cmd=REP.REPCMD_DELETE;
+			retCmd.string="";
+		}
+		else if (cmd.cmd==REP.REPCMD_DELETE) retCmd.cmd=REP.REPCMD_INSERT;
+		return retCmd;
+	}
+
+	class REPCommandComparator implements Comparator<REPCommand>{
+		int base;
+		REPCommandComparator(int base) {
+			this.base = base;
+		}
+		public int compare(REPCommand o1, REPCommand o2) {
+			int eid1 = o1.eid-base; if (eid1<0) eid1 += Integer.MAX_VALUE;
+			int eid2 = o2.eid-base; if (eid2<0) eid2 += Integer.MAX_VALUE;
+			if ( eid1<eid2 ) return -1;
+			if ( eid1>eid2 ) return 1;
+			if ( o1.seq<o2.seq ) return -1;
+			if ( o1.seq>o2.seq ) return 1;
+			// assert(false);  // this can happen in MergedAgain case
+			return 0;
+		}
+	}
+	
+	/**
+	 * Translate Command  that was received from SeMa.
+	 * @param cmd the command to be translated.
+	 * @return translated command.
+	 */
+	public void transReceiveCmd(REPNode nextEditor,REPCommand cmd){
+		assert (cmd.eid != eid);
+		unMergedCmds.addFirst(cmd);
+	}
+
+	public void setEid(int _eid){
+		eid = _eid;
+	}
+
+	public boolean checkMergeConflict(REPCommand command) {
+		unMergedCmds.addFirst(command);
+		
+		REPCommand prev = sentMergedList.getFirst();
+		if (prev.seq==command.seq) {
+			// logger.writeLog("Input eid="+eid+"SentMergedList = "+sentMergedList);
+			sentMergedList.removeFirst();
+		} 
+		//  previous merge command may be returned
+
+		if(sentMergedList.size()==0 && !mergeAgain) {
+			merge_mode=false;
+		}
+		return mergeAgain;
+	}
+
+	public void getMergeAgain(Editor editor) {
+		if (sentMergedList.size()>0) return; //  wait for previous merge completion
+		
+		LinkedList<REPCommand> returnCommand = new LinkedList<REPCommand>();
+		for(REPCommand command : unMergedCmds) {
+			if (command.cmd==REP.REPCMD_INSERT||command.cmd==REP.REPCMD_DELETE)
+				returnCommand.add(createUndo(command));
+		}
+		returnCommand.addAll(sortedEditCmds);
+		returnCommand.addLast(new REPCommand(REP.REPCMD_MERGE_MARK,0, editor.getSID(), REP.MERGE_EID.id, editor.seq(), ""));
+		returnCommand.addAll(editor.getSentList());
+		unMergedCmds.clear();
+		logger.writeLog("MergeAgain "+eid+" ret="+returnCommand.size());
+		mergeAgain = false;
+		optimizedSend(editor, returnCommand);
+	}
+//
+//	public boolean isFinished() {
+//		if(unMergedCmds.size() > 0) return false;
+//		if(sentMergedList.size() > 0) return false;
+//		return true;
+//	}
+
+	public boolean isMerging() {
+		return merge_mode;
+	}
+
+	/**
+	 *  receive SMCMD_START_MERGE_ACK
+	 */
+	public void mergeAck() {
+		logger.writeLog("Editor"+eid+": START MERGE "+
+				((unMergedCmds.size()>0)?" and top of unMergedCmds = "+ unMergedCmds.getLast():""));
+		merge_mode = true;
+	}
+
 
 }