view src/fdl/test/TestTree.java @ 103:7da06001aeb5 fuchita

add TestTree.java
author one
date Wed, 26 May 2010 17:55:40 +0900
parents
children 25f7fc05be71
line wrap: on
line source

package fdl.test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;

import fdl.FDLindaServ;
import fdl.FederatedLinda;
import fdl.MetaEngine;
import fdl.MetaLinda;
import fdl.PSX;
import fdl.PSXCallback;
import fdl.PSXLinda;
import fdl.PSXReply;


public class TestTree {

	/** Base TCP port; makeConfig() starts the tree nodes on PORT .. PORT+6. */
	public static final int PORT = 10000;
	/** Tuple id on which a node receives the ports/hosts of its parent and children (name kept as spelled). */
	public static final int ConnectReuest = 100;
	/** Tuple id of values travelling from the root towards the leaves. */
	private static final int Down = 101;
	/** Tuple id of answers travelling from the leaves back towards the root. */
	public static final int Up = 102;
	/** Index value in a tree description meaning "no parent" / "no child". */
	private static final int NO_HOST = -1;
	/** Marker in the configuration buffer that ends the list of (port, host name) entries. */
	private static final int HOST_LIST_END_MARK = -2;
	/** Marker in the configuration buffer that ends the list of (parent, left, right) entries. */
	private static final int CONNECT_LIST_END_MARK = -3;
	/** Threads started by putHostRun(), one per tree node server. */
	public LinkedList<Thread> lindas = new LinkedList<Thread>();
	/** Next node id handed out by putHostRun(). */
	public  int id = 0;
	/** When true, TreeMetaProtocolEngine prints a trace of connections and tuple traffic to stdout. */
	public boolean debug = true;
	
	/**
	 * Runnable hosting one node of the test tree: starts a Linda server on
	 * {@link #port} with a {@link TreeMetaProtocolEngine} as its meta engine.
	 */
	class TreeNode implements Runnable {
		public int id;
		public int port;

		public TreeNode(int id, int port) {
			this.id = id;
			this.port = port;
		}

		/** Launches the server through FDLindaServ.main with the arguments "-p" and the port. */
		public void run() {
			TreeMetaProtocolEngine engine = new TreeMetaProtocolEngine(id, port);
			// "-d" can be prepended to this list (presumably a debug switch of
			// FDLindaServ -- confirm before enabling).
			String[] serverArgs = new String[] { "-p", String.valueOf(port) };
			FDLindaServ.main(engine, serverArgs);
		}
	}
	
	/**
	 * Meta engine installed in every tree node's Linda server (see TreeNode.run()).
	 *
	 * <p>Protocol, as implemented below:
	 * <ul>
	 * <li>a {@code ConnectReuest} tuple carries the ports and host names of the
	 *     parent, left child and right child; the engine opens a connection to
	 *     each neighbour whose port is non-zero;</li>
	 * <li>a {@code Down} tuple is forwarded to both children, or, when there is
	 *     no left child, answered with an {@code Up} tuple holding this node's
	 *     parent port;</li>
	 * <li>{@code Up} tuples read from the two children are queued, paired and
	 *     summed; the sum is emitted as an {@code Up} tuple, or printed when
	 *     this node has no parent.</li>
	 * </ul>
	 *
	 * <p>The code assumes a node has either both children or none: a node with
	 * only a left child would fail when forwarding to {@code right}, and a node
	 * with only a right child is treated as a leaf.
	 */
	class TreeMetaProtocolEngine implements MetaEngine {
		/** Loop flag of mainLoop(); cleared by the META_STOP callback. */
		boolean running = true;
		/** Up answers received from the left child and not yet paired. */
		LinkedList<ByteBuffer> leftWaiter = new LinkedList<ByteBuffer>();
		/** Up answers received from the right child and not yet paired. */
		LinkedList<ByteBuffer> rightWaiter = new LinkedList<ByteBuffer>();
		/** Connections to the neighbours; null until opened (or when absent). */
		PSXLinda parent, left, right;
		String parentHost, leftHost,rightHost;
		/** Neighbour ports; 0 means "no such neighbour". */
		int parentPort=0, leftPort=0, rightPort=0;
		public int nodeId = 0;
		public int nodePort  = 0;

