comparison rep/handler/Editor.java @ 406:8009dd7b2013

command execution when commandInMergeQueue>0 duplicate JOIN_ACK
author one
date Tue, 25 Nov 2008 09:07:52 +0900
parents 4bb04d5a9bbf
children de4ef4313adc
comparison
equal deleted inserted replaced
405:0b1d52ffb803 406:8009dd7b2013
16 public class Editor extends Forwarder { 16 public class Editor extends Forwarder {
17 17
18 private Translator translator; 18 private Translator translator;
19 // REPCommands we are going to send to the next editor 19 // REPCommands we are going to send to the next editor
20 private List<REPCommand> sentList = new LinkedList<REPCommand>(); 20 private List<REPCommand> sentList = new LinkedList<REPCommand>();
21 protected List<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>(); 21 protected LinkedList<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>();
22 private REPCommand quit2=null; 22 private REPCommand quit2=null;
23 private boolean merging; 23 private boolean merging;
24 private REPCommand preMergeCommand; 24 private REPCommand preMergeCommand;
25 public static boolean noMergeMode=false; 25 public static boolean noMergeMode=false;
26 static final boolean doOptimize = true; 26 static final boolean doOptimize = true;
42 42
43 public void translate(REPCommand command){ 43 public void translate(REPCommand command){
44 switch(command.cmd) { 44 switch(command.cmd) {
45 case REPCMD_INSERT_ACK: 45 case REPCMD_INSERT_ACK:
46 case REPCMD_DELETE_ACK: 46 case REPCMD_DELETE_ACK:
47 if (waitingRequired(command)) return;
47 if (command.eid==eid) { 48 if (command.eid==eid) {
48 // Second Phase が終わって同期が終了。 49 // Second Phase が終わって同期が終了。
50 removeFromSentList(command);
51 SessionManager.logger.writeLog("Complete "+command);
49 return; 52 return;
50 } 53 }
51 if (waitingRequired(command)) return;
52 checkReturnedCommand(command); 54 checkReturnedCommand(command);
53 return; 55 return;
54 case REPCMD_INSERT_USER: 56 case REPCMD_INSERT_USER:
55 command.cmd = REP.REPCMD_INSERT; 57 command.cmd = REP.REPCMD_INSERT;
56 userEditorCommand(command); 58 userEditorCommand(command);
57 return; 59 return;
58 case REPCMD_DELETE_USER: 60 case REPCMD_DELETE_USER:
59 command.cmd = REP.REPCMD_INSERT; 61 command.cmd = REP.REPCMD_DELETE;
60 userEditorCommand(command); 62 userEditorCommand(command);
61 return; 63 return;
62 case REPCMD_INSERT: 64 case REPCMD_INSERT:
63 case REPCMD_DELETE: 65 case REPCMD_DELETE:
64 if (command.eid == REP.MERGE_EID.id){ 66 if (command.eid == REP.MERGE_EID.id){
167 * 途中から参加した場合、自分が受けとってないcommandのACKが先に来ることが 169 * 途中から参加した場合、自分が受けとってないcommandのACKが先に来ることが
168 * ある。それは、無視して良い。 170 * ある。それは、無視して良い。
169 * @param command 171 * @param command
170 */ 172 */
171 void checkReturnedCommand(REPCommand command) { 173 void checkReturnedCommand(REPCommand command) {
174 if (removeFromSentList(command))
175 startMerge(command);
176 return;
177 }
178
179 private boolean removeFromSentList(REPCommand command) {
172 assert(!merging); 180 assert(!merging);
173 if (sentList.size()==0) { 181 if (sentList.size()==0) {
174 ServerMainLoop.logger.writeLog("Editor eid="+eid+" looped command not registered: "+command); 182 ServerMainLoop.logger.writeLog("Editor eid="+eid+" looped command not registered: "+command);
175 assert(command.cmd==REP.REPCMD_DELETE_ACK|| 183 assert(command.cmd==REP.REPCMD_DELETE_ACK||
176 command.cmd==REP.REPCMD_INSERT_ACK); 184 command.cmd==REP.REPCMD_INSERT_ACK);
177 return; 185 return false;
178 } 186 }
179 REPCommand prev = sentList.remove(0); 187 REPCommand prev = sentList.remove(0);
180 // ServerMainLoop.logger.writeLog("Editor eid="+eid+" remove sentList:"+(prev==null?"null":prev)); 188 // ServerMainLoop.logger.writeLog("Editor eid="+eid+" remove sentList:"+(prev==null?"null":prev));
181 if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) { 189 if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) {
182 String err = "Editor eid="+eid+" checkReturnedCommand() : command = " + command + " prev="+ 190 String err = "Editor eid="+eid+" checkReturnedCommand() : command = " + command + " prev="+
184 err += sentList; 192 err += sentList;
185 ServerMainLoop.logger.writeLog(err); 193 ServerMainLoop.logger.writeLog(err);
186 assert(command.cmd==REP.REPCMD_DELETE_ACK|| 194 assert(command.cmd==REP.REPCMD_DELETE_ACK||
187 command.cmd==REP.REPCMD_INSERT_ACK); 195 command.cmd==REP.REPCMD_INSERT_ACK);
188 sentList.add(0,prev); 196 sentList.add(0,prev);
189 return; 197 return false;
190 } 198 }
191 startMerge(command); 199 return true;
192 return;
193 } 200 }
194 201
195 private void startMerge(REPCommand command) { 202 private void startMerge(REPCommand command) {
196 preMergeCommand = new REPCommand(command); 203 preMergeCommand = new REPCommand(command);
197 preMergeCommand.string = ""; 204 preMergeCommand.string = "";
252 // First Phase End, send ACK 259 // First Phase End, send ACK
253 REPCommand keep = new REPCommand(preMergeCommand); 260 REPCommand keep = new REPCommand(preMergeCommand);
254 switch(keep.cmd) { 261 switch(keep.cmd) {
255 case REPCMD_INSERT: keep.cmd = REP.REPCMD_INSERT_ACK;break; 262 case REPCMD_INSERT: keep.cmd = REP.REPCMD_INSERT_ACK;break;
256 case REPCMD_DELETE: keep.cmd = REP.REPCMD_DELETE_ACK;break; 263 case REPCMD_DELETE: keep.cmd = REP.REPCMD_DELETE_ACK;break;
264 default: assert(false);
257 } 265 }
258 sentList.add(keep); 266 sentList.add(keep);
259 ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList); 267 ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
260 assert(sentList.size()<limit); 268 assert(sentList.size()<limit);
261 next.send(keep); 269 next.send(keep);
390 * Check waiting command in merge 398 * Check waiting command in merge
391 * @return true if there is a processed waiting command 399 * @return true if there is a processed waiting command
392 * @throws IOException 400 * @throws IOException
393 */ 401 */
394 public void checkWaitingCommandInMerge() { 402 public void checkWaitingCommandInMerge() {
395 int count = waitingCommandInMerge.size(); 403 if (translator==null||isMerging()) return;
396 if (count==0) return; 404 LinkedList<PacketSet> w = waitingCommandInMerge;
397 if (isMerging()) return; 405 waitingCommandInMerge = new LinkedList<PacketSet>();
398 while(count-->0) { 406 while(w.size()>0) {
399 PacketSet p = waitingCommandInMerge.remove(0); 407 if (isMerging()) {
408 w.addAll(waitingCommandInMerge);
409 waitingCommandInMerge = w;
410 return;
411 }
412 PacketSet p = w.remove(0);
400 try { 413 try {
401 // if (manager.sessionManage(e, p.command)) { // we don't need this
402 // assert false;
403 // return;
404 // }
405 manage(p.command); 414 manage(p.command);
406 } catch (Exception e1) { 415 } catch (Exception e1) {
407 // should be e.close()? 416 assert false;
408 manager.close(p.channel); 417 manager.close(p.channel);
418 return;
409 } 419 }
410 } 420 }
411 } 421 }
412 422
413 423