view src/fdl/FederatedLinda.java @ 101:d671c78d3757 fuchita

share selector in FDLindaServe and FederatedLinda
author one
date Wed, 26 May 2010 15:57:23 +0900
parents 270093b61001
children 8ae522e1a4bf
line wrap: on
line source


/*
 * @(#)FederatedLinda.java       1.1 06/04/01
 *
 * Copyright 2006  Shinji KONO
 * 

   PSX Lidna
     Trasport layer of PSX Linda library

 */

package fdl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelector;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.logging.Level;

/**
 FederatedLinda 
 *
 * @author Shinji Kono
 *
 * @param mytsid   Tuple Space ID

   Initialize connection channel for a tuple space

   one instance for each Tuple space connection

 */

public class FederatedLinda  {

	FederatedLinda fdl;
	static int MAX_SEQUENCE = 2048;
	public static boolean debug = false;

	public int tid;
	public int seq;
	public int qsize;
	public PSXLinda linda;

	public Selector selector;

	public PSXQueue q_top,q_end;
	public PSXReply r_top,r_end;
	public Hashtable<Integer,PSXReply> seqHash = new Hashtable<Integer, PSXReply>();

	public static FederatedLinda init() {
		FederatedLinda fdl = new FederatedLinda();
		return fdl;
	}

	public static FederatedLinda init(Selector selector) {
		FederatedLinda fdl = new FederatedLinda(selector);
		return fdl;
	}
	
	private FederatedLinda() {
		try {
			selector = Selector.open();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public FederatedLinda(Selector selector) {
		this.selector = selector;
	}

	public PSXLinda open(String _host,int _port) 
	throws IOException {
		tid++;
		PSXLindaImpl newlinda = new PSXLindaImpl(this,selector,tid,_host,_port);
		linda = newlinda.add(linda);
		return linda;
	}

	PSXLinda openFromMetaLinda(MetaLinda metaLinda, String _host, int _port)
			throws IOException {
		tid++;
		PSXLindaImpl newlinda = new PSXLindaImpl(this,metaLinda.fds.selector,tid,_host,_port);
		linda = newlinda.add(linda);
		return newlinda;
	}
	
	public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int mode, PSXCallback callback) {
		PSXQueue c = new PSXQueue(linda,id,mode,s,callback);

		if (q_top == null) {
			c = q_end = q_top = c;
		} else {
			q_end.next = c;
			q_end = c;
		}
		qsize++;

		if (mode != PSX.PSX_OUT) {  
			PSXReply p = new PSXReply(PSX.PSX_REPLY,callback);
			p.seq = seq(p);
			c.setSeq(p.seq);
			if (r_top == null){
				r_end = r_top = p;
			} else {
				r_end.next = p;
				r_end = p;
			}
			return p;
		}
		return null;
	}

	public int seq(PSXReply reply) {
		Integer s;
		do {
			seq++;
			if (seq>MAX_SEQUENCE) {
				seq = 0;
			}
			s = new Integer(seq);
		} while (seqHash.containsKey(s));
		// log(Level.INFO,"hash value = "+s.hashCode());
		seqHash.put(s,reply);
		seq++;
		return seq-1;
	}

	public Selector selector() {
		return selector;
	}

	/**
	 * sync with no wait 
	 * @return 0
	 * @throws IOException
	 */
	public int sync()  {
		int key_num = 0;
		queueExec();

		try {
			if (selector.selectNow()>0) {
				for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) {
					SelectionKey s = it.next();
					it.remove();
					try {
						if (!s.isReadable()) 
							throw new IOException();
						TupleHandler handle = (TupleHandler)s.attachment();
						handle.handle(s);
					} catch (IOException e) {
						s.cancel();
						log(Level.INFO,""+s.channel()+" is closed.");
					}
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClosedSelectorException e) {
			// client should be know
		}

		return key_num;
	}

	/**
	 * sync with mtimeout msec wait
	 * @param mtimeout   0 means indifinite wait
	 * @return 0
	 * @throws IOException
	 */
	public int sync(long mtimeout) {
		int key_num = 0;
		queueExec();

		try {
			if (selector.select(mtimeout)>0) {
		      for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) {
			        SelectionKey s = it.next();
			        it.remove();
			        try {
			        	if (!s.isReadable()) 
			        		throw new IOException();
			        	TupleHandler handle = (TupleHandler)s.attachment();
			        	handle.handle(s);
			        } catch (IOException e) {
			        	s.cancel();
					log(Level.INFO,""+s.channel()+" is closed.");
			        }
		      }
			}
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClosedSelectorException e) {
			// client should be know
		}

		return key_num;
	}

	public void queueExec() {
		while (q_top != null){
			PSXQueue c = q_top;
			c.send();
			q_top = c.next;
			qsize--;
		}
	}
	
	PSXReply getReply(int rseq) {
		Integer a;

		PSXReply r = seqHash.get((a = new Integer(rseq)));
		seqHash.remove(a);
		return r;
	}

	public void log(Level level,String msg) {
		if (level!=Level.SEVERE && !debug) return;
		System.err.println(msg);
		if (level==Level.SEVERE)
			new IOException().setStackTrace(null);
	}

	public void wakeup() {
		selector.wakeup();		
	}

}

/* end */