		/**
		 * @param id   node id, used only in trace output
		 * @param port port of the server this engine runs in, used only in messages
		 */
		public TreeMetaProtocolEngine(int id, int port) {
			nodeId = id;
			nodePort = port;
		}

		/**
		 * Registers the stop and connect callbacks, then keeps calling
		 * {@code ml.sync(0)} until a META_STOP tuple has been received.
		 *
		 * @param ml the meta Linda handle of the hosting server
		 */
		public void mainLoop(final MetaLinda ml) {
			ml.in(PSX.META_STOP, new PSXCallback() {public void callback(ByteBuffer reply) {
				running = false;}});
			ml.in(ConnectReuest, new PSXCallback() {public void callback(ByteBuffer reply) {
				connectChildren(ml, reply);
				}
			});
			while(running) {
				ml.sync(0);
			}
		}

		/**
		 * Decodes a connect request: three ints (parent, left, right port)
		 * followed by one zero-terminated host name for each non-zero port,
		 * in the same order. This mirrors what Congigure.connect() writes.
		 */
		private void setHostPort(ByteBuffer reply) {
			parentPort = reply.getInt();
			leftPort = reply.getInt();
			rightPort = reply.getInt();
			if (parentPort!=0) parentHost = getString(reply);
			if (leftPort!=0) leftHost = getString(reply);
			if (rightPort!=0) rightHost = getString(reply);
		}

		/**
		 * Opens the connections described by a connect request and installs the
		 * Down/Up handlers.
		 *
		 * <pre>
		 *       parent
		 *          |
		 *       self
		 *       /    \
		 *    left    right
		 * </pre>
		 *
		 * If a connection cannot be opened the failure is reported on stderr and
		 * no handler is installed; the node then stays idle until it is stopped.
		 *
		 * @param ml    the meta Linda handle of the hosting server
		 * @param reply payload of the connect request (see setHostPort)
		 */
		private void connectChildren(final MetaLinda ml, ByteBuffer reply) {
			setHostPort(reply);
			try {
				if (parentPort!=0) parent = ml.fdl.open(parentHost, parentPort);
				if (leftPort!=0) left = ml.fdl.open(leftHost, leftPort);
				if (rightPort!=0) right = ml.fdl.open(rightHost, rightPort);

				if (debug) System.out.println("Connect this port="+nodePort+" parent="+parentPort+" left="+leftPort+" right="+rightPort);
				ml.in(Down, new PSXCallback() {public void callback(ByteBuffer request) {
					// Re-arm first so that the next Down tuple is handled as well.
					ml.in(Down,this);
					if (left==null) {
						if (debug) System.out.println("Reached in Leaf "+nodeId+" value = "+parentPort);
						// Leaf case: the answer is this node's parent port, not the
						// value that came down.
						ByteBuffer answer = ByteBuffer.allocate(10);
						answer.putInt(parentPort);
						answer.flip();
						ml.out(Up, answer);
						return;
					}
					if (debug) System.out.println("Pass it to the children from "+nodeId + " to "+ leftPort + " and " + rightPort + ".");
					// Each child gets its own view so that the two sends do not
					// share position/limit.
					ByteBuffer copy = request.duplicate();
					left.out(Down, request);
					right.out(Down, copy);
				}

				});
				if (leftPort!=0)
					left.in(Up, new PSXCallback() {public void callback(ByteBuffer answer) {
						if (debug) System.out.println("Up from left at"+nodeId);
						left.in(Up,this);
						leftWaiter.add(answer);
						checkSend(ml);
					}

					});
				if (rightPort!=0)
					right.in(Up, new PSXCallback() {public void callback(ByteBuffer answer) {
						if (debug) System.out.println("Up from right at"+nodeId);
						right.in(Up,this);
						rightWaiter.add(answer);
						checkSend(ml);
					}
					});
			} catch (IOException e) {
				// Without its neighbours this node cannot take part in the test;
				// say so instead of failing silently.
				System.err.println("Node "+nodeId+" (port "+nodePort+"): cannot connect to parent/children: "+e);
			}
		}

