changeset 615:82736f6fae50 dispose

merge
author surug
date Mon, 24 Jul 2017 19:16:35 +0900
parents 9324ef9728c0 (current diff) 86b39f5bf1d7 (diff)
children 747bcd5bbba1
files Alice.iml build.gradle
diffstat 23 files changed, 163 insertions(+), 117 deletions(-) [+]
line wrap: on
line diff
--- a/.hgignore	Mon Jul 24 19:15:01 2017 +0900
+++ b/.hgignore	Mon Jul 24 19:16:35 2017 +0900
@@ -7,4 +7,8 @@
 ^Alice\.jar$
 syntax: regexp
 syntax: glob
-.classpath
\ No newline at end of file
+.classpath
+
+build
+.gradle
+.idea
--- a/.hgtags	Mon Jul 24 19:15:01 2017 +0900
+++ b/.hgtags	Mon Jul 24 19:16:35 2017 +0900
@@ -1,2 +1,3 @@
 d7a3ecc8c4a193008f48513e24d8bd50481f0cc2 working
 d5d9ca4cbe87215b6a6e97e141b3cf98e095687d fish-example-worked
+924e5f52a61f58687719fe36f8d5e4a76472d180 before-multi-topology-manager
Binary file lib/j3dcore.jar has changed
Binary file lib/j3dutils.jar has changed
Binary file lib/vecmath.jar has changed
--- a/src/main/java/alice/daemon/AliceDaemon.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/daemon/AliceDaemon.java	Mon Jul 24 19:16:35 2017 +0900
@@ -7,39 +7,39 @@
 import java.net.ServerSocket;
 import java.nio.channels.ServerSocketChannel;
 
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-import org.apache.log4j.WriterAppender;
+//import org.apache.log4j.Logger;
+//import org.apache.log4j.PatternLayout;
+//import org.apache.log4j.WriterAppender;
 
 public class AliceDaemon {
 
     private Config conf;
     private AcceptThread acceptThread;
-    private Logger log = Logger.getLogger(AliceDaemon.class);
+    //private Logger log = Logger.getLogger(AliceDaemon.class);
 
     public AliceDaemon(Config conf) {
         this.conf = conf;
-        setLogger();
+        //setLogger();
     }
 
-    private void setLogger() {
-        Logger root = Logger.getRootLogger();
-        if (conf.level != null)
-            root.setLevel(conf.level);
-        if (conf.logFile == null)
-            return;
-        PatternLayout layout = new PatternLayout();
-        layout.setConversionPattern("%d %-5p %c - %m [%t] (%F:%L)%n");
-        try {
-            FileWriter writer = new FileWriter(conf.logFile);
-            WriterAppender writerAppender = new WriterAppender(layout, writer);
-            root.removeAllAppenders();
-            root.addAppender(writerAppender);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        log.info("configured");
-    }
+//    private void setLogger() {
+//        Logger root = Logger.getRootLogger();
+//        if (conf.level != null)
+//            root.setLevel(conf.level);
+//        if (conf.logFile == null)
+//            return;
+//        PatternLayout layout = new PatternLayout();
+//        layout.setConversionPattern("%d %-5p %c - %m [%t] (%F:%L)%n");
+//        try {
+//            FileWriter writer = new FileWriter(conf.logFile);
+//            WriterAppender writerAppender = new WriterAppender(layout, writer);
+//            root.removeAllAppenders();
+//            root.addAppender(writerAppender);
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//        }
+//        log.info("configured");
+//    }
 
     public void listen() {
         try {
--- a/src/main/java/alice/daemon/CommandMessage.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/daemon/CommandMessage.java	Mon Jul 24 19:16:35 2017 +0900
@@ -12,25 +12,27 @@
     public int seq;//DSの待ち合わせを行っているCSを表すunique number
     public String key;//DS key
     public boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか
-    public boolean serialized = false;//シリアライズされているかどうか
     public boolean compressed = false;//圧縮されているかどうか
-    public int dataSize = 0;
+    public int dataSize = 0;//圧縮前のサイズ
 
-    public boolean setTime = false;//?
-    public long time;//?
-    public int depth;//?
+    //計測用
+    public boolean setTime = false;
+    public long time;
+    public int depth;
+    public boolean setZepped = false;
+    public int zippedDataSize;//圧縮後のサイズ
 
     public CommandMessage() {}
 
     public CommandMessage(int type, int index, int seq, String key
-            , boolean qFlag, boolean sFlag, boolean cFlag, int datasize) {
+            , boolean qFlag, boolean cFlag, int datasize) {
         this.type = type;
         this.index = index;
         this.seq = seq;
         this.key = key;
         this.quickFlag = qFlag;
-        this.serialized = sFlag;
         this.compressed = cFlag;
         this.dataSize = datasize;
     }
+
 }
--- a/src/main/java/alice/daemon/ConnectionInfo.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/daemon/ConnectionInfo.java	Mon Jul 24 19:16:35 2017 +0900
@@ -15,7 +15,7 @@
 
     public ConnectionInfo(String name, Socket socket){
         this.nodeName = name;
-        this.hostname = socket.getInetAddress().getHostName();
+        this.hostname = socket.getInetAddress().getHostAddress();
         this.addr = socket.getInetAddress().getHostAddress();
         this.port = socket.getPort();
     }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Mon Jul 24 19:16:35 2017 +0900
@@ -59,16 +59,16 @@
                 switch (type) {
                 case UPDATE:
                 case PUT:
-                    if (msg.compressed) {
-                        rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), true, msg.dataSize);
-                    } else {
-                        rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), false, msg.dataSize);
-                    }
+                    int dataSize = unpacker.readInt();
+                    rData = new ReceiveData(unpacker.getSerializedByteArray(dataSize), msg.compressed, msg.dataSize);
 
                     if (msg.setTime) {
-                        rData.setTime = true;
-                        rData.time = msg.time;
-                        rData.depth = msg.depth;
+                        rData.setTimes(msg.time, true, msg.depth);
+                    }
+                    if (msg.setZepped){
+                        rData.setZipped(msg.zippedDataSize, true);
+                    } else {
+                        rData.setZipped(dataSize, true);
                     }
 
                     cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey);
