comparison rep/SessionManager.java @ 382:4b87f89b3afd

REP Session Manager (Java version) new structure
author one@firefly.cr.ie.u-ryukyu.ac.jp
date Mon, 10 Nov 2008 22:07:45 +0900
parents c78569ab5fce
children bcdf5476b8e4
comparison
equal deleted inserted replaced
381:65fdb3dc1885 382:4b87f89b3afd
16 16
17 17
18 import rep.channel.REPLogger; 18 import rep.channel.REPLogger;
19 import rep.channel.REPServerSocketChannel; 19 import rep.channel.REPServerSocketChannel;
20 import rep.channel.REPSocketChannel; 20 import rep.channel.REPSocketChannel;
21 import rep.handler.PacketSet; 21 import rep.gui.CloseButtonEvent;
22 import rep.handler.REPHandler; 22 import rep.gui.DoGUIUpdate;
23 import rep.gui.SelectButtonEvent;
24 import rep.gui.SessionManagerEvent;
25 import rep.gui.SessionManagerEventListener;
26 import rep.gui.SessionManagerGUI;
27 import rep.gui.SessionManagerGUIimpl;
28 import rep.handler.Dispatcher;
29 import rep.handler.Editor;
30 import rep.handler.REPNode;
31 import rep.handler.FirstConnector;
32 import rep.handler.Forwarder;
23 import rep.channel.REPSelector; 33 import rep.channel.REPSelector;
24 import rep.xml.SessionXMLDecoder; 34 import rep.xml.SessionXMLDecoder;
25 import rep.xml.SessionXMLEncoder; 35 import rep.xml.SessionXMLEncoder;
26 import rep.channel.REPSelectionKey; 36 import rep.channel.REPSelectionKey;
27 37
59 private List<PacketSet> waitingCommandInMerge; 69 private List<PacketSet> waitingCommandInMerge;
60 // Command from gui. Synchronization is required. 70 // Command from gui. Synchronization is required.
61 private BlockingQueue<SessionManagerEvent> waitingEventQueue 71 private BlockingQueue<SessionManagerEvent> waitingEventQueue
62 = new LinkedBlockingQueue<SessionManagerEvent>();; 72 = new LinkedBlockingQueue<SessionManagerEvent>();;
63 // host name of this server. One of connecting SocketChannel's hostname 73 // host name of this server. One of connecting SocketChannel's hostname
64 String myHost; 74 public String myHost;
65 // Single threaded write queue. To avoid deadlock with too many writes. 75 // Single threaded write queue. To avoid deadlock with too many writes.
66 private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); 76 private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>();
67 private int receive_port; 77 private int receive_port;
68 private int parent_port; 78 private int parent_port;
69 static final int DEFAULT_PORT = 8766; 79 static final int DEFAULT_PORT = 8766;
73 // globalSessionID = SessionManagerID * MAXID + localSessionID 83 // globalSessionID = SessionManagerID * MAXID + localSessionID
74 private static final int MAXID = 10000; 84 private static final int MAXID = 10000;
75 SessionXMLDecoder decoder = new SessionXMLDecoder(); 85 SessionXMLDecoder decoder = new SessionXMLDecoder();
76 SessionXMLEncoder encoder = new SessionXMLEncoder(); 86 SessionXMLEncoder encoder = new SessionXMLEncoder();
77 // SocketChannel for our parent. At most one parent is allowed. 87 // SocketChannel for our parent. At most one parent is allowed.
78 private Forwarder sm_join_channel; 88 private REPNode sm_join_channel;
79 // Routing table for session and session manager. 89 // Routing table for session and session manager.
80 private RoutingTable routingTable = new RoutingTable(this); 90 private RoutingTable routingTable = new RoutingTable(this);
81 private SessionManagerEvent execAfterConnect = null;; 91 private SessionManagerEvent execAfterConnect = null;;
82 92
83 public static void main(String[] args) throws InterruptedException, IOException { 93 public static void main(String[] args) throws InterruptedException, IOException {
116 ssc.configureBlocking(false); // Selector requires this 126 ssc.configureBlocking(false); // Selector requires this
117 ssc.socket().setReuseAddress(true); //reuse address 必須 127 ssc.socket().setReuseAddress(true); //reuse address 必須
118 //getAllByNameで取れた全てのアドレスに対してbindする 128 //getAllByNameで取れた全てのアドレスに対してbindする
119 ssc.socket().bind(new InetSocketAddress(port)); 129 ssc.socket().bind(new InetSocketAddress(port));
120 ssc.register(selector, SelectionKey.OP_ACCEPT, 130 ssc.register(selector, SelectionKey.OP_ACCEPT,
121 new Forwarder(this)); 131 new Dispatcher(this)); // FirstConnector?
122 132
123 sessionList = new SessionList(); 133 sessionList = new SessionList();
124 smList = new SessionManagerList(); 134 smList = new SessionManagerList();
125 editorList = new EditorList(); 135 editorList = new EditorList();
126 waitingCommandInMerge = new LinkedList<PacketSet>(); 136 waitingCommandInMerge = new LinkedList<PacketSet>();
181 */ 191 */
182 private void checkWaitingCommandInMerge() { 192 private void checkWaitingCommandInMerge() {
183 List<PacketSet> w = waitingCommandInMerge; 193 List<PacketSet> w = waitingCommandInMerge;
184 waitingCommandInMerge = new LinkedList<PacketSet>(); 194 waitingCommandInMerge = new LinkedList<PacketSet>();
185 for(PacketSet p: w) { 195 for(PacketSet p: w) {
186 Editor e = p.getEditor(); 196 REPNode e = p.getEditor();
187 if(e.isMerging()) { // still merging do nothing 197 if(e.isMerging()) { // still merging do nothing
188 waitingCommandInMerge.add(p); 198 waitingCommandInMerge.add(p);
189 } else { 199 } else {
190 try { 200 try {
191 if (sessionManage(e, p.command)) { // we don't need this 201 if (sessionManage(e, p.command)) { // we don't need this
217 /* 227 /*
218 * Close a channel in case of exception or close. 228 * Close a channel in case of exception or close.
219 */ 229 */
220 private void close(REPSocketChannel<REPCommand> channel) { 230 private void close(REPSocketChannel<REPCommand> channel) {
221 REPSelectionKey<REPCommand>key = channel.keyFor1(selector); 231 REPSelectionKey<REPCommand>key = channel.keyFor1(selector);
222 REPHandler handler = (REPHandler)key.attachment(); 232 REPNode handler = (REPNode)key.attachment();
223 key.cancel(); 233 key.cancel();
224 handler.cancel(channel); 234 handler.cancel(channel);
225 // we have to remove session/editor 235 // we have to remove session/editor
226 } 236 }
227 237
248 /* 258 /*
249 * Incoming packets are handled by various forwarders. 259 * Incoming packets are handled by various forwarders.
250 * A handler throws IOException() in case of trouble to 260 * A handler throws IOException() in case of trouble to
251 * close the channel. 261 * close the channel.
252 */ 262 */
253 REPHandler handler = (REPHandler)(key.attachment()); 263 REPNode handler = (REPNode)key.attachment();
254 try { 264 try {
255 handler.handle(key); 265 handler.handle(key);
256 } catch (IOException e) { 266 } catch (IOException e) {
257 key.cancel(); 267 key.cancel();
258 handler.cancel(key.channel1()); 268 handler.cancel(key.channel1());
259 } 269 }
260 } 270 }
261 } 271 }
262 } 272 }
263 273
264 void registerChannel(REPSocketChannel<REPCommand> channel,Forwarder handler) throws IOException { 274 public void registerChannel(REPSocketChannel<REPCommand> channel,REPNode handler) throws IOException {
265 if(channel == null) { 275 if(channel == null) {
266 return; 276 return;
267 } 277 }
268 handler.setChannel(channel); 278 handler.setChannel(channel);
269 channel.configureBlocking(false); 279 channel.configureBlocking(false);
278 removeChannel(sm_join_channel); 288 removeChannel(sm_join_channel);
279 sm_join_channel=null; 289 sm_join_channel=null;
280 } 290 }
281 291
282 292
283 private void removeChannel(Forwarder channel) { 293 private void removeChannel(REPNode channel) {
284 REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector); 294 REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector);
285 key.cancel(); 295 key.cancel();
286 try { 296 try {
287 channel.channel.close1(); 297 channel.channel.close1();
288 } catch (IOException e) { 298 } catch (IOException e) {
291 301
292 302
293 void updateGUI() { 303 void updateGUI() {
294 //リストのコピーをGUIに渡す 304 //リストのコピーをGUIに渡す
295 LinkedList<Session> sList = new LinkedList<Session>(sessionList.values()); 305 LinkedList<Session> sList = new LinkedList<Session>(sessionList.values());
296 LinkedList<Editor> eList; 306 LinkedList<REPNode> eList;
297 if (false) { 307 if (false) {
298 // local editor only 308 // local editor only
299 eList = new LinkedList<Editor>(); 309 eList = new LinkedList<REPNode>();
300 for(Editor e:editorList.values()) { 310 for(REPNode e:editorList.values()) {
301 if (getSMID(e.eid)==smList.sessionManagerID()) { 311 if (getSMID(e.eid)==smList.sessionManagerID()) {
302 eList.add(e); 312 eList.add(e);
303 } 313 }
304 } 314 }
305 } else { 315 } else {
306 eList = new LinkedList<Editor>(editorList.values()); 316 eList = new LinkedList<REPNode>(editorList.values());
307 } 317 }
308 //GUIに反映 318 //GUIに反映
309 Runnable doRun = new DoGUIUpdate(sList, eList, gui); 319 Runnable doRun = new DoGUIUpdate(sList, eList, gui);
310 gui.invokeLater(doRun); 320 gui.invokeLater(doRun);
311 } 321 }
312 322
313 323
314 324
315 void setMyHostName(String localHostName) { 325 public void setMyHostName(String localHostName) {
316 myHost = localHostName + receive_port; 326 myHost = localHostName + receive_port;
317 setHostToEditor(myHost); 327 setHostToEditor(myHost);
318 } 328 }
319 329
320 private void setHostToEditor(String myHost2) { 330 private void setHostToEditor(String myHost2) {
321 for(Editor editor : editorList.values()){ 331 for(REPNode editor : editorList.values()){
322 if (editor.channel!=null) 332 if (editor.channel!=null)
323 editor.setHost(myHost2); 333 editor.setHost(myHost2);
324 } 334 }
325 } 335 }
326 336
342 InetSocketAddress addr = new InetSocketAddress(host, port); 352 InetSocketAddress addr = new InetSocketAddress(host, port);
343 try { 353 try {
344 REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker()); 354 REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker());
345 sessionchannel.connect(addr); 355 sessionchannel.connect(addr);
346 while(!sessionchannel.finishConnect()); 356 while(!sessionchannel.finishConnect());
347 Forwarder sm = new FirstConnector(this); 357 REPNode sm = new FirstConnector(this);
348 registerChannel(sessionchannel, sm); 358 registerChannel(sessionchannel, sm);
349 sm_join(sm); 359 sm_join(sm);
350 }catch (IOException e) { 360 }catch (IOException e) {
351 } 361 }
352 } 362 }
357 367
358 /** 368 /**
359 * channel に SMCMD_SM_JOIN command を送る。 369 * channel に SMCMD_SM_JOIN command を送る。
360 * @param channel 370 * @param channel
361 */ 371 */
362 private void sm_join(Forwarder channel){ 372 private void sm_join(REPNode channel){
363 sm_join_channel = channel; 373 sm_join_channel = channel;
364 //SM_JOINコマンドを生成。 374 //SM_JOINコマンドを生成。
365 REPCommand command = new REPCommand(); 375 REPCommand command = new REPCommand();
366 command.setCMD(REP.SMCMD_SM_JOIN); 376 command.setCMD(REP.SMCMD_SM_JOIN);
367 command.setEID(-1); // request Parent SessionManagerID 377 command.setEID(-1); // request Parent SessionManagerID
385 */ 395 */
386 public void selectSession(SelectButtonEvent event) throws IOException { 396 public void selectSession(SelectButtonEvent event) throws IOException {
387 int sid = event.getSID(); 397 int sid = event.getSID();
388 Session session = sessionList.get(sid); 398 Session session = sessionList.get(sid);
389 if (session==null) throw new IOException(); 399 if (session==null) throw new IOException();
390 Editor editor = (Editor)event.getEditor(); 400 REPNode editor = event.getEditor();
391 if (editor.hasSession()) return; 401 if (editor.hasSession()) return;
392 // assert(getSMID(editor.eid)==smList.sessionManagerID()); 402 // assert(getSMID(editor.eid)==smList.sessionManagerID());
393 // assert(editor.channel!=null); 403 // assert(editor.channel!=null);
394 editor.setSID(sid); // mark as selected 404 editor.setSID(sid); // mark as selected
395 selectSession0(sid, session, editor.getEID(), editor); 405 selectSession0(sid, session, editor.getEID(), editor);
396 } 406 }
397 407
398 private void selectSession0(int sid, Session session, int eid, Editor editor) { 408 private void selectSession0(int sid, Session session, int eid, REPNode editor) {
399 if (editor.isDirect()&&editor.getEID()==eid) { 409 if (editor.isDirect()&&editor.getEID()==eid) {
400 selectSession(sid, session, editor.getEID(), editor); 410 selectSession(sid, session, editor.getEID(), editor);
401 } else { 411 } else {
402 // we don't have this editor, search the editor first. 412 // we don't have this editor, search the editor first.
403 Forwarder next = routingTable.toSessionManager(getSMID(eid)); 413 REPNode next = routingTable.toSessionManager(getSMID(eid));
404 // pass the select command to the next path. 414 // pass the select command to the next path.
405 REPCommand command = new REPCommand(); 415 REPCommand command = new REPCommand();
406 command.setCMD(REP.SMCMD_SELECT0); 416 command.setCMD(REP.SMCMD_SELECT0);
407 command.setSID(sid); 417 command.setSID(sid);
408 command.setEID(eid); 418 command.setEID(eid);
413 423
414 /* 424 /*
415 * Select Session Protocol handler 425 * Select Session Protocol handler
416 * called from GUI or incoming SMCMD_SELECT command. 426 * called from GUI or incoming SMCMD_SELECT command.
417 */ 427 */
418 private void selectSession(int sid, Session session, int eid, Forwarder editor) { 428 private void selectSession(int sid, Session session, int eid, REPNode editor) {
419 if(session.hasOwner()){ 429 if(session.hasOwner()){
420 // we have selected session. 430 // we have selected session.
421 REPCommand sendCommand = new REPCommand(); 431 REPCommand sendCommand = new REPCommand();
422 if (editor.isDirect()&&editor.getEID()==eid) { 432 if (editor.isDirect()&&editor.getEID()==eid) {
423 // Found directly connected joined editor. Send join_ack(). 433 // Found directly connected joined editor. Send join_ack().
438 forwardSelect(sid, session, eid, editor); 448 forwardSelect(sid, session, eid, editor);
439 } 449 }
440 } 450 }
441 451
442 private void forwardSelect(int sid, Session session, int eid, 452 private void forwardSelect(int sid, Session session, int eid,
443 Forwarder editor) { 453 REPNode editor) {
444 Forwarder next; 454 REPNode next;
445 // session searching continue... 455 // session searching continue...
446 next = routingTable.toSessionManager(getSMID(sid)); 456 next = routingTable.toSessionManager(getSMID(sid));
447 // make a forwarding channel here 457 // make a forwarding channel here
448 Forwarder f = createSessionDispatcher(sid, next); 458 REPNode f = createSessionForwarder(sid, next);
449 session.setFirstForwarder(f); 459 session.setFirstForwarder(f);
450 session.addForwarder(editor); 460 session.addForwarder(editor);
451 // pass the select command to the next path. 461 // pass the select command to the next path.
452 REPCommand command = new REPCommand(); 462 REPCommand command = new REPCommand();
453 command.setCMD(REP.SMCMD_SELECT); 463 command.setCMD(REP.SMCMD_SELECT);
455 command.setEID(eid); 465 command.setEID(eid);
456 command.setString(editor.getHost()); 466 command.setString(editor.getHost());
457 next.send(command); 467 next.send(command);
458 } 468 }
459 469
460 private Forwarder createSessionDispatcher(int sid, Forwarder editor) { 470 private REPNode createSessionForwarder(int sid, REPNode editor) {
461 Forwarder f = new Editor(this, false, makeID(editorList.newEid())); 471 REPNode f = new Forwarder(this);
472 f.setEID(makeID(editorList.newEid()));
462 f.setChannel(editor.channel); // incoming channel 473 f.setChannel(editor.channel); // incoming channel
463 f.setHost(myHost); 474 f.setHost(myHost);
464 f.setSID(sid); 475 f.setSID(sid);
465 return f; 476 return f;
466 } 477 }
477 488
478 /* 489 /*
479 * Create a new editor in this session manager. A dummy editor 490 * Create a new editor in this session manager. A dummy editor
480 * is also created. 491 * is also created.
481 */ 492 */
482 public Editor newEditor(REPSocketChannel<REPCommand> channel) { 493 public REPNode newEditor(REPSocketChannel<REPCommand> channel) {
483 int eid = makeID(editorList.newEid()); 494 int eid = makeID(editorList.newEid());
484 Editor editor = new Editor(this, eid, channel); 495 REPNode editor = new Editor(this, eid, channel);
485 editorList.add(editor); 496 editorList.add(editor);
486 return editor; 497 return editor;
487 } 498 }
488 499
489 /* 500 /*
490 * Create new session. 501 * Create new session.
491 */ 502 */
492 public Session newSession(Forwarder master) { 503 public Session newSession(REPNode master) {
493 int sid= makeID(sessionList.newSessionID()); 504 int sid= makeID(sessionList.newSessionID());
494 Session session = new Session(sid, master); 505 Session session = new Session(sid, master);
495 sessionList.put(sid, session); 506 sessionList.put(sid, session);
496 return session; 507 return session;
497 } 508 }
582 public int getPort() { 593 public int getPort() {
583 return receive_port; 594 return receive_port;
584 } 595 }
585 596
586 597
587 boolean sessionManage(Forwarder forwarder, REPCommand command) throws ClosedChannelException, 598 public boolean sessionManage(REPNode forwarder, REPCommand command) throws ClosedChannelException,
588 IOException { 599 IOException {
589 switch(command.cmd){ 600 switch(command.cmd){
590 601
591 // Session Manager Command 602 // Session Manager Command
592 603
650 661
651 case SMCMD_SELECT0: 662 case SMCMD_SELECT0:
652 /* 663 /*
653 * finding joining editor, do not make the path. 664 * finding joining editor, do not make the path.
654 */ 665 */
655 Forwarder editor = editorList.get(command.eid); 666 REPNode editor = editorList.get(command.eid);
656 if (editor==null|| !editor.isDirect()) { 667 if (editor==null|| !editor.isDirect()) {
657 Forwarder next = routingTable.toSessionManager(getSMID(command.eid)); 668 REPNode next = routingTable.toSessionManager(getSMID(command.eid));
658 next.send(command); 669 next.send(command);
659 break; 670 break;
660 } 671 }
661 // we've found the editor, fall thru. 672 // we've found the editor, fall thru.
662 case SMCMD_SELECT: 673 case SMCMD_SELECT:
669 session = new Session(command.sid, command.string,null); 680 session = new Session(command.sid, command.string,null);
670 sessionList.put(command.sid,session); 681 sessionList.put(command.sid,session);
671 } 682 }
672 // Do not directly addForwarder(forwarder). It may be 683 // Do not directly addForwarder(forwarder). It may be
673 // shared among sessions. 684 // shared among sessions.
674 Forwarder f = createSessionDispatcher(command.sid, forwarder); 685 REPNode f = createSessionForwarder(command.sid, forwarder);
675 session.addForwarder(f); // f.next is set up here. 686 session.addForwarder(f); // f.next is set up here.
676 selectSession(command.sid, session, command.eid, forwarder); 687 selectSession(command.sid, session, command.eid, forwarder);
677 } 688 }
678 break; 689 break;
679 case SMCMD_SELECT_ACK: 690 case SMCMD_SELECT_ACK:
729 return false; 740 return false;
730 } 741 }
731 return true; 742 return true;
732 } 743 }
733 744
734 private void registSessionManager(Forwarder forwarder, REPCommand command) { 745 private void registSessionManager(REPNode forwarder, REPCommand command) {
735 Forwarder sm; 746 REPNode sm;
736 int psid = command.eid; 747 int psid = command.eid;
737 if (forwarder.sid!=-1) { 748 if (forwarder.sid!=-1) {
738 // すでに channelはSessionManager Idを持っていて、 749 // すでに channelはSessionManager Idを持っていて、
739 // direct link ではないので、 750 // direct link ではないので、
740 // channel を持たないForwarderとして登録する 751 // channel を持たないForwarderとして登録する
768 779
769 /* 780 /*
770 * 指定されたeditorがlocalにあるかどうかを調べる。なければ、他に送る。戻って何回も探すことが 781 * 指定されたeditorがlocalにあるかどうかを調べる。なければ、他に送る。戻って何回も探すことが
771 * あり得るので、よろしくない。 782 * あり得るので、よろしくない。
772 */ 783 */
773 private void searchSelectedEditor(REPCommand command, Forwarder editor) { 784 private void searchSelectedEditor(REPCommand command, REPNode editor) {
774 for(;editor.isDirect();editor = editor.getNextForwarder()) { 785 for(;editor.isDirect();editor = editor.getNextForwarder()) {
775 if (editor.getEID()==command.eid) { 786 if (editor.getEID()==command.eid) {
776 // select したeditor を見つけた 787 // select したeditor を見つけた
777 command.cmd=REP.SMCMD_JOIN_ACK; 788 command.cmd=REP.SMCMD_JOIN_ACK;
778 editor.send(command); 789 editor.send(command);
819 /** 830 /**
820 * Register Editor to our editorList. No connection is made. 831 * Register Editor to our editorList. No connection is made.
821 * @param forwarder Editor to be added 832 * @param forwarder Editor to be added
822 * @param command 833 * @param command
823 */ 834 */
824 public void registEditor(Forwarder forwarder,REPCommand command) { 835 public void registEditor(REPNode forwarder,REPCommand command) {
825 // make ack for PUT/JOIN. Do not send this to the editor, 836 // make ack for PUT/JOIN. Do not send this to the editor,
826 // before select. After select, ack is sent to the editor. 837 // before select. After select, ack is sent to the editor.
827 Editor editor; 838 REPNode editor;
828 if (getSMID(command.eid)==smList.sessionManagerID()) { 839 if (getSMID(command.eid)==smList.sessionManagerID()) {
829 if (forwarder.isDirect()) { 840 if (forwarder.isDirect()) {
830 editor = (Editor)forwarder; 841 editor = (Editor)forwarder;
831 } else 842 } else
832 return; 843 return;
878 public void afterConnect() { 889 public void afterConnect() {
879 if (execAfterConnect!=null) execAfterConnect.exec(this); 890 if (execAfterConnect!=null) execAfterConnect.exec(this);
880 execAfterConnect = null; 891 execAfterConnect = null;
881 } 892 }
882 893
883 public void setParent(Forwarder fw) { 894 public void setParent(REPNode fw) {
884 smList.setParent(fw); 895 smList.setParent(fw);
885 } 896 }
886 897
887 public String toString() { 898 public String toString() {
888 int myId = 0; 899 int myId = 0;
889 if (smList!=null) myId = smList.sessionManagerID(); 900 if (smList!=null) myId = smList.sessionManagerID();
890 return "rep.SessionManager-"+myId+"@"+myHost+":"+receive_port; 901 return "rep.SessionManager-"+myId+"@"+myHost+":"+receive_port;
891 } 902 }
892 903
904 public void addWaitingSessionManager(REPNode fw, REPCommand command) {
905 smList.addWaitingSessionManager(fw, command) ;
906 }
907
893 908
894 } 909 }