changeset 31:846c6c14cf04

worked?
author kono
date Fri, 22 Aug 2008 14:48:41 +0900
parents fca6eec8016f
children 7e0f6f00763e
files src/fdl/ComDebug_Client.java src/fdl/FederatedLinda.java src/fdl/IOHandler.java src/fdl/MetaLinda.java src/fdl/MetaLogEngine.java src/fdl/MetaReply.java src/fdl/PSX.java src/fdl/TupleSpace.java
diffstat 8 files changed, 79 insertions(+), 76 deletions(-) [+]
line wrap: on
line diff
--- a/src/fdl/ComDebug_Client.java	Thu Aug 21 18:46:40 2008 +0900
+++ b/src/fdl/ComDebug_Client.java	Fri Aug 22 14:48:41 2008 +0900
@@ -76,12 +76,12 @@
 				}
 				psx.in(65535, new MyCallBack(psx)); 
 
-				System.out.println("COM_DEBUG Connected.["+host+":"+port+"]");
+				System.err.println("COM_DEBUG Connected.["+host+":"+port+"]");
 				psx.out(PSX.META_MONITOR, nullBuffer);
 				debugCallback = 
 					new PSXCallback() {
 					public void callback(ByteBuffer reply) {
-						System.out.println(PSX.getdataString(reply));
+						System.err.println("COM_DEBUG: "+PSX.getdataString(reply));
 						psx.out(PSX.META_MONITOR, nullBuffer);
 						psx.in(PSX.META_MONITOR_DATA,debugCallback); 
 					}
--- a/src/fdl/FederatedLinda.java	Thu Aug 21 18:46:40 2008 +0900
+++ b/src/fdl/FederatedLinda.java	Fri Aug 22 14:48:41 2008 +0900
@@ -20,6 +20,7 @@
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.util.Hashtable;
+import java.util.Iterator;
 import java.util.Set;
 
 
@@ -129,8 +130,6 @@
 	public int sync(long mtimeout) 
 	throws IOException {
 		int key_num = 0;
-		Set<SelectionKey> keys;
-
 		while (q_top != null){
 			PSXQueue c = q_top;
 			c.send();
@@ -139,11 +138,12 @@
 		}
 
 		try {
-			key_num = selector.select(mtimeout);
-			keys = selector.selectedKeys();
-			for (SelectionKey key : keys) {
-				SocketChannel sock = (SocketChannel)key.channel();
-				chkServe(sock);
+			if (selector.select(mtimeout)>0) {
+		      for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) {
+			        SelectionKey s = it.next();
+			        it.remove();
+					chkServe((SocketChannel)s.channel());
+				}
 			}
 		} catch (IOException e) {
 			e.printStackTrace();
@@ -159,36 +159,37 @@
 		int length;
 		ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
 		command.order(ByteOrder.BIG_ENDIAN);
+		PSX.receive(sock, command, PSX.LINDA_HEADER_SIZE);
 
-		sock.read(command);
-		command.rewind();
 		length =  command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
-		if (length>0) {
-			ByteBuffer data = ByteBuffer.allocate(length);
-			int read = length;
-			if (debug) {
-				System.out.print("client reading:");
-				System.out.println(length);
-			}
+		ByteBuffer data = ByteBuffer.allocate(length);
+		int read = length;
+		if (debug) {
+			System.out.print("client reading:");
+			System.out.println(length);
+		}
+
+		data.order(ByteOrder.BIG_ENDIAN);
+		while(read>0) {
+			read -= sock.read(data);
+		}
+		data.rewind();
 
-			data.order(ByteOrder.BIG_ENDIAN);
-			while(read>0) {
-				read -= sock.read(data);
-			}
-			data.rewind();
+		if (debug) {
+			PSX.printCommand("chkServe:",command, data);
+		}
 
-			if (debug) {
-				PSX.printCommand("chkServe:",command, data);
-			}
+		int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET);
+		int mode = command.get(PSX.LINDA_MODE_OFFSET);
+		PSXReply r = getReply(rseq);
+		if (r==null) {
+			System.err.println("Illegal answer sequence.");
+			return;
+		}
+		r.setAnswer(mode,command,data);
 
-			int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET);
-			int mode = command.get(PSX.LINDA_MODE_OFFSET);
-			PSXReply r = getReply(rseq);
-			r.setAnswer(mode,command,data);
-
-			if (r.callback != null ) {
-				r.callback.callback(data);
-			}
+		if (r.callback != null ) {
+			r.callback.callback(data);
 		}
 	}
 
@@ -196,9 +197,6 @@
 		Integer a;
 
 		PSXReply r = seqHash.get((a = new Integer(rseq)));
-		if (r==null) {
-			System.out.println("hashed reply not found");
-		}
 		seqHash.remove(a);
 		return r;
 	}
