# HG changeset patch # User suruga # Date 1514716347 -32400 # Node ID e321c5ec9b5863be2392268f13ca47da55316e90 # Parent 3c060de44c2e5946d54704a588a663bf695808a1 fix toplogy manager; ring worked diff -r 3c060de44c2e -r e321c5ec9b58 Alice.iml --- a/Alice.iml Sun Dec 31 12:06:45 2017 +0900 +++ b/Alice.iml Sun Dec 31 19:32:27 2017 +0900 @@ -1,13 +1,363 @@ - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/codesegment/CodeSegment.java --- a/src/main/java/alice/codesegment/CodeSegment.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/codesegment/CodeSegment.java Sun Dec 31 19:32:27 2017 +0900 @@ -25,32 +25,32 @@ private ArrayList receivers = new ArrayList();//all Receivers private Field[] fields; - public CodeSegment(){ + public CodeSegment() { createReceiver(); setKey(); } - public void createReceiver(){ + public void createReceiver() { this.fields = this.getClass().getDeclaredFields(); for (Field field : fields) { - if (field.isAnnotationPresent(Take.class)){ + if (field.isAnnotationPresent(Take.class)) { receivers.add(new MetaReceiver(field.getType(), ids.create(CommandType.TAKE))); } - else if (field.isAnnotationPresent(Peek.class)){ + else if (field.isAnnotationPresent(Peek.class)) { receivers.add(new MetaReceiver(field.getType(), ids.create(CommandType.PEEK))); } } } - public void setKey(){ + public void setKey() { int i = 0; for (Field field : fields) { - if (field.isAnnotationPresent(Take.class)){ + if (field.isAnnotationPresent(Take.class)) { Take ano = field.getAnnotation(Take.class); receivers.get(i).getReceiver().setKey(ano.value()); i++; } - else if (field.isAnnotationPresent(Peek.class)){ + else if (field.isAnnotationPresent(Peek.class)) { Peek ano = field.getAnnotation(Peek.class); receivers.get(i).getReceiver().setKey(ano.value()); i++; @@ -58,7 +58,7 @@ } } - public void setValue(){ + public void setValue() { int i = 0; for (Field field : fields){ if (field.isAnnotationPresent(Take.class) || field.isAnnotationPresent(Peek.class)) { diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/test/codesegment/remote/RemoteIncrement.java --- a/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Sun Dec 31 19:32:27 2017 +0900 @@ -3,10 +3,17 @@ import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.Receiver; +import org.msgpack.type.ValueFactory; public class RemoteIncrement extends CodeSegment { public Receiver num = ids.create(CommandType.TAKE);//true でCompressedDSMからtake + public Receiver nullValue = ids.create(CommandType.TAKE);//true でCompressedDSMからtake + + RemoteIncrement() { + num.setKey("compressedlocal", "num"); + nullValue.setKey("null-value"); + } /** * Increment DataSegment value up to 10 @@ -18,6 +25,11 @@ z = "zMP"; } int num = this.num.asInteger(); + if ( nullValue.getVal() == null ) { + System.out.println("get null value"); + } + ods.put("null-value", ValueFactory.createNilValue()); + System.out.println("[CodeSegment" + z + "] " + num++); if (num == 10) System.exit(0); diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/test/codesegment/remote/RemoteIncrement1.java --- a/src/main/java/alice/test/codesegment/remote/RemoteIncrement1.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteIncrement1.java Sun Dec 31 19:32:27 2017 +0900 @@ -3,13 +3,16 @@ import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.Receiver; +import org.msgpack.type.ValueFactory; public class RemoteIncrement1 extends CodeSegment { public Receiver num = ids.create(CommandType.TAKE); + public Receiver nullValue = ids.create(CommandType.TAKE); RemoteIncrement1() { num.setKey("remote","num2"); + nullValue.setKey("remote","remote-null-value"); } /** @@ -21,6 +24,11 @@ int num = this.num.asInteger(); System.out.println("remote " + num++); if (num == 5) return ; + if ( nullValue.getVal() == null ) { + System.out.println("get remote null value"); + } + + ods.put("remote","remote-null-value", ValueFactory.createNilValue()); new RemoteIncrement1(); ods.put("remote", "num2", num); diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java --- a/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Sun Dec 31 19:32:27 2017 +0900 @@ -1,6 +1,7 @@ package alice.test.codesegment.remote; import alice.codesegment.CodeSegment; +import org.msgpack.type.ValueFactory; public class RemoteStartCodeSegment extends CodeSegment { @@ -9,7 +10,10 @@ RemoteIncrement cs = new RemoteIncrement(); ods.put("compressedremote", "num", 0); ods.put("num2",0); + + ods.put("remote","null-value", ValueFactory.createNilValue()); + ods.put("remote-null-value", ValueFactory.createNilValue()); + new RemoteIncrement1(); - cs.num.setKey("compressedlocal", "num"); } } \ No newline at end of file diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/test/topology/localTestTopology/LocalTestTopology.java --- a/src/main/java/alice/test/topology/localTestTopology/LocalTestTopology.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/test/topology/localTestTopology/LocalTestTopology.java Sun Dec 31 19:32:27 2017 +0900 @@ -26,8 +26,8 @@ for (LocalTestTopologyConfig conf: configs ) { new AliceDaemon(conf).listen(); - DataSegment.connect(conf.key,conf.key,"localhost",conf.localPort); - String[] csarg = {"-port ", Integer.toString(topologyManagerConfig.localPort), "-host","localhost","-localKey",conf.key }; + DataSegment.connect(conf.key,conf.key,"localhost",conf.connectPort); + String[] csarg = {"-p", Integer.toString(conf.localPort), "-host","localhost","-localKey",conf.key }; TopologyNodeConfig cs = new TopologyNodeConfig(csarg); cs.setLocalKey(conf.key); cs.setManagerKey("manager"); diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/test/topology/localTestTopology/LocalTestTopologyConfig.java --- a/src/main/java/alice/test/topology/localTestTopology/LocalTestTopologyConfig.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/test/topology/localTestTopology/LocalTestTopologyConfig.java Sun Dec 31 19:32:27 2017 +0900 @@ -10,9 +10,9 @@ public LocalTestTopologyConfig(String[] args,int port, String dsmName) { super(args); - hostname = "127.0.0.1"; + hostname = dsmName; connectPort = port; - localPort = port; + localPort = 0; // local test mode for create tree topology key = dsmName; } diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/topology/HostMessage.java --- a/src/main/java/alice/topology/HostMessage.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/topology/HostMessage.java Sun Dec 31 19:32:27 2017 +0900 @@ -1,5 +1,6 @@ package alice.topology; + import org.msgpack.annotation.Message; @Message @@ -32,4 +33,10 @@ public boolean isAlive() { return alive; } + + public String toString() { + return "HostMessage : name = " + name + ", port = " + Integer.toString(port) + " connectionName = " + + connectionName + " absName = " + absName + "reverseName = " + reverseName + " remoteAbsName = " + remoteAbsName + + " cokkie = " + cookie ; + } } diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/topology/manager/CheckComingHost.java --- a/src/main/java/alice/topology/manager/CheckComingHost.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/topology/manager/CheckComingHost.java Sun Dec 31 19:32:27 2017 +0900 @@ -12,7 +12,7 @@ private Receiver absCookieTable = ids.create(CommandType.PEEK); // cookie, AbsName HashMap public CheckComingHost(){ - this.host.setKey("host"); + this.host.setKey("hostMessage"); this.absCookieTable.setKey("absCookieTable"); } diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/topology/manager/CreateTreeTopology.java --- a/src/main/java/alice/topology/manager/CreateTreeTopology.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/topology/manager/CreateTreeTopology.java Sun Dec 31 19:32:27 2017 +0900 @@ -1,6 +1,8 @@ package alice.topology.manager; import java.util.HashMap; + +import alice.datasegment.DataSegmentManager; import org.msgpack.type.ValueFactory; import alice.codesegment.CodeSegment; @@ -17,6 +19,7 @@ private Receiver info3 = ids.create(CommandType.TAKE); // MD5 private Receiver info4 = ids.create(CommandType.TAKE); private Receiver info6 = ids.create(CommandType.TAKE); + private Receiver info7 = ids.create(CommandType.PEEK); public CreateTreeTopology(){ info.setKey("newHost"); @@ -25,6 +28,7 @@ info3.setKey("MD5"); info4.setKey("absCookieTable"); info6.setKey("parentManager"); + info7.setKey("config"); } @SuppressWarnings("unchecked") @@ -35,13 +39,19 @@ HostMessage host = info.asClass(HostMessage.class); int comingHostCount = info1.asInteger(); ParentManager manager = info6.asClass(ParentManager.class); - + TopologyManagerConfig topologyManagerConfig = info7.asClass(TopologyManagerConfig.class); HashMap nameTable = info2.asClass(HashMap.class); HashMap absCookieTable = info4.asClass(HashMap.class); String nodeName = "node"+comingHostCount; // Manager connect to Node - DataSegment.connect(nodeName, nodeName, host.name, host.port); + if (host.port == 0) { + // local test mode + DataSegmentManager dsm = DataSegment.get(host.name); + DataSegment.register(nodeName, dsm); + } else { + DataSegment.connect(nodeName, nodeName, host.name, host.port); + } ods.put(nodeName, "host", nodeName); ods.put(nodeName, "cookie", cookie); @@ -79,7 +89,7 @@ ods.put(info2.key, nameTable); ods.put(info6.key, manager); - ods.put(nodeName, ValueFactory.createNilValue()); + ods.put(nodeName, new HostMessage("",0,"","") ); if (comingHostCount==0) ods.put("start", ValueFactory.createNilValue()); } diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/topology/manager/IncomingHosts.java --- a/src/main/java/alice/topology/manager/IncomingHosts.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/topology/manager/IncomingHosts.java Sun Dec 31 19:32:27 2017 +0900 @@ -61,13 +61,17 @@ ods.put("nodeInfo", newHost); ods.put(nodeInfo.sourceNodeName, newHost); + log.info(" write to " + nodeInfo.sourceNodeName + " config message =" + newHost.toString() + " remain " + + Integer.toString((nodeNames.size()))); new RecordTopology(); } + log.info(" remaining configure host = " + Integer.toString(nodeNames.size())); if (nodeNames.isEmpty()) { // configuration finish for (String key : topology.keySet()) { - ods.put("local", key, ValueFactory.createNilValue()); + log.info(" write to " + key + " end message =" + (new HostMessage("",0,"","")).toString()); + ods.put("local", key, new HostMessage("",0,"","")); // end mark } } diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/topology/manager/keeparive/TaskExecuter.java --- a/src/main/java/alice/topology/manager/keeparive/TaskExecuter.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/topology/manager/keeparive/TaskExecuter.java Sun Dec 31 19:32:27 2017 +0900 @@ -19,7 +19,7 @@ return instance; } - public void setKey() { + public void taskExecutorSetKey() { ids.init(); info.setKey("_SCHEDULER"); } @@ -77,7 +77,7 @@ nowTask = null; startTime = 0; } - setKey(); + taskExecutorSetKey(); } public synchronized void skip() { diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/topology/node/ClosedEventManager.java --- a/src/main/java/alice/topology/node/ClosedEventManager.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/topology/node/ClosedEventManager.java Sun Dec 31 19:32:27 2017 +0900 @@ -6,7 +6,7 @@ import alice.datasegment.CommandType; import alice.datasegment.Receiver; -public class ClosedEventManager extends CodeSegment{ +public class ClosedEventManager extends CodeSegment { private Receiver info = ids.create(CommandType.PEEK); private static ClosedEventManager instance = new ClosedEventManager(); @@ -19,7 +19,7 @@ return instance; } - public void setKey() { + public void ceSetKey() { ids.init(); info.setKey("_DISCONNECT"); } @@ -28,7 +28,7 @@ @Override public void run() { new ExecuteEvent(); - setKey(); + ceSetKey(); } @SuppressWarnings("rawtypes") diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/topology/node/ConfigurationFinish.java --- a/src/main/java/alice/topology/node/ConfigurationFinish.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/topology/node/ConfigurationFinish.java Sun Dec 31 19:32:27 2017 +0900 @@ -1,5 +1,6 @@ package alice.topology.node; +import alice.daemon.Config; import alice.topology.manager.TopologyManagerConfig; import org.msgpack.type.ValueFactory; @@ -16,14 +17,19 @@ private CodeSegment startCS; public ConfigurationFinish(CodeSegment startCS) { + // System.err.println("config finish ...") ; this.startCS = startCS; + reverseCount.setKey("local", "reverseCount"); + configNodeNum.setKey("local", "configNodeNum"); + config.setKey("local","config"); } @Override public void run() { int rcount = reverseCount.asInteger(); int ncount = configNodeNum.asInteger(); - TopologyManagerConfig tconfig = config.asClass(TopologyManagerConfig.class); + Config tconfig = config.asClass(Config.class); + // System.err.println(" rcount = " + Integer.toString(rcount) + " " + Integer.toString(ncount)); if (rcount == ncount) { ods.put("manager", "done", ValueFactory.createNilValue()); Start cs = new Start(startCS); @@ -37,9 +43,6 @@ return; } - ConfigurationFinish cs3 = new ConfigurationFinish(startCS); - cs3.reverseCount.setKey("local", "reverseCount", this.reverseCount.index); - cs3.configNodeNum.setKey("local", "configNodeNum"); + new ConfigurationFinish(startCS); } - } diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/topology/node/IncomingAbstractHostName.java --- a/src/main/java/alice/topology/node/IncomingAbstractHostName.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/topology/node/IncomingAbstractHostName.java Sun Dec 31 19:32:27 2017 +0900 @@ -16,8 +16,7 @@ @Override public void run() { String absName = this.absName.asString(); - IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, 0); - cs.hostInfo.setKey(conf.getManagerKey(), absName); + IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, 0, conf.getManagerKey()); } } diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/topology/node/IncomingConnectionInfo.java --- a/src/main/java/alice/topology/node/IncomingConnectionInfo.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/topology/node/IncomingConnectionInfo.java Sun Dec 31 19:32:27 2017 +0900 @@ -13,35 +13,39 @@ private String absName; private int count; private Logger log = Logger.getLogger(IncomingConnectionInfo.class); + private String managerKey; - public IncomingConnectionInfo(String absName, int count) { + public IncomingConnectionInfo(String absName, int count, String managerKey) { this.absName = absName; this.count = count; + this.managerKey = managerKey; + hostInfo.setKey(managerKey,absName); } + + @Override public void run() { - if (this.hostInfo.getVal() == null) { + HostMessage hostInfo = this.hostInfo.asClass(HostMessage.class); + log.info(hostInfo.toString()); + if ( hostInfo.name.equals("")) { // end case + log.info(" topology node finished " + absName); ods.put("local", "configNodeNum", count); + return ; + } + log.info("topology node " + absName + " will connect to " + hostInfo.name ); + if (DataSegment.contains(hostInfo.connectionName)) { + // need to wait remove by DeleteConnection + ods.put("manager", absName, hostInfo); } else { - HostMessage hostInfo = this.hostInfo.asClass(HostMessage.class); - log.info("topology node " + absName + " will connect to " + hostInfo.name ); - if (DataSegment.contains(hostInfo.connectionName)) { - // need to wait remove by DeleteConnection - ods.put("manager", absName, hostInfo); - } else { - DataSegment.connect(hostInfo.connectionName, hostInfo.reverseName, hostInfo.name, hostInfo.port); - ods.put(hostInfo.connectionName, "reverseKey", hostInfo.reverseName); - count++; + DataSegment.connect(hostInfo.connectionName, hostInfo.reverseName, hostInfo.name, hostInfo.port); + ods.put(hostInfo.connectionName, "reverseKey", hostInfo.reverseName); + count++; - ods.put("cMember", hostInfo.connectionName); - new CreateConnectionList(); - } - + ods.put("cMember", hostInfo.connectionName); + new CreateConnectionList(); } - - IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, count); - cs.hostInfo.setKey(absName); + new IncomingConnectionInfo(absName, count, managerKey); } } diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/topology/node/IncomingReverseKey.java --- a/src/main/java/alice/topology/node/IncomingReverseKey.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/topology/node/IncomingReverseKey.java Sun Dec 31 19:32:27 2017 +0900 @@ -1,6 +1,7 @@ package alice.topology.node; import alice.codesegment.CodeSegment; +import alice.daemon.IncomingTcpConnection; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; import alice.datasegment.Receiver; @@ -14,7 +15,9 @@ public void run() { String reverseKey = this.reverseKey.asString(); String from = this.reverseKey.from; - DataSegment.getAccept(from).setReverseKey(reverseKey); + IncomingTcpConnection s = DataSegment.getAccept(from); + if (s != null) + s.setReverseKey(reverseKey); int reverseCount = this.reverseCount.asInteger(); reverseCount++; diff -r 3c060de44c2e -r e321c5ec9b58 src/main/java/alice/topology/node/StartTopologyNode.java --- a/src/main/java/alice/topology/node/StartTopologyNode.java Sun Dec 31 12:06:45 2017 +0900 +++ b/src/main/java/alice/topology/node/StartTopologyNode.java Sun Dec 31 19:32:27 2017 +0900 @@ -32,10 +32,15 @@ e.printStackTrace(); } new SaveCookie(); + if (conf.localPort == 0) { + // local test mode + localHostName = conf.getLocalKey(); + } + ods.put("config" , conf ); HostMessage host = new HostMessage(localHostName, conf.localPort); host.cookie = conf.cookie; - ods.put(manager, "host", host); + ods.put(manager, "hostMessage", host); ods.put(local,"_CLIST", new ArrayList()); @@ -49,8 +54,6 @@ ods.put(local, "reverseCount", 0); ConfigurationFinish cs3 = new ConfigurationFinish(startCS); - cs3.reverseCount.setKey(local, "reverseCount"); - cs3.configNodeNum.setKey(local, "configNodeNum"); }