changeset 314:edb373aa421e

use channel lock instead of selector lock. remove generic type from NetworkSimulator, which simplifies singleton pattern. remove lock from REPLogger.
author kono
date Mon, 06 Oct 2008 10:34:37 +0900
parents 0585fd2410b8
children 20fb70068089
files rep/channel/ChannelSimulator.java rep/channel/NetworkSimulator.java rep/channel/REPLogger.java rep/channel/SelectorSimulator.java rep/channel/ServerChannelSimulator.java test/channeltest/testNetworkSimulator.java
diffstat 6 files changed, 107 insertions(+), 90 deletions(-) [+]
line wrap: on
line diff
--- a/rep/channel/ChannelSimulator.java	Sun Oct 05 22:36:24 2008 +0900
+++ b/rep/channel/ChannelSimulator.java	Mon Oct 06 10:34:37 2008 +0900
@@ -12,7 +12,7 @@
 import java.util.Queue;
 
 public class ChannelSimulator<P> extends REPSocketChannel<P>{
-	protected NetworkSimulator<P> ns;
+	protected NetworkSimulator ns;
 	protected Queue<P> readQ;
 	protected Selector selector = new NullSelector();
 	protected boolean isBlocking;
@@ -23,20 +23,22 @@
 	/**  Constructors. */
 	public ChannelSimulator(){
 		super(null, null);
-		ns = NetworkSimulator.<P>singleton();
+		ns = NetworkSimulator.singleton();
 		readQ = new LinkedList<P>();
 		ownerName = Thread.currentThread().getName();
 	}
 
-	/** read from Queue.  */
-	public synchronized P read() {
+	/** read from Queue.  
+	 * @throws IOException */
+	public synchronized P read() throws IOException {
 		// We may lock selector instead of this, but it reduces
-		// concurrency.
+		// concurrency, but be careful of dead lock.
 		P tmp;
 		while ( (tmp=readQ.poll())==null && isBlocking ) {
 			try {
 				wait(); // queue is empty
 			} catch (InterruptedException e) {
+				throw new IOException();
 			}
 		}
 		// for write wait (we don't need this)
@@ -55,11 +57,21 @@
 	}
 	
 	/** otherEnd Channel enqueue p to own queue using this method. */
-	protected synchronized boolean enQ(P p){
+	
+	boolean enQ(P p) {
+		if (enQ1(p)) {
+			// don't lock this channel while calling selector.wakeup().
+			// selector may lock this channel, which may cause dead lock.
+			selector.wakeup();
+			return true;
+		}
+		return false;
+	}
+	
+	protected synchronized boolean enQ1(P p){
 		while(true) {
 			if (readQ.offer(p)) {
 				notify(); // other end my wait()
-				selector.wakeup(); // selector may wait
 				break;
 			} else { // this can't happen
 				assert(false);
@@ -138,5 +150,14 @@
 		return "ChSim("+ownerName+")";  
 	}
 
+	public ChannelSimulator<P> newChannel() {
+		return new ChannelSimulator<P>();
+	}
+
+	@SuppressWarnings("unchecked")
+	public void setOtherEnd1(ChannelSimulator<?> other) {
+		otherEnd = (ChannelSimulator<P>) other;
+	}
+
 
 }
--- a/rep/channel/NetworkSimulator.java	Sun Oct 05 22:36:24 2008 +0900
+++ b/rep/channel/NetworkSimulator.java	Mon Oct 06 10:34:37 2008 +0900
@@ -5,47 +5,41 @@
 import java.util.HashMap;
 import java.util.LinkedList;
 
-public class NetworkSimulator<P> {
-	public static NetworkSimulator<?> ns;
-	
+public class NetworkSimulator {
+	// we don't use <P> because we need singleton. 
+	public static NetworkSimulator ns = new NetworkSimulator();
 
 	public HashMap<SocketAddress,Integer>namedb = new HashMap<SocketAddress,Integer>();
 	public int ipcount = 1;
 	public REPLogger logger;
 	
-	@SuppressWarnings("unchecked")  // <?> から <T> へのキャストのため. 
-	public static <T> NetworkSimulator<T> singleton(){
-		// double check singleton
-		if (ns==null)
-			synchronized (NetworkSimulator.class) {
-				if (ns==null) 
-					ns = new NetworkSimulator<T>();
-			}
-		return (NetworkSimulator<T>) ns;
+	public static NetworkSimulator singleton() {
+		// singleton pattern may used here, but it has a little cost.
+		return ns;
 	}
 
 	int logLevel=5;
 	/** Listening Servers. */
-	private LinkedList<ServerData<P>> serverList;
+	private LinkedList<ServerData> serverList;
 
 	/** Constructor. */
 	public NetworkSimulator(){
-		serverList = new LinkedList<ServerData<P>>();
+		serverList = new LinkedList<ServerData>();
 		logger = REPLogger.singleton();
 		logger.writeLog("construct Networksimulator", 1);
 		// printAllState();
 	}
 	
 	/*   */
-	synchronized public void listen(InetSocketAddress ip, ServerChannelSimulator<P> scs) {
-		serverList.add(new ServerData<P>(ip, scs));
+	synchronized public void listen(InetSocketAddress ip, ServerChannelSimulator<?> scs) {
+		serverList.add(new ServerData(ip, scs));
 		logger.writeLog("listen", 1);
 		printAllState();
 	}
 
-	synchronized public boolean connect(InetSocketAddress ip, ChannelSimulator<P> CHclient) {
+	synchronized public boolean connect(InetSocketAddress ip, ChannelSimulator<?> CHclient) {
 		logger.writeLog("connecting..", 1);
-		for (ServerData<P> sd0: serverList){
+		for (ServerData sd0: serverList){
 			// ANY address (0.0.0.0/0.0.0.0) should be considered.
 			if (sd0.IP.getAddress().isAnyLocalAddress()) {
 				if (sd0.IP.getPort() != ip.getPort()) continue;
@@ -54,9 +48,10 @@
 				// use different port address. 
 			} else if (!sd0.IP.equals(ip)) continue;
 
-			ChannelSimulator<P> CHserver = new ChannelSimulator<P>();
-			CHserver.setOtherEnd(CHclient);
-			CHclient.setOtherEnd(CHserver);
+			//ChannelSimulator<?> CHserver = new ChannelSimulator<?>();
+			ChannelSimulator<?> CHserver = CHclient.newChannel();
+			CHserver.setOtherEnd1(CHclient);
+			CHclient.setOtherEnd1(CHserver);
 
 			sd0.connectedListS.add(CHserver);
 			sd0.connectedListC.add(CHclient);
@@ -72,16 +67,16 @@
 	/** for DEBUG methods. */
 	void printAllState(){
 		String log = "NetworkSimulator State:";
-		for (ServerData<P> sd: serverList){
+		for (ServerData sd: serverList){
 			log += "\tSessionManager(ip="+sd.IP.toString()+"): ";
 			log += channelList(sd.connectedListC);
 		}
 		logger.writeLog(log);
 	}
 	
-	String channelList(LinkedList<ChannelSimulator<P>> list){
+	private String channelList(LinkedList<ChannelSimulator<?>> list){
 		String tmp = "";
-		for (ChannelSimulator<P> ch: list){
+		for (ChannelSimulator<?> ch: list){
 			tmp += ch.toString()+" ";
 		}
 		return "\t"+tmp;
@@ -89,7 +84,7 @@
 
 
 
-	public int nslookup(SocketAddress semaIP) {
+	public synchronized int nslookup(SocketAddress semaIP) {
 		Integer ip;
 		if ((ip=namedb.get(semaIP))==null) {
 			namedb.put(semaIP, (ip=ipcount++));
@@ -100,19 +95,19 @@
 
 }
 
-class ServerData<P> {
+class ServerData {
 	//int virtualIP;
 	InetSocketAddress IP;
 	//SelectorSimulator<P> selector;
-	ServerChannelSimulator<P> scs;
-	LinkedList<ChannelSimulator<P>> connectedListS;
-	LinkedList<ChannelSimulator<P>> connectedListC;
+	ServerChannelSimulator<?> scs;
+	LinkedList<ChannelSimulator<?>> connectedListS;
+	LinkedList<ChannelSimulator<?>> connectedListC;
 
-	ServerData(InetSocketAddress ip, ServerChannelSimulator<P> _scs){
+	ServerData(InetSocketAddress ip, ServerChannelSimulator<?> _scs){
 		IP = ip;
 		//selector = _selector;
 		scs = _scs;
-		connectedListS = new LinkedList<ChannelSimulator<P>>();
-		connectedListC = new LinkedList<ChannelSimulator<P>>();
+		connectedListS = new LinkedList<ChannelSimulator<?>>();
+		connectedListC = new LinkedList<ChannelSimulator<?>>();
 	}
 }
\ No newline at end of file
--- a/rep/channel/REPLogger.java	Sun Oct 05 22:36:24 2008 +0900
+++ b/rep/channel/REPLogger.java	Mon Oct 06 10:34:37 2008 +0900
@@ -1,15 +1,15 @@
 package rep.channel;
 
 public class REPLogger {
-	static REPLogger single;
+	static REPLogger single = new REPLogger();
 	
 	public static REPLogger singleton(){
-		if(single==null){
-			synchronized(REPLogger.class){
-				if(single==null)
-					single = new REPLogger();
-			}
-		}
+//		if(single==null){
+//			synchronized(REPLogger.class){
+//				if(single==null)
+//					single = new REPLogger();
+//			}
+//		}
 		return single;
 	}
 	protected REPLogger(){
@@ -17,16 +17,16 @@
 
 	private int logLevel;
 	/** simulation log command */
-	synchronized public void writeLog(String log, int level){
+	public void writeLog(String log, int level){
 		if ( level<=logLevel )
 			System.out.println(Thread.currentThread().getName()+": "+log);
-		System.out.flush();
+		//System.out.flush();
 	}
 	public void writeLog(String log){
 		writeLog(log, 0);
 	}
 
-	synchronized public void setLogLevel(int logLevel) {
+	public void setLogLevel(int logLevel) {
 		this.logLevel = logLevel;
 	}
 
--- a/rep/channel/SelectorSimulator.java	Sun Oct 05 22:36:24 2008 +0900
+++ b/rep/channel/SelectorSimulator.java	Mon Oct 06 10:34:37 2008 +0900
@@ -14,6 +14,7 @@
 	// access to these set have to be synchronized
 	private Set<SelectionKey> keyList;
 	private Set<SelectionKey> selectedKeys;
+	private boolean isOpen=true;
 
 	public SelectorSimulator() {
 		super(null);
@@ -40,8 +41,8 @@
 		getSelectedKeys();
 		if(selectedKeys.isEmpty()) {
 			try {
-				this.wait(timeout);
-				// we cannot know if we timeouted or not
+				wait(timeout);
+				// we cannot know if we time outed or not
 				getSelectedKeys(); 
 			} catch (InterruptedException e) {
 				throw new IOException("Error, Selector was interrupted!");
@@ -91,7 +92,7 @@
 			// REPSelectionKeyを生成しないように注意
 			newKeys.add(new SelectionKeySimulator<P>(k));
 		}
-		return newKeys;//(Set<REPSelectionKey<P>>)newKeys;
+		return newKeys;
 	}
 
 	public synchronized <T> SelectionKey getKey(ChannelSimulator<T> channel){
@@ -104,13 +105,12 @@
 
 	@Override
 	public void close() throws IOException {
-		// TODO Auto-generated method stub
-
+		isOpen = false;
 	}
 
 	@Override
 	public boolean isOpen() {
-		return true;
+		return isOpen;
 	}
 
 	@Override
--- a/rep/channel/ServerChannelSimulator.java	Sun Oct 05 22:36:24 2008 +0900
+++ b/rep/channel/ServerChannelSimulator.java	Mon Oct 06 10:34:37 2008 +0900
@@ -13,20 +13,19 @@
 
 /* シミュレーションの際にコンストラクトされる REPServerSocketChannel の実装  */
 public class ServerChannelSimulator<P>extends REPServerSocketChannel<P> {
-	protected NetworkSimulator<P> ns;
+	protected NetworkSimulator ns;
 	//public REPServerSocket<REPSocketChannel<P>> socket;
 	protected InetSocketAddress IP;
 	protected Queue<ChannelSimulator<P>> acceptQ;
-	protected Object lock;
+	protected Selector selector;
 	protected boolean isBlocking;
 	private SelectionKeySimulator<P> key;
 
 	/**  Constructors. 
 	 * @throws IOException */
 	public ServerChannelSimulator() throws IOException {
-		//socket = REPServerSocket.<REPSocketChannel<P>>create();
-		ns = NetworkSimulator.<P>singleton();
-		lock = new Object();
+		ns = NetworkSimulator.singleton();
+		selector = new NullSelector(); // new Object();
 		acceptQ = new LinkedList<ChannelSimulator<P>>();
 	}
 	
@@ -34,28 +33,36 @@
 		IP = ip;
 	}
 
-	public REPSocketChannel<P> accept1() throws IOException {
-		ChannelSimulator<P> tmp;
-		synchronized (lock) {
-			while ( (tmp=acceptQ.poll())==null && isBlocking ) {
-				try {
-					lock.wait();
-				} catch (InterruptedException e) {
-					throw new IOException();
-				}
+	public synchronized REPSocketChannel<P> accept1() throws IOException {
+		ChannelSimulator<P> channel;
+		while ( (channel=acceptQ.poll())==null && isBlocking ) {
+			try {
+				wait();
+			} catch (InterruptedException e) {
+				throw new IOException();
 			}
 		}
-		//return ns.accept(IP);
-		return tmp;
+		return channel;
 	}
+	
 	protected boolean enQ(ChannelSimulator<P> ch){
-		synchronized (lock){
+		// Don't lock a selector from a locked channel, the selector may
+		// use channel.isAble() which locks the channel.
+		synchronized(this) {
 			acceptQ.offer(ch);
-			lock.notifyAll();
+			notify();
 		}
+		selector.wakeup();
 		return true;
 	}
 
+	@SuppressWarnings("unchecked")
+	public void enQ(ChannelSimulator<?> hserver) {
+		// NetworkSimulator doesn't know P
+		ChannelSimulator<P>ch = (ChannelSimulator<P>) hserver;
+		enQ(ch);
+	}
+	
 	public ServerSocket socket() {
 		try {
 			return  new REPServerSocket(this);
@@ -69,36 +76,29 @@
 
 	@SuppressWarnings("unchecked")
 	public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException {
-		synchronized (lock) {
-			lock = sel;
-		}
-		REPSelector<P> selector = sel;
+		selector = sel;
+		REPSelector<P> selector1 = sel;
 		ns.listen(IP, this);
-		key = (SelectionKeySimulator<P>) selector.register(this, ops, att);
+		key = (SelectionKeySimulator<P>) selector1.register(this, ops, att);
 		return key;
 	}
 	@SuppressWarnings("unchecked")
 	@Override
 	public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
-		// TODO
-		synchronized (lock) {
-			lock = sel;
-		}
-		REPSelector<P> selector = (REPSelector<P>)sel;
+		selector = sel;
+		REPSelector<P> selector1 = (REPSelector<P>)sel;
 		ns.listen(IP, this); // bindに移動してもいいよ
-		key = (SelectionKeySimulator<P>) selector.register(this, ops, att);
+		key = (SelectionKeySimulator<P>) selector1.register(this, ops, att);
 		return key;
 	}
 
-	public boolean isAcceptable() {
-		synchronized (lock){ 
-			return !acceptQ.isEmpty();
-		}
+	public synchronized boolean isAcceptable() {
+		return !acceptQ.isEmpty();
 	}
 
 	@Override
 	public Object blockingLock() {
-		return lock;
+		return selector;
 	}
 
 	public SelectableChannel configureBlocking(boolean block) throws IOException {
@@ -142,4 +142,5 @@
 		
 	}
 
+
 }
--- a/test/channeltest/testNetworkSimulator.java	Sun Oct 05 22:36:24 2008 +0900
+++ b/test/channeltest/testNetworkSimulator.java	Mon Oct 06 10:34:37 2008 +0900
@@ -15,7 +15,7 @@
 	static public REPLogger logger = REPLogger.singleton();
 
 	public static void main(String[] args){
-		REPServerSocketChannel.isSimulation = false;
+		REPServerSocketChannel.isSimulation = true;
 		testNetworkSimulator testns = new testNetworkSimulator(3, 10, 90);
 		logger.setLogLevel(5);