# HG changeset patch # User kazz # Date 1326728427 -32400 # Node ID 2ca2d961a8d2f10c4622a066b35374417926e711 # Parent 145667a554ada0f0d3a7816af28eccaa0ec68875 implements outline of TopologyManager diff -r 145667a554ad -r 2ca2d961a8d2 .classpath --- a/.classpath Sun Jan 15 19:02:01 2012 +0900 +++ b/.classpath Tue Jan 17 00:40:27 2012 +0900 @@ -7,5 +7,6 @@ + diff -r 145667a554ad -r 2ca2d961a8d2 lib/com.alexmerz.graphviz.jar Binary file lib/com.alexmerz.graphviz.jar has changed diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/daemon/AliceDaemon.java --- a/src/alice/daemon/AliceDaemon.java Sun Jan 15 19:02:01 2012 +0900 +++ b/src/alice/daemon/AliceDaemon.java Tue Jan 17 00:40:27 2012 +0900 @@ -20,8 +20,8 @@ ServerSocketChannel ssChannel = ServerSocketChannel.open(); ServerSocket ss = ssChannel.socket(); ss.setReuseAddress(true); - ss.bind(new InetSocketAddress(InetAddress.getLocalHost(), conf.port)); - acceptThread = new AcceptThread(ss, "ACCEPT" + conf.port); + ss.bind(new InetSocketAddress(InetAddress.getLocalHost(), conf.localPort)); + acceptThread = new AcceptThread(ss, "ACCEPT" + conf.localPort); acceptThread.start(); } catch (IOException e) { diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/daemon/Config.java --- a/src/alice/daemon/Config.java Sun Jan 15 19:02:01 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,24 +0,0 @@ -package alice.daemon; - -public class Config { - - public int port = 10000; - public String hostname; - public int connectPort = 10000; - public String key; - - public Config(String[] args) { - for (int i = 0; i< args.length; i++) { - if ("-p".equals(args[i])) { - port = Integer.parseInt(args[++i]); - } else if ("-h".equals(args[i])) { - hostname = args[++i]; - } else if ("-cp".equals(args[i])) { - connectPort = Integer.parseInt(args[++i]); - } else if ("-key".equals(args[i])) { - key = args[++i]; - } - } - } - -} diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/test/codesegment/RemoteIncrement.java --- a/src/alice/test/codesegment/RemoteIncrement.java Sun Jan 15 19:02:01 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,25 +0,0 @@ -package alice.test.codesegment; - -import org.msgpack.type.ValueFactory; - -import alice.codesegment.CodeSegment; -import alice.datasegment.CommandType; -import alice.datasegment.DataSegmentReceiver; - -public class RemoteIncrement extends CodeSegment { - - public DataSegmentReceiver num = new DataSegmentReceiver(ids, CommandType.TAKE); - - @Override - public void run() { - int num = this.num.val.asIntegerValue().getInt(); - System.out.println("[CodeSegment] " + num++); - if (num == 10) System.exit(0); - - RemoteIncrement cs = new RemoteIncrement(); - cs.num.setKey("remote", "num"); - - ods.put("local", "num", ValueFactory.createIntegerValue(num)); - } - -} diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/test/codesegment/RemoteStartCodeSegment.java --- a/src/alice/test/codesegment/RemoteStartCodeSegment.java Sun Jan 15 19:02:01 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,19 +0,0 @@ -package alice.test.codesegment; - -import org.msgpack.type.Value; -import org.msgpack.type.ValueFactory; - -import alice.codesegment.CodeSegment; - -public class RemoteStartCodeSegment extends CodeSegment { - - @Override - public void run() { - RemoteIncrement cs = new RemoteIncrement(); - cs.num.setKey("remote", "num"); - - Value num = ValueFactory.createIntegerValue(0); - ods.put("local", "num", num); - } - -} diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/test/codesegment/StartCodeSegment.java --- a/src/alice/test/codesegment/StartCodeSegment.java Sun Jan 15 19:02:01 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,22 +0,0 @@ -package alice.test.codesegment; - -import org.msgpack.type.Value; -import org.msgpack.type.ValueFactory; - -import alice.codesegment.CodeSegment; - -public class StartCodeSegment extends CodeSegment { - - @Override - public void run() { - System.out.println("run StartCodeSegment"); - - TestCodeSegment cs = new TestCodeSegment(); - cs.arg1.setKey("local", "key1"); - System.out.println("create TestCodeSegment"); - - Value val = ValueFactory.createRawValue("String data"); - ods.update("local", "key1", val); - } - -} diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/test/codesegment/TestCodeSegment.java --- a/src/alice/test/codesegment/TestCodeSegment.java Sun Jan 15 19:02:01 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,32 +0,0 @@ -package alice.test.codesegment; - -import org.msgpack.type.Value; -import org.msgpack.type.ValueFactory; - -import alice.codesegment.CodeSegment; -import alice.datasegment.CommandType; -import alice.datasegment.DataSegmentReceiver; - -public class TestCodeSegment extends CodeSegment { - - DataSegmentReceiver arg1 = new DataSegmentReceiver(ids, CommandType.PEEK); - - @Override - public void run() { - System.out.println("index = " + arg1.index); - System.out.println("data = " + arg1.val); - System.out.println(arg1.val.getType()); - - if (arg1.index == 10) { - System.exit(0); - return; - } - - TestCodeSegment cs = new TestCodeSegment(); - cs.arg1.setKey("local", "key1", arg1.index); - - Value val = ValueFactory.createRawValue("String data"); - ods.update("local", "key1", val); - } - -} diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/test/codesegment/TestLocalAlice.java --- a/src/alice/test/codesegment/TestLocalAlice.java Sun Jan 15 19:02:01 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,8 +0,0 @@ -package alice.test.codesegment; - -public class TestLocalAlice { - public static void main(String args[]) { - new StartCodeSegment().execute(); - } - -} diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/test/codesegment/TestRemoteAlice.java --- a/src/alice/test/codesegment/TestRemoteAlice.java Sun Jan 15 19:02:01 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,40 +0,0 @@ -package alice.test.codesegment; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.SocketChannel; - -import alice.daemon.AliceDaemon; -import alice.daemon.Config; -import alice.daemon.Connection; -import alice.datasegment.DataSegment; -import alice.datasegment.RemoteDataSegmentManager; - -public class TestRemoteAlice { - - public static void main(String[] args) { - Config conf = new Config(args); - - new AliceDaemon(conf).listen(); - - boolean connect = true; - do { - try { - SocketChannel sc = SocketChannel.open(new InetSocketAddress(conf.hostname, conf.connectPort)); - Connection connection = new Connection(sc.socket()); - RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connection); - DataSegment.regist(conf.key, manager); - connect = false; - } catch (IOException e) { - try { - Thread.sleep(500); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - } - } while (connect); - - new RemoteStartCodeSegment().execute(); - } - -} diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/test/codesegment/local/StartCodeSegment.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/local/StartCodeSegment.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,22 @@ +package alice.test.codesegment.local; + +import org.msgpack.type.Value; +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; + +public class StartCodeSegment extends CodeSegment { + + @Override + public void run() { + System.out.println("run StartCodeSegment"); + + TestCodeSegment cs = new TestCodeSegment(); + cs.arg1.setKey("local", "key1"); + System.out.println("create TestCodeSegment"); + + Value val = ValueFactory.createRawValue("String data"); + ods.update("local", "key1", val); + } + +} diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/test/codesegment/local/TestCodeSegment.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/local/TestCodeSegment.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,32 @@ +package alice.test.codesegment.local; + +import org.msgpack.type.Value; +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegmentReceiver; + +public class TestCodeSegment extends CodeSegment { + + DataSegmentReceiver arg1 = new DataSegmentReceiver(ids, CommandType.PEEK); + + @Override + public void run() { + System.out.println("index = " + arg1.index); + System.out.println("data = " + arg1.val); + System.out.println(arg1.val.getType()); + + if (arg1.index == 10) { + System.exit(0); + return; + } + + TestCodeSegment cs = new TestCodeSegment(); + cs.arg1.setKey("local", "key1", arg1.index); + + Value val = ValueFactory.createRawValue("String data"); + ods.update("local", "key1", val); + } + +} diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/test/codesegment/local/TestLocalAlice.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/local/TestLocalAlice.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,8 @@ +package alice.test.codesegment.local; + +public class TestLocalAlice { + public static void main(String args[]) { + new StartCodeSegment().execute(); + } + +} diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/test/codesegment/remote/RemoteIncrement.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/remote/RemoteIncrement.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,25 @@ +package alice.test.codesegment.remote; + +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegmentReceiver; + +public class RemoteIncrement extends CodeSegment { + + public DataSegmentReceiver num = new DataSegmentReceiver(ids, CommandType.TAKE); + + @Override + public void run() { + int num = this.num.val.asIntegerValue().getInt(); + System.out.println("[CodeSegment] " + num++); + if (num == 10) System.exit(0); + + RemoteIncrement cs = new RemoteIncrement(); + cs.num.setKey("remote", "num"); + + ods.put("local", "num", ValueFactory.createIntegerValue(num)); + } + +} diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/test/codesegment/remote/RemoteStartCodeSegment.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/remote/RemoteStartCodeSegment.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,19 @@ +package alice.test.codesegment.remote; + +import org.msgpack.type.Value; +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; + +public class RemoteStartCodeSegment extends CodeSegment { + + @Override + public void run() { + RemoteIncrement cs = new RemoteIncrement(); + cs.num.setKey("remote", "num"); + + Value num = ValueFactory.createIntegerValue(0); + ods.put("local", "num", num); + } + +} diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/test/codesegment/remote/TestRemoteAlice.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/remote/TestRemoteAlice.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,39 @@ +package alice.test.codesegment.remote; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; + +import alice.daemon.AliceDaemon; +import alice.daemon.Connection; +import alice.datasegment.DataSegment; +import alice.datasegment.RemoteDataSegmentManager; + +public class TestRemoteAlice { + + public static void main(String[] args) { + TestRemoteConfig conf = new TestRemoteConfig(args); + + new AliceDaemon(conf).listen(); + + boolean connect = true; + do { + try { + SocketChannel sc = SocketChannel.open(new InetSocketAddress(conf.hostname, conf.connectPort)); + Connection connection = new Connection(sc.socket()); + RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connection); + DataSegment.regist(conf.key, manager); + connect = false; + } catch (IOException e) { + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } while (connect); + + new RemoteStartCodeSegment().execute(); + } + +} diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/test/codesegment/remote/TestRemoteConfig.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/remote/TestRemoteConfig.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,24 @@ +package alice.test.codesegment.remote; + +import alice.daemon.Config; + +public class TestRemoteConfig extends Config { + + public String hostname; + public int connectPort = 10000; + public String key; + + public TestRemoteConfig(String[] args) { + super(args); + for (int i = 0; i< args.length; i++) { + if ("-h".equals(args[i])) { + hostname = args[++i]; + } else if ("-cp".equals(args[i])) { + connectPort = Integer.parseInt(args[++i]); + } else if ("-key".equals(args[i])) { + key = args[++i]; + } + } + } + +} diff -r 145667a554ad -r 2ca2d961a8d2 src/alice/topology/HostMessage.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/HostMessage.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,25 @@ +package alice.topology; + +import org.msgpack.annotation.Message; +import org.msgpack.annotation.Optional; + +@Message +public class HostMessage { + + public String name; + public int port; + @Optional public String connectionName; + + public HostMessage() { } + public HostMessage(String name, int port) { + this.name = name; + this.port = port; + } + + public HostMessage(String name, int port, String connectionName) { + this.name = name; + this.port = port; + this.connectionName = connectionName; + } + +} diff -r 145667a554ad -r 2ca2d961a8d2 src/topology/manager/IncomingHosts.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/topology/manager/IncomingHosts.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,48 @@ +package topology.manager; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; + +import org.apache.log4j.Logger; +import org.msgpack.MessagePack; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegmentReceiver; +import alice.topology.HostMessage; + +public class IncomingHosts extends CodeSegment { + + HashMap topology; + LinkedList nodeNames; + DataSegmentReceiver host = new DataSegmentReceiver(ids, CommandType.TAKE); + Logger logger = Logger.getLogger(IncomingHosts.class); + + public IncomingHosts(HashMap topology, LinkedList nodeNames) { + this.topology = topology; + this.nodeNames = nodeNames; + } + + @Override + public void run() { + MessagePack msgpack = new MessagePack(); + try { + HostMessage host = msgpack.convert(this.host.val, HostMessage.class); + String nodeName = nodeNames.poll(); + + // TODO: send nodeName to node + + } catch (IOException e) { + logger.error("HostMessage format error"); + e.printStackTrace(); + } + + if (!nodeNames.isEmpty()) { + IncomingHosts cs = new IncomingHosts(topology, nodeNames); + cs.host.setKey("local", "host"); + } + + } + +} diff -r 145667a554ad -r 2ca2d961a8d2 src/topology/manager/NodeInfo.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/topology/manager/NodeInfo.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,14 @@ +package topology.manager; + +public class NodeInfo { + + public String sourceNodeName; + public String connectionName; + + public NodeInfo(String source, String connection) { + this.sourceNodeName = source; + this.connectionName = connection; + + } + +} diff -r 145667a554ad -r 2ca2d961a8d2 src/topology/manager/StartTopologyManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/topology/manager/StartTopologyManager.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,64 @@ +package topology.manager; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; + +import org.apache.log4j.Logger; + +import com.alexmerz.graphviz.ParseException; +import com.alexmerz.graphviz.Parser; +import com.alexmerz.graphviz.objects.Edge; +import com.alexmerz.graphviz.objects.Graph; +import com.alexmerz.graphviz.objects.Node; + +import alice.codesegment.CodeSegment; + +public class StartTopologyManager extends CodeSegment { + + TopologyManagerConfig conf; + Logger logger = Logger.getLogger(StartTopologyManager.class); + + public StartTopologyManager(TopologyManagerConfig conf) { + conf = this.conf; + } + + @Override + public void run() { + LinkedList nodeNames = new LinkedList(); + HashMap topology = new HashMap(); + try { + FileReader reader = new FileReader(new File(conf.confFilePath)); + Parser parser = new Parser(); + parser.parse(reader); + ArrayList graphs = parser.getGraphs(); + for (Graph graph : graphs) { + ArrayList nodes = graph.getNodes(false); + for (Node node : nodes) { + nodeNames.add(node.getId().getId()); + } + ArrayList edges = graph.getEdges(); + for (Edge edge : edges) { + String connection = edge.getAttribute("label"); + String source = edge.getSource().getNode().getId().getId(); + String target = edge.getSource().getNode().getId().getId(); + topology.put(target, new NodeInfo(source, connection)); + } + } + + } catch (FileNotFoundException e) { + logger.error("File not found: " + conf.confFilePath); + e.printStackTrace(); + } catch (ParseException e) { + logger.error("File format error: " + conf.confFilePath); + e.printStackTrace(); + } + + IncomingHosts cs = new IncomingHosts(topology, nodeNames); + cs.host.setKey("local", "host"); + } + +} diff -r 145667a554ad -r 2ca2d961a8d2 src/topology/manager/TopologyManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/topology/manager/TopologyManager.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,13 @@ +package topology.manager; + +import alice.daemon.AliceDaemon; + +public class TopologyManager { + + public static void main(String[] args) { + TopologyManagerConfig conf = new TopologyManagerConfig(args); + new AliceDaemon(conf).listen(); + new StartTopologyManager(conf).execute(); + } + +} diff -r 145667a554ad -r 2ca2d961a8d2 src/topology/manager/TopologyManagerConfig.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/topology/manager/TopologyManagerConfig.java Tue Jan 17 00:40:27 2012 +0900 @@ -0,0 +1,18 @@ +package topology.manager; + +import alice.daemon.Config; + +public class TopologyManagerConfig extends Config { + + public String confFilePath; + + public TopologyManagerConfig(String[] args) { + super(args); + for (int i = 0; i < args.length; i++) { + if ("-conf".equals(args[i])) { + confFilePath = args[++i]; + } + } + } + +}