changeset 49:fd944876257b

add node and keepalive
author akahori
date Thu, 23 Aug 2018 09:29:05 +0900
parents 174a2f37ec28
children 8dfd93810041
files src/main/java/christie/topology/manager/keepalive/CreateTask.java src/main/java/christie/topology/manager/keepalive/KeepAlive.java src/main/java/christie/topology/manager/keepalive/ListManager.java src/main/java/christie/topology/manager/keepalive/RemoveTask.java src/main/java/christie/topology/manager/keepalive/RespondData.java src/main/java/christie/topology/manager/keepalive/RespondPing.java src/main/java/christie/topology/manager/keepalive/StartKeepAlive.java src/main/java/christie/topology/manager/keepalive/TaskExecuter.java src/main/java/christie/topology/manager/keepalive/TaskInfo.java src/main/java/christie/topology/manager/keepalive/TaskType.java src/main/java/christie/topology/node/ConfigurationFinish.java src/main/java/christie/topology/node/CreateConnectionList.java src/main/java/christie/topology/node/IncomingAbstractHostName.java src/main/java/christie/topology/node/IncomingConnectionInfo.java src/main/java/christie/topology/node/IncomingReverseKey.java src/main/java/christie/topology/node/PrepareToClose.java src/main/java/christie/topology/node/SaveCookie.java src/main/java/christie/topology/node/Start.java src/main/java/christie/topology/node/StartTopologyNode.java src/main/java/christie/topology/node/TopologyNode.java src/main/java/christie/topology/node/TopologyNodeConfig.java
diffstat 21 files changed, 863 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/keepalive/CreateTask.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,32 @@
+package christie.topology.manager.keepalive;
+
+import christie.annotation.Peek;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+
+import java.util.List;
+
+public class CreateTask extends CodeGear{
+    @Peek
+    List<String> _CLIST;
+    @Peek
+    ListManager _SCHEDULER;
+
+    public static int INTERVAL = 5;
+
+    public CreateTask(){
+    }
+
+    @Override
+    protected void run(CodeGearManager cgm){
+        int i = INTERVAL;
+        for (String manager : _CLIST){
+            TaskInfo task = new TaskInfo(TaskType.PING);
+            task.setInfo(manager, "_FORM_"+manager, i * 1000);
+            _SCHEDULER.addTask(task);
+            i +=INTERVAL;
+        }
+        getLocalDGM().put("_SCHEDULER", _SCHEDULER);
+
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/keepalive/KeepAlive.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,15 @@
+package christie.topology.manager.keepalive;
+
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+
+public class KeepAlive extends CodeGear {
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        getLocalDGM().put("_SCHEDULER", new ListManager());
+        //TaskExecuter.getInstance().setKey();
+
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/keepalive/ListManager.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,103 @@
+package christie.topology.manager.keepalive;
+
+
+import java.util.LinkedList;
+
+public class ListManager {
+    private LinkedList<TaskInfo> list = new LinkedList<TaskInfo>();
+
+    public LinkedList<TaskInfo> getTaskList() {
+        return list;
+    }
+
+    public void addTask(TaskInfo newInfo) {
+        TaskExecuter exec = TaskExecuter.getInstance();
+        TaskInfo nowTask = exec.getNowTask();
+        if (nowTask == null) { // active task is nothing
+            add(newInfo);
+        } else {
+            long remainingTime = exec.getRemainingTime();
+            if (remainingTime > newInfo.getSleepTime()) {
+                // active task is longer than newTask.
+                TaskInfo task = nowTask.clone();
+                task.setSleepTime(remainingTime);
+                list.addFirst(task);
+                add(newInfo);
+                exec.ignore();
+            } else {
+                // newTask is longer than active task.
+                newInfo.setSleepTime(newInfo.getSleepTime() - remainingTime);
+                add(newInfo);
+            }
+        }
+        //show(); //need debug option
+    }
+
+    private void add(TaskInfo newInfo){
+        int cnt = 0;
+        for (TaskInfo info : list){
+            if (info.getSleepTime() < newInfo.getSleepTime()){
+                newInfo.setSleepTime(newInfo.getSleepTime() - info.getSleepTime());
+            } else if (info.getSleepTime() == newInfo.getSleepTime()){
+                // newInfo.getSleepTime() - info.getSleepTime() must be Zero
+                newInfo.setSleepTime(0);
+                cnt++;
+                break;
+            } else if (info.getSleepTime() > newInfo.getSleepTime()){
+                info.setSleepTime(info.getSleepTime() - newInfo.getSleepTime());
+                break;
+            }
+            cnt++;
+        }
+        list.add(cnt, newInfo);
+    }
+
+    public boolean deleteTask(TaskInfo task){
+        boolean matchFlag = false;
+        int cnt = 0;
+        for (TaskInfo t: list){
+            if (t.getType().equals(task.getType())&&t.getManagerKey().equals(task.getManagerKey())){
+                matchFlag = true;
+                if (cnt+1 < list.size() ){ // next task exists.
+                    TaskInfo next = list.get(cnt+1);
+                    next.setSleepTime(next.getSleepTime()+t.getSleepTime());
+                }
+                list.remove(cnt);
+                break;
+            } else {
+                cnt++;
+            }
+        }
+        return matchFlag;
+    }
+
+    public void show(){
+        TaskInfo nowTask = TaskExecuter.getInstance().getNowTask();
+        if (nowTask!=null){
+            System.out.print("NOW | ");
+            System.out.print(nowTask.getSleepTime() +" "+nowTask.getType()+" "+nowTask.getManagerKey());
+            System.out.print(" | ");
+        }
+        for (TaskInfo info : list){
+            System.out.print(info.getSleepTime() +" "+ info.getType()+" "+info.getManagerKey());
+            System.out.print(" | ");
+        }
+        System.out.println();
+    }
+
+    public void deleteAll(String name) {
+        TaskInfo task = TaskExecuter.getInstance().getNowTask();
+        if (task!= null) {
+            if (task.getManagerKey() != null && task.getManagerKey().equals(name)) {
+                TaskExecuter.getInstance().skip();
+            }
+        }
+        //DataSegment.getLocal().removeDataSegmentKey("_FORM_"+name);
+        task = new TaskInfo(TaskType.CLOSE);
+        task.setInfo(name, 0);
+        while(deleteTask(task));
+        task = new TaskInfo(TaskType.PING);
+        task.setInfo(name, 0);
+        while(deleteTask(task));
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/keepalive/RemoveTask.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,32 @@
+package christie.topology.manager.keepalive;
+
+
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+
+public class RemoveTask extends CodeGear {
+
+    @Take
+    ListManager _SCHEDULER;
+
+    @Take
+    TaskInfo _REMOVETASK;
+
+    public RemoveTask(){
+    }
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        TaskExecuter exec = TaskExecuter.getInstance();
+        if (!exec.compareAndSkip(_REMOVETASK)){
+            // case: remove _REMOVETASK is still in the waiting queue.
+            boolean removeFlag = _SCHEDULER.deleteTask(_REMOVETASK);
+            if (!removeFlag){
+                getLocalDGM().put("_REMOVETASK", _REMOVETASK);
+                return;
+            }
+        }
+        getLocalDGM().put("_SCHEDULER", _SCHEDULER);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/keepalive/RespondData.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,11 @@
+package christie.topology.manager.keepalive;
+
+public class RespondData {
+    public String from;
+    public long time;
+
+    public RespondData(String str, long t){
+        from = str;
+        time = t;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/keepalive/RespondPing.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,30 @@
+package christie.topology.manager.keepalive;
+
+
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+
+public class RespondPing extends CodeGear{
+    @Take
+    RespondData respondData;
+//    private long pingedTime = System.currentTimeMillis();
+
+
+    @Override
+    public void run(CodeGearManager cgm) {
+        TaskExecuter exec = TaskExecuter.getInstance();
+        synchronized (exec){
+            //System.out.print("ping from "+d.from);
+            //System.out.println(" Recieved time "+(d.time - pingedTime));
+
+            TaskInfo task = new TaskInfo(TaskType.CLOSE);
+            task.setInfo(respondData.from, null, 0);
+            if (!exec.compareAndSkip(task)){
+                // case: remove task is still in the waiting queue.
+                getLocalDGM().put("_REMOVETASK",task);
+                new RemoveTask();
+            }
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/keepalive/StartKeepAlive.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,11 @@
+package christie.topology.manager.keepalive;
+
+import christie.topology.node.TopologyNode;
+import christie.topology.node.TopologyNodeConfig;
+
+public class StartKeepAlive {
+    public static void main(String[] args) {
+        TopologyNodeConfig conf = new TopologyNodeConfig(args);
+        new TopologyNode(conf, new KeepAlive());
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/keepalive/TaskExecuter.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,144 @@
+package christie.topology.manager.keepalive;
+
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+
+public class TaskExecuter extends CodeGear {
+
+    @Take
+    ListManager _SCHEDULER;
+    private TaskInfo nowTask;
+    private boolean sleepFlag = false;
+    private boolean skipFlag = false;
+    private long startTime = 0;
+    private long remainingTime = 0;
+    private static TaskExecuter instance = new TaskExecuter();
+
+    private TaskExecuter() {
+    }
+
+    public static TaskExecuter getInstance() {
+        return instance;
+    }
+
+    public void taskExecutorSetKey() {
+        //ids.init();
+    }
+
+
+    @Override
+    protected synchronized void run(CodeGearManager cgm) {
+        if (_SCHEDULER.getTaskList().size() == 0) {
+            TaskInfo task = new TaskInfo(TaskType.CREATE);
+            task.setSleepTime(3000);
+            _SCHEDULER.addTask(task);
+            getLocalDGM().put("_SCHEDULER", _SCHEDULER);
+            remainingTime = 0;
+            return;
+        }
+
+        nowTask = _SCHEDULER.getTaskList().poll();
+        if (nowTask.getType() != TaskType.PING)
+            getLocalDGM().put("_SCHEDULER", _SCHEDULER);
+        if (skipFlag) {
+            skipFlag = false;
+            nowTask.setSleepTime(remainingTime + nowTask.getSleepTime());
+            remainingTime = 0;
+        }
+        startTime = System.currentTimeMillis();
+        if (nowTask.getSleepTime() != 0) {
+            sleepFlag = true;
+            try {
+                wait(nowTask.getSleepTime());
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            sleepFlag = false;
+        }
+
+        if (!skipFlag) {
+            // ping or close
+            if (nowTask.getType() == TaskType.PING) {
+                //ods.ping(nowTask.getManagerKey(), nowTask.getReturnKey());
+                TaskInfo task = new TaskInfo(TaskType.CLOSE);
+                task.setInfo(nowTask.getManagerKey(), 10 * 1000);
+                _SCHEDULER.addTask(task);
+                getLocalDGM().put("_SCHEDULER", _SCHEDULER);
+                cgm.setup(new RespondPing());
+            } else if (nowTask.getType() == TaskType.CLOSE) {
+                // TODO: shotdown処理は後で追加する.
+                    /*    // no response from the Remote DataSegment. So close this connection.
+                    if (DataSegment.contains(nowTask.getManagerKey())) {
+                        DataSegment.get(nowTask.getManagerKey()).shutdown();
+                        System.out.println(nowTask.getManagerKey() +" IS SHOTDOWN");
+                    } else {
+                        System.out.println(nowTask.getManagerKey() +" IS ALREADY SHOTDOWN");
+                    }*/
+
+            } else if (nowTask.getType() == TaskType.CREATE) {
+                cgm.setup(new CreateTask());
+            }
+        }
+        nowTask = null;
+        startTime = 0;
+
+
+        // taskExecutorSetKey();
+    }
+
+    public synchronized void skip() {
+        if (sleepFlag) {
+            skipFlag = true;
+            if (startTime == 0) {
+                remainingTime = nowTask.getSleepTime();
+            } else {
+                remainingTime = nowTask.getSleepTime() - (System.currentTimeMillis() - startTime);
+            }
+            nowTask = null;
+            notify();
+        }
+    }
+
+    public synchronized void ignore() {
+        if (sleepFlag) {
+            skipFlag = true;
+            remainingTime = 0;
+            nowTask = null;
+            notify();
+        }
+    }
+
+    public synchronized TaskInfo getNowTask() {
+        return nowTask;
+    }
+
+    // only use in ListManagerTest
+    public synchronized void setNowTask(TaskInfo info) {
+        nowTask = info;
+    }
+
+    public synchronized long getRemainingTime() {
+        if (sleepFlag) {
+            if (startTime != 0) {
+                return nowTask.getSleepTime() - (System.currentTimeMillis() - startTime);
+            } else {
+                return nowTask.getSleepTime();
+            }
+        } else {
+            return remainingTime;
+        }
+    }
+
+    public synchronized boolean compareAndSkip(TaskInfo task) {
+        if (nowTask != null) {
+            if (nowTask.getType().equals(task.getType())
+                    && nowTask.getManagerKey().equals(task.getManagerKey())) {
+                skip();
+                return true;
+            }
+        }
+        return false;
+    }
+
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/keepalive/TaskInfo.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,56 @@
+package christie.topology.manager.keepalive;
+
+public class TaskInfo {
+    private TaskType type;
+    private long sleepTime;
+    private String managerKey;
+    private String returnKey;
+
+    public TaskInfo(TaskType t){
+        type = t;
+    }
+
+    public void setInfo(String managerKey, long time){
+        this.managerKey = managerKey;
+        sleepTime = time;
+    }
+
+    public void setInfo(String managerKey, String returnKey, long time){
+        this.managerKey = managerKey;
+        this.returnKey = returnKey;
+        sleepTime = time;
+    }
+
+    public TaskType getType(){
+        return type;
+    }
+
+    public long getSleepTime(){
+        return sleepTime;
+    }
+
+    public void setSleepTime(long time){
+        sleepTime = time;
+    }
+
+    public String getManagerKey(){
+        return managerKey;
+    }
+
+    public String getReturnKey(){
+        return returnKey;
+    }
+
+    public TaskInfo clone(){
+        TaskInfo task = new TaskInfo(type);
+        task.setInfo(managerKey, returnKey, sleepTime);
+        return task;
+    }
+
+    public void show(){
+        System.out.print(type);
+        System.out.print(" "+  this.sleepTime);
+        System.out.print(" "+ this.managerKey);
+        System.out.println(" "+  this.returnKey);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/manager/keepalive/TaskType.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,7 @@
+package christie.topology.manager.keepalive;
+
+public enum TaskType {
+    PING,
+    CLOSE,
+    CREATE,
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/node/ConfigurationFinish.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,45 @@
+package christie.topology.node;
+
+import christie.annotation.Peek;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import christie.daemon.Config;
+import christie.topology.manager.keepalive.KeepAlive;
+import org.msgpack.type.ValueFactory;
+
+public class ConfigurationFinish extends CodeGear {
+
+    @Peek
+    int reverseCount;
+
+    @Peek
+    int configNodeNum;
+
+    @Peek
+    Config config;
+
+    private CodeGear startCS;
+
+    public ConfigurationFinish(CodeGear startCS) {
+        // System.err.println("config finish ...") ;
+        this.startCS = startCS;
+    }
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        // System.err.println(" rcount = " + Integer.toString(rcount) + " " + Integer.toString(ncount));
+        if (reverseCount == configNodeNum) {
+            getDGM("manager").put( "done", ValueFactory.createNilValue());
+            cgm.setup(new Start(startCS));
+
+            if (config.useKeepAlive)
+                cgm.setup(new KeepAlive());
+            cgm.setup(new PrepareToClose());
+            //ClosedEventManager.getInstance().register(DeleteConnection.class);
+            //ClosedEventManager.getInstance().setKey();*/
+            return;
+        }
+
+        new ConfigurationFinish(startCS);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/node/CreateConnectionList.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,24 @@
+package christie.topology.node;
+
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+
+import java.util.List;
+
+public class CreateConnectionList extends CodeGear {
+    @Take
+    List<String> _CLIST;
+    @Take
+    String cMember;
+
+    public CreateConnectionList() { }
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        _CLIST.add(cMember);
+        getLocalDGM().put("_CLIST", _CLIST);
+
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/node/IncomingAbstractHostName.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,24 @@
+package christie.topology.node;
+
+
+import christie.annotation.Peek;
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+
+public class IncomingAbstractHostName extends CodeGear {
+
+    private final TopologyNodeConfig conf;
+    @Peek
+    String host;
+
+    public IncomingAbstractHostName(TopologyNodeConfig conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        cgm.setup(new IncomingConnectionInfo(host, 0, conf.getManagerKey()));
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/node/IncomingConnectionInfo.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,53 @@
+package christie.topology.node;
+
+
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import christie.topology.HostMessage;
+import org.apache.log4j.Logger;
+
+public class IncomingConnectionInfo extends CodeGear {
+
+    @Take
+    HostMessage hostInfo;
+
+    private String absName;
+    private int count;
+    private Logger log = Logger.getLogger(IncomingConnectionInfo.class);
+    private String managerKey;
+
+    public IncomingConnectionInfo(String absName, int count, String managerKey) {
+        this.absName = absName;
+        this.count = count;
+        this.managerKey = managerKey;
+        // hostInfo.setKey(managerKey,absName);
+
+    }
+
+
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        log.info(hostInfo.toString());
+        if ( hostInfo.hostName.equals("")) { // end case
+            log.info(" topology node finished " + absName);
+            getLocalDGM().put("configNodeNum", count);
+            return ;
+        }
+        log.info("topology node " + absName + " will connect to " + hostInfo.hostName );
+        if (cgm.getDgmList().contains(hostInfo.connectionName)) {
+            // need to wait remove by DeleteConnection
+            getDGM("manager").put(absName, hostInfo);
+        } else {
+            cgm.createRemoteDGM(hostInfo.connectionName, hostInfo.hostName, hostInfo.port);
+            getDGM(hostInfo.connectionName).put("reverseKey", hostInfo.reverseName);
+            count++;
+
+            getLocalDGM().put("cMember", hostInfo.connectionName);
+            cgm.setup(new CreateConnectionList());
+        }
+        cgm.setup(new IncomingConnectionInfo(absName, count, managerKey));
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/node/IncomingReverseKey.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,29 @@
+package christie.topology.node;
+
+
+import christie.annotation.Peek;
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import christie.daemon.IncomingTcpConnection;
+
+public class IncomingReverseKey extends CodeGear {
+
+    @Take
+    String reverseKey;
+
+    @Peek
+    int reverseCount;
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        //String from = this.reverseKey.from;
+        //IncomingTcpConnection s = DataSegment.getAccept(from);
+        //if (s != null) s.setReverseKey(reverseKey);
+
+        reverseCount++;
+        getLocalDGM().put( "reverseCount", reverseCount);
+        cgm.setup(new IncomingReverseKey());
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/node/PrepareToClose.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,46 @@
+package christie.topology.node;
+
+
+import christie.annotation.Peek;
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+
+import java.util.List;
+
+public class PrepareToClose extends CodeGear {
+
+    @Peek
+    String _CLOSEMESSEAGE;
+
+    @Take
+    List<String> _CLIST;
+
+
+    public PrepareToClose() {
+
+    }
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        /*
+        if (_CLIST.contains(_CLOSEMESSEAGE)) {
+            _CLIST.remove(_CLOSEMESSEAGE);
+
+            DataSegmentManager manager = DataSegment.get(_CLOSEMESSEAGE);
+            manager.setSendError(false);
+
+            ods.put(_CLOSEMESSEAGE, "_CLOSEREADY", _CLOSEMESSEAGE);
+            ods.put("_CLOSEREADY", _CLOSEMESSEAGE);
+
+            new CloseRemoteDataSegment();
+        } else {
+            // lost node is this node's parent, so already removed
+            new ReceiveCloseMessage(CommandType.TAKE);
+        }*/
+        cgm.setup(new PrepareToClose());
+        getLocalDGM().put("_CLIST", _CLIST);
+
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/node/SaveCookie.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,25 @@
+package christie.topology.node;
+
+
+import christie.annotation.Peek;
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import org.apache.log4j.Logger;
+
+
+public class SaveCookie extends CodeGear{
+    @Peek
+    String cookie;
+
+    private Logger logger = Logger.getLogger(SaveCookie.class);
+
+    public SaveCookie(){ }
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        logger.info("SaveCookie:" + cookie);
+    }
+
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/node/Start.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,30 @@
+package christie.topology.node;
+
+import christie.annotation.Peek;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import org.apache.log4j.Logger;
+
+public class Start extends CodeGear {
+
+    @Peek
+    boolean start;
+
+    private Logger logger = Logger.getLogger(Start.class);
+    private CodeGear startCS;
+
+
+    public Start(CodeGear startCS) {
+        this.startCS = startCS;
+    }
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        logger.info("Configuration finished.");
+
+        if (startCS == null) return;
+        cgm.setup(startCS);
+
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/node/StartTopologyNode.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,26 @@
+package christie.topology.node;
+
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import christie.codegear.StartCodeGear;
+
+public class StartTopologyNode extends StartCodeGear{
+    TopologyNodeConfig conf;
+    CodeGear startCG;
+
+    public StartTopologyNode(CodeGearManager cgm, TopologyNodeConfig conf, CodeGear startCG) {
+        super(cgm);
+        cgm.setup(new TopologyNode(conf, startCG));
+
+    }
+
+    public static void main(String[] args) {
+        TopologyNodeConfig conf = new TopologyNodeConfig(args);
+        new TopologyNode(conf, null);
+    }
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/node/TopologyNode.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,58 @@
+package christie.topology.node;
+
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import christie.topology.HostMessage;
+import christie.topology.manager.IncomingHosts;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+
+public class TopologyNode extends CodeGear{
+
+    private final String manager;
+    private final String local;
+    private TopologyNodeConfig conf;
+    private CodeGear startCS;
+
+    public TopologyNode(TopologyNodeConfig conf, CodeGear startCS) {
+        this.conf = conf;
+        this.startCS = startCS;
+        this.manager = conf.getManagerKey();
+        this.local = conf.getLocalKey();
+    }
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        cgm.createRemoteDGM(manager, conf.getManagerHostName(), conf.getManagerPort());
+        String localHostName = null;
+        try {
+            localHostName = InetAddress.getLocalHost().getHostAddress();
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
+        cgm.setup(new SaveCookie());
+        if (cgm.localPort == 0) {
+            // local test mode
+            localHostName = conf.getLocalKey();
+        }
+        getLocalDGM().put("config" , conf );
+
+        HostMessage host = new HostMessage(localHostName, cgm.localPort);
+        host.cookie = conf.cookie;
+        getDGM(manager).put("hostMessage", host);
+
+        getLocalDGM().put("_CLIST", new ArrayList<String>());
+        //getDGM(local).put("_CLIST", new ArrayList<String>());
+
+        cgm.setup(new IncomingAbstractHostName(conf));
+
+        cgm.setup(new IncomingReverseKey());
+
+        getLocalDGM().put("reverseCount", 0);
+
+        cgm.setup(new ConfigurationFinish(startCS));
+
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/topology/node/TopologyNodeConfig.java	Thu Aug 23 09:29:05 2018 +0900
@@ -0,0 +1,62 @@
+package christie.topology.node;
+
+import christie.daemon.Config;
+
+public class TopologyNodeConfig extends Config {
+    private String managerHostName;
+    private int managerPort = 10000;
+    public String cookie;
+    private String managerKey = "manager";
+    private String localKey = "local";
+
+    public TopologyNodeConfig(String[] args) {
+        super(args);
+        for (int i = 0; i< args.length; i++) {
+            if ("--host".equals(args[i])) {
+                setManagerHostName(args[++i]);
+            } else if ("--managerKey".equals(args[i])) {
+                setManagerKey(args[++i]);
+            } else if ("--localKey".equals(args[i])) {
+                setLocalKey(args[++i]);
+            } else if ("--managerPort".equals(args[i])) {
+                setManagerPort(Integer.parseInt(args[++i]));
+            } else if ("--cookie".equals(args[i])) {
+                cookie = args[++i];
+            }
+        }
+    }
+
+
+    public String getManagerHostName() {
+        return managerHostName;
+    }
+
+    public void setManagerHostName(String managerHostName) {
+        this.managerHostName = managerHostName;
+    }
+
+    public int getManagerPort() {
+        return managerPort;
+    }
+
+    public void setManagerPort(int managerPort) {
+        this.managerPort = managerPort;
+    }
+
+    public String getLocalKey() {
+        return localKey;
+    }
+
+    public void setLocalKey(String local) {
+        this.localKey = local;
+    }
+
+    public void setManagerKey(String manager) {
+        this.managerKey = manager;
+    }
+
+    public String getManagerKey() {
+        return managerKey;
+
+    }
+}