Mercurial > hg > Database > Alice
diff src/main/java/alice/datasegment/LocalDataSegmentManager.java @ 345:8f71c3e6f11d
Change directory structure Maven standard
author | sugi |
---|---|
date | Wed, 16 Apr 2014 18:26:07 +0900 |
parents | |
children | 388e7d4b0624 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java Wed Apr 16 18:26:07 2014 +0900 @@ -0,0 +1,172 @@ +package alice.datasegment; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; + +import alice.codesegment.CodeSegment; + +public class LocalDataSegmentManager extends DataSegmentManager { + + private String reverseKey = "local"; + private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); + private Logger logger = Logger.getLogger("local"); + + private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads + Runtime.getRuntime().availableProcessors(), + Integer.MAX_VALUE, // keepAliveTime + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); + + public LocalDataSegmentManager() { + new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); + } + + private class RunCommand implements Runnable { + + DataSegmentKey key; + Command cmd; + + public RunCommand(DataSegmentKey key, Command cmd) { + this.key = key; + this.cmd = cmd; + } + + @Override + public void run() { + key.runCommand(cmd); + } + + } + + public void submitCommand(DataSegmentKey key, Command cmd) { + dataSegmentExecutor.execute(new RunCommand(key, cmd)); + } + + public DataSegmentKey getDataSegmentKey(String key) { + DataSegmentKey dsKey = dataSegments.get(key); + if (dsKey != null) + return dsKey; + if (key == null) + return null; + DataSegmentKey newDataSegmentKey = new DataSegmentKey(); + DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); + if (dataSegmentKey == null) { + dataSegmentKey = newDataSegmentKey; + } + return dataSegmentKey; + } + + @Override + public void put(String key, Object val) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(key); + Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey); + dataSegmentKey.runCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void quickPut(String key, Object val) { + put(key, val); + } + + /** + * Enqueue update command to the queue of each DataSegment key + */ + + @Override + public void update(String key, Object val) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(key); + Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey); + dataSegmentKey.runCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + + @Override + public void quickUpdate(String key, Object val) { + update(key, val); + } + + + + @Override + public void take(Receiver receiver, CodeSegment cs) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + dataSegmentKey.runCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void quickTake(Receiver receiver, CodeSegment cs) { + take(receiver, cs); + } + + @Override + public void peek(Receiver receiver, CodeSegment cs) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + dataSegmentKey.runCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void quickPeek(Receiver receiver, CodeSegment cs) { + peek(receiver, cs); + } + + + @Override + public void remove(String key) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(key); + Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); + dataSegmentKey.runCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override public void finish() { + System.exit(0); + } + + @Override + public void close() { + + } + + public void recommand(Receiver receiver, CodeSegment cs) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + dataSegmentKey.runCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + + } + + @Override + public void ping(String returnKey) { + + } + + @Override + public void response(String returnKey) { + + } + + @Override + public void shutdown(String key) { + + } + +}