comparison rep/handler/Editor.java @ 450:21cb16b7f3df

block message in Editor.write()
author one
date Thu, 23 Sep 2010 18:15:37 +0900
parents 89a326696c54
children fa7d9ec2008e
comparison
equal deleted inserted replaced
449:89a326696c54 450:21cb16b7f3df
163 // } 163 // }
164 164
165 private boolean waitingRequired(REPCommand command, REPSocketChannel<REPCommand> channel) { 165 private boolean waitingRequired(REPCommand command, REPSocketChannel<REPCommand> channel) {
166 if (hasWaitingCommand()) { 166 if (hasWaitingCommand()) {
167 // We cannot do this operation before watingCommandQueue. 167 // We cannot do this operation before watingCommandQueue.
168 addWaitingCommand(new PacketSet(channel, this, new REPCommand(command))); 168 addWaitingCommand(new PacketSet(this, new REPCommand(command)));
169 return true; 169 return true;
170 } else if (isMerging()) { 170 } else if (isMerging()) {
171 addWaitingCommand(new PacketSet(channel, this, new REPCommand(command))); 171 addWaitingCommand(new PacketSet(this, new REPCommand(command)));
172 return true; 172 return true;
173 } 173 }
174 //ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting"); 174 //ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting");
175 return false; 175 return false;
176 } 176 }
229 startMerge(command); 229 startMerge(command);
230 return; 230 return;
231 } 231 }
232 232
233 private boolean checkAck(REPCommand command) { 233 private boolean checkAck(REPCommand command) {
234 assert(!merging); 234 assert(!isMerging());
235 REPCommand prev; 235 REPCommand prev;
236 if (sentList.getFirst()==mergeMark) prev=sentList.remove(1); else prev=sentList.remove(0); 236 if (sentList.getFirst()==mergeMark) prev=sentList.remove(1); else prev=sentList.remove(0);
237 if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) { 237 if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) {
238 // should be more robust to allow communication failure 238 // should be more robust to allow communication failure
239 String err = "Editor eid="+eid+" checkReturnedCommand() : command = " + command + " prev="+ 239 String err = "Editor eid="+eid+" checkReturnedCommand() : command = " + command + " prev="+
254 return; 254 return;
255 } 255 }
256 // START_MERGE を送る 256 // START_MERGE を送る
257 // 送らないで良い場合もある? 257 // 送らないで良い場合もある?
258 REPCommand cmd = new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,""); 258 REPCommand cmd = new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,"");
259 send(cmd); 259 sendToEditor(cmd);
260 merging = true; 260 merging = true;
261 // Session Manager 側で、このeditorへの他のeditorからの 261 // Session Manager 側で、このeditorへの他のeditorからの
262 // 入力を止めて、merge にそなえる。merge は、editor 側から 262 // 入力を止めて、merge にそなえる。merge は、editor 側から
263 // ACKが来てから始まる。 263 // ACKが来てから始まる。
264 translator.startMerge(command); 264 translator.startMerge(command);
299 299
300 300
301 private void endMerge() { 301 private void endMerge() {
302 translator.endMerge(); 302 translator.endMerge();
303 REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,sid,eid,seq(),0,""); 303 REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,sid,eid,seq(),0,"");
304 send(mergeEnd); 304 sendToEditor(mergeEnd);
305 if (preMergeCommand.eid==eid) { 305 if (preMergeCommand.eid==eid) {
306 // Ackの場合はcheckAck() で既にremoveされている 306 // Ackの場合はcheckAck() で既にremoveされている
307 if (sentList.getFirst()==mergeMark) sentList.remove(1); else sentList.remove(0); 307 if (sentList.getFirst()==mergeMark) sentList.remove(1); else sentList.remove(0);
308 // First Phase End, send ACK 308 // First Phase End, send ACK
309 REPCommand keep = new REPCommand(preMergeCommand); 309 REPCommand keep = new REPCommand(preMergeCommand);
324 preMergeCommand = null; 324 preMergeCommand = null;
325 } 325 }
326 326
327 private boolean checkQuit() { 327 private boolean checkQuit() {
328 if (quit2!=null && sentList.size()==1&&!isMerging()) { 328 if (quit2!=null && sentList.size()==1&&!isMerging()) {
329 send(quit2); 329 sendToEditor(quit2);
330 quit2 = null; 330 quit2 = null;
331 return true; 331 return true;
332 } 332 }
333 return false; 333 return false;
334 } 334 }
362 break; 362 break;
363 } 363 }
364 364
365 case SMCMD_SYNC: 365 case SMCMD_SYNC:
366 if (isMaster()) 366 if (isMaster())
367 send(command); 367 sendToEditor(command);
368 else 368 else
369 next.send(command); 369 next.send(command);
370 370
371 case SMCMD_QUIT: 371 case SMCMD_QUIT:
372 { 372 {
401 } 401 }
402 return true; 402 return true;
403 } 403 }
404 404
405 /** 405 /**
406 * send command to the editor 406 * write command to the editor
407 * called from another Editor instance such as next.send(command) 407 * called from another Editor instance such as next.send(command)
408 */ 408 */
409 @Override 409 @Override
410 public void send(REPCommand command) { 410 public void write(REPCommand command) {
411 if (command.eid == REP.MERGE_EID.id || 411 if ( !waitingRequired(command,channel)) {
412 command.cmd==REP.SMCMD_END_MERGE || 412 if (isMergeCommand(command)) {
413 !waitingRequired(command,channel)) { 413 merging = true;
414 super.send(command); 414 }
415 } 415 super.write(command);
416 } 416 }
417 417 }
418
419 private boolean isMergeCommand(REPCommand command) {
420 switch(command.cmd) {
421 case REPCMD_INSERT: case REPCMD_DELETE:
422 return command.eid==eid;
423 case REPCMD_INSERT_ACK: case REPCMD_DELETE_ACK:
424 return command.eid!=eid;
425 }
426 return false;
427 }
428
429 public void sendToEditor(REPCommand command) {
430 super.write(command);
431 }
432
418 @Override 433 @Override
419 public void handle(REPCommand command, REPSelectionKey<REPCommand> key) throws IOException { 434 public void handle(REPCommand command, REPSelectionKey<REPCommand> key) throws IOException {
420 //ServerMainLoop.logger.writeLog("Manager "+manager.getId()+" read : command = " + command 435 //ServerMainLoop.logger.writeLog("Manager "+manager.getId()+" read : command = " + command
421 // +" from "+manager.editorList.editorByChannel(channel)); 436 // +" from "+manager.editorList.editorByChannel(channel));
422 if (command.cmd==REP.SMCMD_JOIN||command.cmd==REP.SMCMD_PUT) { 437 if (command.cmd==REP.SMCMD_JOIN||command.cmd==REP.SMCMD_PUT) {
474 return; 489 return;
475 } 490 }
476 PacketSet p = w.remove(0); 491 PacketSet p = w.remove(0);
477 try { 492 try {
478 if (p.channel!=null) 493 if (p.channel!=null)
479 send(p.command); 494 write(p.command);
480 else 495 else
481 manage(p.command); 496 manage(p.command);
482 } catch (Exception e1) { 497 } catch (Exception e1) {
483 assert false; 498 assert false;
484 manager.close(p.channel); 499 manager.close(p.channel.channel);
485 return; 500 return;
486 } 501 }
487 } 502 }
488 } 503 }
489 504