changeset 25:50c75cb3de60

implements TopologyNode
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 17 Jan 2012 02:40:27 +0900
parents ebd91e607b63
children 9c6b9e032338
files src/alice/datasegment/RemoteDataSegmentManager.java src/alice/topology/node/IncomingAbstractHostName.java src/alice/topology/node/IncomingConnectionInfo.java src/alice/topology/node/StartTopologyNode.java src/alice/topology/node/TopologyNode.java src/alice/topology/node/TopologyNodeConfig.java src/topology/manager/IncomingHosts.java
diffstat 7 files changed, 133 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/datasegment/RemoteDataSegmentManager.java	Tue Jan 17 01:32:35 2012 +0900
+++ b/src/alice/datasegment/RemoteDataSegmentManager.java	Tue Jan 17 02:40:27 2012 +0900
@@ -4,6 +4,7 @@
 import java.net.InetSocketAddress;
 import java.nio.channels.SocketChannel;
 
+import org.apache.log4j.Logger;
 import org.msgpack.type.Value;
 
 import alice.codesegment.CodeSegment;
@@ -14,6 +15,7 @@
 public class RemoteDataSegmentManager extends DataSegmentManager {
 	
 	Connection connection;
+	Logger logger = Logger.getLogger(RemoteDataSegmentManager.class);
 	
 	// TODO: delete this constructor later
 	@Deprecated
@@ -40,6 +42,9 @@
 						SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port));
 						connection.socket = sc.socket();
 						connect = false;
+						logger.info("Connect to "
+								+ connection.socket.getInetAddress().getHostName()
+								+ ":" + connection.socket.getPort());
 					} catch (IOException e) {
 						try {
 							Thread.sleep(500);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/topology/node/IncomingAbstractHostName.java	Tue Jan 17 02:40:27 2012 +0900
@@ -0,0 +1,18 @@
+package alice.topology.node;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegmentReceiver;
+
+public class IncomingAbstractHostName extends CodeSegment {
+
+	public DataSegmentReceiver absName = new DataSegmentReceiver(ids, CommandType.PEEK);
+	
+	@Override
+	public void run() {
+		String absName = this.absName.val.asRawValue().getString();
+		IncomingConnectionInfo cs = new IncomingConnectionInfo(absName);
+		cs.hostInfo.setKey("manager", absName);
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/topology/node/IncomingConnectionInfo.java	Tue Jan 17 02:40:27 2012 +0900
@@ -0,0 +1,32 @@
+package alice.topology.node;
+
+import java.io.IOException;
+
+import org.msgpack.MessagePack;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegment;
+import alice.datasegment.DataSegmentReceiver;
+import alice.topology.HostMessage;
+
+public class IncomingConnectionInfo extends CodeSegment {
+
+	public DataSegmentReceiver hostInfo = new DataSegmentReceiver(ids, CommandType.TAKE);
+	public String absName;
+	public IncomingConnectionInfo(String absName) {
+		this.absName = absName;
+	}
+
+	@Override
+	public void run() {
+		MessagePack msgpack = new MessagePack();
+		try {
+			HostMessage hostInfo = msgpack.convert(this.hostInfo.val, HostMessage.class);
+			DataSegment.connect(hostInfo.connectionName, hostInfo.name, hostInfo.port);
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/topology/node/StartTopologyNode.java	Tue Jan 17 02:40:27 2012 +0900
@@ -0,0 +1,40 @@
+package alice.topology.node;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.msgpack.MessagePack;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.DataSegment;
+import alice.datasegment.DataSegmentManager;
+import alice.topology.HostMessage;
+
+public class StartTopologyNode extends CodeSegment {
+
+	TopologyNodeConfig conf;
+	
+	public StartTopologyNode(TopologyNodeConfig conf) {
+		this.conf = conf;
+	}
+	
+	@Override
+	public void run() {
+		DataSegmentManager manager = DataSegment.connect("topology_manager", conf.managerHostName, conf.managerPort);
+		try {
+			HostMessage host;
+			host = new HostMessage(InetAddress.getLocalHost().getHostName(), conf.localPort);
+			MessagePack msgpack = new MessagePack();
+			manager.put("host", msgpack.unconvert(host));
+		} catch (UnknownHostException e) {
+			e.printStackTrace();
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+		
+		IncomingAbstractHostName cs = new IncomingAbstractHostName();
+		cs.absName.setKey("local", "host");
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/topology/node/TopologyNode.java	Tue Jan 17 02:40:27 2012 +0900
@@ -0,0 +1,17 @@
+package alice.topology.node;
+
+import alice.daemon.AliceDaemon;
+
+public class TopologyNode {
+
+	public TopologyNode(String[] args, TopologyNodeConfig conf) {
+		new AliceDaemon(conf).listen();
+		new StartTopologyNode(conf).execute();
+	}
+	
+	public static void main(String[] args) {
+		TopologyNodeConfig conf = new TopologyNodeConfig(args);
+		TopologyNode node = new TopologyNode(args, conf);
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/topology/node/TopologyNodeConfig.java	Tue Jan 17 02:40:27 2012 +0900
@@ -0,0 +1,21 @@
+package alice.topology.node;
+
+import alice.daemon.Config;
+
+public class TopologyNodeConfig extends Config {
+	
+	String managerHostName;
+	int managerPort;
+	
+	public TopologyNodeConfig(String[] args) {
+		super(args);
+		for (int i = 0; i< args.length; i++) {
+			if ("-host".equals(args[i])) {
+				managerHostName = args[++i];
+			} else if ("-port".equals(args[i])) {
+				managerPort = Integer.parseInt(args[++i]);
+			}
+		}
+	}
+
+}
--- a/src/topology/manager/IncomingHosts.java	Tue Jan 17 01:32:35 2012 +0900
+++ b/src/topology/manager/IncomingHosts.java	Tue Jan 17 02:40:27 2012 +0900
@@ -49,7 +49,6 @@
 			IncomingHosts cs = new IncomingHosts(topology, nodeNames);
 			cs.host.setKey("local", "host");
 		}
-
 	}
 
 }