changeset 96:96c63bc659d4 fuchita

change sync() for no wait sync. (sorry for this) more practical wait read example.
author one
date Wed, 26 May 2010 08:56:24 +0900
parents 7bf2eeea23a0
children 0ea086f0e96f
files src/fdl/FDLindaServ.java src/fdl/FederatedLinda.java src/fdl/IOHandler.java src/fdl/MetaLinda.java src/fdl/PSX.java src/fdl/test/TestWaitRd.java
diffstat 6 files changed, 219 insertions(+), 111 deletions(-) [+]
line wrap: on
line diff
--- a/src/fdl/FDLindaServ.java	Tue May 25 23:11:11 2010 +0900
+++ b/src/fdl/FDLindaServ.java	Wed May 26 08:56:24 2010 +0900
@@ -17,6 +17,7 @@
 	static final int MAX_REQ = 1;
 	static final int FAIL = (-1);
 	static final int DEF_PORT = 10000;
+	public static boolean debug = false;
 	public int port = DEF_PORT;
 	AbstractSelector selector;
 	private ServerSocketChannel ssChannel;
@@ -24,7 +25,7 @@
 	public MetaEngine me;
 	
 	public static void main(final String[] args) {
-		final String usages = "usage: FDLindaServ [-p port]";
+		final String usages = "usage: FDLindaServ [-d] [-p port]";
 		
 		int port = DEF_PORT;
 		//引数判定
@@ -32,7 +33,9 @@
 			for (int i=0; i<args.length; ++i) {
 				if("-p".equals(args[i])) {
 					port = Integer.parseInt(args[++i]);					
-				} 
+				} else if("-d".equals(args[i])) {
+					debug  = true;
+				}
 			}
 		} catch (NumberFormatException e) {
 			System.err.println(usages);
@@ -80,8 +83,25 @@
 	}
 
 	public void checkTuple() {
-		checkTuple(0);
-	}
+		// セレクタによる監視    
+		try {
+			if (selector.select()>0) {
+//              this does not work because #it.remove() is not called.
+//				for(SelectionKey s:selector.selectedKeys()) {
+//					TupleHandler handler = (TupleHandler)s.attachment();
+//					handler.handle(s);
+//				}
+			    for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) {
+			        SelectionKey s = it.next();
+			        it.remove();
+					TupleHandler handler = (TupleHandler)s.attachment();
+					handler.handle(s);
+				}
+			}
+		} catch (ClosedChannelException e) {
+			// we have to do something...
+		} catch (IOException e) {
+		}}
 	
 	public void checkTuple(long timeout) {
 		// セレクタによる監視    
@@ -107,6 +127,7 @@
 	
 
 	public void log(Level level,String msg) {
+		if (level!=Level.SEVERE && !debug) return;
 		System.err.println(msg);
 		if (level==Level.SEVERE)
 			new IOException().setStackTrace(null);
--- a/src/fdl/FederatedLinda.java	Tue May 25 23:11:11 2010 +0900
+++ b/src/fdl/FederatedLinda.java	Wed May 26 08:56:24 2010 +0900
@@ -38,7 +38,7 @@
 
 	FederatedLinda fdl;
 	static int MAX_SEQUENCE = 2048;
-	static boolean debug = false;
+	public static boolean debug = false;
 
 	public int tid;
 	public int seq;
@@ -116,9 +116,7 @@
 			}
 			s = new Integer(seq);
 		} while (seqHash.containsKey(s));
-		if (debug) {
-			log(Level.INFO,"hash value = "+s.hashCode());
-		}
+		// log(Level.INFO,"hash value = "+s.hashCode());
 		seqHash.put(s,reply);
 		seq++;
 		return seq-1;
@@ -128,10 +126,46 @@
 		return selector;
 	}
 
