comparison rep/SessionManager.java @ 308:c5be84d53c7f channel-simulator-update **INVALID**

*** empty log message ***
author kono
date Sat, 04 Oct 2008 22:12:34 +0900
parents 75192c844a8d
children 0585fd2410b8
comparison
equal deleted inserted replaced
307:e4b7af3fdf99 308:c5be84d53c7f
32 o---------- header section (network order) ----------o 32 o---------- header section (network order) ----------o
33 33
34 int cmd; kind of command 34 int cmd; kind of command
35 int sid; session ID : unique to editing file 35 int sid; session ID : unique to editing file
36 int eid; editor ID : owner editor ID = 1。Session に対して unique 36 int eid; editor ID : owner editor ID = 1。Session に対して unique
37 -1 session manager command
38 -2 merge command
37 int seqno; Sequence number : sequence number はエディタごとに管理 39 int seqno; Sequence number : sequence number はエディタごとに管理
38 int lineno; line number 40 int lineno; line number
39 int textsize; textsize : bytesize 41 int textsize; textsize : bytesize
40 byte[] text; 42 byte[] text;
41 */ 43 */
44 46
45 private LinkedList<Session> sessionList; 47 private LinkedList<Session> sessionList;
46 private SessionManagerGUI gui; 48 private SessionManagerGUI gui;
47 private REPSelector<REPCommand> selector; 49 private REPSelector<REPCommand> selector;
48 private SessionManagerList smList; 50 private SessionManagerList smList;
49 private String myHost;
50 private List<Editor> editorList; 51 private List<Editor> editorList;
51 // editorList は、sessionList に入っているeditorとは別なeditorのlistらしい。 52 // editorList は、sessionList に入っているeditorとは別なeditorのlistらしい。
52 private String maxHost; 53 private String maxHost;
53 private List<PacketSet> waitingCommandInMerge; 54 private List<PacketSet> waitingCommandInMerge;
54 private BlockingQueue<SessionManagerEvent> waitingQueue = new LinkedBlockingQueue<SessionManagerEvent>();; 55 REPHandler normalHandler = new REPHandlerImpl(this);
55 private static int temp_port; 56 REPHandler handlerInMerge =new REPHandlerInMerge(this);
56 private static int send_port; 57 private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>();;
58 private String myHost;
59 private static int receive_port;
60 private static int parent_port;
57 static final int DEFAULT_PORT = 8766; 61 static final int DEFAULT_PORT = 8766;
58 62
59 63
60 public void openSelector() throws IOException{ 64 public void openSelector() throws IOException{
61 selector = REPSelector.<REPCommand>create(); 65 selector = REPSelector.<REPCommand>create();
74 REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); 78 REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker());
75 ssc.configureBlocking(false); //reuse address 必須 79 ssc.configureBlocking(false); //reuse address 必須
76 ssc.socket().setReuseAddress(true); 80 ssc.socket().setReuseAddress(true);
77 //getAllByNameで取れた全てのアドレスに対してbindする 81 //getAllByNameで取れた全てのアドレスに対してbindする
78 ssc.socket().bind(new InetSocketAddress(port)); 82 ssc.socket().bind(new InetSocketAddress(port));
79 ssc.register(selector, SelectionKey.OP_ACCEPT, new REPHandlerImpl(-1, this)); 83 ssc.register(selector, SelectionKey.OP_ACCEPT, normalHandler);
80 84
81 sessionList = new LinkedList<Session>(); 85 sessionList = new LinkedList<Session>();
82 smList = new SessionManagerList(); 86 smList = new SessionManagerList();
83 editorList = new LinkedList<Editor>(); 87 editorList = new LinkedList<Editor>();
84 waitingCommandInMerge = new LinkedList<PacketSet>(); 88 waitingCommandInMerge = new LinkedList<PacketSet>();
85 89
86 //デフォルトのSessionを作っておく(テスト用に?)
87 //if(sessionList.size() > 0) System.out.println("Error : SessionManager.init():");
88 //Session defaultSession = new Session(sessionList.size(), "DefaultSession.txt", new Editor(0,null));
89 //sessionList.add(defaultSession);
90 90
91 } 91 }
92 92
93 public void mainLoop() throws IOException { 93 public void mainLoop() throws IOException {
94 while(true){ 94 while(true){
95 SessionManagerEvent e; 95 SessionManagerEvent e;
96 while((e = waitingQueue.poll())!=null){ 96 while((e = waitingEventQueue.poll())!=null){
97 e.exec(); 97 e.exec();
98 } 98 }
99 for(Session s:sessionList) { 99 for(Session s:sessionList) {
100 for(Editor editor: s.getEditorList()) 100 for(Editor editor: s.getEditorList())
101 if (editor.doWaitingWrite()) break; 101 if (editor.doWaitingWrite()) break;
102 } 102 }
103 if(checkSend()){ 103 // if there are waiting command during merge operation, do process it
104 if(checkWaitingCommandInMerge()){
104 if(selector.selectNow() > 0){ 105 if(selector.selectNow() > 0){
105 select(); 106 select();
106 } 107 }
107 continue; 108 continue;
108 } 109 }
109 selector.select(); 110 selector.select();
110 select(); 111 select();
111 } 112 }
112 } 113 }
113 114
114 private boolean checkSend() throws IOException { 115 /**
116 * Check waiting command in merge
117 * @return true if there is a processed waiting command
118 * @throws IOException
119 */
120 private boolean checkWaitingCommandInMerge() throws IOException {
115 for(Iterator<PacketSet> it = waitingCommandInMerge.iterator(); it.hasNext();){ 121 for(Iterator<PacketSet> it = waitingCommandInMerge.iterator(); it.hasNext();){
116 PacketSet p = it.next(); 122 PacketSet p = it.next();
117 if(p.getEditor().isMerging()) { 123 if(p.getEditor().isMerging()) { // still merging do nothing
118 continue; 124 continue;
119 }else{ 125 }else{
126 // process one command and return true
120 manage(p.channel, p.command); 127 manage(p.channel, p.command);
121 it.remove(); 128 it.remove();
122 return true; 129 return true;
123 } 130 }
124 } 131 }
125 return false; 132 return false;
126 } 133 }
127 134
128 @SuppressWarnings("unchecked")
129 private void select() throws IOException { 135 private void select() throws IOException {
130 136
131 Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); 137 Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1();
132 for(REPSelectionKey<REPCommand> key : keys){ 138 for(REPSelectionKey<REPCommand> key : keys){
133 if(key.isAcceptable()){ 139 if(key.isAcceptable()){
140 REPHandler handler = (REPHandler)(key.attachment()); 146 REPHandler handler = (REPHandler)(key.attachment());
141 try { 147 try {
142 handler.handle(key); 148 handler.handle(key);
143 } catch (ClosedChannelException x) { 149 } catch (ClosedChannelException x) {
144 key.cancel(); 150 key.cancel();
145 handler.cancel((REPSocketChannel<REPCommand>)key.channel()); 151 handler.cancel(key.channel1());
146 } catch (IOException x) { 152 } catch (IOException x) {
147 key.cancel(); 153 key.cancel();
148 handler.cancel((REPSocketChannel<REPCommand>)key.channel()); 154 handler.cancel( key.channel1());
149 } 155 }
150 } 156 }
151 } 157 }
152 } 158 }
153 159
154 private void registerChannel(REPSocketChannel<REPCommand> channel, int ops) throws IOException { 160 private void registerChannel(REPSocketChannel<REPCommand> channel, int ops) throws IOException {
155 if(channel == null) { 161 if(channel == null) {
156 return; 162 return;
157 } 163 }
158 channel.configureBlocking(false); 164 channel.configureBlocking(false);
159 REPHandler handler = new REPHandlerImpl(-1, this); 165 REPHandler handler = normalHandler;
160 channel.register(selector, ops, handler); 166 channel.register(selector, ops, handler);
161 } 167 }
162 168
163 public void manage(REPSocketChannel<REPCommand> channel, REPCommand receivedCommand) throws IOException { 169 public void manage(REPSocketChannel<REPCommand> channel, REPCommand receivedCommand) throws IOException {
164 if(receivedCommand == null) return; 170 if(receivedCommand == null) return;
388 Editor editor = session.getEditor(channel); 394 Editor editor = session.getEditor(channel);
389 boolean old = editor.isMerging(); 395 boolean old = editor.isMerging();
390 session.translate(channel, receivedCommand); 396 session.translate(channel, receivedCommand);
391 boolean newState = editor.isMerging(); 397 boolean newState = editor.isMerging();
392 if (old!=newState) { 398 if (old!=newState) {
399 // prevEditor なのは変だと思うが...
393 Editor prevEditor = session.getPrevEditor(editor); 400 Editor prevEditor = session.getPrevEditor(editor);
394 //マージ中のエディタはコマンドを受け取らない 401 //マージ中のエディタはコマンドを受け取らない
395 // この代入は状態が変わったときだけ行えば良い。毎回、new するのは変 402 // この代入は状態が変わったときだけ行えば良い。毎回、new するのは変
396 if(editor.isMerging()){ 403 if(editor.isMerging()){
397 //Handlerを切り替える 404 //Handlerを切り替える
433 gui.invokeLater(doRun); 440 gui.invokeLater(doRun);
434 } 441 }
435 442
436 private void setNormalState(REPSocketChannel<REPCommand> channel, int sid) { 443 private void setNormalState(REPSocketChannel<REPCommand> channel, int sid) {
437 SelectionKey key = channel.keyFor(selector); 444 SelectionKey key = channel.keyFor(selector);
438 key.attach(new REPHandlerImpl(sid, this)); 445 key.attach(normalHandler);
439 } 446 }
440 447
441 private void setMergeState(REPSocketChannel<REPCommand> channel, int sid) { 448 private void setMergeState(REPSocketChannel<REPCommand> channel, int sid) {
442 SelectionKey key = channel.keyFor(selector); 449 SelectionKey key = channel.keyFor(selector);
443 key.attach(new REPHandlerInMerge(sid, this)); 450 key.attach(handlerInMerge);
444 } 451 }
445 452
446 private Editor getEditor(String hostport) { 453 private Editor getEditor(String hostport) {
447 for(Editor editor : editorList){ 454 for(Editor editor : editorList){
448 if(editor.getHost() == hostport){ 455 if(editor.getHost() == hostport){
476 return true; 483 return true;
477 } 484 }
478 } 485 }
479 486
480 private void setMyHostName(String localHostName) { 487 private void setMyHostName(String localHostName) {
481 myHost = localHostName + temp_port; 488 myHost = localHostName + receive_port;
482 if(maxHost == null) { 489 if(maxHost == null) {
483 maxHost = myHost; 490 maxHost = myHost;
484 } 491 }
485 setHostToEditor(myHost); 492 setHostToEditor(myHost);
486 } 493 }
498 //System.setProperty("file.encoding", "UTF-8"); 505 //System.setProperty("file.encoding", "UTF-8");
499 if(args.length > 0){ 506 if(args.length > 0){
500 port = Integer.parseInt(args[0]); 507 port = Integer.parseInt(args[0]);
501 port_s = Integer.parseInt(args[1]); 508 port_s = Integer.parseInt(args[1]);
502 } 509 }
503 temp_port = port; 510 receive_port = port;
504 send_port = port_s; 511 parent_port = port_s;
505 SessionManager sm = new SessionManager(); 512 SessionManager sm = new SessionManager();
506 sm.init(port,new SessionManagerGUIimpl(sm)); 513 sm.init(port,new SessionManagerGUIimpl(sm));
507 514
508 515
509 } 516 }
510 517
511 public void connectSession(String host) { 518 public void connectSession(String host) {
512 int port = DEFAULT_PORT; 519 int port = DEFAULT_PORT;
513 port = send_port; 520 port = parent_port;
514 InetSocketAddress addr = new InetSocketAddress(host, port); 521 InetSocketAddress addr = new InetSocketAddress(host, port);
515 try { 522 try {
516 REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker()); 523 REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker());
517 sessionchannel.configureBlocking(true); 524 sessionchannel.configureBlocking(true);
518 sessionchannel.connect(addr); 525 sessionchannel.connect(addr);
598 waitingCommandInMerge.add(set); 605 waitingCommandInMerge.add(set);
599 } 606 }
600 607
601 public void buttonPressed(SessionManagerEvent event) { 608 public void buttonPressed(SessionManagerEvent event) {
602 try { 609 try {
603 waitingQueue.put(event); 610 waitingEventQueue.put(event);
604 } catch (InterruptedException e) {} 611 } catch (InterruptedException e) {}
605 selector.wakeup(); 612 selector.wakeup();
606 } 613 }
607 614
608 public void syncExec(SessionManagerEvent event) { 615 public void syncExec(SessionManagerEvent event) {
609 try { 616 try {
610 waitingQueue.put(event); 617 waitingEventQueue.put(event);
611 } catch (InterruptedException e) { 618 } catch (InterruptedException e) {
612 } 619 }
613 } 620 }
614 621
615 public void closeSession(SessionManagerEvent event) { 622 public void closeSession(SessionManagerEvent event) {