changeset 446:a91890dff56e dispose

refactor
author sugi
date Mon, 27 Oct 2014 00:26:35 +0900
parents 86b74532e66c
children d30451d1882f
files src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java
diffstat 3 files changed, 33 insertions(+), 44 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Sun Oct 26 18:21:48 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Mon Oct 27 00:26:35 2014 +0900
@@ -52,36 +52,30 @@
         while (true) {
             try {
                 byte[] val = null;
+                Command cmd = null;
                 CommandMessage msg = unpacker.read(CommandMessage.class);
                 CommandType type = CommandType.getCommandTypeFromId(msg.type);
                 switch (type) {
                 case UPDATE:
-                    val = getSerializedByteArray(unpacker);
-                    lmanager.getDataSegmentKey(msg.key)
-                        .runCommand(new Command(type, null, null, val, 0, 0, null, null, reverseKey));
-                    break;
                 case PUT:
                     val = getSerializedByteArray(unpacker);
-                    lmanager.getDataSegmentKey(msg.key)
-                        .runCommand(new Command(type, null, null, val, 0, 0, null, null, reverseKey));
+                    cmd = new Command(type, null, null, val, 0, 0, null, null, reverseKey);
+                    lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
                     break;
                 case PEEK:
-                    lmanager.getDataSegmentKey(msg.key)
-                        .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
-                    break;
                 case TAKE:
-                    lmanager.getDataSegmentKey(msg.key)
-                        .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
+                    cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection);
+                    cmd.setQuickFlag(msg.flag);
+                    lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
                     break;	
                 case REMOVE:
-                    lmanager.getDataSegmentKey(msg.key)
-                        .runCommand(new Command(type, null, null, null, 0, 0, null, null, null));
+                    cmd = new Command(type, null, null, null, 0, 0, null, null, "");
+                    lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
                     break;
                 case REPLY:
-                    Command cmd = manager.getAndRemoveCmd(msg.seq);
+                    cmd = manager.getAndRemoveCmd(msg.seq);
                     val = getSerializedByteArray(unpacker);
-                    cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, val, msg.index, msg.seq, null, null, null));
-                    cmd=null;
+                    cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, val, msg.index, msg.seq, null, null, ""));
                     break;
                 case PING:
                     DataSegment.get(reverseKey).response(msg.key);
@@ -94,10 +88,10 @@
                 }
             } catch (ClosedChannelException e) {
                 connection.putConnectionInfo();
-                connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
+                connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, reverseKey));
                 return;
             } catch (EOFException e) {
-                connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
+                connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, reverseKey));
                 return;
             } catch (IOException e) {
                 e.printStackTrace();
--- a/src/main/java/alice/datasegment/Command.java	Sun Oct 26 18:21:48 2014 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Mon Oct 27 00:26:35 2014 +0900
@@ -22,8 +22,9 @@
     public CodeSegment cs;
     public String reverseKey;
     public Object obj;
-    public boolean quickFlag;
+    public boolean quickFlag = false;
     private boolean serializeFlag = true;
+    private boolean compressFlag = true;
 
     public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
         this.type = cmdType;
@@ -38,20 +39,7 @@
         this.quickFlag = false;
     }
 
-    public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey, boolean flag) {
-        this.type = cmdType;
-        this.receiver = receiver;
-        this.key = key;
-        this.val = val;
-        this.index = index;
-        this.seq = seq;
-        this.replyQueue = replyQueue;
-        this.cs = cs;
-        this.reverseKey = reverseKey;
-        this.quickFlag = flag;
-    }
-
-    public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, Connection connection, CodeSegment cs, String reverseKey, boolean flag) {
+    public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) {
         this.type = cmdType;
         this.receiver = receiver;
         this.key = key;
@@ -61,7 +49,6 @@
         this.connection = connection;
         this.cs = cs;
         this.reverseKey = reverseKey;
-        this.quickFlag = flag;
     }
 
     public Command(CommandType cmdType, Receiver receiver, String key, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
@@ -74,7 +61,6 @@
         this.replyQueue = replyQueue;
         this.cs = cs;
         this.reverseKey = reverseKey;
-        this.quickFlag = false;
     }
 
     public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
@@ -88,7 +74,6 @@
         this.replyQueue = replyQueue;
         this.cs = cs;
         this.reverseKey = reverseKey;
-        this.quickFlag = false;
     }
 
     public String getCommandString() {
@@ -140,7 +125,15 @@
         return buf;
     }
     
+    public void setQuickFlag(boolean flag){
+        quickFlag = flag;
+    }
+    
     public void setSerializeFlag(boolean flag){
         serializeFlag = flag;
     }
+    
+    public void setCompressFlag(boolean flag){
+        compressFlag = flag;
+    }
 }
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Sun Oct 26 18:21:48 2014 +0900
+++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Mon Oct 27 00:26:35 2014 +0900
@@ -59,7 +59,7 @@
     }
     
     public void put(String key, byte[] val) {
-        Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
+        Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, "");
         cmd.setSerializeFlag(false);
         connection.sendCommand(cmd); // put command on the transmission thread
         if (logger.isDebugEnabled())
@@ -102,7 +102,8 @@
 
     public void quickTake(Receiver receiver, CodeSegment cs) {
         int seq = this.seq.getAndIncrement();
-        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);
+        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        cmd.setQuickFlag(true);
         seqHash.put(seq, cmd);
         connection.write(cmd);
         if (logger.isDebugEnabled())
@@ -121,7 +122,8 @@
 
     public void quickPeek(Receiver receiver, CodeSegment cs) {
         int seq = this.seq.getAndIncrement();
-        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);
+        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        cmd.setQuickFlag(true);
         seqHash.put(seq, cmd);
         connection.write(cmd);
         if (logger.isDebugEnabled())
@@ -131,7 +133,7 @@
 
     @Override
     public void remove(String key) {
-        Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null);
+        Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, "");
         connection.sendCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
@@ -139,25 +141,25 @@
 
     @Override
     public void finish() {
-        Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, null);
+        Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, "");
         connection.sendCommand(cmd);
     }
 
     @Override
     public void ping(String returnKey) {
-        Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, null);
+        Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, "");
         connection.write(cmd);
     }
 
     @Override
     public void response(String returnKey) {
-        Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, null);
+        Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, "");
         connection.write(cmd);
     }
 
     @Override
     public void close() {
-        Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null);
+        Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, "");
         connection.sendCommand(cmd);
     }