changeset 69:1d4f2b72fb31

delete KeyThread
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 21 Feb 2012 19:44:33 +0900
parents d4c7f7b1096b
children f2d4a4686036
files src/alice/daemon/IncomingTcpConnection.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/KeyCommand.java src/alice/datasegment/LocalDataSegmentManager.java
diffstat 5 files changed, 147 insertions(+), 120 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/daemon/IncomingTcpConnection.java	Sat Feb 11 16:40:03 2012 +0900
+++ b/src/alice/daemon/IncomingTcpConnection.java	Tue Feb 21 19:44:33 2012 +0900
@@ -20,6 +20,8 @@
 	public Connection connection;
 	public DataSegmentManager manager;
 	public String reverseKey;
+	private LocalDataSegmentManager lmanager = DataSegment.getLocal();;
+
 	public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) {
 		this.manager = manager;
 		this.connection = connection;
@@ -42,44 +44,36 @@
 				CommandType type = CommandType.getCommandTypeFromId(msg.type);
 				switch (type) {
 				case UPDATE:
-					getDataSegmentKey(msg).addCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
+					lmanager.addCommand(getDataSegmentKey(msg),
+							new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
 					break;
 				case PUT:
-					getDataSegmentKey(msg).addCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
+					lmanager.addCommand(getDataSegmentKey(msg),
+							new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
 					break;
 				case PEEK:
-					//Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) {
-					getDataSegmentKey(msg).addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null));
+					lmanager.addCommand(getDataSegmentKey(msg),
+							new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null));
 					break;
 				case TAKE:
-					getDataSegmentKey(msg).addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null));
+					lmanager.addCommand(getDataSegmentKey(msg),
+							new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null));
 					break;	
 				case REMOVE:
-					getDataSegmentKey(msg).addCommand(new Command(type, null, null, null, 0, 0, null, null, null));
+					lmanager.addCommand(getDataSegmentKey(msg),
+							new Command(type, null, null, null, 0, 0, null, null, null));
 					break;
 				case REPLY:
-					try {
-						manager.replyQueue.put(new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null));
-					} catch (InterruptedException e) {
-						e.printStackTrace();
-					}
+					manager.addReplyCommand(new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null));
 					break;
 				default:
 					break;
 				}
 			} catch (ClosedChannelException e) {
-				try {
-					connection.sendQueue.put(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
-				} catch (InterruptedException e1) {
-					e1.printStackTrace();
-				}
+				connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
 				return;
 			} catch (EOFException e) {
-				try {
-					connection.sendQueue.put(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
-				} catch (InterruptedException e1) {
-					e1.printStackTrace();
-				}
+				connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
 				return;
 			} catch (IOException e) {
 				e.printStackTrace();
@@ -87,7 +81,6 @@
 		}
 	}
 	private DataSegmentKey getDataSegmentKey(CommandMessage msg) {
-		LocalDataSegmentManager lmanager = DataSegment.getLocal();
 		return lmanager.getDataSegmentKey(msg.key);
 	}
 }
