changeset 529:cb7c31848d16 dispose

add CompressedDSMs
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Fri, 01 May 2015 18:19:16 +0900
parents 6ebddfac7ff6
children 4aeebea0c9b5
files src/main/java/alice/daemon/CommandMessage.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/daemon/IncomingUdpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java src/main/java/alice/datasegment/DataSegment.java src/main/java/alice/datasegment/LocalDataSegmentManager.java src/main/java/alice/datasegment/ReceiveData.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java src/main/java/alice/datasegment/SendOption.java src/main/java/alice/test/topology/aquarium/fx/CheckAllFishInfoExist.java
diffstat 12 files changed, 458 insertions(+), 113 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/daemon/CommandMessage.java	Thu Apr 30 20:52:21 2015 +0900
+++ b/src/main/java/alice/daemon/CommandMessage.java	Fri May 01 18:19:16 2015 +0900
@@ -14,6 +14,7 @@
     public boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか
     public boolean serialized = false;//シリアライズされているかどうか
     public boolean compressed = false;//圧縮されているかどうか
+    public int datasize;
 
     public boolean setTime = false;//?
     public long time;//?
@@ -30,5 +31,6 @@
         this.quickFlag = qFlag;
         this.serialized = sFlag;
         this.compressed = cFlag;
+        ///this.datasize = datasize;
     }
 }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Thu Apr 30 20:52:21 2015 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Fri May 01 18:19:16 2015 +0900
@@ -8,7 +8,6 @@
 import org.msgpack.MessagePack;
 import org.msgpack.unpacker.Unpacker;
 
-import alice.codesegment.SingletonMessage;
 import alice.topology.manager.keeparive.RespondData;
 
 public class IncomingTcpConnection extends Thread {
@@ -17,6 +16,7 @@
     protected DataSegmentManager manager;
     protected String reverseKey;
     private LocalDataSegmentManager lmanager = DataSegment.getLocal();
+    private CompressedLocalDataSegmentManager compressedlmanager = DataSegment.getCompressedLocal();
     private static final MessagePack packer = new MessagePack();
 
     public IncomingTcpConnection(DataSegmentManager manager) {
@@ -33,6 +33,10 @@
         return lmanager;
     }
 
+    public CompressedLocalDataSegmentManager getCompressedLocalDataSegmentManager(){
+        return compressedlmanager;
+    }
+
     /**
      * pipeline thread for receiving
      */
@@ -86,7 +90,7 @@
                     System.out.println("in TCP REPLY");
                     cmd = manager.getAndRemoveCmd(msg.seq);
 
-                    rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()));//ここのコンストラクタでx
+                    rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()));
 
                     Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, "");
                     cmd.cs.ids.reply(cmd.receiver, rCmd);
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java	Thu Apr 30 20:52:21 2015 +0900
+++ b/src/main/java/alice/daemon/IncomingUdpConnection.java	Fri May 01 18:19:16 2015 +0900
@@ -5,9 +5,9 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 
+import org.msgpack.MessagePack;
 import org.msgpack.unpacker.Unpacker;
 
-import alice.codesegment.SingletonMessage;
 import alice.datasegment.Command;
 import alice.datasegment.CommandType;
 import alice.datasegment.DataSegment;
@@ -22,6 +22,7 @@
 
     public MulticastConnection receiver;
     public MulticastConnection sender;
+    private static final MessagePack packer = new MessagePack();
 
     public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) {
         super(manager);
@@ -39,7 +40,7 @@
                 // Max data length is 65507 because of the max length of UDP payload
                 ByteBuffer receive = ByteBuffer.allocate(65507);
                 receiver.receive(receive);
-                Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive);
+                Unpacker unpacker = packer.createBufferUnpacker(receive);
                 receive.flip();
                 CommandMessage msg = unpacker.read(CommandMessage.class);
                 CommandType type = CommandType.getCommandTypeFromId(msg.type);
@@ -47,7 +48,6 @@
                 case UPDATE:
                 case PUT:
                     rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()));
-                    rData.setCompressFlag(msg.compressed);
                     cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey);
 
                     if (msg.compressed){
@@ -77,7 +77,6 @@
                     cmd = manager.getAndRemoveCmd(msg.seq);
 
                     rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()));
