# HG changeset patch # User kono # Date 1223125954 -32400 # Node ID c5be84d53c7f8c8c312bee5347ae7f52f7292861 # Parent e4b7af3fdf99e6383bcfc2883984f2cb809aac48 *** empty log message *** diff -r e4b7af3fdf99 -r c5be84d53c7f rep/Editor.java --- a/rep/Editor.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/Editor.java Sat Oct 04 22:12:34 2008 +0900 @@ -22,6 +22,7 @@ private List writeQueue; private REPCommand quit2 = null; private REPLogger ns = REPLogger.singleton(); + private final int limit=100; public Editor(){ this(true); @@ -71,6 +72,7 @@ }else{ //エディタからの新たな編集コマンド sentList.add(command); + assert(sentList.size() channel) { diff -r e4b7af3fdf99 -r c5be84d53c7f rep/SessionManager.java --- a/rep/SessionManager.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/SessionManager.java Sat Oct 04 22:12:34 2008 +0900 @@ -34,6 +34,8 @@ int cmd; kind of command int sid; session ID : uniqu to editing file int eid; editor ID : owner editor ID = 1。Session に対して unique + -1 session manager command + -2 merge command int seqno; Sequence number : sequence number はエディタごとに管理 int lineno; line number int textsize; textsize : bytesize @@ -46,14 +48,16 @@ private SessionManagerGUI gui; private REPSelector selector; private SessionManagerList smList; - private String myHost; private List editorList; // editorList は、sessionList に入っているeditorとは別なeditorのlistらしい。 private String maxHost; private List waitingCommandInMerge; - private BlockingQueue waitingQueue = new LinkedBlockingQueue();; - private static int temp_port; - private static int send_port; + REPHandler normalHandler = new REPHandlerImpl(this); + REPHandler handlerInMerge =new REPHandlerInMerge(this); + private BlockingQueue waitingEventQueue = new LinkedBlockingQueue();; + private String myHost; + private static int receive_port; + private static int parent_port; static final int DEFAULT_PORT = 8766; @@ -76,31 +80,28 @@ ssc.socket().setReuseAddress(true); //getAllByNameで取れた全てのアドレスに対してbindする ssc.socket().bind(new InetSocketAddress(port)); - ssc.register(selector, SelectionKey.OP_ACCEPT, new REPHandlerImpl(-1, this)); + ssc.register(selector, SelectionKey.OP_ACCEPT, normalHandler); sessionList = new LinkedList(); smList = new SessionManagerList(); editorList = new LinkedList(); waitingCommandInMerge = new LinkedList(); - //デフォルトのSessionを作っておく(テスト用に?) - //if(sessionList.size() > 0) System.out.println("Error : SessionManager.init():"); - //Session defaultSession = new Session(sessionList.size(), "DefaultSession.txt", new Editor(0,null)); - //sessionList.add(defaultSession); } public void mainLoop() throws IOException { while(true){ SessionManagerEvent e; - while((e = waitingQueue.poll())!=null){ + while((e = waitingEventQueue.poll())!=null){ e.exec(); } for(Session s:sessionList) { for(Editor editor: s.getEditorList()) if (editor.doWaitingWrite()) break; } - if(checkSend()){ + // if there are waiting command during merge operation, do process it + if(checkWaitingCommandInMerge()){ if(selector.selectNow() > 0){ select(); } @@ -111,12 +112,18 @@ } } - private boolean checkSend() throws IOException { + /** + * Check waiting command in merge + * @return true if there is a processed waiting command + * @throws IOException + */ + private boolean checkWaitingCommandInMerge() throws IOException { for(Iterator it = waitingCommandInMerge.iterator(); it.hasNext();){ PacketSet p = it.next(); - if(p.getEditor().isMerging()) { + if(p.getEditor().isMerging()) { // still merging do nothing continue; }else{ + // process one command and return true manage(p.channel, p.command); it.remove(); return true; @@ -125,7 +132,6 @@ return false; } - @SuppressWarnings("unchecked") private void select() throws IOException { Set> keys = selector.selectedKeys1(); @@ -142,10 +148,10 @@ handler.handle(key); } catch (ClosedChannelException x) { key.cancel(); - handler.cancel((REPSocketChannel)key.channel()); + handler.cancel(key.channel1()); } catch (IOException x) { key.cancel(); - handler.cancel((REPSocketChannel)key.channel()); + handler.cancel( key.channel1()); } } } @@ -156,7 +162,7 @@ return; } channel.configureBlocking(false); - REPHandler handler = new REPHandlerImpl(-1, this); + REPHandler handler = normalHandler; channel.register(selector, ops, handler); } @@ -390,6 +396,7 @@ session.translate(channel, receivedCommand); boolean newState = editor.isMerging(); if (old!=newState) { + // prevEditor なのは変だと思うが... Editor prevEditor = session.getPrevEditor(editor); //マージ中のエディタはコマンドを受け取らない // この代入は状態が変わったときだけ行えば良い。毎回、new するのは変 @@ -435,12 +442,12 @@ private void setNormalState(REPSocketChannel channel, int sid) { SelectionKey key = channel.keyFor(selector); - key.attach(new REPHandlerImpl(sid, this)); + key.attach(normalHandler); } private void setMergeState(REPSocketChannel channel, int sid) { SelectionKey key = channel.keyFor(selector); - key.attach(new REPHandlerInMerge(sid, this)); + key.attach(handlerInMerge); } private Editor getEditor(String hostport) { @@ -478,7 +485,7 @@ } private void setMyHostName(String localHostName) { - myHost = localHostName + temp_port; + myHost = localHostName + receive_port; if(maxHost == null) { maxHost = myHost; } @@ -500,8 +507,8 @@ port = Integer.parseInt(args[0]); port_s = Integer.parseInt(args[1]); } - temp_port = port; - send_port = port_s; + receive_port = port; + parent_port = port_s; SessionManager sm = new SessionManager(); sm.init(port,new SessionManagerGUIimpl(sm)); @@ -510,7 +517,7 @@ public void connectSession(String host) { int port = DEFAULT_PORT; - port = send_port; + port = parent_port; InetSocketAddress addr = new InetSocketAddress(host, port); try { REPSocketChannel sessionchannel = REPSocketChannel.create(new REPCommandPacker()); @@ -600,14 +607,14 @@ public void buttonPressed(SessionManagerEvent event) { try { - waitingQueue.put(event); + waitingEventQueue.put(event); } catch (InterruptedException e) {} selector.wakeup(); } public void syncExec(SessionManagerEvent event) { try { - waitingQueue.put(event); + waitingEventQueue.put(event); } catch (InterruptedException e) { } } diff -r e4b7af3fdf99 -r c5be84d53c7f rep/channel/ChannelSimulator.java --- a/rep/channel/ChannelSimulator.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/channel/ChannelSimulator.java Sat Oct 04 22:12:34 2008 +0900 @@ -35,7 +35,7 @@ P tmp; while ( (tmp=readQ.poll())==null && isBlocking ) { try { - wait(); + wait(); // queue is empty } catch (InterruptedException e) { } } @@ -63,7 +63,7 @@ break; } else { try { - wait(); + wait(); // queue is full, we have to wait here } catch (InterruptedException e) { } } diff -r e4b7af3fdf99 -r c5be84d53c7f rep/channel/NetworkSimulator.java --- a/rep/channel/NetworkSimulator.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/channel/NetworkSimulator.java Sat Oct 04 22:12:34 2008 +0900 @@ -63,7 +63,7 @@ sd0.scs.enQ(CHserver); logger.writeLog("connected", 1); - printAllState(); + //printAllState(); return true; } return false; diff -r e4b7af3fdf99 -r c5be84d53c7f rep/channel/NullSelector.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rep/channel/NullSelector.java Sat Oct 04 22:12:34 2008 +0900 @@ -0,0 +1,56 @@ +package rep.channel; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.util.Set; + +public class NullSelector extends Selector { + + @Override + public void close() throws IOException { + + } + + @Override + public boolean isOpen() { + return false; + } + + @Override + public Set keys() { + return null; + } + + @Override + public SelectorProvider provider() { + return null; + } + + @Override + public int select() throws IOException { + return 0; + } + + @Override + public int select(long timeout) throws IOException { + return 0; + } + + @Override + public int selectNow() throws IOException { + return 0; + } + + @Override + public Set selectedKeys() { + return null; + } + + @Override + public Selector wakeup() { + return this; + } + +} diff -r e4b7af3fdf99 -r c5be84d53c7f rep/channel/REPSelectionKey.java --- a/rep/channel/REPSelectionKey.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/channel/REPSelectionKey.java Sat Oct 04 22:12:34 2008 +0900 @@ -35,6 +35,22 @@ return rsc; } + @SuppressWarnings("unchecked") + public REPSocketChannel

