comparison rep/handler/Editor.java @ 399:19705f4b8015

waitingCommandInMerge
author one
date Mon, 24 Nov 2008 23:11:51 +0900
parents 7de83b6a34e7
children 29f01a7ce71f
comparison
equal deleted inserted replaced
398:7de83b6a34e7 399:19705f4b8015
16 public class Editor extends Forwarder { 16 public class Editor extends Forwarder {
17 17
18 private Translator translator; 18 private Translator translator;
19 // REPCommands we are going to send to the next editor 19 // REPCommands we are going to send to the next editor
20 private List<REPCommand> sentList = new LinkedList<REPCommand>(); 20 private List<REPCommand> sentList = new LinkedList<REPCommand>();
21 protected List<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>();
21 private REPCommand quit2=null; 22 private REPCommand quit2=null;
22 private boolean merging; 23 private boolean merging;
23 private REPCommand preMergeCommand; 24 private REPCommand preMergeCommand;
24 public static boolean noMergeMode=false; 25 public static boolean noMergeMode=false;
25 static final boolean doOptimize = true; 26 static final boolean doOptimize = true;
72 if (next.isDirect()) { 73 if (next.isDirect()) {
73 REPCommand keep; 74 REPCommand keep;
74 if (waitingRequired(command)) return; 75 if (waitingRequired(command)) return;
75 keep = new REPCommand(command); 76 keep = new REPCommand(command);
76 sentList.add(keep); 77 sentList.add(keep);
77 ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList); 78 checkDouble(sentList);
78 ((Editor) next).checkReturnedCommand(command); 79 next.forwardedCommandManage(command);
80 // ((Editor) next).checkReturnedCommand(command);
79 } else 81 } else
80 next.send(command); 82 next.send(command);
81 } else { 83 } else {
82 //他のエディタからの編集コマンド 84 //他のエディタからの編集コマンド
83 if (waitingRequired(command)) return; 85 if (waitingRequired(command)) return;
89 sendEditorCommand(command); 91 sendEditorCommand(command);
90 } 92 }
91 return; 93 return;
92 } 94 }
93 95
96 private void checkDouble(List<REPCommand> sentList) {
97 if (sentList.size()==0) return;
98 int count = 0;
99 REPCommand f = sentList.get(0);
100 for(REPCommand c:sentList) {
101 if (c.eid==f.eid&&c.seq==f.seq) {
102 count++;
103 }
104 }
105 assert(count==1);
106 }
107
94 private boolean waitingRequired(REPCommand command) { 108 private boolean waitingRequired(REPCommand command) {
95 if (manager.hasWaitingCommand(channel)) { 109 if (hasWaitingCommand()) {
96 // We cannot do this operation before watingCommandQueue. 110 // We cannot do this operation before watingCommandQueue.
97 manager.addWaitingCommand(new PacketSet(channel, this, command)); 111 addWaitingCommand(new PacketSet(channel, this, command));
98 return true; 112 return true;
99 } else if (isMerging()) { 113 } else if (isMerging()) {
100 manager.addWaitingCommand(new PacketSet(getChannel(), this, new REPCommand(command))); 114 addWaitingCommand(new PacketSet(getChannel(), this, new REPCommand(command)));
101 return true; 115 return true;
102 } 116 }
103 ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting"); 117 ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting");
104 return false; 118 return false;
105 } 119 }
106 120
121 public void addWaitingCommand(PacketSet set) {
122 waitingCommandInMerge.add(set);
123 }
124
107 private void sendEditorCommand(REPCommand command) { 125 private void sendEditorCommand(REPCommand command) {
108 REPCommand keep = new REPCommand(command); 126 REPCommand keep = new REPCommand(command);
109 sentList.add(keep); 127 sentList.add(keep);
110 ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList); 128 ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
129 checkDouble(sentList);
111 assert(sentList.size()<limit); 130 assert(sentList.size()<limit);
112 next.send(command); 131 next.send(command);
113 } 132 }
114 133
115 boolean merge(REPCommand command) { 134 boolean merge(REPCommand command) {
120 /** 139 /**
121 * 一周して来たcommandの処理。 140 * 一周して来たcommandの処理。
122 * @param command 141 * @param command
123 */ 142 */
124 void checkReturnedCommand(REPCommand command) { 143 void checkReturnedCommand(REPCommand command) {
144 assert(!merging);
145 if (sentList.size()==0) {
146 ServerMainLoop.logger.writeLog("Editor eid="+eid+" looped command not registered: "+command);
147 assert(false);
148 }
125 REPCommand prev = sentList.remove(0); 149 REPCommand prev = sentList.remove(0);
126 ServerMainLoop.logger.writeLog("Editor eid="+eid+" remove sentList:"+(prev==null?"null":prev)); 150 ServerMainLoop.logger.writeLog("Editor eid="+eid+" remove sentList:"+(prev==null?"null":prev));
127 if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) { 151 if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) {
128 String err = "Editor eid="+eid+" checkReturnedCommand() : command = " + command + " prev="+ 152 String err = "Editor eid="+eid+" checkReturnedCommand() : command = " + command + " prev="+
129 (prev==null?"null":prev)+" sentList="; 153 (prev==null?"null":prev)+" sentList=";
199 case REPCMD_DELETE: keep.cmd = REP.REPCMD_DELETE_ACK;break; 223 case REPCMD_DELETE: keep.cmd = REP.REPCMD_DELETE_ACK;break;
200 } 224 }
201 sentList.add(keep); 225 sentList.add(keep);
202 ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList); 226 ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
203 assert(sentList.size()<limit); 227 assert(sentList.size()<limit);
228 checkDouble(sentList);
204 next.send(keep); 229 next.send(keep);
205 } else { 230 } else {
206 next.send(preMergeCommand); 231 next.send(preMergeCommand);
207 } 232 }
208 preMergeCommand = null; 233 preMergeCommand = null;
316 /* Handle special case first, usually these cases 341 /* Handle special case first, usually these cases
317 * are handled in the next Editor in a session manager, but 342 * are handled in the next Editor in a session manager, but
318 * it is forwarded here. 343 * it is forwarded here.
319 */ 344 */
320 public void forwardedCommandManage(REPCommand command) { 345 public void forwardedCommandManage(REPCommand command) {
346 if (waitingRequired(command)) return;
321 if (command.cmd==REP.SMCMD_QUIT_2) { 347 if (command.cmd==REP.SMCMD_QUIT_2) {
322 // we have to wait next editor's finishing before sending this. 348 // we have to wait next editor's finishing before sending this.
323 // this is odd, but the editor itself does not know it's merging 349 // this is odd, but the editor itself does not know it's merging
324 // state. Only this session manager knows it. 350 // state. Only this session manager knows it.
325 setQuit2(command); 351 setQuit2(command);
326 } else if (command.eid==eid) { 352 } else if (command.eid==eid) {
327 // if we handle in editor.manage(), this editor cannot distinguish this 353 // if we handle in editor.manage(), this editor cannot distinguish this
328 // and user input command from the editor. 354 // and user input command from the editor.
329 REPCommand keep; 355 // REPCommand keep;
330 switch(command.cmd) { 356 switch(command.cmd) {
357 case REPCMD_INSERT:
358 case REPCMD_DELETE:
359 // keep = new REPCommand(command);
360 // sentList.add(keep);
361 // ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
362 // checkDouble(sentList);
363 case REPCMD_INSERT_ACK:
331 case REPCMD_DELETE_ACK: 364 case REPCMD_DELETE_ACK:
332 case REPCMD_INSERT_ACK:
333 checkReturnedCommand(command);
334 return ;
335 case REPCMD_INSERT:
336 keep = new REPCommand(command);
337 keep.cmd = REP.REPCMD_INSERT_ACK;
338 sentList.add(keep);
339 checkReturnedCommand(command); 365 checkReturnedCommand(command);
340 return; 366 return;
341 case REPCMD_DELETE:
342 keep = new REPCommand(command);
343 keep.cmd = REP.REPCMD_DELETE_ACK;
344 sentList.add(keep);
345 checkReturnedCommand(command);
346 return;
347 } 367 }
348 } 368 }
349 send(command); 369 send(command);
350 } 370 }
351 371
372 /**
373 * Check waiting command in merge
374 * @return true if there is a processed waiting command
375 * @throws IOException
376 */
377 public void checkWaitingCommandInMerge() {
378 int count = waitingCommandInMerge.size();
379 if (count==0) return;
380 if (isMerging()) return;
381 while(count++>0) {
382 PacketSet p = waitingCommandInMerge.remove(0);
383 try {
384 // if (manager.sessionManage(e, p.command)) { // we don't need this
385 // assert false;
386 // return;
387 // }
388 manage(p.command);
389 } catch (Exception e1) {
390 // should be e.close()?
391 manager.close(p.channel);
392 }
393 }
394 }
395
396
397 public boolean hasWaitingCommand() {
398 return waitingCommandInMerge.size()>0;
399 }
352 } 400 }