view src/main/java/alice/codesegment/InputDataSegment.java @ 523:145c425db88d dispose

add CompressedLDSM
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Thu, 09 Apr 2015 18:36:26 +0900
parents 549cc29aca59
children 30a74eee59c7
line wrap: on
line source

package alice.codesegment;

import java.util.concurrent.atomic.AtomicInteger;

import alice.datasegment.Command;
import alice.datasegment.CommandType;
import alice.datasegment.DataSegment;
import alice.datasegment.Receiver;
import alice.datasegment.SendOption;

/**
 * InputDataSegment manager.
 *
 * <p>Issues peek/take requests for a {@link CodeSegment}, routing each request
 * to the local manager or to the remote manager named by the receiver's
 * {@code managerKey}. Peek requests whose receiver has {@code compressedFlag}
 * set and no {@code managerKey} go to the compressed local manager.
 *
 * <p>Also keeps track of the unbound/bound count: the owning code segment is
 * submitted to {@link CodeSegmentManager} when the pending counter reaches
 * zero.
 *
 * @author kazz
 */
public class InputDataSegment {

    public CodeSegment cs;
    // Both counters are final and reset in place (see init()) so that every
    // thread always operates on the same AtomicInteger instances.
    private final AtomicInteger count = new AtomicInteger(1); // 1 for no input data segments
    private final AtomicInteger keyCount = new AtomicInteger(0); // number of DataSegments

    /**
     * Creates a manager bound to the given code segment.
     *
     * @param cs code segment that is submitted once the pending counter reaches zero
     */
    public InputDataSegment(CodeSegment cs) {
        this.cs = cs;
    }

    /**
     * Resets both counters to their initial values (pending count 1, key count 0).
     */
    public void init() {
        // Reset the existing counters instead of replacing the objects: a
        // replaced reference in a non-volatile field may not be seen by other
        // threads, which would then keep updating the stale counter.
        count.set(1);
        keyCount.set(0);
    }

    /**
     * Registers the receiver with the code segment and issues a peek with the
     * quick flag set to {@code true}. (Original note: executed by SEDA.)
     *
     * @param receiver receiver describing the request
     */
    public void quickPeek(Receiver receiver) {
        dispatchPeek(receiver, true);
    }

    /**
     * Registers the receiver with the code segment and issues a peek with the
     * quick flag set to {@code false}.
     *
     * @param receiver receiver describing the request
     */
    public void peek(Receiver receiver) {
        dispatchPeek(receiver, false);
    }

    /**
     * Registers the receiver with the code segment and issues a take with the
     * quick flag set to {@code true}.
     *
     * @param receiver receiver describing the request
     */
    public void quickTake(Receiver receiver) {
        dispatchTake(receiver, true);
    }

    /**
     * Registers the receiver with the code segment and issues a take with the
     * quick flag set to {@code false}.
     *
     * @param receiver receiver describing the request
     */
    public void take(Receiver receiver) {
        dispatchTake(receiver, false);
    }

    /**
     * Copies the reply's index, reverse key and data into the receiver, then
     * counts the reply via {@link #receive()}.
     *
     * @param receiver receiver to fill in
     * @param reply    command carrying the reply
     */
    public void reply(Receiver receiver, Command reply) {
        receiver.index = reply.index;
        receiver.from = reply.reverseKey;
        receiver.setData(reply.rData);
        receive();
    }

    /**
     * Increments both the pending counter and the key counter.
     */
    public void register() {
        count.getAndIncrement();
        keyCount.getAndIncrement();
    }

    /**
     * Decrements the key counter; when it reaches zero, calls {@link #receive()}.
     */
    public void setKey() {
        if (keyCount.decrementAndGet() == 0) {
            receive();
        }
    }

    /**
     * Decrements the pending counter; when it reaches zero, submits the code
     * segment to {@link CodeSegmentManager}.
     */
    public void receive() {
        if (count.decrementAndGet() == 0) {
            CodeSegmentManager.submit(cs);
        }
    }

    /**
     * InputDataSegment factory
     * @param type PEEK or TAKE
     * @return Receiver of DataSegment reply
     */
    public Receiver create(CommandType type) {
        return new Receiver(this, type);
    }

    /**
     * InputDataSegment factory with an explicit compression flag.
     * @param type PEEK or TAKE
     * @param compressFlag flag passed through to the Receiver constructor
     * @return Receiver of DataSegment reply
     */
    public Receiver create(CommandType type, boolean compressFlag) {
        return new Receiver(this, type, compressFlag);
    }

    /**
     * Re-issues the receiver's command on the local manager.
     *
     * @param receiver receiver whose command is re-issued
     */
    public void recommand(Receiver receiver) {
        // TODO why only local?
        DataSegment.getLocal().recommand(receiver, cs);
    }

    /**
     * Overwrites the pending counter.
     *
     * @param cnt new value of the pending counter
     */
    public void setCounter(int cnt) {
        count.set(cnt);
    }

    /**
     * Shared implementation of {@link #peek(Receiver)} and {@link #quickPeek(Receiver)}.
     */
    private void dispatchPeek(Receiver receiver, boolean quickFlag) {
        cs.register(receiver);

        if (receiver.managerKey == null) {
            // No manager key: local request. Only the local case distinguishes
            // between the compressed and the plain manager.
            if (receiver.compressedFlag) {
                DataSegment.getCompressedLocal().peek(receiver, cs, quickFlag);
            } else {
                DataSegment.getLocal().peek(receiver, cs, quickFlag);
            }
        } else if (DataSegment.contains(receiver.managerKey)) {
            // Remote request; compressed and plain receivers currently use the
            // same manager (original note: remote compression to be considered later).
            DataSegment.get(receiver.managerKey).peek(receiver, cs, quickFlag);
        }
        // NOTE(review): when managerKey is not registered no request is issued,
        // although the receiver has already been registered with cs — confirm
        // that callers guarantee the key exists.
    }

    /**
     * Shared implementation of {@link #take(Receiver)} and {@link #quickTake(Receiver)}.
     */
    private void dispatchTake(Receiver receiver, boolean quickFlag) {
        cs.register(receiver);

        // NOTE(review): unlike peek, take does not consult receiver.compressedFlag
        // — confirm whether a compressed take path is intended.
        if (receiver.managerKey == null) {
            DataSegment.getLocal().take(receiver, cs, quickFlag);
        } else if (DataSegment.contains(receiver.managerKey)) {
            DataSegment.get(receiver.managerKey).take(receiver, cs, quickFlag);
        }
        // NOTE(review): an unregistered managerKey is skipped silently here as well.
    }
}