changeset 28:98ab26e09a98

Configuration Manager work and implements reverseKey
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 17 Jan 2012 08:41:34 +0900
parents f54dcbebde3a
children 414fcce36e90
files build.xml src/alice/codesegment/InputDataSegment.java src/alice/daemon/AcceptThread.java src/alice/daemon/IncomingTcpConnection.java src/alice/datasegment/Command.java src/alice/datasegment/DataSegment.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/DataSegmentReceiver.java src/alice/datasegment/DataSegmentValue.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegmentManager.java src/alice/topology/HostMessage.java src/alice/topology/manager/IncomingHosts.java src/alice/topology/manager/NodeInfo.java src/alice/topology/manager/StartTopologyManager.java src/alice/topology/node/ConfigurationFinish.java src/alice/topology/node/IncomingAbstractHostName.java src/alice/topology/node/IncomingConnectionInfo.java src/alice/topology/node/IncomingReverseKey.java src/alice/topology/node/StartTopologyNode.java
diffstat 21 files changed, 175 insertions(+), 65 deletions(-) [+]
line wrap: on
line diff
--- a/build.xml	Tue Jan 17 03:52:39 2012 +0900
+++ b/build.xml	Tue Jan 17 08:41:34 2012 +0900
@@ -37,6 +37,7 @@
 	<pathelement path="lib/slf4j-api-1.6.1.jar" />
 	<pathelement path="lib/slf4j-log4j12-1.6.1.jar" />
 	<pathelement path="lib/msgpack-0.6.5-SNAPSHOT.jar" />
+	<pathelement path="lib/com.alexmerz.graphviz.jar" />
       </classpath>
       <sourcepath>
 	<pathelement path="src"/>
--- a/src/alice/codesegment/InputDataSegment.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/codesegment/InputDataSegment.java	Tue Jan 17 08:41:34 2012 +0900
@@ -35,6 +35,7 @@
 	public void reply(DataSegmentReceiver receiver, DataSegmentValue val) {
 		receiver.index = val.index;
 		receiver.val = val.val;
+		receiver.from = val.from;
 		receive();
 	}
 
--- a/src/alice/daemon/AcceptThread.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/daemon/AcceptThread.java	Tue Jan 17 08:41:34 2012 +0900
@@ -12,7 +12,7 @@
 
 	private ServerSocket ss;
 	private Logger log = Logger.getLogger(AcceptThread.class);
-
+	public int counter = 0;
 
 	public AcceptThread(ServerSocket ss, String name) {
 		super(name);
@@ -26,8 +26,13 @@
 				Socket socket = ss.accept();
 				log.info("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort());
 				Connection connection = new Connection(socket);
-				new IncomingTcpConnection(connection, DataSegment.get("local")).start();
+				String key = "accept" + counter;
+				IncomingTcpConnection incoming =
+						new IncomingTcpConnection(connection, DataSegment.get("local"), key);
+				incoming.start();
+				DataSegment.setAccept(key, incoming);
 				new OutboundTcpConnection(connection).start();
+				counter++;
 			} catch (IOException e) {
 				e.printStackTrace();
 			}
--- a/src/alice/daemon/IncomingTcpConnection.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/daemon/IncomingTcpConnection.java	Tue Jan 17 08:41:34 2012 +0900
@@ -17,10 +17,11 @@
 	
 	public Connection connection;
 	public DataSegmentManager manager;
-	
-	public IncomingTcpConnection(Connection connection, DataSegmentManager manager) {
+	public String reverseKey;
+	public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) {
 		this.manager = manager;
 		this.connection = connection;
+		this.reverseKey = reverseKey;
 	}
 	
 	public void run() {
@@ -52,24 +53,24 @@
 				DataSegmentKey dsKey = lmanager.getDataSegmentKey(msg.key);
 				switch (type) {
 				case UPDATE:
-					dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null));
+					dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
 					break;
 				case PUT:
-					dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null));
+					dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
 					break;
 				case PEEK:
 					//Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) {
-					dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null));
+					dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null));
 					break;
 				case TAKE:
-					dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null));
+					dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null));
 					break;	
 				case REMOVE:
