# HG changeset patch # User one@firefly.cr.ie.u-ryukyu.ac.jp # Date 1226323474 -32400 # Node ID aa07134fea328f8c09a507f2f858cd448b3ed5c3 # Parent 4b535bef903a8f4b013d997769080db901956d10 23 diff -r 4b535bef903a -r aa07134fea32 rep/ServerMainLoop.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rep/ServerMainLoop.java Mon Nov 10 22:24:34 2008 +0900 @@ -0,0 +1,291 @@ +package rep; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import rep.channel.REPLogger; +import rep.channel.REPSelectionKey; +import rep.channel.REPSelector; +import rep.channel.REPServerSocketChannel; +import rep.channel.REPSocketChannel; +import rep.gui.DoGUIUpdate; +import rep.gui.SessionManagerEvent; +import rep.gui.SessionManagerGUI; +import rep.handler.FirstConnector; +import rep.handler.REPNode; + +public class ServerMainLoop { + + public static REPLogger logger = REPLogger.singleton(); + public SessionManager manager; + protected SessionManagerGUI gui; + protected REPSelector selector; + protected List waitingCommandInMerge= new LinkedList(); + private BlockingQueue waitingEventQueue + = new LinkedBlockingQueue(); + public String myHost; + private LinkedList writeQueue = new LinkedList(); + protected int receive_port; + protected int parent_port; + protected static final int DEFAULT_PORT = 8766; + private SessionManagerEvent execAfterConnect = null; + + + public void setReceivePort(int port) { + receive_port = port; + } + + void mainLoop(SessionManager sessionManager, int port, SessionManagerGUI gui) throws IOException, + SocketException, ClosedChannelException { + this.gui = gui; + manager = sessionManager; + receive_port = port; + serverInit(); + mainLoop(); + } + + public void mainLoop() throws IOException { + while(true){ + checkWaitingCommandInMerge(); + if (checkInputEvent() || + checkWaitingWrite()) { + // try to do fair execution for waiting task + if(selector.selectNow() > 0) select(); + continue; + } + // now we can wait for input packet or event + selector.select(); + select(); + } + } + + void serverInit() throws IOException, SocketException, + ClosedChannelException { + selector = REPSelector.create(); + REPServerSocketChannel ssc = REPServerSocketChannel.open(new REPCommandPacker()); + ssc.configureBlocking(false); // Selector requires this + ssc.socket().setReuseAddress(true); //reuse address 必須 + //getAllByNameで取れた全てのアドレスに対してbindする + try { + ssc.socket().bind(new InetSocketAddress("::",receive_port)); + } catch (SocketException e) { + // for some IPv6 imlementation + ssc.socket().bind(new InetSocketAddress(receive_port)); + } + ssc.register(selector, SelectionKey.OP_ACCEPT,null); + } + + private boolean checkInputEvent() { + SessionManagerEvent e; + if((e = waitingEventQueue.poll())!=null){ + e.exec(manager); + return true; + } + return false; + } + + private boolean checkWaitingWrite() throws IOException { + PacketSet p = writeQueue.poll(); + if (p!=null) { + p.channel.write(p.command); + return true; + } + return false; + } + + /** + * Check waiting command in merge + * @return true if there is a processed waiting command + * @throws IOException + */ + public void checkWaitingCommandInMerge() { + List w = waitingCommandInMerge; + waitingCommandInMerge = new LinkedList(); + for(PacketSet p: w) { + REPNode e = p.getEditor(); + if(e.isMerging()) { // still merging do nothing + waitingCommandInMerge.add(p); + } else { + try { +// if (manager.sessionManage(e, p.command)) { // we don't need this +// assert false; +// return; +// } + e.manage(p.command); + } catch (Exception e1) { + // should be e.close()? + close(p.channel); + } + } + } + } + + + public boolean hasWaitingCommand(REPSocketChannelc) { + for(PacketSet p:waitingCommandInMerge) { + if (p.channel==c) { + return true; + } + } + return false; + } + + private void close(REPSocketChannel channel) { + REPSelectionKeykey = channel.keyFor1(selector); + REPNode handler = (REPNode)key.attachment(); + key.cancel(); + handler.cancel(channel); + // we have to remove session/editor + } + + private void select() throws IOException { + + Set> keys = selector.selectedKeys1(); + for(REPSelectionKey key : keys){ + if(key.isAcceptable()){ + /* + * Incoming connection. We don't know which, editor or + * session manager. Assign FirstConnector to distinguish. + */ + REPSocketChannel channel = key.accept(new REPCommandPacker()); + logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); + registerChannel(channel, new FirstConnector(manager,channel)); + } else if(key.isReadable()){ + /* + * Incoming packets are handled by a various forwarder. + * A handler throw IOException() in case of a trouble to + * close the channel. + */ + REPNode handler = (REPNode)key.attachment(); + try { + REPCommand command = key.channel1().read(); + handler.handle(command, key); + } catch (IOException e) { + key.cancel(); + handler.cancel(key.channel1()); + } + } + } + } + + public void registerChannel(REPSocketChannel channel, REPNode handler) throws IOException { + if(channel == null) { + return; + } + // handler.setChannel(channel); + channel.configureBlocking(false); + channel.register(selector, SelectionKey.OP_READ, handler); + } + + protected void updateGUI() { + //リストのコピーをGUIに渡す + LinkedList sList = new LinkedList(manager.sessionList.values()); + LinkedList eList; + if (false) { + // local editor only + eList = new LinkedList(); + for(REPNode e:manager.editorList.values()) { + if (manager.getSMID(e.eid)==manager.smList.sessionManagerID()) { + eList.add(e); + } + } + } else { + eList = new LinkedList(manager.editorList.values()); + } + //GUIに反映 + Runnable doRun = new DoGUIUpdate(sList, eList, gui); + gui.invokeLater(doRun); + } + + public void setMyHostName(String localHostName) { + myHost = localHostName + receive_port; + setHostToEditor(myHost); + } + + public String myHost() { + return myHost; + } + + private void setHostToEditor(String myHost2) { + for(REPNode editor : manager.editorList.values()){ + if (editor.channel!=null) + editor.setHost(myHost2); + } + } + + public void addWaitingCommand(PacketSet set) { + waitingCommandInMerge.add(set); + } + + public void buttonPressed(SessionManagerEvent event) { + try { + waitingEventQueue.put(event); + } catch (InterruptedException e) {} + selector.wakeup(); + } + + public void syncExec(SessionManagerEvent event) { + try { + waitingEventQueue.put(event); + } catch (InterruptedException e) { + } + } + + public void addWriteQueue(PacketSet packetSet) { + writeQueue.addLast(packetSet); + assert(writeQueue.size() key = channel.channel.keyFor1(selector); + key.cancel(); + try { + channel.channel.close1(); + } catch (IOException e) { + } + } + + public String toString() { + return ""+myHost+":"+receive_port; + } + + + public void setGUI(SessionManagerGUI gui) { + this.gui = gui; + } + + public void setManager(SessionManager sessionManager) { + manager = sessionManager; + } + +} \ No newline at end of file diff -r 4b535bef903a -r aa07134fea32 rep/Translator.java --- a/rep/Translator.java Mon Nov 10 22:24:21 2008 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,47 +0,0 @@ -package rep; - -import rep.handler.REPNode; - -public interface Translator { - - /* - * Interface で分離する意味が良くわからない。Application毎に - * Transltorを換えるつもりなのか? - */ - /** - * Translate command When the editor send REPCommand to remote editor. - * @param command which the editor want to send. - * @return translated command which should be sent by the editor. - */ - public REPCommand transSendCmd(REPCommand cmd); - - /** - * Inform translater about that the editor receive own command which it sent. - * but in this case, you can use also transReceiveCmd() - * @param command which the editor sent. - */ - public boolean catchOwnCommand(REPNode editor); - - /** - * Translate Command cmd that was received from SeMa. - * @param cmd the command to be translated. - * @return translated command. - */ - public void transReceiveCmd(REPNode next,REPCommand cmd); - - /** - * set the editor's id. - * @param editor's id. - */ - public void setEid(int _eid); - - boolean checkMergeConflict(REPCommand command); - - public void getMergeAgain(REPNode editor); - - public void startMerge(REPCommand cmd); - - public boolean isMerging(); - - public void mergeAck(); -} diff -r 4b535bef903a -r aa07134fea32 rep/TranslatorImpl.java --- a/rep/TranslatorImpl.java Mon Nov 10 22:24:21 2008 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,228 +0,0 @@ -package rep; - -import java.util.Collection; -import java.util.Comparator; -import java.util.LinkedList; -import java.util.List; -import java.util.TreeSet; - -import rep.handler.REPNode; -import rep.optimizers.REPCommandOptimizer; - -public class TranslatorImpl implements Translator{ - public int eid; - - public REPCommandOptimizer optimizer; - private LinkedList unMergedCmds; - public LinkedList sentMergedList; - private LinkedList mergeAgainList; - boolean merge_mode = false; - - public TranslatorImpl(int _eid,REPCommandOptimizer opt){ - eid = _eid; - optimizer = opt; - unMergedCmds = new LinkedList(); - mergeAgainList = new LinkedList(); - sentMergedList = new LinkedList(); - } - - /** - * New command from an editor - * The command is sent to the next editor - * @param cmd - * @return translated command. - */ - public REPCommand transSendCmd(REPCommand cmd){ - assert(cmd.eid==eid); - unMergedCmds.add(cmd); - - //マージ中にユーザから割り込みがあった場合 - if(isMerging()){ - mergeAgainList.add(cmd); - } - - return cmd; - } - /** - * My command is returned from the session ring, and START_MERGE_ACK - * is returned. At this - * stage my writeQueue is empty, our editor is waiting for me. - * Start merge process. - * @param cmd - */ - public boolean catchOwnCommand(REPNode editor){ - LinkedList output = new LinkedList(); - LinkedList cmds = new LinkedList(); - //スタック上にあるコマンドを全部undoコマンドにする - while ( !unMergedCmds.isEmpty() ){ - REPCommand cmd0 = unMergedCmds.removeLast(); - output.add( createUndo(cmd0) ); - cmds.add(cmd0); - } - - /* 必要な分だけソートして返却用のリストに追加 */ - output.addAll( sortCmds(cmds) ); - - /* 残ったコマンドも再び実行させるが、まだマージされてないのでunMergedにも入れる */ - output.addAll(cmds); - for(REPCommand c: cmds) { - output.add(c); - unMergedCmds.add(c); - } - return optimizedSend(editor,output); - } - - /** - * Sent optimized merged command list - * @param editor - * @param output - * @return if any sent commands output - */ - public boolean optimizedSend(REPNode editor, LinkedList output) { - List output1 = optimizer.optimize(output); - if (output1.size()==0) { - merge_mode = false; - return false; - } - for(REPCommand c:output1) { - REPCommand m = new REPCommand(c); - m.setEID(REP.MERGE_EID.id); - m.setSEQID(editor.seq()); - sentMergedList.add(m); - editor.send(m); - } - return true; - } - - private REPCommand createUndo(REPCommand cmd){ - REPCommand retCmd = new REPCommand(cmd); - if (cmd.cmd==REP.REPCMD_INSERT) retCmd.cmd=REP.REPCMD_DELETE; - else if (cmd.cmd==REP.REPCMD_DELETE) retCmd.cmd=REP.REPCMD_INSERT; - return retCmd; - } - - class REPCommandComparator implements Comparator{ - - public int compare(REPCommand o1, REPCommand o2) { - - if ( o2.lineno > o1.lineno ) return 1; - else if ( o2.lineno < o1.lineno - || o2.eid > o1.eid ) - return -1; - - return 1; - } - - } - - private Collection sortCmds(LinkedList cmds) { - TreeSet sortedCmds1 = new TreeSet(new REPCommandComparator()); - int top; - int prevEid=-1; - while ( -1 != (top=getPrecedence(cmds, prevEid+1)) ){ - REPCommand tmp = cmds.remove(top); - sortedCmds1.add(tmp); - prevEid = tmp.eid; - } - - return sortedCmds1; - } - - /* search cmd. ordering by min EID that is lower lowEid and min SEQ. */ - private int getPrecedence(LinkedList cmds, int lowEid) { - int cEid, cSeq; - cEid=cSeq=Integer.MAX_VALUE; - int ret=-1; - for (int i=0; icEid ) continue; - else if ( c.eid==cEid ) { - if ( c.seq>cSeq ) continue; - cSeq=c.seq; - ret = i; - } else { /* tmp.eid 0){ - mergeAgainList.add(command); - return true; - } - if(sentMergedList.size()==0) { - merge_mode=false; - } - return false; - } - - public void getMergeAgain(REPNode editor) { - LinkedList returnCommand = new LinkedList(); - for(int i = 0; i < mergeAgainList.size(); i++){ - //eid = REP.MEGE_EID - returnCommand.add(createUndo(mergeAgainList.get(mergeAgainList.size() - i -1))); - } - for(REPCommand command : mergeAgainList){ - if(command.eid == REP.MERGE_EID.id){ - returnCommand.add(command); - } - } - for(REPCommand command : mergeAgainList){ - if(command.eid == eid){ - command.eid = REP.MERGE_EID.id; - returnCommand.add(command); - } - } - mergeAgainList.clear(); - optimizedSend(editor, returnCommand); - } - - public boolean isFinished() { - if(unMergedCmds.size() > 0) return false; - if(sentMergedList.size() > 0) return false; - return true; - } - - public boolean isMerging() { - return merge_mode; - } - - public void startMerge(REPCommand cmd) { - merge_mode = true; - } - - public void mergeAck() { - } - - - -} diff -r 4b535bef903a -r aa07134fea32 rep/handler/Translator.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rep/handler/Translator.java Mon Nov 10 22:24:34 2008 +0900 @@ -0,0 +1,230 @@ +package rep.handler; + +import java.util.Collection; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.TreeSet; + +import rep.REP; +import rep.REPCommand; +import rep.optimizers.REPCommandOptimizer; + +public class Translator { + public int eid; + + public REPCommandOptimizer optimizer; + private LinkedList unMergedCmds; + public LinkedList sentMergedList; + private LinkedList mergeAgainList; + boolean merge_mode = false; + + public Translator(int _eid,REPCommandOptimizer opt){ + eid = _eid; + optimizer = opt; + unMergedCmds = new LinkedList(); + mergeAgainList = new LinkedList(); + sentMergedList = new LinkedList(); + } + + /** + * New command from an editor + * The command is sent to the next editor + * @param cmd + * @return translated command. + */ + public REPCommand transSendCmd(REPCommand cmd){ + assert(cmd.eid==eid); + unMergedCmds.add(cmd); + + //マージ中にユーザから割り込みがあった場合 + if(isMerging()){ + mergeAgainList.add(cmd); + } + + return cmd; + } + /** + * My command is returned from the session ring, and START_MERGE_ACK + * is returned. At this + * stage my writeQueue is empty, our editor is waiting for me. + * Start merge process. + * @param cmd + */ + public boolean catchOwnCommand(REPNode editor){ + LinkedList output = new LinkedList(); + LinkedList cmds = new LinkedList(); + //スタック上にあるコマンドを全部undoコマンドにする + while ( !unMergedCmds.isEmpty() ){ + REPCommand cmd0 = unMergedCmds.removeLast(); + output.add( createUndo(cmd0) ); + cmds.add(cmd0); + } + + /* 必要な分だけソートして返却用のリストに追加 */ + output.addAll( sortCmds(cmds) ); + + /* 残ったコマンドも再び実行させるが、まだマージされてないのでunMergedにも入れる */ + output.addAll(cmds); + for(REPCommand c: cmds) { + output.add(c); + unMergedCmds.add(c); + } + return optimizedSend(editor,output); + } + + /** + * Sent optimized merged command list + * @param editor + * @param output + * @return if any sent commands output + */ + public boolean optimizedSend(REPNode editor, LinkedList output) { + List output1 = optimizer.optimize(output); + if (output1.size()==0) { + merge_mode = false; + return false; + } + for(REPCommand c:output1) { + REPCommand m = new REPCommand(c); + m.setEID(REP.MERGE_EID.id); + m.setSEQID(editor.seq()); + sentMergedList.add(m); + editor.send(m); + } + merge_mode = true; + return true; + } + + private REPCommand createUndo(REPCommand cmd){ + REPCommand retCmd = new REPCommand(cmd); + if (cmd.cmd==REP.REPCMD_INSERT) retCmd.cmd=REP.REPCMD_DELETE; + else if (cmd.cmd==REP.REPCMD_DELETE) retCmd.cmd=REP.REPCMD_INSERT; + return retCmd; + } + + class REPCommandComparator implements Comparator{ + + public int compare(REPCommand o1, REPCommand o2) { + + if ( o2.lineno > o1.lineno ) return 1; + else if ( o2.lineno < o1.lineno + || o2.eid > o1.eid ) + return -1; + + return 1; + } + + } + + private Collection sortCmds(LinkedList cmds) { + TreeSet sortedCmds1 = new TreeSet(new REPCommandComparator()); + int top; + int prevEid=-1; + while ( -1 != (top=getPrecedence(cmds, prevEid+1)) ){ + REPCommand tmp = cmds.remove(top); + sortedCmds1.add(tmp); + prevEid = tmp.eid; + } + + return sortedCmds1; + } + + /* search cmd. ordering by min EID that is lower lowEid and min SEQ. */ + private int getPrecedence(LinkedList cmds, int lowEid) { + int cEid, cSeq; + cEid=cSeq=Integer.MAX_VALUE; + int ret=-1; + for (int i=0; icEid ) continue; + else if ( c.eid==cEid ) { + if ( c.seq>cSeq ) continue; + cSeq=c.seq; + ret = i; + } else { /* tmp.eid 0){ + mergeAgainList.add(command); + return true; + } + if(sentMergedList.size()==0) { + merge_mode=false; + } + return false; + } + + public void getMergeAgain(REPNode editor) { + LinkedList returnCommand = new LinkedList(); + for(int i = 0; i < mergeAgainList.size(); i++){ + //eid = REP.MEGE_EID + returnCommand.add(createUndo(mergeAgainList.get(mergeAgainList.size() - i -1))); + } + for(REPCommand command : mergeAgainList){ + if(command.eid == REP.MERGE_EID.id){ + returnCommand.add(command); + } + } + for(REPCommand command : mergeAgainList){ + if(command.eid == eid){ + command.eid = REP.MERGE_EID.id; + returnCommand.add(command); + } + } + mergeAgainList.clear(); + optimizedSend(editor, returnCommand); + } + + public boolean isFinished() { + if(unMergedCmds.size() > 0) return false; + if(sentMergedList.size() > 0) return false; + return true; + } + + public boolean isMerging() { + return merge_mode; + } + + public void startMerge(REPCommand cmd) { + merge_mode = true; + } + + public void mergeAck() { + } + + + +}