# HG changeset patch # User kazz # Date 1328946003 -32400 # Node ID d4c7f7b1096bbfac8fd079709f5c67b58bb4a9b1 # Parent 2afbb6404840e6437d9089e629b817607ec13cc1 remove copy at OutboundTcpConnection diff -r 2afbb6404840 -r d4c7f7b1096b scripts/ring_run.sh --- a/scripts/ring_run.sh Fri Feb 10 03:33:29 2012 +0900 +++ b/scripts/ring_run.sh Sat Feb 11 16:40:03 2012 +0900 @@ -9,12 +9,12 @@ ruby ./topology/ring.rb $1 > ./topology/ring.dot #dot -Tpng ./topology/ring.dot > ./topology/ring.png #open ./topology/ring.png -java -cp ../Alice.jar alice.topology.manager.TopologyManager -p 10000 -conf ./topology/ring.dot -log ./output/manager.log -level info & +java -cp ../Alice.jar alice.topology.manager.TopologyManager -p 10000 -conf ./topology/ring.dot -log ./output/manager.log -level debug & cnt=0 while [ $cnt -lt $max ] do - java -cp ../Alice.jar alice.test.topology.ring.RingTopology -host `hostname` -port 10000 -p `expr 20000 + $cnt` -log ./output/ring${cnt}.log -level info -count $count -size 4096 & + java -cp ../Alice.jar alice.test.topology.ring.RingTopology -host `hostname` -port 10000 -p `expr 20000 + $cnt` -log ./output/ring${cnt}.log -level debug -count $count -size 4096 & cnt=`expr $cnt + 1` done wait diff -r 2afbb6404840 -r d4c7f7b1096b src/alice/codesegment/CodeSegmentManager.java --- a/src/alice/codesegment/CodeSegmentManager.java Fri Feb 10 03:33:29 2012 +0900 +++ b/src/alice/codesegment/CodeSegmentManager.java Sat Feb 11 16:40:03 2012 +0900 @@ -19,7 +19,7 @@ private CodeSegmentManager() { Runnable prepareThread = new Runnable() { - + // TODO: remove this thread @Override public void run() { while (true) { diff -r 2afbb6404840 -r d4c7f7b1096b src/alice/daemon/OutboundTcpConnection.java --- a/src/alice/daemon/OutboundTcpConnection.java Fri Feb 10 03:33:29 2012 +0900 +++ b/src/alice/daemon/OutboundTcpConnection.java Sat Feb 11 16:40:03 2012 +0900 @@ -36,10 +36,7 @@ return; } CommandMessage cmdMsg = convert(cmd); - byte[] buf = msgpack.write(cmdMsg); - ByteBuffer buffer = ByteBuffer.allocateDirect(buf.length); - buffer.put(buf); - buffer.flip(); + ByteBuffer buffer = ByteBuffer.wrap(msgpack.write(cmdMsg)); while (buffer.hasRemaining()) { connection.socket.getChannel().write(buffer); } diff -r 2afbb6404840 -r d4c7f7b1096b src/alice/datasegment/DataSegmentManager.java --- a/src/alice/datasegment/DataSegmentManager.java Fri Feb 10 03:33:29 2012 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Sat Feb 11 16:40:03 2012 +0900 @@ -14,6 +14,8 @@ protected ConcurrentHashMap seqHash = new ConcurrentHashMap(); public LinkedBlockingQueue replyQueue = new LinkedBlockingQueue(); protected AtomicInteger seq = new AtomicInteger(1); + boolean debug = false; + protected Runnable replyThread = new Runnable() { Logger logger = Logger.getLogger("reply"); @Override @@ -28,7 +30,8 @@ } seqHash.remove(reply.seq); cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.reverseKey)); - //logger.debug(reply.getCommandString() + " " + cmd.getCommandString()); + if (debug) + logger.debug(reply.getCommandString() + " " + cmd.getCommandString()); } catch (InterruptedException e) { e.printStackTrace(); } diff -r 2afbb6404840 -r d4c7f7b1096b src/alice/datasegment/LocalDataSegmentManager.java --- a/src/alice/datasegment/LocalDataSegmentManager.java Fri Feb 10 03:33:29 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Sat Feb 11 16:40:03 2012 +0900 @@ -2,7 +2,7 @@ import java.util.concurrent.ConcurrentHashMap; -//import org.apache.log4j.Logger; +import org.apache.log4j.Logger; import org.msgpack.type.Value; import alice.codesegment.CodeSegment; @@ -12,7 +12,8 @@ private String reverseKey = "local"; private ConcurrentHashMap dataSegments = new ConcurrentHashMap(); -// private Logger logger = Logger.getLogger("local"); + private Logger logger = Logger.getLogger("local"); + boolean debug = false; public LocalDataSegmentManager() { new Thread(replyThread, "LocalDataSegmentManager").start(); @@ -38,7 +39,8 @@ DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, replyQueue, cs, reverseKey); dataSegmentKey.addCommand(cmd); -// logger.debug(cmd.getCommandString()); + if (debug) + logger.debug(cmd.getCommandString()); } @@ -50,7 +52,8 @@ DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, replyQueue, cs, reverseKey); dataSegmentKey.addCommand(cmd); -// logger.debug(cmd.getCommandString()); + if (debug) + logger.debug(cmd.getCommandString()); } @Override @@ -60,7 +63,8 @@ Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); dataSegmentKey.addCommand(cmd); -// logger.debug(cmd.getCommandString()); + if (debug) + logger.debug(cmd.getCommandString()); } @Override @@ -70,7 +74,8 @@ Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number dataSegmentKey.addCommand(cmd); -// logger.debug(cmd.getCommandString()); + if (debug) + logger.debug(cmd.getCommandString()); } @Override @@ -78,7 +83,8 @@ DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); dataSegmentKey.addCommand(cmd); -// logger.debug(cmd.getCommandString()); + if (debug) + logger.debug(cmd.getCommandString()); } @Override public void finish() { diff -r 2afbb6404840 -r d4c7f7b1096b src/alice/datasegment/RemoteDataSegmentManager.java --- a/src/alice/datasegment/RemoteDataSegmentManager.java Fri Feb 10 03:33:29 2012 +0900 +++ b/src/alice/datasegment/RemoteDataSegmentManager.java Sat Feb 11 16:40:03 2012 +0900 @@ -16,6 +16,7 @@ Connection connection; Logger logger; + boolean debug = false; public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) { logger = Logger.getLogger(connectionKey); @@ -53,14 +54,16 @@ public void put(String key, Value val, CodeSegment cs) { Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, cs, null); connection.sendCommand(cmd); // put command on the transmission thread -// logger.debug(cmd.getCommandString()); + if (debug) + logger.debug(cmd.getCommandString()); } @Override public void update(String key, Value val, CodeSegment cs) { Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, cs, null); connection.sendCommand(cmd); -// logger.debug(cmd.getCommandString()); + if (debug) + logger.debug(cmd.getCommandString()); } @Override @@ -69,7 +72,8 @@ Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); connection.sendCommand(cmd); -// logger.debug(cmd.getCommandString()); + if (debug) + logger.debug(cmd.getCommandString()); } @Override @@ -78,14 +82,16 @@ Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); connection.sendCommand(cmd); -// logger.debug(cmd.getCommandString()); + if (debug) + logger.debug(cmd.getCommandString()); } @Override public void remove(String key) { Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null); connection.sendCommand(cmd); -// logger.debug(cmd.getCommandString()); + if (debug) + logger.debug(cmd.getCommandString()); } @Override diff -r 2afbb6404840 -r d4c7f7b1096b src/alice/topology/manager/IncomingHosts.java --- a/src/alice/topology/manager/IncomingHosts.java Fri Feb 10 03:33:29 2012 +0900 +++ b/src/alice/topology/manager/IncomingHosts.java Sat Feb 11 16:40:03 2012 +0900 @@ -3,8 +3,6 @@ import java.util.HashMap; import java.util.LinkedList; -import org.apache.log4j.Logger; - import org.msgpack.type.ValueFactory; import alice.codesegment.CodeSegment; @@ -18,7 +16,6 @@ HashMap> topology; LinkedList nodeNames; Receiver host = ids.create(CommandType.TAKE); -// Logger logger = Logger.getLogger(IncomingHosts.class); public IncomingHosts(HashMap> topology, LinkedList nodeNames) { this.topology = topology;