Mercurial > hg > RemoteEditor > REPSessionManager
annotate rep/SessionManager.java @ 359:fa041bae35f1
all code written for distributed session except gather.
author | kono |
---|---|
date | Sun, 19 Oct 2008 19:24:38 +0900 |
parents | 034acadc0cdc |
children | b25f832f875d |
rev | line source |
---|---|
266 | 1 |
0 | 2 package rep; |
3 | |
4 import java.io.IOException; | |
5 import java.net.InetSocketAddress; | |
267 | 6 import java.nio.channels.ClosedChannelException; |
0 | 7 import java.nio.channels.SelectionKey; |
83 | 8 import java.util.LinkedList; |
144 | 9 import java.util.List; |
231 | 10 import java.util.Set; |
178 | 11 import java.util.concurrent.BlockingQueue; |
192 | 12 import java.util.concurrent.LinkedBlockingQueue; |
0 | 13 |
346 | 14 import org.xml.sax.SAXException; |
15 | |
198 | 16 |
353 | 17 |
337 | 18 import rep.channel.REPLogger; |
123 | 19 import rep.channel.REPServerSocketChannel; |
133 | 20 import rep.channel.REPSocketChannel; |
144 | 21 import rep.handler.PacketSet; |
146 | 22 import rep.handler.REPHandler; |
158 | 23 import rep.channel.REPSelector; |
56 | 24 import rep.xml.SessionXMLDecoder; |
45 | 25 import rep.xml.SessionXMLEncoder; |
198 | 26 import rep.channel.REPSelectionKey; |
264 | 27 |
198 | 28 /* |
264 | 29 +-------+--------+--------+-------+--------+---------+------+ |
30 | cmd | session| editor | seqid | lineno | textsiz | text | | |
31 | | id | id | | | | | | |
32 +-------+--------+--------+-------+--------+---------+------+ | |
33 o---------- header section (network order) ----------o | |
34 | |
35 int cmd; kind of command | |
36 int sid; session ID : uniqu to editing file | |
37 int eid; editor ID : owner editor ID = 1。Session に対して unique | |
308 | 38 -1 session manager command |
39 -2 merge command | |
264 | 40 int seqno; Sequence number : sequence number はエディタごとに管理 |
41 int lineno; line number | |
42 int textsize; textsize : bytesize | |
43 byte[] text; | |
198 | 44 */ |
1 | 45 |
250 | 46 public class SessionManager implements SessionManagerEventListener{ |
337 | 47 static public REPLogger logger = REPLogger.singleton(); |
48 | |
358 | 49 SessionList sessionList; |
280 | 50 private SessionManagerGUI gui; |
198 | 51 private REPSelector<REPCommand> selector; |
319 | 52 SessionManagerList smList; |
356 | 53 EditorList editorList; |
304 | 54 // editorList は、sessionList に入っているeditorとは別なeditorのlistらしい。 |
349 | 55 // private String maxHost; |
212 | 56 private List<PacketSet> waitingCommandInMerge; |
308 | 57 private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>();; |
319 | 58 String myHost; |
317 | 59 private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); |
336 | 60 private int receive_port; |
61 private int parent_port; | |
101 | 62 static final int DEFAULT_PORT = 8766; |
332 | 63 private static final int packetLimit = 200; |
358 | 64 |
65 private static final int MAXID = 10000; | |
353 | 66 SessionXMLDecoder decoder = new SessionXMLDecoder(); |
358 | 67 SessionXMLEncoder encoder = new SessionXMLEncoder(); |
355 | 68 private Forwarder sm_join_channel; |
349 | 69 |
358 | 70 private RoutingTable routingTable = new RoutingTable(); |
71 | |
316 | 72 public static void main(String[] args) throws InterruptedException, IOException { |
73 | |
74 int port = DEFAULT_PORT; | |
75 int port_s = DEFAULT_PORT; | |
76 //System.setProperty("file.encoding", "UTF-8"); | |
77 if(args.length > 0){ | |
78 port = Integer.parseInt(args[0]); | |
79 port_s = Integer.parseInt(args[1]); | |
80 } | |
81 SessionManager sm = new SessionManager(); | |
336 | 82 sm.setReceivePort(port); |
83 sm.setParentPort(port_s); | |
316 | 84 sm.init(port,new SessionManagerGUIimpl(sm)); |
85 | |
86 | |
87 } | |
88 | |
2 | 89 |
336 | 90 public void setReceivePort(int port) { |
91 receive_port = port; | |
92 } | |
93 | |
94 | |
2 | 95 public void openSelector() throws IOException{ |
231 | 96 selector = REPSelector.<REPCommand>create(); |
2 | 97 } |
280 | 98 |
99 public void init(int port, SessionManagerGUI gui) throws IOException, InterruptedException { | |
100 this.gui = gui; | |
101 openSelector(); | |
102 init(port); | |
103 mainLoop(); | |
104 } | |
105 | |
106 | |
107 private void init(int port) throws InterruptedException, IOException { | |
2 | 108 |
186 | 109 REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); |
122 | 110 ssc.configureBlocking(false); //reuse address 必須 |
101 | 111 ssc.socket().setReuseAddress(true); |
212 | 112 //getAllByNameで取れた全てのアドレスに対してbindする |
113 ssc.socket().bind(new InetSocketAddress(port)); | |
349 | 114 ssc.register(selector, SelectionKey.OP_ACCEPT, |
115 new Forwarder(this)); | |
6 | 116 |
358 | 117 sessionList = new SessionList(); |
7 | 118 smList = new SessionManagerList(); |
356 | 119 editorList = new EditorList(); |
212 | 120 waitingCommandInMerge = new LinkedList<PacketSet>(); |
228 | 121 |
215 | 122 |
155 | 123 } |
313 | 124 |
125 /* | |
126 * We wrote everything in one thread, but we can assign | |
127 * one thread for each communication channel and GUI event. | |
128 */ | |
155 | 129 |
231 | 130 public void mainLoop() throws IOException { |
0 | 131 while(true){ |
328 | 132 checkWaitingCommandInMerge(); |
313 | 133 if (checkInputEvent() || |
328 | 134 checkWaitingWrite()) { |
313 | 135 // try to do fair execution for waiting task |
136 if(selector.selectNow() > 0) select(); | |
137 continue; | |
300 | 138 } |
313 | 139 // now we can wait for input packet or event |
233 | 140 selector.select(); |
144 | 141 select(); |
142 } | |
143 } | |
144 | |
313 | 145 private boolean checkInputEvent() { |
146 SessionManagerEvent e; | |
147 if((e = waitingEventQueue.poll())!=null){ | |
334 | 148 e.exec(this); |
313 | 149 return true; |
150 } | |
151 return false; | |
152 } | |
153 | |
154 private boolean checkWaitingWrite() throws IOException { | |
317 | 155 PacketSet p = writeQueue.poll(); |
156 if (p!=null) { | |
327 | 157 p.channel.write(p.command); |
317 | 158 return true; |
313 | 159 } |
160 return false; | |
161 } | |
162 | |
308 | 163 /** |
164 * Check waiting command in merge | |
165 * @return true if there is a processed waiting command | |
166 * @throws IOException | |
167 */ | |
346 | 168 private void checkWaitingCommandInMerge() { |
328 | 169 List<PacketSet> w = waitingCommandInMerge; |
170 waitingCommandInMerge = new LinkedList<PacketSet>(); | |
171 for(PacketSet p: w) { | |
172 Editor e = p.getEditor(); | |
353 | 173 if(e.isMerging()) { // still merging do nothing |
328 | 174 waitingCommandInMerge.add(p); |
175 } else { | |
346 | 176 try { |
353 | 177 if (sessionManage(e, p.command)) { // we don't need this |
178 assert false; | |
179 return; | |
180 } | |
181 e.manage(p.command); | |
347 | 182 } catch (Exception e1) { |
349 | 183 // should be e.close()? |
346 | 184 close(p.channel); |
185 } | |
328 | 186 } |
187 } | |
188 } | |
189 | |
346 | 190 private void close(REPSocketChannel<REPCommand> channel) { |
191 REPSelectionKey<REPCommand>key = channel.keyFor1(selector); | |
192 REPHandler handler = (REPHandler)key.attachment(); | |
193 key.cancel(); | |
194 handler.cancel(channel); | |
195 // we have to remove session/enditor | |
196 } | |
197 | |
198 | |
328 | 199 public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) { |
200 for(PacketSet p:waitingCommandInMerge) { | |
201 if (p.channel==c) { | |
212 | 202 return true; |
178 | 203 } |
204 } | |
205 return false; | |
206 } | |
207 | |
144 | 208 private void select() throws IOException { |
231 | 209 |
210 Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); | |
211 for(REPSelectionKey<REPCommand> key : keys){ | |
144 | 212 if(key.isAcceptable()){ |
199 | 213 REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); |
337 | 214 logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); |
355 | 215 registerChannel (channel, new FirstConnector(this)); |
144 | 216 channel = null; |
123 | 217 |
144 | 218 }else if(key.isReadable()){ |
212 | 219 REPHandler handler = (REPHandler)(key.attachment()); |
267 | 220 try { |
221 handler.handle(key); | |
358 | 222 } catch (IOException e) { |
267 | 223 key.cancel(); |
308 | 224 handler.cancel(key.channel1()); |
267 | 225 } |
0 | 226 } |
227 } | |
228 } | |
1 | 229 |
355 | 230 void registerChannel(REPSocketChannel<REPCommand> channel,Forwarder handler) throws IOException { |
2 | 231 if(channel == null) { |
232 return; | |
233 } | |
349 | 234 handler.setChannel(channel); |
2 | 235 channel.configureBlocking(false); |
355 | 236 channel.register(selector, SelectionKey.OP_READ, handler); |
2 | 237 } |
238 | |
144 | 239 |
353 | 240 void cancel_sm_join() { |
355 | 241 removeChannel(sm_join_channel); |
242 sm_join_channel=null; | |
243 } | |
244 | |
245 | |
246 private void removeChannel(Forwarder sm_join_channel) { | |
247 REPSelectionKey<REPCommand> key = sm_join_channel.channel.keyFor1(selector); | |
248 key.cancel(); | |
249 try { | |
250 sm_join_channel.channel.close(); | |
251 } catch (IOException e) { | |
252 } | |
349 | 253 } |
254 | |
320 | 255 |
319 | 256 void updateGUI() { |
212 | 257 //リストのコピーをGUIに渡す |
358 | 258 LinkedList<Session> sList = new LinkedList<Session>(sessionList.values()); |
259 LinkedList<Editor> eList = new LinkedList<Editor>(editorList.values()); | |
212 | 260 //GUIに反映 |
261 Runnable doRun = new DoGUIUpdate(sList, eList, gui); | |
279 | 262 gui.invokeLater(doRun); |
212 | 263 } |
264 | |
83 | 265 |
139 | 266 |
353 | 267 void setMyHostName(String localHostName) { |
308 | 268 myHost = localHostName + receive_port; |
349 | 269 // if(maxHost == null) { |
270 // maxHost = myHost; | |
271 // } | |
164 | 272 setHostToEditor(myHost); |
273 } | |
274 | |
275 private void setHostToEditor(String myHost2) { | |
358 | 276 for(Editor editor : editorList.values()){ |
277 if (editor.channel!=null) | |
278 editor.setHost(myHost2); | |
164 | 279 } |
76 | 280 } |
0 | 281 |
355 | 282 |
283 /** | |
284 * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては | |
285 * ならない。複数のSession Managerにjoinすることは出来ない。(NATを実装するまでは)。 | |
286 * @param host | |
287 */ | |
178 | 288 public void connectSession(String host) { |
355 | 289 if (sm_join_channel!=null) return; |
290 if (!sessionList.isEmpty()) return; | |
291 if (!smList.isMaster()) return; | |
334 | 292 int port = parent_port; |
1 | 293 InetSocketAddress addr = new InetSocketAddress(host, port); |
294 try { | |
186 | 295 REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker()); |
349 | 296 |
1 | 297 sessionchannel.connect(addr); |
337 | 298 while(!sessionchannel.finishConnect()); |
349 | 299 Forwarder sm = new Forwarder(this); |
355 | 300 registerChannel(sessionchannel, sm); |
349 | 301 sm_join(sm); |
1 | 302 }catch (IOException e) { |
303 } | |
304 } | |
77 | 305 |
349 | 306 private void sm_join(Forwarder channel){ |
355 | 307 sm_join_channel = channel; |
122 | 308 //SM_JOINコマンドを生成。 |
77 | 309 REPCommand command = new REPCommand(); |
310 command.setCMD(REP.SMCMD_SM_JOIN); | |
349 | 311 command.setEID(-1); // request Parent SessionManagerID |
312 command.setSID(-1); // request SessionManagerID | |
79 | 313 |
122 | 314 //hostnameをセット。 |
349 | 315 setMyHostName(channel.getLocalHostName()); |
82 | 316 |
355 | 317 String string = myHost; |
77 | 318 command.setString(string); |
319 | |
122 | 320 //SM_JOINコマンドを送信。 |
349 | 321 channel.send(command); |
122 | 322 //SessionManagerのListに追加。 |
349 | 323 |
77 | 324 } |
349 | 325 |
316 | 326 public void selectSession(SelectButtonEvent event) throws IOException { |
250 | 327 int sid = event.getSID(); |
358 | 328 Session session = sessionList.get(sid); |
227 | 329 |
320 | 330 Editor editor = (Editor)event.getEditor(); |
227 | 331 if(editor == null){ |
337 | 332 logger.writeLog("SessionManager.selectSession():editor = " + editor); |
227 | 333 return; |
334 } | |
324 | 335 if (editor.hasSession()) return; |
319 | 336 |
358 | 337 selectSession(sid, session, editor.getEID(), editor); |
338 } | |
339 | |
340 | |
341 private void selectSession(int sid, Session session, int eid, Forwarder editor) { | |
158 | 342 if(session.hasOwner()){ |
319 | 343 session.addForwarder(editor); |
107 | 344 REPCommand sendCommand = new REPCommand(); |
358 | 345 if (editor.isDirect()&&editor.getEID()==eid) { |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
346 sendUpdate(session.getSID()); |
358 | 347 sendCommand.setCMD(REP.SMCMD_JOIN_ACK); |
348 } else { | |
349 // SELECT_ACK is sent to the session ring to | |
350 // find out joined editor | |
351 sendCommand.setCMD(REP.SMCMD_SELECT_ACK); | |
352 } | |
148 | 353 sendCommand.setEID(editor.getEID()); |
107 | 354 sendCommand.setSID(sid); |
286 | 355 sendCommand.string = ""; |
349 | 356 editor.send(sendCommand); |
107 | 357 }else { |
358 | 358 session.addForwarder(editor); |
107 | 359 editor.setHost(myHost); |
322 | 360 editor.setSID(sid); |
358 | 361 Forwarder next = routingTable.toSession(sid); |
107 | 362 |
363 REPCommand command = new REPCommand(); | |
364 command.setCMD(REP.SMCMD_SELECT); | |
365 command.setSID(sid); | |
358 | 366 command.setEID(eid); |
178 | 367 command.setString(editor.getHost()); |
358 | 368 next.send(command); |
107 | 369 } |
8 | 370 } |
122 | 371 |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
372 private void sendUpdate(int sid) { |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
373 REPCommand command = makeREPCommandWithSessionList(REP.SMCMD_UPDATE); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
374 command.setSID(sid); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
375 command.setEID(REP.SM_EID.id); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
376 smList.sendToMaster(command); |
358 | 377 } |
378 | |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
379 public Editor newEditor(REPSocketChannel<REPCommand> channel) { |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
380 int eid = makeID(editorList.newEid()); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
381 Editor editor = new Editor(this, eid, channel); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
382 editorList.add(editor); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
383 return editor; |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
384 } |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
385 |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
386 |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
387 public Session newSession(Forwarder master) { |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
388 int sid= makeID(sessionList.newSessionID()); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
389 Session session = new Session(sid, master); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
390 return session; |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
391 } |
358 | 392 |
144 | 393 public void addWaitingCommand(PacketSet set) { |
212 | 394 waitingCommandInMerge.add(set); |
144 | 395 } |
148 | 396 |
222 | 397 public void buttonPressed(SessionManagerEvent event) { |
398 try { | |
308 | 399 waitingEventQueue.put(event); |
222 | 400 } catch (InterruptedException e) {} |
401 selector.wakeup(); | |
402 } | |
281 | 403 |
404 public void syncExec(SessionManagerEvent event) { | |
405 try { | |
308 | 406 waitingEventQueue.put(event); |
281 | 407 } catch (InterruptedException e) { |
408 } | |
409 } | |
222 | 410 |
259 | 411 public void closeSession(SessionManagerEvent event) { |
412 Session session = ((CloseButtonEvent) event).getSession(); | |
413 session.closeSession(); | |
414 sessionList.remove(session); | |
415 updateGUI(); | |
416 } | |
417 | |
274 | 418 public void remove(REPSocketChannel<REPCommand> channel) { |
358 | 419 int i = 0; |
420 for(Session s:sessionList.values()) { | |
274 | 421 if (s.deleteEditor(channel)) { |
358 | 422 i++; |
274 | 423 } |
424 } | |
358 | 425 assert(i==1); |
274 | 426 // can be other session manager? what should I do? |
427 } | |
428 | |
317 | 429 |
430 public void addWriteQueue(PacketSet packetSet) { | |
324 | 431 writeQueue.addLast(packetSet); |
323 | 432 assert(writeQueue.size()<packetLimit) ; |
317 | 433 } |
434 | |
318 | 435 |
436 public void remove(Editor editor) { | |
358 | 437 Session s = sessionList.get(editor.getSID()); |
438 if (editor.isMaster()) { | |
439 removeSession(s); | |
440 } else { | |
441 s.deleteForwarder(editor); | |
442 editorList.remove(editor); | |
318 | 443 } |
341 | 444 updateGUI(); |
445 } | |
446 | |
447 private void removeSession(Session s0) { | |
358 | 448 s0.remove(this); |
341 | 449 sessionList.remove(s0); |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
450 sendUpdate(s0.getSID()); |
318 | 451 } |
452 | |
334 | 453 public void setParentPort(int port) { |
454 parent_port = port; | |
455 } | |
456 public int getParentPort() { | |
457 return parent_port; | |
458 } | |
459 | |
460 public int getPort() { | |
461 return receive_port; | |
462 } | |
463 | |
353 | 464 |
355 | 465 boolean sessionManage(Forwarder forwarder, REPCommand command) throws ClosedChannelException, |
358 | 466 IOException { |
355 | 467 switch(command.cmd){ |
353 | 468 |
469 // Session Manager Command | |
470 | |
471 case SMCMD_JOIN: | |
472 { | |
358 | 473 // first connection or forwarded command |
474 if(isMaster()) { | |
475 REPCommand ackCommand = new REPCommand(); | |
476 ackCommand.setCMD(REP.SMCMD_JOIN_ACK); | |
477 ackCommand.setEID(command.eid); | |
478 ackCommand.string = command.string; | |
479 smList.sendToSlaves(ackCommand); | |
480 registEditor(forwarder,ackCommand); | |
481 } else { | |
482 routingTable.add(forwarder,getSMID(command.eid),command.sid); | |
483 smList.sendToMaster(command); | |
484 } | |
485 | |
353 | 486 } |
487 | |
488 break; | |
489 | |
490 case SMCMD_JOIN_ACK: | |
358 | 491 case SMCMD_PUT_ACK: |
492 registEditor(forwarder,command); | |
353 | 493 break; |
494 | |
495 case SMCMD_PUT: | |
496 { | |
358 | 497 // first connection or forwarded command |
498 if(isMaster()) { | |
499 command.setCMD(REP.SMCMD_PUT_ACK); | |
500 command.string = command.string; | |
501 command.setEID(command.eid); | |
502 command.setSID(command.sid); | |
503 smList.sendToSlaves(command); | |
504 registEditor(forwarder,command); | |
505 } else { | |
506 routingTable.add(forwarder,getSMID(command.eid),command.sid); | |
507 smList.sendToMaster(command); | |
508 } | |
353 | 509 |
510 } | |
511 break; | |
358 | 512 |
353 | 513 case SMCMD_SELECT: |
514 case SMCMD_SELECT_ACK: | |
515 { | |
358 | 516 Session session = sessionList.get(command.sid); |
517 selectSession(command.sid, session, command.eid, forwarder); | |
353 | 518 } |
358 | 519 break; |
520 | |
353 | 521 case SMCMD_SM_JOIN: |
522 { | |
355 | 523 // SM_JOIN中にSMCMD_SM_JOINが来たら、これはループなので、 |
524 ///自分のSM_JOINを取り消す。 | |
525 if (sm_join_channel!=null) cancel_sm_join(); | |
353 | 526 // SMCMD_SM_JOIN は、master まで上昇する。 |
527 // masterでなければ、自分のparentに転送する。 | |
355 | 528 if(isMaster()) { |
353 | 529 // master であれば、SessionManager IDを決めて、 |
530 // 自分のsmList に登録 | |
355 | 531 Forwarder sm; |
532 int psid = command.eid; | |
533 if (forwarder.sid!=-1) { | |
534 // すでに channelはSessionManager Idを持っていて、 | |
535 // direct link ではないので、 | |
536 // channel を持たないForwarderとして登録する | |
537 sm = new Forwarder(this); | |
538 } else { | |
539 sm = forwarder; | |
540 } | |
541 int sid = smList.addNewSessionManager(sm,command); | |
542 REPCommand sendCommand = makeREPCommandWithSessionList(REP.SMCMD_SM_JOIN_ACK); | |
543 // command.eid==smList.sesionManagerID() の場合は、 | |
544 // 待っている自分の下のsessionManagerにsidをassignする必要がある。 | |
353 | 545 sendCommand.setSID(sid); // new Session manager ID |
546 // 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた | |
547 // Session manager IDを使う。 | |
355 | 548 sendCommand.setEID(psid); |
549 send_sm_join_ack(psid, sid, sendCommand); | |
550 } else { | |
551 if (forwarder.sid==-1) { | |
552 // direct link の場合は、識別のために、EIDに直上の | |
553 // smid を入れておく。 | |
554 command.setEID(smList.sessionManagerID()); | |
555 } | |
556 smList.sendToMaster(command); | |
353 | 557 } |
558 } | |
559 break; | |
560 | |
355 | 561 case SMCMD_SM_JOIN_ACK: |
562 send_sm_join_ack(command.eid, command.sid, command); | |
353 | 563 break; |
564 | |
565 case SMCMD_UPDATE: | |
359
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
566 if (!isMaster()) { |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
567 command.setString(mergeUpdate(command)); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
568 // 上に知らせる |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
569 smList.sendToMaster(command); |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
570 break; |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
571 } |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
572 // fall thru |
fa041bae35f1
all code written for distributed session except gather.
kono
parents:
358
diff
changeset
|
573 command.setCMD(REP.SMCMD_UPDATE_ACK); |
353 | 574 case SMCMD_UPDATE_ACK: |
358 | 575 command.setString(mergeUpdate(command)); |
576 // 下に知らせる | |
355 | 577 smList.sendToSlaves(command); |
358 | 578 updateGUI(); |
353 | 579 break; |
580 default: | |
581 return false; | |
582 } | |
583 return true; | |
584 } | |
585 | |
355 | 586 |
358 | 587 private String mergeUpdate(REPCommand command) throws IOException { |
588 SessionList receivedSessionList; | |
589 try { | |
590 receivedSessionList = decoder.decode(command.string); | |
591 } catch (SAXException e) { | |
592 throw new IOException(); | |
593 } | |
594 // UPDATE/UPDATE_ACKにより送られてきたSessionの情報を追加する | |
595 //XMLを生成。送信コマンドにセット。 | |
596 sessionList.merge(receivedSessionList); | |
597 return encoder.sessionListToXML(sessionList); | |
598 | |
599 } | |
600 | |
601 /* | |
602 * id has SessionManager ID part | |
603 */ | |
604 | |
605 private int makeID(int newid) { | |
606 return newid+smList.sessionManagerID()*MAXID; | |
607 } | |
608 | |
609 private int getSessionID(int id) { | |
610 return id%MAXID; | |
611 } | |
612 | |
613 private int getSMID(int id) { | |
614 return id/MAXID; | |
615 } | |
616 | |
617 | |
618 private void registEditor(Forwarder forwarder,REPCommand command) { | |
619 // make ack for PUT/JOIN. Do not send this to the editor, | |
620 // before select. After select, ack is sent to the editor. | |
621 routingTable.add(forwarder,getSMID(command.eid),command.sid); | |
622 Editor editor; | |
623 if (getSessionID(command.sid)==smList.sessionManagerID() | |
624 && forwarder.isDirect()) { | |
625 // direct link だった | |
626 editor = (Editor)forwarder; | |
627 } else { | |
628 editor = new Editor(this, command.cmd==REP.SMCMD_PUT_ACK, command.eid); | |
629 } | |
630 editor.setName(command.string); | |
631 editor.setSID(command.sid); | |
632 if (!editorList.hasEid(command.eid)) { | |
633 editorList.add(editor); | |
634 updateGUI(); | |
635 } | |
636 // we don't join ack to the direct linked editor. We | |
637 // have to wait select command | |
638 } | |
639 | |
640 | |
355 | 641 void send_sm_join_ack(int psid, int sid,REPCommand sendCommand) { |
642 if (psid==smList.sessionManagerID()) { | |
643 // 直下のsessionManagerにIDを割り振る必要がある。 | |
644 smList.assignSessionManagerIDtoWaitingSM(sid); | |
645 // ここで smList に一つだけ追加されるので | |
358 | 646 // 待っている最初のsm一つにだけ、sm_join_ackが新たに送られる。 |
355 | 647 } |
648 smList.sendToSlaves(sendCommand); | |
649 } | |
650 | |
651 | |
652 private REPCommand makeREPCommandWithSessionList(REP cmd) { | |
653 //SessionListからXMLを生成。 | |
654 //joinしてきたSessionManagerに対してACKを送信。 | |
655 REPCommand sendCommand = new REPCommand(); | |
656 sendCommand.setCMD(cmd); | |
358 | 657 sendCommand.setString(encoder.sessionListToXML(sessionList)); |
355 | 658 return sendCommand; |
659 } | |
660 | |
661 | |
662 public boolean isMaster() { | |
663 return smList.isMaster(); | |
664 } | |
665 | |
666 | |
667 public void setSessionManagerID(int sid) { | |
668 smList.setSessionManagerID(sid); | |
669 } | |
670 | |
358 | 671 |
672 public Session getSession(int sid) { | |
673 return sessionList.get(sid); | |
674 } | |
675 | |
0 | 676 } |