changeset 531:b6049fb123d8 dispose

resolve unzip, working TestRemoteAlice
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Sun, 03 May 2015 15:58:31 +0900
parents 4aeebea0c9b5
children 33f981dd91d2
files src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/DataSegmentKey.java src/main/java/alice/datasegment/ReceiveData.java
diffstat 4 files changed, 5 insertions(+), 63 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Sun May 03 10:04:28 2015 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Sun May 03 15:58:31 2015 +0900
@@ -6,7 +6,6 @@
 
 import alice.datasegment.*;
 import org.msgpack.MessagePack;
-import org.msgpack.type.Value;
 import org.msgpack.unpacker.Unpacker;
 
 import alice.topology.manager.keeparive.RespondData;
@@ -61,7 +60,7 @@
                 case UPDATE:
                 case PUT:
                     System.out.println("in TCP PUT");
-                    rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed, msg.dataSize);///read rData
+                    rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), msg.compressed, msg.dataSize);///read rData
 
                     if (msg.setTime) {
                         rData.setTime = true;
@@ -81,7 +80,7 @@
                     break;
                 case PEEK:
                 case TAKE:
-                    System.out.println("in TCP TAKE:" + msg.compressed);
+                    System.out.println("in TCP TAKE");
                     cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection);
                     cmd.setCompressFlag(msg.compressed);
 
@@ -97,19 +96,10 @@
                     lmanager.getDataSegmentKey(msg.key).runCommand(cmd);//ToDo:fix
                     break;
                 case REPLY:
-                    System.out.println("in TCP REPLY:");
-                    System.out.println("After DataSize:" + msg.dataSize);
+                    System.out.println("in TCP REPLY");
                     cmd = manager.getAndRemoveCmd(msg.seq);
-                    byte[] unpack = unpacker.getSerializedByteArray(unpacker.readInt());
-                    //Value unpack2 = packer.read(byte []);
-                    System.out.print("REPLY unpacker: ");
-                    for (int i = 0; i < unpack.length; i++) {
-                        System.out.print(Integer.toHexString(unpack[i] & 0xff));
-                    }
-                    System.out.print("\n");
 
-
-                    rData = new ReceiveData(unpack, msg.compressed, msg.dataSize);
+                    rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), msg.compressed, msg.dataSize);
 
                     Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, "");
                     cmd.setCompressFlag(msg.compressed);
--- a/src/main/java/alice/datasegment/Command.java	Sun May 03 10:04:28 2015 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Sun May 03 15:58:31 2015 +0900
@@ -94,9 +94,7 @@
                 case UPDATE:
                 case PUT:
                 case REPLY:
-                    System.out.println("Command reply compressFlag:" + compressFlag);
                     if(compressFlag){
-                        System.out.println("Command get zMP:" + rData.getZMessagePack());
                         data = packer.write(rData.getZMessagePack());
                         compressed = true;
                     } else {
@@ -104,7 +102,6 @@
                         serialized = true;
                     }
 
-                    System.out.println("Before DataSize:" + rData.getDataSize());
                     CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed, rData.getDataSize());
                     if (rData.setTime) {
                         cm.setTime = true;
@@ -112,12 +109,6 @@
                         cm.depth = rData.depth + 1;
                     }
 
-                    System.out.print("Command packer: ");
-                    for (int i = 0; i < data.length; i++) {
-                        System.out.print(Integer.toHexString(data[i] & 0xff));
-                    }
-                    System.out.print("\n");
-
                     header = packer.write(cm);
                     dataSize = packer.write(data.length);
                     buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
@@ -126,7 +117,6 @@
                     buf.put(data);
                     break;
                 default:
-                    System.out.println("default compressFlag:" + compressFlag);
                     header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag, 0));
                     buf = ByteBuffer.allocate(header.length);
                     buf.put(header);
--- a/src/main/java/alice/datasegment/DataSegmentKey.java	Sun May 03 10:04:28 2015 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentKey.java	Sun May 03 15:58:31 2015 +0900
@@ -31,10 +31,8 @@
             dataList.add(dsv);
             // Process waiting peek and take commands
             for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
