changeset 458:bcf6f4a6fcd0 dispose

need set Meta DataSegment PUT API
author sugi
date Mon, 03 Nov 2014 17:12:53 +0900
parents 41d80f5e1fb2
children 4419a2415661
files src/main/java/alice/codesegment/InputDataSegment.java src/main/java/alice/codesegment/OutputDataSegment.java src/main/java/alice/daemon/Connection.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/daemon/IncomingUdpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/DataSegmentKey.java src/main/java/alice/datasegment/DataSegmentManager.java src/main/java/alice/datasegment/DataSegmentValue.java src/main/java/alice/datasegment/LocalDataSegmentManager.java src/main/java/alice/datasegment/Receiver.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java
diffstat 12 files changed, 107 insertions(+), 84 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/InputDataSegment.java	Mon Nov 03 13:21:36 2014 +0900
+++ b/src/main/java/alice/codesegment/InputDataSegment.java	Mon Nov 03 17:12:53 2014 +0900
@@ -5,7 +5,6 @@
 import alice.datasegment.Command;
 import alice.datasegment.CommandType;
 import alice.datasegment.DataSegment;
-import alice.datasegment.ReceiveData;
 import alice.datasegment.Receiver;
 import alice.datasegment.SendOption;
 
@@ -73,7 +72,7 @@
     public void reply(Receiver receiver, Command reply) {
         receiver.index = reply.index;
         receiver.from = reply.reverseKey;        
-        receiver.setData(new ReceiveData(reply.val, reply.getCompressFlag(), reply.getSerializeFlag()));
+        receiver.setData(reply.rData);
         receive();
     }
 
--- a/src/main/java/alice/codesegment/OutputDataSegment.java	Mon Nov 03 13:21:36 2014 +0900
+++ b/src/main/java/alice/codesegment/OutputDataSegment.java	Mon Nov 03 17:12:53 2014 +0900
@@ -2,6 +2,7 @@
 
 import alice.datasegment.CommandType;
 import alice.datasegment.DataSegment;
+import alice.datasegment.ReceiveData;
 import alice.datasegment.Receiver;
 import alice.datasegment.SendOption;
 
@@ -13,16 +14,16 @@
      */
 
     public void flip(Receiver receiver) {
-        DataSegment.getLocal().put(receiver.key, receiver.getObj(), null);
+        DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), null);
     }
 
     public void flip(Receiver receiver, CommandType type) {
         switch (type) {
-        case PUT: 
-            DataSegment.getLocal().put(receiver.key, receiver.getObj(), null);
+        case PUT:
+            DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), null);
             break;
         case UPDATE:
-            DataSegment.getLocal().update(receiver.key, receiver.getObj(), null);
+            DataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), null);
             break;
         default:
             break;
