view rep/handler/Editor.java @ 490:168909388479

new merge continue
author one
date Sat, 23 Oct 2010 02:56:20 +0900
parents 4aba8ae3125b
children d2afd4efdd2d
line wrap: on
line source

package rep.handler;

import java.io.IOException;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeSet;

import rep.REP;
import rep.REPCommand;
import rep.ServerMainLoop;
import rep.SessionManager;
import rep.channel.REPLogger;
import rep.channel.REPSelectionKey;
import rep.channel.REPSocketChannel;
import rep.optimizers.*;

public class Editor extends Forwarder {

	// Copies of the commands handed to the next node, in send order (see sendEditorCommand()).
	private LinkedList<REPCommand> sentList = new LinkedList<REPCommand>();
	// Commands whose return/acknowledgement is still expected, in the order they were sent.
	private LinkedList<REPCommand> ackList = new LinkedList<REPCommand>();
	// Commands from other nodes that are held back while this node is blocked or merging (see send()).
	public LinkedList<REPCommand> waitingCommandInMerge= new LinkedList<REPCommand>();
	// Pending QUIT_2 command; forwarded to the editor by checkQuit() once nothing is outstanding.
	private REPCommand quit_2=null;
	// Copy of the command that triggered the current merge (set in startMerge()).
	private REPCommand preMergeCommand;

	public REPCommandOptimizer optimizer;
	// Commands already applied by the attached editor, newest first (filled with addFirst()).
	// Initialised at declaration so that every constructor leaves the list non-null;
	// previously only the three-argument constructor created it.
	private LinkedList<REPCommand> unMergedCmds = new LinkedList<REPCommand>();
	// Merge commands sent to the editor that have not come back yet (see optimizedSend()).
	// Initialised at declaration for the same reason as unMergedCmds.
	private LinkedList<REPCommand> sentMergedList = new LinkedList<REPCommand>();
	// Commands selected for the current merge, ordered by REPCommandComparator.
	private TreeSet<REPCommand> sortedEditCmds;
	// Set when a user command arrives while merging; the merge then has to be redone.
	boolean mergeAgain;
	public REPLogger logger = SessionManager.logger;
	// True from the moment a merge-triggering command is passed to the editor until checkEndMerge() clears it.
	private boolean blocking = false;
	// True between mergeAck() and the return of the last merge command.
	private boolean merging = false;
	// Commands that go to the attached editor ahead of anything in waitingCommandInMerge.
	private LinkedList<REPCommand> writeQueue = new LinkedList<REPCommand>();
	// Marker command separating entries recorded before and after a merge was started.
	private REPCommand mergeMark =new REPCommand(REP.REPCMD_MERGE_MARK,0,0, REP.MERGE_EID.id, 0, "");
	
	/**
	 * Selects the point of the protocol at which this node merges concurrent edits.
	 */
	public enum MergeMode {
		NoMerge,    // no merge 
		Slow,          // merge at Ack
		Early,         //  merge at returned command and Ack
		Direct        //  merge at incoming command
	}
	// Merge strategy shared by all Editor instances (static field); Direct by default.
	public static  MergeMode mergeMode = MergeMode.Direct;
	// When true the three-argument constructor installs DeleteInsertOptimizer, otherwise NullOptimizer.
	static final boolean doOptimize = false;

	/**
	 * Creates an editor node without a channel ("no translator" case).
	 * NOTE(review): editorNo is not used here, and no optimizer is assigned
	 * by this constructor.
	 *
	 * @param manager  session manager passed on to the superclass
	 * @param editorNo editor number (currently ignored)
	 */
	public Editor(SessionManager manager,int editorNo){
		// no translator case
		super(manager, null);
	}

	/**
	 * Creates an editor node with neither session manager nor channel, using the
	 * given optimizer.
	 * NOTE(review): the eid argument is not stored by this constructor.
	 *
	 * @param eid       editor id (currently ignored)
	 * @param optimizer optimizer applied to merge output in optimizedSend()
	 */
	public Editor(int eid, REPCommandOptimizer optimizer) {
		super(null, null);
		this.optimizer = optimizer;
	}

	/**
	 * Creates an editor node bound to a channel.
	 *
	 * @param editorNo editor id of this node
	 * @param manager  owning session manager
	 * @param channel  channel to the attached editor
	 */
	public Editor(int editorNo, SessionManager manager,REPSocketChannel<REPCommand> channel){
		super(editorNo,manager,channel);
		eid = editorNo;

		// Merge bookkeeping starts out empty.
		unMergedCmds = new LinkedList<REPCommand>();
		sentMergedList = new LinkedList<REPCommand>();
		mergeAgain = false;

		if (doOptimize) {
			// optimizer written by Takano
			optimizer = new DeleteInsertOptimizer();
		} else {
			// an optimizer that does nothing
			optimizer = new NullOptimizer();
		}
	}
	
