Mercurial > hg > RemoteEditor > REPSessionManager
annotate rep/SessionManager.java @ 362:f0bd158dace6
*** empty log message ***
author | kono |
---|---|
date | Sun, 19 Oct 2008 23:05:59 +0900 |
parents | 65c6d12a5835 |
children | 1a8856580d38 |
rev | line source |
---|---|
266 | 1 |
0 | 2 package rep; |
3 | |
4 import java.io.IOException; | |
5 import java.net.InetSocketAddress; | |
267 | 6 import java.nio.channels.ClosedChannelException; |
0 | 7 import java.nio.channels.SelectionKey; |
83 | 8 import java.util.LinkedList; |
144 | 9 import java.util.List; |
231 | 10 import java.util.Set; |
178 | 11 import java.util.concurrent.BlockingQueue; |
192 | 12 import java.util.concurrent.LinkedBlockingQueue; |
0 | 13 |
346 | 14 import org.xml.sax.SAXException; |
15 | |
198 | 16 |
353 | 17 |
337 | 18 import rep.channel.REPLogger; |
123 | 19 import rep.channel.REPServerSocketChannel; |
133 | 20 import rep.channel.REPSocketChannel; |
144 | 21 import rep.handler.PacketSet; |
146 | 22 import rep.handler.REPHandler; |
158 | 23 import rep.channel.REPSelector; |
56 | 24 import rep.xml.SessionXMLDecoder; |
45 | 25 import rep.xml.SessionXMLEncoder; |
198 | 26 import rep.channel.REPSelectionKey; |
264 | 27 |
198 | 28 /* |
264 | 29 +-------+--------+--------+-------+--------+---------+------+ |
30 | cmd | session| editor | seqid | lineno | textsiz | text | | |
31 | | id | id | | | | | | |
32 +-------+--------+--------+-------+--------+---------+------+ | |
33 o---------- header section (network order) ----------o | |
34 | |
35 int cmd; kind of command | |
36 int sid; session ID : unique to editing file | |
37 int eid; editor ID : owner editor ID = 1。Session に対して unique | |
308 | 38 -1 session manager command |
39 -2 merge command | |
264 | 40 int seqno; Sequence number : sequence number はエディタごとに管理 |
41 int lineno; line number | |
42 int textsize; textsize : bytesize | |
43 byte[] text; | |
198 | 44 */ |
1 | 45 |
250 | 46 public class SessionManager implements SessionManagerEventListener{ |
337 | 47 static public REPLogger logger = REPLogger.singleton(); |
48 | |
358 | 49 SessionList sessionList; |
280 | 50 private SessionManagerGUI gui; |
198 | 51 private REPSelector<REPCommand> selector; |
319 | 52 SessionManagerList smList; |
356 | 53 EditorList editorList; |
304 | 54 // editorList は、sessionList に入っているeditorとは別なeditorのlistらしい。 |
349 | 55 // private String maxHost; |
212 | 56 private List<PacketSet> waitingCommandInMerge; |
308 | 57 private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>();; |
319 | 58 String myHost; |
317 | 59 private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); |
336 | 60 private int receive_port; |
61 private int parent_port; | |
101 | 62 static final int DEFAULT_PORT = 8766; |
332 | 63 private static final int packetLimit = 200; |
358 | 64 |
65 private static final int MAXID = 10000; | |
353 | 66 SessionXMLDecoder decoder = new SessionXMLDecoder(); |
358 | 67 SessionXMLEncoder encoder = new SessionXMLEncoder(); |
355 | 68 private Forwarder sm_join_channel; |
349 | 69 |
358 | 70 private RoutingTable routingTable = new RoutingTable(); |
71 | |
316 | 72 public static void main(String[] args) throws InterruptedException, IOException { |
73 | |
74 int port = DEFAULT_PORT; | |
75 int port_s = DEFAULT_PORT; | |
76 //System.setProperty("file.encoding", "UTF-8"); | |
77 if(args.length > 0){ | |
78 port = Integer.parseInt(args[0]); | |
79 port_s = Integer.parseInt(args[1]); | |
80 } | |
81 SessionManager sm = new SessionManager(); | |
336 | 82 sm.setReceivePort(port); |
83 sm.setParentPort(port_s); | |
316 | 84 sm.init(port,new SessionManagerGUIimpl(sm)); |
85 | |
86 | |
87 } | |
88 | |
2 | 89 |
336 | 90 public void setReceivePort(int port) { |
91 receive_port = port; | |
92 } | |
93 | |
94 | |
2 | 95 public void openSelector() throws IOException{ |
231 | 96 selector = REPSelector.<REPCommand>create(); |
2 | 97 } |
280 | 98 |
99 public void init(int port, SessionManagerGUI gui) throws IOException, InterruptedException { | |
100 this.gui = gui; | |
101 openSelector(); | |
102 init(port); | |
103 mainLoop(); | |
104 } | |
105 | |
106 | |
107 private void init(int port) throws InterruptedException, IOException { | |
2 | 108 |
186 | 109 REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); |
122 | 110 ssc.configureBlocking(false); //reuse address 必須 |
101 | 111 ssc.socket().setReuseAddress(true); |
212 | 112 //getAllByNameで取れた全てのアドレスに対してbindする |
113 ssc.socket().bind(new InetSocketAddress(port)); | |
349 | 114 ssc.register(selector, SelectionKey.OP_ACCEPT, |
115 new Forwarder(this)); | |
6 | 116 |
358 | 117 sessionList = new SessionList(); |
7 | 118 smList = new SessionManagerList(); |
356 | 119 editorList = new EditorList(); |
212 | 120 waitingCommandInMerge = new LinkedList<PacketSet>(); |
228 | 121 |
215 | 122 |
155 | 123 } |
313 | 124 |
125 /* | |
126 * We wrote everything in one thread, but we can assign | |
127 * one thread for each communication channel and GUI event. | |
128 */ | |
155 | 129 |
231 | 130 public void mainLoop() throws IOException { |
0 | 131 while(true){ |
328 | 132 checkWaitingCommandInMerge(); |
313 | 133 if (checkInputEvent() || |
328 | 134 checkWaitingWrite()) { |
313 | 135 // try to do fair execution for waiting task |
136 if(selector.selectNow() > 0) select(); | |
137 continue; | |
300 | 138 } |
313 | 139 // now we can wait for input packet or event |
233 | 140 selector.select(); |
144 | 141 select(); |
142 } | |
143 } | |
144 | |
313 | 145 private boolean checkInputEvent() { |
146 SessionManagerEvent e; | |
147 if((e = waitingEventQueue.poll())!=null){ | |
334 | 148 e.exec(this); |
313 | 149 return true; |
150 } | |
151 return false; | |
152 } | |
153 | |
154 private boolean checkWaitingWrite() throws IOException { | |
317 | 155 PacketSet p = writeQueue.poll(); |
156 if (p!=null) { | |
327 | 157 p.channel.write(p.command); |
317 | 158 return true; |
313 | 159 } |
160 return false; | |
161 } | |
162 | |
308 | 163 /** |
164 * Check waiting command in merge | |
165 * @return true if there is a processed waiting command | |
166 * @throws IOException | |
167 */ | |
346 | 168 private void checkWaitingCommandInMerge() { |
328 | 169 List<PacketSet> w = waitingCommandInMerge; |
170 waitingCommandInMerge = new LinkedList<PacketSet>(); | |
171 for(PacketSet p: w) { | |
172 Editor e = p.getEditor(); | |
353 | 173 if(e.isMerging()) { // still merging do nothing |
328 | 174 waitingCommandInMerge.add(p); |
175 } else { | |
346 | 176 try { |
353 | 177 if (sessionManage(e, p.command)) { // we don't need this |
178 assert false; | |
179 return; | |
180 } | |
181 e.manage(p.command); | |
347 | 182 } catch (Exception e1) { |
349 | 183 // should be e.close()? |
346 | 184 close(p.channel); |
185 } | |
328 | 186 } |
187 } | |
188 } | |
189 | |
346 | 190 private void close(REPSocketChannel<REPCommand> channel) { |
191 REPSelectionKey<REPCommand>key = channel.keyFor1(selector); | |
192 REPHandler handler = (REPHandler)key.attachment(); | |
193 key.cancel(); | |
194 handler.cancel(channel); | |
195 // we have to remove session/enditor | |
196 } | |
197 | |
198 | |
328 | 199 public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) { |
200 for(PacketSet p:waitingCommandInMerge) { | |
201 if (p.channel==c) { | |
212 | 202 return true; |
178 | 203 } |
204 } | |
205 return false; | |
206 } | |
207 | |
144 | 208 private void select() throws IOException { |
231 | 209 |
210 Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); | |
211 for(REPSelectionKey<REPCommand> key : keys){ | |
144 | 212 if(key.isAcceptable()){ |
199 | 213 REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); |
337 | 214 logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); |
355 | 215 registerChannel (channel, new FirstConnector(this)); |
144 | 216 channel = null; |
123 | 217 |
144 | 218 }else if(key.isReadable()){ |
212 | 219 REPHandler handler = (REPHandler)(key.attachment()); |
267 | 220 try { |
221 handler.handle(key); | |
358 | 222 } catch (IOException e) { |
267 | 223 key.cancel(); |
308 | 224 handler.cancel(key.channel1()); |
267 | 225 } |
0 | 226 } |
227 } | |
228 } | |
1 | 229 |
355 | 230 void registerChannel(REPSocketChannel<REPCommand> channel,Forwarder handler) throws IOException { |
2 | 231 if(channel == null) { |
232 return; | |
233 } | |
349 | 234 handler.setChannel(channel); |
2 | 235 channel.configureBlocking(false); |
355 | 236 channel.register(selector, SelectionKey.OP_READ, handler); |
2 | 237 } |
238 | |
144 | 239 |
353 | 240 void cancel_sm_join() { |
355 | 241 removeChannel(sm_join_channel); |
242 sm_join_channel=null; | |
243 } | |
244 | |
245 | |
246 private void removeChannel(Forwarder sm_join_channel) { | |
247 REPSelectionKey<REPCommand> key = sm_join_channel.channel.keyFor1(selector); | |
248 key.cancel(); | |
249 try { | |
250 sm_join_channel.channel.close(); | |
251 } catch (IOException e) { | |
252 } | |
349 | 253 } |
254 | |
320 | 255 |
319 | 256 void updateGUI() { |
212 | 257 //リストのコピーをGUIに渡す |
358 | 258 LinkedList<Session> sList = new LinkedList<Session>(sessionList.values()); |
259 LinkedList<Editor> eList = new LinkedList<Editor>(editorList.values()); | |
212 | 260 //GUIに反映 |
261 Runnable doRun = new DoGUIUpdate(sList, eList, gui); | |
279 | 262 gui.invokeLater(doRun); |
212 | 263 } |
264 | |
83 | 265 |
139 | 266 |
353 | 267 void setMyHostName(String localHostName) { |
308 | 268 myHost = localHostName + receive_port; |
349 | 269 // if(maxHost == null) { |
270 // maxHost = myHost; | |
271 // } | |
164 | 272 setHostToEditor(myHost); |
273 } | |
274 | |
275 private void setHostToEditor(String myHost2) { | |
358 | 276 for(Editor editor : editorList.values()){ |
277 if (editor.channel!=null) | |
278 editor.setHost(myHost2); | |
164 | 279 } |
76 | 280 } |
0 | 281 |
355 | 282 |
283 /** | |
284 * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては | |
285 * ならない。複数のSession Managerにjoinすることは出来ない。(NATを実装するまでは)。 | |
286 * @param host | |
287 */ | |
178 | 288 public void connectSession(String host) { |
355 | 289 if (sm_join_channel!=null) return; |
290 if (!sessionList.isEmpty()) return; | |
291 if (!smList.isMaster()) return; | |
334 | 292 int port = parent_port; |
1 | 293 InetSocketAddress addr = new InetSocketAddress(host, port); |
294 try { | |
186 | 295 REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker()); |
349 | 296 |
1 | 297 sessionchannel.connect(addr); |
337 | 298 while(!sessionchannel.finishConnect()); |
349 | 299 Forwarder sm = new Forwarder(this); |
355 | 300 registerChannel(sessionchannel, sm); |
349 | 301 sm_join(sm); |
1 | 302 }catch (IOException e) { |
303 } | |
304 } | |
77 | 305 |
349 | 306 private void sm_join(Forwarder channel){ |
355 | 307 sm_join_channel = channel; |
122 | 308 //SM_JOINコマンドを生成。 |
77 | 309 REPCommand command = new REPCommand(); |
310 command.setCMD(REP.SMCMD_SM_JOIN); | |
349 | 311 command.setEID(-1); // request Parent SessionManagerID |
312 command.setSID(-1); // request SessionManagerID | |
79 | 313 |
122 | 314 //hostnameをセット。 |
349 | 315 setMyHostName(channel.getLocalHostName()); |
82 | 316 |
355 | 317 String string = myHost; |
77 | 318 command.setString(string); |
319 | |
122 | 320 //SM_JOINコマンドを送信。 |
349 | 321 channel.send(command); |
122 | 322 //SessionManagerのListに追加。 |
349 | 323 |
77 | 324 } |
349 | 325 |
361 | 326 /* |
327 * Select Session from Manager button | |
328 * selected editor is joined editor directly connected to this session | |
329 * manager. | |
330 */ | |
316 | 331 public void selectSession(SelectButtonEvent event) throws IOException { |
250 | 332 int sid = event.getSID(); |
358 | 333 Session session = sessionList.get(sid); |
227 | 334 |
320 | 335 Editor editor = (Editor)event.getEditor(); |
227 | 336 if(editor == null){ |
337 | 337 logger.writeLog("SessionManager.selectSession():editor = " + editor); |
227 | 338 return; |
339 } | |
324 | 340 if (editor.hasSession()) return; |
319 | 341 |
358 | 342 selectSession(sid, session, editor.getEID(), editor); |
343 } | |
344 | |
361 | 345 /* |
346 * Select Session Protocol handler | |
347 */ | |
358 | 348 private void selectSession(int sid, Session session, int eid, Forwarder editor) { |
158 | 349 if(session.hasOwner()){ |
107 | 350 REPCommand sendCommand = new REPCommand(); |
358 | 351 if (editor.isDirect()&&editor.getEID()==eid) { |
361 | 352 session.addForwarder(editor); |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
353 sendUpdate(session.getSID()); |
358 | 354 sendCommand.setCMD(REP.SMCMD_JOIN_ACK); |
355 } else { | |
356 // SELECT_ACK is sent to the session ring to | |
357 // find out joined editor | |
358 sendCommand.setCMD(REP.SMCMD_SELECT_ACK); | |
361 | 359 // Do not directly addForwarder(forwarder). It may be |
360 // shared among sessions. | |
361 Forwarder f = new Editor(this, false, makeID(editorList.newEid())); | |
362 f.setChannel(editor.channel); // incoming channel | |
363 f.setHost(myHost); | |
364 f.setSID(sid); | |
365 session.addForwarder(f); | |
358 | 366 } |
148 | 367 sendCommand.setEID(editor.getEID()); |
107 | 368 sendCommand.setSID(sid); |
361 | 369 sendCommand.string = session.getName(); |
349 | 370 editor.send(sendCommand); |
107 | 371 }else { |
361 | 372 // session searching |
358 | 373 Forwarder next = routingTable.toSession(sid); |
107 | 374 |
361 | 375 Forwarder f = new Editor(this, false, makeID(editorList.newEid())); |
376 f.setChannel(editor.channel); // incoming channel | |
377 f.setNext(next); | |
378 f.setHost(myHost); | |
379 f.setSID(sid); | |
380 session.setFirstForwarder(f); | |
381 | |
107 | 382 REPCommand command = new REPCommand(); |
383 command.setCMD(REP.SMCMD_SELECT); | |
384 command.setSID(sid); | |
358 | 385 command.setEID(eid); |
178 | 386 command.setString(editor.getHost()); |
358 | 387 next.send(command); |
107 | 388 } |
8 | 389 } |
122 | 390 |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
391 private void sendUpdate(int sid) { |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
392 REPCommand command = makeREPCommandWithSessionList(REP.SMCMD_UPDATE); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
393 command.setSID(sid); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
394 command.setEID(REP.SM_EID.id); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
395 smList.sendToMaster(command); |
358 | 396 } |
397 | |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
398 public Editor newEditor(REPSocketChannel<REPCommand> channel) { |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
399 int eid = makeID(editorList.newEid()); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
400 Editor editor = new Editor(this, eid, channel); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
401 editorList.add(editor); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
402 return editor; |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
403 } |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
404 |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
405 |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
406 public Session newSession(Forwarder master) { |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
407 int sid= makeID(sessionList.newSessionID()); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
408 Session session = new Session(sid, master); |
360 | 409 sessionList.put(sid, session); |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
410 return session; |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
411 } |
358 | 412 |
144 | 413 public void addWaitingCommand(PacketSet set) { |
212 | 414 waitingCommandInMerge.add(set); |
144 | 415 } |
148 | 416 |
222 | 417 public void buttonPressed(SessionManagerEvent event) { |
418 try { | |
308 | 419 waitingEventQueue.put(event); |
222 | 420 } catch (InterruptedException e) {} |
421 selector.wakeup(); | |
422 } | |
281 | 423 |
424 public void syncExec(SessionManagerEvent event) { | |
425 try { | |
308 | 426 waitingEventQueue.put(event); |
281 | 427 } catch (InterruptedException e) { |
428 } | |
429 } | |
222 | 430 |
259 | 431 public void closeSession(SessionManagerEvent event) { |
432 Session session = ((CloseButtonEvent) event).getSession(); | |
433 session.closeSession(); | |
434 sessionList.remove(session); | |
435 updateGUI(); | |
436 } | |
437 | |
274 | 438 public void remove(REPSocketChannel<REPCommand> channel) { |
358 | 439 int i = 0; |
440 for(Session s:sessionList.values()) { | |
274 | 441 if (s.deleteEditor(channel)) { |
358 | 442 i++; |
274 | 443 } |
444 } | |
358 | 445 assert(i==1); |
274 | 446 // can be other session manager? what should I do? |
447 } | |
448 | |
317 | 449 |
450 public void addWriteQueue(PacketSet packetSet) { | |
324 | 451 writeQueue.addLast(packetSet); |
323 | 452 assert(writeQueue.size()<packetLimit) ; |
317 | 453 } |
454 | |
318 | 455 |
456 public void remove(Editor editor) { | |
358 | 457 Session s = sessionList.get(editor.getSID()); |
362 | 458 if (s==null) { |
459 assert(false); | |
460 editorList.remove(editor); | |
461 } else if (editor.isMaster()) { | |
358 | 462 removeSession(s); |
463 } else { | |
464 s.deleteForwarder(editor); | |
465 editorList.remove(editor); | |
318 | 466 } |
341 | 467 updateGUI(); |
468 } | |
469 | |
470 private void removeSession(Session s0) { | |
358 | 471 s0.remove(this); |
341 | 472 sessionList.remove(s0); |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
473 sendUpdate(s0.getSID()); |
318 | 474 } |
475 | |
334 | 476 public void setParentPort(int port) { |
477 parent_port = port; | |
478 } | |
479 public int getParentPort() { | |
480 return parent_port; | |
481 } | |
482 | |
483 public int getPort() { | |
484 return receive_port; | |
485 } | |
486 | |
353 | 487 |
355 | 488 boolean sessionManage(Forwarder forwarder, REPCommand command) throws ClosedChannelException, |
358 | 489 IOException { |
355 | 490 switch(command.cmd){ |
353 | 491 |
492 // Session Manager Command | |
493 | |
494 case SMCMD_JOIN: | |
495 { | |
358 | 496 // first connection or forwarded command |
497 if(isMaster()) { | |
498 REPCommand ackCommand = new REPCommand(); | |
499 ackCommand.setCMD(REP.SMCMD_JOIN_ACK); | |
500 ackCommand.setEID(command.eid); | |
360 | 501 ackCommand.setSID(command.sid); |
358 | 502 ackCommand.string = command.string; |
503 smList.sendToSlaves(ackCommand); | |
504 registEditor(forwarder,ackCommand); | |
505 } else { | |
506 routingTable.add(forwarder,getSMID(command.eid),command.sid); | |
507 smList.sendToMaster(command); | |
508 } | |
360 | 509 updateGUI(); |
353 | 510 } |
511 | |
512 break; | |
513 | |
361 | 514 case SMCMD_PUT_ACK: |
353 | 515 case SMCMD_JOIN_ACK: |
358 | 516 registEditor(forwarder,command); |
360 | 517 updateGUI(); |
353 | 518 break; |
519 | |
520 case SMCMD_PUT: | |
521 { | |
358 | 522 // first connection or forwarded command |
523 if(isMaster()) { | |
524 command.setCMD(REP.SMCMD_PUT_ACK); | |
525 command.string = command.string; | |
526 command.setEID(command.eid); | |
527 command.setSID(command.sid); | |
528 smList.sendToSlaves(command); | |
529 registEditor(forwarder,command); | |
530 } else { | |
531 routingTable.add(forwarder,getSMID(command.eid),command.sid); | |
532 smList.sendToMaster(command); | |
361 | 533 // registEditor will be done by SMCMD_PUT_ACK |
534 } | |
535 if (forwarder.isDirect()) { | |
536 // send put_ack to the editor now. | |
537 command.setCMD(REP.SMCMD_PUT_ACK); | |
538 command.string = command.string; | |
539 command.setEID(command.eid); | |
540 command.setSID(command.sid); | |
541 forwarder.send(command); | |
358 | 542 } |
360 | 543 updateGUI(); |
353 | 544 |
545 } | |
546 break; | |
358 | 547 |
353 | 548 case SMCMD_SELECT: |
361 | 549 { |
550 Session session = sessionList.get(command.sid); | |
551 if (session==null) { | |
552 sessionList.put(command.sid, | |
553 new Session(command.sid, command.string,null)); | |
554 } | |
555 selectSession(command.sid, session, command.eid, forwarder); | |
556 } | |
557 break; | |
353 | 558 case SMCMD_SELECT_ACK: |
559 { | |
358 | 560 Session session = sessionList.get(command.sid); |
361 | 561 selectSession(command.sid, session, command.eid, |
562 session.getFirstForwarder()); | |
353 | 563 } |
358 | 564 break; |
565 | |
353 | 566 case SMCMD_SM_JOIN: |
567 { | |
355 | 568 // SM_JOIN中にSMCMD_SM_JOINが来たら、これはループなので、 |
569 ///自分のSM_JOINを取り消す。 | |
570 if (sm_join_channel!=null) cancel_sm_join(); | |
353 | 571 // SMCMD_SM_JOIN は、master まで上昇する。 |
572 // masterでなければ、自分のparentに転送する。 | |
355 | 573 if(isMaster()) { |
353 | 574 // master であれば、SessionManager IDを決めて、 |
575 // 自分のsmList に登録 | |
355 | 576 Forwarder sm; |
577 int psid = command.eid; | |
578 if (forwarder.sid!=-1) { | |
579 // すでに channelはSessionManager Idを持っていて、 | |
580 // direct link ではないので、 | |
581 // channel を持たないForwarderとして登録する | |
582 sm = new Forwarder(this); | |
583 } else { | |
584 sm = forwarder; | |
585 } | |
586 int sid = smList.addNewSessionManager(sm,command); | |
587 REPCommand sendCommand = makeREPCommandWithSessionList(REP.SMCMD_SM_JOIN_ACK); | |
588 // command.eid==smList.sesionManagerID() の場合は、 | |
589 // 待っている自分の下のsessionManagerにsidをassignする必要がある。 | |
353 | 590 sendCommand.setSID(sid); // new Session manager ID |
591 // 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた | |
592 // Session manager IDを使う。 | |
355 | 593 sendCommand.setEID(psid); |
594 send_sm_join_ack(psid, sid, sendCommand); | |
595 } else { | |
596 if (forwarder.sid==-1) { | |
597 // direct link の場合は、識別のために、EIDに直上の | |
598 // smid を入れておく。 | |
599 command.setEID(smList.sessionManagerID()); | |
600 } | |
601 smList.sendToMaster(command); | |
353 | 602 } |
603 } | |
604 break; | |
605 | |
355 | 606 case SMCMD_SM_JOIN_ACK: |
607 send_sm_join_ack(command.eid, command.sid, command); | |
353 | 608 break; |
609 | |
610 case SMCMD_UPDATE: | |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
611 if (!isMaster()) { |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
612 command.setString(mergeUpdate(command)); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
613 // 上に知らせる |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
614 smList.sendToMaster(command); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
615 break; |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
616 } |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
617 // fall thru |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
618 command.setCMD(REP.SMCMD_UPDATE_ACK); |
353 | 619 case SMCMD_UPDATE_ACK: |
358 | 620 command.setString(mergeUpdate(command)); |
621 // 下に知らせる | |
355 | 622 smList.sendToSlaves(command); |
358 | 623 updateGUI(); |
353 | 624 break; |
625 default: | |
626 return false; | |
627 } | |
628 return true; | |
629 } | |
630 | |
355 | 631 |
358 | 632 private String mergeUpdate(REPCommand command) throws IOException { |
633 SessionList receivedSessionList; | |
634 try { | |
635 receivedSessionList = decoder.decode(command.string); | |
636 } catch (SAXException e) { | |
637 throw new IOException(); | |
638 } | |
639 // UPDATE/UPDATE_ACKにより送られてきたSessionの情報を追加する | |
640 //XMLを生成。送信コマンドにセット。 | |
641 sessionList.merge(receivedSessionList); | |
642 return encoder.sessionListToXML(sessionList); | |
643 | |
644 } | |
645 | |
646 /* | |
647 * id has SessionManager ID part | |
648 */ | |
649 | |
650 private int makeID(int newid) { | |
651 return newid+smList.sessionManagerID()*MAXID; | |
652 } | |
653 | |
654 | |
655 private int getSMID(int id) { | |
656 return id/MAXID; | |
657 } | |
658 | |
659 | |
361 | 660 /** |
661 * Register Editor to our editorList. No connection is made. | |
662 * @param forwarder Editor to be add | |
663 * @param command | |
664 */ | |
358 | 665 private void registEditor(Forwarder forwarder,REPCommand command) { |
666 // make ack for PUT/JOIN. Do not send this to the editor, | |
667 // before select. After select, ack is sent to the editor. | |
668 routingTable.add(forwarder,getSMID(command.eid),command.sid); | |
669 Editor editor; | |
361 | 670 if (getSMID(command.sid)==smList.sessionManagerID() |
358 | 671 && forwarder.isDirect()) { |
672 // direct link だった | |
673 editor = (Editor)forwarder; | |
674 } else { | |
675 editor = new Editor(this, command.cmd==REP.SMCMD_PUT_ACK, command.eid); | |
676 } | |
677 editor.setName(command.string); | |
678 editor.setSID(command.sid); | |
679 if (!editorList.hasEid(command.eid)) { | |
680 editorList.add(editor); | |
681 } | |
682 // we don't join ack to the direct linked editor. We | |
683 // have to wait select command | |
684 } | |
685 | |
686 | |
355 | 687 void send_sm_join_ack(int psid, int sid,REPCommand sendCommand) { |
688 if (psid==smList.sessionManagerID()) { | |
689 // 直下のsessionManagerにIDを割り振る必要がある。 | |
690 smList.assignSessionManagerIDtoWaitingSM(sid); | |
691 // ここで smList に一つだけ追加されるので | |
358 | 692 // 待っている最初のsm一つにだけ、sm_join_ackが新たに送られる。 |
355 | 693 } |
694 smList.sendToSlaves(sendCommand); | |
695 } | |
696 | |
697 | |
698 private REPCommand makeREPCommandWithSessionList(REP cmd) { | |
699 //SessionListからXMLを生成。 | |
700 //joinしてきたSessionManagerに対してACKを送信。 | |
701 REPCommand sendCommand = new REPCommand(); | |
702 sendCommand.setCMD(cmd); | |
358 | 703 sendCommand.setString(encoder.sessionListToXML(sessionList)); |
355 | 704 return sendCommand; |
705 } | |
706 | |
707 | |
708 public boolean isMaster() { | |
709 return smList.isMaster(); | |
710 } | |
711 | |
712 | |
713 public void setSessionManagerID(int sid) { | |
714 smList.setSessionManagerID(sid); | |
715 } | |
716 | |
358 | 717 |
718 public Session getSession(int sid) { | |
719 return sessionList.get(sid); | |
720 } | |
721 | |
0 | 722 } |