@@ -30,11 +31,13 @@
     }
 
     public void put(String key, Object val) {
-        DataSegment.getLocal().put(key, val, null);
+        ReceiveData rData = new ReceiveData(val, false, false);
+        DataSegment.getLocal().put(key, rData, null);
     }
 
     public void update(String key, Object val) {
-        DataSegment.getLocal().update(key, val, null);
+        ReceiveData rData = new ReceiveData(val, false, false);
+        DataSegment.getLocal().update(key, rData, null);
     }
 
     /**
@@ -42,8 +45,9 @@
      */
     public void put(String managerKey, String key, Object val) {
         if (!managerKey.equals("local")){
+            ReceiveData rData = new ReceiveData(val, false, false);
             SendOption option = new SendOption(false, compressFlag());
-            DataSegment.get(managerKey).put(key, val, option);
+            DataSegment.get(managerKey).put(key, rData, option);
         } else {
             put(key, val);
         }
@@ -51,8 +55,9 @@
 
     public void quickPut(String managerKey, String key, Object val) {
         if (!managerKey.equals("local")){
+            ReceiveData rData = new ReceiveData(val, false, false);
             SendOption option = new SendOption(true, compressFlag());
-            DataSegment.get(managerKey).put(key, val, option);
+            DataSegment.get(managerKey).put(key, rData, option);
         } else {
             put(key, val);
         }
@@ -60,8 +65,9 @@
 
     public void update(String managerKey, String key, Object val) {
         if (!managerKey.equals("local")){
+            ReceiveData rData = new ReceiveData(val, false, false);
             SendOption option = new SendOption(false, compressFlag());
-            DataSegment.get(managerKey).update(key, val, option);
+            DataSegment.get(managerKey).update(key, rData, option);
         } else {
             update(key, val);
         }
@@ -69,8 +75,9 @@
 
     public void quickUpdate(String managerKey, String key, Object val) {
         if (!managerKey.equals("local")){
+            ReceiveData rData = new ReceiveData(val, false, false);
             SendOption option = new SendOption(true, compressFlag());
-            DataSegment.get(managerKey).update(key, val, option);
+            DataSegment.get(managerKey).update(key, rData, option);
         } else {
             update(key, val);
         }
--- a/src/main/java/alice/daemon/Connection.java	Mon Nov 03 13:21:36 2014 +0900
+++ b/src/main/java/alice/daemon/Connection.java	Mon Nov 03 17:12:53 2014 +0900
@@ -8,6 +8,7 @@
 
 import alice.datasegment.Command;
 import alice.datasegment.DataSegment;
+import alice.datasegment.ReceiveData;
 
 public class Connection {
 
@@ -62,8 +63,9 @@
     }
 
     public void putConnectionInfo() {
-        ConnectionInfo c = new ConnectionInfo(socket.getInetAddress().toString() ,socket.getPort());
-        DataSegment.getLocal().put("disconnect", c, null);
+        ConnectionInfo c = new ConnectionInfo(socket.getInetAddress().toString(), socket.getPort());
+        ReceiveData rData = new ReceiveData(c, false, false);
+        DataSegment.getLocal().put("disconnect", rData, null);
 
     }
 }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Mon Nov 03 13:21:36 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Mon Nov 03 17:12:53 2014 +0900
@@ -13,6 +13,7 @@
 import alice.datasegment.DataSegment;
 import alice.datasegment.DataSegmentManager;
 import alice.datasegment.LocalDataSegmentManager;
+import alice.datasegment.ReceiveData;
 import alice.topology.manager.keeparive.RespondData;
 
 public class IncomingTcpConnection extends Thread {
@@ -52,17 +53,14 @@
         while (true) {
             try {
                 Command cmd = null;
-                byte[] val = null;
+                ReceiveData rData = null;
                 CommandMessage msg = unpacker.read(CommandMessage.class);
                 CommandType type = CommandType.getCommandTypeFromId(msg.type);
                 switch (type) {
                 case UPDATE:
                 case PUT:
-                    val = getSerializedByteArray(unpacker);
-                    cmd = new Command(type, null, null, val, 0, 0, null, null, reverseKey);
-                    // these flags express DataSegment status
-                    cmd.setCompressFlag(msg.compressed);
-                    cmd.setSerializeFlag(msg.serialized);
+                    rData = new ReceiveData(getSerializedByteArray(unpacker), msg.compressed, msg.serialized);
+                    cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey);
                     lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
                     break;
                 case PEEK:
@@ -77,8 +75,8 @@
                     break;
                 case REPLY:
                     cmd = manager.getAndRemoveCmd(msg.seq);
-                    val = getSerializedByteArray(unpacker);
-                    Command rCmd = new Command(type, null, null, val, msg.index, msg.seq, null, null, "");
+                    rData = new ReceiveData(getSerializedByteArray(unpacker), msg.compressed, msg.serialized);
+                    Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, "");
                     rCmd.setCompressFlag(msg.compressed);
                     rCmd.setSerializeFlag(msg.serialized);
                     cmd.cs.ids.reply(cmd.receiver, rCmd);
@@ -87,7 +85,8 @@
                     DataSegment.get(reverseKey).response(msg.key);
                     break;
                 case RESPONSE:
-                    DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()), null);
+                    rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis()), false, false);
+                    DataSegment.getLocal().put(msg.key, rData, null);
                     break;
                 default:
                     break;
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java	Mon Nov 03 13:21:36 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingUdpConnection.java	Mon Nov 03 17:12:53 2014 +0900
@@ -12,6 +12,7 @@
 import alice.datasegment.CommandType;
 import alice.datasegment.DataSegment;
 import alice.datasegment.DataSegmentManager;
+import alice.datasegment.ReceiveData;
 import alice.topology.manager.keeparive.RespondData;
 
 public class IncomingUdpConnection extends IncomingTcpConnection {
@@ -35,6 +36,7 @@
             try {
                 Command cmd = null;
                 byte[] val = null;
+                ReceiveData rData = null;
                 // Max data length is 65507 because of the max length of UDP payload
                 ByteBuffer receive = ByteBuffer.allocate(65507); 
                 receiver.receive(receive);
@@ -44,13 +46,11 @@
                 CommandType type = CommandType.getCommandTypeFromId(msg.type);
                 switch (type) {
                 case UPDATE:
-                case PUT:				    
+                case PUT:
                     val = new byte[unpacker.readInt()];				    
                     receive.get(val);
-                    cmd = new Command(type, null, null, val, 0, 0, null, null, reverseKey);
-                    // these flags express DataSegment status
-                    cmd.setCompressFlag(msg.compressed);
-                    cmd.setSerializeFlag(msg.serialized);
+                    rData = new ReceiveData(val, msg.compressed, msg.serialized);
+                    cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey);
                     getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
                     break;
                 case PEEK:
@@ -67,17 +67,16 @@
                     cmd = manager.getAndRemoveCmd(msg.seq);
                     val = new byte[unpacker.readInt()];
                     receive.get(val);
-                    Command rCmd = new Command(type, null, null, val, msg.index, msg.seq, null, null, "");
-                    // these flags express DataSegment status
-                    rCmd.setCompressFlag(msg.compressed);
-                    rCmd.setSerializeFlag(msg.serialized);
+                    rData = new ReceiveData(val, msg.compressed, msg.serialized);
+                    Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, "");
                     cmd.cs.ids.reply(cmd.receiver, rCmd);
                     break;
                 case PING:
                     DataSegment.get(reverseKey).response(msg.key);
                     break;
                 case RESPONSE:
-                    DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()), null);
+                    rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis()), false, false);
+                    DataSegment.getLocal().put(msg.key, rData, null);
                     break;
                 default:
                     break;
