view src/main/java/alice/daemon/IncomingTcpConnection.java @ 445:86b74532e66c dispose

change Protocol
author sugi
date Sun, 26 Oct 2014 18:21:48 +0900
parents 2f2623484b77
children a91890dff56e
line wrap: on
line source

package alice.daemon;

import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;

import org.msgpack.unpacker.MessagePackUnpacker;
import org.msgpack.unpacker.Unpacker;

import alice.codesegment.SingletonMessage;
import alice.datasegment.Command;
import alice.datasegment.CommandType;
import alice.datasegment.DataSegment;
import alice.datasegment.DataSegmentManager;
import alice.datasegment.LocalDataSegmentManager;
import alice.topology.manager.keeparive.RespondData;

/**
 * Receiver thread for one TCP connection.
 *
 * <p>Reads {@link CommandMessage} headers from the connection's socket input
 * stream and dispatches each one according to its {@link CommandType}: to the
 * local data segment manager, to the command that is waiting for a reply, or
 * to the keep-alive (PING / RESPONSE) handling.
 */
public class IncomingTcpConnection extends Thread {

    public Connection connection;
    public DataSegmentManager manager;
    public String reverseKey;
    private final LocalDataSegmentManager lmanager = DataSegment.getLocal();

    /**
     * Creates a receiver that only knows its manager; {@code connection} and
     * {@code reverseKey} are left {@code null}.
     *
     * @param manager manager used to look up the pending command for a REPLY
     */
    public IncomingTcpConnection(DataSegmentManager manager) {
        this.manager = manager;
    }

    /**
     * Creates a receiver bound to a connection.
     *
     * @param connection connection whose socket is read by {@link #run()}
     * @param manager    manager used to look up the pending command for a REPLY
     * @param reverseKey key passed along with PUT/UPDATE commands and used for
     *                   PING/RESPONSE handling
     */
    public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) {
        this.manager = manager;
        this.connection = connection;
        this.reverseKey = reverseKey;
    }

    /**
     * @return the local data segment manager captured when this object was created
     */
    public LocalDataSegmentManager getLocalDataSegmentManager() {
        return lmanager;
    }

    /**
     * Receive loop: reads one message at a time and dispatches it.
     *
     * <p>The loop ends when the unpacker cannot be created, or when a read
     * reports {@link ClosedChannelException} or {@link EOFException}; in the
     * latter two cases a CLOSE command is handed to the connection first.
     */
    @Override
    public void run() {
        Unpacker unpacker;
        try {
            unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream());
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        if (unpacker == null) {
            return;
        }
        while (true) {
            try {
                CommandMessage msg = unpacker.read(CommandMessage.class);
                dispatch(msg, unpacker);
            } catch (ClosedChannelException e) {
                connection.putConnectionInfo();
                connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
                return;
            } catch (EOFException e) {
                connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
                return;
            } catch (IOException e) {
                // Best effort: log and keep reading.
                // NOTE(review): if the socket is permanently broken this may
                // repeat on every iteration -- confirm whether the loop should
                // terminate here instead.
                e.printStackTrace();
            }
        }
    }

    /**
     * Executes the action that belongs to a single received message.
     *
     * @param msg      header of the received message
     * @param unpacker stream the payload (if any) is read from
     * @throws IOException if reading the payload fails; the command is then
     *                     not dispatched
     */
    private void dispatch(CommandMessage msg, Unpacker unpacker) throws IOException {
        CommandType type = CommandType.getCommandTypeFromId(msg.type);
        if (type == null) {
            // Switching on null would throw a NullPointerException; treat it
            // like the default branch in case the lookup yields null for an
            // unknown id.
            return;
        }
        switch (type) {
        case UPDATE:
        case PUT: {
            byte[] val = getSerializedByteArray(unpacker);
            lmanager.getDataSegmentKey(msg.key)
                .runCommand(new Command(type, null, null, val, 0, 0, null, null, reverseKey));
            break;
        }
        case PEEK:
        case TAKE:
            lmanager.getDataSegmentKey(msg.key)
                .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
            break;
        case REMOVE:
            lmanager.getDataSegmentKey(msg.key)
                .runCommand(new Command(type, null, null, null, 0, 0, null, null, null));
            break;
        case REPLY: {
            Command cmd = manager.getAndRemoveCmd(msg.seq);
            // Always consume the payload so the stream stays aligned with the
            // next message header, even when nobody is waiting for this reply.
            byte[] val = getSerializedByteArray(unpacker);
            if (cmd == null) {
                System.err.println("IncomingTcpConnection: no pending command for REPLY seq=" + msg.seq);
                break;
            }
            cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, val, msg.index, msg.seq, null, null, null));
            break;
        }
        case PING:
            DataSegment.get(reverseKey).response(msg.key);
            break;
        case RESPONSE:
            DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()));
            break;
        default:
            break;
        }
    }

    /**
     * Reads a length-prefixed payload: an int length followed by that many
     * serialized bytes.
     *
     * @param unpacker stream to read from; must be a {@link MessagePackUnpacker}
     * @return the serialized bytes as returned by the unpacker
     * @throws IOException if the length or the bytes cannot be read
     */
    private byte[] getSerializedByteArray(Unpacker unpacker) throws IOException {
        int len = unpacker.readInt();
        return ((MessagePackUnpacker) unpacker).getSerializedByteArray(len);
    }
}