view src/main/java/alice/datasegment/LocalDataSegmentManager.java @ 350:388e7d4b0624

heart beat may finish.
author sugi
date Tue, 22 Apr 2014 04:21:22 +0900
parents 8f71c3e6f11d
children abc54fa0c81b
line wrap: on
line source

package alice.datasegment;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import alice.codesegment.CodeSegment;

/**
 * {@link DataSegmentManager} implementation that keeps its data segments in
 * this process, in a map from key name to {@link DataSegmentKey}.
 *
 * <p>All data operations ({@code put}, {@code update}, {@code take},
 * {@code peek}, {@code remove}, {@code recommand}) build a {@link Command}
 * and hand it to the key's {@code runCommand} directly on the calling thread.
 * Only {@link #submitCommand(DataSegmentKey, Command)} goes through the
 * internal thread pool.
 */
public class LocalDataSegmentManager extends DataSegmentManager {

	/** Reverse key attached to every PUT and UPDATE command issued here. */
	private final String reverseKey = "local";

	/** Key name to per-key data segment; entries are created on first use. */
	private final ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();

	private final Logger logger = Logger.getLogger("local");

	/**
	 * Pool used by {@link #submitCommand(DataSegmentKey, Command)}. Core and
	 * maximum pool size are both the number of available processors and the
	 * work queue is unbounded.
	 */
	private final ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(
			Runtime.getRuntime().availableProcessors(), // corePoolSize
			Runtime.getRuntime().availableProcessors(), // maximumPoolSize
			Integer.MAX_VALUE, // keepAliveTime
			TimeUnit.SECONDS,
			new LinkedBlockingQueue<Runnable>());

	/**
	 * Creates the manager and starts a thread named
	 * "LocalDataSegmentManager-replyCommand" running the inherited
	 * {@code replyThread}.
	 */
	public LocalDataSegmentManager() {
		new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
	}

	/**
	 * Task that runs a single command against a single data segment key.
	 * Declared static because it needs no state from the enclosing manager.
	 */
	private static class RunCommand implements Runnable {

		private final DataSegmentKey key;
		private final Command cmd;

		public RunCommand(DataSegmentKey key, Command cmd) {
			this.key = key;
			this.cmd = cmd;
		}

		@Override
		public void run() {
			key.runCommand(cmd);
		}

	}

	/**
	 * Schedules {@code cmd} to be run against {@code key} on the internal
	 * thread pool instead of on the calling thread.
	 *
	 * @param key data segment key that will run the command
	 * @param cmd command to run
	 */
	public void submitCommand(DataSegmentKey key, Command cmd) {
		dataSegmentExecutor.execute(new RunCommand(key, cmd));
	}

	/**
	 * Returns the data segment registered under {@code key}, creating and
	 * registering a new one if none exists yet. Creation goes through
	 * {@code putIfAbsent}, so when several threads race on the same key they
	 * all receive the instance that ended up in the map.
	 *
	 * @param key name of the data segment; may be {@code null}
	 * @return the data segment for {@code key}, or {@code null} when
	 *         {@code key} is {@code null}
	 */
	public DataSegmentKey getDataSegmentKey(String key) {
		// ConcurrentHashMap rejects null keys with a NullPointerException, so
		// the null case has to be handled before the map is consulted.
		if (key == null)
			return null;
		DataSegmentKey dsKey = dataSegments.get(key);
		if (dsKey != null)
			return dsKey;
		DataSegmentKey newDataSegmentKey = new DataSegmentKey();
		DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
		if (dataSegmentKey == null) {
			// Nobody registered the key in the meantime: ours is the winner.
			dataSegmentKey = newDataSegmentKey;
		}
		return dataSegmentKey;
	}

	/**
	 * Runs {@code cmd} against {@code dataSegmentKey} on the calling thread
	 * and, when debug logging is enabled, logs the command string afterwards.
	 */
	private void runAndLog(DataSegmentKey dataSegmentKey, Command cmd) {
		dataSegmentKey.runCommand(cmd);
		if (logger.isDebugEnabled())
			logger.debug(cmd.getCommandString());
	}

	/**
	 * Issues a PUT command carrying {@code val} to the data segment named
	 * {@code key}.
	 */
	@Override
	public void put(String key, Object val) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey);
		runAndLog(dataSegmentKey, cmd);
	}

	/** Same as {@link #put(String, Object)}; there is no separate fast path locally. */
	@Override
	public void quickPut(String key, Object val) {
		put(key, val);
	}

	/**
	 * Issues an UPDATE command carrying {@code val} to the data segment named
	 * {@code key}. The command is passed to the key's {@code runCommand} on
	 * the calling thread.
	 */
	@Override
	public void update(String key, Object val) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey);
		runAndLog(dataSegmentKey, cmd);
	}

	/** Same as {@link #update(String, Object)}; there is no separate fast path locally. */
	@Override
	public void quickUpdate(String key, Object val) {
		update(key, val);
	}

	/**
	 * Issues a TAKE command for {@code receiver.key} at {@code receiver.index},
	 * tagged with a fresh sequence number and the inherited reply queue.
	 *
	 * @param receiver receiver describing the key and index to take
	 * @param cs code segment passed along with the command
	 */
	@Override
	public void take(Receiver receiver, CodeSegment cs) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
		int seq = this.seq.getAndIncrement();
		Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
		runAndLog(dataSegmentKey, cmd);
	}

	/** Same as {@link #take(Receiver, CodeSegment)}; there is no separate fast path locally. */
	@Override
	public void quickTake(Receiver receiver, CodeSegment cs) {
		take(receiver, cs);
	}

	/**
	 * Issues a PEEK command for {@code receiver.key} at {@code receiver.index},
	 * tagged with a fresh sequence number and the inherited reply queue.
	 *
	 * @param receiver receiver describing the key and index to peek
	 * @param cs code segment passed along with the command
	 */
	@Override
	public void peek(Receiver receiver, CodeSegment cs) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
		int seq = this.seq.getAndIncrement();
		Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
		runAndLog(dataSegmentKey, cmd);
	}

	/** Same as {@link #peek(Receiver, CodeSegment)}; there is no separate fast path locally. */
	@Override
	public void quickPeek(Receiver receiver, CodeSegment cs) {
		peek(receiver, cs);
	}

	/** Issues a REMOVE command to the data segment named {@code key}. */
	@Override
	public void remove(String key) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
		runAndLog(dataSegmentKey, cmd);
	}

	/** Terminates the whole JVM with exit status 0. */
	@Override public void finish() {
		System.exit(0);
	}

	/** No-op for the local manager. */
	@Override
	public void close() {

	}

	/**
	 * Re-issues the command described by {@code receiver}: same shape as
	 * {@link #take(Receiver, CodeSegment)} and
	 * {@link #peek(Receiver, CodeSegment)}, but the command type is read from
	 * {@code receiver.type}.
	 *
	 * @param receiver receiver describing the type, key and index
	 * @param cs code segment passed along with the command
	 */
	public void recommand(Receiver receiver, CodeSegment cs) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
		int seq = this.seq.getAndIncrement();
		Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
		runAndLog(dataSegmentKey, cmd);
	}

	/** No-op for the local manager. */
	@Override
	public void ping(String returnKey) {

	}

	/** No-op for the local manager. */
	@Override
	public void response(String returnKey) {

	}

	/** No-op for the local manager. */
	@Override
	public void shutdown() {

	}

}