changeset 39:81abceebc869

*** empty log message ***
author kono
date Mon, 25 Aug 2008 14:01:19 +0900
parents 9a0cb612f576
children 046feb56a196
files src/fdl/FDLindaServ.java src/fdl/FederatedLinda.java src/fdl/IOHandler.java src/fdl/MetaLinda.java src/fdl/PSXLindaImpl.java
diffstat 5 files changed, 86 insertions(+), 78 deletions(-) [+]
line wrap: on
line diff
--- a/src/fdl/FDLindaServ.java	Sun Aug 24 21:28:04 2008 +0900
+++ b/src/fdl/FDLindaServ.java	Mon Aug 25 14:01:19 2008 +0900
@@ -20,7 +20,7 @@
 	static final int FAIL = (-1);
 	static final int DEF_PORT = 10000;
 	public int port = DEF_PORT;
-	private AbstractSelector selector;
+	AbstractSelector selector;
 	private ServerSocketChannel ssChannel;
 	public TupleSpace tupleSpace;
 	public MetaEngine me;
--- a/src/fdl/FederatedLinda.java	Sun Aug 24 21:28:04 2008 +0900
+++ b/src/fdl/FederatedLinda.java	Mon Aug 25 14:01:19 2008 +0900
@@ -69,11 +69,19 @@
 	public PSXLinda open(String _host,int _port) 
 	throws IOException {
 		tid++;
-		PSXLindaImpl newlinda = new PSXLindaImpl(this,tid,_host,_port);
+		PSXLindaImpl newlinda = new PSXLindaImpl(this,selector,tid,_host,_port);
 		linda = newlinda.add(linda);
 		return linda;
 	}
 
