Mercurial > hg > FederatedLinda
changeset 25:330fa49bc4fd
*** empty log message ***
author | kono |
---|---|
date | Wed, 20 Aug 2008 14:12:15 +0900 |
parents | 35375016b2f0 |
children | d7d70edc9c7c |
files | src/fdl/ComDebug_Client.java src/fdl/FDLindaServ.java src/fdl/FederatedLinda.java src/fdl/IOHandler.java src/fdl/MetaLinda.java src/fdl/PSX.java src/fdl/PSXLinda.java src/fdl/PSXLindaImpl.java src/fdl/PSXLindaInterface.java src/fdl/PSXQueue.java src/fdl/TupleSpace.java src/fdl/package-info.java src/fdl/test/TestPSXLinda.java |
diffstat | 13 files changed, 219 insertions(+), 208 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/ComDebug_Client.java Wed Aug 20 10:18:05 2008 +0900 +++ b/src/fdl/ComDebug_Client.java Wed Aug 20 14:12:15 2008 +0900 @@ -17,10 +17,10 @@ PSXCallback debugCallback ; FederatedLinda fdl; - PSXLindaInterface psx; + PSXLinda psx; LinkedList<String> hosts = new LinkedList<String>(); LinkedList<Integer> ports = new LinkedList<Integer>(); - LinkedList<PSXLindaInterface> psxs = new LinkedList<PSXLindaInterface>(); + LinkedList<PSXLinda> psxs = new LinkedList<PSXLinda>(); public static void main(String[] args) { ComDebug_Client com = new ComDebug_Client(); @@ -68,8 +68,8 @@ psx = fdl.open(host,port); psxs.add(psx); class MyCallBack implements PSXCallback { - PSXLindaInterface psx; - public MyCallBack(PSXLindaInterface psx) { + PSXLinda psx; + public MyCallBack(PSXLinda psx) { this.psx = psx; } public void callback(ByteBuffer reply) {
--- a/src/fdl/FDLindaServ.java Wed Aug 20 10:18:05 2008 +0900 +++ b/src/fdl/FDLindaServ.java Wed Aug 20 14:12:15 2008 +0900 @@ -11,6 +11,10 @@ import java.nio.channels.spi.SelectorProvider; import java.util.Iterator; +/** + * @author kono + * + */ public class FDLindaServ { static final int MAX_REQ = 1; static final int FAIL = (-1); @@ -19,6 +23,7 @@ private AbstractSelector selector; private ServerSocketChannel ssChannel; public TupleSpace tupleSpace; + public NullMetaEngine me; public static void main(final String[] args) { final String usages = "usage: FDLindaServ [-p port]"; @@ -46,7 +51,7 @@ private void mainLoop() { MetaLinda ml = new MetaLinda(tupleSpace, this); - MetaEngine me = new NullMetaEngine(ml); + me = new NullMetaEngine(ml); // MetaEngine me = new MetaEngine(ml); while(true) { me.mainLoop();
--- a/src/fdl/FederatedLinda.java Wed Aug 20 10:18:05 2008 +0900 +++ b/src/fdl/FederatedLinda.java Wed Aug 20 14:12:15 2008 +0900 @@ -45,7 +45,7 @@ public int tid; public int seq; public int qsize; - public PSXLindaInterface linda; + public PSXLinda linda; public Selector selector; @@ -66,10 +66,10 @@ seqHash = new Hashtable<Integer, PSXReply>(); } - public PSXLindaInterface open(String _host,int _port) + public PSXLinda open(String _host,int _port) throws IOException { tid++; - PSXLindaInterface newlinda = new PSXLinda(this,tid,_host,_port); + PSXLindaImpl newlinda = new PSXLindaImpl(this,tid,_host,_port); linda = newlinda.add(linda); return linda; } @@ -167,7 +167,7 @@ ByteBuffer data = ByteBuffer.allocate(length); int read = length; if (debug) { - System.out.print("reading:"); + System.out.print("client reading:"); System.out.println(length); } @@ -178,7 +178,7 @@ data.rewind(); if (debug) { - PSX.printCommand(command, data); + PSX.printCommand("chkServe:",command, data); } int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET);
--- a/src/fdl/IOHandler.java Wed Aug 20 10:18:05 2008 +0900 +++ b/src/fdl/IOHandler.java Wed Aug 20 14:12:15 2008 +0900 @@ -80,7 +80,7 @@ command.rewind(); if (debug) { - PSX.printData(command); + PSX.printData("IOHandler:",command); } manager_run(key, command, data); // assert((key.interestOps()& SelectionKey.OP_READ) !=0);
--- a/src/fdl/MetaLinda.java Wed Aug 20 10:18:05 2008 +0900 +++ b/src/fdl/MetaLinda.java Wed Aug 20 14:12:15 2008 +0900 @@ -26,12 +26,12 @@ */ -public class MetaLinda implements PSXLindaInterface { +public class MetaLinda implements PSXLinda { public TupleSpace ts; public FDLindaServ fds; public FederatedLinda fdl=null; - public PSXLindaInterface next=null; + public PSXLinda next=null; public LinkedList<MetaReply> replies = new LinkedList<MetaReply>(); public MetaLinda(TupleSpace ts,FDLindaServ fds) { @@ -81,7 +81,7 @@ replies.add(new MetaReply(PSX.PSX_RD,id,ts,callback)); } - public PSXLindaInterface add(PSXLindaInterface linda) { + public PSXLinda add(PSXLinda linda) { next = linda; return this; } @@ -107,6 +107,10 @@ } return 0; } + + public void send(ByteBuffer command, ByteBuffer data) { + + } }
--- a/src/fdl/PSX.java Wed Aug 20 10:18:05 2008 +0900 +++ b/src/fdl/PSX.java Wed Aug 20 14:12:15 2008 +0900 @@ -63,9 +63,9 @@ static final int META_MONITOR_DATA = PRIVILEGED_ID_START+2; - static void printCommand(ByteBuffer command, ByteBuffer data) { + static void printCommand(String comment, ByteBuffer command, ByteBuffer data) { char id = (char)command.getShort(LINDA_ID_OFFSET); - System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ + System.out.println(comment+" LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ "ID:"+(int)id+" "+ "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+ @@ -74,10 +74,10 @@ command.rewind(); } - static void printData(ByteBuffer command) { + static void printData(String comment,ByteBuffer command) { /*** print data ***/ char id = (char)command.getShort(LINDA_ID_OFFSET); - System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ + System.out.println(comment+" LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ "ID:"+(int)id+" "+ "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" ");
--- a/src/fdl/PSXLinda.java Wed Aug 20 10:18:05 2008 +0900 +++ b/src/fdl/PSXLinda.java Wed Aug 20 14:12:15 2008 +0900 @@ -1,154 +1,32 @@ - -/* - * @(#)PSXLinda.java 1.1 06/04/01 - * - * Copyright 2006 Shinji KONO - * - - PSX Lidna - Trasport layer of PSX Linda library - - */ - package fdl; import java.io.IOException; -//import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; - - -/** - PSXLinda - * - * @author Shinji Kono - * - * @param mytsid Tuple Space ID - - Initialize connection channel for a tuple space - - one instance for each Tuple space connection - - */ - -public class PSXLinda implements PSXLindaInterface { - private FederatedLinda fdl; - SocketChannel socketChannel; - public String host; - public int port; - public int mytsid; - public PSXLindaInterface next; - static final boolean debug = false; - public PSXLinda(FederatedLinda _fdl,int _mytsid,String _host,int _port) - throws IOException { - Socket socket; - host = _host; - port = _port; - mytsid = _mytsid; - fdl = _fdl; - - socketChannel = SocketChannel.open(); - socketChannel.configureBlocking(false); - - socket = socketChannel.socket(); - // socket.setReuseAddress(true); Client side don't need this. - socket.setTcpNoDelay(true); - - socketChannel.connect(new InetSocketAddress(_host, _port)); - while (! socketChannel.finishConnect()) { - if (debug) { - System.out.println("waiting for connect"); - } - if (false) { - try { - wait(2000); - } catch (InterruptedException e) { - } - } - } - - socketChannel.register(fdl.selector(), SelectionKey.OP_READ); +public interface PSXLinda { - checkConnect("PSXLinda"); - } + public PSXReply in(int id) ; - protected void finalize() { - if (socketChannel != null) { - try { - socketChannel.close(); - } catch (IOException e) { - } - } - } + public void in(int id, PSXCallback callback); + + public PSXReply ck(int id) ; - private void checkConnect(String s) { - System.out.print("Connected:"); - System.out.print(s); - System.out.print(": "); - System.out.println(socketChannel.isConnected()); - } - - public PSXReply in(int id) { - PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_IN, (PSXCallback)null); - return r; - } + public void ck(int id, PSXCallback callback) ; - public void in(int id, PSXCallback callback) { - fdl.psx_queue(this, id, null, PSX.PSX_IN, callback); - } + public PSXReply out(int id, ByteBuffer data) ; - public PSXReply ck(int id) { - PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_IN, null); - return r; - } - - public void ck(int id, PSXCallback callback) { - fdl.psx_queue(this, id, null, PSX.PSX_IN, callback); - } + public PSXReply update(int id, ByteBuffer data) ; - public PSXReply out(int id, ByteBuffer data) { - PSXReply r = fdl.psx_queue(this, id, data, PSX.PSX_OUT, null); - return r; - } + public void update(int id, ByteBuffer data,PSXCallback callback) ; - public PSXReply update(int id, ByteBuffer data) { - PSXReply r = fdl.psx_queue(this, id, data, PSX.PSX_UPDATE, null); - return r; - } + public PSXReply rd(int id) ; - public void update(int id, ByteBuffer data,PSXCallback callback) { - fdl.psx_queue(this, id, data, PSX.PSX_UPDATE, callback); - } - - public PSXReply rd(int id) { - PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_RD, null); - return r; - } + public void rd(int id, PSXCallback callback) ; - public void rd(int id, PSXCallback callback) { - fdl.psx_queue(this, id, null, PSX.PSX_RD, callback); - } - - public PSXLindaInterface add(PSXLindaInterface linda) { - next = linda; - return this; - } + public PSXLinda add(PSXLinda linda) ; - public int sync() - throws IOException { - return fdl.sync(); - } + public int sync() throws IOException ; - public int sync(long mtime) - throws IOException { - return fdl.sync(mtime); - } + public int sync(long mtime) throws IOException ; } - - -/* end */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/PSXLindaImpl.java Wed Aug 20 14:12:15 2008 +0900 @@ -0,0 +1,158 @@ + +/* + * @(#)PSXLinda.java 1.1 06/04/01 + * + * Copyright 2006 Shinji KONO + * + + PSX Lidna + Trasport layer of PSX Linda library + + */ + +package fdl; + +import java.io.IOException; +//import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + + +/** + PSXLinda + * + * @author Shinji Kono + * + * @param mytsid Tuple Space ID + + Initialize connection channel for a tuple space + + one instance for each Tuple space connection + + */ + +public class PSXLindaImpl implements PSXLinda { + private FederatedLinda fdl; + SocketChannel socketChannel; + public String host; + public int port; + public int mytsid; + public PSXLinda next; + static final boolean debug = false; + + public PSXLindaImpl(FederatedLinda _fdl,int _mytsid,String _host,int _port) + throws IOException { + Socket socket; + host = _host; + port = _port; + mytsid = _mytsid; + fdl = _fdl; + + socketChannel = SocketChannel.open(); + socketChannel.configureBlocking(false); + + socket = socketChannel.socket(); + // socket.setReuseAddress(true); Client side don't need this. + socket.setTcpNoDelay(true); + + socketChannel.connect(new InetSocketAddress(_host, _port)); + while (! socketChannel.finishConnect()) { + if (debug) { + System.out.println("waiting for connect"); + } + if (false) { + try { + wait(2000); + } catch (InterruptedException e) { + } + } + } + + socketChannel.register(fdl.selector(), SelectionKey.OP_READ); + + + checkConnect("PSXLinda"); + } + + protected void finalize() { + if (socketChannel != null) { + try { + socketChannel.close(); + } catch (IOException e) { + } + } + } + + private void checkConnect(String s) { + System.out.print("Connected:"); + System.out.print(s); + System.out.print(": "); + System.out.println(socketChannel.isConnected()); + } + + public PSXReply in(int id) { + PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_IN, (PSXCallback)null); + return r; + } + + public void in(int id, PSXCallback callback) { + fdl.psx_queue(this, id, null, PSX.PSX_IN, callback); + } + + public PSXReply ck(int id) { + PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_IN, null); + return r; + } + + public void ck(int id, PSXCallback callback) { + fdl.psx_queue(this, id, null, PSX.PSX_IN, callback); + } + + public PSXReply out(int id, ByteBuffer data) { + PSXReply r = fdl.psx_queue(this, id, data, PSX.PSX_OUT, null); + return r; + } + + public PSXReply update(int id, ByteBuffer data) { + PSXReply r = fdl.psx_queue(this, id, data, PSX.PSX_UPDATE, null); + return r; + } + + public void update(int id, ByteBuffer data,PSXCallback callback) { + fdl.psx_queue(this, id, data, PSX.PSX_UPDATE, callback); + } + + public PSXReply rd(int id) { + PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_RD, null); + return r; + } + + public void rd(int id, PSXCallback callback) { + fdl.psx_queue(this, id, null, PSX.PSX_RD, callback); + } + + public PSXLinda add(PSXLinda linda) { + next = linda; + return this; + } + + public int sync() + throws IOException { + return fdl.sync(); + } + + public int sync(long mtime) + throws IOException { + return fdl.sync(mtime); + } + + public void send(ByteBuffer command, ByteBuffer data) { + PSX.send(socketChannel, command, data); + } +} + + +/* end */
--- a/src/fdl/PSXLindaInterface.java Wed Aug 20 10:18:05 2008 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,32 +0,0 @@ -package fdl; - -import java.io.IOException; -import java.nio.ByteBuffer; - -public interface PSXLindaInterface { - - - public PSXReply in(int id) ; - - public void in(int id, PSXCallback callback); - - public PSXReply ck(int id) ; - - public void ck(int id, PSXCallback callback) ; - - public PSXReply out(int id, ByteBuffer data) ; - - public PSXReply update(int id, ByteBuffer data) ; - - public void update(int id, ByteBuffer data,PSXCallback callback) ; - - public PSXReply rd(int id) ; - - public void rd(int id, PSXCallback callback) ; - - public PSXLindaInterface add(PSXLindaInterface linda) ; - - public int sync() throws IOException ; - - public int sync(long mtime) throws IOException ; -}
--- a/src/fdl/PSXQueue.java Wed Aug 20 10:18:05 2008 +0900 +++ b/src/fdl/PSXQueue.java Wed Aug 20 14:12:15 2008 +0900 @@ -56,7 +56,7 @@ } public void send() { - PSX.send(linda.socketChannel, command, data); + linda.send(command, data); } }
--- a/src/fdl/TupleSpace.java Wed Aug 20 10:18:05 2008 +0900 +++ b/src/fdl/TupleSpace.java Wed Aug 20 14:12:15 2008 +0900 @@ -60,7 +60,7 @@ datasize = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); command.rewind(); - System.out.println("*** out command : id = "+id +" ***"); + if (debug) System.out.println("*** out command : id = "+id +" ***"); int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); hook.outHook(key,id,seq,'o',data); @@ -171,7 +171,7 @@ int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); - System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); + if (debug) System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); hook.inHook(key,id,seq,(char)mode); tuple = tuple_space[id];
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/package-info.java Wed Aug 20 14:12:15 2008 +0900 @@ -0,0 +1,11 @@ +/** + * Asynchronous Federated Linda Servers and Clients Library. + * <p> + * Linda server provides Tuple space, which is accessed by Linda clients. + * Multiple Linda servers are connected by Protocol Engines. A Linda client + * handles multiple Linda server at once. All communications are handled + * asynchronous way, that is, it does not wait for the completion. + * + * @since 1.0 + */ +package fdl;
--- a/src/fdl/test/TestPSXLinda.java Wed Aug 20 10:18:05 2008 +0900 +++ b/src/fdl/test/TestPSXLinda.java Wed Aug 20 14:12:15 2008 +0900 @@ -18,7 +18,7 @@ import java.nio.ByteBuffer; import fdl.FederatedLinda; -import fdl.PSXLindaInterface; +import fdl.PSXLinda; import fdl.PSXReply; @@ -38,7 +38,7 @@ public static void main (String args[]) { FederatedLinda fdl; - PSXLindaInterface psx; + PSXLinda psx; String host; int port = 10000; PSXReply r; @@ -49,23 +49,14 @@ localAddress = new InetSocketAddress(InetAddress.getLocalHost(), port); host = localAddress.getHostName(); } catch (UnknownHostException e) { - // TODO Auto-generated catch block host = "localhost"; } - - // try { - // port = Integer.parseInt(args[1]); - // } catch (NumberFormatException nfex) { } try { fdl = FederatedLinda.init(); psx = fdl.open(host,port); r = psx.in(65535); - //for(int i=0;i<100;i++) { - //if (1==0) { - //} fdl.sync(1); - //} System.out.println("Connected."); ByteBuffer data = ByteBuffer.allocate(10); @@ -80,15 +71,11 @@ psx.sync(); System.out.println("Waiting...."+(cnt++)); } - } catch (IOException nfex) { - nfex.printStackTrace(); - System.out.println("Faild."); - return; + + print_id(r); + } catch (IOException e) { + System.err.println("Communication failure."); } - - print_id(r); - - } public static void print_id (PSXReply ans) {