view datagear/RemoteDataGearManager.cs @ 71:1169915705ab default tip

fix TopologyNode connect
author KaitoMaeshiro <aosskaito@cr.ie.u-ryukyu.ac.jp>
date Sun, 06 Feb 2022 16:47:41 +0900
parents 976d43003487
children
line wrap: on
line source

using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Http;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Christie_net.codegear;
using Christie_net.daemon;
using Christie_net.datagear.command;
using Christie_net.datagear.dg;
using Christie_net.Test.Example.RemoteTake;
using Christie_net.topology;

namespace Christie_net.datagear {
public class RemoteDataGearManager : DataGearManager {
    private Connection connection = null;
    private CodeGearManager cgm;
    private bool connect = false;
    private object syncObj = new object();

    public RemoteDataGearManager(string dgmname, string address, int port, CodeGearManager cgm) {
        this.cgm = cgm;
        RemoteDataGearManager manager = this;
        //Task.Factory.StartNew(() => {
        System.Diagnostics.StackTrace t = new System.Diagnostics.StackTrace();
        Thread thread = new Thread( () => {
            TcpClient client;
            HostMessage hostMessage = new HostMessage();
            do {
                if (port == 0) {
                    Console.WriteLine("port failure" + t);
                    return;
                }
                try {
                    IPHostEntry host = Dns.GetHostEntry(address);
                    
                    client = new TcpClient(AddressFamily.InterNetworkV6);
                    client.Client.DualMode = true;

                    foreach (IPAddress ipAddress in host.AddressList) {
                        IPEndPoint remoteEndPoint = new IPEndPoint(ipAddress, port);
                        
                        Console.WriteLine("try to connect " +  ipAddress + " " +address + " " + port);
                        
                        try {
                            client.Connect(remoteEndPoint);
                            client.NoDelay = true;
                            
                            connection = new Connection(client.Client, cgm);
                            connection.name = dgmname;
                            hostMessage.setHostAndPort(client.Client.LocalEndPoint.ToString(),cgm.localPort);

                            lock (syncObj) {
                                connect = true;
                                Monitor.Pulse(syncObj);
                            }
                            
                            break;
                        }catch{
                            
                        }
                    }

                    if (!connect) {
                        Console.WriteLine("not connected " + host);
                        return;
                    }
                    
                } catch (Exception e) {
                    Console.WriteLine(e.ToString());
                }
            } while (!connect);

            IncomingTcpConnection incoming = new IncomingTcpConnection(connection);
            incoming.SetManager(manager);
            Thread incomingThread = new Thread(incoming.Run);
            incomingThread.Name = dgmname + "-IncomingTcp";
            //incomingThread.Priority = ThreadPriority.Highest;
            incomingThread.Start();

            
            OutboundTcpConnection outbound = new OutboundTcpConnection(connection);
            Thread outboundThread = new Thread(outbound.Run);
            outboundThread.Name = dgmname + "-OutboundTcp";
            //outboundThread.Priority = ThreadPriority.Highest;
            outboundThread.Start();
            cgm.GetLocalDGM().Put("hostMassage",hostMessage);
        });

        thread.Name = "Connect-" + dgmname;
        thread.Start();
    }

    public override void Put(string key, object data) {
        Command cm = new CommandBuilder().Init(CommandType.PUT).Key(key)
            .Dg(new DataGear<object>(data)).Build();

        // TODO: javaの方ではconnectがnullになってしまうときがあるらしい
        // コンストラクタで呼び出されるThreadをやめて実効すればいいらしい
        if (!connect) {
            ConnectWait();
        }
        connection.Write(cm);
    }

    public override void RunCommand(Command cm) {
        waitList.Add(cm);
        CommandType type = cm.type;
        switch (cm.type) {
            case CommandType.PEEK:
                type = CommandType.REMOTEPEEK;
                break;
            case CommandType.TAKE:
                type = CommandType.REMOTETAKE;
                break;
        }

        Command remoteCmd = new CommandBuilder().Init(type).FromDgmName(connection.name).Key(cm.key)
            .Clazz(cm.clazz).Connection(connection).Build();

        connection.Write(remoteCmd);
    }

    public override void ResolveWaitCommand(string key, DataGear<object> dg) {
        Command cm = waitList.GetAndRemoveCommand(key);
        cm.SetDg(dg);
        cm.Execute();
    }

    public override void Finish() {
        Command cm = new CommandBuilder().Init(CommandType.FINISH).Build();
        connection.SendCommand(cm);
    }

    public override void Close() {
        Command cm = new CommandBuilder().Init(CommandType.CLOSE).Connection(connection).Build();
        connection.SendCommand(cm);
    }

    public override void Shutdown() {
        connection.Close();
        BlockingCollection<Command> queue = connection.sendQueue;
        if (queue.Count == 0) {
            queue.Dispose();
        }
    }

    public void ConnectWait() {
        lock (syncObj) {
            while (!connect) {
                try {
                    Monitor.Wait(syncObj);
                } catch {}
            }
        }
    }
}
}