--- a/src/fdl/IOHandler.java	Thu Aug 21 18:46:40 2008 +0900
+++ b/src/fdl/IOHandler.java	Fri Aug 22 14:48:41 2008 +0900
@@ -45,36 +45,20 @@
         command.clear();
         
         int readsize = PSX.LINDA_HEADER_SIZE;
-        int count = 0;
-        
+
         // 読み込み
-        while(readsize>0) {
-        	if(debug){
-        		System.out.println("reading command..."+readsize);
-        	}
-        	count = channel.read(command);
-        	if(count==0) throw new IOException();
-        	if(count < 0) throw new IOException();        		
-        	readsize -= count;
-    	}
-        command.rewind();
+        PSX.receive(channel, command, readsize);
                 
         command.getInt(PSX.LINDA_PACKET_LENGTH_OFFSET);
         int datalen = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
         
         ByteBuffer data = ByteBuffer.allocate(datalen);
-        int read = datalen;
-        
         if (debug) {
         	System.out.println("reading: " +datalen);
     	}
 
         data.order(ByteOrder.BIG_ENDIAN);
-        data.clear();
-        while(read>0) {
-        	read -= channel.read(data);
-    	}
-        data.rewind();    	    
+        PSX.receive(channel, data, datalen);
         
         command.order(ByteOrder.BIG_ENDIAN);
         command.rewind();
