comparison rep/SessionManager.java @ 319:dfed28488274

*** empty log message ***
author kono
date Thu, 09 Oct 2008 17:26:55 +0900
parents dc57e24ea3df
children 83790b8b8174
comparison
equal deleted inserted replaced
318:dc57e24ea3df 319:dfed28488274
42 byte[] text; 42 byte[] text;
43 */ 43 */
44 44
45 public class SessionManager implements SessionManagerEventListener{ 45 public class SessionManager implements SessionManagerEventListener{
46 46
47 private LinkedList<Session> sessionList; 47 LinkedList<Session> sessionList;
48 private SessionManagerGUI gui; 48 private SessionManagerGUI gui;
49 private REPSelector<REPCommand> selector; 49 private REPSelector<REPCommand> selector;
50 private SessionManagerList smList; 50 SessionManagerList smList;
51 private List<Editor> editorList; 51 List<Editor> editorList;
52 // editorList は、sessionList に入っているeditorとは別なeditorのlistらしい。 52 // editorList は、sessionList に入っているeditorとは別なeditorのlistらしい。
53 private String maxHost; 53 private String maxHost;
54 private List<PacketSet> waitingCommandInMerge; 54 private List<PacketSet> waitingCommandInMerge;
55 REPHandler normalHandler = new REPEditorHandler(this); 55 REPHandler normalHandler = new REPEditorHandler(this);
56 private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>();; 56 private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>();;
57 private String myHost; 57 String myHost;
58 private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); 58 private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>();
59 private static int receive_port; 59 private static int receive_port;
60 private static int parent_port; 60 private static int parent_port;
61 static final int DEFAULT_PORT = 8766; 61 static final int DEFAULT_PORT = 8766;
62 62
199 REPHandler handler = normalHandler; 199 REPHandler handler = normalHandler;
200 channel.register(selector, ops, handler); 200 channel.register(selector, ops, handler);
201 } 201 }
202 202
203 public void manage(REPSocketChannel<REPCommand> channel, REPCommand receivedCommand) throws IOException { 203 public void manage(REPSocketChannel<REPCommand> channel, REPCommand receivedCommand) throws IOException {
204 if(receivedCommand == null) return; 204 if (sessionManagerCommand(channel, receivedCommand)) return;
205 //Session session; 205 Session s = getSession(receivedCommand.sid);
206 REPSocketChannel<REPCommand> send = channel; 206 Editor e = s.getEditor(channel);
207 207 e.manage(receivedCommand);
208 }
209
210
211 private boolean sessionManagerCommand(REPSocketChannel<REPCommand> channel,
212 REPCommand receivedCommand) throws ClosedChannelException,
213 IOException {
208 switch(receivedCommand.cmd){ 214 switch(receivedCommand.cmd){
209 215
210 // Editor Command 216 // Session Manager Command
211 217
212 case REPCMD_DELETE:
213 case REPCMD_INSERT:
214 case REPCMD_NOP:
215 {
216 // sid から Session を取得
217 Session session = getSession(receivedCommand.sid);
218 if (session==null) throw new IOException();
219 // 次のエディタへコマンドを送信する処理
220 Editor editor = session.getEditor(channel);
221 editor.translate(session.getNextEditor(editor), receivedCommand);
222 break;
223 }
224
225 case SMCMD_JOIN: 218 case SMCMD_JOIN:
226 { 219 {
227 //どのSessionにも属さないエディタをリストに追加 220 //どのSessionにも属さないエディタをリストに追加
228 //エディタとchannelは1対1 (ではない) 221 //エディタとchannelは1対1 (ではない)
229 //エディタが新しくputする場合は新しくソケットを作る 222 //エディタが新しくputする場合は新しくソケットを作る
230 // ここのeditorList はsessionのとは別物 223 // ここのeditorList はsessionのとは別物
231 Editor editor = new Editor(this,editorList.size(),channel); 224 Editor editor1 = new Editor(this,editorList.size(),channel);
232 editor.setHost(myHost); 225 editor1.setHost(myHost);
233 editorList.add(editor); 226 editorList.add(editor1);
234 227
235 updateGUI(); 228 updateGUI();
236 229
237 } 230 }
238 231
239 break; 232 break;
240 233
241 case SMCMD_JOIN_ACK: 234 case SMCMD_JOIN_ACK:
242 assert (false); 235 assert (false);
243 break; 236 break;
244 237
245 case SMCMD_PUT: 238 case SMCMD_PUT:
246 { 239 {
247 //エディタのリストに追加
248 Editor editor = new Editor(this,editorList.size(), channel);
249 //editorList.add(editor);
250
251 //Sessionを生成 240 //Sessionを生成
252 int sid = sessionList.size(); 241 int sid = sessionList.size();
253 editor = new Editor(this,0, channel); 242 Editor editor2 = new Editor(this,0, channel);
254 editor.setHost(myHost); 243 editorList.add(editor2);
255 Session session = new Session(sid, receivedCommand.string, editor); 244 editor2.setHost(myHost);
245 Session session = new Session(sid, receivedCommand.string, editor2);
256 session.hasOwner(true); 246 session.hasOwner(true);
257 sessionList.add(session); 247 sessionList.add(session);
258 248
259 updateGUI(); 249 updateGUI();
260 250
261 //エディタにAckを送信 251 //エディタにAckを送信
262 REPCommand sendCommand = new REPCommand(receivedCommand); 252 REPCommand sendCommand = new REPCommand(receivedCommand);
263 sendCommand.setCMD(REP.SMCMD_PUT_ACK); 253 sendCommand.setCMD(REP.SMCMD_PUT_ACK);
264 sendCommand.setEID(editor.getEID()); 254 sendCommand.setEID(editor2.getEID());
265 sendCommand.setSID(session.getSID()); 255 sendCommand.setSID(session.getSID());
266 editor.send(sendCommand); 256 editor2.send(sendCommand);
267 257
268 //他のSessionManagerへSessionの追加を報告 258 //他のSessionManagerへSessionの追加を報告
269 //親に送って、親から子へ 259 //親に送って、親から子へ
270 SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session); 260 SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session);
271 REPCommand command = new REPCommand(); 261 REPCommand command = new REPCommand();
272 command.setSID(session.getSID()); 262 command.setSID(session.getSID());
273 command.setString(sessionEncoder.sessionListToXML()); 263 command.setString(sessionEncoder.sessionListToXML());
274 command.setCMD(REP.SMCMD_UPDATE); 264 command.setCMD(REP.SMCMD_UPDATE);
275 smList.sendExcept(channel, command); 265 smList.sendExcept(channel, command);
276 266
277 } 267 }
278 268
279 break; 269 break;
280 270
281 // SELECT is no longer used in a editor. Select 271 // SELECT is no longer used in a editor. Select
282 // operation is handled in Session Manager Only 272 // operation is handled in Session Manager Only
283 case SMCMD_SELECT: 273 case SMCMD_SELECT:
284 { 274 {
285 //他のSessionManagerをエディタとしてSessionに追加 275 //他のSessionManagerをエディタとしてSessionに追加
286 Editor editor = new Editor(this,0,channel); 276 Forwarder next = new Forwarder(this);
277 next.setChannel(channel);
287 Session session = getSession(receivedCommand.sid); 278 Session session = getSession(receivedCommand.sid);
288 session.addEditor(editor); 279 session.addForwarder(next);
289 280
290 if(session.hasOwner()){ 281 if(session.hasOwner()){
291 //このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す 282 //このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す
292 REPCommand sendCommand = new REPCommand(receivedCommand); 283 REPCommand sendCommand = new REPCommand(receivedCommand);
293 sendCommand.setCMD(REP.SMCMD_SELECT_ACK); 284 sendCommand.setCMD(REP.SMCMD_SELECT_ACK);
294 sendCommand.setEID(editor.getEID()); 285 sendCommand.setEID(next.getEID());
295 editor.send(sendCommand); 286 next.send(sendCommand);
296 }else{ 287 }else{
297 //オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する 288 //オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する
298 Forwarder owner = session.getOwner(); 289 Forwarder owner = session.getOwner();
299 owner.send(receivedCommand); 290 owner.send(receivedCommand);
300 } 291 }
301 } 292 }
302 293
303 break; 294 break;
304 295
305 case SMCMD_SELECT_ACK: 296 case SMCMD_SELECT_ACK:
306 { 297 {
307 String hostport = receivedCommand.string; 298 String hostport = receivedCommand.string;
308 Forwarder editor = getEditor(hostport); 299 Forwarder editor1 = getEditor(hostport);
309 300
310 if(editor != null) { 301 if(editor1 != null) {
311 //host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する 302 //host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する
312 REPCommand command = new REPCommand(); 303 REPCommand command = new REPCommand();
313 command.setCMD(REP.SMCMD_JOIN_ACK); 304 command.setCMD(REP.SMCMD_JOIN_ACK);
314 command.setSID(receivedCommand.sid); 305 command.setSID(receivedCommand.sid);
315 command.setEID(receivedCommand.eid); 306 command.setEID(receivedCommand.eid);
316 editor.send(command); 307 editor1.send(command);
317 308
318 }else{ 309 }else{
319 //自分が送信したコマンドでなければ、次のSessionManagerへ中継する 310 //自分が送信したコマンドでなければ、次のSessionManagerへ中継する
320 smList.sendExcept(channel, receivedCommand); 311 smList.sendExcept(channel, receivedCommand);
321 } 312 }
322 } 313 }
323 314
324 break; 315 break;
325
326 // Session Manager Command
327
328 case SMCMD_SM_JOIN: 316 case SMCMD_SM_JOIN:
329 317
330 { 318 {
331 // このchannelの相手は、SessionManager なので、 319 // このchannelの相手は、SessionManager なので、
332 // 特別なhandlerを接続する必要がある 320 // 特別なhandlerを接続する必要がある
356 //joinしてきたSessionManagerに対してACKを送信。 344 //joinしてきたSessionManagerに対してACKを送信。
357 SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList); 345 SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList);
358 REPCommand sendCommand = new REPCommand(); 346 REPCommand sendCommand = new REPCommand();
359 sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK); 347 sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK);
360 sendCommand.setString(sessionlistEncoder.sessionListToXML()); 348 sendCommand.setString(sessionlistEncoder.sessionListToXML());
361 send.write(sendCommand); 349 channel.write(sendCommand);
362 350
363 //その他の SessionManager に対して SMCMD_UPDATEを 送信。 351 //その他の SessionManager に対して SMCMD_UPDATEを 送信。
364 sendCommand = new REPCommand(); 352 sendCommand = new REPCommand();
365 sendCommand.setCMD(REP.SMCMD_UPDATE); 353 sendCommand.setCMD(REP.SMCMD_UPDATE);
366 sendCommand.setString(receivedCommand.string); 354 sendCommand.setString(receivedCommand.string);
409 } 397 }
410 break; 398 break;
411 399
412 case SMCMD_UPDATE_ACK: 400 case SMCMD_UPDATE_ACK:
413 { 401 {
414 if(receivedCommand.sid > sessionList.size()){ 402 if(!hasSession(receivedCommand.sid)) {
415 Editor editor = new Editor(this,0,channel); 403 // accept new Session
416 editor.setName(receivedCommand.string); 404 Forwarder editor = new Forwarder(this);
417 405 editor.setChannel(channel);
418 Session session = new Session(editor); 406
419 session.addEditor(editor); 407 Session session = new Session(receivedCommand.sid,receivedCommand.string,editor);
408 session.addEditor(this, session.newEid(), channel);
420 409
421 sessionList.add(session); 410 sessionList.add(session);
422 411
423 //リストのコピーをGUIに渡す 412 //リストのコピーをGUIに渡す
424 LinkedList<Session> sList = new LinkedList<Session>(sessionList); 413 LinkedList<Session> sList = new LinkedList<Session>(sessionList);
445 434
446 case SMCMD_START_MERGE_ACK: 435 case SMCMD_START_MERGE_ACK:
447 { 436 {
448 // sid から Session を取得 437 // sid から Session を取得
449 Session session = getSession(receivedCommand.sid); 438 Session session = getSession(receivedCommand.sid);
450 if (session==null) throw new IOException();
451 // マージの処理と次のエディタへコマンドを送信する処理 439 // マージの処理と次のエディタへコマンドを送信する処理
452 Editor editor = session.getEditor(channel); 440 Editor editor = session.getEditor(channel);
453 if (!editor.merge(editor,receivedCommand)) { 441 if (!editor.merge(editor,receivedCommand)) {
454 // nothing to do, send END_MERGE 442 // nothing to do, send END_MERGE
455 editor.endMerge(); 443 editor.endMerge();
456 } 444 }
457 break; 445 break;
458 } 446 }
459 case SMCMD_QUIT: 447
460 {
461 Session session = getSession(receivedCommand.sid);
462 if (session==null) throw new IOException();
463 session.sendToNextEditor(channel,receivedCommand);
464 break;
465 }
466 case SMCMD_QUIT_2:
467 {
468 Session session = getSession(receivedCommand.sid);
469 if (session==null) throw new IOException();
470 Forwarder me = session.getEditor(channel);
471 if (me==null) break; // already removed.
472 Forwarder editor = session.getNextEditor(me);
473 // don't send quit2 to the editor until all pending
474 // merge is processed.
475 editor.setQuit2(receivedCommand);
476 break;
477 }
478 default: 448 default:
479 assert(false); 449 return false;
480 break; 450 }
481 451 return true;
482 } 452 }
483 } 453
484 454 private boolean hasSession(int sid) {
485 private void updateGUI() { 455 for(Session s:sessionList) {
456 if (s.getSID()==sid) return true;
457 }
458 return false;
459 }
460
461
462 void updateGUI() {
486 //リストのコピーをGUIに渡す 463 //リストのコピーをGUIに渡す
487 LinkedList<Session> sList = new LinkedList<Session>(sessionList); 464 LinkedList<Session> sList = new LinkedList<Session>(sessionList);
488 LinkedList<Editor> eList = new LinkedList<Editor>(editorList); 465 LinkedList<Editor> eList = new LinkedList<Editor>(editorList);
489 //GUIに反映 466 //GUIに反映
490 Runnable doRun = new DoGUIUpdate(sList, eList, gui); 467 Runnable doRun = new DoGUIUpdate(sList, eList, gui);
491 gui.invokeLater(doRun); 468 gui.invokeLater(doRun);
492 } 469 }
493 470
494 private Forwarder getEditor(String hostport) { 471 Forwarder getEditor(String hostport) {
495 for(Editor editor : editorList){ 472 for(Editor editor : editorList){
496 if(editor.getHost() == hostport){ 473 if(editor.getHost() == hostport){
497 return editor; 474 return editor;
498 } 475 }
499 } 476 }
587 if(editor == null){ 564 if(editor == null){
588 System.out.println("SessionManager.selectSession():editor = " + editor); 565 System.out.println("SessionManager.selectSession():editor = " + editor);
589 return; 566 return;
590 } 567 }
591 568
592 session.addEditor(editor); 569
593 570
594 System.out.println("SessionManager.session.hasOnwer="+session.hasOwner()); 571 System.out.println("SessionManager.session.hasOnwer="+session.hasOwner());
595 if(session.hasOwner()){ 572 if(session.hasOwner()){
573 session.addForwarder(editor);
596 REPCommand sendCommand = new REPCommand(); 574 REPCommand sendCommand = new REPCommand();
597 sendCommand.setCMD(REP.SMCMD_JOIN_ACK); 575 sendCommand.setCMD(REP.SMCMD_JOIN_ACK);
598 sendCommand.setEID(editor.getEID()); 576 sendCommand.setEID(editor.getEID());
599 sendCommand.setSID(sid); 577 sendCommand.setSID(sid);
600 sendCommand.string = ""; 578 sendCommand.string = "";
602 }else { 580 }else {
603 sid = event.getSID(); 581 sid = event.getSID();
604 editor = new Editor(this,0,channel); 582 editor = new Editor(this,0,channel);
605 editor.setHost(myHost); 583 editor.setHost(myHost);
606 session = getSession(sid); 584 session = getSession(sid);
607 session.addEditor(editor); 585 session.addEditor(this,0,channel);
608
609 Forwarder owner = session.getOwner(); 586 Forwarder owner = session.getOwner();
610 587
611 REPCommand command = new REPCommand(); 588 REPCommand command = new REPCommand();
612 command.setCMD(REP.SMCMD_SELECT); 589 command.setCMD(REP.SMCMD_SELECT);
613 command.setSID(sid); 590 command.setSID(sid);