view src/alice/datasegment/DataSegmentKey.java @ 232:496b687f3793

peek or take command from local no use Reply Thread
author sugi
date Fri, 29 Mar 2013 18:32:27 +0900
parents bc061ee5f31f
children 4dc2f09a8f7a
line wrap: on
line source

package alice.datasegment;

import java.util.ArrayList;
import java.util.Iterator;

import alice.datasegment.Command; 

/**
 * Synchronized DataSegment for each DataSegment key
 * @author kazz
 *
 */
public class DataSegmentKey {
	
	private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
	private ArrayList<Command> waitList = new ArrayList<Command>();
	private int tailIndex = 1;
	
	public synchronized void runCommand(Command cmd) {
		switch (cmd.type) {
		case UPDATE:
			if (dataList.size() != 0) {
				dataList.remove(0);
			}
		case PUT:
			int index = tailIndex;
			tailIndex++;
			DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj,cmd.reverseKey); 
			dataList.add(dsv);
			// Process waiting peek and take commands
			for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
				Command waitCmd = iter.next();
				if (waitCmd.index < index) {
					try {
						waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, cmd.obj, index, waitCmd.seq, null, null, cmd.reverseKey));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					iter.remove();
					if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
						dataList.remove(dsv);
						break;
					}
				}
			}
			break;
		case PEEK:
			if (cmd.index >= tailIndex) {
				waitList.add(cmd);
				break;
			}
			boolean waitFlag2 = true;
			for (DataSegmentValue data : dataList) {
				if (data.index > cmd.index) {
					replyValue(cmd ,data);
					waitFlag2 = false;
					break;
				}
			}
			if (waitFlag2)
				waitList.add(cmd);
			break;
		case TAKE:
			if (cmd.index >= tailIndex) {
				waitList.add(cmd);
				break;
			}
			boolean waitFlag = true;
			for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
				DataSegmentValue data = iter.next();
				if (data.index > cmd.index) {
					replyValue(cmd ,data);
					iter.remove();
					waitFlag = false;
					break;
				}
			}
			if (waitFlag)
				waitList.add(cmd);
			break;
		case REMOVE:
			// TODO: implements later
			break;
		default:
		}

	}
	
	public void replyValue(Command cmd, DataSegmentValue data){
		if (cmd.cs!=null){ // if cmd has cs-instance it means Command from local.
			cmd.cs.ids.reply(cmd.receiver, new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, 0, null, null, data.from));
			DataSegment.getLocal().getAndRemoveCmd(cmd.seq);
			cmd = null; // someone has been holding instance.
		} else {
			try {
				cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
}