Mercurial > hg > Database > Alice
view src/main/java/alice/daemon/IncomingTcpConnection.java @ 547:e91a574b69de dispose
remove index
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 18 Aug 2015 16:15:17 +0900 |
parents | f3f7e256ee03 |
children | 9f577ff4f7ea |
line wrap: on
line source
package alice.daemon; import java.io.EOFException; import java.io.IOException; import java.nio.channels.ClosedChannelException; import alice.datasegment.*; import org.msgpack.MessagePack; import org.msgpack.unpacker.Unpacker; import alice.topology.manager.keeparive.RespondData; public class IncomingTcpConnection extends Thread { protected AliceDaemon alice; protected MessagePack packer; private Connection connection; protected DataSegmentManager manager; protected String reverseKey; private LocalDataSegmentManager lmanager; private CompressedLocalDataSegmentManager compressedlmanager; public IncomingTcpConnection(DataSegmentManager manager, AliceDaemon alice) { this.manager = manager; this.alice = alice; this.lmanager = alice.dataSegment.getLocal(); this.compressedlmanager = alice.dataSegment.getCompressedLocal(); this.packer = alice.packer; } public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey, AliceDaemon alice) { this.manager = manager; this.connection = connection; this.reverseKey = reverseKey; this.alice = alice; this.lmanager = alice.dataSegment.getLocal(); this.compressedlmanager = alice.dataSegment.getCompressedLocal(); this.packer = alice.packer; } public LocalDataSegmentManager getLocalDataSegmentManager(){ return lmanager; } public CompressedLocalDataSegmentManager getCompressedLocalDataSegmentManager(){ return compressedlmanager; } /** * pipeline thread for receiving */ public void run() { Unpacker unpacker = null; try { unpacker = packer.createUnpacker(connection.socket.getInputStream()); } catch (IOException e) { e.printStackTrace(); } if (unpacker == null) { return; } while (true) { try { Command cmd = null; ReceiveData rData = null; CommandMessage msg = unpacker.read(CommandMessage.class);///read header CommandType type = alice.getCommandTypeFromId(msg.type); switch (type) { case UPDATE: case PUT: if (msg.compressed) { rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), true, msg.dataSize); } else { rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), false, msg.dataSize); } if (msg.setTime) { rData.setTime = true; rData.time = msg.time; rData.depth = msg.depth; } cmd = new Command(type, null, null, rData, 0, null, null, reverseKey); cmd.setCompressFlag(msg.compressed); if (rData.compressed()){ compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd); } else { lmanager.getDataSegmentKey(msg.key).runCommand(cmd); } break; case PEEK: case TAKE: cmd = new Command(type, null, null, null, msg.seq, null, null, connection); cmd.setCompressFlag(msg.compressed); if (msg.compressed){ compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd); } else { lmanager.getDataSegmentKey(msg.key).runCommand(cmd); } break; case REMOVE: cmd = new Command(type, null, null, null, 0, null, null, ""); lmanager.getDataSegmentKey(msg.key).runCommand(cmd);//ToDo:fix break; case REPLY: cmd = manager.getAndRemoveCmd(msg.seq); if (msg.compressed) { rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), true, msg.dataSize); } else { rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), false, msg.dataSize); } Command rCmd = new Command(type, null, null, rData, msg.seq, null, null, ""); cmd.setCompressFlag(msg.compressed); cmd.cs.ids.reply(cmd.receiver, rCmd); break; case PING: if (alice.dataSegment.contains(reverseKey)) alice.dataSegment.get(reverseKey).response(msg.key); break; case RESPONSE: rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis())); alice.dataSegment.getLocal().put(msg.key, rData, false); break; default: break; } } catch (ClosedChannelException e) { return; } catch (EOFException e) { return; } catch (IOException e) { return; } } } public void setReverseKey(String name) { this.reverseKey = name; setName(name+"-IncomingTcp"); } }