Mercurial > hg > RemoteEditor > REPSessionManager
view rep/channel/ChannelSimulator.java @ 234:0498425202a4
made ChannelSimulator no longer extend SelectableChannelSimulator
but REPSocketChannel directly
author | kent |
---|---|
date | Sun, 31 Aug 2008 19:02:34 +0900 |
parents | 4c0a94836357 |
children | e72e0eae1261 |
line wrap: on
line source
package rep.channel; import java.io.IOException; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class ChannelSimulator<P> extends REPSocketChannel<P>{ protected NetworkSimulator<P> ns; protected BlockingQueue<P> qread; protected BlockingQueue<P> qwrite; protected SelectorSimulator<P> writeSelector; protected SelectorSimulator<P> readSelector; /** Constructors. */ public ChannelSimulator(){ super(null, null); ns = NetworkSimulator.<P>singleton(); } public ChannelSimulator<P> createConjugatedChannel() { ChannelSimulator<P> ret = new ChannelSimulator<P>(); ret.qread=qwrite; ret.qwrite=qread; ret.readSelector=writeSelector; ret.writeSelector=readSelector; return ret; } /* read from Queue. */ public P read(){ try { if(readSelector!=null) synchronized (readSelector){ return qread.take(); } else{ return qread.take(); } } catch (InterruptedException e) { e.printStackTrace(); return null; } } /* write to Queue. */ public boolean write(P p){ try { if (writeSelector!=null) synchronized (writeSelector){ qwrite.put(p); writeSelector.notifyAll(); } else { qwrite.put(p); } return true; } catch (InterruptedException e) { e.printStackTrace(); return false; } } /** Connecting methods */ // for clients. public boolean connect(SocketAddress ip){ return ns.connect(ip, this); } /* accessor methods. 
*/ public BlockingQueue<P> getReadQ(){ return qread; } public BlockingQueue<P> getWriteQ(){ return qwrite; } public void createReadQ(){ qread = new LinkedBlockingQueue<P>(); } public void createWriteQ(){ qwrite = new LinkedBlockingQueue<P>(); } public void setWriteSelector(SelectorSimulator<P> _selector){ writeSelector = _selector; } public ChannelSimulator<P> accept(){ return null; } public boolean isAcceptable() { return false; } public boolean isReadable() { synchronized (qread){ return !qread.isEmpty(); } } public boolean isWritable() { return true; } @Override public SelectableChannel configureBlocking(boolean block) throws IOException { return null; } @SuppressWarnings("unchecked") public SelectionKey keyFor(Selector selector2) { return ((SelectorSimulator) selector2).getKey(this); } @SuppressWarnings("unchecked") @Override public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { SelectorSimulator<P> selector = (SelectorSimulator<P>) sel; return selector.register(this, ops, att); } public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException { return sel.register(this, ops, att); } }