view src/alice/datasegment/DataSegmentKey.java @ 184:4475ba30238f working

minor change
author e095732
date Wed, 27 Feb 2013 06:30:14 +0900
parents 52a1fa5ba38b
children 91b6251ef6c8
line wrap: on
line source

package alice.datasegment;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

import alice.datasegment.Command; 

/**
 * Synchronized DataSegment for each DataSegment key
 * @author kazz
 *
 */
public class DataSegmentKey {
	
	private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
	private ArrayList<Command> waitList = new ArrayList<Command>();
	private AtomicInteger tailIndex = new AtomicInteger(1);
	
	public AtomicInteger getTailIndex(){
		return tailIndex;
	}
	
	public ArrayList<DataSegmentValue> getDataList(){
		return dataList;
	}
	
	public void runCommand(Command cmd) {
		switch (cmd.type) {
		case UPDATE:
			if (dataList.size() != 0) {
				dataList.remove(0);
			}
		case PUT:
			int index = tailIndex.getAndIncrement();
			DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); 
			dataList.add(dsv);
			// Process waiting peek and take commands
			for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
				Command waitCmd = iter.next();
				if (waitCmd.index < index) {
					try {
						waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					iter.remove();
					if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
						dataList.remove(dsv);
						break;
					}
				}
			}
			break;
		case PEEK:
			if (cmd.index >= tailIndex.get()) {
				waitList.add(cmd);
				break;
			}
			boolean waitFlag2 = true;
			for (DataSegmentValue data : dataList) {
				if (data.index > cmd.index) {
					try {
						cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					waitFlag2 = false;
					break;
				}
			}
			if (waitFlag2)
				waitList.add(cmd);
			break;
		case TAKE:
			if (cmd.index >= tailIndex.get()) {
				waitList.add(cmd);
				break;
			}
			boolean waitFlag = true;
			
			synchronized(dataList){
			for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
				DataSegmentValue data = iter.next();
				if (data.index > cmd.index) {
					try {
						cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					iter.remove();
					waitFlag = false;
					break;
				}
			}
			}
			if (waitFlag)
				waitList.add(cmd);
			break;
		case FLIP:
			break;
		case REMOVE:
			// TODO: implements later
			break;
		default:
		}

	}
	
}