changeset 126:c6e4d0e4954c

update datagear add shutdown
author akahori
date Tue, 18 Dec 2018 15:12:45 +0900
parents 136d2a6cd0f4
children 42f195de3152
files scripts/local_test_run2.sh src/main/java/christie/daemon/AcceptThread.java src/main/java/christie/daemon/Connection.java src/main/java/christie/daemon/IncomingTcpConnection.java src/main/java/christie/datagear/DataGearManager.java src/main/java/christie/datagear/LocalDataGearManager.java src/main/java/christie/datagear/RemoteDataGearManager.java
diffstat 7 files changed, 63 insertions(+), 22 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/scripts/local_test_run2.sh	Tue Dec 18 15:12:45 2018 +0900
@@ -0,0 +1,26 @@
+#!/bin/bash
+
+if [ ! -d output ]; then
+    mkdir output
+fi
+
+max=$1
+count=$2
+jar_path=../build/libs/Christie.jar
+
+mkdir -p Log
+
+ruby ./ring.rb $1 > Log/ring.dot
+#dot -Tpng ./topology/ring.dot > ./topology/ring.png
+#open ./topology/ring.png
+java -cp $jar_path christie.topology.manager.StartTopologyManager --localPort 10000 --confFile Log/ring.dot &
+
+#sleep 3
+
+cnt=0
+while [ $cnt -lt $max ]
+do
+   java -cp $jar_path christie.test.topology.localTestTopology.StartTorqueTestTopology --managerHost `hostname` --managerPort 10000 --localPort `expr 20000 + $cnt`&
+    cnt=`expr $cnt + 1`
+done
+wait
\ No newline at end of file
--- a/src/main/java/christie/daemon/AcceptThread.java	Sat Dec 15 17:58:43 2018 +0900
+++ b/src/main/java/christie/daemon/AcceptThread.java	Tue Dec 18 15:12:45 2018 +0900
@@ -22,14 +22,14 @@
     public void run() {
         while (true) {
             try {
-                Socket socket = ss.accept();
+                Socket socket = null;
+                socket = ss.accept();
                 socket.setTcpNoDelay(true);
-                //System.out.println("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort());
-                Connection connection = new Connection(socket);
-                connection.name = getName();
+                System.out.println("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort());
+                Connection connection = new Connection(socket, cgm);
                 String key = "accept" + counter;
                 IncomingTcpConnection in =
-                        new IncomingTcpConnection(connection, cgm);
+                        new IncomingTcpConnection(connection);
                 in.setName(connection.getInfoString()+"-IncomingTcp");
                 in.start();
                 cgm.setAccept(key, in);
--- a/src/main/java/christie/daemon/Connection.java	Sat Dec 15 17:58:43 2018 +0900
+++ b/src/main/java/christie/daemon/Connection.java	Tue Dec 18 15:12:45 2018 +0900
@@ -1,21 +1,23 @@
 package christie.daemon;
 
-import java.awt.*;
 import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import christie.codegear.CodeGearManager;
 import christie.datagear.command.Command;
 
 public class Connection {
 
     public Socket socket;
     public String name;
+    public CodeGearManager cgm;
     public LinkedBlockingQueue<Command> sendQueue = new LinkedBlockingQueue<Command>();
     public boolean sendManager = true;
 
-    public Connection(Socket socket) {
+    public Connection(Socket socket, CodeGearManager cgm) {
         this.socket = socket;
+        this.cgm = cgm;
     }
 
     public Connection() {}
@@ -39,24 +41,24 @@
             socket.shutdownOutput();
             socket.shutdownInput();
             socket.close();
-        } catch (Exception e) { }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
         //putConnectionInfo();
 
     }
-
     /*
     public void putConnectionInfo() {
         if (name!=null) {
-            ConnectionInfo c = new ConnectionInfo(name, socket);
-            ReceiveData rData = new ReceiveData(c);
-            DataSegment.getLocal().put("_DISCONNECT", rData, false);
+            ConnectionInfo connectionInfo = new ConnectionInfo(name, socket);
+            cgm.getLocalDGM().put("_DISCONNECT", connectionInfo);
             if (sendManager) {
-                DataSegment.get("manager").put("_DISCONNECTNODE", rData, false);
+                cgm.getDGM("manager").put("_DISCONNECTNODE", connectionInfo);
                 sendManager = false;
             }
         }
+    }*/
 
-    }*/
     public synchronized void write(Command cmd) {
         ByteBuffer buffer = cmd.convert();
 
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java	Sat Dec 15 17:58:43 2018 +0900
+++ b/src/main/java/christie/daemon/IncomingTcpConnection.java	Tue Dec 18 15:12:45 2018 +0900
@@ -23,9 +23,9 @@
     Connection connection;
     private MessagePack packer = new MessagePack();
 
-    public IncomingTcpConnection(Connection connection, CodeGearManager cgm) {
+    public IncomingTcpConnection(Connection connection) {
         this.connection = connection;
-        this.cgm = cgm;
+        this.cgm = connection.cgm;
     }
 
     public void setManager(RemoteDataGearManager manager){
--- a/src/main/java/christie/datagear/DataGearManager.java	Sat Dec 15 17:58:43 2018 +0900
+++ b/src/main/java/christie/datagear/DataGearManager.java	Tue Dec 18 15:12:45 2018 +0900
@@ -15,6 +15,8 @@
     public abstract void resolveWaitCommand(String key, DataGear dg);
     public abstract void finish();
     public abstract void close();
+    public abstract void shutdown();
+
 
 
 }
--- a/src/main/java/christie/datagear/LocalDataGearManager.java	Sat Dec 15 17:58:43 2018 +0900
+++ b/src/main/java/christie/datagear/LocalDataGearManager.java	Tue Dec 18 15:12:45 2018 +0900
@@ -89,7 +89,12 @@
     }
 
     @Override
-    public void close() {}
+    public void close() {
+
+    }
 
+    @Override
+    public void shutdown() {
 
+    }
 }
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java	Sat Dec 15 17:58:43 2018 +0900
+++ b/src/main/java/christie/datagear/RemoteDataGearManager.java	Tue Dec 18 15:12:45 2018 +0900
@@ -10,6 +10,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.channels.SocketChannel;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import static java.lang.Thread.MAX_PRIORITY;
 
@@ -27,7 +28,7 @@
                 do {
                     try {
                         SocketChannel sc = SocketChannel.open(new InetSocketAddress(address, port));
-                        connection = new Connection(sc.socket());
+                        connection = new Connection(sc.socket(), cgm);
                         connection.name = dgmName;
                         connection.socket.setTcpNoDelay(true);
 
@@ -44,7 +45,7 @@
                         }
                     }
                 } while (!connect);
-                IncomingTcpConnection in = new IncomingTcpConnection(connection, cgm);
+                IncomingTcpConnection in = new IncomingTcpConnection(connection);
                 in.setManager(manager);
                 in.setName(dgmName+"-IncomingTcp");
                 in.setPriority(MAX_PRIORITY);
@@ -91,6 +92,7 @@
         cm.setInputs();
     }
 
+
     @Override
     public void finish() {
         Command cmd = new FinishCommand();
@@ -100,11 +102,16 @@
     @Override
     public void close() {
         Command cmd = new CloseCommand();
-        connection.sendManager = false;
         connection.sendCommand(cmd);
     }
 
-    //
+    @Override
+    public void shutdown() {
+        connection.close();
+        LinkedBlockingQueue<Command> queue = connection.sendQueue;
+        if (!queue.isEmpty()) queue.clear();
+    }
+
     public void connectWait(){
         synchronized (lock){
             while(!connect){
@@ -114,7 +121,6 @@
                 }
             }
         }
-
     }