		/**
		 * Pairs one queued answer from each child, if both queues are non-empty,
		 * and passes their sum on: as an Up tuple when this node has a parent,
		 * otherwise to stdout.
		 */
		private void checkSend(MetaLinda ml) {
			if (leftWaiter.isEmpty()||rightWaiter.isEmpty()) return;
			int value = leftWaiter.poll().getInt()+rightWaiter.poll().getInt();
			if (parent!=null) {
				if (debug) System.out.println("Up the vluae "+value+" from "+nodeId);
				ByteBuffer out = ByteBuffer.allocate(10);
				out.putInt(value);
				out.flip();
				ml.out(Up, out);
			} else {
				// Top node case
				System.out.println("Top level node gets "+value);
			}
		}
	}


	/**
	 * Client-side driver of the test. It reads the configuration buffer built
	 * by makeConfig(), opens a connection to every listed server, sends each
	 * server its connect request, pushes the test values down from the first
	 * server and finally sends META_STOP to all of them.
	 */
	class Congigure implements Runnable {
		public String id;
		public ByteBuffer config;
		public FederatedLinda fdl;
		/** One connection per configured server, in configuration order. */
		public LinkedList<PSXLinda> psxs = new LinkedList<PSXLinda>();
		/** Host name of each configured server, same order as psxs. */
		private LinkedList<String> hosts = new LinkedList<String>();
		/** Port of each configured server, same order as psxs. */
		private LinkedList<Integer> ports = new LinkedList<Integer>();

		/**
		 * @param id     number appended to "Configure" to form this driver's name
		 * @param config flipped configuration buffer (host list, then tree list)
		 */
		public Congigure(int id,ByteBuffer config) {
			this.config = config;
			this.id = "Configure"+id;
		}

		/** Thread entry point: runs main() with the driver name as sole argument. */
		public void run() {
			main(new String[] { id });
		}

		/**
		 * Runs the whole scenario: init, configure, sync, test, stop.
		 * An IOException from any step is reported on stderr under arg[0].
		 */
		public void main(String[] arg) {
			final String name = arg[0];
			try {
				fdl = FederatedLinda.init();
				readConfigure();
				fdl.sync(1000);
				startTest();
				stop();
			} catch (IOException e) {
				System.err.println(name+": Communication failure.");
			}
		}


