Mercurial > hg > FederatedLinda
view src/fdl/test/TestTree.java @ 122:ad73eacf560a default tip
remove warning
author | e095732 |
---|---|
date | Thu, 07 Feb 2013 22:32:26 +0900 |
parents | 3c532f771c56 |
children |
line wrap: on
line source
package fdl.test; import java.io.IOException; import java.nio.ByteBuffer; import java.util.LinkedList; import fdl.FDLindaServ; import fdl.FederatedLinda; import fdl.MetaEngine; import fdl.MetaLinda; import fdl.PSX; import fdl.PSXCallback; import fdl.PSXLinda; import fdl.PSXReply; public class TestTree { /** * Tree Walk Example * configure tree structured federated Linda * run message from tree root * sum up data from the leaves */ public static final int PORT = 10000; /** * Tuple id for the Protocol */ public static final int ConnectReuest = 100; private static final int Down = 101; public static final int Up = 102; /** * Configuration binary format separator */ private static final int NO_HOST = -1; private static final int HOST_LIST_END_MARK = -2; private static final int CONNECT_LIST_END_MARK = -3; /** * Global */ public LinkedList<Thread> lindas = new LinkedList<Thread>(); // not used. public int id = 0; public boolean debug = true; class TreeNode implements Runnable { /** * Linda Server Thread */ public int id, port; public TreeNode(int id, int port) { this.id = id; this.port = port; } public void run() { String[] args = { // "-d", // Debug Message Flag "-p",Integer.toString(port)}; FDLindaServ.main(new TreeMetaProtocolEngine(id,port), args); } } /** * @author kono * Tuple transfer engine for a federated linda node * MetaLinda is passed to mainLoop in this class */ class TreeMetaProtocolEngine implements MetaEngine { /** * Meta Engine for Configuration and Tree Walk */ boolean running = true; LinkedList<ByteBuffer> leftWaiter = new LinkedList<ByteBuffer>(); LinkedList<ByteBuffer> rightWaiter = new LinkedList<ByteBuffer>(); PSXLinda parent, left, right; String parentHost, leftHost,rightHost; int parentPort=0, leftPort=0, rightPort=0; public int nodeId = 0; public int nodePort = 0; public MetaLinda ml; public TreeMetaProtocolEngine(int id, int port) { nodeId = id; nodePort = port; } /** * Meta Engine Main Loop */ public void mainLoop(MetaLinda meta) { ml = meta; /* handler of Server STOP command */ ml.in(PSX.META_STOP, new PSXCallback() {public void callback(ByteBuffer reply) { running = false;}}); /* handler of Configuration */ ml.in(ConnectReuest, new PSXCallback() {public void callback(ByteBuffer reply) { configureConnection(reply); engineStart(); } }); while(running) { ml.sync(0); // Wait forever } } /** * Configuration of node according to configuration packet * @param reply */ private void configureConnection(ByteBuffer reply) { parentPort = reply.getInt(); leftPort = reply.getInt(); rightPort = reply.getInt(); if (parentPort!=0) parentHost = getString(reply); if (leftPort!=0) leftHost = getString(reply); if (rightPort!=0) rightHost = getString(reply); } /** * * Open Inter-Server Connections and start tuple communication Engine * * parent * | * self = ml (Meta Linda Server) * / \ * left right * * @param ml Meta Linda Server * @param reply a contents of configuration packet */ private void engineStart() { try { /** * connect to neighbors */ if (parentPort!=0) parent = ml.fdl.open(parentHost, parentPort); if (leftPort!=0) left = ml.fdl.open(leftHost, leftPort); if (rightPort!=0) right = ml.fdl.open(rightHost, rightPort); if (debug) System.out.println("Connect this port="+nodePort+" parent="+parentPort+" left="+leftPort+" right="+rightPort); /* * Handle Downward tree walk */ ml.in(Down, new PSXCallback() {public void callback(ByteBuffer reply) { ml.in(Down,this); // start this call back again for next packet if (left==null) { if (debug) System.out.println("Reached in Leaf "+nodeId+" value = "+parentPort); // Leaf case ByteBuffer answer = ByteBuffer.allocate(10); answer.putInt(parentPort); answer.flip(); ml.out(Up, answer); return; } // intermediate node if (debug) System.out.println("Pass it to the children from "+nodeId + " to "+ leftPort + " and " + rightPort + "."); ByteBuffer copy = reply.duplicate(); left.out(Down, reply); right.out(Down, copy); } }); /** * Handle upward tree walk */ if (leftPort!=0) left.in(Up, new PSXCallback() {public void callback(ByteBuffer reply) { if (debug) System.out.println("Up from left at"+nodeId); left.in(Up,this); leftWaiter.add(reply); checkSend(ml); } }); if (rightPort!=0) right.in(Up, new PSXCallback() {public void callback(ByteBuffer reply) { if (debug) System.out.println("Up from right at"+nodeId); right.in(Up,this); rightWaiter.add(reply); checkSend(ml); } }); } catch (IOException e) { } } /** * Wait for all message from the sub node and * calc sum and pass it to the parent. * @param ml */ private void checkSend(MetaLinda ml) { if (leftWaiter.isEmpty()||rightWaiter.isEmpty()) return; ByteBuffer out = ByteBuffer.allocate(10); int value = leftWaiter.poll().getInt()+rightWaiter.poll().getInt(); if (parent!=null) { // intermediate node if (debug) System.out.println("Up the vluae "+value+" from "+nodeId); out.putInt(value); out.flip(); ml.out(Up, out); } else { // Top node case System.out.println("Top level node gets "+value); } } } /** * @author kono * Configuration Manager * Read Configuration * start linda servers * send connect message to the servers * then send start message * stop all the server with Meta stop message */ class Configure implements Runnable { public String id; public ByteBuffer config; public FederatedLinda fdl; public LinkedList<PSXLinda> psxs = new LinkedList<PSXLinda>(); private LinkedList<String> hosts = new LinkedList<String>(); private LinkedList<Integer> ports = new LinkedList<Integer>(); public Configure(int id,ByteBuffer config) { this.id = "Configure"+id; this.config = config; } public void run() { String[] args = {id}; main(args); } /** * Configuration main * @param arg */ public void main(String[] arg) { final String name = arg[0]; try { fdl = FederatedLinda.init(); readConfigure(); fdl.sync(1000); startTest(); stop(); } catch (IOException e) { System.err.println(name+": Communication failure."); } } /** * Send start messages to configured servers */ public void startTest() { System.out.println("StartTest"); try { PSXLinda psx = psxs.get(0); for(int i=3;i<10;i++) { sendData(psx,Down,i); psx.sync(1000); } sleep(1000); } catch (IOException e) { e.printStackTrace(); } } /** * Send Server Stop message * @throws IOException */ public void stop() throws IOException { ByteBuffer data = ByteBuffer.allocate(10); for(PSXLinda psx:psxs) { psx.out(PSX.META_STOP, data.duplicate()); psx.sync(1); } } /** * This is not used. Open with connection id * @param host * @param port * @throws IOException */ public void openLinda(String host, int port) throws IOException { FederatedLinda fdl; PSXLinda psx; PSXReply r; fdl = FederatedLinda.init(); psx = fdl.open(host,port); r = psx.in(65535); fdl.sync(1); System.out.println(host+" port "+port +": Connected."); int cnt=0; while(!r.ready()) { // psx.sync(1000); psx.sync(10); System.out.println(host+" port "+port +": Waiting...."+(cnt++)); } print_id(r); return ; } /** * Read connection p configuation in a ByteBuffer * @throws IOException * * Easy binary format, we should use more general format such as * XML or messagePack. */ public void readConfigure() throws IOException { int mode = 0; int host = 0; while(config.hasRemaining()) { int port = config.getInt(); if (mode==0) { // Former part // List of host and server name if (port==HOST_LIST_END_MARK) { mode = 1; continue; } String hostname = getString(config); psxs.add(fdl.open(hostname,port)); hosts.add(hostname); ports.add(port); } else { // Later part // create interconnection configuration packet // send it to the server if (port == CONNECT_LIST_END_MARK) return; // connect this, it's parent, left and right connect(host++, port,config.getInt(),config.getInt()); } } } /** * Make a configuration packet for a federated linda node * @param host No. of host to configure * @param parent * @param left * @param right */ private void connect(int host, int parent, int left, int right) { PSXLinda p = psxs.get(host); ByteBuffer out = ByteBuffer.allocate(4096); out.putInt(parent!=NO_HOST? ports.get(parent):0); out.putInt(left!=NO_HOST? ports.get(left):0); out.putInt(right!=NO_HOST? ports.get(right):0); if (parent!=NO_HOST) putString(out,hosts.get(parent)); if (left!=NO_HOST) putString(out,hosts.get(left)); if (right!=NO_HOST) putString(out,hosts.get(right)); out.flip(); p.out(ConnectReuest, out); } } public synchronized void sleep(int time) { try { wait(time); } catch (InterruptedException e) { e.printStackTrace(); } } public void read_wait(PSXLinda psx, PSXReply reply, String mesg) throws IOException { while(!reply.ready()) psx.sync(10); System.out.println(mesg); System.out.println(reply.getData().getInt()); System.out.println(""); } public ByteBuffer sendData(PSXLinda psx,int id, int n) { ByteBuffer data = ByteBuffer.allocate(10); data.putInt(n); data.flip(); psx.out(id,data); return data; } public void in_wait(PSXLinda psx, int i) throws IOException { PSXReply r = psx.in(i); while(! r.ready()) { psx.sync(10); } return; } public void print_id (PSXReply ans) throws IOException { ByteBuffer r = ans.getData(); System.out.print("ID = "); System.out.write(r.array()); System.out.println(""); } /** * read null terminated String in ByteBuffer * @param reply * @return */ public String getString(ByteBuffer reply) { byte c; int position = reply.position(); int len = 0; while(reply.hasRemaining()) { c = reply.get(); len ++; if (c== 0) break; } return new String(reply.array(),position,len-1); } /** * put String in ByteBuffer with null * @param reply * @param s */ public void putString(ByteBuffer reply,String s) { reply.put(s.getBytes()); reply.put((byte)0); } public static void main(String[] arg) throws InterruptedException { TestTree me = new TestTree(); me.test1(); } public void test1() throws InterruptedException { // Make Configuration First ByteBuffer config = makeConfig(); sleep(2000); // Wait for servers' start System.out.println("Start Configure"); // Start configuration server and start test Thread r1 = new Thread(new Configure(1,config)); r1.start(); r1.join(); } /** * Make Configuration * start servers now * No connections yet * @return */ private ByteBuffer makeConfig() { ByteBuffer config = ByteBuffer.allocate(4096); putHostRun(config,"127.0.0.1",PORT); putHostRun(config,"127.0.0.1",PORT+1); putHostRun(config,"127.0.0.1",PORT+2); putHostRun(config,"127.0.0.1",PORT+3); putHostRun(config,"127.0.0.1",PORT+4); putHostRun(config,"127.0.0.1",PORT+5); putHostRun(config,"127.0.0.1",PORT+6); config.putInt(HOST_LIST_END_MARK); putTree(config,NO_HOST,1,2); putTree(config,0,3,4); putTree(config,0,5,6); putTree(config,1,NO_HOST,NO_HOST); putTree(config,1,NO_HOST,NO_HOST); putTree(config,2,NO_HOST,NO_HOST); putTree(config,2,NO_HOST,NO_HOST); config.putInt(CONNECT_LIST_END_MARK); config.flip(); return config; } private void putHostRun(ByteBuffer config, String host, int port2) { putHost(config,host,port2); // should run on specified host using ssh Thread s = new Thread(new TreeNode(id ++, port2)); s.start(); lindas.add(s); } private void putHost(ByteBuffer config, String s, int port2) { config.putInt(port2); putString(config, s); } private void putTree(ByteBuffer config, int p, int l, int r) { config.putInt(p); config.putInt(l); config.putInt(r); } } /* end */