view daemon/IncomingTcpConnection.cs @ 39:9217d14cc220

fix NetworkStream
author riono <e165729@ie.u-ryukyu.ac.jp>
date Tue, 25 May 2021 02:35:52 +0900
parents 090be804eaa9
children 7276e3429c99
line wrap: on
line source

using System;
using System.Data;
using System.IO;
using System.Runtime.InteropServices;
using Christie_net.codegear;
using Christie_net.datagear;
using Christie_net.datagear.command;
using Christie_net.datagear.dg;
using MessagePack;
using Microsoft.VisualBasic;
using CommandType = Christie_net.datagear.command.CommandType;


namespace Christie_net.daemon {
/// <summary>
/// Receives commands sent by a remote peer over a <c>Connection</c> and
/// applies them to the data gear managers of the owning <c>CodeGearManager</c>.
/// </summary>
/// <remarks>
/// Wire format of one frame, in order: a MessagePack-encoded
/// <c>RemoteMessage</c>, then (for PUT and REPLY only) a MessagePack-encoded
/// <c>int</c> payload length followed by that many raw payload bytes.
/// </remarks>
public class IncomingTcpConnection {
    // Initial size of the receive buffer. The buffer is doubled whenever a
    // single frame does not fit, so this is a starting point, not a limit.
    private const int InitialBufferSize = 1024;

    // Assigned through SetManager; not read anywhere inside this class.
    private RemoteDataGearManager manager;
    private readonly CodeGearManager cgm;
    private readonly Connection connection;

    /// <summary>
    /// Creates a receiver bound to <paramref name="connection"/> and to the
    /// <c>CodeGearManager</c> that connection carries.
    /// </summary>
    /// <param name="connection">Connection whose stream is read by <see cref="Run"/>.</param>
    public IncomingTcpConnection(Connection connection) {
        this.connection = connection;
        this.cgm = connection.cgm;
    }

    /// <summary>Stores the remote data gear manager associated with this connection.</summary>
    /// <param name="manager">Manager to remember.</param>
    public void SetManager(RemoteDataGearManager manager) {
        this.manager = manager;
    }

    /// <summary>
    /// Reads from the connection stream until the peer closes it, a read
    /// fails, or a malformed frame is received, dispatching every complete
    /// frame as it becomes available.
    /// </summary>
    /// <remarks>
    /// A single <c>Read</c> may return any number of bytes, so frames can be
    /// split across reads or several frames can arrive in one read. Unparsed
    /// bytes are therefore kept in a buffer between reads.
    /// </remarks>
    public void Run() {
        byte[] buffer = new byte[InitialBufferSize];
        int buffered = 0; // number of valid, not yet consumed bytes at the start of buffer

        while (true) {
            // The buffer is only full here when it holds one incomplete frame
            // that is larger than the buffer itself.
            // NOTE(review): growth is unbounded; consider a maximum frame size
            // if peers are not trusted.
            if (buffered == buffer.Length) {
                Array.Resize(ref buffer, buffer.Length * 2);
            }

            int received;
            try {
                received = connection.stream.Read(buffer, buffered, buffer.Length - buffered);
            } catch (Exception e) {
                // Any read failure ends this receiver.
                Console.WriteLine(e);
                return;
            }

            if (received <= 0) {
                // Zero bytes means the peer closed the stream.
                return;
            }

            buffered += received;

            int consumed = 0;
            while (true) {
                Frame frame;
                try {
                    frame = TryReadFrame(new ReadOnlyMemory<byte>(buffer, consumed, buffered - consumed));
                } catch (InvalidDataException e) {
                    // The byte stream can no longer be split into frames reliably.
                    Console.WriteLine(e);
                    return;
                }

                if (frame == null) {
                    break; // need more bytes
                }

                consumed += frame.Size;

                try {
                    Dispatch(frame);
                } catch (IOException e) {
                    Console.WriteLine(e.StackTrace);
                }
            }

            // Move the unparsed tail (a partial frame, if any) to the front.
            int remaining = buffered - consumed;
            if (consumed > 0 && remaining > 0) {
                Array.Copy(buffer, consumed, buffer, 0, remaining);
            }

            buffered = remaining;
        }
    }

