Mercurial > hg > FederatedLinda
view src/fdl/test/TestTree.java @ 103:7da06001aeb5 fuchita
add TestTree.java
author | one |
---|---|
date | Wed, 26 May 2010 17:55:40 +0900 |
parents | |
children | 25f7fc05be71 |
line wrap: on
line source
package fdl.test; import java.io.IOException; import java.nio.ByteBuffer; import java.util.LinkedList; import fdl.FDLindaServ; import fdl.FederatedLinda; import fdl.MetaEngine; import fdl.MetaLinda; import fdl.PSX; import fdl.PSXCallback; import fdl.PSXLinda; import fdl.PSXReply; public class TestTree { public static final int PORT = 10000; public static final int ConnectReuest = 100; private static final int Down = 101; public static final int Up = 102; private static final int NO_HOST = -1; private static final int HOST_LIST_END_MARK = -2; private static final int CONNECT_LIST_END_MARK = -3; public LinkedList<Thread> lindas = new LinkedList<Thread>(); public int id = 0; public boolean debug = true; class TreeNode implements Runnable { public int id, port; public TreeNode(int id, int port) { this.id = id; this.port = port; } public void run() { String[] args = { // "-d", "-p",Integer.toString(port)}; FDLindaServ.main(new TreeMetaProtocolEngine(id,port), args); } } class TreeMetaProtocolEngine implements MetaEngine { boolean running = true; LinkedList<ByteBuffer> leftWaiter = new LinkedList<ByteBuffer>(); LinkedList<ByteBuffer> rightWaiter = new LinkedList<ByteBuffer>(); PSXLinda parent, left, right; String parentHost, leftHost,rightHost; int parentPort=0, leftPort=0, rightPort=0; public int nodeId = 0; public int nodePort = 0; public TreeMetaProtocolEngine(int id, int port) { nodeId = id; nodePort = port; } public void mainLoop(final MetaLinda ml) { ml.in(PSX.META_STOP, new PSXCallback() {public void callback(ByteBuffer reply) { running = false;}}); ml.in(ConnectReuest, new PSXCallback() {public void callback(ByteBuffer reply) { connectChildren(ml, reply); } }); while(running) { ml.sync(0); } } private void setHostPort(ByteBuffer reply) { parentPort = reply.getInt(); leftPort = reply.getInt(); rightPort = reply.getInt(); if (parentPort!=0) parentHost = getString(reply); if (leftPort!=0) leftHost = getString(reply); if (rightPort!=0) rightHost = getString(reply); } /** * parent * | * self * / \ * left right * * @param ml * @param reply */ private void connectChildren(final MetaLinda ml, ByteBuffer reply) { setHostPort(reply); try { if (parentPort!=0) parent = ml.fdl.open(parentHost, parentPort); if (leftPort!=0) left = ml.fdl.open(leftHost, leftPort); if (rightPort!=0) right = ml.fdl.open(rightHost, rightPort); if (debug) System.out.println("Connect this port="+nodePort+" parent="+parentPort+" left="+leftPort+" right="+rightPort); ml.in(Down, new PSXCallback() {public void callback(ByteBuffer reply) { ml.in(Down,this); if (left==null) { if (debug) System.out.println("Reached in Leaf "+nodeId+" value = "+parentPort); // Leaf case ByteBuffer answer = ByteBuffer.allocate(10); answer.putInt(parentPort); answer.flip(); ml.out(Up, answer); return; } if (debug) System.out.println("Pass it to the children from "+nodeId + " to "+ leftPort + " and " + rightPort + "."); ByteBuffer copy = reply.duplicate(); left.out(Down, reply); right.out(Down, copy); } }); if (leftPort!=0) left.in(Up, new PSXCallback() {public void callback(ByteBuffer reply) { if (debug) System.out.println("Up from left at"+nodeId); left.in(Up,this); leftWaiter.add(reply); checkSend(ml); } }); if (rightPort!=0) right.in(Up, new PSXCallback() {public void callback(ByteBuffer reply) { if (debug) System.out.println("Up from right at"+nodeId); right.in(Up,this); rightWaiter.add(reply); checkSend(ml); } }); } catch (IOException e) { } } private void checkSend(MetaLinda ml) { if (leftWaiter.isEmpty()||rightWaiter.isEmpty()) return; ByteBuffer out = ByteBuffer.allocate(10); int value = leftWaiter.poll().getInt()+rightWaiter.poll().getInt(); if (parent!=null) { if (debug) System.out.println("Up the vluae "+value+" from "+nodeId); out.putInt(value); out.flip(); ml.out(Up, out); } else { // Top node case System.out.println("Top level node gets "+value); } } } class Congigure implements Runnable { public String id; public ByteBuffer config; public FederatedLinda fdl; public LinkedList<PSXLinda> psxs = new LinkedList<PSXLinda>(); private LinkedList<String> hosts = new LinkedList<String>(); private LinkedList<Integer> ports = new LinkedList<Integer>(); public Congigure(int id,ByteBuffer config) { this.id = "Configure"+id; this.config = config; } public void run() { String[] args = {id}; main(args); } public void main(String[] arg) { final String name = arg[0]; try { fdl = FederatedLinda.init(); readConfigure(); fdl.sync(1000); startTest(); stop(); } catch (IOException e) { System.err.println(name+": Communication failure."); } } public void startTest() { System.out.println("StartTest"); try { PSXLinda psx = psxs.get(0); for(int i=3;i<10;i++) { sendData(psx,Down,i); psx.sync(1000); } sleep(1000); } catch (IOException e) { e.printStackTrace(); } } public void stop() throws IOException { ByteBuffer data = ByteBuffer.allocate(10); for(PSXLinda psx:psxs) { psx.out(PSX.META_STOP, data.duplicate()); psx.sync(1); } } public void openLinda(String host, int port) throws IOException { FederatedLinda fdl; PSXLinda psx; PSXReply r; fdl = FederatedLinda.init(); psx = fdl.open(host,port); r = psx.in(65535); fdl.sync(1); System.out.println(host+" port "+port +": Connected."); int cnt=0; while(!r.ready()) { // psx.sync(1000); psx.sync(10); System.out.println(host+" port "+port +": Waiting...."+(cnt++)); } print_id(r); return ; } public void readConfigure() throws IOException { int mode = 0; int host = 0; while(config.hasRemaining()) { int port = config.getInt(); if (mode==0) { if (port==HOST_LIST_END_MARK) { mode = 1; continue; } String hostname = getString(config); psxs.add(fdl.open(hostname,port)); hosts.add(hostname); ports.add(port); } else { if (port == CONNECT_LIST_END_MARK) return; connect(host++, port,config.getInt(),config.getInt()); } } } private void connect(int host, int parent, int left, int right) { PSXLinda p = psxs.get(host); ByteBuffer out = ByteBuffer.allocate(4096); out.putInt(parent!=NO_HOST? ports.get(parent):0); out.putInt(left!=NO_HOST? ports.get(left):0); out.putInt(right!=NO_HOST? ports.get(right):0); if (parent!=NO_HOST) putString(out,hosts.get(parent)); if (left!=NO_HOST) putString(out,hosts.get(left)); if (right!=NO_HOST) putString(out,hosts.get(right)); out.flip(); p.out(ConnectReuest, out); } } public synchronized void sleep(int time) { try { wait(time); } catch (InterruptedException e) { e.printStackTrace(); } } public void read_wait(PSXLinda psx, PSXReply reply, String mesg) throws IOException { while(!reply.ready()) psx.sync(10); System.out.println(mesg); System.out.println(reply.getData().getInt()); System.out.println(""); } public ByteBuffer sendData(PSXLinda psx,int id, int n) { ByteBuffer data = ByteBuffer.allocate(10); data.putInt(n); data.flip(); psx.out(id,data); return data; } public void in_wait(PSXLinda psx, int i) throws IOException { PSXReply r = psx.in(i); while(! r.ready()) { psx.sync(10); } return; } public void print_id (PSXReply ans) throws IOException { ByteBuffer r = ans.getData(); System.out.print("ID = "); System.out.write(r.array()); System.out.println(""); } public String getString(ByteBuffer reply) { char c; String s = ""; while(reply.hasRemaining()) { c = reply.getChar(); if (c== 0) break; s += c; } return s; } public void putString(ByteBuffer reply,String s) { for(int i=0; i<s.length(); i++) { char c= s.charAt(i); reply.putChar(c); } reply.putChar((char) 0); } public static void main(String[] arg) throws InterruptedException { TestTree me = new TestTree(); me.test1(); } public void test1() throws InterruptedException { ByteBuffer config = makeConfig(); sleep(2000); System.out.println("Start Configure"); Thread r1 = new Thread(new Congigure(1,config)); r1.start(); r1.join(); } private ByteBuffer makeConfig() { ByteBuffer config = ByteBuffer.allocate(4096); putHostRun(config,"localhost",PORT); putHostRun(config,"localhost",PORT+1); putHostRun(config,"localhost",PORT+2); putHostRun(config,"localhost",PORT+3); putHostRun(config,"localhost",PORT+4); putHostRun(config,"localhost",PORT+5); putHostRun(config,"localhost",PORT+6); config.putInt(HOST_LIST_END_MARK); putTree(config,NO_HOST,1,2); putTree(config,0,3,4); putTree(config,0,5,6); putTree(config,1,NO_HOST,NO_HOST); putTree(config,1,NO_HOST,NO_HOST); putTree(config,2,NO_HOST,NO_HOST); putTree(config,2,NO_HOST,NO_HOST); config.putInt(CONNECT_LIST_END_MARK); config.flip(); return config; } private void putHostRun(ByteBuffer config, String host, int port2) { putHost(config,host,port2); // should run on specified host using ssh Thread s = new Thread(new TreeNode(id ++, port2)); s.start(); lindas.add(s); } private void putHost(ByteBuffer config, String s, int port2) { config.putInt(port2); putString(config, s); } private void putTree(ByteBuffer config, int p, int l, int r) { config.putInt(p); config.putInt(l); config.putInt(r); } }