changeset 572:ef3dc954eb43 dispose

delete serializeFlag
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Sun, 13 Dec 2015 23:49:46 +0900
parents 80a6c4a1c601
children fa3c8424dea4
files src/main/java/alice/codesegment/OutputDataSegment.java src/main/java/alice/daemon/CommandMessage.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/daemon/IncomingUdpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java src/main/java/alice/datasegment/DataSegment.java src/main/java/alice/datasegment/DataSegmentKey.java src/main/java/alice/datasegment/LocalDataSegmentManager.java src/main/java/alice/datasegment/ReceiveData.java src/main/java/alice/test/codesegment/remote/RemoteIncrement.java src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java
diffstat 12 files changed, 35 insertions(+), 262 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/OutputDataSegment.java	Tue Dec 08 16:29:09 2015 +0900
+++ b/src/main/java/alice/codesegment/OutputDataSegment.java	Sun Dec 13 23:49:46 2015 +0900
@@ -16,11 +16,7 @@
      * input→ds変更→outputのときコピーを防ぐ
      */
     public void flip(Receiver receiver) {
-        if (receiver.isCompressed()){
-            DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), false);
-        } else {
-            DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false);
-        }
+        DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false);
     }
 
     public void flip(String managerKey, String key, Receiver receiver){
@@ -35,19 +31,10 @@
     public void flip(Receiver receiver, CommandType type) {
         switch (type) {
             case PUT:
-                if (receiver.isCompressed()){
-                    DataSegment.getCompressedLocal().put(receiver.key, receiver.getReceiveData(), false);//localなら全部false。
-                } else {
-                    DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false);
-                }
+                DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false);
                 break;
             case UPDATE:
-                if (receiver.isCompressed()){
-                    DataSegment.getCompressedLocal().update(receiver.key, receiver.getReceiveData(), false);
-                } else {
-                    DataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), false);
-                }
-
+                DataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), false);
                 break;
             default:
                 break;
--- a/src/main/java/alice/daemon/CommandMessage.java	Tue Dec 08 16:29:09 2015 +0900
+++ b/src/main/java/alice/daemon/CommandMessage.java	Sun Dec 13 23:49:46 2015 +0900
@@ -12,7 +12,6 @@
     public int seq;//DSの待ち合わせを行っているCSを表すunique number
     public String key;//DS key
     public boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか
-    public boolean serialized = false;//シリアライズされているかどうか
     public boolean compressed = false;//圧縮されているかどうか
     public int dataSize = 0;//圧縮前のサイズ
 
@@ -26,13 +25,12 @@
     public CommandMessage() {}
 
     public CommandMessage(int type, int index, int seq, String key
-            , boolean qFlag, boolean sFlag, boolean cFlag, int datasize) {
+            , boolean qFlag, boolean cFlag, int datasize) {
         this.type = type;
         this.index = index;
         this.seq = seq;
         this.key = key;
         this.quickFlag = qFlag;
-        this.serialized = sFlag;
         this.compressed = cFlag;
         this.dataSize = datasize;
     }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Tue Dec 08 16:29:09 2015 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Sun Dec 13 23:49:46 2015 +0900
@@ -16,7 +16,6 @@
     protected DataSegmentManager manager;
     protected String reverseKey;
     private LocalDataSegmentManager lmanager = DataSegment.getLocal();
-    private CompressedLocalDataSegmentManager compressedlmanager = DataSegment.getCompressedLocal();
     private static final MessagePack packer = new MessagePack();
 
     public IncomingTcpConnection(DataSegmentManager manager) {
@@ -33,10 +32,6 @@
         return lmanager;
     }
 
-    public CompressedLocalDataSegmentManager getCompressedLocalDataSegmentManager(){
-        return compressedlmanager;
-    }
-
     /**
      * pipeline thread for receiving
      */
@@ -60,11 +55,8 @@
                 case UPDATE:
                 case PUT:
                     int dataSize = unpacker.readInt();
-                    if (msg.compressed) {
-                        rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(dataSize), byte[].class), true, msg.dataSize);
-                    } else {
-                        rData = new ReceiveData(unpacker.getSerializedByteArray(dataSize), false, msg.dataSize);
-                    }
+                    rData = new ReceiveData(unpacker.getSerializedByteArray(dataSize), msg.compressed, msg.dataSize);
+
 
                     if (msg.setTime) {
                         rData.setTimes(msg.time, true, msg.depth);
@@ -78,11 +70,7 @@
                     cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey);
                     cmd.setCompressFlag(msg.compressed);
 
