view src/fdl/PSXLindaImpl.java @ 36:d5bca4b5ee95

meta.sync() recursive call fix.
author kono
date Sun, 24 Aug 2008 21:06:39 +0900
parents fe338d497c72
children 81abceebc869
line wrap: on
line source


/*
 * @(#)PSXLinda.java       1.1 06/04/01
 *
 * Copyright 2006  Shinji KONO
 * 

   PSX Linda
     Transport layer of the PSX Linda library

 */

package fdl;

import java.io.IOException;
//import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;


/**
 * Client-side connection to one tuple space.
 *
 * One instance is created for each tuple space connection. The instance
 * owns a non-blocking {@link SocketChannel} that is registered with the
 * selector of the {@link FederatedLinda} it was created by, and every
 * request method simply queues a command on that FederatedLinda.
 *
 * @author Shinji Kono
 */

public class PSXLindaImpl implements PSXLinda {
	/** FederatedLinda that queues the requests and owns the selector. */
	private final FederatedLinda fdl;
	/** Channel to the tuple space server; non-blocking once the constructor returns. */
	SocketChannel socketChannel;
	/** Host name given to the constructor. */
	public String host;
	/** Port number given to the constructor. */
	public int port;
	/** Tuple space ID given to the constructor. */
	public int mytsid;
	/** Link set by {@link #add(PSXLinda)}. */
	public PSXLinda next;
	static final boolean debug = false;	

	/**
	 * Connects to the tuple space server and registers the channel for
	 * {@code OP_READ} with {@code _fdl.selector()}.
	 *
	 * The connect itself is done in blocking mode, so the constructor
	 * returns only after the connection is established (the previous
	 * implementation reached the same state by spinning on
	 * {@code finishConnect()} without any delay). The channel is switched
	 * to non-blocking mode afterwards because a selector requires it.
	 * If any step fails the channel is closed before the exception
	 * propagates, so a failed connect does not leak a socket.
	 *
	 * @param _fdl     FederatedLinda used for queueing and for its selector
	 * @param _mytsid  tuple space ID
	 * @param _host    server host name
	 * @param _port    server port
	 * @throws IOException if the channel cannot be opened, connected or registered
	 */
	public PSXLindaImpl(FederatedLinda _fdl,int _mytsid,String _host,int _port) 
	throws IOException {
		host = _host;
		port = _port;
		mytsid = _mytsid;
		fdl = _fdl;

		SocketChannel channel = SocketChannel.open();
		boolean connected = false;
		try {
			Socket socket = channel.socket();
			// socket.setReuseAddress(true); Client side don't need this.
			socket.setTcpNoDelay(true);

			if (debug) {
				System.out.println("waiting for connect");
			}
			// Blocking connect: returns once the connection is established
			// or throws, instead of busy-waiting on finishConnect().
			channel.connect(new InetSocketAddress(_host, _port));
			channel.configureBlocking(false); // Selector needs this
			System.err.println("Linda client connect to "+channel);

			channel.register(fdl.selector(), SelectionKey.OP_READ);
			connected = true;
		} finally {
			if (!connected) {
				try {
					channel.close();
				} catch (IOException ignored) {
					// The original failure is the one worth reporting.
				}
			}
		}
		socketChannel = channel;

		checkConnect("PSXLinda");
	}

	/**
	 * Last-resort cleanup: closes the channel if it is still referenced.
	 */
	@Override
	protected void finalize() {
		if (socketChannel != null) {
			try {
				socketChannel.close();
			} catch (IOException ignored) {
				// Nothing useful can be done about a close failure here.
			}
		}
	}

	/**
	 * Prints "Connected:<s>: <isConnected>" to standard output.
	 *
	 * @param s label identifying the caller
	 */
	private void checkConnect(String s) {
		System.out.print("Connected:");
		System.out.print(s);
		System.out.print(": ");
		System.out.println(socketChannel.isConnected());
	}