+	PSXLinda openFromMetaLinda(MetaLinda metaLinda, String _host, int _port)
+			throws IOException {
+		tid++;
+		PSXLindaImpl newlinda = new PSXLindaImpl(this,metaLinda.fds.selector,tid,_host,_port);
+		linda = newlinda.add(linda);
+		return newlinda;
+	}
+	
 	public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int mode, PSXCallback callback) {
 		PSXQueue c = new PSXQueue(linda,id,mode,s,callback);
 
@@ -129,12 +137,7 @@
 	public int sync(long mtimeout) 
 	throws IOException {
 		int key_num = 0;
-		while (q_top != null){
-			PSXQueue c = q_top;
-			c.send();
-			q_top = c.next;
-			qsize--;
-		}
+		queueExec();
 
 		try {
 			if (selector.select(mtimeout)>0) {
@@ -144,7 +147,8 @@
 			        try {
 			        	if (!s.isReadable()) 
 			        		throw new IOException();
-			        	chkServe((SocketChannel)s.channel());
+			        	TupleHandler handle = (TupleHandler)s.attachment();
+			        	handle.handle(s);
 			        } catch (IOException e) {
 			        	s.cancel();
 			        	System.err.println(""+s.channel()+" is closed.");
@@ -159,39 +163,24 @@
 
 		return key_num;
 	}
-	
-	// should be IOHandler.handler method.
 
-	private void chkServe(SocketChannel sock) 	throws IOException {
-		ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
-		command.order(ByteOrder.BIG_ENDIAN);
-		ByteBuffer data = PSX.receivePacket(sock, command);
-
-		if (debug) {
-			PSX.printCommand("chkServe:",command, data);
-		}
-
-		int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET);
-		int mode = command.get(PSX.LINDA_MODE_OFFSET);
-		PSXReply r = getReply(rseq);
-		if (r==null) {
-			System.err.println("Illegal answer sequence.");
-			return;
-		}
-		r.setAnswer(mode,command,data);
-
-		if (r.callback != null ) {
-			r.callback.callback(data);
+	private void queueExec() {
+		while (q_top != null){
+			PSXQueue c = q_top;
+			c.send();
+			q_top = c.next;
+			qsize--;
 		}
 	}
-
-	private PSXReply getReply(int rseq) {
+	
+	PSXReply getReply(int rseq) {
 		Integer a;
 
 		PSXReply r = seqHash.get((a = new Integer(rseq)));
 		seqHash.remove(a);
 		return r;
 	}
+
 }
 
 /* end */
--- a/src/fdl/IOHandler.java	Sun Aug 24 21:28:04 2008 +0900
+++ b/src/fdl/IOHandler.java	Mon Aug 25 14:01:19 2008 +0900
@@ -28,7 +28,15 @@
         // 書き込み可であれば,読み込みを行う
         if (key.isReadable()) {
             try {
-				read(key);
+            	SocketChannel channel = (SocketChannel)key.channel();
+                if (ch!=channel) {
+                	System.err.println("Wrong socket on IOHandler");
+                }
+                // 読み込み用のバッファの生成
+                ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
+                command.order(ByteOrder.BIG_ENDIAN);
+				ByteBuffer data = PSX.receivePacket(channel,command);
+		        manager_run(key, command, data); 
 			} catch (ClosedChannelException e) {
 				key.cancel();
 				tupleSpace.hook.closeHook(key);
@@ -39,26 +47,6 @@
         }
     }
 
-     void read(SelectionKey key)
-                    throws ClosedChannelException, IOException {
-        SocketChannel channel = (SocketChannel)key.channel();
-        if (ch!=channel) {
-        	System.err.println("Wrong socket on IOHandler");
-        }
-
-        // 読み込み用のバッファの生成
-        ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
-        command.order(ByteOrder.BIG_ENDIAN);
-        
-        ByteBuffer data = PSX.receivePacket(channel, command);
-
-        if (debug) {
-        	PSX.printData("IOHandler:",command);
-        }	
-        manager_run(key, command, data); 
-        // assert((key.interestOps()& SelectionKey.OP_READ) !=0);
-    }
-
 	public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data) throws IOException {
     	command.order(ByteOrder.BIG_ENDIAN); 
 	    int mode = command.get(PSX.LINDA_MODE_OFFSET);
--- a/src/fdl/MetaLinda.java	Sun Aug 24 21:28:04 2008 +0900
+++ b/src/fdl/MetaLinda.java	Mon Aug 25 14:01:19 2008 +0900
@@ -30,7 +30,7 @@
 
 	public TupleSpace ts;
 	public FDLindaServ fds;
-	public FederatedLinda fdl=null;
+	public FederatedLinda fdl=FederatedLinda.init();
 	public PSXLinda next=null;
 	private LinkedList<MetaReply> replies=new LinkedList<MetaReply>();
 
@@ -39,6 +39,11 @@
 		this.fds = fds;
 	}
 
+	public PSXLinda open(String _host,int _port) 
+	throws IOException {
+		return fdl.openFromMetaLinda(this, _host, _port);
+	}
+
 	public PSXReply in(int id) {
 		return null;
 	}
@@ -99,31 +104,24 @@
 	}
 
 	public int sync(long timeout) {
-		if (fdl!=null) {
-			try {
-				fdl.sync(timeout);
-			} catch (IOException e) {
-				e.printStackTrace();
-			}
-		}
-		fds.checkTuple(timeout); // should have done in fdl selector
+		fds.checkTuple(timeout); // fdl sync is also handled here
 		/*
 		 * r.callback() may call meta.sync() and modifies the
 		 * replies queue. Do current size of queue only. The
 		 * rest is checked on the next sync call including
 		 * the recursive case.
 		 */
-			int count = replies.size();
-			while(count-->0) {
-				MetaReply r = replies.poll();
-				// previous call back may call this sync and make 
-				// replies shorter.
-				if (r==null) break;
-				if (r.ready()) {
-				} else {
-					addReply(r);
-				}
+		int count = replies.size();
+		while(count-->0) {
+			MetaReply r = replies.poll();
+			// previous call back may call this sync and make 
+			// replies shorter.
+			if (r==null) break;
+			if (r.ready()) {
+			} else {
+				addReply(r);
 			}
+		}
 		return 0;
 	}
 
--- a/src/fdl/PSXLindaImpl.java	Sun Aug 24 21:28:04 2008 +0900
+++ b/src/fdl/PSXLindaImpl.java	Mon Aug 25 14:01:19 2008 +0900
@@ -17,7 +17,10 @@
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 
 
@@ -34,7 +37,7 @@
 
  */
 
-public class PSXLindaImpl implements PSXLinda {
+public class PSXLindaImpl implements PSXLinda,TupleHandler {
 	private FederatedLinda fdl;
 	SocketChannel socketChannel;
 	public String host;
@@ -43,7 +46,7 @@
 	public PSXLinda next;
 	static final boolean debug = false;	
 
-	public PSXLindaImpl(FederatedLinda _fdl,int _mytsid,String _host,int _port) 
+	public PSXLindaImpl(FederatedLinda _fdl,Selector selector,int _mytsid,String _host,int _port) 
 	throws IOException {
 		host = _host;
 		port = _port;
@@ -70,12 +73,40 @@
 			}
 		}
 		System.err.println("Linda client connect to "+socketChannel);
-		
-		socketChannel.register(fdl.selector(), SelectionKey.OP_READ);
-
+        socketChannel.register(selector,SelectionKey.OP_READ,this);
 
 		checkConnect("PSXLinda");
 	}
+	
+
+	public void handle(SelectionKey key) throws ClosedChannelException,
+			IOException {
+		SocketChannel sock = (SocketChannel)key.channel();
+		if (sock!=socketChannel) {
+			System.err.println("wrong socket on PSXLindaImple.");
+		}
+		ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
+		command.order(ByteOrder.BIG_ENDIAN);
+		ByteBuffer data = PSX.receivePacket(sock, command);
+
+		if (debug) {
+			PSX.printCommand("chkServe:",command, data);
+		}
+
+		int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET);
+		int mode = command.get(PSX.LINDA_MODE_OFFSET);
+		PSXReply r = fdl.getReply(rseq);
+		if (r==null) {
+			System.err.println("Illegal answer sequence.");
+			return;
+		}
+		r.setAnswer(mode,command,data);
+
+		if (r.callback != null ) {
+			r.callback.callback(data);
+		}
+	}
+
 
 	protected void finalize() {
 		if (socketChannel != null) {
@@ -152,6 +183,8 @@
 	public void send(ByteBuffer command, ByteBuffer data) {
 		PSX.send(socketChannel, command, data);
 	}
+
+
 }