changeset 488:7ef0ebb40c9b dispose

add measurement data in protocol
author sugi
date Mon, 08 Dec 2014 23:11:26 +0900
parents c1cf44777eef
children 9a7dd7591ddc
files src/main/java/alice/daemon/CommandMessage.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/ReceiveData.java src/main/java/alice/topology/manager/keeparive/ListManager.java src/main/java/alice/topology/manager/keeparive/RespondPing.java src/main/java/alice/topology/node/ConfigurationFinish.java
diffstat 7 files changed, 33 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/daemon/CommandMessage.java	Mon Dec 08 20:55:10 2014 +0900
+++ b/src/main/java/alice/daemon/CommandMessage.java	Mon Dec 08 23:11:26 2014 +0900
@@ -12,6 +12,10 @@
     public boolean serialized = false;
     public boolean compressed = false;
 
+    public boolean setTime = false;
+    public long time;
+    public int depth;
+
     public CommandMessage() {}
 
     public CommandMessage(int type, int index, int seq, String key
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Mon Dec 08 20:55:10 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Mon Dec 08 23:11:26 2014 +0900
@@ -60,6 +60,11 @@
                 case UPDATE:
                 case PUT:
                     rData = new ReceiveData(getSerializedByteArray(unpacker), msg.compressed, msg.serialized);
+                    if (msg.setTime) {
+                        rData.setTime = true;
+                        rData.time = msg.time;
+                        rData.depth = msg.depth;
+                    }
                     cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey);
                     lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
                     break;
--- a/src/main/java/alice/datasegment/Command.java	Mon Dec 08 20:55:10 2014 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Mon Dec 08 23:11:26 2014 +0900
@@ -102,8 +102,14 @@
                         compressed = true;
                     }
                 }
+                CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed);
+                if (rData.setTime) {
+                    cm.setTime = true;
+                    cm.time = rData.time;
+                    cm.depth = rData.depth + 1;
+                }
 
-                header = msg.write(new CommandMessage(type.id, index, seq, key, false, serialized, compressed));
+                header = msg.write(cm);
                 dataSize = msg.write(data.length);
                 buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
                 buf.put(header);
--- a/src/main/java/alice/datasegment/ReceiveData.java	Mon Dec 08 20:55:10 2014 +0900
+++ b/src/main/java/alice/datasegment/ReceiveData.java	Mon Dec 08 23:11:26 2014 +0900
@@ -18,6 +18,10 @@
     private boolean serialized = false;
     private boolean byteArray = false;
 
+    public long time;
+    public boolean setTime = false;
+    public int depth = 1;
+
     public ReceiveData(Object obj, boolean cFlag, boolean sFlag){
         val = obj;
         compressed = cFlag;
--- a/src/main/java/alice/topology/manager/keeparive/ListManager.java	Mon Dec 08 20:55:10 2014 +0900
+++ b/src/main/java/alice/topology/manager/keeparive/ListManager.java	Mon Dec 08 23:11:26 2014 +0900
@@ -85,7 +85,8 @@
     }
 
     public void deleteAll(String name) {
-        if (TaskExecuter.getInstance().getNowTask().getManagerKey().equals(name))
+        if (TaskExecuter.getInstance().getNowTask().getManagerKey() != null &&
+                TaskExecuter.getInstance().getNowTask().getManagerKey().equals(name))
             TaskExecuter.getInstance().skip();
         TaskInfo task = new TaskInfo(TaskType.CLOSE);
         task.setInfo(name, 0);
--- a/src/main/java/alice/topology/manager/keeparive/RespondPing.java	Mon Dec 08 20:55:10 2014 +0900
+++ b/src/main/java/alice/topology/manager/keeparive/RespondPing.java	Mon Dec 08 23:11:26 2014 +0900
@@ -2,12 +2,11 @@
 
 import alice.codesegment.CodeSegment;
 import alice.datasegment.CommandType;
-import alice.datasegment.DataSegment;
 import alice.datasegment.Receiver;
 
 public class RespondPing extends CodeSegment{
     private Receiver respond = ids.create(CommandType.TAKE);
-    private long pingedTime = System.currentTimeMillis();
+//    private long pingedTime = System.currentTimeMillis();
 
     public RespondPing(String key) {
         respond.setKey(key);
@@ -16,13 +15,13 @@
     @Override
     public void run() {
         RespondData d = respond.asClass(RespondData.class);
-        System.out.print("ping from "+d.from);
-        System.out.println(" Recieved time "+(d.time - pingedTime));
-        if (d.time - pingedTime > 60 * 1000){
-            // need check, this connection is alive. may be close
-            if (DataSegment.contains(d.from))
-                DataSegment.get(d.from).shutdown();
-        } else {
+//        System.out.print("ping from "+d.from);
+//        System.out.println(" Recieved time "+(d.time - pingedTime));
+//        if (d.time - pingedTime > 60 * 1000){
+//            // need check, this connection is alive. may be close
+//            if (DataSegment.contains(d.from))
+//                DataSegment.get(d.from).shutdown();
+//        } else {
             // if nowTask close d.from's socket cancel.
             // if not remove close task in the Queue.
             TaskExecuter exec = TaskExecuter.getInstance();
@@ -36,6 +35,6 @@
                 ods.put("_REMOVETASK",task);
                 new RemoveTask();
             }
-        }
+//        }
     }
 }
--- a/src/main/java/alice/topology/node/ConfigurationFinish.java	Mon Dec 08 20:55:10 2014 +0900
+++ b/src/main/java/alice/topology/node/ConfigurationFinish.java	Mon Dec 08 23:11:26 2014 +0900
@@ -5,6 +5,7 @@
 import alice.codesegment.CodeSegment;
 import alice.datasegment.CommandType;
 import alice.datasegment.Receiver;
+import alice.topology.manager.keeparive.StartKeepAlive;
 
 public class ConfigurationFinish extends CodeSegment {
 
@@ -25,6 +26,7 @@
             Start cs = new Start(startCS);
             cs.done.setKey("manager", "start");
 
+            //new StartKeepAlive().execute();
             new ReceiveCloseMessage();
             DisconnectEventManager.getInstance().register(DeleteConnection.class);
             DisconnectEventManager.getInstance().setKey();