view src/fdl/PSXLinda.java @ 4:2023d9b31af9

fix parameter
author fuchita
date Tue, 12 Feb 2008 09:15:25 +0900
parents ae7e0e92c651
children aced4bfc15af
line wrap: on
line source


/*
 * @(#)PSXLinda.java       1.1 06/04/01
 *
 * Copyright 2006  Shinji KONO
 * 

   PSX Linda
     Transport layer of PSX Linda library

 */

package fdl;

import java.io.IOException;
//import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;


/**
 * PSXLinda: transport-layer connection to one tuple space.
 *
 * One instance is created for each tuple space connection. The constructor
 * opens a TCP connection to the given host/port and registers the channel
 * with the selector of the owning {@link FederatedLinda}; request methods
 * ({@code in}, {@code rd}, {@code out}, {@code update}, {@code ck}) only queue
 * work on that FederatedLinda, and {@link #send} writes raw bytes to the
 * socket.
 *
 * @author Shinji Kono
 */

public class PSXLinda implements PSXQueueInterface {
    /** Owner that queues requests and provides the selector. */
    private final FederatedLinda fdl;
    /** Non-blocking channel to the tuple space server (blocking only during connect). */
    private final SocketChannel socketChannel;
    /** Host name this instance was connected to. */
    public String host;
    /** TCP port this instance was connected to. */
    public int port;
    /** Tuple space ID passed to the constructor. */
    public int mytsid;
    /** Link to another PSXLinda, set by {@link #add}. */
    public PSXLinda next;
    static final boolean debug = false;

    /**
     * Initializes the connection channel for a tuple space.
     *
     * The connect is performed in blocking mode, so this call returns only
     * once the connection is established (or fails); the channel is then
     * switched to non-blocking mode and registered for OP_READ on the
     * selector of {@code _fdl}.
     *
     * @param _fdl    owning FederatedLinda
     * @param _mytsid tuple space ID
     * @param _host   server host name
     * @param _port   server TCP port
     * @throws IOException if the channel cannot be opened, connected or registered
     */
    public PSXLinda(FederatedLinda _fdl, int _mytsid, String _host, int _port)
            throws IOException {
        host = _host;
        port = _port;
        mytsid = _mytsid;
        fdl = _fdl;

        SocketChannel channel = SocketChannel.open();
        boolean ready = false;
        try {
            Socket socket = channel.socket();
            // socket.setReuseAddress(true);
            socket.setTcpNoDelay(true);

            // Blocking connect: waits for the connection without spinning on
            // finishConnect() as a non-blocking connect loop would.
            channel.connect(new InetSocketAddress(_host, _port));

            // The selector requires a non-blocking channel.
            channel.configureBlocking(false);
            channel.register(fdl.selector(), SelectionKey.OP_READ);
            ready = true;
        } finally {
            if (!ready) {
                // Do not leak the descriptor when setup fails part-way.
                try {
                    channel.close();
                } catch (IOException ignored) {
                    // The original failure is the one worth reporting.
                }
            }
        }
        socketChannel = channel;

        checkConnect("PSXLinda");
    }

    /** Best-effort close of the channel when the instance is garbage collected. */
    protected void finalize() {
        if (socketChannel != null) {
            try {
                socketChannel.close();
            } catch (IOException ignored) {
                // Nothing useful can be done with a close failure here.
            }
        }
    }

    /** Prints the channel's connected state to standard output, tagged with {@code s}. */
    private void checkConnect(String s) {
        System.out.print("Connected:");
        System.out.print(s);
        System.out.print(": ");
        System.out.println(socketChannel.isConnected());
    }

    /**
     * Queues a PSX_IN request for tuple {@code id}.
     *
     * @param id tuple id
     * @return the reply object returned by the FederatedLinda queue
     */
    public PSXReply in(int id) {
        return fdl.psx_queue(this, id, null, 0, PSX_IN, (PSXCallback) null);
    }

    /**
     * Queues a PSX_IN request for tuple {@code id} with a callback.
     *
     * @param id       tuple id
     * @param callback callback handed to the queue
     */
    public void in(int id, PSXCallback callback) {
        fdl.psx_queue(this, id, null, 0, PSX_IN, callback);
    }