	/*
	 *  Merge Protocol
	(0) Editor へのコマンドは、ack 以外は直接 Editor へ送られてしまう。(next.send(cmd))
	     Editor から返ってくるコマンドをtranslatorが処理する。
	(1) Editor CommandをSession Ring 上に流し、それが戻って来るまでに、他のEditorから
	    受け取った Editor Command をキューに入れておく。
	        sentList              外に送り出したEditor Command
	        unMergedList       接続されたEditorのundo list (reverse order)
	        MergingSentList   Mergeするlist 。Mergeのやり直し用。
	Slow/Early
	(2) 戻って来たタイミングで、キュー上のEditor Commandを、eid とCommandの
	    順序を基にソートする。(self merge (Early))
	(3) 他のEditorにソートのタイミングを与えるために、Editor Command の
	    ack を、もう一周させる。
	(4) 他のEditorのCommandを受け取ってから、ack が来るまでのCommandをキューに
		入れておき、ack が来たら、eid とCommandの順序を基にソートする。(other merge (Slow))
	Direct
	(5)  他のEditor Command が来た時点で、すぐにmergeする
	(6)  自分のEditor Command が来て、
	          未確定の他のEditor Command があれば、それを確定(sentList/unMergedListから削除)
	          自分のEditor Command はsentListに追加 (unMergedListには既に入っている)
	(7)  Ackが来たら、そのEditor Command まで確定

		Editor には、ソートした編集結果になるように、それまで行なった編集をUndo
		して、ソートした編集結果を適用する。Undo が無駄な動作をしないように最適化する。
		*/
	
	    /*
		handle()   
		   セッションの処理 
		   manage()
		       編集コマンドは translate() へ
		       		一周して来た編集コマンドのACKは廃棄 (merge queue から削除)
		          一周して来た自分のコマンドならself merge
		       		他のエディタの編集コマンドのACK->other merge
		       			それ以外は、そのまま実行、merge queue へ格納
		          merge は checkReturnedCommand() から
		             startMerge() へ 
		        			まず、接続されている Editor に START_MERGE を送る
		        			邪魔されないように、他のcommand は block する
		   manage()		
		      START_MERGE_ACK が来たら、translator.mergeAck() で教えて、
		   		merge()->
		   			translator.checkOwnCommand() へ 
		   				ここで、sort されて、Merge Command をEditorへ送信 
		   			checkEndMerge()から
		   				endMerge() が呼ばれる。
		   					自分のエディタにEND_MERGE で Merge終了を通知 
		   					自分のコマンドは、ACKに変えて送信 (3)
		   					それ以外は、そのまま送信 (一周させる)
		       
	 */


	/**
	 * Dispatches an edit-related command according to its kind, its originating
	 * editor id and the current merge mode.
	 *
	 * @param command the command to process; its cmd field is rewritten for the
	 *                *_USER variants
	 */
	public void translate(REPCommand command){				
		switch(command.cmd) {
		case REPCMD_INSERT_ACK:
		case REPCMD_DELETE_ACK:
			if (command.eid==eid) {
				// Acknowledgement of one of our own commands.
				if (mergeMode==MergeMode.Slow) {
					checkReturnedCommand(command);
					checkQuit();
					return;
				} 
				// Second phase finished; synchronization of this command is complete.
				// SessionManager.logger.writeLog("Complete "+command);
				checkAck(command);
				checkQuit();
				return;
			}
			// Acknowledgement of another editor's command.
			if (mergeMode==MergeMode.Direct) {
				checkAck(command);
				truncateUnMergedCmds(command);
				ServerMainLoop.logger.writeLog("Editor"+eid+": send ackCommand "+command+report());
				next.send(command);
				checkQuit();
			} else
				checkReturnedCommand(command);
			return;
		case REPCMD_INSERT_USER:
			// A user edit is turned into a plain INSERT before it is sent on.
			command.cmd = REP.REPCMD_INSERT;
			userEditorCommand(command);
			return;
		case REPCMD_DELETE_USER:
			// A user edit is turned into a plain DELETE before it is sent on.
			command.cmd = REP.REPCMD_DELETE;
			userEditorCommand(command);
			return;
		case REPCMD_INSERT:
		case REPCMD_DELETE:
		case REPCMD_MERGE_MARK:
			if (command.eid == REP.MERGE_EID.id){
				// A merge command came back from the editor
				if(checkMergeConflict(command)){
					// The editor interrupted with a user command during the merge
					getMergeAgain();
				}
				checkEndMerge();
				return;
			} 
			if (command.eid == eid){ 
				// Our own edit command came back to us
				if (mergeMode==MergeMode.Slow) {
					checkAck(command);
					sendAck(command);
				} else	if (mergeMode==MergeMode.Direct) {
					truncateUnMergedCmds(command);
					checkAck(command);
					sendAck(command);
				} else {
					checkReturnedCommand(command);
				}
				return;
			}

			// Edit command originating from another editor
			transReceiveCmd(next,command);
			if (mergeMode==MergeMode.Direct)  { 
				sendEditorCommand(command);
				// Own commands may enter after here. To distinguish put mark here
				sentList.addLast(mergeMark );
				startMerge(command);
			} else
				sendEditorCommand(command);
			return;
		default:
			// Any other command kind is not expected here.
			assert(false);
		}
	}