-                System.out.println("in DSkey PUTfor");
                 Command waitCmd = iter.next();
                 if (waitCmd.index < index) {
-                    System.out.println("DSKey cmdFlag:" + cmd.getCompressFlag());
                     replyValue(waitCmd, dsv, cmd.getCompressFlag());
                     iter.remove();
                     if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
@@ -53,7 +51,6 @@
             for (DataSegmentValue data : dataList) {
                 if (data.index > cmd.index) {
                     replyValue(cmd, data, cmd.getCompressFlag());
-                    //replyValue(cmd, data);
                     waitFlag2 = false;
                     break;
                 }
@@ -70,9 +67,7 @@
             for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
                 DataSegmentValue data = iter.next();
                 if (data.index > cmd.index) {
-                    System.out.println("DSKey2 cmdFlag:" + cmd.getCompressFlag());
                     replyValue(cmd, data, cmd.getCompressFlag());
-                    //replyValue(cmd, data);
                     iter.remove();
                     waitFlag = false;
                     break;
@@ -89,24 +84,6 @@
 
     }
 
-    public void replyValue(Command cmd, DataSegmentValue data){
-        Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from);
-
-        if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local.
-            cmd.cs.ids.reply(cmd.receiver, rCmd);
-        } else {
-            try {
-                if (!cmd.getQuickFlag()) {
-                    cmd.connection.sendQueue.put(rCmd);
-                } else {
-                    cmd.connection.write(rCmd);
-                }
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
     public void replyValue(Command cmd, DataSegmentValue data, boolean cFlag){
         Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from);
         rCmd.setCompressFlag(cFlag);
--- a/src/main/java/alice/datasegment/ReceiveData.java	Sun May 03 10:04:28 2015 +0900
+++ b/src/main/java/alice/datasegment/ReceiveData.java	Sun May 03 15:58:31 2015 +0900
@@ -2,17 +2,13 @@
 
 import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.LinkedList;
 import java.util.zip.*;
 
-import javassist.bytecode.ByteArray;
 import org.msgpack.MessagePack;
 import org.msgpack.type.Value;
 
 
-import static java.util.Arrays.*;
-
 /**
  * 送られてきたDSを一時的に取っておくクラス。inputでも使用。
  */
@@ -27,7 +23,6 @@
     public boolean setTime = false;
     public int depth = 1;
 
-    private ByteBuffer buf = null;
     private static final MessagePack packer = new MessagePack();
 
     /**
@@ -47,7 +42,6 @@
      */
     public ReceiveData(byte[] messagePack, boolean compressed, int datasize) {
         this.dataSize = datasize;
-        System.out.println("rData datasize:" + datasize);
         if (compressed){
             this.zMessagePack = messagePack;
         } else {
@@ -114,7 +108,6 @@
             }
 
             if (zMessagePack != null && messagePack == null) {
-                System.out.println("unzip datasize:" + dataSize);
                 messagePack = unzip(zMessagePack, dataSize);///ToDo:read header and set length
             }
 
@@ -132,12 +125,6 @@
         } else {
             try {
                 messagePack = packer.write(val);
-                System.out.print("make messagePack1: ");
-                for (int i = 0; i < messagePack.length; i++) {
-                    System.out.print(Integer.toHexString(messagePack[i] & 0xff));
-                }
-                System.out.print("\n");
-                System.out.println("mpLength:" + messagePack.length);
                 setDataSize(messagePack.length);
             } catch (IOException e) {
                 e.printStackTrace();
@@ -210,7 +197,6 @@
             System.arraycopy(outputs.get(i).array(), 0, zMessagePack, 0 + tmp, outputs.get(i).limit());//limit? remaining?
             tmp += outputs.get(i).limit();
         }
-        System.out.println("in make zMessagePack1:" + zMessagePack);
         System.out.print("in make zMessagePack2: ");
         for (int i = 0; i < zMessagePack.length; i++) {
             System.out.print(Integer.toHexString(zMessagePack[i] & 0xff));
@@ -232,9 +218,8 @@
         inflater.setInput(input, 0, length);///set unzip data without header
 
         try {
-            System.out.println("return:" + inflater.inflate(output, 0, zippedLength));///unzip
+            inflater.inflate(output, 0, zippedLength);///unzip
         } catch (DataFormatException e) {
-            System.out.println("unzip exception:" + e.toString());
             e.printStackTrace();
         }