changeset 68:d4c7f7b1096b

remove copy at OutboundTcpConnection
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Sat, 11 Feb 2012 16:40:03 +0900
parents 2afbb6404840
children 1d4f2b72fb31
files scripts/ring_run.sh src/alice/codesegment/CodeSegmentManager.java src/alice/daemon/OutboundTcpConnection.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegmentManager.java src/alice/topology/manager/IncomingHosts.java
diffstat 7 files changed, 32 insertions(+), 23 deletions(-) [+]
line wrap: on
line diff
--- a/scripts/ring_run.sh	Fri Feb 10 03:33:29 2012 +0900
+++ b/scripts/ring_run.sh	Sat Feb 11 16:40:03 2012 +0900
@@ -9,12 +9,12 @@
 ruby ./topology/ring.rb $1 > ./topology/ring.dot
 #dot -Tpng ./topology/ring.dot > ./topology/ring.png
 #open ./topology/ring.png
-java -cp ../Alice.jar alice.topology.manager.TopologyManager -p 10000 -conf ./topology/ring.dot -log ./output/manager.log -level info &
+java -cp ../Alice.jar alice.topology.manager.TopologyManager -p 10000 -conf ./topology/ring.dot -log ./output/manager.log -level debug &
 
 cnt=0
 while [ $cnt -lt $max ]
 do
-    java -cp ../Alice.jar alice.test.topology.ring.RingTopology -host `hostname` -port 10000 -p `expr 20000 + $cnt` -log ./output/ring${cnt}.log -level info -count $count -size 4096 &
+    java -cp ../Alice.jar alice.test.topology.ring.RingTopology -host `hostname` -port 10000 -p `expr 20000 + $cnt` -log ./output/ring${cnt}.log -level debug -count $count -size 4096 &
     cnt=`expr $cnt + 1`
 done
 wait
--- a/src/alice/codesegment/CodeSegmentManager.java	Fri Feb 10 03:33:29 2012 +0900
+++ b/src/alice/codesegment/CodeSegmentManager.java	Sat Feb 11 16:40:03 2012 +0900
@@ -19,7 +19,7 @@
 	
 	private CodeSegmentManager() {
 		Runnable prepareThread = new Runnable() {
-
+			// TODO: remove this thread
 			@Override
 			public void run() {
 				while (true) {
--- a/src/alice/daemon/OutboundTcpConnection.java	Fri Feb 10 03:33:29 2012 +0900
+++ b/src/alice/daemon/OutboundTcpConnection.java	Sat Feb 11 16:40:03 2012 +0900
@@ -36,10 +36,7 @@
 					return;
 				}
 				CommandMessage cmdMsg = convert(cmd);
-				byte[] buf = msgpack.write(cmdMsg);
-				ByteBuffer buffer = ByteBuffer.allocateDirect(buf.length);
-				buffer.put(buf);
-				buffer.flip();
+				ByteBuffer buffer = ByteBuffer.wrap(msgpack.write(cmdMsg));
 				while (buffer.hasRemaining()) {
 					connection.socket.getChannel().write(buffer);
 				}
--- a/src/alice/datasegment/DataSegmentManager.java	Fri Feb 10 03:33:29 2012 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Sat Feb 11 16:40:03 2012 +0900
@@ -14,6 +14,8 @@
 	protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();
 	public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();
 	protected AtomicInteger seq = new AtomicInteger(1);
+	boolean debug = false;
+	
 	protected Runnable replyThread = new Runnable() {
 		Logger logger = Logger.getLogger("reply");
 		@Override
@@ -28,7 +30,8 @@
 					}
 					seqHash.remove(reply.seq);
 					cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.reverseKey));
-					//logger.debug(reply.getCommandString() + " " + cmd.getCommandString());
+					if (debug)
+						logger.debug(reply.getCommandString() + " " + cmd.getCommandString());
 				} catch (InterruptedException e) {
 					e.printStackTrace();
 				}
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Fri Feb 10 03:33:29 2012 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Sat Feb 11 16:40:03 2012 +0900
@@ -2,7 +2,7 @@
 
 import java.util.concurrent.ConcurrentHashMap;
 
-//import org.apache.log4j.Logger;
+import org.apache.log4j.Logger;
 import org.msgpack.type.Value;
 
 import alice.codesegment.CodeSegment;
@@ -12,7 +12,8 @@
 	private String reverseKey = "local";
 	private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
 	
