view src/fdl/PSXLinda.java @ 19:0243987383b7

Meta Protocol Engine and sample implementation of event logger. ComDebug_Client needs fixes.
author kono
date Tue, 19 Aug 2008 05:33:32 +0900
parents 609b288f47f9
children b4fd7fb9135a
line wrap: on
line source


/*
 * @(#)PSXLinda.java       1.1 06/04/01
 *
 * Copyright 2006  Shinji KONO
 * 

   PSX Lidna
     Trasport layer of PSX Linda library

 */

package fdl;

import java.io.IOException;
//import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;


/**
 PSXLinda 
 *
 * @author Shinji Kono
 *
 * @param mytsid   Tuple Space ID

   Initialize connection channel for a tuple space

   one instance for each Tuple space connection

 */

public class PSXLinda implements PSXLindaInterface {
	private FederatedLinda fdl;
	private SocketChannel socketChannel;
	public String host;
	public int port;
	public int mytsid;
	public PSXLindaInterface next;
	static final boolean debug = false;	

	public PSXLinda(FederatedLinda _fdl,int _mytsid,String _host,int _port) 
	throws IOException {
		Socket socket;
		host = _host;
		port = _port;
		mytsid = _mytsid;
		fdl = _fdl;

		socketChannel = SocketChannel.open();
		socketChannel.configureBlocking(false);

		socket = socketChannel.socket();
		// socket.setReuseAddress(true);
		socket.setTcpNoDelay(true);

		// can be blocked (thread required?)
		//socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), _port));
		socketChannel.connect(new InetSocketAddress(_host, _port));
		while (! socketChannel.finishConnect()) {
			if (debug) {
				System.out.println("waiting for connect");
			}
			if (false) {
				try {
					wait(2000);
				} catch (InterruptedException e) {
				}
			}
		}

		socketChannel.register(fdl.selector(), SelectionKey.OP_READ);


		checkConnect("PSXLinda");
	}

	protected void finalize() {
		if (socketChannel != null) {
			try {
				socketChannel.close();
			} catch (IOException e) {
			}
		}
	}

	private void checkConnect(String s) {
		System.out.print("Connected:");
		System.out.print(s);
		System.out.print(": ");
		System.out.println(socketChannel.isConnected());
	}

	public PSXReply in(int id) {
		PSXReply r = fdl.psx_queue(this, id, null, 0, PSX.PSX_IN, (PSXCallback)null);
		return r;
	}

	public void in(int id, PSXCallback callback) {
		fdl.psx_queue(this, id, null, 0, PSX.PSX_IN, callback);
	}

	public PSXReply ck(int id) {
		PSXReply r = fdl.psx_queue(this, id, null, 0, PSX.PSX_IN, null);
		return r;
	}

	public void ck(int id, PSXCallback callback) {
		fdl.psx_queue(this, id, null, 0, PSX.PSX_IN, callback);
	}

	public PSXReply out(int id, ByteBuffer data,int size) {
		PSXReply r = fdl.psx_queue(this, id, data, size, PSX.PSX_OUT, null);
		return r;
	}

	public PSXReply update(int id, ByteBuffer data,int size) {
		PSXReply r = fdl.psx_queue(this, id, data, size, PSX.PSX_UPDATE, null);
		return r;
	}

	public void update(int id, ByteBuffer data,int size,PSXCallback callback) {
		fdl.psx_queue(this, id, data, size, PSX.PSX_UPDATE, callback);
	}

	public PSXReply rd(int id) {
		PSXReply r = fdl.psx_queue(this, id, null, 0, PSX.PSX_RD, null);
		return r;
	}

	public void rd(int id, PSXCallback callback) {
		fdl.psx_queue(this, id, null, 0, PSX.PSX_RD, callback);
	}

	public PSXLindaInterface add(PSXLindaInterface linda) {
		next = linda;
		return this;
	}

	public void send(ByteBuffer command,ByteBuffer data)
	throws IOException {
		if (debug) {
			checkConnect("send");
			if (command == null) {
				System.out.println("PSXLinda:command is null");
			}
			if (data == null) {
				System.out.println("PSXLinda:data is null");
			}
		}
		socketChannel.write(command);
		if (data != null)
			socketChannel.write(data);
	}

	public int sync() 
	throws IOException {
		return fdl.sync();
	}

	public int sync(long mtime) 
	throws IOException {
		return fdl.sync(mtime);
	}
}


/* end */