# HG changeset patch # User kent # Date 1220176954 -32400 # Node ID 0498425202a48ca82a9650f17f4b2a4cc02ff637 # Parent dae90ded1bcdee08391d2762a4a7d20bc24ed62f made ChannelSimulator no extends SelectableChannelSimulator but REPSocketChannel directly diff -r dae90ded1bcd -r 0498425202a4 rep/channel/ChannelSimulator.java --- a/rep/channel/ChannelSimulator.java Sun Aug 31 19:01:03 2008 +0900 +++ b/rep/channel/ChannelSimulator.java Sun Aug 31 19:02:34 2008 +0900 @@ -6,13 +6,19 @@ import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; -public class ChannelSimulator
extends SelectableChannelSimulator
{ +public class ChannelSimulator
extends REPSocketChannel
{ protected NetworkSimulator
ns; + protected BlockingQueue
qread; + protected BlockingQueue
qwrite; + protected SelectorSimulator
writeSelector; + protected SelectorSimulator
readSelector; /** Constructors. */ public ChannelSimulator(){ - super(null); + super(null, null); ns = NetworkSimulator.
singleton(); } @@ -24,6 +30,40 @@ ret.writeSelector=readSelector; return ret; } + + /* read from Queue. */ + public P read(){ + try { + if(readSelector!=null) + synchronized (readSelector){ + return qread.take(); + } + else{ + return qread.take(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + return null; + } + } + /* write to Queue. */ + public boolean write(P p){ + try { + if (writeSelector!=null) + synchronized (writeSelector){ + qwrite.put(p); + writeSelector.notifyAll(); + } + else { + qwrite.put(p); + } + return true; + } catch (InterruptedException e) { + e.printStackTrace(); + return false; + } + } + /** Connecting methods */ // for clients. @@ -31,22 +71,36 @@ return ns.connect(ip, this); } + /* accessor methods. */ + public BlockingQueue
getReadQ(){ + return qread; + } + public BlockingQueue
getWriteQ(){ + return qwrite; + } + public void createReadQ(){ + qread = new LinkedBlockingQueue
(); + } + public void createWriteQ(){ + qwrite = new LinkedBlockingQueue
(); + } + public void setWriteSelector(SelectorSimulator
_selector){ + writeSelector = _selector; + } + public ChannelSimulator
accept(){ return null; } - @Override public boolean isAcceptable() { return false; } - @Override public boolean isReadable() { synchronized (qread){ return !qread.isEmpty(); } } - @Override public boolean isWritable() { return true; } diff -r dae90ded1bcd -r 0498425202a4 rep/channel/REPSocketChannel.java --- a/rep/channel/REPSocketChannel.java Sun Aug 31 19:01:03 2008 +0900 +++ b/rep/channel/REPSocketChannel.java Sun Aug 31 19:02:34 2008 +0900 @@ -139,6 +139,7 @@ return sc.connect(semaIP); } + public SelectionKey register1(REPSelector
sel, int ops, Object att)
throws ClosedChannelException {
if(sel instanceof REPSelector) {
@@ -159,5 +160,4 @@
return sc.register(sel, ops,att);
}
-
}
\ No newline at end of file
diff -r dae90ded1bcd -r 0498425202a4 rep/channel/SelectionKeySimulator.java
--- a/rep/channel/SelectionKeySimulator.java Sun Aug 31 19:01:03 2008 +0900
+++ b/rep/channel/SelectionKeySimulator.java Sun Aug 31 19:02:34 2008 +0900
@@ -36,7 +36,7 @@
}
public void setFlag() {
- SelectableChannelSimulator> scs = (SelectableChannelSimulator>) channel;
+ ChannelSimulator> scs = (ChannelSimulator>) channel;
ready = 0;
if(scs.isAcceptable()) ready |= OP_ACCEPT;
if(scs.isReadable()) ready |= OP_READ;
diff -r dae90ded1bcd -r 0498425202a4 rep/channel/SelectorSimulator.java
--- a/rep/channel/SelectorSimulator.java Sun Aug 31 19:01:03 2008 +0900
+++ b/rep/channel/SelectorSimulator.java Sun Aug 31 19:02:34 2008 +0900
@@ -41,6 +41,29 @@
}
return selectedKeys.size();
}
+
+ @Override
+ public int select(long timeout) throws IOException {
+ selectedKeys = new HashSet