changeset 527:bfec2c3ff1b8 dispose

change unzip
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Thu, 30 Apr 2015 18:14:02 +0900
parents 928907206d21
children 6ebddfac7ff6
files src/main/java/alice/codesegment/InputDataSegment.java src/main/java/alice/codesegment/OutputDataSegment.java src/main/java/alice/daemon/Connection.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/daemon/IncomingUdpConnection.java src/main/java/alice/daemon/OutboundTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/DataSegmentKey.java src/main/java/alice/datasegment/DataSegmentManager.java src/main/java/alice/datasegment/LocalDataSegmentManager.java src/main/java/alice/datasegment/ReceiveData.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java src/main/java/alice/datasegment/SendOption.java src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java
diffstat 14 files changed, 425 insertions(+), 241 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/InputDataSegment.java	Sat Apr 18 19:09:15 2015 +0900
+++ b/src/main/java/alice/codesegment/InputDataSegment.java	Thu Apr 30 18:14:02 2015 +0900
@@ -34,19 +34,21 @@
         cs.register(receiver);
 
         if (receiver.compressedFlag){
+            SendOption option = new SendOption(true, true);
             if (receiver.managerKey == null){//localの場合
-                DataSegment.getCompressedLocal().peek(receiver, cs, true);
+                DataSegment.getCompressedLocal().peek(receiver, cs, option);
             } else {
                 if (DataSegment.contains(receiver.managerKey)) {//remoteの場合
-                    DataSegment.get(receiver.managerKey + "!").peek(receiver, cs, true);
+                    DataSegment.get(receiver.managerKey + "!").peek(receiver, cs, option);
                 }
             }
         } else {
+            SendOption option = new SendOption(true, false);
             if (receiver.managerKey == null){
-                DataSegment.getLocal().peek(receiver, cs, true);
+                DataSegment.getLocal().peek(receiver, cs, option);
             } else {
                 if (DataSegment.contains(receiver.managerKey)) {
-                    DataSegment.get(receiver.managerKey).peek(receiver, cs, true);
+                    DataSegment.get(receiver.managerKey).peek(receiver, cs, option);
                 }
             }
         }
@@ -58,19 +60,21 @@
         cs.register(receiver);
 
         if (receiver.compressedFlag){
+            SendOption option = new SendOption(false, true);
             if (receiver.managerKey==null){
-                DataSegment.getCompressedLocal().peek(receiver, cs, false);
+                DataSegment.getCompressedLocal().peek(receiver, cs, option);
             } else {
                 if (DataSegment.contains(receiver.managerKey)) {
-                    DataSegment.get(receiver.managerKey + "!").peek(receiver, cs, false);
+                    DataSegment.get(receiver.managerKey + "!").peek(receiver, cs, option);
                 }
             }
         } else {
+            SendOption option = new SendOption(false, false);
             if (receiver.managerKey==null){
-                DataSegment.getLocal().peek(receiver, cs, false);
+                DataSegment.getLocal().peek(receiver, cs, option);
             } else {
                 if (DataSegment.contains(receiver.managerKey)) {
-                    DataSegment.get(receiver.managerKey).peek(receiver, cs, false);
+                    DataSegment.get(receiver.managerKey).peek(receiver, cs, option);
                 }
             }
         }
@@ -81,19 +85,21 @@
         cs.register(receiver);
 
         if (receiver.compressedFlag){
+            SendOption option = new SendOption(true, true);
             if (receiver.managerKey==null){
-                DataSegment.getCompressedLocal().take(receiver, cs, true);
+                DataSegment.getCompressedLocal().take(receiver, cs, option);
             } else {
                 if (DataSegment.contains(receiver.managerKey)) {
-                    DataSegment.get(receiver.managerKey + "!").take(receiver, cs, true);
+                    DataSegment.get(receiver.managerKey + "!").take(receiver, cs, option);
                 }
             }
         } else {
+            SendOption option = new SendOption(true, false);
             if (receiver.managerKey==null){
-                DataSegment.getLocal().take(receiver, cs, true);
+                DataSegment.getLocal().take(receiver, cs, option);
             } else {
                 if (DataSegment.contains(receiver.managerKey)) {
-                    DataSegment.get(receiver.managerKey).take(receiver, cs, true);
+                    DataSegment.get(receiver.managerKey).take(receiver, cs, option);
                 }
             }
         }
@@ -104,19 +110,21 @@
         cs.register(receiver);
 
         if (receiver.compressedFlag){
+            SendOption option = new SendOption(false, true);
             if (receiver.managerKey==null){// 指定なしの場合デフォはローカルになる
-                DataSegment.getCompressedLocal().take(receiver, cs, false);
+                DataSegment.getCompressedLocal().take(receiver, cs, option);
             } else {
                 if (DataSegment.contains(receiver.managerKey)) {
-                    DataSegment.get(receiver.managerKey + "!").take(receiver, cs, false, true);
+                    DataSegment.get(receiver.managerKey + "!").take(receiver, cs, option);
                 }
             }
         } else {
+            SendOption option = new SendOption(false, false);
             if (receiver.managerKey==null){
-                DataSegment.getLocal().take(receiver, cs, false);
+                DataSegment.getLocal().take(receiver, cs, option);
             } else {
                 if (DataSegment.contains(receiver.managerKey)) {
-                    DataSegment.get(receiver.managerKey).take(receiver, cs, false);
+                    DataSegment.get(receiver.managerKey).take(receiver, cs, option);
                 }
             }
         }
