Mercurial > hg > RemoteEditor > REPSessionManager
diff rep/SessionManager.java @ 355:98607350f7d1
*** empty log message ***
author | kono |
---|---|
date | Fri, 17 Oct 2008 22:11:34 +0900 |
parents | 0d47ff22ee0e |
children | b18c24dcc5d2 |
line wrap: on
line diff
--- a/rep/SessionManager.java Fri Oct 17 18:40:08 2008 +0900 +++ b/rep/SessionManager.java Fri Oct 17 22:11:34 2008 +0900 @@ -62,8 +62,7 @@ static final int DEFAULT_PORT = 8766; private static final int packetLimit = 200; SessionXMLDecoder decoder = new SessionXMLDecoder(); - - boolean smjoin_mode; + private Forwarder sm_join_channel; public static void main(String[] args) throws InterruptedException, IOException { @@ -208,7 +207,7 @@ if(key.isAcceptable()){ REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); - registerChannel (channel, SelectionKey.OP_READ,new Forwarder(this)); + registerChannel (channel, new FirstConnector(this)); channel = null; }else if(key.isReadable()){ @@ -223,18 +222,29 @@ } } - void registerChannel(REPSocketChannel<REPCommand> channel, int ops,Forwarder handler) throws IOException { + void registerChannel(REPSocketChannel<REPCommand> channel,Forwarder handler) throws IOException { if(channel == null) { return; } handler.setChannel(channel); channel.configureBlocking(false); - channel.register(selector, ops, handler); + channel.register(selector, SelectionKey.OP_READ, handler); } void cancel_sm_join() { - smjoin_mode=false; + removeChannel(sm_join_channel); + sm_join_channel=null; + } + + + private void removeChannel(Forwarder sm_join_channel) { + REPSelectionKey<REPCommand> key = sm_join_channel.channel.keyFor1(selector); + key.cancel(); + try { + sm_join_channel.channel.close(); + } catch (IOException e) { + } } @@ -294,7 +304,16 @@ } } + + /** + * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては + * ならない。複数のSession Managerにjoinすることは出来ない。(NATを実装するまでは)。 + * @param host + */ public void connectSession(String host) { + if (sm_join_channel!=null) return; + if (!sessionList.isEmpty()) return; + if (!smList.isMaster()) return; int port = parent_port; InetSocketAddress addr = new InetSocketAddress(host, port); try { @@ -303,14 +322,14 @@ sessionchannel.connect(addr); while(!sessionchannel.finishConnect()); Forwarder sm = new Forwarder(this); - registerChannel(sessionchannel, SelectionKey.OP_READ,sm); + registerChannel(sessionchannel, sm); sm_join(sm); }catch (IOException e) { } } private void sm_join(Forwarder channel){ - smjoin_mode = true; + sm_join_channel = channel; //SM_JOINコマンドを生成。 REPCommand command = new REPCommand(); command.setCMD(REP.SMCMD_SM_JOIN); @@ -321,8 +340,9 @@ setMyHostName(channel.getLocalHostName()); //XMLを生成。送信コマンドにセット。 - SessionXMLEncoder encoder = new SessionXMLEncoder(sessionList); - String string = encoder.sessionListToXML(); + //SessionXMLEncoder encoder = new SessionXMLEncoder(sessionList); + //String string = encoder.sessionListToXML(); + String string = myHost; command.setString(string); //SM_JOINコマンドを送信。 @@ -439,25 +459,15 @@ } - boolean sessionManage(Forwarder forwarder, REPCommand receivedCommand) throws ClosedChannelException, + boolean sessionManage(Forwarder forwarder, REPCommand command) throws ClosedChannelException, IOException, SAXException { - switch(receivedCommand.cmd){ + switch(command.cmd){ // Session Manager Command case SMCMD_JOIN: { - //どのSessionにも属さないエディタをリストに追加 - //エディタとchannelは1対1 (ではない) - //エディタが新しくputする場合は新しくソケットを作る - // ここのeditorList はsessionのとは別物 - Editor editor1 = new Editor(this,-1,forwarder.channel); - registerChannel(forwarder.channel,SelectionKey.OP_READ,editor1); - editor1.setHost(myHost); - editorList.add(editor1); - updateGUI(); - } break; @@ -473,18 +483,15 @@ // 自分の親に作ってもらう必要がある。自分が親なら自分で作って良い。 int sid = sessionList.size(); - Editor editor = new Editor(this,0, forwarder.channel); - registerChannel(forwarder.channel,SelectionKey.OP_READ,editor); - editorList.add(editor); - editor.setHost(myHost); - Session session = new Session(sid, receivedCommand.string, editor); + Editor editor = (Editor) forwarder; + Session session = new Session(sid, command.string, (Editor)forwarder); session.hasOwner(true); sessionList.add(session); updateGUI(); //エディタにAckを送信 - REPCommand sendCommand = new REPCommand(receivedCommand); + REPCommand sendCommand = new REPCommand(command); sendCommand.setCMD(REP.SMCMD_PUT_ACK); sendCommand.setEID(editor.getEID()); sendCommand.setSID(session.getSID()); @@ -493,10 +500,10 @@ //他のSessionManagerへSessionの追加を報告 //親に送って、親から子へ SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session); - REPCommand command = new REPCommand(); - command.setSID(session.getSID()); - command.setString(sessionEncoder.sessionListToXML()); - command.setCMD(REP.SMCMD_UPDATE); + REPCommand command1 = new REPCommand(); + command1.setSID(session.getSID()); + command1.setString(sessionEncoder.sessionListToXML()); + command1.setCMD(REP.SMCMD_UPDATE); smList.sendToSlaves( command); } @@ -510,19 +517,19 @@ //他のSessionManagerをエディタとしてSessionに追加 Forwarder next = new Forwarder(this); next.setChannel(forwarder.channel); - Session session = getSession(receivedCommand.sid); + Session session = getSession(command.sid); session.addForwarder(next); if(session.hasOwner()){ //このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す - REPCommand sendCommand = new REPCommand(receivedCommand); + REPCommand sendCommand = new REPCommand(command); sendCommand.setCMD(REP.SMCMD_SELECT_ACK); sendCommand.setEID(next.getEID()); next.send(sendCommand); }else{ //オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する Forwarder owner = session.getOwner(); - owner.send(receivedCommand); + owner.send(command); } } @@ -530,20 +537,20 @@ case SMCMD_SELECT_ACK: { - String hostport = receivedCommand.string; + String hostport = command.string; Forwarder editor1 = getEditor(hostport); if(editor1 != null) { //host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する - REPCommand command = new REPCommand(); - command.setCMD(REP.SMCMD_JOIN_ACK); - command.setSID(receivedCommand.sid); - command.setEID(receivedCommand.eid); + REPCommand command1 = new REPCommand(); + command1.setCMD(REP.SMCMD_JOIN_ACK); + command1.setSID(command.sid); + command1.setEID(command.eid); editor1.send(command); }else{ //自分が送信したコマンドでなければ、次のSessionManagerへ中継する - smList.sendToSlaves(receivedCommand); + smList.sendToSlaves(command); } } @@ -551,68 +558,51 @@ case SMCMD_SM_JOIN: { - // SM_JOIN中にSMCMD_SM_JOINが来たら、自分のSM_JOINを - // 取り消す。 - if (smjoin_mode) cancel_sm_join(); + // SM_JOIN中にSMCMD_SM_JOINが来たら、これはループなので、 + ///自分のSM_JOINを取り消す。 + if (sm_join_channel!=null) cancel_sm_join(); // SMCMD_SM_JOIN は、master まで上昇する。 // masterでなければ、自分のparentに転送する。 - if(smList.isMaster()) { + if(isMaster()) { // master であれば、SessionManager IDを決めて、 // 自分のsmList に登録 - int sid = smList.addNewSessionManager(receivedCommand); - //SessionListからXMLを生成。 - //joinしてきたSessionManagerに対してACKを送信。 - SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList); - REPCommand sendCommand = new REPCommand(); + Forwarder sm; + int psid = command.eid; + if (forwarder.sid!=-1) { + // すでに channelはSessionManager Idを持っていて、 + // direct link ではないので、 + // channel を持たないForwarderとして登録する + sm = new Forwarder(this); + } else { + sm = forwarder; + } + int sid = smList.addNewSessionManager(sm,command); + REPCommand sendCommand = makeREPCommandWithSessionList(REP.SMCMD_SM_JOIN_ACK); + // command.eid==smList.sesionManagerID() の場合は、 + // 待っている自分の下のsessionManagerにsidをassignする必要がある。 sendCommand.setSID(sid); // new Session manager ID // 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた // Session manager IDを使う。 - sendCommand.setEID(receivedCommand.eid); - sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK); - sendCommand.setString(sessionlistEncoder.sessionListToXML()); - smList.sendToSlaves(sendCommand); - break; + sendCommand.setEID(psid); + send_sm_join_ack(psid, sid, sendCommand); + } else { + if (forwarder.sid==-1) { + // direct link の場合は、識別のために、EIDに直上の + // smid を入れておく。 + command.setEID(smList.sessionManagerID()); + } + smList.sendToMaster(command); } - // - - //XMLからSessionListオブジェクトを生成する。 - //SessionList receivedSessionList = decoder.decode(receivedCommand.string); - - //myHost を設定。 - //立ち上げ時にやるとlocalhostしか取れない - if(myHost == null) setMyHostName(forwarder.getLocalHostName()); - - //maxHost を設定。 - // if(setMaxHost(channel, receivedSessionList.getMaxHost())){ - // REPCommand sendCommand = new REPCommand(); - // sendCommand.setCMD(REP.SMCMD_CH_MASTER); - // sendCommand.setString(maxHost); - // smList.sendExcept(channel, sendCommand); - // } - - } break; - case SMCMD_SM_JOIN_ACK: - - //XMLからSessionListオブジェクトを生成。 - //SessionList receivedSessionList2 = decoder.decode(receivedCommand.string); - - //maxHostを決定。 - // if(setMaxHost(channel, receivedSessionList2.getMaxHost())){ - // REPCommand sendCommand = new REPCommand(); - // sendCommand.setCMD(REP.SMCMD_CH_MASTER); - // sendCommand.setString(maxHost); - // smList.sendExcept(channel, sendCommand); - // } - - + case SMCMD_SM_JOIN_ACK: + send_sm_join_ack(command.eid, command.sid, command); break; case SMCMD_UPDATE: { - SessionList receivedSessionList3 = decoder.decode(receivedCommand.string); + SessionList receivedSessionList3 = decoder.decode(command.string); //UPDATEコマンドにより送られてきたSessionの情報を追加する LinkedList<Session> list = receivedSessionList3.getList(); @@ -622,7 +612,7 @@ } //他のSessionManagerへ中継する - smList.sendToSlaves(receivedCommand); + smList.sendToSlaves(command); updateGUI(); } @@ -630,20 +620,20 @@ case SMCMD_UPDATE_ACK: { - if(!hasSession(receivedCommand.sid)) { + if(!hasSession(command.sid)) { // accept new Session // ここで初めてsession id が決まる。 // このコマンドは、master session manager が出すはず Forwarder sm = new Forwarder(this); sm.setChannel(forwarder.channel); - Session session = new Session(receivedCommand.sid,receivedCommand.string,null); + Session session = new Session(command.sid,command.string,null); session.addForwarder(sm); sessionList.add(session); updateGUI(); } - smList.sendToSlaves(receivedCommand); + smList.sendToSlaves(command); } break; @@ -666,4 +656,36 @@ return true; } + + void send_sm_join_ack(int psid, int sid,REPCommand sendCommand) { + if (psid==smList.sessionManagerID()) { + // 直下のsessionManagerにIDを割り振る必要がある。 + smList.assignSessionManagerIDtoWaitingSM(sid); + // ここで smList に一つだけ追加されるので + // 待っている最初のsm一つにだけ、sm_joinが新たに送られる。 + } + smList.sendToSlaves(sendCommand); + } + + + private REPCommand makeREPCommandWithSessionList(REP cmd) { + //SessionListからXMLを生成。 + //joinしてきたSessionManagerに対してACKを送信。 + SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList); + REPCommand sendCommand = new REPCommand(); + sendCommand.setCMD(cmd); + sendCommand.setString(sessionlistEncoder.sessionListToXML()); + return sendCommand; + } + + + public boolean isMaster() { + return smList.isMaster(); + } + + + public void setSessionManagerID(int sid) { + smList.setSessionManagerID(sid); + } + }