Mercurial > hg > RemoteEditor > REPSessionManager
comparison rep/SessionManager.java @ 355:98607350f7d1
*** empty log message ***
author | kono |
---|---|
date | Fri, 17 Oct 2008 22:11:34 +0900 |
parents | 0d47ff22ee0e |
children | b18c24dcc5d2 |
comparison
equal
deleted
inserted
replaced
354:6ea3aa6c795f | 355:98607350f7d1 |
---|---|
60 private int receive_port; | 60 private int receive_port; |
61 private int parent_port; | 61 private int parent_port; |
62 static final int DEFAULT_PORT = 8766; | 62 static final int DEFAULT_PORT = 8766; |
63 private static final int packetLimit = 200; | 63 private static final int packetLimit = 200; |
64 SessionXMLDecoder decoder = new SessionXMLDecoder(); | 64 SessionXMLDecoder decoder = new SessionXMLDecoder(); |
65 | 65 private Forwarder sm_join_channel; |
66 boolean smjoin_mode; | |
67 | 66 |
68 public static void main(String[] args) throws InterruptedException, IOException { | 67 public static void main(String[] args) throws InterruptedException, IOException { |
69 | 68 |
70 int port = DEFAULT_PORT; | 69 int port = DEFAULT_PORT; |
71 int port_s = DEFAULT_PORT; | 70 int port_s = DEFAULT_PORT; |
206 Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); | 205 Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); |
207 for(REPSelectionKey<REPCommand> key : keys){ | 206 for(REPSelectionKey<REPCommand> key : keys){ |
208 if(key.isAcceptable()){ | 207 if(key.isAcceptable()){ |
209 REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); | 208 REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); |
210 logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); | 209 logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); |
211 registerChannel (channel, SelectionKey.OP_READ,new Forwarder(this)); | 210 registerChannel (channel, new FirstConnector(this)); |
212 channel = null; | 211 channel = null; |
213 | 212 |
214 }else if(key.isReadable()){ | 213 }else if(key.isReadable()){ |
215 REPHandler handler = (REPHandler)(key.attachment()); | 214 REPHandler handler = (REPHandler)(key.attachment()); |
216 try { | 215 try { |
221 } | 220 } |
222 } | 221 } |
223 } | 222 } |
224 } | 223 } |
225 | 224 |
226 void registerChannel(REPSocketChannel<REPCommand> channel, int ops,Forwarder handler) throws IOException { | 225 void registerChannel(REPSocketChannel<REPCommand> channel,Forwarder handler) throws IOException { |
227 if(channel == null) { | 226 if(channel == null) { |
228 return; | 227 return; |
229 } | 228 } |
230 handler.setChannel(channel); | 229 handler.setChannel(channel); |
231 channel.configureBlocking(false); | 230 channel.configureBlocking(false); |
232 channel.register(selector, ops, handler); | 231 channel.register(selector, SelectionKey.OP_READ, handler); |
233 } | 232 } |
234 | 233 |
235 | 234 |
236 void cancel_sm_join() { | 235 void cancel_sm_join() { |
237 smjoin_mode=false; | 236 removeChannel(sm_join_channel); |
237 sm_join_channel=null; | |
238 } | |
239 | |
240 | |
241 private void removeChannel(Forwarder sm_join_channel) { | |
242 REPSelectionKey<REPCommand> key = sm_join_channel.channel.keyFor1(selector); | |
243 key.cancel(); | |
244 try { | |
245 sm_join_channel.channel.close(); | |
246 } catch (IOException e) { | |
247 } | |
238 } | 248 } |
239 | 249 |
240 | 250 |
241 boolean hasSession(int sid) { | 251 boolean hasSession(int sid) { |
242 for(Session s:sessionList) { | 252 for(Session s:sessionList) { |
292 for(Editor editor : editorList){ | 302 for(Editor editor : editorList){ |
293 editor.setHost(myHost2); | 303 editor.setHost(myHost2); |
294 } | 304 } |
295 } | 305 } |
296 | 306 |
307 | |
308 /** | |
309 * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては | |
310 * ならない。複数のSession Managerにjoinすることは出来ない。(NATを実装するまでは)。 | |
311 * @param host | |
312 */ | |
297 public void connectSession(String host) { | 313 public void connectSession(String host) { |
314 if (sm_join_channel!=null) return; | |
315 if (!sessionList.isEmpty()) return; | |
316 if (!smList.isMaster()) return; | |
298 int port = parent_port; | 317 int port = parent_port; |
299 InetSocketAddress addr = new InetSocketAddress(host, port); | 318 InetSocketAddress addr = new InetSocketAddress(host, port); |
300 try { | 319 try { |
301 REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker()); | 320 REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker()); |
302 | 321 |
303 sessionchannel.connect(addr); | 322 sessionchannel.connect(addr); |
304 while(!sessionchannel.finishConnect()); | 323 while(!sessionchannel.finishConnect()); |
305 Forwarder sm = new Forwarder(this); | 324 Forwarder sm = new Forwarder(this); |
306 registerChannel(sessionchannel, SelectionKey.OP_READ,sm); | 325 registerChannel(sessionchannel, sm); |
307 sm_join(sm); | 326 sm_join(sm); |
308 }catch (IOException e) { | 327 }catch (IOException e) { |
309 } | 328 } |
310 } | 329 } |
311 | 330 |
312 private void sm_join(Forwarder channel){ | 331 private void sm_join(Forwarder channel){ |
313 smjoin_mode = true; | 332 sm_join_channel = channel; |
314 //SM_JOINコマンドを生成。 | 333 //SM_JOINコマンドを生成。 |
315 REPCommand command = new REPCommand(); | 334 REPCommand command = new REPCommand(); |
316 command.setCMD(REP.SMCMD_SM_JOIN); | 335 command.setCMD(REP.SMCMD_SM_JOIN); |
317 command.setEID(-1); // request Parent SessionManagerID | 336 command.setEID(-1); // request Parent SessionManagerID |
318 command.setSID(-1); // request SessionManagerID | 337 command.setSID(-1); // request SessionManagerID |
319 | 338 |
320 //hostnameをセット。 | 339 //hostnameをセット。 |
321 setMyHostName(channel.getLocalHostName()); | 340 setMyHostName(channel.getLocalHostName()); |
322 | 341 |
323 //XMLを生成。送信コマンドにセット。 | 342 //XMLを生成。送信コマンドにセット。 |
324 SessionXMLEncoder encoder = new SessionXMLEncoder(sessionList); | 343 //SessionXMLEncoder encoder = new SessionXMLEncoder(sessionList); |
325 String string = encoder.sessionListToXML(); | 344 //String string = encoder.sessionListToXML(); |
345 String string = myHost; | |
326 command.setString(string); | 346 command.setString(string); |
327 | 347 |
328 //SM_JOINコマンドを送信。 | 348 //SM_JOINコマンドを送信。 |
329 channel.send(command); | 349 channel.send(command); |
330 //SessionManagerのListに追加。 | 350 //SessionManagerのListに追加。 |
437 public int getPort() { | 457 public int getPort() { |
438 return receive_port; | 458 return receive_port; |
439 } | 459 } |
440 | 460 |
441 | 461 |
442 boolean sessionManage(Forwarder forwarder, REPCommand receivedCommand) throws ClosedChannelException, | 462 boolean sessionManage(Forwarder forwarder, REPCommand command) throws ClosedChannelException, |
443 IOException, SAXException { | 463 IOException, SAXException { |
444 switch(receivedCommand.cmd){ | 464 switch(command.cmd){ |
445 | 465 |
446 // Session Manager Command | 466 // Session Manager Command |
447 | 467 |
448 case SMCMD_JOIN: | 468 case SMCMD_JOIN: |
449 { | 469 { |
450 //どのSessionにも属さないエディタをリストに追加 | |
451 //エディタとchannelは1対1 (ではない) | |
452 //エディタが新しくputする場合は新しくソケットを作る | |
453 // ここのeditorList はsessionのとは別物 | |
454 Editor editor1 = new Editor(this,-1,forwarder.channel); | |
455 registerChannel(forwarder.channel,SelectionKey.OP_READ,editor1); | |
456 editor1.setHost(myHost); | |
457 editorList.add(editor1); | |
458 | |
459 updateGUI(); | 470 updateGUI(); |
460 | |
461 } | 471 } |
462 | 472 |
463 break; | 473 break; |
464 | 474 |
465 case SMCMD_JOIN_ACK: | 475 case SMCMD_JOIN_ACK: |
471 //Sessionを生成 | 481 //Sessionを生成 |
472 // sessionIDってglobaly uniqueだから、本来は、 | 482 // sessionIDってglobaly uniqueだから、本来は、 |
473 // 自分の親に作ってもらう必要がある。自分が親なら自分で作って良い。 | 483 // 自分の親に作ってもらう必要がある。自分が親なら自分で作って良い。 |
474 | 484 |
475 int sid = sessionList.size(); | 485 int sid = sessionList.size(); |
476 Editor editor = new Editor(this,0, forwarder.channel); | 486 Editor editor = (Editor) forwarder; |
477 registerChannel(forwarder.channel,SelectionKey.OP_READ,editor); | 487 Session session = new Session(sid, command.string, (Editor)forwarder); |
478 editorList.add(editor); | |
479 editor.setHost(myHost); | |
480 Session session = new Session(sid, receivedCommand.string, editor); | |
481 session.hasOwner(true); | 488 session.hasOwner(true); |
482 sessionList.add(session); | 489 sessionList.add(session); |
483 | 490 |
484 updateGUI(); | 491 updateGUI(); |
485 | 492 |
486 //エディタにAckを送信 | 493 //エディタにAckを送信 |
487 REPCommand sendCommand = new REPCommand(receivedCommand); | 494 REPCommand sendCommand = new REPCommand(command); |
488 sendCommand.setCMD(REP.SMCMD_PUT_ACK); | 495 sendCommand.setCMD(REP.SMCMD_PUT_ACK); |
489 sendCommand.setEID(editor.getEID()); | 496 sendCommand.setEID(editor.getEID()); |
490 sendCommand.setSID(session.getSID()); | 497 sendCommand.setSID(session.getSID()); |
491 editor.send(sendCommand); | 498 editor.send(sendCommand); |
492 | 499 |
493 //他のSessionManagerへSessionの追加を報告 | 500 //他のSessionManagerへSessionの追加を報告 |
494 //親に送って、親から子へ | 501 //親に送って、親から子へ |
495 SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session); | 502 SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session); |
496 REPCommand command = new REPCommand(); | 503 REPCommand command1 = new REPCommand(); |
497 command.setSID(session.getSID()); | 504 command1.setSID(session.getSID()); |
498 command.setString(sessionEncoder.sessionListToXML()); | 505 command1.setString(sessionEncoder.sessionListToXML()); |
499 command.setCMD(REP.SMCMD_UPDATE); | 506 command1.setCMD(REP.SMCMD_UPDATE); |
500 smList.sendToSlaves( command); | 507 smList.sendToSlaves( command); |
501 | 508 |
502 } | 509 } |
503 | 510 |
504 break; | 511 break; |
508 case SMCMD_SELECT: | 515 case SMCMD_SELECT: |
509 { | 516 { |
510 //他のSessionManagerをエディタとしてSessionに追加 | 517 //他のSessionManagerをエディタとしてSessionに追加 |
511 Forwarder next = new Forwarder(this); | 518 Forwarder next = new Forwarder(this); |
512 next.setChannel(forwarder.channel); | 519 next.setChannel(forwarder.channel); |
513 Session session = getSession(receivedCommand.sid); | 520 Session session = getSession(command.sid); |
514 session.addForwarder(next); | 521 session.addForwarder(next); |
515 | 522 |
516 if(session.hasOwner()){ | 523 if(session.hasOwner()){ |
517 //このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す | 524 //このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す |
518 REPCommand sendCommand = new REPCommand(receivedCommand); | 525 REPCommand sendCommand = new REPCommand(command); |
519 sendCommand.setCMD(REP.SMCMD_SELECT_ACK); | 526 sendCommand.setCMD(REP.SMCMD_SELECT_ACK); |
520 sendCommand.setEID(next.getEID()); | 527 sendCommand.setEID(next.getEID()); |
521 next.send(sendCommand); | 528 next.send(sendCommand); |
522 }else{ | 529 }else{ |
523 //オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する | 530 //オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する |
524 Forwarder owner = session.getOwner(); | 531 Forwarder owner = session.getOwner(); |
525 owner.send(receivedCommand); | 532 owner.send(command); |
526 } | 533 } |
527 } | 534 } |
528 | 535 |
529 break; | 536 break; |
530 | 537 |
531 case SMCMD_SELECT_ACK: | 538 case SMCMD_SELECT_ACK: |
532 { | 539 { |
533 String hostport = receivedCommand.string; | 540 String hostport = command.string; |
534 Forwarder editor1 = getEditor(hostport); | 541 Forwarder editor1 = getEditor(hostport); |
535 | 542 |
536 if(editor1 != null) { | 543 if(editor1 != null) { |
537 //host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する | 544 //host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する |
538 REPCommand command = new REPCommand(); | 545 REPCommand command1 = new REPCommand(); |
539 command.setCMD(REP.SMCMD_JOIN_ACK); | 546 command1.setCMD(REP.SMCMD_JOIN_ACK); |
540 command.setSID(receivedCommand.sid); | 547 command1.setSID(command.sid); |
541 command.setEID(receivedCommand.eid); | 548 command1.setEID(command.eid); |
542 editor1.send(command); | 549 editor1.send(command); |
543 | 550 |
544 }else{ | 551 }else{ |
545 //自分が送信したコマンドでなければ、次のSessionManagerへ中継する | 552 //自分が送信したコマンドでなければ、次のSessionManagerへ中継する |
546 smList.sendToSlaves(receivedCommand); | 553 smList.sendToSlaves(command); |
547 } | 554 } |
548 } | 555 } |
549 | 556 |
550 break; | 557 break; |
551 case SMCMD_SM_JOIN: | 558 case SMCMD_SM_JOIN: |
552 | 559 |
553 { | 560 { |
554 // SM_JOIN中にSMCMD_SM_JOINが来たら、自分のSM_JOINを | 561 // SM_JOIN中にSMCMD_SM_JOINが来たら、これはループなので、 |
555 // 取り消す。 | 562 ///自分のSM_JOINを取り消す。 |
556 if (smjoin_mode) cancel_sm_join(); | 563 if (sm_join_channel!=null) cancel_sm_join(); |
557 // SMCMD_SM_JOIN は、master まで上昇する。 | 564 // SMCMD_SM_JOIN は、master まで上昇する。 |
558 // masterでなければ、自分のparentに転送する。 | 565 // masterでなければ、自分のparentに転送する。 |
559 if(smList.isMaster()) { | 566 if(isMaster()) { |
560 // master であれば、SessionManager IDを決めて、 | 567 // master であれば、SessionManager IDを決めて、 |
561 // 自分のsmList に登録 | 568 // 自分のsmList に登録 |
562 int sid = smList.addNewSessionManager(receivedCommand); | 569 Forwarder sm; |
563 //SessionListからXMLを生成。 | 570 int psid = command.eid; |
564 //joinしてきたSessionManagerに対してACKを送信。 | 571 if (forwarder.sid!=-1) { |
565 SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList); | 572 // すでに channelはSessionManager Idを持っていて、 |
566 REPCommand sendCommand = new REPCommand(); | 573 // direct link ではないので、 |
574 // channel を持たないForwarderとして登録する | |
575 sm = new Forwarder(this); | |
576 } else { | |
577 sm = forwarder; | |
578 } | |
579 int sid = smList.addNewSessionManager(sm,command); | |
580 REPCommand sendCommand = makeREPCommandWithSessionList(REP.SMCMD_SM_JOIN_ACK); | |
581 // command.eid==smList.sesionManagerID() の場合は、 | |
582 // 待っている自分の下のsessionManagerにsidをassignする必要がある。 | |
567 sendCommand.setSID(sid); // new Session manager ID | 583 sendCommand.setSID(sid); // new Session manager ID |
568 // 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた | 584 // 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた |
569 // Session manager IDを使う。 | 585 // Session manager IDを使う。 |
570 sendCommand.setEID(receivedCommand.eid); | 586 sendCommand.setEID(psid); |
571 sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK); | 587 send_sm_join_ack(psid, sid, sendCommand); |
572 sendCommand.setString(sessionlistEncoder.sessionListToXML()); | 588 } else { |
573 smList.sendToSlaves(sendCommand); | 589 if (forwarder.sid==-1) { |
574 break; | 590 // direct link の場合は、識別のために、EIDに直上の |
575 } | 591 // smid を入れておく。 |
576 // | 592 command.setEID(smList.sessionManagerID()); |
577 | 593 } |
578 //XMLからSessionListオブジェクトを生成する。 | 594 smList.sendToMaster(command); |
579 //SessionList receivedSessionList = decoder.decode(receivedCommand.string); | 595 } |
580 | |
581 //myHost を設定。 | |
582 //立ち上げ時にやるとlocalhostしか取れない | |
583 if(myHost == null) setMyHostName(forwarder.getLocalHostName()); | |
584 | |
585 //maxHost を設定。 | |
586 // if(setMaxHost(channel, receivedSessionList.getMaxHost())){ | |
587 // REPCommand sendCommand = new REPCommand(); | |
588 // sendCommand.setCMD(REP.SMCMD_CH_MASTER); | |
589 // sendCommand.setString(maxHost); | |
590 // smList.sendExcept(channel, sendCommand); | |
591 // } | |
592 | |
593 | |
594 } | 596 } |
595 break; | 597 break; |
596 | 598 |
597 case SMCMD_SM_JOIN_ACK: | 599 case SMCMD_SM_JOIN_ACK: |
598 | 600 send_sm_join_ack(command.eid, command.sid, command); |
599 //XMLからSessionListオブジェクトを生成。 | |
600 //SessionList receivedSessionList2 = decoder.decode(receivedCommand.string); | |
601 | |
602 //maxHostを決定。 | |
603 // if(setMaxHost(channel, receivedSessionList2.getMaxHost())){ | |
604 // REPCommand sendCommand = new REPCommand(); | |
605 // sendCommand.setCMD(REP.SMCMD_CH_MASTER); | |
606 // sendCommand.setString(maxHost); | |
607 // smList.sendExcept(channel, sendCommand); | |
608 // } | |
609 | |
610 | |
611 break; | 601 break; |
612 | 602 |
613 case SMCMD_UPDATE: | 603 case SMCMD_UPDATE: |
614 { | 604 { |
615 SessionList receivedSessionList3 = decoder.decode(receivedCommand.string); | 605 SessionList receivedSessionList3 = decoder.decode(command.string); |
616 | 606 |
617 //UPDATEコマンドにより送られてきたSessionの情報を追加する | 607 //UPDATEコマンドにより送られてきたSessionの情報を追加する |
618 LinkedList<Session> list = receivedSessionList3.getList(); | 608 LinkedList<Session> list = receivedSessionList3.getList(); |
619 for(Session session : list){ | 609 for(Session session : list){ |
620 session.getEditorList().get(0).setChannel(forwarder.channel); | 610 session.getEditorList().get(0).setChannel(forwarder.channel); |
621 sessionList.add(session); | 611 sessionList.add(session); |
622 } | 612 } |
623 | 613 |
624 //他のSessionManagerへ中継する | 614 //他のSessionManagerへ中継する |
625 smList.sendToSlaves(receivedCommand); | 615 smList.sendToSlaves(command); |
626 | 616 |
627 updateGUI(); | 617 updateGUI(); |
628 } | 618 } |
629 break; | 619 break; |
630 | 620 |
631 case SMCMD_UPDATE_ACK: | 621 case SMCMD_UPDATE_ACK: |
632 { | 622 { |
633 if(!hasSession(receivedCommand.sid)) { | 623 if(!hasSession(command.sid)) { |
634 // accept new Session | 624 // accept new Session |
635 // ここで初めてsession id が決まる。 | 625 // ここで初めてsession id が決まる。 |
636 // このコマンドは、master session manager が出すはず | 626 // このコマンドは、master session manager が出すはず |
637 Forwarder sm = new Forwarder(this); | 627 Forwarder sm = new Forwarder(this); |
638 sm.setChannel(forwarder.channel); | 628 sm.setChannel(forwarder.channel); |
639 Session session = new Session(receivedCommand.sid,receivedCommand.string,null); | 629 Session session = new Session(command.sid,command.string,null); |
640 session.addForwarder(sm); | 630 session.addForwarder(sm); |
641 | 631 |
642 sessionList.add(session); | 632 sessionList.add(session); |
643 | 633 |
644 updateGUI(); | 634 updateGUI(); |
645 } | 635 } |
646 smList.sendToSlaves(receivedCommand); | 636 smList.sendToSlaves(command); |
647 } | 637 } |
648 break; | 638 break; |
649 | 639 |
650 // case SMCMD_CH_MASTER: | 640 // case SMCMD_CH_MASTER: |
651 // { | 641 // { |
664 return false; | 654 return false; |
665 } | 655 } |
666 return true; | 656 return true; |
667 } | 657 } |
668 | 658 |
659 | |
660 void send_sm_join_ack(int psid, int sid,REPCommand sendCommand) { | |
661 if (psid==smList.sessionManagerID()) { | |
662 // 直下のsessionManagerにIDを割り振る必要がある。 | |
663 smList.assignSessionManagerIDtoWaitingSM(sid); | |
664 // ここで smList に一つだけ追加されるので | |
665 // 待っている最初のsm一つにだけ、sm_joinが新たに送られる。 | |
666 } | |
667 smList.sendToSlaves(sendCommand); | |
668 } | |
669 | |
670 | |
671 private REPCommand makeREPCommandWithSessionList(REP cmd) { | |
672 //SessionListからXMLを生成。 | |
673 //joinしてきたSessionManagerに対してACKを送信。 | |
674 SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList); | |
675 REPCommand sendCommand = new REPCommand(); | |
676 sendCommand.setCMD(cmd); | |
677 sendCommand.setString(sessionlistEncoder.sessionListToXML()); | |
678 return sendCommand; | |
679 } | |
680 | |
681 | |
682 public boolean isMaster() { | |
683 return smList.isMaster(); | |
684 } | |
685 | |
686 | |
687 public void setSessionManagerID(int sid) { | |
688 smList.setSessionManagerID(sid); | |
689 } | |
690 | |
669 } | 691 } |