changeset 307:e4b7af3fdf99

*** empty log message ***
author kono
date Sat, 04 Oct 2008 22:12:16 +0900
parents ef00df38dd5d
children c5be84d53c7f
files rep/channel/ChannelSimulator.java rep/handler/REPHandlerImpl.java
diffstat 2 files changed, 37 insertions(+), 32 deletions(-) [+]
line wrap: on
line diff
--- a/rep/channel/ChannelSimulator.java	Sat Oct 04 18:16:45 2008 +0900
+++ b/rep/channel/ChannelSimulator.java	Sat Oct 04 22:12:16 2008 +0900
@@ -14,49 +14,59 @@
 public class ChannelSimulator<P> extends REPSocketChannel<P>{
 	protected NetworkSimulator<P> ns;
 	protected Queue<P> readQ;
-	protected Object lock;
+	protected Selector selector = new NullSelector();
 	protected boolean isBlocking;
 	protected ChannelSimulator<P> otherEnd;
 	protected SelectionKeySimulator<P> key;
 	private String ownerName;
-	//protected BlockingQueue<P> qread;
-	//protected BlockingQueue<P> qwrite;
-	//protected SelectorSimulator<P> writeSelector;
-	//protected SelectorSimulator<P> readSelector;
 
 	/**  Constructors. */
 	public ChannelSimulator(){
 		super(null, null);
 		ns = NetworkSimulator.<P>singleton();
 		readQ = new LinkedList<P>();
-		lock = new Object();
 		ownerName = Thread.currentThread().getName();
 	}
 
 	/** read from Queue.  */
-	public P read() throws IOException{
+	public synchronized P read() {
+		// We may lock selector instead of this, but it reduces
+		// concurrency.
 		P tmp;
-		synchronized (lock){
-			while ( (tmp=readQ.poll())==null && isBlocking ) {
-				try {
-					lock.wait();
-				} catch (InterruptedException e) {
-					throw new IOException();
-				}
+		while ( (tmp=readQ.poll())==null && isBlocking ) {
+			try {
+				wait();
+			} catch (InterruptedException e) {
 			}
 		}
+		// for write wait
+		otherEnd.wakeup(); selector.wakeup();
 		return tmp;
 	}
+	
+	private synchronized void wakeup() {
+		notify();
+	}
+
 	/** write packet to other end.  */
 	public boolean write(P p){
 		if (otherEnd==null) throw new NotYetConnectedException();
 		return otherEnd.enQ(p);
 	}
+	
 	/** otherEnd Channel enqueue p to own queue using this method. */
-	protected boolean enQ(P p){
-		synchronized (lock){
-			readQ.offer(p);
-			lock.notifyAll();
+	protected synchronized boolean enQ(P p){
+		while(true) {
+			if (readQ.offer(p)) {
+				notify(); // other end my wait()
+				selector.wakeup(); // selector may wait
+				break;
+			} else {
+				try {
+					wait();
+				} catch (InterruptedException e) {
+				}
+			}
 		}
 		return true;
 	}
@@ -83,7 +93,7 @@
 		return false;
 	}
 	public boolean isReadable() {
-		synchronized (lock){ 
+		synchronized (selector){ 
 			return !readQ.isEmpty();
 		}
 	}
@@ -110,19 +120,15 @@
 	@SuppressWarnings("unchecked")
 	@Override
 	public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
-		SelectorSimulator<P> selector = (SelectorSimulator<P>) sel;
-		synchronized (lock){
-			lock = selector;
-		}
-		key = selector.register(this, ops, att);
+		SelectorSimulator<P> sel1 = (SelectorSimulator<P>) sel;
+		selector = sel1;
+		key = sel1.register(this, ops, att);
 		return key;
 	}
 	
 	@SuppressWarnings("unchecked")
 	public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException {
-		synchronized (lock){
-			lock = sel;
-		}
+		selector = sel;
 		key = (SelectionKeySimulator<P>) sel.register(this, ops, att);
 		return key;
 	}
--- a/rep/handler/REPHandlerImpl.java	Sat Oct 04 18:16:45 2008 +0900
+++ b/rep/handler/REPHandlerImpl.java	Sat Oct 04 22:12:16 2008 +0900
@@ -11,17 +11,16 @@
 	private SessionManager manager;
 
 
-	public REPHandlerImpl(int sid, SessionManager manager) {
+	public REPHandlerImpl(SessionManager manager) {
 		this.manager = manager;
 	}
 
-	@SuppressWarnings("unchecked")
 	public void handle(REPSelectionKey<REPCommand> key) throws IOException {
-		REPSocketChannel<REPCommand> channel = (REPSocketChannel<REPCommand>) key.channel();
-		//System.out.println("REPHandlerImpl.handle() : channel = " + channel);
+		REPSocketChannel<REPCommand> channel = key.channel1();
+		System.out.println("REPHandlerImpl.handle() : channel = " + channel);
 		
 		REPCommand command = channel.read();
-		//System.out.println("REPHandlerImpl.handle() : command = " + command);
+		System.out.println("REPHandlerImpl.handle() : command = " + command);
 
 		manager.manage(channel, command);
 	}