-					dsKey.addCommand(new Command(type, null, null, null, 0, 0, null, null));
+					dsKey.addCommand(new Command(type, null, null, null, 0, 0, null, null, null));
 					break;
 				case REPLY:
 					try {
-						manager.replyQueue.put(new Command(type, null, null, msg.val, msg.index, msg.seq, null, null));
+						manager.replyQueue.put(new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null));
 					} catch (InterruptedException e) {
 						e.printStackTrace();
 					}
--- a/src/alice/datasegment/Command.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/datasegment/Command.java	Tue Jan 17 08:41:34 2012 +0900
@@ -15,8 +15,9 @@
 	public int seq;
 	public BlockingQueue<Command> replyQueue;
 	public CodeSegment cs;
+	public String reverseKey;
 	
-	public Command(CommandType cmdType, DataSegmentReceiver receiver, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) {
+	public Command(CommandType cmdType, DataSegmentReceiver receiver, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
 		this.type = cmdType;
 		this.receiver = receiver;
 		this.key = key;
@@ -25,6 +26,7 @@
 		this.seq = seq;
 		this.replyQueue = replyQueue;
 		this.cs = cs;
+		this.reverseKey = reverseKey;
 	}
 	
 }
--- a/src/alice/datasegment/DataSegment.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/datasegment/DataSegment.java	Tue Jan 17 08:41:34 2012 +0900
@@ -2,10 +2,13 @@
 
 import java.util.concurrent.ConcurrentHashMap;
 
+import alice.daemon.IncomingTcpConnection;
+
 public class DataSegment {
 	
 	private static DataSegment dataSegment = new DataSegment();
 	private ConcurrentHashMap<String, DataSegmentManager> dataSegmentManageres = new ConcurrentHashMap<String, DataSegmentManager>();
+	private ConcurrentHashMap<String, IncomingTcpConnection> acceptHash = new ConcurrentHashMap<String, IncomingTcpConnection>();
 	
 	private DataSegment() {
 		dataSegmentManageres.put("local", new LocalDataSegmentManager());
@@ -19,10 +22,18 @@
 		dataSegment.dataSegmentManageres.put(key, manager);
 	}
 	
-	public static RemoteDataSegmentManager connect(String key, String hostName, int port) {
-		RemoteDataSegmentManager manager = new RemoteDataSegmentManager(key, hostName, port);
-		regist(key, manager);
+	public static RemoteDataSegmentManager connect(String connectionKey, String reverseKey, String hostName, int port) {
+		RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connectionKey, reverseKey, hostName, port);
+		regist(connectionKey, manager);
 		return manager;
 	}
 	
+	public static void setAccept(String key, IncomingTcpConnection incoming) {
+		dataSegment.acceptHash.put(key, incoming);
+	}
+	
+	public static IncomingTcpConnection getAccept(String key) {
+		return dataSegment.acceptHash.get(key);
+	}
+	
 }
--- a/src/alice/datasegment/DataSegmentKey.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/datasegment/DataSegmentKey.java	Tue Jan 17 08:41:34 2012 +0900
@@ -38,14 +38,14 @@
 							}
 						case PUT:
 							int index = tailIndex.getAndIncrement();
-							DataSegmentValue dsv = new DataSegmentValue(index, cmd.val); 
+							DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); 
 							dataList.add(dsv);
 							// run waiting peek and take
 							boolean takeFlag = true;
 							for (Iterator<Command> iter = waitList.iterator(); iter.hasNext() && takeFlag; ) {
 								Command waitCmd = iter.next();
 								if (waitCmd.index < index) {
-									waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null));
+									waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey));
 									iter.remove();
 									if (waitCmd.type == CommandType.TAKE) { // delete data, if it run take cmd.
 										dataList.remove(dsv);
@@ -61,7 +61,7 @@
 							}
 							for (DataSegmentValue data : dataList) {
 								if (data.index > cmd.index) {
-									cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null));
+									cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
 									break;
 								}
 							}
