view src/main/java/christie/topology/manager/IncomingHosts.java @ 198:dd3c0ba6a0a6

fix topology manager
author akahori
date Sat, 09 Mar 2019 21:53:37 +0900
parents 6eb548c188e5
children
line wrap: on
line source

package christie.topology.manager;


import christie.annotation.Peek;
import christie.annotation.Take;
import christie.codegear.CodeGear;
import christie.codegear.CodeGearManager;
import christie.topology.HostMessage;
import christie.topology.Message;
import christie.topology.TopologyDataGear;

import java.util.HashMap;
import java.util.LinkedList;

/**
 * Code gear that processes one newly arrived host while the topology is being configured.
 *
 * <p>Each run takes the next unassigned node name, creates a remote data gear manager for
 * the arriving host under that name, records the host's cookie, and files the connection
 * entries parsed for that node into {@code topology}. When no node names are left, the
 * collected topology is sent to every node listed in {@code resultParse}; otherwise a new
 * {@code IncomingHosts} is set up to wait for the next host.
 *
 * <p>NOTE(review): every annotated input that is still needed is written back through
 * {@code getLocalDGM().put(...)}, which suggests {@code @Take} removes the value from the
 * local data gear manager. Confirm against the framework's annotation semantics. Field
 * names are presumably used as data gear keys, so they must not be renamed.
 */
public class IncomingHosts extends CodeGear {

    /** Per-node lists of connection entries, looked up by node name. */
    @Take
    HashMap<String, LinkedList<HostMessage>> resultParse;

    /** Node names that have not been assigned to a host yet. */
    @Take
    LinkedList<String> nodeNames;

    /** Maps a host's cookie ({@code MD5}) to the node name assigned to it. */
    @Take
    HashMap<String, String> absCookieTable;

    /** Information about the host that has just arrived. */
    @Take
    HostMessage newHost;

    /** Cookie value recorded for the arriving host and sent to it. */
    @Take
    String MD5;

    /** Connection entries collected so far, grouped by each entry's own node name. */
    @Take
    HashMap<String, LinkedList<HostMessage>> topology;

    /** Data gear whose node name is set for each node before it is sent out. */
    @Peek
    TopologyDataGear topoDG;

    /** Creates an unconfigured gear; its inputs are supplied through the annotated fields. */
    public IncomingHosts() {
    }

    /**
     * Assigns a node name to the arriving host and updates the shared topology state.
     *
     * @param cgm manager used to create the remote data gear manager for the host and to
     *            set up the next {@code IncomingHosts} when more hosts are expected
     */
    @Override
    protected void run(CodeGearManager cgm) {
        // Hand out the next free node name and write the shortened list back.
        String assignedName = nodeNames.poll();
        getLocalDGM().put("nodeNames", nodeNames);

        // The manager connects to the node under its assigned name.
        cgm.createRemoteDGM(assignedName, newHost.getHostName(), newHost.getPort());

        // Remember which node this cookie belongs to, then tell the node its cookie.
        absCookieTable.put(MD5, assignedName);
        getLocalDGM().put("absCookieTable", absCookieTable);
        getDGM(assignedName).put("cookie", MD5);

        // Tell the node how many connection entries were parsed for it.
        LinkedList<HostMessage> links = resultParse.get(assignedName);
        put(assignedName, "connectNodeNum", links.size());
        if (links.isEmpty()) {
            // A node with no entries gets the connection message right away.
            put(assignedName, "_CONNECTIONMESSAGE", new Message());
        }

        for (HostMessage link : links) {
            fileUnderOwner(link);
        }
        getLocalDGM().put("topology", topology);

        if (!nodeNames.isEmpty()) {
            // More hosts are expected: wait for the next one.
            cgm.setup(new IncomingHosts());
        } else {
            // Every node name has been assigned: configuration is finished.
            sendTopologyToNodes();
        }
        getLocalDGM().put("resultParse", resultParse);
    }

    /**
     * Passes the arriving host to {@code link.setHostAndPort} and appends the entry to the
     * topology list keyed by the entry's own node name, creating that list if needed.
     */
    private void fileUnderOwner(HostMessage link) {
        link.setHostAndPort(newHost);

        String owner = link.getNodeName();
        LinkedList<HostMessage> bucket = topology.containsKey(owner)
                ? topology.get(owner)
                : new LinkedList<HostMessage>();
        bucket.add(link);
        topology.put(owner, bucket);
    }

    /**
     * Sends {@code topoDG} and every collected connection entry to each node named in
     * {@code resultParse}.
     */
    private void sendTopologyToNodes() {
        for (String target : resultParse.keySet()) {
            // NOTE(review): the same topoDG instance is renamed and re-sent on every
            // iteration; confirm that put captures its state at call time.
            topoDG.setNodeName(target);
            getDGM(target).put("topoDG", topoDG);

            if (!topology.containsKey(target)) {
                continue;
            }
            for (HostMessage link : topology.get(target)) {
                put(target, "remoteNodeInfo", link);
            }
        }
    }
}