	/**
	 * Handles a new edit command issued by the attached editor: records it,
	 * forwards it to the next node and, in Direct mode, truncates the sent list.
	 *
	 * @param command the user's edit command (already converted to INSERT/DELETE)
	 */
	private void userEditorCommand(REPCommand command) {
		// singleton case: nobody else to tell
		if (next==this) {
			return;
		}
		transSendCmd(command);
		sendEditorCommand(command);
		if (mergeMode!=MergeMode.Direct) {
			return;
		}
		ServerMainLoop.logger.writeLog("Editor"+eid+": User Command Before "+command+report());
		truncateSentList(command,true);
		ServerMainLoop.logger.writeLog("Editor"+eid+": User Command After "+command+report());
	}
	
//	private void checkDouble(List<REPCommand> sentList) {
//		if (sentList.size()==0) return;
//		int count = 0;
//		REPCommand f = sentList.get(0); 
//		for(REPCommand c:sentList) {
//			if (c.eid==f.eid&&c.seq==f.seq) {
//				count++;
//			}
//		}
//		assert(count==1);
//		if (true) return;
//		count = 0;
//		for(PacketSet c:waitingCommandInMerge) {
//			for(REPCommand g:sentList) { 
//				if (c.command.eid==g.eid&&c.command.seq==g.seq) {
//					count++;
//				}
//			}
//		}
//		assert(count==0);
//	}


	/**
	 * Queues a command for delivery to the attached editor.
	 *
	 * <pre>
	 *                              +--------+
	 *   send() --> write() ------> | Editor | -> handle() -> manage()
	 *                              +--------+
	 *   waitingCommandInMerge
	 *                 writeQueue
	 * </pre>
	 *
	 * send() is called by other editor nodes, while write() is used internally
	 * for priority delivery: writeQueue always has to be drained before
	 * waitingCommandInMerge. The manager's own send queue is not used here.
	 *
	 * While this node is blocked or merging, or while earlier commands are
	 * already waiting, the command is parked in waitingCommandInMerge.
	 *
	 * @param command the command to deliver
	 */
	@Override
	public void send(REPCommand command) {
		boolean mustWait = blocking || isMerging() || !waitingCommandInMerge.isEmpty();
		if (mustWait) {
			waitingCommandInMerge.addLast(command);
			ServerMainLoop.logger.writeLog("Editor eid:"+eid+" waitingCommandInMerge = "+waitingCommandInMerge);
			return;
		}

		// A command that will trigger a merge blocks everything that follows it.
		if (isMergeCommand(command)) {
			blocking = true;
			ServerMainLoop.logger.writeLog("Editor"+eid+": merging=true (send)"+command);
		}
		writeQueue.add(command);
	}

	/**
	 * Writes at most one queued command to the attached editor. Entries of
	 * writeQueue go first; a waiting command is only released when the node is
	 * neither blocked nor merging.
	 * Called periodically from the manager.
	 */
	public void checkWaitingCommandInMerge() {
		if (!writeQueue.isEmpty()) {
			REPCommand urgent = writeQueue.pollFirst();
			ServerMainLoop.logger.writeLog("Editor"+eid+": write comand="+urgent);
			super.write(urgent);
			return;
		}

		if (blocking || isMerging() || waitingCommandInMerge.isEmpty()) {
			return;
		}

		REPCommand held = waitingCommandInMerge.pollFirst();
		ServerMainLoop.logger.writeLog("Editor"+eid+": send waiting comand="+held);
		super.write(held);
		// Releasing a merge-triggering command blocks the ones behind it.
		if (isMergeCommand(held)) {
			blocking = true;
		}
	}
	/**
	 * Sends a command on to the next node of the ring.
	 * A copy of the command is remembered in both sentList and ackList first.
	 *
	 * @param command the command to forward
	 */
	private void sendEditorCommand(REPCommand command) {
		REPCommand remembered = new REPCommand(command);
		sentList.addLast(remembered);
		ackList.addLast(remembered);
		//ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
		assert(ackList.size()<limit);

		// The undo text of a delete is of no use outside this node, so it is
		// dropped from the outgoing command (the remembered copy keeps it).
		if (command.cmd==REP.REPCMD_DELETE) {
			command.string=null;
		}
		next.send(command);
	}

	/**
	 * Handles a command that has travelled once around the ring.
	 *
	 *   When INSERT/DELETE is received it is registered in sentList.
	 *   When INSERT_ACK/DELETE_ACK arrives the command has made one round trip,
	 *   and the merge is done at that point.
	 *
	 *   When an INSERT/DELETE issued by this editor comes back, it is changed
	 *   to an ACK and merged.
	 *
	 * An editor that joined late may first see the ACK of a command it never
	 * received; such an ACK can be ignored.
	 *
	 * This method currently only delegates to startMerge().
	 * @param command the returned command
	 */
	void checkReturnedCommand(REPCommand command) {
			startMerge(command);
	}

	/**
	 * Begins a merge triggered by the given command: remembers a copy of it and
	 * asks the attached editor to start merging. In NoMerge mode the merge is
	 * finished immediately instead.
	 *
	 * @param command the command that triggers the merge
	 */
	void startMerge(REPCommand command) {
		ServerMainLoop.logger.writeLog("Editor"+eid+": startMerge "+command);
		preMergeCommand = new REPCommand(command);

		if (mergeMode!=MergeMode.NoMerge) {
			// Send START_MERGE (there may be cases where it is not needed?).
			REPCommand startMergeCmd =
				new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,"");
			sendToEditor(startMergeCmd);
			// merging = true;
			// Stopping input from other editors in preparation for the merge
			// would be too late here, so that is done in send(). User commands
			// cannot be stopped, but that is not a problem.
			// The merge itself starts once the ACK from the editor arrives.
			return;
		}

