Mercurial > hg > Database > Alice
annotate src/alice/datasegment/DataSegmentKey.java @ 251:88be2824a989
no use Queue
author | sugi |
---|---|
date | Mon, 17 Jun 2013 18:56:54 +0900 |
parents | ca1c9c477f54 |
children | 4fe924c9f504 |
rev | line source |
---|---|
2 | 1 package alice.datasegment; |
2 | |
3 | 3 import java.util.ArrayList; |
17 | 4 import java.util.Iterator; |
3 | 5 |
6 import alice.datasegment.Command; | |
7 | |
57 | 8 /** |
9 * Synchronized DataSegment for each DataSegment key | |
10 * @author kazz | |
11 * | |
12 */ | |
2 | 13 public class DataSegmentKey { |
14 | |
3 | 15 private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>(); |
16 private ArrayList<Command> waitList = new ArrayList<Command>(); | |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
17 private int tailIndex = 1; |
3 | 18 |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
19 public synchronized void runCommand(Command cmd) { |
69 | 20 switch (cmd.type) { |
21 case UPDATE: | |
22 if (dataList.size() != 0) { | |
23 dataList.remove(0); | |
24 } | |
25 case PUT: | |
225 | 26 int index = tailIndex; |
27 tailIndex++; | |
190 | 28 DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj,cmd.reverseKey); |
69 | 29 dataList.add(dsv); |
30 // Process waiting peek and take commands | |
31 for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) { | |
32 Command waitCmd = iter.next(); | |
33 if (waitCmd.index < index) { | |
235 | 34 replyValue(waitCmd ,dsv); |
69 | 35 iter.remove(); |
36 if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command | |
37 dataList.remove(dsv); | |
38 break; | |
39 } | |
40 } | |
41 } | |
42 break; | |
43 case PEEK: | |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
44 if (cmd.index >= tailIndex) { |
69 | 45 waitList.add(cmd); |
46 break; | |
47 } | |
48 boolean waitFlag2 = true; | |
49 for (DataSegmentValue data : dataList) { | |
50 if (data.index > cmd.index) { | |
232 | 51 replyValue(cmd ,data); |
69 | 52 waitFlag2 = false; |
53 break; | |
3 | 54 } |
55 } | |
69 | 56 if (waitFlag2) |
57 waitList.add(cmd); | |
58 break; | |
59 case TAKE: | |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
60 if (cmd.index >= tailIndex) { |
69 | 61 waitList.add(cmd); |
62 break; | |
63 } | |
64 boolean waitFlag = true; | |
65 for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) { | |
66 DataSegmentValue data = iter.next(); | |
67 if (data.index > cmd.index) { | |
232 | 68 replyValue(cmd ,data); |
69 | 69 iter.remove(); |
70 waitFlag = false; | |
71 break; | |
72 } | |
73 } | |
74 if (waitFlag) | |
75 waitList.add(cmd); | |
76 break; | |
77 case REMOVE: | |
78 // TODO: implement later | |
79 break; | |
80 default: | |
81 } | |
82 | |
20 | 83 } |
3 | 84 |
232 | 85 public void replyValue(Command cmd, DataSegmentValue data){ |
235 | 86 if (cmd.cs!=null){ // if cmd has a cs instance, the Command came from local. |
87 cmd.cs.ids.reply(cmd.receiver, new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); | |
232 | 88 cmd = null; // someone has been holding the instance. |
89 } else { | |
90 try { | |
251 | 91 if (!cmd.flag){ |
92 cmd.connection.sendQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); | |
93 } | |
94 else { | |
95 cmd.connection.write(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); | |
96 cmd = null; | |
97 } | |
98 | |
232 | 99 } catch (InterruptedException e) { |
100 e.printStackTrace(); | |
101 } | |
102 } | |
103 } | |
104 | |
2 | 105 } |