@@ -76,7 +76,7 @@
 							for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
 								DataSegmentValue data = iter.next();
 								if (data.index > cmd.index) {
-									cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null));
+									cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
 									iter.remove();
 									waitFlag = false;
 									break;
--- a/src/alice/datasegment/DataSegmentManager.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Tue Jan 17 08:41:34 2012 +0900
@@ -22,7 +22,7 @@
 				try {
 					Command reply = replyQueue.take();
 					Command cmd = seqHash.get(reply.seq);
-					cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val));
+					cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.reverseKey));
 				} catch (InterruptedException e) {
 					e.printStackTrace();
 				}
--- a/src/alice/datasegment/DataSegmentReceiver.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/datasegment/DataSegmentReceiver.java	Tue Jan 17 08:41:34 2012 +0900
@@ -8,6 +8,7 @@
 	public InputDataSegment ids;
 	public int index;
 	public Value val;
+	public String from;
 	public CommandType type;
 	
 	
--- a/src/alice/datasegment/DataSegmentValue.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/datasegment/DataSegmentValue.java	Tue Jan 17 08:41:34 2012 +0900
@@ -6,10 +6,12 @@
 
 	public int index;
 	public Value val;
+	public String from;
 	
-	public DataSegmentValue(int index, Value val) {
+	public DataSegmentValue(int index, Value val, String reverseKey) {
 		this.index = index;
 		this.val = val;
+		this.from = reverseKey;
 	}
 	
 }
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Tue Jan 17 08:41:34 2012 +0900
@@ -7,6 +7,8 @@
 
 public class LocalDataSegmentManager extends DataSegmentManager {
 	
+	public String reverseKey = "local";
+	
 	public LocalDataSegmentManager() {
 		new Thread(replyThread, "LocalDataSegmentManager").start();
 	}
@@ -27,20 +29,20 @@
 	@Override
 	public void put(String key, Value val) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.PUT, null, null, val, 0, 0, replyQueue, null));
+		dataSegmentKey.addCommand(new Command(CommandType.PUT, null, null, val, 0, 0, replyQueue, null, reverseKey));
 	}
 
 	@Override
 	public void update(String key, Value val) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, null, val, 0, 0, replyQueue, null));
+		dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, null, val, 0, 0, replyQueue, null, reverseKey));
 	}
 
 	@Override
 	public void take(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.TAKE, receiver, null, null, index, seq, replyQueue, cs);
+		Command cmd = new Command(CommandType.TAKE, receiver, null, null, index, seq, replyQueue, cs, null);
 		seqHash.put(seq, cmd);
 		dataSegmentKey.addCommand(cmd);
 	}
@@ -49,7 +51,7 @@
 	public void peek(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.PEEK, receiver, null, null, index, seq, replyQueue, cs);
+		Command cmd = new Command(CommandType.PEEK, receiver, null, null, index, seq, replyQueue, cs, null);
 		seqHash.put(seq, cmd);
 		dataSegmentKey.addCommand(cmd);
 	}
@@ -57,7 +59,7 @@
 	@Override
 	public void remove(String key) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, null, 0, 0, replyQueue, null));
+		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, null, 0, 0, replyQueue, null, null));
 	}
 
 }
--- a/src/alice/datasegment/RemoteDataSegmentManager.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/datasegment/RemoteDataSegmentManager.java	Tue Jan 17 08:41:34 2012 +0900
@@ -21,18 +21,18 @@
 	@Deprecated
 	public RemoteDataSegmentManager(Connection connection) {
 		this.connection = connection;
-		new IncomingTcpConnection(connection, this).start();
+		new IncomingTcpConnection(connection, this, "").start();
 		new OutboundTcpConnection(connection).start();
 		new Thread(replyThread, "RemoteDataSegmentManager-"
 		+ connection.socket.getInetAddress().getHostName()
 		+ ":" + connection.socket.getPort()).start();
 	}
 	