		/**
		 * Sends the values 3..9 as Down tuples to the first configured server,
		 * syncing after each one, then waits one second via the enclosing
		 * class's sleep().
		 */
		public void startTest() {
			System.out.println("StartTest");
			try {
				final PSXLinda root = psxs.get(0);
				int value = 3;
				while (value < 10) {
					sendData(root, Down, value);
					root.sync(1000);
					value++;
				}
				sleep(1000);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		
		/** Sends a META_STOP tuple to every configured server and syncs each connection. */
		public void stop() throws IOException {
			final ByteBuffer stopPayload = ByteBuffer.allocate(10);
			for (PSXLinda server : psxs) {
				server.out(PSX.META_STOP, stopPayload.duplicate());
				server.sync(1);
			}
		}

		/**
		 * Opens a fresh connection to host:port, requests tuple 65535, polls
		 * until the reply is ready and prints it with print_id(). Uses its own
		 * FederatedLinda instance rather than the {@link #fdl} field, and does
		 * not record the connection in {@link #psxs}.
		 */
		public void openLinda(String host, int port) throws IOException {
			final FederatedLinda client = FederatedLinda.init();
			final PSXLinda psx = client.open(host,port);
			final PSXReply pending = psx.in(65535);
			client.sync(1);
			System.out.println(host+" port "+port +": Connected.");
			for (int cnt = 0; !pending.ready(); cnt++) {
				// psx.sync(1000);
				psx.sync(10);
				System.out.println(host+" port "+port +": Waiting...."+cnt);
			}
			print_id(pending);
		}

		/**
		 * Consumes the configuration buffer. The first section is a list of
		 * (port, host name) pairs ended by HOST_LIST_END_MARK; every pair is
		 * opened and recorded. The second section is a list of
		 * (parent, left, right) index triples ended by CONNECT_LIST_END_MARK;
		 * the n-th triple is sent to the n-th server via connect().
		 */
		public void readConfigure() throws IOException {
			boolean readingHosts = true;
			int nodeIndex = 0;
			while (config.hasRemaining()) {
				final int first = config.getInt();
				if (readingHosts) {
					if (first == HOST_LIST_END_MARK) {
						readingHosts = false;
						continue;
					}
					final String hostname = getString(config);

					psxs.add(fdl.open(hostname, first));
					hosts.add(hostname);
					ports.add(first);
					continue;
				}
				if (first == CONNECT_LIST_END_MARK) return;
				final int leftIndex = config.getInt();
				final int rightIndex = config.getInt();
				connect(nodeIndex, first, leftIndex, rightIndex);
				nodeIndex++;
			}
		}
		
		/**
		 * Sends server number {@code host} its connect request: the ports of
		 * parent, left and right (0 where the index is NO_HOST), followed by the
		 * host names of those that exist, in the same order.
		 */
		private void connect(int host, int parent, int left, int right) {
			final PSXLinda target = psxs.get(host);
			final int[] neighbours = { parent, left, right };
			final ByteBuffer out = ByteBuffer.allocate(4096);

			for (int index : neighbours) {
				out.putInt(index != NO_HOST ? ports.get(index) : 0);
			}
			for (int index : neighbours) {
				if (index != NO_HOST) putString(out, hosts.get(index));
			}
			out.flip();
			target.out(ConnectReuest, out);
		}
		
	}
	
	/**
	 * Pauses the calling thread for up to {@code time} milliseconds by waiting
	 * on this object's monitor.
	 *
	 * <p>If the thread is interrupted the stack trace is printed and the
	 * interrupt status is restored, so that callers (for example
	 * {@code Thread.join()} in test1) can still observe the interruption.
	 *
	 * @param time maximum time to wait in milliseconds; as with
	 *             {@link Object#wait(long)}, 0 means wait until notified
	 */
	public synchronized void sleep(int time) {
		try {
			wait(time);
		} catch (InterruptedException e) {
			e.printStackTrace();
			// wait() cleared the flag when it threw; set it again for callers.
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Polls {@code psx} (sync with argument 10) until {@code reply} is ready,
	 * then prints {@code mesg}, the first int of the reply data and an empty
	 * line.
	 */
	public void read_wait(PSXLinda psx, PSXReply reply, String mesg) throws IOException {
		for (;;) {
			if (reply.ready()) break;
			psx.sync(10);
		}
		System.out.println(mesg);
		int value = reply.getData().getInt();
		System.out.println(value);
		System.out.println("");
	}

	/**
	 * Writes {@code n} as a single int into a new 10-byte buffer and sends it
	 * to {@code psx} under tuple id {@code id}.
	 *
	 * @return the buffer that was handed to {@code psx.out}
	 */
	public ByteBuffer sendData(PSXLinda psx,int id, int n) {
		ByteBuffer payload = ByteBuffer.allocate(10);
		payload.putInt(n).flip();
		psx.out(id, payload);
		return payload;
	}

	/**
	 * Requests tuple {@code i} from {@code psx} and polls (sync with argument
	 * 10) until the reply is ready. The reply itself is discarded.
	 */
	public void in_wait(PSXLinda psx, int i) throws IOException {
		final PSXReply pending = psx.in(i);
		for (boolean done = pending.ready(); !done; done = pending.ready()) {
			psx.sync(10);
		}
	}

	/**
	 * Prints "ID = " followed by the raw bytes of the reply data and a newline.
	 *
	 * NOTE(review): the whole backing array of the buffer is written, regardless
	 * of the buffer's position and limit -- confirm that this is intended.
	 */
	public void print_id (PSXReply ans) throws IOException {
		final ByteBuffer payload = ans.getData();
		System.out.print("ID = ");
		final byte[] raw = payload.array();
		System.out.write(raw);
		System.out.println("");
	}


	/**
	 * Reads a string written by putString(): a sequence of two-byte chars ended
	 * by a zero char. The terminator is consumed but not returned.
	 *
	 * <p>Reading also stops, without error, when fewer than two bytes remain;
	 * a single trailing byte is left unread instead of causing a
	 * BufferUnderflowException.
	 *
	 * @param reply buffer positioned at the first char of the string
	 * @return the decoded string, possibly empty
	 */
	public String getString(ByteBuffer reply) {
		StringBuilder text = new StringBuilder();
		// getChar() needs two bytes, so hasRemaining() alone is not a safe guard.
		while (reply.remaining() >= 2) {
			char c = reply.getChar();
			if (c == 0) break;
			text.append(c);
		}
		return text.toString();
	}
	
	/**
	 * Appends {@code s} to {@code reply} as two-byte chars followed by a zero
	 * char as terminator; getString() reads this format back.
	 */
	public void putString(ByteBuffer reply,String s) {
		for (char ch : s.toCharArray()) {
			reply.putChar(ch);
		}
		reply.putChar('\0');
	}

	/** Program entry point: runs test1() on a fresh TestTree. Arguments are ignored. */
	public static void main(String[] arg) throws InterruptedException {
		new TestTree().test1();
	}

	/**
	 * Builds the configuration (which also starts the node servers), waits two
	 * seconds, then runs a Congigure driver on its own thread and waits for it
	 * to finish.
	 */
	public void test1() throws InterruptedException {
		final ByteBuffer treeConfig = makeConfig();
		sleep(2000);
		System.out.println("Start Configure");
		final Congigure driver = new Congigure(1, treeConfig);
		final Thread worker = new Thread(driver);
		worker.start();
		worker.join();
	}

	/**
	 * Builds the configuration buffer for a seven-node binary tree on
	 * localhost and starts one server thread per node.
	 *
	 * <p>Layout: seven (port, host) entries for PORT .. PORT+6,
	 * HOST_LIST_END_MARK, seven (parent, left, right) index triples,
	 * CONNECT_LIST_END_MARK. Node 0 is the root, nodes 1 and 2 its children,
	 * nodes 3..6 the leaves.
	 *
	 * @return the buffer, flipped and ready for reading
	 */
	private ByteBuffer makeConfig() {
		final ByteBuffer config = ByteBuffer.allocate(4096);
		final int nodeCount = 7;
		for (int offset = 0; offset < nodeCount; offset++) {
			putHostRun(config, "localhost", PORT + offset);
		}
		config.putInt(HOST_LIST_END_MARK);

		// One row per node, in node order: { parent, left, right }.
		final int[][] links = {
			{ NO_HOST, 1, 2 },
			{ 0, 3, 4 },
			{ 0, 5, 6 },
			{ 1, NO_HOST, NO_HOST },
			{ 1, NO_HOST, NO_HOST },
			{ 2, NO_HOST, NO_HOST },
			{ 2, NO_HOST, NO_HOST },
		};
		for (int[] link : links) {
			putTree(config, link[0], link[1], link[2]);
		}
		config.putInt(CONNECT_LIST_END_MARK);
		config.flip();
		return config;
	}

	/**
	 * Appends a (port, host) entry to {@code config} and starts a TreeNode
	 * server thread for it with the next free node id. The thread is recorded
	 * in {@link #lindas}.
	 */
	private void putHostRun(ByteBuffer config, String host, int port2) {
		putHost(config,host,port2);
		final int nodeId = id;
		id = nodeId + 1;
		// should run on specified host using ssh 
		final Thread server = new Thread(new TreeNode(nodeId, port2));
		server.start();
		lindas.add(server);
	}
	
	/** Appends one host entry to {@code config}: the port as an int, then the host name via putString(). */
	private void putHost(ByteBuffer config, String s, int port2) {
		// putInt returns the buffer it was called on, so the name lands right after the port.
		putString(config.putInt(port2), s);
	}

	/**
	 * Appends one tree entry to {@code config}: the indices of the parent, left
	 * child and right child as three ints, in that order.
	 */
	private void putTree(ByteBuffer config, int p, int l, int r) {
		config.putInt(p).putInt(l).putInt(r);
	}
}