changeset 34:ca079a730d0b

added method to OutputDataSegment and Receiver, to convert type from Value to Class<?> without MessagePack
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Thu, 19 Jan 2012 16:01:50 +0900
parents 20c67f673224
children ac3b48c5f4da
files src/alice/codesegment/OutputDataSegment.java src/alice/datasegment/Receiver.java src/alice/datasegment/RemoteDataSegmentManager.java src/alice/test/codesegment/local/TestCodeSegment.java src/alice/test/codesegment/remote/RemoteIncrement.java src/alice/test/codesegment/remote/RemoteStartCodeSegment.java src/alice/test/codesegment/remote/TestRemoteAlice.java src/alice/test/topology/ring/RingMessagePassing.java src/alice/topology/manager/IncomingHosts.java src/alice/topology/node/IncomingAbstractHostName.java src/alice/topology/node/IncomingConnectionInfo.java src/alice/topology/node/IncomingReverseKey.java src/alice/topology/node/StartTopologyNode.java
diffstat 13 files changed, 63 insertions(+), 99 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/codesegment/OutputDataSegment.java	Wed Jan 18 01:34:23 2012 +0900
+++ b/src/alice/codesegment/OutputDataSegment.java	Thu Jan 19 16:01:50 2012 +0900
@@ -1,5 +1,8 @@
 package alice.codesegment;
 
+import java.io.IOException;
+
+import org.msgpack.MessagePack;
 import org.msgpack.type.Value;
 import org.msgpack.type.ValueFactory;
 
@@ -31,6 +34,24 @@
 		DataSegment.get(managerKey).update(key, ValueFactory.createIntegerValue(val));
 	}
 	
+	public <T> void put(String managerKey, String key, T val) {
+		MessagePack msgpack = new MessagePack();
+		try {
+			DataSegment.get(managerKey).put(key, msgpack.unconvert(val));
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+	}
+	
+	public <T> void update(String managerKey, String key, T val) {
+		MessagePack msgpack = new MessagePack();
+		try {
+			DataSegment.get(managerKey).update(key, msgpack.unconvert(val));
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+	}
+	
 	public void finish(String managerKey) {
 		DataSegment.get(managerKey).finish();
 	}
--- a/src/alice/datasegment/Receiver.java	Wed Jan 18 01:34:23 2012 +0900
+++ b/src/alice/datasegment/Receiver.java	Thu Jan 19 16:01:50 2012 +0900
@@ -1,5 +1,8 @@
 package alice.datasegment;
 
+import java.io.IOException;
+
+import org.msgpack.MessagePack;
 import org.msgpack.type.Value;
 import org.msgpack.type.ValueType;
 
@@ -12,7 +15,6 @@
 	public String from;
 	public CommandType type;
 	
-	
 	public Receiver(InputDataSegment ids, CommandType type) {
 		this.ids = ids;
 		this.type = type;
@@ -49,4 +51,14 @@
 		return 0;
 	}
 	
+	public <T> T asClass(Class<T> clazz) {
+		MessagePack msgpack = new MessagePack();
+		try {
+			return msgpack.convert(val, clazz);
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+		return null;
+	}
+	
 }
--- a/src/alice/datasegment/RemoteDataSegmentManager.java	Wed Jan 18 01:34:23 2012 +0900
+++ b/src/alice/datasegment/RemoteDataSegmentManager.java	Thu Jan 19 16:01:50 2012 +0900
@@ -17,17 +17,6 @@
 	Connection connection;
 	Logger logger = Logger.getLogger(RemoteDataSegmentManager.class);
 	
-	// TODO: delete this constructor later
-	@Deprecated
-	public RemoteDataSegmentManager(Connection connection) {
-		this.connection = connection;
-		new IncomingTcpConnection(connection, this, "").start();
-		new OutboundTcpConnection(connection).start();
-		new Thread(replyThread, "RemoteDataSegmentManager-"
-		+ connection.socket.getInetAddress().getHostName()
-		+ ":" + connection.socket.getPort()).start();
-	}
-	
 	public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) {
 		connection = new Connection();
 		final RemoteDataSegmentManager manager = this;
--- a/src/alice/test/codesegment/local/TestCodeSegment.java	Wed Jan 18 01:34:23 2012 +0900
+++ b/src/alice/test/codesegment/local/TestCodeSegment.java	Thu Jan 19 16:01:50 2012 +0900
@@ -1,8 +1,5 @@
 package alice.test.codesegment.local;
 
-import org.msgpack.type.Value;
-import org.msgpack.type.ValueFactory;
-
 import alice.codesegment.CodeSegment;
 import alice.datasegment.CommandType;
 import alice.datasegment.Receiver;
@@ -25,8 +22,7 @@
 		TestCodeSegment cs = new TestCodeSegment();
 		cs.arg1.setKey("local", "key1", arg1.index);
 		
-		Value val = ValueFactory.createRawValue("String data");
-		ods.update("local", "key1", val);
+		ods.update("local", "key1", "String data");
 	}
 
 }
