changeset 443:2f2623484b77 dispose

change protocol
author sugi
date Sat, 18 Oct 2014 19:30:13 +0900
parents 2338b1ef29e8
children 8f006f9d1b9c
files src/main/java/alice/daemon/CommandMessage.java src/main/java/alice/daemon/Connection.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/daemon/IncomingUdpConnection.java src/main/java/alice/daemon/MulticastConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/DataSegmentValue.java src/main/java/alice/datasegment/ReceiveLocalData.java src/main/java/alice/datasegment/ReceiveRemoteData.java src/main/java/alice/datasegment/ReceiverData.java
diffstat 10 files changed, 90 insertions(+), 82 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/daemon/CommandMessage.java	Sat Oct 04 08:52:34 2014 +0900
+++ b/src/main/java/alice/daemon/CommandMessage.java	Sat Oct 18 19:30:13 2014 +0900
@@ -1,7 +1,6 @@
 package alice.daemon;
 
 import org.msgpack.annotation.Message;
-import org.msgpack.type.Value;
 
 @Message
 public class CommandMessage {
@@ -9,17 +8,15 @@
     public int index;
     public int seq;
     public String key;
-    public Value val;
     public boolean flag;
 
     public CommandMessage() {}
 
-    public CommandMessage(int type, int index, int seq, String key, Value val, boolean flag) {
+    public CommandMessage(int type, int index, int seq, String key, boolean flag) {
         this.type = type;
         this.index = index;
         this.seq = seq;
         this.key = key;
-        this.val = val;
         this.flag = flag;
     }
 }
--- a/src/main/java/alice/daemon/Connection.java	Sat Oct 04 08:52:34 2014 +0900
+++ b/src/main/java/alice/daemon/Connection.java	Sat Oct 18 19:30:13 2014 +0900
@@ -6,7 +6,6 @@
 import java.nio.channels.ClosedChannelException;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import alice.codesegment.SingletonMessage;
 import alice.datasegment.Command;
 import alice.datasegment.DataSegment;
 
@@ -35,10 +34,8 @@
     }
 
     public synchronized void write(Command cmd) {	
-        CommandMessage cmdMsg = cmd.convert();
-        ByteBuffer buffer;
+        ByteBuffer buffer = cmd.convert();
         try {
-            buffer = ByteBuffer.wrap(SingletonMessage.getInstance().write(cmdMsg));
             while (buffer.hasRemaining()) {
                 socket.getChannel().write(buffer);
             }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Sat Oct 04 08:52:34 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Sat Oct 18 19:30:13 2014 +0900
@@ -4,6 +4,7 @@
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 
+import org.msgpack.unpacker.MessagePackUnpacker;
 import org.msgpack.unpacker.Unpacker;
 
 import alice.codesegment.SingletonMessage;
@@ -41,7 +42,7 @@
     public void run() {
         Unpacker unpacker = null;
         try {
-            unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream());
+            unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream());            
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -50,16 +51,19 @@
         }
         while (true) {
             try {
+                byte[] val = null;
                 CommandMessage msg = unpacker.read(CommandMessage.class);
                 CommandType type = CommandType.getCommandTypeFromId(msg.type);
                 switch (type) {
                 case UPDATE:
+                    val = getSerializedByteArray(unpacker);
                     lmanager.getDataSegmentKey(msg.key)
-                        .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
+                        .runCommand(new Command(type, null, null, val, 0, 0, null, null, reverseKey));
                     break;
                 case PUT:
+                    val = getSerializedByteArray(unpacker);
                     lmanager.getDataSegmentKey(msg.key)
-                        .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
+                        .runCommand(new Command(type, null, null, val, 0, 0, null, null, reverseKey));
                     break;
                 case PEEK:
                     lmanager.getDataSegmentKey(msg.key)
@@ -75,7 +79,8 @@
                     break;
                 case REPLY:
                     Command cmd = manager.getAndRemoveCmd(msg.seq);
-                    cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null));
+                    val = getSerializedByteArray(unpacker);
+                    cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, val, msg.index, msg.seq, null, null, null));
                     cmd=null;
                     break;
                 case PING:
@@ -98,6 +103,18 @@
                 e.printStackTrace();
             }
         }
+        
     }
 
+    private byte[] getSerializedByteArray(Unpacker unpacker) {
+        int len;
+        byte[] b = null;
+        try {
+            len = unpacker.readInt();
+            b = ((MessagePackUnpacker) unpacker).getSerializedByteArray(len);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return b;
+    }
 }
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java	Sat Oct 04 08:52:34 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingUdpConnection.java	Sat Oct 18 19:30:13 2014 +0900
@@ -16,7 +16,8 @@
 
 public class IncomingUdpConnection extends IncomingTcpConnection {
 	// receive Data set into LocalDataSegment now but need to set into MulticastDataSegment.
-	// and this implement has problem. If over 4096 data receive, can not read.
+	// and this implement has problem. If over 65507 data receive, can not read.
+    // but  Max data length is 65507 because of the max length of UDP payload
 	
 	public MulticastConnection receiver;
 	public MulticastConnection sender;
@@ -30,21 +31,23 @@
 	@Override
 	public void run() {
 		while (true){
-			try {				
-				ByteBuffer receive = ByteBuffer.allocate(4096); 
+			try {
+			    // Max data length is 65507 because of the max length of UDP payload
+				ByteBuffer receive = ByteBuffer.allocate(65507); 
 				receiver.receive(receive);
 				Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive);
 				receive.flip();
 				CommandMessage msg = unpacker.read(CommandMessage.class);
+				byte[] val = unpacker.readByteArray();
 				CommandType type = CommandType.getCommandTypeFromId(msg.type);
 				switch (type) {
 				case UPDATE:
 					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
-						.runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
+						.runCommand(new Command(type, null, null, val, 0, 0, null, null, reverseKey));
 					break;
 				case PUT:
 					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
-						.runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
+						.runCommand(new Command(type, null, null, val, 0, 0, null, null, reverseKey));
 					break;
 				case PEEK:
 					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
@@ -60,7 +63,7 @@
 					break;
 				case REPLY:
 					Command cmd = manager.getAndRemoveCmd(msg.seq);
-					cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null));
+					cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, val, msg.index, msg.seq, null, null, null));
 					cmd=null;
 					break;
 				case PING:
--- a/src/main/java/alice/daemon/MulticastConnection.java	Sat Oct 04 08:52:34 2014 +0900
+++ b/src/main/java/alice/daemon/MulticastConnection.java	Sat Oct 18 19:30:13 2014 +0900
@@ -5,7 +5,6 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.DatagramChannel;
 
-import alice.codesegment.SingletonMessage;
 import alice.datasegment.Command;
 
 public class MulticastConnection extends Connection {
@@ -20,10 +19,8 @@
     // may need to add infomation who send on ds.
     @Override
     public synchronized void write(Command cmd){
-        CommandMessage cmdMsg = cmd.convert();
-        ByteBuffer buffer;
+        ByteBuffer buffer = cmd.convert();
         try {
-            buffer = ByteBuffer.wrap(SingletonMessage.getInstance().write(cmdMsg));
             while (buffer.hasRemaining()){
                 dc.send(buffer, sAddr);
             }
--- a/src/main/java/alice/datasegment/Command.java	Sat Oct 04 08:52:34 2014 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Sat Oct 18 19:30:13 2014 +0900
@@ -1,9 +1,10 @@
 package alice.datasegment;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.BlockingQueue;
-import org.msgpack.type.Value;
 
+import org.msgpack.MessagePack;
 import alice.codesegment.CodeSegment;
 import alice.codesegment.SingletonMessage;
 import alice.daemon.CommandMessage;
@@ -13,7 +14,7 @@
     public CommandType type;
     public String key;
     public Receiver receiver;
-    public Value val;
+    public byte[] val;
     public int index;
     public int seq;
     public Connection connection; // for remote
@@ -23,7 +24,7 @@
     public Object obj;
     public boolean flag;
 
-    public Command(CommandType cmdType, Receiver receiver, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
+    public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
         this.type = cmdType;
         this.receiver = receiver;
         this.key = key;
@@ -36,7 +37,7 @@
         this.flag = false;
     }
 
-    public Command(CommandType cmdType, Receiver receiver, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey, boolean flag) {
+    public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey, boolean flag) {
         this.type = cmdType;
         this.receiver = receiver;
         this.key = key;
@@ -49,7 +50,7 @@
         this.flag = flag;
     }
 
-    public Command(CommandType cmdType, Receiver receiver, String key, Value val, int index, int seq, Connection connection, CodeSegment cs, String reverseKey, boolean flag) {
+    public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, Connection connection, CodeSegment cs, String reverseKey, boolean flag) {
         this.type = cmdType;
         this.receiver = receiver;
         this.key = key;
@@ -75,7 +76,7 @@
         this.flag = false;
     }
 
-    public Command(CommandType cmdType, Receiver receiver, String key, Value val, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
+    public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
         this.type = cmdType;
         this.receiver = receiver;
         this.key = key;
@@ -96,15 +97,42 @@
         }
         return this.type + "\t" + key + "\t" + val + "\tindex=" + index + "\tcs=" + csName;
     }
-    public CommandMessage convert() {
-        if (val==null&&obj!=null){
-            try {
-                this.val = SingletonMessage.getInstance().unconvert(obj);
-            } catch (IOException e) {
-                e.printStackTrace();
+    public ByteBuffer convert() {
+        ByteBuffer buf = null;
+        MessagePack msg = SingletonMessage.getInstance();
+        try {
+            byte[] header = msg.write(new CommandMessage(type.id, index, seq, key, flag));
+            
+            switch (type) {
+            case UPDATE:
+            case PUT:
+            case REPLY:
+            case RESPONSE:
+                byte[] data = null;
+                if (val==null&&obj!=null){
+                    data = msg.write(obj);
+                } else if (val!=null) {
+                    data = val;
+                }
+                byte[] dataSize = msg.write(data.length);
+                
+                buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
+                buf.put(header);
+                buf.put(dataSize);
+                buf.put(data);                
+                break;
+
+            default:
+                buf = ByteBuffer.allocate(header.length);
+                buf.put(header);
+                break;
             }
+
+            buf.flip();
+        } catch (IOException e) {
+            e.printStackTrace();
         }
-        return new CommandMessage(type.id, index, seq, key, val, flag);
+        return buf;
     }
 
 }
--- a/src/main/java/alice/datasegment/DataSegmentValue.java	Sat Oct 04 08:52:34 2014 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentValue.java	Sat Oct 18 19:30:13 2014 +0900
@@ -1,22 +1,20 @@
 package alice.datasegment;
 
-import org.msgpack.type.Value;
-
 public class DataSegmentValue {
 
     public int index;
-    public Value val;
+    public byte[] val;
     public String from;
     public Object obj;
 
-    public DataSegmentValue(int index, Value val, Object obj,String reverseKey) {
+    public DataSegmentValue(int index, byte[] val, Object obj,String reverseKey) {
         this.index = index;
         this.val = val;
         this.from = reverseKey;
         this.obj = obj;
     }
 
-    public DataSegmentValue(int index, Value val,String reverseKey) {
+    public DataSegmentValue(int index, byte[] val,String reverseKey) {
         this.index = index;
         this.val = val;
         this.from = reverseKey;
--- a/src/main/java/alice/datasegment/ReceiveLocalData.java	Sat Oct 04 08:52:34 2014 +0900
+++ b/src/main/java/alice/datasegment/ReceiveLocalData.java	Sat Oct 18 19:30:13 2014 +0900
@@ -2,7 +2,6 @@
 
 import java.io.IOException;
 
-import org.msgpack.type.ArrayValue;
 import org.msgpack.type.Value;
 
 import alice.codesegment.SingletonMessage;
@@ -11,8 +10,8 @@
 public class ReceiveLocalData implements ReceiverData {
     private Object obj;
 
-    public ReceiveLocalData(Object obj2) {
-        this.obj = obj2;
+    public ReceiveLocalData(Object obj) {
+        this.obj = obj;
     }
 
     public String asString() {
@@ -27,17 +26,12 @@
         return (Float) obj;
     }
 
-    public ArrayValue asArray(){
-        return (ArrayValue) obj;
-    }
-
     @SuppressWarnings("unchecked")
     public <T> T asClass(Class<T> clazz) {
         return (T) obj;
 
     }
 
-    @Override
     public Value getVal() {
         try {
             return SingletonMessage.getInstance().unconvert(obj);
@@ -47,7 +41,6 @@
         return null;
     }
 
-    @Override
     public Object getObj() {
         return obj;
     }
--- a/src/main/java/alice/datasegment/ReceiveRemoteData.java	Sat Oct 04 08:52:34 2014 +0900
+++ b/src/main/java/alice/datasegment/ReceiveRemoteData.java	Sat Oct 18 19:30:13 2014 +0900
@@ -2,53 +2,35 @@
 
 import java.io.IOException;
 
-import org.msgpack.type.ArrayValue;
 import org.msgpack.type.Value;
-import org.msgpack.type.ValueType;
-
 import alice.codesegment.SingletonMessage;
 
 public class ReceiveRemoteData implements ReceiverData {
-    public Value val;
+    public byte[] val; // serialized with MessagePack. need decode
 
-    public ReceiveRemoteData() {
-    }
+    public ReceiveRemoteData() {}
 
-    public ReceiveRemoteData(Value val2) {
-        this.val = val2;
+    public ReceiveRemoteData(byte[] val) {
+        this.val = val;
     }
 
     public String asString() {
-        if (val.getType() == ValueType.RAW) {
-            return val.asRawValue().getString();
-        }
-        return null;
+        return asClass(String.class);
     }
 
     public int asInteger() {
-        if (val.getType() == ValueType.INTEGER) {
-            return val.asIntegerValue().getInt();
-        }
-        return 0;
+        Integer num = asClass(Integer.class);
+        return num!=null ? num : 0;
     }
 
     public Float asFloat() {
-        if (val.getType() == ValueType.FLOAT) {
-            return val.asFloatValue().getFloat();
-        }
-        return 0.0f;
-    }
-
-    public ArrayValue asArray(){
-        if (val.getType() == ValueType.ARRAY){
-            return val.asArrayValue();
-        }
-        return null;
+        Float num = asClass(Float.class);
+        return num!=null ? num : 0.0f;
     }
 
     public <T> T asClass(Class<T> clazz) {
         try {
-            return SingletonMessage.getInstance().convert(val, clazz);
+            return SingletonMessage.getInstance().read(val, clazz);
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -56,12 +38,10 @@
 
     }
 
-    @Override
     public Value getVal() {
-        return val;
+        return asClass(Value.class);
     }
 
-    @Override
     public Object getObj() {
         return val;
     }
--- a/src/main/java/alice/datasegment/ReceiverData.java	Sat Oct 04 08:52:34 2014 +0900
+++ b/src/main/java/alice/datasegment/ReceiverData.java	Sat Oct 18 19:30:13 2014 +0900
@@ -1,6 +1,5 @@
 package alice.datasegment;
 
-import org.msgpack.type.ArrayValue;
 import org.msgpack.type.Value;
 
 public interface ReceiverData {
@@ -8,7 +7,6 @@
     public String asString();
     public int asInteger();
     public Float asFloat() ;
-    public ArrayValue asArray();
     public <T> T asClass(Class<T> clazz);
     public Value getVal();
     public Object getObj();