view src/fdl/MetaLinda.java @ 101:d671c78d3757 fuchita

share selector in FDLindaServe and FederatedLinda
author one
date Wed, 26 May 2010 15:57:23 +0900
parents 270093b61001
children 3b000c4a4d31
line wrap: on
line source


/*
 * @(#)MetaLinda.java       1.1 06/04/01
 *
 * Copyright 2008  Shinji KONO
 * 

   Meta Lidna
     Trasport layer of Meta Linda API

 */

package fdl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;

/**
 MetaLinda 
 *
 * @author Shinji Kono
 *

   meta tuple interface in Linda Server

 */

public class MetaLinda implements PSXLinda {

	public TupleSpace ts;
	public FDLindaServ fds;
	public FederatedLinda fdl;
	public PSXLinda next=null;
	public LinkedList<MetaReply> replies=new LinkedList<MetaReply>();

	public MetaLinda(TupleSpace ts,FDLindaServ fds) {
		this.ts = ts;
		this.fds = fds;
		fdl=FederatedLinda.init(fds.selector);
	}

	public PSXLinda open(String _host,int _port) 
	throws IOException {
		return fdl.openFromMetaLinda(this, _host, _port);
	}

	public PSXReply in(int id) {
		MetaReply r = new MetaReply(PSX.PSX_IN,id,ts);
		addReply(r);
		return r;
	}

	public void in(int id, PSXCallback callback) {
		MetaReply r = new MetaReply(PSX.PSX_IN,id,ts, callback);
		addReply(r);
	}

	private void addReply(MetaReply r) {
		replies.addLast(r);
	}

	public PSXReply ck(int id) {
		MetaReply r = new MetaReply(PSX.PSX_CHECK,id,ts);
		return r;
	}

	public void ck(int id, PSXCallback callback) {
		MetaReply r = new MetaReply(PSX.PSX_CHECK,id,ts,callback);
		addReply(r);
	}

	public PSXReply out(int id, ByteBuffer data) {
		MetaReply r = new MetaReply(PSX.PSX_OUT,id,ts,data,null);
		addReply(r);
		return r;
	}

	public PSXReply update(int id, ByteBuffer data) {
		MetaReply r = new MetaReply(PSX.PSX_UPDATE,id,ts,data,null);
		return r;
	}

	public void update(int id, ByteBuffer data,PSXCallback callback) {
		MetaReply r = new MetaReply(PSX.PSX_UPDATE,id,ts,data,callback);
		addReply(r);
	}

	public PSXReply rd(int id) {
		MetaReply r = new MetaReply(PSX.PSX_RD,id,ts);
		return r;
	}

	public void rd(int id, PSXCallback callback) {
		MetaReply r = new MetaReply(PSX.PSX_RD,id,ts,callback);
		addReply(r);
	}


	public void waitRd(int id, PSXCallback callback) {
		MetaReply r = new MetaReply(PSX.PSX_WAIT_RD,id,ts,callback);
		addReply(r);
	}

	public PSXReply waitRd(int id) {
		MetaReply r = new MetaReply(PSX.PSX_WAIT_RD,id,ts);
		return r;
	}
	
	public PSXLinda add(PSXLinda linda) {
		next = linda;
		return this;
	}

	/**
	 * Meta Sync with no wait
	 */
	public int sync() {
		fdl.queueExec();
		fds.checkTuple(); // fdl sync is also handled here
		return metaSync();
	}

	/**
	 * Meta Sync with wait
	 * @param timeout wait timeout msec, if 0 wait indefinitely
	 */
	public int sync(long timeout) {
		fdl.queueExec();
		fds.checkTuple(timeout); // fdl sync is also handled here
		return metaSync();
	}
		
	public int metaSync() {
		/*
		 * r.callback() may call meta.sync() and modifies the
		 * replies queue. Do current size of queue only. The
		 * rest is checked on the next sync call including
		 * the recursive case.
		 */
		boolean hasNewReply;
		do {
			hasNewReply = false;
			int count = replies.size();
			while (count-->0) {
				MetaReply r = replies.poll();
				// previous call back may call this sync and make 
				// replies shorter.
				if (r==null) break;
				if (r.ready()) {
					hasNewReply = true;
				} else {
					addReply(r);
				}
			}
		} while (hasNewReply);
		return 0;
	}
	
	public void wakeup() {
		fdl.wakeup();
	}

	public void send(ByteBuffer command, ByteBuffer data) {
	}

	public void setTupleSpaceHook(IOHandlerHook hook) {
		ts.hook = hook;
	}
}


/* end */