view rep/channel/ChannelSimulator.java @ 308:c5be84d53c7f channel-simulator-update **INVALID**

*** empty log message ***
author kono
date Sat, 04 Oct 2008 22:12:34 +0900
parents e4b7af3fdf99
children f27c8551e877
line wrap: on
line source

package rep.channel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.LinkedList;
import java.util.Queue;

/**
 * In-memory stand-in for a socket channel.
 *
 * <p>A packet written on one end ({@link #write(Object)}) is appended to the
 * read queue of the peer channel ({@link #enQ(Object)}), from which the peer
 * takes it with {@link #read()}.  The read queue is guarded by this channel's
 * own monitor: {@code read()} and {@code enQ()} are synchronized on
 * {@code this} and use {@code wait()}/{@code notifyAll()} on it.
 *
 * @param <P> type of the packets carried by the channel
 */
public class ChannelSimulator<P> extends REPSocketChannel<P>{
	protected NetworkSimulator<P> ns;
	/** Packets delivered to this end and not yet read; guarded by {@code this}. */
	protected Queue<P> readQ;
	/** Selector to wake when the queue changes; a NullSelector until register() is called. */
	protected Selector selector = new NullSelector();
	/** When true, {@link #read()} waits for a packet instead of returning null. */
	protected boolean isBlocking;
	/** Peer channel; null until {@link #setOtherEnd(ChannelSimulator)} is called. */
	protected ChannelSimulator<P> otherEnd;
	protected SelectionKeySimulator<P> key;
	/** Name of the thread that created this channel; only used by toString(). */
	private final String ownerName;

	/**  Constructors. */
	public ChannelSimulator(){
		super(null, null);
		ns = NetworkSimulator.<P>singleton();
		readQ = new LinkedList<P>();
		ownerName = Thread.currentThread().getName();
	}

	/**
	 * Reads the next packet from the queue.
	 *
	 * <p>In blocking mode this waits until a packet is available; otherwise
	 * it returns immediately.  An interrupt received while waiting does not
	 * abort the read (the wait is retried, as before), but the thread's
	 * interrupt status is restored before returning.
	 *
	 * @return the next packet, or {@code null} when the queue is empty and
	 *         the channel is in non-blocking mode
	 */
	public synchronized P read() {
		// We may lock selector instead of this, but it reduces
		// concurrency.
		boolean interrupted = false;
		try {
			P packet;
			while ( (packet=readQ.poll())==null && isBlocking ) {
				try {
					wait(); // queue is empty
				} catch (InterruptedException e) {
					// Keep waiting, but remember the interrupt so it is not lost.
					interrupted = true;
				}
			}
			// Writers that found the queue full are waiting in enQ() on THIS
			// channel's monitor, so wake them here.  The peer's monitor is
			// deliberately not touched: locking it while holding our own
			// could deadlock two concurrent readers, and the peer may not
			// be connected yet.
			notifyAll();
			selector.wakeup();
			return packet;
		} finally {
			if (interrupted) {
				Thread.currentThread().interrupt();
			}
		}
	}

	/**
	 * Writes a packet to the other end.
	 *
	 * @param p packet to deliver to the peer's read queue
	 * @return the result of the peer's {@link #enQ(Object)}
	 * @throws NotYetConnectedException if no peer has been set
	 */
	public boolean write(P p){
		if (otherEnd==null) throw new NotYetConnectedException();
		return otherEnd.enQ(p);
	}

	/**
	 * Appends a packet to this channel's own queue; called by the other end.
	 *
	 * <p>Waits while the queue refuses the packet.  (The queue created by the
	 * constructor is a {@link LinkedList}, whose {@code offer} always
	 * succeeds; the wait only matters if a bounded queue is substituted.)
	 * An interrupt received while waiting is retried and the interrupt
	 * status is restored before returning.
	 *
	 * @param p packet to enqueue
	 * @return always {@code true}
	 */
	protected synchronized boolean enQ(P p){
		boolean interrupted = false;
		try {
			while (!readQ.offer(p)) {
				try {
					wait(); // queue is full, we have to wait here
				} catch (InterruptedException e) {
					// Keep waiting, but remember the interrupt so it is not lost.
					interrupted = true;
				}
			}
			// Readers and writers share this monitor, so wake all waiters:
			// a single notify() could pick a writer instead of the reader.
			notifyAll(); // a reader may be in wait()
			selector.wakeup(); // selector may wait
			return true;
		} finally {
			if (interrupted) {
				Thread.currentThread().interrupt();
			}
		}
	}

	/** Connecting methods */
	// for clients.
	public boolean connect(InetSocketAddress ip){
		return ns.connect(ip, this); // Should ns.connect return otherEnd instead?
	}

	/**
	 * Connects through the network simulator.
	 *
	 * @param ip target address; it is cast to {@link InetSocketAddress}, so
	 *           any other address type fails with a ClassCastException
	 */
	public boolean connect(SocketAddress ip){
		return ns.connect((InetSocketAddress)ip, this);
	}

	/** Sets the peer channel that {@link #write(Object)} delivers to. */
	void setOtherEnd(ChannelSimulator<P> other){
		otherEnd = other;
	}

	/** This channel never accepts connections; always returns null. */
	public ChannelSimulator<P> accept(){
		return null;
	}

	/** Always false: this channel never accepts connections. */
	public boolean isAcceptable() {
		return false;
	}

	/**
	 * Reports whether a packet is waiting in the read queue.
	 *
	 * NOTE(review): this locks the selector, while the queue itself is
	 * modified under this channel's monitor, so the check is not ordered
	 * against read()/enQ().  Left unchanged because the selector classes
	 * are not visible here and switching the lock could change lock
	 * ordering with them -- confirm before tightening.
	 */
	public boolean isReadable() {
		synchronized (selector){ 
			return !readQ.isEmpty();
		}
	}

	/** Always true. */
	public boolean isWritable() {
		return true;
	}

	/** Switches {@link #read()} between blocking and non-blocking mode. */
	@Override
	public SelectableChannel configureBlocking(boolean block) throws IOException {
		isBlocking = block;
		return this;
	}


	/** Looks up this channel's key; the selector must be a SelectorSimulator. */
	@SuppressWarnings("unchecked")
	public SelectionKey keyFor(Selector selector2) {
		return ((SelectorSimulator) selector2).getKey(this);
	}

	/** Looks up this channel's key; the selector must be a SelectorSimulator. */
	@SuppressWarnings("unchecked")
	public SelectionKey keyFor(REPSelector<?> sel) {
		return (SelectionKey)((SelectorSimulator) sel).getKey(this);
	}

	/**
	 * Registers this channel with a selector and remembers both the selector
	 * (woken by read()/enQ()) and the resulting key.
	 *
	 * @param sel selector to register with; must be a SelectorSimulator
	 */
	@SuppressWarnings("unchecked")
	@Override
	public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
		SelectorSimulator<P> sel1 = (SelectorSimulator<P>) sel;
		selector = sel1;
		key = sel1.register(this, ops, att);
		return key;
	}

	/**
	 * Registers this channel with a REPSelector and remembers both the
	 * selector and the resulting key.
	 */
	@SuppressWarnings("unchecked")
	public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException {
		selector = sel;
		key = (SelectionKeySimulator<P>) sel.register(this, ops, att);
		return key;
	}

	/** Identifies the channel by the name of the thread that created it. */
	@Override
	public String toString(){
		return "ChSim("+ownerName+")";  
	}


}