# HG changeset patch # User kazz # Date 1326956510 -32400 # Node ID ca079a730d0b6867ae973d08849d8499089c2c2e # Parent 20c67f67322476975a440492ef2a454640e63deb added method to OutputDataSegment and Receiver, to convert type from Value to Class without MessagePack diff -r 20c67f673224 -r ca079a730d0b src/alice/codesegment/OutputDataSegment.java --- a/src/alice/codesegment/OutputDataSegment.java Wed Jan 18 01:34:23 2012 +0900 +++ b/src/alice/codesegment/OutputDataSegment.java Thu Jan 19 16:01:50 2012 +0900 @@ -1,5 +1,8 @@ package alice.codesegment; +import java.io.IOException; + +import org.msgpack.MessagePack; import org.msgpack.type.Value; import org.msgpack.type.ValueFactory; @@ -31,6 +34,24 @@ DataSegment.get(managerKey).update(key, ValueFactory.createIntegerValue(val)); } + public void put(String managerKey, String key, T val) { + MessagePack msgpack = new MessagePack(); + try { + DataSegment.get(managerKey).put(key, msgpack.unconvert(val)); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void update(String managerKey, String key, T val) { + MessagePack msgpack = new MessagePack(); + try { + DataSegment.get(managerKey).update(key, msgpack.unconvert(val)); + } catch (IOException e) { + e.printStackTrace(); + } + } + public void finish(String managerKey) { DataSegment.get(managerKey).finish(); } diff -r 20c67f673224 -r ca079a730d0b src/alice/datasegment/Receiver.java --- a/src/alice/datasegment/Receiver.java Wed Jan 18 01:34:23 2012 +0900 +++ b/src/alice/datasegment/Receiver.java Thu Jan 19 16:01:50 2012 +0900 @@ -1,5 +1,8 @@ package alice.datasegment; +import java.io.IOException; + +import org.msgpack.MessagePack; import org.msgpack.type.Value; import org.msgpack.type.ValueType; @@ -12,7 +15,6 @@ public String from; public CommandType type; - public Receiver(InputDataSegment ids, CommandType type) { this.ids = ids; this.type = type; @@ -49,4 +51,14 @@ return 0; } + public T asClass(Class clazz) { + MessagePack msgpack = new MessagePack(); + try { + return msgpack.convert(val, clazz); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + } diff -r 20c67f673224 -r ca079a730d0b src/alice/datasegment/RemoteDataSegmentManager.java --- a/src/alice/datasegment/RemoteDataSegmentManager.java Wed Jan 18 01:34:23 2012 +0900 +++ b/src/alice/datasegment/RemoteDataSegmentManager.java Thu Jan 19 16:01:50 2012 +0900 @@ -17,17 +17,6 @@ Connection connection; Logger logger = Logger.getLogger(RemoteDataSegmentManager.class); - // TODO: delete this constructor later - @Deprecated - public RemoteDataSegmentManager(Connection connection) { - this.connection = connection; - new IncomingTcpConnection(connection, this, "").start(); - new OutboundTcpConnection(connection).start(); - new Thread(replyThread, "RemoteDataSegmentManager-" - + connection.socket.getInetAddress().getHostName() - + ":" + connection.socket.getPort()).start(); - } - public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) { connection = new Connection(); final RemoteDataSegmentManager manager = this; diff -r 20c67f673224 -r ca079a730d0b src/alice/test/codesegment/local/TestCodeSegment.java --- a/src/alice/test/codesegment/local/TestCodeSegment.java Wed Jan 18 01:34:23 2012 +0900 +++ b/src/alice/test/codesegment/local/TestCodeSegment.java Thu Jan 19 16:01:50 2012 +0900 @@ -1,8 +1,5 @@ package alice.test.codesegment.local; -import org.msgpack.type.Value; -import org.msgpack.type.ValueFactory; - import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.Receiver; @@ -25,8 +22,7 @@ TestCodeSegment cs = new TestCodeSegment(); cs.arg1.setKey("local", "key1", arg1.index); - Value val = ValueFactory.createRawValue("String data"); - ods.update("local", "key1", val); + ods.update("local", "key1", "String data"); } } diff -r 20c67f673224 -r ca079a730d0b src/alice/test/codesegment/remote/RemoteIncrement.java --- a/src/alice/test/codesegment/remote/RemoteIncrement.java Wed Jan 18 01:34:23 2012 +0900 +++ b/src/alice/test/codesegment/remote/RemoteIncrement.java Thu Jan 19 16:01:50 2012 +0900 @@ -1,7 +1,5 @@ package alice.test.codesegment.remote; -import org.msgpack.type.ValueFactory; - import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.Receiver; @@ -12,14 +10,14 @@ @Override public void run() { - int num = this.num.val.asIntegerValue().getInt(); + int num = this.num.asInteger(); System.out.println("[CodeSegment] " + num++); if (num == 10) System.exit(0); RemoteIncrement cs = new RemoteIncrement(); cs.num.setKey("remote", "num"); - ods.put("local", "num", ValueFactory.createIntegerValue(num)); + ods.put("local", "num", num); } } diff -r 20c67f673224 -r ca079a730d0b src/alice/test/codesegment/remote/RemoteStartCodeSegment.java --- a/src/alice/test/codesegment/remote/RemoteStartCodeSegment.java Wed Jan 18 01:34:23 2012 +0900 +++ b/src/alice/test/codesegment/remote/RemoteStartCodeSegment.java Thu Jan 19 16:01:50 2012 +0900 @@ -1,8 +1,5 @@ package alice.test.codesegment.remote; -import org.msgpack.type.Value; -import org.msgpack.type.ValueFactory; - import alice.codesegment.CodeSegment; public class RemoteStartCodeSegment extends CodeSegment { @@ -12,8 +9,7 @@ RemoteIncrement cs = new RemoteIncrement(); cs.num.setKey("remote", "num"); - Value num = ValueFactory.createIntegerValue(0); - ods.put("local", "num", num); + ods.put("local", "num", 0); } } diff -r 20c67f673224 -r ca079a730d0b src/alice/test/codesegment/remote/TestRemoteAlice.java --- a/src/alice/test/codesegment/remote/TestRemoteAlice.java Wed Jan 18 01:34:23 2012 +0900 +++ b/src/alice/test/codesegment/remote/TestRemoteAlice.java Thu Jan 19 16:01:50 2012 +0900 @@ -1,13 +1,7 @@ package alice.test.codesegment.remote; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.SocketChannel; - import alice.daemon.AliceDaemon; -import alice.daemon.Connection; import alice.datasegment.DataSegment; -import alice.datasegment.RemoteDataSegmentManager; public class TestRemoteAlice { @@ -15,23 +9,7 @@ TestRemoteConfig conf = new TestRemoteConfig(args); new AliceDaemon(conf).listen(); - - boolean connect = true; - do { - try { - SocketChannel sc = SocketChannel.open(new InetSocketAddress(conf.hostname, conf.connectPort)); - Connection connection = new Connection(sc.socket()); - RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connection); - DataSegment.regist(conf.key, manager); - connect = false; - } catch (IOException e) { - try { - Thread.sleep(500); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - } - } while (connect); + DataSegment.connect(conf.key, "", conf.hostname, conf.connectPort); new RemoteStartCodeSegment().execute(); } diff -r 20c67f673224 -r ca079a730d0b src/alice/test/topology/ring/RingMessagePassing.java --- a/src/alice/test/topology/ring/RingMessagePassing.java Wed Jan 18 01:34:23 2012 +0900 +++ b/src/alice/test/topology/ring/RingMessagePassing.java Thu Jan 19 16:01:50 2012 +0900 @@ -15,7 +15,6 @@ public void run() { int counter = this.counter.asInteger(); - try { System.out.print("[" + InetAddress.getLocalHost().getHostName() + "] "); } catch (UnknownHostException e) { diff -r 20c67f673224 -r ca079a730d0b src/alice/topology/manager/IncomingHosts.java --- a/src/alice/topology/manager/IncomingHosts.java Wed Jan 18 01:34:23 2012 +0900 +++ b/src/alice/topology/manager/IncomingHosts.java Thu Jan 19 16:01:50 2012 +0900 @@ -1,17 +1,15 @@ package alice.topology.manager; -import java.io.IOException; import java.util.HashMap; import java.util.LinkedList; import org.apache.log4j.Logger; -import org.msgpack.MessagePack; + import org.msgpack.type.ValueFactory; import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; -import alice.datasegment.DataSegmentManager; import alice.datasegment.Receiver; import alice.topology.HostMessage; @@ -29,21 +27,15 @@ @Override public void run() { - MessagePack msgpack = new MessagePack(); - try { - HostMessage host = msgpack.convert(this.host.val, HostMessage.class); - String nodeName = nodeNames.poll(); - // Manager connect to Node - DataSegmentManager manager = DataSegment.connect(nodeName, "", host.name, host.port); - manager.put("host", ValueFactory.createRawValue(nodeName)); - LinkedList nodes = topology.get(nodeName); - for (NodeInfo nodeInfo : nodes) { - HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName, nodeInfo.reverseName); - ods.put("local", nodeInfo.sourceNodeName, msgpack.unconvert(newHost)); - } - } catch (IOException e) { - logger.error("HostMessage format error"); - e.printStackTrace(); + HostMessage host = this.host.asClass(HostMessage.class); + String nodeName = nodeNames.poll(); + // Manager connect to Node + DataSegment.connect(nodeName, "", host.name, host.port); + ods.put(nodeName, "host", nodeName); + LinkedList nodes = topology.get(nodeName); + for (NodeInfo nodeInfo : nodes) { + HostMessage newHost = new HostMessage(host.name, host.port, nodeInfo.connectionName, nodeInfo.reverseName); + ods.put("local", nodeInfo.sourceNodeName, newHost); } if (nodeNames.isEmpty()) { diff -r 20c67f673224 -r ca079a730d0b src/alice/topology/node/IncomingAbstractHostName.java --- a/src/alice/topology/node/IncomingAbstractHostName.java Wed Jan 18 01:34:23 2012 +0900 +++ b/src/alice/topology/node/IncomingAbstractHostName.java Thu Jan 19 16:01:50 2012 +0900 @@ -10,7 +10,7 @@ @Override public void run() { - String absName = this.absName.val.asRawValue().getString(); + String absName = this.absName.asString(); IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, 0); cs.hostInfo.setKey("manager", absName); } diff -r 20c67f673224 -r ca079a730d0b src/alice/topology/node/IncomingConnectionInfo.java --- a/src/alice/topology/node/IncomingConnectionInfo.java Wed Jan 18 01:34:23 2012 +0900 +++ b/src/alice/topology/node/IncomingConnectionInfo.java Thu Jan 19 16:01:50 2012 +0900 @@ -1,9 +1,5 @@ package alice.topology.node; -import java.io.IOException; - -import org.msgpack.MessagePack; - import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; @@ -27,15 +23,10 @@ ods.put("local", "configNodeNum", count); return; } - MessagePack msgpack = new MessagePack(); - try { - HostMessage hostInfo = msgpack.convert(this.hostInfo.val, HostMessage.class); - DataSegment.connect(hostInfo.connectionName, hostInfo.reverseName, hostInfo.name, hostInfo.port); - //manager.put("reverseKey", ValueFactory.createRawValue(hostInfo.reverseName)); - ods.put(hostInfo.connectionName, "reverseKey", hostInfo.reverseName); - } catch (IOException e) { - e.printStackTrace(); - } + + HostMessage hostInfo = this.hostInfo.asClass(HostMessage.class); + DataSegment.connect(hostInfo.connectionName, hostInfo.reverseName, hostInfo.name, hostInfo.port); + ods.put(hostInfo.connectionName, "reverseKey", hostInfo.reverseName); IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, ++count); cs.hostInfo.setKey("manager", absName); diff -r 20c67f673224 -r ca079a730d0b src/alice/topology/node/IncomingReverseKey.java --- a/src/alice/topology/node/IncomingReverseKey.java Wed Jan 18 01:34:23 2012 +0900 +++ b/src/alice/topology/node/IncomingReverseKey.java Thu Jan 19 16:01:50 2012 +0900 @@ -1,7 +1,5 @@ package alice.topology.node; -import org.msgpack.type.ValueFactory; - import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; @@ -18,9 +16,9 @@ String from = this.reverseKey.from; DataSegment.getAccept(from).reverseKey = reverseKey; - int reverseCount = this.reverseCount.val.asIntegerValue().getInt(); + int reverseCount = this.reverseCount.asInteger(); reverseCount++; - ods.update("local", "reverseCount", ValueFactory.createIntegerValue(reverseCount)); + ods.update("local", "reverseCount", reverseCount); IncomingReverseKey cs = new IncomingReverseKey(); diff -r 20c67f673224 -r ca079a730d0b src/alice/topology/node/StartTopologyNode.java --- a/src/alice/topology/node/StartTopologyNode.java Wed Jan 18 01:34:23 2012 +0900 +++ b/src/alice/topology/node/StartTopologyNode.java Thu Jan 19 16:01:50 2012 +0900 @@ -1,15 +1,10 @@ package alice.topology.node; -import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; -import org.msgpack.MessagePack; -import org.msgpack.type.ValueFactory; - import alice.codesegment.CodeSegment; import alice.datasegment.DataSegment; -import alice.datasegment.DataSegmentManager; import alice.topology.HostMessage; public class StartTopologyNode extends CodeSegment { @@ -24,17 +19,16 @@ @Override public void run() { - DataSegmentManager manager = DataSegment.connect("manager", "", conf.managerHostName, conf.managerPort); + DataSegment.connect("manager", "", conf.managerHostName, conf.managerPort); + String localHostName = null; try { - HostMessage host; - host = new HostMessage(InetAddress.getLocalHost().getHostName(), conf.localPort); - MessagePack msgpack = new MessagePack(); - manager.put("host", msgpack.unconvert(host)); + localHostName = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e) { e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); } + HostMessage host; + host = new HostMessage(localHostName, conf.localPort); + ods.put("manager", "host", host); IncomingAbstractHostName cs1 = new IncomingAbstractHostName(); cs1.absName.setKey("local", "host"); @@ -43,7 +37,7 @@ cs2.reverseKey.setKey("local", "reverseKey"); cs2.reverseCount.setKey("local", "reverseCount"); - ods.put("local", "reverseCount", ValueFactory.createIntegerValue(0)); + ods.put("local", "reverseCount", 0); ConfigurationFinish cs3 = new ConfigurationFinish(clazz); cs3.reverseCount.setKey("local", "reverseCount");