+	/**
+	 * sync with no wait 
+	 * @return 0
+	 * @throws IOException
+	 */
 	public int sync() throws IOException {
-		return sync(0);
+		int key_num = 0;
+		queueExec();
+
+		try {
+			if (selector.select()>0) {
+				for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) {
+					SelectionKey s = it.next();
+					it.remove();
+					try {
+						if (!s.isReadable()) 
+							throw new IOException();
+						TupleHandler handle = (TupleHandler)s.attachment();
+						handle.handle(s);
+					} catch (IOException e) {
+						s.cancel();
+						log(Level.INFO,""+s.channel()+" is closed.");
+					}
+				}
+			}
+		} catch (IOException e) {
+			e.printStackTrace();
+		} catch (ClosedSelectorException e) {
+			// client should be know
+		}
+
+		return key_num;
 	}
 
+	/**
+	 * sync with mtimeout msec wait
+	 * @param mtimeout   0 means indifinite wait
+	 * @return 0
+	 * @throws IOException
+	 */
 	public int sync(long mtimeout) 
 	throws IOException {
 		int key_num = 0;
@@ -180,6 +214,7 @@
 	}
 
 	public void log(Level level,String msg) {
+		if (level!=Level.SEVERE && !debug) return;
 		System.err.println(msg);
 		if (level==Level.SEVERE)
 			new IOException().setStackTrace(null);
--- a/src/fdl/IOHandler.java	Tue May 25 23:11:11 2010 +0900
+++ b/src/fdl/IOHandler.java	Wed May 26 08:56:24 2010 +0900
@@ -9,7 +9,7 @@
 import java.util.logging.Level;
 
 public class IOHandler implements TupleHandler {
-    static final boolean debug = true;
+    static final boolean debug = false;
     public TupleSpace tupleSpace;
     public SocketChannel ch;
     public FDLindaServ fds;
--- a/src/fdl/MetaLinda.java	Tue May 25 23:11:11 2010 +0900
+++ b/src/fdl/MetaLinda.java	Wed May 26 08:56:24 2010 +0900
@@ -95,19 +95,42 @@
 		addReply(r);
 	}
 
+
+	public void waitRd(int id, PSXCallback callback) {
+		MetaReply r = new MetaReply(PSX.PSX_WAIT_RD,id,ts,callback);
+		addReply(r);
+	}
+
+	public PSXReply waitRd(int id) {
+		MetaReply r = new MetaReply(PSX.PSX_WAIT_RD,id,ts);
+		return r;
+	}
+	
 	public PSXLinda add(PSXLinda linda) {
 		next = linda;
 		return this;
 	}
 
-
+	/**
+	 * Meta Sync with no wait
+	 */
 	public int sync() {
-		return sync(0);
+		fdl.queueExec();
+		fds.checkTuple(); // fdl sync is also handled here
+		return metaSync();
 	}
 
+	/**
+	 * Meta Sync with wait
+	 * @param timeout wait timeout msec, if 0 wait indefinitely
+	 */
 	public int sync(long timeout) {
 		fdl.queueExec();
 		fds.checkTuple(timeout); // fdl sync is also handled here
+		return metaSync();
+	}
+		
+	public int metaSync() {
 		/*
 		 * r.callback() may call meta.sync() and modifies the
 		 * replies queue. Do current size of queue only. The
@@ -143,16 +166,6 @@
 	public void setTupleSpaceHook(IOHandlerHook hook) {
 		ts.hook = hook;
 	}
-
-	public void waitRd(int id, PSXCallback callback) {
-		MetaReply r = new MetaReply(PSX.PSX_WAIT_RD,id,ts,callback);
-		addReply(r);
-	}
-
-	public PSXReply waitRd(int id) {
-		MetaReply r = new MetaReply(PSX.PSX_WAIT_RD,id,ts);
-		return r;
-	}
 }
 
 
--- a/src/fdl/PSX.java	Tue May 25 23:11:11 2010 +0900
+++ b/src/fdl/PSX.java	Wed May 26 08:56:24 2010 +0900
@@ -218,7 +218,7 @@
     		System.err.println("read size mismatch"+readsize+" and "+command.capacity());
 		}
 		while(readsize>0) {
-	    	if(false && IOHandler.debug){
+	    	if(IOHandler.debug){
 	    		System.out.println("reading packet..."+readsize);
 	    	}
 	    	count = channel.read(command);
--- a/src/fdl/test/TestWaitRd.java	Tue May 25 23:11:11 2010 +0900
+++ b/src/fdl/test/TestWaitRd.java	Wed May 26 08:56:24 2010 +0900
@@ -20,130 +20,169 @@
 	public FDLindaServ fds;
 	public static final int PORT = 10000;
 
-	class Server implements Runnable {
+	class LindaServer implements Runnable {
+		public int id;
+		public LindaServer(int id) { this.id = id; }
 		public void run() {
-			String[] args = {"-p",Integer.toString(PORT)};
+			String[] args = {/* "-d",*/"-p",Integer.toString(PORT)};
 			FDLindaServ.main(args);
 		}
 	}
 
