changeset 234:0498425202a4

made ChannelSimulator no extends SelectableChannelSimulator but REPSocketChannel directly
author kent
date Sun, 31 Aug 2008 19:02:34 +0900
parents dae90ded1bcd
children a8302aa5a495
files rep/channel/ChannelSimulator.java rep/channel/REPSocketChannel.java rep/channel/SelectionKeySimulator.java rep/channel/SelectorSimulator.java
diffstat 4 files changed, 84 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/rep/channel/ChannelSimulator.java	Sun Aug 31 19:01:03 2008 +0900
+++ b/rep/channel/ChannelSimulator.java	Sun Aug 31 19:02:34 2008 +0900
@@ -6,13 +6,19 @@
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
-public class ChannelSimulator<P> extends SelectableChannelSimulator<P>{
+public class ChannelSimulator<P> extends REPSocketChannel<P>{
 	protected NetworkSimulator<P> ns;
+	protected BlockingQueue<P> qread;
+	protected BlockingQueue<P> qwrite;
+	protected SelectorSimulator<P> writeSelector;
+	protected SelectorSimulator<P> readSelector;
 
 	/**  Constructors. */
 	public ChannelSimulator(){
-		super(null);
+		super(null, null);
 		ns = NetworkSimulator.<P>singleton();
 	}
 
@@ -24,6 +30,40 @@
 		ret.writeSelector=readSelector;
 		return ret;
 	}
+	
+	/* read from Queue.  */
+	public P read(){
+		try {
+			if(readSelector!=null)
+				synchronized (readSelector){
+					return qread.take();
+				}
+			else{
+				return qread.take();
+			}
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+			return null;
+		}
+	}
+	/* write to Queue.  */
+	public boolean write(P p){
+		try {
+			if (writeSelector!=null)
+				synchronized (writeSelector){
+					qwrite.put(p);
+					writeSelector.notifyAll();
+				}
+			else {
+				qwrite.put(p);
+			}
+			return true;
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+			return false;
+		}
+	}
+
 
 	/** Connecting methods */
 	// for clients.
@@ -31,22 +71,36 @@
 		return ns.connect(ip, this);
 	}
 
+	/* accessor methods.  */
+	public BlockingQueue<P> getReadQ(){
+		return qread;
+	}
+	public BlockingQueue<P> getWriteQ(){
+		return qwrite;
+	}
+	public void createReadQ(){
+		qread = new LinkedBlockingQueue<P>();
+	}
+	public void createWriteQ(){
+		qwrite = new LinkedBlockingQueue<P>();
+	}
+	public void setWriteSelector(SelectorSimulator<P> _selector){
+		writeSelector = _selector; 
+	}
+
 	
 	public ChannelSimulator<P> accept(){
 		return null;
 	}
 
-	@Override
 	public boolean isAcceptable() {
 		return false;
 	}
-	@Override
 	public boolean isReadable() {
 		synchronized (qread){ 
 		return !qread.isEmpty();
 		}
 	}
-	@Override
 	public boolean isWritable() {
 		return true;
 	}
--- a/rep/channel/REPSocketChannel.java	Sun Aug 31 19:01:03 2008 +0900
+++ b/rep/channel/REPSocketChannel.java	Sun Aug 31 19:02:34 2008 +0900
@@ -139,6 +139,7 @@
 		return sc.connect(semaIP);
 	}
 
+
 	public SelectionKey register1(REPSelector<P> sel, int ops, Object att)
 			throws ClosedChannelException {
 		if(sel instanceof REPSelector) {
@@ -159,5 +160,4 @@
 		return sc.register(sel, ops,att);
 	}
 
-
 }
\ No newline at end of file
--- a/rep/channel/SelectionKeySimulator.java	Sun Aug 31 19:01:03 2008 +0900
+++ b/rep/channel/SelectionKeySimulator.java	Sun Aug 31 19:02:34 2008 +0900
@@ -36,7 +36,7 @@
 	}
 
 	public void setFlag() {
-		SelectableChannelSimulator<?> scs = (SelectableChannelSimulator<?>) channel;
+		ChannelSimulator<?> scs = (ChannelSimulator<?>) channel;
 		ready = 0;
 		if(scs.isAcceptable()) ready |= OP_ACCEPT;
 		if(scs.isReadable()) ready |= OP_READ;
--- a/rep/channel/SelectorSimulator.java	Sun Aug 31 19:01:03 2008 +0900
+++ b/rep/channel/SelectorSimulator.java	Sun Aug 31 19:02:34 2008 +0900
@@ -41,6 +41,29 @@
 		}
 		return selectedKeys.size();
 	}
+	
+	@Override
+	public int select(long timeout) throws IOException {
+		selectedKeys = new HashSet<SelectionKey>();
+		
+		synchronized(this) {
+			if (!wakeFlag){
+				for(SelectionKey key : keyList){
+					if(((SelectionKeySimulator<?>) key).isAble())
+						selectedKeys.add(key);
+				}
+
+				if(selectedKeys.isEmpty())
+					try {
+						this.wait(timeout);
+					} catch (InterruptedException e) {
+						throw new IOException("Error, Selector was interrupted!");
+					}
+			}
+			wakeFlag=false;
+		}
+		return selectedKeys.size();
+	}
 
 	@Override
 	public int selectNow() throws IOException {
@@ -108,11 +131,6 @@
 		return null;
 	}
 
-	@Override
-	public int select(long timeout) throws IOException {
-		// TODO Auto-generated method stub
-		return 0;
-	}
 
 	@Override
 	public Selector wakeup() {