comparison rep/SessionManager.java @ 355:98607350f7d1

*** empty log message ***
author kono
date Fri, 17 Oct 2008 22:11:34 +0900
parents 0d47ff22ee0e
children b18c24dcc5d2
comparison
equal deleted inserted replaced
354:6ea3aa6c795f 355:98607350f7d1
60 private int receive_port; 60 private int receive_port;
61 private int parent_port; 61 private int parent_port;
62 static final int DEFAULT_PORT = 8766; 62 static final int DEFAULT_PORT = 8766;
63 private static final int packetLimit = 200; 63 private static final int packetLimit = 200;
64 SessionXMLDecoder decoder = new SessionXMLDecoder(); 64 SessionXMLDecoder decoder = new SessionXMLDecoder();
65 65 private Forwarder sm_join_channel;
66 boolean smjoin_mode;
67 66
68 public static void main(String[] args) throws InterruptedException, IOException { 67 public static void main(String[] args) throws InterruptedException, IOException {
69 68
70 int port = DEFAULT_PORT; 69 int port = DEFAULT_PORT;
71 int port_s = DEFAULT_PORT; 70 int port_s = DEFAULT_PORT;
206 Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); 205 Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1();
207 for(REPSelectionKey<REPCommand> key : keys){ 206 for(REPSelectionKey<REPCommand> key : keys){
208 if(key.isAcceptable()){ 207 if(key.isAcceptable()){
209 REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); 208 REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker());
210 logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); 209 logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel);
211 registerChannel (channel, SelectionKey.OP_READ,new Forwarder(this)); 210 registerChannel (channel, new FirstConnector(this));
212 channel = null; 211 channel = null;
213 212
214 }else if(key.isReadable()){ 213 }else if(key.isReadable()){
215 REPHandler handler = (REPHandler)(key.attachment()); 214 REPHandler handler = (REPHandler)(key.attachment());
216 try { 215 try {
221 } 220 }
222 } 221 }
223 } 222 }
224 } 223 }
225 224
226 void registerChannel(REPSocketChannel<REPCommand> channel, int ops,Forwarder handler) throws IOException { 225 void registerChannel(REPSocketChannel<REPCommand> channel,Forwarder handler) throws IOException {
227 if(channel == null) { 226 if(channel == null) {
228 return; 227 return;
229 } 228 }
230 handler.setChannel(channel); 229 handler.setChannel(channel);
231 channel.configureBlocking(false); 230 channel.configureBlocking(false);
232 channel.register(selector, ops, handler); 231 channel.register(selector, SelectionKey.OP_READ, handler);
233 } 232 }
234 233
235 234
236 void cancel_sm_join() { 235 void cancel_sm_join() {
237 smjoin_mode=false; 236 removeChannel(sm_join_channel);
237 sm_join_channel=null;
238 }
239
240
241 private void removeChannel(Forwarder sm_join_channel) {
242 REPSelectionKey<REPCommand> key = sm_join_channel.channel.keyFor1(selector);
243 key.cancel();
244 try {
245 sm_join_channel.channel.close();
246 } catch (IOException e) {
247 }
238 } 248 }
239 249
240 250
241 boolean hasSession(int sid) { 251 boolean hasSession(int sid) {
242 for(Session s:sessionList) { 252 for(Session s:sessionList) {
292 for(Editor editor : editorList){ 302 for(Editor editor : editorList){
293 editor.setHost(myHost2); 303 editor.setHost(myHost2);
294 } 304 }
295 } 305 }
296 306
307
308 /**
309 * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては
310 * ならない。複数のSession Managerにjoinすることは出来ない。(NATを実装するまでは)。
311 * @param host
312 */
297 public void connectSession(String host) { 313 public void connectSession(String host) {
314 if (sm_join_channel!=null) return;
315 if (!sessionList.isEmpty()) return;
316 if (!smList.isMaster()) return;
298 int port = parent_port; 317 int port = parent_port;
299 InetSocketAddress addr = new InetSocketAddress(host, port); 318 InetSocketAddress addr = new InetSocketAddress(host, port);
300 try { 319 try {
301 REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker()); 320 REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker());
302 321
303 sessionchannel.connect(addr); 322 sessionchannel.connect(addr);
304 while(!sessionchannel.finishConnect()); 323 while(!sessionchannel.finishConnect());
305 Forwarder sm = new Forwarder(this); 324 Forwarder sm = new Forwarder(this);
306 registerChannel(sessionchannel, SelectionKey.OP_READ,sm); 325 registerChannel(sessionchannel, sm);
307 sm_join(sm); 326 sm_join(sm);
308 }catch (IOException e) { 327 }catch (IOException e) {
309 } 328 }
310 } 329 }
311 330
312 private void sm_join(Forwarder channel){ 331 private void sm_join(Forwarder channel){
313 smjoin_mode = true; 332 sm_join_channel = channel;
314 //SM_JOINコマンドを生成。 333 //SM_JOINコマンドを生成。
315 REPCommand command = new REPCommand(); 334 REPCommand command = new REPCommand();
316 command.setCMD(REP.SMCMD_SM_JOIN); 335 command.setCMD(REP.SMCMD_SM_JOIN);
317 command.setEID(-1); // request Parent SessionManagerID 336 command.setEID(-1); // request Parent SessionManagerID
318 command.setSID(-1); // request SessionManagerID 337 command.setSID(-1); // request SessionManagerID
319 338
320 //hostnameをセット。 339 //hostnameをセット。
321 setMyHostName(channel.getLocalHostName()); 340 setMyHostName(channel.getLocalHostName());
322 341
323 //XMLを生成。送信コマンドにセット。 342 //XMLを生成。送信コマンドにセット。
324 SessionXMLEncoder encoder = new SessionXMLEncoder(sessionList); 343 //SessionXMLEncoder encoder = new SessionXMLEncoder(sessionList);
325 String string = encoder.sessionListToXML(); 344 //String string = encoder.sessionListToXML();
345 String string = myHost;
326 command.setString(string); 346 command.setString(string);
327 347
328 //SM_JOINコマンドを送信。 348 //SM_JOINコマンドを送信。
329 channel.send(command); 349 channel.send(command);
330 //SessionManagerのListに追加。 350 //SessionManagerのListに追加。
437 public int getPort() { 457 public int getPort() {
438 return receive_port; 458 return receive_port;
439 } 459 }
440 460
441 461
442 boolean sessionManage(Forwarder forwarder, REPCommand receivedCommand) throws ClosedChannelException, 462 boolean sessionManage(Forwarder forwarder, REPCommand command) throws ClosedChannelException,
443 IOException, SAXException { 463 IOException, SAXException {
444 switch(receivedCommand.cmd){ 464 switch(command.cmd){
445 465
446 // Session Manager Command 466 // Session Manager Command
447 467
448 case SMCMD_JOIN: 468 case SMCMD_JOIN:
449 { 469 {
450 //どのSessionにも属さないエディタをリストに追加
451 //エディタとchannelは1対1 (ではない)
452 //エディタが新しくputする場合は新しくソケットを作る
453 // ここのeditorList はsessionのとは別物
454 Editor editor1 = new Editor(this,-1,forwarder.channel);
455 registerChannel(forwarder.channel,SelectionKey.OP_READ,editor1);
456 editor1.setHost(myHost);
457 editorList.add(editor1);
458
459 updateGUI(); 470 updateGUI();
460
461 } 471 }
462 472
463 break; 473 break;
464 474
465 case SMCMD_JOIN_ACK: 475 case SMCMD_JOIN_ACK:
471 //Sessionを生成 481 //Sessionを生成
472 // sessionIDってglobaly uniqueだから、本来は、 482 // sessionIDってglobaly uniqueだから、本来は、
473 // 自分の親に作ってもらう必要がある。自分が親なら自分で作って良い。 483 // 自分の親に作ってもらう必要がある。自分が親なら自分で作って良い。
474 484
475 int sid = sessionList.size(); 485 int sid = sessionList.size();
476 Editor editor = new Editor(this,0, forwarder.channel); 486 Editor editor = (Editor) forwarder;
477 registerChannel(forwarder.channel,SelectionKey.OP_READ,editor); 487 Session session = new Session(sid, command.string, (Editor)forwarder);
478 editorList.add(editor);
479 editor.setHost(myHost);
480 Session session = new Session(sid, receivedCommand.string, editor);
481 session.hasOwner(true); 488 session.hasOwner(true);
482 sessionList.add(session); 489 sessionList.add(session);
483 490
484 updateGUI(); 491 updateGUI();
485 492
486 //エディタにAckを送信 493 //エディタにAckを送信
487 REPCommand sendCommand = new REPCommand(receivedCommand); 494 REPCommand sendCommand = new REPCommand(command);
488 sendCommand.setCMD(REP.SMCMD_PUT_ACK); 495 sendCommand.setCMD(REP.SMCMD_PUT_ACK);
489 sendCommand.setEID(editor.getEID()); 496 sendCommand.setEID(editor.getEID());
490 sendCommand.setSID(session.getSID()); 497 sendCommand.setSID(session.getSID());
491 editor.send(sendCommand); 498 editor.send(sendCommand);
492 499
493 //他のSessionManagerへSessionの追加を報告 500 //他のSessionManagerへSessionの追加を報告
494 //親に送って、親から子へ 501 //親に送って、親から子へ
495 SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session); 502 SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session);
496 REPCommand command = new REPCommand(); 503 REPCommand command1 = new REPCommand();
497 command.setSID(session.getSID()); 504 command1.setSID(session.getSID());
498 command.setString(sessionEncoder.sessionListToXML()); 505 command1.setString(sessionEncoder.sessionListToXML());
499 command.setCMD(REP.SMCMD_UPDATE); 506 command1.setCMD(REP.SMCMD_UPDATE);
500 smList.sendToSlaves( command); 507 smList.sendToSlaves( command);
501 508
502 } 509 }
503 510
504 break; 511 break;
508 case SMCMD_SELECT: 515 case SMCMD_SELECT:
509 { 516 {
510 //他のSessionManagerをエディタとしてSessionに追加 517 //他のSessionManagerをエディタとしてSessionに追加
511 Forwarder next = new Forwarder(this); 518 Forwarder next = new Forwarder(this);
512 next.setChannel(forwarder.channel); 519 next.setChannel(forwarder.channel);
513 Session session = getSession(receivedCommand.sid); 520 Session session = getSession(command.sid);
514 session.addForwarder(next); 521 session.addForwarder(next);
515 522
516 if(session.hasOwner()){ 523 if(session.hasOwner()){
517 //このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す 524 //このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す
518 REPCommand sendCommand = new REPCommand(receivedCommand); 525 REPCommand sendCommand = new REPCommand(command);
519 sendCommand.setCMD(REP.SMCMD_SELECT_ACK); 526 sendCommand.setCMD(REP.SMCMD_SELECT_ACK);
520 sendCommand.setEID(next.getEID()); 527 sendCommand.setEID(next.getEID());
521 next.send(sendCommand); 528 next.send(sendCommand);
522 }else{ 529 }else{
523 //オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する 530 //オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する
524 Forwarder owner = session.getOwner(); 531 Forwarder owner = session.getOwner();
525 owner.send(receivedCommand); 532 owner.send(command);
526 } 533 }
527 } 534 }
528 535
529 break; 536 break;
530 537
531 case SMCMD_SELECT_ACK: 538 case SMCMD_SELECT_ACK:
532 { 539 {
533 String hostport = receivedCommand.string; 540 String hostport = command.string;
534 Forwarder editor1 = getEditor(hostport); 541 Forwarder editor1 = getEditor(hostport);
535 542
536 if(editor1 != null) { 543 if(editor1 != null) {
537 //host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する 544 //host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する
538 REPCommand command = new REPCommand(); 545 REPCommand command1 = new REPCommand();
539 command.setCMD(REP.SMCMD_JOIN_ACK); 546 command1.setCMD(REP.SMCMD_JOIN_ACK);
540 command.setSID(receivedCommand.sid); 547 command1.setSID(command.sid);
541 command.setEID(receivedCommand.eid); 548 command1.setEID(command.eid);
542 editor1.send(command); 549 editor1.send(command);
543 550
544 }else{ 551 }else{
545 //自分が送信したコマンドでなければ、次のSessionManagerへ中継する 552 //自分が送信したコマンドでなければ、次のSessionManagerへ中継する
546 smList.sendToSlaves(receivedCommand); 553 smList.sendToSlaves(command);
547 } 554 }
548 } 555 }
549 556
550 break; 557 break;
551 case SMCMD_SM_JOIN: 558 case SMCMD_SM_JOIN:
552 559
553 { 560 {
554 // SM_JOIN中にSMCMD_SM_JOINが来たら、自分のSM_JOINを 561 // SM_JOIN中にSMCMD_SM_JOINが来たら、これはループなので、
555 // 取り消す。 562 ///自分のSM_JOINを取り消す。
556 if (smjoin_mode) cancel_sm_join(); 563 if (sm_join_channel!=null) cancel_sm_join();
557 // SMCMD_SM_JOIN は、master まで上昇する。 564 // SMCMD_SM_JOIN は、master まで上昇する。
558 // masterでなければ、自分のparentに転送する。 565 // masterでなければ、自分のparentに転送する。
559 if(smList.isMaster()) { 566 if(isMaster()) {
560 // master であれば、SessionManager IDを決めて、 567 // master であれば、SessionManager IDを決めて、
561 // 自分のsmList に登録 568 // 自分のsmList に登録
562 int sid = smList.addNewSessionManager(receivedCommand); 569 Forwarder sm;
563 //SessionListからXMLを生成。 570 int psid = command.eid;
564 //joinしてきたSessionManagerに対してACKを送信。 571 if (forwarder.sid!=-1) {
565 SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList); 572 // すでに channelはSessionManager Idを持っていて、
566 REPCommand sendCommand = new REPCommand(); 573 // direct link ではないので、
574 // channel を持たないForwarderとして登録する
575 sm = new Forwarder(this);
576 } else {
577 sm = forwarder;
578 }
579 int sid = smList.addNewSessionManager(sm,command);
580 REPCommand sendCommand = makeREPCommandWithSessionList(REP.SMCMD_SM_JOIN_ACK);
581 // command.eid==smList.sesionManagerID() の場合は、
582 // 待っている自分の下のsessionManagerにsidをassignする必要がある。
567 sendCommand.setSID(sid); // new Session manager ID 583 sendCommand.setSID(sid); // new Session manager ID
568 // 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた 584 // 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた
569 // Session manager IDを使う。 585 // Session manager IDを使う。
570 sendCommand.setEID(receivedCommand.eid); 586 sendCommand.setEID(psid);
571 sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK); 587 send_sm_join_ack(psid, sid, sendCommand);
572 sendCommand.setString(sessionlistEncoder.sessionListToXML()); 588 } else {
573 smList.sendToSlaves(sendCommand); 589 if (forwarder.sid==-1) {
574 break; 590 // direct link の場合は、識別のために、EIDに直上の
575 } 591 // smid を入れておく。
576 // 592 command.setEID(smList.sessionManagerID());
577 593 }
578 //XMLからSessionListオブジェクトを生成する。 594 smList.sendToMaster(command);
579 //SessionList receivedSessionList = decoder.decode(receivedCommand.string); 595 }
580
581 //myHost を設定。
582 //立ち上げ時にやるとlocalhostしか取れない
583 if(myHost == null) setMyHostName(forwarder.getLocalHostName());
584
585 //maxHost を設定。
586 // if(setMaxHost(channel, receivedSessionList.getMaxHost())){
587 // REPCommand sendCommand = new REPCommand();
588 // sendCommand.setCMD(REP.SMCMD_CH_MASTER);
589 // sendCommand.setString(maxHost);
590 // smList.sendExcept(channel, sendCommand);
591 // }
592
593
594 } 596 }
595 break; 597 break;
596 598
597 case SMCMD_SM_JOIN_ACK: 599 case SMCMD_SM_JOIN_ACK:
598 600 send_sm_join_ack(command.eid, command.sid, command);
599 //XMLからSessionListオブジェクトを生成。
600 //SessionList receivedSessionList2 = decoder.decode(receivedCommand.string);
601
602 //maxHostを決定。
603 // if(setMaxHost(channel, receivedSessionList2.getMaxHost())){
604 // REPCommand sendCommand = new REPCommand();
605 // sendCommand.setCMD(REP.SMCMD_CH_MASTER);
606 // sendCommand.setString(maxHost);
607 // smList.sendExcept(channel, sendCommand);
608 // }
609
610
611 break; 601 break;
612 602
613 case SMCMD_UPDATE: 603 case SMCMD_UPDATE:
614 { 604 {
615 SessionList receivedSessionList3 = decoder.decode(receivedCommand.string); 605 SessionList receivedSessionList3 = decoder.decode(command.string);
616 606
617 //UPDATEコマンドにより送られてきたSessionの情報を追加する 607 //UPDATEコマンドにより送られてきたSessionの情報を追加する
618 LinkedList<Session> list = receivedSessionList3.getList(); 608 LinkedList<Session> list = receivedSessionList3.getList();
619 for(Session session : list){ 609 for(Session session : list){
620 session.getEditorList().get(0).setChannel(forwarder.channel); 610 session.getEditorList().get(0).setChannel(forwarder.channel);
621 sessionList.add(session); 611 sessionList.add(session);
622 } 612 }
623 613
624 //他のSessionManagerへ中継する 614 //他のSessionManagerへ中継する
625 smList.sendToSlaves(receivedCommand); 615 smList.sendToSlaves(command);
626 616
627 updateGUI(); 617 updateGUI();
628 } 618 }
629 break; 619 break;
630 620
631 case SMCMD_UPDATE_ACK: 621 case SMCMD_UPDATE_ACK:
632 { 622 {
633 if(!hasSession(receivedCommand.sid)) { 623 if(!hasSession(command.sid)) {
634 // accept new Session 624 // accept new Session
635 // ここで初めてsession id が決まる。 625 // ここで初めてsession id が決まる。
636 // このコマンドは、master session manager が出すはず 626 // このコマンドは、master session manager が出すはず
637 Forwarder sm = new Forwarder(this); 627 Forwarder sm = new Forwarder(this);
638 sm.setChannel(forwarder.channel); 628 sm.setChannel(forwarder.channel);
639 Session session = new Session(receivedCommand.sid,receivedCommand.string,null); 629 Session session = new Session(command.sid,command.string,null);
640 session.addForwarder(sm); 630 session.addForwarder(sm);
641 631
642 sessionList.add(session); 632 sessionList.add(session);
643 633
644 updateGUI(); 634 updateGUI();
645 } 635 }
646 smList.sendToSlaves(receivedCommand); 636 smList.sendToSlaves(command);
647 } 637 }
648 break; 638 break;
649 639
650 // case SMCMD_CH_MASTER: 640 // case SMCMD_CH_MASTER:
651 // { 641 // {
664 return false; 654 return false;
665 } 655 }
666 return true; 656 return true;
667 } 657 }
668 658
659
660 void send_sm_join_ack(int psid, int sid,REPCommand sendCommand) {
661 if (psid==smList.sessionManagerID()) {
662 // 直下のsessionManagerにIDを割り振る必要がある。
663 smList.assignSessionManagerIDtoWaitingSM(sid);
664 // ここで smList に一つだけ追加されるので
665 // 待っている最初のsm一つにだけ、sm_joinが新たに送られる。
666 }
667 smList.sendToSlaves(sendCommand);
668 }
669
670
671 private REPCommand makeREPCommandWithSessionList(REP cmd) {
672 //SessionListからXMLを生成。
673 //joinしてきたSessionManagerに対してACKを送信。
674 SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList);
675 REPCommand sendCommand = new REPCommand();
676 sendCommand.setCMD(cmd);
677 sendCommand.setString(sessionlistEncoder.sessionListToXML());
678 return sendCommand;
679 }
680
681
682 public boolean isMaster() {
683 return smList.isMaster();
684 }
685
686
687 public void setSessionManagerID(int sid) {
688 smList.setSessionManagerID(sid);
689 }
690
669 } 691 }