-                    rData.setCompressFlag(msg.compressed);
 
                     Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, "");
                     cmd.cs.ids.reply(cmd.receiver, rCmd);
--- a/src/main/java/alice/datasegment/Command.java	Thu Apr 30 20:52:21 2015 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Fri May 01 18:19:16 2015 +0900
@@ -7,7 +7,6 @@
 import org.msgpack.MessagePack;
 
 import alice.codesegment.CodeSegment;
-import alice.codesegment.SingletonMessage;
 import alice.daemon.CommandMessage;
 import alice.daemon.Connection;
 
@@ -28,6 +27,8 @@
     private boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか。trueだとリモート
     private boolean compressFlag = false;//trueだったら圧縮する必要がある
 
+    private static final MessagePack packer = new MessagePack();
+
     /**
      * for PEEK/TAKE
      */
@@ -69,14 +70,19 @@
         }
         return this.type + "\t" + key + "\t" + rData + "\tindex=" + index + "\tcs=" + csName;
     }
-
     /**
      * @return serialized ByteBuffer
      */
-    public ByteBuffer convert() {//byteArrayに変換
+    public ByteBuffer convert() {
+        ByteBuffer buf = null;
 
-        ByteBuffer buf = null;
-        switch (type) {
+        try {
+            byte[] header = null;
+            byte[] data = null;
+            byte[] dataSize = null;
+            boolean serialized = false;
+            boolean compressed = false;
+            switch (type) {
         /*
          * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment
          * case UPDATE and PUT
@@ -85,26 +91,52 @@
          * these flags represent DataSegment status.
          * for example, serializeFlag is true. DataSegment had already converted, so no need convert.
          */
+                case UPDATE:
+                case PUT:
+                case REPLY:
+                    if (rData.compressed()) {
+                        // have already converted
+                        data = (byte[]) rData.getObj();
+                        compressed = rData.compressed(); // true
+                        serialized = rData.serialized();
+                    } else {
+                        if (!rData.serialized() && !rData.isByteArray()) {
+                            data = packer.write(rData.getObj());
+                            serialized = true;
+                        } else { // rData is RAW ByteArray or already serialized
+                            data = (byte[]) rData.getObj();
+                            serialized = rData.serialized();
+                        }
+                        if (compressFlag) {
+                            rData.zip();
+                            compressed = true;
+                        }
+                    }
+                    CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed);
+                    if (rData.setTime) {
+                        cm.setTime = true;
+                        cm.time = rData.time;
+                        cm.depth = rData.depth + 1;
+                    }
 
-        case UPDATE:
-            System.out.println("update compressFlag:" + compressFlag);
-            break;
-        case PUT:
-            System.out.println("put compressFlag:" + compressFlag);
-            break;
-        case REPLY://ReceiveDataからREPLYするDSを取得
-            System.out.println("in REPLY");
-            System.out.println("reply compressFlag:" + compressFlag + ", " + type.id+ ", " +  index+ ", " + seq + ", " +  key+ ", " +  false+ ", " +  rData.serialized()+ ", " +  rData.compressed());
+                    header = packer.write(cm);
+                    dataSize = packer.write(data.length);
+                    buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
+                    buf.put(header);
+                    buf.put(dataSize);
+                    buf.put(data);
+                    break;
+                default:
+                    header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag));
+                    buf = ByteBuffer.allocate(header.length);
+                    buf.put(header);
+                    break;
+            }
 
-
-            break;
-        default://PEEK, TAKE, RemoveならCommandMessageとそのサイズのみセット
-            ReceiveData rData2 = new ReceiveData("hoge");
-            System.out.println("default compressFlag:" + compressFlag);
-
-            break;
+            buf.flip();
+        } catch (IOException e) {
+            e.printStackTrace();
         }
-
         return buf;
     }
 
@@ -115,11 +147,11 @@
      * @param flag
      */
 
