Mercurial > hg > RemoteEditor > REPSessionManager
view rep/handler/Editor.java @ 468:b800b33c6988
check quit2 and ackList
author | one |
---|---|
date | Mon, 11 Oct 2010 19:57:34 +0900 |
parents | b13926e43c28 |
children | e252d092b720 |
line wrap: on
line source
package rep.handler; import java.io.IOException; import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.TreeSet; import rep.REP; import rep.REPCommand; import rep.ServerMainLoop; import rep.SessionManager; import rep.channel.REPLogger; import rep.channel.REPSelectionKey; import rep.channel.REPSocketChannel; import rep.optimizers.*; public class Editor extends Forwarder { // REPCommands we are going to send to the next editor private LinkedList<REPCommand> sentList = new LinkedList<REPCommand>(); // Expected acknowledge list private LinkedList<REPCommand> ackList = new LinkedList<REPCommand>(); public LinkedList<REPCommand> waitingCommandInMerge= new LinkedList<REPCommand>(); private REPCommand quit_2=null; private REPCommand preMergeCommand; public REPCommandOptimizer optimizer; private LinkedList<REPCommand> unMergedCmds; private LinkedList<REPCommand> sentMergedList; private TreeSet<REPCommand> sortedEditCmds; boolean mergeAgain; public REPLogger logger = SessionManager.logger; boolean merge_mode = false; private boolean merging; public static boolean noMergeMode=false; static final boolean doOptimize = false; private LinkedList<REPCommand> writeQueue = new LinkedList<REPCommand>(); static boolean slowMerge = true; public Editor(SessionManager manager,int editorNo){ // no translator case super(manager, null); } public Editor(int eid, REPCommandOptimizer optimizer) { super(null, null); this.optimizer = optimizer; } public Editor(int editorNo, SessionManager manager,REPSocketChannel<REPCommand> channel){ super(editorNo,manager,channel); eid = editorNo; if (doOptimize) optimizer = new DeleteInsertOptimizer(); //タカノがつくったおぷてぃまいざ else optimizer = new NullOptimizer(); //なにもしないけどOptimizer. unMergedCmds = new LinkedList<REPCommand>(); mergeAgain = false; sentMergedList = new LinkedList<REPCommand>(); } /* * Merge Protocol (0) Editor へのコマンドは、ack 以外は直接 Editor へ送られてしまう。(next.send(cmd)) Editor から返ってくるコマンドをtranslatorが処理する。 (1) Editor CommandをSession Ring 上に流し、それが戻って来るまでに、他のEditorから 受け取った Editor Command をキューに入れておく。 (2) 戻って来たタイミングで、キュー上のEditor Commandを、eid とCommandの 順序を基にソートする。(self merge) (3) 他のEditorにソートのタイミングを与えるために、Editor Command の ack を、もう一周させる。 (4) 他のEditorのCommandを受け取ってから、ack が来るまでのCommandをキューに 入れておき、ack が来たら、eid とCommandの順序を基にソートする。(other merge) Editor には、ソートした編集結果になるように、それまで行なった編集をUndo して、ソートした編集結果を適用する。Undo が無駄な動作をしないように最適化する。 handle() セッションの処理 manage() 編集コマンドは translate() へ 一周して来た編集コマンドのACKは廃棄 (merge queue から削除) 一周して来た自分のコマンドならself merge 他のエディタの編集コマンドのACK->other merge それ以外は、そのまま実行、merge queue へ格納 merge は checkReturnedCommand() から startMerge() へ まず、接続されている Editor に START_MERGE を送る 邪魔されないように、他のcommand は block する manager() START_MERGE_ACK が来たら、translator.mergeAck() で教えて、 merge()-> translator.checkOwnCommand() へ ここで、sort されて、Merge Command をEditorへ送信 checkEndMerge()から endMerge() が呼ばれる。 自分のエディタにEND_MERGE で Merge終了を通知 自分のコマンドは、ACKに変えて送信 (3) それ以外は、そのまま送信 (一周させる) */ public void translate(REPCommand command){ switch(command.cmd) { case REPCMD_INSERT_ACK: case REPCMD_DELETE_ACK: if (command.eid==eid) { if (slowMerge) { checkReturnedCommand(command); checkQuit(); return; } // Second Phase が終わって同期が終了。 // SessionManager.logger.writeLog("Complete "+command); checkAck(command); checkQuit(); return; } checkReturnedCommand(command); return; case REPCMD_INSERT_USER: command.cmd = REP.REPCMD_INSERT; userEditorCommand(command); return; case REPCMD_DELETE_USER: command.cmd = REP.REPCMD_DELETE; userEditorCommand(command); return; case REPCMD_INSERT: case REPCMD_DELETE: case REPCMD_MERGE_MARK: if (command.eid == REP.MERGE_EID.id){ //マージコマンドが返ってきた if(checkMergeConflict(command)){ //マージ中にエディタからの割り込みがあった場合 getMergeAgain(this); } checkEndMerge(); return; } if (command.eid == eid){ // 編集コマンドが一周して来た if (slowMerge) { checkAck(command); sendAck(command); } else { checkReturnedCommand(command); } return; } //他のエディタからの編集コマンド transReceiveCmd(next,command); sendEditorCommand(command); return; default: assert(false); } } private void userEditorCommand(REPCommand command) { //エディタからの新たな編集コマンド if (next==this) return; // singleton case transSendCmd(command); sendEditorCommand(command); return; } // private void checkDouble(List<REPCommand> sentList) { // if (sentList.size()==0) return; // int count = 0; // REPCommand f = sentList.get(0); // for(REPCommand c:sentList) { // if (c.eid==f.eid&&c.seq==f.seq) { // count++; // } // } // assert(count==1); // if (true) return; // count = 0; // for(PacketSet c:waitingCommandInMerge) { // for(REPCommand g:sentList) { // if (c.command.eid==g.eid&&c.command.seq==g.seq) { // count++; // } // } // } // assert(count==0); // } /** * Sending to Editor and waiting Queue * +--------+ * send() --> write() -> | Editor | -> handle() -> manager() * +--------+ * waitingQueue * writeQueue * * send() は、他のEditor Node から呼ばれる * write() は、内部で優先的に送信するのに用いる * writeQueue は、waitingQueue よりも常に先に実行される必要がある * Manageの送信キューはここでは使わない * send() manage */ @Override public void send(REPCommand command) { if (merging || isMerging() || waitingCommandInMerge.size()>0) { waitingCommandInMerge.addLast(command); ServerMainLoop.logger.writeLog("Editor eid:"+eid+" waitingCommandInMerge = "+waitingCommandInMerge); return; } if (isMergeCommand(command)) { merging = true; ServerMainLoop.logger.writeLog("Editor"+eid+": merging=true (send)"+command); } writeQueue.add(command); } /** * Check waiting command in merge * periodically called from manager */ public void checkWaitingCommandInMerge() { if (writeQueue.size()>0) { REPCommand command =writeQueue.pollFirst(); ServerMainLoop.logger.writeLog("Editor"+eid+": write comand="+command); super.write(command); return; } if (merging || isMerging()) return; if (waitingCommandInMerge.size()>0) { REPCommand command = waitingCommandInMerge.pollFirst(); ServerMainLoop.logger.writeLog("Editor"+eid+": send waiting comand="+command); super.write(command); if (isMergeCommand(command)) { merging = true; } } } /** * 他のエディタへのコマンドの送信 * @param command * * sendList にキープする必要がある。 */ private void sendEditorCommand(REPCommand command) { REPCommand keep = new REPCommand(command); sentList.add(keep); ackList.add(keep); //ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList); assert(ackList.size()<limit); if (command.cmd==REP.REPCMD_DELETE) { // delete のundo用の文字列は、外に出す意味はない command.string=null; } next.send(command); } public List<REPCommand> getSentList() { return sentList; } public void setSentList(LinkedList<REPCommand> list) { sentList = list; } /** * 一周して来たcommandの処理。 * * INSERT/DELETEを受け取った時に、sentListに登録 * INSERT_ACK/DELETE_ACKが来たら一周。そこで、Mergeする。 * * 自分が出したINSERT/DELETEが戻って来たら、ACKに変更して、Merge。 * * 途中から参加した場合、自分が受けとってないcommandのACKが先に来ることが * ある。それは、無視して良い。 * @param command */ void checkReturnedCommand(REPCommand command) { ServerMainLoop.logger.writeLog("Editor"+eid+": startMerge "+command); preMergeCommand = new REPCommand(command); // merge は必須だが、EditorのCommand実装をテストするには邪魔なので、off に出来るようにする。 if (noMergeMode) { checkQuit(); endMerge(); return; } // START_MERGE を送る // 送らないで良い場合もある? REPCommand cmd = new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,""); sendToEditor(cmd); merging = true; // Session Manager 側で、このeditorへの他のeditorからの // 入力を止めて、merge にそなえる。merge は、eidtor 側から // ACKが来てから始まる。 } /** * sentList と ack を見比べて、正しい順序で来たかどうかを調べる。途中参加したEditorの場合は、Ackは * 無視して良い。 * @param command * @return */ private boolean checkAck(REPCommand command) { REPCommand prev = null; try { if(isMerging() || ackList.size()==0) throw new Exception(); prev=ackList.remove(0); if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) throw new Exception(); } catch (Exception n) { // should be more robust to allow communication failure String err = "Editor eid="+eid+" checkReturnedCommand() : command = " + command + " prev="+ (prev==null?"null":prev)+" ackList="; err += ackList; ServerMainLoop.logger.writeLog(err); assert(false); } return true; } @Override public void setQuit2(REPCommand cmd) { quit_2 = cmd; checkQuit(); // do not send quit2 until we received all pending // command } @Override public void setEID(int eid) { this.eid = eid; } public String toString(){ return ("Editor eid="+eid+" sid="+sid+" " + host + ":" + file); } void checkEndMerge() { if (merging) { if (isMerging()) return; endMerge(); merging = false; } if (quit_2!=null) checkQuit(); } private void endMerge() { sortedEditCmds = null; LinkedList<REPCommand>u = new LinkedList<REPCommand>(); boolean flag=true; for(REPCommand command:unMergedCmds) { if (command.cmd==REP.REPCMD_MERGE_MARK) { flag = false; } if (flag) u.addLast(command); } unMergedCmds = u; REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,sid,eid,seq(),0,""); sendToEditor(mergeEnd); checkAck(preMergeCommand); if (preMergeCommand.eid==eid) { if (!slowMerge) { sendAck(preMergeCommand); } } else { ServerMainLoop.logger.writeLog("Editor"+eid+": send preMergeCommand "+preMergeCommand); next.send(preMergeCommand); } // sentList.clear(); preMergeCommand = null; } private void sendAck(REPCommand command) { REPCommand keep = new REPCommand(command); // First Phase End, send ACK switch(keep.cmd) { case REPCMD_INSERT: keep.cmd = REP.REPCMD_INSERT_ACK;break; case REPCMD_DELETE: keep.cmd = REP.REPCMD_DELETE_ACK;break; default: assert(false); } ackList.addLast(keep); ServerMainLoop.logger.writeLog("Editor"+eid+": sendAck sentList = "+sentList); assert(ackList.size()<limit); keep.string = ""; next.send(keep); } private boolean checkQuit() { if (quit_2!=null && ackList.size()==0 && sentList.size()==0&&!isMerging() && waitingCommandInMerge.size()==0) { sendToEditor(quit_2); quit_2 = null; return true; } return false; } @Override public boolean manage(REPCommand command) { switch(command.cmd){ // Editor Command case REPCMD_DELETE: case REPCMD_INSERT: case REPCMD_DELETE_USER: case REPCMD_INSERT_USER: case REPCMD_DELETE_ACK: case REPCMD_INSERT_ACK: case REPCMD_MERGE_MARK: { translate(command); break; } case SMCMD_START_MERGE_ACK: { // マージの処理と次のエディタへコマンドを送信する処理 mergeAck(); if (!merge(this, preMergeCommand)) { // nothing to do, send END_MERGE checkEndMerge(); } break; } case SMCMD_SYNC: if (isMaster()) sendToEditor(command); else next.send(command); case SMCMD_QUIT: { next.send(command); break; } case SMCMD_QUIT_2: { // QUIT_2 is returned. if (command.eid!=eid) { // stop this editor unless this is the start, starter will stopped // by QUIT_2_ACK manager.remove(this); } // don't send quit_2 directly to the editor until all pending // merge is processed. // this does not work in distributed case. if (next.isDirect()) next.setQuit2(command); else next.send(command); break; } case SMCMD_QUIT_2_ACK: { manager.remove(this); break; } default: assert false; return false; } return true; } private boolean isMergeCommand(REPCommand command) { switch(command.cmd) { case REPCMD_INSERT: case REPCMD_DELETE: return slowMerge?false:command.eid==eid; case REPCMD_INSERT_ACK: case REPCMD_DELETE_ACK: return slowMerge?true:command.eid!=eid; } return false; } public void sendToEditor(REPCommand command) { writeQueue.add(command); } @Override public void handle(REPCommand command, REPSelectionKey<REPCommand> key) throws IOException { //ServerMainLoop.logger.writeLog("Manager "+manager.getId()+" read : command = " + command // +" from "+manager.editorList.editorByChannel(channel)); if (command.cmd==REP.SMCMD_JOIN||command.cmd==REP.SMCMD_PUT) { // assert false; // 一つのエディタ上に複数のセッションが作られた場合。 // 若干問題があるらしい next = new Forwarder(manager,next.channel); REPNode first = new FirstConnector(manager,channel); first.handle(command, key); key.attach(new Dispatcher(manager,channel)); return; } if (manager.sessionManage(this, command)) return; manage(command); } @Override public void cancel(REPSocketChannel<REPCommand> socketChannel) { manager.remove(socketChannel); } public boolean isMaster() { return mode==REP.SMCMD_PUT; } /* Handle special case first, usually these cases * are handled in the next Editor in a session manager, but * it is forwarded here. */ public void forwardedCommandManage(REPCommand command) { if (command.cmd==REP.SMCMD_QUIT_2) { // we have to wait next editor's finishing before sending this. // this is odd, but the editor itself does not know it's merging // state. Only this session manager knows it. setQuit2(command); return; } send(command); } /** * New command from an editor * The command is sent to the next editor * @param cmd * @return translated command. */ public REPCommand transSendCmd(REPCommand cmd){ assert(cmd.eid==eid); unMergedCmds.addLast(cmd); //マージ中にユーザから割り込みがあった場合 if(isMerging()){ mergeAgain = true; } return cmd; } /** * My command is returned from the session ring, and START_MERGE_ACK * is returned. At this * stage my writeQueue is empty, our editor is waiting for me. * Start merge process. * @param cmd */ public boolean merge(Editor editor, REPCommand prev){ logger.writeLog("beforeMerge"+eid+":"+unMergedCmds); LinkedList<REPCommand> output = new LinkedList<REPCommand>(); LinkedList<REPCommand> newSentList = new LinkedList<REPCommand>(); // merge queue上にあるコマンドを全部undoコマンドするのと同時に // sort したコマンド列を生成する for( REPCommand cmd0 : unMergedCmds) { output.addLast( createUndo(cmd0) ); } sortedEditCmds = new TreeSet<REPCommand>(new REPCommandComparator(1)); logger.writeLog("sentList"+eid+":"+editor.getSentList()); boolean flag = false; for( REPCommand cmd0 : editor.getSentList()) { if (cmd0.cmd==REP.REPCMD_INSERT || cmd0.cmd==REP.REPCMD_DELETE) { if (flag) { output.addLast(cmd0); newSentList.addLast(cmd0); } else { sortedEditCmds.add(cmd0); } } if (cmd0.sid==prev.sid && cmd0.eid==prev.eid && cmd0.seq==prev.seq) { flag = true; output.addAll(sortedEditCmds); output.addLast(new REPCommand(REP.REPCMD_MERGE_MARK,0, editor.getSID(), REP.MERGE_EID.id, editor.seq(), "")); } } if (!flag) { output.addAll(sortedEditCmds); output.addLast(new REPCommand(REP.REPCMD_MERGE_MARK,0, editor.getSID(), REP.MERGE_EID.id, editor.seq(), "")); } logger.writeLog("sortedMerge"+eid+":"+sortedEditCmds); // unMerged command のdeleteのundo string は、この時点で使えない。 // Editor 側から送り返して来たものを使う必要がある。 unMergedCmds.clear(); logger.writeLog("outputMerge"+eid+":"+output); editor.setSentList(newSentList); return optimizedSend(editor,output); } /** * Sent optimized merged command list * @param editor * @param output * @return if any sent commands output */ public boolean optimizedSend(REPNode editor, LinkedList<REPCommand> output) { /* * Optimized send の場合は、unMergedCommand のつじつまを合わせる必要がある。 */ sentMergedList.clear(); List<REPCommand> output1 = optimizer.optimize(output); if (output1.size()==0) { merge_mode = false; return false; } for(REPCommand c:output1) { REPCommand m = new REPCommand(c); m.setEID(REP.MERGE_EID.id); m.setSEQID(editor.seq()); sentMergedList.addLast(m); editor.sendToEditor(m); } logger.writeLog("OptimizedOutputMerge"+eid+":"+sentMergedList); merge_mode = true; return true; } private REPCommand createUndo(REPCommand cmd){ REPCommand retCmd = new REPCommand(cmd); if (cmd.cmd==REP.REPCMD_INSERT) { retCmd.cmd=REP.REPCMD_DELETE; retCmd.string=""; } else if (cmd.cmd==REP.REPCMD_DELETE) retCmd.cmd=REP.REPCMD_INSERT; return retCmd; } class REPCommandComparator implements Comparator<REPCommand>{ int base; REPCommandComparator(int base) { this.base = base; } public int compare(REPCommand o1, REPCommand o2) { int eid1 = o1.eid-base; if (eid1<0) eid1 += Integer.MAX_VALUE; int eid2 = o2.eid-base; if (eid2<0) eid2 += Integer.MAX_VALUE; if ( eid1<eid2 ) return -1; if ( eid1>eid2 ) return 1; if ( o1.seq<o2.seq ) return -1; if ( o1.seq>o2.seq ) return 1; // assert(false); // this can happen in MergedAgain case return 0; } } /** * Translate Command that was received from SeMa. * @param cmd the command to be translated. * @return translated command. */ public void transReceiveCmd(REPNode nextEditor,REPCommand cmd){ assert (cmd.eid != eid); unMergedCmds.addFirst(cmd); } public void setEid(int _eid){ eid = _eid; } public boolean checkMergeConflict(REPCommand command) { unMergedCmds.addFirst(command); REPCommand prev = sentMergedList.getFirst(); if (prev.seq==command.seq) { // logger.writeLog("Input eid="+eid+"SentMergedList = "+sentMergedList); sentMergedList.removeFirst(); } // previous merge command may be returned if(sentMergedList.size()==0 && !mergeAgain) { merge_mode=false; } return mergeAgain; } public void getMergeAgain(Editor editor) { if (sentMergedList.size()>0) return; // wait for previous merge completion LinkedList<REPCommand> returnCommand = new LinkedList<REPCommand>(); for(REPCommand command : unMergedCmds) { if (command.cmd==REP.REPCMD_INSERT||command.cmd==REP.REPCMD_DELETE) returnCommand.add(createUndo(command)); } returnCommand.addAll(sortedEditCmds); returnCommand.addLast(new REPCommand(REP.REPCMD_MERGE_MARK,0, editor.getSID(), REP.MERGE_EID.id, editor.seq(), "")); returnCommand.addAll(editor.getSentList()); unMergedCmds.clear(); logger.writeLog("MergeAgain "+eid+" ret="+returnCommand.size()); mergeAgain = false; optimizedSend(editor, returnCommand); } // // public boolean isFinished() { // if(unMergedCmds.size() > 0) return false; // if(sentMergedList.size() > 0) return false; // return true; // } public boolean isMerging() { return merge_mode; } /** * receive SMCMD_START_MERGE_ACK */ public void mergeAck() { logger.writeLog("Editor"+eid+": START MERGE "+ ((unMergedCmds.size()>0)?" and top of unMergedCmds = "+ unMergedCmds.getLast():"")); merge_mode = true; } }