changeset 27:f54dcbebde3a

topology manager work!
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 17 Jan 2012 03:52:39 +0900
parents 9c6b9e032338
children 98ab26e09a98
files src/alice/datasegment/RemoteDataSegmentManager.java src/alice/topology/manager/IncomingHosts.java src/alice/topology/manager/NodeInfo.java src/alice/topology/manager/StartTopologyManager.java src/alice/topology/manager/TopologyManager.java src/alice/topology/manager/TopologyManagerConfig.java src/alice/topology/node/IncomingConnectionInfo.java src/alice/topology/node/StartTopologyNode.java src/topology/manager/IncomingHosts.java src/topology/manager/NodeInfo.java src/topology/manager/StartTopologyManager.java src/topology/manager/TopologyManager.java src/topology/manager/TopologyManagerConfig.java
diffstat 13 files changed, 174 insertions(+), 176 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/datasegment/RemoteDataSegmentManager.java	Tue Jan 17 03:11:23 2012 +0900
+++ b/src/alice/datasegment/RemoteDataSegmentManager.java	Tue Jan 17 03:52:39 2012 +0900
@@ -31,9 +31,7 @@
 	public RemoteDataSegmentManager(String key, final String hostName, final int port) {
 		connection = new Connection();
 		final RemoteDataSegmentManager manager = this;
-		new Thread(replyThread, "RemoteDataSegmentManager-"
-				+ connection.socket.getInetAddress().getHostName()
-				+ ":" + connection.socket.getPort()).start();
+		new Thread(replyThread, "RemoteDataSegmentManager-" + key).start();
 		new Thread("Connect-" + key) {
 			public void run() {
 				boolean connect = true;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/topology/manager/IncomingHosts.java	Tue Jan 17 03:52:39 2012 +0900
@@ -0,0 +1,59 @@
+package alice.topology.manager;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+
+import org.apache.log4j.Logger;
+import org.msgpack.MessagePack;
+import org.msgpack.type.ValueFactory;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegment;
+import alice.datasegment.DataSegmentManager;
+import alice.datasegment.DataSegmentReceiver;
+import alice.topology.HostMessage;
+
+public class IncomingHosts extends CodeSegment {
+	
+	HashMap<String, LinkedList<NodeInfo>> topology;
+	LinkedList<String> nodeNames;
+	DataSegmentReceiver host = new DataSegmentReceiver(ids, CommandType.TAKE);
+	Logger logger = Logger.getLogger(IncomingHosts.class);
+	
+	public IncomingHosts(HashMap<String, LinkedList<NodeInfo>> topology, LinkedList<String> nodeNames) {
+		this.topology = topology;
+		this.nodeNames = nodeNames;
+	}
+
+	@Override
+	public void run() {
+		MessagePack msgpack = new MessagePack();
+		try {
+			HostMessage host = msgpack.convert(this.host.val, HostMessage.class);
+			String nodeName = nodeNames.poll();
+			DataSegmentManager manager = DataSegment.connect(nodeName, host.name, host.port);
+			manager.put("host", ValueFactory.createRawValue(nodeName));
+			LinkedList<NodeInfo> nodes = topology.get(nodeName);
+			for (NodeInfo nodeInfo : nodes) {
+				HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName);
+				ods.put("local", nodeInfo.sourceNodeName, msgpack.unconvert(newHost));
+			}
+		} catch (IOException e) {
+			logger.error("HostMessage format error");
+			e.printStackTrace();
+		}
+		
+		if (nodeNames.isEmpty()) {
+			// configuration finish
+			for (String key : topology.keySet()) {
+				ods.put("local", key, ValueFactory.createNilValue());
+			}
+		} else {
+			IncomingHosts cs = new IncomingHosts(topology, nodeNames);
+			cs.host.setKey("local", "host");
+		}
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/topology/manager/NodeInfo.java	Tue Jan 17 03:52:39 2012 +0900
@@ -0,0 +1,14 @@
+package alice.topology.manager;
+
+public class NodeInfo {
+	
+	public String sourceNodeName;
+	public String connectionName;
+	
+	public NodeInfo(String source, String connection) {
+		this.sourceNodeName = source;
+		this.connectionName = connection;
+		
+	}
+	
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/topology/manager/StartTopologyManager.java	Tue Jan 17 03:52:39 2012 +0900
@@ -0,0 +1,67 @@
+package alice.topology.manager;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+
+import org.apache.log4j.Logger;
+
+import com.alexmerz.graphviz.ParseException;
+import com.alexmerz.graphviz.Parser;
+import com.alexmerz.graphviz.objects.Edge;
+import com.alexmerz.graphviz.objects.Graph;
+import com.alexmerz.graphviz.objects.Node;
+
+import alice.codesegment.CodeSegment;
+
+public class StartTopologyManager extends CodeSegment {
+
+	TopologyManagerConfig conf;
+	Logger logger = Logger.getLogger(StartTopologyManager.class);
+
+	public StartTopologyManager(TopologyManagerConfig conf) {
+		this.conf = conf;
+	}
+
+	@Override
+	public void run() {
+		LinkedList<String> nodeNames = new LinkedList<String>();
+		HashMap<String, LinkedList<NodeInfo>> topology = new HashMap<String, LinkedList<NodeInfo>>();
+		try {
+			FileReader reader = new FileReader(new File(conf.confFilePath));
+			Parser parser = new Parser();
+			parser.parse(reader);
+			ArrayList<Graph> graphs = parser.getGraphs();
+			for (Graph graph : graphs) {
+				ArrayList<Node> nodes = graph.getNodes(false);
+				for (Node node : nodes) {
+					String nodeName = node.getId().getId();
+					nodeNames.add(nodeName);
+					topology.put(nodeName, new LinkedList<NodeInfo>());
+				}
+				ArrayList<Edge> edges = graph.getEdges();
+				for (Edge edge : edges) {
+					String connection = edge.getAttribute("label");
+					String source = edge.getSource().getNode().getId().getId();
+					String target = edge.getSource().getNode().getId().getId();
+					LinkedList<NodeInfo> sources = topology.get(target);
+					sources.add(new NodeInfo(source, connection));
+				}
+			}
+			
+		} catch (FileNotFoundException e) {
+			logger.error("File not found: " + conf.confFilePath);
+			e.printStackTrace();
+		} catch (ParseException e) {
+			logger.error("File format error: " + conf.confFilePath);
+			e.printStackTrace();
+		}
+		
+		IncomingHosts cs = new IncomingHosts(topology, nodeNames);
+		cs.host.setKey("local", "host");
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/topology/manager/TopologyManager.java	Tue Jan 17 03:52:39 2012 +0900
@@ -0,0 +1,13 @@
+package alice.topology.manager;
+
+import alice.daemon.AliceDaemon;
+
+public class TopologyManager {
+
+	public static void main(String[] args) {
+		TopologyManagerConfig conf = new TopologyManagerConfig(args);
+		new AliceDaemon(conf).listen();
+		new StartTopologyManager(conf).execute();
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/topology/manager/TopologyManagerConfig.java	Tue Jan 17 03:52:39 2012 +0900
@@ -0,0 +1,18 @@
+package alice.topology.manager;
+
+import alice.daemon.Config;
+
+public class TopologyManagerConfig extends Config {
+	
+	public String confFilePath;
+	
+	public TopologyManagerConfig(String[] args) {
+		super(args);
+		for (int i = 0; i < args.length; i++) {
+			if ("-conf".equals(args[i])) {
+				confFilePath = args[++i];
+			}
+		}
+	}
+
+}
--- a/src/alice/topology/node/IncomingConnectionInfo.java	Tue Jan 17 03:11:23 2012 +0900
+++ b/src/alice/topology/node/IncomingConnectionInfo.java	Tue Jan 17 03:52:39 2012 +0900
@@ -23,7 +23,7 @@
 
 	@Override
 	public void run() {
-		if (this.hostInfo.val.isNilValue()) {
+		if (this.hostInfo.val == null) {
 			System.out.println("Configuration finished");
 			if (clazz == null)
 				return;
--- a/src/alice/topology/node/StartTopologyNode.java	Tue Jan 17 03:11:23 2012 +0900
+++ b/src/alice/topology/node/StartTopologyNode.java	Tue Jan 17 03:52:39 2012 +0900
@@ -23,7 +23,7 @@
 	
 	@Override
 	public void run() {
-		DataSegmentManager manager = DataSegment.connect("topology_manager", conf.managerHostName, conf.managerPort);
+		DataSegmentManager manager = DataSegment.connect("manager", conf.managerHostName, conf.managerPort);
 		try {
 			HostMessage host;
 			host = new HostMessage(InetAddress.getLocalHost().getHostName(), conf.localPort);
--- a/src/topology/manager/IncomingHosts.java	Tue Jan 17 03:11:23 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,59 +0,0 @@
-package topology.manager;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-
-import org.apache.log4j.Logger;
-import org.msgpack.MessagePack;
-import org.msgpack.type.ValueFactory;
-
-import alice.codesegment.CodeSegment;
-import alice.datasegment.CommandType;
-import alice.datasegment.DataSegment;
-import alice.datasegment.DataSegmentManager;
-import alice.datasegment.DataSegmentReceiver;
-import alice.topology.HostMessage;
-
-public class IncomingHosts extends CodeSegment {
-	
-	HashMap<String, LinkedList<NodeInfo>> topology;
-	LinkedList<String> nodeNames;
-	DataSegmentReceiver host = new DataSegmentReceiver(ids, CommandType.TAKE);
-	Logger logger = Logger.getLogger(IncomingHosts.class);
-	
-	public IncomingHosts(HashMap<String, LinkedList<NodeInfo>> topology, LinkedList<String> nodeNames) {
-		this.topology = topology;
-		this.nodeNames = nodeNames;
-	}
-
-	@Override
-	public void run() {
-		MessagePack msgpack = new MessagePack();
-		try {
-			HostMessage host = msgpack.convert(this.host.val, HostMessage.class);
-			String nodeName = nodeNames.poll();
-			DataSegmentManager manager = DataSegment.connect(nodeName, host.name, host.port);
-			manager.put("host", ValueFactory.createRawValue(nodeName));
-			LinkedList<NodeInfo> nodes = topology.get(nodeName);
-			for (NodeInfo nodeInfo : nodes) {
-				HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName);
-				ods.put("local", nodeInfo.sourceNodeName, msgpack.unconvert(newHost));
-			}
-		} catch (IOException e) {
-			logger.error("HostMessage format error");
-			e.printStackTrace();
-		}
-		
-		if (nodeNames.isEmpty()) {
-			// configuration finish
-			for (String key : topology.keySet()) {
-				ods.put("local", key, ValueFactory.createNilValue());
-			}
-		} else {
-			IncomingHosts cs = new IncomingHosts(topology, nodeNames);
-			cs.host.setKey("local", "host");
-		}
-	}
-
-}
--- a/src/topology/manager/NodeInfo.java	Tue Jan 17 03:11:23 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,14 +0,0 @@
-package topology.manager;
-
-public class NodeInfo {
-	
-	public String sourceNodeName;
-	public String connectionName;
-	
-	public NodeInfo(String source, String connection) {
-		this.sourceNodeName = source;
-		this.connectionName = connection;
-		
-	}
-	
-}
--- a/src/topology/manager/StartTopologyManager.java	Tue Jan 17 03:11:23 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,67 +0,0 @@
-package topology.manager;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-
-import org.apache.log4j.Logger;
-
-import com.alexmerz.graphviz.ParseException;
-import com.alexmerz.graphviz.Parser;
-import com.alexmerz.graphviz.objects.Edge;
-import com.alexmerz.graphviz.objects.Graph;
-import com.alexmerz.graphviz.objects.Node;
-
-import alice.codesegment.CodeSegment;
-
-public class StartTopologyManager extends CodeSegment {
-
-	TopologyManagerConfig conf;
-	Logger logger = Logger.getLogger(StartTopologyManager.class);
-
-	public StartTopologyManager(TopologyManagerConfig conf) {
-		conf = this.conf;
-	}
-
-	@Override
-	public void run() {
-		LinkedList<String> nodeNames = new LinkedList<String>();
-		HashMap<String, LinkedList<NodeInfo>> topology = new HashMap<String, LinkedList<NodeInfo>>();
-		try {
-			FileReader reader = new FileReader(new File(conf.confFilePath));
-			Parser parser = new Parser();
-			parser.parse(reader);
-			ArrayList<Graph> graphs = parser.getGraphs();
-			for (Graph graph : graphs) {
-				ArrayList<Node> nodes = graph.getNodes(false);
-				for (Node node : nodes) {
-					String nodeName = node.getId().getId();
-					nodeNames.add(nodeName);
-					topology.put(nodeName, new LinkedList<NodeInfo>());
-				}
-				ArrayList<Edge> edges = graph.getEdges();
-				for (Edge edge : edges) {
-					String connection = edge.getAttribute("label");
-					String source = edge.getSource().getNode().getId().getId();
-					String target = edge.getSource().getNode().getId().getId();
-					LinkedList<NodeInfo> sources = topology.get(target);
-					sources.add(new NodeInfo(source, connection));
-				}
-			}
-			
-		} catch (FileNotFoundException e) {
-			logger.error("File not found: " + conf.confFilePath);
-			e.printStackTrace();
-		} catch (ParseException e) {
-			logger.error("File format error: " + conf.confFilePath);
-			e.printStackTrace();
-		}
-		
-		IncomingHosts cs = new IncomingHosts(topology, nodeNames);
-		cs.host.setKey("local", "host");
-	}
-
-}
--- a/src/topology/manager/TopologyManager.java	Tue Jan 17 03:11:23 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,13 +0,0 @@
-package topology.manager;
-
-import alice.daemon.AliceDaemon;
-
-public class TopologyManager {
-
-	public static void main(String[] args) {
-		TopologyManagerConfig conf = new TopologyManagerConfig(args);
-		new AliceDaemon(conf).listen();
-		new StartTopologyManager(conf).execute();
-	}
-
-}
--- a/src/topology/manager/TopologyManagerConfig.java	Tue Jan 17 03:11:23 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,18 +0,0 @@
-package topology.manager;
-
-import alice.daemon.Config;
-
-public class TopologyManagerConfig extends Config {
-	
-	public String confFilePath;
-	
-	public TopologyManagerConfig(String[] args) {
-		super(args);
-		for (int i = 0; i < args.length; i++) {
-			if ("-conf".equals(args[i])) {
-				confFilePath = args[++i];
-			}
-		}
-	}
-
-}