view src/alice/datasegment/LocalDataSegmentManager.java @ 8:78b415d019de

Local DS and CS work! maybe...
author one
date Thu, 12 Jan 2012 16:02:28 +0900
parents 352eb19d837d
children c4d6ff56b9bf
line wrap: on
line source

package alice.datasegment;

import java.util.concurrent.atomic.AtomicInteger;

import org.msgpack.type.Value;

import alice.codesegment.CodeSegment;
import alice.datasegment.CommandType;

public class LocalDataSegmentManager extends DataSegmentManager {
	
	private AtomicInteger seq = new AtomicInteger(1);
	private Runnable replyThread = new Runnable() {

		@Override
		public void run() {
			while (true) {
				try {
					Reply reply = replyQueue.take();
					Command cmd = seqHash.get(reply.seq);
					cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val));
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
		
	};
	public LocalDataSegmentManager() {
		new Thread(replyThread).start();
	}
	
	private DataSegmentKey getDataSegmentKey(String key) {
		DataSegmentKey newDataSegmentKey = new DataSegmentKey();
		DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
		if (dataSegmentKey == null) {
			newDataSegmentKey.runKeyThread();
			dataSegmentKey = newDataSegmentKey;
		}
		return dataSegmentKey;
	}
	
	@Override
	public void put(String key, Value val) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		dataSegmentKey.addCommand(new Command(CommandType.PUT, null, val, 0, 0, this, null));
	}

	@Override
	public void update(String key, Value val) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, val, 0, 0, this, null));
	}

	@Override
	public void take(String argKey, String key, int index, CodeSegment cs) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		int seq = this.seq.getAndIncrement();
		Command cmd = new Command(CommandType.TAKE, argKey, null, index, seq, this, cs);
		seqHash.put(seq, cmd);
		dataSegmentKey.addCommand(cmd);
	}

	@Override
	public void peek(String argKey, String key, int index, CodeSegment cs) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		int seq = this.seq.getAndIncrement();
		Command cmd = new Command(CommandType.PEEK, argKey, null, index, seq, this, cs);
		seqHash.put(seq, cmd);
		dataSegmentKey.addCommand(cmd);
	}

	@Override
	public void remove(String key) {
		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, 0, 0, this, null));
	}

}