/*
 * view src/fdl/TupleSpace.java @ 75:5b1f099da593
 *
 * user id format fix
 * author one
 * date Sat, 24 Oct 2009 17:38:21 +0900
 * parents 046feb56a196
 * children 065450f32960
 * line wrap: on
 * line source
 */

package fdl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;

public class TupleSpace {
    static final boolean debug = true;
	public static int user = 0;
    public Tuple[] tuple_space;
    public IOHandlerHook hook = new NullIOHandlerHook();
	private FDLindaServ fds;
	public static final int MAX_TUPLE_ID = 65536;
    
	/**
	 * Creates an empty tuple space attached to the given server.
	 *
	 * @param fds server object this tuple space reports its log messages to
	 */
	public TupleSpace(FDLindaServ fds) {
		this.fds = fds;
		// Set up the table that stores incoming data: one list head per
		// tuple id, every slot initially null.
		this.tuple_space = new Tuple[MAX_TUPLE_ID];
	}
    

	/**
	 * Assigns the next user id.
	 *
	 * Increments the static {@link #user} counter and appends a tuple in 'o'
	 * mode, carrying the new id as ASCII decimal digits, to the end of the
	 * list kept in the last slot ({@code MAX_TUPLE_ID-1}) of the tuple space.
	 */
	public void newUser() {
		final int id = TupleSpace.MAX_TUPLE_ID-1;
		final int seq = 0;

		// Append a fresh node: either as the first entry of the slot or
		// behind the current tail of its list.
		Tuple node = tuple_space[id];
		if (node != null) {
			Tuple tail = node;
			while (tail.next != null) {
				tail = tail.next;
			}
			node = new Tuple();
			tail.next = node;
		} else {
			node = new Tuple();
			tuple_space[id] = node;
		}
		node.next = null;

		user++;

		// Render the id as decimal digits by filling a scratch array from the
		// right; ten bytes is enough for any positive int. A non-positive
		// value produces no digits at all.
		byte[] scratch = new byte[10];
		int first = scratch.length;
		for (int rest = user; rest > 0; rest /= 10) {
			scratch[--first] = (byte) (rest % 10 + '0');
		}
		ByteBuffer data = ByteBuffer.allocate(10);
		data.put(scratch, first, scratch.length - first);
		data.flip();

		node.setTuple('o', id, seq, data);
		fds.log(Level.INFO,"Server: assign id "+user);
	}
	
	/**
	 * Processes an "out" request for the tuple id named in the command header.
	 *
	 * Steps, in order:
	 * <ol>
	 * <li>every entry at the head of the id's list whose mode is
	 *     {@code PSX_WAIT_RD} or {@code PSX_RD} is answered with the data and
	 *     removed;</li>
	 * <li>if the head is then in {@code PSX_IN} mode, it is answered and
	 *     removed, and the incoming data is not stored;</li>
	 * <li>if the list is empty or its head is in {@code PSX_OUT} mode, the data
	 *     is appended to the tail as a new 'o' tuple;</li>
	 * <li>any other head mode is logged as an error.</li>
	 * </ol>
	 *
	 * @param key     selection key of the requesting connection (handed to the hook)
	 * @param command request header; its answer fields are rewritten before each send
	 * @param data    payload of the tuple
	 */
	protected void Out(SelectionKey key,ByteBuffer command, ByteBuffer data) {
		// Going through char widens the 16-bit id field without sign extension.
		final int id = (char)command.getShort(PSX.LINDA_ID_OFFSET);
		command.rewind();

		final int datasize = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
		command.rewind();

		if (debug) {
			fds.log(Level.INFO,"*** out command : id = "+id +" ***");
		}
		final int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
		command.rewind();
		hook.outHook(key,id,seq,'o',data);

		// Answer the readers queued at the head of the list, one at a time.
		for (Tuple reader = tuple_space[id];
				reader != null
					&& (reader.mode == PSX.PSX_WAIT_RD || reader.mode == PSX.PSX_RD);
				reader = tuple_space[id]) {
			PSX.setAnserCommand(command, reader.getSeq());
			PSX.send(reader.ch, command, data);
			removeTuple(id);
		}

		final Tuple head = tuple_space[id];

		if (head != null && head.mode == PSX.PSX_IN) {
			// The incoming out tuple is consumed here, and the waiting IN
			// tuple is removed as well.
			PSX.setAnserCommand(command, head.getSeq());
			if (debug) {
				fds.log(Level.INFO,"send size "+datasize+" : mode = "+'a');
			}
			PSX.send(head.ch, command, data);
			removeTuple(id);
			return;
		}

		if (head != null && head.getMode() != PSX.PSX_OUT) {
			fds.log(Level.SEVERE, "Incorrect mode :"+(char)head.getMode());
			return;
		}

		// Nobody is waiting: keep the data as a new tuple at the tail.
		Tuple stored;
		if (head == null) {
			stored = new Tuple();
			tuple_space[id] = stored;
		} else {
			Tuple tail = head;
			while (tail.next != null) {
				tail = tail.next;
			}
			stored = new Tuple();
			tail.next = stored;
		}
		stored.next = null;

		stored.setMode('o');
		stored.setSeq(seq);
		stored.setData(data);
		if (debug) {
			fds.log(Level.INFO, "data inserted len = "+stored.getdataLength()+" : id = "+id);
		}
	}

