view rep/channel/ChannelSimulator.java @ 382:4b87f89b3afd

REP Session Manager (Java version) new structure
author one@firefly.cr.ie.u-ryukyu.ac.jp
date Mon, 10 Nov 2008 22:07:45 +0900
parents c965ef2b5fd6
children 2c00fa39dd84
line wrap: on
line source

package rep.channel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.LinkedList;
import java.util.Queue;

/**
 * In-memory stand-in for a socket channel.
 *
 * A packet written on one end is appended to the read queue of the
 * channel stored in {@code otherEnd}; {@link #read()} takes packets from
 * this channel's own queue. The queue is guarded by this object's monitor.
 *
 * @param <P> type of the packets carried by the channel
 */
public class ChannelSimulator<P> extends REPSocketChannel<P>{
	protected NetworkSimulator ns;
	/** Packets delivered by the other end and not yet read. */
	protected Queue<P> readQ;
	/** Selector woken up whenever a packet is enqueued; a NullSelector until register() is called. */
	protected Selector selector = new NullSelector();
	/** When true, read() waits for a packet instead of returning null. */
	protected boolean isBlocking;
	/** Peer channel that receives what this channel writes; null until connected. */
	protected ChannelSimulator<P> otherEnd;
	protected SelectionKeySimulator<P> key;
	/** Name of the thread that created this channel; only used by toString(). */
	private String ownerName;

	/**
	 * Creates an unconnected channel with an empty, unbounded read queue
	 * and remembers the name of the creating thread.
	 */
	public ChannelSimulator(){
		super(null, null);
		ns = NetworkSimulator.singleton();
		readQ = new LinkedList<P>();
		ownerName = Thread.currentThread().getName();
	}

	/**
	 * Takes the next packet from the read queue.
	 *
	 * In blocking mode the call waits until a packet is available; in
	 * non-blocking mode it returns immediately.
	 *
	 * @return the next packet, or null if the queue is empty and the
	 *         channel is non-blocking
	 * @throws IOException if the thread is interrupted while waiting; the
	 *         interrupt status is restored and the InterruptedException is
	 *         attached as the cause
	 */
	public synchronized P read() throws IOException {
		// We may lock selector instead of this, but it reduces
		// concurrency, but be careful of dead lock.
		P tmp;
		while ( (tmp=readQ.poll())==null && isBlocking ) {
			try {
				wait(); // queue is empty
			} catch (InterruptedException e) {
				// Keep the interruption visible to the caller and keep the
				// original exception instead of dropping it.
				Thread.currentThread().interrupt();
				IOException ioe = new IOException("interrupted while waiting for a packet");
				ioe.initCause(e);
				throw ioe;
			}
		}
		// No writer needs to be woken up here: the read queue is an
		// unbounded LinkedList, so writers never wait for free space.
		return tmp;
	}

	/**
	 * Writes a packet to the other end of the channel.
	 *
	 * @param p packet to deliver
	 * @return true if the packet was enqueued on the other end
	 * @throws NotYetConnectedException if no other end has been set
	 */
	public boolean write(P p){
		if (otherEnd==null) throw new NotYetConnectedException();
		return otherEnd.enQ(p);
	}

	/**
	 * Called by the other end to put a packet into this channel's queue,
	 * then wakes up this channel's selector.
	 *
	 * @param p packet to enqueue
	 * @return true if the packet was enqueued
	 */
	boolean enQ(P p) {
		if (enQ1(p)) {
			// don't lock this channel while calling selector.wakeup().
			// selector may lock this channel, which may cause dead lock.
			selector.wakeup();
			return true;
		}
		return false;
	}

	/**
	 * Appends a packet to the read queue under this channel's lock and
	 * notifies a reader that may be waiting in read().
	 *
	 * @param p packet to enqueue
	 * @return true if the packet was enqueued; false if the thread was
	 *         interrupted while waiting for room in the queue (the
	 *         interrupt status is restored in that case)
	 */
	protected synchronized boolean enQ1(P p){
		while(true) {
			if (readQ.offer(p)) {
				notify(); // the reader may be in wait()
				break;
			} else { // this can't happen with the unbounded LinkedList
				assert(false);
				try {
					wait(); // queue is full, we have to wait here
				} catch (InterruptedException e) {
					// Do not swallow the interrupt: restore the flag and
					// report the failed write instead of waiting again.
					Thread.currentThread().interrupt();
					return false;
				}
			}
		}
		return true;
	}

	/** Connecting methods */
	// for clients.
	/**
	 * Asks the network simulator to connect this channel to the given address.
	 *
	 * @param ip address to connect to
	 * @return the result reported by the network simulator
	 */
	public boolean connect(InetSocketAddress ip){
		return ns.connect(ip, this); // should ns.connect return otherEnd instead?
	}