-	class Client implements Runnable {
+	class Sender implements Runnable {
+		public String id;
+		public Sender(int id) { this.id = "Sender"+id; }
 		public void run() {
-			String[] args = {};
+			String[] args = {id};
 			sleep(2000);
 			main(args);
 		}
-		public synchronized void sleep(int time) {
+
+		public void main(String[] arg) {
 			try {
-				wait(time);
-			} catch (InterruptedException e) {
+				PSXLinda psx = openLinda(id);
+				sleep(1000);
+				sendData(psx,1,0);
+				psx.sync(1000);
+				waitIn(psx,1);
+				sendData(psx,1,1);
+				psx.sync(1000);
+				waitIn(psx,1);
+				sendData(psx,1,2);
+				psx.sync(1000);
+				waitIn(psx,1);
+
+				for(int i=3;i<10;i++) {
+					sendData(psx,1,i);
+					psx.sync(1000);
+					waitIn(psx,1);
+				}
+				sleep(1000);
+			} catch (IOException e) {
 				e.printStackTrace();
 			}
 		}
-		
-		public void main(String[] arg) {
-			try {
-				PSXLinda psx = openLinda();
-				ByteBuffer data = sendData(psx,1,0);
+	}
 
-				psx.waitRd(1, new PSXCallback() {	public void callback(ByteBuffer reply) {read_wait(reply,"First read");} });
-				psx.sync(1);
-				sendData(psx,1,1);
-				psx.sync(1);
-				
+	class Reader implements Runnable {
+		public String id;
+		public Reader(int id) { this.id = "Reader"+id; }
+		public void run() {
+			String[] args = {id};
+			sleep(2000);
+			main(args);
+		}
 
-				sleep(1000);
+		public void main(String[] arg) {
+			final String name = arg[0];
+			try {
+				PSXLinda psx = openLinda(id);
 
-				psx.waitRd(1, new PSXCallback() {	public void callback(ByteBuffer reply) {read_wait(reply,"2nd read");} });
-				psx.sync(1);
-				sendData(psx,1,2);
-				psx.sync(1);
-				sleep(1000);
-
+				PSXReply reply = psx.waitRd(1);
+				read_wait(psx, reply,name+": First read");
+				PSXReply reply1 = psx.waitRd(1);
+				read_wait(psx, reply1,name+": 2nd read");
 
 				for(int i=3;i<10;i++) {
-					final int j = i;
-					psx.waitRd(1, new PSXCallback() {public void callback(ByteBuffer reply) {read_wait(reply,""+j+"th read");} });
-					psx.sync(1);
-					sendData(psx,1,i);
-					psx.sync(1);
+					PSXReply replyn = psx.waitRd(1);
+					read_wait(psx, replyn,name+": "+i+"th read");
 				}
-				sleep(1000);
-				
-				data.clear();
+
+				ByteBuffer data = ByteBuffer.allocate(10);
 				psx.out(PSX.META_STOP, data);
 				psx.sync(1);
 
 			} catch (IOException e) {
-				System.err.println("Communication failure.");
+				System.err.println(name+": Communication failure.");
 			}
 		}
-		
-		public void read_wait(ByteBuffer r, String mesg) {
-			System.out.println(mesg);
-			System.out.println(r.getInt());
-			System.out.println("");
-		}
-		
-		private PSXLinda openLinda() throws IOException {
-			FederatedLinda fdl;
-			PSXLinda psx;
-			PSXReply r;
-			String host;
-			InetSocketAddress localAddress;
-			int port = PORT;
-			try {
-				localAddress = new InetSocketAddress(InetAddress.getLocalHost(), port);
-				host = localAddress.getHostName();
-			} catch (UnknownHostException e) {
-				host = "localhost";
-			}
-			fdl = FederatedLinda.init();
-			psx = fdl.open(host,port);
-			r = psx.in(65535);
-			fdl.sync(1);
-			System.out.println("Connected.");
-			int cnt=0;
-			while(!r.ready()) {
-				// psx.sync(1000);
-				psx.sync();
-				System.out.println("Waiting...."+(cnt++));
-			}
-			print_id(r);
-			return psx;
-		}
-		
-		private ByteBuffer sendData(PSXLinda psx,int id, int n) {
-			ByteBuffer data = ByteBuffer.allocate(10);
-			data.putInt(n);
-			data.flip();
-			psx.out(id,data);
-			return data;
-		}
+	}
 
-
-		public void print_id (PSXReply ans) throws IOException {
-			ByteBuffer r = ans.getData();
-			System.out.print("ID = ");
-			System.out.write(r.array());
-			System.out.println("");
+	public synchronized void sleep(int time) {
+		try {
+			wait(time);
+		} catch (InterruptedException e) {
+			e.printStackTrace();
 		}
 	}
 
+	public void read_wait(PSXLinda psx, PSXReply reply, String mesg) throws IOException {
+		while(!reply.ready()) psx.sync(10);
+		System.out.println(mesg);
+		System.out.println(reply.getData().getInt());
+		System.out.println("");
+	}
+
+	public PSXLinda openLinda(String name) throws IOException {
+		FederatedLinda fdl;
+		PSXLinda psx;
+		PSXReply r;
+		String host;
+		InetSocketAddress localAddress;
+		int port = PORT;
+		try {
+			localAddress = new InetSocketAddress(InetAddress.getLocalHost(), port);
+			host = localAddress.getHostName();
+		} catch (UnknownHostException e) {
+			host = "localhost";
+		}
+		fdl = FederatedLinda.init();
+		psx = fdl.open(host,port);
+		r = psx.in(65535);
+		fdl.sync(1);
+		System.out.println(name+": Connected.");
+		int cnt=0;
+		while(!r.ready()) {
+			// psx.sync(1000);
+			psx.sync();
+			System.out.println(name+": Waiting...."+(cnt++));
+		}
+		print_id(r);
+		return psx;
+	}
+
+	public ByteBuffer sendData(PSXLinda psx,int id, int n) {
+		ByteBuffer data = ByteBuffer.allocate(10);
+		data.putInt(n);
+		data.flip();
+		psx.out(id,data);
+		return data;
+	}
+
+	public void waitIn(PSXLinda psx, int i) throws IOException {
+		PSXReply r = psx.in(i);
+		while(! r.ready()) { 
+			psx.sync(10); 
+		}
+		return;
+	}
+
+	public void print_id (PSXReply ans) throws IOException {
+		ByteBuffer r = ans.getData();
+		System.out.print("ID = ");
+		System.out.write(r.array());
+		System.out.println("");
+	}
+
+
 	public static void main(String[] arg) throws InterruptedException {
 		TestWaitRd me = new TestWaitRd();
 		me.test1();
 	}
 
 	public void test1() throws InterruptedException {
-		Server s = new Server();
-		Client c = new Client();
-		Thread s1 = new Thread(s);
-		Thread c1 = new Thread(c);
+		Thread l1 = new Thread(new LindaServer(1));
+		Thread r1 = new Thread(new Reader(1));
+		Thread r2 = new Thread(new Reader(2));
+		Thread s1 = new Thread(new Sender(1));
+		l1.start();
 		s1.start();
-		c1.start();
+		r1.start();
+		r2.start();
 		s1.join();
-		c1.join();
-		
+		r1.join();
+		r2.join();
+		l1.join();
+
 	}
 }