changeset 30:b5a21baf0b07

implements RingTopology
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 17 Jan 2012 16:13:03 +0900
parents 414fcce36e90
children 5c704b9a9a87
files scripts/ring.dot src/alice/codesegment/OutputDataSegment.java src/alice/daemon/OutboundTcpConnection.java src/alice/datasegment/CommandType.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/DataSegmentReceiver.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegmentManager.java src/alice/test/topology/ring/CheckMyName.java src/alice/test/topology/ring/FirstRingMessagePassing.java src/alice/test/topology/ring/RingFinish.java src/alice/test/topology/ring/RingMessagePassing.java src/alice/test/topology/ring/RingTopology.java src/alice/test/topology/ring/RingTopologyConfig.java src/alice/test/topology/ring/StartRing.java src/alice/topology/manager/StartTopologyManager.java src/alice/topology/manager/TopologyFinish.java src/alice/topology/node/ConfigurationFinish.java src/alice/topology/node/StartTopologyNode.java src/alice/topology/node/TopologyNode.java
diffstat 20 files changed, 244 insertions(+), 14 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/scripts/ring.dot	Tue Jan 17 16:13:03 2012 +0900
@@ -0,0 +1,8 @@
+digraph test {
+	node0 -> node1 [label="right"]
+	node0 -> node2 [label="left"]
+	node1 -> node2 [label="right"]
+	node1 -> node0 [label="left"]
+	node2 -> node0 [label="right"]
+	node2 -> node1 [label="left"]
+}
--- a/src/alice/codesegment/OutputDataSegment.java	Tue Jan 17 14:17:13 2012 +0900
+++ b/src/alice/codesegment/OutputDataSegment.java	Tue Jan 17 16:13:03 2012 +0900
@@ -1,6 +1,7 @@
 package alice.codesegment;
 
 import org.msgpack.type.Value;
+import org.msgpack.type.ValueFactory;
 
 import alice.datasegment.DataSegment;
 
@@ -14,4 +15,25 @@
 		DataSegment.get(managerKey).update(key, val);
 	}
 	
+	public void put(String managerKey, String key, String val) {
+		DataSegment.get(managerKey).put(key, ValueFactory.createRawValue(val));
+	}
+	
+	public void update(String managerKey, String key, String val) {
+		DataSegment.get(managerKey).update(key, ValueFactory.createRawValue(val));
+	}
+
+	public void put(String managerKey, String key, int val) {
+		DataSegment.get(managerKey).put(key, ValueFactory.createIntegerValue(val));
+	}
+	
+	public void update(String managerKey, String key, int val) {
+		DataSegment.get(managerKey).update(key, ValueFactory.createIntegerValue(val));
+	}
+	
+	public void finish(String managerKey) {
+		DataSegment.get(managerKey).finish();
+	}
+
+
 }
--- a/src/alice/daemon/OutboundTcpConnection.java	Tue Jan 17 14:17:13 2012 +0900
+++ b/src/alice/daemon/OutboundTcpConnection.java	Tue Jan 17 16:13:03 2012 +0900
@@ -6,6 +6,7 @@
 import org.msgpack.MessagePack;
 
 import alice.datasegment.Command;
+import alice.datasegment.CommandType;
 
 public class OutboundTcpConnection extends Thread {
 	
@@ -23,7 +24,12 @@
 		MessagePack msgpack = new MessagePack();
 		while (true) {
 			try {
-				CommandMessage cmdMsg = convert(connection.sendQueue.take());
+				Command cmd = connection.sendQueue.take();
+				if (cmd.type == CommandType.FINISH) {
+					System.exit(0);
+					return;
+				}
+				CommandMessage cmdMsg = convert(cmd);
 				byte[] buf = msgpack.write(cmdMsg);
 				ByteBuffer buffer = ByteBuffer.allocateDirect(4 + buf.length);
 				buffer.putInt(buf.length);
@@ -35,7 +41,6 @@
 			} catch (IOException e) {
 				e.printStackTrace();
 			}
-			
 		}
 	}
 	
--- a/src/alice/datasegment/CommandType.java	Tue Jan 17 14:17:13 2012 +0900
+++ b/src/alice/datasegment/CommandType.java	Tue Jan 17 16:13:03 2012 +0900
@@ -8,7 +8,8 @@
 	PEEK,
 	TAKE,
 	REMOVE,
-	REPLY;
+	REPLY,
+	FINISH;
 	
 	public int id;
 	public static HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>();
--- a/src/alice/datasegment/DataSegmentManager.java	Tue Jan 17 14:17:13 2012 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Tue Jan 17 16:13:03 2012 +0900
@@ -42,5 +42,6 @@
 	}
 	public abstract void peek(DataSegmentReceiver receiver, String key, int index, CodeSegment cs);
 	public abstract void remove(String key);
+	public abstract void finish();
 	
 }
--- a/src/alice/datasegment/DataSegmentReceiver.java	Tue Jan 17 14:17:13 2012 +0900
+++ b/src/alice/datasegment/DataSegmentReceiver.java	Tue Jan 17 16:13:03 2012 +0900
@@ -1,6 +1,7 @@
 package alice.datasegment;
 
 import org.msgpack.type.Value;