--- a/src/main/java/alice/datasegment/Command.java	Mon Nov 03 13:21:36 2014 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Mon Nov 03 17:12:53 2014 +0900
@@ -18,7 +18,7 @@
     public CommandType type;
     public String key;
     public Receiver receiver;
-    public Object val;
+    public ReceiveData rData;
     public int index;
     public int seq;
     public Connection connection; // for remote
@@ -29,11 +29,11 @@
     private boolean serializeFlag = false;
     private boolean compressFlag = false;
 
-    public Command(CommandType cmdType, Receiver receiver, String key, Object val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
+    public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
         this.type = cmdType;
         this.receiver = receiver;
         this.key = key;
-        this.val = val;
+        this.rData = rData;
         this.index = index;
         this.seq = seq;
         this.replyQueue = replyQueue;
@@ -41,11 +41,11 @@
         this.reverseKey = reverseKey;
     }
 
-    public Command(CommandType cmdType, Receiver receiver, String key, Object val, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) {
+    public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) {
         this.type = cmdType;
         this.receiver = receiver;
         this.key = key;
-        this.val = val;
+        this.rData = rData;
         this.index = index;
         this.seq = seq;
         this.connection = connection;
@@ -58,7 +58,7 @@
         if (cs != null) {
             csName = cs.toString();
         }
-        return this.type + "\t" + key + "\t" + val + "\tindex=" + index + "\tcs=" + csName;
+        return this.type + "\t" + key + "\t" + rData + "\tindex=" + index + "\tcs=" + csName;
     }
       
     /**
@@ -71,6 +71,8 @@
             byte[] header = null;
             byte[] data = null;
             byte[] dataSize = null;
+            boolean serialized = false;
+            boolean compressed = false;
             switch (type) {
             /*
              * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment
@@ -81,36 +83,34 @@
              * for example, serializeFlag is true. DataSegment had already converted, so no need convert.
              */
             case UPDATE:
-            case PUT:                
-                if (!serializeFlag) {
-                    data = (byte[]) val;
+            case PUT:
+            case REPLY:
+                if (rData.compressed()) {
+                    // have already converted
+                    data = (byte[]) rData.getObj();
+                    compressed = rData.compressed(); // true
+                    serialized = rData.serialized();
                 } else {
-                    data = msg.write(val);
+                    if (!rData.serialized() && !rData.isByteArray()) {
+                        data = msg.write(rData.getObj());
+                        serialized = true;
+                    } else { // rData is RAW ByteArray or already serialized
+                        data = (byte[]) rData.getObj();
+                        serialized = rData.serialized();
+                    }                    
+                    if (compressFlag) {
+                        data = zip(data);
+                        compressed = true;
+                    }
                 }
-                if (compressFlag) {
-                    data = zip(data);
-                }
-                header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, serializeFlag, compressFlag));
+                
+                header = msg.write(new CommandMessage(type.id, index, seq, key, false, serialized, compressed));
                 dataSize = msg.write(data.length);                
                 buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
                 buf.put(header);
                 buf.put(dataSize);
                 buf.put(data);
                 break;
-            case REPLY: // only serialize
-                if (serializeFlag) {
-                    data = (byte[]) val;
-                } else {
-                    data = msg.write(val);
-                    this.serializeFlag = true;
-                }
-                header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, serializeFlag, compressFlag));
-                dataSize = msg.write(data.length);
-                buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
-                buf.put(header);
-                buf.put(dataSize);
-                buf.put(data);
-                break;
             default:
                 header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, serializeFlag, compressFlag));
                 buf = ByteBuffer.allocate(header.length);
@@ -125,6 +125,13 @@
         return buf;
     }
     
+    /**
+     * If this flag is true, command isn't send queue.
+     * command is executed right now.
+     * 
+     * @param flag
+     */
+    
     public void setQuickFlag(boolean flag){
         quickFlag = flag;
     }
@@ -133,6 +140,13 @@
         return quickFlag;
     }
     
+    /**
+     * If this flag is true, DataSegment isn't serialized.
+     * Alice auto select true or false.
+     *   
+     * @param flag
+     */
+    
     public void setSerializeFlag(boolean flag){
         serializeFlag = flag;
     }
@@ -141,6 +155,13 @@
         return serializeFlag;
     }
     
+    /**
+     * Before sending Remote DataSegment, DataSegment type is ByteArray.
+     * If this flag true, ByteArray is compressed with ZRLEE(ZRIB) algorithm
+     * 
+     * @param flag
+     */
+    
     public void setCompressFlag(boolean flag){
         compressFlag = flag;
     }
--- a/src/main/java/alice/datasegment/DataSegmentKey.java	Mon Nov 03 13:21:36 2014 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentKey.java	Mon Nov 03 17:12:53 2014 +0900
@@ -25,7 +25,7 @@
         case PUT:
             int index = tailIndex;
             tailIndex++;
-            DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey, cmd.getCompressFlag(), cmd.getSerializeFlag()); 
+            DataSegmentValue dsv = new DataSegmentValue(index, cmd.rData, cmd.reverseKey); 
             dataList.add(dsv);
             // Process waiting peek and take commands
             for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
