view rep/channel/ChannelSimulator.java @ 234:0498425202a4

Made ChannelSimulator extend REPSocketChannel directly instead of SelectableChannelSimulator
author kent
date Sun, 31 Aug 2008 19:02:34 +0900
parents 4c0a94836357
children e72e0eae1261
line wrap: on
line source

package rep.channel;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * In-memory stand-in for a socket channel: packets of type {@code P} are
 * exchanged through two {@link BlockingQueue}s instead of a real connection.
 *
 * <p>A channel and its conjugate (see {@link #createConjugatedChannel()})
 * share the same two queues and selectors with the roles swapped, so whatever
 * one side writes becomes readable on the other side.
 *
 * <p>The constructor does not create the queues; call {@link #createReadQ()}
 * and {@link #createWriteQ()} (or obtain the channel from
 * {@link #createConjugatedChannel()} of a channel that has them) before
 * calling {@link #read()} or {@link #write(Object)}.
 */
public class ChannelSimulator<P> extends REPSocketChannel<P>{
	/** Simulated network used by {@link #connect(SocketAddress)}. */
	protected NetworkSimulator<P> ns;
	/** Queue this channel takes incoming packets from. */
	protected BlockingQueue<P> qread;
	/** Queue this channel puts outgoing packets into. */
	protected BlockingQueue<P> qwrite;
	/** Selector notified (via {@code notifyAll}) after every write; may be null. */
	protected SelectorSimulator<P> writeSelector;
	/** Selector whose monitor guards reads; may be null. */
	protected SelectorSimulator<P> readSelector;

	/**
	 * Creates a channel bound to the {@link NetworkSimulator} singleton.
	 * Queues and selectors are left unset.
	 */
	public ChannelSimulator(){
		super(null, null);
		ns = NetworkSimulator.<P>singleton();
	}

	/**
	 * Creates the peer end of this channel: a new channel whose read queue and
	 * read selector are this channel's write queue and write selector, and
	 * vice versa.
	 *
	 * @return the newly created peer channel
	 */
	public ChannelSimulator<P> createConjugatedChannel() {
		ChannelSimulator<P> peer = new ChannelSimulator<P>();
		peer.qread = qwrite;
		peer.qwrite = qread;
		peer.readSelector = writeSelector;
		peer.writeSelector = readSelector;
		return peer;
	}

	/**
	 * Reads the next packet from the read queue, blocking until one is
	 * available.
	 *
	 * @return the packet, or {@code null} if the thread was interrupted while
	 *         waiting (the interrupt status is restored in that case)
	 */
	public P read(){
		try {
			SelectorSimulator<P> selector = readSelector;
			if (selector != null) {
				// Only the non-blocking attempt is made under the selector's
				// monitor. The peer's write() needs the same monitor to enqueue,
				// so blocking in take() while holding it would deadlock on an
				// empty queue.
				synchronized (selector) {
					P packet = qread.poll();
					if (packet != null) {
						return packet;
					}
				}
			}
			// Nothing queued yet (or no selector): wait without holding any monitor.
			return qread.take();
		} catch (InterruptedException e) {
			e.printStackTrace();
			// Keep the interruption visible to the caller's thread.
			Thread.currentThread().interrupt();
			return null;
		}
	}

	/**
	 * Writes a packet to the write queue and, when a write selector is set,
	 * wakes up every thread waiting on that selector.
	 *
	 * @param p the packet to enqueue
	 * @return {@code true} if the packet was enqueued, {@code false} if the
	 *         thread was interrupted first (the interrupt status is restored)
	 */
	public boolean write(P p){
		try {
			SelectorSimulator<P> selector = writeSelector;
			if (selector != null) {
				synchronized (selector) {
					qwrite.put(p);
					selector.notifyAll();
				}
			} else {
				qwrite.put(p);
			}
			return true;
		} catch (InterruptedException e) {
			e.printStackTrace();
			// Keep the interruption visible to the caller's thread.
			Thread.currentThread().interrupt();
			return false;
		}
	}


	/** Connecting methods */
	/**
	 * Client-side connect: asks the network simulator to connect this channel
	 * to the given address.
	 *
	 * @param ip the address to connect to
	 * @return whatever {@code NetworkSimulator.connect} reports
	 */
	public boolean connect(SocketAddress ip){
		return ns.connect(ip, this);
	}

	/* accessor methods.  */
	/** @return the queue this channel reads from (null until created) */
	public BlockingQueue<P> getReadQ(){
		return qread;
	}
	/** @return the queue this channel writes to (null until created) */
	public BlockingQueue<P> getWriteQ(){
		return qwrite;
	}
	/** Replaces the read queue with a fresh, empty {@link LinkedBlockingQueue}. */
	public void createReadQ(){
		qread = new LinkedBlockingQueue<P>();
	}
	/** Replaces the write queue with a fresh, empty {@link LinkedBlockingQueue}. */
	public void createWriteQ(){
		qwrite = new LinkedBlockingQueue<P>();
	}
	/**
	 * Sets the selector that is notified after each {@link #write(Object)}.
	 *
	 * @param _selector the selector to notify, or null for none
	 */
	public void setWriteSelector(SelectorSimulator<P> _selector){
		writeSelector = _selector;
	}


	/**
	 * This channel never accepts connections.
	 *
	 * @return always {@code null}
	 */
	public ChannelSimulator<P> accept(){
		return null;
	}

	/** @return always {@code false}; this channel never accepts connections */
	public boolean isAcceptable() {
		return false;
	}

	/**
	 * @return {@code true} if at least one packet is waiting in the read queue;
	 *         {@code false} if the queue is empty or has not been created yet
	 */
	public boolean isReadable() {
		BlockingQueue<P> queue = qread;
		if (queue == null) {
			// No read queue yet: nothing can be readable (avoids an NPE below).
			return false;
		}
		synchronized (queue) {
			return !queue.isEmpty();
		}
	}

	/** @return always {@code true} */
	public boolean isWritable() {
		return true;
	}

	/**
	 * No-op: the blocking mode is ignored.
	 *
	 * @param block ignored
	 * @return always {@code null}
	 */
	// NOTE(review): SelectableChannel.configureBlocking is documented to return
	// the channel itself; returning null breaks call chaining. Confirm against
	// REPSocketChannel before changing.
	@Override
	public SelectableChannel configureBlocking(boolean block) throws IOException {
		return null;
	}


	/**
	 * Looks up this channel's key in the given selector.
	 *
	 * @param selector2 the selector to query; it is cast to
	 *                  {@code SelectorSimulator}
	 * @return the result of the selector's {@code getKey(this)}
	 */
	@SuppressWarnings("unchecked")
	public SelectionKey keyFor(Selector selector2) {
		return ((SelectorSimulator) selector2).getKey(this);
	}

	/**
	 * Registers this channel with the given selector.
	 *
	 * @param sel the selector to register with; it is cast to
	 *            {@code SelectorSimulator<P>}
	 * @param ops the interest set, passed through to the selector
	 * @param att the attachment, passed through to the selector
	 * @return the key returned by the selector's {@code register}
	 */
	@SuppressWarnings("unchecked")
	@Override
	public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
		SelectorSimulator<P> selector = (SelectorSimulator<P>) sel;
		return selector.register(this, ops, att);
	}

	/**
	 * Registers this channel with the given {@code REPSelector}.
	 *
	 * @param sel the selector to register with
	 * @param ops the interest set, passed through to the selector
	 * @param att the attachment, passed through to the selector
	 * @return the key returned by the selector's {@code register}
	 */
	public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException {
		return sel.register(this, ops, att);
	}


}