Mercurial > hg > Database > Christie
changeset 138:6b33e7dfb146
マージ example
author | ichikitakahiro <e165713@ie.u-ryukyu.ac.jp> |
---|---|
date | Fri, 28 Dec 2018 20:06:41 +0900 |
parents | f172c4540461 (diff) cb2a15d5c915 (current diff) |
children | 77169cd8a5e8 8caca9158f0f |
files | |
diffstat | 30 files changed, 247 insertions(+), 160 deletions(-) [+] |
line wrap: on
line diff
--- a/build.gradle Fri Dec 28 20:00:58 2018 +0900 +++ b/build.gradle Fri Dec 28 20:06:41 2018 +0900 @@ -16,8 +16,10 @@ dependencies { compile fileTree(dir: 'lib', include: '*.jar') testCompile('org.junit.jupiter:junit-jupiter-api:5.2.0') - compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.16' - compile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.16' + //compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.16' + //compile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.16' + compile group: 'org.msgpack', name: 'msgpack', version: '0.6.12' + }
--- a/scripts/local_test_run.sh Fri Dec 28 20:00:58 2018 +0900 +++ b/scripts/local_test_run.sh Fri Dec 28 20:06:41 2018 +0900 @@ -15,14 +15,14 @@ ruby ./ring.rb $1 > Log/ring.dot #dot -Tpng ./topology/ring.dot > ./topology/ring.png #open ./topology/ring.png -java -cp ${jar_path}${topo_jarname} --localPort 10000 --confFile Log/ring.dot & +java -jar ${jar_path}${topo_jarname} --localPort 10000 --confFile Log/ring.dot & sleep 3 cnt=0 while [ $cnt -lt $max ] do - java -cp ${jar_path}${torquetest_jarname} --managerHost localhost --managerPort 10000 --localPort `expr 20000 + $cnt`& + java -jar ${jar_path}${torquetest_jarname} --managerHost localhost --managerPort 10000 --localPort `expr 20000 + $cnt`& cnt=`expr $cnt + 1` done wait
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/local_test_run2.sh Fri Dec 28 20:06:41 2018 +0900 @@ -0,0 +1,26 @@ +#!/bin/bash + +if [ ! -d output ]; then + mkdir output +fi + +max=$1 +count=$2 +jar_path=../build/libs/Christie.jar + +mkdir -p Log + +ruby ./ring.rb $1 > Log/ring.dot +#dot -Tpng ./topology/ring.dot > ./topology/ring.png +#open ./topology/ring.png +java -cp $jar_path christie.topology.manager.StartTopologyManager --localPort 10000 --confFile Log/ring.dot & + +#sleep 3 + +cnt=0 +while [ $cnt -lt $max ] +do + java -cp $jar_path christie.test.topology.localTestTopology.StartTorqueTestTopology --managerHost `hostname` --managerPort 10000 --localPort `expr 20000 + $cnt`& + cnt=`expr $cnt + 1` +done +wait \ No newline at end of file
--- a/src/main/java/christie/codegear/CodeGear.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/codegear/CodeGear.java Fri Dec 28 20:06:41 2018 +0900 @@ -66,12 +66,15 @@ return cgm.getDGM(dgmName); } - public void put(String dgmName, String key, Object data){ + public CodeGear put(String dgmName, String key, Object data){ getDGM(dgmName).put(key, data); + return this; } - public void put(String key, Object value){ + public CodeGear put(String key, Object value){ + localDGM.put(key, value); + return this; } public Boolean containsDGM(String dgmName){ return cgm.containsDGM(dgmName);}
--- a/src/main/java/christie/daemon/AcceptThread.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/daemon/AcceptThread.java Fri Dec 28 20:06:41 2018 +0900 @@ -22,14 +22,14 @@ public void run() { while (true) { try { - Socket socket = ss.accept(); + Socket socket = null; + socket = ss.accept(); socket.setTcpNoDelay(true); - //System.out.println("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort()); - Connection connection = new Connection(socket); - connection.name = getName(); + System.out.println("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort()); + Connection connection = new Connection(socket, cgm); String key = "accept" + counter; IncomingTcpConnection in = - new IncomingTcpConnection(connection, cgm); + new IncomingTcpConnection(connection); in.setName(connection.getInfoString()+"-IncomingTcp"); in.start(); cgm.setAccept(key, in);
--- a/src/main/java/christie/daemon/Connection.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/daemon/Connection.java Fri Dec 28 20:06:41 2018 +0900 @@ -1,21 +1,23 @@ package christie.daemon; -import java.awt.*; import java.net.Socket; import java.nio.ByteBuffer; import java.util.concurrent.LinkedBlockingQueue; +import christie.codegear.CodeGearManager; import christie.datagear.command.Command; public class Connection { public Socket socket; public String name; + public CodeGearManager cgm; public LinkedBlockingQueue<Command> sendQueue = new LinkedBlockingQueue<Command>(); public boolean sendManager = true; - public Connection(Socket socket) { + public Connection(Socket socket, CodeGearManager cgm) { this.socket = socket; + this.cgm = cgm; } public Connection() {} @@ -39,24 +41,24 @@ socket.shutdownOutput(); socket.shutdownInput(); socket.close(); - } catch (Exception e) { } + } catch (Exception e) { + e.printStackTrace(); + } //putConnectionInfo(); } - /* public void putConnectionInfo() { if (name!=null) { - ConnectionInfo c = new ConnectionInfo(name, socket); - ReceiveData rData = new ReceiveData(c); - DataSegment.getLocal().put("_DISCONNECT", rData, false); + ConnectionInfo connectionInfo = new ConnectionInfo(name, socket); + cgm.getLocalDGM().put("_DISCONNECT", connectionInfo); if (sendManager) { - DataSegment.get("manager").put("_DISCONNECTNODE", rData, false); + cgm.getDGM("manager").put("_DISCONNECTNODE", connectionInfo); sendManager = false; } } + }*/ - }*/ public synchronized void write(Command cmd) { ByteBuffer buffer = cmd.convert(); @@ -64,7 +66,7 @@ while (buffer.hasRemaining()) { socket.getChannel().write(buffer); } - System.out.println("write : " + cmd.key); + //System.out.println("write : " + cmd.key); } catch (Exception e) { e.printStackTrace();
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/daemon/IncomingTcpConnection.java Fri Dec 28 20:06:41 2018 +0900 @@ -8,19 +8,12 @@ import christie.datagear.RemoteMessage; import christie.datagear.command.RemotePeekCommand; import christie.datagear.command.RemoteTakeCommand; -import christie.datagear.dg.DataGear; import christie.datagear.dg.MessagePackDataGear; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.msgpack.jackson.dataformat.MessagePackFactory; - -//import org.msgpack.MessagePack; -//import org.msgpack.unpacker.Unpacker; - +import org.msgpack.MessagePack; +import org.msgpack.unpacker.Unpacker; import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; import java.nio.channels.ClosedChannelException; public class IncomingTcpConnection extends Thread { @@ -28,48 +21,42 @@ RemoteDataGearManager manager; CodeGearManager cgm; Connection connection; - ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); + private MessagePack packer = new MessagePack(); - public IncomingTcpConnection(Connection connection, CodeGearManager cgm) { + public IncomingTcpConnection(Connection connection) { this.connection = connection; - this.cgm = cgm; + this.cgm = connection.cgm; } - public void setManager(RemoteDataGearManager manager) { + public void setManager(RemoteDataGearManager manager){ this.manager = manager; } public void run() { - - objectMapper.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); - - InputStream in = null; + Unpacker unpacker = null; try { - in = connection.socket.getInputStream(); + unpacker = packer.createUnpacker(connection.socket.getInputStream()); } catch (IOException e) { e.printStackTrace(); } - if (in == null) { - System.out.println("return"); + if (unpacker == null) { return; } - while (true) { try { - - RemoteMessage msg = objectMapper.readValue(in, RemoteMessage.class); - + RemoteMessage msg = unpacker.read(RemoteMessage.class); CommandType type = CommandType.getCommandTypeFromId(msg.type); - System.out.println("read : " + msg.key); + //System.out.println("read " + msg.key); + byte[] data; switch (type) { case PUT: - byte[] msgpackdg = objectMapper.readValue(in, byte[].class); - + data = new byte[unpacker.readInt()]; + connection.socket.getInputStream().read(data); try { - MessagePackDataGear dg = new MessagePackDataGear(msgpackdg, Class.forName(msg.clazz)); + MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz)); cgm.getLocalDGM().put(msg.key, dg); - } catch (Exception e) { + } catch (ClassNotFoundException e) { e.printStackTrace(); } @@ -83,10 +70,11 @@ break; case REPLY://待っていたwaitListに渡してcsにセット - //data = new byte[objectMapper.readValue(convertedCommand, Integer.class)]; - //connection.socket.getInputStream().read(data); + data = new byte[unpacker.readInt()]; + connection.socket.getInputStream().read(data); + try { - MessagePackDataGear dg = new MessagePackDataGear(null, Class.forName(msg.clazz)); + MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz)); cgm.getDGM(msg.fromDgmName).resolveWaitCommand(msg.key, dg); } catch (ClassNotFoundException e) { e.printStackTrace(); @@ -96,12 +84,10 @@ default: break; } - } catch (ClosedChannelException e) { - e.printStackTrace(); return; - } catch (EOFException e) { - e.printStackTrace(); + }catch (EOFException e) { + return; } catch (IOException e) { e.printStackTrace(); }
--- a/src/main/java/christie/datagear/DataGearManager.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/datagear/DataGearManager.java Fri Dec 28 20:06:41 2018 +0900 @@ -15,6 +15,8 @@ public abstract void resolveWaitCommand(String key, DataGear dg); public abstract void finish(); public abstract void close(); + public abstract void shutdown(); + }
--- a/src/main/java/christie/datagear/LocalDataGearManager.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/datagear/LocalDataGearManager.java Fri Dec 28 20:06:41 2018 +0900 @@ -89,7 +89,12 @@ } @Override - public void close() {} + public void close() { + + } + @Override + public void shutdown() { + } }
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/datagear/RemoteDataGearManager.java Fri Dec 28 20:06:41 2018 +0900 @@ -10,6 +10,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; +import java.util.concurrent.LinkedBlockingQueue; import static java.lang.Thread.MAX_PRIORITY; @@ -27,7 +28,7 @@ do { try { SocketChannel sc = SocketChannel.open(new InetSocketAddress(address, port)); - connection = new Connection(sc.socket()); + connection = new Connection(sc.socket(), cgm); connection.name = dgmName; connection.socket.setTcpNoDelay(true); @@ -44,7 +45,7 @@ } } } while (!connect); - IncomingTcpConnection in = new IncomingTcpConnection(connection, cgm); + IncomingTcpConnection in = new IncomingTcpConnection(connection); in.setManager(manager); in.setName(dgmName+"-IncomingTcp"); in.setPriority(MAX_PRIORITY); @@ -91,6 +92,7 @@ cm.setInputs(); } + @Override public void finish() { Command cmd = new FinishCommand(); @@ -100,11 +102,16 @@ @Override public void close() { Command cmd = new CloseCommand(); - connection.sendManager = false; connection.sendCommand(cmd); } - // + @Override + public void shutdown() { + connection.close(); + LinkedBlockingQueue<Command> queue = connection.sendQueue; + if (!queue.isEmpty()) queue.clear(); + } + public void connectWait(){ synchronized (lock){ while(!connect){ @@ -114,7 +121,6 @@ } } } - }
--- a/src/main/java/christie/datagear/RemoteMessage.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/datagear/RemoteMessage.java Fri Dec 28 20:06:41 2018 +0900 @@ -1,5 +1,8 @@ package christie.datagear; +import org.msgpack.annotation.Message; + +@Message public class RemoteMessage { public int type;//PUT, PEEKなどのコマンドタイプ public String fromDgmName;//送り元のdsmName。REPLYのときに使用。
--- a/src/main/java/christie/datagear/command/PutCommand.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/datagear/command/PutCommand.java Fri Dec 28 20:06:41 2018 +0900 @@ -4,14 +4,12 @@ import christie.datagear.command.CommandType; import christie.datagear.dg.DataGear; import christie.datagear.dg.MessagePackDataGear; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.msgpack.jackson.dataformat.MessagePackFactory; +import org.msgpack.MessagePack; import java.io.IOException; import java.nio.ByteBuffer; public class PutCommand extends Command { - ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); public PutCommand(int cgmID, String toDgmName, String key, DataGear dg){ this.type = CommandType.PUT; @@ -25,20 +23,23 @@ @Override public ByteBuffer convert() { ByteBuffer buf = null; + MessagePack packer = new MessagePack(); + try { - byte[] command = objectMapper.writeValueAsBytes(createRemoteMessage()); - byte[] data = objectMapper.writeValueAsBytes(new MessagePackDataGear(dg.getData()).getMessagePack()); + byte[] command = packer.write(createRemoteMessage()); + byte[] data = new MessagePackDataGear(dg.getData()).getMessagePack(); + byte[] dataSize = packer.write(data.length); - buf = ByteBuffer.allocate(command.length+data.length); + buf = ByteBuffer.allocate(command.length+dataSize.length+data.length); buf.put(command); + buf.put(dataSize); buf.put(data); + buf.flip(); - } catch (IOException e) { e.printStackTrace(); } return buf; } - }
--- a/src/main/java/christie/datagear/command/RemoteTakeCommand.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/datagear/command/RemoteTakeCommand.java Fri Dec 28 20:06:41 2018 +0900 @@ -5,16 +5,13 @@ import christie.datagear.command.Command; import christie.datagear.command.CommandType; import christie.datagear.dg.MessagePackDataGear; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.msgpack.jackson.dataformat.MessagePackFactory; +import org.msgpack.MessagePack; import java.io.IOException; import java.nio.ByteBuffer; public class RemoteTakeCommand extends Command { - ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); - public RemoteTakeCommand(RemoteMessage msg, Connection cn) { this.type = CommandType.REMOTETAKE; this.fromDgmName = msg.fromDgmName; @@ -40,11 +37,10 @@ @Override public ByteBuffer convert() { ByteBuffer buf = null; + MessagePack packer = new MessagePack(); try { - - byte[] command = objectMapper.writeValueAsBytes(createRemoteMessage()); - + byte[] command = packer.write(createRemoteMessage()); buf = ByteBuffer.allocate(command.length); buf.put(command);
--- a/src/main/java/christie/datagear/dg/MessagePackDataGear.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/datagear/dg/MessagePackDataGear.java Fri Dec 28 20:06:41 2018 +0900 @@ -1,18 +1,13 @@ package christie.datagear.dg; -//import org.msgpack.MessagePack; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.msgpack.jackson.dataformat.MessagePackFactory; - +import org.msgpack.MessagePack; import java.io.IOException; public class MessagePackDataGear<T> extends DataGear {//必ずmessagePack形式を持つDataGear private byte[] messagePack = null; private int dataSize; - //private MessagePack packer = new MessagePack(); - ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); - + private MessagePack packer = new MessagePack(); public MessagePackDataGear(T data){ super(data); @@ -33,7 +28,7 @@ return messagePack; } else { try { - messagePack = objectMapper.writeValueAsBytes(data); + messagePack = packer.write(data); setDataSize(messagePack.length); } catch (IOException e) { e.printStackTrace(); @@ -46,7 +41,7 @@ public synchronized T getData(){ if (data == null){ try { - setData(objectMapper.readValue(messagePack, clazz)); + setData(packer.read(messagePack, clazz)); } catch (IOException e) { e.printStackTrace(); }
--- a/src/main/java/christie/test/Paxos/Proposal.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/test/Paxos/Proposal.java Fri Dec 28 20:06:41 2018 +0900 @@ -1,7 +1,9 @@ package christie.test.Paxos; import christie.codegear.CodeGearManager; +import org.msgpack.annotation.Message; +@Message public class Proposal { private String proposerName = ""; private int nodeNum = 0;
--- a/src/main/java/christie/test/topology/localTestTopology/LTRemoteIncrement.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/test/topology/localTestTopology/LTRemoteIncrement.java Fri Dec 28 20:06:41 2018 +0900 @@ -25,7 +25,6 @@ if (num == 3) { getDGM(topologyNodeConfig.getManagerKey()).put("finish", ""); - getLocalDGM().finish(); } else { num++; getDGM("right").put("num", num);
--- a/src/main/java/christie/test/topology/localTestTopology/StartLocalTestTopology.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/test/topology/localTestTopology/StartLocalTestTopology.java Fri Dec 28 20:06:41 2018 +0900 @@ -22,17 +22,17 @@ String[] managerArg = {"--localPort", String.valueOf(managerPort), "--confFile", "scripts/ring.dot"}; TopologyManagerConfig topologyManagerConfig = new TopologyManagerConfig(managerArg); new StartTopologyManager(topologyManagerConfig); - + CodeGearManager nodeCGM = null; for (int i = 1; i<=nodeNum; i++) { - CodeGearManager nodeCGM = createCGM(managerPort + i); + nodeCGM = createCGM(managerPort + i); String[] nodeArg = {"--managerPort", String.valueOf(managerPort), "--managerHost", "localhost"}; TopologyNodeConfig cs = new TopologyNodeConfig(nodeArg); new StartTopologyNode(nodeCGM, cs, new LTRemoteIncrement()); - nodeCGM.getLocalDGM().put("num", 0); } + nodeCGM.getLocalDGM().put("num", 0); }
--- a/src/main/java/christie/test/topology/treeTestTopology/StartTreeTestTopology.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/test/topology/treeTestTopology/StartTreeTestTopology.java Fri Dec 28 20:06:41 2018 +0900 @@ -17,19 +17,18 @@ public static void main(String[] args) { int topologyManagerPort = 10000; int topologyNodePort = 10001; - int nodeNum = 3; + int nodeNum = 3; // 4以上にしたらtopologymanagerがfinishするので動かないけど...どうしよう. String[] managerArg = {"--localPort", String.valueOf(topologyManagerPort), "--Topology", "tree"}; TopologyManagerConfig topologyManagerConfig = new TopologyManagerConfig(managerArg); new StartTopologyManager(topologyManagerConfig); for (int i = 0; i < nodeNum; i++) { - CodeGearManager nodeCGM = createCGM(topologyNodePort + i); String[] nodeArg = { "--managerPort", String.valueOf(topologyManagerPort), - "--managerHost", "localhost"}; + "--managerHost", "localhost", + "--localPort", String.valueOf(topologyNodePort + i)}; TopologyNodeConfig cs = new TopologyNodeConfig(nodeArg); - new StartTopologyNode(nodeCGM, cs, new ChildCodeGear()); - nodeCGM.getLocalDGM().put("maxNodeNum", topologyManagerConfig.hasChild); + new StartTopologyNode(cs, new ChildCodeGear()).put("maxNodeNum", topologyManagerConfig.hasChild); } }
--- a/src/main/java/christie/topology/HostMessage.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/topology/HostMessage.java Fri Dec 28 20:06:41 2018 +0900 @@ -1,10 +1,12 @@ package christie.topology; +import org.msgpack.annotation.Message; + import java.net.InetAddress; import java.net.UnknownHostException; - -public class HostMessage { +@Message +public class HostMessage implements Cloneable { private String hostName; private int port; private String nodeName; // this is nodeName which have these IP and port. @@ -64,4 +66,18 @@ connectionName + " absName = " + nodeName + " remoteAbsName = " + remoteNodeName + " cokkie = " + cookie ; } + + @Override + public HostMessage clone(){ + + HostMessage cloneHostMessage = new HostMessage(); + try { + cloneHostMessage = (HostMessage)super.clone(); + }catch (Exception e){ + e.printStackTrace(); + } + return cloneHostMessage; + } + + }
--- a/src/main/java/christie/topology/manager/CreateTreeTopology.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/topology/manager/CreateTreeTopology.java Fri Dec 28 20:06:41 2018 +0900 @@ -17,16 +17,16 @@ @Take int hostCount; - @Peek + @Take HashMap<String, HostMessage> nameTable; @Take String MD5; - @Peek + @Take HashMap<String, String> absCookieTable; - @Peek + @Take ParentManager parentManager; public CreateTreeTopology(){ @@ -46,8 +46,9 @@ getDGM(nodeName).put("cookie", MD5); absCookieTable.put(MD5, nodeName); + getLocalDGM().put("absCookieTable", absCookieTable); + getLocalDGM().put("hostCount", hostCount + 1); - newHost.setAlive(true); nameTable.put(nodeName, newHost); parentManager.register(nodeName); @@ -63,10 +64,12 @@ getDGM(nodeName).put("connectNodeNum", 1); // put parent information own String parentNodeName = parentManager.getMyParent(); - HostMessage parentHost = nameTable.get(parentNodeName); + HostMessage parentHost = nameTable.get(parentNodeName).clone(); + // 相手からhostNameとportはもらっているので, nodeの情報だけ与えれば良い. parentHost.setNodeInfo(nodeName, "parent", parentNodeName); + //parentHost.setNodeInfo(parentNodeName, "child", nodeName); getLocalDGM().put("nodeInfo", parentHost); cgm.setup(new RecordTopology()); @@ -76,6 +79,9 @@ cgm.setup(new RecordTopology()); } + getLocalDGM().put("nameTable", nameTable); + getLocalDGM().put("parentManager", parentManager); + getDGM(nodeName).put("start", "start"); getLocalDGM().put("startTime", System.currentTimeMillis());
--- a/src/main/java/christie/topology/manager/IncomingHosts.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/topology/manager/IncomingHosts.java Fri Dec 28 20:06:41 2018 +0900 @@ -12,11 +12,11 @@ public class IncomingHosts extends CodeGear { - @Peek // Topology from parse file + @Take HashMap<String, LinkedList<HostMessage>> resultParse; - @Peek // nodeName list + @Take LinkedList<String> nodeNames; - @Peek + @Take HashMap<String, String> absCookieTable; @Take // new coming host info @@ -32,6 +32,7 @@ protected void run(CodeGearManager cgm) { // not have or match cookie String nodeName = nodeNames.poll(); + getLocalDGM().put("nodeNames", nodeNames); String newHostName = newHost.getHostName(); int newHostPort = newHost.getPort(); @@ -40,6 +41,7 @@ cgm.createRemoteDGM(nodeName, newHostName, newHostPort); absCookieTable.put(MD5, nodeName); + getLocalDGM().put("absCookieTable", absCookieTable); getDGM(nodeName).put( "nodeName", nodeName); getDGM(nodeName).put("cookie", MD5); @@ -53,6 +55,8 @@ cgm.setup(new RecordTopology()); } + getLocalDGM().put("resultParse", resultParse); + cgm.setup(new IncomingHosts()); } }
--- a/src/main/java/christie/topology/manager/RecordTopology.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/topology/manager/RecordTopology.java Fri Dec 28 20:06:41 2018 +0900 @@ -16,7 +16,7 @@ @Take HostMessage nodeInfo; - @Peek + @Take HashMap<String, HashMap<String, HostMessage>> topology; // ノード数nの全結合のデータ数 (n-1)*n + n @Override @@ -43,6 +43,10 @@ } } } + + getLocalDGM().put("topology", topology); + + } }
--- a/src/main/java/christie/topology/manager/SearchHostName.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/topology/manager/SearchHostName.java Fri Dec 28 20:06:41 2018 +0900 @@ -6,7 +6,6 @@ import christie.codegear.CodeGear; import christie.codegear.CodeGearManager; import christie.topology.HostMessage; -import org.msgpack.type.ValueFactory; import java.util.HashMap; import java.util.LinkedList; @@ -48,7 +47,7 @@ // ods.put(reconnectHost.absName, "dummy"); // this is bug if (running){ - getLocalDGM().put(nodeName, ValueFactory.createNilValue()); + getLocalDGM().put(nodeName, ""); }
--- a/src/main/java/christie/topology/manager/TopologyFinish.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/topology/manager/TopologyFinish.java Fri Dec 28 20:06:41 2018 +0900 @@ -7,7 +7,6 @@ public class TopologyFinish extends CodeGear { - @Take String finish; @@ -19,11 +18,18 @@ @Override protected void run(CodeGearManager cgm) { + if (topologyManagerConfig.showTime) { + System.out.println("TopologymanagerTime = " + (System.currentTimeMillis() - startTime)); + } - if (topologyManagerConfig.showTime) { - System.out.println("TopologymanagerTime = "+ (System.currentTimeMillis()-startTime)); + + for(String dgm : cgm.getDGMList().keySet()){ + getDGM(dgm).put("_CLOSEMESSEAGE", "_CLOSEMESSEAGE"); + getDGM(dgm).shutdown(); } - System.exit(0); + + getLocalDGM().finish(); + } }
--- a/src/main/java/christie/topology/node/ConfigurationFinish.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/topology/node/ConfigurationFinish.java Fri Dec 28 20:06:41 2018 +0900 @@ -25,17 +25,19 @@ @Override protected void run(CodeGearManager cgm) { reverseCount++; - System.out.println(reverseCount + " " + connectNodeNum); - if (reverseCount == connectNodeNum) { + if (reverseCount >= connectNodeNum) { getDGM(topologyNodeConfig.getManagerKey()).put("nodePrepareDone", "done"); cgm.setup(new Start()); /*if (topologyNodeConfig.useKeepAlive) - cgm.setup(new KeepAlive()); + cgm.setup(new KeepAlive());*/ cgm.setup(new PrepareToClose()); - ClosedEventManager.getInstance().register(DeleteConnection.class); - ClosedEventManager.getInstance().setKey();*/ + + /*ClosedEventManager closedEventManager = new ClosedEventManager(); + closedEventManager.register(cgm, DeleteConnection.class); + cgm.setup(closedEventManager); + return;*/ }else { getLocalDGM().put("reverseCount", reverseCount);
--- a/src/main/java/christie/topology/node/CreateConnectionList.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/topology/node/CreateConnectionList.java Fri Dec 28 20:06:41 2018 +0900 @@ -8,7 +8,7 @@ import java.util.List; public class CreateConnectionList extends CodeGear { - @Peek + @Take List<String> _CLIST; @Take String cMember; @@ -19,6 +19,7 @@ protected void run(CodeGearManager cgm) { _CLIST.add(cMember); + getLocalDGM().put("_CLIST", _CLIST); } }
--- a/src/main/java/christie/topology/node/IncomingReverseKey.java Fri Dec 28 20:00:58 2018 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,25 +0,0 @@ -package christie.topology.node; - - -import christie.annotation.Take; -import christie.codegear.CodeGear; -import christie.codegear.CodeGearManager; - - -// pingを受け取るみたいなクラス. 相手がreverseNodeNameを送ってきたらreverseCountをプラスする -public class IncomingReverseKey extends CodeGear { - - @Take - String reverseNodeName; - - @Take - int reverseCount; - - @Override - protected void run(CodeGearManager cgm) { - reverseCount++; - getLocalDGM().put( "reverseCount", reverseCount); - cgm.setup(new IncomingReverseKey()); - } - -}
--- a/src/main/java/christie/topology/node/PrepareToClose.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/topology/node/PrepareToClose.java Fri Dec 28 20:06:41 2018 +0900 @@ -5,16 +5,20 @@ import christie.annotation.Take; import christie.codegear.CodeGear; import christie.codegear.CodeGearManager; +import christie.datagear.DataGearManager; import java.util.List; public class PrepareToClose extends CodeGear { - @Peek + @Take String _CLOSEMESSEAGE; + @Take + List<String> _CLIST; + @Peek - List<String> _CLIST; + TopologyNodeConfig topologyNodeConfig; public PrepareToClose() { @@ -23,23 +27,27 @@ @Override protected void run(CodeGearManager cgm) { - /* - if (_CLIST.contains(_CLOSEMESSEAGE)) { + + for(String dgmName: _CLIST){ + getDGM(dgmName).shutdown(); + //_CLIST.remove(dgmName); + } + + getLocalDGM().finish(); + + /*if (_CLIST.contains(_CLOSEMESSEAGE)) { _CLIST.remove(_CLOSEMESSEAGE); - DataSegmentManager manager = DataSegment.get(_CLOSEMESSEAGE); - manager.setSendError(false); - - ods.put(_CLOSEMESSEAGE, "_CLOSEREADY", _CLOSEMESSEAGE); - ods.put("_CLOSEREADY", _CLOSEMESSEAGE); + getDGM(_CLOSEMESSEAGE).setSendError(false); + getDGM(_CLOSEMESSEAGE).close(); - new CloseRemoteDataSegment(); - } else { - // lost node is this node's parent, so already removed - new ReceiveCloseMessage(CommandType.TAKE); - }*/ + getDGM(_CLOSEMESSEAGE).put("_CLOSEREADY", _CLOSEMESSEAGE); + getLocalDGM().put("_CLOSEREADY", _CLOSEMESSEAGE); + + cgm.setup(new CloseRemoteDataGear()); + } cgm.setup(new PrepareToClose()); - //getLocalDGM().put("_CLIST", _CLIST); + getLocalDGM().put("_CLIST", _CLIST);*/ }
--- a/src/main/java/christie/topology/node/TopologyNode.java Fri Dec 28 20:00:58 2018 +0900 +++ b/src/main/java/christie/topology/node/TopologyNode.java Fri Dec 28 20:06:41 2018 +0900 @@ -34,7 +34,6 @@ getLocalDGM().put("reverseCount", 0); cgm.setup(new IncomingConnectionInfo()); - //cgm.setup(new IncomingReverseKey()); cgm.setup(new ConfigurationFinish()); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/test/java/christie/localTestTopology/localTestTopology.java Fri Dec 28 20:06:41 2018 +0900 @@ -0,0 +1,40 @@ +package christie.localTestTopology; + +import christie.test.topology.localTestTopology.LTRemoteIncrement; +import christie.topology.manager.StartTopologyManager; +import christie.topology.manager.TopologyManagerConfig; +import christie.topology.node.StartTopologyNode; +import christie.topology.node.TopologyNodeConfig; + +public class localTestTopology { + + + public static void main(String[] args) { + /* Local Test */ + int managerPort = 10000; + int nodeNum = 3; + String[] managerArg = {"--localPort", String.valueOf(managerPort), "--confFile", "scripts/ring.dot"}; + TopologyManagerConfig topologyManagerConfig = new TopologyManagerConfig(managerArg); + new StartTopologyManager(topologyManagerConfig); + + + for (int i = 1; i<=nodeNum; i++) { + + String[] nodeArg = {"--managerPort", String.valueOf(managerPort), + "--managerHost", "localhost", + "--localPort", String.valueOf(managerPort + i)}; + /*try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + }*/ + TopologyNodeConfig topologyNodeConfig = new TopologyNodeConfig(nodeArg); + StartTopologyNode startTopologyNode = new StartTopologyNode(topologyNodeConfig, new LTRemoteIncrement()); + startTopologyNode.getLocalDGM().put("num", 0); + + } + + + } + +}