345
|
1 package alice.datasegment;
|
|
2
|
|
3 import java.util.ArrayList;
|
|
4 import java.util.Iterator;
|
|
5
|
|
6 import alice.datasegment.Command;
|
|
7
|
|
8 /**
|
|
9 * Synchronized DataSegment for each DataSegment key
|
|
10 * @author kazz
|
|
11 *
|
|
12 */
|
|
13 public class DataSegmentKey {
|
|
14
|
|
15 private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
|
|
16 private ArrayList<Command> waitList = new ArrayList<Command>();
|
|
17 private int tailIndex = 1;
|
|
18
|
|
19 public synchronized void runCommand(Command cmd) {
|
|
20 switch (cmd.type) {
|
|
21 case UPDATE:
|
|
22 if (dataList.size() != 0) {
|
|
23 dataList.remove(0);
|
|
24 }
|
|
25 case PUT:
|
|
26 int index = tailIndex;
|
|
27 tailIndex++;
|
|
28 DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj, cmd.reverseKey);
|
|
29 dataList.add(dsv);
|
|
30 // Process waiting peek and take commands
|
|
31 for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
|
|
32 Command waitCmd = iter.next();
|
|
33 if (waitCmd.index < index) {
|
|
34 replyValue(waitCmd ,dsv);
|
|
35 iter.remove();
|
|
36 if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
|
|
37 dataList.remove(dsv);
|
|
38 break;
|
|
39 }
|
|
40 }
|
|
41 }
|
|
42 break;
|
|
43 case PEEK:
|
|
44 if (cmd.index >= tailIndex) {
|
|
45 waitList.add(cmd);
|
|
46 break;
|
|
47 }
|
|
48 boolean waitFlag2 = true;
|
|
49 for (DataSegmentValue data : dataList) {
|
|
50 if (data.index > cmd.index) {
|
|
51 replyValue(cmd ,data);
|
|
52 waitFlag2 = false;
|
|
53 break;
|
|
54 }
|
|
55 }
|
|
56 if (waitFlag2)
|
|
57 waitList.add(cmd);
|
|
58 break;
|
|
59 case TAKE:
|
|
60 if (cmd.index >= tailIndex) {
|
|
61 waitList.add(cmd);
|
|
62 break;
|
|
63 }
|
|
64 boolean waitFlag = true;
|
|
65 for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
|
|
66 DataSegmentValue data = iter.next();
|
|
67 if (data.index > cmd.index) {
|
|
68 replyValue(cmd ,data);
|
|
69 iter.remove();
|
|
70 waitFlag = false;
|
|
71 break;
|
|
72 }
|
|
73 }
|
|
74 if (waitFlag)
|
|
75 waitList.add(cmd);
|
|
76 break;
|
|
77 case REMOVE:
|
|
78 // TODO: implements later
|
|
79 break;
|
|
80 default:
|
|
81 }
|
|
82
|
|
83 }
|
|
84
|
|
85 public void replyValue(Command cmd, DataSegmentValue data){
|
|
86 if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local.
|
|
87 cmd.cs.ids.reply(cmd.receiver, new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
|
|
88 } else {
|
|
89 try {
|
|
90 if (!cmd.flag){
|
|
91 cmd.connection.sendQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
|
|
92 }
|
|
93 else {
|
|
94 cmd.connection.write(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
|
|
95 }
|
|
96
|
|
97 } catch (InterruptedException e) {
|
|
98 e.printStackTrace();
|
|
99 }
|
|
100 }
|
|
101 }
|
|
102
|
|
103 }
|