# HG changeset patch # User kent # Date 1220176954 -32400 # Node ID 0498425202a48ca82a9650f17f4b2a4cc02ff637 # Parent dae90ded1bcdee08391d2762a4a7d20bc24ed62f made ChannelSimulator no extends SelectableChannelSimulator but REPSocketChannel directly diff -r dae90ded1bcd -r 0498425202a4 rep/channel/ChannelSimulator.java --- a/rep/channel/ChannelSimulator.java Sun Aug 31 19:01:03 2008 +0900 +++ b/rep/channel/ChannelSimulator.java Sun Aug 31 19:02:34 2008 +0900 @@ -6,13 +6,19 @@ import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; -public class ChannelSimulator

extends SelectableChannelSimulator

{ +public class ChannelSimulator

extends REPSocketChannel

{ protected NetworkSimulator

ns; + protected BlockingQueue

qread; + protected BlockingQueue

qwrite; + protected SelectorSimulator

writeSelector; + protected SelectorSimulator

readSelector; /** Constructors. */ public ChannelSimulator(){ - super(null); + super(null, null); ns = NetworkSimulator.

singleton(); } @@ -24,6 +30,40 @@ ret.writeSelector=readSelector; return ret; } + + /* read from Queue. */ + public P read(){ + try { + if(readSelector!=null) + synchronized (readSelector){ + return qread.take(); + } + else{ + return qread.take(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + return null; + } + } + /* write to Queue. */ + public boolean write(P p){ + try { + if (writeSelector!=null) + synchronized (writeSelector){ + qwrite.put(p); + writeSelector.notifyAll(); + } + else { + qwrite.put(p); + } + return true; + } catch (InterruptedException e) { + e.printStackTrace(); + return false; + } + } + /** Connecting methods */ // for clients. @@ -31,22 +71,36 @@ return ns.connect(ip, this); } + /* accessor methods. */ + public BlockingQueue

getReadQ(){ + return qread; + } + public BlockingQueue

getWriteQ(){ + return qwrite; + } + public void createReadQ(){ + qread = new LinkedBlockingQueue

(); + } + public void createWriteQ(){ + qwrite = new LinkedBlockingQueue

(); + } + public void setWriteSelector(SelectorSimulator

_selector){ + writeSelector = _selector; + } + public ChannelSimulator

accept(){ return null; } - @Override public boolean isAcceptable() { return false; } - @Override public boolean isReadable() { synchronized (qread){ return !qread.isEmpty(); } } - @Override public boolean isWritable() { return true; } diff -r dae90ded1bcd -r 0498425202a4 rep/channel/REPSocketChannel.java --- a/rep/channel/REPSocketChannel.java Sun Aug 31 19:01:03 2008 +0900 +++ b/rep/channel/REPSocketChannel.java Sun Aug 31 19:02:34 2008 +0900 @@ -139,6 +139,7 @@ return sc.connect(semaIP); } + public SelectionKey register1(REPSelector

sel, int ops, Object att) throws ClosedChannelException { if(sel instanceof REPSelector) { @@ -159,5 +160,4 @@ return sc.register(sel, ops,att); } - } \ No newline at end of file diff -r dae90ded1bcd -r 0498425202a4 rep/channel/SelectionKeySimulator.java --- a/rep/channel/SelectionKeySimulator.java Sun Aug 31 19:01:03 2008 +0900 +++ b/rep/channel/SelectionKeySimulator.java Sun Aug 31 19:02:34 2008 +0900 @@ -36,7 +36,7 @@ } public void setFlag() { - SelectableChannelSimulator scs = (SelectableChannelSimulator) channel; + ChannelSimulator scs = (ChannelSimulator) channel; ready = 0; if(scs.isAcceptable()) ready |= OP_ACCEPT; if(scs.isReadable()) ready |= OP_READ; diff -r dae90ded1bcd -r 0498425202a4 rep/channel/SelectorSimulator.java --- a/rep/channel/SelectorSimulator.java Sun Aug 31 19:01:03 2008 +0900 +++ b/rep/channel/SelectorSimulator.java Sun Aug 31 19:02:34 2008 +0900 @@ -41,6 +41,29 @@ } return selectedKeys.size(); } + + @Override + public int select(long timeout) throws IOException { + selectedKeys = new HashSet(); + + synchronized(this) { + if (!wakeFlag){ + for(SelectionKey key : keyList){ + if(((SelectionKeySimulator) key).isAble()) + selectedKeys.add(key); + } + + if(selectedKeys.isEmpty()) + try { + this.wait(timeout); + } catch (InterruptedException e) { + throw new IOException("Error, Selector was interrupted!"); + } + } + wakeFlag=false; + } + return selectedKeys.size(); + } @Override public int selectNow() throws IOException { @@ -108,11 +131,6 @@ return null; } - @Override - public int select(long timeout) throws IOException { - // TODO Auto-generated method stub - return 0; - } @Override public Selector wakeup() {