Mercurial > hg > RemoteEditor > REPSessionManager
diff rep/handler/Editor.java @ 468:b800b33c6988
check quit2 and ackList
author | one |
---|---|
date | Mon, 11 Oct 2010 19:57:34 +0900 |
parents | b13926e43c28 |
children | e252d092b720 |
line wrap: on
line diff
--- a/rep/handler/Editor.java Mon Oct 11 12:54:55 2010 +0900 +++ b/rep/handler/Editor.java Mon Oct 11 19:57:34 2010 +0900 @@ -1,20 +1,22 @@ package rep.handler; import java.io.IOException; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; +import java.util.TreeSet; import rep.REP; import rep.REPCommand; import rep.ServerMainLoop; import rep.SessionManager; +import rep.channel.REPLogger; import rep.channel.REPSelectionKey; import rep.channel.REPSocketChannel; import rep.optimizers.*; public class Editor extends Forwarder { - private Translator translator; // REPCommands we are going to send to the next editor private LinkedList<REPCommand> sentList = new LinkedList<REPCommand>(); // Expected acknowledge list @@ -22,6 +24,15 @@ public LinkedList<REPCommand> waitingCommandInMerge= new LinkedList<REPCommand>(); private REPCommand quit_2=null; private REPCommand preMergeCommand; + + public REPCommandOptimizer optimizer; + private LinkedList<REPCommand> unMergedCmds; + private LinkedList<REPCommand> sentMergedList; + private TreeSet<REPCommand> sortedEditCmds; + boolean mergeAgain; + public REPLogger logger = SessionManager.logger; + boolean merge_mode = false; + private boolean merging; public static boolean noMergeMode=false; static final boolean doOptimize = false; @@ -33,13 +44,20 @@ super(manager, null); } + public Editor(int eid, REPCommandOptimizer optimizer) { + super(null, null); + this.optimizer = optimizer; + } + public Editor(int editorNo, SessionManager manager,REPSocketChannel<REPCommand> channel){ super(editorNo,manager,channel); eid = editorNo; - REPCommandOptimizer optimizer; if (doOptimize) optimizer = new DeleteInsertOptimizer(); //タカノがつくったおぷてぃまいざ else optimizer = new NullOptimizer(); //なにもしないけどOptimizer. - translator = new Translator(eid,optimizer); + + unMergedCmds = new LinkedList<REPCommand>(); + mergeAgain = false; + sentMergedList = new LinkedList<REPCommand>(); } /* @@ -83,6 +101,7 @@ */ + public void translate(REPCommand command){ switch(command.cmd) { case REPCMD_INSERT_ACK: @@ -114,9 +133,9 @@ case REPCMD_MERGE_MARK: if (command.eid == REP.MERGE_EID.id){ //マージコマンドが返ってきた - if(translator.checkMergeConflict(command)){ + if(checkMergeConflict(command)){ //マージ中にエディタからの割り込みがあった場合 - translator.getMergeAgain(this); + getMergeAgain(this); } checkEndMerge(); return; @@ -133,7 +152,7 @@ } //他のエディタからの編集コマンド - translator.transReceiveCmd(next,command); + transReceiveCmd(next,command); sendEditorCommand(command); return; @@ -145,7 +164,7 @@ private void userEditorCommand(REPCommand command) { //エディタからの新たな編集コマンド if (next==this) return; // singleton case - translator.transSendCmd(command); + transSendCmd(command); sendEditorCommand(command); return; } @@ -212,7 +231,7 @@ super.write(command); return; } - if (translator==null || merging || isMerging()) return; + if (merging || isMerging()) return; if (waitingCommandInMerge.size()>0) { REPCommand command = waitingCommandInMerge.pollFirst(); ServerMainLoop.logger.writeLog("Editor"+eid+": send waiting comand="+command); @@ -278,7 +297,6 @@ // Session Manager 側で、このeditorへの他のeditorからの // 入力を止めて、merge にそなえる。merge は、eidtor 側から // ACKが来てから始まる。 - translator.startMerge(command); } /** @@ -316,22 +334,15 @@ @Override public void setEID(int eid) { this.eid = eid; - if (translator!=null) - translator.setEid(eid); } public String toString(){ return ("Editor eid="+eid+" sid="+sid+" " + host + ":" + file); } - public boolean isMerging() { - return translator.isMerging(); - } - - void checkEndMerge() { if (merging) { - if (translator.isMerging()) return; + if (isMerging()) return; endMerge(); merging = false; } @@ -340,7 +351,17 @@ private void endMerge() { - translator.endMerge(); + sortedEditCmds = null; + LinkedList<REPCommand>u = new LinkedList<REPCommand>(); + boolean flag=true; + for(REPCommand command:unMergedCmds) { + if (command.cmd==REP.REPCMD_MERGE_MARK) { + flag = false; + } + if (flag) u.addLast(command); + } + unMergedCmds = u; + REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,sid,eid,seq(),0,""); sendToEditor(mergeEnd); checkAck(preMergeCommand); @@ -349,6 +370,7 @@ sendAck(preMergeCommand); } } else { + ServerMainLoop.logger.writeLog("Editor"+eid+": send preMergeCommand "+preMergeCommand); next.send(preMergeCommand); } // sentList.clear(); @@ -371,7 +393,7 @@ } private boolean checkQuit() { - if (quit_2!=null && sentList.size()==0&&!isMerging() && waitingCommandInMerge.size()==0) { + if (quit_2!=null && ackList.size()==0 && sentList.size()==0&&!isMerging() && waitingCommandInMerge.size()==0) { sendToEditor(quit_2); quit_2 = null; return true; @@ -401,8 +423,8 @@ case SMCMD_START_MERGE_ACK: { // マージの処理と次のエディタへコマンドを送信する処理 - translator.mergeAck(); - if (!translator.merge(this, preMergeCommand)) { + mergeAck(); + if (!merge(this, preMergeCommand)) { // nothing to do, send END_MERGE checkEndMerge(); } @@ -507,5 +529,192 @@ send(command); } + /** + * New command from an editor + * The command is sent to the next editor + * @param cmd + * @return translated command. + */ + public REPCommand transSendCmd(REPCommand cmd){ + assert(cmd.eid==eid); + unMergedCmds.addLast(cmd); + + //マージ中にユーザから割り込みがあった場合 + if(isMerging()){ + mergeAgain = true; + } + + return cmd; + } + + /** + * My command is returned from the session ring, and START_MERGE_ACK + * is returned. At this + * stage my writeQueue is empty, our editor is waiting for me. + * Start merge process. + * @param cmd + */ + public boolean merge(Editor editor, REPCommand prev){ + logger.writeLog("beforeMerge"+eid+":"+unMergedCmds); + LinkedList<REPCommand> output = new LinkedList<REPCommand>(); + LinkedList<REPCommand> newSentList = new LinkedList<REPCommand>(); + // merge queue上にあるコマンドを全部undoコマンドするのと同時に + // sort したコマンド列を生成する + for( REPCommand cmd0 : unMergedCmds) { + output.addLast( createUndo(cmd0) ); + } + + sortedEditCmds = new TreeSet<REPCommand>(new REPCommandComparator(1)); + logger.writeLog("sentList"+eid+":"+editor.getSentList()); + boolean flag = false; + for( REPCommand cmd0 : editor.getSentList()) { + if (cmd0.cmd==REP.REPCMD_INSERT || cmd0.cmd==REP.REPCMD_DELETE) { + if (flag) { + output.addLast(cmd0); + newSentList.addLast(cmd0); + } else { + sortedEditCmds.add(cmd0); + } + } + if (cmd0.sid==prev.sid && cmd0.eid==prev.eid && cmd0.seq==prev.seq) { + flag = true; + output.addAll(sortedEditCmds); + output.addLast(new REPCommand(REP.REPCMD_MERGE_MARK,0, editor.getSID(), REP.MERGE_EID.id, editor.seq(), "")); + } + } + if (!flag) { + output.addAll(sortedEditCmds); + output.addLast(new REPCommand(REP.REPCMD_MERGE_MARK,0, editor.getSID(), REP.MERGE_EID.id, editor.seq(), "")); + } + logger.writeLog("sortedMerge"+eid+":"+sortedEditCmds); + // unMerged command のdeleteのundo string は、この時点で使えない。 + // Editor 側から送り返して来たものを使う必要がある。 + unMergedCmds.clear(); + logger.writeLog("outputMerge"+eid+":"+output); + editor.setSentList(newSentList); + return optimizedSend(editor,output); + } + + /** + * Sent optimized merged command list + * @param editor + * @param output + * @return if any sent commands output + */ + public boolean optimizedSend(REPNode editor, LinkedList<REPCommand> output) { + /* + * Optimized send の場合は、unMergedCommand のつじつまを合わせる必要がある。 + */ + sentMergedList.clear(); + List<REPCommand> output1 = optimizer.optimize(output); + if (output1.size()==0) { + merge_mode = false; + return false; + } + for(REPCommand c:output1) { + REPCommand m = new REPCommand(c); + m.setEID(REP.MERGE_EID.id); + m.setSEQID(editor.seq()); + sentMergedList.addLast(m); + editor.sendToEditor(m); + } + logger.writeLog("OptimizedOutputMerge"+eid+":"+sentMergedList); + merge_mode = true; + return true; + } + + private REPCommand createUndo(REPCommand cmd){ + REPCommand retCmd = new REPCommand(cmd); + if (cmd.cmd==REP.REPCMD_INSERT) { + retCmd.cmd=REP.REPCMD_DELETE; + retCmd.string=""; + } + else if (cmd.cmd==REP.REPCMD_DELETE) retCmd.cmd=REP.REPCMD_INSERT; + return retCmd; + } + + class REPCommandComparator implements Comparator<REPCommand>{ + int base; + REPCommandComparator(int base) { + this.base = base; + } + public int compare(REPCommand o1, REPCommand o2) { + int eid1 = o1.eid-base; if (eid1<0) eid1 += Integer.MAX_VALUE; + int eid2 = o2.eid-base; if (eid2<0) eid2 += Integer.MAX_VALUE; + if ( eid1<eid2 ) return -1; + if ( eid1>eid2 ) return 1; + if ( o1.seq<o2.seq ) return -1; + if ( o1.seq>o2.seq ) return 1; + // assert(false); // this can happen in MergedAgain case + return 0; + } + } + + /** + * Translate Command that was received from SeMa. + * @param cmd the command to be translated. + * @return translated command. + */ + public void transReceiveCmd(REPNode nextEditor,REPCommand cmd){ + assert (cmd.eid != eid); + unMergedCmds.addFirst(cmd); + } + + public void setEid(int _eid){ + eid = _eid; + } + + public boolean checkMergeConflict(REPCommand command) { + unMergedCmds.addFirst(command); + + REPCommand prev = sentMergedList.getFirst(); + if (prev.seq==command.seq) { + // logger.writeLog("Input eid="+eid+"SentMergedList = "+sentMergedList); + sentMergedList.removeFirst(); + } + // previous merge command may be returned + + if(sentMergedList.size()==0 && !mergeAgain) { + merge_mode=false; + } + return mergeAgain; + } + + public void getMergeAgain(Editor editor) { + if (sentMergedList.size()>0) return; // wait for previous merge completion + + LinkedList<REPCommand> returnCommand = new LinkedList<REPCommand>(); + for(REPCommand command : unMergedCmds) { + if (command.cmd==REP.REPCMD_INSERT||command.cmd==REP.REPCMD_DELETE) + returnCommand.add(createUndo(command)); + } + returnCommand.addAll(sortedEditCmds); + returnCommand.addLast(new REPCommand(REP.REPCMD_MERGE_MARK,0, editor.getSID(), REP.MERGE_EID.id, editor.seq(), "")); + returnCommand.addAll(editor.getSentList()); + unMergedCmds.clear(); + logger.writeLog("MergeAgain "+eid+" ret="+returnCommand.size()); + mergeAgain = false; + optimizedSend(editor, returnCommand); + } +// +// public boolean isFinished() { +// if(unMergedCmds.size() > 0) return false; +// if(sentMergedList.size() > 0) return false; +// return true; +// } + + public boolean isMerging() { + return merge_mode; + } + + /** + * receive SMCMD_START_MERGE_ACK + */ + public void mergeAck() { + logger.writeLog("Editor"+eid+": START MERGE "+ + ((unMergedCmds.size()>0)?" and top of unMergedCmds = "+ unMergedCmds.getLast():"")); + merge_mode = true; + } + }