channel1() { + assert (!REPServerSocketChannel.isSimulation); + SelectableChannel sc = key.channel(); + REPSocketChannel

rsc = (REPSocketChannel

) REPSocketChannel.channels.get(sc); + return rsc; + } + + @SuppressWarnings("unchecked") + public REPServerSocketChannel

serverSocketChannel() { + assert (!REPServerSocketChannel.isSimulation); + SelectableChannel sc = key.channel(); + REPServerSocketChannel

rsc = (REPServerSocketChannel

) REPSocketChannel.channels.get(sc); + return rsc; + } + @Override public int interestOps() { return key.interestOps(); diff -r e4b7af3fdf99 -r c5be84d53c7f rep/channel/SelectableChannelSimulator.java --- a/rep/channel/SelectableChannelSimulator.java Sat Oct 04 22:12:16 2008 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,75 +0,0 @@ -package rep.channel; - -import java.nio.channels.SocketChannel; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -public abstract class SelectableChannelSimulator

extends REPSocketChannel

{ - - protected BlockingQueue

qread; - protected BlockingQueue

qwrite; - protected SelectorSimulator

writeSelector; - protected SelectorSimulator

readSelector; - - public SelectableChannelSimulator(SocketChannel channel) { - super(channel, null); - } - - /* read from Queue. */ - public P read(){ - try { - if(readSelector!=null) - synchronized (readSelector){ - return qread.take(); - } - else{ - return qread.take(); - } - } catch (InterruptedException e) { - e.printStackTrace(); - return null; - } - } - /* write to Queue. */ - public boolean write(P p){ - try { - if (writeSelector!=null) - synchronized (writeSelector){ - qwrite.put(p); - writeSelector.notifyAll(); - } - else { - qwrite.put(p); - } - return true; - } catch (InterruptedException e) { - e.printStackTrace(); - return false; - } - } - public abstract ChannelSimulator

accept(); - - /* accessor methods. */ - public BlockingQueue

getReadQ(){ - return qread; - } - public BlockingQueue

getWriteQ(){ - return qwrite; - } - public void createReadQ(){ - qread = new LinkedBlockingQueue

(); - } - public void createWriteQ(){ - qwrite = new LinkedBlockingQueue

(); - } - public void setWriteSelector(SelectorSimulator

_selector){ - writeSelector = _selector; - } - - - /* return state of the Queue */ - abstract public boolean isReadable(); - abstract public boolean isWritable(); - abstract public boolean isAcceptable(); - -} diff -r e4b7af3fdf99 -r c5be84d53c7f rep/channel/SelectionKeySimulator.java --- a/rep/channel/SelectionKeySimulator.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/channel/SelectionKeySimulator.java Sat Oct 04 22:12:34 2008 +0900 @@ -49,6 +49,18 @@ return channel; } + @SuppressWarnings("unchecked") + @Override + public REPSocketChannel

