view src/fdl/PSXLindaImpl.java @ 93:a1d796c0e975 fuchita

Wait Read Tester
author one
date Tue, 25 May 2010 22:57:54 +0900
parents 82a292aa41ad
children 7bf2eeea23a0
line wrap: on
line source


/*
 * @(#)PSXLinda.java       1.1 06/04/01
 *
 * Copyright 2006  Shinji KONO
 * 

   PSX Lidna
     Trasport layer of PSX Linda library

 */

package fdl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;


/**
 PSXLinda 
 *
 * @author Shinji Kono
 *
 * @param mytsid   Tuple Space ID

   Initialize connection channel for a tuple space

   one instance for each Tuple space connection

 */

public class PSXLindaImpl implements PSXLinda,TupleHandler {
	private FederatedLinda fdl;
	SocketChannel socketChannel;
	public String host;
	public int port;
	public int mytsid;
	public PSXLinda next;
	static final boolean debug = false;	

	public PSXLindaImpl(FederatedLinda _fdl,Selector selector,int _mytsid,String _host,int _port) 
	throws IOException {
		host = _host;
		port = _port;
		mytsid = _mytsid;
		fdl = _fdl;

		socketChannel = SocketChannel.open();
		socketChannel.configureBlocking(false); // Selector needs this

		Socket socket = socketChannel.socket();
		// socket.setReuseAddress(true); Client side don't need this.
		socket.setTcpNoDelay(true);

		socketChannel.connect(new InetSocketAddress(_host, _port));
		while (! socketChannel.finishConnect()) {
			if (debug) {
			    fdl.log(Level.INFO,"waiting for connect");
			}
			if (false) {
				try {
					wait(2000);
				} catch (InterruptedException e) {
				}
			}
		}
		fdl.log(Level.INFO,"Linda client connect to "+socketChannel);
        socketChannel.register(selector,SelectionKey.OP_READ,this);

		checkConnect("PSXLinda");
	}
	

	public void handle(SelectionKey key) throws ClosedChannelException,
			IOException {
		SocketChannel sock = (SocketChannel)key.channel();
		if (sock!=socketChannel) {
		    fdl.log(Level.SEVERE,"wrong socket on PSXLindaImple.");
		}
		ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
		command.order(ByteOrder.BIG_ENDIAN);
		ByteBuffer data = PSX.receivePacket(sock, command);

		if (debug) {
			PSX.printCommand("chkServe:",command, data);
		}

		int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET);
		int mode = command.get(PSX.LINDA_MODE_OFFSET);
		PSXReply r = fdl.getReply(rseq);
		if (r==null) {
			fdl.log(Level.SEVERE,"Illegal answer sequence.");
			return;
		}
		r.setAnswer(mode,command,data);

		if (r.callback != null ) {
			r.callback.callback(data);
		}
	}


	protected void finalize() {
		if (socketChannel != null) {
			try {
				socketChannel.close();
			} catch (IOException e) {
			}
		}
	}

	private void checkConnect(String s) {
		fdl.log(Level.INFO, "Connected:"+ s +": "
		    +socketChannel.isConnected());
	}

	public PSXReply in(int id) {
		PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_IN, (PSXCallback)null);
		return r;
	}

	public void in(int id, PSXCallback callback) {
		fdl.psx_queue(this, id, null, PSX.PSX_IN, callback);
	}

	public PSXReply ck(int id) {
		PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_IN, null);
		return r;
	}

	public void ck(int id, PSXCallback callback) {
		fdl.psx_queue(this, id, null, PSX.PSX_IN, callback);
	}

	public PSXReply out(int id, ByteBuffer data) {
		PSXReply r = fdl.psx_queue(this, id, data, PSX.PSX_OUT, null);
		return r;
	}

	public PSXReply update(int id, ByteBuffer data) {
		PSXReply r = fdl.psx_queue(this, id, data, PSX.PSX_UPDATE, null);
		return r;
	}

	public void update(int id, ByteBuffer data,PSXCallback callback) {
		fdl.psx_queue(this, id, data, PSX.PSX_UPDATE, callback);
	}

	public PSXReply rd(int id) {
		PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_RD, null);
		return r;
	}

	public void rd(int id, PSXCallback callback) {
		fdl.psx_queue(this, id, null, PSX.PSX_RD, callback);
	}

	public PSXLinda add(PSXLinda linda) {
		next = linda;
		return this;
	}

	public int sync() 
	throws IOException {
		return fdl.sync();
	}

	public int sync(long mtime) 
	throws IOException {
		return fdl.sync(mtime);
	}

	public void send(ByteBuffer command, ByteBuffer data) {
		PSX.send(socketChannel, command, data);
	}


	public PSXReply waitRd(int id) {
		PSXReply r = fdl.psx_queue(this, id, null, PSX.PSX_WAIT_RD, null);
		return r;
	}

	public void waitRd(int id, PSXCallback callback) {
		fdl.psx_queue(this, id, null, PSX.PSX_WAIT_RD, callback);
	}

}


/* end */