# HG changeset patch # User kono # Date 1224134745 -32400 # Node ID 0d47ff22ee0e6e5d0896ce28f75bbb400a82c67a # Parent 39223c6ef689ad5d617ee5caef04d150c4d3cd92 *** empty log message *** diff -r 39223c6ef689 -r 0d47ff22ee0e rep/Editor.java --- a/rep/Editor.java Thu Oct 16 13:52:11 2008 +0900 +++ b/rep/Editor.java Thu Oct 16 14:25:45 2008 +0900 @@ -196,7 +196,7 @@ REPSocketChannel channel = key.channel1(); REPCommand command = channel.read(); SessionManager.logger.writeLog("REPHandlerImpl.handle() read : command = " + command +" from "+channel); - if (manager.manage(this, command)) return; + if (manager.sessionManage(this, command)) return; manage(command); } diff -r 39223c6ef689 -r 0d47ff22ee0e rep/Forwarder.java --- a/rep/Forwarder.java Thu Oct 16 13:52:11 2008 +0900 +++ b/rep/Forwarder.java Thu Oct 16 14:25:45 2008 +0900 @@ -2,6 +2,7 @@ import java.io.IOException; + import rep.channel.REPLogger; import rep.channel.REPSelectionKey; import rep.channel.REPSocketChannel; @@ -51,7 +52,7 @@ } public boolean manage(REPCommand command) { - send(command); + next.send(command); return true; } @@ -76,7 +77,7 @@ REPSocketChannel channel = key.channel1(); REPCommand command = channel.read(); SessionManager.logger.writeLog("REPHandlerImpl.handle() : command = " + command); - if (manager.manage(this, command)) return; + if (manager.sessionManage(this, command)) return; Session s = manager.getSession(command.sid); Forwarder editor = s.getFirstForwarder(); if (editor!=null) { diff -r 39223c6ef689 -r 0d47ff22ee0e rep/SessionManager.java --- a/rep/SessionManager.java Thu Oct 16 13:52:11 2008 +0900 +++ b/rep/SessionManager.java Thu Oct 16 14:25:45 2008 +0900 @@ -14,6 +14,7 @@ import org.xml.sax.SAXException; + import rep.channel.REPLogger; import rep.channel.REPServerSocketChannel; import rep.channel.REPSocketChannel; @@ -60,9 +61,9 @@ private int parent_port; static final int DEFAULT_PORT = 8766; private static final int packetLimit = 200; - private SessionXMLDecoder decoder = new SessionXMLDecoder(); + SessionXMLDecoder decoder = new SessionXMLDecoder(); - private boolean smjoin_mode; + boolean smjoin_mode; public static void main(String[] args) throws InterruptedException, IOException { @@ -165,11 +166,15 @@ waitingCommandInMerge = new LinkedList(); for(PacketSet p: w) { Editor e = p.getEditor(); - if(e!=null &&e.isMerging()) { // still merging do nothing + if(e.isMerging()) { // still merging do nothing waitingCommandInMerge.add(p); } else { try { - manage(e, p.command); + if (sessionManage(e, p.command)) { // we don't need this + assert false; + return; + } + e.manage(p.command); } catch (Exception e1) { // should be e.close()? close(p.channel); @@ -218,7 +223,7 @@ } } - private void registerChannel(REPSocketChannel channel, int ops,Forwarder handler) throws IOException { + void registerChannel(REPSocketChannel channel, int ops,Forwarder handler) throws IOException { if(channel == null) { return; } @@ -228,240 +233,12 @@ } - boolean manage(Forwarder channel, - REPCommand receivedCommand) throws ClosedChannelException, - IOException, SAXException { - switch(receivedCommand.cmd){ - - // Session Manager Command - - case SMCMD_JOIN: - { - //どのSessionにも属さないエディタをリストに追加 - //エディタとchannelは1対1 (ではない) - //エディタが新しくputする場合は新しくソケットを作る - // ここのeditorList はsessionのとは別物 - Editor editor1 = new Editor(this,-1,channel.channel); - registerChannel(channel.channel,SelectionKey.OP_READ,editor1); - editor1.setHost(myHost); - editorList.add(editor1); - - updateGUI(); - - } - - break; - - case SMCMD_JOIN_ACK: - assert (false); - break; - - case SMCMD_PUT: - { - //Sessionを生成 - // sessionIDってglobaly uniqueだから、本来は、 - // 自分の親に作ってもらう必要がある。自分が親なら自分で作って良い。 - - int sid = sessionList.size(); - Editor editor = new Editor(this,0, channel.channel); - registerChannel(channel.channel,SelectionKey.OP_READ,editor); - editorList.add(editor); - editor.setHost(myHost); - Session session = new Session(sid, receivedCommand.string, editor); - session.hasOwner(true); - sessionList.add(session); - - updateGUI(); - - //エディタにAckを送信 - REPCommand sendCommand = new REPCommand(receivedCommand); - sendCommand.setCMD(REP.SMCMD_PUT_ACK); - sendCommand.setEID(editor.getEID()); - sendCommand.setSID(session.getSID()); - editor.send(sendCommand); - - //他のSessionManagerへSessionの追加を報告 - //親に送って、親から子へ - SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session); - REPCommand command = new REPCommand(); - command.setSID(session.getSID()); - command.setString(sessionEncoder.sessionListToXML()); - command.setCMD(REP.SMCMD_UPDATE); - smList.sendToSlaves( command); - - } - - break; - - // SELECT is no longer used in a editor. Select - // operation is handled in Session Manager Only - case SMCMD_SELECT: - { - //他のSessionManagerをエディタとしてSessionに追加 - Forwarder next = new Forwarder(this); - next.setChannel(channel.channel); - Session session = getSession(receivedCommand.sid); - session.addForwarder(next); - - if(session.hasOwner()){ - //このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す - REPCommand sendCommand = new REPCommand(receivedCommand); - sendCommand.setCMD(REP.SMCMD_SELECT_ACK); - sendCommand.setEID(next.getEID()); - next.send(sendCommand); - }else{ - //オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する - Forwarder owner = session.getOwner(); - owner.send(receivedCommand); - } - } - - break; - - case SMCMD_SELECT_ACK: - { - String hostport = receivedCommand.string; - Forwarder editor1 = getEditor(hostport); - - if(editor1 != null) { - //host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する - REPCommand command = new REPCommand(); - command.setCMD(REP.SMCMD_JOIN_ACK); - command.setSID(receivedCommand.sid); - command.setEID(receivedCommand.eid); - editor1.send(command); - - }else{ - //自分が送信したコマンドでなければ、次のSessionManagerへ中継する - smList.sendToSlaves(receivedCommand); - } - } - - break; - case SMCMD_SM_JOIN: - - { - // SM_JOIN中にSMCMD_SM_JOINが来たら、自分のSM_JOINを - // 取り消す。 - if (smjoin_mode) cancel_sm_join(); - // SMCMD_SM_JOIN は、master まで上昇する。 - // masterでなければ、自分のparentに転送する。 - if(smList.isMaster()) { - // master であれば、SessionManager IDを決めて、 - // 自分のsmList に登録 - int sid = smList.addNewSessionManager(receivedCommand); - //SessionListからXMLを生成。 - //joinしてきたSessionManagerに対してACKを送信。 - SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList); - REPCommand sendCommand = new REPCommand(); - sendCommand.setSID(sid); // new Session manager ID - // 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた - // Session manager IDを使う。 - sendCommand.setEID(receivedCommand.eid); - sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK); - sendCommand.setString(sessionlistEncoder.sessionListToXML()); - smList.sendToSlaves(sendCommand); - break; - } - // - - //XMLからSessionListオブジェクトを生成する。 - //SessionList receivedSessionList = decoder.decode(receivedCommand.string); - - //myHost を設定。 - //立ち上げ時にやるとlocalhostしか取れない - if(myHost == null) setMyHostName(channel.getLocalHostName()); - - //maxHost を設定。 - // if(setMaxHost(channel, receivedSessionList.getMaxHost())){ - // REPCommand sendCommand = new REPCommand(); - // sendCommand.setCMD(REP.SMCMD_CH_MASTER); - // sendCommand.setString(maxHost); - // smList.sendExcept(channel, sendCommand); - // } - - - } - break; - - case SMCMD_SM_JOIN_ACK: - - //XMLからSessionListオブジェクトを生成。 - //SessionList receivedSessionList2 = decoder.decode(receivedCommand.string); - - //maxHostを決定。 - // if(setMaxHost(channel, receivedSessionList2.getMaxHost())){ - // REPCommand sendCommand = new REPCommand(); - // sendCommand.setCMD(REP.SMCMD_CH_MASTER); - // sendCommand.setString(maxHost); - // smList.sendExcept(channel, sendCommand); - // } - - - break; - - case SMCMD_UPDATE: - { - SessionList receivedSessionList3 = decoder.decode(receivedCommand.string); - - //UPDATEコマンドにより送られてきたSessionの情報を追加する - LinkedList list = receivedSessionList3.getList(); - for(Session session : list){ - session.getEditorList().get(0).setChannel(channel.channel); - sessionList.add(session); - } - - //他のSessionManagerへ中継する - smList.sendToSlaves(receivedCommand); - - updateGUI(); - } - break; - - case SMCMD_UPDATE_ACK: - { - if(!hasSession(receivedCommand.sid)) { - // accept new Session - // ここで初めてsession id が決まる。 - // このコマンドは、master session manager が出すはず - Forwarder sm = new Forwarder(this); - sm.setChannel(channel.channel); - Session session = new Session(receivedCommand.sid,receivedCommand.string,null); - session.addForwarder(sm); - - sessionList.add(session); - - updateGUI(); - } - smList.sendToSlaves(receivedCommand); - } - break; - -// case SMCMD_CH_MASTER: -// { -// //maxHost を設定。 -// if(setMaxHost(channel, receivedCommand.string)){ -// REPCommand sendCommand = new REPCommand(); -// sendCommand.setCMD(REP.SMCMD_CH_MASTER); -// sendCommand.setString(maxHost); -// smList.sendExcept(channel, sendCommand); -// } -// } -// break; - - - default: - return false; - } - return true; - } - - private void cancel_sm_join() { + void cancel_sm_join() { smjoin_mode=false; } - private boolean hasSession(int sid) { + boolean hasSession(int sid) { for(Session s:sessionList) { if (s.getSID()==sid) return true; } @@ -503,7 +280,7 @@ // } // } - private void setMyHostName(String localHostName) { + void setMyHostName(String localHostName) { myHost = localHostName + receive_port; // if(maxHost == null) { // maxHost = myHost; @@ -661,4 +438,232 @@ return receive_port; } + + boolean sessionManage(Forwarder forwarder, REPCommand receivedCommand) throws ClosedChannelException, + IOException, SAXException { + switch(receivedCommand.cmd){ + + // Session Manager Command + + case SMCMD_JOIN: + { + //どのSessionにも属さないエディタをリストに追加 + //エディタとchannelは1対1 (ではない) + //エディタが新しくputする場合は新しくソケットを作る + // ここのeditorList はsessionのとは別物 + Editor editor1 = new Editor(this,-1,forwarder.channel); + registerChannel(forwarder.channel,SelectionKey.OP_READ,editor1); + editor1.setHost(myHost); + editorList.add(editor1); + + updateGUI(); + + } + + break; + + case SMCMD_JOIN_ACK: + assert (false); + break; + + case SMCMD_PUT: + { + //Sessionを生成 + // sessionIDってglobaly uniqueだから、本来は、 + // 自分の親に作ってもらう必要がある。自分が親なら自分で作って良い。 + + int sid = sessionList.size(); + Editor editor = new Editor(this,0, forwarder.channel); + registerChannel(forwarder.channel,SelectionKey.OP_READ,editor); + editorList.add(editor); + editor.setHost(myHost); + Session session = new Session(sid, receivedCommand.string, editor); + session.hasOwner(true); + sessionList.add(session); + + updateGUI(); + + //エディタにAckを送信 + REPCommand sendCommand = new REPCommand(receivedCommand); + sendCommand.setCMD(REP.SMCMD_PUT_ACK); + sendCommand.setEID(editor.getEID()); + sendCommand.setSID(session.getSID()); + editor.send(sendCommand); + + //他のSessionManagerへSessionの追加を報告 + //親に送って、親から子へ + SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session); + REPCommand command = new REPCommand(); + command.setSID(session.getSID()); + command.setString(sessionEncoder.sessionListToXML()); + command.setCMD(REP.SMCMD_UPDATE); + smList.sendToSlaves( command); + + } + + break; + + // SELECT is no longer used in a editor. Select + // operation is handled in Session Manager Only + case SMCMD_SELECT: + { + //他のSessionManagerをエディタとしてSessionに追加 + Forwarder next = new Forwarder(this); + next.setChannel(forwarder.channel); + Session session = getSession(receivedCommand.sid); + session.addForwarder(next); + + if(session.hasOwner()){ + //このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す + REPCommand sendCommand = new REPCommand(receivedCommand); + sendCommand.setCMD(REP.SMCMD_SELECT_ACK); + sendCommand.setEID(next.getEID()); + next.send(sendCommand); + }else{ + //オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する + Forwarder owner = session.getOwner(); + owner.send(receivedCommand); + } + } + + break; + + case SMCMD_SELECT_ACK: + { + String hostport = receivedCommand.string; + Forwarder editor1 = getEditor(hostport); + + if(editor1 != null) { + //host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する + REPCommand command = new REPCommand(); + command.setCMD(REP.SMCMD_JOIN_ACK); + command.setSID(receivedCommand.sid); + command.setEID(receivedCommand.eid); + editor1.send(command); + + }else{ + //自分が送信したコマンドでなければ、次のSessionManagerへ中継する + smList.sendToSlaves(receivedCommand); + } + } + + break; + case SMCMD_SM_JOIN: + + { + // SM_JOIN中にSMCMD_SM_JOINが来たら、自分のSM_JOINを + // 取り消す。 + if (smjoin_mode) cancel_sm_join(); + // SMCMD_SM_JOIN は、master まで上昇する。 + // masterでなければ、自分のparentに転送する。 + if(smList.isMaster()) { + // master であれば、SessionManager IDを決めて、 + // 自分のsmList に登録 + int sid = smList.addNewSessionManager(receivedCommand); + //SessionListからXMLを生成。 + //joinしてきたSessionManagerに対してACKを送信。 + SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList); + REPCommand sendCommand = new REPCommand(); + sendCommand.setSID(sid); // new Session manager ID + // 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた + // Session manager IDを使う。 + sendCommand.setEID(receivedCommand.eid); + sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK); + sendCommand.setString(sessionlistEncoder.sessionListToXML()); + smList.sendToSlaves(sendCommand); + break; + } + // + + //XMLからSessionListオブジェクトを生成する。 + //SessionList receivedSessionList = decoder.decode(receivedCommand.string); + + //myHost を設定。 + //立ち上げ時にやるとlocalhostしか取れない + if(myHost == null) setMyHostName(forwarder.getLocalHostName()); + + //maxHost を設定。 + // if(setMaxHost(channel, receivedSessionList.getMaxHost())){ + // REPCommand sendCommand = new REPCommand(); + // sendCommand.setCMD(REP.SMCMD_CH_MASTER); + // sendCommand.setString(maxHost); + // smList.sendExcept(channel, sendCommand); + // } + + + } + break; + + case SMCMD_SM_JOIN_ACK: + + //XMLからSessionListオブジェクトを生成。 + //SessionList receivedSessionList2 = decoder.decode(receivedCommand.string); + + //maxHostを決定。 + // if(setMaxHost(channel, receivedSessionList2.getMaxHost())){ + // REPCommand sendCommand = new REPCommand(); + // sendCommand.setCMD(REP.SMCMD_CH_MASTER); + // sendCommand.setString(maxHost); + // smList.sendExcept(channel, sendCommand); + // } + + + break; + + case SMCMD_UPDATE: + { + SessionList receivedSessionList3 = decoder.decode(receivedCommand.string); + + //UPDATEコマンドにより送られてきたSessionの情報を追加する + LinkedList list = receivedSessionList3.getList(); + for(Session session : list){ + session.getEditorList().get(0).setChannel(forwarder.channel); + sessionList.add(session); + } + + //他のSessionManagerへ中継する + smList.sendToSlaves(receivedCommand); + + updateGUI(); + } + break; + + case SMCMD_UPDATE_ACK: + { + if(!hasSession(receivedCommand.sid)) { + // accept new Session + // ここで初めてsession id が決まる。 + // このコマンドは、master session manager が出すはず + Forwarder sm = new Forwarder(this); + sm.setChannel(forwarder.channel); + Session session = new Session(receivedCommand.sid,receivedCommand.string,null); + session.addForwarder(sm); + + sessionList.add(session); + + updateGUI(); + } + smList.sendToSlaves(receivedCommand); + } + break; + + // case SMCMD_CH_MASTER: + // { + // //maxHost を設定。 + // if(setMaxHost(channel, receivedCommand.string)){ + // REPCommand sendCommand = new REPCommand(); + // sendCommand.setCMD(REP.SMCMD_CH_MASTER); + // sendCommand.setString(maxHost); + // smList.sendExcept(channel, sendCommand); + // } + // } + // break; + + + default: + return false; + } + return true; + } + }