--- a/src/main/java/alice/codesegment/OutputDataSegment.java	Sat Apr 18 19:09:15 2015 +0900
+++ b/src/main/java/alice/codesegment/OutputDataSegment.java	Thu Apr 30 18:14:02 2015 +0900
@@ -18,9 +18,9 @@
      */
     public void flip(Receiver receiver) {
         if (receiver.isCompressed()){
-            DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), false);
+            DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), null);
         } else {
-            DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false);
+            DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), null);
         }
     }
 
@@ -28,16 +28,16 @@
         switch (type) {
         case PUT:
             if (receiver.isCompressed()){
-                DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), false);//localなら全部false。
+                DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), null);//localなら全部false。
             } else {
-                DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false);
+                DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), null);
             }
             break;
         case UPDATE:
             if (receiver.isCompressed()){
-                DataSegment.getCompressedLocal().update(receiver.key, receiver.getReceiveData(), false);
+                DataSegment.getCompressedLocal().update(receiver.key, receiver.getReceiveData(), null);
             } else {
-                DataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), false);
+                DataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), null);
             }
 
             break;
@@ -47,29 +47,27 @@
     }
 
     public void put(String key, ReceiveData rData) {
-        DataSegment.getLocal().put(key, rData, false);
+        DataSegment.getLocal().put(key, rData, null);
     }
 
     public void put(String key, Object val) {
         ReceiveData rData = new ReceiveData(val);
-        DataSegment.getLocal().put(key, rData, false);
+        DataSegment.getLocal().put(key, rData, new SendOption(false, false));
     }
 
