# HG changeset patch # User Nozomi Teruya # Date 1430385242 -32400 # Node ID bfec2c3ff1b84e6dea2928eeb4ea43e812f14082 # Parent 928907206d2189d848344651a8ed58420ec8cd95 change unzip diff -r 928907206d21 -r bfec2c3ff1b8 src/main/java/alice/codesegment/InputDataSegment.java --- a/src/main/java/alice/codesegment/InputDataSegment.java Sat Apr 18 19:09:15 2015 +0900 +++ b/src/main/java/alice/codesegment/InputDataSegment.java Thu Apr 30 18:14:02 2015 +0900 @@ -34,19 +34,21 @@ cs.register(receiver); if (receiver.compressedFlag){ + SendOption option = new SendOption(true, true); if (receiver.managerKey == null){//localの場合 - DataSegment.getCompressedLocal().peek(receiver, cs, true); + DataSegment.getCompressedLocal().peek(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) {//remoteの場合 - DataSegment.get(receiver.managerKey + "!").peek(receiver, cs, true); + DataSegment.get(receiver.managerKey + "!").peek(receiver, cs, option); } } } else { + SendOption option = new SendOption(true, false); if (receiver.managerKey == null){ - DataSegment.getLocal().peek(receiver, cs, true); + DataSegment.getLocal().peek(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) { - DataSegment.get(receiver.managerKey).peek(receiver, cs, true); + DataSegment.get(receiver.managerKey).peek(receiver, cs, option); } } } @@ -58,19 +60,21 @@ cs.register(receiver); if (receiver.compressedFlag){ + SendOption option = new SendOption(false, true); if (receiver.managerKey==null){ - DataSegment.getCompressedLocal().peek(receiver, cs, false); + DataSegment.getCompressedLocal().peek(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) { - DataSegment.get(receiver.managerKey + "!").peek(receiver, cs, false); + DataSegment.get(receiver.managerKey + "!").peek(receiver, cs, option); } } } else { + SendOption option = new SendOption(false, false); if (receiver.managerKey==null){ - DataSegment.getLocal().peek(receiver, cs, false); + DataSegment.getLocal().peek(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) { - DataSegment.get(receiver.managerKey).peek(receiver, cs, false); + DataSegment.get(receiver.managerKey).peek(receiver, cs, option); } } } @@ -81,19 +85,21 @@ cs.register(receiver); if (receiver.compressedFlag){ + SendOption option = new SendOption(true, true); if (receiver.managerKey==null){ - DataSegment.getCompressedLocal().take(receiver, cs, true); + DataSegment.getCompressedLocal().take(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) { - DataSegment.get(receiver.managerKey + "!").take(receiver, cs, true); + DataSegment.get(receiver.managerKey + "!").take(receiver, cs, option); } } } else { + SendOption option = new SendOption(true, false); if (receiver.managerKey==null){ - DataSegment.getLocal().take(receiver, cs, true); + DataSegment.getLocal().take(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) { - DataSegment.get(receiver.managerKey).take(receiver, cs, true); + DataSegment.get(receiver.managerKey).take(receiver, cs, option); } } } @@ -104,19 +110,21 @@ cs.register(receiver); if (receiver.compressedFlag){ + SendOption option = new SendOption(false, true); if (receiver.managerKey==null){// 指定なしの場合デフォはローカルになる - DataSegment.getCompressedLocal().take(receiver, cs, false); + DataSegment.getCompressedLocal().take(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) { - DataSegment.get(receiver.managerKey + "!").take(receiver, cs, false, true); + DataSegment.get(receiver.managerKey + "!").take(receiver, cs, option); } } } else { + SendOption option = new SendOption(false, false); if (receiver.managerKey==null){ - DataSegment.getLocal().take(receiver, cs, false); + DataSegment.getLocal().take(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) { - DataSegment.get(receiver.managerKey).take(receiver, cs, false); + DataSegment.get(receiver.managerKey).take(receiver, cs, option); } } } diff -r 928907206d21 -r bfec2c3ff1b8 src/main/java/alice/codesegment/OutputDataSegment.java --- a/src/main/java/alice/codesegment/OutputDataSegment.java Sat Apr 18 19:09:15 2015 +0900 +++ b/src/main/java/alice/codesegment/OutputDataSegment.java Thu Apr 30 18:14:02 2015 +0900 @@ -18,9 +18,9 @@ */ public void flip(Receiver receiver) { if (receiver.isCompressed()){ - DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), false); + DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), null); } else { - DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false); + DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), null); } } @@ -28,16 +28,16 @@ switch (type) { case PUT: if (receiver.isCompressed()){ - DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), false);//localなら全部false。 + DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), null);//localなら全部false。 } else { - DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false); + DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), null); } break; case UPDATE: if (receiver.isCompressed()){ - DataSegment.getCompressedLocal().update(receiver.key, receiver.getReceiveData(), false); + DataSegment.getCompressedLocal().update(receiver.key, receiver.getReceiveData(), null); } else { - DataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), false); + DataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), null); } break; @@ -47,29 +47,27 @@ } public void put(String key, ReceiveData rData) { - DataSegment.getLocal().put(key, rData, false); + DataSegment.getLocal().put(key, rData, null); } public void put(String key, Object val) { ReceiveData rData = new ReceiveData(val); - DataSegment.getLocal().put(key, rData, false); + DataSegment.getLocal().put(key, rData, new SendOption(false, false)); } - public void put(String key, Object val, boolean cFlag) {//追加 + public void put(String key, Object val, boolean cFlag) {///追加 ReceiveData rData = new ReceiveData(val); - rData.setCompressFlag(cFlag); - DataSegment.getLocal().put(key, rData, false); + DataSegment.getLocal().put(key, rData, new SendOption(false, cFlag)); } public void update(String key, Object val) { ReceiveData rData = new ReceiveData(val); - DataSegment.getLocal().update(key, rData, false); + DataSegment.getLocal().update(key, rData, new SendOption(false, false)); } - public void update(String key, Object val, boolean cFlag) {//追加 + public void update(String key, Object val, boolean cFlag) {///追加 ReceiveData rData = new ReceiveData(val); - rData.setCompressFlag(cFlag); - DataSegment.getLocal().update(key, rData, false); + DataSegment.getLocal().update(key, rData, new SendOption(false, cFlag)); } /** @@ -77,12 +75,14 @@ */ public void put(String managerKey, String key, ReceiveData rData) { if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ + SendOption option = new SendOption(false, rData.compressed()); + //SendOption option = new SendOption(false, compressFlag); + if (rData.compressed()){ - DataSegment.get(managerKey + "!").put(key, rData, false); + DataSegment.get(managerKey + "!").put(key, rData, option); } else { - DataSegment.get(managerKey).put(key, rData, false); + DataSegment.get(managerKey).put(key, rData, option); } - } else { put(key, rData); } @@ -95,32 +95,31 @@ public void put(String managerKey, String key, Object val, boolean cFlag) {//追加 System.out.println("in PUT"); ReceiveData rData = new ReceiveData(val); - rData.setCompressFlag(cFlag); + SendOption option = new SendOption(false, cFlag); + //rData.setCompressFlag(cFlag); if (!managerKey.equals("local") && DataSegment.contains(managerKey)){///if remote if (cFlag){ - DataSegment.get(managerKey + "!").put(key, rData, false); + DataSegment.get(managerKey + "!").put(key, rData, option); } else { - DataSegment.get(managerKey).put(key, rData, false); + DataSegment.get(managerKey).put(key, rData, option); } } else {// if local if (cFlag){ - DataSegment.getCompressedLocal().put(key, rData, false); + DataSegment.getCompressedLocal().put(key, rData, option); } else { put(key, val); } } - - - } public void quickPut(String managerKey, String key, ReceiveData rData) { if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ + SendOption option = new SendOption(true, false); if (rData.compressed()){ - DataSegment.get(managerKey + "!").put(key, rData, true); + DataSegment.get(managerKey + "!").put(key, rData, option); } else { - DataSegment.get(managerKey).put(key, rData, true); + DataSegment.get(managerKey).put(key, rData, option); } } else { put(key, rData); @@ -134,11 +133,13 @@ public void quickPut(String managerKey, String key, Object val, boolean cFlag) {//追加 if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ ReceiveData rData = new ReceiveData(val); - rData.setCompressFlag(cFlag); + SendOption option = new SendOption(true, cFlag); + //rData.setCompressFlag(cFlag); + if (cFlag){ - DataSegment.get(managerKey + "!").put(key, rData, true); + DataSegment.get(managerKey + "!").put(key, rData, option); } else { - DataSegment.get(managerKey).put(key, rData, true); + DataSegment.get(managerKey).put(key, rData, option); } } else { put(key, val); @@ -147,10 +148,11 @@ public void update(String managerKey, String key, ReceiveData rData) { if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ + SendOption option = new SendOption(false, rData.compressed()); if (rData.compressed()){ - DataSegment.get(managerKey + "!").update(key, rData, false); + DataSegment.get(managerKey + "!").update(key, rData, option); } else { - DataSegment.get(managerKey).update(key, rData, false); + DataSegment.get(managerKey).update(key, rData, option); } } else { update(key, rData); @@ -162,25 +164,32 @@ } public void update(String managerKey, String key, Object val, boolean cFlag) {//追加 - if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ - ReceiveData rData = new ReceiveData(val); - rData.setCompressFlag(cFlag); - if(cFlag){ - DataSegment.get(managerKey + "!").update(key, rData, false); + ReceiveData rData = new ReceiveData(val); + SendOption option = new SendOption(false, cFlag); + //rData.setCompressFlag(cFlag); + + if (!managerKey.equals("local") && DataSegment.contains(managerKey)){///if remote + if (cFlag){ + DataSegment.get(managerKey + "!").update(key, rData, option); } else { - DataSegment.get(managerKey).update(key, rData, false); + DataSegment.get(managerKey).update(key, rData, option); } - } else { - update(key, val); + } else {// if local + if (cFlag){ + DataSegment.getCompressedLocal().update(key, rData, option); + } else { + update(key, val); + } } } public void quickUpdate(String managerKey, String key, ReceiveData rData) { if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ + SendOption option = new SendOption(true, rData.compressed()); if (rData.compressed()){ - DataSegment.get(managerKey + "!").update(key, rData, true); + DataSegment.get(managerKey + "!").update(key, rData, option); } else { - DataSegment.get(managerKey).update(key, rData, true); + DataSegment.get(managerKey).update(key, rData, option); } } else { update(key, rData); @@ -194,11 +203,12 @@ public void quickUpdate(String managerKey, String key, Object val, boolean cFlag) {//追加 if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ ReceiveData rData = new ReceiveData(val); - rData.setCompressFlag(cFlag); + SendOption option = new SendOption(true, cFlag); + //rData.setCompressFlag(cFlag); if (cFlag){ - DataSegment.get(managerKey + "!").update(key, rData, true); + DataSegment.get(managerKey + "!").update(key, rData, option); } else { - DataSegment.get(managerKey).update(key, rData, true); + DataSegment.get(managerKey).update(key, rData, option); } } else { update(key, val); diff -r 928907206d21 -r bfec2c3ff1b8 src/main/java/alice/daemon/Connection.java --- a/src/main/java/alice/daemon/Connection.java Sat Apr 18 19:09:15 2015 +0900 +++ b/src/main/java/alice/daemon/Connection.java Thu Apr 30 18:14:02 2015 +0900 @@ -41,7 +41,8 @@ while (buffer.hasRemaining()) { socket.getChannel().write(buffer); } - } catch (Exception e) { } + } catch (Exception e) { + } } public void close(){ @@ -58,9 +59,11 @@ if (name!=null){ ConnectionInfo c = new ConnectionInfo(name, socket); ReceiveData rData = new ReceiveData(c); - DataSegment.getLocal().put("_DISCONNECT", rData, false); + DataSegment.getLocal().put("_DISCONNECT", rData, null); if (sendManager) { - DataSegment.get("manager").put("_DISCONNECTNODE", rData, false); + SendOption option = new SendOption(false, false); + DataSegment.get("manager").put("_DISCONNECTNODE", rData, option); + sendManager = false; } } diff -r 928907206d21 -r bfec2c3ff1b8 src/main/java/alice/daemon/IncomingTcpConnection.java --- a/src/main/java/alice/daemon/IncomingTcpConnection.java Sat Apr 18 19:09:15 2015 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Thu Apr 30 18:14:02 2015 +0900 @@ -4,15 +4,10 @@ import java.io.IOException; import java.nio.channels.ClosedChannelException; +import alice.datasegment.*; import org.msgpack.unpacker.Unpacker; import alice.codesegment.SingletonMessage; -import alice.datasegment.Command; -import alice.datasegment.CommandType; -import alice.datasegment.DataSegment; -import alice.datasegment.DataSegmentManager; -import alice.datasegment.LocalDataSegmentManager; -import alice.datasegment.ReceiveData; import alice.topology.manager.keeparive.RespondData; public class IncomingTcpConnection extends Thread { @@ -104,7 +99,7 @@ System.out.println("in TCP REPLY"); cmd = manager.getAndRemoveCmd(msg.seq); - rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt())); + rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()));//ここのコンストラクタでx rData.setCompressFlag(msg.compressed); Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); @@ -116,7 +111,7 @@ break; case RESPONSE: rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis())); - DataSegment.getLocal().put(msg.key, rData, false); + DataSegment.getLocal().put(msg.key, rData, null); break; default: break; diff -r 928907206d21 -r bfec2c3ff1b8 src/main/java/alice/daemon/IncomingUdpConnection.java --- a/src/main/java/alice/daemon/IncomingUdpConnection.java Sat Apr 18 19:09:15 2015 +0900 +++ b/src/main/java/alice/daemon/IncomingUdpConnection.java Thu Apr 30 18:14:02 2015 +0900 @@ -88,7 +88,7 @@ break; case RESPONSE: rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis())); - DataSegment.getLocal().put(msg.key, rData, false); + DataSegment.getLocal().put(msg.key, rData, null); break; default: break; diff -r 928907206d21 -r bfec2c3ff1b8 src/main/java/alice/daemon/OutboundTcpConnection.java --- a/src/main/java/alice/daemon/OutboundTcpConnection.java Sat Apr 18 19:09:15 2015 +0900 +++ b/src/main/java/alice/daemon/OutboundTcpConnection.java Thu Apr 30 18:14:02 2015 +0900 @@ -27,7 +27,7 @@ default: break; } - connection.write(cmd); + connection.write(cmd);//ここでconvert()がよばれてる } catch (InterruptedException e) { e.printStackTrace(); } diff -r 928907206d21 -r bfec2c3ff1b8 src/main/java/alice/datasegment/Command.java --- a/src/main/java/alice/datasegment/Command.java Sat Apr 18 19:09:15 2015 +0900 +++ b/src/main/java/alice/datasegment/Command.java Thu Apr 30 18:14:02 2015 +0900 @@ -74,60 +74,42 @@ * @return serialized ByteBuffer */ public ByteBuffer convert() {//byteArrayに変換 + ByteBuffer buf = null; - MessagePack msg = SingletonMessage.getInstance(); - try { - byte[] header = null;//DSのメタデータ用byteArray - byte[] data = null;//DS本体用byteArray - byte[] dataSize = null;//DSのサイズ - - switch (type) { - /* - * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment - * case UPDATE and PUT - * compress and serialize flag are selected by user, so if true, need convert. - * case REPLY - * these flags represent DataSegment status. - * for example, serializeFlag is true. DataSegment had already converted, so no need convert. - */ - case UPDATE: - case PUT: - case REPLY://ReceiveDataからREPLYするDSを取得 - System.out.println("in REPLY"); - - if (!rData.serialized()) {//純粋なオブジェクトの場合シリアライズ - data = msg.write(rData.getObj()); - } else { // rData is RAW ByteArray or already serialized - data = rData.asByteArray(); - } + switch (type) { + /* + * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment + * case UPDATE and PUT + * compress and serialize flag are selected by user, so if true, need convert. + * case REPLY + * these flags represent DataSegment status. + * for example, serializeFlag is true. DataSegment had already converted, so no need convert. + */ - CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, rData.serialized(), rData.compressed()); - if (rData.setTime) {//AliceVNCの計測用(消してもいい) - cm.setTime = true; - cm.time = rData.time; - cm.depth = rData.depth + 1; - } + case UPDATE: + System.out.println("update compressFlag:" + compressFlag); + rData.setCompressFlag(compressFlag); + break; + case PUT: + System.out.println("put compressFlag:" + compressFlag); + rData.setCompressFlag(compressFlag); + break; + case REPLY://ReceiveDataからREPLYするDSを取得 + System.out.println("in REPLY"); + System.out.println("reply compressFlag:" + compressFlag); + buf = rData.setMPHeader(new CommandMessage(type.id, index, seq, key, false, rData.serialized(), rData.compressed()), type); + rData.setCompressFlag(compressFlag); - //MessagePackでDSを作成(ヘッダー・データ本体のサイズ・データ本体) - header = msg.write(cm); - dataSize = msg.write(data.length); - buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); - buf.put(header); - buf.put(dataSize); - buf.put(data); + break; + default://PEEK, TAKE, RemoveならCommandMessageとそのサイズのみセット + ReceiveData rData2 = new ReceiveData("hoge"); + System.out.println("default compressFlag:" + compressFlag); + buf = rData2.setMPHeader(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag), type); + rData2.setCompressFlag(compressFlag); - break; - default://PEEK, TAKE, RemoveならCommandMessageとそのサイズのみセット - header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag)); - buf = ByteBuffer.allocate(header.length); - buf.put(header); - break; - } + break; + } - buf.flip(); - } catch (IOException e) { - e.printStackTrace(); - }//ここに圧縮機能を入れる return buf; } diff -r 928907206d21 -r bfec2c3ff1b8 src/main/java/alice/datasegment/DataSegmentKey.java --- a/src/main/java/alice/datasegment/DataSegmentKey.java Sat Apr 18 19:09:15 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentKey.java Thu Apr 30 18:14:02 2015 +0900 @@ -33,7 +33,8 @@ for (Iterator iter = waitList.iterator(); iter.hasNext(); ) { Command waitCmd = iter.next(); if (waitCmd.index < index) { - replyValue(waitCmd, dsv); + System.out.println("DSKey cmdFlag:" + cmd.getCompressFlag()); + replyValue(waitCmd, dsv, cmd.getCompressFlag()); iter.remove(); if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command dataList.remove(dsv); @@ -67,7 +68,9 @@ for (Iterator iter = dataList.iterator(); iter.hasNext(); ) { DataSegmentValue data = iter.next(); if (data.index > cmd.index) { - replyValue(cmd, data); + System.out.println("DSKey2 cmdFlag:" + cmd.getCompressFlag()); + replyValue(cmd, data, cmd.getCompressFlag()); + //replyValue(cmd, data); iter.remove(); waitFlag = false; break; @@ -86,6 +89,25 @@ public void replyValue(Command cmd, DataSegmentValue data){ Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from); + cmd.setCompressFlag(true); + if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local. + cmd.cs.ids.reply(cmd.receiver, rCmd); + } else { + try { + if (!cmd.getQuickFlag()) { + cmd.connection.sendQueue.put(rCmd); + } else { + cmd.connection.write(rCmd); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public void replyValue(Command cmd, DataSegmentValue data, boolean cFlag){ + Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from); + rCmd.setCompressFlag(cFlag); if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local. cmd.cs.ids.reply(cmd.receiver, rCmd); } else { diff -r 928907206d21 -r bfec2c3ff1b8 src/main/java/alice/datasegment/DataSegmentManager.java --- a/src/main/java/alice/datasegment/DataSegmentManager.java Sat Apr 18 19:09:15 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentManager.java Thu Apr 30 18:14:02 2015 +0900 @@ -51,12 +51,10 @@ } //各コマンドの抽象クラス - public abstract void put(String key, ReceiveData rData, boolean quickFlag); - public abstract void update(String key, ReceiveData rData, boolean quickFlag); - public abstract void peek(Receiver receiver, CodeSegment cs, boolean quickFlag); - public abstract void take(Receiver receiver, CodeSegment cs, boolean quickFlag); - - public abstract void take(Receiver receiver, CodeSegment cs, boolean quickFlag, boolean compressFlag); + public abstract void put(String key, ReceiveData rData, SendOption option); + public abstract void update(String key, ReceiveData rData, SendOption option); + public abstract void peek(Receiver receiver, CodeSegment cs, SendOption option); + public abstract void take(Receiver receiver, CodeSegment cs, SendOption option); public abstract void remove(String key); public abstract void shutdown(); diff -r 928907206d21 -r bfec2c3ff1b8 src/main/java/alice/datasegment/LocalDataSegmentManager.java --- a/src/main/java/alice/datasegment/LocalDataSegmentManager.java Sat Apr 18 19:09:15 2015 +0900 +++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java Thu Apr 30 18:14:02 2015 +0900 @@ -75,52 +75,52 @@ } @Override - public void put(String key, ReceiveData rData, boolean quickFlag) { + public void put(String key, ReceiveData rData, SendOption option) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey); + Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);//ここでMP変換している + cmd.setCompressFlag(option.getCompressFlag()); + rData.setCompressFlag(option.getCompressFlag()); + dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } - public void put(String key, ReceiveData rData) { - this.put(key, rData, false); - } - /** * Enqueue update command to the queue of each DataSegment key */ @Override - public void update(String key, ReceiveData rData, boolean quickFlag) { + public void update(String key, ReceiveData rData, SendOption option) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey); + cmd.setCompressFlag(option.getCompressFlag()); + rData.setCompressFlag(option.getCompressFlag()); + dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override - public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { - take(receiver, cs, quickFlag, false); - } - - @Override - public void take(Receiver receiver, CodeSegment cs, boolean quickFlag, boolean compressFlag) { + public void take(Receiver receiver, CodeSegment cs, SendOption option) { DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setCompressFlag(compressFlag); + cmd.setCompressFlag(option.getCompressFlag()); + dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override - public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { + public void peek(Receiver receiver, CodeSegment cs, SendOption option) { DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + cmd.setCompressFlag(option.getCompressFlag()); + dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); diff -r 928907206d21 -r bfec2c3ff1b8 src/main/java/alice/datasegment/ReceiveData.java --- a/src/main/java/alice/datasegment/ReceiveData.java Sat Apr 18 19:09:15 2015 +0900 +++ b/src/main/java/alice/datasegment/ReceiveData.java Thu Apr 30 18:14:02 2015 +0900 @@ -3,13 +3,17 @@ import java.io.*; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.LinkedList; import java.util.zip.*; +import alice.daemon.CommandMessage; import org.apache.log4j.Logger; +import org.msgpack.MessagePack; import org.msgpack.type.Value; import alice.codesegment.SingletonMessage; import javax.xml.bind.DatatypeConverter; +import java.io.ByteArrayInputStream; import static java.util.Arrays.*; @@ -17,15 +21,20 @@ * 送られてきたDSを一時的に取っておくクラス。inputでも使用。 */ public class ReceiveData { - private Object val;//Object型のDS - private byte[] messagePack;//byteArray(serialized)のDS - private byte[] zMessagePack;//byteArray(compressed)のDS + private Object val;//for Object DS + private byte[] messagePack;//for byteArray(serialized) DS + private byte[] zMessagePack;//for byteArray(compressed) DS private Class clazz; public long time;//測定用 public boolean setTime = false; public int depth = 1; + private Deflater deflater = new Deflater(); + private Inflater inflater = new Inflater(); + private ByteBuffer buf = null; + private MessagePack msg = SingletonMessage.getInstance(); + /** * コンストラクタ。Object型のDSと圧縮のメタ情報を受け取る。 * put/update/reply用? @@ -66,28 +75,28 @@ return val == null; } - public Object getObj(){//Object型のDS本体を取得するメソッド。 + public Object getObj(){ return asClass(Object.class); } - public String asString(){//String型としてDSを取得するメソッド。DSがシリアライズされていればStringクラスを返す。 + public String asString(){ return asClass(String.class); } - public int asInteger(){//Int型としてDSを取得するメソッド。DSがシリアライズされていればIntクラスを返す。 + public int asInteger(){ return asClass(Integer.class); } - public Float asFloat(){//Float型としてDSを取得するメソッド。DSがシリアライズされていればFloatクラスを返す。 + public Float asFloat(){ return asClass(Float.class); } - public Value getVal(){//Value型としてDSを取得するメソッド - if (val == null){//val != null + public Value getVal(){///get DS as Value type + if (val == null){///val != null return asClass(Value.class); - } else {//ここに圧縮のときの処理入れるべきなのでは + } else { try { - return SingletonMessage.getInstance().unconvert(val);//MassagePackでvalue型に変換。できなければnullを返す。 + return SingletonMessage.getInstance().unconvert(val);///convert to Value type by MassagePack } catch (IOException e) { e.printStackTrace(); } @@ -104,7 +113,7 @@ * @param * @return */ - public T asClass(Class clazz) {//javasist + public T asClass(Class clazz) {///javasist System.out.println("in asClass val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack); try { @@ -112,68 +121,70 @@ return (T) val; } - if (zMessagePack != null && messagePack == null) { - messagePack = unzip(copyOfRange(zMessagePack, 1, zMessagePack.length - 1));//先頭0xC1を削除したものを解凍 + if (zMessagePack != null && messagePack == null) {//ToDo:fix + messagePack = unzip(zMessagePack); + System.out.println("unzip messagePack:" + messagePack); + //zMessagePack = null;? } return SingletonMessage.getInstance().read(messagePack, clazz); - } catch (IOException | DataFormatException e) { + } catch (IOException e) {// | DataFormatException e e.printStackTrace(); return null; } } - /** - * java.util.zip.Inflater(zlib)を使ってbyteArray型のDSを解凍する。 - * - * @param input 圧縮されたbyteArray型のDS - * @return 解凍したbyteArray型DS - * @throws IOException - * @throws DataFormatException - */ - public byte[] unzip(byte[] input) throws IOException, DataFormatException{ - Inflater inflater = new Inflater(); - ByteArrayOutputStream os = new ByteArrayOutputStream(); - InflaterOutputStream ios = new InflaterOutputStream(os, inflater); - ios.write(input); - ios.finish(); - return os.toByteArray(); - } + public void setCompressFlag(boolean cFlag) {///compress + LinkedList input = new LinkedList(); + LinkedList output = new LinkedList(); - /** - * java.util.zip.Deflater(zlib)を使ってbyteArray型のDSを圧縮する。 - * - * @param input 非圧縮状態のbyteArray型のDS - * @return 圧縮したbyteArray型DS - * @throws IOException - */ - public byte[] zip(byte[] input) throws IOException{ - Deflater deflater = new Deflater(); - ByteArrayOutputStream os = new ByteArrayOutputStream(); - DeflaterOutputStream dos = new DeflaterOutputStream(os, deflater); - dos.write(input); - dos.finish(); - return os.toByteArray(); - } - - public void setCompressFlag(boolean cFlag) { if (cFlag){ - try { - messagePack = asByteArray(); - } catch (IOException e) { - e.printStackTrace(); + System.out.println("in setCompressFlag val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack); + //messagePack = asByteArray();///ToDo:fix + if (val != null){ + try { + messagePack = msg.write(val); + } catch (IOException e) { + e.printStackTrace(); + } + } else { + messagePack = unzip(zMessagePack); } - - ByteBuffer buf = null; + System.out.println("no zip messagePack:" + messagePack); + System.out.print("no zip messagePack: "); + for (int i = 0; i < messagePack.length; i++) { + System.out.print(Integer.toHexString(messagePack[i] & 0xff)); + } + System.out.print("\n"); + System.out.println("no zip messagePack length:" + messagePack.length); try { - System.out.println("in zip"); - byte[] z = zip(messagePack); - buf = ByteBuffer.allocate(z.length + 1); - buf.put((byte) 0xc1); - buf.put(z); - zMessagePack = buf.array(); + //System.out.println("in zip"); + input.add(ByteBuffer.wrap(messagePack)); + int len = zip(input, 0, output); + + byte[] ziped = new byte[len + 8]; + ziped[0] = (byte) 0xc1;///set compressedFlag to header + ziped[1] = ziped[2] = ziped[3] = (byte) 0x00; + System.arraycopy(intToByteArray(messagePack.length), 0, ziped, 4, 4);///set data length to header + + System.out.println("zipedlen: " + len); + //System.out.println("limit: " + output.get(0).limit()); + //System.out.println("remaining: " + output.get(0).remaining()); + int tmp = 0; + for (int i = 0; i < output.size(); i++){///Is this copy OK??? + System.arraycopy(output.get(i).array(), 0, ziped, 8 + tmp, output.get(i).limit());//limit? remaining? + tmp += output.get(i).limit(); + } + + System.out.print("ziped: "); + for (int i = 0; i < ziped.length; i++) { + System.out.print(Integer.toHexString(ziped[i] & 0xff)); + } + System.out.print("\n"); + + zMessagePack = ziped; val = null; messagePack = null; } catch (IOException e) { @@ -182,12 +193,152 @@ } } - public Class getClaszz(){ - return clazz; + public ByteBuffer setMPHeader(CommandMessage cm, CommandType type){ + + System.out.println("in setMPHeader val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack); + + try { + byte[] header = null;//DSのメタデータ用byteArray + byte[] data = null;//DS本体用byteArray + byte[] dataSize = null;//DSのサイズ + + if (type == CommandType.REPLY){ + if (val != null) {//純粋なオブジェクトの場合シリアライズ + data = msg.write(val); + System.out.print("header MP data: "); + for (int i = 0; i < data.length; i++) { + System.out.print(Integer.toHexString(data[i] & 0xff)); + } + System.out.print("\n"); + } else { // rData is RAW ByteArray or already serialized + data = messagePack; + } + + if (setTime) {//AliceVNCの計測用(消してもいい) + cm.setTime = true; + cm.time = time; + cm.depth = depth + 1; + } + + //MessagePackでDSを作成(ヘッダー・データ本体のサイズ・データ本体) + header = msg.write(cm); + dataSize = msg.write(data.length); + buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); + buf.put(header); + buf.put(dataSize); + buf.put(data); + } else { + header = msg.write(cm); + buf = ByteBuffer.allocate(header.length); + buf.put(header); + } + + buf.flip(); + } catch (IOException e) { + e.printStackTrace(); + } + + messagePack = buf.array(); + + System.out.print("MP with header: "); + for (int i = 0; i < messagePack.length; i++) { + System.out.print(Integer.toHexString(messagePack[i] & 0xff)); + } + System.out.print("\n"); + + return buf; } - public byte[] asByteArray() throws IOException{ + public int zip(LinkedList inputs, int inputIndex, LinkedList outputs) throws IOException { + int len = 0; + int INFLATE_BUFSIZE = 1024 * 100; + ByteBuffer c1 = allocate(INFLATE_BUFSIZE);//for output + + while (inputIndex < inputs.size()) { + ByteBuffer b1 = inputs.get(inputIndex++); + deflater.setInput(b1.array(), b1.position(), b1.remaining()); + /** + * If we finish() stream and reset() it, Deflater start new gzip + * stream, this makes continuous zlib reader unhappy. if we remove + * finish(), Deflater.deflate() never flushes its output. The + * original zlib deflate has flush flag. I'm pretty sure this a kind + * of bug of Java library. + */ + if (inputIndex == inputs.size()){ + deflater.finish(); + } + + int len1 = 0; + do { + len1 = deflater.deflate(c1.array(), c1.position(), c1.remaining());///Bytearray for ziped data、start offset、length + if (len1 > 0) { + len += len1; + c1.position(c1.position() + len1); + if (c1.remaining() == 0) { + c1.flip(); + outputs.addLast(c1); + c1 = allocate(INFLATE_BUFSIZE); + } + } + } while (len1 > 0 || !deflater.needsInput());//needsInput()...true if setInput is empty + } + if (c1.position() != 0) { + c1.flip(); + outputs.addLast(c1); + } + deflater.reset(); + return len;///return length of ziped data + } + + protected byte[] unzip(byte[] input) {///read header & unzip + int length = input.length; + int zippedLength = byteArrayToInt(copyOfRange(input, 4, 8));///read header...Is this copy OK?? + + byte [] output = new byte [zippedLength];///byteArray for unziped data + inflater.setInput(input, 8, length - 8);///set unzip data without header + + try { + System.out.println("return:" + inflater.inflate(output, 0, zippedLength));///unzip + } catch (DataFormatException e) { + e.printStackTrace(); + } + + inflater.reset(); + + System.out.print("unziped: "); + for (int i = 0; i < output.length; i++) { + System.out.print(Integer.toHexString(output[i] & 0xff)); + } + System.out.print("\n"); + + return output; + } + + + public ByteBuffer allocate(int size) + { + ByteBuffer b = null; + while(true){ + try { + b = ByteBuffer.allocate(size); + } catch (OutOfMemoryError e) { + b = null; + System.err.println("multicastqueue : wait for heap : " + e); + } + if (b!=null) { + break; + } + try { + wait(); + } catch (InterruptedException e) { + System.out.println("thread has interrupted the current thread."); + } + } + return b; + } + + public byte[] asByteArray() throws IOException{///ToDo : delete ByteArrayOutputStream buff = new ByteArrayOutputStream(); ObjectOutput out = new ObjectOutputStream(buff); out.writeObject(this.val); @@ -197,4 +348,22 @@ return bytes; } + public static int byteArrayToInt(byte[] b) + { + return b[3] & 0xFF | + (b[2] & 0xFF) << 8 | + (b[1] & 0xFF) << 16 | + (b[0] & 0xFF) << 24; + } + + public static byte[] intToByteArray(int a) + { + return new byte[] { + (byte) ((a >> 24) & 0xFF), + (byte) ((a >> 16) & 0xFF), + (byte) ((a >> 8) & 0xFF), + (byte) (a & 0xFF) + }; + } + } diff -r 928907206d21 -r bfec2c3ff1b8 src/main/java/alice/datasegment/RemoteDataSegmentManager.java --- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Sat Apr 18 19:09:15 2015 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Thu Apr 30 18:14:02 2015 +0900 @@ -58,10 +58,10 @@ * send put command to target DataSegment */ @Override - public void put(String key, ReceiveData rData, boolean quickFlag) { + public void put(String key, ReceiveData rData, SendOption option) { Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, "");///set compressed flag - //cmd.setCompressFlag(option.isCompress()); - if (quickFlag){ + cmd.setCompressFlag(option.getCompressFlag()); + if (option.getQuickFlag()){ connection.write(cmd); // put command is executed right now } else { connection.sendCommand(cmd); // put command on the transmission thread @@ -71,10 +71,10 @@ } @Override - public void update(String key, ReceiveData rData, boolean quickFlag) { + public void update(String key, ReceiveData rData, SendOption option) { Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, ""); - //cmd.setCompressFlag(option.isCompress()); - if (quickFlag){ + cmd.setCompressFlag(option.getCompressFlag()); + if (option.getQuickFlag()){ connection.write(cmd); } else { connection.sendCommand(cmd); @@ -84,18 +84,14 @@ } @Override - public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { - take(receiver, cs, quickFlag, false); - } - - public void take(Receiver receiver, CodeSegment cs, boolean quickFlag, boolean compressFlag) { + public void take(Receiver receiver, CodeSegment cs, SendOption option) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setQuickFlag(quickFlag); - cmd.setCompressFlag(compressFlag); + //cmd.setQuickFlag(option.getQuickFlag()); + cmd.setCompressFlag(option.getCompressFlag()); seqHash.put(seq, cmd); - if (quickFlag){ + if (option.getQuickFlag()){ connection.write(cmd); } else { connection.sendCommand(cmd); @@ -105,12 +101,14 @@ } @Override - public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { + public void peek(Receiver receiver, CodeSegment cs, SendOption option) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setQuickFlag(quickFlag); + cmd.setQuickFlag(option.getQuickFlag()); + //cmd.setCompressFlag(option.getCompressFlag()); + seqHash.put(seq, cmd); - if (quickFlag){ + if (option.getQuickFlag()){ connection.write(cmd); } else { connection.sendCommand(cmd); diff -r 928907206d21 -r bfec2c3ff1b8 src/main/java/alice/datasegment/SendOption.java --- a/src/main/java/alice/datasegment/SendOption.java Sat Apr 18 19:09:15 2015 +0900 +++ b/src/main/java/alice/datasegment/SendOption.java Thu Apr 30 18:14:02 2015 +0900 @@ -1,30 +1,30 @@ package alice.datasegment; /** - * フラグを一時的に格納するだけ。たぶんリモート用。 + * フラグを一時的に格納するだけ。 */ public class SendOption { - private boolean quick = false; - private boolean compress = false; + private boolean quickFlag = false; + private boolean compressFlag = false; public SendOption(boolean qFlag, boolean cFlag) { - quick = qFlag; - compress = cFlag; + quickFlag = qFlag; + compressFlag = cFlag; } - public boolean isQuick() { - return quick; + public boolean getQuickFlag() { + return quickFlag; } - public void setQuick(boolean quick) { - this.quick = quick; + public void setQuickFlag(boolean quick) { + this.quickFlag = quick; } - public boolean isCompress() { - return compress; + public boolean getCompressFlag() { + return compressFlag; } - public void setCompress(boolean compress) { - this.compress = compress; + public void setCompressFlag(boolean compress) { + this.compressFlag = compress; } } diff -r 928907206d21 -r bfec2c3ff1b8 src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java --- a/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Sat Apr 18 19:09:15 2015 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Thu Apr 30 18:14:02 2015 +0900 @@ -11,5 +11,4 @@ ods.put("local", "num", 0, true); } - -} +} \ No newline at end of file