@@ -100,11 +100,7 @@
                 case REPLY:
                     cmd = manager.getAndRemoveCmd(msg.seq);
 
-                    if (msg.compressed) {
-                        rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), true, msg.dataSize);
-                    } else {
-                        rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), false, msg.dataSize);
-                    }
+                    rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed, msg.dataSize);
 
                     Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, "");
                     cmd.setCompressFlag(msg.compressed);
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/daemon/IncomingUdpConnection.java	Mon Jul 24 19:16:35 2017 +0900
@@ -73,7 +73,6 @@
                     getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
                     break;
                 case REPLY:
-                    System.out.println("in UDP REPLY");
                     cmd = manager.getAndRemoveCmd(msg.seq);
 
                     rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()));
--- a/src/main/java/alice/datasegment/Command.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Mon Jul 24 19:16:35 2017 +0900
@@ -80,8 +80,6 @@
             byte[] header = null;
             byte[] data = null;
             byte[] dataSize = null;
-            boolean serialized = false;
-            boolean compressed = false;
             switch (type) {
         /*
          * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment
@@ -95,19 +93,21 @@
                 case PUT:
                 case REPLY:
                     if(compressFlag){
-                        // ToDo: Do not pack again
-                        data = packer.write(rData.getZMessagePack());
-                        compressed = true;
+                        data = rData.getZMessagePack();
                     } else {
                         data = rData.getMessagePack();
-                        serialized = true;
                     }
 
-                    CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed, rData.getDataSize());
-                    if (rData.setTime) {
+                    CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, compressFlag, rData.getDataSize());
+                    if (rData.getSetTime()) {
                         cm.setTime = true;
-                        cm.time = rData.time;
-                        cm.depth = rData.depth + 1;
+                        cm.time = rData.getTime();
+                        cm.depth = rData.getDepth() + 1;
+                    }
+
+                    if (rData.getSetZipped()){
+                        cm.setZepped = true;
+                        cm.zippedDataSize = rData.getZippedDataSize();
                     }
 
                     header = packer.write(cm);
@@ -118,7 +118,7 @@
                     buf.put(data);
                     break;
                 default:
-                    header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag, 0));
+                    header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, compressFlag, 0));
                     buf = ByteBuffer.allocate(header.length);
                     buf.put(header);
                     break;
--- a/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java	Mon Jul 24 19:16:35 2017 +0900
@@ -17,30 +17,12 @@
 
     public CompressedLocalDataSegmentManager(LocalDataSegmentManager manager) {
         this.manager = manager;
-        new Thread(replyThread, "CompressedLocalDataSegmentManager-replyCommand").start();
     }
 
     public void setReverseKey(String s){
         reverseKey = s;
     }
 
-    private class RunCommand implements Runnable {
-
-        DataSegmentKey key;
-        Command cmd;
-
-        public RunCommand(DataSegmentKey key, Command cmd) {
-            this.key = key;
-            this.cmd = cmd;
-        }
-
-        @Override
-        public void run() {
-            key.runCommand(cmd);
-        }
-
-    }
-
     public void submitCommand(DataSegmentKey key, Command cmd) {
         manager.submitCommand(key, cmd);
     }
--- a/src/main/java/alice/datasegment/DataSegmentKey.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentKey.java	Mon Jul 24 19:16:35 2017 +0900
@@ -1,5 +1,6 @@
 package alice.datasegment;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 
@@ -85,6 +86,14 @@
     }
 
     public void replyValue(Command cmd, DataSegmentValue data, boolean cFlag){
+        if (cFlag && !data.rData.compressed()){
+            try {
+                data.rData.zip();
+                System.out.println("in reply zip");
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
         Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from);
         rCmd.setCompressFlag(cFlag);
 
--- a/src/main/java/alice/datasegment/DataSegmentManager.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentManager.java	Mon Jul 24 19:16:35 2017 +0900
@@ -39,7 +39,7 @@
     };
 
     public Command getAndRemoveCmd(int index){
-        System.err.println("DSM getAndRemoveCmd seq : " + index);
+        //System.err.println("DSM getAndRemoveCmd seq : " + index);
         return seqHash.remove(index);
     }
 
--- a/src/main/java/alice/datasegment/ReceiveData.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/datasegment/ReceiveData.java	Mon Jul 24 19:16:35 2017 +0900
@@ -16,12 +16,14 @@
     private Object val;//for Object DS
     private byte[] messagePack;//for byteArray(serialized) DS
     private byte[] zMessagePack;//for byteArray(compressed) DS
-    private int dataSize;
+    private int dataSize;//圧縮前(MessagePack)のデータサイズ
     private Class<?> clazz;
 
-    public long time;//測定用
-    public boolean setTime = false;
-    public int depth = 1;
+    private long time;//測定用
+    private boolean setTime = false;
+    private int depth = 1;
+    private boolean setZepped = false;
+    private int zippedDataSize;//圧縮後のデータサイズ
 
     private static final MessagePack packer = new MessagePack();
 
@@ -100,7 +102,6 @@
      * @return
      */
     public <T> T asClass(Class<T> clazz) {///javasist
-        System.out.println("in asClass val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack);
 
         try {
             if (val != null) {
@@ -136,7 +137,6 @@
 
     public byte[] getZMessagePack(){
         if (zMessagePack != null){
-            System.out.println("have zMessagePack");
             return zMessagePack;
         } else {
             try {
@@ -151,7 +151,6 @@
     }
 
     public void zip() throws IOException {
-        System.out.println("in zip");
         LinkedList<ByteBuffer> inputs = new LinkedList<ByteBuffer>();
         int inputIndex = 0;
         LinkedList<ByteBuffer> outputs = new LinkedList<ByteBuffer>();
@@ -197,40 +196,23 @@
             System.arraycopy(outputs.get(i).array(), 0, zMessagePack, 0 + tmp, outputs.get(i).limit());//limit? remaining?
             tmp += outputs.get(i).limit();
         }
-        System.out.print("in make zMessagePack2: ");
-        for (int i = 0; i < zMessagePack.length; i++) {
-            System.out.print(Integer.toHexString(zMessagePack[i] & 0xff));
-        }
-        System.out.print("\n");
     }
 
-    protected byte[] unzip(byte[] input, int zippedLength) {///read header & unzip
+    protected byte[] unzip(byte[] input, int dataSize) {///read header & unzip
         int length = input.length;
         Inflater inflater = new Inflater();
 
-        System.out.print("unziped input: ");
-        for (int i = 0; i < input.length; i++) {
-            System.out.print(Integer.toHexString(input[i] & 0xff));
-        }
-        System.out.print("\n");
-
-        byte [] output = new byte [zippedLength];///byteArray for unziped data
+        byte [] output = new byte [dataSize];///byteArray for unziped data
         inflater.setInput(input, 0, length);///set unzip data without header
 
         try {
-            inflater.inflate(output, 0, zippedLength);///unzip
+            inflater.inflate(output, 0, dataSize);///unzip
         } catch (DataFormatException e) {
             e.printStackTrace();
         }
 
         inflater.reset();
 
-        System.out.print("unziped: ");
-        for (int i = 0; i < output.length; i++) {
-            System.out.print(Integer.toHexString(output[i] & 0xff));
-        }
-        System.out.print("\n");
-
         return output;
  	}
 
@@ -264,4 +246,35 @@
         this.dataSize = datasize;
     }
 
+    public void setTimes(long time, boolean setTime, int depth){
+        this.time = time;
+        this.setTime = setTime;
+        this.depth = depth;
+    }
+
+    public long getTime(){
+        return this.time;
+    }
+
+    public boolean getSetTime(){
+        return this.setTime;
+    }
+
+    public  int getDepth(){
+        return this.depth;
+    }
+
+    public  void setZipped(int zippedDataSize, boolean setZepped){
+        this.zippedDataSize = zippedDataSize;
+        this.setZepped = setZepped;
+    }
+
+    public  int getZippedDataSize(){
+        return this.zippedDataSize;
+    }
+
+    public boolean getSetZipped(){
+        return this.setZepped;
+    }
+
 }
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Mon Jul 24 19:16:35 2017 +0900
@@ -18,7 +18,7 @@
 
     public RemoteDataSegmentManager(){}
 
-    public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) {
+    public RemoteDataSegmentManager(final String connectionKey, final String reverseKey, final String hostName, final int port) {
         logger = Logger.getLogger(connectionKey);
         connection = new Connection();
         connection.name = connectionKey;
@@ -92,7 +92,7 @@
 
     public void take1(boolean quickFlag, Command cmd) {
         int seq = this.seq.getAndIncrement();
-        System.err.println("DataSegment take seq :" + seq);
+        //System.err.println("DataSegment take seq :" + seq);
         cmd.setSeq(seq);
         seqHash.put(seq, cmd);
         cmd.setQuickFlag(quickFlag);
--- a/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java	Mon Jul 24 19:16:35 2017 +0900
@@ -13,14 +13,20 @@
      */
     @Override
     public void run() {
+        String z = "";
+        if (num.isCompressed()){
+            z = "zMP";
+        }
         int num = this.num.asInteger();
-        System.out.println("[CodeSegment] " + num++);
-        if (num == 10) System.exit(0);
+        System.out.println("[CodeSegment" + z + "] " + num++);
+        if (num == 5) System.exit(0);
 
         RemoteIncrement cs = new RemoteIncrement();
-        cs.num.setKey("compressedremote", "num");
 
-        ods.put("compressedlocal", "num", num);
+        ods.put("compressedremote", "num", num);
+        ods.put("remote", "num", num);
+
+        cs.num.setKey("compressedlocal", "num");
     }
 
 }
\ No newline at end of file
--- a/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java	Mon Jul 24 19:16:35 2017 +0900
@@ -7,8 +7,9 @@
     @Override
     public void run() {
         RemoteIncrement cs = new RemoteIncrement();
-        cs.num.setKey("compressedremote", "num");
 
-        ods.put("compressedlocal", "num", 0);
+        ods.put("compressedremote", "num", 0);
+
+        cs.num.setKey("compressedlocal", "num");
     }
 }
\ No newline at end of file
--- a/src/main/java/alice/test/topology/aquarium/fx/Aquarium.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/test/topology/aquarium/fx/Aquarium.java	Mon Jul 24 19:16:35 2017 +0900
@@ -31,7 +31,7 @@
 
     @Override
     public void start(Stage primaryStage) throws IOException {
-        String myName = getParameters().getRaw().get(0); // name
+        final String myName = getParameters().getRaw().get(0); // name
         primaryStage.setTitle("Aquarium "+ myName);
         primaryStage.setResizable(false);
         primaryStage.setOnCloseRequest(new EventHandler<WindowEvent>(){
--- a/src/main/java/alice/topology/manager/CreateTreeTopology.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/topology/manager/CreateTreeTopology.java	Mon Jul 24 19:16:35 2017 +0900
@@ -31,7 +31,7 @@
     @Override
     public void run() {
         String cookie = info3.asString();
-        System.out.println(cookie);
+        System.out.println("cookie:" + cookie);
         HostMessage host = info.asClass(HostMessage.class);
         int comingHostCount = info1.asInteger();
         ParentManager manager = info6.asClass(ParentManager.class);
--- a/src/main/java/alice/topology/node/SaveCookie.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/topology/node/SaveCookie.java	Mon Jul 24 19:16:35 2017 +0900
@@ -14,7 +14,7 @@
 
     @Override
     public void run() {
-        System.out.println(info.asString());
+        System.out.println("SaveCookie:" + info.asString());
 
     }
 
--- a/src/main/java/alice/topology/node/StartTopologyNode.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/topology/node/StartTopologyNode.java	Mon Jul 24 19:16:35 2017 +0900
@@ -23,7 +23,7 @@
         DataSegment.connect("manager", "manager", conf.getManagerHostName(), conf.getManagerPort());
         String localHostName = null;
         try {
-            localHostName = InetAddress.getLocalHost().getHostName();
+            localHostName = InetAddress.getLocalHost().getHostAddress();
         } catch (UnknownHostException e) {
             e.printStackTrace();
         }
--- a/src/main/java/alice/topology/node/TopologyNodeConfig.java	Mon Jul 24 19:15:01 2017 +0900
+++ b/src/main/java/alice/topology/node/TopologyNodeConfig.java	Mon Jul 24 19:16:35 2017 +0900
@@ -1,6 +1,8 @@
 package alice.topology.node;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
+
 import alice.daemon.Config;
 
 public class TopologyNodeConfig extends Config {
@@ -10,7 +12,6 @@
     public String cookie;
     private ArrayList<Class> eventList = new ArrayList<Class>();
 
-
     public TopologyNodeConfig(String[] args) {
         super(args);
         for (int i = 0; i< args.length; i++) {
@@ -24,6 +25,36 @@
         }
     }
 
+    public static LinkedList<TopologyNodeConfig> MultiTopologyNodeCongingFactory(String[] args) {
+        LinkedList<TopologyNodeConfig> configList = new LinkedList<TopologyNodeConfig>();
+        LinkedList<String> nodeConfigArgs = new LinkedList<String>();
+
+        for(int i = 0; i < args.length; ++i) {
+            if("-host".equals(args[i])) {
+                if(!nodeConfigArgs.isEmpty()){
+                    configList.add(new TopologyNodeConfig((String[]) nodeConfigArgs.toArray()));
+                    nodeConfigArgs = new LinkedList<String>();
+                }
+                ++i;
+                nodeConfigArgs.add("-host");
+                nodeConfigArgs.add(args[i]);
+            } else if("-port".equals(args[i])) {
+                ++i;
+                nodeConfigArgs.add("-port");
+                nodeConfigArgs.add(args[i]);
+            } else if("-cookie".equals(args[i])) {
+                ++i;
+                nodeConfigArgs.add("-cookie");
+                nodeConfigArgs.add(args[i]);
+            }
+        }
+
+        configList.add(new TopologyNodeConfig(nodeConfigArgs.toArray(new String[] {})));
+
+        return configList;
+
+    }
+
     public String getManagerHostName() {
         return managerHostName;
     }
@@ -49,8 +80,10 @@
         for (Class clazz : eventList)
             try {
                 clazz.newInstance();
-            } catch (InstantiationException | IllegalAccessException e) {
+            } catch (InstantiationException e) {
                 e.printStackTrace();
+            } catch (IllegalAccessException e){
+
             }
     }