+import org.msgpack.type.ValueType;
 
 import alice.codesegment.InputDataSegment;
 
@@ -34,4 +35,18 @@
 		ids.setKey();
 	}
 	
+	public String asString() {
+		if (val.getType() == ValueType.RAW) {
+			return val.asRawValue().getString();
+		}
+		return null;
+	}
+
+	public int asInteger() {
+		if (val.getType() == ValueType.INTEGER) {
+			return val.asIntegerValue().getInt();
+		}
+		return 0;
+	}
+	
 }
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Tue Jan 17 14:17:13 2012 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Tue Jan 17 16:13:03 2012 +0900
@@ -61,5 +61,9 @@
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, null, 0, 0, replyQueue, null, null));
 	}
-
+	
+	@Override public void finish() {
+		System.exit(0);
+	}
+	
 }
--- a/src/alice/datasegment/RemoteDataSegmentManager.java	Tue Jan 17 14:17:13 2012 +0900
+++ b/src/alice/datasegment/RemoteDataSegmentManager.java	Tue Jan 17 16:13:03 2012 +0900
@@ -87,5 +87,9 @@
 	public void remove(String key) {
 		connection.sendCommand(new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null));
 	}
+	
+	public void finish() {
+		connection.sendCommand(new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, null));
+	}
 
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/topology/ring/CheckMyName.java	Tue Jan 17 16:13:03 2012 +0900
@@ -0,0 +1,31 @@
+package alice.test.topology.ring;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegmentReceiver;
+
+public class CheckMyName extends CodeSegment {
+
+	DataSegmentReceiver host = new DataSegmentReceiver(ids, CommandType.PEEK);
+	
+	@Override
+	public void run() {
+		String host = this.host.asString();
+		if (host.equals("node0")) {
+			System.out.println("I am first node");
+			ods.put("local", "counter", 0);
+			FirstRingMessagePassing cs1 = new FirstRingMessagePassing();
+			cs1.counter.setKey("local", "counter");
+			RingFinish cs2 = new RingFinish("manager");
+			cs2.finish.setKey("local", "finish");
+			
+		} else {
+			System.out.println("I am normal node");
+			RingMessagePassing cs1 = new RingMessagePassing();
+			cs1.counter.setKey("local", "counter");
+			RingFinish cs2 = new RingFinish("right");
+			cs2.finish.setKey("local", "finish");
+		}
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/topology/ring/FirstRingMessagePassing.java	Tue Jan 17 16:13:03 2012 +0900
@@ -0,0 +1,33 @@
+package alice.test.topology.ring;
+
+import org.msgpack.type.ValueFactory;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegmentReceiver;
+
+public class FirstRingMessagePassing extends CodeSegment {
+	
+	DataSegmentReceiver counter = new DataSegmentReceiver(ids, CommandType.TAKE);
+	
+	@Override
+	public void run() {
+		int counter = this.counter.asInteger();
+		System.out.println(++counter);
+		try {
+			Thread.sleep(200);
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		}
+		ods.put("right", "counter", counter);
+		
+		if (counter >= 10) {
+			ods.put("right", "finish", ValueFactory.createNilValue());
+			return;	
+		}
+		
+		FirstRingMessagePassing cs = new FirstRingMessagePassing();
+		cs.counter.setKey("local", "counter");
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/topology/ring/RingFinish.java	Tue Jan 17 16:13:03 2012 +0900
@@ -0,0 +1,24 @@
+package alice.test.topology.ring;
+
+import org.msgpack.type.ValueFactory;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegmentReceiver;
+
+public class RingFinish extends CodeSegment {
+	
+	private String sendKey;
+	public DataSegmentReceiver finish = new DataSegmentReceiver(ids, CommandType.TAKE);
+	
+	public RingFinish(String sendKey) {
+		this.sendKey = sendKey;
+	}
+
+	@Override
+	public void run() {
+		ods.put(sendKey, "finish", ValueFactory.createNilValue());
+		ods.finish(sendKey);
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/topology/ring/RingMessagePassing.java	Tue Jan 17 16:13:03 2012 +0900
@@ -0,0 +1,28 @@
+package alice.test.topology.ring;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegmentReceiver;
+
+public class RingMessagePassing extends CodeSegment {
+
+	public DataSegmentReceiver counter = new DataSegmentReceiver(ids, CommandType.TAKE);
+
+	@Override
+	public void run() {
+		int counter = this.counter.asInteger();
+		System.out.println(counter);
+
+		try {
+			Thread.sleep(200);
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		}
+		
+		ods.put("right", "counter", counter);
+		
+		RingMessagePassing cs = new RingMessagePassing();
+		cs.counter.setKey("local", "counter");
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/topology/ring/RingTopology.java	Tue Jan 17 16:13:03 2012 +0900
@@ -0,0 +1,12 @@
+package alice.test.topology.ring;
+
+import alice.topology.node.TopologyNode;
+
+public class RingTopology {
+	
+	public static void main(String[] args) {
+		RingTopologyConfig conf = new RingTopologyConfig(args);
+		TopologyNode node = new TopologyNode(conf, StartRing.class);
+	}
+	
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/topology/ring/RingTopologyConfig.java	Tue Jan 17 16:13:03 2012 +0900
@@ -0,0 +1,12 @@
+package alice.test.topology.ring;
+
+import alice.topology.node.TopologyNodeConfig;
+
+public class RingTopologyConfig extends TopologyNodeConfig {
+
+	public RingTopologyConfig(String[] args) {
+		super(args);
+		
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/topology/ring/StartRing.java	Tue Jan 17 16:13:03 2012 +0900
@@ -0,0 +1,13 @@
+package alice.test.topology.ring;
+
+import alice.codesegment.CodeSegment;
+
+public class StartRing extends CodeSegment {
+
+	@Override
+	public void run() {
+		CheckMyName cs = new CheckMyName();
+		cs.host.setKey("local", "host");
+	}
+
+}
--- a/src/alice/topology/manager/StartTopologyManager.java	Tue Jan 17 14:17:13 2012 +0900
+++ b/src/alice/topology/manager/StartTopologyManager.java	Tue Jan 17 16:13:03 2012 +0900
@@ -9,14 +9,14 @@
 
 import org.apache.log4j.Logger;
 
+import alice.codesegment.CodeSegment;
+
 import com.alexmerz.graphviz.ParseException;
 import com.alexmerz.graphviz.Parser;
 import com.alexmerz.graphviz.objects.Edge;
 import com.alexmerz.graphviz.objects.Graph;
 import com.alexmerz.graphviz.objects.Node;
 
-import alice.codesegment.CodeSegment;
-
 public class StartTopologyManager extends CodeSegment {
 
 	TopologyManagerConfig conf;
@@ -73,8 +73,11 @@
 			e.printStackTrace();
 		}
 		
-		IncomingHosts cs = new IncomingHosts(topology, nodeNames);
-		cs.host.setKey("local", "host");
+		IncomingHosts cs1 = new IncomingHosts(topology, nodeNames);
+		cs1.host.setKey("local", "host");
+		
+		TopologyFinish cs2 = new TopologyFinish();
+		cs2.finish.setKey("local", "finish");
 	}
 
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/topology/manager/TopologyFinish.java	Tue Jan 17 16:13:03 2012 +0900
@@ -0,0 +1,14 @@
+package alice.topology.manager;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegmentReceiver;
+
+public class TopologyFinish extends CodeSegment {
+	public DataSegmentReceiver finish = new DataSegmentReceiver(ids, CommandType.TAKE);
+	@Override
+	public void run() {
+		System.exit(0);
+	}
+
+}
--- a/src/alice/topology/node/ConfigurationFinish.java	Tue Jan 17 14:17:13 2012 +0900
+++ b/src/alice/topology/node/ConfigurationFinish.java	Tue Jan 17 16:13:03 2012 +0900
@@ -8,9 +8,9 @@
 
 	public DataSegmentReceiver reverseCount = new DataSegmentReceiver(ids, CommandType.PEEK);
 	public DataSegmentReceiver configNodeNum = new DataSegmentReceiver(ids, CommandType.PEEK);
-	private Class<CodeSegment> clazz;
+	private Class<? extends CodeSegment> clazz;
 	
-	public ConfigurationFinish(Class<CodeSegment> clazz) {
+	public ConfigurationFinish(Class<? extends CodeSegment> clazz) {
 		this.clazz = clazz;
 	}
 	
--- a/src/alice/topology/node/StartTopologyNode.java	Tue Jan 17 14:17:13 2012 +0900
+++ b/src/alice/topology/node/StartTopologyNode.java	Tue Jan 17 16:13:03 2012 +0900
@@ -15,9 +15,9 @@
 public class StartTopologyNode extends CodeSegment {
 
 	private TopologyNodeConfig conf;
-	private Class<CodeSegment> clazz;
+	private Class<? extends CodeSegment> clazz;
 	
-	public StartTopologyNode(TopologyNodeConfig conf, Class<CodeSegment> clazz) {
+	public StartTopologyNode(TopologyNodeConfig conf, Class<? extends CodeSegment> clazz) {
 		this.conf = conf;
 		this.clazz = clazz;
 	}
--- a/src/alice/topology/node/TopologyNode.java	Tue Jan 17 14:17:13 2012 +0900
+++ b/src/alice/topology/node/TopologyNode.java	Tue Jan 17 16:13:03 2012 +0900
@@ -5,14 +5,14 @@
 
 public class TopologyNode {
 
-	public TopologyNode(String[] args, TopologyNodeConfig conf, Class<CodeSegment> clazz) {
+	public TopologyNode(TopologyNodeConfig conf, Class<? extends CodeSegment> clazz) {
 		new AliceDaemon(conf).listen();
 		new StartTopologyNode(conf, clazz).execute();
 	}
 	
 	public static void main(String[] args) {
 		TopologyNodeConfig conf = new TopologyNodeConfig(args);
-		TopologyNode node = new TopologyNode(args, conf, null);
+		TopologyNode node = new TopologyNode(conf, null);
 	}
 
 }