--- a/src/fdl/MetaLinda.java	Thu Aug 21 18:46:40 2008 +0900
+++ b/src/fdl/MetaLinda.java	Fri Aug 22 14:48:41 2008 +0900
@@ -14,6 +14,7 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
 
 /**
  MetaLinda 
@@ -31,8 +32,7 @@
 	public FDLindaServ fds;
 	public FederatedLinda fdl=null;
 	public PSXLinda next=null;
-	private MetaReply replies=new MetaReply(0, 0, ts);
-	private MetaReply last=replies;
+	private LinkedList<MetaReply> replies=new LinkedList<MetaReply>();
 
 	public MetaLinda(TupleSpace ts,FDLindaServ fds) {
 		this.ts = ts;
@@ -49,7 +49,7 @@
 	}
 
 	private void addReply(MetaReply r) {
-		last.next = r; last = r;
+		replies.add(r);
 	}
 
 	public PSXReply ck(int id) {
@@ -100,14 +100,14 @@
 
 	public int sync(long timeout) {
 		fds.checkTuple(timeout);
-		PSXReply r;
-		for(r=replies;r!=null&&r.next!=null;r = r.next) {
-			if (r.next.ready()) {
-				// ready() may modify replies list
-				r.next = r.next.next;
+		// copy replies to avoid insert during r.ready() 
+		LinkedList<MetaReply> list = replies;
+		replies = new LinkedList<MetaReply>();
+		for(MetaReply r:list) {
+			if (!r.ready()) {
+				addReply(r);
 			}
 		}
-		last = (MetaReply)r;
 		if (fdl!=null) {
 			try {
 				fdl.sync(timeout);
--- a/src/fdl/MetaLogEngine.java	Thu Aug 21 18:46:40 2008 +0900
+++ b/src/fdl/MetaLogEngine.java	Fri Aug 22 14:48:41 2008 +0900
@@ -37,7 +37,8 @@
 	
 	public void mainLoop() {
 		meta.in(PSX.META_MONITOR,monitor_callback_start);
-		meta.in(PSX.META_STOP, new PSXCallback() {public void callback(ByteBuffer reply) { running = false;}});
+		meta.in(PSX.META_STOP, new PSXCallback() {public void callback(ByteBuffer reply) {
+			running = false;}});
 		while(running) 
 			meta.sync();
 	}
--- a/src/fdl/MetaReply.java	Thu Aug 21 18:46:40 2008 +0900
+++ b/src/fdl/MetaReply.java	Fri Aug 22 14:48:41 2008 +0900
@@ -48,6 +48,7 @@
 			}
 			break;
 		case PSX.PSX_OUT:
+			command=PSX.setCommand(PSX.PSX_OUT, id, 0, data.remaining());
 			ts.Out(null, command, data);
 			return true;
 		case PSX.PSX_UPDATE:
--- a/src/fdl/PSX.java	Thu Aug 21 18:46:40 2008 +0900
+++ b/src/fdl/PSX.java	Fri Aug 22 14:48:41 2008 +0900
@@ -199,6 +199,21 @@
 	    	return (host +":"+port);
 	    }
 	}
+
+	static void receive(SocketChannel channel, ByteBuffer command, int readsize)
+			throws IOException {
+		int count;
+		while(readsize>0) {
+	    	if(IOHandler.debug){
+	    		System.out.println("reading packet..."+readsize);
+	    	}
+	    	count = channel.read(command);
+	    	if(count==0) throw new IOException();
+	    	if(count < 0) throw new IOException();        		
+	    	readsize -= count;
+		}
+	    command.rewind();
+	}
     
 
 }
--- a/src/fdl/TupleSpace.java	Thu Aug 21 18:46:40 2008 +0900
+++ b/src/fdl/TupleSpace.java	Fri Aug 22 14:48:41 2008 +0900
@@ -153,6 +153,9 @@
 		if (tuple!=null) {
 		//send
 			ByteBuffer sendcommand = tuple.getCommand();
+			if (tuple.getSeq()==0) {
+				System.err.println("Illegal sequence in answer.");
+			}
 			ByteBuffer senddata = tuple.getData();
 			PSX.send(key,sendcommand, senddata);
 		}
@@ -170,7 +173,7 @@
 		id = (int)idc;
 		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
 		command.rewind();
-		
+
 		if (debug) System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n");    		
 		hook.inHook(key,id,seq,(char)mode);
 
@@ -183,14 +186,18 @@
 		}
 		
 		if (tuple != null && (tuple.mode == 'o')){
+			tuple.seq = seq;
 			tuple = tupleIsAvailable(command, mode, tuple, id, temp);
 		} else {
-			tuple = setupWait(key, command, mode, tuple, id);
+			tuple = setupWait(key, command, mode, tuple, seq, id);
 		}
 		return tuple;
 	}
 
 	public ByteBuffer IN(int id,int mode, ByteBuffer command) {
+		/**
+		 * IN for MetaLinda (no wait);
+		 */
 		Tuple tuple,temp=null;
 		tuple = tuple_space[id];
 
@@ -202,6 +209,7 @@
 
 		if (tuple != null && (tuple.mode == 'o')){
 			ByteBuffer data = tuple.data;
+			tuple.seq = 0;
 			tupleIsAvailable(command, mode, tuple, id, temp);
 			return data;
 		} 
@@ -210,9 +218,7 @@
 	
 	private Tuple tupleIsAvailable(ByteBuffer command, int mode, Tuple tuple,
 			int id, Tuple temp) {
-		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
-		command.rewind();
-		tuple.setCommand('a', seq);
+		tuple.setCommand('a', tuple.seq);
 
 		if(debug){
 			int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE;
@@ -231,9 +237,8 @@
 	}
 
 	private Tuple setupWait(SelectionKey key, ByteBuffer command, int mode,
-			Tuple tuple, int id) {
+			Tuple tuple, int seq, int id) {
 		if(tuple == null) {
-			//ServerSocketChannel sc = (ServerSocketChannel)key.channel();
 			tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel());
 			tuple.next = null;
 		}else {
@@ -244,16 +249,15 @@
 		}
 		
 		tuple.setMode(mode);
-		int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET);
 		command.rewind();
-		tuple.setSeq(seq2);
+		tuple.setSeq(seq);
 		tuple.ch = (SocketChannel) key.channel();
 		ByteBuffer buff = ByteBuffer.allocate(0);
 		tuple.setData(buff);
 		tuple = null;
 			
 		if(debug){
-			System.out.println("data inserted insert seq = "+seq2 +", id = "+id);
+			System.out.println("wait inserted seq = "+seq +", id = "+id);
 		}
 		return tuple;
 	}