view src/main/java/christie/datagear/RemoteDataGearManager.java @ 210:9a1d9c99e708

fix Command run to execute
author akahori
date Mon, 11 Mar 2019 16:45:37 +0900
parents 2ecb3a93b8ae
children e486c13d9ea9
line wrap: on
line source

package christie.datagear;

import christie.codegear.CodeGearManager;
import christie.daemon.Connection;
import christie.daemon.IncomingTcpConnection;
import christie.daemon.OutboundTcpConnection;
import christie.datagear.command.*;
import christie.datagear.dg.DataGear;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.LinkedBlockingQueue;

import static java.lang.Thread.MAX_PRIORITY;

public class RemoteDataGearManager extends DataGearManager{
    private Connection connection;
    private CodeGearManager cgm;
    boolean connect = false;
    Object lock = new Object();

    public RemoteDataGearManager(final String dgmName, final String address, final int port, CodeGearManager cgm) {
        this.cgm = cgm;
        RemoteDataGearManager manager = this;
        new Thread("Connect-" + dgmName) {
            public void run() {
                do {
                    try {
                        SocketChannel sc = SocketChannel.open(new InetSocketAddress(address, port));
                        connection = new Connection(sc.socket(), cgm);
                        connection.name = dgmName;
                        connection.socket.setTcpNoDelay(true);


                        synchronized (lock){
                            connect = true;
                            lock.notify();
                        }
                    } catch (IOException e) {
                        try {
                            Thread.sleep(50);
                        } catch (InterruptedException e1) {
                            e1.printStackTrace();
                        }
                    }
                } while (!connect);
                IncomingTcpConnection in = new IncomingTcpConnection(connection);
                in.setManager(manager);
                in.setName(dgmName+"-IncomingTcp");
                in.setPriority(MAX_PRIORITY);
                in.start();
                OutboundTcpConnection out = new OutboundTcpConnection(connection);
                out.setName(dgmName + "-OutboundTcp");
                out.setPriority(MAX_PRIORITY);
                out.start();
            }
        }.start();

    }

    @Override
    public void put(String key, Object data) {

        Command cm = new PutCommand(0, null, key, new DataGear(data));
        // これ入れないと, connectionがnullの時があるのでしょうがなくwait.
        // コンストラクタで呼び出されるThreadをやめて実効すればいんだけどね...
        if(!connect) connectWait();

        connection.write(cm);
    }

    @Override
    public void runCommand(Command cm) {
        waitList.add(cm);
        Command remoteCmd = null;
        switch (cm.type) {
            case PEEK:
                remoteCmd = new RemotePeekCommand(connection.name, cm, connection);
                break;
            case TAKE:
                remoteCmd = new RemoteTakeCommand(connection.name, cm, connection);
                break;
        }
        connection.write(remoteCmd);
    }

    @Override
    public void resolveWaitCommand(String key, DataGear dg) {
        Command cm = waitList.getAndRemoveCommand(key);
        cm.setDg(dg);
        cm.execute();
    }


    @Override
    public void finish() {
        Command cmd = new FinishCommand();
        connection.sendCommand(cmd);
    }

    @Override
    public void close() {
        Command cmd = new CloseCommand();
        connection.sendCommand(cmd);
    }

    @Override
    public void shutdown() {
        connection.close();
        LinkedBlockingQueue<Command> queue = connection.sendQueue;
        if (!queue.isEmpty()) queue.clear();
    }

    public void connectWait(){
        synchronized (lock){
            while(!connect){
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                }
            }
        }
    }


}