Mercurial > hg > RemoteEditor > REPSessionManager
annotate rep/SessionManager.java @ 364:c965ef2b5fd6
*** empty log message ***
author | kono |
---|---|
date | Mon, 20 Oct 2008 13:44:34 +0900 |
parents | 1a8856580d38 |
children | c432755c3555 |
rev | line source |
---|---|
266 | 1 |
0 | 2 package rep; |
3 | |
4 import java.io.IOException; | |
5 import java.net.InetSocketAddress; | |
267 | 6 import java.nio.channels.ClosedChannelException; |
0 | 7 import java.nio.channels.SelectionKey; |
83 | 8 import java.util.LinkedList; |
144 | 9 import java.util.List; |
231 | 10 import java.util.Set; |
178 | 11 import java.util.concurrent.BlockingQueue; |
192 | 12 import java.util.concurrent.LinkedBlockingQueue; |
0 | 13 |
346 | 14 import org.xml.sax.SAXException; |
15 | |
198 | 16 |
353 | 17 |
337 | 18 import rep.channel.REPLogger; |
123 | 19 import rep.channel.REPServerSocketChannel; |
133 | 20 import rep.channel.REPSocketChannel; |
144 | 21 import rep.handler.PacketSet; |
146 | 22 import rep.handler.REPHandler; |
158 | 23 import rep.channel.REPSelector; |
56 | 24 import rep.xml.SessionXMLDecoder; |
45 | 25 import rep.xml.SessionXMLEncoder; |
198 | 26 import rep.channel.REPSelectionKey; |
264 | 27 |
198 | 28 /* |
264 | 29 +-------+--------+--------+-------+--------+---------+------+ |
30 | cmd | session| editor | seqid | lineno | textsiz | text | | |
31 | | id | id | | | | | | |
32 +-------+--------+--------+-------+--------+---------+------+ | |
33 o---------- header section (network order) ----------o | |
34 | |
35 int cmd; kind of command | |
36 int sid; session ID : unique to the file being edited | |
37 int eid; editor ID : owner editor ID = 1; unique within a Session | |
308 | 38 -1 session manager command |
39 -2 merge command | |
264 | 40 int seqno; Sequence number : managed separately for each editor
41 int lineno; line number | |
42 int textsize; textsize : bytesize | |
43 byte[] text; | |
198 | 44 */ |
1 | 45 |
250 | 46 public class SessionManager implements SessionManagerEventListener{ |
337 | 47 static public REPLogger logger = REPLogger.singleton(); |
48 | |
363 | 49 SessionList sessionList; |
280 | 50 private SessionManagerGUI gui; |
363 | 51 // Main nio.Selector of this server |
198 | 52 private REPSelector<REPCommand> selector; |
363 | 53 // Known Session Manager List, At most one parent. No parent means master. |
319 | 54 SessionManagerList smList; |
363 | 55 // Known Editor list. Connected Editor has a channel. |
56 // Session Manager Channel may have dummy editors. | |
356 | 57 EditorList editorList; |
363 | 58 // Commands for busy editor are kept in this queue. |
212 | 59 private List<PacketSet> waitingCommandInMerge; |
363 | 60 // Command from gui. Synchronization is required. |
61 private BlockingQueue<SessionManagerEvent> waitingEventQueue | |
62 = new LinkedBlockingQueue<SessionManagerEvent>();; | |
63 // host name of this server. One of connecting SocketChannel's hostname | |
319 | 64 String myHost; |
363 | 65 // Single-threaded write queue, used to avoid deadlock when there are too many writes. |
317 | 66 private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); |
336 | 67 private int receive_port; |
68 private int parent_port; | |
101 | 69 static final int DEFAULT_PORT = 8766; |
363 | 70 // Queue limit for debugging purpose. |
332 | 71 private static final int packetLimit = 200; |
358 | 72 |
363 | 73 // globalSessionID = SessionManagerID * MAXID + localSessionID |
358 | 74 private static final int MAXID = 10000; |
353 | 75 SessionXMLDecoder decoder = new SessionXMLDecoder(); |
358 | 76 SessionXMLEncoder encoder = new SessionXMLEncoder(); |
363 | 77 // SocketChannel for our parent. At most one parent is allowed. |
355 | 78 private Forwarder sm_join_channel; |
363 | 79 // Routing table for session and session manager. |
358 | 80 private RoutingTable routingTable = new RoutingTable(); |
81 | |
316 | 82 public static void main(String[] args) throws InterruptedException, IOException { |
83 | |
84 int port = DEFAULT_PORT; | |
85 int port_s = DEFAULT_PORT; | |
86 //System.setProperty("file.encoding", "UTF-8"); | |
87 if(args.length > 0){ | |
363 | 88 if (args.length!=2) { |
89 logger.writeLog("Usage: sessionManager our_port parent_port"); | |
90 return; | |
91 } | |
316 | 92 port = Integer.parseInt(args[0]); |
93 port_s = Integer.parseInt(args[1]); | |
94 } | |
95 SessionManager sm = new SessionManager(); | |
336 | 96 sm.setReceivePort(port); |
97 sm.setParentPort(port_s); | |
363 | 98 // Ok start main loop |
316 | 99 sm.init(port,new SessionManagerGUIimpl(sm)); |
100 } | |
101 | |
336 | 102 public void setReceivePort(int port) { |
103 receive_port = port; | |
104 } | |
105 | |
280 | 106 public void init(int port, SessionManagerGUI gui) throws IOException, InterruptedException { |
107 this.gui = gui; | |
108 init(port); | |
109 mainLoop(); | |
110 } | |
111 | |
112 private void init(int port) throws InterruptedException, IOException { | |
363 | 113 selector = REPSelector.<REPCommand>create(); |
186 | 114 REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); |
363 | 115 ssc.configureBlocking(false); // Selector requires this |
116 ssc.socket().setReuseAddress(true); //reuse address 必須 | |
212 | 117 //getAllByNameで取れた全てのアドレスに対してbindする |
118 ssc.socket().bind(new InetSocketAddress(port)); | |
349 | 119 ssc.register(selector, SelectionKey.OP_ACCEPT, |
120 new Forwarder(this)); | |
6 | 121 |
358 | 122 sessionList = new SessionList(); |
7 | 123 smList = new SessionManagerList(); |
356 | 124 editorList = new EditorList(); |
212 | 125 waitingCommandInMerge = new LinkedList<PacketSet>(); |
228 | 126 |
215 | 127 |
155 | 128 } |
313 | 129 |
130 /* | |
363 | 131 * The main loop. |
132 * Check incoming events and waiting writes. | |
133 * Do select and call select() to check in coming packets. | |
313 | 134 * We wrote everything in one thread, but we can assign |
135 * one thread for each communication channel and GUI event. | |
136 */ | |
231 | 137 public void mainLoop() throws IOException { |
0 | 138 while(true){ |
328 | 139 checkWaitingCommandInMerge(); |
313 | 140 if (checkInputEvent() || |
328 | 141 checkWaitingWrite()) { |
313 | 142 // try to do fair execution for waiting task |
143 if(selector.selectNow() > 0) select(); | |
144 continue; | |
300 | 145 } |
313 | 146 // now we can wait for input packet or event |
233 | 147 selector.select(); |
144 | 148 select(); |
149 } | |
150 } | |
151 | |
363 | 152 /* |
153 * Synchronize GUI event in the main loop. | |
154 */ | |
313 | 155 private boolean checkInputEvent() { |
156 SessionManagerEvent e; | |
157 if((e = waitingEventQueue.poll())!=null){ | |
334 | 158 e.exec(this); |
313 | 159 return true; |
160 } | |
161 return false; | |
162 } | |
163 | |
363 | 164 /* |
165 * Write a packet during the main loop. | |
166 */ | |
313 | 167 private boolean checkWaitingWrite() throws IOException { |
317 | 168 PacketSet p = writeQueue.poll(); |
169 if (p!=null) { | |
327 | 170 p.channel.write(p.command); |
317 | 171 return true; |
313 | 172 } |
173 return false; | |
174 } | |
175 | |
308 | 176 /** |
177 * Check waiting command in merge | |
178 * @return true if there is a processed waiting command | |
179 * @throws IOException | |
180 */ | |
346 | 181 private void checkWaitingCommandInMerge() { |
328 | 182 List<PacketSet> w = waitingCommandInMerge; |
183 waitingCommandInMerge = new LinkedList<PacketSet>(); | |
184 for(PacketSet p: w) { | |
185 Editor e = p.getEditor(); | |
353 | 186 if(e.isMerging()) { // still merging do nothing |
328 | 187 waitingCommandInMerge.add(p); |
188 } else { | |
346 | 189 try { |
353 | 190 if (sessionManage(e, p.command)) { // we don't need this |
191 assert false; | |
192 return; | |
193 } | |
194 e.manage(p.command); | |
347 | 195 } catch (Exception e1) { |
349 | 196 // should be e.close()? |
346 | 197 close(p.channel); |
198 } | |
328 | 199 } |
200 } | |
201 } | |
363 | 202 |
203 /* | |
204 * If we have waiting write commands, further sent commands also | |
205 * wait to avoid out of order packet sending. | |
206 */ | |
207 public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) { | |
208 for(PacketSet p:waitingCommandInMerge) { | |
209 if (p.channel==c) { | |
210 return true; | |
211 } | |
212 } | |
213 return false; | |
214 } | |
328 | 215 |
363 | 216 /* |
217 * Close a channel in case of exception or close. | |
218 */ | |
346 | 219 private void close(REPSocketChannel<REPCommand> channel) { |
220 REPSelectionKey<REPCommand>key = channel.keyFor1(selector); | |
221 REPHandler handler = (REPHandler)key.attachment(); | |
222 key.cancel(); | |
223 handler.cancel(channel); | |
224 // we have to remove session/enditor | |
225 } | |
226 | |
227 | |
363 | 228 /* |
229 * Do select operation on the Selector. Each key has a forwarder. | |
230 * A forwarder can be a firstConnector, a forwarder for Session Manager | |
231 * or an Editor. | |
232 */ | |
144 | 233 private void select() throws IOException { |
231 | 234 |
235 Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); | |
236 for(REPSelectionKey<REPCommand> key : keys){ | |
144 | 237 if(key.isAcceptable()){ |
363 | 238 /* |
239 * Incoming connection. We don't know which, editor or | |
240 * session manager. Assign FirstConnector to distinguish. | |
241 */ | |
199 | 242 REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); |
337 | 243 logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); |
363 | 244 registerChannel(channel, new FirstConnector(this)); |
144 | 245 channel = null; |
246 }else if(key.isReadable()){ | |
363 | 247 /* |
248 * Incoming packets are handled by a various forwarder. | |
249 * A hadler throw IOException() in case of a trouble to | |
250 * close the channel. | |
251 */ | |
212 | 252 REPHandler handler = (REPHandler)(key.attachment()); |
267 | 253 try { |
254 handler.handle(key); | |
358 | 255 } catch (IOException e) { |
267 | 256 key.cancel(); |
308 | 257 handler.cancel(key.channel1()); |
267 | 258 } |
0 | 259 } |
260 } | |
261 } | |
1 | 262 |
355 | 263 void registerChannel(REPSocketChannel<REPCommand> channel,Forwarder handler) throws IOException { |
2 | 264 if(channel == null) { |
265 return; | |
266 } | |
349 | 267 handler.setChannel(channel); |
2 | 268 channel.configureBlocking(false); |
355 | 269 channel.register(selector, SelectionKey.OP_READ, handler); |
2 | 270 } |
271 | |
363 | 272 /* |
273 * After loop detection, we give up session manager join. | |
274 */ | |
275 private void cancel_sm_join() { | |
364 | 276 logger.writeLog("Loop detected "+this); |
355 | 277 removeChannel(sm_join_channel); |
278 sm_join_channel=null; | |
279 } | |
280 | |
281 | |
363 | 282 private void removeChannel(Forwarder channel) { |
283 REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector); | |
355 | 284 key.cancel(); |
285 try { | |
364 | 286 channel.channel.close1(); |
355 | 287 } catch (IOException e) { |
288 } | |
349 | 289 } |
290 | |
320 | 291 |
319 | 292 void updateGUI() { |
212 | 293 //リストのコピーをGUIに渡す |
358 | 294 LinkedList<Session> sList = new LinkedList<Session>(sessionList.values()); |
295 LinkedList<Editor> eList = new LinkedList<Editor>(editorList.values()); | |
212 | 296 //GUIに反映 |
297 Runnable doRun = new DoGUIUpdate(sList, eList, gui); | |
279 | 298 gui.invokeLater(doRun); |
212 | 299 } |
300 | |
83 | 301 |
139 | 302 |
353 | 303 void setMyHostName(String localHostName) { |
308 | 304 myHost = localHostName + receive_port; |
164 | 305 setHostToEditor(myHost); |
306 } | |
307 | |
308 private void setHostToEditor(String myHost2) { | |
358 | 309 for(Editor editor : editorList.values()){ |
310 if (editor.channel!=null) | |
311 editor.setHost(myHost2); | |
164 | 312 } |
76 | 313 } |
0 | 314 |
355 | 315 |
316 /** | |
363 | 317 * GUI から、呼ばれて、Session Managerに接続する。 |
355 | 318 * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては |
319 * ならない。複数のSession Managerにjoinすることは出来ない。(NATを実装するまでは)。 | |
320 * @param host | |
321 */ | |
364 | 322 public void connectSession(String host, int port) { |
355 | 323 if (sm_join_channel!=null) return; |
324 if (!sessionList.isEmpty()) return; | |
325 if (!smList.isMaster()) return; | |
363 | 326 /* |
327 * IPv6 対応では、複数のアドレスを取って、それのすべてに接続を試す必要が | |
328 * ある。 | |
329 */ | |
1 | 330 InetSocketAddress addr = new InetSocketAddress(host, port); |
331 try { | |
186 | 332 REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker()); |
1 | 333 sessionchannel.connect(addr); |
337 | 334 while(!sessionchannel.finishConnect()); |
349 | 335 Forwarder sm = new Forwarder(this); |
355 | 336 registerChannel(sessionchannel, sm); |
349 | 337 sm_join(sm); |
1 | 338 }catch (IOException e) { |
339 } | |
340 } | |
364 | 341 |
342 public void connectSession(String host) { | |
343 connectSession(host,parent_port); | |
344 } | |
77 | 345 |
363 | 346 /** |
347 * channel に SMCMD_SM_JOIN command を送る。 | |
348 * @param channel | |
349 */ | |
349 | 350 private void sm_join(Forwarder channel){ |
355 | 351 sm_join_channel = channel; |
122 | 352 //SM_JOINコマンドを生成。 |
77 | 353 REPCommand command = new REPCommand(); |
354 command.setCMD(REP.SMCMD_SM_JOIN); | |
349 | 355 command.setEID(-1); // request Parent SessionManagerID |
356 command.setSID(-1); // request SessionManagerID | |
79 | 357 |
122 | 358 //hostnameをセット。 |
349 | 359 setMyHostName(channel.getLocalHostName()); |
82 | 360 |
355 | 361 String string = myHost; |
77 | 362 command.setString(string); |
363 | |
122 | 364 //SM_JOINコマンドを送信。 |
349 | 365 channel.send(command); |
363 | 366 // ack を受け取ったら、SessionManagerのListに追加。ここではやらない。 |
77 | 367 } |
349 | 368 |
361 | 369 /* |
370 * Select Session from Manager button | |
371 * selected editor is joined editor directly connected to this session | |
372 * manager. | |
373 */ | |
316 | 374 public void selectSession(SelectButtonEvent event) throws IOException { |
250 | 375 int sid = event.getSID(); |
358 | 376 Session session = sessionList.get(sid); |
227 | 377 |
320 | 378 Editor editor = (Editor)event.getEditor(); |
227 | 379 if(editor == null){ |
363 | 380 logger.writeLog("Error SessionManager.selectSession(): editor = " + editor); |
227 | 381 return; |
382 } | |
324 | 383 if (editor.hasSession()) return; |
319 | 384 |
358 | 385 selectSession(sid, session, editor.getEID(), editor); |
386 } | |
387 | |
361 | 388 /* |
389 * Select Session Protocol handler | |
363 | 390 * called from GUI or incoming SMCMD_SELECT command. |
361 | 391 */ |
358 | 392 private void selectSession(int sid, Session session, int eid, Forwarder editor) { |
158 | 393 if(session.hasOwner()){ |
363 | 394 // we have selected session. |
107 | 395 REPCommand sendCommand = new REPCommand(); |
358 | 396 if (editor.isDirect()&&editor.getEID()==eid) { |
363 | 397 // Found directly connected joined editor. Send join_ack(). |
361 | 398 session.addForwarder(editor); |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
399 sendUpdate(session.getSID()); |
358 | 400 sendCommand.setCMD(REP.SMCMD_JOIN_ACK); |
401 } else { | |
363 | 402 // We have a session, but joined editor is on the other sm. |
358 | 403 // SELECT_ACK is sent to the session ring to |
363 | 404 // find out the joined editor. |
358 | 405 sendCommand.setCMD(REP.SMCMD_SELECT_ACK); |
361 | 406 // Do not directly addForwarder(forwarder). It may be |
407 // shared among sessions. | |
408 Forwarder f = new Editor(this, false, makeID(editorList.newEid())); | |
409 f.setChannel(editor.channel); // incoming channel | |
410 f.setHost(myHost); | |
411 f.setSID(sid); | |
363 | 412 session.addForwarder(f); // f.next is set up here. |
358 | 413 } |
148 | 414 sendCommand.setEID(editor.getEID()); |
107 | 415 sendCommand.setSID(sid); |
361 | 416 sendCommand.string = session.getName(); |
349 | 417 editor.send(sendCommand); |
107 | 418 }else { |
363 | 419 // session searching continue... |
358 | 420 Forwarder next = routingTable.toSession(sid); |
107 | 421 |
363 | 422 // create dummy editor for this session |
361 | 423 Forwarder f = new Editor(this, false, makeID(editorList.newEid())); |
424 f.setChannel(editor.channel); // incoming channel | |
425 f.setNext(next); | |
426 f.setHost(myHost); | |
427 f.setSID(sid); | |
428 session.setFirstForwarder(f); | |
429 | |
363 | 430 // pass the select command to the next path. |
107 | 431 REPCommand command = new REPCommand(); |
432 command.setCMD(REP.SMCMD_SELECT); | |
433 command.setSID(sid); | |
358 | 434 command.setEID(eid); |
178 | 435 command.setString(editor.getHost()); |
358 | 436 next.send(command); |
107 | 437 } |
8 | 438 } |
122 | 439 |
363 | 440 /* |
441 * Create and send UPDATE command. | |
442 */ | |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
443 private void sendUpdate(int sid) { |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
444 REPCommand command = makeREPCommandWithSessionList(REP.SMCMD_UPDATE); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
445 command.setSID(sid); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
446 command.setEID(REP.SM_EID.id); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
447 smList.sendToMaster(command); |
358 | 448 } |
449 | |
363 | 450 /* |
451 * Create new editor in this sessin manager. A dummy editor | |
452 * is created also. | |
453 */ | |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
454 public Editor newEditor(REPSocketChannel<REPCommand> channel) { |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
455 int eid = makeID(editorList.newEid()); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
456 Editor editor = new Editor(this, eid, channel); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
457 editorList.add(editor); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
458 return editor; |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
459 } |
363 | 460 |
461 /* | |
462 * Create new session. | |
463 */ | |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
464 public Session newSession(Forwarder master) { |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
465 int sid= makeID(sessionList.newSessionID()); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
466 Session session = new Session(sid, master); |
360 | 467 sessionList.put(sid, session); |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
468 return session; |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
469 } |
358 | 470 |
144 | 471 public void addWaitingCommand(PacketSet set) { |
212 | 472 waitingCommandInMerge.add(set); |
144 | 473 } |
148 | 474 |
363 | 475 /* |
476 * Synchronize GUI command in this session manager. | |
477 */ | |
222 | 478 public void buttonPressed(SessionManagerEvent event) { |
479 try { | |
308 | 480 waitingEventQueue.put(event); |
222 | 481 } catch (InterruptedException e) {} |
482 selector.wakeup(); | |
483 } | |
281 | 484 |
363 | 485 /* |
486 * Execute incoming event during the initialization for | |
487 * testing purpose. | |
488 */ | |
281 | 489 public void syncExec(SessionManagerEvent event) { |
490 try { | |
308 | 491 waitingEventQueue.put(event); |
281 | 492 } catch (InterruptedException e) { |
493 } | |
494 } | |
222 | 495 |
363 | 496 /* |
497 * GUI command interface for close session. | |
498 */ | |
259 | 499 public void closeSession(SessionManagerEvent event) { |
500 Session session = ((CloseButtonEvent) event).getSession(); | |
501 session.closeSession(); | |
502 sessionList.remove(session); | |
503 updateGUI(); | |
504 } | |
505 | |
363 | 506 /* |
507 * Remove editors which has the cannel. | |
508 */ | |
274 | 509 public void remove(REPSocketChannel<REPCommand> channel) { |
358 | 510 int i = 0; |
511 for(Session s:sessionList.values()) { | |
274 | 512 if (s.deleteEditor(channel)) { |
358 | 513 i++; |
274 | 514 } |
515 } | |
358 | 516 assert(i==1); |
274 | 517 // can be other session manager? what should I do? |
518 } | |
519 | |
317 | 520 |
521 public void addWriteQueue(PacketSet packetSet) { | |
324 | 522 writeQueue.addLast(packetSet); |
323 | 523 assert(writeQueue.size()<packetLimit) ; |
317 | 524 } |
525 | |
318 | 526 |
527 public void remove(Editor editor) { | |
358 | 528 Session s = sessionList.get(editor.getSID()); |
362 | 529 if (s==null) { |
530 assert(false); | |
531 editorList.remove(editor); | |
532 } else if (editor.isMaster()) { | |
358 | 533 removeSession(s); |
534 } else { | |
535 s.deleteForwarder(editor); | |
536 editorList.remove(editor); | |
318 | 537 } |
341 | 538 updateGUI(); |
539 } | |
540 | |
541 private void removeSession(Session s0) { | |
358 | 542 s0.remove(this); |
341 | 543 sessionList.remove(s0); |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
544 sendUpdate(s0.getSID()); |
318 | 545 } |
546 | |
334 | 547 public void setParentPort(int port) { |
548 parent_port = port; | |
549 } | |
550 public int getParentPort() { | |
551 return parent_port; | |
552 } | |
553 | |
554 public int getPort() { | |
555 return receive_port; | |
556 } | |
557 | |
353 | 558 |
355 | 559 boolean sessionManage(Forwarder forwarder, REPCommand command) throws ClosedChannelException, |
358 | 560 IOException { |
355 | 561 switch(command.cmd){ |
353 | 562 |
563 // Session Manager Command | |
564 | |
565 case SMCMD_JOIN: | |
566 { | |
358 | 567 // first connection or forwarded command |
568 if(isMaster()) { | |
569 REPCommand ackCommand = new REPCommand(); | |
570 ackCommand.setCMD(REP.SMCMD_JOIN_ACK); | |
571 ackCommand.setEID(command.eid); | |
360 | 572 ackCommand.setSID(command.sid); |
358 | 573 ackCommand.string = command.string; |
574 smList.sendToSlaves(ackCommand); | |
575 registEditor(forwarder,ackCommand); | |
576 } else { | |
577 routingTable.add(forwarder,getSMID(command.eid),command.sid); | |
578 smList.sendToMaster(command); | |
579 } | |
360 | 580 updateGUI(); |
353 | 581 } |
582 | |
583 break; | |
584 | |
361 | 585 case SMCMD_PUT_ACK: |
353 | 586 case SMCMD_JOIN_ACK: |
358 | 587 registEditor(forwarder,command); |
360 | 588 updateGUI(); |
353 | 589 break; |
590 | |
591 case SMCMD_PUT: | |
592 { | |
358 | 593 // first connection or forwarded command |
594 if(isMaster()) { | |
595 command.setCMD(REP.SMCMD_PUT_ACK); | |
596 command.string = command.string; | |
597 command.setEID(command.eid); | |
598 command.setSID(command.sid); | |
599 smList.sendToSlaves(command); | |
600 registEditor(forwarder,command); | |
601 } else { | |
602 routingTable.add(forwarder,getSMID(command.eid),command.sid); | |
603 smList.sendToMaster(command); | |
361 | 604 // registEditor will be done by SMCMD_PUT_ACK |
605 } | |
606 if (forwarder.isDirect()) { | |
607 // send put_ack to the editor now. | |
608 command.setCMD(REP.SMCMD_PUT_ACK); | |
609 command.string = command.string; | |
610 command.setEID(command.eid); | |
611 command.setSID(command.sid); | |
612 forwarder.send(command); | |
358 | 613 } |
360 | 614 updateGUI(); |
353 | 615 |
616 } | |
617 break; | |
358 | 618 |
353 | 619 case SMCMD_SELECT: |
361 | 620 { |
621 Session session = sessionList.get(command.sid); | |
622 if (session==null) { | |
623 sessionList.put(command.sid, | |
624 new Session(command.sid, command.string,null)); | |
625 } | |
626 selectSession(command.sid, session, command.eid, forwarder); | |
627 } | |
628 break; | |
353 | 629 case SMCMD_SELECT_ACK: |
630 { | |
358 | 631 Session session = sessionList.get(command.sid); |
361 | 632 selectSession(command.sid, session, command.eid, |
633 session.getFirstForwarder()); | |
353 | 634 } |
358 | 635 break; |
636 | |
353 | 637 case SMCMD_SM_JOIN: |
638 { | |
355 | 639 // SM_JOIN中にSMCMD_SM_JOINが来たら、これはループなので、 |
640 ///自分のSM_JOINを取り消す。 | |
641 if (sm_join_channel!=null) cancel_sm_join(); | |
353 | 642 // SMCMD_SM_JOIN は、master まで上昇する。 |
643 // masterでなければ、自分のparentに転送する。 | |
355 | 644 if(isMaster()) { |
353 | 645 // master であれば、SessionManager IDを決めて、 |
646 // 自分のsmList に登録 | |
355 | 647 Forwarder sm; |
648 int psid = command.eid; | |
649 if (forwarder.sid!=-1) { | |
650 // すでに channelはSessionManager Idを持っていて、 | |
651 // direct link ではないので、 | |
652 // channel を持たないForwarderとして登録する | |
653 sm = new Forwarder(this); | |
654 } else { | |
655 sm = forwarder; | |
656 } | |
657 int sid = smList.addNewSessionManager(sm,command); | |
658 REPCommand sendCommand = makeREPCommandWithSessionList(REP.SMCMD_SM_JOIN_ACK); | |
659 // command.eid==smList.sesionManagerID() の場合は、 | |
660 // 待っている自分の下のsessionManagerにsidをassignする必要がある。 | |
353 | 661 sendCommand.setSID(sid); // new Session manager ID |
662 // 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた | |
663 // Session manager IDを使う。 | |
355 | 664 sendCommand.setEID(psid); |
665 send_sm_join_ack(psid, sid, sendCommand); | |
666 } else { | |
667 if (forwarder.sid==-1) { | |
668 // direct link の場合は、識別のために、EIDに直上の | |
669 // smid を入れておく。 | |
670 command.setEID(smList.sessionManagerID()); | |
671 } | |
672 smList.sendToMaster(command); | |
353 | 673 } |
674 } | |
675 break; | |
676 | |
355 | 677 case SMCMD_SM_JOIN_ACK: |
678 send_sm_join_ack(command.eid, command.sid, command); | |
353 | 679 break; |
680 | |
681 case SMCMD_UPDATE: | |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
682 if (!isMaster()) { |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
683 command.setString(mergeUpdate(command)); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
684 // 上に知らせる |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
685 smList.sendToMaster(command); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
686 break; |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
687 } |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
688 // fall thru |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
689 command.setCMD(REP.SMCMD_UPDATE_ACK); |
353 | 690 case SMCMD_UPDATE_ACK: |
358 | 691 command.setString(mergeUpdate(command)); |
692 // 下に知らせる | |
355 | 693 smList.sendToSlaves(command); |
358 | 694 updateGUI(); |
353 | 695 break; |
696 default: | |
697 return false; | |
698 } | |
699 return true; | |
700 } | |
701 | |
355 | 702 |
363 | 703 /** |
704 * UPDATE/UPDATE_ACKにより送られてきたSessionの情報を追加する | |
705 * @param command | |
706 * @return | |
707 * @throws IOException | |
708 */ | |
358 | 709 private String mergeUpdate(REPCommand command) throws IOException { |
710 SessionList receivedSessionList; | |
711 try { | |
712 receivedSessionList = decoder.decode(command.string); | |
713 } catch (SAXException e) { | |
714 throw new IOException(); | |
715 } | |
363 | 716 // 受け取った情報と自分の情報を混ぜる。 |
717 sessionList.merge(receivedSessionList); | |
358 | 718 //XMLを生成。送信コマンドにセット。 |
719 return encoder.sessionListToXML(sessionList); | |
720 | |
721 } | |
722 | |
723 /* | |
724 * id has SessionManager ID part | |
725 */ | |
726 private int makeID(int newid) { | |
727 return newid+smList.sessionManagerID()*MAXID; | |
728 } | |
729 | |
730 private int getSMID(int id) { | |
731 return id/MAXID; | |
732 } | |
733 | |
734 | |
361 | 735 /** |
736 * Register Editor to our editorList. No connection is made. | |
737 * @param forwarder Editor to be add | |
738 * @param command | |
739 */ | |
358 | 740 private void registEditor(Forwarder forwarder,REPCommand command) { |
741 // make ack for PUT/JOIN. Do not send this to the editor, | |
742 // before select. After select, ack is sent to the editor. | |
743 routingTable.add(forwarder,getSMID(command.eid),command.sid); | |
744 Editor editor; | |
361 | 745 if (getSMID(command.sid)==smList.sessionManagerID() |
358 | 746 && forwarder.isDirect()) { |
747 // direct link だった | |
748 editor = (Editor)forwarder; | |
749 } else { | |
750 editor = new Editor(this, command.cmd==REP.SMCMD_PUT_ACK, command.eid); | |
751 } | |
752 editor.setName(command.string); | |
753 editor.setSID(command.sid); | |
754 if (!editorList.hasEid(command.eid)) { | |
755 editorList.add(editor); | |
756 } | |
757 // we don't join ack to the direct linked editor. We | |
758 // have to wait select command | |
759 } | |
760 | |
761 | |
355 | 762 void send_sm_join_ack(int psid, int sid,REPCommand sendCommand) { |
763 if (psid==smList.sessionManagerID()) { | |
764 // 直下のsessionManagerにIDを割り振る必要がある。 | |
765 smList.assignSessionManagerIDtoWaitingSM(sid); | |
766 // ここで smList に一つだけ追加されるので | |
358 | 767 // 待っている最初のsm一つにだけ、sm_join_ackが新たに送られる。 |
355 | 768 } |
769 smList.sendToSlaves(sendCommand); | |
770 } | |
771 | |
772 | |
773 private REPCommand makeREPCommandWithSessionList(REP cmd) { | |
774 //SessionListからXMLを生成。 | |
775 //joinしてきたSessionManagerに対してACKを送信。 | |
776 REPCommand sendCommand = new REPCommand(); | |
777 sendCommand.setCMD(cmd); | |
358 | 778 sendCommand.setString(encoder.sessionListToXML(sessionList)); |
355 | 779 return sendCommand; |
780 } | |
781 | |
782 | |
783 public boolean isMaster() { | |
784 return smList.isMaster(); | |
785 } | |
786 | |
787 | |
788 public void setSessionManagerID(int sid) { | |
789 smList.setSessionManagerID(sid); | |
790 } | |
791 | |
358 | 792 |
793 public Session getSession(int sid) { | |
794 return sessionList.get(sid); | |
795 } | |
796 | |
364 | 797 |
0 | 798 } |