# HG changeset patch # User one # Date 1285233337 -32400 # Node ID 21cb16b7f3df897d9defe5424bdd31ba335a5f61 # Parent 89a326696c544ffb7cfcdc42b63d991b513b296f block message in Editor.write() diff -r 89a326696c54 -r 21cb16b7f3df Todo --- a/Todo Wed Sep 22 22:11:58 2010 +0900 +++ b/Todo Thu Sep 23 18:15:37 2010 +0900 @@ -1,3 +1,45 @@ +Thu Sep 23 14:57:57 JST 2010 + +やっぱり、send が + + Editor Object から Editor へのsend + 他の Editor Object から Editor へのsend + +の二つに使われているのはダメだよ。片方をブロックしたい時があるのだから。 + +sendNext で分割してブロックはできた。問題は、途中で送られたものをどう +処理するかだが〜 + +うーん、すでにSession Manager の送信キューに入っているので、 +blocking が効かないようだ。 + +そういうわけなので、受け側でなんとかした方が良いみたい。 +可能なの? いや、無理だろうな。 + + REPNode.send 他のところからの送信 + REPNode.write Serverの送信ループ + +なので、Editor.write() で捕まえるか。 + +Thu Sep 23 12:13:19 JST 2010 + +START_MERGE から START_MERGE_ACK までにEditorから送られたコマンドは、 +sentList に付け加えるべきでは? + +しかし、その間、外部からEditorに送るコマンドは止める必要がある。 + + Editor.merging ... START_MERGE ... START_MERGE_ACK ... END_MERGE + Translaotr.merging ... START_MERGE_ACK ... END_MERGE + +と言うように区別するか。 + +START_MERGE は、 Editor から返って来るタイミングでブロックするので、 +その段階で、Editor へ送られているコマンドをブロックできない。 +もちろん、Editor からのUSER_INPUTもブロックできない。 + +Editorの undoが正しくなくなるだけでなく、 +Merge phase のコマンドが二つ続けて送られるのはよろしくない。 + Wed Sep 22 19:59:26 JST 2010 NOPを廻す方式とAckを廻す方式は、 diff -r 89a326696c54 -r 21cb16b7f3df rep/PacketSet.java --- a/rep/PacketSet.java Wed Sep 22 22:11:58 2010 +0900 +++ b/rep/PacketSet.java Thu Sep 23 18:15:37 2010 +0900 @@ -1,26 +1,23 @@ package rep; -import rep.channel.REPSocketChannel; import rep.handler.REPNode; public class PacketSet { - public REPSocketChannel channel; - public REPNode editor; + public REPNode channel; public REPCommand command; - public PacketSet(REPSocketChannel channel, REPNode editor, REPCommand command) { - this.channel = channel; - this.editor = editor; + public PacketSet(REPNode editor, REPCommand command) { + this.channel = editor; this.command = command; } public REPNode getEditor() { - return editor; + return channel; } public String toString() { - return "PacketSet("+command.toString()+","+editor+")"; + return "PacketSet("+command.toString()+","+channel+")"; } } diff -r 89a326696c54 -r 21cb16b7f3df rep/ServerMainLoop.java --- a/rep/ServerMainLoop.java Wed Sep 22 22:11:58 2010 +0900 +++ b/rep/ServerMainLoop.java Thu Sep 23 18:15:37 2010 +0900 @@ -128,7 +128,7 @@ private void sendLog(PacketSet p) { REPNode to; String s; - to = manager.editorList.editorByChannel(p.channel); + to = manager.editorList.editorByChannel(p.channel.channel); if (to==null) s = p.channel.toString(); else diff -r 89a326696c54 -r 21cb16b7f3df rep/handler/Editor.java --- a/rep/handler/Editor.java Wed Sep 22 22:11:58 2010 +0900 +++ b/rep/handler/Editor.java Thu Sep 23 18:15:37 2010 +0900 @@ -165,10 +165,10 @@ private boolean waitingRequired(REPCommand command, REPSocketChannel channel) { if (hasWaitingCommand()) { // We cannot do this operation before watingCommandQueue. - addWaitingCommand(new PacketSet(channel, this, new REPCommand(command))); + addWaitingCommand(new PacketSet(this, new REPCommand(command))); return true; } else if (isMerging()) { - addWaitingCommand(new PacketSet(channel, this, new REPCommand(command))); + addWaitingCommand(new PacketSet(this, new REPCommand(command))); return true; } //ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting"); @@ -231,7 +231,7 @@ } private boolean checkAck(REPCommand command) { - assert(!merging); + assert(!isMerging()); REPCommand prev; if (sentList.getFirst()==mergeMark) prev=sentList.remove(1); else prev=sentList.remove(0); if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) { @@ -256,7 +256,7 @@ // START_MERGE を送る // 送らないで良い場合もある? REPCommand cmd = new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,""); - send(cmd); + sendToEditor(cmd); merging = true; // Session Manager 側で、このeditorへの他のeditorからの // 入力を止めて、merge にそなえる。merge は、eidtor 側から @@ -301,7 +301,7 @@ private void endMerge() { translator.endMerge(); REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,sid,eid,seq(),0,""); - send(mergeEnd); + sendToEditor(mergeEnd); if (preMergeCommand.eid==eid) { // Ackの場合はcheckAck() で既にremoveされている if (sentList.getFirst()==mergeMark) sentList.remove(1); else sentList.remove(0); @@ -326,7 +326,7 @@ private boolean checkQuit() { if (quit2!=null && sentList.size()==1&&!isMerging()) { - send(quit2); + sendToEditor(quit2); quit2 = null; return true; } @@ -364,7 +364,7 @@ case SMCMD_SYNC: if (isMaster()) - send(command); + sendToEditor(command); else next.send(command); @@ -403,18 +403,33 @@ } /** - * send command to the editor + * write command to the editor * called from another Editor instance such as next.send(command) */ @Override - public void send(REPCommand command) { - if (command.eid == REP.MERGE_EID.id || - command.cmd==REP.SMCMD_END_MERGE || - !waitingRequired(command,channel)) { - super.send(command); + public void write(REPCommand command) { + if ( !waitingRequired(command,channel)) { + if (isMergeCommand(command)) { + merging = true; + } + super.write(command); } } + private boolean isMergeCommand(REPCommand command) { + switch(command.cmd) { + case REPCMD_INSERT: case REPCMD_DELETE: + return command.eid==eid; + case REPCMD_INSERT_ACK: case REPCMD_DELETE_ACK: + return command.eid!=eid; + } + return false; + } + + public void sendToEditor(REPCommand command) { + super.write(command); + } + @Override public void handle(REPCommand command, REPSelectionKey key) throws IOException { //ServerMainLoop.logger.writeLog("Manager "+manager.getId()+" read : command = " + command @@ -476,12 +491,12 @@ PacketSet p = w.remove(0); try { if (p.channel!=null) - send(p.command); + write(p.command); else manage(p.command); } catch (Exception e1) { assert false; - manager.close(p.channel); + manager.close(p.channel.channel); return; } } diff -r 89a326696c54 -r 21cb16b7f3df rep/handler/FirstConnector.java --- a/rep/handler/FirstConnector.java Wed Sep 22 22:11:58 2010 +0900 +++ b/rep/handler/FirstConnector.java Thu Sep 23 18:15:37 2010 +0900 @@ -98,6 +98,6 @@ assert(command!=null && command.cmd==REP.SMCMD_SM_JOIN); assert(channel!=null); REPCommand c = new REPCommand(command); - manager.addWriteQueue(new PacketSet(channel,null, c)); + manager.addWriteQueue(new PacketSet(this, c)); } } diff -r 89a326696c54 -r 21cb16b7f3df rep/handler/Forwarder.java --- a/rep/handler/Forwarder.java Wed Sep 22 22:11:58 2010 +0900 +++ b/rep/handler/Forwarder.java Thu Sep 23 18:15:37 2010 +0900 @@ -42,7 +42,7 @@ assert(command!=null); assert(channel!=null); REPCommand c = new REPCommand(command); - manager.addWriteQueue(new PacketSet(channel,null, c)); + manager.addWriteQueue(new PacketSet(this, c)); } public void sendWithSeq(REPCommand command) { @@ -50,7 +50,7 @@ assert(channel!=null); REPCommand c = new REPCommand(command); c.setSEQID(seq()); - manager.addWriteQueue(new PacketSet(channel,null, c)); + manager.addWriteQueue(new PacketSet(this, c)); } public REPSocketChannel getChannel() { @@ -151,6 +151,11 @@ return null; } + @Override + public void write(REPCommand command) { + channel.write(command); + } + diff -r 89a326696c54 -r 21cb16b7f3df rep/handler/REPNode.java --- a/rep/handler/REPNode.java Wed Sep 22 22:11:58 2010 +0900 +++ b/rep/handler/REPNode.java Thu Sep 23 18:15:37 2010 +0900 @@ -110,8 +110,17 @@ public abstract String getLocalHostName(); - + /** + * Send to the next REPNode (with possible blocking) + * @param command + */ public abstract void send(REPCommand command) ; + + /** + * write command to the socket channel + * @param command + */ + public abstract void write(REPCommand command) ; public abstract void sendWithSeq(REPCommand command) ; @@ -119,9 +128,6 @@ public abstract int seq() ; - public abstract boolean isMerging() ; - - public abstract boolean manage(REPCommand command) ; @@ -163,6 +169,10 @@ public abstract List getSentList() ; + public void sendToEditor(REPCommand m) { + send(m); + } + diff -r 89a326696c54 -r 21cb16b7f3df rep/handler/Translator.java --- a/rep/handler/Translator.java Wed Sep 22 22:11:58 2010 +0900 +++ b/rep/handler/Translator.java Thu Sep 23 18:15:37 2010 +0900 @@ -116,7 +116,7 @@ m.setEID(REP.MERGE_EID.id); m.setSEQID(editor.seq()); sentMergedList.addLast(m); - editor.send(m); + editor.sendToEditor(m); } logger.writeLog("OptimizedOutputMerge"+eid+":"+sentMergedList); merge_mode = true; diff -r 89a326696c54 -r 21cb16b7f3df test/mergertest/EditorSimulator.java --- a/test/mergertest/EditorSimulator.java Wed Sep 22 22:11:58 2010 +0900 +++ b/test/mergertest/EditorSimulator.java Thu Sep 23 18:15:37 2010 +0900 @@ -41,11 +41,6 @@ } - @Override - public boolean isMerging() { - // TODO Auto-generated method stub - return false; - } @Override public void joinAck(REPCommand sendCommand, int sid) { diff -r 89a326696c54 -r 21cb16b7f3df test/mergertest/EditorSimulatorImpl.java --- a/test/mergertest/EditorSimulatorImpl.java Wed Sep 22 22:11:58 2010 +0900 +++ b/test/mergertest/EditorSimulatorImpl.java Thu Sep 23 18:15:37 2010 +0900 @@ -28,4 +28,9 @@ return null; } + @Override + public void write(REPCommand command) { + + } + } diff -r 89a326696c54 -r 21cb16b7f3df test/mergertest/TestMerger.java --- a/test/mergertest/TestMerger.java Wed Sep 22 22:11:58 2010 +0900 +++ b/test/mergertest/TestMerger.java Thu Sep 23 18:15:37 2010 +0900 @@ -87,12 +87,6 @@ } @Override - public boolean isMerging() { - // TODO Auto-generated method stub - return false; - } - - @Override public void joinAck(REPCommand sendCommand, int sid) { // TODO Auto-generated method stub @@ -138,4 +132,10 @@ // TODO Auto-generated method stub return null; } + + @Override + public void write(REPCommand command) { + // TODO Auto-generated method stub + + } }