view rep/handler/Editor.java @ 497:6d7e284f22dc

global sequence mode
author one
date Sun, 24 Oct 2010 14:16:40 +0900
parents bd76f7e39247
children 138f3b33aa5e
line wrap: on
line source

package rep.handler;

import java.io.IOException;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeSet;

import rep.REP;
import rep.REPCommand;
import rep.ServerMainLoop;
import rep.SessionManager;
import rep.channel.REPLogger;
import rep.channel.REPSelectionKey;
import rep.channel.REPSocketChannel;
import rep.optimizers.*;

public class Editor extends Forwarder {

	// REPCommands we are going to send to the next editor
	private LinkedList<REPCommand> sentList = new LinkedList<REPCommand>();
	// Expected acknowledge list
	private LinkedList<REPCommand> ackList = new LinkedList<REPCommand>();
	// Commands received through send() while blocked or merging; replayed
	// one at a time by checkWaitingCommandInMerge()
	public LinkedList<REPCommand> waitingCommandInMerge= new LinkedList<REPCommand>();
	// Pending QUIT_2, held back until checkQuit() finds nothing outstanding
	private REPCommand quit_2=null;
	// Copy of the command that triggered the current merge (set in startMerge())
	private REPCommand preMergeCommand;

	public REPCommandOptimizer optimizer;
	// Merge commands sent to the editor and not yet returned by it.
	// Initialised here (not only in the three-argument constructor) so that
	// instances built through the other constructors do not hit a
	// NullPointerException in optimizedSend()/checkMergeConflict().
	private LinkedList<REPCommand> sentMergedList = new LinkedList<REPCommand>();
	// Result of the last sort performed by merge()/mergeEarly()
	private TreeSet<REPCommand> sortedEditCmds;
	// Set when a user command arrives while a merge is in progress
	boolean mergeAgain;
	public REPLogger logger = SessionManager.logger;
	// True from the moment a merge-triggering command is passed to the editor
	// until checkEndMerge() finishes the merge
	private boolean blocking = false;
	// True between START_MERGE_ACK and the return of the last merge command
	private boolean merging = false;
	// Commands to be written to the editor; always drained before
	// waitingCommandInMerge
	private LinkedList<REPCommand> writeQueue = new LinkedList<REPCommand>();
	// Marker separating merged commands from own commands entered afterwards
	private REPCommand mergeMark =new REPCommand(REP.REPCMD_MERGE_MARK,0,0, REP.MERGE_EID.id, 0, "");
	
	public enum MergeMode {
		NoMerge,    // no merge 
		Slow,          // merge at Ack
		Early,         //  merge at returned command and Ack
		Direct        //  merge at incoming command
	}
	public static  MergeMode mergeMode = MergeMode.Direct;
	static final boolean doOptimize = false;

	/**
	 * Builds an editor node attached to a session manager only
	 * (the "no translator" case).
	 *
	 * @param manager  session manager handed to the superclass
	 * @param editorNo not used by this constructor
	 */
	public Editor(SessionManager manager,int editorNo){
		super(manager, null);
	}

	/**
	 * Builds a detached editor node that uses the given optimizer.
	 *
	 * @param eid       not used by this constructor
	 * @param optimizer optimizer applied to merge output
	 */
	public Editor(int eid, REPCommandOptimizer optimizer) {
		super(null, null);
		this.optimizer = optimizer;
	}

	/**
	 * Builds an editor node bound to a channel.
	 *
	 * @param editorNo editor id; also stored in {@code eid}
	 * @param manager  owning session manager
	 * @param channel  channel to the editor
	 */
	public Editor(int editorNo, SessionManager manager,REPSocketChannel<REPCommand> channel){
		super(editorNo,manager,channel);
		this.eid = editorNo;
		this.sentMergedList = new LinkedList<REPCommand>();
		this.mergeAgain = false;
		// DeleteInsertOptimizer is the optimizer written by Takano;
		// NullOptimizer is an optimizer that does nothing.
		this.optimizer = doOptimize ? new DeleteInsertOptimizer() : new NullOptimizer();
	}
	
	/*
	 *  Merge Protocol
	(0) Editor へのコマンドは、ack 以外は直接 Editor へ送られてしまう。(next.send(cmd))
	     Editor から返ってくるコマンドをtranslatorが処理する。
	(1) Editor CommandをSession Ring 上に流し、それが戻って来るまでに、他のEditorから
	    受け取った Editor Command をキューに入れておく。
	        sentList              外に送り出したEditor Command
	        MergingSentList   Mergeするlist 。Mergeのやり直し用。
	Slow/Early
	(2) 戻って来たタイミングで、キュー上のEditor Commandを、eid とCommandの
	    順序を基にソートする。(self merge (Early))
	(3) 他のEditorにソートのタイミングを与えるために、Editor Command の
	    ack を、もう一周させる。
	(4) 他のEditorのCommandを受け取ってから、ack が来るまでのCommandをキューに
		入れておき、ack が来たら、eid とCommandの順序を基にソートする。(other merge (Slow))
	Direct
	       seq = gseq*limit + lseq
	(5)  他のEditor Command が来た時点で、すぐにmergeする。  gseqを他コマンドのgseqよりも
	  大きくなるように設定。sentListに追加  
	(6)  自分のEditor Command はsentListに追加
	(7)  Ackが来たら、そのEditor Command まで確定 (sentList/unMergedListから削除)

		Editor には、ソートした編集結果になるように、それまで行なった編集をUndo
		して、ソートした編集結果を適用する。Undo が無駄な動作をしないように最適化する。
		*/
	
