# HG changeset patch # User kazz # Date 1326735627 -32400 # Node ID 50c75cb3de6083a7cd6ae4628fd227f4c66812d7 # Parent ebd91e607b63ec289f735bd725546402d258dd22 implements TopologyNode diff -r ebd91e607b63 -r 50c75cb3de60 src/alice/datasegment/RemoteDataSegmentManager.java --- a/src/alice/datasegment/RemoteDataSegmentManager.java Tue Jan 17 01:32:35 2012 +0900 +++ b/src/alice/datasegment/RemoteDataSegmentManager.java Tue Jan 17 02:40:27 2012 +0900 @@ -4,6 +4,7 @@ import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; +import org.apache.log4j.Logger; import org.msgpack.type.Value; import alice.codesegment.CodeSegment; @@ -14,6 +15,7 @@ public class RemoteDataSegmentManager extends DataSegmentManager { Connection connection; + Logger logger = Logger.getLogger(RemoteDataSegmentManager.class); // TODO: delete this constructor later @Deprecated @@ -40,6 +42,9 @@ SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port)); connection.socket = sc.socket(); connect = false; + logger.info("Connect to " + + connection.socket.getInetAddress().getHostName() + + ":" + connection.socket.getPort()); } catch (IOException e) { try { Thread.sleep(500); diff -r ebd91e607b63 -r 50c75cb3de60 src/alice/topology/node/IncomingAbstractHostName.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/node/IncomingAbstractHostName.java Tue Jan 17 02:40:27 2012 +0900 @@ -0,0 +1,18 @@ +package alice.topology.node; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegmentReceiver; + +public class IncomingAbstractHostName extends CodeSegment { + + public DataSegmentReceiver absName = new DataSegmentReceiver(ids, CommandType.PEEK); + + @Override + public void run() { + String absName = this.absName.val.asRawValue().getString(); + IncomingConnectionInfo cs = new IncomingConnectionInfo(absName); + cs.hostInfo.setKey("manager", absName); + } + +} diff -r ebd91e607b63 -r 50c75cb3de60 src/alice/topology/node/IncomingConnectionInfo.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/node/IncomingConnectionInfo.java Tue Jan 17 02:40:27 2012 +0900 @@ -0,0 +1,32 @@ +package alice.topology.node; + +import java.io.IOException; + +import org.msgpack.MessagePack; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegment; +import alice.datasegment.DataSegmentReceiver; +import alice.topology.HostMessage; + +public class IncomingConnectionInfo extends CodeSegment { + + public DataSegmentReceiver hostInfo = new DataSegmentReceiver(ids, CommandType.TAKE); + public String absName; + public IncomingConnectionInfo(String absName) { + this.absName = absName; + } + + @Override + public void run() { + MessagePack msgpack = new MessagePack(); + try { + HostMessage hostInfo = msgpack.convert(this.hostInfo.val, HostMessage.class); + DataSegment.connect(hostInfo.connectionName, hostInfo.name, hostInfo.port); + } catch (IOException e) { + e.printStackTrace(); + } + } + +} diff -r ebd91e607b63 -r 50c75cb3de60 src/alice/topology/node/StartTopologyNode.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/node/StartTopologyNode.java Tue Jan 17 02:40:27 2012 +0900 @@ -0,0 +1,40 @@ +package alice.topology.node; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.msgpack.MessagePack; + +import alice.codesegment.CodeSegment; +import alice.datasegment.DataSegment; +import alice.datasegment.DataSegmentManager; +import alice.topology.HostMessage; + +public class StartTopologyNode extends CodeSegment { + + TopologyNodeConfig conf; + + public StartTopologyNode(TopologyNodeConfig conf) { + this.conf = conf; + } + + @Override + public void run() { + DataSegmentManager manager = DataSegment.connect("topology_manager", conf.managerHostName, conf.managerPort); + try { + HostMessage host; + host = new HostMessage(InetAddress.getLocalHost().getHostName(), conf.localPort); + MessagePack msgpack = new MessagePack(); + manager.put("host", msgpack.unconvert(host)); + } catch (UnknownHostException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + + IncomingAbstractHostName cs = new IncomingAbstractHostName(); + cs.absName.setKey("local", "host"); + } + +} diff -r ebd91e607b63 -r 50c75cb3de60 src/alice/topology/node/TopologyNode.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/node/TopologyNode.java Tue Jan 17 02:40:27 2012 +0900 @@ -0,0 +1,17 @@ +package alice.topology.node; + +import alice.daemon.AliceDaemon; + +public class TopologyNode { + + public TopologyNode(String[] args, TopologyNodeConfig conf) { + new AliceDaemon(conf).listen(); + new StartTopologyNode(conf).execute(); + } + + public static void main(String[] args) { + TopologyNodeConfig conf = new TopologyNodeConfig(args); + TopologyNode node = new TopologyNode(args, conf); + } + +} diff -r ebd91e607b63 -r 50c75cb3de60 src/alice/topology/node/TopologyNodeConfig.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/node/TopologyNodeConfig.java Tue Jan 17 02:40:27 2012 +0900 @@ -0,0 +1,21 @@ +package alice.topology.node; + +import alice.daemon.Config; + +public class TopologyNodeConfig extends Config { + + String managerHostName; + int managerPort; + + public TopologyNodeConfig(String[] args) { + super(args); + for (int i = 0; i< args.length; i++) { + if ("-host".equals(args[i])) { + managerHostName = args[++i]; + } else if ("-port".equals(args[i])) { + managerPort = Integer.parseInt(args[++i]); + } + } + } + +} diff -r ebd91e607b63 -r 50c75cb3de60 src/topology/manager/IncomingHosts.java --- a/src/topology/manager/IncomingHosts.java Tue Jan 17 01:32:35 2012 +0900 +++ b/src/topology/manager/IncomingHosts.java Tue Jan 17 02:40:27 2012 +0900 @@ -49,7 +49,6 @@ IncomingHosts cs = new IncomingHosts(topology, nodeNames); cs.host.setKey("local", "host"); } - } }