diff src/main/java/alice/datasegment/LocalDataSegmentManager.java @ 345:8f71c3e6f11d

Change directory structure Maven standard
author sugi
date Wed, 16 Apr 2014 18:26:07 +0900
parents
children 388e7d4b0624
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java	Wed Apr 16 18:26:07 2014 +0900
@@ -0,0 +1,172 @@
+package alice.datasegment;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+import alice.codesegment.CodeSegment;
+
+public class LocalDataSegmentManager extends DataSegmentManager {
+	
+	private String reverseKey = "local";
+	private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
+	private Logger logger = Logger.getLogger("local");
+
+	private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads
+			Runtime.getRuntime().availableProcessors(),
+			Integer.MAX_VALUE, // keepAliveTime
+			TimeUnit.SECONDS,
+			new LinkedBlockingQueue<Runnable>());
+	
+	public LocalDataSegmentManager() {
+		new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
+	}
+
+	private class RunCommand implements Runnable {
+		
+		DataSegmentKey key;
+		Command cmd;
+		
+		public RunCommand(DataSegmentKey key, Command cmd) {
+			this.key = key;
+			this.cmd = cmd;
+		}
+		
+		@Override
+		public void run() {
+			key.runCommand(cmd);
+		}
+		
+	}
+	
+	public void submitCommand(DataSegmentKey key, Command cmd) {
+		dataSegmentExecutor.execute(new RunCommand(key, cmd));
+	}
+	
+	public DataSegmentKey getDataSegmentKey(String key) {
+		DataSegmentKey dsKey = dataSegments.get(key);
+		if (dsKey != null)
+			return dsKey;
+		if (key == null)
+			return null;
+		DataSegmentKey newDataSegmentKey = new DataSegmentKey();
+		DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
+		if (dataSegmentKey == null) {
+			dataSegmentKey = newDataSegmentKey;
+		}
+		return dataSegmentKey;
+	}
+	
+	@Override
+	public void put(String key, Object val) {
+		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
+		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey);
+		dataSegmentKey.runCommand(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+	
+	@Override
+	public void quickPut(String key, Object val) {
+		put(key, val);
+	}
+	
+	/**
+	 * Enqueue update command to the queue of each DataSegment key
+	 */
+	
+	@Override
+	public void update(String key, Object val) {
+		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
+		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey);
+		dataSegmentKey.runCommand(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+	
+	
+	@Override
+	public void quickUpdate(String key, Object val) {
+		update(key, val);
+	}
+
+	
+	
+	@Override
+	public void take(Receiver receiver, CodeSegment cs) {
+		DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
+		int seq = this.seq.getAndIncrement();
+		Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+		dataSegmentKey.runCommand(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+
+	@Override
+	public void quickTake(Receiver receiver, CodeSegment cs) {
+		take(receiver, cs);		
+	}
+	
+	@Override
+	public void peek(Receiver receiver, CodeSegment cs) {
+		DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
+		int seq = this.seq.getAndIncrement();
+		Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+		dataSegmentKey.runCommand(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+	
+	@Override
+	public void quickPeek(Receiver receiver, CodeSegment cs) {
+		peek(receiver, cs);
+	}
+
+
+	@Override
+	public void remove(String key) {
+		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
+		Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
+		dataSegmentKey.runCommand(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+	
+	@Override public void finish() {
+		System.exit(0);
+	}
+
+	@Override
+	public void close() {
+		
+	}
+
+	public void recommand(Receiver receiver, CodeSegment cs) {
+		DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
+		int seq = this.seq.getAndIncrement();
+		Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+		dataSegmentKey.runCommand(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+		
+	}
+
+	@Override
+	public void ping(String returnKey) {
+		
+	}
+
+	@Override
+	public void response(String returnKey) {
+				
+	}
+
+	@Override
+	public void shutdown(String key) {
+		
+	}
+
+}