	    /*
		handle()   
		   セッションの処理 
		   manage()
		       編集コマンドは translate() へ
		       		一周して来た編集コマンドのACKは廃棄 (merge queue から削除)
		          一周して来た自分のコマンドならself merge
		       		他のエディタの編集コマンドのACK->other merge
		       			それ以外は、そのまま実行、merge queue へ格納
		          merge は checkReturnedCommand() から
		             startMerge() へ 
		        			まず、接続されている Editor に START_MERGE を送る
		        			邪魔されないように、他のcommand は block する
		   manage()		
		      START_MERGE_ACK が来たら、translator.mergeAck() で教えて、
		   		merge()->
		   			translator.checkOwnCommand() へ 
		   				ここで、sort されて、Merge Command をEditorへ送信 
		   			checkEndMerge()から
		   				endMerge() が呼ばれる。
		   					自分のエディタにEND_MERGE で Merge終了を通知 
		   					自分のコマンドは、ACKに変えて送信 (3)
		   					それ以外は、そのまま送信 (一周させる)
		       
	 */


	/**
	 * Dispatches an editor command (insert/delete, their USER and ACK
	 * variants, and the merge mark) according to its type, its originating
	 * eid and the current {@link MergeMode}.
	 *
	 * @param command command to process; USER variants are rewritten in
	 *                place into plain INSERT/DELETE
	 */
	public void translate(REPCommand command){				
		switch(command.cmd) {
		case REPCMD_INSERT_ACK:
		case REPCMD_DELETE_ACK:
			if (command.eid==eid) {
				// ACK of one of our own commands
				if (mergeMode==MergeMode.Slow) {
					checkReturnedCommand(command);
					checkQuit();
					return;
				} 
				// The second phase has finished, so synchronization is complete.
				// SessionManager.logger.writeLog("Complete "+command);
				checkAck(command);
				checkQuit();
				return;
			}
			// ACK of another editor's command
			if (mergeMode==MergeMode.Direct) {
				checkAck(command);
				truncateUnMergedCmds(command);
				ServerMainLoop.logger.writeLog("Editor"+eid+": send ackCommand "+command+report());
				next.send(command);
				checkQuit();
			} else
				checkReturnedCommand(command);
			return;
		case REPCMD_INSERT_USER:
			// New insert typed by the user: turn it into a plain INSERT
			command.cmd = REP.REPCMD_INSERT;
			userEditorCommand(command);
			return;
		case REPCMD_DELETE_USER:
			// New delete typed by the user: turn it into a plain DELETE
			command.cmd = REP.REPCMD_DELETE;
			userEditorCommand(command);
			return;
		case REPCMD_INSERT:
		case REPCMD_DELETE:
		case REPCMD_MERGE_MARK:
			if (command.eid == REP.MERGE_EID.id){
				// A merge command has come back from the editor
				if(checkMergeConflict(command)){
					// The editor interrupted us with its own command during the merge
					getMergeAgain();
				}
				checkEndMerge();
				return;
			} 
			if (command.eid == eid){ 
				// Our own edit command has travelled all the way around the ring
				if (mergeMode==MergeMode.Slow) {
					checkAck(command);
					sendAck(command);
				} else	if (mergeMode==MergeMode.Direct) {
					truncateUnMergedCmds(command);
					checkAck(command);
					sendAck(command);
				} else {
					checkReturnedCommand(command);
				}
				return;
			}

			// An edit command from another editor
			transReceiveCmd(next,command);
			if (mergeMode==MergeMode.Direct)  { 
				sendEditorCommand(command);
				// Own commands may enter after here. To distinguish put mark here
				sentList.addLast(mergeMark );
				startMerge(command);
			} else
				sendEditorCommand(command);
			return;
		default:
			// Any other command type is not expected here
			assert(false);
		}
	}

	
	/**
	 * Handles a new edit command originating from our own editor.
	 * Nothing is done when this node is the only one in the ring.
	 *
	 * @param command the user's command, already converted to INSERT/DELETE
	 */
	private void userEditorCommand(REPCommand command) {
		boolean aloneInRing = (next == this); // singleton case
		if (aloneInRing) {
			return;
		}
		transSendCmd(command);
		sendEditorCommand(command);
		if (mergeMode != MergeMode.Direct) {
			return;
		}
		ServerMainLoop.logger.writeLog("Editor"+eid+": User Command Before "+command+report());
		truncateSentList(command, true);
		ServerMainLoop.logger.writeLog("Editor"+eid+": User Command After "+command+report());
	}
	
//	private void checkDouble(List<REPCommand> sentList) {
//		if (sentList.size()==0) return;
//		int count = 0;
//		REPCommand f = sentList.get(0); 
//		for(REPCommand c:sentList) {
//			if (c.eid==f.eid&&c.seq==f.seq) {
//				count++;
//			}
//		}
//		assert(count==1);
//		if (true) return;
//		count = 0;
//		for(PacketSet c:waitingCommandInMerge) {
//			for(REPCommand g:sentList) { 
//				if (c.command.eid==g.eid&&c.command.seq==g.seq) {
//					count++;
//				}
//			}
//		}
//		assert(count==0);
//	}