channel1() { + return (REPSocketChannel

)channel; + } + + @SuppressWarnings("unchecked") + @Override + public REPServerSocketChannel

serverSocketChannel() { + return (REPServerSocketChannel

)channel; + } + public SelectableChannel channel(REPPack

packer) { return channel; } @@ -93,12 +105,10 @@ @Override public int readyOps() { int ops=0; - //if ( channel instanceof SelectableChannelSimulator){ if ( channel instanceof ServerChannelSimulator ){ ServerChannelSimulator scs = (ServerChannelSimulator) channel; ops = ( OP_ACCEPT * (scs.isAcceptable()? 1:0) ); - } - if ( channel instanceof ChannelSimulator ){ + } else if ( channel instanceof ChannelSimulator ){ ChannelSimulator scs = (ChannelSimulator) channel; ops = ( OP_READ * (scs.isReadable()? 1:0) ) | ( OP_WRITE * (scs.isWritable()? 1:0) ); diff -r e4b7af3fdf99 -r c5be84d53c7f rep/channel/SelectorSimulator.java --- a/rep/channel/SelectorSimulator.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/channel/SelectorSimulator.java Sat Oct 04 22:12:34 2008 +0900 @@ -10,72 +10,57 @@ import java.util.Set; public class SelectorSimulator

extends REPSelector

{ - + + // access to these set have to be synchronized private Set keyList; private Set selectedKeys; - private boolean wakeFlag=false; - + public SelectorSimulator() { super(null); keyList = new HashSet(); } - public int select() throws IOException { - selectedKeys = new HashSet(); - - synchronized(this) { - - while(selectedKeys.isEmpty() && !wakeFlag){ - for(SelectionKey key : keyList){ - if(((SelectionKeySimulator) key).isAble()) - selectedKeys.add(key); + public synchronized int select() throws IOException { + while(true) { + getSelectedKeys(); + if(selectedKeys.isEmpty()) { + try { + this.wait(); + } catch (InterruptedException e) { + throw new IOException("Error, Selector was interrupted!"); } - - if(selectedKeys.isEmpty()) - try { - this.wait(); - } catch (InterruptedException e) { - throw new IOException("Error, Selector was interrupted!"); - } - } - wakeFlag=false; - } - return selectedKeys.size(); - } - - @Override - public int select(long timeout) throws IOException { - selectedKeys = new HashSet(); - - synchronized(this) { - if (!wakeFlag){ - for(SelectionKey key : keyList){ - if(((SelectionKeySimulator) key).isAble()) - selectedKeys.add(key); - } - - if(selectedKeys.isEmpty()) - try { - this.wait(timeout); - } catch (InterruptedException e) { - throw new IOException("Error, Selector was interrupted!"); - } - } - wakeFlag=false; + } else + break; } return selectedKeys.size(); } @Override - public int selectNow() throws IOException { + public synchronized int select(long timeout) throws IOException { + getSelectedKeys(); + if(selectedKeys.isEmpty()) { + try { + this.wait(timeout); + // we cannot know if we timeouted or not + getSelectedKeys(); + } catch (InterruptedException e) { + throw new IOException("Error, Selector was interrupted!"); + } + } + return selectedKeys.size(); + } + + private void getSelectedKeys() { selectedKeys = new HashSet(); - - synchronized(this) { - for(SelectionKey key : keyList){ - if(((SelectionKeySimulator) key).isAble()) - selectedKeys.add(key); - } + for(SelectionKey key : keyList){ + if(((SelectionKeySimulator) key).isAble()) + selectedKeys.add(key); } + } + + @Override + public synchronized int selectNow() throws IOException { + getSelectedKeys(); return selectedKeys.size(); } @@ -85,29 +70,31 @@ public SelectionKeySimulator

register(SelectableChannel cs, int opt, Object handler){ SelectionKeySimulator

key = new SelectionKeySimulator

(cs, opt, this); key.attach(handler); - keyList.add(key); + synchronized(this) { + keyList.add(key); + } return key; } - - public void deregister(SelectableChannel channel) { + + public synchronized void deregister(SelectableChannel channel) { for(Iterator it = keyList.iterator();it.hasNext();) { if(it.next().channel() == channel) it.remove(); } } - + - public Set> selectedKeys1() { - Set keys = keyList; - Set> newKeys = new HashSet>(); - for(SelectionKey k: keys) { - // REPSelectionKeyを生成しないように注意 - newKeys.add(new SelectionKeySimulator

(k)); - } - return newKeys;//(Set>)newKeys; + public synchronized Set> selectedKeys1() { + Set keys = keyList; + Set> newKeys = new HashSet>(); + for(SelectionKey k: keys) { + // REPSelectionKeyを生成しないように注意 + newKeys.add(new SelectionKeySimulator

(k)); + } + return newKeys;//(Set>)newKeys; } - - public SelectionKey getKey(ChannelSimulator channel){ + + public synchronized SelectionKey getKey(ChannelSimulator channel){ for(SelectionKey key : keyList){ if(key.channel() == channel) return key; @@ -118,7 +105,7 @@ @Override public void close() throws IOException { // TODO Auto-generated method stub - + } @Override @@ -139,16 +126,13 @@ @Override - public Selector wakeup() { - synchronized(this){ - this.notifyAll(); - } + public synchronized Selector wakeup() { + notifyAll(); return this; } @Override - public Set selectedKeys() { - // TODO Auto-generated method stub + public synchronized Set selectedKeys() { return (Set)selectedKeys; } diff -r e4b7af3fdf99 -r c5be84d53c7f rep/handler/REPHandlerImpl.java --- a/rep/handler/REPHandlerImpl.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/handler/REPHandlerImpl.java Sat Oct 04 22:12:34 2008 +0900 @@ -17,10 +17,8 @@ public void handle(REPSelectionKey key) throws IOException { REPSocketChannel channel = key.channel1(); - System.out.println("REPHandlerImpl.handle() : channel = " + channel); - REPCommand command = channel.read(); - System.out.println("REPHandlerImpl.handle() : command = " + command); + System.out.println("REPHandlerImpl.handle() read : command = " + command +" from "+channel); manager.manage(channel, command); } diff -r e4b7af3fdf99 -r c5be84d53c7f rep/handler/REPHandlerInMerge.java --- a/rep/handler/REPHandlerInMerge.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/handler/REPHandlerInMerge.java Sat Oct 04 22:12:34 2008 +0900 @@ -10,29 +10,22 @@ public class REPHandlerInMerge implements REPHandler { private SessionManager manager; - private int sid; public REPHandlerInMerge(SessionManager manager) { this.manager = manager; } - public REPHandlerInMerge(int sid, SessionManager manager2) { - this.manager = manager2; - this.sid = sid; - } - @SuppressWarnings("unchecked") public void handle(REPSelectionKey key) throws IOException { //マージ中のエディタの前のエディタのコマンドをWaitingListに追加する - REPSocketChannel channel = (REPSocketChannel) key.channel(); + REPSocketChannel channel = key.channel1(); REPCommand command = channel.read(); System.out.println("REPHandlerImpl.handle() : command = " + command); - if(command.sid == sid){ - Editor editor = manager.getEditor(channel); - manager.addWaitingCommand(new PacketSet(channel, editor, command)); - }else{ - manager.manage(channel, command); - } + // if (manager.isMerging(command.sid()))... + // 同じchannelで、merge中のsessionは一つは限らない。 + // なので、sid をinstanceで持つのではだめ。 + Editor editor = manager.getEditor(channel); + manager.addWaitingCommand(new PacketSet(channel, editor, command)); } public void cancel(REPSocketChannel socketChannel) { diff -r e4b7af3fdf99 -r c5be84d53c7f test/channeltest/testSeMa.java --- a/test/channeltest/testSeMa.java Sat Oct 04 22:12:16 2008 +0900 +++ b/test/channeltest/testSeMa.java Sat Oct 04 22:12:34 2008 +0900 @@ -31,7 +31,6 @@ } - @SuppressWarnings("unchecked") public void run() { REPSelector selector=null; @@ -71,7 +70,7 @@ }else if(key.isReadable()){ try { - REPSocketChannel channel = (REPSocketChannel) key.channel(); + REPSocketChannel channel = key.channel1(); String packet; packet = channel.read(); if (packet==null) continue; diff -r e4b7af3fdf99 -r c5be84d53c7f test/channeltest/testSeMaSlave.java --- a/test/channeltest/testSeMaSlave.java Sat Oct 04 22:12:16 2008 +0900 +++ b/test/channeltest/testSeMaSlave.java Sat Oct 04 22:12:34 2008 +0900 @@ -33,7 +33,6 @@ } - @SuppressWarnings("unchecked") public void run() { REPSelector selector; REPSocketChannel masterCH ; @@ -68,7 +67,7 @@ }else if(key.isReadable()){ try { - REPSocketChannel channel = (REPSocketChannel) key.channel(); + REPSocketChannel channel = key.channel1(); String packet = channel.read(); if (packet==null) continue; //if (channel==masterCH){ diff -r e4b7af3fdf99 -r c5be84d53c7f test/sematest/TestEditor.java --- a/test/sematest/TestEditor.java Sat Oct 04 22:12:16 2008 +0900 +++ b/test/sematest/TestEditor.java Sat Oct 04 22:12:34 2008 +0900 @@ -9,6 +9,7 @@ import rep.REPCommand; import rep.REPCommandPacker; import rep.channel.REPLogger; +import rep.channel.REPSelectionKey; import rep.channel.REPSelector; import rep.channel.REPSocketChannel; import test.Text; @@ -124,8 +125,11 @@ syncText(); // send the master editor buffer to clients. } userInput(); - } else { - handle(channel.read()); + } + // selector(timeout) returns 0, but it may contain readable channel.. + for(REPSelectionKey key : selector.selectedKeys1()) { + REPSocketChannel ch = key.channel1(); + handle(ch.read()); } } } @@ -199,6 +203,7 @@ } private void handle(REPCommand cmd) { + if (cmd==null) return; ns.writeLog(name +": read "+cmd); switch(cmd.cmd) { case REPCMD_INSERT :