changeset 452:f68d103498e0 dispose

refactor (InputDataSegment holder class changed)
author sugi
date Tue, 28 Oct 2014 17:24:16 +0900
parents ad1547756565
children 8470db2523d5
files src/main/java/alice/codesegment/InputDataSegment.java src/main/java/alice/daemon/CommandMessage.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/DataSegmentKey.java src/main/java/alice/datasegment/DataSegmentValue.java src/main/java/alice/datasegment/ReceiveData.java src/main/java/alice/datasegment/Receiver.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java
diffstat 9 files changed, 201 insertions(+), 90 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/InputDataSegment.java	Tue Oct 28 11:07:23 2014 +0900
+++ b/src/main/java/alice/codesegment/InputDataSegment.java	Tue Oct 28 17:24:16 2014 +0900
@@ -5,8 +5,7 @@
 import alice.datasegment.Command;
 import alice.datasegment.CommandType;
 import alice.datasegment.DataSegment;
-import alice.datasegment.ReceiveLocalData;
-import alice.datasegment.ReceiveRemoteData;
+import alice.datasegment.ReceiveData;
 import alice.datasegment.Receiver;
 
 /**
@@ -68,14 +67,8 @@
 
     public void reply(Receiver receiver, Command reply) {
         receiver.index = reply.index;
-        receiver.from = reply.reverseKey;
-        if (reply.reverseKey==null){
-            receiver.setData(new ReceiveRemoteData(reply.val));
-        } else if (!reply.reverseKey.equals("local")) {
-            receiver.setData(new ReceiveRemoteData(reply.val));
-        } else {
-            receiver.setData(new ReceiveLocalData(reply.obj));			
-        }
+        receiver.from = reply.reverseKey;        
+        receiver.setData(new ReceiveData(reply.val, reply.getCompressFlag(), reply.getSerializeFlag()));
         receive();
     }
 
--- a/src/main/java/alice/daemon/CommandMessage.java	Tue Oct 28 11:07:23 2014 +0900
+++ b/src/main/java/alice/daemon/CommandMessage.java	Tue Oct 28 17:24:16 2014 +0900
@@ -8,9 +8,9 @@
     public int index;
     public int seq;
     public String key;
-    public boolean quickFlag;
-    public boolean serializedFlag;
-    public boolean compressedFlag;
+    public boolean quickFlag = false;
+    public boolean serialized = false;
+    public boolean compressed = false;
     
     public CommandMessage() {}
 
@@ -21,7 +21,7 @@
         this.seq = seq;
         this.key = key;
         this.quickFlag = qFlag;
-        this.serializedFlag = sFlag;
-        this.compressedFlag = cFlag;
+        this.serialized = sFlag;
+        this.compressed = cFlag;
     }
 }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Tue Oct 28 11:07:23 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Tue Oct 28 17:24:16 2014 +0900
@@ -60,6 +60,9 @@
                 case PUT:
                     val = getSerializedByteArray(unpacker);
                     cmd = new Command(type, null, null, val, 0, 0, null, null, reverseKey);
+                    // these flags express DataSegment status
+                    cmd.setCompressFlag(msg.compressed);
+                    cmd.setSerializeFlag(msg.serialized);
                     lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
                     break;
                 case PEEK:
@@ -75,7 +78,10 @@
                 case REPLY:
                     cmd = manager.getAndRemoveCmd(msg.seq);
                     val = getSerializedByteArray(unpacker);
-                    cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, val, msg.index, msg.seq, null, null, ""));
+                    Command rCmd = new Command(type, null, null, val, msg.index, msg.seq, null, null, "");
+                    rCmd.setCompressFlag(msg.compressed);
+                    rCmd.setSerializeFlag(msg.serialized);
+                    cmd.cs.ids.reply(cmd.receiver, rCmd);
                     break;
                 case PING:
                     DataSegment.get(reverseKey).response(msg.key);
--- a/src/main/java/alice/datasegment/Command.java	Tue Oct 28 11:07:23 2014 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Tue Oct 28 17:24:16 2014 +0900
@@ -1,10 +1,14 @@
 package alice.datasegment;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.BlockingQueue;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
 
 import org.msgpack.MessagePack;
+
 import alice.codesegment.CodeSegment;
 import alice.codesegment.SingletonMessage;
 import alice.daemon.CommandMessage;
@@ -14,19 +18,18 @@
     public CommandType type;
     public String key;
     public Receiver receiver;
-    public byte[] val;
+    public Object val;
     public int index;
     public int seq;
     public Connection connection; // for remote
     public BlockingQueue<Command> replyQueue;
     public CodeSegment cs;
     public String reverseKey;
-    public Object obj;
     private boolean quickFlag = false;
-    private boolean serializeFlag = true;
-    private boolean compressFlag = true;
+    private boolean serializeFlag = false;
+    private boolean compressFlag = false;
 
-    public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
+    public Command(CommandType cmdType, Receiver receiver, String key, Object val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
         this.type = cmdType;
         this.receiver = receiver;
         this.key = key;
@@ -36,10 +39,9 @@
         this.replyQueue = replyQueue;
         this.cs = cs;
         this.reverseKey = reverseKey;
-        this.quickFlag = false;
     }
 
-    public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) {
+    public Command(CommandType cmdType, Receiver receiver, String key, Object val, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) {
         this.type = cmdType;
         this.receiver = receiver;
         this.key = key;
@@ -51,31 +53,6 @@
         this.reverseKey = reverseKey;
     }
 
-    public Command(CommandType cmdType, Receiver receiver, String key, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
-        this.type = cmdType;
-        this.receiver = receiver;
-        this.key = key;
-        this.obj = obj;
-        this.index = index;
-        this.seq = seq;
-        this.replyQueue = replyQueue;
-        this.cs = cs;
-        this.reverseKey = reverseKey;
-    }
-
-    public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
-        this.type = cmdType;
-        this.receiver = receiver;
-        this.key = key;
-        this.val = val;
-        this.obj = obj;
-        this.index = index;
-        this.seq = seq;
-        this.replyQueue = replyQueue;
-        this.cs = cs;
-        this.reverseKey = reverseKey;
-    }
-
     public String getCommandString() {
         String csName = "null";
         if (cs != null) {
@@ -91,28 +68,57 @@
         ByteBuffer buf = null;
         MessagePack msg = SingletonMessage.getInstance();
         try {
-            byte[] header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, serializeFlag, compressFlag));
-            
+            byte[] header = null;
+            byte[] data = null;
+            byte[] dataSize = null;
             switch (type) {
+            /*
+             * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment
+             * case UPDATE and PUT
+             * compress and serialize flag are selected by user, so if true, need convert.
+             * case REPLY
+             * these flags represent DataSegment status.
+             * for example, serializeFlag is true. DataSegment had already converted, so no need convert.
+             */
             case UPDATE:
-            case PUT:
-            case REPLY:
-                byte[] data = null;
-                if (val!=null) {
-                    data = val;
-                } else if (!serializeFlag) {
-                    data = (byte[]) obj;
-                } else if (val==null && obj!=null) {
-                    data = msg.write(obj);
+            case PUT:                
+                if (!serializeFlag) {
+                    data = (byte[]) val;
+                } else {
+                    long start = System.currentTimeMillis();
+                    data = msg.write(val);
+                    long end = System.currentTimeMillis();
+                    System.out.println("convert DataSegment" +(end - start));
                 }
-                byte[] dataSize = msg.write(data.length);
-                
+                if (compressFlag) {
+                    data = zip(data);
+                }
+                header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, serializeFlag, compressFlag));
+                dataSize = msg.write(data.length);                
                 buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
                 buf.put(header);
                 buf.put(dataSize);
-                buf.put(data);                
+                long start = System.currentTimeMillis();
+                buf.put(data);
+                long end = System.currentTimeMillis();
+                System.out.println("put DataSegment" +(end - start));
+                break;
+            case REPLY: // only serialize
+                if (serializeFlag) {
+                    data = (byte[]) val;
+                } else {
+                    data = msg.write(val);
+                    this.serializeFlag = true;
+                }
+                header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, serializeFlag, compressFlag));
+                dataSize = msg.write(data.length);
+                buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
+                buf.put(header);
+                buf.put(dataSize);
+                buf.put(data);
                 break;
             default:
