changeset 41:cf5a75bc3e55

add
author akahori
date Tue, 31 Jul 2018 17:46:32 +0900
parents 342931aea0b8
children d7ac4abc0447
files src/main/java/christie/topology/manager/CheckComingHost.java src/main/java/christie/topology/manager/ConfigWaiter.java src/main/java/christie/topology/manager/IncomingHosts.java src/main/java/christie/topology/manager/NodeInfo.java src/main/java/christie/topology/manager/RecordTopology.java src/main/java/christie/topology/manager/TopologyManager.java src/main/java/christie/topology/manager/TopologyManagerConfig.java src/main/java/christie/topology/manager/TopologyType.java
diffstat 8 files changed, 389 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/CheckComingHost.java	Tue Jul 31 17:46:32 2018 +0900
@@ -0,0 +1,52 @@
+package christie.topology.manager;
+
+import christie.annotation.Peek;
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import christie.topology.HostMessage;
+
+import java.util.HashMap;
+
+public class CheckComingHost extends CodeGear {
+
+    @Take
+    HostMessage hostMessage;
+
+    @Peek
+    HashMap<String, String> absCookieTable;
+
+    public CheckComingHost(){
+    }
+
+    @Override
+    public void run(CodeGearManager cgm) {
+
+        boolean match = false;
+        // check cookie
+        if (hostMessage.cookie != null) {
+            if (absCookieTable.containsKey(hostMessage.cookie)){
+                match = true;
+                hostMessage.absName = absCookieTable.get(hostMessage.cookie);
+                System.out.println("match");
+            }
+        }
+
+        if (match){
+            // coming host has ever joined this App
+            getLocalDGM().put("reconnectHost", hostMessage);
+
+            // TODO: これから実装する
+            //new SearchHostName();
+        } else {
+            getLocalDGM().put("orderHash", "order");
+            getLocalDGM().put("newHost", hostMessage);
+        }
+
+        cgm.setup(new CheckComingHost());
+    }
+
+
+
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/ConfigWaiter.java	Tue Jul 31 17:46:32 2018 +0900
@@ -0,0 +1,43 @@
+package christie.topology.manager;
+
+
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import org.msgpack.type.ValueFactory;
+
+public class ConfigWaiter extends CodeGear {
+
+
+    // Question; done 変数がわからない...何やっているんだろう.
+    // public Receiver done = ids.create(CommandType.TAKE);
+    @Take
+    boolean done;
+
+    @Take
+    int nodeNum;
+
+
+
+    public ConfigWaiter() { }
+
+    @Override
+    public void run(CodeGearManager cgm) {
+        nodeNum--;
+        if (nodeNum == 0) {
+            put("local", "start", ValueFactory.createNilValue());
+            put("startTime", System.currentTimeMillis());
+            put("done", true);
+            //update("running", true);
+
+            return;
+        }
+        //ConfigWaiter cs3 = new ConfigWaiter(count);
+        //cs3.done.setKey("local", "done");
+
+        cgm.setup(new ConfigWaiter());
+        getLocalDGM().put("nodeNum", nodeNum);
+        getLocalDGM().put("done", false);
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/IncomingHosts.java	Tue Jul 31 17:46:32 2018 +0900
@@ -0,0 +1,84 @@
+package christie.topology.manager;
+
+
+import christie.annotation.Peek;
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import christie.topology.HostMessage;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+
+public class IncomingHosts extends CodeGear {
+
+    @Peek // Topology from parse file
+    HashMap<String, LinkedList<NodeInfo>> topology;
+
+    @Peek // nodeName list
+    LinkedList<String> nodeNames;
+
+    @Take // new coming host info
+    HostMessage host;
+
+
+    //private Receiver absCookieTable = ids.create(CommandType.TAKE); // cookie, AbsName HashMap
+    //private Receiver cookie = ids.create(CommandType.TAKE);    // MD5
+    private Logger log = Logger.getLogger(IncomingHosts.class);
+
+    public IncomingHosts() {
+
+    }
+
+    @Override
+    public void run(CodeGearManager cgm) {
+
+        // not have or match cookie
+        String nodeName = nodeNames.poll();
+        // Manager connect to Node
+
+        cgm.createRemoteDGM(nodeName, host.name, host.port);
+        getDGM(nodeName).put( "host", nodeName);
+
+
+        /* cookie
+        String cookie = this.cookie.asString();
+        absCookieTable.put(cookie, nodeName);
+        ods.put(this.absCookieTable.key, absCookieTable);
+
+        ods.put(nodeName, "cookie", cookie);
+        */
+        log.info( "toplology manager connected from " + nodeName);
+
+        LinkedList<NodeInfo> nodes = topology.get(nodeName);
+        for (NodeInfo nodeInfo : nodes) {
+            HostMessage newHost = new HostMessage(host.name, host.port,
+                    nodeInfo.connectionName, nodeInfo.reverseName);
+            newHost.absName = nodeName;
+            newHost.remoteAbsName = nodeInfo.sourceNodeName;
+
+            getLocalDGM().put("nodeInfo", newHost);
+            getLocalDGM().put(nodeInfo.sourceNodeName, newHost);
+            log.info(" write to " + nodeInfo.sourceNodeName + " config message =" + newHost.toString() + " remain "
+                    + Integer.toString((nodeNames.size())));
+            new RecordTopology();
+        }
+
+        log.info(" remaining configure host = " + Integer.toString(nodeNames.size()));
+        if (nodeNames.isEmpty()) {
+            // configuration finish
+            for (String key : topology.keySet()) {
+                log.info(" write to " + key + " end message =" + (new HostMessage("",0,"","")).toString());
+                getLocalDGM().put(key, new HostMessage("",0,"","")); // end mark
+            }
+        }
+
+        // idsのときはkeyメソッドが使えたけど, 今はlistなのでエラーが出る.
+        // これが何をする処理か読めていないので, 今はコメントアウト.
+        //getLocalDGM().put(this.nodeNames.key, nodeNames);
+        //getLocalDGM().put(this.topology.key, topology);
+
+        cgm.setup(new IncomingHosts());
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/NodeInfo.java	Tue Jul 31 17:46:32 2018 +0900
@@ -0,0 +1,15 @@
+package christie.topology.manager;
+
+public class NodeInfo {
+
+    public String sourceNodeName;
+    public String connectionName;
+    public String reverseName;
+
+
+    public NodeInfo(String source, String connection) {
+        this.sourceNodeName = source;
+        this.connectionName = connection;
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/RecordTopology.java	Tue Jul 31 17:46:32 2018 +0900
@@ -0,0 +1,44 @@
+package christie.topology.manager;
+
+
+import christie.annotation.Peek;
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import christie.topology.HostMessage;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+
+public class RecordTopology extends CodeGear {
+
+    @Take
+    HostMessage hostInfo;
+
+    @Peek
+    HashMap<String, LinkedList<HostMessage>> topology;
+    public RecordTopology(){ }
+
+    @Override
+    public void run(CodeGearManager cgm) {
+        LinkedList<HostMessage> connections;
+        if (!topology.containsKey(hostInfo.remoteAbsName)) {
+            connections = new LinkedList<HostMessage>();
+        } else {
+            connections = topology.get(hostInfo.remoteAbsName);
+        }
+        connections.add(hostInfo);
+        topology.put(hostInfo.remoteAbsName, connections);
+// need debug option
+//        for (LinkedList<HostMessage> list :topology.values()){
+//            System.out.print(list.get(0).remoteAbsName+" : ");
+//            for (HostMessage host : list){
+//                System.out.print("[ "+host.absName+" "+host.name+" "+host.port+" "+host.connectionName+" "+host.reverseName+" "+host.remoteAbsName+" ]");
+//            }
+//            System.out.println();
+//        }
+
+        //ods.update(info1.key, topology);
+    }
+
+}
--- a/src/main/java/christie/topology/manager/TopologyManager.java	Tue Jul 31 16:12:27 2018 +0900
+++ b/src/main/java/christie/topology/manager/TopologyManager.java	Tue Jul 31 17:46:32 2018 +0900
@@ -1,14 +1,123 @@
 package christie.topology.manager;
 
+import christie.annotation.Peek;
 import christie.codegear.CodeGear;
 import christie.codegear.CodeGearManager;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+
+import org.apache.log4j.Logger;
+
+import com.alexmerz.graphviz.ParseException;
+import com.alexmerz.graphviz.Parser;
+import com.alexmerz.graphviz.objects.*;
+
 public class TopologyManager extends CodeGear {
 
+    @Peek
+    TopologyManagerConfig config;
+
+    Logger logger = Logger.getLogger(TopologyManager.class);
+
+
     @Override
     protected void run(CodeGearManager cgm) {
+        cgm.setup(new CheckComingHost());
 
+        // if (!conf.dynamic) は, conf.dynamic = trueの動作がわからないので,  省いた.
+
+        LinkedList<String> nodeNames = new LinkedList<String>();
+        HashMap<String, LinkedList<NodeInfo>> topology = new HashMap<String, LinkedList<NodeInfo>>();
+        int nodeNum = 0;
+
+        try {
+            FileReader reader = new FileReader(new File(config.confFilePath));
+            Parser parser = new Parser();
+            parser.parse(reader);
+
+
+            ArrayList<Graph> digraphs = parser.getGraphs();
+
+
+            for (Graph digraph : digraphs) {
+                ArrayList<Node> nodes = digraph.getNodes(false);
+                nodeNum = nodes.size();
+
+                for (Node node : nodes) {
+                    String nodeName = node.getId().getId();
+                    nodeNames.add(nodeName);
+                    topology.put(nodeName, new LinkedList<NodeInfo>());
+                }
+
+                ArrayList<Edge> edges = digraph.getEdges();
+                HashMap<String, NodeInfo> hash = new HashMap<String, NodeInfo>();
+
+                String connection;
+                String source;
+                String target;
+
+                NodeInfo nodeInfo;
+
+                // まず1回グラフを読み込む
+                for (Edge edge : edges) {
+                    connection = edge.getAttribute("label");
+                    source = edge.getSource().getNode().getId().getId();
+                    target = edge.getTarget().getNode().getId().getId();
+                    nodeInfo = new NodeInfo(source, connection);
+
+                    //Question: この下の2行はどんな役目?
+                    //LinkedList<NodeInfo> sources = topology.get(target);
+                    //sources.add(nodeInfo); // addしてその後の使い道は...?
+
+                    hash.put(source + "," + target, nodeInfo);
+                }
+
+                // hash.get(target + "," + source); をして, グラフを逆にたどって, reverseNameにlabelを入れてる.
+                for (Edge edge : edges) {
+                    connection = edge.getAttribute("label");
+                    source = edge.getSource().getNode().getId().getId();
+                    target = edge.getTarget().getNode().getId().getId();
+                    nodeInfo = hash.get(target + "," + source);
+
+                    if (nodeInfo != null) {
+                        nodeInfo.reverseName = connection;
+                    }
+
+                }
+            }
+
+        } catch (FileNotFoundException e) {
+            logger.error("File not found: " + config.confFilePath);
+            e.printStackTrace();
+        } catch (ParseException e) {
+            logger.error("File format error: " + config.confFilePath);
+            e.printStackTrace();
+        }
+
+        // for recode topology information
+        // cookie List
+        getLocalDGM().put("running", false);
+        getLocalDGM().put("resultParse", topology);
+        getLocalDGM().put("nodeNames", nodeNames);
+
+        cgm.setup(new IncomingHosts());
+
+
+
+        // Question: この処理何をやっているのかわからない. 一応, その下にそれっぽいコードを書いた.
+        // ConfigWaiter cs3 = new ConfigWaiter(nodeNum);
+        // cs3.done.setKey("local", "done");
+
+        cgm.setup(new ConfigWaiter());
+        getLocalDGM().put("nodeNum", nodeNum);
+        getLocalDGM().put("done", false);
 
     }
 
+
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/TopologyManagerConfig.java	Tue Jul 31 17:46:32 2018 +0900
@@ -0,0 +1,27 @@
+package christie.topology.manager;
+
+public class TopologyManagerConfig {
+    public boolean showTime = false;
+    public String confFilePath;
+    public TopologyType type = TopologyType.Tree;
+    public int hasChild = 2;
+
+    public TopologyManagerConfig(String[] args) {
+
+        for (int i = 0; i < args.length; i++) {
+            if ("-conf".equals(args[i])) {
+                confFilePath = args[++i];
+            } else if ("--Topology".equals(args[i])) {
+                String typeName = args[++i];
+                if ("tree".equals(typeName)) {
+                    type = TopologyType.Tree;
+                }
+            } else if ("--Child".equals(args[i])) {
+                hasChild = Integer.parseInt(args[++i]);
+            } else if ("--showTime".equals(args[i])) {
+                showTime = true;
+            }
+        }
+
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/TopologyType.java	Tue Jul 31 17:46:32 2018 +0900
@@ -0,0 +1,15 @@
+package christie.topology.manager;
+
+public enum TopologyType {
+    Tree(1);
+
+
+    private final int id;
+    private TopologyType(final int id) {
+        this.id = id;
+    }
+
+    public int getId() {
+        return id;
+    }
+}