-	public RemoteDataSegmentManager(String key, final String hostName, final int port) {
+	public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) {
 		connection = new Connection();
 		final RemoteDataSegmentManager manager = this;
-		new Thread(replyThread, "RemoteDataSegmentManager-" + key).start();
-		new Thread("Connect-" + key) {
+		new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start();
+		new Thread("Connect-" + connectionKey) {
 			public void run() {
 				boolean connect = true;
 				do {
@@ -51,7 +51,7 @@
 						}
 					}
 				} while (connect);
-				new IncomingTcpConnection(connection, manager).start();
+				new IncomingTcpConnection(connection, manager, reverseKey).start();
 				new OutboundTcpConnection(connection).start();
 			}
 		}.start();
@@ -59,18 +59,18 @@
 	
 	@Override
 	public void put(String key, Value val) {
-		connection.sendCommand(new Command(CommandType.PUT, null, key, val, 0, 0, null, null));
+		connection.sendCommand(new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null));
 	}
 
 	@Override
 	public void update(String key, Value val) {
-		connection.sendCommand(new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null));
+		connection.sendCommand(new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null));
 	}
 
 	@Override
 	public void take(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) {
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs);
+		Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null);
 		seqHash.put(seq, cmd);
 		connection.sendCommand(cmd);		
 	}
@@ -78,14 +78,14 @@
 	@Override
 	public void peek(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) {
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs);
+		Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null);
 		seqHash.put(seq, cmd);
 		connection.sendCommand(cmd);
 	}
 
 	@Override
 	public void remove(String key) {
-		connection.sendCommand(new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null));
+		connection.sendCommand(new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null));
 	}
 
 }
--- a/src/alice/topology/HostMessage.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/topology/HostMessage.java	Tue Jan 17 08:41:34 2012 +0900
@@ -9,6 +9,7 @@
 	public String name;
 	public int port;
 	@Optional public String connectionName;
+	@Optional public String reverseName;
 	
 	public HostMessage() { }
 	public HostMessage(String name, int port) {
@@ -16,10 +17,11 @@
 		this.port = port;
 	}
 
-	public HostMessage(String name, int port, String connectionName) {
+	public HostMessage(String name, int port, String connectionName, String reverseName) {
 		this.name = name;
 		this.port = port;
 		this.connectionName = connectionName;
+		this.reverseName = reverseName;
 	}
 	
 }
--- a/src/alice/topology/manager/IncomingHosts.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/topology/manager/IncomingHosts.java	Tue Jan 17 08:41:34 2012 +0900
@@ -33,11 +33,11 @@
 		try {
 			HostMessage host = msgpack.convert(this.host.val, HostMessage.class);
 			String nodeName = nodeNames.poll();
-			DataSegmentManager manager = DataSegment.connect(nodeName, host.name, host.port);
+			DataSegmentManager manager = DataSegment.connect(nodeName, "", host.name, host.port);
 			manager.put("host", ValueFactory.createRawValue(nodeName));
 			LinkedList<NodeInfo> nodes = topology.get(nodeName);
 			for (NodeInfo nodeInfo : nodes) {
-				HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName);
+				HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName, nodeInfo.reverseName);
 				ods.put("local", nodeInfo.sourceNodeName, msgpack.unconvert(newHost));
 			}
 		} catch (IOException e) {
--- a/src/alice/topology/manager/NodeInfo.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/topology/manager/NodeInfo.java	Tue Jan 17 08:41:34 2012 +0900
@@ -4,11 +4,11 @@
 	
 	public String sourceNodeName;
 	public String connectionName;
+	public String reverseName;
 	
 	public NodeInfo(String source, String connection) {
 		this.sourceNodeName = source;
 		this.connectionName = connection;
-		
 	}
 	
 }
--- a/src/alice/topology/manager/StartTopologyManager.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/topology/manager/StartTopologyManager.java	Tue Jan 17 08:41:34 2012 +0900
@@ -43,12 +43,25 @@
 					topology.put(nodeName, new LinkedList<NodeInfo>());
 				}
 				ArrayList<Edge> edges = graph.getEdges();