-//	private Logger logger = Logger.getLogger("local");
+	private Logger logger = Logger.getLogger("local");
+	boolean debug = false;
 	
 	public LocalDataSegmentManager() {
 		new Thread(replyThread, "LocalDataSegmentManager").start();
@@ -38,7 +39,8 @@
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, replyQueue, cs, reverseKey);
 		dataSegmentKey.addCommand(cmd);
-//		logger.debug(cmd.getCommandString());
+		if (debug)
+			logger.debug(cmd.getCommandString());
 	}
 	
 	
@@ -50,7 +52,8 @@
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, replyQueue, cs, reverseKey);
 		dataSegmentKey.addCommand(cmd);
-//		logger.debug(cmd.getCommandString());
+		if (debug)
+			logger.debug(cmd.getCommandString());
 	}
 
 	@Override
@@ -60,7 +63,8 @@
 		Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null);
 		seqHash.put(seq, cmd);
 		dataSegmentKey.addCommand(cmd);
-//		logger.debug(cmd.getCommandString());
+		if (debug)
+			logger.debug(cmd.getCommandString());
 	}
 
 	@Override
@@ -70,7 +74,8 @@
 		Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null);
 		seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number
 		dataSegmentKey.addCommand(cmd);
-//		logger.debug(cmd.getCommandString());
+		if (debug)
+			logger.debug(cmd.getCommandString());
 	}
 
 	@Override
@@ -78,7 +83,8 @@
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
 		dataSegmentKey.addCommand(cmd);
-//		logger.debug(cmd.getCommandString());
+		if (debug)
+			logger.debug(cmd.getCommandString());
 	}
 	
 	@Override public void finish() {
--- a/src/alice/datasegment/RemoteDataSegmentManager.java	Fri Feb 10 03:33:29 2012 +0900
+++ b/src/alice/datasegment/RemoteDataSegmentManager.java	Sat Feb 11 16:40:03 2012 +0900
@@ -16,6 +16,7 @@
 	
 	Connection connection;
 	Logger logger;
+	boolean debug = false;
 	
 	public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) {
 		logger = Logger.getLogger(connectionKey);
@@ -53,14 +54,16 @@
 	public void put(String key, Value val, CodeSegment cs) {
 		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, cs, null);
 		connection.sendCommand(cmd); // put command on the transmission thread
-//		logger.debug(cmd.getCommandString());
+		if (debug)
+			logger.debug(cmd.getCommandString());
 	}
 
 	@Override
 	public void update(String key, Value val, CodeSegment cs) {
 		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, cs, null);
 		connection.sendCommand(cmd);
-//		logger.debug(cmd.getCommandString());
+		if (debug)
+			logger.debug(cmd.getCommandString());
 	}
 
 	@Override
@@ -69,7 +72,8 @@
 		Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null);
 		seqHash.put(seq, cmd);
 		connection.sendCommand(cmd);
-//		logger.debug(cmd.getCommandString());
+		if (debug)
+			logger.debug(cmd.getCommandString());
 	}
 
 	@Override
@@ -78,14 +82,16 @@
 		Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null);
 		seqHash.put(seq, cmd);
 		connection.sendCommand(cmd);
-//		logger.debug(cmd.getCommandString());
+		if (debug)
+			logger.debug(cmd.getCommandString());
 	}
 
 	@Override
 	public void remove(String key) {
 		Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null);
 		connection.sendCommand(cmd);
-//		logger.debug(cmd.getCommandString());
+		if (debug)
+			logger.debug(cmd.getCommandString());
 	}
 
 	@Override
--- a/src/alice/topology/manager/IncomingHosts.java	Fri Feb 10 03:33:29 2012 +0900
+++ b/src/alice/topology/manager/IncomingHosts.java	Sat Feb 11 16:40:03 2012 +0900
@@ -3,8 +3,6 @@
 import java.util.HashMap;
 import java.util.LinkedList;
 
-import org.apache.log4j.Logger;
-
 import org.msgpack.type.ValueFactory;
 
 import alice.codesegment.CodeSegment;
@@ -18,7 +16,6 @@
 	HashMap<String, LinkedList<NodeInfo>> topology;
 	LinkedList<String> nodeNames;
 	Receiver host = ids.create(CommandType.TAKE);
-//	Logger logger = Logger.getLogger(IncomingHosts.class);
 	
 	public IncomingHosts(HashMap<String, LinkedList<NodeInfo>> topology, LinkedList<String> nodeNames) {
 		this.topology = topology;