changeset 470:780ae843cdac dispose

Delete disconnect managerKey from connection list
author sugi
date Mon, 24 Nov 2014 19:52:13 +0900
parents b31b1d197c42
children be0b61986ff7
files src/main/java/alice/codesegment/CloseEventCodeSegment.java src/main/java/alice/daemon/AcceptThread.java src/main/java/alice/daemon/Connection.java src/main/java/alice/daemon/ConnectionInfo.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java src/main/java/alice/topology/manager/keeparive/CatchDisconnectEvent.java src/main/java/alice/topology/node/CloseEventCodeSegment.java src/main/java/alice/topology/node/ConfigurationFinish.java src/main/java/alice/topology/node/DeleteConnection.java src/main/java/alice/topology/node/DisconnectEventManager.java src/main/java/alice/topology/node/ExecuteEvent.java
diffstat 11 files changed, 94 insertions(+), 31 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/alice/codesegment/CloseEventCodeSegment.java	Mon Nov 24 19:52:13 2014 +0900
@@ -0,0 +1,15 @@
+package alice.codesegment;
+
+import alice.daemon.ConnectionInfo;
+import alice.datasegment.CommandType;
+import alice.datasegment.Receiver;
+
+public abstract class CloseEventCodeSegment extends CodeSegment {
+
+    public Receiver metaInfo = ids.create(CommandType.PEEK);
+
+    public ConnectionInfo getConnectionInfo() {
+        return metaInfo.asClass(ConnectionInfo.class);
+    }
+
+}
--- a/src/main/java/alice/daemon/AcceptThread.java	Sun Nov 23 22:14:30 2014 +0900
+++ b/src/main/java/alice/daemon/AcceptThread.java	Mon Nov 24 19:52:13 2014 +0900
@@ -30,11 +30,9 @@
                 String key = "accept" + counter;
                 IncomingTcpConnection incoming =
                         new IncomingTcpConnection(connection, DataSegment.get("local"), key);
-                incoming.setPriority(MAX_PRIORITY);
                 incoming.start();
                 DataSegment.setAccept(key, incoming);
                 OutboundTcpConnection outbound = new OutboundTcpConnection(connection);
-                outbound.setPriority(MAX_PRIORITY);
                 outbound.start();
                 counter++;
             } catch (IOException e) {
--- a/src/main/java/alice/daemon/Connection.java	Sun Nov 23 22:14:30 2014 +0900
+++ b/src/main/java/alice/daemon/Connection.java	Mon Nov 24 19:52:13 2014 +0900
@@ -13,6 +13,7 @@
 public class Connection {
 
     public Socket socket;
+    public String name;
     public LinkedBlockingQueue<Command> sendQueue = new LinkedBlockingQueue<Command>();
 
     public Connection(Socket socket) {
@@ -55,17 +56,19 @@
             socket.shutdownInput();
             socket.close();
         } catch (ClosedChannelException e) {
-            putConnectionInfo();
+            return;
         } catch (IOException e) {
-            e.printStackTrace();
+            return;
         }
 
     }
 
     public void putConnectionInfo() {
-        ConnectionInfo c = new ConnectionInfo(socket.getInetAddress().toString(), socket.getPort());
-        ReceiveData rData = new ReceiveData(c, false, false);
-        DataSegment.getLocal().put("disconnect", rData, null);
+        if (name!=null){
+            ConnectionInfo c = new ConnectionInfo(name, socket.getInetAddress().getHostAddress(), socket.getPort());
+            ReceiveData rData = new ReceiveData(c, false, false);
+            DataSegment.getLocal().put("_DISCONNECT", rData, null);
+        }
 
     }
 }
--- a/src/main/java/alice/daemon/ConnectionInfo.java	Sun Nov 23 22:14:30 2014 +0900
+++ b/src/main/java/alice/daemon/ConnectionInfo.java	Mon Nov 24 19:52:13 2014 +0900
@@ -4,12 +4,14 @@
 
 @Message
 public class ConnectionInfo {
+    public String name;
     public String addr;
     public int port;
 
     public ConnectionInfo(){}
 
-    public ConnectionInfo(String addr, int port) {
+    public ConnectionInfo(String name, String addr, int port) {
+        this.name = name;
         this.addr = addr;
         this.port = port;
     }
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Sun Nov 23 22:14:30 2014 +0900
+++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Mon Nov 24 19:52:13 2014 +0900
@@ -3,6 +3,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.channels.SocketChannel;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.log4j.Logger;
 
@@ -15,14 +16,17 @@
 
     Connection connection;
     Logger logger;
+    private OutboundTcpConnection out;
+    private IncomingTcpConnection in;
 
     public RemoteDataSegmentManager(){}
 
     public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) {
         logger = Logger.getLogger(connectionKey);
         connection = new Connection();
+        connection.name = connectionKey;
         final RemoteDataSegmentManager manager = this;
-        new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start();
+        //new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start();
         new Thread("Connect-" + connectionKey) {
             public void run() {
                 boolean connect = true;
@@ -41,8 +45,12 @@
                         }
                     }
                 } while (connect);
-                new IncomingTcpConnection(connection, manager, reverseKey).start();
-                new OutboundTcpConnection(connection).start();
+                in = new IncomingTcpConnection(connection, manager, reverseKey);
+                in.setPriority(MAX_PRIORITY);
+                in.start();
+                out = new OutboundTcpConnection(connection);
+                out.setPriority(MAX_PRIORITY);
+                out.start();
             }
         }.start();
     }