	/**
	 * Sending to Editor and waiting Queue
	 *                                  +--------+
	 *      send() --> write() -> | Editor | -> handle() -> manage()
	 *                                  +--------+
	 *      waitingQueue      
	 *                    writeQueue
	 *
	 *      send() is called from other Editor nodes.
	 *      write() is used internally to send with priority.
	 *          writeQueue must always be processed before the waiting queue.
	 *
	 *      The session manager's own send queue is not used here.
	 *
	 * While the node is blocked, merging, or already has queued commands, the
	 * command is appended to the waiting queue. Otherwise it goes to the write
	 * queue, and the node starts blocking if the command will trigger a merge.
	 */
	@Override
	public void send(REPCommand command) {
		boolean mustWait = blocking || isMerging() || !waitingCommandInMerge.isEmpty();
		if (!mustWait) {
			if (isMergeCommand(command)) {
				blocking = true;
				ServerMainLoop.logger.writeLog("Editor"+eid+": merging=true (send)"+command);
			}
			writeQueue.add(command);
			return;
		}
		waitingCommandInMerge.addLast(command);
		ServerMainLoop.logger.writeLog("Editor eid:"+eid+" waitingCommandInMerge = "+waitingCommandInMerge);
	}

	/**
	 * Writes at most one pending command to the editor.
	 * The write queue has priority; a waiting command is written only when
	 * the write queue is empty and the node is neither blocking nor merging.
	 * According to the original note, this is called periodically from the
	 * manager.
	 */
	public void checkWaitingCommandInMerge() {
		if (!writeQueue.isEmpty()) {
			REPCommand urgent = writeQueue.removeFirst();
			ServerMainLoop.logger.writeLog("Editor"+eid+": write comand="+urgent);
			super.write(urgent);
			return;
		}
		if (blocking || isMerging()) {
			return;
		}
		if (waitingCommandInMerge.isEmpty()) {
			return;
		}
		REPCommand delayed = waitingCommandInMerge.removeFirst();
		ServerMainLoop.logger.writeLog("Editor"+eid+": send waiting comand="+delayed);
		super.write(delayed);
		if (isMergeCommand(delayed)) {
			// this command will start a merge, so hold back the rest
			blocking = true;
		}
	}
	/**
	 * Sends an edit command on to the next node.
	 * A copy is kept in both sentList and ackList before the command
	 * leaves this node.
	 *
	 * @param command command to forward; the string of a DELETE is cleared
	 */
	private void sendEditorCommand(REPCommand command) {
		REPCommand copy = new REPCommand(command);
		sentList.add(copy);
		ackList.add(copy);
		//ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
		assert(ackList.size()<limit);
		boolean isDelete = (command.cmd == REP.REPCMD_DELETE);
		if (isDelete) {
			// the undo string of a delete is of no use outside this node
			command.string = null;
		}
		next.send(command);
	}

	/**
	 * Handles a command that has gone around the ring by starting a merge
	 * for it.
	 *
	 * Notes carried over from the original documentation:
	 *   INSERT/DELETE are registered in sentList when received;
	 *   an INSERT_ACK/DELETE_ACK means one round is done, and the merge
	 *   happens then; when our own INSERT/DELETE comes back it is turned
	 *   into an ACK and merged. An editor that joined midway may see the ACK
	 *   of a command it never received; that can be ignored.
	 *
	 * @param command the returned command
	 */
	void checkReturnedCommand(REPCommand command) {
		this.startMerge(command);
	}

