changeset 224:409d7679cf7b

merge
author Shinji KONO <kono@ie.u-ryukyu.ac.jp>
date Thu, 28 Mar 2013 00:21:38 +0900
parents 82a1c25ca0c8 (diff) f4aaada20712 (current diff)
children bc061ee5f31f
files lib/msgpack-0.6.6-SNAPSHOT-sources.jar lib/msgpack-0.6.6-SNAPSHOT.jar scripts/view_log.sh src/alice/daemon/IncomingTcpConnection.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/LocalDataSegmentManager.java
diffstat 4 files changed, 37 insertions(+), 53 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/daemon/IncomingTcpConnection.java	Thu Mar 28 00:08:35 2013 +0900
+++ b/src/alice/daemon/IncomingTcpConnection.java	Thu Mar 28 00:21:38 2013 +0900
@@ -43,23 +43,23 @@
 				CommandType type = CommandType.getCommandTypeFromId(msg.type);
 				switch (type) {
 				case UPDATE:
-					lmanager.addCommand(getDataSegmentKey(msg),
+					lmanager.submitCommand(getDataSegmentKey(msg),
 							new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
 					break;
 				case PUT:
-					lmanager.addCommand(getDataSegmentKey(msg),
+					lmanager.submitCommand(getDataSegmentKey(msg),
 							new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
 					break;
 				case PEEK:
-					lmanager.addCommand(getDataSegmentKey(msg),
+					lmanager.submitCommand(getDataSegmentKey(msg),
 							new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null));
 					break;
 				case TAKE:
-					lmanager.addCommand(getDataSegmentKey(msg),
+					lmanager.submitCommand(getDataSegmentKey(msg),
 							new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null));
 					break;	
 				case REMOVE:
-					lmanager.addCommand(getDataSegmentKey(msg),
+					lmanager.submitCommand(getDataSegmentKey(msg),
 							new Command(type, null, null, null, 0, 0, null, null, null));
 					break;
 				case REPLY:
--- a/src/alice/datasegment/DataSegmentKey.java	Thu Mar 28 00:08:35 2013 +0900
+++ b/src/alice/datasegment/DataSegmentKey.java	Thu Mar 28 00:21:38 2013 +0900
@@ -2,7 +2,6 @@
 
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import alice.datasegment.Command; 
 
@@ -15,9 +14,9 @@
 	
 	private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
 	private ArrayList<Command> waitList = new ArrayList<Command>();
-	private AtomicInteger tailIndex = new AtomicInteger(1);
+	private int tailIndex = 1;
 	
-	public void runCommand(Command cmd) {
+	public synchronized void runCommand(Command cmd) {
 		switch (cmd.type) {
 		case UPDATE:
 			if (dataList.size() != 0) {
@@ -45,7 +44,7 @@
 			}
 			break;
 		case PEEK:
-			if (cmd.index >= tailIndex.get()) {
+			if (cmd.index >= tailIndex) {
 				waitList.add(cmd);
 				break;
 			}
@@ -65,7 +64,7 @@
 				waitList.add(cmd);
 			break;
 		case TAKE:
-			if (cmd.index >= tailIndex.get()) {
+			if (cmd.index >= tailIndex) {
 				waitList.add(cmd);
 				break;
 			}
--- a/src/alice/datasegment/KeyCommand.java	Thu Mar 28 00:08:35 2013 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,17 +0,0 @@
-package alice.datasegment;
-
-public class KeyCommand {
-	
-	DataSegmentKey key;
-	Command cmd;
-	
-	public KeyCommand(DataSegmentKey key, Command cmd) {
-		this.key = key;
-		this.cmd = cmd;
-	}
-	
-	public void runCommand() {
-		key.runCommand(cmd);
-	}
-	
-}
\ No newline at end of file
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Thu Mar 28 00:08:35 2013 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Thu Mar 28 00:21:38 2013 +0900
@@ -2,6 +2,8 @@
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 import org.msgpack.type.Value;
@@ -12,37 +14,37 @@
 	
 	private String reverseKey = "local";
 	private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
-	private LinkedBlockingQueue<KeyCommand> cmdQueue = new LinkedBlockingQueue<KeyCommand>();
 	private Logger logger = Logger.getLogger("local");
 
-	private Runnable keyCommandThread = new Runnable() {
+	private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads
+			Runtime.getRuntime().availableProcessors(),
+			Integer.MAX_VALUE, // keepAliveTime
+			TimeUnit.SECONDS,
+			new LinkedBlockingQueue<Runnable>());
+	
+	public LocalDataSegmentManager() {
+		new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
+	}
 
+	private class RunCommand implements Runnable {
+		
+		DataSegmentKey key;
+		Command cmd;
+		
+		public RunCommand(DataSegmentKey key, Command cmd) {
+			this.key = key;
+			this.cmd = cmd;
+		}
+		
 		@Override
 		public void run() {
-			while (true) {
-				KeyCommand keyCmd = null;
-				try {
-					keyCmd = cmdQueue.take();
-				} catch (InterruptedException e) {
-					e.printStackTrace();
-				}
-				keyCmd.runCommand();
-			}
+			key.runCommand(cmd);
 		}
 		
-	};
-
-	public LocalDataSegmentManager() {
-		new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
-		new Thread(keyCommandThread, "LocalDataSegmentManager-runKeyCommand").start();
 	}
-
-	public void addCommand(DataSegmentKey key, Command cmd) {
-		try {
-			cmdQueue.put(new KeyCommand(key, cmd));
-		} catch (InterruptedException e) {
-			e.printStackTrace();
-		}
+	
+	public void submitCommand(DataSegmentKey key, Command cmd) {
+		dataSegmentExecutor.execute(new RunCommand(key, cmd));
 	}
 	
 	public DataSegmentKey getDataSegmentKey(String key) {
@@ -102,7 +104,7 @@
 		int seq = this.seq.getAndIncrement();
 		Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null);
 		seqHash.put(seq, cmd);
-		addCommand(dataSegmentKey, cmd);
+		submitCommand(dataSegmentKey, cmd);
 		if (logger.isDebugEnabled())
 			logger.debug(cmd.getCommandString());
 	}
@@ -113,7 +115,7 @@
 		int seq = this.seq.getAndIncrement();
 		Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null);
 		seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number
-		addCommand(dataSegmentKey, cmd);
+		submitCommand(dataSegmentKey, cmd);
 		if (logger.isDebugEnabled())
 			logger.debug(cmd.getCommandString());
 	}
@@ -122,7 +124,7 @@
 	public void remove(String key) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
-		addCommand(dataSegmentKey, cmd);
+		submitCommand(dataSegmentKey, cmd);
 		if (logger.isDebugEnabled())
 			logger.debug(cmd.getCommandString());
 	}