-    public void put(String key, Object val, boolean cFlag) {//追加
+    public void put(String key, Object val, boolean cFlag) {///追加
         ReceiveData rData = new ReceiveData(val);
-        rData.setCompressFlag(cFlag);
-        DataSegment.getLocal().put(key, rData, false);
+        DataSegment.getLocal().put(key, rData, new SendOption(false, cFlag));
     }
 
     public void update(String key, Object val) {
         ReceiveData rData = new ReceiveData(val);
-        DataSegment.getLocal().update(key, rData, false);
+        DataSegment.getLocal().update(key, rData, new SendOption(false, false));
     }
 
-    public void update(String key, Object val, boolean cFlag) {//追加
+    public void update(String key, Object val, boolean cFlag) {///追加
         ReceiveData rData = new ReceiveData(val);
-        rData.setCompressFlag(cFlag);
-        DataSegment.getLocal().update(key, rData, false);
+        DataSegment.getLocal().update(key, rData, new SendOption(false, cFlag));
     }
 
     /**
@@ -77,12 +75,14 @@
      */
     public void put(String managerKey, String key, ReceiveData rData) {
         if (!managerKey.equals("local") && DataSegment.contains(managerKey)){
+            SendOption option = new SendOption(false, rData.compressed());
+            //SendOption option = new SendOption(false, compressFlag);
+
             if (rData.compressed()){
-                DataSegment.get(managerKey + "!").put(key, rData, false);
+                DataSegment.get(managerKey + "!").put(key, rData, option);
             } else {
-                DataSegment.get(managerKey).put(key, rData, false);
+                DataSegment.get(managerKey).put(key, rData, option);
             }
-
         } else {
             put(key, rData);
         }
@@ -95,32 +95,31 @@
     public void put(String managerKey, String key, Object val, boolean cFlag) {//追加
         System.out.println("in PUT");
         ReceiveData rData = new ReceiveData(val);
-        rData.setCompressFlag(cFlag);
+        SendOption option = new SendOption(false, cFlag);
+        //rData.setCompressFlag(cFlag);
 
         if (!managerKey.equals("local") && DataSegment.contains(managerKey)){///if remote
             if (cFlag){
-                DataSegment.get(managerKey + "!").put(key, rData, false);
+                DataSegment.get(managerKey + "!").put(key, rData, option);
             } else {
-                DataSegment.get(managerKey).put(key, rData, false);
+                DataSegment.get(managerKey).put(key, rData, option);
             }
         } else {// if local
             if (cFlag){
-                DataSegment.getCompressedLocal().put(key, rData, false);
+                DataSegment.getCompressedLocal().put(key, rData, option);
             } else {
                 put(key, val);
             }
         }
-
-
-
     }
 
     public void quickPut(String managerKey, String key, ReceiveData rData) {
         if (!managerKey.equals("local") && DataSegment.contains(managerKey)){
+            SendOption option = new SendOption(true, false);
             if (rData.compressed()){
-                DataSegment.get(managerKey + "!").put(key, rData, true);
+                DataSegment.get(managerKey + "!").put(key, rData, option);
             } else {
-                DataSegment.get(managerKey).put(key, rData, true);
+                DataSegment.get(managerKey).put(key, rData, option);
             }
         } else {
             put(key, rData);
@@ -134,11 +133,13 @@
     public void quickPut(String managerKey, String key, Object val, boolean cFlag) {//追加
         if (!managerKey.equals("local") && DataSegment.contains(managerKey)){
             ReceiveData rData = new ReceiveData(val);
-            rData.setCompressFlag(cFlag);
+            SendOption option = new SendOption(true, cFlag);
+            //rData.setCompressFlag(cFlag);
+
             if (cFlag){
-                DataSegment.get(managerKey + "!").put(key, rData, true);
+                DataSegment.get(managerKey + "!").put(key, rData, option);
             } else {
-                DataSegment.get(managerKey).put(key, rData, true);
+                DataSegment.get(managerKey).put(key, rData, option);
             }
         } else {
             put(key, val);
@@ -147,10 +148,11 @@
 
     public void update(String managerKey, String key, ReceiveData rData) {
         if (!managerKey.equals("local") && DataSegment.contains(managerKey)){
+            SendOption option = new SendOption(false, rData.compressed());
             if (rData.compressed()){
-                DataSegment.get(managerKey + "!").update(key, rData, false);
+                DataSegment.get(managerKey + "!").update(key, rData, option);
             } else {
-                DataSegment.get(managerKey).update(key, rData, false);
+                DataSegment.get(managerKey).update(key, rData, option);
             }
         } else {
             update(key, rData);
@@ -162,25 +164,32 @@
     }
 
     public void update(String managerKey, String key, Object val, boolean cFlag) {//追加
-        if (!managerKey.equals("local") && DataSegment.contains(managerKey)){
-            ReceiveData rData = new ReceiveData(val);
-            rData.setCompressFlag(cFlag);
-            if(cFlag){
-                DataSegment.get(managerKey + "!").update(key, rData, false);
+        ReceiveData rData = new ReceiveData(val);
+        SendOption option = new SendOption(false, cFlag);
+        //rData.setCompressFlag(cFlag);
+
+        if (!managerKey.equals("local") && DataSegment.contains(managerKey)){///if remote
+            if (cFlag){
+                DataSegment.get(managerKey + "!").update(key, rData, option);
             } else {
-                DataSegment.get(managerKey).update(key, rData, false);
+                DataSegment.get(managerKey).update(key, rData, option);
             }
-        } else {
-            update(key, val);
+        } else {// if local
+            if (cFlag){
+                DataSegment.getCompressedLocal().update(key, rData, option);
+            } else {
+                update(key, val);
+            }
         }
     }
 
     public void quickUpdate(String managerKey, String key, ReceiveData rData) {
         if (!managerKey.equals("local") && DataSegment.contains(managerKey)){
+            SendOption option = new SendOption(true, rData.compressed());
             if (rData.compressed()){
-                DataSegment.get(managerKey + "!").update(key, rData, true);
+                DataSegment.get(managerKey + "!").update(key, rData, option);
             } else {
-                DataSegment.get(managerKey).update(key, rData, true);
+                DataSegment.get(managerKey).update(key, rData, option);
             }
         } else {
             update(key, rData);
@@ -194,11 +203,12 @@
     public void quickUpdate(String managerKey, String key, Object val, boolean cFlag) {//追加
         if (!managerKey.equals("local") && DataSegment.contains(managerKey)){
             ReceiveData rData = new ReceiveData(val);
-            rData.setCompressFlag(cFlag);
+            SendOption option = new SendOption(true, cFlag);
+            //rData.setCompressFlag(cFlag);
             if (cFlag){
-                DataSegment.get(managerKey + "!").update(key, rData, true);
+                DataSegment.get(managerKey + "!").update(key, rData, option);
             } else {
-                DataSegment.get(managerKey).update(key, rData, true);
+                DataSegment.get(managerKey).update(key, rData, option);
             }
         } else {
             update(key, val);
--- a/src/main/java/alice/daemon/Connection.java	Sat Apr 18 19:09:15 2015 +0900
+++ b/src/main/java/alice/daemon/Connection.java	Thu Apr 30 18:14:02 2015 +0900
@@ -41,7 +41,8 @@
             while (buffer.hasRemaining()) {
                 socket.getChannel().write(buffer);
             }
-        } catch (Exception e) { }
+        } catch (Exception e) {
+        }
     }
 
     public void close(){
@@ -58,9 +59,11 @@
         if (name!=null){
             ConnectionInfo c = new ConnectionInfo(name, socket);
             ReceiveData rData = new ReceiveData(c);
-            DataSegment.getLocal().put("_DISCONNECT", rData, false);
+            DataSegment.getLocal().put("_DISCONNECT", rData, null);
             if (sendManager) {
-                DataSegment.get("manager").put("_DISCONNECTNODE", rData, false);
+                SendOption option = new SendOption(false, false);
+                DataSegment.get("manager").put("_DISCONNECTNODE", rData, option);
+                sendManager = false;
             }
         }
 
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Sat Apr 18 19:09:15 2015 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Thu Apr 30 18:14:02 2015 +0900
@@ -4,15 +4,10 @@
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 
+import alice.datasegment.*;
 import org.msgpack.unpacker.Unpacker;
 
 import alice.codesegment.SingletonMessage;
-import alice.datasegment.Command;
-import alice.datasegment.CommandType;
-import alice.datasegment.DataSegment;
-import alice.datasegment.DataSegmentManager;
-import alice.datasegment.LocalDataSegmentManager;
-import alice.datasegment.ReceiveData;
 import alice.topology.manager.keeparive.RespondData;
 
 public class IncomingTcpConnection extends Thread {
@@ -104,7 +99,7 @@
                     System.out.println("in TCP REPLY");
                     cmd = manager.getAndRemoveCmd(msg.seq);
 
-                    rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()));
+                    rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()));//ここのコンストラクタでx
                     rData.setCompressFlag(msg.compressed);
 
                     Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, "");
@@ -116,7 +111,7 @@
                     break;
                 case RESPONSE:
                     rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis()));
-                    DataSegment.getLocal().put(msg.key, rData, false);
+                    DataSegment.getLocal().put(msg.key, rData, null);
                     break;
                 default:
                     break;
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java	Sat Apr 18 19:09:15 2015 +0900
+++ b/src/main/java/alice/daemon/IncomingUdpConnection.java	Thu Apr 30 18:14:02 2015 +0900
@@ -88,7 +88,7 @@
                     break;
                 case RESPONSE:
                     rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis()));
-                    DataSegment.getLocal().put(msg.key, rData, false);
+                    DataSegment.getLocal().put(msg.key, rData, null);
                     break;
                 default:
                     break;
--- a/src/main/java/alice/daemon/OutboundTcpConnection.java	Sat Apr 18 19:09:15 2015 +0900
+++ b/src/main/java/alice/daemon/OutboundTcpConnection.java	Thu Apr 30 18:14:02 2015 +0900
@@ -27,7 +27,7 @@
                 default:
                     break;
                 }
-                connection.write(cmd);
+                connection.write(cmd);//ここでconvert()がよばれてる
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
--- a/src/main/java/alice/datasegment/Command.java	Sat Apr 18 19:09:15 2015 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Thu Apr 30 18:14:02 2015 +0900
@@ -74,60 +74,42 @@
      * @return serialized ByteBuffer
      */
     public ByteBuffer convert() {//byteArrayに変換
+
         ByteBuffer buf = null;
-        MessagePack msg = SingletonMessage.getInstance();
-        try {
-            byte[] header = null;//DSのメタデータ用byteArray
-            byte[] data = null;//DS本体用byteArray
-            byte[] dataSize = null;//DSのサイズ
-
-            switch (type) {
-            /*
-             * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment
-             * case UPDATE and PUT
-             * compress and serialize flag are selected by user, so if true, need convert.
-             * case REPLY
-             * these flags represent DataSegment status.
-             * for example, serializeFlag is true. DataSegment had already converted, so no need convert.
-             */
-            case UPDATE:
-            case PUT:
-            case REPLY://ReceiveDataからREPLYするDSを取得
-                System.out.println("in REPLY");
-
-                if (!rData.serialized()) {//純粋なオブジェクトの場合シリアライズ
-                    data = msg.write(rData.getObj());
-                } else { // rData is RAW ByteArray or already serialized
-                    data = rData.asByteArray();
-                }
+        switch (type) {
+        /*
+         * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment
+         * case UPDATE and PUT
+         * compress and serialize flag are selected by user, so if true, need convert.
+         * case REPLY
+         * these flags represent DataSegment status.
+         * for example, serializeFlag is true. DataSegment had already converted, so no need convert.
+         */
 
-                CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, rData.serialized(), rData.compressed());
-                if (rData.setTime) {//AliceVNCの計測用(消してもいい)
-                    cm.setTime = true;
-                    cm.time = rData.time;
-                    cm.depth = rData.depth + 1;
-                }
+        case UPDATE:
+            System.out.println("update compressFlag:" + compressFlag);
+            rData.setCompressFlag(compressFlag);
+            break;
+        case PUT:
+            System.out.println("put compressFlag:" + compressFlag);
+            rData.setCompressFlag(compressFlag);
+            break;
+        case REPLY://ReceiveDataからREPLYするDSを取得
+            System.out.println("in REPLY");
+            System.out.println("reply compressFlag:" + compressFlag);
+            buf = rData.setMPHeader(new CommandMessage(type.id, index, seq, key, false, rData.serialized(), rData.compressed()), type);
+            rData.setCompressFlag(compressFlag);
 
-                //MessagePackでDSを作成(ヘッダー・データ本体のサイズ・データ本体)
-                header = msg.write(cm);
-                dataSize = msg.write(data.length);
-                buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
-                buf.put(header);
-                buf.put(dataSize);
-                buf.put(data);
+            break;
+        default://PEEK, TAKE, RemoveならCommandMessageとそのサイズのみセット
+            ReceiveData rData2 = new ReceiveData("hoge");
+            System.out.println("default compressFlag:" + compressFlag);
+            buf = rData2.setMPHeader(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag), type);
+            rData2.setCompressFlag(compressFlag);
 
-                break;
-            default://PEEK, TAKE, RemoveならCommandMessageとそのサイズのみセット
-                header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag));
-                buf = ByteBuffer.allocate(header.length);
-                buf.put(header);
-                break;
-            }
+            break;
+        }
 
-            buf.flip();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }//ここに圧縮機能を入れる
         return buf;
     }
 