	/**
	 * Begins a merge triggered by the given command.
	 * A copy of the command is remembered as preMergeCommand. In NoMerge mode
	 * the merge is ended immediately; otherwise START_MERGE is queued for the
	 * editor.
	 *
	 * @param command the command that triggers the merge
	 */
	void startMerge(REPCommand command) {
		ServerMainLoop.logger.writeLog("Editor"+eid+": startMerge "+command);
		preMergeCommand = new REPCommand(command);
		if (mergeMode != MergeMode.NoMerge) {
			// Send START_MERGE (there may be cases where it is not needed?)
			sendToEditor(new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,""));
			// merging = true;
			// Stopping input from other editors in preparation for the merge
			// would be too late if done here, so it is done in send().
			// USER commands cannot be stopped, but that is not a problem.
			// The merge itself starts once the ACK arrives from the editor.
		} else {
			// Merging is mandatory in real use, but it gets in the way when
			// testing an editor's command implementation, so it can be
			// switched off.
			checkQuit();
			endMerge();
		}
	}

	/**
	 * Removes the head of ackList and checks that it matches the given
	 * command (same seq and eid). A mismatch, an empty ackList, or being
	 * called during a merge outside Direct mode is logged and trips an
	 * assertion. The original note says ACKs may be ignored for an editor
	 * that joined midway.
	 *
	 * @param command the acknowledged command
	 * @return always {@code true}
	 */
	private boolean checkAck(REPCommand command) {
		REPCommand prev = null;
		boolean inOrder = false;
		try {
			boolean busy = mergeMode != MergeMode.Direct && isMerging();
			if (!busy && ackList.size() != 0) {
				prev = ackList.removeFirst();
				inOrder = prev != null && prev.seq == command.seq && prev.eid == command.eid;
			}
		} catch (Exception ignored) {
			// any failure while comparing is reported like a mismatch
			inOrder = false;
		}
		if (!inOrder) {
			// should be more robust to allow communication failure
			String err = "Editor eid="+eid+" checkReturnedCommand() : command = " + command + " prev="+
				(prev==null?"null":prev)+" ackList=";
			err = err + ackList;
			err = err + "merging="+isMerging();
			ServerMainLoop.logger.writeLog(err);
			assert(false);
		}
		return true;
	}


	/**
	 * Remembers a QUIT_2 command and tries to deliver it.
	 * checkQuit() only passes it to the editor once nothing is pending.
	 *
	 * @param cmd the QUIT_2 command to hold
	 */
	@Override
	public void setQuit2(REPCommand cmd) {
		this.quit_2 = cmd;
		// do not send quit2 until we received all pending
		// command
		checkQuit();
	}

	/**
	 * Sets this node's editor id.
	 *
	 * @param eid new editor id
	 */
	@Override
	public void setEID(int eid) {
		this.eid = eid;
	}
	
	/**
	 * Returns a description built from eid, sid, host and file.
	 */
	public String toString(){
		String ids = "Editor eid="+eid+" sid="+sid+" ";
		String location = host  + ":" + file;
		return ids + location;
	}

	/**
	 * Ends the merge if this node is blocking and no longer merging, then
	 * retries a pending QUIT_2. Does nothing while a merge is still running.
	 */
	void checkEndMerge() {
		if (blocking && isMerging()) {
			return;
		}
		if (blocking) {
			endMerge();
			blocking = false;
		}
		if (quit_2 != null) {
			checkQuit();
		}
	}


	/**
	 * Finishes a merge: queues END_MERGE for the editor, then cleans up.
	 * In Direct mode the merge mark is removed from sentList. In the other
	 * modes the triggering command is checked against ackList and then either
	 * acknowledged (own command, Early mode only) or forwarded to the next
	 * node (another editor's command).
	 */
	private void endMerge() {
		sendToEditor(new REPCommand(REP.SMCMD_END_MERGE,sid,eid,seq(),0,""));
		if (mergeMode != MergeMode.Direct) {
			sortedEditCmds = null;
			checkAck(preMergeCommand);
			boolean ownCommand = (preMergeCommand.eid == eid);
			if (!ownCommand) {
				ServerMainLoop.logger.writeLog("Editor"+eid+": send preMergeCommand "+preMergeCommand);
				next.send(preMergeCommand);
			} else if (mergeMode == MergeMode.Early) {
				sendAck(preMergeCommand);
			}
			// sentList.clear();
			preMergeCommand = null;
			return;
		}
		REPCommand newest = sentList.peekLast();
		ServerMainLoop.logger.writeLog("Editor"+eid+": EndMerge Before"+report());
		if (newest != null && newest.eid == eid && newest.sid == sid) {
			// Own command interrupts us, truncate sentList
			truncateSentList(newest, false);
		}
		sentList.remove(mergeMark);
		preMergeCommand = null;
		ServerMainLoop.logger.writeLog("Editor"+eid+": EndMerge "+report());
	}

	/**
	 * Hook for truncating sentList after a user editor command.
	 * Currently nothing is truncated here, whatever the arguments.
	 *
	 * @param commit command up to which the list would be truncated
	 * @param mode   {@code true} when called for a user command,
	 *               {@code false} when called from endMerge()
	 */
	private void truncateSentList(REPCommand commit, boolean mode) {
		// merging is not enough except from endMerge()
		boolean skip = mode && blocking;
		if (skip) {
			return;
		}
	}
	
	/**
	 * A returned command fixes the command order. When sentList contains an
	 * entry with the same sequence as {@code commit}, sentList is replaced by
	 * the entries after the first such match (further matches are dropped
	 * too). sentList is left untouched when nothing matches.
	 *
	 * @param commit the returned command
	 */
	public void truncateUnMergedCmds(REPCommand commit) {
		assert(!merging);
		LinkedList<REPCommand> remaining = new LinkedList<REPCommand>();
		boolean found = false;
		for (REPCommand sent : sentList) {
			if (sent.isSameSeq(commit)) {
				found = true;
			} else if (found) {
				remaining.addLast(sent);
			}
		}
		if (found) {
			sentList = remaining;
		}
	}

	/**
	 * Sends the ACK for one of our own commands after it has come back.
	 * A copy of the command is turned into the matching ACK, recorded in
	 * ackList, stripped of its string and sent to the next node.
	 *
	 * @param command the returned INSERT or DELETE
	 */
	private void sendAck(REPCommand command) {
		// First Phase End, send ACK
		REPCommand ack = new REPCommand(command);
		switch(ack.cmd) {
		case REPCMD_INSERT:
			ack.cmd = REP.REPCMD_INSERT_ACK;
			break;
		case REPCMD_DELETE:
			ack.cmd = REP.REPCMD_DELETE_ACK;
			break;
		default:
			assert(false);
		}
		ackList.addLast(ack);
		ServerMainLoop.logger.writeLog("Editor"+eid+": sendAck sentList = "+sentList);
		assert(ackList.size()<limit);
		ack.string = "";
		next.send(ack);
	}

	/**
	 * Delivers a held QUIT_2 to the editor once nothing is outstanding:
	 * no expected ACKs, no merge in progress, no waiting commands and an
	 * (effectively) empty sentList.
	 *
	 * @return {@code true} if QUIT_2 was queued for the editor
	 */
	private boolean checkQuit() {
		if (quit_2 == null) return false;
		if (ackList.size() != 0) return false;
		if (isMerging()) return false;
		if (waitingCommandInMerge.size() != 0) return false;
		if (!emptySentList()) return false;
		sendToEditor(quit_2);
		quit_2 = null;
		return true;
	}

	/**
	 * Tells whether sentList holds no commands, treating a list that contains
	 * only a merge mark as empty.
	 */
	private boolean emptySentList() {
		switch (sentList.size()) {
		case 0:
			return true;
		case 1:
			return sentList.getFirst().cmd == REP.REPCMD_MERGE_MARK;
		default:
			return false;
		}
	}

	/**
	 * Processes a command coming back from our editor.
	 * Edit commands go to {@link #translate(REPCommand)}; merge and session
	 * control commands are handled here.
	 *
	 * @param command the command to process
	 * @return {@code true} if the command type was recognised,
	 *         {@code false} otherwise
	 */
	@Override
	public boolean manage(REPCommand command) {
		switch(command.cmd){
		// Editor Command
		case REPCMD_DELETE:
		case REPCMD_INSERT:
		case REPCMD_DELETE_USER:
		case REPCMD_INSERT_USER:
		case REPCMD_DELETE_ACK:
		case REPCMD_INSERT_ACK:
		case REPCMD_MERGE_MARK:
		{
			translate(command);
			break;
		}

		case SMCMD_START_MERGE_ACK:
		{
			// Run the merge and send commands on to the next editor
			mergeAck();
			if (!merge(preMergeCommand)) {
				// nothing to do, send END_MERGE
				checkEndMerge();
			}
			break;
		}
		
		case SMCMD_SYNC:
		{
			if (isMaster()) 
				sendToEditor(command);
			else
				next.send(command);
			// Stop here: without this break the command fell through into
			// the QUIT case and was sent to the next node a second time.
			break;
		}
			
		case SMCMD_QUIT:
		{
			next.send(command);
			break;
		}
		case SMCMD_QUIT_2:
		{
			// QUIT_2 is returned.
			if (command.eid!=eid) {
				// stop this editor unless this is the start, starter will stopped
				// by QUIT_2_ACK
				manager.remove(this);
			}
			// don't send quit_2 directly to the editor until all pending
			// merge is processed.
			//   this does not work in distributed case.
			if (next.isDirect())
				next.setQuit2(command);
			else
				next.send(command);
			break;
		}
		case SMCMD_QUIT_2_ACK:
		{
			manager.remove(this);
			break;
		}
		default:
			assert false;
			return false;
		}
		return true;
	}
	
	
	/**
	 * Tells whether passing this command to the editor will trigger a merge,
	 * so that send() must start blocking further input.
	 *
	 * Direct mode: an INSERT or DELETE from another editor.
	 * Slow mode: any INSERT_ACK/DELETE_ACK.
	 * Other modes: our own INSERT/DELETE, or another editor's ACK.
	 *
	 * @param command command about to be written to the editor
	 * @return {@code true} if the command starts a merge
	 */
	private boolean isMergeCommand(REPCommand command) {
		if (mergeMode==MergeMode.Direct) {
			// The parentheses matter: && binds tighter than ||, so without
			// them every DELETE (including our own) counted as a merge command.
			return command.eid!=eid
				&& (command.cmd==REP.REPCMD_INSERT || command.cmd==REP.REPCMD_DELETE);
		}
		switch(command.cmd) {
		case REPCMD_INSERT: case REPCMD_DELETE:
			return mergeMode==MergeMode.Slow?false:command.eid==eid;
		case REPCMD_INSERT_ACK: case REPCMD_DELETE_ACK:
			return mergeMode==MergeMode.Slow?true:command.eid!=eid;
		}
		return false;
	}
	
	/**
	 * Queues a command on the priority write queue for our editor.
	 *
	 * @param command command to write
	 */
	public void sendToEditor(REPCommand command) {
		this.writeQueue.add(command);
	}

	/**
	 * Entry point for a command read from the editor's channel.
	 * JOIN/PUT re-dispatch the connection through a FirstConnector; anything
	 * else is offered to the session manager first and, if it declines,
	 * handled by manage().
	 *
	 * @param command the received command
	 * @param key     selection key of the channel
	 * @throws IOException declared by the overridden method
	 */
	@Override
	public void handle(REPCommand command, REPSelectionKey<REPCommand> key) throws IOException {
		boolean newSession = command.cmd==REP.SMCMD_JOIN || command.cmd==REP.SMCMD_PUT;
		if (!newSession) {
			if (!manager.sessionManage(this, command)) {
				// ServerMainLoop.logger.writeLog("Editor"+eid+": handle command="+command);
				manage(command);
			}
			return;
		}
		// assert false;
		// Case where several sessions are created on a single editor.
		// Apparently this has some problems.
		next = new Forwarder(manager,next.channel);
		REPNode connector = new FirstConnector(manager,channel);
		connector.handle(command, key);
		key.attach(new Dispatcher(manager,channel));
	}

	/**
	 * Asks the session manager to remove the given channel.
	 *
	 * @param socketChannel channel being cancelled
	 */
	@Override
	public void cancel(REPSocketChannel<REPCommand> socketChannel) {
		this.manager.remove(socketChannel);
	}

	/**
	 * Tells whether this editor's mode is SMCMD_PUT.
	 */
	public boolean isMaster() {
		return REP.SMCMD_PUT == mode;
	}

	
	/**
	 * Handles a command forwarded to this node. Special cases are handled
	 * first; usually they are handled in the next Editor in a session
	 * manager, but here the command has been forwarded to us.
	 *
	 * @param command forwarded command
	 */
	public void forwardedCommandManage(REPCommand command) {
		if (command.cmd != REP.SMCMD_QUIT_2) {
			send(command);
		} else {
			// we have to wait next editor's finishing before sending this.
			// this is odd, but the editor itself does not know it's merging
			// state. Only this session manager knows it.
			setQuit2(command);
		}
	}

	/**
	 * Notes a new command from our editor before it is sent to the next
	 * editor. If a merge is in progress the merge is flagged to be redone.
	 *
	 * @param cmd the editor's command; its eid must be ours
	 * @return the same command, unchanged
	 */
	public REPCommand transSendCmd(REPCommand cmd){
		assert(cmd.eid==eid);
		boolean interruptedMerge = isMerging();
		if (interruptedMerge) {
			// the user interrupted us while a merge was running
			logger.writeLog("mergeAgain"+eid+":"+cmd);
			mergeAgain = true;
		}
		return cmd;
	}
	
	/**
	 * Runs the merge once START_MERGE_ACK has been received.
	 * Builds undo commands for every INSERT/DELETE in sentList, followed by
	 * the merge mark and the same commands in sorted order, replaces sentList
	 * with the sorted commands and sends the result to the editor.
	 *
	 * @param prev not used by this method
	 * @return {@code true} if any merge command was sent to the editor
	 */
	public boolean merge(REPCommand prev){
		logger.writeLog("beforeMerge"+eid+":"+sentList);
		// Undo every command on the merge queue, and also build the sorted
		// command sequence.
		LinkedList<REPCommand> toEditor = new LinkedList<REPCommand>();
		for (REPCommand sent : sentList) {
			boolean isEdit = sent.cmd==REP.REPCMD_INSERT || sent.cmd==REP.REPCMD_DELETE;
			if (isEdit) {
				toEditor.addLast(createUndo(sent));
			}
		}

		sortedEditCmds = new TreeSet<REPCommand>(new REPCommandComparator(1));
		logger.writeLog("sentList"+eid+":"+sentList);
		for (REPCommand sent : sentList) {
			boolean isEdit = sent.cmd==REP.REPCMD_INSERT || sent.cmd==REP.REPCMD_DELETE;
			if (isEdit) {
				sortedEditCmds.add(sent);
			}
		}
		toEditor.addLast(mergeMark);
		toEditor.addAll(sortedEditCmds);
		sentList = new LinkedList<REPCommand>(sortedEditCmds);
		logger.writeLog("sortedMerge"+eid+":"+sortedEditCmds);
		// The undo string of an unmerged delete cannot be used at this point;
		// the one sent back by the editor has to be used instead.
		logger.writeLog("outputMerge"+eid+":"+toEditor);
		return optimizedSend(this, toEditor);
	}

	/**
	 * Variant of merge(): undoes every entry of sentList, appends the sorted
	 * INSERT/DELETE commands followed by the merge mark, and sends the result
	 * to the editor. sentList is replaced by a list that, as the code stands,
	 * is always empty.
	 *
	 * @param prev not used by this method
	 * @return {@code true} if any merge command was sent to the editor
	 */
	public boolean mergeEarly(REPCommand prev){
		logger.writeLog("beforeMerge"+eid+":"+sentList);
		LinkedList<REPCommand> toEditor = new LinkedList<REPCommand>();
		LinkedList<REPCommand> kept = new LinkedList<REPCommand>();
		// Undo every command on the merge queue, and also build the sorted
		// command sequence.
		for (REPCommand sent : sentList) {
			toEditor.addLast(createUndo(sent));
		}

		sortedEditCmds = new TreeSet<REPCommand>(new REPCommandComparator(1));
		logger.writeLog("sentList"+eid+":"+sentList);
		// never changed: every edit command goes into the sorted set
		boolean sortThis = true;
		for (REPCommand sent : sentList) {
			if (sent.cmd!=REP.REPCMD_INSERT && sent.cmd!=REP.REPCMD_DELETE) {
				continue;
			}
			if (sortThis) {
				sortedEditCmds.add(sent);
			} else {
				kept.add(sent);
			}
		}
		toEditor.addAll(sortedEditCmds);
		toEditor.addLast(mergeMark);
		logger.writeLog("sortedMerge"+eid+":"+sortedEditCmds);
		// For command.string of the sentList entries, the value sent back
		// by the editor has to be used.
		sentList = kept;
		logger.writeLog("outputMerge"+eid+":"+toEditor);
		return optimizedSend(this, toEditor);
	}

	/**
	 * Optimizes a merge command list and sends it to the editor.
	 * Each command is copied, tagged with the merge eid and a fresh sequence
	 * number, linked to its source through {@code original}, recorded in
	 * sentMergedList and queued for the editor.
	 *
	 * @param editor node that supplies sequence numbers and receives the commands
	 * @param output merge commands before optimization
	 * @return {@code true} if at least one command was sent; {@code false}
	 *         (with merging cleared) when the optimizer left nothing to send
	 */
	public boolean optimizedSend(REPNode editor, LinkedList<REPCommand> output) {
		/*
		 * With optimized send, command.original has to be taken into account.
		 */
		sentMergedList.clear();
		List<REPCommand> optimized = optimizer.optimize(output);
		boolean nothingToSend = optimized.size()==0;
		if (nothingToSend) {
			merging = false;
			return false;
		}
		for (REPCommand source : optimized) {
			REPCommand mergeCmd = new REPCommand(source);
			mergeCmd.setEID(REP.MERGE_EID.id);
			mergeCmd.setSEQID(editor.seq());
			mergeCmd.original = source;
			sentMergedList.addLast(mergeCmd);
			editor.sendToEditor(mergeCmd);
		}
		logger.writeLog("OptimizedOutputMerge"+eid+":"+sentMergedList);
		return true;
	}
	
	/**
	 * Builds the inverse of a command: INSERT becomes DELETE with an empty
	 * string, DELETE becomes INSERT. Any other command is copied unchanged.
	 * The copy refers back to its source through {@code original}.
	 *
	 * @param cmd command to invert
	 * @return a new command that undoes {@code cmd}
	 */
	private REPCommand createUndo(REPCommand cmd){
		REPCommand undo = new REPCommand(cmd);
		undo.original = cmd;
		if (cmd.cmd == REP.REPCMD_DELETE) {
			undo.cmd = REP.REPCMD_INSERT;
		} else if (cmd.cmd == REP.REPCMD_INSERT) {
			undo.cmd = REP.REPCMD_DELETE;
			undo.string = "";
		}
		return undo;
	}

	/**
	 * Orders commands by global sequence part first, then by eid (shifted by
	 * {@code base}, with negative results wrapped upwards), then by full seq.
	 */
	class REPCommandComparator implements Comparator<REPCommand>{
		int base;

		REPCommandComparator(int base) {
			this.base = base;
		}

		public int compare(REPCommand o1, REPCommand o2) {
			int leftGseq = gSeq(o1);
			int rightGseq = gSeq(o2);
			if (leftGseq != rightGseq) {
				return leftGseq < rightGseq ? -1 : 1;
			}
			int leftEid = o1.eid - base;
			if (leftEid < 0) leftEid += Integer.MAX_VALUE;
			int rightEid = o2.eid - base;
			if (rightEid < 0) rightEid += Integer.MAX_VALUE;
			if (leftEid != rightEid) {
				return leftEid < rightEid ? -1 : 1;
			}
			if (o1.seq != o2.seq) {
				return o1.seq < o2.seq ? -1 : 1;
			}
			// assert(false);  // this can happen in MergedAgain case
			return 0;
		}
	}
	
	/**
	 * Processes a command received from another editor through the session
	 * manager: only the global sequence part is adjusted.
	 *
	 * @param nextEditor not used by this method
	 * @param cmd        the received command; must not be our own
	 */
	public void transReceiveCmd(REPNode nextEditor,REPCommand cmd){
		assert (cmd.eid != eid);
		this.incrementGseq(cmd);
	}

	// Divisor that separates the global part of a sequence number from the rest
	final int gseqLimit = 1000;
	
	/**
	 * Returns the global sequence part of a command, i.e. seq / gseqLimit.
	 */
	private int gSeq(REPCommand cmd) {
		int global = cmd.seq / gseqLimit;
		return global;
	}
	
	/**
	 * Advances our global sequence part past that of a received command.
	 * When the command's global part is greater than ours, our sequence is
	 * set to the start of the following global step.
	 *
	 * @param cmd command received from another editor
	 */
	private void incrementGseq( REPCommand cmd) {
		// gSeq() already divides by gseqLimit; dividing its result again
		// (as the code did before) compared and stored the wrong magnitude.
		int otherGseq = gSeq(cmd);
		int ownGseq = seq / gseqLimit;
		if (otherGseq > ownGseq) {
			setSeq((otherGseq+1)*gseqLimit);
		}
	}
	
	/**
	 * Sets this node's editor id.
	 *
	 * @param _eid new editor id
	 */
	public void setEid(int _eid){
		this.eid = _eid;
	}

	/**
	 * Processes a merge command returned by the editor.
	 * If it matches the oldest outstanding merge command (same seq) that
	 * entry is removed, and a non-empty string returned by the editor is
	 * copied back into the entry's original command. The merging flag is
	 * cleared once nothing is outstanding and no redo is pending.
	 *
	 * @param command merge command returned by the editor
	 * @return {@code true} if the merge has to be run again
	 */
	public boolean checkMergeConflict(REPCommand command) {
		// peekFirst() yields null on an empty list, where getFirst() would
		// throw NoSuchElementException.
		REPCommand prev = sentMergedList.peekFirst();
		if (prev == null) {
			logger.writeLog("Editor"+eid+": merge command returned with nothing outstanding "+command);
		} else if (prev.seq==command.seq) {
			// logger.writeLog("Input eid="+eid+"SentMergedList = "+sentMergedList);
			sentMergedList.removeFirst();
			if (prev.original!=null && command.string!=null && !command.string.equals("")) {
				prev.original.string = command.string;
			}
		} 
		//  previous merge command may be returned

		if(sentMergedList.size()==0 && !mergeAgain) {
			merging=false;
		}
		return mergeAgain;
	}

	/**
	 * Redoes a merge that was interrupted by a user command.
	 * Nothing happens while merge commands are still outstanding. In Direct
	 * mode the merge is simply run again; otherwise a new command list is
	 * built (undos, previously sorted commands, a merge mark, then sentList)
	 * and sent to the editor.
	 */
	public void getMergeAgain() {
		if (!sentMergedList.isEmpty()) {
			//  wait for previous merge completion
			return;
		}
		if (mergeMode != MergeMode.Direct) {
			LinkedList<REPCommand> redo = new LinkedList<REPCommand>();
			for (REPCommand sent : sentList) {
				boolean isEdit = sent.cmd==REP.REPCMD_INSERT || sent.cmd==REP.REPCMD_DELETE;
				if (isEdit) {
					redo.addLast(createUndo(sent));
				}
			}
			redo.addAll(sortedEditCmds);
			REPCommand mark = new REPCommand(REP.REPCMD_MERGE_MARK,0, sid, REP.MERGE_EID.id, seq(), "");
			redo.addLast(mark);
			redo.addAll(sentList);
			logger.writeLog("MergeAgain "+eid+" ret="+redo.size());
			mergeAgain = false;
			optimizedSend(this, redo);
			return;
		}
		logger.writeLog("MergeAgain "+eid);
		mergeAgain = false;
		merge(preMergeCommand);
	}
//
//	public boolean isFinished() {
//		if(unMergedCmds.size() > 0) return false;
//		if(sentMergedList.size() > 0) return false;
//		return true;
//	}

	/**
	 * Returns the current value of the merging flag.
	 */
	public boolean isMerging() {
		return this.merging;
	}

	/**
	 * Called when SMCMD_START_MERGE_ACK is received: logs sentList and marks
	 * the node as merging.
	 */
	public void mergeAck() {
		logger.writeLog("Editor"+eid+": START MERGE "+ sentList);
		// From here on, any interrupting user command requires the merge
		// to be redone.
		this.merging = true;
	}

	/**
	 * Dead lock reporter: returns a multi-line dump of sentList, ackList and
	 * the merging flag for log messages.
	 */
	public String report() {
		StringBuilder dump = new StringBuilder();
		dump.append("\n  sentList:").append(sentList);
		dump.append("\n  ackList:").append(ackList);
		dump.append("\n  mergeMode=:").append(merging);
		return dump.toString();
	}

}