changeset 87:8931a3e90c2a

ring implements
author one
date Thu, 11 Feb 2010 12:05:49 +0900
parents c0591636a71a
children 5d1189e9e420
files src/fdl/MetaReply.java src/fdl/test/debug/ConfigurationManagerEngine.java src/fdl/test/debug/MetaProtocolEngine.java
diffstat 3 files changed, 83 insertions(+), 22 deletions(-) [+]
line wrap: on
line diff
--- a/src/fdl/MetaReply.java	Tue Feb 09 06:14:52 2010 +0900
+++ b/src/fdl/MetaReply.java	Thu Feb 11 12:05:49 2010 +0900
@@ -52,7 +52,7 @@
 			ts.Out(null, command, data);
 			return true;
 		case PSX.PSX_UPDATE:
-			// not implemented
+			// TODO: not implemented
 			break;
 		}
 		return mode==PSX.PSX_ANSWER;
--- a/src/fdl/test/debug/ConfigurationManagerEngine.java	Tue Feb 09 06:14:52 2010 +0900
+++ b/src/fdl/test/debug/ConfigurationManagerEngine.java	Thu Feb 11 12:05:49 2010 +0900
@@ -9,6 +9,10 @@
 	public static final int DEFAULTPORT = 10000;
 
 	// tuple id
+	public static final int BODY = 100;
+	public static final int START = 101;
+	public static final int FINISH = 102;
+	
 	public static final int MANAGE = 60000;
 	public static final int DEBUG = 61000;
 	
@@ -19,6 +23,8 @@
 	public static final int RINGLEFT = DEBUG + 1;
 	public static final int RINGRIGHT = DEBUG + 2;
 	
+	public static final int DEBUGSTART = DEBUG + 1000;
+	
 	private int nodeNum = 0;
 	private MetaLinda ml;
 	private NodeInfo[] nodes;
@@ -67,6 +73,10 @@
 				ml.in(MANAGE, this);
 			} else {
 				print("All link configured!");
+				// 実験開始を通知
+				nodes[0].linda.out(START, ByteBuffer.wrap("test".getBytes()));
+				// DebugRing 開始を通知
+				nodes[0].linda.out(DEBUGSTART, ByteBuffer.wrap("relay,10,1024".getBytes()));
 			}
 		}
 
@@ -112,11 +122,6 @@
 			nodes[i].debugConnectionXML = debug.createXML();
 			nodes[i].linda.out(DEBUG, ByteBuffer.wrap(nodes[i].debugConnectionXML.getBytes()));
 			print(nodes[i].debugConnectionXML);
-//			try {
-//				nodes[i].linda.sync(1);
-//			} catch (IOException e) {
-//				e.printStackTrace();
-//			}
 		}
 	}
 	
@@ -143,11 +148,6 @@
 			nodes[i].debugRoutingXML = debug.createXML();
 			nodes[i].linda.out(DEBUG, ByteBuffer.wrap(nodes[i].debugRoutingXML.getBytes()));
 			print(nodes[i].debugRoutingXML);
-//			try {
-//				nodes[i].linda.sync(1);
-//			} catch (IOException e) {
-//				e.printStackTrace();
-//			}
 		}
 	}
 	
