changeset 530:4aeebea0c9b5 dispose

can't unzip
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Sun, 03 May 2015 10:04:28 +0900
parents cb7c31848d16
children b6049fb123d8
files src/main/java/alice/codesegment/OutputDataSegment.java src/main/java/alice/daemon/CommandMessage.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java src/main/java/alice/datasegment/DataSegmentKey.java src/main/java/alice/datasegment/ReceiveData.java
diffstat 8 files changed, 140 insertions(+), 60 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/OutputDataSegment.java	Fri May 01 18:19:16 2015 +0900
+++ b/src/main/java/alice/codesegment/OutputDataSegment.java	Sun May 03 10:04:28 2015 +0900
@@ -7,7 +7,6 @@
 import alice.datasegment.SendOption;
 
 public class OutputDataSegment {
-    private boolean compressFlag = false;//圧縮するかどうか
 
     /**
      * for local
@@ -96,7 +95,6 @@
         System.out.println("in PUT");
         ReceiveData rData = new ReceiveData(val);
         SendOption option = new SendOption(false, cFlag);
-        //rData.setCompressFlag(cFlag);
 
         if (!managerKey.equals("local") && DataSegment.contains(managerKey)){///if remote
             if (cFlag){
@@ -134,7 +132,6 @@
         if (!managerKey.equals("local") && DataSegment.contains(managerKey)){
             ReceiveData rData = new ReceiveData(val);
             SendOption option = new SendOption(true, cFlag);
-            //rData.setCompressFlag(cFlag);
 
             if (cFlag){
                 DataSegment.get(managerKey + "!").put(key, rData, option);
@@ -166,7 +163,6 @@
     public void update(String managerKey, String key, Object val, boolean cFlag) {//追加
         ReceiveData rData = new ReceiveData(val);
         SendOption option = new SendOption(false, cFlag);
-        //rData.setCompressFlag(cFlag);
 
         if (!managerKey.equals("local") && DataSegment.contains(managerKey)){///if remote
             if (cFlag){
@@ -249,11 +245,4 @@
             DataSegment.get(managerKey).ping(returnKey);
     }
 
-    public boolean compressFlag() {
-        return compressFlag;
-    }
-
-    public void setCompressFlag(boolean cFlag) {
-        compressFlag = cFlag;
-    }
 }
--- a/src/main/java/alice/daemon/CommandMessage.java	Fri May 01 18:19:16 2015 +0900
+++ b/src/main/java/alice/daemon/CommandMessage.java	Sun May 03 10:04:28 2015 +0900
@@ -14,7 +14,7 @@
     public boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか
     public boolean serialized = false;//シリアライズされているかどうか
     public boolean compressed = false;//圧縮されているかどうか
-    public int datasize;
+    public int dataSize = 0;
 
     public boolean setTime = false;//?
     public long time;//?
@@ -23,7 +23,7 @@
     public CommandMessage() {}
 
     public CommandMessage(int type, int index, int seq, String key
-            , boolean qFlag, boolean sFlag, boolean cFlag) {//コンストラクタ. setter.
+            , boolean qFlag, boolean sFlag, boolean cFlag, int datasize) {
         this.type = type;
         this.index = index;
         this.seq = seq;
@@ -31,6 +31,6 @@
         this.quickFlag = qFlag;
         this.serialized = sFlag;
         this.compressed = cFlag;
-        ///this.datasize = datasize;
+        this.dataSize = datasize;
     }
 }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Fri May 01 18:19:16 2015 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Sun May 03 10:04:28 2015 +0900
@@ -6,6 +6,7 @@
 
 import alice.datasegment.*;
 import org.msgpack.MessagePack;
+import org.msgpack.type.Value;
 import org.msgpack.unpacker.Unpacker;
 
 import alice.topology.manager.keeparive.RespondData;
@@ -54,13 +55,13 @@
             try {
                 Command cmd = null;
                 ReceiveData rData = null;
-                CommandMessage msg = unpacker.read(CommandMessage.class);
+                CommandMessage msg = unpacker.read(CommandMessage.class);///read header
                 CommandType type = CommandType.getCommandTypeFromId(msg.type);
                 switch (type) {
                 case UPDATE:
                 case PUT:
                     System.out.println("in TCP PUT");
-                    rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed);
+                    rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed, msg.dataSize);///read rData
 
                     if (msg.setTime) {
                         rData.setTime = true;
@@ -69,30 +70,49 @@
                     }
 
                     cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey);
+                    cmd.setCompressFlag(msg.compressed);
 
-                    lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
-
+                    if (rData.compressed()){
+                        compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd);
+                    } else {
+                        lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
+                    }
 
                     break;
                 case PEEK:
                 case TAKE:
-                    System.out.println("in TCP TAKE");
+                    System.out.println("in TCP TAKE:" + msg.compressed);
                     cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection);
+                    cmd.setCompressFlag(msg.compressed);
 
-                    lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
+                    if (msg.compressed){
+                        compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd);
+                    } else {
+                        lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
+                    }
 
                     break;
                 case REMOVE:
                     cmd = new Command(type, null, null, null, 0, 0, null, null, "");
-                    lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
+                    lmanager.getDataSegmentKey(msg.key).runCommand(cmd);//ToDo:fix
                     break;
                 case REPLY:
-                    System.out.println("in TCP REPLY");
+                    System.out.println("in TCP REPLY:");
+                    System.out.println("After DataSize:" + msg.dataSize);
                     cmd = manager.getAndRemoveCmd(msg.seq);
+                    byte[] unpack = unpacker.getSerializedByteArray(unpacker.readInt());
+                    //Value unpack2 = packer.read(byte []);
+                    System.out.print("REPLY unpacker: ");
+                    for (int i = 0; i < unpack.length; i++) {
+                        System.out.print(Integer.toHexString(unpack[i] & 0xff));
+                    }
+                    System.out.print("\n");
 
-                    rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()));
+
+                    rData = new ReceiveData(unpack, msg.compressed, msg.dataSize);
 
                     Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, "");
+                    cmd.setCompressFlag(msg.compressed);
                     cmd.cs.ids.reply(cmd.receiver, rCmd);
                     break;
                 case PING:
--- a/src/main/java/alice/datasegment/Command.java	Fri May 01 18:19:16 2015 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Sun May 03 10:04:28 2015 +0900
@@ -94,31 +94,30 @@
                 case UPDATE:
                 case PUT:
                 case REPLY:
-                    if (rData.compressed()) {
-                        // have already converted
-                        data = (byte[]) rData.getObj();
-                        compressed = rData.compressed(); // true
-                        serialized = rData.serialized();
+                    System.out.println("Command reply compressFlag:" + compressFlag);
+                    if(compressFlag){
+                        System.out.println("Command get zMP:" + rData.getZMessagePack());
+                        data = packer.write(rData.getZMessagePack());
+                        compressed = true;
                     } else {
-                        if (!rData.serialized() && !rData.isByteArray()) {
-                            data = packer.write(rData.getObj());
-                            serialized = true;
-                        } else { // rData is RAW ByteArray or already serialized
-                            data = (byte[]) rData.getObj();
-                            serialized = rData.serialized();
-                        }
-                        if (compressFlag) {
-                            rData.zip();
-                            compressed = true;
-                        }
+                        data = rData.getMessagePack();
+                        serialized = true;
                     }
-                    CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed);
+
+                    System.out.println("Before DataSize:" + rData.getDataSize());
+                    CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed, rData.getDataSize());
                     if (rData.setTime) {
                         cm.setTime = true;
                         cm.time = rData.time;
                         cm.depth = rData.depth + 1;
                     }
 
+                    System.out.print("Command packer: ");
+                    for (int i = 0; i < data.length; i++) {
+                        System.out.print(Integer.toHexString(data[i] & 0xff));
+                    }
+                    System.out.print("\n");
+
                     header = packer.write(cm);
                     dataSize = packer.write(data.length);
                     buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
@@ -127,7 +126,8 @@
                     buf.put(data);
                     break;
                 default:
-                    header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag));
+                    System.out.println("default compressFlag:" + compressFlag);
+                    header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag, 0));
                     buf = ByteBuffer.allocate(header.length);
                     buf.put(header);
                     break;
--- a/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java	Fri May 01 18:19:16 2015 +0900
+++ b/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java	Sun May 03 10:04:28 2015 +0900
@@ -111,6 +111,8 @@
         DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        cmd.setCompressFlag(option.isCompress());
+
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
@@ -121,6 +123,8 @@
         DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        cmd.setCompressFlag(option.isCompress());
+
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
--- a/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java	Fri May 01 18:19:16 2015 +0900
+++ b/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java	Sun May 03 10:04:28 2015 +0900
@@ -99,7 +99,9 @@
     public void take(Receiver receiver, CodeSegment cs, SendOption option) {
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        cmd.setCompressFlag(option.isCompress());
         cmd.setQuickFlag(option.isQuick());
+
         seqHash.put(seq, cmd);
         if (option.isQuick()){
             connection.write(cmd);
@@ -114,7 +116,9 @@
     public void peek(Receiver receiver, CodeSegment cs, SendOption option) {
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        cmd.setCompressFlag(option.isCompress());
         cmd.setQuickFlag(option.isQuick());
+
         seqHash.put(seq, cmd);
         if (option.isQuick()){
             connection.write(cmd);
--- a/src/main/java/alice/datasegment/DataSegmentKey.java	Fri May 01 18:19:16 2015 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentKey.java	Sun May 03 10:04:28 2015 +0900
@@ -31,6 +31,7 @@
             dataList.add(dsv);
             // Process waiting peek and take commands
             for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
+                System.out.println("in DSkey PUTfor");
                 Command waitCmd = iter.next();
                 if (waitCmd.index < index) {
                     System.out.println("DSKey cmdFlag:" + cmd.getCompressFlag());
@@ -51,7 +52,8 @@
             boolean waitFlag2 = true;
             for (DataSegmentValue data : dataList) {
                 if (data.index > cmd.index) {
-                    replyValue(cmd, data);
+                    replyValue(cmd, data, cmd.getCompressFlag());
+                    //replyValue(cmd, data);
                     waitFlag2 = false;
                     break;
                 }
@@ -89,7 +91,7 @@
 
     public void replyValue(Command cmd, DataSegmentValue data){
         Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from);
-        cmd.setCompressFlag(true);
+
         if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local.
             cmd.cs.ids.reply(cmd.receiver, rCmd);
         } else {
@@ -108,6 +110,7 @@
     public void replyValue(Command cmd, DataSegmentValue data, boolean cFlag){
         Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from);
         rCmd.setCompressFlag(cFlag);
+
         if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local.
             cmd.cs.ids.reply(cmd.receiver, rCmd);
         } else {
--- a/src/main/java/alice/datasegment/ReceiveData.java	Fri May 01 18:19:16 2015 +0900
+++ b/src/main/java/alice/datasegment/ReceiveData.java	Sun May 03 10:04:28 2015 +0900
@@ -2,9 +2,11 @@
 
 import java.io.*;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.zip.*;
 
+import javassist.bytecode.ByteArray;
 import org.msgpack.MessagePack;
 import org.msgpack.type.Value;
 
@@ -18,6 +20,7 @@
     private Object val;//for Object DS
     private byte[] messagePack;//for byteArray(serialized) DS
     private byte[] zMessagePack;//for byteArray(compressed) DS
+    private int dataSize;
     private Class<?> clazz;
 
     public long time;//測定用
@@ -42,7 +45,9 @@
      *
      * @param messagePack DS本体(byteArray)
      */
-    public ReceiveData(byte[] messagePack, boolean compressed) {
+    public ReceiveData(byte[] messagePack, boolean compressed, int datasize) {
+        this.dataSize = datasize;
+        System.out.println("rData datasize:" + datasize);
         if (compressed){
             this.zMessagePack = messagePack;
         } else {
@@ -109,7 +114,8 @@
             }
 
             if (zMessagePack != null && messagePack == null) {
-                messagePack = unzip(zMessagePack, 100);///ToDo:read header and set length
+                System.out.println("unzip datasize:" + dataSize);
+                messagePack = unzip(zMessagePack, dataSize);///ToDo:read header and set length
             }
 
             return packer.read(messagePack, clazz);
@@ -120,13 +126,51 @@
         }
     }
 
+    public byte[] getMessagePack(){
+        if (messagePack != null){
+            return messagePack;
+        } else {
+            try {
+                messagePack = packer.write(val);
+                System.out.print("make messagePack1: ");
+                for (int i = 0; i < messagePack.length; i++) {
+                    System.out.print(Integer.toHexString(messagePack[i] & 0xff));
+                }
+                System.out.print("\n");
+                System.out.println("mpLength:" + messagePack.length);
+                setDataSize(messagePack.length);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
 
-    public int zip() throws IOException {
+            return messagePack;
+        }
+    }
+
+    public byte[] getZMessagePack(){
+        if (zMessagePack != null){
+            System.out.println("have zMessagePack");
+            return zMessagePack;
+        } else {
+            try {
+                zip();
+
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+
+            return zMessagePack;
+        }
+    }
+
+    public void zip() throws IOException {
+        System.out.println("in zip");
         LinkedList<ByteBuffer> inputs = new LinkedList<ByteBuffer>();
         int inputIndex = 0;
         LinkedList<ByteBuffer> outputs = new LinkedList<ByteBuffer>();
         Deflater deflater = new Deflater();
 
+        inputs.add(ByteBuffer.wrap(getMessagePack()));
         int len = 0;
         int INFLATE_BUFSIZE = 1024 * 100;//ToDo:fix
         ByteBuffer c1 = allocate(INFLATE_BUFSIZE);//for output
@@ -141,7 +185,7 @@
 
             int len1 = 0;
             do {
-                len1 = deflater.deflate(c1.array(), c1.position(), c1.remaining());///Bytearray for ziped data、start offset、length
+                len1 = deflater.deflate(c1.array(), c1.position(), c1.remaining());
                 if (len1 > 0) {
                     len += len1;
                     c1.position(c1.position() + len1);
@@ -151,28 +195,46 @@
                         c1 = allocate(INFLATE_BUFSIZE);
                     }
                 }
-            } while (len1 > 0 || !deflater.needsInput());//needsInput()...true if setInput is empty
+            } while (len1 > 0 || !deflater.needsInput());
         }
         if (c1.position() != 0) {
             c1.flip();
             outputs.addLast(c1);
         }
 
-        //zMessagePack = outputs.toArray();
         deflater.reset();
-        return len;///return length of ziped data
+
+        zMessagePack = new byte[len];
+        int tmp = 0;
+        for (int i = 0; i < outputs.size(); i++){
+            System.arraycopy(outputs.get(i).array(), 0, zMessagePack, 0 + tmp, outputs.get(i).limit());//limit? remaining?
+            tmp += outputs.get(i).limit();
+        }
+        System.out.println("in make zMessagePack1:" + zMessagePack);
+        System.out.print("in make zMessagePack2: ");
+        for (int i = 0; i < zMessagePack.length; i++) {
+            System.out.print(Integer.toHexString(zMessagePack[i] & 0xff));
+        }
+        System.out.print("\n");
     }
 
     protected byte[] unzip(byte[] input, int zippedLength) {///read header & unzip
         int length = input.length;
         Inflater inflater = new Inflater();
 
+        System.out.print("unziped input: ");
+        for (int i = 0; i < input.length; i++) {
+            System.out.print(Integer.toHexString(input[i] & 0xff));
+        }
+        System.out.print("\n");
+
         byte [] output = new byte [zippedLength];///byteArray for unziped data
         inflater.setInput(input, 0, length);///set unzip data without header
 
         try {
             System.out.println("return:" + inflater.inflate(output, 0, zippedLength));///unzip
         } catch (DataFormatException e) {
+            System.out.println("unzip exception:" + e.toString());
             e.printStackTrace();
         }
 
@@ -208,16 +270,6 @@
         return b;
     }
 
-    public byte[] asByteArray() throws IOException{///ToDo : delete
-        ByteArrayOutputStream buff = new ByteArrayOutputStream();
-        ObjectOutput out = new ObjectOutputStream(buff);
-        out.writeObject(this.val);
-        byte[] bytes = buff.toByteArray();
-        out.close();
-        buff.close();
-        return bytes;
-    }
-
     public static int byteArrayToInt(byte[] b)
     {
         return   b[3] & 0xFF |
@@ -236,4 +288,12 @@
         };
     }
 
+    public int getDataSize(){
+        return this.dataSize;
+    }
+
+    public void setDataSize(int datasize){
+        this.dataSize = datasize;
+    }
+
 }