--- a/src/alice/datasegment/DataSegmentKey.java	Sat Feb 11 16:40:03 2012 +0900
+++ b/src/alice/datasegment/DataSegmentKey.java	Tue Feb 21 19:44:33 2012 +0900
@@ -2,7 +2,6 @@
 
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import alice.datasegment.Command; 
@@ -14,101 +13,85 @@
  */
 public class DataSegmentKey {
 	
-	private String key;
-	private LinkedBlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>();
 	private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
 	private ArrayList<Command> waitList = new ArrayList<Command>();
 	private AtomicInteger tailIndex = new AtomicInteger(1);
-	private Thread keyThread;
 	
-	public DataSegmentKey(String key) {
-		this.key = key;
-	}
-	
-	public void addCommand(Command cmd) {
-		cmdQueue.add(cmd);
-	}
-	
-	/**
-	 * too many threads are generated here
-	 * single scheduling queue and waiting queue can be used in future
-	 */
-	public void runKeyThread() {
-		this.keyThread = new Thread() {
-			@Override
-			public void run() {
-				while (true) {
+	public void runCommand(Command cmd) {
+		switch (cmd.type) {
+		case UPDATE:
+			if (dataList.size() != 0) {
+				dataList.remove(0);
+			}
+		case PUT:
+			int index = tailIndex.getAndIncrement();
+			DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); 
+			dataList.add(dsv);
+			// Process waiting peek and take commands
+			for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
+				Command waitCmd = iter.next();
+				if (waitCmd.index < index) {
 					try {
-						Command cmd = cmdQueue.take();
-						switch (cmd.type) {
-						case UPDATE:
-							if (dataList.size() != 0) {
-								dataList.remove(0);
-							}
-						case PUT:
-							int index = tailIndex.getAndIncrement();
-							DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); 
-							dataList.add(dsv);
-							// Process waiting peek and take commands
-							for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
-								Command waitCmd = iter.next();
-								if (waitCmd.index < index) {
-									waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey));
-									iter.remove();
-									if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
-										dataList.remove(dsv);
-										break;
-									}
-								}
-							}
-							break;
-						case PEEK:
-							if (cmd.index >= tailIndex.get()) {
-								waitList.add(cmd);
-								break;
-							}
-							boolean waitFlag2 = true;
-							for (DataSegmentValue data : dataList) {
-								if (data.index > cmd.index) {
-									cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
-									waitFlag2 = false;
-									break;
-								}
-							}
-							if (waitFlag2)
-								waitList.add(cmd);
-							break;
-						case TAKE:
-							if (cmd.index >= tailIndex.get()) {
-								waitList.add(cmd);
-								break;
-							}
-							boolean waitFlag = true;
-							for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
-								DataSegmentValue data = iter.next();
-								if (data.index > cmd.index) {
-									cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
-									iter.remove();
-									waitFlag = false;
-									break;
-								}
-							}
-							if (waitFlag)
-								waitList.add(cmd);
-							break;
-						case REMOVE:
-							// TODO: implements later
-							break;
-						default:
-						}
+						waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey));
 					} catch (InterruptedException e) {
 						e.printStackTrace();
 					}
+					iter.remove();
+					if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
+						dataList.remove(dsv);
+						break;
+					}
+				}
+			}
+			break;
+		case PEEK:
+			if (cmd.index >= tailIndex.get()) {
+				waitList.add(cmd);
+				break;
+			}
+			boolean waitFlag2 = true;
+			for (DataSegmentValue data : dataList) {
+				if (data.index > cmd.index) {
+					try {
+						cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
+					} catch (InterruptedException e) {
+						e.printStackTrace();
+					}
+					waitFlag2 = false;
+					break;
 				}
 			}
-		};
-		keyThread.setName("DataSegmentKey-" + key);
-		keyThread.start();
+			if (waitFlag2)
+				waitList.add(cmd);
+			break;
+		case TAKE:
+			if (cmd.index >= tailIndex.get()) {
+				waitList.add(cmd);
+				break;
+			}
+			boolean waitFlag = true;
+			for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
+				DataSegmentValue data = iter.next();
+				if (data.index > cmd.index) {
+					try {
+						cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
+					} catch (InterruptedException e) {
+						e.printStackTrace();
+					}
+					iter.remove();
+					waitFlag = false;
+					break;
+				}
+			}
+			if (waitFlag)
+				waitList.add(cmd);
+			break;
+		case REMOVE:
+			// TODO: implements later
+			break;
+		default:
+		}
+
 	}
 	
 }
--- a/src/alice/datasegment/DataSegmentManager.java	Sat Feb 11 16:40:03 2012 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Tue Feb 21 19:44:33 2012 +0900
@@ -12,7 +12,7 @@
 public abstract class DataSegmentManager {
 	
 	protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();
-	public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();
+	protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();
 	protected AtomicInteger seq = new AtomicInteger(1);
 	boolean debug = false;
 	
@@ -40,6 +40,14 @@
 		
 	};
 	
