changeset 103:7da06001aeb5 fuchita

add TestTree.java
author one
date Wed, 26 May 2010 17:55:40 +0900
parents 3b000c4a4d31
children 25f7fc05be71
files src/fdl/test/TestTree.java
diffstat 1 files changed, 373 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/fdl/test/TestTree.java	Wed May 26 17:55:40 2010 +0900
@@ -0,0 +1,373 @@
+package fdl.test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+
+import fdl.FDLindaServ;
+import fdl.FederatedLinda;
+import fdl.MetaEngine;
+import fdl.MetaLinda;
+import fdl.PSX;
+import fdl.PSXCallback;
+import fdl.PSXLinda;
+import fdl.PSXReply;
+
+
+public class TestTree {
+
+	public static final int PORT = 10000;
+	public static final int ConnectReuest = 100;
+	private static final int Down = 101;
+	public static final int Up = 102;
+	private static final int NO_HOST = -1;
+	private static final int HOST_LIST_END_MARK = -2;
+	private static final int CONNECT_LIST_END_MARK = -3;
+	public LinkedList<Thread> lindas = new LinkedList<Thread>();
+	public  int id = 0;
+	public boolean debug = true;
+	
+	class TreeNode implements Runnable {
+		public int id, port;
+		public TreeNode(int id, int port) { this.id = id; this.port = port; }
+		public void run() {
+			String[] args = {
+					// "-d",
+					"-p",Integer.toString(port)};
+			FDLindaServ.main(new TreeMetaProtocolEngine(id,port), args);
+		}
+	}
+	
+	class TreeMetaProtocolEngine implements MetaEngine {
+		boolean running = true;
+		LinkedList<ByteBuffer> leftWaiter = new LinkedList<ByteBuffer>();
+		LinkedList<ByteBuffer> rightWaiter = new LinkedList<ByteBuffer>();
+		PSXLinda parent, left, right;
+		String parentHost, leftHost,rightHost;
+		int parentPort=0, leftPort=0, rightPort=0;
+		public int nodeId = 0;
+		public int nodePort  = 0;
+
+		public TreeMetaProtocolEngine(int id, int port) {
+			nodeId = id;
+			nodePort = port;
+		}
+
+		public void mainLoop(final MetaLinda ml) {
+			ml.in(PSX.META_STOP, new PSXCallback() {public void callback(ByteBuffer reply) {
+				running = false;}});
+			ml.in(ConnectReuest, new PSXCallback() {public void callback(ByteBuffer reply) {
+				connectChildren(ml, reply);
+				}
+			});
+			while(running) {
+				ml.sync(0);
+			}
+		}
+
+		private void setHostPort(ByteBuffer reply) {
+			parentPort = reply.getInt();
+			leftPort = reply.getInt();
+			rightPort = reply.getInt();
+			if (parentPort!=0) parentHost = getString(reply);
+			if (leftPort!=0) leftHost = getString(reply);
+			if (rightPort!=0) rightHost = getString(reply);
+		}
+
+		/**
+		 *       parent
+		 *          |
+		 *       self
+		 *       /    \
+		 *    left    right
+		 *    
+		 * @param ml
+		 * @param reply
+		 */
+		private void connectChildren(final MetaLinda ml, ByteBuffer reply) {
+			setHostPort(reply);
+			try {
+				if (parentPort!=0) parent = ml.fdl.open(parentHost, parentPort);
+				if (leftPort!=0) left = ml.fdl.open(leftHost, leftPort);
+				if (rightPort!=0) right = ml.fdl.open(rightHost, rightPort);
+
+				if (debug) System.out.println("Connect this port="+nodePort+" parent="+parentPort+" left="+leftPort+" right="+rightPort);
+				ml.in(Down, new PSXCallback() {public void callback(ByteBuffer reply) {
+					ml.in(Down,this);
+					if (left==null) {
+						if (debug) System.out.println("Reached in Leaf "+nodeId+" value = "+parentPort);
+						// Leaf case
+						ByteBuffer answer = ByteBuffer.allocate(10);
+						answer.putInt(parentPort);
+						answer.flip();
+						ml.out(Up, answer);
+						return;
+					}
+					if (debug) System.out.println("Pass it to the children from "+nodeId + " to "+ leftPort + " and " + rightPort + ".");
+					ByteBuffer copy = reply.duplicate();
+					left.out(Down, reply);
+					right.out(Down, copy);
+				}
+
+				});
+				if (leftPort!=0)
+					left.in(Up, new PSXCallback() {public void callback(ByteBuffer reply) {
+						if (debug) System.out.println("Up from left at"+nodeId);
+						left.in(Up,this);
+						leftWaiter.add(reply);
+						checkSend(ml);
+					}
+
+					});
+				if (rightPort!=0)
+					right.in(Up, new PSXCallback() {public void callback(ByteBuffer reply) {
+						if (debug) System.out.println("Up from right at"+nodeId);
+						right.in(Up,this);
+						rightWaiter.add(reply);
+						checkSend(ml);
+					}
+					});
+			} catch (IOException e) {
+			}
+		}
+
+		private void checkSend(MetaLinda ml) {
+			if (leftWaiter.isEmpty()||rightWaiter.isEmpty()) return;
+			ByteBuffer out = ByteBuffer.allocate(10);
+			int value = leftWaiter.poll().getInt()+rightWaiter.poll().getInt();
+			if (parent!=null) {
+				if (debug) System.out.println("Up the vluae "+value+" from "+nodeId);
+				out.putInt(value);
+				out.flip();
+				ml.out(Up, out);
+			} else {
+				// Top node case
+				System.out.println("Top level node gets "+value);
+			}
+		}
+	}
+
+
+	class Congigure implements Runnable {
+		public String id;
+		public ByteBuffer config;
+		public FederatedLinda fdl;
+		public LinkedList<PSXLinda> psxs = new LinkedList<PSXLinda>();
+		private LinkedList<String> hosts = new LinkedList<String>();
+		private LinkedList<Integer> ports = new LinkedList<Integer>();
+		public Congigure(int id,ByteBuffer config) { this.id = "Configure"+id; 
+		   this.config = config;
+		   }
+		public void run() {
+			String[] args = {id};
+			main(args);
+		}
+
+		public void main(String[] arg) {
+			final String name = arg[0];
+			try {
+				fdl = FederatedLinda.init();
+				readConfigure();
+				fdl.sync(1000);
+				startTest();
+				stop();
+			} catch (IOException e) {
+				System.err.println(name+": Communication failure.");
+			}
+		}
+
+
+		public void startTest() {
+			System.out.println("StartTest");
+			try {
+				PSXLinda psx = psxs.get(0);
+				for(int i=3;i<10;i++) {
+					sendData(psx,Down,i);
+					psx.sync(1000);
+				}
+				sleep(1000);
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
+		}
+		
+		public void stop() throws IOException {
+			ByteBuffer data = ByteBuffer.allocate(10);
+			for(PSXLinda psx:psxs) {
+				psx.out(PSX.META_STOP, data.duplicate());
+				psx.sync(1);
+			}
+		}
+
+		public void openLinda(String host, int port) throws IOException {
+			FederatedLinda fdl;
+			PSXLinda psx;
+			PSXReply r;
+			fdl = FederatedLinda.init();
+			psx = fdl.open(host,port);
+			r = psx.in(65535);
+			fdl.sync(1);
+			System.out.println(host+" port "+port +": Connected.");
+			int cnt=0;
+			while(!r.ready()) {
+				// psx.sync(1000);
+				psx.sync(10);
+				System.out.println(host+" port "+port +": Waiting...."+(cnt++));
+			}
+			print_id(r);
+			
+			return ;
+		}
+
+		public void readConfigure() throws IOException {
+			int mode = 0;
+			int host = 0;
+			while(config.hasRemaining()) {
+				int port = config.getInt();
+				if (mode==0) {
+					if (port==HOST_LIST_END_MARK) { 
+						mode = 1;
+						continue;
+					}
+					String hostname = getString(config);
+					
+					psxs.add(fdl.open(hostname,port));
+					hosts.add(hostname);
+					ports.add(port);
+
+				} else {
+					if (port == CONNECT_LIST_END_MARK) return;
+					connect(host++, port,config.getInt(),config.getInt());
+				}
+			}
+		}
+		
+		private void connect(int host, int parent, int left, int right) {
+			PSXLinda p = psxs.get(host);
+			ByteBuffer out = ByteBuffer.allocate(4096);
+			
+			out.putInt(parent!=NO_HOST? ports.get(parent):0);
+			out.putInt(left!=NO_HOST? ports.get(left):0);
+			out.putInt(right!=NO_HOST? ports.get(right):0);
+			if (parent!=NO_HOST) putString(out,hosts.get(parent));
+			if (left!=NO_HOST) putString(out,hosts.get(left));
+			if (right!=NO_HOST) putString(out,hosts.get(right));
+			out.flip();
+			p.out(ConnectReuest, out);
+		}
+		
+	}
+	
+	public synchronized void sleep(int time) {
+		try {
+			wait(time);
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		}
+	}
+
+	public void read_wait(PSXLinda psx, PSXReply reply, String mesg) throws IOException {
+		while(!reply.ready()) psx.sync(10);
+		System.out.println(mesg);
+		System.out.println(reply.getData().getInt());
+		System.out.println("");
+	}
+
+	public ByteBuffer sendData(PSXLinda psx,int id, int n) {
+		ByteBuffer data = ByteBuffer.allocate(10);
+		data.putInt(n);
+		data.flip();
+		psx.out(id,data);
+		return data;
+	}
+
+	public void in_wait(PSXLinda psx, int i) throws IOException {
+		PSXReply r = psx.in(i);
+		while(! r.ready()) { 
+			psx.sync(10); 
+		}
+		return;
+	}
+
+	public void print_id (PSXReply ans) throws IOException {
+		ByteBuffer r = ans.getData();
+		System.out.print("ID = ");
+		System.out.write(r.array());
+		System.out.println("");
+	}
+
+
+	public String getString(ByteBuffer reply) {
+		char c;
+		String s = "";
+		while(reply.hasRemaining()) {
+			c = reply.getChar();
+			if (c== 0) break;
+			s += c; 
+			}
+		return s;
+	}
+	
+	public void putString(ByteBuffer reply,String s) {
+		for(int i=0; i<s.length(); i++) {
+			char c= s.charAt(i);
+			reply.putChar(c);
+		}
+		reply.putChar((char) 0);
+	}
+
+	public static void main(String[] arg) throws InterruptedException {
+		TestTree me = new TestTree();
+		me.test1();
+	}
+
+	public void test1() throws InterruptedException {
+		ByteBuffer config = makeConfig();
+		sleep(2000);
+		System.out.println("Start Configure");
+		Thread r1 = new Thread(new Congigure(1,config));
+		r1.start();
+		r1.join();
+	}
+
+	private ByteBuffer makeConfig() {
+		ByteBuffer config = ByteBuffer.allocate(4096);
+		putHostRun(config,"localhost",PORT);
+		putHostRun(config,"localhost",PORT+1);
+		putHostRun(config,"localhost",PORT+2);
+		putHostRun(config,"localhost",PORT+3);
+		putHostRun(config,"localhost",PORT+4);
+		putHostRun(config,"localhost",PORT+5);
+		putHostRun(config,"localhost",PORT+6);
+		config.putInt(HOST_LIST_END_MARK);
+		putTree(config,NO_HOST,1,2);
+		putTree(config,0,3,4);
+		putTree(config,0,5,6);
+		putTree(config,1,NO_HOST,NO_HOST);
+		putTree(config,1,NO_HOST,NO_HOST);
+		putTree(config,2,NO_HOST,NO_HOST);
+		putTree(config,2,NO_HOST,NO_HOST);
+		config.putInt(CONNECT_LIST_END_MARK);
+		config.flip();
+		return config;
+	}
+
+	private void putHostRun(ByteBuffer config, String host, int port2) {
+		putHost(config,host,port2);
+		// should run on specified host using ssh 
+		Thread s = new Thread(new TreeNode(id ++, port2));
+		s.start();
+		lindas.add(s);
+	}
+	
+	private void putHost(ByteBuffer config, String s, int port2) {
+		config.putInt(port2);
+		putString(config, s);		
+	}
+
+	private void putTree(ByteBuffer config, int p, int l, int r) {
+		config.putInt(p);
+		config.putInt(l);
+		config.putInt(r);
+	}
+}