using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Http;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Christie_net.codegear;
using Christie_net.daemon;
using Christie_net.datagear.command;
using Christie_net.datagear.dg;
using Christie_net.Test.Example.RemoteTake;
using Christie_net.topology;

namespace Christie_net.datagear {
    /// <summary>
    /// A <see cref="DataGearManager"/> that forwards commands to a remote peer over TCP.
    /// The connection is established on a background thread started by the constructor;
    /// <see cref="Put"/> and <see cref="RunCommand"/> block until it is ready.
    /// </summary>
    public class RemoteDataGearManager : DataGearManager {
        /// <summary>Pause between retries when name resolution or socket creation throws.</summary>
        private const int RetryDelayMilliseconds = 100;

        // Assigned by the connect thread just before `connect` is set to true.
        private Connection connection = null;
        private readonly CodeGearManager cgm;
        // volatile: read without the lock in Put, written under the lock by the connect thread.
        private volatile bool connect = false;
        private readonly object syncObj = new object();

        /// <summary>
        /// Starts a thread named "Connect-{dgmname}" that connects to the remote host,
        /// starts the incoming/outbound transport threads, and then puts a
        /// <see cref="HostMessage"/> into the local DataGearManager.
        /// </summary>
        /// <param name="dgmname">Name given to the connection and used in thread names.</param>
        /// <param name="address">Host name or address passed to <see cref="Dns.GetHostEntry(string)"/>.</param>
        /// <param name="port">Remote TCP port; 0 is reported as a failure and no connection is attempted.</param>
        /// <param name="cgm">CodeGearManager handed to the connection and used for the local port / local DGM.</param>
        public RemoteDataGearManager(string dgmname, string address, int port, CodeGearManager cgm) {
            this.cgm = cgm;

            // Captured on the constructing thread so the "port failure" message
            // shows where this manager was created.
            System.Diagnostics.StackTrace creationTrace = new System.Diagnostics.StackTrace();

            Thread thread = new Thread(() => {
                HostMessage hostMessage = new HostMessage();
                if (!EstablishConnection(dgmname, address, port, cgm, hostMessage, creationTrace)) {
                    // Gave up: `connect` stays false, so callers of ConnectWait keep blocking.
                    return;
                }

                StartTransportThreads(dgmname);

                // NOTE: the key spelling "hostMassage" is preserved as-is; other code may look it up.
                cgm.GetLocalDGM().Put("hostMassage", hostMessage);
            });
            thread.Name = "Connect-" + dgmname;
            thread.Start();
        }

        /// <summary>
        /// Resolves <paramref name="address"/> and tries each returned address once.
        /// Retries (after a short pause) only when resolution or socket creation throws;
        /// gives up when the port is 0 or when none of the resolved addresses accepts.
        /// </summary>
        /// <returns>true when a connection was established and published; otherwise false.</returns>
        private bool EstablishConnection(string dgmname, string address, int port, CodeGearManager cgm,
            HostMessage hostMessage, System.Diagnostics.StackTrace creationTrace) {
            if (port == 0) {
                Console.WriteLine("port failure" + creationTrace);
                return false;
            }

            while (true) {
                try {
                    IPHostEntry host = Dns.GetHostEntry(address);

                    foreach (IPAddress ipAddress in host.AddressList) {
                        IPEndPoint remoteEndPoint = new IPEndPoint(ipAddress, port);
                        Console.WriteLine("try to connect " + ipAddress + " " + address + " " + port);

                        if (TryConnect(remoteEndPoint, dgmname, cgm, hostMessage)) {
                            return true;
                        }
                    }

                    Console.WriteLine("not connected " + host);
                    return false;
                } catch (Exception e) {
                    Console.WriteLine(e.ToString());
                    // Back off so a persistent failure does not spin the CPU.
                    Thread.Sleep(RetryDelayMilliseconds);
                }
            }
        }

        /// <summary>
        /// Attempts a single connection to <paramref name="remoteEndPoint"/> on a fresh
        /// dual-mode socket. On success the connection is published and every thread
        /// blocked in <see cref="ConnectWait"/> is released.
        /// </summary>
        /// <returns>true on success; false when the attempt failed (the failure is logged).</returns>
        private bool TryConnect(IPEndPoint remoteEndPoint, string dgmname, CodeGearManager cgm,
            HostMessage hostMessage) {
            // A new client per attempt: a socket whose Connect failed is not reused.
            TcpClient client = new TcpClient(AddressFamily.InterNetworkV6);
            try {
                client.Client.DualMode = true;
                client.Connect(remoteEndPoint);
                client.NoDelay = true;

                Connection established = new Connection(client.Client, cgm);
                established.name = dgmname;
                hostMessage.setHostAndPort(client.Client.LocalEndPoint.ToString(), cgm.localPort);

                // Publish only a fully initialised connection, then flip the flag.
                connection = established;
                lock (syncObj) {
                    connect = true;
                    // PulseAll, not Pulse: several threads may be blocked in ConnectWait.
                    Monitor.PulseAll(syncObj);
                }
                return true;
            } catch (Exception e) {
                Console.WriteLine(e.ToString());
                client.Dispose();
                return false;
            }
        }

