view src/alice/codesegment/InputDataSegment.java @ 313:4b99234c88d4

add init method for ids.
author sugi
date Tue, 03 Dec 2013 21:34:42 +0900
parents b90d03c6cb00
children f1f0638861ab
line wrap: on
line source

package alice.codesegment;

import java.util.concurrent.atomic.AtomicInteger;

import alice.datasegment.Command;
import alice.datasegment.CommandType;
import alice.datasegment.DataSegment;
import alice.datasegment.ReceiveLocalData;
import alice.datasegment.ReceiveRemoteData;
import alice.datasegment.Receiver;

/**
 * InputDataSegment manager.
 * <p>
 * Tracks how many input data segments of a {@link CodeSegment} are still
 * outstanding and submits the code segment to {@link CodeSegmentManager}
 * once the pending counter reaches zero.
 * <p>
 * Two counters are kept:
 * <ul>
 * <li>{@code count} - starts at 1 and is incremented by {@link #register()};
 *     every {@link #receive()} decrements it and the code segment is
 *     submitted when it hits zero.</li>
 * <li>{@code keyCount} - starts at 0 and is incremented by
 *     {@link #register()}; every {@link #setKey()} decrements it and, when it
 *     hits zero, triggers one {@link #receive()}.</li>
 * </ul>
 *
 * @author kazz
 */
public class InputDataSegment {

	/** Code segment whose inputs are managed by this instance. */
	public CodeSegment cs;

	// The counters are final and are reset in place (see initCounter) so that
	// every thread always operates on the same AtomicInteger instances.
	private final AtomicInteger count = new AtomicInteger(1); // 1 for no input data segments
	private final AtomicInteger keyCount = new AtomicInteger(0); // number of DataSegments

	/**
	 * Creates a manager bound to the given code segment.
	 *
	 * @param cs code segment that is submitted once all inputs have arrived
	 */
	public InputDataSegment(CodeSegment cs) {
		this.cs = cs;
	}

	/**
	 * Resets both counters to their initial values ({@code count = 1},
	 * {@code keyCount = 0}).
	 * <p>
	 * The existing atomics are updated with {@code set} instead of being
	 * replaced by new objects: swapping the field references is not safely
	 * published to other threads, and a concurrent decrement could be applied
	 * to the discarded instance and get lost.
	 */
	public void initCounter() {
		count.set(1);
		keyCount.set(0);
	}

	/**
	 * Records the receiver in the code segment's list and issues a quick peek:
	 * to the local manager when {@code receiver.managerKey} is {@code null},
	 * otherwise to the manager registered under that key.
	 *
	 * @param receiver receiver to be filled by the reply
	 */
	public void quickPeek(Receiver receiver) {
		cs.list.add(receiver);
		if (receiver.managerKey == null) {
			// NOTE(review): the local path uses plain peek, whereas quickTake
			// calls quickTake on the local manager - confirm this is intended.
			DataSegment.getLocal().peek(receiver, cs);
		} else {
			DataSegment.get(receiver.managerKey).quickPeek(receiver, cs);
		}
	}

	/**
	 * Records the receiver in the code segment's list and issues a quick take:
	 * to the local manager when {@code receiver.managerKey} is {@code null},
	 * otherwise to the manager registered under that key.
	 *
	 * @param receiver receiver to be filled by the reply
	 */
	public void quickTake(Receiver receiver) {
		cs.list.add(receiver);
		if (receiver.managerKey == null) {
			DataSegment.getLocal().quickTake(receiver, cs);
		} else {
			DataSegment.get(receiver.managerKey).quickTake(receiver, cs);
		}
	}

	/**
	 * Records the receiver in the code segment's list and issues a peek:
	 * to the local manager when {@code receiver.managerKey} is {@code null},
	 * otherwise to the manager registered under that key.
	 *
	 * @param receiver receiver to be filled by the reply
	 */
	public void peek(Receiver receiver) {
		cs.list.add(receiver);
		if (receiver.managerKey == null) {
			DataSegment.getLocal().peek(receiver, cs);
		} else {
			DataSegment.get(receiver.managerKey).peek(receiver, cs);
		}
	}

	/**
	 * Records the receiver in the code segment's list and issues a take:
	 * to the local manager when {@code receiver.managerKey} is {@code null},
	 * otherwise to the manager registered under that key.
	 *
	 * @param receiver receiver to be filled by the reply
	 */
	public void take(Receiver receiver) {
		cs.list.add(receiver);
		if (receiver.managerKey == null) {
			DataSegment.getLocal().take(receiver, cs);
		} else {
			DataSegment.get(receiver.managerKey).take(receiver, cs);
		}
	}

	/**
	 * Copies a reply into the receiver and counts it as one received input.
	 * <p>
	 * The data is wrapped as {@link ReceiveLocalData} (from {@code reply.obj})
	 * only when {@code reply.reverseKey} equals {@code "local"}; in every
	 * other case, including a {@code null} reverse key, it is wrapped as
	 * {@link ReceiveRemoteData} (from {@code reply.val}).
	 *
	 * @param receiver receiver that gets the index, origin and data
	 * @param reply    reply command carrying the data
	 */
	public void reply(Receiver receiver, Command reply) {
		receiver.index = reply.index;
		receiver.from = reply.reverseKey;
		boolean fromLocal = reply.reverseKey != null && reply.reverseKey.equals("local");
		if (fromLocal) {
			receiver.setData(new ReceiveLocalData(reply.obj));
		} else {
			receiver.setData(new ReceiveRemoteData(reply.val));
		}
		receive();
	}

	/**
	 * Registers one more input: increments both the pending counter and the
	 * key counter.
	 */
	public void register() {
		count.getAndIncrement();
		keyCount.getAndIncrement();
	}

	/**
	 * Decrements the key counter; when it reaches zero, calls
	 * {@link #receive()} once.
	 */
	public void setKey() {
		if (keyCount.decrementAndGet() == 0) {
			receive();
		}
	}

	/**
	 * Decrements the pending counter and submits the code segment to
	 * {@link CodeSegmentManager} when the counter reaches zero.
	 */
	public void receive() {
		if (count.decrementAndGet() == 0) {
			CodeSegmentManager.submit(cs);
		}
	}

	/**
	 * InputDataSegment factory
	 * @param type PEEK or TAKE
	 * @return Receiver of DataSegment reply
	 */
	public Receiver create(CommandType type) {
		return new Receiver(this, type);
	}

	/**
	 * Re-issues the receiver's command through the local data segment manager.
	 *
	 * @param receiver receiver whose command is issued again
	 */
	public void recommand(Receiver receiver) {
		// TODO why only local?
		DataSegment.getLocal().recommand(receiver, cs);
	}

	/**
	 * Overwrites the pending counter with the given value.
	 *
	 * @param cnt new value of the pending counter
	 */
	public void setCounter(int cnt) {
		count.set(cnt);
	}
}