Mercurial > hg > Members > tatsuki > Alice
view src/main/java/alice/daemon/IncomingUdpConnection.java @ 365:8072df9130c6 multicast
IncomingUdpConnection needs improvement; it is currently the same as IncomingTcpConnection.
author | sugi |
---|---|
date | Sat, 17 May 2014 21:32:24 +0900 |
parents | 1494d44392a2 |
children | 0c24894db37e |
line wrap: on
line source
package alice.daemon; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import org.msgpack.unpacker.Unpacker; import alice.codesegment.SingletonMessage; import alice.datasegment.Command; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; import alice.topology.HostMessage; import alice.topology.manager.keeparive.RespondData; import alice.topology.manager.reconnection.SendError; public class IncomingUdpConnection extends IncomingTcpConnection { // receive Data set into LocalDataSegment now but need to set into MulticastDataSegment. // and this implement has problem. If over 4096 data receive, can not read. public MulticastConnection receiver; public IncomingUdpConnection(MulticastConnection mc) { super(null, null, "multicast"); receiver = mc; } @Override public void run() { while (true){ try { ByteBuffer receive = ByteBuffer.allocate(4096); receiver.receive(receive); Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive); receive.flip(); CommandMessage msg = unpacker.read(CommandMessage.class); CommandType type = CommandType.getCommandTypeFromId(msg.type); System.out.println(msg.val); switch (type) { case UPDATE: getLocalDataSegmentManager().getDataSegmentKey(msg.key) .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PUT: getLocalDataSegmentManager().getDataSegmentKey(msg.key) .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PEEK: getLocalDataSegmentManager().getDataSegmentKey(msg.key) .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); break; case TAKE: getLocalDataSegmentManager().getDataSegmentKey(msg.key) .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); break; case REMOVE: getLocalDataSegmentManager().getDataSegmentKey(msg.key) 
.runCommand(new Command(type, null, null, null, 0, 0, null, null, null)); break; case REPLY: Command cmd = manager.getAndRemoveCmd(msg.seq); cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); cmd=null; break; case PING: DataSegment.get(reverseKey).response(msg.key); break; case RESPONSE: DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis())); break; default: break; } } catch (ClosedChannelException e) { connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); return; } catch (EOFException e) { new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute(); connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); return; } catch (IOException e) { e.printStackTrace(); } } } }