@@ -141,6 +149,26 @@
     @Override
     public void shutdown() {
         connection.close();
+        LinkedBlockingQueue<Command> queue = connection.sendQueue;
+        if (!queue.isEmpty()) {
+            queue.clear();
+        }
+
+        System.out.println(out.getState());
+        if (out != null && out.getState() != Thread.State.TERMINATED) {
+            Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, "");
+            queue.add(cmd); // outboundTCP Thread will stop.
+            out = null;
+        }
+
+        System.out.println(in.getState());
+        if (in != null || in.getState() != Thread.State.TERMINATED) {
+            in = null;
+        }
+
+        if (DataSegment.contains(connection.name)) {
+            DataSegment.remove(connection.name);
+        }
     }
 
 }
--- a/src/main/java/alice/topology/manager/keeparive/CatchDisconnectEvent.java	Sun Nov 23 22:14:30 2014 +0900
+++ b/src/main/java/alice/topology/manager/keeparive/CatchDisconnectEvent.java	Mon Nov 24 19:52:13 2014 +0900
@@ -1,7 +1,7 @@
 package alice.topology.manager.keeparive;
 
+import alice.codesegment.CloseEventCodeSegment;
 import alice.daemon.ConnectionInfo;
-import alice.topology.node.CloseEventCodeSegment;
 
 public class CatchDisconnectEvent extends CloseEventCodeSegment{
 
--- a/src/main/java/alice/topology/node/CloseEventCodeSegment.java	Sun Nov 23 22:14:30 2014 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,16 +0,0 @@
-package alice.topology.node;
-
-import alice.codesegment.CodeSegment;
-import alice.daemon.ConnectionInfo;
-import alice.datasegment.CommandType;
-import alice.datasegment.Receiver;
-
-public abstract class CloseEventCodeSegment extends CodeSegment {
-
-    public Receiver metaInfo = ids.create(CommandType.PEEK);
-
-    public ConnectionInfo getConnectionInfo() {
-        return metaInfo.asClass(ConnectionInfo.class);
-    }
-
-}
--- a/src/main/java/alice/topology/node/ConfigurationFinish.java	Sun Nov 23 22:14:30 2014 +0900
+++ b/src/main/java/alice/topology/node/ConfigurationFinish.java	Mon Nov 24 19:52:13 2014 +0900
@@ -26,8 +26,9 @@
             Start cs = new Start(startCS);
             cs.done.setKey("manager", "start");
 
+            DisconnectEventManager.getInstance().register(CatchDisconnectEvent.class);
+            DisconnectEventManager.getInstance().register(DeleteConnection.class);
             DisconnectEventManager.getInstance().setKey();
-            DisconnectEventManager.getInstance().register(CatchDisconnectEvent.class);
             return;
         }
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/alice/topology/node/DeleteConnection.java	Mon Nov 24 19:52:13 2014 +0900
@@ -0,0 +1,31 @@
+package alice.topology.node;
+
+import java.util.List;
+
+import alice.codesegment.CloseEventCodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegment;
+import alice.datasegment.Receiver;
+
+public class DeleteConnection extends CloseEventCodeSegment {
+
+    private Receiver info = ids.create(CommandType.TAKE);
+
+    public DeleteConnection() {
+        info.setKey("_CLIST");
+    }
+
+    @Override
+    public void run() {
+        String name = getConnectionInfo().name;
+        @SuppressWarnings("unchecked")
+        List<String> list = info.asClass(List.class);
+
+        if (list.contains(name))
+            list.remove(name);
+
+        DataSegment.get(name).shutdown();
+        ods.put(info.key, list);
+    }
+
+}
--- a/src/main/java/alice/topology/node/DisconnectEventManager.java	Sun Nov 23 22:14:30 2014 +0900
+++ b/src/main/java/alice/topology/node/DisconnectEventManager.java	Mon Nov 24 19:52:13 2014 +0900
@@ -21,7 +21,7 @@
 
     public void setKey() {
         ids.init();
-        info.setKey("disconnect");
+        info.setKey("_DISCONNECT");
     }
 
     @SuppressWarnings("rawtypes")
--- a/src/main/java/alice/topology/node/ExecuteEvent.java	Sun Nov 23 22:14:30 2014 +0900
+++ b/src/main/java/alice/topology/node/ExecuteEvent.java	Mon Nov 24 19:52:13 2014 +0900
@@ -3,6 +3,7 @@
 import java.lang.reflect.Field;
 import java.util.List;
 
+import alice.codesegment.CloseEventCodeSegment;
 import alice.codesegment.CodeSegment;
 import alice.datasegment.CommandType;
 import alice.datasegment.Receiver;
@@ -14,7 +15,7 @@
 
     public ExecuteEvent() {
         info.setKey("_DEVENTLIST");
-        info1.setKey("disconnect");
+        info1.setKey("_DISCONNECT");
     }
 
     @SuppressWarnings("unchecked")