# HG changeset patch # User sugi # Date 1400329944 -32400 # Node ID 8072df9130c6a4c34701447eee9685894bc4fa19 # Parent 1494d44392a2553ecbc3e1e024f33ad4f006f54b IncomingUdpConnection have to improve. it same IncommingTcpConnection diff -r 1494d44392a2 -r 8072df9130c6 src/main/java/alice/daemon/IncomingTcpConnection.java --- a/src/main/java/alice/daemon/IncomingTcpConnection.java Fri May 16 17:39:33 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Sat May 17 21:32:24 2014 +0900 @@ -21,7 +21,7 @@ public Connection connection; public DataSegmentManager manager; public String reverseKey; - protected LocalDataSegmentManager lmanager = DataSegment.getLocal(); + private LocalDataSegmentManager lmanager = DataSegment.getLocal(); public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { this.manager = manager; @@ -29,6 +29,10 @@ this.reverseKey = reverseKey; } + public LocalDataSegmentManager getLocalDataSegmentManager(){ + return lmanager; + } + /** * pipeline thread for receiving */ diff -r 1494d44392a2 -r 8072df9130c6 src/main/java/alice/daemon/IncomingUdpConnection.java --- a/src/main/java/alice/daemon/IncomingUdpConnection.java Fri May 16 17:39:33 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingUdpConnection.java Sat May 17 21:32:24 2014 +0900 @@ -1,44 +1,85 @@ package alice.daemon; +import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import org.msgpack.unpacker.Unpacker; import alice.codesegment.SingletonMessage; import alice.datasegment.Command; import alice.datasegment.CommandType; +import alice.datasegment.DataSegment; +import alice.topology.HostMessage; +import alice.topology.manager.keeparive.RespondData; +import alice.topology.manager.reconnection.SendError; public class IncomingUdpConnection extends IncomingTcpConnection { - public MulticastConnection mConnection; + // receive Data set into LocalDataSegment now but need to set into MulticastDataSegment. + // and this implement has problem. If over 4096 data receive, can not read. + + public MulticastConnection receiver; public IncomingUdpConnection(MulticastConnection mc) { super(null, null, "multicast"); - mConnection = mc; + receiver = mc; } @Override public void run() { - ByteBuffer receive = ByteBuffer.allocate(4096); while (true){ - try { - mConnection.receive(receive); + try { + ByteBuffer receive = ByteBuffer.allocate(4096); + receiver.receive(receive); Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive); receive.flip(); CommandMessage msg = unpacker.read(CommandMessage.class); CommandType type = CommandType.getCommandTypeFromId(msg.type); - switch (type){ + System.out.println(msg.val); + switch (type) { case UPDATE: - lmanager.getDataSegmentKey(msg.key) + getLocalDataSegmentManager().getDataSegmentKey(msg.key) .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PUT: - lmanager.getDataSegmentKey(msg.key) + getLocalDataSegmentManager().getDataSegmentKey(msg.key) .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; + case PEEK: + getLocalDataSegmentManager().getDataSegmentKey(msg.key) + .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); + break; + case TAKE: + getLocalDataSegmentManager().getDataSegmentKey(msg.key) + .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); + break; + case REMOVE: + getLocalDataSegmentManager().getDataSegmentKey(msg.key) + .runCommand(new Command(type, null, null, null, 0, 0, null, null, null)); + break; + case REPLY: + Command cmd = manager.getAndRemoveCmd(msg.seq); + cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); + cmd=null; + break; + case PING: + DataSegment.get(reverseKey).response(msg.key); + break; + case RESPONSE: + DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis())); + break; default: break; } + + } catch (ClosedChannelException e) { + connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); + return; + } catch (EOFException e) { + new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute(); + connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); + return; } catch (IOException e) { e.printStackTrace(); }