+	public void addReplyCommand(Command cmd) {
+		try {
+			replyQueue.put(cmd);
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		}
+	}
+	
 	public abstract void put(String key, Value val, CodeSegment cs);
 	public abstract void update(String key, Value val, CodeSegment cs);
 	public void take(Receiver receiver, String key, CodeSegment cs) {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/datasegment/KeyCommand.java	Tue Feb 21 19:44:33 2012 +0900
@@ -0,0 +1,17 @@
+package alice.datasegment;
+
+public class KeyCommand {
+	
+	DataSegmentKey key;
+	Command cmd;
+	
+	public KeyCommand(DataSegmentKey key, Command cmd) {
+		this.key = key;
+		this.cmd = cmd;
+	}
+	
+	public void runCommand() {
+		key.runCommand(cmd);
+	}
+	
+}
\ No newline at end of file
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Sat Feb 11 16:40:03 2012 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Tue Feb 21 19:44:33 2012 +0900
@@ -1,6 +1,7 @@
 package alice.datasegment;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.log4j.Logger;
 import org.msgpack.type.Value;
@@ -11,12 +12,39 @@
 	
 	private String reverseKey = "local";
 	private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
-	
+	private LinkedBlockingQueue<KeyCommand> cmdQueue = new LinkedBlockingQueue<KeyCommand>();
 	private Logger logger = Logger.getLogger("local");
-	boolean debug = false;
+	private boolean debug = false;
+
+	private Runnable keyCommandThread = new Runnable() {
+
+		@Override
+		public void run() {
+			while (true) {
+				KeyCommand keyCmd = null;
+				try {
+					keyCmd = cmdQueue.take();
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+				keyCmd.runCommand();
+			}
+		}
+		
+	};
+
 	
 	public LocalDataSegmentManager() {
-		new Thread(replyThread, "LocalDataSegmentManager").start();
+		new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
+		new Thread(keyCommandThread, "LocalDataSegmentManager-runKeyCommand").start();
+	}
+
+	public void addCommand(DataSegmentKey key, Command cmd) {
+		try {
+			cmdQueue.put(new KeyCommand(key, cmd));
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		}
 	}
 	
 	public DataSegmentKey getDataSegmentKey(String key) {
@@ -25,10 +53,9 @@
 			return dsKey;
 		if (key == null)
 			return null;
-		DataSegmentKey newDataSegmentKey = new DataSegmentKey(key);
+		DataSegmentKey newDataSegmentKey = new DataSegmentKey();
 		DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
 		if (dataSegmentKey == null) {
-			newDataSegmentKey.runKeyThread();
 			dataSegmentKey = newDataSegmentKey;
 		}
 		return dataSegmentKey;
@@ -38,12 +65,11 @@
 	public void put(String key, Value val, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, replyQueue, cs, reverseKey);
-		dataSegmentKey.addCommand(cmd);
+		addCommand(dataSegmentKey, cmd);
 		if (debug)
 			logger.debug(cmd.getCommandString());
 	}
 	
-	
 	/**
 	 * Enqueue update command to the queue of each DataSegment key
 	 */
@@ -51,7 +77,7 @@
 	public void update(String key, Value val, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, replyQueue, cs, reverseKey);
-		dataSegmentKey.addCommand(cmd);
+		addCommand(dataSegmentKey, cmd);
 		if (debug)
 			logger.debug(cmd.getCommandString());
 	}
@@ -62,7 +88,7 @@
 		int seq = this.seq.getAndIncrement();
 		Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null);
 		seqHash.put(seq, cmd);
-		dataSegmentKey.addCommand(cmd);
+		addCommand(dataSegmentKey, cmd);
 		if (debug)
 			logger.debug(cmd.getCommandString());
 	}
@@ -73,7 +99,7 @@
 		int seq = this.seq.getAndIncrement();
 		Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null);
 		seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number
-		dataSegmentKey.addCommand(cmd);
+		addCommand(dataSegmentKey, cmd);
 		if (debug)
 			logger.debug(cmd.getCommandString());
 	}
@@ -82,7 +108,7 @@
 	public void remove(String key) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
-		dataSegmentKey.addCommand(cmd);
+		addCommand(dataSegmentKey, cmd);
 		if (debug)
 			logger.debug(cmd.getCommandString());
 	}