-                    if (rData.compressed()){
-                        compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd);
-                    } else {
-                        lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
-                    }
+                    lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
 
                     break;
                 case PEEK:
@@ -90,11 +78,7 @@
                     cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection);
                     cmd.setCompressFlag(msg.compressed);
 
-                    if (msg.compressed){
-                        compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd);
-                    } else {
-                        lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
-                    }
+                    lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
 
                     break;
                 case REMOVE:
@@ -104,11 +88,7 @@
                 case REPLY:
                     cmd = manager.getAndRemoveCmd(msg.seq);
 
-                    if (msg.compressed) {
-                        rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), true, msg.dataSize);
-                    } else {
-                        rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), false, msg.dataSize);
-                    }
+                    rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed, msg.dataSize);
 
                     Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, "");
                     cmd.setCompressFlag(msg.compressed);
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java	Tue Dec 08 16:29:09 2015 +0900
+++ b/src/main/java/alice/daemon/IncomingUdpConnection.java	Sun Dec 13 23:49:46 2015 +0900
@@ -50,11 +50,8 @@
                     rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()));
                     cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey);
 
-                    if (msg.compressed){
-                        getCompressedLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
-                    } else {
-                        getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
-                    }
+                    getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
+
                     break;
                 case PEEK:
                 case TAKE:
@@ -62,11 +59,8 @@
                     cmd.setQuickFlag(msg.quickFlag);
                     cmd.setCompressFlag(msg.compressed);
 
-                    if (msg.compressed) {
-                        getCompressedLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
-                    } else {
-                        getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
-                    }
+                    getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
+
                     break;
                 case REMOVE:
                     cmd = new Command(type, null, null, null, 0, 0, null, null, "");
--- a/src/main/java/alice/datasegment/Command.java	Tue Dec 08 16:29:09 2015 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Sun Dec 13 23:49:46 2015 +0900
@@ -80,8 +80,6 @@
             byte[] header = null;
             byte[] data = null;
             byte[] dataSize = null;