--- a/src/alice/test/codesegment/remote/RemoteIncrement.java	Wed Jan 18 01:34:23 2012 +0900
+++ b/src/alice/test/codesegment/remote/RemoteIncrement.java	Thu Jan 19 16:01:50 2012 +0900
@@ -1,7 +1,5 @@
 package alice.test.codesegment.remote;
 
-import org.msgpack.type.ValueFactory;
-
 import alice.codesegment.CodeSegment;
 import alice.datasegment.CommandType;
 import alice.datasegment.Receiver;
@@ -12,14 +10,14 @@
 
 	@Override
 	public void run() {
-		int num = this.num.val.asIntegerValue().getInt();
+		int num = this.num.asInteger();
 		System.out.println("[CodeSegment] " + num++);
 		if (num == 10) System.exit(0);
 
 		RemoteIncrement cs = new RemoteIncrement();
 		cs.num.setKey("remote", "num");
 		
-		ods.put("local", "num", ValueFactory.createIntegerValue(num));
+		ods.put("local", "num", num);
 	}
 
 }
--- a/src/alice/test/codesegment/remote/RemoteStartCodeSegment.java	Wed Jan 18 01:34:23 2012 +0900
+++ b/src/alice/test/codesegment/remote/RemoteStartCodeSegment.java	Thu Jan 19 16:01:50 2012 +0900
@@ -1,8 +1,5 @@
 package alice.test.codesegment.remote;
 
-import org.msgpack.type.Value;
-import org.msgpack.type.ValueFactory;
-
 import alice.codesegment.CodeSegment;
 
 public class RemoteStartCodeSegment extends CodeSegment {
@@ -12,8 +9,7 @@
 		RemoteIncrement cs = new RemoteIncrement();
 		cs.num.setKey("remote", "num");
 		
-		Value num = ValueFactory.createIntegerValue(0);
-		ods.put("local", "num", num);
+		ods.put("local", "num", 0);
 	}
 
 }
--- a/src/alice/test/codesegment/remote/TestRemoteAlice.java	Wed Jan 18 01:34:23 2012 +0900
+++ b/src/alice/test/codesegment/remote/TestRemoteAlice.java	Thu Jan 19 16:01:50 2012 +0900
@@ -1,13 +1,7 @@
 package alice.test.codesegment.remote;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.SocketChannel;
-
 import alice.daemon.AliceDaemon;
-import alice.daemon.Connection;
 import alice.datasegment.DataSegment;
-import alice.datasegment.RemoteDataSegmentManager;
 
 public class TestRemoteAlice {
 
@@ -15,23 +9,7 @@
 		TestRemoteConfig conf = new TestRemoteConfig(args);
 
 		new AliceDaemon(conf).listen();
-		
-		boolean connect = true;
-		do {
-			try {
-				SocketChannel sc = SocketChannel.open(new InetSocketAddress(conf.hostname, conf.connectPort));
-				Connection connection = new Connection(sc.socket());
-				RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connection);
-				DataSegment.regist(conf.key, manager);
-				connect = false;
-			} catch (IOException e) {
-				try {
-					Thread.sleep(500);
-				} catch (InterruptedException e1) {
-					e1.printStackTrace();
-				}
-			}
-		} while (connect);
+		DataSegment.connect(conf.key, "", conf.hostname, conf.connectPort);
 		
 		new RemoteStartCodeSegment().execute();
 	}