    /**
     * Queues a request for tuple {@code id}.
     *
     * NOTE(review): this queues PSX_IN, exactly like {@link #in(int)}. A
     * dedicated "check" mode constant is presumably intended; confirm against
     * PSXQueueInterface before changing.
     *
     * @param id tuple id
     * @return the reply object returned by the FederatedLinda queue
     */
    public PSXReply ck(int id) {
        return fdl.psx_queue(this, id, null, 0, PSX_IN, null);
    }

    /**
     * Callback variant of {@link #ck(int)}.
     *
     * NOTE(review): also queues PSX_IN; see {@link #ck(int)}.
     *
     * @param id       tuple id
     * @param callback callback handed to the queue
     */
    public void ck(int id, PSXCallback callback) {
        fdl.psx_queue(this, id, null, 0, PSX_IN, callback);
    }

    /**
     * Queues a PSX_OUT request carrying {@code data} for tuple {@code id}.
     *
     * @param id   tuple id
     * @param data payload buffer
     * @param size payload size passed to the queue
     * @return the reply object returned by the FederatedLinda queue
     */
    public PSXReply out(int id, ByteBuffer data, int size) {
        return fdl.psx_queue(this, id, data, size, PSX_OUT, null);
    }

    /**
     * Queues a PSX_UPDATE request carrying {@code data} for tuple {@code id}.
     *
     * @param id   tuple id
     * @param data payload buffer
     * @param size payload size passed to the queue
     * @return the reply object returned by the FederatedLinda queue
     */
    public PSXReply update(int id, ByteBuffer data, int size) {
        return fdl.psx_queue(this, id, data, size, PSX_UPDATE, null);
    }

    /**
     * Queues a PSX_UPDATE request with a callback.
     *
     * @param id       tuple id
     * @param data     payload buffer
     * @param size     payload size passed to the queue
     * @param callback callback handed to the queue
     */
    public void update(int id, ByteBuffer data, int size, PSXCallback callback) {
        fdl.psx_queue(this, id, data, size, PSX_UPDATE, callback);
    }

    /**
     * Queues a PSX_RD request for tuple {@code id}.
     *
     * @param id tuple id
     * @return the reply object returned by the FederatedLinda queue
     */
    public PSXReply rd(int id) {
        return fdl.psx_queue(this, id, null, 0, PSX_RD, null);
    }

    /**
     * Queues a PSX_RD request for tuple {@code id} with a callback.
     *
     * @param id       tuple id
     * @param callback callback handed to the queue
     */
    public void rd(int id, PSXCallback callback) {
        fdl.psx_queue(this, id, null, 0, PSX_RD, callback);
    }

    /**
     * Sets {@code linda} as this instance's {@link #next} link.
     *
     * @param linda instance to link after this one
     * @return this instance
     */
    public PSXLinda add(PSXLinda linda) {
        next = linda;
        return this;
    }

    /**
     * Writes {@code command} and then, if present, {@code data} to the socket.
     *
     * Each buffer is written until it has no bytes remaining, because a single
     * write on a non-blocking channel may transfer only part of the buffer.
     *
     * @param command command bytes; must not be null
     * @param data    payload bytes, or null when there is no payload
     * @throws IOException if writing to the channel fails
     */
    public void send(ByteBuffer command, ByteBuffer data)
            throws IOException {
        if (debug) {
            checkConnect("send");
            if (command == null) {
                System.out.println("PSXLinda:command is null");
            }
            if (data == null) {
                System.out.println("PSXLinda:data is null");
            }
        }
        writeFully(command);
        if (data != null) {
            writeFully(data);
        }
    }

    /** Writes {@code buffer} to the channel until no bytes remain. */
    private void writeFully(ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            if (socketChannel.write(buffer) == 0) {
                // Socket send buffer is full; let other threads run before retrying.
                Thread.yield();
            }
        }
    }

    /**
     * Delegates to {@code FederatedLinda.sync()}.
     *
     * @return the value returned by the FederatedLinda
     * @throws IOException if the underlying sync fails
     */
    public int sync()
            throws IOException {
        return fdl.sync();
    }

    /**
     * Delegates to {@code FederatedLinda.sync(long)}.
     *
     * @param mtime value forwarded to the FederatedLinda (presumably a timeout in
     *              milliseconds; confirm against FederatedLinda)
     * @return the value returned by the FederatedLinda
     * @throws IOException if the underlying sync fails
     */
    public int sync(long mtime)
            throws IOException {
        return fdl.sync(mtime);
    }
}


/* end */