Mercurial > hg > RemoteEditor > REPSessionManager
comparison rep/handler/Editor.java @ 460:ef70109af810
self writeQueue and waitingQueue
author | one |
---|---|
date | Fri, 24 Sep 2010 17:05:19 +0900 |
parents | 66c4f6b29baf |
children | e7eeb8be0de1 |
comparison
Legend: equal, deleted, inserted, replaced
459:66c4f6b29baf | 460:ef70109af810 |
---|---|
16 public class Editor extends Forwarder { | 16 public class Editor extends Forwarder { |
17 | 17 |
18 private Translator translator; | 18 private Translator translator; |
19 // REPCommands we are going to send to the next editor | 19 // REPCommands we are going to send to the next editor |
20 private LinkedList<REPCommand> sentList = new LinkedList<REPCommand>(); | 20 private LinkedList<REPCommand> sentList = new LinkedList<REPCommand>(); |
21 public LinkedList<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>(); | 21 public LinkedList<REPCommand> waitingCommandInMerge= new LinkedList<REPCommand>(); |
22 private REPCommand quit2=null; | 22 private REPCommand quit2=null; |
23 private REPCommand preMergeCommand; | 23 private REPCommand preMergeCommand; |
24 private boolean merging; | 24 private boolean merging; |
25 private REPCommand mergeMark = new REPCommand(REP.SMCMD_START_MERGE, 0,0, 0, 0, ""); | 25 private REPCommand mergeMark = new REPCommand(REP.SMCMD_START_MERGE, 0,0, 0, 0, ""); |
26 public static boolean noMergeMode=false; | 26 public static boolean noMergeMode=false; |
27 static final boolean doOptimize = false; | 27 static final boolean doOptimize = false; |
28 private Forwarder toEditor; | 28 private Forwarder toEditor; |
29 private LinkedList<REPCommand> writeQueue = new LinkedList<REPCommand>(); | |
29 | 30 |
30 public Editor(SessionManager manager,int editorNo){ | 31 public Editor(SessionManager manager,int editorNo){ |
31 // no translator case | 32 // no translator case |
32 super(manager, null); | 33 super(manager, null); |
33 } | 34 } |
160 // } | 161 // } |
161 // } | 162 // } |
162 // assert(count==0); | 163 // assert(count==0); |
163 // } | 164 // } |
164 | 165 |
165 private boolean waitingRequired(REPCommand command, REPSocketChannel<REPCommand> channel) { | 166 /** |
166 if (hasWaitingCommand()) { | 167 * Sending to Editor and waiting Queue |
167 // We cannot do this operation before waitingCommandQueue. | 168 * +--------+ |
168 addWaitingCommand(new PacketSet(this, new REPCommand(command))); | 169 * send() --> write() -> | Editor | -> handle() -> manager() |
169 return true; | 170 * +--------+ |
170 } else if (isMerging()) { | 171 * waitingQueue |
171 addWaitingCommand(new PacketSet(this, new REPCommand(command))); | 172 * writeQueue |
172 return true; | 173 * |
173 } | 174 * send() は、他のEditor Node から呼ばれる |
174 //ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting"); | 175 * write() は、内部で優先的に送信するのに用いる |
175 return false; | 176 * writeQueue は、waitingQueue よりも常に先に実行される必要がある |
176 } | 177 |
177 | 178 * Manageの送信キューはここでは使わない |
178 public void addWaitingCommand(PacketSet set) { | 179 * send() manage |
179 // if (preMergeCommand!=null) { | 180 */ |
180 // if (preMergeCommand.eid==set.command.eid | 181 @Override |
181 // && preMergeCommand.seq==set.command.seq) { | 182 public void send(REPCommand command) { |
182 // assert(false); | 183 if (merging || isMerging() || waitingCommandInMerge.size()>0) { |
183 // } | 184 waitingCommandInMerge.addLast(command); |
184 // } | 185 ServerMainLoop.logger.writeLog("Editor eid:"+eid+" waitingCommandInMerge = "+waitingCommandInMerge); |
185 waitingCommandInMerge.addLast(set); | 186 return; |
186 ServerMainLoop.logger.writeLog("Editor eid:"+eid+" waitingCommandInMerge = "+waitingCommandInMerge); | 187 } |
187 } | 188 if (isMergeCommand(command)) { |
188 | 189 merging = true; |
190 ServerMainLoop.logger.writeLog("Editor"+eid+": merging=true (send)"+command); | |
191 } | |
192 writeQueue.add(command); | |
193 } | |
194 | |
195 /** | |
196 * Check waiting command in merge | |
197 * periodically called from manager | |
198 */ | |
199 public void checkWaitingCommandInMerge() { | |
200 if (writeQueue.size()>0) { | |
201 REPCommand command =writeQueue.pollFirst(); | |
202 ServerMainLoop.logger.writeLog("Editor"+eid+": write comand="+command); | |
203 super.write(command); | |
204 return; | |
205 } | |
206 if (translator==null || merging || isMerging()) return; | |
207 if (waitingCommandInMerge.size()>0) { | |
208 REPCommand command = waitingCommandInMerge.pollFirst(); | |
209 ServerMainLoop.logger.writeLog("Editor"+eid+": send waiting comand="+command); | |
210 super.write(command); | |
211 if (isMergeCommand(command)) { | |
212 merging = true; | |
213 } | |
214 } | |
215 } | |
189 /** | 216 /** |
190 * 他のエディタへのコマンドの送信 | 217 * 他のエディタへのコマンドの送信 |
191 * @param command | 218 * @param command |
192 * | 219 * |
193 * sendList にキープする必要がある。 | 220 * sendList にキープする必要がある。 |
325 sentList.addLast(mergeMark); | 352 sentList.addLast(mergeMark); |
326 preMergeCommand = null; | 353 preMergeCommand = null; |
327 } | 354 } |
328 | 355 |
329 private boolean checkQuit() { | 356 private boolean checkQuit() { |
330 if (quit2!=null && sentList.size()==1&&!isMerging() && !hasWaitingCommand()) { | 357 if (quit2!=null && sentList.size()==1&&!isMerging() && waitingCommandInMerge.size()==0) { |
331 sendToEditor(quit2); | 358 sendToEditor(quit2); |
332 quit2 = null; | 359 quit2 = null; |
333 return true; | 360 return true; |
334 } | 361 } |
335 return false; | 362 return false; |
402 return false; | 429 return false; |
403 } | 430 } |
404 return true; | 431 return true; |
405 } | 432 } |
406 | 433 |
407 /** | |
408 * write command to the editor | |
409 * called from another Editor instance such as next.send(command) | |
410 */ | |
411 @Override | |
412 public void write(REPCommand command) { | |
413 if (merging || waitingCommandInMerge.size()>0) { | |
414 addWaitingCommand(new PacketSet(this, new REPCommand(command))); | |
415 return; | |
416 } | |
417 if (!waitingRequired(command,channel)) { | |
418 if (isMergeCommand(command)) { | |
419 merging = true; | |
420 ServerMainLoop.logger.writeLog("Editor"+eid+": merging=true "+command); | |
421 } | |
422 super.write(command); | |
423 } | |
424 } | |
425 | 434 |
426 private boolean isMergeCommand(REPCommand command) { | 435 private boolean isMergeCommand(REPCommand command) { |
427 switch(command.cmd) { | 436 switch(command.cmd) { |
428 case REPCMD_INSERT: case REPCMD_DELETE: | 437 case REPCMD_INSERT: case REPCMD_DELETE: |
429 return command.eid==eid; | 438 return command.eid==eid; |
432 } | 441 } |
433 return false; | 442 return false; |
434 } | 443 } |
435 | 444 |
436 public void sendToEditor(REPCommand command) { | 445 public void sendToEditor(REPCommand command) { |
437 toEditor.send(command); | 446 writeQueue.add(command); |
438 } | 447 } |
439 | 448 |
440 @Override | 449 @Override |
441 public void handle(REPCommand command, REPSelectionKey<REPCommand> key) throws IOException { | 450 public void handle(REPCommand command, REPSelectionKey<REPCommand> key) throws IOException { |
442 //ServerMainLoop.logger.writeLog("Manager "+manager.getId()+" read : command = " + command | 451 //ServerMainLoop.logger.writeLog("Manager "+manager.getId()+" read : command = " + command |
478 return; | 487 return; |
479 } | 488 } |
480 send(command); | 489 send(command); |
481 } | 490 } |
482 | 491 |
483 /** | 492 |
484 * Check waiting command in merge | |
485 * @return true if there is a processed waiting command | |
486 * @throws IOException | |
487 */ | |
488 public void checkWaitingCommandInMerge() { | |
489 if (translator==null||isMerging()) return; | |
490 while(waitingCommandInMerge.size()>0) { | |
491 if (merging||isMerging()) return; | |
492 // to preserve command order, move all elements from manager's writing queue | |
493 manager.getWriteQueue(this); | |
494 PacketSet p = waitingCommandInMerge.remove(0); | |
495 REPCommand command = p.command; | |
496 try { | |
497 ServerMainLoop.logger.writeLog("Editor"+eid+": resend after merge comand="+command); | |
498 toEditor.send(command); | |
499 if (isMergeCommand(command)) { | |
500 merging = true; | |
501 return; | |
502 } | |
503 } catch (Exception e1) { | |
504 assert false; | |
505 manager.close(p.channel.channel); | |
506 return; | |
507 } | |
508 } | |
509 } | |
510 | |
511 | |
512 public boolean hasWaitingCommand() { | |
513 return waitingCommandInMerge.size()>0; | |
514 } | |
515 } | 493 } |