view rep/channel/ChannelSimulator.java @ 286:30c993e89286

TestEditor
author kono
date Sun, 28 Sep 2008 15:41:42 +0900
parents 90965a3bd4f3
children e4b7af3fdf99
line wrap: on
line source

package rep.channel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.LinkedList;
import java.util.Queue;

/**
 * In-memory simulation of a socket channel carrying packets of type {@code P}.
 *
 * <p>Packets written to this channel are appended to the read queue of the
 * channel at the other end ({@link #otherEnd}); {@link #read()} takes packets
 * from this channel's own queue.
 *
 * <p>Thread-safety: {@link #readQ} is guarded by the object currently stored
 * in {@link #lock}. That object is replaced by the selector when the channel
 * is registered, so every critical section captures the lock in a local
 * variable, synchronizes on it and then verifies it is still the current lock
 * before touching the queue.
 *
 * @param <P> packet type transported by the channel
 */
public class ChannelSimulator<P> extends REPSocketChannel<P>{
	/** Simulated network used to establish connections. */
	protected NetworkSimulator<P> ns;
	/** Packets delivered by the other end and not yet read. */
	protected Queue<P> readQ;
	/**
	 * Monitor guarding {@link #readQ}. Starts as a private object and is
	 * replaced by the selector in {@code register(...)}.
	 */
	protected volatile Object lock;
	/** When true, {@link #read()} waits until a packet is available. */
	protected boolean isBlocking;
	/** Channel that receives what is written here; null until connected. */
	protected ChannelSimulator<P> otherEnd;
	/** Key returned by the most recent registration with a selector. */
	protected SelectionKeySimulator<P> key;
	/** Name of the thread that constructed this channel (used by toString). */
	private final String ownerName;

	/**  Constructors. */
	public ChannelSimulator(){
		super(null, null);
		ns = NetworkSimulator.<P>singleton();
		readQ = new LinkedList<P>();
		lock = new Object();
		ownerName = Thread.currentThread().getName();
	}

	/**
	 * Reads the next packet from the queue.
	 *
	 * @return the next packet, or null when the queue is empty and the
	 *         channel is in non-blocking mode
	 * @throws IOException if the thread is interrupted while waiting; the
	 *         interrupt status is restored and the InterruptedException is
	 *         attached as the cause
	 */
	public P read() throws IOException{
		while (true) {
			Object monitor = lock;
			synchronized (monitor){
				// register() may have replaced the lock; retry on the new one.
				if (monitor != lock) continue;
				P packet = readQ.poll();
				if (packet != null || !isBlocking) return packet;
				try {
					// Wait on the monitor we actually own, not on the field.
					monitor.wait();
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					IOException ioe = new IOException("interrupted while waiting for a packet");
					ioe.initCause(e);
					throw ioe;
				}
			}
			// Woken up: loop to re-read the lock and poll the queue again.
		}
	}

	/**
	 * Writes a packet to the other end of the channel.
	 *
	 * @param p packet to deliver
	 * @return the result of enqueueing on the other end (always true)
	 * @throws NotYetConnectedException if no other end has been set
	 */
	public boolean write(P p){
		if (otherEnd==null) throw new NotYetConnectedException();
		return otherEnd.enQ(p);
	}

	/**
	 * Appends a packet to this channel's own queue and wakes every thread
	 * waiting on the current lock. Called by the other end's write().
	 *
	 * @param p packet to enqueue
	 * @return always true
	 */
	protected boolean enQ(P p){
		while (true) {
			Object monitor = lock;
			synchronized (monitor){
				// Only enqueue and notify under the lock that is current.
				if (monitor != lock) continue;
				readQ.offer(p);
				monitor.notifyAll();
				return true;
			}
		}
	}

	/** Connecting methods */
	// for clients.
	public boolean connect(InetSocketAddress ip){
		return ns.connect(ip, this); // TODO: should ns.connect return otherEnd instead?
	}

	/**
	 * Connects through the simulated network.
	 *
	 * @param ip address to connect to; must be an InetSocketAddress
	 * @return the result of NetworkSimulator.connect
	 * @throws ClassCastException if ip is not an InetSocketAddress
	 */
	public boolean connect(SocketAddress ip){
		return ns.connect((InetSocketAddress)ip, this);
	}

	/** Sets the channel that will receive packets written to this one. */
	void setOtherEnd(ChannelSimulator<P> other){
		otherEnd = other;
	}

	/** This channel never accepts connections; always returns null. */
	public ChannelSimulator<P> accept(){
		return null;
	}

	/** Always false: this channel never accepts connections. */
	public boolean isAcceptable() {
		return false;
	}

	/** Returns true when at least one packet is waiting in the queue. */
	public boolean isReadable() {
		while (true) {
			Object monitor = lock;
			synchronized (monitor){
				if (monitor == lock) return !readQ.isEmpty();
			}
		}
	}

	/** Always true: enqueueing on the other end never blocks. */
	public boolean isWritable() {
		return true;
	}

	/**
	 * Selects blocking or non-blocking behaviour for {@link #read()}.
	 *
	 * @param block true to make read() wait for a packet
	 * @return this channel
	 */
	@Override
	public SelectableChannel configureBlocking(boolean block) throws IOException {
		isBlocking = block;
		return this;
	}


	/**
	 * Looks up this channel's key in the given selector.
	 *
	 * @param selector2 selector to query; must be a SelectorSimulator
	 * @return whatever the selector's getKey returns for this channel
	 */
	@SuppressWarnings("unchecked")
	public SelectionKey keyFor(Selector selector2) {
		return ((SelectorSimulator) selector2).getKey(this);
	}

	/**
	 * Looks up this channel's key in the given selector.
	 *
	 * @param sel selector to query; must be a SelectorSimulator
	 * @return whatever the selector's getKey returns for this channel
	 */
	@SuppressWarnings("unchecked")
	public SelectionKey keyFor(REPSelector<?> sel) {
		return (SelectionKey)((SelectorSimulator) sel).getKey(this);
	}

	/**
	 * Registers this channel with a selector. The selector becomes the
	 * monitor guarding the read queue, so enQ() notifies it directly.
	 *
	 * @param sel selector to register with; must be a SelectorSimulator
	 * @param ops interest set passed to the selector
	 * @param att attachment passed to the selector
	 * @return the key produced by the selector
	 */
	@SuppressWarnings("unchecked")
	@Override
	public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
		SelectorSimulator<P> selector = (SelectorSimulator<P>) sel;
		switchLock(selector);
		key = selector.register(this, ops, att);
		return key;
	}

	/**
	 * Registers this channel with a selector. The selector becomes the
	 * monitor guarding the read queue, so enQ() notifies it directly.
	 *
	 * @param sel selector to register with
	 * @param ops interest set passed to the selector
	 * @param att attachment passed to the selector
	 * @return the key produced by the selector
	 */
	@SuppressWarnings("unchecked")
	public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException {
		switchLock(sel);
		key = (SelectionKeySimulator<P>) sel.register(this, ops, att);
		return key;
	}

	/**
	 * Replaces the queue lock and wakes threads waiting on the previous one
	 * so they re-synchronize on the new lock instead of sleeping forever.
	 */
	private void switchLock(Object newLock){
		// Fail before the swap so a null selector cannot poison the lock field.
		if (newLock == null) throw new NullPointerException("selector");
		while (true) {
			Object previous = lock;
			synchronized (previous){
				if (previous != lock) continue;
				lock = newLock;
				previous.notifyAll();
				return;
			}
		}
	}

	@Override
	public String toString(){
		return "ChSim("+ownerName+")";  
	}


}