# HG changeset patch # User Nozomi Teruya # Date 1430636311 -32400 # Node ID b6049fb123d814b25bd2ba3e252c60016024e3fa # Parent 4aeebea0c9b54dc13acb933091112f1425d065b5 resolve unzip, working TestRemoteAlice diff -r 4aeebea0c9b5 -r b6049fb123d8 src/main/java/alice/daemon/IncomingTcpConnection.java --- a/src/main/java/alice/daemon/IncomingTcpConnection.java Sun May 03 10:04:28 2015 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Sun May 03 15:58:31 2015 +0900 @@ -6,7 +6,6 @@ import alice.datasegment.*; import org.msgpack.MessagePack; -import org.msgpack.type.Value; import org.msgpack.unpacker.Unpacker; import alice.topology.manager.keeparive.RespondData; @@ -61,7 +60,7 @@ case UPDATE: case PUT: System.out.println("in TCP PUT"); - rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed, msg.dataSize);///read rData + rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), msg.compressed, msg.dataSize);///read rData if (msg.setTime) { rData.setTime = true; @@ -81,7 +80,7 @@ break; case PEEK: case TAKE: - System.out.println("in TCP TAKE:" + msg.compressed); + System.out.println("in TCP TAKE"); cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection); cmd.setCompressFlag(msg.compressed); @@ -97,19 +96,10 @@ lmanager.getDataSegmentKey(msg.key).runCommand(cmd);//ToDo:fix break; case REPLY: - System.out.println("in TCP REPLY:"); - System.out.println("After DataSize:" + msg.dataSize); + System.out.println("in TCP REPLY"); cmd = manager.getAndRemoveCmd(msg.seq); - byte[] unpack = unpacker.getSerializedByteArray(unpacker.readInt()); - //Value unpack2 = packer.read(byte []); - System.out.print("REPLY unpacker: "); - for (int i = 0; i < unpack.length; i++) { - System.out.print(Integer.toHexString(unpack[i] & 0xff)); - } - System.out.print("\n"); - - rData = new ReceiveData(unpack, msg.compressed, msg.dataSize); + rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), msg.compressed, msg.dataSize); Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); cmd.setCompressFlag(msg.compressed); diff -r 4aeebea0c9b5 -r b6049fb123d8 src/main/java/alice/datasegment/Command.java --- a/src/main/java/alice/datasegment/Command.java Sun May 03 10:04:28 2015 +0900 +++ b/src/main/java/alice/datasegment/Command.java Sun May 03 15:58:31 2015 +0900 @@ -94,9 +94,7 @@ case UPDATE: case PUT: case REPLY: - System.out.println("Command reply compressFlag:" + compressFlag); if(compressFlag){ - System.out.println("Command get zMP:" + rData.getZMessagePack()); data = packer.write(rData.getZMessagePack()); compressed = true; } else { @@ -104,7 +102,6 @@ serialized = true; } - System.out.println("Before DataSize:" + rData.getDataSize()); CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed, rData.getDataSize()); if (rData.setTime) { cm.setTime = true; @@ -112,12 +109,6 @@ cm.depth = rData.depth + 1; } - System.out.print("Command packer: "); - for (int i = 0; i < data.length; i++) { - System.out.print(Integer.toHexString(data[i] & 0xff)); - } - System.out.print("\n"); - header = packer.write(cm); dataSize = packer.write(data.length); buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); @@ -126,7 +117,6 @@ buf.put(data); break; default: - System.out.println("default compressFlag:" + compressFlag); header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag, 0)); buf = ByteBuffer.allocate(header.length); buf.put(header); diff -r 4aeebea0c9b5 -r b6049fb123d8 src/main/java/alice/datasegment/DataSegmentKey.java --- a/src/main/java/alice/datasegment/DataSegmentKey.java Sun May 03 10:04:28 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentKey.java Sun May 03 15:58:31 2015 +0900 @@ -31,10 +31,8 @@ dataList.add(dsv); // Process waiting peek and take commands for (Iterator iter = waitList.iterator(); iter.hasNext(); ) { - System.out.println("in DSkey PUTfor"); Command waitCmd = iter.next(); if (waitCmd.index < index) { - System.out.println("DSKey cmdFlag:" + cmd.getCompressFlag()); replyValue(waitCmd, dsv, cmd.getCompressFlag()); iter.remove(); if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command @@ -53,7 +51,6 @@ for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { replyValue(cmd, data, cmd.getCompressFlag()); - //replyValue(cmd, data); waitFlag2 = false; break; } @@ -70,9 +67,7 @@ for (Iterator iter = dataList.iterator(); iter.hasNext(); ) { DataSegmentValue data = iter.next(); if (data.index > cmd.index) { - System.out.println("DSKey2 cmdFlag:" + cmd.getCompressFlag()); replyValue(cmd, data, cmd.getCompressFlag()); - //replyValue(cmd, data); iter.remove(); waitFlag = false; break; @@ -89,24 +84,6 @@ } - public void replyValue(Command cmd, DataSegmentValue data){ - Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from); - - if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local. - cmd.cs.ids.reply(cmd.receiver, rCmd); - } else { - try { - if (!cmd.getQuickFlag()) { - cmd.connection.sendQueue.put(rCmd); - } else { - cmd.connection.write(rCmd); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - public void replyValue(Command cmd, DataSegmentValue data, boolean cFlag){ Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from); rCmd.setCompressFlag(cFlag); diff -r 4aeebea0c9b5 -r b6049fb123d8 src/main/java/alice/datasegment/ReceiveData.java --- a/src/main/java/alice/datasegment/ReceiveData.java Sun May 03 10:04:28 2015 +0900 +++ b/src/main/java/alice/datasegment/ReceiveData.java Sun May 03 15:58:31 2015 +0900 @@ -2,17 +2,13 @@ import java.io.*; import java.nio.ByteBuffer; -import java.util.Collection; import java.util.LinkedList; import java.util.zip.*; -import javassist.bytecode.ByteArray; import org.msgpack.MessagePack; import org.msgpack.type.Value; -import static java.util.Arrays.*; - /** * 送られてきたDSを一時的に取っておくクラス。inputでも使用。 */ @@ -27,7 +23,6 @@ public boolean setTime = false; public int depth = 1; - private ByteBuffer buf = null; private static final MessagePack packer = new MessagePack(); /** @@ -47,7 +42,6 @@ */ public ReceiveData(byte[] messagePack, boolean compressed, int datasize) { this.dataSize = datasize; - System.out.println("rData datasize:" + datasize); if (compressed){ this.zMessagePack = messagePack; } else { @@ -114,7 +108,6 @@ } if (zMessagePack != null && messagePack == null) { - System.out.println("unzip datasize:" + dataSize); messagePack = unzip(zMessagePack, dataSize);///ToDo:read header and set length } @@ -132,12 +125,6 @@ } else { try { messagePack = packer.write(val); - System.out.print("make messagePack1: "); - for (int i = 0; i < messagePack.length; i++) { - System.out.print(Integer.toHexString(messagePack[i] & 0xff)); - } - System.out.print("\n"); - System.out.println("mpLength:" + messagePack.length); setDataSize(messagePack.length); } catch (IOException e) { e.printStackTrace(); @@ -210,7 +197,6 @@ System.arraycopy(outputs.get(i).array(), 0, zMessagePack, 0 + tmp, outputs.get(i).limit());//limit? remaining? tmp += outputs.get(i).limit(); } - System.out.println("in make zMessagePack1:" + zMessagePack); System.out.print("in make zMessagePack2: "); for (int i = 0; i < zMessagePack.length; i++) { System.out.print(Integer.toHexString(zMessagePack[i] & 0xff)); @@ -232,9 +218,8 @@ inflater.setInput(input, 0, length);///set unzip data without header try { - System.out.println("return:" + inflater.inflate(output, 0, zippedLength));///unzip + inflater.inflate(output, 0, zippedLength);///unzip } catch (DataFormatException e) { - System.out.println("unzip exception:" + e.toString()); e.printStackTrace(); }