view src/main/java/christie/daemon/IncomingTcpConnection.java @ 198:dd3c0ba6a0a6

fix topology manager
author akahori
date Sat, 09 Mar 2019 21:53:37 +0900
parents c6e4d0e4954c
children e486c13d9ea9
line wrap: on
line source

package christie.daemon;


import christie.codegear.CodeGearManager;
import christie.datagear.*;

import christie.datagear.command.CommandType;
import christie.datagear.RemoteMessage;
import christie.datagear.command.RemotePeekCommand;
import christie.datagear.command.RemoteTakeCommand;
import christie.datagear.dg.MessagePackDataGear;
import org.msgpack.MessagePack;
import org.msgpack.unpacker.Unpacker;

import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;

public class IncomingTcpConnection extends Thread {

    RemoteDataGearManager manager;
    CodeGearManager cgm;
    Connection connection;
    private MessagePack packer = new MessagePack();

    public IncomingTcpConnection(Connection connection) {
        this.connection = connection;
        this.cgm = connection.cgm;
    }

    public void setManager(RemoteDataGearManager manager){
        this.manager = manager;
    }

    public void run() {
        Unpacker unpacker = null;
        try {
            unpacker = packer.createUnpacker(connection.socket.getInputStream());
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (unpacker == null) {
            return;
        }
        while (true) {
            try {
                RemoteMessage msg = unpacker.read(RemoteMessage.class);
                CommandType type = CommandType.getCommandTypeFromId(msg.type);
                //System.out.println("read " + msg.key);
                byte[] data;

                switch (type) {
                    case PUT:
                        data = new byte[unpacker.readInt()];
                        connection.socket.getInputStream().read(data);
                        try {
                            MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
                            cgm.getLocalDGM().put(msg.key, dg);
                        } catch (ClassNotFoundException e) {
                            e.printStackTrace();
                        }

                        break;

                    case REMOTEPEEK:
                        try {
                            cgm.getLocalDGM().runCommand(new RemotePeekCommand(msg, connection));
                        } catch (ClassNotFoundException e) {
                            e.printStackTrace();
                        }
                        break;
                    case REMOTETAKE:
                        cgm.getLocalDGM().runCommand(new RemoteTakeCommand(msg, connection));

                        break;
                    case REPLY://待っていたwaitListに渡してcsにセット
                        data = new byte[unpacker.readInt()];
                        connection.socket.getInputStream().read(data);

                        try {
                            MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
                            cgm.getDGM(msg.fromDgmName).resolveWaitCommand(msg.key, dg);
                        } catch (ClassNotFoundException e) {
                            e.printStackTrace();
                        }

                        break;
                    default:
                        break;
                }
            } catch (ClosedChannelException e) {
                return;
            }catch (EOFException e) {
                return;
            } catch (IOException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
    }


}