view src/alice/datasegment/DataSegmentManager.java @ 232:496b687f3793

peek and take commands from local no longer use the Reply Thread
author sugi
date Fri, 29 Mar 2013 18:32:27 +0900
parents 96110f25adcc
children ca1c9c477f54
line wrap: on
line source

package alice.datasegment;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;
import org.msgpack.type.Value;

import alice.codesegment.CodeSegment;

/**
 * Abstract base for data segment managers.
 *
 * <p>Besides declaring the key-based operations that subclasses implement
 * ({@code put}, {@code update}, {@code take}, {@code peek}, {@code remove},
 * {@code close}, {@code finish}), this class provides the reply plumbing:
 * replies are queued with {@link #addReplyCommand(Command)} and consumed by
 * {@link #replyThread}, which looks up the command registered in
 * {@link #seqHash} under the reply's sequence number and hands the reply to
 * that command's code segment.
 */
public abstract class DataSegmentManager {
	
	/**
	 * Commands keyed by sequence number. The reply loop removes the entry whose
	 * key equals an incoming reply's {@code seq}. Entries are not added in this
	 * class; presumably subclasses register commands here when they send a
	 * request (verify against implementations).
	 */
	protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); //TODO Over Head
	/** Replies waiting to be processed by {@link #replyThread}. */
	protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();
	/**
	 * Counter starting at 1. Not used in this class; presumably the source of
	 * sequence numbers for subclasses (verify against implementations).
	 */
	protected AtomicInteger seq = new AtomicInteger(1);
	
	/**
	 * Reply loop. Repeatedly takes a reply from {@link #replyQueue}, removes the
	 * command stored under the same sequence number and passes the reply to that
	 * command's code segment. A reply with no stored command is logged and
	 * skipped.
	 *
	 * <p>The loop ends when the running thread is interrupted; the interrupt
	 * status is restored before returning so the owner of the thread can see it.
	 */
	protected Runnable replyThread = new Runnable() {
		Logger logger = Logger.getLogger("reply");
		@Override
		public void run() {
			while (true) {
				try {
					Command reply = replyQueue.take();
					Command cmd = getAndRemoveCmd(reply.seq);
					if (cmd == null) {
						// No command is stored under this sequence number.
						logger.warn("conflict sequence number");
						continue;
					}
					cmd.cs.ids.reply(cmd.receiver, reply);
					if (logger.isDebugEnabled()) {
						logger.debug(reply.getCommandString() + " " + cmd.getCommandString());
					}
				} catch (InterruptedException e) {
					// Catching the exception clears the interrupt status. Restore it and
					// stop, instead of swallowing the request and looping forever.
					Thread.currentThread().interrupt();
					return;
				}
			}
		}
		
	};
	
	/**
	 * Removes and returns the command stored under the given sequence number.
	 *
	 * @param index sequence number used as the key in {@link #seqHash}
	 * @return the removed command, or {@code null} if none was stored under that key
	 */
	public Command getAndRemoveCmd(int index){
		return seqHash.remove(index);
	}
	
	/**
	 * Queues a reply for processing by {@link #replyThread}.
	 *
	 * <p>If the calling thread is interrupted while waiting to enqueue, the reply
	 * is not queued, the stack trace is printed and the thread's interrupt status
	 * is restored so the caller can react to it.
	 *
	 * @param cmd the reply command to enqueue
	 */
	public void addReplyCommand(Command cmd) {
		try {
			replyQueue.put(cmd);
		} catch (InterruptedException e) {
			e.printStackTrace();
			// Keep the interrupt visible to the caller rather than silently clearing it.
			Thread.currentThread().interrupt();
		}
	}
	
	/** Put operation for {@code key} with value {@code val}; semantics are defined by subclasses. */
	public abstract void put(String key, Value val);
	/** Update operation for {@code key} with value {@code val}; semantics are defined by subclasses. */
	public abstract void update(String key, Value val);
	/**
	 * Convenience overload of {@link #take(Receiver, String, int, CodeSegment)}
	 * that passes {@code 0} as the index.
	 */
	public void take(Receiver receiver, String key, CodeSegment cs) {
		take(receiver, key, 0, cs);
	}
	/**
	 * Take operation for {@code key}; semantics are defined by subclasses.
	 *
	 * @param receiver receiver associated with the request
	 * @param key      data segment key
	 * @param index    index argument (the three-argument overload passes 0)
	 * @param cs       code segment associated with the request
	 */
	public abstract void take(Receiver receiver, String key, int index, CodeSegment cs);
	/**
	 * Convenience overload of {@link #peek(Receiver, String, int, CodeSegment)}
	 * that passes {@code 0} as the index.
	 */
	public void peek(Receiver receiver, String key, CodeSegment cs) {
		peek(receiver, key, 0, cs);
	}
	/**
	 * Peek operation for {@code key}; semantics are defined by subclasses.
	 *
	 * @param receiver receiver associated with the request
	 * @param key      data segment key
	 * @param index    index argument (the three-argument overload passes 0)
	 * @param cs       code segment associated with the request
	 */
	public abstract void peek(Receiver receiver, String key, int index, CodeSegment cs);
	/** Remove operation for {@code key}; semantics are defined by subclasses. */
	public abstract void remove(String key);
	/** Close operation; semantics are defined by subclasses. */
	public abstract void close();
	/** Finish operation; semantics are defined by subclasses. */
	public abstract void finish();
	
}