-            boolean serialized = false;
-            boolean compressed = false;
             switch (type) {
         /*
          * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment
@@ -95,15 +93,12 @@
                 case PUT:
                 case REPLY:
                     if(compressFlag){
-                        // ToDo: Do not pack again
-                        data = packer.write(rData.getZMessagePack());
-                        compressed = true;
+                        data = rData.getZMessagePack();
                     } else {
                         data = rData.getMessagePack();
-                        serialized = true;
                     }
 
-                    CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed, rData.getDataSize());
+                    CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, compressFlag, rData.getDataSize());
                     if (rData.getSetTime()) {
                         cm.setTime = true;
                         cm.time = rData.getTime();
@@ -123,7 +118,7 @@
                     buf.put(data);
                     break;
                 default:
-                    header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag, 0));
+                    header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, compressFlag, 0));
                     buf = ByteBuffer.allocate(header.length);
                     buf.put(header);
                     break;
--- a/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java	Tue Dec 08 16:29:09 2015 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,146 +0,0 @@
-package alice.datasegment;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.Logger;
-
-import alice.codesegment.CodeSegment;
-
-public class CompressedLocalDataSegmentManager extends DataSegmentManager {
-
-    LocalDataSegmentManager manager;
-    private String reverseKey = "compressedlocal";
-
-    public CompressedLocalDataSegmentManager(LocalDataSegmentManager manager) {
-        this.manager = manager;
-        new Thread(replyThread, "CompressedLocalDataSegmentManager-replyCommand").start();
-    }
-
-    public void setReverseKey(String s){
-        reverseKey = s;
-    }
-
-    private class RunCommand implements Runnable {
-
-        DataSegmentKey key;
-        Command cmd;
-
-        public RunCommand(DataSegmentKey key, Command cmd) {
-            this.key = key;
-            this.cmd = cmd;
-        }
-
-        @Override
-        public void run() {
-            key.runCommand(cmd);
-        }
-
-    }
-
-    public void submitCommand(DataSegmentKey key, Command cmd) {
-        manager.submitCommand(key, cmd);
-    }
-
-    public DataSegmentKey getDataSegmentKey(String key) {
-        return manager.getDataSegmentKey(key);
-    }
-
-    public void removeDataSegmentKey(String key) {
-        manager.removeDataSegmentKey(key);
-    }
-
-    @Override
-    public void put(String key, ReceiveData rData, boolean quickFlag) {
-        if (!rData.compressed()){
-            try {
-                rData.zip();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-
-        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);
-        cmd.setCompressFlag(true);
-
-        manager.put1(key, cmd);
-    }
-
-    /**
-     * Enqueue update command to the queue of each DataSegment key
-     */
-
-    @Override
-    public void update(String key, ReceiveData rData, boolean quickFlag) {
-
-        if (!rData.compressed()){
-            try {
-                rData.zip();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-
-        Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey);
-        cmd.setCompressFlag(true);
-
-        manager.put1(key, cmd);
-    }
-
-    @Override
-    public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) {
-        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
-        cmd.setCompressFlag(true);
-
-        manager.take1(receiver, cmd);
-    }
-
-    @Override
-    public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) {
-        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
-        cmd.setCompressFlag(true);
-
-        manager.take1(receiver, cmd);
-    }
-
-    @Override
-    public void remove(String key) {
-        manager.remove(key);
-    }
-
-    @Override public void finish() {
-        manager.finish();
-    }
-
-    @Override
-    public void close() {
-
-    }
-
-    public void recommand(Receiver receiver, CodeSegment cs) {
-        manager.recommand(receiver, cs);
-    }
-
-    @Override
-    public void ping(String returnKey) {
-
-    }
-
-    @Override
-    public void response(String returnKey) {
-
-    }
-
-    @Override
-    public void shutdown() {
-
-    }
-
-    @Override
-    public void setSendError(boolean b) {
-
-    }
-}
--- a/src/main/java/alice/datasegment/DataSegment.java	Tue Dec 08 16:29:09 2015 +0900
+++ b/src/main/java/alice/datasegment/DataSegment.java	Sun Dec 13 23:49:46 2015 +0900
@@ -10,13 +10,11 @@
 
     private static DataSegment dataSegment = new DataSegment();
     private LocalDataSegmentManager local = new LocalDataSegmentManager();
-    private CompressedLocalDataSegmentManager compressedLocal = new CompressedLocalDataSegmentManager(local);//追加
     private ConcurrentHashMap<String, DataSegmentManager> dataSegmentManagers = new ConcurrentHashMap<String, DataSegmentManager>(); //TODO Over Head
     private ConcurrentHashMap<String, IncomingTcpConnection> acceptHash = new ConcurrentHashMap<String, IncomingTcpConnection>();
 
     private DataSegment() {
         dataSegmentManagers.put("local", local);
-        dataSegmentManagers.put("compressedlocal", compressedLocal);
     }
 
     public static DataSegmentManager get(String key) {
@@ -31,16 +29,12 @@
         return dataSegment.local;
     }
 
-    public static CompressedLocalDataSegmentManager getCompressedLocal() {//追加
-        return dataSegment.compressedLocal;
-    }
-
     public static void register(String key, DataSegmentManager manager) {
         dataSegment.dataSegmentManagers.put(key, manager);
     }
 
