changeset 33:64071f8e2e0d

*** empty log message ***
author kono
date Sun, 24 Aug 2008 03:23:08 +0900
parents 7e0f6f00763e
children e7c5958fd285
files src/fdl/ComDebug_Client.java src/fdl/FederatedLinda.java src/fdl/IOHandler.java src/fdl/MetaLinda.java src/fdl/MetaLogEngine.java src/fdl/MetaReply.java src/fdl/NullMetaEngine.java src/fdl/PSX.java src/fdl/PSXQueue.java src/fdl/Tuple.java src/fdl/TupleSpace.java src/fdl/test/TestMetaLinda.java src/fdl/test/TestMonitor.java
diffstat 13 files changed, 97 insertions(+), 85 deletions(-) [+]
line wrap: on
line diff
--- a/src/fdl/ComDebug_Client.java	Fri Aug 22 14:49:52 2008 +0900
+++ b/src/fdl/ComDebug_Client.java	Sun Aug 24 03:23:08 2008 +0900
@@ -13,7 +13,6 @@
 
 	static int id;
 	static final boolean debug = false;
-	ByteBuffer nullBuffer = ByteBuffer.allocate(0);
 	PSXCallback debugCallback ; 
 
 	FederatedLinda fdl;
@@ -77,16 +76,16 @@
 				psx.in(65535, new MyCallBack(psx)); 
 
 				System.err.println("COM_DEBUG Connected.["+host+":"+port+"]");
-				psx.out(PSX.META_MONITOR, nullBuffer);
+				psx.out(PSX.META_MONITOR, null);
 				debugCallback = 
 					new PSXCallback() {
 					public void callback(ByteBuffer reply) {
 						System.err.println("COM_DEBUG: "+PSX.getdataString(reply));
-						psx.out(PSX.META_MONITOR, nullBuffer);
+						psx.out(PSX.META_MONITOR, null);
 						psx.in(PSX.META_MONITOR_DATA,debugCallback); 
 					}
 				};
-				psx.out(PSX.META_MONITOR, nullBuffer);
+				psx.out(PSX.META_MONITOR, null);
 				psx.in(PSX.META_MONITOR_DATA,debugCallback); 
 
 				connect_num++;
--- a/src/fdl/FederatedLinda.java	Fri Aug 22 14:49:52 2008 +0900
+++ b/src/fdl/FederatedLinda.java	Sun Aug 24 03:23:08 2008 +0900
@@ -39,7 +39,7 @@
 
 	static FederatedLinda fdl = new FederatedLinda();
 	static int MAX_SEQUENCE = 2048;
-	static boolean debug = true;
+	static boolean debug = false;
 
 	public int tid;
 	public int seq;
@@ -140,8 +140,14 @@
 		      for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) {
 			        SelectionKey s = it.next();
 			        it.remove();
-					chkServe((SocketChannel)s.channel());
-				}
+			        try {
+			        	if (!s.isReadable()) throw new IOException();
+			        	chkServe((SocketChannel)s.channel());
+			        } catch (IOException e) {
+			        	selector.selectedKeys().remove(s);
+			        	System.err.println(""+s.channel()+" is closed.");
+			        }
+		      }
 			}
 		} catch (IOException e) {
 			e.printStackTrace();
@@ -152,26 +158,10 @@
 		return key_num;
 	}
 
