changeset 454:f8a8f869f016 dispose

bug fix
author sugi
date Tue, 28 Oct 2014 17:34:26 +0900
parents 8470db2523d5
children b004f62b83e5
files src/main/java/alice/daemon/IncomingUdpConnection.java
diffstat 1 files changed, 75 insertions(+), 68 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java	Tue Oct 28 17:25:30 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingUdpConnection.java	Tue Oct 28 17:34:26 2014 +0900
@@ -15,75 +15,82 @@
 import alice.topology.manager.keeparive.RespondData;
 
 public class IncomingUdpConnection extends IncomingTcpConnection {
-	// receive Data set into LocalDataSegment now but need to set into MulticastDataSegment.
-	// and this implement has problem. If over 65507 data receive, can not read.
+    // receive Data set into LocalDataSegment now but need to set into MulticastDataSegment.
+    // and this implement has problem. If over 65507 data receive, can not read.
     // but  Max data length is 65507 because of the max length of UDP payload
-	
-	public MulticastConnection receiver;
-	public MulticastConnection sender;
+
+    public MulticastConnection receiver;
+    public MulticastConnection sender;
+
+    public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) {
+        super(manager);
+        receiver = r;
+        sender = s;
+        reverseKey = "multicast";
+    }
 
-	public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) {
-		super(manager);
-		receiver = r;
-		sender = s;
-		reverseKey = "multicast";
-	}
-	
-	@Override
-	public void run() {
-		while (true){
-			try {
-			    Command cmd = null;
-			    byte[] val = null;
-			    // Max data length is 65507 because of the max length of UDP payload
-				ByteBuffer receive = ByteBuffer.allocate(65507); 
-				receiver.receive(receive);
-				Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive);
-				receive.flip();
-				CommandMessage msg = unpacker.read(CommandMessage.class);
-				CommandType type = CommandType.getCommandTypeFromId(msg.type);
-				switch (type) {
-				case UPDATE:
-				case PUT:				    
-				    val = new byte[unpacker.readInt()];				    
-				    receive.get(val);
-				    cmd = new Command(type, null, null, val, 0, 0, null, null, reverseKey);
-					getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
-					break;
-				case PEEK:
-				case TAKE:
-				    cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null ,sender);
-				    cmd.setQuickFlag(msg.quickFlag);
-				    getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
-					break;
-				case REMOVE:
-				    cmd = new Command(type, null, null, null, 0, 0, null, null, "");
-					getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
-					break;
-				case REPLY:
-					cmd = manager.getAndRemoveCmd(msg.seq);
-					val = new byte[unpacker.readInt()];
-					receive.get(val);
-					cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, val, msg.index, msg.seq, null, null, ""));
-					break;
-				case PING:
-					DataSegment.get(reverseKey).response(msg.key);
-					break;
-				case RESPONSE:
-					DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()));
-					break;
-				default:
-					break;
-				}
-				
-			} catch (ClosedChannelException e) {
-				return;
-			} catch (EOFException e) {
-				return;
-			} catch (IOException e) {
-				e.printStackTrace();
-			}
-		}
-	}
+    @Override
+    public void run() {
+        while (true){
+            try {
+                Command cmd = null;
+                byte[] val = null;
+                // Max data length is 65507 because of the max length of UDP payload
+                ByteBuffer receive = ByteBuffer.allocate(65507); 
+                receiver.receive(receive);
+                Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive);
+                receive.flip();
+                CommandMessage msg = unpacker.read(CommandMessage.class);
+                CommandType type = CommandType.getCommandTypeFromId(msg.type);
+                switch (type) {
+                case UPDATE:
+                case PUT:				    
+                    val = new byte[unpacker.readInt()];				    
+                    receive.get(val);
+                    cmd = new Command(type, null, null, val, 0, 0, null, null, reverseKey);
+                    // these flags express DataSegment status
+                    cmd.setCompressFlag(msg.compressed);
+                    cmd.setSerializeFlag(msg.serialized);
+                    getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
+                    break;
+                case PEEK:
+                case TAKE:
+                    cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null ,sender);
+                    cmd.setQuickFlag(msg.quickFlag);
+                    getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
+                    break;
+                case REMOVE:
+                    cmd = new Command(type, null, null, null, 0, 0, null, null, "");
+                    getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
+                    break;
+                case REPLY:
+                    cmd = manager.getAndRemoveCmd(msg.seq);
+                    val = new byte[unpacker.readInt()];
+                    receive.get(val);
+                    Command rCmd = new Command(type, null, null, val, msg.index, msg.seq, null, null, "");
+                    // these flags express DataSegment status
+                    rCmd.setCompressFlag(msg.compressed);
+                    rCmd.setSerializeFlag(msg.serialized);
+                    cmd.cs.ids.reply(cmd.receiver, rCmd);
+                    break;
+                case PING:
+                    DataSegment.get(reverseKey).response(msg.key);
+                    break;
+                case RESPONSE:
+                    DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()));
+                    break;
+                default:
+                    break;
+                }
+
+            } catch (ClosedChannelException e) {
+                return;
+            } catch (EOFException e) {
+                return;
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
 
 }