+				HashMap<String, NodeInfo> hash = new HashMap<String, NodeInfo>();
 				for (Edge edge : edges) {
 					String connection = edge.getAttribute("label");
 					String source = edge.getSource().getNode().getId().getId();
-					String target = edge.getSource().getNode().getId().getId();
+					String target = edge.getTarget().getNode().getId().getId();
 					LinkedList<NodeInfo> sources = topology.get(target);
-					sources.add(new NodeInfo(source, connection));
+					NodeInfo nodeInfo = new NodeInfo(source, connection);
+					sources.add(nodeInfo);
+					hash.put(source + target, nodeInfo);
+				}
+				for (Edge edge : edges) {
+					String connection = edge.getAttribute("label");
+					String source = edge.getSource().getNode().getId().getId();
+					String target = edge.getTarget().getNode().getId().getId();
+					NodeInfo nodeInfo = hash.get(target + source);
+					if (nodeInfo != null) {
+						nodeInfo.reverseName = connection;
+					}
+					
 				}
 			}
 			
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/topology/node/ConfigurationFinish.java	Tue Jan 17 08:41:34 2012 +0900
@@ -0,0 +1,38 @@
+package alice.topology.node;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegmentReceiver;
+
+public class ConfigurationFinish extends CodeSegment {
+
+	public DataSegmentReceiver reverseCount = new DataSegmentReceiver(ids, CommandType.PEEK);
+	public DataSegmentReceiver configNodeNum = new DataSegmentReceiver(ids, CommandType.PEEK);
+	private Class<CodeSegment> clazz;
+	
+	public ConfigurationFinish(Class<CodeSegment> clazz) {
+		this.clazz = clazz;
+	}
+	
+	@Override
+	public void run() {
+		if (reverseCount.val.equals(configNodeNum.val)) {
+			System.out.println("Configuration finished");
+			if (clazz == null)
+				return;
+			try {
+				clazz.newInstance().execute();
+			} catch (InstantiationException e) {
+				e.printStackTrace();
+			} catch (IllegalAccessException e) {
+				e.printStackTrace();
+			}
+			return;
+		}
+		
+		ConfigurationFinish cs3 = new ConfigurationFinish(clazz);
+		cs3.reverseCount.setKey("local", "reverseCount");
+		cs3.configNodeNum.setKey("local", "configNodeNum");
+	}
+
+}
--- a/src/alice/topology/node/IncomingAbstractHostName.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/topology/node/IncomingAbstractHostName.java	Tue Jan 17 08:41:34 2012 +0900
@@ -7,16 +7,11 @@
 public class IncomingAbstractHostName extends CodeSegment {
 
 	public DataSegmentReceiver absName = new DataSegmentReceiver(ids, CommandType.PEEK);
-	private Class<CodeSegment> clazz;
-	
-	public IncomingAbstractHostName(Class<CodeSegment> clazz) {
-		this.clazz = clazz;
-	}
 
 	@Override
 	public void run() {
 		String absName = this.absName.val.asRawValue().getString();
-		IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, clazz);
+		IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, 0);
 		cs.hostInfo.setKey("manager", absName);
 	}
 
--- a/src/alice/topology/node/IncomingConnectionInfo.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/topology/node/IncomingConnectionInfo.java	Tue Jan 17 08:41:34 2012 +0900
@@ -3,10 +3,12 @@
 import java.io.IOException;
 
 import org.msgpack.MessagePack;
+import org.msgpack.type.ValueFactory;
 
 import alice.codesegment.CodeSegment;
 import alice.datasegment.CommandType;
 import alice.datasegment.DataSegment;
+import alice.datasegment.DataSegmentManager;
 import alice.datasegment.DataSegmentReceiver;
 import alice.topology.HostMessage;
 
@@ -14,37 +16,30 @@
 
 	public DataSegmentReceiver hostInfo = new DataSegmentReceiver(ids, CommandType.TAKE);
 	private String absName;
-	private Class<CodeSegment> clazz;
+	private int count;
 	
