view src/main/java/alice/codesegment/InputDataSegment.java @ 458:bcf6f4a6fcd0 dispose

need set Meta DataSegment PUT API
author sugi
date Mon, 03 Nov 2014 17:12:53 +0900
parents b004f62b83e5
children be0b61986ff7
line wrap: on
line source

package alice.codesegment;

import java.util.concurrent.atomic.AtomicInteger;

import alice.datasegment.Command;
import alice.datasegment.CommandType;
import alice.datasegment.DataSegment;
import alice.datasegment.Receiver;
import alice.datasegment.SendOption;

/**
 * InputDataSegment Manager.
 *
 * <p>Keeps track of how many input DataSegment requests of a
 * {@link CodeSegment} are still unanswered and hands the CodeSegment to
 * {@link CodeSegmentManager#submit} once the pending counter drops to zero.
 *
 * <p>Two counters are maintained:
 * <ul>
 *   <li>{@code count} starts at 1 and is incremented by {@link #register()};
 *       it is decremented by {@link #receive()}.</li>
 *   <li>{@code keyCount} starts at 0 and is incremented by
 *       {@link #register()}; it is decremented by {@link #setKey()}, which
 *       calls {@link #receive()} once when it reaches zero.</li>
 * </ul>
 *
 * @author kazz
 *
 */
public class InputDataSegment {

    /** CodeSegment that owns the receivers and is submitted when all inputs arrived. */
    public CodeSegment cs;

    // Both counters are final and are reset in place (see init()). Replacing the
    // AtomicInteger objects through a plain, non-volatile field would not be
    // guaranteed to become visible to threads that are concurrently calling
    // receive()/register()/setKey().
    private final AtomicInteger count = new AtomicInteger(1); // 1 for no input data segments
    private final AtomicInteger keyCount = new AtomicInteger(0); // number of DataSegments

    /**
     * Creates a manager bound to the given CodeSegment.
     *
     * @param cs CodeSegment to submit once every registered input was received
     */
    public InputDataSegment(CodeSegment cs) {
        this.cs = cs;
    }

    /**
     * Resets both counters to their initial values ({@code count} = 1,
     * {@code keyCount} = 0) so the instance can be used again.
     */
    public void init() {
        count.set(1);
        keyCount.set(0);
    }

    /**
     * Issues a peek request for the receiver. For a remote manager the
     * SendOption is created with {@code (true, false)}.
     *
     * @param receiver receiver to be filled by the reply
     */
    public void quickPeek(Receiver receiver) {
        requestPeek(receiver, true);
    }

    /**
     * Issues a peek request for the receiver. For a remote manager the
     * SendOption is created with {@code (false, false)}.
     *
     * @param receiver receiver to be filled by the reply
     */
    public void peek(Receiver receiver) {
        requestPeek(receiver, false);
    }

    /**
     * Issues a take request for the receiver. For a remote manager the
     * SendOption is created with {@code (true, false)}.
     *
     * @param receiver receiver to be filled by the reply
     */
    public void quickTake(Receiver receiver) {
        requestTake(receiver, true);
    }

    /**
     * Issues a take request for the receiver. For a remote manager the
     * SendOption is created with {@code (false, false)}.
     *
     * @param receiver receiver to be filled by the reply
     */
    public void take(Receiver receiver) {
        requestTake(receiver, false);
    }

    /**
     * Copies the reply's index, reverse key and data into the receiver and
     * then counts the reply via {@link #receive()}.
     *
     * @param receiver receiver that was waiting for this reply
     * @param reply    command carrying the answer
     */
    public void reply(Receiver receiver, Command reply) {
        receiver.index = reply.index;
        receiver.from = reply.reverseKey;
        receiver.setData(reply.rData);
        receive();
    }

    /**
     * Registers one more input: increments both the pending counter and the
     * key counter.
     */
    public void register() {
        count.getAndIncrement();
        keyCount.getAndIncrement();
    }

    /**
     * Decrements the key counter; when it reaches zero, {@link #receive()} is
     * called once, which decrements the pending counter.
     */
    public void setKey() {
        if (keyCount.decrementAndGet() == 0) {
            receive();
        }
    }

    /**
     * Decrements the pending counter and submits the CodeSegment to
     * {@link CodeSegmentManager} when the counter reaches zero.
     */
    public void receive() {
        if (count.decrementAndGet() == 0) {
            CodeSegmentManager.submit(cs);
        }
    }

    /**
     * InputDataSegment factory
     * @param type PEEK or TAKE
     * @return Receiver of DataSegment reply
     */
    public Receiver create(CommandType type) {
        return new Receiver(this, type);
    }

    /**
     * Re-issues the receiver's command on the local DataSegment manager.
     *
     * @param receiver receiver whose command is issued again
     */
    public void recommand(Receiver receiver) {
        // TODO why only local?
        DataSegment.getLocal().recommand(receiver, cs);
    }

    /**
     * Overwrites the pending counter.
     *
     * @param cnt new value of the pending counter
     */
    public void setCounter(int cnt) {
        count.set(cnt);
    }

    /**
     * Shared body of {@link #peek} and {@link #quickPeek}: records the receiver
     * in the CodeSegment's list and sends a peek to the local manager (no
     * option) or to the manager named by {@code receiver.managerKey}.
     *
     * @param firstOption value passed as first SendOption constructor argument
     *                    (presumably the "quick" flag; confirm against SendOption)
     */
    private void requestPeek(Receiver receiver, boolean firstOption) {
        cs.list.add(receiver);
        if (receiver.managerKey == null) {
            DataSegment.getLocal().peek(receiver, cs, null);
        } else {
            SendOption option = new SendOption(firstOption, false);
            DataSegment.get(receiver.managerKey).peek(receiver, cs, option);
        }
    }

    /**
     * Shared body of {@link #take} and {@link #quickTake}: records the receiver
     * in the CodeSegment's list and sends a take to the local manager (no
     * option) or to the manager named by {@code receiver.managerKey}.
     *
     * @param firstOption value passed as first SendOption constructor argument
     *                    (presumably the "quick" flag; confirm against SendOption)
     */
    private void requestTake(Receiver receiver, boolean firstOption) {
        cs.list.add(receiver);
        if (receiver.managerKey == null) {
            DataSegment.getLocal().take(receiver, cs, null);
        } else {
            SendOption option = new SendOption(firstOption, false);
            DataSegment.get(receiver.managerKey).take(receiver, cs, option);
        }
    }
}