+                header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, serializeFlag, compressFlag));
                 buf = ByteBuffer.allocate(header.length);
                 buf.put(header);
                 break;
@@ -148,4 +154,13 @@
     public boolean getCompressFlag(){
         return compressFlag;
     }
+    
+    public byte[] zip(byte[] input) throws IOException{
+        Deflater deflater = new Deflater();
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        DeflaterOutputStream dos = new DeflaterOutputStream(os, deflater);
+        dos.write(input);
+        dos.finish(); 
+        return os.toByteArray();
+    } 
 }
--- a/src/main/java/alice/datasegment/DataSegmentKey.java	Tue Oct 28 11:07:23 2014 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentKey.java	Tue Oct 28 17:24:16 2014 +0900
@@ -25,13 +25,13 @@
         case PUT:
             int index = tailIndex;
             tailIndex++;
-            DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj, cmd.reverseKey); 
+            DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey, cmd.getCompressFlag(), cmd.getSerializeFlag()); 
             dataList.add(dsv);
             // Process waiting peek and take commands
             for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
                 Command waitCmd = iter.next();
                 if (waitCmd.index < index) {
-                    replyValue(waitCmd ,dsv);
+                    replyValue(waitCmd, dsv);
                     iter.remove();
                     if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
                         dataList.remove(dsv);
@@ -48,7 +48,7 @@
             boolean waitFlag2 = true;
             for (DataSegmentValue data : dataList) {
                 if (data.index > cmd.index) {
-                    replyValue(cmd ,data);
+                    replyValue(cmd, data);
                     waitFlag2 = false;
                     break;
                 }
@@ -65,7 +65,7 @@
             for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
                 DataSegmentValue data = iter.next();
                 if (data.index > cmd.index) {
-                    replyValue(cmd ,data);
+                    replyValue(cmd, data);
                     iter.remove();
                     waitFlag = false;
                     break;
@@ -83,17 +83,18 @@
     }
 
     public void replyValue(Command cmd, DataSegmentValue data){
+        Command rCmd = new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from);
+        rCmd.setCompressFlag(data.compressed);
+        rCmd.setSerializeFlag(data.serialized);
         if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local.
-            cmd.cs.ids.reply(cmd.receiver, new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
+            cmd.cs.ids.reply(cmd.receiver, rCmd);
         } else {
             try {
                 if (!cmd.getQuickFlag()){ 
-                    cmd.connection.sendQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
+                    cmd.connection.sendQueue.put(rCmd);
+                } else {
+                    cmd.connection.write(rCmd);
                 }
-                else {
-                    cmd.connection.write(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
-                }
-
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
--- a/src/main/java/alice/datasegment/DataSegmentValue.java	Tue Oct 28 11:07:23 2014 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentValue.java	Tue Oct 28 17:24:16 2014 +0900
@@ -3,17 +3,18 @@
 public class DataSegmentValue {
 
     public int index;
-    public byte[] val;
+    public Object val;
     public String from;
-    public Object obj;
     public boolean compressed;
     public boolean serialized;
 
-    public DataSegmentValue(int index, byte[] val, Object obj, String reverseKey) {
+    public DataSegmentValue(int index, Object val, String reverseKey, 
+            boolean compressed, boolean serialized) {
         this.index = index;
         this.val = val;
         this.from = reverseKey;
-        this.obj = obj;
+        this.compressed = compressed;
+        this.serialized = serialized;
     }
     
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/alice/datasegment/ReceiveData.java	Tue Oct 28 17:24:16 2014 +0900
@@ -0,0 +1,100 @@
+package alice.datasegment;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterOutputStream;
+
+import org.msgpack.type.Value;
+
+import alice.codesegment.SingletonMessage;
+
+public class ReceiveData {
+    private Object val;
+    
+    // both flag have to be true or false except DataSegment is byteArray;
+    private boolean compressed = false;
+    private boolean serialized = false;
+    
+    public ReceiveData(Object obj, boolean cFlag, boolean sFlag){
+        val = obj;
+        compressed = cFlag;
+        serialized = sFlag;
+    }
+    
+    public boolean compressed(){
+        return compressed;
+    }
+    
+    public boolean serialized(){
+        return serialized;
+    }
+    
+    public Object getObj(){
+        return val;
+    }
+    
+    public String asString(){
+        if (serialized){
+            return asClass(String.class);
+        } else {
+            return (String) val;
+        }
+    }
+    
+    public int asInteger(){
+        if (serialized){
+            return asClass(Integer.class);
+        } else {
+            return (Integer) val;
+        }
+    }
+    
+    public Float asFloat(){
+        if (serialized){
+            return asClass(Float.class);
+        } else {
+            return (Float) val;
+        }
+    }
+    
+    public Value getVal(){
+        if (serialized){
+            return asClass(Value.class);
+        } else {
+            try {
+                return SingletonMessage.getInstance().unconvert(val);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            return null;
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    public <T> T asClass(Class<T> clazz) {
+        try {
+            if (!serialized)
+                return (T) val;
+            byte[] b = null;
+            if (compressed) {
+                b = unzip((byte[]) val);                
+            } else {
+                b = (byte[]) val;
+            }
+            return SingletonMessage.getInstance().read(b, clazz);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+    
+    public byte[] unzip(byte[] zipped) throws IOException{
+        Inflater inflater = new Inflater();
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        InflaterOutputStream ios = new InflaterOutputStream(os, inflater);
+        ios.write(zipped);
+        ios.finish();
+        return os.toByteArray();
+    }
+}
--- a/src/main/java/alice/datasegment/Receiver.java	Tue Oct 28 11:07:23 2014 +0900
+++ b/src/main/java/alice/datasegment/Receiver.java	Tue Oct 28 17:24:16 2014 +0900
@@ -9,7 +9,7 @@
  *
  */
 public class Receiver {
-    private ReceiverData data = new ReceiveRemoteData();
+    private ReceiveData data;
     public InputDataSegment ids;
     public int index;
     public String from;
@@ -85,7 +85,7 @@
         ids.setKey();
     }
 
-    public void setData(ReceiverData r) {
+    public void setData(ReceiveData r) {
         data = r;
     }
 
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Tue Oct 28 11:07:23 2014 +0900
+++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Tue Oct 28 17:24:16 2014 +0900
@@ -52,15 +52,8 @@
      */
     @Override
     public void put(String key, Object val) {
-        Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
-        connection.sendCommand(cmd); // put command on the transmission thread
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
-    }
-    
-    public void put(String key, byte[] val) {
         Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, "");
-        cmd.setSerializeFlag(false);
+        cmd.setSerializeFlag(true);
         connection.sendCommand(cmd); // put command on the transmission thread
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
@@ -68,7 +61,8 @@
 
     @Override
     public void quickPut(String key, Object val) {
-        Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
+        Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, "");
+        cmd.setSerializeFlag(true);
         connection.write(cmd); // put command is executed right now
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
@@ -76,7 +70,8 @@
 
     @Override
     public void update(String key, Object val) {
-        Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
+        Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, "");
+        cmd.setSerializeFlag(true);
         connection.sendCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
@@ -84,7 +79,8 @@
 
     @Override
     public void quickUpdate(String key, Object val) {
-        Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
+        Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, "");
+        cmd.setSerializeFlag(true);
         connection.write(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
@@ -168,5 +164,4 @@
         connection.close();
     }
 
-
 }