Mercurial > hg > RemoteEditor > REPSessionManager
view rep/channel/ChannelSimulator.java @ 382:4b87f89b3afd
REP Session Manager (Java version)
new structure
author | one@firefly.cr.ie.u-ryukyu.ac.jp |
---|---|
date | Mon, 10 Nov 2008 22:07:45 +0900 |
parents | c965ef2b5fd6 |
children | 2c00fa39dd84 |
line wrap: on
line source
package rep.channel; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.nio.channels.NotYetConnectedException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.LinkedList; import java.util.Queue; public class ChannelSimulator<P> extends REPSocketChannel<P>{ protected NetworkSimulator ns; protected Queue<P> readQ; protected Selector selector = new NullSelector(); protected boolean isBlocking; protected ChannelSimulator<P> otherEnd; protected SelectionKeySimulator<P> key; private String ownerName; /** Constructors. */ public ChannelSimulator(){ super(null, null); ns = NetworkSimulator.singleton(); readQ = new LinkedList<P>(); ownerName = Thread.currentThread().getName(); } /** read from Queue. * @throws IOException */ public synchronized P read() throws IOException { // We may lock selector instead of this, but it reduces // concurrency, but be careful of dead lock. P tmp; while ( (tmp=readQ.poll())==null && isBlocking ) { try { wait(); // queue is empty } catch (InterruptedException e) { throw new IOException(); } } // for write wait (we don't need this) //otherEnd.wakeup(); selector.wakeup(); return tmp; } //private synchronized void wakeup() { // notify(); //} /** write packet to other end. */ public boolean write(P p){ if (otherEnd==null) throw new NotYetConnectedException(); return otherEnd.enQ(p); } /** otherEnd Channel enqueue p to own queue using this method. */ boolean enQ(P p) { if (enQ1(p)) { // don't lock this channel while calling selector.wakeup(). // selector may lock this channel, which may cause dead lock. selector.wakeup(); return true; } return false; } protected synchronized boolean enQ1(P p){ while(true) { if (readQ.offer(p)) { notify(); // other end my wait() break; } else { // this can't happen assert(false); try { wait(); // queue is full, we have to wait here } catch (InterruptedException e) { } } } return true; } /** Connecting methods */ // for clients. public boolean connect(InetSocketAddress ip){ return ns.connect(ip, this); // ns.connectはotherEndを返した方がよいか? } public boolean connect(SocketAddress ip){ return ns.connect((InetSocketAddress)ip, this); } void setOtherEnd(ChannelSimulator<P> other){ otherEnd = other; } public ChannelSimulator<P> accept(){ return null; } public boolean isAcceptable() { return false; } public synchronized boolean isReadable() { return !readQ.isEmpty(); } public boolean isWritable() { return true; } @Override public SelectableChannel configureBlocking(boolean block) throws IOException { isBlocking = block; return this; } @SuppressWarnings("unchecked") @Override public SelectionKey keyFor(Selector selector2) { return ((SelectorSimulator) selector2).getKey(this); } @SuppressWarnings("unchecked") @Override public SelectionKey keyFor(REPSelector<?> sel) { return (SelectionKey)((SelectorSimulator) sel).getKey(this); } @SuppressWarnings("unchecked") @Override public REPSelectionKey<P> keyFor1(REPSelector<P> sel) { return (REPSelectionKey<P>)((SelectorSimulator) sel).getKey(this); } @SuppressWarnings("unchecked") @Override public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { SelectorSimulator<P> sel1 = (SelectorSimulator<P>) sel; selector = sel1; key = sel1.register(this, ops, att); return key; } @SuppressWarnings("unchecked") public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException { selector = sel; key = (SelectionKeySimulator<P>) sel.register(this, ops, att); return key; } public boolean finishConnect() throws IOException { return otherEnd!=null; } public Socket socket() { assert(false); return null; } @Override public String getLocalHostName() { return "localhost"; // always... } public String toString(){ return "ChSim("+ownerName+")"; } public ChannelSimulator<P> newChannel() { return new ChannelSimulator<P>(); } @SuppressWarnings("unchecked") public void setOtherEnd1(ChannelSimulator<?> other) { otherEnd = (ChannelSimulator<P>) other; } public void close1() throws IOException { } }