changeset 248:e72e0eae1261

*** empty log message ***
author kent
date Wed, 03 Sep 2008 18:44:08 +0900
parents 2a185042dcd0
children e44c1773d121
files rep/channel/ChannelSimulator.java rep/channel/NetworkSimulator.java rep/channel/ServerChannelSimulator.java test/channeltest/testNetworkSimulator.java test/channeltest/testSeMa.java test/channeltest/testSeMaSlave.java
diffstat 6 files changed, 156 insertions(+), 145 deletions(-) [+]
line wrap: on
line diff
--- a/rep/channel/ChannelSimulator.java	Wed Sep 03 17:03:41 2008 +0900
+++ b/rep/channel/ChannelSimulator.java	Wed Sep 03 18:44:08 2008 +0900
@@ -3,91 +3,71 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NotYetConnectedException;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.LinkedList;
+import java.util.Queue;
 
 public class ChannelSimulator<P> extends REPSocketChannel<P>{
 	protected NetworkSimulator<P> ns;
-	protected BlockingQueue<P> qread;
-	protected BlockingQueue<P> qwrite;
-	protected SelectorSimulator<P> writeSelector;
-	protected SelectorSimulator<P> readSelector;
+	protected Queue<P> readQ;
+	protected Object lock;
+	protected boolean isBlocking;
+	protected ChannelSimulator<P> otherEnd;
+	protected SelectionKeySimulator<P> key;
+	private String ownerName;
+	//protected BlockingQueue<P> qread;
+	//protected BlockingQueue<P> qwrite;
+	//protected SelectorSimulator<P> writeSelector;
+	//protected SelectorSimulator<P> readSelector;
 
 	/**  Constructors. */
 	public ChannelSimulator(){
 		super(null, null);
 		ns = NetworkSimulator.<P>singleton();
+		readQ = new LinkedList<P>();
+		lock = new Object();
+		ownerName = Thread.currentThread().getName();
 	}
 
-	public ChannelSimulator<P> createConjugatedChannel() {
-		ChannelSimulator<P> ret = new ChannelSimulator<P>();
-		ret.qread=qwrite;
-		ret.qwrite=qread;
-		ret.readSelector=writeSelector;
-		ret.writeSelector=readSelector;
-		return ret;
-	}
-	
-	/* read from Queue.  */
-	public P read(){
-		try {
-			if(readSelector!=null)
-				synchronized (readSelector){
-					return qread.take();
+	/** read from Queue.  */
+	public P read() throws IOException{
+		P tmp;
+		synchronized (lock){
+			while ( (tmp=readQ.poll())==null && isBlocking ) {
+				try {
+					lock.wait();
+				} catch (InterruptedException e) {
+					throw new IOException();
 				}
-			else{
-				return qread.take();
 			}
-		} catch (InterruptedException e) {
-			e.printStackTrace();
-			return null;
 		}
+		return tmp;
 	}
-	/* write to Queue.  */
+	/** write packet to other end.  */
 	public boolean write(P p){
-		try {
-			if (writeSelector!=null)
-				synchronized (writeSelector){
-					qwrite.put(p);
-					writeSelector.notifyAll();
-				}
-			else {
-				qwrite.put(p);
-			}
-			return true;
-		} catch (InterruptedException e) {
-			e.printStackTrace();
-			return false;
+		if (otherEnd==null) throw new NotYetConnectedException();
+		return otherEnd.enQ(p);
+	}
+	/** otherEnd Channel enqueue p to own queue using this method. */
+	protected boolean enQ(P p){
+		synchronized (lock){
+			readQ.offer(p);
+			lock.notifyAll();
 		}
+		return true;
 	}
 
-
 	/** Connecting methods */
 	// for clients.
 	public boolean connect(SocketAddress ip){
-		return ns.connect(ip, this);
-	}
-
-	/* accessor methods.  */
-	public BlockingQueue<P> getReadQ(){
-		return qread;
-	}
-	public BlockingQueue<P> getWriteQ(){
-		return qwrite;
+		return ns.connect(ip, this); // ns.connectはotherEndを返した方がよいか?
 	}
-	public void createReadQ(){
-		qread = new LinkedBlockingQueue<P>();
+	void setOtherEnd(ChannelSimulator<P> other){
+		otherEnd = other;
 	}
-	public void createWriteQ(){
-		qwrite = new LinkedBlockingQueue<P>();
-	}
-	public void setWriteSelector(SelectorSimulator<P> _selector){
-		writeSelector = _selector; 
-	}
-
 	
 	public ChannelSimulator<P> accept(){
 		return null;
@@ -97,8 +77,8 @@
 		return false;
 	}
 	public boolean isReadable() {
-		synchronized (qread){ 
-		return !qread.isEmpty();
+		synchronized (lock){ 
+			return !readQ.isEmpty();
 		}
 	}
 	public boolean isWritable() {
@@ -106,7 +86,8 @@
 	}
 	@Override
 	public SelectableChannel configureBlocking(boolean block) throws IOException {
-		return null;
+		isBlocking = block;
+		return this;
 	}
 
 
@@ -119,12 +100,25 @@
 	@Override
 	public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
 		SelectorSimulator<P> selector = (SelectorSimulator<P>) sel;
-		return selector.register(this, ops, att);
+		synchronized (lock){
+			lock = selector;
+		}
+		key = selector.register(this, ops, att);
+		return key;
 	}
 	
+	@SuppressWarnings("unchecked")
 	public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException {
-		return sel.register(this, ops, att);
+		synchronized (lock){
+			lock = sel;
+		}
+		key = (SelectionKeySimulator<P>) sel.register(this, ops, att);
+		return key;
 	}
 	
+	public String toString(){
+		return "ChSim("+ownerName+")";  
+	}
+
 
 }
--- a/rep/channel/NetworkSimulator.java	Wed Sep 03 17:03:41 2008 +0900
+++ b/rep/channel/NetworkSimulator.java	Wed Sep 03 18:44:08 2008 +0900
@@ -4,7 +4,6 @@
 import java.util.HashMap;
 import java.util.LinkedList;
 
-
 public class NetworkSimulator<P> {
 	public static NetworkSimulator<?> ns;
 	
@@ -37,72 +36,51 @@
 	}
 	
 	/*   */
-	synchronized public void listen(SocketAddress ip, SelectorSimulator<P> selector) {
-		serverList.add(new ServerData<P>(ip, selector));
+	synchronized public void listen(SocketAddress ip, ServerChannelSimulator<P> scs) {
+		serverList.add(new ServerData<P>(ip, scs));
 		logger.writeLog("listen", 1);
 		printAllState();
 	}
 
-	synchronized public ChannelSimulator<P> accept(SocketAddress ip) {
-		for (ServerData<P> sd: serverList){
-			if (!sd.IP.equals(ip)) continue;
-			logger.writeLog("accepting..", 1);
-
-			ChannelSimulator<P> serverCH = sd.acceptWaitingList.remove();
-			sd.establishedList.add(serverCH);
+	synchronized public boolean connect(SocketAddress ip, ChannelSimulator<P> CHclient) {
+		logger.writeLog("connecting..", 1);
+		for (ServerData<P> sd0: serverList){
+			if (!sd0.IP.equals(ip)) continue;
 
-			logger.writeLog("accepted", 1);
+			ChannelSimulator<P> CHserver = new ChannelSimulator<P>();
+			CHserver.setOtherEnd(CHclient);
+			CHclient.setOtherEnd(CHserver);
+
+			sd0.connectedListS.add(CHserver);
+			sd0.connectedListC.add(CHclient);
+			sd0.scs.enQ(CHserver);
+
+			logger.writeLog("connected", 1);
 			printAllState();
-			return serverCH;
-		}
-		return null;
-	}
-	synchronized public boolean canAccept(SocketAddress ip){
-		for (ServerData<P> sd: serverList){
-			if (!sd.IP.equals(ip)) continue;
-			return !sd.acceptWaitingList.isEmpty();
+			return true;
 		}
 		return false;
 	}
 
-	public boolean connect(SocketAddress ip, ChannelSimulator<P> clientCH) {
-		ServerData<P> sd = null;
-		logger.writeLog("connecting..", 1);
-		synchronized (this){
-			for (ServerData<P> sd0: serverList){
-				if (sd0.IP.equals(ip)){
-					sd=sd0;
-					break;
-				}
-			}
-			if (sd==null) return false;
-
-			//ChannelSimulator<P> channel = new ChannelSimulator<P>(sd.selector);
-			clientCH.createReadQ();
-			clientCH.createWriteQ();
-			clientCH.setWriteSelector(sd.selector);
-
-			ChannelSimulator<P> serverCH = clientCH.createConjugatedChannel();
-			sd.acceptWaitingList.add(serverCH);
-		}
-
-		synchronized (sd.selector) {
-			sd.selector.notifyAll();
-		}
-		logger.writeLog("connected", 1);
-		printAllState();
-		return true;
-	}
-
 	/** for DEBUG methods. */
 	synchronized void printAllState(){
+		synchronized (logger){
 		logger.writeLog("NetworkSimulator State:");
 		for (ServerData<P> sd: serverList){
 			logger.writeLog("\tSessionManager(ip="+sd.IP.toString()+"): ");
-			logger.writeLog("\tacceptWaitingList="+sd.acceptWaitingList.size());
-			logger.writeLog("\testablishedList="+sd.establishedList.size());
+			//writeLog("\tacceptWaitingList="+sd.acceptWaitingList.size());
+			printChannelList(sd.connectedListC);
+			//writeLog("\testablishedList="+sd.establishedList.size());
+		}
 		}
 	}
+	synchronized void printChannelList(LinkedList<ChannelSimulator<P>> list){
+		String tmp = "";
+		for (ChannelSimulator<P> ch: list){
+			tmp += ch.toString()+" ";
+		}
+		logger.writeLog("\t"+tmp);
+	}
 
 
 
@@ -120,14 +98,16 @@
 class ServerData<P> {
 	//int virtualIP;
 	SocketAddress IP;
-	SelectorSimulator<P> selector;
-	LinkedList<ChannelSimulator<P>> acceptWaitingList;
-	LinkedList<ChannelSimulator<P>> establishedList;
+	//SelectorSimulator<P> selector;
+	ServerChannelSimulator<P> scs;
+	LinkedList<ChannelSimulator<P>> connectedListS;
+	LinkedList<ChannelSimulator<P>> connectedListC;
 
-	ServerData(SocketAddress ip, SelectorSimulator<P> _selector){
+	ServerData(SocketAddress ip, ServerChannelSimulator<P> _scs){
 		IP = ip;
-		selector = _selector;
-		acceptWaitingList = new LinkedList<ChannelSimulator<P>>();
-		establishedList = new LinkedList<ChannelSimulator<P>>();
+		//selector = _selector;
+		scs = _scs;
+		connectedListS = new LinkedList<ChannelSimulator<P>>();
+		connectedListC = new LinkedList<ChannelSimulator<P>>();
 	}
 }
\ No newline at end of file
--- a/rep/channel/ServerChannelSimulator.java	Wed Sep 03 17:03:41 2008 +0900
+++ b/rep/channel/ServerChannelSimulator.java	Wed Sep 03 18:44:08 2008 +0900
@@ -8,26 +8,52 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.spi.SelectorProvider;
+import java.util.LinkedList;
+import java.util.Queue;
 
 /* シミュレーションの際にコンストラクトされる REPServerSocketChannel の実装  */
 public class ServerChannelSimulator<P>extends REPServerSocketChannel<P> {
 	protected NetworkSimulator<P> ns;
 	//public REPServerSocket<REPSocketChannel<P>> socket;
 	protected SocketAddress IP;
+	protected Queue<ChannelSimulator<P>> acceptQ;
+	protected Object lock;
+	protected boolean isBlocking;
+	private SelectionKeySimulator<P> key;
 
 	/**  Constructors. 
 	 * @throws IOException */
 	public ServerChannelSimulator() throws IOException {
 		//socket = REPServerSocket.<REPSocketChannel<P>>create();
 		ns = NetworkSimulator.<P>singleton();
+		lock = new Object();
+		acceptQ = new LinkedList<ChannelSimulator<P>>();
 	}
 	
 	public void bind(SocketAddress ip){
 		IP = ip;
 	}
 
-	public REPSocketChannel<P> accept1() /*throws IOException*/ {
-		return ns.accept(IP);
+	public REPSocketChannel<P> accept1() throws IOException {
+		ChannelSimulator<P> tmp;
+		synchronized (lock) {
+			while ( (tmp=acceptQ.poll())==null && isBlocking ) {
+				try {
+					lock.wait();
+				} catch (InterruptedException e) {
+					throw new IOException();
+				}
+			}
+		}
+		//return ns.accept(IP);
+		return tmp;
+	}
+	protected boolean enQ(ChannelSimulator<P> ch){
+		synchronized (lock){
+			acceptQ.offer(ch);
+			lock.notifyAll();
+		}
+		return true;
 	}
 
 	public ServerSocket socket() {
@@ -41,32 +67,48 @@
 
 	
 
+	@SuppressWarnings("unchecked")
 	public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException {
+		synchronized (lock) {
+			lock = sel;
+		}
 		REPSelector<P> selector = sel;
-		ns.listen(IP, (SelectorSimulator<P>) selector);
-		return selector.register(this, ops, att);
+		ns.listen(IP, this);
+		key = (SelectionKeySimulator<P>) selector.register(this, ops, att);
+		return key;
+	}
+	@SuppressWarnings("unchecked")
+	@Override
+	public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
+		// TODO
+		synchronized (lock) {
+			lock = sel;
+		}
+		REPSelector<P> selector = (REPSelector<P>)sel;
+		ns.listen(IP, this); // bindに移動してもいいよ
+		key = (SelectionKeySimulator<P>) selector.register(this, ops, att);
+		return key;
 	}
 
 	public boolean isAcceptable() {
-		return ns.canAccept(IP);
+		synchronized (lock){ 
+			return !acceptQ.isEmpty();
+		}
 	}
 
 	@Override
 	public Object blockingLock() {
-		// TODO Auto-generated method stub
-		return null;
+		return lock;
 	}
 
-	public SelectableChannel configureBlocking(boolean block)
-			throws IOException {
-		// TODO Auto-generated method stub
-		return null;
+	public SelectableChannel configureBlocking(boolean block) throws IOException {
+		isBlocking = block;
+		return this;
 	}
 
 	@Override
 	public boolean isBlocking() {
-		// TODO Auto-generated method stub
-		return false;
+		return isBlocking;
 	}
 
 	@Override
@@ -87,12 +129,6 @@
 		return null;
 	}
 
-	@Override
-	public SelectionKey register(Selector sel, int ops, Object att)
-			throws ClosedChannelException {
-		// TODO Auto-generated method stub
-		return null;
-	}
 
 	@Override
 	public int validOps() {
--- a/test/channeltest/testNetworkSimulator.java	Wed Sep 03 17:03:41 2008 +0900
+++ b/test/channeltest/testNetworkSimulator.java	Wed Sep 03 18:44:08 2008 +0900
@@ -15,8 +15,8 @@
 	static public REPLogger logger = REPLogger.singleton();
 
 	public static void main(String[] args){
-		REPServerSocketChannel.isSimulation = true;
-		testNetworkSimulator testns = new testNetworkSimulator(3, 12, 90);
+		REPServerSocketChannel.isSimulation = false;
+		testNetworkSimulator testns = new testNetworkSimulator(3, 10, 90);
 		logger.setLogLevel(5);
 		
 		testns.startTest();
--- a/test/channeltest/testSeMa.java	Wed Sep 03 17:03:41 2008 +0900
+++ b/test/channeltest/testSeMa.java	Wed Sep 03 18:44:08 2008 +0900
@@ -58,7 +58,7 @@
 		while(running){
 
 			try { 
-				selector.select(2000); 
+				selector.select(); 
 				Set<REPSelectionKey<String>> set = selector.selectedKeys1();
 				for(REPSelectionKey<String> key : set) {
 
--- a/test/channeltest/testSeMaSlave.java	Wed Sep 03 17:03:41 2008 +0900
+++ b/test/channeltest/testSeMaSlave.java	Wed Sep 03 18:44:08 2008 +0900
@@ -54,7 +54,7 @@
 			/* Main Loop */
 			while(running){
 
-				selector.select(2000); 
+				selector.select(); 
 
 				for(REPSelectionKey<String> key : selector.selectedKeys1()){
 
@@ -62,7 +62,8 @@
 						REPSocketChannel<String> channel = key.accept(pack);
 						if(channel==null) continue;
 						channel.configureBlocking(false);
-						selector.register(channel, SelectionKey.OP_READ,null);
+						//selector.register(channel, SelectionKey.OP_READ,null);
+						channel.register(selector, SelectionKey.OP_READ, null);
 						ns.writeLog("accepts a client.", 1);
 
 					}else if(key.isReadable()){