	/**
	 * Drops the first tuple of the list stored under {@code id}.
	 * The slot must not be empty.
	 *
	 * @param id index of the tuple-space slot
	 */
	private void removeTuple(int id) {
		// Clean-up step: the successor of the current head becomes the new head.
		tuple_space[id] = tuple_space[id].next;
	}

	/**
	 * Queues a waiting request for the tuple id named in the command header.
	 *
	 * A new entry carrying the given mode, the request's sequence number, the
	 * requester's channel and an empty data buffer is pushed onto the FRONT of
	 * the id's list.
	 *
	 * @param key     selection key of the requesting connection
	 * @param command request header
	 * @param mode    mode to record on the queued entry
	 */
	protected void Wait_Rd(SelectionKey key, ByteBuffer command, int mode) {
		// Going through char widens the 16-bit id field without sign extension.
		final int id = (char)command.getShort(PSX.LINDA_ID_OFFSET);
		command.rewind();

		if (debug) {
			fds.log(Level.INFO, "*** "+(char)mode+" command : id = "+ id +" ***\n");
		}

		Tuple waiter = new Tuple();
		waiter.setMode(mode);
		final int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
		command.rewind();
		waiter.setSeq(seq);

		hook.waitReadHook(key,id,seq,(char)mode);

		waiter.ch = (SocketChannel) key.channel();
		waiter.setData(ByteBuffer.allocate(0));

		// Link in as the new head of the list.
		waiter.next = tuple_space[id];
		tuple_space[id] = waiter;

		if (debug) {
			fds.log(Level.INFO, "data inserted insert seq = "+seq +", id = "+id);
		}
	}

	/**
	 * Serves an in/rd request: if {@code read_in_1} yields a tuple, its command
	 * and data are sent back over {@code key}; otherwise nothing is sent.
	 *
	 * @param key     selection key of the requesting connection
	 * @param command request header
	 * @param mode    request mode, passed through to {@code read_in_1}
	 * @throws IOException propagated from {@code PSX.send}
	 */
	protected void In_Rd(SelectionKey key, ByteBuffer command, int mode)
			throws IOException {
		Tuple found = read_in_1(key, command, mode);
		if (found == null) {
			// No tuple came back, so there is nothing to reply with yet.
			return;
		}
		PSX.send(key, found.getCommand(), found.getData());
	}

	/**
	 * Looks up a tuple for an in/rd request.
	 *
	 * Leading 'w' entries of the id's list are skipped. If the entry reached is
	 * an 'o' tuple, it is stamped with the request's sequence number and handed
	 * to {@code tupleIsAvailable}; otherwise the request is passed to
	 * {@code setupWait}.
	 *
	 * @param key     selection key of the requesting connection
	 * @param command request header
	 * @param mode    request mode
	 * @return whatever {@code tupleIsAvailable} or {@code setupWait} returns
	 */
	private Tuple read_in_1(SelectionKey key, ByteBuffer command, int mode) {
		// Going through char widens the 16-bit id field without sign extension.
		final int id = (char)command.getShort(PSX.LINDA_ID_OFFSET);
		final int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
		command.rewind();

		if (debug) {
			fds.log(Level.INFO, "*** "+(char)mode+" command : id = "+ id +" ***\n");
		}
		hook.inHook(key,id,seq,(char)mode);

		// Ignore 'w' entries, remembering the predecessor so the chosen tuple
		// can be unlinked later. The last entry of the list is never skipped.
		Tuple previous = null;
		Tuple current = tuple_space[id];
		while (current != null && current.next != null && current.mode == 'w') {
			previous = current;
			current = current.next;
		}

		if (current == null || current.mode != 'o') {
			return setupWait(key, command, mode, current, seq, id);
		}

		current.seq = seq;
		return tupleIsAvailable(command, mode, current, id, previous);
	}

	/**
	 * IN for MetaLinda (no wait).
	 *
	 * Skips leading 'w' entries of the list for {@code id}; if the entry reached
	 * is an 'o' tuple, its sequence number is reset to 0, it is processed by
	 * {@code tupleIsAvailable}, and its data buffer is returned.
	 *
	 * @param id      tuple id to look up
	 * @param mode    request mode, passed through to {@code tupleIsAvailable}
	 * @param command request header, passed through to {@code tupleIsAvailable}
	 * @return the data of the matching 'o' tuple, or {@code null} if there is none
	 */
	public ByteBuffer IN(int id,int mode, ByteBuffer command) {
		// Ignore 'w' entries, remembering the predecessor for unlinking.
		Tuple previous = null;
		Tuple current = tuple_space[id];
		while (current != null && current.next != null && current.mode == 'w') {
			previous = current;
			current = current.next;
		}

		if (current == null || current.mode != 'o') {
			return null;
		}

		ByteBuffer payload = current.data;
		current.seq = 0;
		tupleIsAvailable(command, mode, current, id, previous);
		return payload;
	}
	
