Mercurial > hg > RemoteEditor > REPSessionManager
annotate rep/SessionManager.java @ 376:c4ffdac26132
*** empty log message ***
author | kono |
---|---|
date | Wed, 22 Oct 2008 03:19:57 +0900 |
parents | e16b6326fdac |
children | 85a5980d96d8 |
rev | line source |
---|---|
266 | 1 |
0 | 2 package rep; |
3 | |
4 import java.io.IOException; | |
5 import java.net.InetSocketAddress; | |
267 | 6 import java.nio.channels.ClosedChannelException; |
0 | 7 import java.nio.channels.SelectionKey; |
83 | 8 import java.util.LinkedList; |
144 | 9 import java.util.List; |
231 | 10 import java.util.Set; |
178 | 11 import java.util.concurrent.BlockingQueue; |
192 | 12 import java.util.concurrent.LinkedBlockingQueue; |
0 | 13 |
346 | 14 import org.xml.sax.SAXException; |
15 | |
198 | 16 |
353 | 17 |
337 | 18 import rep.channel.REPLogger; |
123 | 19 import rep.channel.REPServerSocketChannel; |
133 | 20 import rep.channel.REPSocketChannel; |
144 | 21 import rep.handler.PacketSet; |
146 | 22 import rep.handler.REPHandler; |
158 | 23 import rep.channel.REPSelector; |
56 | 24 import rep.xml.SessionXMLDecoder; |
45 | 25 import rep.xml.SessionXMLEncoder; |
198 | 26 import rep.channel.REPSelectionKey; |
264 | 27 |
198 | 28 /* |
264 | 29 +-------+--------+--------+-------+--------+---------+------+ |
30 | cmd | session| editor | seqid | lineno | textsiz | text | | |
31 | | id | id | | | | | | |
32 +-------+--------+--------+-------+--------+---------+------+ | |
33 o---------- header section (network order) ----------o | |
34 | |
35 int cmd; kind of command | |
36 int sid; session ID : unique to editing file | |
37 int eid; editor ID : owner editor ID = 1. Unique within a Session | |
308 | 38 -1 session manager command |
39 -2 merge command | |
264 | 40 int seqno; Sequence number : sequence numbers are managed per editor |
41 int lineno; line number | |
42 int textsize; textsize : bytesize | |
43 byte[] text; | |
198 | 44 */ |
1 | 45 |
250 | 46 public class SessionManager implements SessionManagerEventListener{ |
337 | 47 static public REPLogger logger = REPLogger.singleton(); |
48 | |
363 | 49 SessionList sessionList; |
280 | 50 private SessionManagerGUI gui; |
363 | 51 // Main nio.Selector of this server |
198 | 52 private REPSelector<REPCommand> selector; |
363 | 53 // Known Session Manager List, At most one parent. No parent means master. |
319 | 54 SessionManagerList smList; |
363 | 55 // Known Editor list. Connected Editor has a channel. |
56 // Session Manager Channel may have dummy editors. | |
356 | 57 EditorList editorList; |
363 | 58 // Commands for busy editor are kept in this queue. |
212 | 59 private List<PacketSet> waitingCommandInMerge; |
363 | 60 // Command from gui. Synchronization is required. |
61 private BlockingQueue<SessionManagerEvent> waitingEventQueue | |
62 = new LinkedBlockingQueue<SessionManagerEvent>();; | |
63 // host name of this server. One of connecting SocketChannel's hostname | |
319 | 64 String myHost; |
363 | 65 // Single threaded write queue. To avoid deadlock with too many writes. |
317 | 66 private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); |
336 | 67 private int receive_port; |
68 private int parent_port; | |
101 | 69 static final int DEFAULT_PORT = 8766; |
363 | 70 // Queue limit for debugging purpose. |
332 | 71 private static final int packetLimit = 200; |
358 | 72 |
363 | 73 // globalSessionID = SessionManagerID * MAXID + localSessionID |
358 | 74 private static final int MAXID = 10000; |
353 | 75 SessionXMLDecoder decoder = new SessionXMLDecoder(); |
358 | 76 SessionXMLEncoder encoder = new SessionXMLEncoder(); |
363 | 77 // SocketChannel for our parent. At most one parent is allowed. |
355 | 78 private Forwarder sm_join_channel; |
363 | 79 // Routing table for session and session manager. |
369 | 80 private RoutingTable routingTable = new RoutingTable(this); |
365
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
81 private SessionManagerEvent execAfterConnect = null;; |
358 | 82 |
316 | 83 public static void main(String[] args) throws InterruptedException, IOException { |
84 | |
85 int port = DEFAULT_PORT; | |
86 int port_s = DEFAULT_PORT; | |
87 //System.setProperty("file.encoding", "UTF-8"); | |
88 if(args.length > 0){ | |
363 | 89 if (args.length!=2) { |
90 logger.writeLog("Usage: sessionManager our_port parent_port"); | |
91 return; | |
92 } | |
316 | 93 port = Integer.parseInt(args[0]); |
94 port_s = Integer.parseInt(args[1]); | |
95 } | |
96 SessionManager sm = new SessionManager(); | |
336 | 97 sm.setReceivePort(port); |
98 sm.setParentPort(port_s); | |
363 | 99 // Ok start main loop |
316 | 100 sm.init(port,new SessionManagerGUIimpl(sm)); |
101 } | |
102 | |
336 | 103 public void setReceivePort(int port) { |
104 receive_port = port; | |
105 } | |
106 | |
280 | 107 public void init(int port, SessionManagerGUI gui) throws IOException, InterruptedException { |
108 this.gui = gui; | |
109 init(port); | |
110 mainLoop(); | |
111 } | |
112 | |
113 private void init(int port) throws InterruptedException, IOException { | |
363 | 114 selector = REPSelector.<REPCommand>create(); |
186 | 115 REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); |
363 | 116 ssc.configureBlocking(false); // Selector requires this |
117 ssc.socket().setReuseAddress(true); //reuse address 必須 | |
212 | 118 //getAllByNameで取れた全てのアドレスに対してbindする |
119 ssc.socket().bind(new InetSocketAddress(port)); | |
349 | 120 ssc.register(selector, SelectionKey.OP_ACCEPT, |
121 new Forwarder(this)); | |
6 | 122 |
358 | 123 sessionList = new SessionList(); |
7 | 124 smList = new SessionManagerList(); |
356 | 125 editorList = new EditorList(); |
212 | 126 waitingCommandInMerge = new LinkedList<PacketSet>(); |
228 | 127 |
215 | 128 |
155 | 129 } |
313 | 130 |
131 /* | |
363 | 132 * The main loop. |
133 * Check incoming events and waiting writes. | |
134 * Do select and call select() to check in coming packets. | |
313 | 135 * We wrote everything in one thread, but we can assign |
136 * one thread for each communication channel and GUI event. | |
137 */ | |
231 | 138 public void mainLoop() throws IOException { |
0 | 139 while(true){ |
328 | 140 checkWaitingCommandInMerge(); |
313 | 141 if (checkInputEvent() || |
328 | 142 checkWaitingWrite()) { |
313 | 143 // try to do fair execution for waiting task |
144 if(selector.selectNow() > 0) select(); | |
145 continue; | |
300 | 146 } |
313 | 147 // now we can wait for input packet or event |
233 | 148 selector.select(); |
144 | 149 select(); |
150 } | |
151 } | |
152 | |
363 | 153 /* |
154 * Synchronize GUI event in the main loop. | |
155 */ | |
313 | 156 private boolean checkInputEvent() { |
157 SessionManagerEvent e; | |
158 if((e = waitingEventQueue.poll())!=null){ | |
334 | 159 e.exec(this); |
313 | 160 return true; |
161 } | |
162 return false; | |
163 } | |
164 | |
363 | 165 /* |
166 * Write a packet during the main loop. | |
167 */ | |
313 | 168 private boolean checkWaitingWrite() throws IOException { |
317 | 169 PacketSet p = writeQueue.poll(); |
170 if (p!=null) { | |
327 | 171 p.channel.write(p.command); |
317 | 172 return true; |
313 | 173 } |
174 return false; | |
175 } | |
176 | |
308 | 177 /** |
178 * Check waiting command in merge | |
179 * @return true if there is a processed waiting command | |
180 * @throws IOException | |
181 */ | |
346 | 182 private void checkWaitingCommandInMerge() { |
328 | 183 List<PacketSet> w = waitingCommandInMerge; |
184 waitingCommandInMerge = new LinkedList<PacketSet>(); | |
185 for(PacketSet p: w) { | |
186 Editor e = p.getEditor(); | |
353 | 187 if(e.isMerging()) { // still merging do nothing |
328 | 188 waitingCommandInMerge.add(p); |
189 } else { | |
346 | 190 try { |
353 | 191 if (sessionManage(e, p.command)) { // we don't need this |
192 assert false; | |
193 return; | |
194 } | |
195 e.manage(p.command); | |
347 | 196 } catch (Exception e1) { |
349 | 197 // should be e.close()? |
346 | 198 close(p.channel); |
199 } | |
328 | 200 } |
201 } | |
202 } | |
363 | 203 |
204 /* | |
205 * If we have waiting write commands, further sent commands also | |
206 * wait to avoid out of order packet sending. | |
207 */ | |
208 public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) { | |
209 for(PacketSet p:waitingCommandInMerge) { | |
210 if (p.channel==c) { | |
211 return true; | |
212 } | |
213 } | |
214 return false; | |
215 } | |
328 | 216 |
363 | 217 /* |
218 * Close a channel in case of exception or close. | |
219 */ | |
346 | 220 private void close(REPSocketChannel<REPCommand> channel) { |
221 REPSelectionKey<REPCommand>key = channel.keyFor1(selector); | |
222 REPHandler handler = (REPHandler)key.attachment(); | |
223 key.cancel(); | |
224 handler.cancel(channel); | |
225 // we have to remove session/enditor | |
226 } | |
227 | |
228 | |
363 | 229 /* |
230 * Do select operation on the Selector. Each key has a forwarder. | |
231 * A forwarder can be a firstConnector, a forwarder for Session Manager | |
232 * or an Editor. | |
233 */ | |
144 | 234 private void select() throws IOException { |
231 | 235 |
236 Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); | |
237 for(REPSelectionKey<REPCommand> key : keys){ | |
144 | 238 if(key.isAcceptable()){ |
363 | 239 /* |
240 * Incoming connection. We don't know which, editor or | |
241 * session manager. Assign FirstConnector to distinguish. | |
242 */ | |
199 | 243 REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); |
337 | 244 logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); |
363 | 245 registerChannel(channel, new FirstConnector(this)); |
144 | 246 channel = null; |
247 }else if(key.isReadable()){ | |
363 | 248 /* |
249 * Incoming packets are handled by a various forwarder. | |
250 * A hadler throw IOException() in case of a trouble to | |
251 * close the channel. | |
252 */ | |
212 | 253 REPHandler handler = (REPHandler)(key.attachment()); |
267 | 254 try { |
255 handler.handle(key); | |
358 | 256 } catch (IOException e) { |
267 | 257 key.cancel(); |
308 | 258 handler.cancel(key.channel1()); |
267 | 259 } |
0 | 260 } |
261 } | |
262 } | |
1 | 263 |
355 | 264 void registerChannel(REPSocketChannel<REPCommand> channel,Forwarder handler) throws IOException { |
2 | 265 if(channel == null) { |
266 return; | |
267 } | |
349 | 268 handler.setChannel(channel); |
2 | 269 channel.configureBlocking(false); |
355 | 270 channel.register(selector, SelectionKey.OP_READ, handler); |
2 | 271 } |
272 | |
363 | 273 /* |
274 * After loop detection, we give up session manager join. | |
275 */ | |
276 private void cancel_sm_join() { | |
364 | 277 logger.writeLog("Loop detected "+this); |
355 | 278 removeChannel(sm_join_channel); |
279 sm_join_channel=null; | |
280 } | |
281 | |
282 | |
363 | 283 private void removeChannel(Forwarder channel) { |
284 REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector); | |
355 | 285 key.cancel(); |
286 try { | |
364 | 287 channel.channel.close1(); |
355 | 288 } catch (IOException e) { |
289 } | |
349 | 290 } |
291 | |
320 | 292 |
319 | 293 void updateGUI() { |
212 | 294 //リストのコピーをGUIに渡す |
358 | 295 LinkedList<Session> sList = new LinkedList<Session>(sessionList.values()); |
365
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
296 LinkedList<Editor> eList; |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
297 if (false) { |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
298 // local editor only |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
299 eList = new LinkedList<Editor>(); |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
300 for(Editor e:editorList.values()) { |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
301 if (getSMID(e.eid)==smList.sessionManagerID()) { |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
302 eList.add(e); |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
303 } |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
304 } |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
305 } else { |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
306 eList = new LinkedList<Editor>(editorList.values()); |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
307 } |
212 | 308 //GUIに反映 |
309 Runnable doRun = new DoGUIUpdate(sList, eList, gui); | |
279 | 310 gui.invokeLater(doRun); |
212 | 311 } |
312 | |
83 | 313 |
139 | 314 |
353 | 315 void setMyHostName(String localHostName) { |
308 | 316 myHost = localHostName + receive_port; |
164 | 317 setHostToEditor(myHost); |
318 } | |
319 | |
320 private void setHostToEditor(String myHost2) { | |
358 | 321 for(Editor editor : editorList.values()){ |
322 if (editor.channel!=null) | |
323 editor.setHost(myHost2); | |
164 | 324 } |
76 | 325 } |
0 | 326 |
355 | 327 |
328 /** | |
363 | 329 * GUI から、呼ばれて、Session Managerに接続する。 |
355 | 330 * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては |
331 * ならない。複数のSession Managerにjoinすることは出来ない。(NATを実装するまでは)。 | |
332 * @param host | |
333 */ | |
364 | 334 public void connectSession(String host, int port) { |
355 | 335 if (sm_join_channel!=null) return; |
336 if (!sessionList.isEmpty()) return; | |
337 if (!smList.isMaster()) return; | |
363 | 338 /* |
339 * IPv6 対応では、複数のアドレスを取って、それのすべてに接続を試す必要が | |
340 * ある。 | |
341 */ | |
1 | 342 InetSocketAddress addr = new InetSocketAddress(host, port); |
343 try { | |
186 | 344 REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker()); |
1 | 345 sessionchannel.connect(addr); |
337 | 346 while(!sessionchannel.finishConnect()); |
365
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
347 Forwarder sm = new FirstConnector(this); |
355 | 348 registerChannel(sessionchannel, sm); |
349 | 349 sm_join(sm); |
1 | 350 }catch (IOException e) { |
351 } | |
352 } | |
364 | 353 |
354 public void connectSession(String host) { | |
355 connectSession(host,parent_port); | |
356 } | |
77 | 357 |
363 | 358 /** |
359 * channel に SMCMD_SM_JOIN command を送る。 | |
360 * @param channel | |
361 */ | |
349 | 362 private void sm_join(Forwarder channel){ |
355 | 363 sm_join_channel = channel; |
122 | 364 //SM_JOINコマンドを生成。 |
77 | 365 REPCommand command = new REPCommand(); |
366 command.setCMD(REP.SMCMD_SM_JOIN); | |
349 | 367 command.setEID(-1); // request Parent SessionManagerID |
368 command.setSID(-1); // request SessionManagerID | |
79 | 369 |
122 | 370 //hostnameをセット。 |
349 | 371 setMyHostName(channel.getLocalHostName()); |
82 | 372 |
355 | 373 String string = myHost; |
77 | 374 command.setString(string); |
375 | |
122 | 376 //SM_JOINコマンドを送信。 |
349 | 377 channel.send(command); |
363 | 378 // ack を受け取ったら、SessionManagerのListに追加。ここではやらない。 |
77 | 379 } |
349 | 380 |
361 | 381 /* |
382 * Select Session from Manager button | |
383 * selected editor is joined editor directly connected to this session | |
384 * manager. | |
385 */ | |
316 | 386 public void selectSession(SelectButtonEvent event) throws IOException { |
250 | 387 int sid = event.getSID(); |
358 | 388 Session session = sessionList.get(sid); |
365
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
389 if (session==null) throw new IOException(); |
320 | 390 Editor editor = (Editor)event.getEditor(); |
324 | 391 if (editor.hasSession()) return; |
365
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
392 // assert(getSMID(editor.eid)==smList.sessionManagerID()); |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
393 // assert(editor.channel!=null); |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
394 editor.setSID(sid); // mark as selected |
372 | 395 selectSession0(sid, session, editor.getEID(), editor); |
396 } | |
397 | |
398 private void selectSession0(int sid, Session session, int eid, Editor editor) { | |
399 if (editor.isDirect()&&editor.getEID()==eid) { | |
400 selectSession(sid, session, editor.getEID(), editor); | |
401 } else { | |
402 // we don't have this editor, search the editor first. | |
403 Forwarder next = routingTable.toSessionManager(getSMID(eid)); | |
404 // pass the select command to the next path. | |
405 REPCommand command = new REPCommand(); | |
406 command.setCMD(REP.SMCMD_SELECT0); | |
407 command.setSID(sid); | |
408 command.setEID(eid); | |
409 command.setString(editor.getHost()); | |
410 next.send(command); | |
411 } | |
358 | 412 } |
413 | |
361 | 414 /* |
415 * Select Session Protocol handler | |
363 | 416 * called from GUI or incoming SMCMD_SELECT command. |
361 | 417 */ |
358 | 418 private void selectSession(int sid, Session session, int eid, Forwarder editor) { |
372 | 419 if(session.hasOwner()){ |
363 | 420 // we have selected session. |
107 | 421 REPCommand sendCommand = new REPCommand(); |
358 | 422 if (editor.isDirect()&&editor.getEID()==eid) { |
363 | 423 // Found directly connected joined editor. Send join_ack(). |
361 | 424 session.addForwarder(editor); |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
425 sendUpdate(session.getSID()); |
358 | 426 sendCommand.setCMD(REP.SMCMD_JOIN_ACK); |
427 } else { | |
363 | 428 // We have a session, but joined editor is on the other sm. |
358 | 429 // SELECT_ACK is sent to the session ring to |
363 | 430 // find out the joined editor. |
358 | 431 sendCommand.setCMD(REP.SMCMD_SELECT_ACK); |
432 } | |
370 | 433 sendCommand.setSID(sid); |
434 sendCommand.string = session.getName(); | |
435 sendCommand.setEID(eid); | |
436 editor.send(sendCommand); | |
107 | 437 }else { |
372 | 438 forwardSelect(sid, session, eid, editor); |
107 | 439 } |
8 | 440 } |
122 | 441 |
372 | 442 private void forwardSelect(int sid, Session session, int eid, |
443 Forwarder editor) { | |
444 Forwarder next; | |
445 // session searching continue... | |
446 next = routingTable.toSessionManager(getSMID(sid)); | |
447 // make a forwarding channel here | |
448 Forwarder f = createSessionDispatcher(sid, next); | |
449 session.setFirstForwarder(f); | |
450 session.addForwarder(editor); | |
451 // pass the select command to the next path. | |
452 REPCommand command = new REPCommand(); | |
453 command.setCMD(REP.SMCMD_SELECT); | |
454 command.setSID(sid); | |
455 command.setEID(eid); | |
456 command.setString(editor.getHost()); | |
457 next.send(command); | |
458 } | |
459 | |
460 private Forwarder createSessionDispatcher(int sid, Forwarder editor) { | |
461 Forwarder f = new Editor(this, false, makeID(editorList.newEid())); | |
462 f.setChannel(editor.channel); // incoming channel | |
463 f.setHost(myHost); | |
464 f.setSID(sid); | |
465 return f; | |
466 } | |
467 | |
363 | 468 /* |
469 * Create and send UPDATE command. | |
470 */ | |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
471 private void sendUpdate(int sid) { |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
472 REPCommand command = makeREPCommandWithSessionList(REP.SMCMD_UPDATE); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
473 command.setSID(sid); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
474 command.setEID(REP.SM_EID.id); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
475 smList.sendToMaster(command); |
358 | 476 } |
477 | |
363 | 478 /* |
479 * Create new editor in this sessin manager. A dummy editor | |
480 * is created also. | |
481 */ | |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
482 public Editor newEditor(REPSocketChannel<REPCommand> channel) { |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
483 int eid = makeID(editorList.newEid()); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
484 Editor editor = new Editor(this, eid, channel); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
485 editorList.add(editor); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
486 return editor; |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
487 } |
363 | 488 |
489 /* | |
490 * Create new session. | |
491 */ | |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
492 public Session newSession(Forwarder master) { |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
493 int sid= makeID(sessionList.newSessionID()); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
494 Session session = new Session(sid, master); |
360 | 495 sessionList.put(sid, session); |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
496 return session; |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
497 } |
358 | 498 |
144 | 499 public void addWaitingCommand(PacketSet set) { |
212 | 500 waitingCommandInMerge.add(set); |
144 | 501 } |
148 | 502 |
363 | 503 /* |
504 * Synchronize GUI command in this session manager. | |
505 */ | |
222 | 506 public void buttonPressed(SessionManagerEvent event) { |
507 try { | |
308 | 508 waitingEventQueue.put(event); |
222 | 509 } catch (InterruptedException e) {} |
510 selector.wakeup(); | |
511 } | |
281 | 512 |
363 | 513 /* |
514 * Execute incoming event during the initialization for | |
515 * testing purpose. | |
516 */ | |
281 | 517 public void syncExec(SessionManagerEvent event) { |
518 try { | |
308 | 519 waitingEventQueue.put(event); |
281 | 520 } catch (InterruptedException e) { |
521 } | |
522 } | |
222 | 523 |
363 | 524 /* |
525 * GUI command interface for close session. | |
526 */ | |
259 | 527 public void closeSession(SessionManagerEvent event) { |
528 Session session = ((CloseButtonEvent) event).getSession(); | |
529 session.closeSession(); | |
530 sessionList.remove(session); | |
531 updateGUI(); | |
532 } | |
533 | |
363 | 534 /* |
535 * Remove editors which has the cannel. | |
536 */ | |
274 | 537 public void remove(REPSocketChannel<REPCommand> channel) { |
358 | 538 int i = 0; |
539 for(Session s:sessionList.values()) { | |
274 | 540 if (s.deleteEditor(channel)) { |
358 | 541 i++; |
274 | 542 } |
543 } | |
358 | 544 assert(i==1); |
274 | 545 // can be other session manager? what should I do? |
546 } | |
547 | |
317 | 548 |
549 public void addWriteQueue(PacketSet packetSet) { | |
324 | 550 writeQueue.addLast(packetSet); |
323 | 551 assert(writeQueue.size()<packetLimit) ; |
317 | 552 } |
553 | |
318 | 554 |
555 public void remove(Editor editor) { | |
358 | 556 Session s = sessionList.get(editor.getSID()); |
362 | 557 if (s==null) { |
558 assert(false); | |
559 editorList.remove(editor); | |
560 } else if (editor.isMaster()) { | |
358 | 561 removeSession(s); |
562 } else { | |
563 s.deleteForwarder(editor); | |
564 editorList.remove(editor); | |
318 | 565 } |
341 | 566 updateGUI(); |
567 } | |
568 | |
569 private void removeSession(Session s0) { | |
358 | 570 s0.remove(this); |
341 | 571 sessionList.remove(s0); |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
572 sendUpdate(s0.getSID()); |
318 | 573 } |
574 | |
334 | 575 public void setParentPort(int port) { |
576 parent_port = port; | |
577 } | |
578 public int getParentPort() { | |
579 return parent_port; | |
580 } | |
581 | |
582 public int getPort() { | |
583 return receive_port; | |
584 } | |
585 | |
353 | 586 |
/**
 * Handles session manager commands (the SMCMD_* family).
 *
 * @param forwarder the forwarder the command arrived on
 * @param command   the received command; several branches modify it in
 *                  place before forwarding it
 * @return true if the command was one of the session manager commands
 *         handled here, false for any other command
 * @throws ClosedChannelException declared by the signature
 * @throws IOException if a received session list cannot be decoded
 *         (see mergeUpdate)
 */
