view src/alice/datasegment/DataSegmentManager.java @ 14:e3f1b21718b0

implements RemoteDataSegment
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Sun, 15 Jan 2012 00:56:25 +0900
parents 30f97d776a3e
children 72dd27d952b0
line wrap: on
line source

package alice.datasegment;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.msgpack.type.Value;

import alice.codesegment.CodeSegment;

/**
 * Base class for data segment managers.
 *
 * <p>Holds the bookkeeping shared by concrete managers: the key table, the
 * table of outstanding commands indexed by sequence number, the queue of
 * incoming replies, and a {@link Runnable} that routes each reply back to the
 * code segment that issued the matching command. The actual storage
 * operations are left abstract.
 */
public abstract class DataSegmentManager {
	
	/** Data segment keys, indexed by key name. */
	protected ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
	/** Commands awaiting a reply, indexed by their sequence number. */
	protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();
	/** Replies waiting to be dispatched by {@link #replyThread}. */
	public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();
	/** Sequence number source; starts at 1. */
	protected AtomicInteger seq = new AtomicInteger(1);
	/**
	 * Reply dispatch loop. Blocks on {@link #replyQueue}, looks up the pending
	 * command with the same sequence number in {@link #seqHash}, and hands the
	 * reply's index and value to that command's code segment.
	 *
	 * <p>This class only defines the task; it does not start a thread for it
	 * (presumably subclasses do). The loop ends when the running thread is
	 * interrupted.
	 */
	protected Runnable replyThread = new Runnable() {

		@Override
		public void run() {
			while (true) {
				try {
					Command reply = replyQueue.take();
					// remove() rather than get(): a sequence number is answered
					// once, so dropping the entry here keeps seqHash from
					// growing without bound.
					Command cmd = seqHash.remove(reply.seq);
					if (cmd == null) {
						// No pending command for this reply. Skip it instead of
						// letting a NullPointerException kill the dispatcher.
						System.err.println("DataSegmentManager: no pending command for reply seq " + reply.seq);
						continue;
					}
					cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val));
				} catch (InterruptedException e) {
					// Interruption is the only way to stop this loop: restore
					// the flag for the owner of the thread and exit.
					Thread.currentThread().interrupt();
					return;
				}
			}
		}
		
	};
	
	/**
	 * Puts {@code val} under {@code key}. Semantics are defined by the subclass.
	 *
	 * @param key data segment key name
	 * @param val value to store
	 */
	public abstract void put(String key, Value val);

	/**
	 * Updates the value under {@code key}. Semantics are defined by the subclass.
	 *
	 * @param key data segment key name
	 * @param val new value
	 */
	public abstract void update(String key, Value val);

	/**
	 * Convenience overload of {@link #take(String, String, int, CodeSegment)}
	 * that passes {@code 0} as the index.
	 *
	 * @param argKey name under which the result is delivered to {@code cs}
	 * @param key    data segment key name
	 * @param cs     code segment that receives the result
	 */
	public void take(String argKey, String key, CodeSegment cs) {
		take(argKey, key, 0, cs);
	}

	/**
	 * Requests a take of {@code key} on behalf of {@code cs}. Semantics are
	 * defined by the subclass.
	 *
	 * @param argKey name under which the result is delivered to {@code cs}
	 * @param key    data segment key name
	 * @param index  index passed through to the implementation; the
	 *               three-argument overload passes {@code 0}
	 * @param cs     code segment that receives the result
	 */
	public abstract void take(String argKey, String key, int index, CodeSegment cs);

	/**
	 * Convenience overload of {@link #peek(String, String, int, CodeSegment)}
	 * that passes {@code 0} as the index.
	 *
	 * @param argKey name under which the result is delivered to {@code cs}
	 * @param key    data segment key name
	 * @param cs     code segment that receives the result
	 */
	public void peek(String argKey, String key, CodeSegment cs) {
		peek(argKey, key, 0, cs);
	}

	/**
	 * Requests a peek of {@code key} on behalf of {@code cs}. Semantics are
	 * defined by the subclass.
	 *
	 * @param argKey name under which the result is delivered to {@code cs}
	 * @param key    data segment key name
	 * @param index  index passed through to the implementation; the
	 *               three-argument overload passes {@code 0}
	 * @param cs     code segment that receives the result
	 */
	public abstract void peek(String argKey, String key, int index, CodeSegment cs);

	/**
	 * Removes {@code key}. Semantics are defined by the subclass.
	 *
	 * @param key data segment key name
	 */
	public abstract void remove(String key);
	
}