# HG changeset patch # User kazz # Date 1326784383 -32400 # Node ID b5a21baf0b074c73f3d1b08d4a767423f7a34174 # Parent 414fcce36e90244ed442aa2a119ff13d60f7baaa implements RingTopology diff -r 414fcce36e90 -r b5a21baf0b07 scripts/ring.dot --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/ring.dot Tue Jan 17 16:13:03 2012 +0900 @@ -0,0 +1,8 @@ +digraph test { + node0 -> node1 [label="right"] + node0 -> node2 [label="left"] + node1 -> node2 [label="right"] + node1 -> node0 [label="left"] + node2 -> node0 [label="right"] + node2 -> node1 [label="left"] +} diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/codesegment/OutputDataSegment.java --- a/src/alice/codesegment/OutputDataSegment.java Tue Jan 17 14:17:13 2012 +0900 +++ b/src/alice/codesegment/OutputDataSegment.java Tue Jan 17 16:13:03 2012 +0900 @@ -1,6 +1,7 @@ package alice.codesegment; import org.msgpack.type.Value; +import org.msgpack.type.ValueFactory; import alice.datasegment.DataSegment; @@ -14,4 +15,25 @@ DataSegment.get(managerKey).update(key, val); } + public void put(String managerKey, String key, String val) { + DataSegment.get(managerKey).put(key, ValueFactory.createRawValue(val)); + } + + public void update(String managerKey, String key, String val) { + DataSegment.get(managerKey).update(key, ValueFactory.createRawValue(val)); + } + + public void put(String managerKey, String key, int val) { + DataSegment.get(managerKey).put(key, ValueFactory.createIntegerValue(val)); + } + + public void update(String managerKey, String key, int val) { + DataSegment.get(managerKey).update(key, ValueFactory.createIntegerValue(val)); + } + + public void finish(String managerKey) { + DataSegment.get(managerKey).finish(); + } + + } diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/daemon/OutboundTcpConnection.java --- a/src/alice/daemon/OutboundTcpConnection.java Tue Jan 17 14:17:13 2012 +0900 +++ b/src/alice/daemon/OutboundTcpConnection.java Tue Jan 17 16:13:03 2012 +0900 @@ -6,6 +6,7 @@ import org.msgpack.MessagePack; import alice.datasegment.Command; +import alice.datasegment.CommandType; public class OutboundTcpConnection extends Thread { @@ -23,7 +24,12 @@ MessagePack msgpack = new MessagePack(); while (true) { try { - CommandMessage cmdMsg = convert(connection.sendQueue.take()); + Command cmd = connection.sendQueue.take(); + if (cmd.type == CommandType.FINISH) { + System.exit(0); + return; + } + CommandMessage cmdMsg = convert(cmd); byte[] buf = msgpack.write(cmdMsg); ByteBuffer buffer = ByteBuffer.allocateDirect(4 + buf.length); buffer.putInt(buf.length); @@ -35,7 +41,6 @@ } catch (IOException e) { e.printStackTrace(); } - } } diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/datasegment/CommandType.java --- a/src/alice/datasegment/CommandType.java Tue Jan 17 14:17:13 2012 +0900 +++ b/src/alice/datasegment/CommandType.java Tue Jan 17 16:13:03 2012 +0900 @@ -8,7 +8,8 @@ PEEK, TAKE, REMOVE, - REPLY; + REPLY, + FINISH; public int id; public static HashMap hash = new HashMap(); diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/datasegment/DataSegmentManager.java --- a/src/alice/datasegment/DataSegmentManager.java Tue Jan 17 14:17:13 2012 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Tue Jan 17 16:13:03 2012 +0900 @@ -42,5 +42,6 @@ } public abstract void peek(DataSegmentReceiver receiver, String key, int index, CodeSegment cs); public abstract void remove(String key); + public abstract void finish(); } diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/datasegment/DataSegmentReceiver.java --- a/src/alice/datasegment/DataSegmentReceiver.java Tue Jan 17 14:17:13 2012 +0900 +++ b/src/alice/datasegment/DataSegmentReceiver.java Tue Jan 17 16:13:03 2012 +0900 @@ -1,6 +1,7 @@ package alice.datasegment; import org.msgpack.type.Value; +import org.msgpack.type.ValueType; import alice.codesegment.InputDataSegment; @@ -34,4 +35,18 @@ ids.setKey(); } + public String asString() { + if (val.getType() == ValueType.RAW) { + return val.asRawValue().getString(); + } + return null; + } + + public int asInteger() { + if (val.getType() == ValueType.INTEGER) { + return val.asIntegerValue().getInt(); + } + return 0; + } + } diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/datasegment/LocalDataSegmentManager.java --- a/src/alice/datasegment/LocalDataSegmentManager.java Tue Jan 17 14:17:13 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Tue Jan 17 16:13:03 2012 +0900 @@ -61,5 +61,9 @@ DataSegmentKey dataSegmentKey = getDataSegmentKey(key); dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, null, 0, 0, replyQueue, null, null)); } - + + @Override public void finish() { + System.exit(0); + } + } diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/datasegment/RemoteDataSegmentManager.java --- a/src/alice/datasegment/RemoteDataSegmentManager.java Tue Jan 17 14:17:13 2012 +0900 +++ b/src/alice/datasegment/RemoteDataSegmentManager.java Tue Jan 17 16:13:03 2012 +0900 @@ -87,5 +87,9 @@ public void remove(String key) { connection.sendCommand(new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null)); } + + public void finish() { + connection.sendCommand(new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, null)); + } } diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/test/topology/ring/CheckMyName.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/topology/ring/CheckMyName.java Tue Jan 17 16:13:03 2012 +0900 @@ -0,0 +1,31 @@ +package alice.test.topology.ring; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegmentReceiver; + +public class CheckMyName extends CodeSegment { + + DataSegmentReceiver host = new DataSegmentReceiver(ids, CommandType.PEEK); + + @Override + public void run() { + String host = this.host.asString(); + if (host.equals("node0")) { + System.out.println("I am first node"); + ods.put("local", "counter", 0); + FirstRingMessagePassing cs1 = new FirstRingMessagePassing(); + cs1.counter.setKey("local", "counter"); + RingFinish cs2 = new RingFinish("manager"); + cs2.finish.setKey("local", "finish"); + + } else { + System.out.println("I am normal node"); + RingMessagePassing cs1 = new RingMessagePassing(); + cs1.counter.setKey("local", "counter"); + RingFinish cs2 = new RingFinish("right"); + cs2.finish.setKey("local", "finish"); + } + } + +} diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/test/topology/ring/FirstRingMessagePassing.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/topology/ring/FirstRingMessagePassing.java Tue Jan 17 16:13:03 2012 +0900 @@ -0,0 +1,33 @@ +package alice.test.topology.ring; + +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegmentReceiver; + +public class FirstRingMessagePassing extends CodeSegment { + + DataSegmentReceiver counter = new DataSegmentReceiver(ids, CommandType.TAKE); + + @Override + public void run() { + int counter = this.counter.asInteger(); + System.out.println(++counter); + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + ods.put("right", "counter", counter); + + if (counter >= 10) { + ods.put("right", "finish", ValueFactory.createNilValue()); + return; + } + + FirstRingMessagePassing cs = new FirstRingMessagePassing(); + cs.counter.setKey("local", "counter"); + } + +} diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/test/topology/ring/RingFinish.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/topology/ring/RingFinish.java Tue Jan 17 16:13:03 2012 +0900 @@ -0,0 +1,24 @@ +package alice.test.topology.ring; + +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegmentReceiver; + +public class RingFinish extends CodeSegment { + + private String sendKey; + public DataSegmentReceiver finish = new DataSegmentReceiver(ids, CommandType.TAKE); + + public RingFinish(String sendKey) { + this.sendKey = sendKey; + } + + @Override + public void run() { + ods.put(sendKey, "finish", ValueFactory.createNilValue()); + ods.finish(sendKey); + } + +} diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/test/topology/ring/RingMessagePassing.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/topology/ring/RingMessagePassing.java Tue Jan 17 16:13:03 2012 +0900 @@ -0,0 +1,28 @@ +package alice.test.topology.ring; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegmentReceiver; + +public class RingMessagePassing extends CodeSegment { + + public DataSegmentReceiver counter = new DataSegmentReceiver(ids, CommandType.TAKE); + + @Override + public void run() { + int counter = this.counter.asInteger(); + System.out.println(counter); + + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + ods.put("right", "counter", counter); + + RingMessagePassing cs = new RingMessagePassing(); + cs.counter.setKey("local", "counter"); + } + +} diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/test/topology/ring/RingTopology.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/topology/ring/RingTopology.java Tue Jan 17 16:13:03 2012 +0900 @@ -0,0 +1,12 @@ +package alice.test.topology.ring; + +import alice.topology.node.TopologyNode; + +public class RingTopology { + + public static void main(String[] args) { + RingTopologyConfig conf = new RingTopologyConfig(args); + TopologyNode node = new TopologyNode(conf, StartRing.class); + } + +} diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/test/topology/ring/RingTopologyConfig.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/topology/ring/RingTopologyConfig.java Tue Jan 17 16:13:03 2012 +0900 @@ -0,0 +1,12 @@ +package alice.test.topology.ring; + +import alice.topology.node.TopologyNodeConfig; + +public class RingTopologyConfig extends TopologyNodeConfig { + + public RingTopologyConfig(String[] args) { + super(args); + + } + +} diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/test/topology/ring/StartRing.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/topology/ring/StartRing.java Tue Jan 17 16:13:03 2012 +0900 @@ -0,0 +1,13 @@ +package alice.test.topology.ring; + +import alice.codesegment.CodeSegment; + +public class StartRing extends CodeSegment { + + @Override + public void run() { + CheckMyName cs = new CheckMyName(); + cs.host.setKey("local", "host"); + } + +} diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/topology/manager/StartTopologyManager.java --- a/src/alice/topology/manager/StartTopologyManager.java Tue Jan 17 14:17:13 2012 +0900 +++ b/src/alice/topology/manager/StartTopologyManager.java Tue Jan 17 16:13:03 2012 +0900 @@ -9,14 +9,14 @@ import org.apache.log4j.Logger; +import alice.codesegment.CodeSegment; + import com.alexmerz.graphviz.ParseException; import com.alexmerz.graphviz.Parser; import com.alexmerz.graphviz.objects.Edge; import com.alexmerz.graphviz.objects.Graph; import com.alexmerz.graphviz.objects.Node; -import alice.codesegment.CodeSegment; - public class StartTopologyManager extends CodeSegment { TopologyManagerConfig conf; @@ -73,8 +73,11 @@ e.printStackTrace(); } - IncomingHosts cs = new IncomingHosts(topology, nodeNames); - cs.host.setKey("local", "host"); + IncomingHosts cs1 = new IncomingHosts(topology, nodeNames); + cs1.host.setKey("local", "host"); + + TopologyFinish cs2 = new TopologyFinish(); + cs2.finish.setKey("local", "finish"); } } diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/topology/manager/TopologyFinish.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/manager/TopologyFinish.java Tue Jan 17 16:13:03 2012 +0900 @@ -0,0 +1,14 @@ +package alice.topology.manager; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegmentReceiver; + +public class TopologyFinish extends CodeSegment { + public DataSegmentReceiver finish = new DataSegmentReceiver(ids, CommandType.TAKE); + @Override + public void run() { + System.exit(0); + } + +} diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/topology/node/ConfigurationFinish.java --- a/src/alice/topology/node/ConfigurationFinish.java Tue Jan 17 14:17:13 2012 +0900 +++ b/src/alice/topology/node/ConfigurationFinish.java Tue Jan 17 16:13:03 2012 +0900 @@ -8,9 +8,9 @@ public DataSegmentReceiver reverseCount = new DataSegmentReceiver(ids, CommandType.PEEK); public DataSegmentReceiver configNodeNum = new DataSegmentReceiver(ids, CommandType.PEEK); - private Class clazz; + private Class clazz; - public ConfigurationFinish(Class clazz) { + public ConfigurationFinish(Class clazz) { this.clazz = clazz; } diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/topology/node/StartTopologyNode.java --- a/src/alice/topology/node/StartTopologyNode.java Tue Jan 17 14:17:13 2012 +0900 +++ b/src/alice/topology/node/StartTopologyNode.java Tue Jan 17 16:13:03 2012 +0900 @@ -15,9 +15,9 @@ public class StartTopologyNode extends CodeSegment { private TopologyNodeConfig conf; - private Class clazz; + private Class clazz; - public StartTopologyNode(TopologyNodeConfig conf, Class clazz) { + public StartTopologyNode(TopologyNodeConfig conf, Class clazz) { this.conf = conf; this.clazz = clazz; } diff -r 414fcce36e90 -r b5a21baf0b07 src/alice/topology/node/TopologyNode.java --- a/src/alice/topology/node/TopologyNode.java Tue Jan 17 14:17:13 2012 +0900 +++ b/src/alice/topology/node/TopologyNode.java Tue Jan 17 16:13:03 2012 +0900 @@ -5,14 +5,14 @@ public class TopologyNode { - public TopologyNode(String[] args, TopologyNodeConfig conf, Class clazz) { + public TopologyNode(TopologyNodeConfig conf, Class clazz) { new AliceDaemon(conf).listen(); new StartTopologyNode(conf, clazz).execute(); } public static void main(String[] args) { TopologyNodeConfig conf = new TopologyNodeConfig(args); - TopologyNode node = new TopologyNode(args, conf, null); + TopologyNode node = new TopologyNode(conf, null); } }