-	public IncomingConnectionInfo(String absName, Class<CodeSegment> clazz) {
+	public IncomingConnectionInfo(String absName, int count) {
 		this.absName = absName;
-		this.clazz = clazz;
+		this.count = count;
 	}
 
 	@Override
 	public void run() {
 		if (this.hostInfo.val == null) {
-			System.out.println("Configuration finished");
-			if (clazz == null)
-				return;
-			try {
-				clazz.newInstance().execute();
-			} catch (InstantiationException e) {
-				e.printStackTrace();
-			} catch (IllegalAccessException e) {
-				e.printStackTrace();
-			}
+			ods.put("local", "configNodeNum", ValueFactory.createIntegerValue(count));
+			
 			return;
 		}
 		MessagePack msgpack = new MessagePack();
 		try {
 			HostMessage hostInfo = msgpack.convert(this.hostInfo.val, HostMessage.class);
-			DataSegment.connect(hostInfo.connectionName, hostInfo.name, hostInfo.port);
+			DataSegmentManager manager = DataSegment.connect(hostInfo.connectionName, hostInfo.reverseName, hostInfo.name, hostInfo.port);
+			manager.put("reverseKey", ValueFactory.createRawValue(hostInfo.reverseName));
 		} catch (IOException e) {
 			e.printStackTrace();
 		}
 
-		IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, clazz);
+		IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, ++count);
 		cs.hostInfo.setKey("manager", absName);
 	}
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/topology/node/IncomingReverseKey.java	Tue Jan 17 08:41:34 2012 +0900
@@ -0,0 +1,30 @@
+package alice.topology.node;
+
+import org.msgpack.type.ValueFactory;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegment;
+import alice.datasegment.DataSegmentReceiver;
+
+public class IncomingReverseKey extends CodeSegment {
+
+	public DataSegmentReceiver reverseKey = new DataSegmentReceiver(ids, CommandType.TAKE);
+	public DataSegmentReceiver reverseCount = new DataSegmentReceiver(ids, CommandType.PEEK);
+	@Override
+	public void run() {
+		String reverseKey = this.reverseKey.val.asRawValue().getString();
+		String from = this.reverseKey.from;
+		DataSegment.getAccept(from).reverseKey = reverseKey;
+		
+		int reverseCount = this.reverseCount.val.asIntegerValue().getInt();
+		reverseCount++;
+		ods.update("local", "reverseCount", ValueFactory.createIntegerValue(reverseCount));
+		
+		
+		IncomingReverseKey cs = new IncomingReverseKey();
+		cs.reverseKey.setKey("local", "reverseKey");
+		cs.reverseCount.setKey("local", "reverseCount");
+	}
+
+}
--- a/src/alice/topology/node/StartTopologyNode.java	Tue Jan 17 03:52:39 2012 +0900
+++ b/src/alice/topology/node/StartTopologyNode.java	Tue Jan 17 08:41:34 2012 +0900
@@ -5,6 +5,7 @@
 import java.net.UnknownHostException;
 
 import org.msgpack.MessagePack;
+import org.msgpack.type.ValueFactory;
 
 import alice.codesegment.CodeSegment;
 import alice.datasegment.DataSegment;
@@ -23,7 +24,7 @@
 	
 	@Override
 	public void run() {
-		DataSegmentManager manager = DataSegment.connect("manager", conf.managerHostName, conf.managerPort);
+		DataSegmentManager manager = DataSegment.connect("manager", "", conf.managerHostName, conf.managerPort);
 		try {
 			HostMessage host;
 			host = new HostMessage(InetAddress.getLocalHost().getHostName(), conf.localPort);
@@ -35,8 +36,18 @@
 			e.printStackTrace();
 		}
 		
-		IncomingAbstractHostName cs = new IncomingAbstractHostName(clazz);
-		cs.absName.setKey("local", "host");
+		IncomingAbstractHostName cs1 = new IncomingAbstractHostName();
+		cs1.absName.setKey("local", "host");
+		
+		IncomingReverseKey cs2 = new IncomingReverseKey();
+		cs2.reverseKey.setKey("local", "reverseKey");
+		cs2.reverseCount.setKey("local", "reverseCount");
+		
+		ods.put("local", "reverseCount", ValueFactory.createIntegerValue(0));
+		
+		ConfigurationFinish cs3 = new ConfigurationFinish(clazz);
+		cs3.reverseCount.setKey("local", "reverseCount");
+		cs3.configNodeNum.setKey("local", "configNodeNum");
 	}
 
 }