boolean sessionManage(Forwarder forwarder, REPCommand command) throws ClosedChannelException,
        IOException {
    switch(command.cmd){

    // Session Manager Command

    case SMCMD_JOIN:
    {
        // first connection or forwarded command
        routingTable.add(forwarder,getSMID(command.eid));
        if(isMaster()) {
            // Master answers with JOIN_ACK, sent to the slaves, and
            // registers the editor locally.
            REPCommand ackCommand = new REPCommand();
            ackCommand.setCMD(REP.SMCMD_JOIN_ACK);
            ackCommand.setEID(command.eid);
            ackCommand.setSID(command.sid);
            ackCommand.string = command.string;
            smList.sendToSlaves(ackCommand);
            registEditor(forwarder,ackCommand);
        } else {
            // Not the master: pass the JOIN upwards unchanged.
            smList.sendToMaster(command);
        }
        updateGUI();
    }

    break;

    case SMCMD_PUT_ACK:
        if (forwarder.isDirect()) {
            // send put_ack to the editor now.
            // NOTE(review): the assignments below write values back onto
            // the same command (cmd is already SMCMD_PUT_ACK in this case);
            // they look like no-ops unless the setters have side effects.
            command.setCMD(REP.SMCMD_PUT_ACK);
            command.string = command.string;
            command.setEID(command.eid);
            command.setSID(command.sid);
            forwarder.send(command);
        }
        // No break: PUT_ACK continues into the JOIN_ACK handling below.
    case SMCMD_JOIN_ACK:
        registEditor(forwarder,command);
        updateGUI();
        break;

    case SMCMD_PUT:
    {
        // first connection or forwarded command
        routingTable.add(forwarder,getSMID(command.eid));
        REPCommand ack = new REPCommand(command); ack.setCMD(REP.SMCMD_PUT_ACK);
        if(isMaster()) {
            // Reached to the top of the tree, multicast the ack.
            smList.sendToSlaves(ack);
            registEditor(forwarder,ack);
            if (forwarder.isDirect()) {
                // If put editor on the master, no SMCMD_PUT_ACK is
                // generated. Send ack to the editor now.
                forwarder.send(ack);
            }
        } else {
            // Pass this to the master.
            smList.sendToMaster(command);
            // registEditor will be done by SMCMD_PUT_ACK
        }
        updateGUI();

    }
    break;

    case SMCMD_SELECT0:
        /*
         * finding joining editor, do not make the path.
         */
        Forwarder editor = editorList.get(command.eid);
        if (editor==null|| !editor.isDirect()) {
            // Editor is not directly connected here: route the command
            // towards the session manager that owns the editor ID.
            Forwarder next = routingTable.toSessionManager(getSMID(command.eid));
            next.send(command);
            break;
        }
        // we've found the editor, fall thru.
    case SMCMD_SELECT:
    {
        /*
         * finding active session ring from joined editor.
         */
        Session session = sessionList.get(command.sid);
        if (session==null) {
            // Unknown session: create an entry for it.
            session = new Session(command.sid, command.string,null);
            sessionList.put(command.sid,session);
        }
        // Do not directly addForwarder(forwarder). It may be
        // shared among sessions.
        Forwarder f = createSessionDispatcher(command.sid, forwarder);
        session.addForwarder(f); // f.next is set up here.
        selectSession(command.sid, session, command.eid, forwarder);
    }
    break;
    case SMCMD_SELECT_ACK:
    {
        // The session was found, so tell the editor that issued the select.
        // NOTE(review): session is not checked for null before use --
        // confirm SELECT_ACK can only arrive for a known session.
        Session session = sessionList.get(command.sid);
        searchSelectedEditor(command,session.getForwarder(forwarder.channel));
    }
    break;

    case SMCMD_SM_JOIN:
    {
        // If SMCMD_SM_JOIN arrives while our own SM_JOIN is pending,
        // this is a loop, so cancel our own SM_JOIN.
        if (sm_join_channel!=null) cancel_sm_join();
        // SMCMD_SM_JOIN travels up to the master.
        // If we are not the master, forward it to our parent.
        if(isMaster()) {
            // As master, decide the SessionManager ID and
            // register it in our smList.
            Forwarder sm;
            int psid = command.eid;
            if (forwarder.sid!=-1) {
                // The channel already has a SessionManager ID and this
                // is not a direct link, so register it as a Forwarder
                // without a channel.
                sm = new Forwarder(this);
            } else {
                sm = forwarder;
            }
            int sid = smList.addNewSessionManager(sm,command);
            routingTable.add(forwarder,sid);

            REPCommand sendCommand = makeREPCommandWithSessionList(REP.SMCMD_SM_JOIN_ACK);
            // When command.eid==smList.sessionManagerID(), the sid has to be
            // assigned to the session manager waiting directly below us.
            sendCommand.setSID(sid); // new Session manager ID
            // To tell multiple SM_JOIN_ACKs apart, use the ID of the
            // session manager that first accepted the SM_JOIN.
            sendCommand.setEID(psid);
            send_sm_join_ack(psid, sid, sendCommand);
        } else {
            if (forwarder.sid==-1) {
                // For a direct link, put the smid of the session manager
                // directly above into EID for identification.
                command.setEID(smList.sessionManagerID());
            }
            smList.sendToMaster(command);
        }
    }
    break;

    case SMCMD_SM_JOIN_ACK:
        send_sm_join_ack(command.eid, command.sid, command);
        break;

    case SMCMD_UPDATE:
        if (!isMaster()) {
            command.setString(mergeUpdate(command));
            // Notify upwards.
            smList.sendToMaster(command);
            break;
        }
        // fall thru
        // On the master the UPDATE turns into an UPDATE_ACK and is
        // handled by the next case.
        command.setCMD(REP.SMCMD_UPDATE_ACK);
    case SMCMD_UPDATE_ACK:
        command.setString(mergeUpdate(command));
        // Notify downwards.
        smList.sendToSlaves(command);
        updateGUI();
        break;
    default:
        return false;
    }
    return true;
}
753 | |
372 | 754 /* |
755 * 指定されたeditorがlocalにあるかどうかを調べる。なければ、他に送る。戻って何回も探すことが | |
756 * あり得るので、よろしくない。 | |
757 */ | |
758 private void searchSelectedEditor(REPCommand command, Forwarder editor) { | |
759 for(;editor.isDirect();editor = editor.getNextForwarder()) { | |
760 if (editor.getEID()==command.eid) { | |
761 // select したeditor を見つけた | |
762 command.cmd=REP.SMCMD_JOIN_ACK; | |
763 editor.send(command); | |
764 return; | |
765 } | |
766 } | |
767 // ここにはありませんでした。 | |
768 editor.send(command); | |
769 } | |
770 | |
355 | 771 |
363 | 772 /** |
773 * UPDATE/UPDATE_ACKにより送られてきたSessionの情報を追加する | |
774 * @param command | |
775 * @return | |
776 * @throws IOException | |
777 */ | |
358 | 778 private String mergeUpdate(REPCommand command) throws IOException { |
779 SessionList receivedSessionList; | |
780 try { | |
781 receivedSessionList = decoder.decode(command.string); | |
782 } catch (SAXException e) { | |
783 throw new IOException(); | |
784 } | |
363 | 785 // 受け取った情報と自分の情報を混ぜる。 |
786 sessionList.merge(receivedSessionList); | |
358 | 787 //XMLを生成。送信コマンドにセット。 |
788 return encoder.sessionListToXML(sessionList); | |
789 | |
790 } | |
791 | |
792 /* | |
793 * id has SessionManager ID part | |
794 */ | |
795 private int makeID(int newid) { | |
796 return newid+smList.sessionManagerID()*MAXID; | |
797 } | |
798 | |
799 private int getSMID(int id) { | |
800 return id/MAXID; | |
801 } | |
802 | |
803 | |
361 | 804 /** |
805 * Register Editor to our editorList. No connection is made. | |
806 * @param forwarder Editor to be add | |
807 * @param command | |
808 */ | |
367 | 809 public void registEditor(Forwarder forwarder,REPCommand command) { |
358 | 810 // make ack for PUT/JOIN. Do not send this to the editor, |
811 // before select. After select, ack is sent to the editor. | |
812 Editor editor; | |
367 | 813 if (getSMID(command.eid)==smList.sessionManagerID()) { |
814 if (forwarder.isDirect()) { | |
815 editor = (Editor)forwarder; | |
816 } else | |
817 return; | |
358 | 818 } else { |
819 editor = new Editor(this, command.cmd==REP.SMCMD_PUT_ACK, command.eid); | |
820 } | |
821 editor.setName(command.string); | |
822 editor.setSID(command.sid); | |
823 if (!editorList.hasEid(command.eid)) { | |
824 editorList.add(editor); | |
825 } | |
367 | 826 if (command.cmd==REP.SMCMD_PUT_ACK) { |
827 Session session = new Session(command.sid, command.string, editor); | |
828 sessionList.put(command.sid, session); | |
829 } | |
358 | 830 // we don't join ack to the direct linked editor. We |
831 // have to wait select command | |
832 } | |
833 | |
834 | |
355 | 835 void send_sm_join_ack(int psid, int sid,REPCommand sendCommand) { |
836 if (psid==smList.sessionManagerID()) { | |
837 // 直下のsessionManagerにIDを割り振る必要がある。 | |
838 smList.assignSessionManagerIDtoWaitingSM(sid); | |
839 // ここで smList に一つだけ追加されるので | |
358 | 840 // 待っている最初のsm一つにだけ、sm_join_ackが新たに送られる。 |
355 | 841 } |
842 smList.sendToSlaves(sendCommand); | |
843 } | |
844 | |
845 | |
846 private REPCommand makeREPCommandWithSessionList(REP cmd) { | |
847 //SessionListからXMLを生成。 | |
848 //joinしてきたSessionManagerに対してACKを送信。 | |
849 REPCommand sendCommand = new REPCommand(); | |
850 sendCommand.setCMD(cmd); | |
358 | 851 sendCommand.setString(encoder.sessionListToXML(sessionList)); |
355 | 852 return sendCommand; |
853 } | |
854 | |
855 | |
856 public boolean isMaster() { | |
857 return smList.isMaster(); | |
858 } | |
859 | |
860 | |
861 public void setSessionManagerID(int sid) { | |
862 smList.setSessionManagerID(sid); | |
863 } | |
864 | |
358 | 865 |
866 public Session getSession(int sid) { | |
867 return sessionList.get(sid); | |
868 } | |
869 | |
365
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
870 public void execAfterConnect(SessionManagerEvent sessionManagerEvent) { |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
871 execAfterConnect = sessionManagerEvent; |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
872 } |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
873 |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
874 public void afterConnect() { |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
875 if (execAfterConnect!=null) execAfterConnect.exec(this); |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
876 execAfterConnect = null; |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
877 } |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
878 |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
879 public void setParent(Forwarder fw) { |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
880 smList.setParent(fw); |
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
881 } |
367 | 882 |
883 public String toString() { | |
884 int myId = 0; | |
885 if (smList!=null) myId = smList.sessionManagerID(); | |
886 return "rep.SessionManager-"+myId+"@"+myHost+":"+receive_port; | |
887 } | |
365
c432755c3555
distributed session debug continue... SELECT/SELECT_ACK loop
kono
parents:
364
diff
changeset
|
888 |
364 | 889 |
0 | 890 } |