--- a/src/fdl/test/debug/MetaProtocolEngine.java	Tue Feb 09 06:14:52 2010 +0900
+++ b/src/fdl/test/debug/MetaProtocolEngine.java	Thu Feb 11 12:05:49 2010 +0900
@@ -29,9 +29,23 @@
 public class MetaProtocolEngine implements MetaEngine {
 	// Fields
 	public static final int DEFAULTPORT = 10000;
+	
+	public static final int BODY = 100;
+	public static final int START = 101;
+	public static final int FINISH = 102;
+	
 	public static final int MANAGE = 60000;
 	public static final int DEBUG = 61000;
 	
+	public static final int TREETOP = MANAGE + 1;
+	public static final int TREELEFT = MANAGE + 2;
+	public static final int TREERIGHT = MANAGE + 3;
+
+	public static final int RINGLEFT = DEBUG + 1;
+	public static final int RINGRIGHT = DEBUG + 2;
+	
+	public static final int DEBUGSTART = DEBUG + 1000;
+	
 	private MetaLinda ml;
 	private String localHostName;
 	private int localPort;
@@ -44,6 +58,8 @@
 	private int nodeId;
 	private HashMap<Integer, Routing> nodes;
 	
+	private int relayNum, relaySize, relayCounter;
+	
 	// Callback class
 	class AcceptXMLCallback implements PSXCallback {
 		int tid;
@@ -83,6 +99,10 @@
 			Element root = doc.getDocumentElement();
 			if(root.getTagName().equals("connections")) {
 				nodeId = new Integer(root.getAttribute("id")).intValue();
+				if (nodeId == 0) {
+					ml.in(START, new StartCallback());
+					ml.in(DEBUGSTART, new DebugStartCallback());
+				}
 				NodeList connections = root.getElementsByTagName("connection");
 				for (int i = 0; i < connections.getLength(); i++) {
 					Element connection = (Element)connections.item(i);
@@ -133,28 +153,74 @@
 	private class RoutingCallback implements PSXCallback {
 		int tid;
 		Routing routing;
+		
 		public RoutingCallback(int tid, Routing routing) {
 			this.tid = tid;
 			this.routing = routing;
+			ml.out(BODY, ByteBuffer.wrap("dummy".getBytes()));
 		} 
 		
 		public void callback(ByteBuffer reply) {
+			if (tid == TREETOP || tid == TREELEFT || tid == TREERIGHT) {
+				ml.in(BODY);
+				ml.out(BODY, reply);
+				print("Update body");
+			} else if (nodeId == 0 && tid == RINGLEFT) {
+				relayCounter++;
+				print(new Integer(relayCounter).toString() + " relay");
+				if (relayCounter >= relayNum) {
+					ml.in(tid, this);
+					return;
+				}
+			}
 			Iterator<Integer> it = routing.route.iterator();
 			while (it.hasNext()) {
 				Integer dstId = it.next();
 				Routing r = nodes.get(dstId);
 				r.linda.out(r.dstId, reply);
-//				try {
-//					r.linda.sync(1);
-//				} catch (IOException e) {
-//					e.printStackTrace();
-//				}
 				ml.in(tid, this);
 			}
 		}
 		
 	}
 	
+	private class StartCallback implements PSXCallback {
+
+		public void callback(ByteBuffer reply) {
+			Routing r;
+			
+			// 子があるならば、子にタプルを伝搬
+			if (nodes.containsKey(new Integer(TREERIGHT))) {
+				r = nodes.get(new Integer(TREERIGHT));
+				r.linda.out(r.dstId, reply);
+			}
+			if (nodes.containsKey(new Integer(TREELEFT))) {
+				r = nodes.get(new Integer(TREELEFT));
+				r.linda.out(r.dstId, reply);
+			}
+			ml.in(START, this);
+		}
+		
+	}
+	
+	private class DebugStartCallback implements PSXCallback {
+		public void callback(ByteBuffer reply) {
+			String[] commands = new String(reply.array()).split(",");
+			String command = commands[0];
+			if (command.equals("relay")) {
+				relayNum = new Integer(commands[1]).intValue();
+				relaySize = new Integer(commands[2]).intValue();
+				relayCounter = 0;
+				
+				print("relay num=" + new Integer(relayNum).toString() + " size=" + new Integer(relaySize).toString());
+				
+				Routing r = nodes.get(new Integer(RINGRIGHT));
+				r.linda.out(r.dstId, ByteBuffer.wrap(new byte[relaySize]));
+				ml.in(DEBUGSTART, this);
+			}
+		}
+	}
+	
 	// Constructor
 	public MetaProtocolEngine(int port, MetaLinda ml, String managerHostName, int managerPort) {
 		this.ml = ml;
@@ -167,7 +233,7 @@
 		} catch (UnknownHostException e) {
 			e.printStackTrace();
 		}
-		manager = connectServer(managerHostName, managerPort);
+		manager = connectServer(this.managerHostName, this.managerPort);
 		sendLocalHostName();
 	}
 
@@ -188,11 +254,6 @@
 		ByteBuffer local;
 		local = ByteBuffer.wrap((localHostName + ":" + new Integer(localPort).toString()).getBytes());
 		manager.out(MANAGE, local);
-//		try {
-//			manager.sync(1);
-//		} catch (IOException e) {
-//			e.printStackTrace();
-//		}
 	}
 
 	protected PSXLinda connectServer(String hostName, int port) {