changeset 445:86b74532e66c dispose

change Protocol
author sugi
date Sun, 26 Oct 2014 18:21:48 +0900
parents 8f006f9d1b9c
children a91890dff56e
files src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/DataSegmentKey.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java
diffstat 4 files changed, 31 insertions(+), 15 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Sat Oct 18 22:26:15 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Sun Oct 26 18:21:48 2014 +0900
@@ -42,7 +42,7 @@
     public void run() {
         Unpacker unpacker = null;
         try {
-            unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream());            
+            unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream());
         } catch (IOException e) {
             e.printStackTrace();
         }
--- a/src/main/java/alice/datasegment/Command.java	Sat Oct 18 22:26:15 2014 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Sun Oct 26 18:21:48 2014 +0900
@@ -22,7 +22,8 @@
     public CodeSegment cs;
     public String reverseKey;
     public Object obj;
-    public boolean flag;
+    public boolean quickFlag;
+    private boolean serializeFlag = true;
 
     public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
         this.type = cmdType;
@@ -34,7 +35,7 @@
         this.replyQueue = replyQueue;
         this.cs = cs;
         this.reverseKey = reverseKey;
-        this.flag = false;
+        this.quickFlag = false;
     }
 
     public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey, boolean flag) {
@@ -47,7 +48,7 @@
         this.replyQueue = replyQueue;
         this.cs = cs;
         this.reverseKey = reverseKey;
-        this.flag = flag;
+        this.quickFlag = flag;
     }
 
     public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, Connection connection, CodeSegment cs, String reverseKey, boolean flag) {
@@ -60,7 +61,7 @@
         this.connection = connection;
         this.cs = cs;
         this.reverseKey = reverseKey;
-        this.flag = flag;
+        this.quickFlag = flag;
     }
 
     public Command(CommandType cmdType, Receiver receiver, String key, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
@@ -73,7 +74,7 @@
         this.replyQueue = replyQueue;
         this.cs = cs;
         this.reverseKey = reverseKey;
-        this.flag = false;
+        this.quickFlag = false;
     }
 
     public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
@@ -87,7 +88,7 @@
         this.replyQueue = replyQueue;
         this.cs = cs;
         this.reverseKey = reverseKey;
-        this.flag = false;
+        this.quickFlag = false;
     }
 
     public String getCommandString() {
@@ -97,22 +98,27 @@
         }
         return this.type + "\t" + key + "\t" + val + "\tindex=" + index + "\tcs=" + csName;
     }
+      
+    /**
+     * @return serialized ByteBuffer
+     */
     public ByteBuffer convert() {
         ByteBuffer buf = null;
         MessagePack msg = SingletonMessage.getInstance();
         try {
-            byte[] header = msg.write(new CommandMessage(type.id, index, seq, key, flag));
+            byte[] header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag));
             
             switch (type) {
             case UPDATE:
             case PUT:
             case REPLY:
-            case RESPONSE:
                 byte[] data = null;
-                if (val==null&&obj!=null){
+                if (val!=null) {
+                    data = val;
+                } else if (!serializeFlag) {
+                    data = (byte[]) obj;
+                } else if (val==null && obj!=null) {
                     data = msg.write(obj);
-                } else if (val!=null) {
-                    data = val;
                 }
                 byte[] dataSize = msg.write(data.length);
                 
@@ -121,7 +127,6 @@
                 buf.put(dataSize);
                 buf.put(data);                
                 break;
-
             default:
                 buf = ByteBuffer.allocate(header.length);
                 buf.put(header);
@@ -134,5 +139,8 @@
         }
         return buf;
     }
-
+    
+    public void setSerializeFlag(boolean flag){
+        serializeFlag = flag;
+    }
 }
--- a/src/main/java/alice/datasegment/DataSegmentKey.java	Sat Oct 18 22:26:15 2014 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentKey.java	Sun Oct 26 18:21:48 2014 +0900
@@ -87,7 +87,7 @@
             cmd.cs.ids.reply(cmd.receiver, new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
         } else {
             try {
-                if (!cmd.flag){ 
+                if (!cmd.quickFlag){ 
                     cmd.connection.sendQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
                 }
                 else {
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Sat Oct 18 22:26:15 2014 +0900
+++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Sun Oct 26 18:21:48 2014 +0900
@@ -57,6 +57,14 @@
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
     }
+    
+    public void put(String key, byte[] val) {
+        Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
+        cmd.setSerializeFlag(false);
+        connection.sendCommand(cmd); // put command on the transmission thread
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
 
     @Override
     public void quickPut(String key, Object val) {