2
|
1 package alice.datasegment;
|
|
2
|
3
|
3 import java.util.ArrayList;
|
|
4 import java.util.concurrent.LinkedBlockingQueue;
|
|
5 import java.util.concurrent.atomic.AtomicInteger;
|
|
6
|
|
7 import alice.datasegment.Command;
|
|
8
|
2
|
9 public class DataSegmentKey {
|
|
10
|
3
|
11 private LinkedBlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>();
|
|
12 private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
|
|
13 private ArrayList<Command> waitList = new ArrayList<Command>();
|
|
14 private AtomicInteger tailIndex = new AtomicInteger(1);
|
|
15 private Runnable keyThread;
|
|
16
|
|
17 public DataSegmentKey() {
|
|
18
|
|
19 }
|
|
20
|
|
21 public void addCommand(Command cmd) {
|
|
22 cmdQueue.add(cmd);
|
|
23 }
|
|
24
|
|
25 public void runKeyThread() {
|
|
26 keyThread = new Runnable() {
|
|
27 @Override
|
|
28 public void run() {
|
|
29 while (true) {
|
|
30 try {
|
|
31 Command cmd = cmdQueue.take();
|
|
32 switch (cmd.cmdType) {
|
5
|
33 case UPDATE:
|
|
34 if (dataList.size() != 0) {
|
|
35 dataList.remove(0);
|
|
36 }
|
3
|
37 case PUT:
|
|
38 int index = tailIndex.getAndIncrement();
|
6
|
39 DataSegmentValue dsv = new DataSegmentValue(index, cmd.val);
|
|
40 dataList.add(dsv);
|
3
|
41 // run waiting peek and take
|
|
42 for (Command waitCmd : waitList) {
|
|
43 if (waitCmd.index < index) {
|
6
|
44 waitCmd.manager.replyQueue.put(new Reply(waitCmd.seq, index, cmd.val));
|
|
45 if (waitCmd.cmdType == CommandType.TAKE) { // delete data, if it run take cmd.
|
|
46 dataList.remove(dsv);
|
|
47 break;
|
|
48 }
|
3
|
49 }
|
|
50 }
|
|
51 break;
|
|
52 case PEEK:
|
|
53 if (cmd.index >= tailIndex.get()) {
|
|
54 waitList.add(cmd);
|
|
55 break;
|
|
56 }
|
|
57 for (DataSegmentValue data : dataList) {
|
|
58 if (data.index > cmd.index) {
|
6
|
59 cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val));
|
3
|
60 break;
|
|
61 }
|
|
62 }
|
6
|
63 waitList.add(cmd);
|
3
|
64 break;
|
|
65 case TAKE:
|
|
66 if (cmd.index >= tailIndex.get()) {
|
|
67 waitList.add(cmd);
|
|
68 break;
|
|
69 }
|
|
70 for (DataSegmentValue data : dataList) {
|
|
71 if (data.index > cmd.index) {
|
6
|
72 cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val));
|
3
|
73 dataList.remove(data);
|
|
74 break;
|
|
75 }
|
|
76 }
|
6
|
77 waitList.add(cmd);
|
3
|
78 break;
|
|
79 case REMOVE:
|
|
80 // TODO: implements later
|
|
81 break;
|
|
82 default:
|
|
83 }
|
|
84 } catch (InterruptedException e) {
|
|
85 e.printStackTrace();
|
|
86 }
|
|
87 }
|
|
88 }
|
|
89 };
|
6
|
90 new Thread(keyThread).start();
|
3
|
91 };
|
|
92
|
2
|
93 }
|