view src/fdl/MetaLinda.java @ 36:d5bca4b5ee95

meta.sync() recursive call fix.
author kono
date Sun, 24 Aug 2008 21:06:39 +0900
parents 64071f8e2e0d
children 2a366abc3f1f
line wrap: on
line source


/*
 * @(#)MetaLinda.java       1.1 06/04/01
 *
 * Copyright 2008  Shinji KONO
 * 

   Meta Lidna
     Trasport layer of Meta Linda API

 */

package fdl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;

/**
 MetaLinda 
 *
 * @author Shinji Kono
 *

   meta tuple interface in Linda Server

 */

public class MetaLinda implements PSXLinda {

	public TupleSpace ts;
	public FDLindaServ fds;
	public FederatedLinda fdl=null;
	public PSXLinda next=null;
	private LinkedList<MetaReply> replies=new LinkedList<MetaReply>();

	public MetaLinda(TupleSpace ts,FDLindaServ fds) {
		this.ts = ts;
		this.fds = fds;
	}

	public PSXReply in(int id) {
		return null;
	}

	public void in(int id, PSXCallback callback) {
		MetaReply r = new MetaReply(PSX.PSX_IN,id,ts, callback);
		addReply(r);
	}

	private void addReply(MetaReply r) {
		replies.addLast(r);
	}

	public PSXReply ck(int id) {
		MetaReply r = new MetaReply(PSX.PSX_CHECK,id,ts);
		return r;
	}

	public void ck(int id, PSXCallback callback) {
		MetaReply r = new MetaReply(PSX.PSX_CHECK,id,ts,callback);
		addReply(r);
	}

	public PSXReply out(int id, ByteBuffer data) {
		MetaReply r = new MetaReply(PSX.PSX_OUT,id,ts,data,null);
		addReply(r);
		return r;
	}

	public PSXReply update(int id, ByteBuffer data) {
		MetaReply r = new MetaReply(PSX.PSX_UPDATE,id,ts,data,null);
		return r;
	}

	public void update(int id, ByteBuffer data,PSXCallback callback) {
		MetaReply r = new MetaReply(PSX.PSX_UPDATE,id,ts,data,callback);
		addReply(r);
	}

	public PSXReply rd(int id) {
		MetaReply r = new MetaReply(PSX.PSX_RD,id,ts);
		return r;
	}

	public void rd(int id, PSXCallback callback) {
		MetaReply r = new MetaReply(PSX.PSX_RD,id,ts,callback);
		addReply(r);
	}

	public PSXLinda add(PSXLinda linda) {
		next = linda;
		return this;
	}


	public int sync() {
		return sync(0);
	}

	public int sync(long timeout) {
		if (fdl!=null) {
			try {
				fdl.sync(timeout);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		fds.checkTuple(timeout);
		/*
		 * r.callback() may call meta.sync() and modifies the
		 * replies queue. Do current size of queue only. The
		 * rest is checked on the next sync call including
		 * the recursive case.
		 */
			int count = replies.size();
			while(count-->0) {
				MetaReply r = replies.poll();
				// previous call back may call this sync and make 
				// replies shorter.
				if (r==null) break;
				if (r.ready()) {
				} else {
					addReply(r);
				}
			}
		return 0;
	}

	public void send(ByteBuffer command, ByteBuffer data) {
		
	}
}


/* end */