	/**
	 * Queues a {@code PSX_IN} request for tuple {@code id} without a callback.
	 *
	 * @param id tuple ID
	 * @return the reply object returned by {@code fdl.psx_queue}
	 */
	public PSXReply in(int id) {
		PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_IN, (PSXCallback)null);
		return r;
	}

	/**
	 * Queues a {@code PSX_IN} request for tuple {@code id} with a callback.
	 *
	 * @param id       tuple ID
	 * @param callback callback passed on to {@code fdl.psx_queue}
	 */
	public void in(int id, PSXCallback callback) {
		fdl.psx_queue(this, id, null, PSX.PSX_IN, callback);
	}

	/**
	 * Queues a {@code PSX_CK} request for tuple {@code id} without a callback.
	 *
	 * This used to send {@code PSX_IN}, which made it indistinguishable
	 * from {@link #in(int)}.
	 * NOTE(review): assumes PSX declares PSX_CK next to the other command
	 * codes - confirm against PSX.java.
	 *
	 * @param id tuple ID
	 * @return the reply object returned by {@code fdl.psx_queue}
	 */
	public PSXReply ck(int id) {
		PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_CK, null);
		return r;
	}

	/**
	 * Queues a {@code PSX_CK} request for tuple {@code id} with a callback.
	 *
	 * This used to send {@code PSX_IN}, which made it indistinguishable
	 * from {@link #in(int, PSXCallback)}.
	 * NOTE(review): assumes PSX declares PSX_CK next to the other command
	 * codes - confirm against PSX.java.
	 *
	 * @param id       tuple ID
	 * @param callback callback passed on to {@code fdl.psx_queue}
	 */
	public void ck(int id, PSXCallback callback) {
		fdl.psx_queue(this, id, null, PSX.PSX_CK, callback);
	}

	/**
	 * Queues a {@code PSX_OUT} request carrying {@code data} for tuple {@code id}.
	 *
	 * @param id   tuple ID
	 * @param data payload passed on to {@code fdl.psx_queue}
	 * @return the reply object returned by {@code fdl.psx_queue}
	 */
	public PSXReply out(int id, ByteBuffer data) {
		PSXReply r = fdl.psx_queue(this, id, data, PSX.PSX_OUT, null);
		return r;
	}

	/**
	 * Queues a {@code PSX_UPDATE} request carrying {@code data} for tuple
	 * {@code id} without a callback.
	 *
	 * @param id   tuple ID
	 * @param data payload passed on to {@code fdl.psx_queue}
	 * @return the reply object returned by {@code fdl.psx_queue}
	 */
	public PSXReply update(int id, ByteBuffer data) {
		PSXReply r = fdl.psx_queue(this, id, data, PSX.PSX_UPDATE, null);
		return r;
	}

	/**
	 * Queues a {@code PSX_UPDATE} request carrying {@code data} for tuple
	 * {@code id} with a callback.
	 *
	 * @param id       tuple ID
	 * @param data     payload passed on to {@code fdl.psx_queue}
	 * @param callback callback passed on to {@code fdl.psx_queue}
	 */
	public void update(int id, ByteBuffer data,PSXCallback callback) {
		fdl.psx_queue(this, id, data, PSX.PSX_UPDATE, callback);
	}

	/**
	 * Queues a {@code PSX_RD} request for tuple {@code id} without a callback.
	 *
	 * @param id tuple ID
	 * @return the reply object returned by {@code fdl.psx_queue}
	 */
	public PSXReply rd(int id) {
		PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_RD, null);
		return r;
	}

	/**
	 * Queues a {@code PSX_RD} request for tuple {@code id} with a callback.
	 *
	 * @param id       tuple ID
	 * @param callback callback passed on to {@code fdl.psx_queue}
	 */
	public void rd(int id, PSXCallback callback) {
		fdl.psx_queue(this, id, null, PSX.PSX_RD, callback);
	}

	/**
	 * Stores {@code linda} in {@link #next}, replacing any previous value.
	 *
	 * @param linda connection to link after this one
	 * @return this instance
	 */
	public PSXLinda add(PSXLinda linda) {
		next = linda;
		return this;
	}

	/**
	 * Delegates to {@code fdl.sync()}.
	 *
	 * @return the value returned by {@code fdl.sync()}
	 * @throws IOException if {@code fdl.sync()} throws it
	 */
	public int sync() 
	throws IOException {
		return fdl.sync();
	}

	/**
	 * Delegates to {@code fdl.sync(mtime)}.
	 *
	 * @param mtime value passed through to {@code fdl.sync(long)};
	 *              presumably a timeout in milliseconds - confirm against FederatedLinda
	 * @return the value returned by {@code fdl.sync(mtime)}
	 * @throws IOException if {@code fdl.sync(mtime)} throws it
	 */
	public int sync(long mtime) 
	throws IOException {
		return fdl.sync(mtime);
	}

	/**
	 * Hands {@code command} and {@code data} to {@code PSX.send} together
	 * with this connection's channel.
	 *
	 * @param command command buffer
	 * @param data    data buffer
	 */
	public void send(ByteBuffer command, ByteBuffer data) {
		PSX.send(socketChannel, command, data);
	}
}


/* end */