		// Merging is essential, but it gets in the way when testing an editor's
		// command implementation, so it can be switched off.
		checkQuit();
		endMerge();
	}

	/**
	 * Compares the returned command with the head of ackList to check that
	 * commands come back in the order they were sent. The head of ackList is
	 * removed in the process. For an editor that joined late, such an ACK may
	 * be ignored.
	 *
	 * A mismatch is logged and trips an assertion; it is not reported through
	 * the return value.
	 *
	 * @param command the returned command or acknowledgement
	 * @return always true
	 */
	private boolean checkAck(REPCommand command) {
		REPCommand prev = null;
		boolean matched = false;
		// Outside Direct mode no acknowledgement is expected while a merge is running.
		boolean mergeInProgress = mergeMode!=MergeMode.Direct && isMerging();
		if (!mergeInProgress && !ackList.isEmpty()) {
			prev = ackList.removeFirst();
			matched = prev!=null && command!=null
					&& prev.seq==command.seq && prev.eid==command.eid;
		}
		if (!matched) {
			// should be more robust to allow communication failure
			String err = "Editor eid="+eid+" checkReturnedCommand() : command = " + command + " prev="+
			(prev==null?"null":prev)+" ackList=";
			err += ackList;
			err += "merging="+isMerging();
			ServerMainLoop.logger.writeLog(err);
			assert(false);
		}
		return true;
	}


	/**
	 * Remembers a QUIT_2 command and forwards it to the editor right away if
	 * checkQuit() finds nothing outstanding.
	 *
	 * @param cmd the QUIT_2 command to hold
	 */
	@Override
	public void setQuit2(REPCommand cmd) {
		quit_2 = cmd;
		checkQuit();
		// do not send quit2 until we received all pending
		// command
	}

	/**
	 * Sets the editor id of this node.
	 *
	 * @param eid new editor id
	 */
	@Override
	public void setEID(int eid) {
		this.eid = eid;
	}
	
	/**
	 * @return a short description made of editor id, session id, host and file
	 */
	public String toString(){
		String location = host + ":" + file;
		return "Editor eid="+eid+" sid="+sid+" " + location;
	}

	/**
	 * Finishes the merge once no merge command is outstanding, unblocks the
	 * node, and retries a pending QUIT_2.
	 */
	void checkEndMerge() {
		// Still merging: nothing to finish yet.
		if (blocking && isMerging()) {
			return;
		}
		if (blocking) {
			endMerge();
			blocking = false;
		}
		if (quit_2!=null) {
			checkQuit();
		}
	}


	/**
	 * Tells the attached editor that the merge is over (END_MERGE) and cleans up
	 * the merge bookkeeping.
	 *
	 * In Direct mode: unMergedCmds is cut at the first merge mark, sentList is
	 * truncated when its last entry belongs to this editor and session, and the
	 * merge mark is removed from sentList.
	 * In the other modes: the command that triggered the merge is checked
	 * against ackList and then either acknowledged (own command, Early mode
	 * only) or forwarded to the next node (command of another editor).
	 */
	private void endMerge() {
		REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,sid,eid,seq(),0,"");
		sendToEditor(mergeEnd);
		if (mergeMode==MergeMode.Direct) {
			// Remember the last sent command before the lists are rebuilt.
			REPCommand last = sentList.size()==0?null:sentList.getLast();
			ServerMainLoop.logger.writeLog("Editor"+eid+": EndMerge Before"+report());
			// Keep only the entries in front of the first merge mark.
			LinkedList<REPCommand> u = new LinkedList<REPCommand>();
			for(REPCommand c: unMergedCmds) {
				if (c.cmd==REP.REPCMD_MERGE_MARK) break;
				u.addLast(c);
			}
			unMergedCmds = u;
			if (last!=null && last.eid==eid && last.sid==sid)
				truncateSentList(last,false);
			sentList.remove(mergeMark);
			preMergeCommand = null;
			ServerMainLoop.logger.writeLog("Editor"+eid+": EndMerge "+report());
			return ;
		}
		sortedEditCmds = null;
		checkAck(preMergeCommand);
		if (preMergeCommand.eid==eid) {
			// Own command: in Early mode its ACK is sent now.
			if (mergeMode==MergeMode.Early) {
				sendAck(preMergeCommand);
			}
		} else {
			// Command of another editor: pass it on around the ring.
			ServerMainLoop.logger.writeLog("Editor"+eid+": send preMergeCommand "+preMergeCommand);
			next.send(preMergeCommand);
		}
		// sentList.clear();
		preMergeCommand = null;
	}

	/**
	 * A user edit command fixes the order of the other editors' commands that
	 * precede it. Rebuilds unMergedCmds and sentList accordingly; does nothing
	 * while a merge is in progress.
	 *
	 * unMergedCmds keeps its leading run of own/merge commands (scanning stops
	 * at the first command of another editor); merge marks are dropped, and
	 * with mode==false the entries after a merge mark are dropped as well.
	 * sentList keeps only the own commands that follow the last foreign entry.
	 *
	 * @param commit the command that triggered the truncation (not inspected)
	 * @param mode   false: don't keep unMergedCmds entries after a merge mark
	 */
	private void truncateSentList(REPCommand commit, boolean mode) {
		if (merging) {
			return;
		}

		LinkedList<REPCommand> keptUnmerged = new LinkedList<REPCommand>();
		boolean keeping = true;
		for (REPCommand c : unMergedCmds) {
			if (c.cmd==REP.REPCMD_MERGE_MARK) {
				keeping = mode;
			} else if (c.eid!=eid && c.eid!=REP.MERGE_EID.id) {
				break;
			} else if (keeping) {
				keptUnmerged.addLast(c);
			}
		}
		unMergedCmds = keptUnmerged;

		LinkedList<REPCommand> ownTail = new LinkedList<REPCommand>();
		for (REPCommand c : sentList) {
			if (c.eid==eid) {
				ownTail.addLast(c);
			} else {
				// A foreign entry invalidates everything collected so far.
				ownTail.clear();
			}
		}
		sentList = ownTail;
	}
	
	/**
	 * A returned command fixes the command order up to itself. Cuts
	 * unMergedCmds at the first entry with the same sequence as the given
	 * command (dropping merge marks on the way) and, if sentList contains that
	 * command, keeps only the sentList entries that follow it.
	 *
	 * @param commit the returned command
	 */
	public void truncateUnMergedCmds(REPCommand commit) {
		assert(!merging);

		LinkedList<REPCommand> head = new LinkedList<REPCommand>();
		for (REPCommand c : unMergedCmds) {
			if (c.cmd==REP.REPCMD_MERGE_MARK) {
				continue;
			}
			if (c.isSameSeq(commit)) {
				break;
			}
			head.addLast(c);
		}
		unMergedCmds = head;

		LinkedList<REPCommand> tail = new LinkedList<REPCommand>();
		boolean found = false;
		for (REPCommand c : sentList) {
			if (c.isSameSeq(commit)) {
				found = true;
			} else if (found) {
				tail.addLast(c);
			}
		}
		// sentList is left untouched when the command was never in it.
		if (found) {
			sentList = tail;
		}
	}

	/**
	 * Sends the acknowledgement for one of our own commands after it has come
	 * back: a copy of the command is turned into the matching *_ACK, appended
	 * to ackList and passed to the next node with an empty string.
	 *
	 * @param command the returned INSERT or DELETE command
	 */
	private void sendAck(REPCommand command) {
		// First phase ends here; work on a copy so the caller's command stays intact.
		REPCommand ack = new REPCommand(command);
		switch(ack.cmd) {
		case REPCMD_INSERT:
			ack.cmd = REP.REPCMD_INSERT_ACK;
			break;
		case REPCMD_DELETE:
			ack.cmd = REP.REPCMD_DELETE_ACK;
			break;
		default:
			assert(false);
		}
		ackList.addLast(ack);
		ServerMainLoop.logger.writeLog("Editor"+eid+": sendAck sentList = "+sentList);
		assert(ackList.size()<limit);
		ack.string = "";
		next.send(ack);
	}

	/**
	 * Forwards a pending QUIT_2 to the attached editor once nothing is
	 * outstanding: no expected acknowledgement, no merge, no waiting command
	 * and an (effectively) empty sent list.
	 *
	 * @return true if the QUIT_2 was handed to the editor, false otherwise
	 */
	private boolean checkQuit() {
		if (quit_2==null) {
			return false;
		}
		if (ackList.size()!=0) {
			return false;
		}
		if (isMerging()) {
			return false;
		}
		if (waitingCommandInMerge.size()!=0) {
			return false;
		}
		if (!emptySentList()) {
			return false;
		}
		sendToEditor(quit_2);
		quit_2 = null;
		return true;
	}

	/**
	 * @return true when sentList holds no command, or nothing but a single merge mark
	 */
	private boolean emptySentList() {
		if (sentList.size()==0) {
			return true;
		}
		boolean single = sentList.size()==1;
		return single && sentList.getFirst().cmd==REP.REPCMD_MERGE_MARK;
	}

	/**
	 * Processes a command coming from the attached editor or the session.
	 * Edit commands go to translate(); START_MERGE_ACK runs the merge; SYNC and
	 * the QUIT family are routed to the editor, the next node or the manager.
	 *
	 * @param command the command to process
	 * @return true if the command kind was handled, false for an unknown kind
	 */
	@Override
	public boolean manage(REPCommand command) {
		
		
		switch(command.cmd){
		// Editor Command

		case REPCMD_DELETE:
		case REPCMD_INSERT:
		case REPCMD_DELETE_USER:
		case REPCMD_INSERT_USER:
		case REPCMD_DELETE_ACK:
		case REPCMD_INSERT_ACK:
		case REPCMD_MERGE_MARK:
		{
			translate(command);
			break;
		}

		case SMCMD_START_MERGE_ACK:
		{
			// Run the merge and forward the command to the next editor
			mergeAck();
			if (!merge(preMergeCommand)) {
				// nothing to do, send END_MERGE
				checkEndMerge();
			}
			break;
		}
		
		case SMCMD_SYNC:
			if (isMaster()) 
				sendToEditor(command);
			else
				next.send(command);
			// Without this break the case fell through into SMCMD_QUIT and the
			// command was forwarded to the next node a second time.
			break;
			
		case SMCMD_QUIT:
		{
			next.send(command);
			break;
		}
		case SMCMD_QUIT_2:
		{
			// QUIT_2 is returned.
			if (command.eid!=eid) {
				// stop this editor unless this is the start, starter will stopped
				// by QUIT_2_ACK
				manager.remove(this);
			}
			// don't send quit_2 directly to the editor until all pending
			// merge is processed.
			//   this does not work in distributed case.
			if (next.isDirect())
				next.setQuit2(command);
			else
				next.send(command);
			break;
		}
		case SMCMD_QUIT_2_ACK:
		{
			manager.remove(this);
			break;
		}
		default:
			assert false;
			return false;
		}
		return true;
	}
	
	
	/**
	 * Tells whether delivering the given command to the attached editor will
	 * trigger a merge under the current merge mode.
	 *
	 * Direct: INSERT/DELETE of another editor.
	 * Slow:   any INSERT_ACK/DELETE_ACK.
	 * Other:  own INSERT/DELETE, or INSERT_ACK/DELETE_ACK of another editor.
	 *
	 * @param command the command about to be delivered
	 * @return true if the command starts a merge
	 */
	private boolean isMergeCommand(REPCommand command) {
		if (mergeMode==MergeMode.Direct) {
			// Parenthesised on purpose: "a && b || c" parses as "(a && b) || c",
			// which made this editor's own DELETE count as a merge command.
			return command.eid!=eid
					&& (command.cmd==REP.REPCMD_INSERT || command.cmd==REP.REPCMD_DELETE);
		}
		switch(command.cmd) {
		case REPCMD_INSERT: case REPCMD_DELETE:
			return mergeMode==MergeMode.Slow?false:command.eid==eid;
		case REPCMD_INSERT_ACK: case REPCMD_DELETE_ACK:
			return mergeMode==MergeMode.Slow?true:command.eid!=eid;
		}
		return false;
	}
	
	/**
	 * Queues a command on writeQueue for the attached editor; unlike send(),
	 * no blocking/merging check is made here.
	 *
	 * @param command the command to queue
	 */
	public void sendToEditor(REPCommand command) {
		writeQueue.add(command);
	}

	/**
	 * Entry point for a command read from the attached editor's channel.
	 * JOIN/PUT re-dispatch the connection through a FirstConnector; everything
	 * else is offered to the session manager first and then to manage().
	 *
	 * @param command the received command
	 * @param key     selection key of the channel the command came from
	 * @throws IOException propagated from the FirstConnector's handle() call
	 */
	@Override
	public void handle(REPCommand command, REPSelectionKey<REPCommand> key) throws IOException {
		boolean opensSession = command.cmd==REP.SMCMD_JOIN||command.cmd==REP.SMCMD_PUT;
		if (opensSession) {
			// assert false;
			// Case where several sessions are created on a single editor.
			// This is said to be somewhat problematic.
			next = new Forwarder(manager,next.channel);
			REPNode first = new FirstConnector(manager,channel);
			first.handle(command, key);
			key.attach(new Dispatcher(manager,channel));
			return;
		}
		// ServerMainLoop.logger.writeLog("Editor"+eid+": handle command="+command);
		if (!manager.sessionManage(this, command)) {
			manage(command);
		}
	}

	/**
	 * Asks the session manager to remove the given channel.
	 *
	 * @param socketChannel the channel to remove
	 */
	@Override
	public void cancel(REPSocketChannel<REPCommand> socketChannel) {
		manager.remove(socketChannel);
	}

	/**
	 * @return true when this node's mode is SMCMD_PUT
	 */
	public boolean isMaster() {
		return mode==REP.SMCMD_PUT;
	}

	
	/**
	 * Handles a command that was forwarded to this node. Special cases are
	 * usually dealt with by the next Editor inside a session manager, but such
	 * commands end up here when forwarded.
	 *
	 * @param command the forwarded command
	 */
	public void forwardedCommandManage(REPCommand command) {
		if (command.cmd!=REP.SMCMD_QUIT_2) {
			send(command);
		} else {
			// we have to wait next editor's finishing before sending this.
			// this is odd, but the editor itself does not know it's merging
			// state. Only this session manager knows it.
			setQuit2(command);
		}
	}

	/**
	 * Records a new command issued by the attached editor before it is sent to
	 * the next editor. If a merge is running, the merge is flagged to be redone.
	 *
	 * @param cmd the editor's own command
	 * @return the same command object
	 */
	public REPCommand transSendCmd(REPCommand cmd){
		assert(cmd.eid==eid);
		unMergedCmds.addFirst(cmd);

		if (!isMerging()) {
			return cmd;
		}
		// The user interrupted while a merge was in progress.
		logger.writeLog("mergeAgain"+eid+":"+cmd);
		mergeAgain = true;
		return cmd;
	}
	
	/**
	 * Builds the merge command sequence and sends it to the attached editor.
	 * Called once START_MERGE_ACK has come back from the editor; at this stage
	 * my writeQueue is empty and our editor is waiting for me.
	 *
	 * The output consists of an undo for every INSERT/DELETE in unMergedCmds,
	 * the merge mark, the sentList edits recorded before the merge mark in
	 * sorted order, and finally the sentList edits recorded after the mark.
	 *
	 * @param prev the command that triggered the merge (not used in the body)
	 * @return true if any merge command was sent to the editor
	 */
	public boolean merge(REPCommand prev){
		logger.writeLog("beforeMerge"+eid+":"+unMergedCmds);
		LinkedList<REPCommand> output = new LinkedList<REPCommand>();
		LinkedList<REPCommand> newSentList = new LinkedList<REPCommand>();
		// Turn every command on the merge queue into an undo command and,
		// at the same time, build the sorted command sequence
		for( REPCommand cmd0 : unMergedCmds) {
			if (cmd0.cmd==REP.REPCMD_INSERT || cmd0.cmd==REP.REPCMD_DELETE) 
				output.addLast( createUndo(cmd0) );
		}

		sortedEditCmds = new TreeSet<REPCommand>(new REPCommandComparator(1));
		logger.writeLog("sentList"+eid+":"+sentList);
		// true until the merge mark is seen in sentList
		boolean flag = true;
		for( REPCommand cmd0 : sentList ) {
			if (cmd0.cmd==REP.REPCMD_MERGE_MARK) { 
				flag = false;
			}
			if (cmd0.cmd==REP.REPCMD_INSERT || cmd0.cmd==REP.REPCMD_DELETE) {
				// Edits before the mark are sorted; later ones keep their order.
				if (flag)	sortedEditCmds.add(cmd0);
				else newSentList.add(cmd0);
			}
		}
		output.addLast(mergeMark);
		output.addAll(sortedEditCmds);
		output.addAll(newSentList);
		// NOTE(review): this logs sentList although the tag says "sortedMerge";
		// mergeEarly() logs sortedEditCmds at the same point - confirm which is intended.
		logger.writeLog("sortedMerge"+eid+":"+sentList);
		// The undo string of an unmerged delete cannot be used at this point;
		// the one sent back by the editor has to be used instead.
		unMergedCmds.clear();
		logger.writeLog("outputMerge"+eid+":"+output);
		return optimizedSend(this,output);
	}

	/**
	 * Variant of merge(): undoes every entry of unMergedCmds, re-applies the
	 * INSERT/DELETE entries of sentList in sorted order, appends the merge mark
	 * last, and replaces sentList.
	 *
	 * NOTE(review): flag is never set to false here, so newSentList stays empty
	 * and sentList ends up empty after the call - confirm this is intended.
	 *
	 * @param prev the command that triggered the merge (not used in the body)
	 * @return true if any merge command was sent to the editor
	 */
	public boolean mergeEarly(REPCommand prev){
		logger.writeLog("beforeMerge"+eid+":"+unMergedCmds);
		LinkedList<REPCommand> output = new LinkedList<REPCommand>();
		LinkedList<REPCommand> newSentList = new LinkedList<REPCommand>();
		// Turn every command on the merge queue into an undo command and,
		// at the same time, build the sorted command sequence
		for( REPCommand cmd0 : unMergedCmds) {
			output.addLast( createUndo(cmd0) );
		}

		sortedEditCmds = new TreeSet<REPCommand>(new REPCommandComparator(1));
		logger.writeLog("sentList"+eid+":"+sentList);
		boolean flag = true;
		for( REPCommand cmd0 : sentList ) {
			if (cmd0.cmd==REP.REPCMD_INSERT || cmd0.cmd==REP.REPCMD_DELETE) {
				if (flag)	sortedEditCmds.add(cmd0);
				else newSentList.add(cmd0);
			}
		}
		output.addAll(sortedEditCmds);
		output.addLast(mergeMark);
		logger.writeLog("sortedMerge"+eid+":"+sortedEditCmds);
		// The undo string of an unmerged delete cannot be used at this point;
		// the one sent back by the editor has to be used instead.
		unMergedCmds.clear();
		sentList = newSentList;
		logger.writeLog("outputMerge"+eid+":"+output);
		return optimizedSend(this,output);
	}

	/**
	 * Runs the merge output through the optimizer and sends the result to the
	 * given editor node. Every command sent is a copy stamped with the merge
	 * editor id and a fresh sequence number, and is remembered in
	 * sentMergedList. If the optimizer leaves nothing to send, the merging
	 * state is cleared.
	 *
	 * @param editor node that receives the merge commands
	 * @param output merge command list before optimization
	 * @return true if any command was sent
	 */
	public boolean optimizedSend(REPNode editor, LinkedList<REPCommand> output) {
		/*
		 * With optimized sending, the bookkeeping of the unmerged commands has
		 * to be kept consistent with what is actually sent.
		 */
		sentMergedList.clear();
		List<REPCommand> optimized = optimizer.optimize(output);
		boolean nothingToSend = optimized.size()==0;
		if (nothingToSend) {
			merging = false;
			return false;
		}

		for (REPCommand source : optimized) {
			REPCommand mergeCmd = new REPCommand(source);
			mergeCmd.setEID(REP.MERGE_EID.id);
			mergeCmd.setSEQID(editor.seq());
			sentMergedList.addLast(mergeCmd);
			editor.sendToEditor(mergeCmd);
		}
		logger.writeLog("OptimizedOutputMerge"+eid+":"+sentMergedList);
		return true;
	}
	
	/**
	 * Creates the inverse of an edit command: an INSERT becomes a DELETE with
	 * an empty string, a DELETE becomes an INSERT. Any other command is simply
	 * copied.
	 *
	 * @param cmd the command to invert
	 * @return a new command that undoes cmd
	 */
	private REPCommand createUndo(REPCommand cmd){
		REPCommand undo = new REPCommand(cmd);
		if (cmd.cmd==REP.REPCMD_DELETE) {
			undo.cmd=REP.REPCMD_INSERT;
		} else if (cmd.cmd==REP.REPCMD_INSERT) {
			undo.cmd=REP.REPCMD_DELETE;
			undo.string="";
		}
		return undo;
	}

	/**
	 * Orders commands by editor id relative to a base id (ids below the base
	 * sort after all others), then by sequence number.
	 */
	class REPCommandComparator implements Comparator<REPCommand>{
		int base;

		REPCommandComparator(int base) {
			this.base = base;
		}

		/** Shifts an editor id by the base, wrapping negative results upwards. */
		private int rank(int editorId) {
			int shifted = editorId-base;
			return shifted<0 ? shifted+Integer.MAX_VALUE : shifted;
		}

		public int compare(REPCommand o1, REPCommand o2) {
			int rank1 = rank(o1.eid);
			int rank2 = rank(o2.eid);
			if (rank1!=rank2) {
				return rank1<rank2 ? -1 : 1;
			}
			if (o1.seq!=o2.seq) {
				return o1.seq<o2.seq ? -1 : 1;
			}
			// assert(false);  // this can happen in MergedAgain case
			return 0;
		}
	}
	
	/**
	 * Records a command received from another editor by pushing a copy of it
	 * onto the front of unMergedCmds. Nothing is returned.
	 *
	 * @param nextEditor the next node (not used in the body)
	 * @param cmd the received command; must not originate from this editor
	 */
	public void transReceiveCmd(REPNode nextEditor,REPCommand cmd){
		assert (cmd.eid != eid);
		unMergedCmds.addFirst(new REPCommand(cmd));
	}

	/**
	 * Sets the editor id of this node (same effect as setEID()).
	 *
	 * @param _eid new editor id
	 */
	public void setEid(int _eid){
		eid = _eid;
	}

	/**
	 * Processes a merge command that came back from the attached editor.
	 * A copy is pushed onto unMergedCmds; if the command matches the oldest
	 * outstanding merge command, that one is ticked off. When nothing is
	 * outstanding any more and no re-merge is pending, the merging state ends.
	 *
	 * @param command the returned merge command
	 * @return true if a user command interrupted the merge, so it must be redone
	 */
	public boolean checkMergeConflict(REPCommand command) {
		unMergedCmds.addFirst(new REPCommand(command));

		// peekFirst() rather than getFirst(): a merge command of a previous
		// merge may still come back when nothing is outstanding, and getFirst()
		// would throw NoSuchElementException on the empty list.
		REPCommand prev = sentMergedList.peekFirst();
		if (prev==null) {
			logger.writeLog("Editor"+eid+": merge command "+command+" returned with no merge command outstanding");
		} else if (prev.seq==command.seq) {
			// logger.writeLog("Input eid="+eid+"SentMergedList = "+sentMergedList);
			sentMergedList.removeFirst();
		}
		//  previous merge command may be returned

		if(sentMergedList.size()==0 && !mergeAgain) {
			merging=false;
		}
		return mergeAgain;
	}

	/**
	 * Redoes the merge after a user command interrupted it. Nothing happens
	 * while merge commands of the previous attempt are still outstanding.
	 *
	 * In Direct mode the merge is simply run again with merge(). Otherwise the
	 * commands in unMergedCmds are undone, followed by the sorted edit
	 * commands, a fresh merge mark and the current sentList.
	 */
	public void getMergeAgain() {
		if (sentMergedList.size()>0) return; //  wait for previous merge completion
		if (mergeMode==MergeMode.Direct) {
			logger.writeLog("MergeAgain "+eid);
			mergeAgain = false;
			merge(preMergeCommand);
			return;
		}
		
		LinkedList<REPCommand> returnCommand = new LinkedList<REPCommand>();
		for(REPCommand command : unMergedCmds) {
			if (command.cmd==REP.REPCMD_INSERT||command.cmd==REP.REPCMD_DELETE)
				returnCommand.add(createUndo(command));
		}
		// NOTE(review): assumes sortedEditCmds is non-null at this point, i.e.
		// that merge()/mergeEarly() ran and endMerge() has not reset it - confirm.
		returnCommand.addAll(sortedEditCmds);
		returnCommand.addLast(new REPCommand(REP.REPCMD_MERGE_MARK,0, sid, REP.MERGE_EID.id, seq(), ""));
		returnCommand.addAll(sentList);
		unMergedCmds.clear();
		logger.writeLog("MergeAgain "+eid+" ret="+returnCommand.size());
		mergeAgain = false;
		optimizedSend(this, returnCommand);
	}