--- a/src/alice/test/topology/ring/RingMessagePassing.java	Wed Jan 18 01:34:23 2012 +0900
+++ b/src/alice/test/topology/ring/RingMessagePassing.java	Thu Jan 19 16:01:50 2012 +0900
@@ -15,7 +15,6 @@
 	public void run() {
 		int counter = this.counter.asInteger();
 		
-		
 		try {
 			System.out.print("[" + InetAddress.getLocalHost().getHostName() + "] ");
 		} catch (UnknownHostException e) {
--- a/src/alice/topology/manager/IncomingHosts.java	Wed Jan 18 01:34:23 2012 +0900
+++ b/src/alice/topology/manager/IncomingHosts.java	Thu Jan 19 16:01:50 2012 +0900
@@ -1,17 +1,15 @@
 package alice.topology.manager;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
 
 import org.apache.log4j.Logger;
-import org.msgpack.MessagePack;
+
 import org.msgpack.type.ValueFactory;
 
 import alice.codesegment.CodeSegment;
 import alice.datasegment.CommandType;
 import alice.datasegment.DataSegment;
-import alice.datasegment.DataSegmentManager;
 import alice.datasegment.Receiver;
 import alice.topology.HostMessage;
 
@@ -29,21 +27,15 @@
 
 	@Override
 	public void run() {
-		MessagePack msgpack = new MessagePack();
-		try {
-			HostMessage host = msgpack.convert(this.host.val, HostMessage.class);
-			String nodeName = nodeNames.poll();
-			// Manager connect to Node
-			DataSegmentManager manager = DataSegment.connect(nodeName, "", host.name, host.port);
-			manager.put("host", ValueFactory.createRawValue(nodeName));
-			LinkedList<NodeInfo> nodes = topology.get(nodeName);
-			for (NodeInfo nodeInfo : nodes) {
-				HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName, nodeInfo.reverseName);
-				ods.put("local", nodeInfo.sourceNodeName, msgpack.unconvert(newHost));
-			}
-		} catch (IOException e) {
-			logger.error("HostMessage format error");
-			e.printStackTrace();
+		HostMessage host = this.host.asClass(HostMessage.class);
+		String nodeName = nodeNames.poll();
+		// Manager connect to Node
+		DataSegment.connect(nodeName, "", host.name, host.port);
+		ods.put(nodeName, "host", nodeName);
+		LinkedList<NodeInfo> nodes = topology.get(nodeName);
+		for (NodeInfo nodeInfo : nodes) {
+			HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName, nodeInfo.reverseName);
+			ods.put("local", nodeInfo.sourceNodeName, newHost);
 		}
 		
 		if (nodeNames.isEmpty()) {
--- a/src/alice/topology/node/IncomingAbstractHostName.java	Wed Jan 18 01:34:23 2012 +0900
+++ b/src/alice/topology/node/IncomingAbstractHostName.java	Thu Jan 19 16:01:50 2012 +0900
@@ -10,7 +10,7 @@
 
 	@Override
 	public void run() {
-		String absName = this.absName.val.asRawValue().getString();
+		String absName = this.absName.asString();
 		IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, 0);
 		cs.hostInfo.setKey("manager", absName);
 	}
--- a/src/alice/topology/node/IncomingConnectionInfo.java	Wed Jan 18 01:34:23 2012 +0900
+++ b/src/alice/topology/node/IncomingConnectionInfo.java	Thu Jan 19 16:01:50 2012 +0900
@@ -1,9 +1,5 @@
 package alice.topology.node;
 
-import java.io.IOException;
-
-import org.msgpack.MessagePack;
-
 import alice.codesegment.CodeSegment;
 import alice.datasegment.CommandType;
 import alice.datasegment.DataSegment;
@@ -27,15 +23,10 @@
 			ods.put("local", "configNodeNum", count);
 			return;
 		}
-		MessagePack msgpack = new MessagePack();
-		try {
-			HostMessage hostInfo = msgpack.convert(this.hostInfo.val, HostMessage.class);
-			DataSegment.connect(hostInfo.connectionName, hostInfo.reverseName, hostInfo.name, hostInfo.port);
-			//manager.put("reverseKey", ValueFactory.createRawValue(hostInfo.reverseName));
-			ods.put(hostInfo.connectionName, "reverseKey", hostInfo.reverseName);
-		} catch (IOException e) {
-			e.printStackTrace();
-		}
+		
+		HostMessage hostInfo = this.hostInfo.asClass(HostMessage.class);
+		DataSegment.connect(hostInfo.connectionName, hostInfo.reverseName, hostInfo.name, hostInfo.port);
+		ods.put(hostInfo.connectionName, "reverseKey", hostInfo.reverseName);
 
 		IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, ++count);
 		cs.hostInfo.setKey("manager", absName);
