# HG changeset patch # User sugi # Date 1400229573 -32400 # Node ID 1494d44392a2553ecbc3e1e024f33ad4f006f54b # Parent 3c9446fa4073fff8cd5491e6a0048d210452b74a succeed to receive and send DataSegment on multicast diff -r 3c9446fa4073 -r 1494d44392a2 src/main/java/alice/daemon/IncomingUdpConnection.java --- a/src/main/java/alice/daemon/IncomingUdpConnection.java Fri May 16 16:50:38 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingUdpConnection.java Fri May 16 17:39:33 2014 +0900 @@ -1,8 +1,13 @@ package alice.daemon; +import java.io.IOException; +import java.nio.ByteBuffer; + import org.msgpack.unpacker.Unpacker; import alice.codesegment.SingletonMessage; +import alice.datasegment.Command; +import alice.datasegment.CommandType; public class IncomingUdpConnection extends IncomingTcpConnection { public MulticastConnection mConnection; @@ -12,9 +17,32 @@ mConnection = mc; } - private Unpacker getUnpacker() { - //Unpacker unpacker = SingletonMessage.getInstance().createUnpacker(); - return null; - + @Override + public void run() { + ByteBuffer receive = ByteBuffer.allocate(4096); + while (true){ + try { + mConnection.receive(receive); + Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive); + receive.flip(); + CommandMessage msg = unpacker.read(CommandMessage.class); + CommandType type = CommandType.getCommandTypeFromId(msg.type); + switch (type){ + case UPDATE: + lmanager.getDataSegmentKey(msg.key) + .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); + break; + case PUT: + lmanager.getDataSegmentKey(msg.key) + .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); + break; + default: + break; + } + } catch (IOException e) { + e.printStackTrace(); + } + } } + } diff -r 3c9446fa4073 -r 1494d44392a2 src/main/java/alice/datasegment/MulticastDataSegmentManager.java --- a/src/main/java/alice/datasegment/MulticastDataSegmentManager.java Fri May 16 16:50:38 2014 +0900 +++ b/src/main/java/alice/datasegment/MulticastDataSegmentManager.java Fri May 16 17:39:33 2014 +0900 @@ -69,7 +69,10 @@ @Override public void put(String key, Object val) { - + Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); + sender.sendCommand(cmd); // put command on the transmission thread + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); } @Override