-	private void chkServe(SocketChannel sock) 
-	throws IOException {
-		int length;
+	private void chkServe(SocketChannel sock) 	throws IOException {
 		ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
 		command.order(ByteOrder.BIG_ENDIAN);
-		PSX.receive(sock, command, PSX.LINDA_HEADER_SIZE);
-
-		length =  command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
-		ByteBuffer data = ByteBuffer.allocate(length);
-		int read = length;
-		if (debug) {
-			System.out.print("client reading:");
-			System.out.println(length);
-		}
-
-		data.order(ByteOrder.BIG_ENDIAN);
-		while(read>0) {
-			read -= sock.read(data);
-		}
-		data.rewind();
+		ByteBuffer data = PSX.receivePacket(sock, command);
 
 		if (debug) {
 			PSX.printCommand("chkServe:",command, data);
--- a/src/fdl/IOHandler.java	Fri Aug 22 14:49:52 2008 +0900
+++ b/src/fdl/IOHandler.java	Sun Aug 24 03:23:08 2008 +0900
@@ -28,8 +28,10 @@
             try {
 				read(key);
 			} catch (ClosedChannelException e) {
+				key.cancel();
 				tupleSpace.hook.closeHook(key);
 			} catch (IOException e) {
+				key.cancel();
 				tupleSpace.hook.closeHook(key);
 			}
         }
@@ -42,26 +44,8 @@
         // 読み込み用のバッファの生成
         ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
         command.order(ByteOrder.BIG_ENDIAN);
-        command.clear();
         
-        int readsize = PSX.LINDA_HEADER_SIZE;
-
-        // 読み込み
-        PSX.receive(channel, command, readsize);
-                
-        command.getInt(PSX.LINDA_PACKET_LENGTH_OFFSET);
-        int datalen = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
-        
-        ByteBuffer data = ByteBuffer.allocate(datalen);
-        if (debug) {
-        	System.out.println("reading: " +datalen);
-    	}
-
-        data.order(ByteOrder.BIG_ENDIAN);
-        PSX.receive(channel, data, datalen);
-        
-        command.order(ByteOrder.BIG_ENDIAN);
-        command.rewind();
+        ByteBuffer data = PSX.receivePacket(channel, command);
 
         if (debug) {
         	PSX.printData("IOHandler:",command);
@@ -71,7 +55,7 @@
     }
 
 	public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data) throws IOException {
-    	command.order(ByteOrder.BIG_ENDIAN);
+    	command.order(ByteOrder.BIG_ENDIAN); 
 	    int mode = command.get(PSX.LINDA_MODE_OFFSET);
 	    command.rewind();
 
@@ -82,8 +66,7 @@
 			tupleSpace.hook.closeHook(key); 
 		} else if(mode == PSX.PSX_CHECK) {
             tupleSpace.Check(key, command);
-	    }
-    	else if(mode == PSX.PSX_IN || mode == PSX.PSX_RD){
+	    } else if(mode == PSX.PSX_IN || mode == PSX.PSX_RD){
     		tupleSpace.In_Rd(key, command, mode); 
     	} else if (mode == PSX.PSX_WAIT_RD) {	
 			tupleSpace.Wait_Rd(key, command, mode); 			
@@ -91,7 +74,7 @@
 	    	tupleSpace.Out(key, command, data);		
 	    } else {
 	    	tupleSpace.hook.closeHook(key);
-    		System.out.println("Incorrect tuple operation");
+    		System.err.println("Incorrect tuple operation");
     		System.exit(1);
 	    }
     	
--- a/src/fdl/MetaLinda.java	Fri Aug 22 14:49:52 2008 +0900
+++ b/src/fdl/MetaLinda.java	Sun Aug 24 03:23:08 2008 +0900
@@ -100,12 +100,14 @@
 
 	public int sync(long timeout) {
 		fds.checkTuple(timeout);
-		// copy replies to avoid insert during r.ready() 
-		LinkedList<MetaReply> list = replies;
-		replies = new LinkedList<MetaReply>();
-		for(MetaReply r:list) {
-			if (!r.ready()) {
-				addReply(r);
+		if (replies.size()>0) {
+			// copy replies to avoid insert during r.ready() 
+			LinkedList<MetaReply> list = replies;
+			replies = new LinkedList<MetaReply>();
+			for(MetaReply r:list) {
+				if (!r.ready()) {
+					addReply(r);
+				}
 			}
 		}
 		if (fdl!=null) {
--- a/src/fdl/MetaLogEngine.java	Fri Aug 22 14:49:52 2008 +0900
+++ b/src/fdl/MetaLogEngine.java	Sun Aug 24 03:23:08 2008 +0900
@@ -19,8 +19,7 @@
 	PSXCallback monitor_callback_start =
 			new PSXCallback() {public void callback(ByteBuffer reply) { 
 				meta.ts.hook = commDebug = new CommDebugHook();
-				ByteBuffer data = ByteBuffer.allocate(0) ;
-				meta.out(PSX.META_MONITOR_DATA, data);
+				meta.out(PSX.META_MONITOR_DATA, null);
 				meta.in(PSX.META_MONITOR,monitor_callback);
 	}};
 	PSXCallback monitor_callback =
--- a/src/fdl/MetaReply.java	Fri Aug 22 14:49:52 2008 +0900
+++ b/src/fdl/MetaReply.java	Sun Aug 24 03:23:08 2008 +0900
@@ -48,7 +48,7 @@
 			}
 			break;
 		case PSX.PSX_OUT:
-			command=PSX.setCommand(PSX.PSX_OUT, id, 0, data.remaining());
+			command=PSX.setCommand(PSX.PSX_OUT, id, 0, data);
 			ts.Out(null, command, data);
 			return true;
 		case PSX.PSX_UPDATE:
--- a/src/fdl/NullMetaEngine.java	Fri Aug 22 14:49:52 2008 +0900
+++ b/src/fdl/NullMetaEngine.java	Sun Aug 24 03:23:08 2008 +0900
@@ -1,5 +1,7 @@
 package fdl;
 
+import java.nio.ByteBuffer;
+
 public class NullMetaEngine implements MetaEngine {
 	public MetaLinda meta;
 	public boolean running=true;
@@ -9,7 +11,9 @@
 	}
 	
 	public void mainLoop() {
+		meta.in(PSX.META_STOP, new PSXCallback() {public void callback(ByteBuffer reply) {
+			running = false;}});
 		while(running) 
-			meta.fds.checkTuple();
+			meta.sync();
 	}
 }
--- a/src/fdl/PSX.java	Fri Aug 22 14:49:52 2008 +0900
+++ b/src/fdl/PSX.java	Sun Aug 24 03:23:08 2008 +0900
@@ -65,12 +65,15 @@
 	
 	static void printCommand(String comment, ByteBuffer command, ByteBuffer data) {
 		char id = (char)command.getShort(LINDA_ID_OFFSET);
-		System.out.println(comment+" LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+
+		System.err.println(comment+" LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+
 				"MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+
 				"ID:"+(int)id+" "+
 				"SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+
 				"DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" ");
-		System.out.println("DATA:"+data);
+		if(data!=null) {
+			System.err.println("DATA:"+data);
+			data.rewind();
+		}
 		command.rewind();
 	}
 	
@@ -85,7 +88,8 @@
 	}
 	
 	
-	static ByteBuffer setCommand(int _mode, int _id, int _seq, int _datalen) {
+	static ByteBuffer setCommand(int _mode, int _id, int _seq, ByteBuffer data) {
+		int _datalen = data==null?0:data.remaining();
 		ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE);
 		command.order(ByteOrder.BIG_ENDIAN);
 	
@@ -116,7 +120,6 @@
 
 	public static String getdataString(ByteBuffer data) {
 		String sendtext;
-		data.rewind();
 		//Decode UTF-8 to System Encoding(UTF-16) 
 		Charset charset = Charset.forName("UTF-8");
 		CharsetDecoder decoder = charset.newDecoder();
@@ -124,9 +127,9 @@
 		try {
 			cb = decoder.decode(data);
 		} catch (CharacterCodingException e) {
-			e.printStackTrace();
 		}
 		cb.rewind();
+		data.rewind();
 	
 		sendtext = cb.toString();
 		return sendtext;
@@ -134,28 +137,36 @@
 
 	public static void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) {
 		int send_size = LINDA_HEADER_SIZE;
-		int count = 0;
-	
-		if (command.position()!=0||command.limit()!=PSX.LINDA_HEADER_SIZE)
-			System.err.println("command length erron send");
+		// if datalen in the header is different from ByteBuffer remaining(), we lost
+		// protocol synchronization. Make sure to have correct length now.
+		if (true && data!=null) {
+			int datalen = data.limit()-data.position();
+			command.putInt(LINDA_DATA_LENGTH_OFFSET,datalen); 
+			command.putInt(LINDA_PACKET_LENGTH_OFFSET,datalen+LINDA_HEADER_SIZE-INT_SIZE);
+			command.rewind();
+		}
 		try {
 			//command Send
 			while(send_size > 0){
-				count = ch.write(command);
-				if(count < 0) throw new IOException();
+				int count = ch.write(command);
+				if(count <= 0) throw new IOException();
 				send_size -= count;
 			}
 	
 			if (data==null) return;
 			//data Send
 			while(data.remaining() > 0){
-				count = ch.write(data);
+				int count = ch.write(data);
+				System.err.println("Data out length = "+count);
 				if(count < 0) throw new IOException();
 			}
 		} catch (IOException e) {
 			System.out.println("Write Falied on:"+ch);
 			return;
 		}
+		// command or data may be shared among PSX queue
+		command.rewind();
+		data.rewind();
 	}
 
 	public static void send(SelectionKey key, ByteBuffer command, ByteBuffer data) {
@@ -203,6 +214,9 @@
 	static void receive(SocketChannel channel, ByteBuffer command, int readsize)
 			throws IOException {
 		int count;
+		if (command.capacity()!=readsize) {
+    		System.err.println("read size mismatch"+readsize+" and "+command.capacity());
+		}
 		while(readsize>0) {
 	    	if(IOHandler.debug){
 	    		System.out.println("reading packet..."+readsize);
@@ -214,6 +228,22 @@
 		}
 	    command.rewind();
 	}
+
+	static ByteBuffer receivePacket(SocketChannel channel, ByteBuffer command)
+			throws IOException {
+		/**
+		 * Receive a command and data according to the command.
+		 */
+	    receive(channel, command,LINDA_HEADER_SIZE); 
+	    int datalen = command.getInt(LINDA_DATA_LENGTH_OFFSET);
+	    int packetlen = command.getInt(LINDA_PACKET_LENGTH_OFFSET);
+	    command.rewind();
+	    if (datalen+LINDA_HEADER_SIZE!=packetlen) return null; 
+	    ByteBuffer data = ByteBuffer.allocate(datalen);
+	    data.order(ByteOrder.BIG_ENDIAN);
+	    receive(channel, data, datalen);
+		return data;
+	}
     
 
 }
--- a/src/fdl/PSXQueue.java	Fri Aug 22 14:49:52 2008 +0900
+++ b/src/fdl/PSXQueue.java	Sun Aug 24 03:23:08 2008 +0900
@@ -46,7 +46,7 @@
 	}
 
 	private void setCommand() {
-		command = PSX.setCommand(mode,id,seq,size);
+		command = PSX.setCommand(mode,id,seq,data);
 	}
 
 	public void setSeq(int _seq) {
--- a/src/fdl/Tuple.java	Fri Aug 22 14:49:52 2008 +0900
+++ b/src/fdl/Tuple.java	Sun Aug 24 03:23:08 2008 +0900
@@ -24,11 +24,11 @@
     }
     
     public void setCommand(int _mode, int _seq) {
-    	setCommand( _mode, id, _seq,data.limit());
+    	setCommand( _mode, id, _seq,data);
     }
 
-	public void setCommand(int _mode, int _id, int _seq, int _datalen) {
-    	command = PSX.setCommand(_mode, _id, _seq, _datalen);
+	public void setCommand(int _mode, int _id, int _seq, ByteBuffer data) {
+    	command = PSX.setCommand(_mode, _id, _seq, data);
     }
 
 	public void setTuple(int _mode,int _id, int _seq, ByteBuffer _data) {
@@ -59,7 +59,7 @@
     }
 
     public int getdataLength() {
-    	return data.limit();
+    	return data==null?0:data.remaining();
     }
     
     public ByteBuffer getData() {
--- a/src/fdl/TupleSpace.java	Fri Aug 22 14:49:52 2008 +0900
+++ b/src/fdl/TupleSpace.java	Sun Aug 24 03:23:08 2008 +0900
@@ -77,8 +77,7 @@
 			PSX.setAnserCommand(command, tuple_space[id].getSeq());
 			
 			if(debug){
-				int sendsize = datasize+PSX.LINDA_HEADER_SIZE;
-				System.out.println("send size "+sendsize+" : mode = "+(char)'a');
+				System.out.println("send size "+datasize+" : mode = "+(char)'a');
 			}
 			PSX.send(tuple_space[id].ch, command, data);
 			removeTuple(id);
--- a/src/fdl/test/TestMetaLinda.java	Fri Aug 22 14:49:52 2008 +0900
+++ b/src/fdl/test/TestMetaLinda.java	Sun Aug 24 03:23:08 2008 +0900
@@ -57,23 +57,25 @@
 			psx = fdl.open(host,port);
 			System.out.println("Connected.");
 
-			ByteBuffer data = ByteBuffer.allocate(10);
 			r = psx.in(1);
 
 			for(int i=0;i<10;i++) {
-				data.clear();
+				ByteBuffer data = ByteBuffer.allocate(10);
 				data.putInt(i);
 				data.flip();
 				psx.out(1,data);
+			}
+			for(int i=0;i<100;i++) {
 				if (r.ready()) {
 					System.err.println("Get:"+r.data.getInt());
 					r = psx.in(1);
 				}
-				psx.sync(100);
+				// System.out.println("syncing..."+i);
+				psx.sync(10);
 			}
 
-			data.clear();
-			psx.out(PSX.META_STOP, data);
+			System.out.println("Try to stop the server");
+			psx.out(PSX.META_STOP, null);
 			psx.sync();
 			
 		} catch (IOException e) {
--- a/src/fdl/test/TestMonitor.java	Fri Aug 22 14:49:52 2008 +0900
+++ b/src/fdl/test/TestMonitor.java	Sun Aug 24 03:23:08 2008 +0900
@@ -69,9 +69,13 @@
 		Server s = new Server();
 		Client c = new Client();
 		Monitor m = new Monitor();
-		new Thread(s).start();
-		new Thread(m).start();
-		new Thread(c).start();
+		Thread ts = new Thread(s); ts.start();
+		Thread tm = new Thread(m); tm.start();
+		Thread tc = new Thread(c); tc.start();
+		try {
+			ts.join(); tm.join(); tc.join();
+		} catch (InterruptedException e) {
+		}
 	}