@@ -83,14 +83,12 @@
     }
 
     public void replyValue(Command cmd, DataSegmentValue data){
-        Command rCmd = new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from);
-        rCmd.setCompressFlag(data.compressed);
-        rCmd.setSerializeFlag(data.serialized);
+        Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from);
         if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local.
             cmd.cs.ids.reply(cmd.receiver, rCmd);
         } else {
             try {
-                if (!cmd.getQuickFlag()){ 
+                if (!cmd.getQuickFlag()) {
                     cmd.connection.sendQueue.put(rCmd);
                 } else {
                     cmd.connection.write(rCmd);
--- a/src/main/java/alice/datasegment/DataSegmentManager.java	Mon Nov 03 13:21:36 2014 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentManager.java	Mon Nov 03 17:12:53 2014 +0900
@@ -50,8 +50,8 @@
         }
     }
 
-    public abstract void put(String key, Object val, SendOption option);
-    public abstract void update(String key, Object val, SendOption option);
+    public abstract void put(String key, ReceiveData rData, SendOption option);
+    public abstract void update(String key, ReceiveData rData, SendOption option);
     public abstract void peek(Receiver receiver, CodeSegment cs, SendOption option);
     public abstract void take(Receiver receiver, CodeSegment cs, SendOption option);
 
--- a/src/main/java/alice/datasegment/DataSegmentValue.java	Mon Nov 03 13:21:36 2014 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentValue.java	Mon Nov 03 17:12:53 2014 +0900
@@ -3,18 +3,13 @@
 public class DataSegmentValue {
 
     public int index;
-    public Object val;
+    public ReceiveData rData;
     public String from;
-    public boolean compressed;
-    public boolean serialized;
 
-    public DataSegmentValue(int index, Object val, String reverseKey, 
-            boolean compressed, boolean serialized) {
+    public DataSegmentValue(int index, ReceiveData rData, String reverseKey) {
         this.index = index;
-        this.val = val;
+        this.rData = rData;
         this.from = reverseKey;
-        this.compressed = compressed;
-        this.serialized = serialized;
     }
     
 }
--- a/src/main/java/alice/datasegment/LocalDataSegmentManager.java	Mon Nov 03 13:21:36 2014 +0900
+++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java	Mon Nov 03 17:12:53 2014 +0900
@@ -65,9 +65,9 @@
     }
 
     @Override
-    public void put(String key, Object val, SendOption option) {
+    public void put(String key, ReceiveData rData, SendOption option) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-        Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey);
+        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
@@ -78,9 +78,9 @@
      */
 
     @Override
-    public void update(String key, Object val, SendOption option) {
+    public void update(String key, ReceiveData rData, SendOption option) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-        Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey);
+        Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey);
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
--- a/src/main/java/alice/datasegment/Receiver.java	Mon Nov 03 13:21:36 2014 +0900
+++ b/src/main/java/alice/datasegment/Receiver.java	Mon Nov 03 17:12:53 2014 +0900
@@ -109,4 +109,7 @@
         return data.getVal();
     }
 
+    public ReceiveData getReceiveData() {
+        return data;
+    }
 }
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Mon Nov 03 13:21:36 2014 +0900
+++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Mon Nov 03 17:12:53 2014 +0900
@@ -51,8 +51,8 @@
      * send put command to target DataSegment
      */
     @Override
-    public void put(String key, Object val, SendOption option) {
-        Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, "");
+    public void put(String key, ReceiveData rData, SendOption option) {
+        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, "");
         cmd.setSerializeFlag(true);
         if (option.isQuick()){
             connection.write(cmd); // put command is executed right now
@@ -64,8 +64,8 @@
     }
 
     @Override
-    public void update(String key, Object val, SendOption option) {
-        Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, "");
+    public void update(String key, ReceiveData rData, SendOption option) {
+        Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, "");
         cmd.setSerializeFlag(true);
         if (option.isQuick()){
             connection.write(cmd);