changeset 138:6b33e7dfb146

マージ example
author ichikitakahiro <e165713@ie.u-ryukyu.ac.jp>
date Fri, 28 Dec 2018 20:06:41 +0900
parents f172c4540461 (diff) cb2a15d5c915 (current diff)
children 77169cd8a5e8 8caca9158f0f
files
diffstat 30 files changed, 247 insertions(+), 160 deletions(-) [+]
line wrap: on
line diff
--- a/build.gradle	Fri Dec 28 20:00:58 2018 +0900
+++ b/build.gradle	Fri Dec 28 20:06:41 2018 +0900
@@ -16,8 +16,10 @@
 dependencies {
     compile fileTree(dir: 'lib', include: '*.jar')
     testCompile('org.junit.jupiter:junit-jupiter-api:5.2.0')
-    compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.16'
-    compile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.16'
+    //compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.16'
+    //compile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.16'
+    compile group: 'org.msgpack', name: 'msgpack', version: '0.6.12'
+
 
 }
 
--- a/scripts/local_test_run.sh	Fri Dec 28 20:00:58 2018 +0900
+++ b/scripts/local_test_run.sh	Fri Dec 28 20:06:41 2018 +0900
@@ -15,14 +15,14 @@
 ruby ./ring.rb $1 > Log/ring.dot
 #dot -Tpng ./topology/ring.dot > ./topology/ring.png
 #open ./topology/ring.png
-java -cp ${jar_path}${topo_jarname} --localPort 10000 --confFile Log/ring.dot &
+java -jar ${jar_path}${topo_jarname} --localPort 10000 --confFile Log/ring.dot &
 
 sleep 3
 
 cnt=0
 while [ $cnt -lt $max ]
 do
-   java -cp ${jar_path}${torquetest_jarname} --managerHost localhost --managerPort 10000 --localPort `expr 20000 + $cnt`&
+   java -jar ${jar_path}${torquetest_jarname} --managerHost localhost --managerPort 10000 --localPort `expr 20000 + $cnt`&
    cnt=`expr $cnt + 1`
 done
 wait
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/scripts/local_test_run2.sh	Fri Dec 28 20:06:41 2018 +0900
@@ -0,0 +1,26 @@
+#!/bin/bash
+
+if [ ! -d output ]; then
+    mkdir output
+fi
+
+max=$1
+count=$2
+jar_path=../build/libs/Christie.jar
+
+mkdir -p Log
+
+ruby ./ring.rb $1 > Log/ring.dot
+#dot -Tpng ./topology/ring.dot > ./topology/ring.png
+#open ./topology/ring.png
+java -cp $jar_path christie.topology.manager.StartTopologyManager --localPort 10000 --confFile Log/ring.dot &
+
+#sleep 3
+
+cnt=0
+while [ $cnt -lt $max ]
+do
+   java -cp $jar_path christie.test.topology.localTestTopology.StartTorqueTestTopology --managerHost `hostname` --managerPort 10000 --localPort `expr 20000 + $cnt`&
+    cnt=`expr $cnt + 1`
+done
+wait
\ No newline at end of file
--- a/src/main/java/christie/codegear/CodeGear.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/codegear/CodeGear.java	Fri Dec 28 20:06:41 2018 +0900
@@ -66,12 +66,15 @@
         return cgm.getDGM(dgmName);
     }
 
