Mercurial > hg > Database > Alice
annotate src/main/java/alice/codesegment/InputDataSegment.java @ 455:b004f62b83e5 dispose
refactor (remove quick method from DataSegmentManager and use flag)
author | sugi |
---|---|
date | Sun, 02 Nov 2014 18:07:43 +0900 |
parents | f68d103498e0 |
children | bcf6f4a6fcd0 |
rev | line source |
---|---|
345 | 1 package alice.codesegment; |
2 | |
3 import java.util.concurrent.atomic.AtomicInteger; | |
4 | |
5 import alice.datasegment.Command; | |
6 import alice.datasegment.CommandType; | |
7 import alice.datasegment.DataSegment; | |
452 | 8 import alice.datasegment.ReceiveData; |
345 | 9 import alice.datasegment.Receiver; |
455
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
10 import alice.datasegment.SendOption; |
345 | 11 |
12 /** | |
13 * InputDataSegment Manager | |
14 * keep tracking unbound/bound count | |
15 * @author kazz | |
16 * | |
17 */ | |
18 public class InputDataSegment { | |
417 | 19 |
20 public CodeSegment cs; | |
21 private AtomicInteger count = new AtomicInteger(1); // 1 for no input data segments | |
22 private AtomicInteger keyCount = new AtomicInteger(0); // number of DataSegments | |
23 public InputDataSegment(CodeSegment cs) { | |
24 this.cs = cs; | |
25 } | |
26 | |
27 public void init(){ | |
28 count = new AtomicInteger(1); | |
29 keyCount = new AtomicInteger(0); | |
30 } | |
31 | |
32 public void quickPeek(Receiver receiver) { | |
33 cs.list.add(receiver); | |
34 if (receiver.managerKey==null){ | |
455
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
35 DataSegment.getLocal().peek(receiver, cs, null); |
417 | 36 } else { |
455
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
37 SendOption option = new SendOption(true, false); |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
38 DataSegment.get(receiver.managerKey).peek(receiver, cs, option); |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
39 } |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
40 } |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
41 |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
42 |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
43 public void peek(Receiver receiver) { |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
44 cs.list.add(receiver); |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
45 if (receiver.managerKey==null){ |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
46 DataSegment.getLocal().peek(receiver, cs, null); |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
47 } else { |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
48 SendOption option = new SendOption(false, false); |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
49 DataSegment.get(receiver.managerKey).peek(receiver, cs, option); |
417 | 50 } |
51 } | |
52 | |
53 public void quickTake(Receiver receiver) { | |
54 cs.list.add(receiver); | |
55 if (receiver.managerKey==null){ | |
455
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
56 DataSegment.getLocal().take(receiver, cs, null); |
417 | 57 } else { |
455
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
58 SendOption option = new SendOption(true, false); |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
59 DataSegment.get(receiver.managerKey).take(receiver, cs, option); |
417 | 60 } |
61 } | |
62 | |
63 public void take(Receiver receiver) { | |
64 cs.list.add(receiver); | |
65 if (receiver.managerKey==null){ | |
455
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
66 DataSegment.getLocal().take(receiver, cs, null); |
417 | 67 } else { |
455
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
68 SendOption option = new SendOption(false, false); |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
69 DataSegment.get(receiver.managerKey).take(receiver, cs, option); |
417 | 70 } |
71 } | |
345 | 72 |
417 | 73 public void reply(Receiver receiver, Command reply) { |
74 receiver.index = reply.index; | |
452 | 75 receiver.from = reply.reverseKey; |
76 receiver.setData(new ReceiveData(reply.val, reply.getCompressFlag(), reply.getSerializeFlag())); | |
417 | 77 receive(); |
78 } | |
79 | |
80 public void register() { | |
81 count.getAndIncrement(); | |
82 keyCount.getAndIncrement(); | |
83 } | |
345 | 84 |
417 | 85 public void setKey() { |
86 if (keyCount.decrementAndGet() == 0) { | |
87 receive(); | |
88 } | |
89 } | |
90 | |
91 public void receive() { | |
92 if (count.decrementAndGet() == 0) { | |
93 CodeSegmentManager.submit(cs); | |
94 } | |
95 } | |
345 | 96 |
417 | 97 /** |
98 * InputDataSegment factory | |
99 * @param type PEEK or TAKE | |
100 * @return Receiver of DataSegment reply | |
101 */ | |
102 public Receiver create(CommandType type) { | |
103 return new Receiver(this, type); | |
104 } | |
105 | |
106 public void recommand(Receiver receiver) { | |
107 // TODO why only local? | |
108 DataSegment.getLocal().recommand(receiver, cs); | |
109 } | |
110 | |
111 public void setCounter(int cnt){ | |
112 count.set(cnt); | |
113 } | |
345 | 114 } |