--- a/src/main/java/alice/datasegment/DataSegmentKey.java	Sat Apr 18 19:09:15 2015 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentKey.java	Thu Apr 30 18:14:02 2015 +0900
@@ -33,7 +33,8 @@
             for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
                 Command waitCmd = iter.next();
                 if (waitCmd.index < index) {
-                    replyValue(waitCmd, dsv);
+                    System.out.println("DSKey cmdFlag:" + cmd.getCompressFlag());
+                    replyValue(waitCmd, dsv, cmd.getCompressFlag());
                     iter.remove();
                     if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
                         dataList.remove(dsv);
@@ -67,7 +68,9 @@
             for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
                 DataSegmentValue data = iter.next();
                 if (data.index > cmd.index) {
-                    replyValue(cmd, data);
+                    System.out.println("DSKey2 cmdFlag:" + cmd.getCompressFlag());
+                    replyValue(cmd, data, cmd.getCompressFlag());
+                    //replyValue(cmd, data);
                     iter.remove();
                     waitFlag = false;
                     break;
@@ -86,6 +89,25 @@
 
     public void replyValue(Command cmd, DataSegmentValue data){
         Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from);
+        cmd.setCompressFlag(true);
+        if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local.
+            cmd.cs.ids.reply(cmd.receiver, rCmd);
+        } else {
+            try {
+                if (!cmd.getQuickFlag()) {
+                    cmd.connection.sendQueue.put(rCmd);
+                } else {
+                    cmd.connection.write(rCmd);
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public void replyValue(Command cmd, DataSegmentValue data, boolean cFlag){
+        Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from);
+        rCmd.setCompressFlag(cFlag);
         if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local.
             cmd.cs.ids.reply(cmd.receiver, rCmd);
         } else {
--- a/src/main/java/alice/datasegment/DataSegmentManager.java	Sat Apr 18 19:09:15 2015 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentManager.java	Thu Apr 30 18:14:02 2015 +0900
@@ -51,12 +51,10 @@
     }
 
     //各コマンドの抽象クラス
-    public abstract void put(String key, ReceiveData rData, boolean quickFlag);
-    public abstract void update(String key, ReceiveData rData, boolean quickFlag);
-    public abstract void peek(Receiver receiver, CodeSegment cs, boolean quickFlag);
-    public abstract void take(Receiver receiver, CodeSegment cs, boolean quickFlag);
-
-    public abstract void take(Receiver receiver, CodeSegment cs, boolean quickFlag, boolean compressFlag);
+    public abstract void put(String key, ReceiveData rData, SendOption option);
+    public abstract void update(String key, ReceiveData rData, SendOption option);
+    public abstract void peek(Receiver receiver, CodeSegment cs, SendOption option);
+    public abstract void take(Receiver receiver, CodeSegment cs, SendOption option);
 
     public abstract void remove(String key);
     public abstract void shutdown();
--- a/src/main/java/alice/datasegment/LocalDataSegmentManager.java	Sat Apr 18 19:09:15 2015 +0900
+++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java	Thu Apr 30 18:14:02 2015 +0900
@@ -75,52 +75,52 @@
     }
 
     @Override
-    public void put(String key, ReceiveData rData, boolean quickFlag) {
+    public void put(String key, ReceiveData rData, SendOption option) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);
+        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);//ここでMP変換している
+        cmd.setCompressFlag(option.getCompressFlag());
+        rData.setCompressFlag(option.getCompressFlag());
+
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
     }
 