    /// <summary>
    /// Attempts to parse one complete frame from the start of <paramref name="pending"/>.
    /// </summary>
    /// <param name="pending">Bytes received so far that have not been consumed.</param>
    /// <returns>The parsed frame, or <c>null</c> when more bytes are required.</returns>
    /// <exception cref="InvalidDataException">The bytes do not form a valid frame.</exception>
    private static Frame TryReadFrame(ReadOnlyMemory<byte> pending) {
        if (pending.IsEmpty) {
            return null;
        }

        try {
            // MessagePack values are variable-length, so the number of bytes
            // each value occupies is taken from the deserializer.
            RemoteMessage header = MessagePackSerializer.Deserialize<RemoteMessage>(pending, out int offset);
            if (header == null) {
                throw new InvalidDataException("Received a nil RemoteMessage.");
            }

            CommandType type = CommandTypeExt.GetCommandTypeFormId(header.type);
            byte[] payload = null;

            if (type == CommandType.PUT || type == CommandType.REPLY) {
                int length = MessagePackSerializer.Deserialize<int>(pending.Slice(offset), out int lengthSize);
                offset += lengthSize;

                if (length < 0) {
                    throw new InvalidDataException("Received a negative payload length: " + length);
                }

                if (pending.Length - offset < length) {
                    return null; // payload not fully received yet
                }

                payload = pending.Slice(offset, length).ToArray();
                offset += length;
            }

            Frame frame = new Frame();
            frame.Message = header;
            frame.Type = type;
            frame.Data = payload;
            frame.Size = offset;
            return frame;
        } catch (MessagePackSerializationException e) {
            if (IsTruncated(e)) {
                return null;
            }

            throw new InvalidDataException("Received a malformed frame.", e);
        }
    }

    /// <summary>
    /// Reports whether <paramref name="error"/> was caused by running out of
    /// input, i.e. an <see cref="EndOfStreamException"/> appears in its chain.
    /// </summary>
    private static bool IsTruncated(Exception error) {
        for (Exception current = error; current != null; current = current.InnerException) {
            if (current is EndOfStreamException) {
                return true;
            }
        }

        return false;
    }

    /// <summary>Applies one received frame to the appropriate data gear manager.</summary>
    /// <param name="frame">A complete frame produced by <see cref="TryReadFrame"/>.</param>
    private void Dispatch(Frame frame) {
        RemoteMessage msg = frame.Message;

        switch (frame.Type) {
            case CommandType.PUT:
                try {
                    MessagePackDataGear<object> dg =
                        new MessagePackDataGear<object>(frame.Data, Type.GetType(msg.clazz));

                    // Debug trace of the received PUT.
                    Console.WriteLine("***type:" + msg.type +  " key:" + msg.key + " fromDgm:" + msg.fromDmgName + " class:" + msg.clazz + " data: null" );

                    cgm.GetLocalDGM().Put(msg.key, dg);
                } catch (TypeLoadException e) {
                    Console.WriteLine(e.StackTrace);
                }

                break;
            case CommandType.REMOTEPEEK:
            case CommandType.REMOTETAKE:
                try {
                    Command cm = new CommandBuilder().Init(frame.Type).FromDgmName(msg.fromDmgName)
                        .Key(msg.key)
                        .Clazz(Type.GetType(msg.clazz))
                        .Connection(connection).Build();
                    cgm.GetLocalDGM().RunCommand(cm);
                } catch (TypeLoadException e) {
                    Console.WriteLine(e.StackTrace);
                }

                break;
            case CommandType.REPLY:
                // Hand the reply to the command waiting for this key on the sender's manager.
                try {
                    MessagePackDataGear<object> dg =
                        new MessagePackDataGear<object>(frame.Data, Type.GetType(msg.clazz));
                    cgm.GetDGM(msg.fromDmgName).ResolveWaitCommand(msg.key, dg);
                } catch (TypeLoadException e) {
                    Console.WriteLine(e.StackTrace);
                }

                break;
            default:
                break;
        }
    }

    /// <summary>One fully received frame.</summary>
    private sealed class Frame {
        // Decoded frame header.
        public RemoteMessage Message;
        // Command type resolved from Message.type.
        public CommandType Type;
        // Raw payload bytes for PUT/REPLY; null for commands without a payload.
        public byte[] Data;
        // Total number of bytes the frame occupied in the receive buffer.
        public int Size;
    }
}
}