-    public void setQuickFlag(boolean flag){//SEDA処理の有無フラグのsetter
+    public void setQuickFlag(boolean flag){
         quickFlag = flag;
     }
 
-    public boolean getQuickFlag(){//SEDA処理の有無フラグのgetter
+    public boolean getQuickFlag(){
         return quickFlag;
     }
 
@@ -130,12 +162,11 @@
      * @param flag
      */
 
-    public void setCompressFlag(boolean flag){//圧縮フラグのsetter
+    public void setCompressFlag(boolean flag){
         compressFlag = flag;
     }
 
-    public boolean getCompressFlag(){//圧縮フラグのgetter
+    public boolean getCompressFlag(){
         return compressFlag;
     }
-
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java	Fri May 01 18:19:16 2015 +0900
@@ -0,0 +1,176 @@
+package alice.datasegment;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+import alice.codesegment.CodeSegment;
+
+public class CompressedLocalDataSegmentManager extends DataSegmentManager {
+
+    private String reverseKey = "local";
+    private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
+    private Logger logger = Logger.getLogger("local");
+
+    private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads
+            Runtime.getRuntime().availableProcessors(),
+            Integer.MAX_VALUE, // keepAliveTime
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>());
+
+    public CompressedLocalDataSegmentManager() {
+        new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
+    }
+
+    public void setReverseKey(String s){
+        reverseKey = s;
+    }
+
+    private class RunCommand implements Runnable {
+
+        DataSegmentKey key;
+        Command cmd;
+
+        public RunCommand(DataSegmentKey key, Command cmd) {
+            this.key = key;
+            this.cmd = cmd;
+        }
+
+        @Override
+        public void run() {
+            key.runCommand(cmd);
+        }
+
+    }
+
+    public void submitCommand(DataSegmentKey key, Command cmd) {
+        dataSegmentExecutor.execute(new RunCommand(key, cmd));
+    }
+
+    public DataSegmentKey getDataSegmentKey(String key) {
+        DataSegmentKey dsKey = dataSegments.get(key);
+        if (dsKey != null)
+            return dsKey;
+        if (key == null)
+            return null;
+        DataSegmentKey newDataSegmentKey = new DataSegmentKey();
+        DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
+        if (dataSegmentKey == null) {
+            dataSegmentKey = newDataSegmentKey;
+        }
+        return dataSegmentKey;
+    }
+
+    public void removeDataSegmentKey(String key) {
+        if (key!=null)
+            dataSegments.remove(key);
+    }
+
+    @Override
+    public void put(String key, ReceiveData rData, SendOption option) {
+        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
+        try {
+            rData.zip();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);
+        cmd.setCompressFlag(option.isCompress());
+
+        dataSegmentKey.runCommand(cmd);
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
+
+    /**
+     * Enqueue update command to the queue of each DataSegment key
+     */
+
+    @Override
+    public void update(String key, ReceiveData rData, SendOption option) {
+        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
+        try {
+            rData.zip();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey);
+        cmd.setCompressFlag(option.isCompress());
+
+        dataSegmentKey.runCommand(cmd);
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
+
+    @Override
+    public void take(Receiver receiver, CodeSegment cs, SendOption option) {
+        DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
+        int seq = this.seq.getAndIncrement();
+        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        dataSegmentKey.runCommand(cmd);
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
+
+    @Override
+    public void peek(Receiver receiver, CodeSegment cs, SendOption option) {
+        DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
+        int seq = this.seq.getAndIncrement();
+        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        dataSegmentKey.runCommand(cmd);
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
+
+    @Override
+    public void remove(String key) {
+        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
+        Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
+        dataSegmentKey.runCommand(cmd);
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
+
+    @Override public void finish() {
+        System.exit(0);
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    public void recommand(Receiver receiver, CodeSegment cs) {
+        DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
+        int seq = this.seq.getAndIncrement();
+        Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        dataSegmentKey.runCommand(cmd);
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+
+    }
+
+    @Override
+    public void ping(String returnKey) {
+
+    }
+
+    @Override
+    public void response(String returnKey) {
+
+    }
+
+    @Override
+    public void shutdown() {
+
+    }
+
+    @Override
+    public void setSendError(boolean b) {
+
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java	Fri May 01 18:19:16 2015 +0900
@@ -0,0 +1,172 @@
+package alice.datasegment;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import alice.codesegment.CodeSegment;
+import alice.daemon.Connection;
+import alice.daemon.IncomingTcpConnection;
+import alice.daemon.OutboundTcpConnection;
+
+public class CompressedRemoteDataSegmentManager extends DataSegmentManager {
+    protected Connection connection;
+    protected Logger logger;
+
+    public CompressedRemoteDataSegmentManager(){}
+
+    public CompressedRemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) {
+        logger = Logger.getLogger(connectionKey);
+        connection = new Connection();
+        connection.name = connectionKey;
+        final CompressedRemoteDataSegmentManager manager = this;
+        //new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start();
+        new Thread("Connect-" + connectionKey) {
+            public void run() {
+                boolean connect = true;
+                do {
+                    try {
+                        SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port));
+                        connection.socket = sc.socket();
+                        connection.socket.setTcpNoDelay(true);
+                        connect = false;
+                        logger.info("Connect to " + connection.getInfoString());
+                    } catch (IOException e) {
+                        try {
+                            Thread.sleep(50);
+                        } catch (InterruptedException e1) {
+                            e1.printStackTrace();
+                        }
+                    }
+                } while (connect);
+                IncomingTcpConnection in = new IncomingTcpConnection(connection, manager, reverseKey);
+                in.setName(reverseKey+"-IncomingTcp");
+                in.setPriority(MAX_PRIORITY);
+                in.start();
+                OutboundTcpConnection out = new OutboundTcpConnection(connection);
+                out.setName(connectionKey+"-OutboundTcp");
+                out.setPriority(MAX_PRIORITY);
+                out.start();
+            }
+        }.start();
+    }
+
+    /**
+     * send put command to target DataSegment
+     */
+    @Override
+    public void put(String key, ReceiveData rData, SendOption option) {
+        try {
+            rData.zip();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, "");
+        cmd.setCompressFlag(option.isCompress());//true
+
+        if (option.isQuick()){
+            connection.write(cmd); // put command is executed right now
+        } else {
+            connection.sendCommand(cmd); // put command on the transmission thread
+        }
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
+
+    @Override
+    public void update(String key, ReceiveData rData, SendOption option) {
+        try {
+            rData.zip();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, "");
+        cmd.setCompressFlag(option.isCompress());
+
+        if (option.isQuick()){
+            connection.write(cmd);
+        } else {
+            connection.sendCommand(cmd);
+        }
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
+
+    @Override
+    public void take(Receiver receiver, CodeSegment cs, SendOption option) {
+        int seq = this.seq.getAndIncrement();
+        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        cmd.setQuickFlag(option.isQuick());
+        seqHash.put(seq, cmd);
+        if (option.isQuick()){
+            connection.write(cmd);
+        } else {
+            connection.sendCommand(cmd);
+        }
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
+
+    @Override
+    public void peek(Receiver receiver, CodeSegment cs, SendOption option) {
+        int seq = this.seq.getAndIncrement();
+        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        cmd.setQuickFlag(option.isQuick());
+        seqHash.put(seq, cmd);
+        if (option.isQuick()){
+            connection.write(cmd);
+        } else {
+            connection.sendCommand(cmd);
+        }
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
+
+    @Override
+    public void remove(String key) {
+        Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, "");
+        connection.sendCommand(cmd);
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
+
+    @Override
+    public void finish() {
+        Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, "");
+        connection.sendCommand(cmd);
+    }
+
+    @Override
+    public void ping(String returnKey) {
+        Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, "");
+        connection.write(cmd);
+    }
+
+    @Override
+    public void response(String returnKey) {
+        Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, "");
+        connection.write(cmd);
+    }
+
+    @Override
+    public void close() {
+        Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, "");
+        connection.sendManager = false;
+        connection.sendCommand(cmd);
+    }
+
+    @Override
+    public void shutdown() {
+        connection.close();
+        LinkedBlockingQueue<Command> queue = connection.sendQueue;
+        if (!queue.isEmpty()) queue.clear();
+    }
+
+    @Override
+    public void setSendError(boolean b) {
+        connection.sendManager = b;
+    }
+}
--- a/src/main/java/alice/datasegment/DataSegment.java	Thu Apr 30 20:52:21 2015 +0900
+++ b/src/main/java/alice/datasegment/DataSegment.java	Fri May 01 18:19:16 2015 +0900
@@ -10,7 +10,7 @@
 
     private static DataSegment dataSegment = new DataSegment();
     private LocalDataSegmentManager local = new LocalDataSegmentManager();
-    private LocalDataSegmentManager compressedLocal = new LocalDataSegmentManager();//追加
+    private CompressedLocalDataSegmentManager compressedLocal = new CompressedLocalDataSegmentManager();//追加
     private ConcurrentHashMap<String, DataSegmentManager> dataSegmentManagers = new ConcurrentHashMap<String, DataSegmentManager>(); //TODO Over Head
     private ConcurrentHashMap<String, IncomingTcpConnection> acceptHash = new ConcurrentHashMap<String, IncomingTcpConnection>();
 
@@ -27,7 +27,7 @@
         return dataSegment.local;
     }
 
-    public static LocalDataSegmentManager getCompressedLocal() {//追加
+    public static CompressedLocalDataSegmentManager getCompressedLocal() {//追加
         return dataSegment.compressedLocal;
     }
 
@@ -42,7 +42,7 @@
             System.exit(0);
         }
         RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connectionKey, reverseKey, hostName, port);
-        RemoteDataSegmentManager compressedManager = new RemoteDataSegmentManager(connectionKey + "!", reverseKey, hostName, port);
+        CompressedRemoteDataSegmentManager compressedManager = new CompressedRemoteDataSegmentManager(connectionKey + "!", reverseKey, hostName, port);
 
         register(connectionKey, manager);
         register(connectionKey + "!", compressedManager);
--- a/src/main/java/alice/datasegment/LocalDataSegmentManager.java	Thu Apr 30 20:52:21 2015 +0900
+++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java	Fri May 01 18:19:16 2015 +0900
@@ -9,10 +9,6 @@
 
 import alice.codesegment.CodeSegment;
 
-/**
- * localのDSに対する処理。DS自体は持っていない。→ReceivedData
- * DataSegmentKey.runCommandに渡してコマンドを実行する。
- */
 public class LocalDataSegmentManager extends DataSegmentManager {
 
     private String reverseKey = "local";
@@ -25,7 +21,6 @@
             TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>());
 
-    //コンストラクタ。スレッドが走る。
     public LocalDataSegmentManager() {
         new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
     }
@@ -77,10 +72,7 @@
     @Override
     public void put(String key, ReceiveData rData, SendOption option) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);//ここでMP変換している
-        cmd.setCompressFlag(option.getCompressFlag());
-        rData.setCompressFlag(option.getCompressFlag());
-
+        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
@@ -94,9 +86,6 @@
     public void update(String key, ReceiveData rData, SendOption option) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
         Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey);
-        cmd.setCompressFlag(option.getCompressFlag());
-        rData.setCompressFlag(option.getCompressFlag());
-
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
@@ -107,8 +96,6 @@
         DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
-        cmd.setCompressFlag(option.getCompressFlag());
-
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
@@ -119,14 +106,11 @@
         DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
-        cmd.setCompressFlag(option.getCompressFlag());
-
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
     }
 
-    //このコマンドは?
     @Override
     public void remove(String key) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
@@ -145,7 +129,6 @@
 
     }
 
-    //?
     public void recommand(Receiver receiver, CodeSegment cs) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
         int seq = this.seq.getAndIncrement();
--- a/src/main/java/alice/datasegment/ReceiveData.java	Thu Apr 30 20:52:21 2015 +0900
+++ b/src/main/java/alice/datasegment/ReceiveData.java	Fri May 01 18:19:16 2015 +0900
@@ -2,18 +2,12 @@
 
 import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.zip.*;
 
-import alice.daemon.CommandMessage;
-import org.apache.log4j.Logger;
 import org.msgpack.MessagePack;
 import org.msgpack.type.Value;
-import alice.codesegment.SingletonMessage;
 
-import javax.xml.bind.DatatypeConverter;
-import java.io.ByteArrayInputStream;
 
 import static java.util.Arrays.*;
 
@@ -56,7 +50,6 @@
         }
     }
 
