changeset 26:45ff08d59fda

update CGM
author riono <e165729@ie.u-ryukyu.ac.jp>
date Tue, 12 Jan 2021 21:23:23 +0900
parents 52cb63c37218
children efb06874a34e
files Christie_net.csproj Test/RewritingTest/OverwriteScheduler.cs Test/RewritingTest/SocketListenerTask.cs Test/RewritingTest/TaskCreateOptionTest.cs codegear/CodeGearExecutor.cs codegear/CodeGearManager.cs codegear/ThreadPoolExecutors.cs daemon/AcceptThread.cs daemon/ChristieDaemon.cs datagear/DataGears.cs datagear/RemoteDataGearManager.cs datagear/command/RemoteDataGearManager.cs
diffstat 12 files changed, 248 insertions(+), 49 deletions(-) [+]
line wrap: on
line diff
--- a/Christie_net.csproj	Fri Dec 18 01:06:47 2020 +0900
+++ b/Christie_net.csproj	Tue Jan 12 21:23:23 2021 +0900
@@ -3,7 +3,7 @@
     <PropertyGroup>
         <OutputType>Exe</OutputType>
         <TargetFramework>netcoreapp3.1</TargetFramework>
-        <StartupObject>SocketListenerTask</StartupObject>
+        <StartupObject>TaskCreateOptionTest</StartupObject>
     </PropertyGroup>
 
     <ItemGroup>
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Test/RewritingTest/OverwriteScheduler.cs	Tue Jan 12 21:23:23 2021 +0900
@@ -0,0 +1,18 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace Christie_net {
+public class OverwriteScheduler : TaskScheduler {
+    protected override IEnumerable<Task>? GetScheduledTasks() {
+        throw new System.NotImplementedException();
+    }
+
+    protected override void QueueTask(Task task) {
+        throw new System.NotImplementedException();
+    }
+
+    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) {
+        throw new System.NotImplementedException();
+    }
+}
+}
\ No newline at end of file
--- a/Test/RewritingTest/SocketListenerTask.cs	Fri Dec 18 01:06:47 2020 +0900
+++ b/Test/RewritingTest/SocketListenerTask.cs	Tue Jan 12 21:23:23 2021 +0900
@@ -40,7 +40,7 @@
             //     Console.WriteLine("Accept:" + listener.LocalEndPoint);
             // }
          
-            SocketListenerThread newThread = new SocketListenerThread (ss);
+            SocketListenerTask newThread = new SocketListenerTask(ss);
             newThread.Run ();
             
             // Console.WriteLine("fin");
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Test/RewritingTest/TaskCreateOptionTest.cs	Tue Jan 12 21:23:23 2021 +0900
@@ -0,0 +1,64 @@
+using System;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+
+class TaskCreateOptionTest
+{
+    static void Main(string[] args)
+    {
+        
+        // Task.Runする際のworkerThreadにカウントするかしないかの確認テスト TaskCreationOptions.LongRunningのテスト
+        int workerThreads, completionPortThreads;
+        
+        ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
+        Console.WriteLine(workerThreads);
+        Console.WriteLine(completionPortThreads);
+        
+        ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads);
+        Console.WriteLine(workerThreads);
+        Console.WriteLine(completionPortThreads);
+        
+        ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads);
+        Console.WriteLine(workerThreads);
+        Console.WriteLine(completionPortThreads);
+        
+        Console.WriteLine("--");
+        
+        Task.Factory.StartNew(() =>
+        {
+            ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
+            Console.WriteLine(workerThreads);
+            Console.WriteLine(completionPortThreads);
+        
+            ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads);
+            Console.WriteLine(workerThreads);
+            Console.WriteLine(completionPortThreads);
+            
+            ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads);
+            Console.WriteLine(workerThreads);
+            Console.WriteLine(completionPortThreads);
+        
+            while (true) { }
+        
+        }, TaskCreationOptions.LongRunning);
+        
+        
+        // workerThreadの最大数をみるテスト
+        // var watch = Stopwatch.StartNew();  
+        //
+        // for (int i = 0; i < 20; i++)  
+        // {  
+        //     Task.Run(() =>  
+        //     {  
+        //         var threadId = Thread.CurrentThread.ManagedThreadId;  
+        //         Console.WriteLine("{0:D2}: {1}ms後に開始",  
+        //             threadId, watch.ElapsedMilliseconds);  
+        //
+        //         Thread.Sleep(4000);  //4秒くらいかかる処理  
+        //     });  
+        // }  
+
+        Console.Read();
+    }
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/codegear/CodeGearExecutor.cs	Tue Jan 12 21:23:23 2021 +0900
@@ -0,0 +1,25 @@
+namespace Christie_net.codegear {
+public class CodeGearExecutor {
+    private CodeGear cg;
+    private CodeGearManager cgm;
+    private int priority = 1;
+
+    public CodeGearExecutor(CodeGear cg, CodeGearManager cgm, int priority) {
+        this.cg = cg;
+        this.cgm = cgm;
+        this.priority = priority;
+    }
+
+    public void Run() {
+        //cg.Run();
+    }
+
+    public int GetPriority() {
+        return priority;
+    }
+
+    public void SetPriority(int priority) {
+        this.priority = priority;
+    }
+}
+}
\ No newline at end of file
--- a/codegear/CodeGearManager.cs	Fri Dec 18 01:06:47 2020 +0900
+++ b/codegear/CodeGearManager.cs	Tue Jan 12 21:23:23 2021 +0900
@@ -1,16 +1,46 @@
+using System;
 using System.Collections.Concurrent;
 using System.Threading;
