# HG changeset patch # User one # Date 1274864140 -32400 # Node ID 7da06001aeb578bd376b00195104ecdcc638d1da # Parent 3b000c4a4d315e402f25a3b12445e8b691abed9b add TestTree.java diff -r 3b000c4a4d31 -r 7da06001aeb5 src/fdl/test/TestTree.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/TestTree.java Wed May 26 17:55:40 2010 +0900 @@ -0,0 +1,373 @@ +package fdl.test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; + +import fdl.FDLindaServ; +import fdl.FederatedLinda; +import fdl.MetaEngine; +import fdl.MetaLinda; +import fdl.PSX; +import fdl.PSXCallback; +import fdl.PSXLinda; +import fdl.PSXReply; + + +public class TestTree { + + public static final int PORT = 10000; + public static final int ConnectReuest = 100; + private static final int Down = 101; + public static final int Up = 102; + private static final int NO_HOST = -1; + private static final int HOST_LIST_END_MARK = -2; + private static final int CONNECT_LIST_END_MARK = -3; + public LinkedList lindas = new LinkedList(); + public int id = 0; + public boolean debug = true; + + class TreeNode implements Runnable { + public int id, port; + public TreeNode(int id, int port) { this.id = id; this.port = port; } + public void run() { + String[] args = { + // "-d", + "-p",Integer.toString(port)}; + FDLindaServ.main(new TreeMetaProtocolEngine(id,port), args); + } + } + + class TreeMetaProtocolEngine implements MetaEngine { + boolean running = true; + LinkedList leftWaiter = new LinkedList(); + LinkedList rightWaiter = new LinkedList(); + PSXLinda parent, left, right; + String parentHost, leftHost,rightHost; + int parentPort=0, leftPort=0, rightPort=0; + public int nodeId = 0; + public int nodePort = 0; + + public TreeMetaProtocolEngine(int id, int port) { + nodeId = id; + nodePort = port; + } + + public void mainLoop(final MetaLinda ml) { + ml.in(PSX.META_STOP, new PSXCallback() {public void callback(ByteBuffer reply) { + running = false;}}); + ml.in(ConnectReuest, new PSXCallback() {public void callback(ByteBuffer reply) { + connectChildren(ml, reply); + } + }); + while(running) { + ml.sync(0); + } + } + + private void setHostPort(ByteBuffer reply) { + parentPort = reply.getInt(); + leftPort = reply.getInt(); + rightPort = reply.getInt(); + if (parentPort!=0) parentHost = getString(reply); + if (leftPort!=0) leftHost = getString(reply); + if (rightPort!=0) rightHost = getString(reply); + } + + /** + * parent + * | + * self + * / \ + * left right + * + * @param ml + * @param reply + */ + private void connectChildren(final MetaLinda ml, ByteBuffer reply) { + setHostPort(reply); + try { + if (parentPort!=0) parent = ml.fdl.open(parentHost, parentPort); + if (leftPort!=0) left = ml.fdl.open(leftHost, leftPort); + if (rightPort!=0) right = ml.fdl.open(rightHost, rightPort); + + if (debug) System.out.println("Connect this port="+nodePort+" parent="+parentPort+" left="+leftPort+" right="+rightPort); + ml.in(Down, new PSXCallback() {public void callback(ByteBuffer reply) { + ml.in(Down,this); + if (left==null) { + if (debug) System.out.println("Reached in Leaf "+nodeId+" value = "+parentPort); + // Leaf case + ByteBuffer answer = ByteBuffer.allocate(10); + answer.putInt(parentPort); + answer.flip(); + ml.out(Up, answer); + return; + } + if (debug) System.out.println("Pass it to the children from "+nodeId + " to "+ leftPort + " and " + rightPort + "."); + ByteBuffer copy = reply.duplicate(); + left.out(Down, reply); + right.out(Down, copy); + } + + }); + if (leftPort!=0) + left.in(Up, new PSXCallback() {public void callback(ByteBuffer reply) { + if (debug) System.out.println("Up from left at"+nodeId); + left.in(Up,this); + leftWaiter.add(reply); + checkSend(ml); + } + + }); + if (rightPort!=0) + right.in(Up, new PSXCallback() {public void callback(ByteBuffer reply) { + if (debug) System.out.println("Up from right at"+nodeId); + right.in(Up,this); + rightWaiter.add(reply); + checkSend(ml); + } + }); + } catch (IOException e) { + } + } + + private void checkSend(MetaLinda ml) { + if (leftWaiter.isEmpty()||rightWaiter.isEmpty()) return; + ByteBuffer out = ByteBuffer.allocate(10); + int value = leftWaiter.poll().getInt()+rightWaiter.poll().getInt(); + if (parent!=null) { + if (debug) System.out.println("Up the vluae "+value+" from "+nodeId); + out.putInt(value); + out.flip(); + ml.out(Up, out); + } else { + // Top node case + System.out.println("Top level node gets "+value); + } + } + } + + + class Congigure implements Runnable { + public String id; + public ByteBuffer config; + public FederatedLinda fdl; + public LinkedList psxs = new LinkedList(); + private LinkedList hosts = new LinkedList(); + private LinkedList ports = new LinkedList(); + public Congigure(int id,ByteBuffer config) { this.id = "Configure"+id; + this.config = config; + } + public void run() { + String[] args = {id}; + main(args); + } + + public void main(String[] arg) { + final String name = arg[0]; + try { + fdl = FederatedLinda.init(); + readConfigure(); + fdl.sync(1000); + startTest(); + stop(); + } catch (IOException e) { + System.err.println(name+": Communication failure."); + } + } + + + public void startTest() { + System.out.println("StartTest"); + try { + PSXLinda psx = psxs.get(0); + for(int i=3;i<10;i++) { + sendData(psx,Down,i); + psx.sync(1000); + } + sleep(1000); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void stop() throws IOException { + ByteBuffer data = ByteBuffer.allocate(10); + for(PSXLinda psx:psxs) { + psx.out(PSX.META_STOP, data.duplicate()); + psx.sync(1); + } + } + + public void openLinda(String host, int port) throws IOException { + FederatedLinda fdl; + PSXLinda psx; + PSXReply r; + fdl = FederatedLinda.init(); + psx = fdl.open(host,port); + r = psx.in(65535); + fdl.sync(1); + System.out.println(host+" port "+port +": Connected."); + int cnt=0; + while(!r.ready()) { + // psx.sync(1000); + psx.sync(10); + System.out.println(host+" port "+port +": Waiting...."+(cnt++)); + } + print_id(r); + + return ; + } + + public void readConfigure() throws IOException { + int mode = 0; + int host = 0; + while(config.hasRemaining()) { + int port = config.getInt(); + if (mode==0) { + if (port==HOST_LIST_END_MARK) { + mode = 1; + continue; + } + String hostname = getString(config); + + psxs.add(fdl.open(hostname,port)); + hosts.add(hostname); + ports.add(port); + + } else { + if (port == CONNECT_LIST_END_MARK) return; + connect(host++, port,config.getInt(),config.getInt()); + } + } + } + + private void connect(int host, int parent, int left, int right) { + PSXLinda p = psxs.get(host); + ByteBuffer out = ByteBuffer.allocate(4096); + + out.putInt(parent!=NO_HOST? ports.get(parent):0); + out.putInt(left!=NO_HOST? ports.get(left):0); + out.putInt(right!=NO_HOST? ports.get(right):0); + if (parent!=NO_HOST) putString(out,hosts.get(parent)); + if (left!=NO_HOST) putString(out,hosts.get(left)); + if (right!=NO_HOST) putString(out,hosts.get(right)); + out.flip(); + p.out(ConnectReuest, out); + } + + } + + public synchronized void sleep(int time) { + try { + wait(time); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public void read_wait(PSXLinda psx, PSXReply reply, String mesg) throws IOException { + while(!reply.ready()) psx.sync(10); + System.out.println(mesg); + System.out.println(reply.getData().getInt()); + System.out.println(""); + } + + public ByteBuffer sendData(PSXLinda psx,int id, int n) { + ByteBuffer data = ByteBuffer.allocate(10); + data.putInt(n); + data.flip(); + psx.out(id,data); + return data; + } + + public void in_wait(PSXLinda psx, int i) throws IOException { + PSXReply r = psx.in(i); + while(! r.ready()) { + psx.sync(10); + } + return; + } + + public void print_id (PSXReply ans) throws IOException { + ByteBuffer r = ans.getData(); + System.out.print("ID = "); + System.out.write(r.array()); + System.out.println(""); + } + + + public String getString(ByteBuffer reply) { + char c; + String s = ""; + while(reply.hasRemaining()) { + c = reply.getChar(); + if (c== 0) break; + s += c; + } + return s; + } + + public void putString(ByteBuffer reply,String s) { + for(int i=0; i