view datagear/RemoteDataGearManager.cs @ 27:efb06874a34e

update RemoteDataGearManager
author riono <e165729@ie.u-ryukyu.ac.jp>
date Tue, 19 Jan 2021 20:18:32 +0900
parents 45ff08d59fda
children 0cd2684e401b
line wrap: on
line source

using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Christie_net.codegear;
using Christie_net.daemon;
using Christie_net.datagear.command;
using Christie_net.datagear.dg;

namespace Christie_net.datagear {
public class RemoteDataGearManager : DataGearManager {
    private Connection connection;
    private CodeGearManager cgm;
    private bool connect = false;
    private object syncObj = new object();
    
    public RemoteDataGearManager (string address, int port, CodeGearManager cgm) {
        this.cgm = cgm;
        RemoteDataGearManager manager = this;
        Task.Factory.StartNew(() => {
            do {
                try {
                    IPHostEntry host = Dns.GetHostEntry(address);
                    IPAddress ipAddress = host.AddressList[0];
                    IPEndPoint remoteEndPoint = new IPEndPoint(ipAddress, port);
                    Socket socket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                    socket.Bind(remoteEndPoint);
                    socket.Listen((int) SocketOptionName.MaxConnections);
                    socket.NoDelay = true;

                    Socket listener = socket.Accept();
                    connection = new Connection(listener, cgm);

                    lock (syncObj) {
                        connect = true;
                        Monitor.Pulse(syncObj);
                    }
                } catch { }
            } while (!connect);

            IncomingTcpConnection incoming = new IncomingTcpConnection(connection);
            incoming.SetManager(manager);
            Task.Factory.StartNew(
                () => incoming.Run(),
                    TaskCreationOptions.LongRunning);
            OutboundTcpConnection outbound = new OutboundTcpConnection(connection);
            Task.Factory.StartNew(
                () => outbound.Run(),
                TaskCreationOptions.LongRunning);
        });
    }
    
    public override void Put(string key, object data) {
        Command cm = new CommandBuilder().Init(CommandType.PUT).Key(key)
            .Dg(new DataGear<object>(data)).Build();

        // TODO: javaの方ではconnectがnullになってしまうときがあるらしい
        // コンストラクタで呼び出されるThreadをやめて実効すればいいらしい
        if (!connect) {
            ConnectWait();
        }
    }

    public override void RunCommand(Command cm) {
        waitList.Add(cm);
        CommandType type = cm.type;
        switch (cm.type) {
            case CommandType.PEEK:
                type = CommandType.REMOTEPEEK;
                break;
            case CommandType.TAKE:
                type = CommandType.REMOTETAKE;
                break;
        }

        Command remoteCmd = new CommandBuilder().Init(type).FromDgmName(connection.name).Key(cm.key)
            .Clazz(cm.clazz).Connection(connection).Build();

    }

    public override void ResolveWaitCommand(string key, DataGear<Type> dg) {
        Command cm = waitList.GetAndRemoveCommand(key);
        cm.SetDg(dg);
        cm.Execute();
    }

    public override void Finish() {
        Command cm = new CommandBuilder().Init(CommandType.FINISH).Build();
        connection.SendCommand(cm);
    }

    public override void Close() {
        Command cm = new CommandBuilder().Init(CommandType.CLOSE).Connection(connection).Build();
        connection.SendCommand(cm);
    }

    public override void Shutdown() {
        connection.Close();
        BlockingCollection<Command> queue = connection.sendQueue;
        if (queue.Count == 0) {
            queue.Dispose();
        }
    }

    public void ConnectWait() {
        lock (syncObj) {
            while (!connect) {
                try {
                    Monitor.Wait(syncObj);
                } catch {}
            }
        }
    }
}
}