# HG changeset patch # User Shinji KONO # Date 1514650132 -32400 # Node ID 646f705e65b11af922f9f4b33b65c1bf7758e80e # Parent f05a89484ec11b5f8a8e6cf3d5dcdd9da842d9c3 setkey on remote diff -r f05a89484ec1 -r 646f705e65b1 Alice.iml --- a/Alice.iml Sat Dec 30 20:33:06 2017 +0900 +++ b/Alice.iml Sun Dec 31 01:08:52 2017 +0900 @@ -1,362 +1,13 @@ - - - + - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff -r f05a89484ec1 -r 646f705e65b1 src/main/java/alice/daemon/AliceDaemon.java --- a/src/main/java/alice/daemon/AliceDaemon.java Sat Dec 30 20:33:06 2017 +0900 +++ b/src/main/java/alice/daemon/AliceDaemon.java Sun Dec 31 01:08:52 2017 +0900 @@ -7,7 +7,7 @@ import java.net.ServerSocket; import java.nio.channels.ServerSocketChannel; -//import org.apache.log4j.Logger; +import org.apache.log4j.Logger; //import org.apache.log4j.PatternLayout; //import org.apache.log4j.WriterAppender; @@ -15,7 +15,7 @@ private Config conf; private AcceptThread acceptThread; - //private Logger log = Logger.getLogger(AliceDaemon.class); + private Logger log = Logger.getLogger(AliceDaemon.class); public AliceDaemon(Config conf) { this.conf = conf; @@ -50,7 +50,7 @@ // listen on any address ipv4/ipv6 InetSocketAddress a = new InetSocketAddress("::", conf.localPort); - System.out.println("AliceDaemon.listen: bind to " + a); + log.info("AliceDaemon.listen: bind to " + a); ss.bind(a); acceptThread = new AcceptThread(ss, "ACCEPT" + conf.localPort); acceptThread.start(); diff -r f05a89484ec1 -r 646f705e65b1 src/main/java/alice/daemon/Connection.java --- a/src/main/java/alice/daemon/Connection.java Sat Dec 30 20:33:06 2017 +0900 +++ b/src/main/java/alice/daemon/Connection.java Sun Dec 31 01:08:52 2017 +0900 @@ -56,7 +56,7 @@ } public void putConnectionInfo() { - if (name!=null){ + if (name!=null) { ConnectionInfo c = new ConnectionInfo(name, socket); ReceiveData rData = new ReceiveData(c); DataSegment.getLocal().put("_DISCONNECT", rData, false); diff -r f05a89484ec1 -r 646f705e65b1 src/main/java/alice/daemon/IncomingTcpConnection.java --- a/src/main/java/alice/daemon/IncomingTcpConnection.java Sat Dec 30 20:33:06 2017 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Sun Dec 31 01:08:52 2017 +0900 @@ -56,11 +56,12 @@ ReceiveData rData = null; CommandMessage msg = unpacker.read(CommandMessage.class);///read header CommandType type = CommandType.getCommandTypeFromId(msg.type); - int dataSize = unpacker.readInt(); - byte[] data = new byte[dataSize]; + switch (type) { case UPDATE: case PUT: + int dataSize = unpacker.readInt(); + byte[] data = new byte[dataSize]; connection.socket.getInputStream().read(data); rData = new ReceiveData(data, msg.compressed, msg.dataSize); @@ -108,10 +109,12 @@ lmanager.getDataSegmentKey(msg.key).runCommand(cmd);//ToDo:fix break; case REPLY: + int dataSize1 = unpacker.readInt(); + byte[] data1 = new byte[dataSize1]; cmd = manager.getAndRemoveCmd(msg.seq); - connection.socket.getInputStream().read(data); - rData = new ReceiveData(data, msg.compressed, msg.dataSize); + connection.socket.getInputStream().read(data1); + rData = new ReceiveData(data1, msg.compressed, msg.dataSize); Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); cmd.setCompressFlag(msg.compressed); diff -r f05a89484ec1 -r 646f705e65b1 src/main/java/alice/daemon/OutboundTcpConnection.java --- a/src/main/java/alice/daemon/OutboundTcpConnection.java Sat Dec 30 20:33:06 2017 +0900 +++ b/src/main/java/alice/daemon/OutboundTcpConnection.java Sun Dec 31 01:08:52 2017 +0900 @@ -27,7 +27,7 @@ default: break; } - connection.write(cmd);//ここでconvert()がよばれてる + connection.write(cmd); //ここでconvert()がよばれてる } catch (InterruptedException e) { e.printStackTrace(); } diff -r f05a89484ec1 -r 646f705e65b1 src/main/java/alice/datasegment/DataSegment.java --- a/src/main/java/alice/datasegment/DataSegment.java Sat Dec 30 20:33:06 2017 +0900 +++ b/src/main/java/alice/datasegment/DataSegment.java Sun Dec 31 01:08:52 2017 +0900 @@ -13,6 +13,7 @@ private CompressedLocalDataSegmentManager compressedLocal = new CompressedLocalDataSegmentManager(local);//追加 private ConcurrentHashMap dataSegmentManagers = new ConcurrentHashMap(); //TODO Over Head private ConcurrentHashMap acceptHash = new ConcurrentHashMap(); + private static Logger log = Logger.getLogger(DataSegment.class); private DataSegment() { dataSegmentManagers.put("local", local); @@ -50,7 +51,7 @@ register(connectionKey, manager); register("compressed" + connectionKey, compressedManager); - System.out.println("connected to " + hostName + " DSMkey: " + connectionKey); + log.info("connected to " + hostName + " DSMkey: " + connectionKey); return manager; } diff -r f05a89484ec1 -r 646f705e65b1 src/main/java/alice/datasegment/DataSegmentKey.java --- a/src/main/java/alice/datasegment/DataSegmentKey.java Sat Dec 30 20:33:06 2017 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentKey.java Sun Dec 31 01:08:52 2017 +0900 @@ -89,7 +89,7 @@ if (cFlag && !data.rData.compressed()){ try { data.rData.zip(); - System.out.println("in reply zip"); + // System.out.println("in reply zip"); } catch (IOException e) { e.printStackTrace(); } diff -r f05a89484ec1 -r 646f705e65b1 src/main/java/alice/test/codesegment/remote/RemoteIncrement.java --- a/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Sat Dec 30 20:33:06 2017 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Sun Dec 31 01:08:52 2017 +0900 @@ -19,12 +19,14 @@ } int num = this.num.asInteger(); System.out.println("[CodeSegment" + z + "] " + num++); - if (num == 5) System.exit(0); + if (num == 10) System.exit(0); RemoteIncrement cs = new RemoteIncrement(); - ods.put("compressedremote", "num", num); - ods.put("remote", "num", num); + if ( num % 2 == 0 ) + ods.put("compressedremote", "num", num); + else + ods.put("remote", "num", num); cs.num.setKey("compressedlocal", "num"); } diff -r f05a89484ec1 -r 646f705e65b1 src/main/java/alice/test/codesegment/remote/RemoteIncrement1.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/test/codesegment/remote/RemoteIncrement1.java Sun Dec 31 01:08:52 2017 +0900 @@ -0,0 +1,29 @@ +package alice.test.codesegment.remote; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.Receiver; + +public class RemoteIncrement1 extends CodeSegment { + + public Receiver num = ids.create(CommandType.TAKE); + + RemoteIncrement1() { + num.setKey("remote","num2"); + } + + /** + * Increment DataSegment value up to 10 + */ + @Override + public void run() { + + int num = this.num.asInteger(); + System.out.println("remote " + num++); + if (num == 5) return ; + + new RemoteIncrement1(); + ods.put("remote", "num2", num); + } + +} \ No newline at end of file diff -r f05a89484ec1 -r 646f705e65b1 src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java --- a/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Sat Dec 30 20:33:06 2017 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Sun Dec 31 01:08:52 2017 +0900 @@ -7,9 +7,9 @@ @Override public void run() { RemoteIncrement cs = new RemoteIncrement(); - ods.put("compressedremote", "num", 0); - + ods.put("num2",0); + new RemoteIncrement1(); cs.num.setKey("compressedlocal", "num"); } } \ No newline at end of file diff -r f05a89484ec1 -r 646f705e65b1 src/main/java/alice/test/codesegment/remote/TestRemoteAlice.java --- a/src/main/java/alice/test/codesegment/remote/TestRemoteAlice.java Sat Dec 30 20:33:06 2017 +0900 +++ b/src/main/java/alice/test/codesegment/remote/TestRemoteAlice.java Sun Dec 31 01:08:52 2017 +0900 @@ -8,7 +8,7 @@ public static void main(String[] args) { TestRemoteConfig conf = new TestRemoteConfig(args);//トポロジー設定をコマンドライン引数からとって設定? - new AliceDaemon(conf).listen();//構成開始?TopMはない + new AliceDaemon(conf).listen(); DataSegment.connect(conf.key, "", conf.hostname, conf.connectPort); new RemoteStartCodeSegment().execute(); } diff -r f05a89484ec1 -r 646f705e65b1 src/main/java/alice/topology/manager/IncomingHosts.java --- a/src/main/java/alice/topology/manager/IncomingHosts.java Sat Dec 30 20:33:06 2017 +0900 +++ b/src/main/java/alice/topology/manager/IncomingHosts.java Sun Dec 31 01:08:52 2017 +0900 @@ -3,6 +3,7 @@ import java.util.HashMap; import java.util.LinkedList; +import org.apache.log4j.Logger; import org.msgpack.type.ValueFactory; import alice.codesegment.CodeSegment; @@ -18,6 +19,7 @@ private Receiver host = ids.create(CommandType.TAKE); // new coming host info private Receiver absCookieTable = ids.create(CommandType.TAKE); // cookie, AbsName HashMap private Receiver cookie = ids.create(CommandType.TAKE); // MD5 + private Logger log = Logger.getLogger(IncomingHosts.class); public IncomingHosts() { this.topology.setKey("resultParse"); @@ -48,6 +50,7 @@ ods.put(this.absCookieTable.key, absCookieTable); ods.put(nodeName, "cookie", cookie); + log.info( "toplology manager connected from " + nodeName); LinkedList nodes = topology.get(nodeName); for (NodeInfo nodeInfo : nodes) { diff -r f05a89484ec1 -r 646f705e65b1 src/main/java/alice/topology/node/IncomingConnectionInfo.java --- a/src/main/java/alice/topology/node/IncomingConnectionInfo.java Sat Dec 30 20:33:06 2017 +0900 +++ b/src/main/java/alice/topology/node/IncomingConnectionInfo.java Sun Dec 31 01:08:52 2017 +0900 @@ -5,25 +5,27 @@ import alice.datasegment.DataSegment; import alice.datasegment.Receiver; import alice.topology.HostMessage; +import org.apache.log4j.Logger; public class IncomingConnectionInfo extends CodeSegment { public Receiver hostInfo = ids.create(CommandType.TAKE); private String absName; private int count; + private Logger log = Logger.getLogger(IncomingConnectionInfo.class); public IncomingConnectionInfo(String absName, int count) { this.absName = absName; this.count = count; } - @Override public void run() { if (this.hostInfo.getVal() == null) { ods.put("local", "configNodeNum", count); } else { HostMessage hostInfo = this.hostInfo.asClass(HostMessage.class); + log.info("topology node " + absName + " will connect to " + hostInfo.name ); if (DataSegment.contains(hostInfo.connectionName)) { // need to wait remove by DeleteConnection ods.put("manager", absName, hostInfo); diff -r f05a89484ec1 -r 646f705e65b1 src/main/java/alice/topology/node/SaveCookie.java --- a/src/main/java/alice/topology/node/SaveCookie.java Sat Dec 30 20:33:06 2017 +0900 +++ b/src/main/java/alice/topology/node/SaveCookie.java Sun Dec 31 01:08:52 2017 +0900 @@ -3,10 +3,13 @@ import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.Receiver; +import org.apache.log4j.Logger; + public class SaveCookie extends CodeSegment{ private Receiver info = ids.create(CommandType.PEEK); + private Logger logger = Logger.getLogger(SaveCookie.class); public SaveCookie(){ info.setKey("cookie"); @@ -14,8 +17,7 @@ @Override public void run() { - System.out.println("SaveCookie:" + info.asString()); - + logger.info("SaveCookie:" + info.asString()); }