-
     public boolean isByteArray(){
         return messagePack != null | zMessagePack != null;
     }
@@ -116,10 +109,10 @@
             }
 
             if (zMessagePack != null && messagePack == null) {
-                messagePack = unzip(zMessagePack);
+                messagePack = unzip(zMessagePack, 100);///ToDo:read header and set length
             }
 
-            return SingletonMessage.getInstance().read(messagePack, clazz);
+            return packer.read(messagePack, clazz);
 
         } catch (IOException e) {// | DataFormatException e
             e.printStackTrace();
@@ -129,24 +122,19 @@
 
 
     public int zip() throws IOException {
-        LinkedList<ByteBuffer> inputs = new LinkedList<ByteBuffer>(messagePack);
-        int inputIndex
-        LinkedList<ByteBuffer> outputs;
+        LinkedList<ByteBuffer> inputs = new LinkedList<ByteBuffer>();
+        int inputIndex = 0;
+        LinkedList<ByteBuffer> outputs = new LinkedList<ByteBuffer>();
+        Deflater deflater = new Deflater();
 
         int len = 0;
-        int INFLATE_BUFSIZE = 1024 * 100;
+        int INFLATE_BUFSIZE = 1024 * 100;//ToDo:fix
         ByteBuffer c1 = allocate(INFLATE_BUFSIZE);//for output
 
         while (inputIndex < inputs.size()) {
             ByteBuffer b1 = inputs.get(inputIndex++);
             deflater.setInput(b1.array(), b1.position(), b1.remaining());
-            /**
-             * If we finish() stream and reset() it, Deflater start new gzip
-             * stream, this makes continuous zlib reader unhappy. if we remove
-             * finish(), Deflater.deflate() never flushes its output. The
-             * original zlib deflate has flush flag. I'm pretty sure this a kind
-             * of bug of Java library.
-             */
+            
             if (inputIndex == inputs.size()){
                 deflater.finish();
             }
@@ -170,17 +158,17 @@
             outputs.addLast(c1);
         }
 
-        zMessagePack = outputs
+        //zMessagePack = outputs.toArray();
         deflater.reset();
         return len;///return length of ziped data
     }
 
-    protected byte[] unzip(byte[] input) {///read header & unzip
+    protected byte[] unzip(byte[] input, int zippedLength) {///read header & unzip
         int length = input.length;
-        int zippedLength = byteArrayToInt(copyOfRange(input, 4, 8));///read header...Is this copy OK??
+        Inflater inflater = new Inflater();
 
         byte [] output = new byte [zippedLength];///byteArray for unziped data
-        inflater.setInput(input, 8, length - 8);///set unzip data without header
+        inflater.setInput(input, 0, length);///set unzip data without header
 
         try {
             System.out.println("return:" + inflater.inflate(output, 0, zippedLength));///unzip
@@ -208,7 +196,6 @@
                 b = ByteBuffer.allocate(size);
             } catch (OutOfMemoryError e) {
                 b = null;
-                System.err.println("multicastqueue : wait for heap : " + e);
             }
             if (b!=null) {
                 break;
@@ -216,7 +203,6 @@
             try {
                 wait();
             } catch (InterruptedException e) {
-                System.out.println("thread has interrupted the current thread.");
             }
         }
         return b;
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Thu Apr 30 20:52:21 2015 +0900
+++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Fri May 01 18:19:16 2015 +0900
@@ -18,7 +18,7 @@
 
     public RemoteDataSegmentManager(){}
 
-    public RemoteDataSegmentManager(final String connectionKey, final String reverseKey, final String hostName, final int port) {
+    public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) {
         logger = Logger.getLogger(connectionKey);
         connection = new Connection();
         connection.name = connectionKey;
@@ -59,9 +59,9 @@
      */
     @Override
     public void put(String key, ReceiveData rData, SendOption option) {
-        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, "");///set compressed flag
-        cmd.setCompressFlag(option.getCompressFlag());
-        if (option.getQuickFlag()){
+        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, "");
+        cmd.setCompressFlag(option.isCompress());
+        if (option.isQuick()){
             connection.write(cmd); // put command is executed right now
         } else {
             connection.sendCommand(cmd); // put command on the transmission thread
@@ -73,8 +73,8 @@
     @Override
     public void update(String key, ReceiveData rData, SendOption option) {
         Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, "");
-        cmd.setCompressFlag(option.getCompressFlag());
-        if (option.getQuickFlag()){
+        cmd.setCompressFlag(option.isCompress());
+        if (option.isQuick()){
             connection.write(cmd);
         } else {
             connection.sendCommand(cmd);
@@ -87,11 +87,9 @@
     public void take(Receiver receiver, CodeSegment cs, SendOption option) {
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
-        //cmd.setQuickFlag(option.getQuickFlag());
-        cmd.setCompressFlag(option.getCompressFlag());
-
+        cmd.setQuickFlag(option.isQuick());
         seqHash.put(seq, cmd);
-        if (option.getQuickFlag()){
+        if (option.isQuick()){
             connection.write(cmd);
         } else {
             connection.sendCommand(cmd);
@@ -104,11 +102,9 @@
     public void peek(Receiver receiver, CodeSegment cs, SendOption option) {
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
-        cmd.setQuickFlag(option.getQuickFlag());
-        //cmd.setCompressFlag(option.getCompressFlag());
-
+        cmd.setQuickFlag(option.isQuick());
         seqHash.put(seq, cmd);
-        if (option.getQuickFlag()){
+        if (option.isQuick()){
             connection.write(cmd);
         } else {
             connection.sendCommand(cmd);
--- a/src/main/java/alice/datasegment/SendOption.java	Thu Apr 30 20:52:21 2015 +0900
+++ b/src/main/java/alice/datasegment/SendOption.java	Fri May 01 18:19:16 2015 +0900
@@ -1,30 +1,27 @@
 package alice.datasegment;
 
-/**
- * フラグを一時的に格納するだけ。
- */
 public class SendOption {
-    private boolean quickFlag = false;
-    private boolean compressFlag = false;
-    
+    private boolean quick = false;
+    private boolean compress = false;
+
     public SendOption(boolean qFlag, boolean cFlag) {
-        quickFlag = qFlag;
-        compressFlag = cFlag;
+        quick = qFlag;
+        compress = cFlag;
     }
 
-    public boolean getQuickFlag() {
-        return quickFlag;
+    public boolean isQuick() {
+        return quick;
     }
 
-    public void setQuickFlag(boolean quick) {
-        this.quickFlag = quick;
+    public void setQuick(boolean quick) {
+        this.quick = quick;
     }
 
-    public boolean getCompressFlag() {
-        return compressFlag;
+    public boolean isCompress() {
+        return compress;
     }
 
-    public void setCompressFlag(boolean compress) {
-        this.compressFlag = compress;
+    public void setCompress(boolean compress) {
+        this.compress = compress;
     }
 }
--- a/src/main/java/alice/test/topology/aquarium/fx/CheckAllFishInfoExist.java	Thu Apr 30 20:52:21 2015 +0900
+++ b/src/main/java/alice/test/topology/aquarium/fx/CheckAllFishInfoExist.java	Fri May 01 18:19:16 2015 +0900
@@ -10,7 +10,6 @@
 import org.msgpack.type.Value;
 
 import alice.codesegment.CodeSegment;
-import alice.codesegment.SingletonMessage;
 import alice.datasegment.CommandType;
 import alice.datasegment.Receiver;
 
@@ -31,11 +30,11 @@
             new SendDataOthers(info, info.key);
             ods.put("registeredList", info.getVal());
             ArrayValue fishInfoList = info.asClass(Value.class).asArrayValue();
-            MessagePack msg = SingletonMessage.getInstance();
+            MessagePack packer = new MessagePack();
             Group root = info1.asClass(Group.class);
             for (Value v : fishInfoList){
                 boolean exist = false;
-                FishInfo info = msg.convert(v, FishInfo.class);
+                FishInfo info = packer.convert(v, FishInfo.class);
                 if (info.name != null) {
                     for (Node n : root.getChildren()) {
                         if (info.name.equals(n.getId())) {