view src/alice/datasegment/DataSegmentKey.java @ 14:e3f1b21718b0

implements RemoteDataSegment
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Sun, 15 Jan 2012 00:56:25 +0900
parents 30f97d776a3e
children bb075e103cd3
line wrap: on
line source

package alice.datasegment;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import alice.datasegment.Command; 

public class DataSegmentKey {
	
	private LinkedBlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>();
	private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
	private ArrayList<Command> waitList = new ArrayList<Command>();
	private AtomicInteger tailIndex = new AtomicInteger(1);
	private Runnable keyThread;
	
	public DataSegmentKey() {
		
	}
	
	public void addCommand(Command cmd) {
		cmdQueue.add(cmd);
	}
	
	public void runKeyThread() {
		keyThread = new Runnable() {
			@Override
			public void run() {
				while (true) {
					try {
						Command cmd = cmdQueue.take();
						switch (cmd.type) {
						case UPDATE:
							if (dataList.size() != 0) {
								dataList.remove(0);
							}
						case PUT:
							int index = tailIndex.getAndIncrement();
							DataSegmentValue dsv = new DataSegmentValue(index, cmd.val); 
							dataList.add(dsv);
							// run waiting peek and take
							LinkedList<Command> removeList = new LinkedList<Command>();
							for (Command waitCmd : waitList) {
								if (waitCmd.index < index) {
									waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null));
									removeList.add(waitCmd);
									if (waitCmd.type == CommandType.TAKE) { // delete data, if it run take cmd.
										dataList.remove(dsv);
										break;
									}
								}
							}
							for (Command rmCmd : removeList) {
								waitList.remove(rmCmd);
							}
							break;
						case PEEK:
							if (cmd.index >= tailIndex.get()) {
								waitList.add(cmd);
								break;
							}
							for (DataSegmentValue data : dataList) {
								if (data.index > cmd.index) {
									cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null));
									break;
								}
							}
							waitList.add(cmd);
							break;
						case TAKE:
							if (cmd.index >= tailIndex.get()) {
								waitList.add(cmd);
								break;
							}
							for (DataSegmentValue data : dataList) {
								if (data.index > cmd.index) {
									cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null));
									dataList.remove(data);
									break;
								}
							}
							waitList.add(cmd);
							break;
						case REMOVE:
							// TODO: implements later
							break;
						default:
						}
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		};
		new Thread(keyThread).start();
	};
	
}