        /// <summary>
        /// Starts the "{dgmname}-IncomingTcp" and "{dgmname}-OutboundTcp" threads on the
        /// established connection.
        /// </summary>
        private void StartTransportThreads(string dgmname) {
            IncomingTcpConnection incoming = new IncomingTcpConnection(connection);
            incoming.SetManager(this);
            Thread incomingThread = new Thread(incoming.Run);
            incomingThread.Name = dgmname + "-IncomingTcp";
            //incomingThread.Priority = ThreadPriority.Highest;
            incomingThread.Start();

            OutboundTcpConnection outbound = new OutboundTcpConnection(connection);
            Thread outboundThread = new Thread(outbound.Run);
            outboundThread.Name = dgmname + "-OutboundTcp";
            //outboundThread.Priority = ThreadPriority.Highest;
            outboundThread.Start();
        }

        /// <summary>
        /// Builds a PUT command wrapping <paramref name="data"/> in a <see cref="DataGear"/>
        /// and writes it to the connection, waiting for the connection first if necessary.
        /// </summary>
        /// <param name="key">Key of the command.</param>
        /// <param name="data">Payload wrapped in a DataGear.</param>
        public override void Put(string key, object data) {
            Command cm = new CommandBuilder().Init(CommandType.PUT).Key(key)
                .Dg(new DataGear(data)).Build();

            // TODO: in the Java version the connection is reportedly sometimes null here;
            // apparently this can be avoided by not starting the thread from the constructor.
            if (!connect) {
                ConnectWait();
            }

            connection.Write(cm);
        }

        /// <summary>
        /// Registers <paramref name="cm"/> in the wait list and writes the corresponding
        /// remote command (PEEK becomes REMOTEPEEK, TAKE becomes REMOTETAKE, other types
        /// are passed through) to the connection.
        /// </summary>
        /// <param name="cm">Command to run against the remote side.</param>
        public override void RunCommand(Command cm) {
            waitList.Add(cm);

            // Same hazard as Put: the connect thread may not have assigned `connection` yet.
            if (!connect) {
                ConnectWait();
            }

            CommandType type = cm.type;
            switch (cm.type) {
                case CommandType.PEEK:
                    type = CommandType.REMOTEPEEK;
                    break;
                case CommandType.TAKE:
                    type = CommandType.REMOTETAKE;
                    break;
            }

            Command remoteCmd = new CommandBuilder().Init(type).FromDgmName(connection.name).Key(cm.key)
                .Clazz(cm.clazz).Connection(connection).Build();
            connection.Write(remoteCmd);
        }

        /// <summary>
        /// Removes the waiting command registered under <paramref name="key"/>, attaches
        /// <paramref name="dg"/> to it and executes it.
        /// </summary>
        public override void ResolveWaitCommand(string key, DataGear dg) {
            Command cm = waitList.GetAndRemoveCommand(key);
            cm.SetDg(dg);
            cm.Execute();
        }

        /// <summary>
        /// Sends a FINISH command over the connection.
        /// Does not wait for the connection to be established.
        /// </summary>
        public override void Finish() {
            Command cm = new CommandBuilder().Init(CommandType.FINISH).Build();
            connection.SendCommand(cm);
        }

        /// <summary>
        /// Sends a CLOSE command over the connection.
        /// Does not wait for the connection to be established.
        /// </summary>
        public override void Close() {
            Command cm = new CommandBuilder().Init(CommandType.CLOSE).Connection(connection).Build();
            connection.SendCommand(cm);
        }

        /// <summary>
        /// Closes the connection and disposes its send queue if that queue is empty.
        /// Does not wait for the connection to be established.
        /// </summary>
        public override void Shutdown() {
            connection.Close();
            var queue = connection.sendQueue;
            if (queue.Count == 0) {
                queue.Dispose();
            }
        }

        /// <summary>
        /// Blocks the calling thread until the connect thread has published the connection.
        /// If the connect thread gives up, this never returns.
        /// </summary>
        public void ConnectWait() {
            lock (syncObj) {
                while (!connect) {
                    try {
                        Monitor.Wait(syncObj);
                    } catch (ThreadInterruptedException) {
                        // Best effort: an interrupt does not cancel the wait; re-check the flag.
                    }
                }
            }
        }
    }
}