# HG changeset patch # User kono # Date 1223213784 -32400 # Node ID 0585fd2410b8b3db3dfa3b8040829c2a152f81c8 # Parent f39a8045175d36bea0b90f7b1c6d8abe07434a48 Single Insert Command worked. diff -r f39a8045175d -r 0585fd2410b8 rep/Editor.java --- a/rep/Editor.java Sun Oct 05 11:39:18 2008 +0900 +++ b/rep/Editor.java Sun Oct 05 22:36:24 2008 +0900 @@ -12,6 +12,7 @@ public class Editor { private int eid; // unique id in a session private int sid = -1 ; // globally unique session id + private int seq = 0; private REPSocketChannel myChannel; private String host; private String file; @@ -62,12 +63,9 @@ if(command.eid == eid){ if(checkReturnedCommand(command)){ //エディタからのコマンドが元のエディタに戻ってきた - //マージして送信 - ArrayList cmds = translater.catchOwnCommand(command); - //optimizer - //マージ中のエディタからの割り込み検知に使う - sentMergedList.addAll(cmds); - sendMergedCommand(cmds); + // START_MERGE を送る + REPCommand cmd = new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,""); + list.add(cmd); return list; }else{ //エディタからの新たな編集コマンド @@ -96,15 +94,31 @@ return list; } + boolean merge(REPCommand command) { + REPCommand prev = translater.prev(); + if(prev==null) return false; + assert(prev.eid==command.eid); + //マージして送信 + ArrayList cmds = translater.catchOwnCommand(); + //optimizer + if (cmds.size()==0) + return false; // no merge phase is necessary + //マージ中のエディタからの割り込み検知に使う + sentMergedList.addAll(cmds); + sendMergedCommand(cmds); + return true; + } + private void sendMergedCommand(ArrayList cmds) { for(REPCommand mergeCommand : cmds){ mergeCommand.setEID(REP.MERGE_EID.id); + mergeCommand.setSEQID(seq()); writeQueue.add(mergeCommand); assert(writeQueue.size() 0){ if(sentList.get(0).seq == command.seq){ sentList.remove(0); @@ -126,7 +140,7 @@ // select loop で送るしかない。 REPCommand cmd; if (writeQueue.size()>0) { - cmd = writeQueue.remove(0); + cmd = new REPCommand(writeQueue.remove(0)); ns.writeLog("SessionManager write to "+myChannel+" cmd="+cmd); myChannel.write(cmd); return true; @@ -199,5 +213,8 @@ public void setSID(int sessionID) { sid = sessionID; } + public int seq() { + return seq++; + } } diff -r f39a8045175d -r 0585fd2410b8 rep/REP.java --- a/rep/REP.java Sun Oct 05 11:39:18 2008 +0900 +++ b/rep/REP.java Sun Oct 05 22:36:24 2008 +0900 @@ -44,6 +44,7 @@ SMCMD_QUIT_2 ( 78), + SM_EID ( -1), MERGE_EID ( -2), SMCMD_CH_MASTER ( 79), SMCMD_UPDATE_UP ( 80), diff -r f39a8045175d -r 0585fd2410b8 rep/SessionManager.java --- a/rep/SessionManager.java Sun Oct 05 11:39:18 2008 +0900 +++ b/rep/SessionManager.java Sun Oct 05 22:36:24 2008 +0900 @@ -89,29 +89,44 @@ } + + /* + * We wrote everything in one thread, but we can assign + * one thread for each communication channel and GUI event. + */ public void mainLoop() throws IOException { while(true){ - SessionManagerEvent e; - while((e = waitingEventQueue.poll())!=null){ - e.exec(); - } - for(Session s:sessionList) { - for(Editor editor: s.getEditorList()) - if (editor.doWaitingWrite()) break; + if (checkInputEvent() || + checkWaitingWrite() || + checkWaitingCommandInMerge()) { + // try to do fair execution for waiting task + if(selector.selectNow() > 0) select(); + continue; } - // if there are waiting command during merge operation, do process it - if(checkWaitingCommandInMerge()){ - if(selector.selectNow() > 0){ - select(); - } - continue; - } + // now we can wait for input packet or event selector.select(); select(); } } + private boolean checkInputEvent() { + SessionManagerEvent e; + if((e = waitingEventQueue.poll())!=null){ + e.exec(); + return true; + } + return false; + } + + private boolean checkWaitingWrite() throws IOException { + for(Session s:sessionList) { + for(Editor editor: s.getEditorList()) + if (editor.doWaitingWrite()) return true; + } + return false; + } + /** * Check waiting command in merge * @return true if there is a processed waiting command @@ -387,28 +402,36 @@ case REPCMD_INSERT: case REPCMD_NOP: { - //sid から Session を取得 + // sid から Session を取得 Session session = getSession(receivedCommand.sid); if (session==null) throw new IOException(); - //マージの処理と次のエディタへコマンドを送信する処理 + // 次のエディタへコマンドを送信する処理 Editor editor = session.getEditor(channel); boolean old = editor.isMerging(); session.translate(channel, receivedCommand); - boolean newState = editor.isMerging(); - if (old!=newState) { - // prevEditor なのは変だと思うが... + if(editor.isMerging()!=old){ + assert(old==false); + REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,receivedCommand.sid,editor.getEID(),editor.seq(),0,""); + editor.send(mergeEnd); Editor prevEditor = session.getPrevEditor(editor); + setNormalState(prevEditor.getChannel(), session.getSID()); + } + break; + } + case SMCMD_START_MERGE_ACK: + { + // sid から Session を取得 + Session session = getSession(receivedCommand.sid); + if (session==null) throw new IOException(); + // マージの処理と次のエディタへコマンドを送信する処理 + Editor editor = session.getEditor(channel); + if (editor.merge(receivedCommand)) { //マージ中のエディタはコマンドを受け取らない - // この代入は状態が変わったときだけ行えば良い。毎回、new するのは変 - if(editor.isMerging()){ - //Handlerを切り替える - setMergeState(prevEditor.getChannel(), session.getSID()); - }else { - setNormalState(prevEditor.getChannel(), session.getSID()); - } + Editor prevEditor = session.getPrevEditor(editor); + setMergeState(prevEditor.getChannel(), session.getSID()); } + break; } - break; case SMCMD_QUIT: { Session session = getSession(receivedCommand.sid); @@ -459,16 +482,7 @@ return null; } - public Editor getEditor(REPSocketChannel channel){ - for(Editor editor : editorList){ - if(editor.getChannel() == channel){ - return editor; - } - } - return null; - } - - private Session getSession(int sid) { + public Session getSession(int sid) { for(Session session : sessionList){ if(session.getSID() == sid) return session; } diff -r f39a8045175d -r 0585fd2410b8 rep/handler/REPHandlerInMerge.java --- a/rep/handler/REPHandlerInMerge.java Sun Oct 05 11:39:18 2008 +0900 +++ b/rep/handler/REPHandlerInMerge.java Sun Oct 05 22:36:24 2008 +0900 @@ -3,6 +3,7 @@ import java.io.IOException; import rep.Editor; import rep.REPCommand; +import rep.Session; import rep.SessionManager; import rep.channel.REPSelectionKey; import rep.channel.REPSocketChannel; @@ -24,7 +25,8 @@ // if (manager.isMerging(command.sid()))... // 同じchannelで、merge中のsessionは一つは限らない。 // なので、sid をinstanceで持つのではだめ。 - Editor editor = manager.getEditor(channel); + Session s = manager.getSession(command.sid); + Editor editor = s.getEditor(channel); manager.addWaitingCommand(new PacketSet(channel, editor, command)); } diff -r f39a8045175d -r 0585fd2410b8 rep/translater/Translater.java --- a/rep/translater/Translater.java Sun Oct 05 11:39:18 2008 +0900 +++ b/rep/translater/Translater.java Sun Oct 05 22:36:24 2008 +0900 @@ -18,7 +18,7 @@ * but in this case, you can use also transReceiveCmd() * @param command which the editor sent. */ - abstract public ArrayList catchOwnCommand(REPCommand cmd); + abstract public ArrayList catchOwnCommand(); /** * Translate Command cmd that was received from SeMa. diff -r f39a8045175d -r 0585fd2410b8 rep/translater/TranslaterImp1.java --- a/rep/translater/TranslaterImp1.java Sun Oct 05 11:39:18 2008 +0900 +++ b/rep/translater/TranslaterImp1.java Sun Oct 05 22:36:24 2008 +0900 @@ -53,14 +53,10 @@ * Dequeue command cmd that was returned. * @param cmd */ - public ArrayList catchOwnCommand(REPCommand cmd){ + public ArrayList catchOwnCommand(){ ArrayList returnCmds = new ArrayList(); ArrayList cmds = new ArrayList(); - // ringである以上、戻ってきたコマンドは確実にキューsentCmdsの先頭にある事を期待している - REPCommand tmp = sentCmds.poll(); - assert tmp.seq==cmd.seq; - assert cmd.eid==eid; - + prev(); //スタック上にあるコマンドを全部undoコマンドにする while ( !unMergedCmds.isEmpty() ){ REPCommand cmd0 = unMergedCmds.pop(); @@ -81,6 +77,10 @@ return returnCmds; } + public REPCommand prev() { + return sentCmds.poll(); + } + private REPCommand createUndo(REPCommand cmd){ String str = new String(cmd.string); REPCommand retCmd = new REPCommand(cmd.cmd, cmd.sid, cmd.eid, cmd.seq, cmd.lineno, str); diff -r f39a8045175d -r 0585fd2410b8 test/sematest/TestEditor.java --- a/test/sematest/TestEditor.java Sun Oct 05 11:39:18 2008 +0900 +++ b/test/sematest/TestEditor.java Sun Oct 05 22:36:24 2008 +0900 @@ -26,6 +26,7 @@ private InetSocketAddress semaIP; private REPLogger ns; private int seq = 0; + private int prevSeq = 0; public Text text; public LinkedList cmds; private int eid = 0; @@ -34,7 +35,6 @@ boolean running = true; long timeout = 1; private String name; - private REPCommand nop = new REPCommand(REP.REPCMD_NOP,0,0,0,0,""); private boolean inputLock=false; public boolean detached=false; public boolean master=false; @@ -179,6 +179,7 @@ case SMCMD_JOIN: case SMCMD_PUT: sendCommand(cmd,seq++); + prevSeq = seq; /* * To prevent confusion, stop user input until the ack */ @@ -194,7 +195,8 @@ } - private void sendCommand(REPCommand cmd,int seq) { + private void sendCommand(REPCommand cmd1,int seq) { + REPCommand cmd = new REPCommand(cmd1); cmd.setSEQID(seq); cmd.setEID(eid); cmd.setSID(sid); @@ -208,29 +210,32 @@ switch(cmd.cmd) { case REPCMD_INSERT : text.insert(cmd.lineno, cmd.string); + if (cmd.eid==REP.MERGE_EID.id) break; + addNop(); sendCommand(cmd,cmd.seq); - // sendCommand(nop); session manager do this for me break; case REPCMD_INSERT_ACK : assert(false); break; case REPCMD_DELETE : String del = text.delete(cmd.lineno); + if (cmd.eid==REP.MERGE_EID.id) break; + addNop(); cmd.setString(del); sendCommand(cmd,cmd.seq); - // sendCommand(nop); session manager do this for me break; case REPCMD_DELETE_ACK : assert(false); break; - case REPCMD_CLOSE : + case REPCMD_NOP : + if (cmd.eid==REP.MERGE_EID.id) break; + addNop(); + sendCommand(cmd,cmd.seq); + break; case REPCMD_CLOSE : case REPCMD_CLOSE_2 : assert(false); break; - case REPCMD_NOP : - sendCommand(cmd,cmd.seq); - sendCommand(nop,seq++); - break; + case SMCMD_JOIN_ACK : sid = cmd.sid; eid = cmd.eid; @@ -283,4 +288,12 @@ break; } } + + private void addNop() { + if (seq!=prevSeq) return; + // We haven't send any command, add nop before retransmition. + REPCommand nop = new REPCommand(REP.REPCMD_NOP, sid, eid, seq, 0, ""); + sendCommand(nop,seq++); + prevSeq = seq; + } } diff -r f39a8045175d -r 0585fd2410b8 test/sematest/TestSessionManager.java --- a/test/sematest/TestSessionManager.java Sun Oct 05 11:39:18 2008 +0900 +++ b/test/sematest/TestSessionManager.java Sun Oct 05 22:36:24 2008 +0900 @@ -80,11 +80,12 @@ if (i