-    public static RemoteDataSegmentManager connect(String connectionKey, String reverseKey, String hostName, int port) {
-        if (connectionKey.startsWith("compressed")){//compressedが含まれていたらエラーを返して終了
+    public static RemoteDataSegmentManager connect(String connectionKey, String reverseKey, String hostName, int port) {//create RemoteDSM
+        if (connectionKey.startsWith("compressed")){
             System.out.println("You can't use 'compressed' for DataSegmentManager name.");
             System.exit(0);
         }
--- a/src/main/java/alice/datasegment/DataSegmentKey.java	Tue Dec 08 16:29:09 2015 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentKey.java	Sun Dec 13 23:49:46 2015 +0900
@@ -6,7 +6,7 @@
 import alice.datasegment.Command;
 
 /**
- * run command
+ * This class has DataSegment value and run command method
  *
  * Synchronized DataSegment for each DataSegment key
  * @author kazz
--- a/src/main/java/alice/datasegment/LocalDataSegmentManager.java	Tue Dec 08 16:29:09 2015 +0900
+++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java	Sun Dec 13 23:49:46 2015 +0900
@@ -15,12 +15,6 @@
     private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
     private Logger logger = Logger.getLogger("local");
 
-    private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads
-            Runtime.getRuntime().availableProcessors(),
-            Integer.MAX_VALUE, // keepAliveTime
-            TimeUnit.SECONDS,
-            new LinkedBlockingQueue<Runnable>());
-
     public LocalDataSegmentManager() {
         //new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
     }
@@ -29,27 +23,6 @@
         reverseKey = s;
     }
 
-    private class RunCommand implements Runnable {
-
-        DataSegmentKey key;
-        Command cmd;
-
-        public RunCommand(DataSegmentKey key, Command cmd) {
-            this.key = key;
-            this.cmd = cmd;
-        }
-
-        @Override
-        public void run() {
-            key.runCommand(cmd);
-        }
-
-    }
-
-    public void submitCommand(DataSegmentKey key, Command cmd) {
-        dataSegmentExecutor.execute(new RunCommand(key, cmd));
-    }
-
     public DataSegmentKey getDataSegmentKey(String key) {
         DataSegmentKey dsKey = dataSegments.get(key);
         if (dsKey != null)
@@ -72,10 +45,10 @@
     @Override
     public void put(String key, ReceiveData rData, boolean quickFlag) {
         Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);
-        put1(key, cmd);
+        runOdsAPI(key, cmd);
     }
 
-    public void put1(String key, Command cmd) {
+    private void runOdsAPI(String key, Command cmd) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
@@ -89,20 +62,18 @@
     @Override
     public void update(String key, ReceiveData rData, boolean quickFlag) {
         Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey);
-        put1(key, cmd);
+        runOdsAPI(key, cmd);
     }
 
     @Override
     public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) {
-
         Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
-        take1(receiver, cmd);
+        runIdsAPI(receiver, cmd);
     }
 
-    public void take1(Receiver receiver, Command cmd) {
+    private void runIdsAPI(Receiver receiver, Command cmd) {
         int seq = this.seq.getAndIncrement();
         cmd.setSeq(seq);
-        //seqHash.put(seq, cmd);
         DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
@@ -112,7 +83,7 @@
     @Override
     public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) {
         Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
-        take1(receiver, cmd);
+        runIdsAPI(receiver, cmd);
     }
 
     @Override
--- a/src/main/java/alice/datasegment/ReceiveData.java	Tue Dec 08 16:29:09 2015 +0900
+++ b/src/main/java/alice/datasegment/ReceiveData.java	Sun Dec 13 23:49:46 2015 +0900
@@ -104,15 +104,15 @@
     public <T> T asClass(Class<T> clazz) {///javasist
 
         try {
-            if (val != null) {
-                return (T) val;
+            if (val == null) {
+                if (zMessagePack != null && messagePack == null) {
+                    messagePack = unzip(zMessagePack, dataSize);
+                }
+
+                val = packer.read(messagePack, clazz);
             }
 
-            if (zMessagePack != null && messagePack == null) {
-                messagePack = unzip(zMessagePack, dataSize);
-            }
-
-            return packer.read(messagePack, clazz);
+            return (T) val;
 
         } catch (IOException e) {// | DataFormatException e
             e.printStackTrace();
--- a/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java	Tue Dec 08 16:29:09 2015 +0900
+++ b/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java	Sun Dec 13 23:49:46 2015 +0900
@@ -18,9 +18,9 @@
         if (num == 10) System.exit(0);
 
         RemoteIncrement cs = new RemoteIncrement();
+
+        ods.put("local", "num", num);
         cs.num.setKey("compressedremote", "num");
-
-        ods.put("compressedlocal", "num", num);
     }
 
 }
\ No newline at end of file
--- a/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java	Tue Dec 08 16:29:09 2015 +0900
+++ b/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java	Sun Dec 13 23:49:46 2015 +0900
@@ -7,8 +7,8 @@
     @Override
     public void run() {
         RemoteIncrement cs = new RemoteIncrement();
+        ods.put("local", "num", 0);
         cs.num.setKey("compressedremote", "num");
 
-        ods.put("compressedlocal", "num", 0);
     }
 }
\ No newline at end of file