Mercurial > hg > Members > tatsuki > Alice
diff src/main/java/alice/daemon/IncomingTcpConnection.java @ 345:8f71c3e6f11d
Change directory structure Maven standard
author | sugi |
---|---|
date | Wed, 16 Apr 2014 18:26:07 +0900 |
parents | |
children | 11ba40caa93b |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Wed Apr 16 18:26:07 2014 +0900 @@ -0,0 +1,101 @@ +package alice.daemon; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.channels.ClosedChannelException; + +import org.msgpack.unpacker.Unpacker; + +import alice.codesegment.SingletonMessage; +import alice.datasegment.Command; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegment; +import alice.datasegment.DataSegmentKey; +import alice.datasegment.DataSegmentManager; +import alice.datasegment.LocalDataSegmentManager; +import alice.topology.HostMessage; +import alice.topology.manager.keeparive.RespondData; +import alice.topology.manager.reconnection.SendError; + +public class IncomingTcpConnection extends Thread { + + public Connection connection; + public DataSegmentManager manager; + public String reverseKey; + private LocalDataSegmentManager lmanager = DataSegment.getLocal(); + + public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { + this.manager = manager; + this.connection = connection; + this.reverseKey = reverseKey; + } + + /** + * pipeline thread for receiving + */ + public void run() { + Unpacker unpacker = this.getUnpacker(); + if (unpacker == null) { + return; + } + while (true) { + try { + CommandMessage msg = unpacker.read(CommandMessage.class); + CommandType type = CommandType.getCommandTypeFromId(msg.type); + switch (type) { + case UPDATE: + getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); + break; + case PUT: + getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); + break; + case PEEK: + getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); + break; + case TAKE: + getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); + break; + case REMOVE: + getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, 0, 0, null, null, null)); + break; + case REPLY: + Command cmd = manager.getAndRemoveCmd(msg.seq); + cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); + cmd=null; + break; + case PING: + DataSegment.get(reverseKey).response(msg.key); + break; + case RESPONSE: + DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis())); + break; + default: + break; + } + } catch (ClosedChannelException e) { + connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); + return; + } catch (EOFException e) { + new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute(); + connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); + return; + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + private Unpacker getUnpacker() { + Unpacker unpacker = null; + try { + unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream()); + } catch (IOException e2) { + e2.printStackTrace(); + } + return unpacker; + } + + private DataSegmentKey getDataSegmentKey(CommandMessage msg) { + return lmanager.getDataSegmentKey(msg.key); + } +}