view src/alice/datasegment/LocalDataSegmentManager.java @ 198:f151dea22b2c working

add flip api
author sugi
date Tue, 19 Mar 2013 01:25:09 +0900
parents a85ff8dc16c1
children 15b68b65f8a4
line wrap: on
line source

package alice.datasegment;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;
import org.msgpack.type.Value;

import alice.codesegment.CodeSegment;

/**
 * DataSegmentManager that serves data segments held in this process.
 *
 * Every operation (put, update, take, peek, remove) is turned into a
 * {@link Command}, paired with the {@link DataSegmentKey} for its key, and
 * placed on a single command queue. One dispatcher thread, started by the
 * constructor, takes {@link KeyCommand}s off that queue and runs them one at
 * a time.
 *
 * The members replyQueue, replyThread, seq and seqHash used below are not
 * declared in this class; they are inherited.
 */
public class LocalDataSegmentManager extends DataSegmentManager {
	
	/** Reverse key attached to the PUT and UPDATE commands built by this manager. */
	private final String reverseKey = "local";
	/** Per-key DataSegmentKey objects, created on first use by getDataSegmentKey. */
	private final ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
	/** Commands waiting to be run by the dispatcher thread. */
	private final LinkedBlockingQueue<KeyCommand> cmdQueue = new LinkedBlockingQueue<KeyCommand>();
	private final Logger logger = Logger.getLogger("local");

	/**
	 * Dispatcher loop: blocks on the command queue and runs each KeyCommand
	 * in the order it was taken.
	 */
	private final Runnable keyCommandThread = new Runnable() {

		@Override
		public void run() {
			while (true) {
				KeyCommand keyCmd;
				try {
					keyCmd = cmdQueue.take();
				} catch (InterruptedException e) {
					// Nothing was taken, so there is no command to run.
					// Keep the interrupt status visible and end the loop
					// instead of dereferencing a null command.
					Thread.currentThread().interrupt();
					logger.error("key command thread interrupted; stopping", e);
					return;
				}
				keyCmd.runCommand();
			}
		}
		
	};

	/**
	 * Starts the reply thread and the key command dispatcher thread.
	 */
	public LocalDataSegmentManager() {
		new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
		new Thread(keyCommandThread, "LocalDataSegmentManager-runKeyCommand").start();
	}

	/**
	 * Queues a command for the dispatcher thread.
	 *
	 * If the calling thread is interrupted while queueing, the command is
	 * not queued, the failure is logged, and the thread's interrupt status
	 * is restored so the caller can still observe it.
	 *
	 * @param key data segment key the command is paired with
	 * @param cmd command to run
	 */
	public void addCommand(DataSegmentKey key, Command cmd) {
		try {
			cmdQueue.put(new KeyCommand(key, cmd));
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			logger.error("interrupted while queueing command; command was not queued", e);
		}
	}
	
	/**
	 * Returns the DataSegmentKey registered for the given key, creating and
	 * registering a new one if none exists yet. When two threads race to
	 * create the same key, both receive the instance that putIfAbsent stored.
	 *
	 * @param key name of the data segment
	 * @return the DataSegmentKey registered under that name
	 */
	public DataSegmentKey getDataSegmentKey(String key) {
		// NOTE(review): ConcurrentHashMap.get rejects a null key with a
		// NullPointerException, so the null guard below is never reached as
		// written. The order is kept to preserve existing behaviour; confirm
		// whether a null key should really yield null before reordering.
		DataSegmentKey existing = dataSegments.get(key);
		if (existing != null) {
			return existing;
		}
		if (key == null) {
			return null;
		}
		DataSegmentKey created = new DataSegmentKey();
		DataSegmentKey raced = dataSegments.putIfAbsent(key, created);
		return (raced == null) ? created : raced;
	}
	
	/**
	 * Queues a PUT command carrying a msgpack Value for the given key.
	 */
	@Override
	public void put(String key, Value val) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, replyQueue, null, reverseKey);
		enqueueAndLog(dataSegmentKey, cmd);
	}
	
	/**
	 * Queues a PUT command carrying a plain object for the given key.
	 */
	public void put(String key, Object obj) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		Command cmd = new Command(CommandType.PUT, null, key, obj, 0, 0, replyQueue, null, reverseKey);
		enqueueAndLog(dataSegmentKey, cmd);
	}
	
	/**
	 * Queues an UPDATE command carrying a msgpack Value for the given key.
	 */
	@Override
	public void update(String key, Value val) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, replyQueue, null, reverseKey);
		enqueueAndLog(dataSegmentKey, cmd);
	}
	
	/**
	 * Queues an UPDATE command carrying a plain object for the given key.
	 */
	public void update(String key, Object val) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, replyQueue, null, reverseKey);
		enqueueAndLog(dataSegmentKey, cmd);
	}

	/**
	 * Queues a TAKE command for the given key. The command is tagged with a
	 * fresh sequence number and recorded in seqHash under that number before
	 * it is queued.
	 */
	@Override
	public void take(Receiver receiver, String key, int index, CodeSegment cs) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		int seq = this.seq.getAndIncrement();
		Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null);
		seqHash.put(seq, cmd);
		enqueueAndLog(dataSegmentKey, cmd);
	}

	/**
	 * Queues a PEEK command for the given key. The command is tagged with a
	 * fresh sequence number and recorded in seqHash under that number before
	 * it is queued.
	 */
	@Override
	public void peek(Receiver receiver, String key, int index, CodeSegment cs) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		int seq = this.seq.getAndIncrement();
		Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null);
		seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number
		enqueueAndLog(dataSegmentKey, cmd);
	}

	/**
	 * Queues a REMOVE command for the given key.
	 */
	@Override
	public void remove(String key) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
		enqueueAndLog(dataSegmentKey, cmd);
	}
	
	/**
	 * Terminates the JVM with exit status 0.
	 */
	@Override public void finish() {
		System.exit(0);
	}

	/**
	 * No-op in this implementation.
	 */
	@Override
	public void close() {
		
	}

	/**
	 * Writes the given value and object into the supplied DataSegmentValue,
	 * using the index obtained from the key's DataSegmentKey, and builds a
	 * FLIP command around it.
	 *
	 * NOTE(review): the FLIP command is only logged; the addCommand call is
	 * disabled in this revision, so nothing is queued. Confirm whether that
	 * is still intended.
	 */
	public void flip(String key ,Value val, Object obj, DataSegmentValue dsv) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		int index = dataSegmentKey.getIndex();
		dsv.setValue(index, val, obj);
		Command cmd = new Command(CommandType.FLIP, 0, dsv);
		//addCommand(dataSegmentKey, cmd);
		debugLog(cmd);
	}

	/** Queues the command via addCommand, then writes it to the debug log. */
	private void enqueueAndLog(DataSegmentKey dataSegmentKey, Command cmd) {
		addCommand(dataSegmentKey, cmd);
		debugLog(cmd);
	}

	/** Logs the command string at debug level; the string is only built when debug is enabled. */
	private void debugLog(Command cmd) {
		if (logger.isDebugEnabled()) {
			logger.debug(cmd.getCommandString());
		}
	}
	
}