	/**
	 * Prepares an available tuple as an answer and, for {@code PSX_IN} requests,
	 * unlinks it from the list of slot {@code id}.
	 *
	 * @param command request header (not used by this method)
	 * @param mode    request mode; only {@code PSX.PSX_IN} triggers removal
	 * @param tuple   the tuple being handed out
	 * @param id      slot the tuple lives in
	 * @param temp    predecessor of {@code tuple} in the list, or {@code null}
	 *                when {@code tuple} is the head
	 * @return {@code tuple}
	 */
	private Tuple tupleIsAvailable(ByteBuffer command, int mode, Tuple tuple,
			int id, Tuple temp) {
		tuple.setCommand('a', tuple.seq);

		if (debug) {
			int total = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE;
			fds.log(Level.INFO, "send size "+total+" : mode = "+(char)tuple.getMode());
		}

		if (mode != PSX.PSX_IN) {
			return tuple;
		}

		// An IN request removes the tuple from the list.
		if (temp == null) {
			tuple_space[id] = tuple.next;
		} else {
			temp.next = tuple.next;
		}
		return tuple;
	}

	/**
	 * Queues a waiting request at the END of the list reachable from
	 * {@code tuple}, or as the first entry of slot {@code id} when
	 * {@code tuple} is {@code null}.
	 *
	 * The queued entry records the mode, the sequence number, the requester's
	 * channel and an empty data buffer.
	 *
	 * @param key     selection key of the requesting connection
	 * @param command request header (not used by this method)
	 * @param mode    mode to record on the queued entry
	 * @param tuple   entry to start the tail search from, or {@code null}
	 * @param seq     sequence number of the request
	 * @param id      slot the request belongs to
	 * @return always {@code null}
	 */
	private Tuple setupWait(SelectionKey key, ByteBuffer command, int mode,
			Tuple tuple, int seq, int id) {
		Tuple waiter;
		if (tuple != null) {
			Tuple tail = tuple;
			while (tail.next != null) {
				tail = tail.next;
			}
			waiter = new Tuple((SocketChannel)key.channel());
			tail.next = waiter;
		} else {
			waiter = new Tuple((SocketChannel)key.channel());
			tuple_space[id] = waiter;
		}
		waiter.next = null;

		waiter.setMode(mode);
		waiter.setSeq(seq);
		waiter.ch = (SocketChannel) key.channel();
		waiter.setData(ByteBuffer.allocate(0));

		if (debug) {
			fds.log(Level.INFO, "wait inserted seq = "+seq +", id = "+id);
		}
		// No tuple is handed back to the caller for a queued request.
		return null;
	}

	/**
	 * Answers a check request: lets {@code check1} fill in the data-length
	 * field of {@code command} and sends the command together with the data
	 * {@code check1} returned.
	 *
	 * @param key     selection key of the requesting connection
	 * @param command request header; updated in place by {@code check1}
	 * @throws IOException propagated from {@code PSX.send}
	 */
	protected void Check(SelectionKey key, ByteBuffer command) throws IOException {
		PSX.send(key, command, check1(key,command));
	}

	/**
	 * Peeks at the tuple id named in the command header without removing
	 * anything.
	 *
	 * Leading 'w' entries are skipped. If the entry reached is an 'o' tuple,
	 * its data length is written into the command's data-length field and its
	 * data is returned; otherwise the length field is set to 0 and an empty
	 * buffer is returned.
	 *
	 * @param key     selection key of the requesting connection (handed to the hook)
	 * @param command request header; its data-length field is overwritten
	 * @return the data of the 'o' tuple found, or a zero-capacity buffer
	 */
	public ByteBuffer check1(SelectionKey key,ByteBuffer command) {
		// Going through char widens the 16-bit id field without sign extension.
		final int id = (char)command.getShort(PSX.LINDA_ID_OFFSET);
		command.rewind();
		final int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
		command.rewind();
		hook.checkHook(key,id,seq,'c');

		Tuple current = tuple_space[id];
		while (current != null && current.next != null && current.mode == 'w') {
			current = current.next;
		}

		if (current == null || current.mode != 'o') {
			// No out tuple is stored under this id.
			command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, 0);
			command.rewind();
			return ByteBuffer.allocate(0);
		}

		command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, current.getdataLength());
		command.rewind();
		return current.getData();
	}


}