view src/main/java/alice/daemon/IncomingTcpConnection.java @ 547:e91a574b69de dispose

remove index
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Tue, 18 Aug 2015 16:15:17 +0900
parents f3f7e256ee03
children 9f577ff4f7ea
line wrap: on
line source

package alice.daemon;

import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;

import alice.datasegment.*;
import org.msgpack.MessagePack;
import org.msgpack.unpacker.Unpacker;

import alice.topology.manager.keeparive.RespondData;

/**
 * Receiving thread for a single connection.
 *
 * <p>{@link #run()} reads {@code CommandMessage} headers from the connection's
 * socket input stream and handles each one according to its command type:
 * data-carrying commands are run against the local (or compressed local)
 * data segment manager, replies are routed back to the command that was
 * waiting for them, and PING/RESPONSE messages are answered or recorded.
 */
public class IncomingTcpConnection extends Thread {

    protected AliceDaemon alice;
    protected MessagePack packer;
    private Connection connection;
    protected DataSegmentManager manager;
    protected String reverseKey;
    private LocalDataSegmentManager lmanager;
    private CompressedLocalDataSegmentManager compressedlmanager;

    /**
     * Creates a receiver without a connection or reverse key.
     *
     * <p>NOTE(review): {@link #run()} as implemented in this class dereferences
     * the connection, so this constructor is presumably meant for subclasses
     * that override {@code run()} - confirm against callers.
     *
     * @param manager manager used to look up pending commands for replies
     * @param alice   daemon providing the packer and the local data segment managers
     */
    public IncomingTcpConnection(DataSegmentManager manager, AliceDaemon alice) {
        this(null, manager, null, alice);
    }

    /**
     * Creates a receiver bound to the given connection.
     *
     * @param connection connection whose socket input stream is read by {@link #run()}
     * @param manager    manager used to look up pending commands for replies
     * @param reverseKey key attached to incoming PUT/UPDATE commands and used for PING/RESPONSE handling
     * @param alice      daemon providing the packer and the local data segment managers
     */
    public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey, AliceDaemon alice) {
        this.manager = manager;
        this.connection = connection;
        this.reverseKey = reverseKey;
        this.alice = alice;
        this.lmanager = alice.dataSegment.getLocal();
        this.compressedlmanager = alice.dataSegment.getCompressedLocal();
        this.packer = alice.packer;
    }

    /** @return the local data segment manager obtained from the daemon at construction time */
    public LocalDataSegmentManager getLocalDataSegmentManager() {
        return lmanager;
    }

    /** @return the compressed local data segment manager obtained from the daemon at construction time */
    public CompressedLocalDataSegmentManager getCompressedLocalDataSegmentManager() {
        return compressedlmanager;
    }

    /**
     * Pipeline thread for receiving.
     *
     * <p>Loops forever reading one message header at a time and dispatching on
     * its command type. The loop ends (and the thread returns) when the
     * unpacker cannot be created or when any {@link IOException} is raised
     * while reading.
     */
    @Override
    public void run() {
        Unpacker unpacker;
        try {
            unpacker = packer.createUnpacker(connection.socket.getInputStream());
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        if (unpacker == null) {
            return;
        }

        while (true) {
            try {
                CommandMessage msg = unpacker.read(CommandMessage.class); // read header
                CommandType type = alice.getCommandTypeFromId(msg.type);
                switch (type) {
                case UPDATE:
                case PUT: {
                    ReceiveData rData = readPayload(unpacker, msg);

                    // Carry over the timing fields from the header when the sender set them.
                    if (msg.setTime) {
                        rData.setTime = true;
                        rData.time = msg.time;
                        rData.depth = msg.depth;
                    }

                    Command cmd = new Command(type, null, null, rData, 0, null, null, reverseKey);
                    cmd.setCompressFlag(msg.compressed);

                    if (rData.compressed()) {
                        compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd);
                    } else {
                        lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
                    }
                    break;
                }
                case PEEK:
                case TAKE: {
                    // The connection is handed to the command so the result can be sent back on it.
                    Command cmd = new Command(type, null, null, null,  msg.seq, null, null, connection);
                    cmd.setCompressFlag(msg.compressed);

                    if (msg.compressed) {
                        compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd);
                    } else {
                        lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
                    }
                    break;
                }
                case REMOVE: {
                    Command cmd = new Command(type, null, null, null, 0, null, null, "");
                    // Only the uncompressed local manager is addressed here.
                    lmanager.getDataSegmentKey(msg.key).runCommand(cmd);//ToDo:fix
                    break;
                }
                case REPLY: {
                    Command pending = manager.getAndRemoveCmd(msg.seq);
                    // Always consume the payload, even when nobody waits for it,
                    // so the next header is read from the right stream position.
                    ReceiveData rData = readPayload(unpacker, msg);
                    if (pending == null) {
                        // No command registered under this sequence number: drop the
                        // reply instead of letting a NullPointerException end the thread.
                        break;
                    }

                    Command rCmd = new Command(type, null, null, rData,  msg.seq, null, null, "");
                    pending.setCompressFlag(msg.compressed);
                    pending.cs.ids.reply(pending.receiver, rCmd);
                    break;
                }
                case PING:
                    if (alice.dataSegment.contains(reverseKey)) {
                        alice.dataSegment.get(reverseKey).response(msg.key);
                    }
                    break;
                case RESPONSE: {
                    ReceiveData rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis()));
                    alice.dataSegment.getLocal().put(msg.key, rData, false);
                    break;
                }
                default:
                    break;
                }
            } catch (IOException e) {
                // Covers ClosedChannelException and EOFException as well (both are
                // IOException subclasses): any read failure ends this receiver.
                return;
            }
        }
    }

    /**
     * Reads the length-prefixed payload that follows a data-carrying header.
     *
     * <p>An int length is read first, then that many serialized bytes. When the
     * header is flagged as compressed, the bytes are additionally decoded with
     * the packer into a {@code byte[]} before being wrapped.
     *
     * @param unpacker unpacker positioned directly after the header
     * @param msg      header that was just read; supplies the compressed flag and data size
     * @return the payload wrapped in a {@code ReceiveData}
     * @throws IOException if reading from the unpacker or decoding fails
     */
    private ReceiveData readPayload(Unpacker unpacker, CommandMessage msg) throws IOException {
        if (msg.compressed) {
            return new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), true, msg.dataSize);
        }
        return new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), false, msg.dataSize);
    }

    /**
     * Sets the reverse key and renames this thread to {@code name + "-IncomingTcp"}.
     *
     * @param name new reverse key
     */
    public void setReverseKey(String name) {
        this.reverseKey = name;
        setName(name+"-IncomingTcp");
    }
}