changeset 537:8f949fa80653 dispose

Compressed RDSM refactoring
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Mon, 15 Jun 2015 19:34:00 +0900
parents d2f7d02c4976
children 8c17a9e66cc7
files src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java
diffstat 1 files changed, 16 insertions(+), 98 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java	Mon Jun 15 19:27:06 2015 +0900
+++ b/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java	Mon Jun 15 19:34:00 2015 +0900
@@ -13,51 +13,10 @@
 import alice.daemon.OutboundTcpConnection;
 
 public class CompressedRemoteDataSegmentManager extends DataSegmentManager {
-    protected Connection connection;
-    protected Logger logger;
-
-    public CompressedRemoteDataSegmentManager(){}
-
-    public CompressedRemoteDataSegmentManager(Connection c) {
-        logger = Logger.getLogger(c.name);
-        connection = c;
-        connection.name = "compressed" + c.name;
-    }
+    RemoteDataSegmentManager manager;
 
-    public CompressedRemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) {
-        logger = Logger.getLogger(connectionKey);
-        connection = new Connection();
-        connection.name = connectionKey;
-        final CompressedRemoteDataSegmentManager manager = this;
-        //new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start();
-        new Thread("Connect-" + connectionKey) {
-            public void run() {
-                boolean connect = true;
-                do {
-                    try {
-                        SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port));
-                        connection.socket = sc.socket();
-                        connection.socket.setTcpNoDelay(true);
-                        connect = false;
-                        logger.info("Connect to " + connection.getInfoString());
-                    } catch (IOException e) {
-                        try {
-                            Thread.sleep(50);
-                        } catch (InterruptedException e1) {
-                            e1.printStackTrace();
-                        }
-                    }
-                } while (connect);
-                IncomingTcpConnection in = new IncomingTcpConnection(connection, manager, reverseKey);
-                in.setName(reverseKey+"-IncomingTcp");
-                in.setPriority(MAX_PRIORITY);
-                in.start();
-                OutboundTcpConnection out = new OutboundTcpConnection(connection);
-                out.setName(connectionKey+"-OutboundTcp");
-                out.setPriority(MAX_PRIORITY);
-                out.start();
-            }
-        }.start();
+    public CompressedRemoteDataSegmentManager(RemoteDataSegmentManager manager) {
+        this.manager = manager;
     }
 
     /**
@@ -75,13 +34,7 @@
         Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, "");
         cmd.setCompressFlag(true);
 
-        if (quickFlag){
-            connection.write(cmd); // put command is executed right now
-        } else {
-            connection.sendCommand(cmd); // put command on the transmission thread
-        }
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
+        manager.put1(quickFlag, cmd);
     }
 
     @Override
@@ -96,93 +49,58 @@
         Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, "");
         cmd.setCompressFlag(true);
 
-        if (quickFlag){
-            connection.write(cmd);
-        } else {
-            connection.sendCommand(cmd);
-        }
-        if (logger.isDebugEnabled()
-            logger.debug(cmd.getCommandString());
+        manager.put1(quickFlag, cmd);
     }
 
     @Override
     public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) {
-        int seq = this.seq.getAndIncrement();
-        System.err.println("CompressedDataSegment take seq :" + seq);
-        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
         cmd.setCompressFlag(true);
-        cmd.setQuickFlag(quickFlag);
 
-        seqHash.put(seq, cmd);
-        if (quickFlag){
-            connection.write(cmd);
-        } else {
-            connection.sendCommand(cmd);
-        }
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
+        manager.take1(quickFlag, cmd);
     }
 
     @Override
     public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) {
-        int seq = this.seq.getAndIncrement();
-        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
         cmd.setCompressFlag(true);
-        cmd.setQuickFlag(quickFlag);
 
-        seqHash.put(seq, cmd);
-        if (quickFlag){
-            connection.write(cmd);
-        } else {
-            connection.sendCommand(cmd);
-        }
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
+        manager.take1(quickFlag, cmd);
     }
 
     @Override
     public void remove(String key) {
-        Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, "");
-        connection.sendCommand(cmd);
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
+        manager.remove(key);
     }
 
     @Override
     public void finish() {
-        Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, "");
-        connection.sendCommand(cmd);
+        manager.finish();
     }
 
     @Override
     public void ping(String returnKey) {
-        Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, "");
-        connection.write(cmd);
+        manager.ping(returnKey);
     }
 
     @Override
     public void response(String returnKey) {
-        Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, "");
-        connection.write(cmd);
+        manager.response(returnKey);
     }
 
     @Override
     public void close() {
-        Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, "");
-        connection.sendManager = false;
-        connection.sendCommand(cmd);
+        manager.close();
     }
 
     @Override
     public void shutdown() {
-        connection.close();
-        LinkedBlockingQueue<Command> queue = connection.sendQueue;
-        if (!queue.isEmpty()) queue.clear();
+        manager.shutdown();
     }
 
     @Override
     public void setSendError(boolean b) {
-        connection.sendManager = b;
+        manager.setSendError(b);
     }
 
 }