comparison rep/handler/Editor.java @ 460:ef70109af810

self writeQueue and waitingQueue
author one
date Fri, 24 Sep 2010 17:05:19 +0900
parents 66c4f6b29baf
children e7eeb8be0de1
comparison
equal deleted inserted replaced
459:66c4f6b29baf 460:ef70109af810
16 public class Editor extends Forwarder { 16 public class Editor extends Forwarder {
17 17
18 private Translator translator; 18 private Translator translator;
19 // REPCommands we are going to send to the next editor 19 // REPCommands we are going to send to the next editor
20 private LinkedList<REPCommand> sentList = new LinkedList<REPCommand>(); 20 private LinkedList<REPCommand> sentList = new LinkedList<REPCommand>();
21 public LinkedList<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>(); 21 public LinkedList<REPCommand> waitingCommandInMerge= new LinkedList<REPCommand>();
22 private REPCommand quit2=null; 22 private REPCommand quit2=null;
23 private REPCommand preMergeCommand; 23 private REPCommand preMergeCommand;
24 private boolean merging; 24 private boolean merging;
25 private REPCommand mergeMark = new REPCommand(REP.SMCMD_START_MERGE, 0,0, 0, 0, ""); 25 private REPCommand mergeMark = new REPCommand(REP.SMCMD_START_MERGE, 0,0, 0, 0, "");
26 public static boolean noMergeMode=false; 26 public static boolean noMergeMode=false;
27 static final boolean doOptimize = false; 27 static final boolean doOptimize = false;
28 private Forwarder toEditor; 28 private Forwarder toEditor;
29 private LinkedList<REPCommand> writeQueue = new LinkedList<REPCommand>();
29 30
30 public Editor(SessionManager manager,int editorNo){ 31 public Editor(SessionManager manager,int editorNo){
31 // no translator case 32 // no translator case
32 super(manager, null); 33 super(manager, null);
33 } 34 }
160 // } 161 // }
161 // } 162 // }
162 // assert(count==0); 163 // assert(count==0);
163 // } 164 // }
164 165
165 private boolean waitingRequired(REPCommand command, REPSocketChannel<REPCommand> channel) { 166 /**
166 if (hasWaitingCommand()) { 167 * Sending to Editor and waiting Queue
167 // We cannot do this operation before watingCommandQueue. 168 * +--------+
168 addWaitingCommand(new PacketSet(this, new REPCommand(command))); 169 * send() --> write() -> | Editor | -> handle() -> manager()
169 return true; 170 * +--------+
170 } else if (isMerging()) { 171 * waitingQueue
171 addWaitingCommand(new PacketSet(this, new REPCommand(command))); 172 * writeQueue
172 return true; 173 *
173 } 174 * send() は、他のEditor Node から呼ばれる
174 //ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting"); 175 * write() は、内部で優先的に送信するのに用いる
175 return false; 176 *     writeQueue は、waitingQueue よりも常に先に実行される必要がある
176 } 177
177 178 * Manageの送信キューはここでは使わない
178 public void addWaitingCommand(PacketSet set) { 179 * send() manage
179 // if (preMergeCommand!=null) { 180 */
180 // if (preMergeCommand.eid==set.command.eid 181 @Override
181 // && preMergeCommand.seq==set.command.seq) { 182 public void send(REPCommand command) {
182 // assert(false); 183 if (merging || isMerging() || waitingCommandInMerge.size()>0) {
183 // } 184 waitingCommandInMerge.addLast(command);
184 // } 185 ServerMainLoop.logger.writeLog("Editor eid:"+eid+" waitingCommandInMerge = "+waitingCommandInMerge);
185 waitingCommandInMerge.addLast(set); 186 return;
186 ServerMainLoop.logger.writeLog("Editor eid:"+eid+" waitingCommandInMerge = "+waitingCommandInMerge); 187 }
187 } 188 if (isMergeCommand(command)) {
188 189 merging = true;
190 ServerMainLoop.logger.writeLog("Editor"+eid+": merging=true (send)"+command);
191 }
192 writeQueue.add(command);
193 }
194
195 /**
196 * Check waiting command in merge
197 * periodically called from manager
198 */
199 public void checkWaitingCommandInMerge() {
200 if (writeQueue.size()>0) {
201 REPCommand command =writeQueue.pollFirst();
202 ServerMainLoop.logger.writeLog("Editor"+eid+": write comand="+command);
203 super.write(command);
204 return;
205 }
206 if (translator==null || merging || isMerging()) return;
207 if (waitingCommandInMerge.size()>0) {
208 REPCommand command = waitingCommandInMerge.pollFirst();
209 ServerMainLoop.logger.writeLog("Editor"+eid+": send waiting comand="+command);
210 super.write(command);
211 if (isMergeCommand(command)) {
212 merging = true;
213 }
214 }
215 }
189 /** 216 /**
190 * 他のエディタへのコマンドの送信 217 * 他のエディタへのコマンドの送信
191 * @param command 218 * @param command
192 * 219 *
193 * sendList にキープする必要がある。 220 * sendList にキープする必要がある。
325 sentList.addLast(mergeMark); 352 sentList.addLast(mergeMark);
326 preMergeCommand = null; 353 preMergeCommand = null;
327 } 354 }
328 355
329 private boolean checkQuit() { 356 private boolean checkQuit() {
330 if (quit2!=null && sentList.size()==1&&!isMerging() && !hasWaitingCommand()) { 357 if (quit2!=null && sentList.size()==1&&!isMerging() && waitingCommandInMerge.size()==0) {
331 sendToEditor(quit2); 358 sendToEditor(quit2);
332 quit2 = null; 359 quit2 = null;
333 return true; 360 return true;
334 } 361 }
335 return false; 362 return false;
402 return false; 429 return false;
403 } 430 }
404 return true; 431 return true;
405 } 432 }
406 433
407 /**
408 * write command to the editor
409 * called from another Editor instance such as next.send(command)
410 */
411 @Override
412 public void write(REPCommand command) {
413 if (merging || waitingCommandInMerge.size()>0) {
414 addWaitingCommand(new PacketSet(this, new REPCommand(command)));
415 return;
416 }
417 if (!waitingRequired(command,channel)) {
418 if (isMergeCommand(command)) {
419 merging = true;
420 ServerMainLoop.logger.writeLog("Editor"+eid+": merging=true "+command);
421 }
422 super.write(command);
423 }
424 }
425 434
426 private boolean isMergeCommand(REPCommand command) { 435 private boolean isMergeCommand(REPCommand command) {
427 switch(command.cmd) { 436 switch(command.cmd) {
428 case REPCMD_INSERT: case REPCMD_DELETE: 437 case REPCMD_INSERT: case REPCMD_DELETE:
429 return command.eid==eid; 438 return command.eid==eid;
432 } 441 }
433 return false; 442 return false;
434 } 443 }
435 444
436 public void sendToEditor(REPCommand command) { 445 public void sendToEditor(REPCommand command) {
437 toEditor.send(command); 446 writeQueue.add(command);
438 } 447 }
439 448
440 @Override 449 @Override
441 public void handle(REPCommand command, REPSelectionKey<REPCommand> key) throws IOException { 450 public void handle(REPCommand command, REPSelectionKey<REPCommand> key) throws IOException {
442 //ServerMainLoop.logger.writeLog("Manager "+manager.getId()+" read : command = " + command 451 //ServerMainLoop.logger.writeLog("Manager "+manager.getId()+" read : command = " + command
478 return; 487 return;
479 } 488 }
480 send(command); 489 send(command);
481 } 490 }
482 491
483 /** 492
484 * Check waiting command in merge
485 * @return true if there is a processed waiting command
486 * @throws IOException
487 */
488 public void checkWaitingCommandInMerge() {
489 if (translator==null||isMerging()) return;
490 while(waitingCommandInMerge.size()>0) {
491 if (merging||isMerging()) return;
492 // to preserve command order, move all elements from manager's writing queue
493 manager.getWriteQueue(this);
494 PacketSet p = waitingCommandInMerge.remove(0);
495 REPCommand command = p.command;
496 try {
497 ServerMainLoop.logger.writeLog("Editor"+eid+": resend after merge comand="+command);
498 toEditor.send(command);
499 if (isMergeCommand(command)) {
500 merging = true;
501 return;
502 }
503 } catch (Exception e1) {
504 assert false;
505 manager.close(p.channel.channel);
506 return;
507 }
508 }
509 }
510
511
512 public boolean hasWaitingCommand() {
513 return waitingCommandInMerge.size()>0;
514 }
515 } 493 }