	/**
	 * Same as {@link #connect(InetSocketAddress)}; the address is cast to
	 * InetSocketAddress.
	 *
	 * @param ip address to connect to, must be an InetSocketAddress
	 * @return the result reported by the network simulator
	 */
	public boolean connect(SocketAddress ip){
		return ns.connect((InetSocketAddress)ip, this);
	}

	/**
	 * Sets the peer channel that receives packets written on this channel.
	 *
	 * @param other the peer channel
	 */
	void setOtherEnd(ChannelSimulator<P> other){
		otherEnd = other;
	}

	/**
	 * This channel does not accept connections.
	 *
	 * @return always null
	 */
	public ChannelSimulator<P> accept(){
		return null;
	}

	/** @return always false */
	public boolean isAcceptable() {
		return false;
	}

	/** @return true if at least one packet is waiting in the read queue */
	public synchronized boolean isReadable() {
		return !readQ.isEmpty();
	}

	/** @return always true */
	public boolean isWritable() {
		return true;
	}

	/**
	 * Switches between blocking and non-blocking read().
	 *
	 * @param block true to make read() wait for a packet
	 * @return this channel
	 */
	@Override
	public SelectableChannel configureBlocking(boolean block) throws IOException {
		isBlocking = block;
		return this;
	}


	/**
	 * Looks up this channel's key in the given selector.
	 *
	 * @param selector2 selector to query; it is cast to SelectorSimulator
	 * @return the value returned by the selector's getKey for this channel
	 */
	@SuppressWarnings("unchecked")
	@Override
	public SelectionKey keyFor(Selector selector2) {
		return ((SelectorSimulator) selector2).getKey(this);
	}

	/**
	 * Looks up this channel's key in the given selector.
	 *
	 * @param sel selector to query; it is cast to SelectorSimulator
	 * @return the value returned by the selector's getKey for this channel
	 */
	@SuppressWarnings("unchecked")
	@Override
	public SelectionKey keyFor(REPSelector<?> sel) {
		return (SelectionKey)((SelectorSimulator) sel).getKey(this);
	}

	/**
	 * Typed variant of keyFor.
	 *
	 * @param sel selector to query; it is cast to SelectorSimulator
	 * @return the selector's key for this channel, cast to REPSelectionKey
	 */
	@SuppressWarnings("unchecked")
	@Override
	public REPSelectionKey<P> keyFor1(REPSelector<P> sel) {
		return (REPSelectionKey<P>)((SelectorSimulator) sel).getKey(this);
	}

	/**
	 * Registers this channel with a selector and remembers both the
	 * selector and the resulting key.
	 *
	 * @param sel selector to register with; it is cast to SelectorSimulator
	 * @param ops interest set passed on to the selector
	 * @param att attachment passed on to the selector
	 * @return the key produced by the selector
	 */
	@SuppressWarnings("unchecked")
	@Override
	public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
		SelectorSimulator<P> sel1 = (SelectorSimulator<P>) sel;
		selector = sel1;
		key = sel1.register(this, ops, att);
		return key;
	}

	/**
	 * Registers this channel with a REPSelector and remembers both the
	 * selector and the resulting key.
	 *
	 * @param sel selector to register with
	 * @param ops interest set passed on to the selector
	 * @param att attachment passed on to the selector
	 * @return the key produced by the selector, cast to SelectionKeySimulator
	 */
	@SuppressWarnings("unchecked")
	public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException {
		selector = sel;
		key = (SelectionKeySimulator<P>) sel.register(this, ops, att);
		return key;
	}


	/** @return true once the other end has been set */
	public boolean finishConnect() throws IOException {
		return otherEnd!=null;
	}

	/**
	 * Not available on the simulated channel: trips an assertion when
	 * assertions are enabled.
	 *
	 * @return null when assertions are disabled
	 */
	public Socket socket() {
		assert(false);
		return null;
	}

	@Override
	public String getLocalHostName() {
		return "localhost"; // always...
	}

	/** @return "ChSim(" + name of the creating thread + ")" */
	@Override
	public String toString(){
		return "ChSim("+ownerName+")";
	}

	/** @return a fresh, unconnected channel of the same packet type */
	public ChannelSimulator<P> newChannel() {
		return new ChannelSimulator<P>();
	}

	/**
	 * Sets the peer channel from a wildcard-typed reference; the cast to
	 * the packet type P is unchecked.
	 *
	 * @param other the peer channel
	 */
	@SuppressWarnings("unchecked")
	public void setOtherEnd1(ChannelSimulator<?> other) {
		otherEnd = (ChannelSimulator<P>) other;
	}

	/** Does nothing. */
	public void close1() throws IOException {
	}
}