view src/main/java/alice/codesegment/InputDataSegment.java @ 481:549cc29aca59 dispose

change access type private
author sugi
date Fri, 05 Dec 2014 17:22:11 +0900
parents be0b61986ff7
children 145c425db88d
line wrap: on
line source

package alice.codesegment;

import java.util.concurrent.atomic.AtomicInteger;

import alice.datasegment.Command;
import alice.datasegment.CommandType;
import alice.datasegment.DataSegment;
import alice.datasegment.Receiver;
import alice.datasegment.SendOption;

/**
 * InputDataSegment manager.
 *
 * <p>Tracks how many input data segments of a {@link CodeSegment} are still
 * outstanding and submits the code segment to {@link CodeSegmentManager}
 * once the internal counter reaches zero.
 *
 * <p>Two counters are kept:
 * <ul>
 *   <li>{@code count} starts at 1 and is incremented by {@link #register()};
 *       every {@link #receive()} decrements it.</li>
 *   <li>{@code keyCount} starts at 0 and is incremented by {@link #register()};
 *       every {@link #setKey()} decrements it, and the call that brings it to
 *       zero triggers one extra {@link #receive()}, which consumes the
 *       initial 1 of {@code count}.</li>
 * </ul>
 *
 * @author kazz
 */
public class InputDataSegment {

    /** Code segment that is registered with receivers and eventually submitted. */
    public CodeSegment cs;

    // The counters are final and reset in place (see init()) so that every
    // thread always operates on the same AtomicInteger instances.
    private final AtomicInteger count = new AtomicInteger(1); // 1 for no input data segments
    private final AtomicInteger keyCount = new AtomicInteger(0); // number of DataSegments

    /**
     * Creates a manager for the given code segment.
     *
     * @param cs the code segment whose inputs are tracked
     */
    public InputDataSegment(CodeSegment cs) {
        this.cs = cs;
    }

    /**
     * Resets both counters to their initial values ({@code count} = 1,
     * {@code keyCount} = 0).
     */
    public void init() {
        // Reset in place instead of replacing the counter objects: a plain
        // field reassignment is not guaranteed to be visible to other threads
        // and could lose a concurrent decrement made on the old instance.
        count.set(1);
        keyCount.set(0);
    }

    /**
     * Registers the receiver with the code segment and issues a peek whose
     * {@link SendOption} is built with {@code true} as its first argument.
     *
     * @param receiver the receiver that will get the reply
     */
    public void quickPeek(Receiver receiver) {
        issuePeek(receiver, true);
    }

    /**
     * Registers the receiver with the code segment and issues a peek whose
     * {@link SendOption} is built with {@code false} as its first argument.
     *
     * @param receiver the receiver that will get the reply
     */
    public void peek(Receiver receiver) {
        issuePeek(receiver, false);
    }

    /**
     * Registers the receiver with the code segment and issues a take whose
     * {@link SendOption} is built with {@code true} as its first argument.
     *
     * @param receiver the receiver that will get the reply
     */
    public void quickTake(Receiver receiver) {
        issueTake(receiver, true);
    }

    /**
     * Registers the receiver with the code segment and issues a take whose
     * {@link SendOption} is built with {@code false} as its first argument.
     *
     * @param receiver the receiver that will get the reply
     */
    public void take(Receiver receiver) {
        issueTake(receiver, false);
    }

    /**
     * Copies the reply's index, reverse key and data into the receiver and
     * then counts the reply as received.
     *
     * @param receiver the receiver to fill in
     * @param reply    the command carrying the reply data
     */
    public void reply(Receiver receiver, Command reply) {
        receiver.index = reply.index;
        receiver.from = reply.reverseKey;
        receiver.setData(reply.rData);
        receive();
    }

    /** Accounts for one more input data segment by incrementing both counters. */
    public void register() {
        count.getAndIncrement();
        keyCount.getAndIncrement();
    }

    /**
     * Decrements the key counter; the call that brings it to zero also calls
     * {@link #receive()} once.
     */
    public void setKey() {
        if (keyCount.decrementAndGet() == 0) {
            receive();
        }
    }

    /**
     * Decrements the outstanding counter and submits the code segment to
     * {@link CodeSegmentManager} when the counter reaches zero.
     */
    public void receive() {
        if (count.decrementAndGet() == 0) {
            CodeSegmentManager.submit(cs);
        }
    }

    /**
     * InputDataSegment factory
     * @param type PEEK or TAKE
     * @return Receiver of DataSegment reply
     */
    public Receiver create(CommandType type) {
        return new Receiver(this, type);
    }

    /**
     * Forwards a recommand request for the receiver to the local data segment.
     *
     * @param receiver the receiver to recommand
     */
    public void recommand(Receiver receiver) {
        // TODO why only local?
        DataSegment.getLocal().recommand(receiver, cs);
    }

    /**
     * Overwrites the outstanding counter.
     *
     * @param cnt the new counter value
     */
    public void setCounter(int cnt) {
        count.set(cnt);
    }

    /**
     * Shared body of {@link #peek(Receiver)} and {@link #quickPeek(Receiver)}.
     *
     * @param receiver the receiver to register and peek for
     * @param quick    value passed as the first {@link SendOption} argument
     */
    private void issuePeek(Receiver receiver, boolean quick) {
        cs.register(receiver);
        if (receiver.managerKey == null) {
            // No manager key: use the local data segment without a SendOption.
            DataSegment.getLocal().peek(receiver, cs, null);
            return;
        }
        // NOTE(review): when the key is not contained, no request is issued
        // although the receiver has already been registered with cs --
        // confirm that this silent no-op is intended.
        if (DataSegment.contains(receiver.managerKey)) {
            SendOption option = new SendOption(quick, false);
            DataSegment.get(receiver.managerKey).peek(receiver, cs, option);
        }
    }

    /**
     * Shared body of {@link #take(Receiver)} and {@link #quickTake(Receiver)}.
     *
     * @param receiver the receiver to register and take for
     * @param quick    value passed as the first {@link SendOption} argument
     */
    private void issueTake(Receiver receiver, boolean quick) {
        cs.register(receiver);
        if (receiver.managerKey == null) {
            // No manager key: use the local data segment without a SendOption.
            DataSegment.getLocal().take(receiver, cs, null);
            return;
        }
        // NOTE(review): when the key is not contained, no request is issued
        // although the receiver has already been registered with cs --
        // confirm that this silent no-op is intended.
        if (DataSegment.contains(receiver.managerKey)) {
            SendOption option = new SendOption(quick, false);
            DataSegment.get(receiver.managerKey).take(receiver, cs, option);
        }
    }
}