Mercurial > hg > Database > Alice
diff src/main/java/alice/codesegment/InputDataSegment.java @ 345:8f71c3e6f11d
Change directory structure Maven standard
author | sugi |
---|---|
date | Wed, 16 Apr 2014 18:26:07 +0900 |
parents | |
children | aadea6a59376 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/codesegment/InputDataSegment.java Wed Apr 16 18:26:07 2014 +0900 @@ -0,0 +1,116 @@ +package alice.codesegment; + +import java.util.concurrent.atomic.AtomicInteger; + +import alice.datasegment.Command; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegment; +import alice.datasegment.ReceiveLocalData; +import alice.datasegment.ReceiveRemoteData; +import alice.datasegment.Receiver; + +/** + * InputDataSegment Manager + * keep tracking unbound/bound count + * @author kazz + * + */ +public class InputDataSegment { + + public CodeSegment cs; + private AtomicInteger count = new AtomicInteger(1); // 1 for no input data segments + private AtomicInteger keyCount = new AtomicInteger(0); // number of DataSegments + public InputDataSegment(CodeSegment cs) { + this.cs = cs; + } + + public void init(){ + count = new AtomicInteger(1); + keyCount = new AtomicInteger(0); + } + + public void quickPeek(Receiver receiver) { + cs.list.add(receiver); + if (receiver.managerKey==null){ + DataSegment.getLocal().peek(receiver, cs); + } else { + DataSegment.get(receiver.managerKey).quickPeek(receiver ,cs); + } + } + + public void quickTake(Receiver receiver) { + cs.list.add(receiver); + if (receiver.managerKey==null){ + DataSegment.getLocal().quickTake(receiver, cs); + } else { + DataSegment.get(receiver.managerKey).quickTake(receiver ,cs); + } + } + + public void peek(Receiver receiver) { + cs.list.add(receiver); + if (receiver.managerKey==null){ + DataSegment.getLocal().peek(receiver, cs); + } else { + DataSegment.get(receiver.managerKey).peek(receiver, cs); + } + } + + + public void take(Receiver receiver) { + cs.list.add(receiver); + if (receiver.managerKey==null){ + DataSegment.getLocal().take(receiver, cs); + } else { + DataSegment.get(receiver.managerKey).take(receiver, cs); + } + } + + public void reply(Receiver receiver, Command reply) { + receiver.index = reply.index; + receiver.from = reply.reverseKey; + if (reply.reverseKey==null){ + receiver.setData(new ReceiveRemoteData(reply.val)); + } else if (!reply.reverseKey.equals("local")) { + receiver.setData(new ReceiveRemoteData(reply.val)); + } else { + receiver.setData(new ReceiveLocalData(reply.obj)); + } + receive(); + } + + public void register() { + count.getAndIncrement(); + keyCount.getAndIncrement(); + } + + public void setKey() { + if (keyCount.decrementAndGet() == 0) { + receive(); + } + } + + public void receive() { + if (count.decrementAndGet() == 0) { + CodeSegmentManager.submit(cs); + } + } + + /** + * InputDataSegment factory + * @param type PEEK or TAKE + * @return Receiver of DataSegment reply + */ + public Receiver create(CommandType type) { + return new Receiver(this, type); + } + + public void recommand(Receiver receiver) { + // TODO why only local? + DataSegment.getLocal().recommand(receiver, cs); + } + + public void setCounter(int cnt){ + count.set(cnt); + } +}