diff src/main/java/alice/datasegment/DataSegmentKey.java @ 345:8f71c3e6f11d

Change directory structure Maven standard
author sugi
date Wed, 16 Apr 2014 18:26:07 +0900
parents
children aefbe41fcf12
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/alice/datasegment/DataSegmentKey.java	Wed Apr 16 18:26:07 2014 +0900
@@ -0,0 +1,103 @@
+package alice.datasegment;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import alice.datasegment.Command; 
+
+/**
+ * Synchronized DataSegment for each DataSegment key
+ * @author kazz
+ *
+ */
+public class DataSegmentKey {
+	
+	private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
+	private ArrayList<Command> waitList = new ArrayList<Command>();
+	private int tailIndex = 1;
+	
+	public synchronized void runCommand(Command cmd) {
+		switch (cmd.type) {
+		case UPDATE:
+			if (dataList.size() != 0) {
+				dataList.remove(0);
+			}
+		case PUT:
+			int index = tailIndex;
+			tailIndex++;
+			DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj, cmd.reverseKey); 
+			dataList.add(dsv);
+			// Process waiting peek and take commands
+			for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
+				Command waitCmd = iter.next();
+				if (waitCmd.index < index) {
+					replyValue(waitCmd ,dsv);
+					iter.remove();
+					if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
+						dataList.remove(dsv);
+						break;
+					}
+				}
+			}
+			break;
+		case PEEK:
+			if (cmd.index >= tailIndex) {
+				waitList.add(cmd);
+				break;
+			}
+			boolean waitFlag2 = true;
+			for (DataSegmentValue data : dataList) {
+				if (data.index > cmd.index) {
+					replyValue(cmd ,data);
+					waitFlag2 = false;
+					break;
+				}
+			}
+			if (waitFlag2)
+				waitList.add(cmd);
+			break;
+		case TAKE:
+			if (cmd.index >= tailIndex) {
+				waitList.add(cmd);
+				break;
+			}
+			boolean waitFlag = true;
+			for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
+				DataSegmentValue data = iter.next();
+				if (data.index > cmd.index) {
+					replyValue(cmd ,data);
+					iter.remove();
+					waitFlag = false;
+					break;
+				}
+			}
+			if (waitFlag)
+				waitList.add(cmd);
+			break;
+		case REMOVE:
+			// TODO: implements later
+			break;
+		default:
+		}
+
+	}
+	
+	public void replyValue(Command cmd, DataSegmentValue data){
+		if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local.
+			cmd.cs.ids.reply(cmd.receiver, new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
+		} else {
+			try {
+				if (!cmd.flag){ 
+					cmd.connection.sendQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
+				}
+				else {
+					cmd.connection.write(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
+				}
+				
+			} catch (InterruptedException e) {
+				e.printStackTrace();
+			}
+		}
+	}
+	
+}