--- a/src/alice/topology/node/IncomingReverseKey.java	Wed Jan 18 01:34:23 2012 +0900
+++ b/src/alice/topology/node/IncomingReverseKey.java	Thu Jan 19 16:01:50 2012 +0900
@@ -1,7 +1,5 @@
 package alice.topology.node;
 
-import org.msgpack.type.ValueFactory;
-
 import alice.codesegment.CodeSegment;
 import alice.datasegment.CommandType;
 import alice.datasegment.DataSegment;
@@ -18,9 +16,9 @@
 		String from = this.reverseKey.from;
 		DataSegment.getAccept(from).reverseKey = reverseKey;
 		
-		int reverseCount = this.reverseCount.val.asIntegerValue().getInt();
+		int reverseCount = this.reverseCount.asInteger();
 		reverseCount++;
-		ods.update("local", "reverseCount", ValueFactory.createIntegerValue(reverseCount));
+		ods.update("local", "reverseCount", reverseCount);
 		
 		
 		IncomingReverseKey cs = new IncomingReverseKey();
--- a/src/alice/topology/node/StartTopologyNode.java	Wed Jan 18 01:34:23 2012 +0900
+++ b/src/alice/topology/node/StartTopologyNode.java	Thu Jan 19 16:01:50 2012 +0900
@@ -1,15 +1,10 @@
 package alice.topology.node;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
-import org.msgpack.MessagePack;
-import org.msgpack.type.ValueFactory;
-
 import alice.codesegment.CodeSegment;
 import alice.datasegment.DataSegment;
-import alice.datasegment.DataSegmentManager;
 import alice.topology.HostMessage;
 
 public class StartTopologyNode extends CodeSegment {
@@ -24,17 +19,16 @@
 	
 	@Override
 	public void run() {
-		DataSegmentManager manager = DataSegment.connect("manager", "", conf.managerHostName, conf.managerPort);
+		DataSegment.connect("manager", "", conf.managerHostName, conf.managerPort);
+		String localHostName = null;
 		try {
-			HostMessage host;
-			host = new HostMessage(InetAddress.getLocalHost().getHostName(), conf.localPort);
-			MessagePack msgpack = new MessagePack();
-			manager.put("host", msgpack.unconvert(host));
+			localHostName = InetAddress.getLocalHost().getHostName();
 		} catch (UnknownHostException e) {
 			e.printStackTrace();
-		} catch (IOException e) {
-			e.printStackTrace();
 		}
+		HostMessage host;
+		host = new HostMessage(localHostName, conf.localPort);
+		ods.put("manager", "host", host);
 		
 		IncomingAbstractHostName cs1 = new IncomingAbstractHostName();
 		cs1.absName.setKey("local", "host");
@@ -43,7 +37,7 @@
 		cs2.reverseKey.setKey("local", "reverseKey");
 		cs2.reverseCount.setKey("local", "reverseCount");
 		
-		ods.put("local", "reverseCount", ValueFactory.createIntegerValue(0));
+		ods.put("local", "reverseCount", 0);
 		
 		ConfigurationFinish cs3 = new ConfigurationFinish(clazz);
 		cs3.reverseCount.setKey("local", "reverseCount");