# HG changeset patch # User kono # Date 1224439408 -32400 # Node ID 1a8856580d38c3fe7ae8d0cadb175bf8d48d9635 # Parent f0bd158dace67e3e28994858933b7148688b1767 *** empty log message *** diff -r f0bd158dace6 -r 1a8856580d38 rep/Forwarder.java --- a/rep/Forwarder.java Sun Oct 19 23:05:59 2008 +0900 +++ b/rep/Forwarder.java Mon Oct 20 03:03:28 2008 +0900 @@ -79,10 +79,10 @@ if (manager.sessionManage(this, command)) return; Session s = manager.getSession(command.sid); + if (s==null) throw new IOException(); Forwarder editor = s.getFirstForwarder(); - if (editor!=null) { - editor.manage(command); - } else throw new IOException(); + if (editor==null) throw new IOException(); + editor.manage(command); } public void setMode(REP cmd) { diff -r f0bd158dace6 -r 1a8856580d38 rep/SessionManager.java --- a/rep/SessionManager.java Sun Oct 19 23:05:59 2008 +0900 +++ b/rep/SessionManager.java Mon Oct 20 03:03:28 2008 +0900 @@ -46,27 +46,37 @@ public class SessionManager implements SessionManagerEventListener{ static public REPLogger logger = REPLogger.singleton(); - SessionList sessionList; + SessionList sessionList; private SessionManagerGUI gui; + // Main nio.Selector of this server private REPSelector selector; + // Known Session Manager List, At most one parent. No parent means master. SessionManagerList smList; + // Known Editor list. Connected Editor has a channel. + // Session Manager Channel may have dummy editors. EditorList editorList; - // editorList は、sessionList に入っているeditorとは別なeditorのlistらしい。 - // private String maxHost; + // Commands for busy editor are kept in this queue. private List waitingCommandInMerge; - private BlockingQueue waitingEventQueue = new LinkedBlockingQueue();; + // Command from gui. Synchronization is required. + private BlockingQueue waitingEventQueue + = new LinkedBlockingQueue();; + // host name of this server. One of connecting SocketChannel's hostname String myHost; + // Single threaded write queueu. To avoid dead lock with too many writes. private LinkedList writeQueue = new LinkedList(); private int receive_port; private int parent_port; static final int DEFAULT_PORT = 8766; + // Queue limit for debugging purpose. private static final int packetLimit = 200; + // globalSessionID = SessionManagerID * MAXID + localSessionID private static final int MAXID = 10000; SessionXMLDecoder decoder = new SessionXMLDecoder(); SessionXMLEncoder encoder = new SessionXMLEncoder(); + // SocketChannel for our parent. At most one parent is allowed. private Forwarder sm_join_channel; - + // Routing table for session and session manager. private RoutingTable routingTable = new RoutingTable(); public static void main(String[] args) throws InterruptedException, IOException { @@ -75,40 +85,35 @@ int port_s = DEFAULT_PORT; //System.setProperty("file.encoding", "UTF-8"); if(args.length > 0){ + if (args.length!=2) { + logger.writeLog("Usage: sessionManager our_port parent_port"); + return; + } port = Integer.parseInt(args[0]); port_s = Integer.parseInt(args[1]); } SessionManager sm = new SessionManager(); sm.setReceivePort(port); sm.setParentPort(port_s); + // Ok start main loop sm.init(port,new SessionManagerGUIimpl(sm)); - - } - public void setReceivePort(int port) { receive_port = port; } - - public void openSelector() throws IOException{ - selector = REPSelector.create(); - } - public void init(int port, SessionManagerGUI gui) throws IOException, InterruptedException { this.gui = gui; - openSelector(); init(port); mainLoop(); } - private void init(int port) throws InterruptedException, IOException { - + selector = REPSelector.create(); REPServerSocketChannel ssc = REPServerSocketChannel.open(new REPCommandPacker()); - ssc.configureBlocking(false); //reuse address 必須 - ssc.socket().setReuseAddress(true); + ssc.configureBlocking(false); // Selector requires this + ssc.socket().setReuseAddress(true); //reuse address 必須 //getAllByNameで取れた全てのアドレスに対してbindする ssc.socket().bind(new InetSocketAddress(port)); ssc.register(selector, SelectionKey.OP_ACCEPT, @@ -123,10 +128,12 @@ } /* + * The main loop. + * Check incoming events and waiting writes. + * Do select and call select() to check in coming packets. * We wrote everything in one thread, but we can assign * one thread for each communication channel and GUI event. */ - public void mainLoop() throws IOException { while(true){ checkWaitingCommandInMerge(); @@ -142,6 +149,9 @@ } } + /* + * Synchronize GUI event in the main loop. + */ private boolean checkInputEvent() { SessionManagerEvent e; if((e = waitingEventQueue.poll())!=null){ @@ -151,6 +161,9 @@ return false; } + /* + * Write a packet during the main loop. + */ private boolean checkWaitingWrite() throws IOException { PacketSet p = writeQueue.poll(); if (p!=null) { @@ -186,7 +199,23 @@ } } } + + /* + * If we have waiting write commands, further sent commands also + * wait to avoid out of order packet sending. + */ + public boolean hasWaitingCommand(REPSocketChannelc) { + for(PacketSet p:waitingCommandInMerge) { + if (p.channel==c) { + return true; + } + } + return false; + } + /* + * Close a channel in case of exception or close. + */ private void close(REPSocketChannel channel) { REPSelectionKeykey = channel.keyFor1(selector); REPHandler handler = (REPHandler)key.attachment(); @@ -196,26 +225,30 @@ } - public boolean hasWaitingCommand(REPSocketChannelc) { - for(PacketSet p:waitingCommandInMerge) { - if (p.channel==c) { - return true; - } - } - return false; - } - + /* + * Do select operation on the Selector. Each key has a forwarder. + * A forwarder can be a firstConnector, a forwarder for Session Manager + * or an Editor. + */ private void select() throws IOException { Set> keys = selector.selectedKeys1(); for(REPSelectionKey key : keys){ if(key.isAcceptable()){ + /* + * Incoming connection. We don't know which, editor or + * session manager. Assign FirstConnector to distinguish. + */ REPSocketChannel channel = key.accept(new REPCommandPacker()); logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); - registerChannel (channel, new FirstConnector(this)); + registerChannel(channel, new FirstConnector(this)); channel = null; - }else if(key.isReadable()){ + /* + * Incoming packets are handled by a various forwarder. + * A hadler throw IOException() in case of a trouble to + * close the channel. + */ REPHandler handler = (REPHandler)(key.attachment()); try { handler.handle(key); @@ -236,18 +269,20 @@ channel.register(selector, SelectionKey.OP_READ, handler); } - - void cancel_sm_join() { + /* + * After loop detection, we give up session manager join. + */ + private void cancel_sm_join() { removeChannel(sm_join_channel); sm_join_channel=null; } - private void removeChannel(Forwarder sm_join_channel) { - REPSelectionKey key = sm_join_channel.channel.keyFor1(selector); + private void removeChannel(Forwarder channel) { + REPSelectionKey key = channel.channel.keyFor1(selector); key.cancel(); try { - sm_join_channel.channel.close(); + channel.channel.close(); } catch (IOException e) { } } @@ -266,9 +301,6 @@ void setMyHostName(String localHostName) { myHost = localHostName + receive_port; -// if(maxHost == null) { -// maxHost = myHost; -// } setHostToEditor(myHost); } @@ -281,6 +313,7 @@ /** + * GUI から、呼ばれて、Session Managerに接続する。 * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては * ならない。複数のSession Managerにjoinすることは出来ない。(NATを実装するまでは)。 * @param host @@ -290,10 +323,13 @@ if (!sessionList.isEmpty()) return; if (!smList.isMaster()) return; int port = parent_port; + /* + * IPv6 対応では、複数のアドレスを取って、それのすべてに接続を試す必要が + * ある。 + */ InetSocketAddress addr = new InetSocketAddress(host, port); try { REPSocketChannel sessionchannel = REPSocketChannel.create(new REPCommandPacker()); - sessionchannel.connect(addr); while(!sessionchannel.finishConnect()); Forwarder sm = new Forwarder(this); @@ -303,6 +339,10 @@ } } + /** + * channel に SMCMD_SM_JOIN command を送る。 + * @param channel + */ private void sm_join(Forwarder channel){ sm_join_channel = channel; //SM_JOINコマンドを生成。 @@ -319,8 +359,7 @@ //SM_JOINコマンドを送信。 channel.send(command); - //SessionManagerのListに追加。 - + // ack を受け取ったら、SessionManagerのListに追加。ここではやらない。 } /* @@ -334,7 +373,7 @@ Editor editor = (Editor)event.getEditor(); if(editor == null){ - logger.writeLog("SessionManager.selectSession():editor = " + editor); + logger.writeLog("Error SessionManager.selectSession(): editor = " + editor); return; } if (editor.hasSession()) return; @@ -344,17 +383,21 @@ /* * Select Session Protocol handler + * called from GUI or incoming SMCMD_SELECT command. */ private void selectSession(int sid, Session session, int eid, Forwarder editor) { if(session.hasOwner()){ + // we have selected session. REPCommand sendCommand = new REPCommand(); if (editor.isDirect()&&editor.getEID()==eid) { + // Found directly connected joined editor. Send join_ack(). session.addForwarder(editor); sendUpdate(session.getSID()); sendCommand.setCMD(REP.SMCMD_JOIN_ACK); } else { + // We have a session, but joined editor is on the other sm. // SELECT_ACK is sent to the session ring to - // find out joined editor + // find out the joined editor. sendCommand.setCMD(REP.SMCMD_SELECT_ACK); // Do not directly addForwarder(forwarder). It may be // shared among sessions. @@ -362,16 +405,17 @@ f.setChannel(editor.channel); // incoming channel f.setHost(myHost); f.setSID(sid); - session.addForwarder(f); + session.addForwarder(f); // f.next is set up here. } sendCommand.setEID(editor.getEID()); sendCommand.setSID(sid); sendCommand.string = session.getName(); editor.send(sendCommand); }else { - // session searching + // session searching continue... Forwarder next = routingTable.toSession(sid); + // create dummy editor for this session Forwarder f = new Editor(this, false, makeID(editorList.newEid())); f.setChannel(editor.channel); // incoming channel f.setNext(next); @@ -379,6 +423,7 @@ f.setSID(sid); session.setFirstForwarder(f); + // pass the select command to the next path. REPCommand command = new REPCommand(); command.setCMD(REP.SMCMD_SELECT); command.setSID(sid); @@ -388,6 +433,9 @@ } } + /* + * Create and send UPDATE command. + */ private void sendUpdate(int sid) { REPCommand command = makeREPCommandWithSessionList(REP.SMCMD_UPDATE); command.setSID(sid); @@ -395,14 +443,20 @@ smList.sendToMaster(command); } + /* + * Create new editor in this sessin manager. A dummy editor + * is created also. + */ public Editor newEditor(REPSocketChannel channel) { int eid = makeID(editorList.newEid()); Editor editor = new Editor(this, eid, channel); editorList.add(editor); return editor; } - - + + /* + * Create new session. + */ public Session newSession(Forwarder master) { int sid= makeID(sessionList.newSessionID()); Session session = new Session(sid, master); @@ -414,6 +468,9 @@ waitingCommandInMerge.add(set); } + /* + * Synchronize GUI command in this session manager. + */ public void buttonPressed(SessionManagerEvent event) { try { waitingEventQueue.put(event); @@ -421,6 +478,10 @@ selector.wakeup(); } + /* + * Execute incoming event during the initialization for + * testing purpose. + */ public void syncExec(SessionManagerEvent event) { try { waitingEventQueue.put(event); @@ -428,6 +489,9 @@ } } + /* + * GUI command interface for close session. + */ public void closeSession(SessionManagerEvent event) { Session session = ((CloseButtonEvent) event).getSession(); session.closeSession(); @@ -435,6 +499,9 @@ updateGUI(); } + /* + * Remove editors which has the cannel. + */ public void remove(REPSocketChannel channel) { int i = 0; for(Session s:sessionList.values()) { @@ -629,6 +696,12 @@ } + /** + * UPDATE/UPDATE_ACKにより送られてきたSessionの情報を追加する + * @param command + * @return + * @throws IOException + */ private String mergeUpdate(REPCommand command) throws IOException { SessionList receivedSessionList; try { @@ -636,9 +709,9 @@ } catch (SAXException e) { throw new IOException(); } - // UPDATE/UPDATE_ACKにより送られてきたSessionの情報を追加する + // 受け取った情報と自分の情報を混ぜる。 + sessionList.merge(receivedSessionList); //XMLを生成。送信コマンドにセット。 - sessionList.merge(receivedSessionList); return encoder.sessionListToXML(sessionList); } @@ -646,11 +719,9 @@ /* * id has SessionManager ID part */ - private int makeID(int newid) { return newid+smList.sessionManagerID()*MAXID; } - private int getSMID(int id) { return id/MAXID;