view src/fdl/MetaLinda.java @ 91:4df1d50df52a

Ring: fdl.test.debug
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 16 Feb 2010 03:58:06 +0900
parents b658ff1eb90f
children 7bf2eeea23a0
line wrap: on
line source


/*
 * @(#)MetaLinda.java       1.1 06/04/01
 *
 * Copyright 2008  Shinji KONO
 * 

   Meta Linda
     Transport layer of Meta Linda API

 */

package fdl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;

/**
 * MetaLinda
 *
 * Meta tuple interface in the Linda server. Every operation wraps its
 * arguments in a {@link MetaReply} bound to the local tuple space; replies
 * placed on the {@code replies} queue are polled by {@link #sync(long)}.
 *
 * @author Shinji Kono
 */
public class MetaLinda implements PSXLinda {

	/** Tuple space that every MetaReply created here is bound to. */
	public TupleSpace ts;
	/** Server whose checkTuple() is driven from sync(). */
	public FDLindaServ fds;
	/** Federated Linda instance used for open() and queueExec(). */
	public FederatedLinda fdl=FederatedLinda.init();
	/** Linda registered through add(); null until add() is called. */
	public PSXLinda next=null;
	/** Replies waiting to be polled by sync(). */
	public LinkedList<MetaReply> replies=new LinkedList<MetaReply>();

	/**
	 * Creates a meta Linda over the given tuple space and server.
	 *
	 * @param ts  tuple space the operations act on
	 * @param fds server polled during sync
	 */
	public MetaLinda(TupleSpace ts,FDLindaServ fds) {
		this.ts = ts;
		this.fds = fds;
	}

	/**
	 * Opens a connection through the federated Linda on behalf of this object.
	 *
	 * @param _host host name handed to FederatedLinda
	 * @param _port port number handed to FederatedLinda
	 * @return whatever FederatedLinda.openFromMetaLinda returns
	 * @throws IOException propagated from FederatedLinda.openFromMetaLinda
	 */
	public PSXLinda open(String _host,int _port)
	throws IOException {
		return fdl.openFromMetaLinda(this, _host, _port);
	}

	/**
	 * Builds a PSX_IN request for the tuple id and queues it for sync().
	 *
	 * @param id tuple id
	 * @return the queued reply
	 */
	public PSXReply in(int id) {
		final MetaReply reply = new MetaReply(PSX.PSX_IN,id,ts);
		enqueue(reply);
		return reply;
	}

	/**
	 * Builds a PSX_IN request carrying a callback and queues it for sync().
	 *
	 * @param id       tuple id
	 * @param callback callback passed to the MetaReply
	 */
	public void in(int id, PSXCallback callback) {
		enqueue(new MetaReply(PSX.PSX_IN,id,ts, callback));
	}

	/** Appends a reply to the tail of the pending queue. */
	private void enqueue(MetaReply reply) {
		replies.addLast(reply);
	}

	/**
	 * Builds a PSX_CHECK request for the tuple id.
	 *
	 * @param id tuple id
	 * @return the new reply
	 */
	public PSXReply ck(int id) {
		// NOTE(review): unlike in(int)/out(int,ByteBuffer) this reply is not
		// put on the queue, so sync() never polls it -- confirm intended.
		return new MetaReply(PSX.PSX_CHECK,id,ts);
	}

	/**
	 * Builds a PSX_CHECK request carrying a callback and queues it for sync().
	 *
	 * @param id       tuple id
	 * @param callback callback passed to the MetaReply
	 */
	public void ck(int id, PSXCallback callback) {
		enqueue(new MetaReply(PSX.PSX_CHECK,id,ts,callback));
	}

	/**
	 * Builds a PSX_OUT request with the data (no callback) and queues it.
	 *
	 * @param id   tuple id
	 * @param data payload passed to the MetaReply
	 * @return the queued reply
	 */
	public PSXReply out(int id, ByteBuffer data) {
		final MetaReply reply = new MetaReply(PSX.PSX_OUT,id,ts,data,null);
		enqueue(reply);
		return reply;
	}

	/**
	 * Builds a PSX_UPDATE request with the data (no callback).
	 *
	 * @param id   tuple id
	 * @param data payload passed to the MetaReply
	 * @return the new reply
	 */
	public PSXReply update(int id, ByteBuffer data) {
		// NOTE(review): not queued, in contrast to out(int,ByteBuffer) --
		// confirm that the caller is expected to poll the reply itself.
		return new MetaReply(PSX.PSX_UPDATE,id,ts,data,null);
	}

	/**
	 * Builds a PSX_UPDATE request carrying a callback and queues it for sync().
	 *
	 * @param id       tuple id
	 * @param data     payload passed to the MetaReply
	 * @param callback callback passed to the MetaReply
	 */
	public void update(int id, ByteBuffer data,PSXCallback callback) {
		enqueue(new MetaReply(PSX.PSX_UPDATE,id,ts,data,callback));
	}

	/**
	 * Builds a PSX_RD request for the tuple id.
	 *
	 * @param id tuple id
	 * @return the new reply
	 */
	public PSXReply rd(int id) {
		// NOTE(review): not queued, in contrast to in(int) -- confirm intended.
		return new MetaReply(PSX.PSX_RD,id,ts);
	}

	/**
	 * Builds a PSX_RD request carrying a callback and queues it for sync().
	 *
	 * @param id       tuple id
	 * @param callback callback passed to the MetaReply
	 */
	public void rd(int id, PSXCallback callback) {
		enqueue(new MetaReply(PSX.PSX_RD,id,ts,callback));
	}

	/**
	 * Remembers the given Linda in {@code next}.
	 *
	 * @param linda Linda to store
	 * @return this object
	 */
	public PSXLinda add(PSXLinda linda) {
		next = linda;
		return this;
	}


	/**
	 * Same as {@code sync(0)}.
	 *
	 * @return always 0
	 */
	public int sync() {
		return sync(0);
	}

	/**
	 * Runs the federated queue, lets the server check tuples, then polls
	 * the pending replies until a full pass completes none of them.
	 *
	 * @param timeout value forwarded to FDLindaServ.checkTuple
	 * @return always 0
	 */
	public int sync(long timeout) {
		fdl.queueExec();
		fds.checkTuple(timeout); // fdl sync is also handled here
		/*
		 * A callback fired from ready() may itself call sync() and so
		 * change the queue underneath us. Each pass therefore visits only
		 * as many entries as were queued when the pass began; anything
		 * added later is picked up by a following pass or a later call.
		 */
		boolean progressed = true;
		while (progressed) {
			progressed = false;
			for (int remaining = replies.size(); remaining > 0; remaining--) {
				MetaReply pending = replies.poll();
				// A nested sync() from an earlier callback may already
				// have drained the queue.
				if (pending == null) {
					break;
				}
				if (!pending.ready()) {
					// Not finished yet: put it back at the tail.
					enqueue(pending);
					continue;
				}
				progressed = true;
			}
		}
		return 0;
	}

	/**
	 * Does nothing in this implementation.
	 *
	 * @param command ignored
	 * @param data    ignored
	 */
	public void send(ByteBuffer command, ByteBuffer data) {
	}

	/**
	 * Stores the hook in the tuple space's {@code hook} field.
	 *
	 * @param hook hook to install
	 */
	public void setTupleSpaceHook(IOHandlerHook hook) {
		ts.hook = hook;
	}

}


/* end */