//
//	public boolean isFinished() {
//		if(unMergedCmds.size() > 0) return false;
//		if(sentMergedList.size() > 0) return false;
//		return true;
//	}

	/**
	 * @return true while a merge is in progress (set by mergeAck(), cleared in
	 *         checkMergeConflict() or optimizedSend())
	 */
	public boolean isMerging() {
		return merging;
	}

	/**
	 * Called when SMCMD_START_MERGE_ACK is received: logs the pending unmerged
	 * commands and enters the merging state.
	 */
	public void mergeAck() {
		String pending = "";
		if (unMergedCmds.size()>0) {
			pending = " unMergedCmds = "+ unMergedCmds;
		}
		logger.writeLog("Editor"+eid+": START MERGE "+pending);
		// From here on, a user command that interrupts requires the merge to be redone.
		merging = true;
	}

	/**
	 * Dead lock reporter.
	 *
	 * @return a multi-line dump of sentList, ackList, unMergedCmds and the
	 *         merging flag, for log messages
	 */
	public String report() {
		StringBuilder dump = new StringBuilder();
		dump.append("\n  sentList:").append(sentList);
		dump.append("\n  ackList:").append(ackList);
		dump.append("\n  unMergedList:").append(unMergedCmds);
		dump.append("\n  mergeMode=:").append(merging);
		return dump.toString();
	}

}