Mercurial > hg > Database > Alice
view src/main/java/alice/codesegment/InputDataSegment.java @ 527:bfec2c3ff1b8 dispose
change unzip
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 30 Apr 2015 18:14:02 +0900 |
parents | 928907206d21 |
children | b3c9554ccb1b |
line wrap: on
line source
package alice.codesegment; import java.util.concurrent.atomic.AtomicInteger; import alice.datasegment.Command; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; import alice.datasegment.Receiver; import alice.datasegment.SendOption; /** * RemoteかLocalかで分けて処理する。ここに圧縮DSMへ投げる処理を追加。 * InputDataSegment Manager * keep tracking unbound/bound count * @author kazz * */ public class InputDataSegment { public CodeSegment cs; private AtomicInteger count = new AtomicInteger(1); // 1 for no input data segments private AtomicInteger keyCount = new AtomicInteger(0); // number of DataSegments public InputDataSegment(CodeSegment cs) { this.cs = cs; } public void init(){ count = new AtomicInteger(1); keyCount = new AtomicInteger(0); } public void quickPeek(Receiver receiver) {//SEDAで実行 cs.register(receiver); if (receiver.compressedFlag){ SendOption option = new SendOption(true, true); if (receiver.managerKey == null){//localの場合 DataSegment.getCompressedLocal().peek(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) {//remoteの場合 DataSegment.get(receiver.managerKey + "!").peek(receiver, cs, option); } } } else { SendOption option = new SendOption(true, false); if (receiver.managerKey == null){ DataSegment.getLocal().peek(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) { DataSegment.get(receiver.managerKey).peek(receiver, cs, option); } } } } public void peek(Receiver receiver) { cs.register(receiver); if (receiver.compressedFlag){ SendOption option = new SendOption(false, true); if (receiver.managerKey==null){ DataSegment.getCompressedLocal().peek(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) { DataSegment.get(receiver.managerKey + "!").peek(receiver, cs, option); } } } else { SendOption option = new SendOption(false, false); if (receiver.managerKey==null){ DataSegment.getLocal().peek(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) { DataSegment.get(receiver.managerKey).peek(receiver, cs, option); } } } } public void quickTake(Receiver receiver) { cs.register(receiver); if (receiver.compressedFlag){ SendOption option = new SendOption(true, true); if (receiver.managerKey==null){ DataSegment.getCompressedLocal().take(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) { DataSegment.get(receiver.managerKey + "!").take(receiver, cs, option); } } } else { SendOption option = new SendOption(true, false); if (receiver.managerKey==null){ DataSegment.getLocal().take(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) { DataSegment.get(receiver.managerKey).take(receiver, cs, option); } } } } public void take(Receiver receiver) { System.out.println("in TAKE"); cs.register(receiver); if (receiver.compressedFlag){ SendOption option = new SendOption(false, true); if (receiver.managerKey==null){// 指定なしの場合デフォはローカルになる DataSegment.getCompressedLocal().take(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) { DataSegment.get(receiver.managerKey + "!").take(receiver, cs, option); } } } else { SendOption option = new SendOption(false, false); if (receiver.managerKey==null){ DataSegment.getLocal().take(receiver, cs, option); } else { if (DataSegment.contains(receiver.managerKey)) { DataSegment.get(receiver.managerKey).take(receiver, cs, option); } } } } public void reply(Receiver receiver, Command reply) { receiver.index = reply.index; receiver.from = reply.reverseKey; receiver.setData(reply.rData); receive(); } public void register() { count.getAndIncrement(); keyCount.getAndIncrement(); } public void setKey() { if (keyCount.decrementAndGet() == 0) { receive(); } } public void receive() { if (count.decrementAndGet() == 0) { CodeSegmentManager.submit(cs); } } /** * InputDataSegment factory * @param type PEEK or TAKE * @return Receiver of DataSegment reply */ public Receiver create(CommandType type) { return new Receiver(this, type); }//Receiverを作成 public Receiver create(CommandType type, boolean compressFlag) {//追加 return new Receiver(this, type, compressFlag); } public void recommand(Receiver receiver) { // TODO why only local? DataSegment.getLocal().recommand(receiver, cs); } public void setCounter(int cnt){ count.set(cnt); } }