+using Christie_net.daemon;
 using Christie_net.datagear;
+using Christie_net.datagear.command;
 
 namespace Christie_net.codegear {
 public class CodeGearManager {
     private ConcurrentDictionary<string, DataGearManager> dgmList = new ConcurrentDictionary<string, DataGearManager>();
     private ConcurrentDictionary<int, CodeGearManager> cgmList = new ConcurrentDictionary<int, CodeGearManager>();
-    //private ThreadPool threadPoolExecutor;
+    private ThreadPoolExecutors threadPoolExecutors;
     private LocalDataGearManager localDgm = new LocalDataGearManager();
-    //private ConcurrentDictionary<string, >
+    private ConcurrentDictionary<string, IncomingTcpConnection> acceptHash =
+        new ConcurrentDictionary<string, IncomingTcpConnection>();
     public int cgmID;
-    //public
+    public ChristieDaemon daemon;
     public int localPort;
+
+    public CodeGearManager(int cgmID, ThreadPoolExecutors exe, ConcurrentDictionary<int, CodeGearManager> cgmList, int localPort) {
+        this.dgmList.TryAdd("local", localDgm);
+        this.cgmList = cgmList;
+        this.threadPoolExecutors = exe;
+        this.cgmID = cgmID;
+        this.localPort = localPort;
+        this.daemon = new ChristieDaemon(localPort, this);
+        this.daemon.Listen();
+    }
+
+    public LocalDataGearManager GetLocalDGM() {
+        return localDgm;
+    }
+
+    public DataGearManager GetDGM(string dgmName) {
+        if (dgmList.ContainsKey(dgmName)) {
+            return dgmList[dgmName];
+        } else {
+            throw new ArgumentNullException("DGM " + dgmName + " is not found");
+        }
+    }
+
+    public RemoteDataGearManager CreateRemoteDGM(string dgmName, string address, int port) {
+        RemoteDataGearManager remote = new RemoteDataGearManager(dgmName, address, port, this);
+    }
 }
 }
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/codegear/ThreadPoolExecutors.cs	Tue Jan 12 21:23:23 2021 +0900
@@ -0,0 +1,19 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Christie_net.codegear {
+public class ThreadPoolExecutors {
+
+    public ThreadPoolExecutors() {
+        
+    }
+    
+    public void CreateThreadPool() {
+        
+    }
+    
+    public void Execute(CodeGearExecutor command) {
+        Task.Factory.StartNew(() => command.Run());
+    }
+}
+}
\ No newline at end of file
--- a/daemon/AcceptThread.cs	Fri Dec 18 01:06:47 2020 +0900
+++ b/daemon/AcceptThread.cs	Tue Jan 12 21:23:23 2021 +0900
@@ -18,20 +18,24 @@
     public void Run() {
         while (true) {
             try {
-                Socket socket = soc;
-                socket.Listen((int)SocketOptionName.MaxConnections);
+                Socket socket = null;
+                socket = soc.Accept();
                 socket.NoDelay = true;
                 Console.WriteLine("Accept " + socket.LocalEndPoint + ":" + ((IPEndPoint)socket.LocalEndPoint).Port);
                 
                 Connection connection = new Connection(socket, cgm);
                 string key = "accept" + counter;
                 
-                IncomingTcpConnection inComing = new IncomingTcpConnection(connection);
-                Task.Run(() => inComing.Run());
+                IncomingTcpConnection incoming = new IncomingTcpConnection(connection);
+                Task.Factory.StartNew(
+                    () => incoming.Run(),
+                    TaskCreationOptions.LongRunning);
                 //cgm.SetAccept(key, in);
 
-                OutboundTcpConnection outBound = new OutboundTcpConnection(connection);
-                Task.Run(() => outBound.Run());
+                OutboundTcpConnection outbound = new OutboundTcpConnection(connection);
+                Task.Factory.StartNew(
+                    () => outbound.Run(),
+                    TaskCreationOptions.LongRunning);
                 counter++;
             } catch (Exception e) {
                 Console.WriteLine(e.StackTrace);
--- a/daemon/ChristieDaemon.cs	Fri Dec 18 01:06:47 2020 +0900
+++ b/daemon/ChristieDaemon.cs	Tue Jan 12 21:23:23 2021 +0900
@@ -27,9 +27,12 @@
             socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
             Console.WriteLine("ChristieDaemon, listen: bind to " + localEndPoint);
             socket.Bind(localEndPoint);
+            socket.Listen((int)SocketOptionName.MaxConnections);
 
             acceptThread = new AcceptThread(socket, cgm);
-            Task.Run(() => acceptThread.Run());
+            Task.Factory.StartNew(
+                () => acceptThread.Run(),
+                TaskCreationOptions.LongRunning);
         } catch (IOException e) {
             Console.WriteLine(e.StackTrace);
         }
--- a/datagear/DataGears.cs	Fri Dec 18 01:06:47 2020 +0900
+++ b/datagear/DataGears.cs	Tue Jan 12 21:23:23 2021 +0900
@@ -61,7 +61,7 @@
         lock (syncObject) {
             object data = null;
             DataGear<object> dataGear;
-            // BlockingQueueではpeekできないので、中のConcurrentQueueを取り出して操作
+            // BlockingCollectionではpeekできないので、中のConcurrentQueueを取り出して操作
             ConcurrentQueue<DataGear<object>> queue = new ConcurrentQueue<DataGear<object>>(dataGears[key]);
             if (queue.TryPeek(out dataGear)) {
                 data = dataGear.GetData();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/datagear/RemoteDataGearManager.cs	Tue Jan 12 21:23:23 2021 +0900
@@ -0,0 +1,72 @@
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading.Tasks;
+using Christie_net.codegear;
+using Christie_net.daemon;
+using Christie_net.datagear.command;
+using Christie_net.datagear.dg;
+
+namespace Christie_net.datagear {
+public class RemoteDataGearManager : DataGearManager {
+    private Connection connection;
+    private CodeGearManager cgm;
+    private bool connect = false;
+    private object lockObj = new object();
+    
+    public RemoteDataGearManager (string address, int port, CodeGearManager cgm) {
+        this.cgm = cgm;
+        RemoteDataGearManager manager = this;
+        Task.Factory.StartNew(() => {
+            do {
+                try {
+                    IPHostEntry host = Dns.GetHostEntry(address);
+                    IPAddress ipAddress = host.AddressList[0];
+                    IPEndPoint remoteEndPoint = new IPEndPoint(ipAddress, port);
+                    Socket socket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+                    socket.Bind(remoteEndPoint);
+                    socket.Listen((int) SocketOptionName.MaxConnections);
+                    socket.NoDelay = true;
+
+                    Socket listener = socket.Accept();
+                    connection = new Connection(listener, cgm);
+                } catch { }
+            } while (!connect);
+
+            IncomingTcpConnection incoming = new IncomingTcpConnection(connection);
+            incoming.SetManager(manager);
+            Task.Factory.StartNew(
+                () => incoming.Run(),
+                    TaskCreationOptions.LongRunning);
+            OutboundTcpConnection outbound = new OutboundTcpConnection(connection);
+            Task.Factory.StartNew(
+                () => outbound.Run(),
+                TaskCreationOptions.LongRunning);
+        });
+    }
+    
+    public override void Put(string key, object data) {
+        throw new NotImplementedException();
+    }
+
+    public override void RunCommand(Command cm) {
+        throw new NotImplementedException();
+    }
+
+    public override void ResolveWaitCommand(string key, DataGear<Type> dg) {
+        throw new NotImplementedException();
+    }
+
+    public override void Finish() {
+        throw new NotImplementedException();
+    }
+
+    public override void Close() {
+        throw new NotImplementedException();
+    }
+
+    public override void Shutdown() {
+        throw new NotImplementedException();
+    }
+}
+}
\ No newline at end of file
--- a/datagear/command/RemoteDataGearManager.cs	Fri Dec 18 01:06:47 2020 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,36 +0,0 @@
-using System;
-using Christie_net.daemon;
-using Christie_net.datagear.dg;
-
-namespace Christie_net.datagear.command {
-public class RemoteDataGearManager : DataGearManager {
-    private Connection connection;
-    //private CodegearManager cgm;
-    private bool connect = false;
-    private object lockObj = new object();
-    
-    public override void Put(string key, object data) {
-        throw new NotImplementedException();
-    }
-
-    public override void RunCommand(Command cm) {
-        throw new NotImplementedException();
-    }
-
-    public override void ResolveWaitCommand(string key, DataGear<Type> dg) {
-        throw new NotImplementedException();
-    }
-
-    public override void Finish() {
-        throw new NotImplementedException();
-    }
-
-    public override void Close() {
-        throw new NotImplementedException();
-    }
-
-    public override void Shutdown() {
-        throw new NotImplementedException();
-    }
-}
-}
\ No newline at end of file