-    public void put(String dgmName, String key, Object data){
+    public CodeGear put(String dgmName, String key, Object data){
         getDGM(dgmName).put(key, data);
+        return this;
     }
 
-    public void put(String key, Object value){
+    public CodeGear put(String key, Object value){
+
         localDGM.put(key, value);
+        return this;
     }
 
     public Boolean containsDGM(String dgmName){ return cgm.containsDGM(dgmName);}
--- a/src/main/java/christie/daemon/AcceptThread.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/daemon/AcceptThread.java	Fri Dec 28 20:06:41 2018 +0900
@@ -22,14 +22,14 @@
     public void run() {
         while (true) {
             try {
-                Socket socket = ss.accept();
+                Socket socket = null;
+                socket = ss.accept();
                 socket.setTcpNoDelay(true);
-                //System.out.println("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort());
-                Connection connection = new Connection(socket);
-                connection.name = getName();
+                System.out.println("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort());
+                Connection connection = new Connection(socket, cgm);
                 String key = "accept" + counter;
                 IncomingTcpConnection in =
-                        new IncomingTcpConnection(connection, cgm);
+                        new IncomingTcpConnection(connection);
                 in.setName(connection.getInfoString()+"-IncomingTcp");
                 in.start();
                 cgm.setAccept(key, in);
--- a/src/main/java/christie/daemon/Connection.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/daemon/Connection.java	Fri Dec 28 20:06:41 2018 +0900
@@ -1,21 +1,23 @@
 package christie.daemon;
 
-import java.awt.*;
 import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import christie.codegear.CodeGearManager;
 import christie.datagear.command.Command;
 
 public class Connection {
 
     public Socket socket;
     public String name;
+    public CodeGearManager cgm;
     public LinkedBlockingQueue<Command> sendQueue = new LinkedBlockingQueue<Command>();
     public boolean sendManager = true;
 
-    public Connection(Socket socket) {
+    public Connection(Socket socket, CodeGearManager cgm) {
         this.socket = socket;
+        this.cgm = cgm;
     }
 
     public Connection() {}
@@ -39,24 +41,24 @@
             socket.shutdownOutput();
             socket.shutdownInput();
             socket.close();
-        } catch (Exception e) { }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
         //putConnectionInfo();
 
     }
-
     /*
     public void putConnectionInfo() {
         if (name!=null) {
-            ConnectionInfo c = new ConnectionInfo(name, socket);
-            ReceiveData rData = new ReceiveData(c);
-            DataSegment.getLocal().put("_DISCONNECT", rData, false);
+            ConnectionInfo connectionInfo = new ConnectionInfo(name, socket);
+            cgm.getLocalDGM().put("_DISCONNECT", connectionInfo);
             if (sendManager) {
-                DataSegment.get("manager").put("_DISCONNECTNODE", rData, false);
+                cgm.getDGM("manager").put("_DISCONNECTNODE", connectionInfo);
                 sendManager = false;
             }
         }
+    }*/
 
-    }*/
     public synchronized void write(Command cmd) {
         ByteBuffer buffer = cmd.convert();
 
@@ -64,7 +66,7 @@
             while (buffer.hasRemaining()) {
                 socket.getChannel().write(buffer);
             }
-            System.out.println("write : " + cmd.key);
+            //System.out.println("write : " + cmd.key);
 
         } catch (Exception e) {
             e.printStackTrace();
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/daemon/IncomingTcpConnection.java	Fri Dec 28 20:06:41 2018 +0900
@@ -8,19 +8,12 @@
 import christie.datagear.RemoteMessage;
 import christie.datagear.command.RemotePeekCommand;
 import christie.datagear.command.RemoteTakeCommand;
-import christie.datagear.dg.DataGear;
 import christie.datagear.dg.MessagePackDataGear;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.msgpack.jackson.dataformat.MessagePackFactory;
-
-//import org.msgpack.MessagePack;
-//import org.msgpack.unpacker.Unpacker;
-
+import org.msgpack.MessagePack;
+import org.msgpack.unpacker.Unpacker;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.channels.ClosedChannelException;
 
 public class IncomingTcpConnection extends Thread {
@@ -28,48 +21,42 @@
     RemoteDataGearManager manager;
     CodeGearManager cgm;
     Connection connection;
-    ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
+    private MessagePack packer = new MessagePack();
 
-    public IncomingTcpConnection(Connection connection, CodeGearManager cgm) {
+    public IncomingTcpConnection(Connection connection) {
         this.connection = connection;
-        this.cgm = cgm;
+        this.cgm = connection.cgm;
     }
 
-    public void setManager(RemoteDataGearManager manager) {
+    public void setManager(RemoteDataGearManager manager){
         this.manager = manager;
     }
 
     public void run() {
-
-        objectMapper.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
-
-        InputStream in = null;
+        Unpacker unpacker = null;
         try {
-            in = connection.socket.getInputStream();
+            unpacker = packer.createUnpacker(connection.socket.getInputStream());
         } catch (IOException e) {
             e.printStackTrace();
         }
-        if (in == null) {
-            System.out.println("return");
+        if (unpacker == null) {
             return;
         }
-
         while (true) {
             try {
-
-                RemoteMessage msg = objectMapper.readValue(in, RemoteMessage.class);
-
+                RemoteMessage msg = unpacker.read(RemoteMessage.class);
                 CommandType type = CommandType.getCommandTypeFromId(msg.type);
-                System.out.println("read : " + msg.key);
+                //System.out.println("read " + msg.key);
+                byte[] data;
 
                 switch (type) {
                     case PUT:
-                        byte[] msgpackdg = objectMapper.readValue(in, byte[].class);
-
+                        data = new byte[unpacker.readInt()];
+                        connection.socket.getInputStream().read(data);
                         try {
-                            MessagePackDataGear dg = new MessagePackDataGear(msgpackdg, Class.forName(msg.clazz));
+                            MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
                             cgm.getLocalDGM().put(msg.key, dg);
-                        } catch (Exception e) {
+                        } catch (ClassNotFoundException e) {
                             e.printStackTrace();
                         }
 
@@ -83,10 +70,11 @@
 
                         break;
                     case REPLY://待っていたwaitListに渡してcsにセット
-                        //data = new byte[objectMapper.readValue(convertedCommand, Integer.class)];
-                        //connection.socket.getInputStream().read(data);
+                        data = new byte[unpacker.readInt()];
+                        connection.socket.getInputStream().read(data);
+
                         try {
-                            MessagePackDataGear dg = new MessagePackDataGear(null, Class.forName(msg.clazz));
+                            MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
                             cgm.getDGM(msg.fromDgmName).resolveWaitCommand(msg.key, dg);
                         } catch (ClassNotFoundException e) {
                             e.printStackTrace();
@@ -96,12 +84,10 @@
                     default:
                         break;
                 }
-
             } catch (ClosedChannelException e) {
-                e.printStackTrace();
                 return;
-            } catch (EOFException e) {
-                e.printStackTrace();
+            }catch (EOFException e) {
+                return;
             } catch (IOException e) {
                 e.printStackTrace();
             }
--- a/src/main/java/christie/datagear/DataGearManager.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/datagear/DataGearManager.java	Fri Dec 28 20:06:41 2018 +0900
@@ -15,6 +15,8 @@
     public abstract void resolveWaitCommand(String key, DataGear dg);
     public abstract void finish();
     public abstract void close();
+    public abstract void shutdown();
+
 
 
 }
--- a/src/main/java/christie/datagear/LocalDataGearManager.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/datagear/LocalDataGearManager.java	Fri Dec 28 20:06:41 2018 +0900
@@ -89,7 +89,12 @@
     }
 
     @Override
-    public void close() {}
+    public void close() {
+
+    }
 
+    @Override
+    public void shutdown() {
 
+    }
 }
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/datagear/RemoteDataGearManager.java	Fri Dec 28 20:06:41 2018 +0900
@@ -10,6 +10,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.channels.SocketChannel;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import static java.lang.Thread.MAX_PRIORITY;
 
@@ -27,7 +28,7 @@
                 do {
                     try {
                         SocketChannel sc = SocketChannel.open(new InetSocketAddress(address, port));
-                        connection = new Connection(sc.socket());
+                        connection = new Connection(sc.socket(), cgm);
                         connection.name = dgmName;
                         connection.socket.setTcpNoDelay(true);
 
@@ -44,7 +45,7 @@
                         }
                     }
                 } while (!connect);
-                IncomingTcpConnection in = new IncomingTcpConnection(connection, cgm);
+                IncomingTcpConnection in = new IncomingTcpConnection(connection);
                 in.setManager(manager);
                 in.setName(dgmName+"-IncomingTcp");
                 in.setPriority(MAX_PRIORITY);
@@ -91,6 +92,7 @@
         cm.setInputs();
     }
 
+
     @Override
     public void finish() {
         Command cmd = new FinishCommand();
@@ -100,11 +102,16 @@
     @Override
     public void close() {
         Command cmd = new CloseCommand();
-        connection.sendManager = false;
         connection.sendCommand(cmd);
     }
 
-    //
+    @Override
+    public void shutdown() {
+        connection.close();
+        LinkedBlockingQueue<Command> queue = connection.sendQueue;
+        if (!queue.isEmpty()) queue.clear();
+    }
+
     public void connectWait(){
         synchronized (lock){
             while(!connect){
@@ -114,7 +121,6 @@
                 }
             }
         }
-
     }
 
 
--- a/src/main/java/christie/datagear/RemoteMessage.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/datagear/RemoteMessage.java	Fri Dec 28 20:06:41 2018 +0900
@@ -1,5 +1,8 @@
 package christie.datagear;
 
+import org.msgpack.annotation.Message;
+
+@Message
 public class RemoteMessage {
     public int type;//PUT, PEEKなどのコマンドタイプ
     public String fromDgmName;//送り元のdsmName。REPLYのときに使用。
--- a/src/main/java/christie/datagear/command/PutCommand.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/datagear/command/PutCommand.java	Fri Dec 28 20:06:41 2018 +0900
@@ -4,14 +4,12 @@
 import christie.datagear.command.CommandType;
 import christie.datagear.dg.DataGear;
 import christie.datagear.dg.MessagePackDataGear;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.msgpack.jackson.dataformat.MessagePackFactory;
+import org.msgpack.MessagePack;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class PutCommand extends Command {
-    ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
 
     public PutCommand(int cgmID, String toDgmName, String key, DataGear dg){
         this.type = CommandType.PUT;
@@ -25,20 +23,23 @@
     @Override
     public ByteBuffer convert() {
         ByteBuffer buf = null;
+        MessagePack packer = new MessagePack();
+
         try {
-            byte[] command = objectMapper.writeValueAsBytes(createRemoteMessage());
-            byte[] data = objectMapper.writeValueAsBytes(new MessagePackDataGear(dg.getData()).getMessagePack());
+            byte[] command = packer.write(createRemoteMessage());
+            byte[] data = new MessagePackDataGear(dg.getData()).getMessagePack();
+            byte[] dataSize = packer.write(data.length);
 
-            buf = ByteBuffer.allocate(command.length+data.length);
+            buf = ByteBuffer.allocate(command.length+dataSize.length+data.length);
             buf.put(command);
+            buf.put(dataSize);
             buf.put(data);
+
             buf.flip();
-
         } catch (IOException e) {
             e.printStackTrace();
         }
 
         return buf;
     }
-
 }
--- a/src/main/java/christie/datagear/command/RemoteTakeCommand.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/datagear/command/RemoteTakeCommand.java	Fri Dec 28 20:06:41 2018 +0900
@@ -5,16 +5,13 @@
 import christie.datagear.command.Command;
 import christie.datagear.command.CommandType;
 import christie.datagear.dg.MessagePackDataGear;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.msgpack.jackson.dataformat.MessagePackFactory;
+import org.msgpack.MessagePack;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class RemoteTakeCommand extends Command {
 
-    ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
-
     public RemoteTakeCommand(RemoteMessage msg, Connection cn) {
         this.type = CommandType.REMOTETAKE;
         this.fromDgmName = msg.fromDgmName;
@@ -40,11 +37,10 @@
     @Override
     public ByteBuffer convert() {
         ByteBuffer buf = null;
+        MessagePack packer = new MessagePack();
 
         try {
-
-            byte[] command = objectMapper.writeValueAsBytes(createRemoteMessage());
-
+            byte[] command = packer.write(createRemoteMessage());
             buf = ByteBuffer.allocate(command.length);
             buf.put(command);
 
--- a/src/main/java/christie/datagear/dg/MessagePackDataGear.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/datagear/dg/MessagePackDataGear.java	Fri Dec 28 20:06:41 2018 +0900
@@ -1,18 +1,13 @@
 package christie.datagear.dg;
 
-//import org.msgpack.MessagePack;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.msgpack.jackson.dataformat.MessagePackFactory;
-
+import org.msgpack.MessagePack;
 
 import java.io.IOException;
 
 public class MessagePackDataGear<T> extends DataGear {//必ずmessagePack形式を持つDataGear
     private byte[] messagePack = null;
     private int dataSize;
-    //private MessagePack packer = new MessagePack();
-    ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
-
+    private MessagePack packer = new MessagePack();
 
     public MessagePackDataGear(T data){
         super(data);
@@ -33,7 +28,7 @@
             return messagePack;
         } else {
             try {
-                messagePack = objectMapper.writeValueAsBytes(data);
+                messagePack = packer.write(data);
                 setDataSize(messagePack.length);
             } catch (IOException e) {
                 e.printStackTrace();
@@ -46,7 +41,7 @@
     public synchronized T getData(){
         if (data == null){
             try {
-                setData(objectMapper.readValue(messagePack, clazz));
+                setData(packer.read(messagePack, clazz));
             } catch (IOException e) {
                 e.printStackTrace();
             }
--- a/src/main/java/christie/test/Paxos/Proposal.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/test/Paxos/Proposal.java	Fri Dec 28 20:06:41 2018 +0900
@@ -1,7 +1,9 @@
 package christie.test.Paxos;
 
 import christie.codegear.CodeGearManager;
+import org.msgpack.annotation.Message;
 
+@Message
 public class Proposal {
     private String proposerName = "";
     private int nodeNum = 0;
--- a/src/main/java/christie/test/topology/localTestTopology/LTRemoteIncrement.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/test/topology/localTestTopology/LTRemoteIncrement.java	Fri Dec 28 20:06:41 2018 +0900
@@ -25,7 +25,6 @@
 
         if (num == 3) {
             getDGM(topologyNodeConfig.getManagerKey()).put("finish", "");
-            getLocalDGM().finish();
         } else {
             num++;
             getDGM("right").put("num", num);
--- a/src/main/java/christie/test/topology/localTestTopology/StartLocalTestTopology.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/test/topology/localTestTopology/StartLocalTestTopology.java	Fri Dec 28 20:06:41 2018 +0900
@@ -22,17 +22,17 @@
         String[] managerArg = {"--localPort", String.valueOf(managerPort), "--confFile", "scripts/ring.dot"};
         TopologyManagerConfig topologyManagerConfig = new TopologyManagerConfig(managerArg);
         new StartTopologyManager(topologyManagerConfig);
-
+        CodeGearManager nodeCGM = null;
         for (int i = 1; i<=nodeNum; i++) {
-            CodeGearManager nodeCGM = createCGM(managerPort + i);
+            nodeCGM = createCGM(managerPort + i);
             String[] nodeArg = {"--managerPort", String.valueOf(managerPort),
                                 "--managerHost", "localhost"};
             TopologyNodeConfig cs  = new TopologyNodeConfig(nodeArg);
 
             new StartTopologyNode(nodeCGM, cs, new LTRemoteIncrement());
-            nodeCGM.getLocalDGM().put("num", 0);
 
         }
+        nodeCGM.getLocalDGM().put("num", 0);
 
 
     }
--- a/src/main/java/christie/test/topology/treeTestTopology/StartTreeTestTopology.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/test/topology/treeTestTopology/StartTreeTestTopology.java	Fri Dec 28 20:06:41 2018 +0900
@@ -17,19 +17,18 @@
     public static void main(String[] args) {
         int topologyManagerPort = 10000;
         int topologyNodePort = 10001;
-        int nodeNum = 3;
+        int nodeNum = 3; // 4以上にしたらtopologymanagerがfinishするので動かないけど...どうしよう.
         String[] managerArg = {"--localPort", String.valueOf(topologyManagerPort), "--Topology", "tree"};
         TopologyManagerConfig topologyManagerConfig = new TopologyManagerConfig(managerArg);
         new StartTopologyManager(topologyManagerConfig);
 
         for (int i = 0; i < nodeNum; i++) {
-            CodeGearManager nodeCGM = createCGM(topologyNodePort + i);
             String[] nodeArg = {
                     "--managerPort", String.valueOf(topologyManagerPort),
-                    "--managerHost", "localhost"};
+                    "--managerHost", "localhost",
+                    "--localPort", String.valueOf(topologyNodePort + i)};
             TopologyNodeConfig cs = new TopologyNodeConfig(nodeArg);
-            new StartTopologyNode(nodeCGM, cs, new ChildCodeGear());
-            nodeCGM.getLocalDGM().put("maxNodeNum", topologyManagerConfig.hasChild);
+            new StartTopologyNode(cs, new ChildCodeGear()).put("maxNodeNum", topologyManagerConfig.hasChild);
         }
 
     }
--- a/src/main/java/christie/topology/HostMessage.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/topology/HostMessage.java	Fri Dec 28 20:06:41 2018 +0900
@@ -1,10 +1,12 @@
 package christie.topology;
 
+import org.msgpack.annotation.Message;
+
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
-
-public class HostMessage {
+@Message
+public class HostMessage implements Cloneable {
     private String hostName;
     private int port;
     private String nodeName; // this is nodeName which have these IP and port.
@@ -64,4 +66,18 @@
                 connectionName + " absName = " + nodeName + " remoteAbsName = " + remoteNodeName
                 + " cokkie = " + cookie ;
     }
+
+    @Override
+    public HostMessage clone(){
+
+        HostMessage cloneHostMessage = new HostMessage();
+        try {
+            cloneHostMessage = (HostMessage)super.clone();
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+        return cloneHostMessage;
+    }
+
+
 }
--- a/src/main/java/christie/topology/manager/CreateTreeTopology.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/topology/manager/CreateTreeTopology.java	Fri Dec 28 20:06:41 2018 +0900
@@ -17,16 +17,16 @@
     @Take
     int hostCount;
 
-    @Peek
+    @Take
     HashMap<String, HostMessage> nameTable;
 
     @Take
     String MD5;
 
-    @Peek
+    @Take
     HashMap<String, String> absCookieTable;
 
-    @Peek
+    @Take
     ParentManager parentManager;
 
     public CreateTreeTopology(){
@@ -46,8 +46,9 @@
         getDGM(nodeName).put("cookie", MD5);
 
         absCookieTable.put(MD5, nodeName);
+        getLocalDGM().put("absCookieTable", absCookieTable);
+
         getLocalDGM().put("hostCount", hostCount + 1);
-
         newHost.setAlive(true);
         nameTable.put(nodeName, newHost);
         parentManager.register(nodeName);
@@ -63,10 +64,12 @@
             getDGM(nodeName).put("connectNodeNum", 1);
             // put parent information own
             String parentNodeName = parentManager.getMyParent();
-            HostMessage parentHost = nameTable.get(parentNodeName);
+            HostMessage parentHost = nameTable.get(parentNodeName).clone();
+
 
             // 相手からhostNameとportはもらっているので, nodeの情報だけ与えれば良い.
             parentHost.setNodeInfo(nodeName, "parent", parentNodeName);
+            //parentHost.setNodeInfo(parentNodeName, "child", nodeName);
             getLocalDGM().put("nodeInfo", parentHost);
             cgm.setup(new RecordTopology());
 
@@ -76,6 +79,9 @@
             cgm.setup(new RecordTopology());
         }
 
+        getLocalDGM().put("nameTable", nameTable);
+        getLocalDGM().put("parentManager", parentManager);
+
         getDGM(nodeName).put("start", "start");
         getLocalDGM().put("startTime", System.currentTimeMillis());
 
--- a/src/main/java/christie/topology/manager/IncomingHosts.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/topology/manager/IncomingHosts.java	Fri Dec 28 20:06:41 2018 +0900
@@ -12,11 +12,11 @@
 
 public class IncomingHosts extends CodeGear {
 
-    @Peek // Topology from parse file
+    @Take
     HashMap<String, LinkedList<HostMessage>> resultParse;
-    @Peek // nodeName list
+    @Take
     LinkedList<String> nodeNames;
-    @Peek
+    @Take
     HashMap<String, String> absCookieTable;
 
     @Take // new coming host info
@@ -32,6 +32,7 @@
     protected void run(CodeGearManager cgm) {
         // not have or match cookie
         String nodeName = nodeNames.poll();
+        getLocalDGM().put("nodeNames", nodeNames);
 
         String newHostName = newHost.getHostName();
         int newHostPort = newHost.getPort();
@@ -40,6 +41,7 @@
         cgm.createRemoteDGM(nodeName, newHostName, newHostPort);
 
         absCookieTable.put(MD5, nodeName);
+        getLocalDGM().put("absCookieTable", absCookieTable);
 
         getDGM(nodeName).put( "nodeName", nodeName);
         getDGM(nodeName).put("cookie", MD5);
@@ -53,6 +55,8 @@
 
             cgm.setup(new RecordTopology());
         }
+        getLocalDGM().put("resultParse", resultParse);
+
         cgm.setup(new IncomingHosts());
     }
 }
--- a/src/main/java/christie/topology/manager/RecordTopology.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/topology/manager/RecordTopology.java	Fri Dec 28 20:06:41 2018 +0900
@@ -16,7 +16,7 @@
     @Take
     HostMessage nodeInfo;
 
-    @Peek
+    @Take
     HashMap<String, HashMap<String, HostMessage>> topology; // ノード数nの全結合のデータ数 (n-1)*n + n
 
     @Override
@@ -43,6 +43,10 @@
                 }
             }
         }
+
+        getLocalDGM().put("topology", topology);
+
+
     }
 
 }
--- a/src/main/java/christie/topology/manager/SearchHostName.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/topology/manager/SearchHostName.java	Fri Dec 28 20:06:41 2018 +0900
@@ -6,7 +6,6 @@
 import christie.codegear.CodeGear;
 import christie.codegear.CodeGearManager;
 import christie.topology.HostMessage;
-import org.msgpack.type.ValueFactory;
 
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -48,7 +47,7 @@
             // ods.put(reconnectHost.absName, "dummy"); // this is bug
 
             if (running){
-                getLocalDGM().put(nodeName, ValueFactory.createNilValue());
+                getLocalDGM().put(nodeName, "");
             }
 
 
--- a/src/main/java/christie/topology/manager/TopologyFinish.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/topology/manager/TopologyFinish.java	Fri Dec 28 20:06:41 2018 +0900
@@ -7,7 +7,6 @@
 
 
 public class TopologyFinish extends CodeGear {
-
     @Take
     String finish;
 
@@ -19,11 +18,18 @@
 
     @Override
     protected void run(CodeGearManager cgm) {
+        if (topologyManagerConfig.showTime) {
+            System.out.println("TopologymanagerTime = " + (System.currentTimeMillis() - startTime));
+        }
 
-        if (topologyManagerConfig.showTime) {
-            System.out.println("TopologymanagerTime = "+ (System.currentTimeMillis()-startTime));
+
+       for(String dgm : cgm.getDGMList().keySet()){
+            getDGM(dgm).put("_CLOSEMESSEAGE", "_CLOSEMESSEAGE");
+            getDGM(dgm).shutdown();
         }
-        System.exit(0);
+
+        getLocalDGM().finish();
+
     }
 
 }
--- a/src/main/java/christie/topology/node/ConfigurationFinish.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/topology/node/ConfigurationFinish.java	Fri Dec 28 20:06:41 2018 +0900
@@ -25,17 +25,19 @@
     @Override
     protected void run(CodeGearManager cgm) {
         reverseCount++;
-        System.out.println(reverseCount + " " + connectNodeNum);
-        if (reverseCount == connectNodeNum) {
+        if (reverseCount >= connectNodeNum) {
             getDGM(topologyNodeConfig.getManagerKey()).put("nodePrepareDone", "done");
             cgm.setup(new Start());
 
 
             /*if (topologyNodeConfig.useKeepAlive)
-                cgm.setup(new KeepAlive());
+                cgm.setup(new KeepAlive());*/
             cgm.setup(new PrepareToClose());
-            ClosedEventManager.getInstance().register(DeleteConnection.class);
-            ClosedEventManager.getInstance().setKey();*/
+
+            /*ClosedEventManager closedEventManager = new ClosedEventManager();
+            closedEventManager.register(cgm, DeleteConnection.class);
+            cgm.setup(closedEventManager);
+            return;*/
 
         }else {
             getLocalDGM().put("reverseCount", reverseCount);
--- a/src/main/java/christie/topology/node/CreateConnectionList.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/topology/node/CreateConnectionList.java	Fri Dec 28 20:06:41 2018 +0900
@@ -8,7 +8,7 @@
 import java.util.List;
 
 public class CreateConnectionList extends CodeGear {
-    @Peek
+    @Take
     List<String> _CLIST;
     @Take
     String cMember;
@@ -19,6 +19,7 @@
     protected void run(CodeGearManager cgm) {
 
         _CLIST.add(cMember);
+        getLocalDGM().put("_CLIST", _CLIST);
     }
 
 }
--- a/src/main/java/christie/topology/node/IncomingReverseKey.java	Fri Dec 28 20:00:58 2018 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,25 +0,0 @@
-package christie.topology.node;
-
-
-import christie.annotation.Take;
-import christie.codegear.CodeGear;
-import christie.codegear.CodeGearManager;
-
-
-// pingを受け取るみたいなクラス. 相手がreverseNodeNameを送ってきたらreverseCountをプラスする
-public class IncomingReverseKey extends CodeGear {
-
-    @Take
-    String reverseNodeName;
-
-    @Take
-    int reverseCount;
-
-    @Override
-    protected void run(CodeGearManager cgm) {
-        reverseCount++;
-        getLocalDGM().put( "reverseCount", reverseCount);
-        cgm.setup(new IncomingReverseKey());
-    }
-
-}
--- a/src/main/java/christie/topology/node/PrepareToClose.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/topology/node/PrepareToClose.java	Fri Dec 28 20:06:41 2018 +0900
@@ -5,16 +5,20 @@
 import christie.annotation.Take;
 import christie.codegear.CodeGear;
 import christie.codegear.CodeGearManager;
+import christie.datagear.DataGearManager;
 
 import java.util.List;
 
 public class PrepareToClose extends CodeGear {
 
-    @Peek
+    @Take
     String _CLOSEMESSEAGE;
 
+    @Take
+    List<String> _CLIST;
+
     @Peek
-    List<String> _CLIST;
+    TopologyNodeConfig topologyNodeConfig;
 
 
     public PrepareToClose() {
@@ -23,23 +27,27 @@
 
     @Override
     protected void run(CodeGearManager cgm) {
-        /*
-        if (_CLIST.contains(_CLOSEMESSEAGE)) {
+
+        for(String dgmName: _CLIST){
+            getDGM(dgmName).shutdown();
+            //_CLIST.remove(dgmName);
+        }
+
+        getLocalDGM().finish();
+
+        /*if (_CLIST.contains(_CLOSEMESSEAGE)) {
             _CLIST.remove(_CLOSEMESSEAGE);
 
-            DataSegmentManager manager = DataSegment.get(_CLOSEMESSEAGE);
-            manager.setSendError(false);
-
-            ods.put(_CLOSEMESSEAGE, "_CLOSEREADY", _CLOSEMESSEAGE);
-            ods.put("_CLOSEREADY", _CLOSEMESSEAGE);
+            getDGM(_CLOSEMESSEAGE).setSendError(false);
+            getDGM(_CLOSEMESSEAGE).close();
 
-            new CloseRemoteDataSegment();
-        } else {
-            // lost node is this node's parent, so already removed
-            new ReceiveCloseMessage(CommandType.TAKE);
-        }*/
+            getDGM(_CLOSEMESSEAGE).put("_CLOSEREADY", _CLOSEMESSEAGE);
+            getLocalDGM().put("_CLOSEREADY", _CLOSEMESSEAGE);
+
+            cgm.setup(new CloseRemoteDataGear());
+        }
         cgm.setup(new PrepareToClose());
-        //getLocalDGM().put("_CLIST", _CLIST);
+        getLocalDGM().put("_CLIST", _CLIST);*/
 
     }
 
--- a/src/main/java/christie/topology/node/TopologyNode.java	Fri Dec 28 20:00:58 2018 +0900
+++ b/src/main/java/christie/topology/node/TopologyNode.java	Fri Dec 28 20:06:41 2018 +0900
@@ -34,7 +34,6 @@
         getLocalDGM().put("reverseCount", 0);
 
         cgm.setup(new IncomingConnectionInfo());
-        //cgm.setup(new IncomingReverseKey());
         cgm.setup(new ConfigurationFinish());
 
     }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/test/java/christie/localTestTopology/localTestTopology.java	Fri Dec 28 20:06:41 2018 +0900
@@ -0,0 +1,40 @@
+package christie.localTestTopology;
+
+import christie.test.topology.localTestTopology.LTRemoteIncrement;
+import christie.topology.manager.StartTopologyManager;
+import christie.topology.manager.TopologyManagerConfig;
+import christie.topology.node.StartTopologyNode;
+import christie.topology.node.TopologyNodeConfig;
+
+public class localTestTopology {
+
+
+    public static void main(String[] args) {
+        /* Local Test */
+        int managerPort = 10000;
+        int nodeNum = 3;
+        String[] managerArg = {"--localPort", String.valueOf(managerPort), "--confFile", "scripts/ring.dot"};
+        TopologyManagerConfig topologyManagerConfig = new TopologyManagerConfig(managerArg);
+        new StartTopologyManager(topologyManagerConfig);
+
+
+        for (int i = 1; i<=nodeNum; i++) {
+
+            String[] nodeArg = {"--managerPort", String.valueOf(managerPort),
+                    "--managerHost", "localhost",
+                    "--localPort", String.valueOf(managerPort + i)};
+            /*try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }*/
+            TopologyNodeConfig topologyNodeConfig = new TopologyNodeConfig(nodeArg);
+            StartTopologyNode startTopologyNode = new StartTopologyNode(topologyNodeConfig, new LTRemoteIncrement());
+            startTopologyNode.getLocalDGM().put("num", 0);
+
+        }
+
+
+    }
+
+}