changeset 536:d2f7d02c4976 dispose

remoteDSM refactoring
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Mon, 15 Jun 2015 19:27:06 +0900
parents dd20acf579bd
children 8f949fa80653
files src/main/java/alice/daemon/AliceDaemon.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java src/main/java/alice/datasegment/DataSegment.java src/main/java/alice/datasegment/DataSegmentManager.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java src/main/java/alice/test/codesegment/remote/RemoteIncrement.java src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java
diffstat 8 files changed, 31 insertions(+), 28 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/daemon/AliceDaemon.java	Mon Jun 15 18:22:27 2015 +0900
+++ b/src/main/java/alice/daemon/AliceDaemon.java	Mon Jun 15 19:27:06 2015 +0900
@@ -50,7 +50,7 @@
             // listen on any address ipv4/ipv6
             InetSocketAddress a = new InetSocketAddress("::", conf.localPort);
 
-            //System.out.println("AliceDaemon.listen: bind to " + a);
+            System.out.println("AliceDaemon.listen: bind to " + a);
             ss.bind(a);
             acceptThread = new AcceptThread(ss, "ACCEPT" + conf.localPort);
             acceptThread.start();
--- a/src/main/java/alice/datasegment/Command.java	Mon Jun 15 18:22:27 2015 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Mon Jun 15 19:27:06 2015 +0900
@@ -160,4 +160,8 @@
     public boolean getCompressFlag(){
         return compressFlag;
     }
+
+    public void setSeq(int seq) {
+        this.seq = seq;
+    }
 }
--- a/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java	Mon Jun 15 18:22:27 2015 +0900
+++ b/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java	Mon Jun 15 19:27:06 2015 +0900
@@ -19,6 +19,7 @@
     public CompressedRemoteDataSegmentManager(){}
 
     public CompressedRemoteDataSegmentManager(Connection c) {
+        logger = Logger.getLogger(c.name);
         connection = c;
         connection.name = "compressed" + c.name;
     }
@@ -100,13 +101,14 @@
         } else {
             connection.sendCommand(cmd);
         }
-        if (logger.isDebugEnabled())
+        if (logger.isDebugEnabled()
             logger.debug(cmd.getCommandString());
     }
 
     @Override
     public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) {
         int seq = this.seq.getAndIncrement();
+        System.err.println("CompressedDataSegment take seq :" + seq);
         Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
         cmd.setCompressFlag(true);
         cmd.setQuickFlag(quickFlag);
--- a/src/main/java/alice/datasegment/DataSegment.java	Mon Jun 15 18:22:27 2015 +0900
+++ b/src/main/java/alice/datasegment/DataSegment.java	Mon Jun 15 19:27:06 2015 +0900
@@ -41,10 +41,10 @@
             System.exit(0);
         }
         RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connectionKey, reverseKey, hostName, port);
-        //CompressedRemoteDataSegmentManager compressedManager = new CompressedRemoteDataSegmentManager(manager.connection);
+        CompressedRemoteDataSegmentManager compressedManager = new CompressedRemoteDataSegmentManager(manager.connection);
 
         register(connectionKey, manager);
-        //register("compressed" + connectionKey, compressedManager);
+        register("compressed" + connectionKey, compressedManager);
 
         return manager;
     }
--- a/src/main/java/alice/datasegment/DataSegmentManager.java	Mon Jun 15 18:22:27 2015 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentManager.java	Mon Jun 15 19:27:06 2015 +0900
@@ -39,6 +39,7 @@
     };
 
     public Command getAndRemoveCmd(int index){
+        System.err.println("DSM getAndRemoveCmd seq : " + index);
         return seqHash.remove(index);
     }
 
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Mon Jun 15 18:22:27 2015 +0900
+++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Mon Jun 15 19:27:06 2015 +0900
@@ -63,6 +63,10 @@
     public void put(String key, ReceiveData rData, boolean quickFlag) {
         Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, "");
 
+        put1(quickFlag, cmd);
+    }
+
+    public void put1(boolean quickFlag, Command cmd) {
         if (quickFlag){
             connection.write(cmd); // put command is executed right now
         } else {
@@ -76,21 +80,22 @@
     public void update(String key, ReceiveData rData, boolean quickFlag) {
         Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, "");
 
-        if (quickFlag){
-            connection.write(cmd);
-        } else {
-            connection.sendCommand(cmd);
-        }
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
+        put1(quickFlag, cmd);
     }
 
     @Override
     public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) {
+
+        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
+        take1(quickFlag, cmd);
+    }
+
+    public void take1(boolean quickFlag, Command cmd) {
         int seq = this.seq.getAndIncrement();
-        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        System.err.println("DataSegment take seq :" + seq);
+        cmd.setSeq(seq);
+        seqHash.put(seq, cmd);
         cmd.setQuickFlag(quickFlag);
-        seqHash.put(seq, cmd);
         if (quickFlag){
             connection.write(cmd);
         } else {
@@ -102,17 +107,8 @@
 
     @Override
     public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) {
-        int seq = this.seq.getAndIncrement();
-        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
-        cmd.setQuickFlag(quickFlag);
-        seqHash.put(seq, cmd);
-        if (quickFlag){
-            connection.write(cmd);
-        } else {
-            connection.sendCommand(cmd);
-        }
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
+        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
+        take1(quickFlag, cmd);
     }
 
     @Override
--- a/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java	Mon Jun 15 18:22:27 2015 +0900
+++ b/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java	Mon Jun 15 19:27:06 2015 +0900
@@ -18,9 +18,9 @@
         if (num == 10) System.exit(0);
 
         RemoteIncrement cs = new RemoteIncrement();
-        cs.num.setKey("remote", "num");
+        cs.num.setKey("compressedremote", "num");
 
-        ods.put("local", "num", num);
+        ods.put("compressedlocal", "num", num);
     }
 
 }
\ No newline at end of file
--- a/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java	Mon Jun 15 18:22:27 2015 +0900
+++ b/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java	Mon Jun 15 19:27:06 2015 +0900
@@ -7,8 +7,8 @@
     @Override
     public void run() {
         RemoteIncrement cs = new RemoteIncrement();
-        cs.num.setKey("remote", "num");
+        cs.num.setKey("compressedremote", "num");
 
-        ods.put("local", "num", 0);
+        ods.put("compressedlocal", "num", 0);
     }
 }
\ No newline at end of file