-    public void put(String key, ReceiveData rData) {
-        this.put(key, rData, false);
-    }
-
     /**
      * Enqueue update command to the queue of each DataSegment key
      */
 
     @Override
-    public void update(String key, ReceiveData rData, boolean quickFlag) {
+    public void update(String key, ReceiveData rData, SendOption option) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
         Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey);
+        cmd.setCompressFlag(option.getCompressFlag());
+        rData.setCompressFlag(option.getCompressFlag());
+
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
     }
 
     @Override
-    public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) {
-        take(receiver, cs, quickFlag, false);
-    }
-
-    @Override
-    public void take(Receiver receiver, CodeSegment cs, boolean quickFlag, boolean compressFlag) {
+    public void take(Receiver receiver, CodeSegment cs, SendOption option) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
-        cmd.setCompressFlag(compressFlag);
+        cmd.setCompressFlag(option.getCompressFlag());
+
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
     }
 
     @Override
-    public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) {
+    public void peek(Receiver receiver, CodeSegment cs, SendOption option) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        cmd.setCompressFlag(option.getCompressFlag());
+
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
--- a/src/main/java/alice/datasegment/ReceiveData.java	Sat Apr 18 19:09:15 2015 +0900
+++ b/src/main/java/alice/datasegment/ReceiveData.java	Thu Apr 30 18:14:02 2015 +0900
@@ -3,13 +3,17 @@
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.zip.*;
 
+import alice.daemon.CommandMessage;
 import org.apache.log4j.Logger;
+import org.msgpack.MessagePack;
 import org.msgpack.type.Value;
 import alice.codesegment.SingletonMessage;
 
 import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
 
 import static java.util.Arrays.*;
 
@@ -17,15 +21,20 @@
  * 送られてきたDSを一時的に取っておくクラス。inputでも使用。
  */
 public class ReceiveData {
-    private Object val;//Object型のDS
-    private byte[] messagePack;//byteArray(serialized)のDS
-    private byte[] zMessagePack;//byteArray(compressed)のDS
+    private Object val;//for Object DS
+    private byte[] messagePack;//for byteArray(serialized) DS
+    private byte[] zMessagePack;//for byteArray(compressed) DS
     private Class<?> clazz;
 
     public long time;//測定用
     public boolean setTime = false;
     public int depth = 1;
 
+    private Deflater deflater = new Deflater();
+    private Inflater inflater = new Inflater();
+    private ByteBuffer buf = null;
+    private MessagePack msg = SingletonMessage.getInstance();
+
     /**
      * コンストラクタ。Object型のDSと圧縮のメタ情報を受け取る。
      * put/update/reply用?
@@ -66,28 +75,28 @@
         return val == null;
     }
 
-    public Object getObj(){//Object型のDS本体を取得するメソッド。
+    public Object getObj(){
         return asClass(Object.class);
     }
 
-    public String asString(){//String型としてDSを取得するメソッド。DSがシリアライズされていればStringクラスを返す。
+    public String asString(){
         return asClass(String.class);
     }
 
-    public int asInteger(){//Int型としてDSを取得するメソッド。DSがシリアライズされていればIntクラスを返す。
+    public int asInteger(){
         return asClass(Integer.class);
     }
 
-    public Float asFloat(){//Float型としてDSを取得するメソッド。DSがシリアライズされていればFloatクラスを返す。
+    public Float asFloat(){
         return asClass(Float.class);
     }
 
-    public Value getVal(){//Value型としてDSを取得するメソッド
-        if (val == null){//val != null
+    public Value getVal(){///get DS as Value type
+        if (val == null){///val != null
             return asClass(Value.class);
-        } else {//ここに圧縮のときの処理入れるべきなのでは
+        } else {
             try {
-                return SingletonMessage.getInstance().unconvert(val);//MassagePackでvalue型に変換。できなければnullを返す。
+                return SingletonMessage.getInstance().unconvert(val);///convert to Value type by MassagePack
             } catch (IOException e) {
                 e.printStackTrace();
             }
@@ -104,7 +113,7 @@
      * @param <T>
      * @return
      */
-    public <T> T asClass(Class<T> clazz) {//javasist
+    public <T> T asClass(Class<T> clazz) {///javasist
         System.out.println("in asClass val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack);
 
         try {
@@ -112,68 +121,70 @@
                 return (T) val;
             }
 
-            if (zMessagePack != null && messagePack == null) {
-                messagePack = unzip(copyOfRange(zMessagePack, 1, zMessagePack.length - 1));//先頭0xC1を削除したものを解凍
+            if (zMessagePack != null && messagePack == null) {//ToDo:fix
+                messagePack = unzip(zMessagePack);
+                System.out.println("unzip messagePack:" + messagePack);
+                //zMessagePack = null;?
             }
 
             return SingletonMessage.getInstance().read(messagePack, clazz);
 
-        } catch (IOException | DataFormatException e) {
+        } catch (IOException e) {// | DataFormatException e
             e.printStackTrace();
             return null;
         }
     }
 
-    /**
-     * java.util.zip.Inflater(zlib)を使ってbyteArray型のDSを解凍する。
-     *
-     * @param input 圧縮されたbyteArray型のDS
-     * @return 解凍したbyteArray型DS
-     * @throws IOException
-     * @throws DataFormatException
-     */
-    public byte[] unzip(byte[] input) throws IOException, DataFormatException{
-        Inflater inflater = new Inflater();
-        ByteArrayOutputStream os = new ByteArrayOutputStream();
-        InflaterOutputStream ios = new InflaterOutputStream(os, inflater);
-        ios.write(input);
-        ios.finish();
-        return os.toByteArray();
-    }
+    public void setCompressFlag(boolean cFlag) {///compress
+        LinkedList<ByteBuffer> input = new LinkedList<ByteBuffer>();
+        LinkedList<ByteBuffer> output = new LinkedList<ByteBuffer>();
 
-    /**
-     * java.util.zip.Deflater(zlib)を使ってbyteArray型のDSを圧縮する。
-     *
-     * @param input 非圧縮状態のbyteArray型のDS
-     * @return 圧縮したbyteArray型DS
-     * @throws IOException
-     */
-    public byte[] zip(byte[] input) throws IOException{
-        Deflater deflater = new Deflater();
-        ByteArrayOutputStream os = new ByteArrayOutputStream();
-        DeflaterOutputStream dos = new DeflaterOutputStream(os, deflater);
-        dos.write(input);
-        dos.finish();
-        return os.toByteArray();
-    }
-
-    public void setCompressFlag(boolean cFlag) {
         if (cFlag){
-            try {
-                messagePack = asByteArray();
-            } catch (IOException e) {
-                e.printStackTrace();
+            System.out.println("in setCompressFlag  val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack);
+            //messagePack =  asByteArray();///ToDo:fix
+            if (val != null){
+                try {
+                    messagePack = msg.write(val);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            } else {
+                messagePack = unzip(zMessagePack);
             }
-
-            ByteBuffer buf = null;
+            System.out.println("no zip messagePack:" + messagePack);
+            System.out.print("no zip messagePack: ");
+            for (int i = 0; i < messagePack.length; i++) {
+                System.out.print(Integer.toHexString(messagePack[i] & 0xff));
+            }
+            System.out.print("\n");
+            System.out.println("no zip messagePack length:" + messagePack.length);
 
             try {
-                System.out.println("in zip");
-                byte[] z = zip(messagePack);
-                buf = ByteBuffer.allocate(z.length + 1);
-                buf.put((byte) 0xc1);
-                buf.put(z);
-                zMessagePack = buf.array();
+                //System.out.println("in zip");
+                input.add(ByteBuffer.wrap(messagePack));
+                int len = zip(input, 0, output);
+
+                byte[] ziped = new byte[len + 8];
+                ziped[0] = (byte) 0xc1;///set compressedFlag to header
+                ziped[1] = ziped[2] = ziped[3] = (byte) 0x00;
+                System.arraycopy(intToByteArray(messagePack.length), 0, ziped, 4, 4);///set data length to header
+
+                System.out.println("zipedlen: " + len);
+                //System.out.println("limit: " + output.get(0).limit());
+                //System.out.println("remaining: " + output.get(0).remaining());
+                int tmp = 0;
+                for (int i = 0; i < output.size(); i++){///Is this copy OK???
+                    System.arraycopy(output.get(i).array(), 0, ziped, 8 + tmp, output.get(i).limit());//limit? remaining?
+                    tmp += output.get(i).limit();
+                }
+
+                System.out.print("ziped: ");
+                for (int i = 0; i < ziped.length; i++) {
+                    System.out.print(Integer.toHexString(ziped[i] & 0xff));
+                }
+                System.out.print("\n");
+
+                zMessagePack = ziped;
                 val = null;
                 messagePack = null;
             } catch (IOException e) {
@@ -182,12 +193,152 @@
         }
     }
 
-    public Class getClaszz(){
-        return clazz;
+    public ByteBuffer setMPHeader(CommandMessage cm, CommandType type){
+
+        System.out.println("in setMPHeader  val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack);
+
+        try {
+            byte[] header = null;//DSのメタデータ用byteArray
+            byte[] data = null;//DS本体用byteArray
+            byte[] dataSize = null;//DSのサイズ
+
+            if (type == CommandType.REPLY){
+                if (val != null) {//純粋なオブジェクトの場合シリアライズ
+                    data = msg.write(val);
+                    System.out.print("header MP data: ");
+                    for (int i = 0; i < data.length; i++) {
+                        System.out.print(Integer.toHexString(data[i] & 0xff));
+                    }
+                    System.out.print("\n");
+                } else { // rData is RAW ByteArray or already serialized
+                    data = messagePack;
+                }
+
+                if (setTime) {//AliceVNCの計測用(消してもいい)
+                    cm.setTime = true;
+                    cm.time = time;
+                    cm.depth = depth + 1;
+                }
+
+                //MessagePackでDSを作成(ヘッダー・データ本体のサイズ・データ本体)
+                header = msg.write(cm);
+                dataSize = msg.write(data.length);
+                buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
+                buf.put(header);
+                buf.put(dataSize);
+                buf.put(data);
+            } else {
+                header = msg.write(cm);
+                buf = ByteBuffer.allocate(header.length);
+                buf.put(header);
+            }
+
+            buf.flip();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        messagePack = buf.array();
+
+        System.out.print("MP with header: ");
+        for (int i = 0; i < messagePack.length; i++) {
+            System.out.print(Integer.toHexString(messagePack[i] & 0xff));
+        }
+        System.out.print("\n");
+
+        return buf;
     }
 
 
-    public byte[] asByteArray() throws IOException{
+    public int zip(LinkedList<ByteBuffer> inputs, int inputIndex, LinkedList<ByteBuffer> outputs) throws IOException {
+        int len = 0;
+        int INFLATE_BUFSIZE = 1024 * 100;
+        ByteBuffer c1 = allocate(INFLATE_BUFSIZE);//for output
+
+        while (inputIndex < inputs.size()) {
+            ByteBuffer b1 = inputs.get(inputIndex++);
+            deflater.setInput(b1.array(), b1.position(), b1.remaining());
+            /**
+             * If we finish() stream and reset() it, Deflater start new gzip
+             * stream, this makes continuous zlib reader unhappy. if we remove
+             * finish(), Deflater.deflate() never flushes its output. The
+             * original zlib deflate has flush flag. I'm pretty sure this a kind
+             * of bug of Java library.
+             */
+            if (inputIndex == inputs.size()){
+                deflater.finish();
+            }
+
+            int len1 = 0;
+            do {
+                len1 = deflater.deflate(c1.array(), c1.position(), c1.remaining());///Bytearray for ziped data、start offset、length
+                if (len1 > 0) {
+                    len += len1;
+                    c1.position(c1.position() + len1);
+                    if (c1.remaining() == 0) {
+                        c1.flip();
+                        outputs.addLast(c1);
+                        c1 = allocate(INFLATE_BUFSIZE);
+                    }
+                }
+            } while (len1 > 0 || !deflater.needsInput());//needsInput()...true if setInput is empty
+        }
+        if (c1.position() != 0) {
+            c1.flip();
+            outputs.addLast(c1);
+        }
+        deflater.reset();
+        return len;///return length of ziped data
+    }
+
+    protected byte[] unzip(byte[] input) {///read header & unzip
+        int length = input.length;
+        int zippedLength = byteArrayToInt(copyOfRange(input, 4, 8));///read header...Is this copy OK??
+
+        byte [] output = new byte [zippedLength];///byteArray for unziped data
+        inflater.setInput(input, 8, length - 8);///set unzip data without header
+
+        try {
+            System.out.println("return:" + inflater.inflate(output, 0, zippedLength));///unzip
+        } catch (DataFormatException e) {
+            e.printStackTrace();
+        }
+
+        inflater.reset();
+
+        System.out.print("unziped: ");
+        for (int i = 0; i < output.length; i++) {
+            System.out.print(Integer.toHexString(output[i] & 0xff));
+        }
+        System.out.print("\n");
+
+        return output;
+ 	}
+
+
+    public ByteBuffer allocate(int size)
+    {
+        ByteBuffer b = null;
+        while(true){
+            try {
+                b = ByteBuffer.allocate(size);
+            } catch (OutOfMemoryError e) {
+                b = null;
+                System.err.println("multicastqueue : wait for heap : " + e);
+            }
+            if (b!=null) {
+                break;
+            }
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                System.out.println("thread has interrupted the current thread.");
+            }
+        }
+        return b;
+    }
+
+    public byte[] asByteArray() throws IOException{///ToDo : delete
         ByteArrayOutputStream buff = new ByteArrayOutputStream();
         ObjectOutput out = new ObjectOutputStream(buff);
         out.writeObject(this.val);
@@ -197,4 +348,22 @@
         return bytes;
     }
 
+    public static int byteArrayToInt(byte[] b)
+    {
+        return   b[3] & 0xFF |
+                (b[2] & 0xFF) << 8 |
+                (b[1] & 0xFF) << 16 |
+                (b[0] & 0xFF) << 24;
+    }
+
+    public static byte[] intToByteArray(int a)
+    {
+        return new byte[] {
+                (byte) ((a >> 24) & 0xFF),
+                (byte) ((a >> 16) & 0xFF),
+                (byte) ((a >> 8) & 0xFF),
+                (byte) (a & 0xFF)
+        };
+    }
+
 }
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Sat Apr 18 19:09:15 2015 +0900
+++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Thu Apr 30 18:14:02 2015 +0900
@@ -58,10 +58,10 @@
      * send put command to target DataSegment
      */
     @Override
-    public void put(String key, ReceiveData rData, boolean quickFlag) {
+    public void put(String key, ReceiveData rData, SendOption option) {
         Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, "");///set compressed flag
-        //cmd.setCompressFlag(option.isCompress());
-        if (quickFlag){
+        cmd.setCompressFlag(option.getCompressFlag());
+        if (option.getQuickFlag()){
             connection.write(cmd); // put command is executed right now
         } else {
             connection.sendCommand(cmd); // put command on the transmission thread
@@ -71,10 +71,10 @@
     }
 
     @Override
-    public void update(String key, ReceiveData rData, boolean quickFlag) {
+    public void update(String key, ReceiveData rData, SendOption option) {
         Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, "");
-        //cmd.setCompressFlag(option.isCompress());
-        if (quickFlag){
+        cmd.setCompressFlag(option.getCompressFlag());
+        if (option.getQuickFlag()){
             connection.write(cmd);
         } else {
             connection.sendCommand(cmd);
@@ -84,18 +84,14 @@
     }
 
     @Override
-    public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) {
-        take(receiver, cs, quickFlag, false);
-    }
-
-    public void take(Receiver receiver, CodeSegment cs, boolean quickFlag, boolean compressFlag) {
+    public void take(Receiver receiver, CodeSegment cs, SendOption option) {
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
-        cmd.setQuickFlag(quickFlag);
-        cmd.setCompressFlag(compressFlag);
+        //cmd.setQuickFlag(option.getQuickFlag());
+        cmd.setCompressFlag(option.getCompressFlag());
 
         seqHash.put(seq, cmd);
-        if (quickFlag){
+        if (option.getQuickFlag()){
             connection.write(cmd);
         } else {
             connection.sendCommand(cmd);
@@ -105,12 +101,14 @@
     }
 
     @Override
-    public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) {
+    public void peek(Receiver receiver, CodeSegment cs, SendOption option) {
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
-        cmd.setQuickFlag(quickFlag);
+        cmd.setQuickFlag(option.getQuickFlag());
+        //cmd.setCompressFlag(option.getCompressFlag());
+
         seqHash.put(seq, cmd);
-        if (quickFlag){
+        if (option.getQuickFlag()){
             connection.write(cmd);
         } else {
             connection.sendCommand(cmd);
--- a/src/main/java/alice/datasegment/SendOption.java	Sat Apr 18 19:09:15 2015 +0900
+++ b/src/main/java/alice/datasegment/SendOption.java	Thu Apr 30 18:14:02 2015 +0900
@@ -1,30 +1,30 @@
 package alice.datasegment;
 
 /**
- * フラグを一時的に格納するだけ。たぶんリモート用。
+ * フラグを一時的に格納するだけ。
  */
 public class SendOption {
-    private boolean quick = false;
-    private boolean compress = false;
+    private boolean quickFlag = false;
+    private boolean compressFlag = false;
     
     public SendOption(boolean qFlag, boolean cFlag) {
-        quick = qFlag;
-        compress = cFlag;
+        quickFlag = qFlag;
+        compressFlag = cFlag;
     }
 
-    public boolean isQuick() {
-        return quick;
+    public boolean getQuickFlag() {
+        return quickFlag;
     }
 
-    public void setQuick(boolean quick) {
-        this.quick = quick;
+    public void setQuickFlag(boolean quick) {
+        this.quickFlag = quick;
     }
 
-    public boolean isCompress() {
-        return compress;
+    public boolean getCompressFlag() {
+        return compressFlag;
     }
 
-    public void setCompress(boolean compress) {
-        this.compress = compress;
+    public void setCompressFlag(boolean compress) {
+        this.compressFlag = compress;
     }
 }
--- a/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java	Sat Apr 18 19:09:15 2015 +0900
+++ b/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java	Thu Apr 30 18:14:02 2015 +0900
@@ -11,5 +11,4 @@
 
         ods.put("local", "num", 0, true);
     }
-
-}
+}
\ No newline at end of file