changeset 11:1355eb28e41d

fix seqHash removal on checkServe.
author kono
date Sat, 09 Aug 2008 19:06:32 +0900
parents abd8cd62b4c6
children 34821c03b206
files src/fdl/FederatedLinda.java
diffstat 1 files changed, 262 insertions(+), 260 deletions(-) [+]
line wrap: on
line diff
--- a/src/fdl/FederatedLinda.java	Tue Jun 03 19:58:21 2008 +0900
+++ b/src/fdl/FederatedLinda.java	Sat Aug 09 19:06:32 2008 +0900
@@ -39,215 +39,215 @@
 
 public class FederatedLinda implements PSXQueueInterface {
 
-    static FederatedLinda fdl;
-    static int MAX_SEQUENCE = 2048;
-    static boolean debug = true;
+	static FederatedLinda fdl;
+	static int MAX_SEQUENCE = 2048;
+	static boolean debug = true;
 
-    public int tid;
-    public int seq;
-    public int qsize;
-    public PSXLinda linda;
+	public int tid;
+	public int seq;
+	public int qsize;
+	public PSXLinda linda;
+
+	public Selector selector;
 
-    public Selector selector;
+	public PSXQueue q_top,q_end;
+	public PSXReply r_top,r_end;
+	public Hashtable<Integer,PSXReply> seqHash;
 
-    public PSXQueue q_top,q_end;
-    public PSXReply r_top,r_end;
-    public Hashtable<Integer,PSXReply> seqHash;
-
-    static FederatedLinda init() 
-			throws IOException {
-	if (fdl==null) {
-	    fdl = new FederatedLinda();
+	static FederatedLinda init() 
+	throws IOException {
+		if (fdl==null) {
+			fdl = new FederatedLinda();
+		}
+		return fdl;
 	}
-	return fdl;
-    }
 
-    private FederatedLinda() 
-			throws IOException {
-	selector = Selector.open();
-	seqHash =  new Hashtable<Integer, PSXReply>();
-    }
+	private FederatedLinda() 
+	throws IOException {
+		selector = Selector.open();
+		seqHash =  new Hashtable<Integer, PSXReply>();
+	}
 
-    public PSXLinda open(String _host,int _port) 
-                                throws IOException {
-	tid++;
-	// System.out.print("Tid = ");
-	// System.out.println(tid);
-	PSXLinda newlinda = new PSXLinda(this,tid,_host,_port);
-	linda = newlinda.add(linda);
-	return linda;
-    }
+	public PSXLinda open(String _host,int _port) 
+	throws IOException {
+		tid++;
+		// System.out.print("Tid = ");
+		// System.out.println(tid);
+		PSXLinda newlinda = new PSXLinda(this,tid,_host,_port);
+		linda = newlinda.add(linda);
+		return linda;
+	}
 
-/**
+	/**
   psx_queue (unsigned int tspace_id, unsigned int id,
              unsigned int size, unsigned char *data, char mode,
              void(*callback)(char*,void*), void * obj):   
- */    
-    
-    public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int length, int mode, PSXCallback callback) {
-	PSXQueue c = new PSXQueue(linda,id,mode,s,length,callback);
+	 */    
+
+	public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int length, int mode, PSXCallback callback) {
+		PSXQueue c = new PSXQueue(linda,id,mode,s,length,callback);
 
-	if (q_top == null) {
-	    c = q_end = q_top = c;
-	} else {
-	    q_end.next = c;
-	    q_end = c;
-	}
-	qsize++;
-	
-	if (mode != PSX_OUT) {  
-	    PSXReply p = new PSXReply(PSX_REPLY,callback);
-	    p.seq = seq(p);
-	    c.setSeq(p.seq);
-	    if (debug) {
-	    	System.out.print("Integer compare=");
-	    	System.out.println((new Integer(2)).equals(new Integer(2)));
-	    	System.out.print("Seding seq=");
-	    	System.out.println(p.seq);
-	    }
-	    if (r_top == null){
-		r_end = r_top = p;
-	    } else {
-		r_end.next = p;
-		r_end = p;
-	    }
-	    return p;
-	}
-	return null;
-    }
+		if (q_top == null) {
+			c = q_end = q_top = c;
+		} else {
+			q_end.next = c;
+			q_end = c;
+		}
+		qsize++;
 
-    public int seq(PSXReply reply) {
-	Integer s;
-	do {
-	    seq++;
-	    if (seq>MAX_SEQUENCE) {
-		seq = 0;
-	    }
-	    s = new Integer(seq);
-	} while (seqHash.containsKey(s));
-	if (debug) {
-		System.out.print("hash value = ");
-		System.out.println(s.hashCode());
+		if (mode != PSX_OUT) {  
+			PSXReply p = new PSXReply(PSX_REPLY,callback);
+			p.seq = seq(p);
+			c.setSeq(p.seq);
+			if (debug) {
+				System.out.print("Integer compare=");
+				System.out.println((new Integer(2)).equals(new Integer(2)));
+				System.out.print("Seding seq=");
+				System.out.println(p.seq);
+			}
+			if (r_top == null){
+				r_end = r_top = p;
+			} else {
+				r_end.next = p;
+				r_end = p;
+			}
+			return p;
+		}
+		return null;
 	}
-	seqHash.put(s,reply);
-	seq++;
-	return seq-1;
-    }
 
-    public Selector selector() {
-	return selector;
-    }
-
-    public int sync() throws IOException {
-	return sync(0);
-    }
+	public int seq(PSXReply reply) {
+		Integer s;
+		do {
+			seq++;
+			if (seq>MAX_SEQUENCE) {
+				seq = 0;
+			}
+			s = new Integer(seq);
+		} while (seqHash.containsKey(s));
+		if (debug) {
+			System.out.print("hash value = ");
+			System.out.println(s.hashCode());
+		}
+		seqHash.put(s,reply);
+		seq++;
+		return seq-1;
+	}
 
-    public int sync(long mtimeout) 
-                                throws IOException {
-        int key_num = 0;
-	Set<SelectionKey> keys;
+	public Selector selector() {
+		return selector;
+	}
 
-	while (q_top != null){
-	    PSXQueue c = q_top;
-	    c.Send();
-	    q_top = c.next;
-	    // psx_free(c);
-	    // q_top = c = t;
-	    qsize--;
+	public int sync() throws IOException {
+		return sync(0);
 	}
 
-        try {
-            key_num = selector.select(mtimeout);
-            keys = selector.selectedKeys();
-	    for (SelectionKey key : keys) {
-	    // System.out.println("selecting");
-		SocketChannel sock = (SocketChannel)key.channel();
-		chkServe(sock);
-	    }
-        } catch (IOException e) {
-            e.printStackTrace();
-        } catch (ClosedSelectorException e) {
-            e.printStackTrace();
-        }
+	public int sync(long mtimeout) 
+	throws IOException {
+		int key_num = 0;
+		Set<SelectionKey> keys;
 
-        return key_num;
-    }
-    
-    public int sync_com(long mtimeout) 
-    throws IOException {
-    	int key_num = 0;
-    	Set<SelectionKey> keys;
+		while (q_top != null){
+			PSXQueue c = q_top;
+			c.Send();
+			q_top = c.next;
+			// psx_free(c);
+			// q_top = c = t;
+			qsize--;
+		}
 
-    	while (q_top != null){
-    		PSXQueue c = q_top;
-    		c.Send();
-    		q_top = c.next;
-    		// psx_free(c);
-    		// q_top = c = t;
-    		qsize--;
-    	}
+		try {
+			key_num = selector.select(mtimeout);
+			keys = selector.selectedKeys();
+			for (SelectionKey key : keys) {
+				// System.out.println("selecting");
+				SocketChannel sock = (SocketChannel)key.channel();
+				chkServe(sock);
+			}
+		} catch (IOException e) {
+			e.printStackTrace();
+		} catch (ClosedSelectorException e) {
+			e.printStackTrace();
+		}
 
-    	try {
-    		key_num = selector.select(mtimeout);
-    		keys = selector.selectedKeys();
-    		for (SelectionKey key : keys) {
-    			SocketChannel sock = (SocketChannel)key.channel();
-    			chkCom(sock);
-    		}
-    	} catch (IOException e) {
-    		e.printStackTrace();
-    	} catch (ClosedSelectorException e) {
-    		e.printStackTrace();
-    	}
+		return key_num;
+	}
+
+	public int sync_com(long mtimeout) 
+	throws IOException {
+		int key_num = 0;
+		Set<SelectionKey> keys;
+
+		while (q_top != null){
+			PSXQueue c = q_top;
+			c.Send();
+			q_top = c.next;
+			// psx_free(c);
+			// q_top = c = t;
+			qsize--;
+		}
 
-    	return key_num;
-    }
-
-// should be in PSXLinda, but sock->linda is unknown here
-
-    private void chkCom(SocketChannel sock)  throws IOException {
+		try {
+			key_num = selector.select(mtimeout);
+			keys = selector.selectedKeys();
+			for (SelectionKey key : keys) {
+				SocketChannel sock = (SocketChannel)key.channel();
+				chkCom(sock);
+			}
+		} catch (IOException e) {
+			e.printStackTrace();
+		} catch (ClosedSelectorException e) {
+			e.printStackTrace();
+		}
 
-    	int length;
-    	ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE);
-    	command.order(ByteOrder.BIG_ENDIAN);
-    	debug = false;
+		return key_num;
+	}
+
+//	should be in PSXLinda, but sock->linda is unknown here
 
-    	sock.read(command);
-    	command.rewind();
-    	length =  command.getInt(LINDA_DATA_LENGTH_OFFSET);
-    	if (length>0) {
-    	    ByteBuffer data = ByteBuffer.allocate(length);
-    	    int read = length;
-    	    if (debug) {
-    	    	System.out.print("reading:");
-    	    	System.out.println(length);
-    	    }
+	private void chkCom(SocketChannel sock)  throws IOException {
+
+		int length;
+		ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE);
+		command.order(ByteOrder.BIG_ENDIAN);
+		debug = false;
 
-    	    data.order(ByteOrder.BIG_ENDIAN);
-    	    while(read>0) {
-    		read -= sock.read(data);
-    	    }
-    	    data.rewind();
+		sock.read(command);
+		command.rewind();
+		length =  command.getInt(LINDA_DATA_LENGTH_OFFSET);
+		if (length>0) {
+			ByteBuffer data = ByteBuffer.allocate(length);
+			int read = length;
+			if (debug) {
+				System.out.print("reading:");
+				System.out.println(length);
+			}
+
+			data.order(ByteOrder.BIG_ENDIAN);
+			while(read>0) {
+				read -= sock.read(data);
+			}
+			data.rewind();
 
-    	    if (debug) {
-    	    	char id = (char)command.getShort(LINDA_ID_OFFSET);
-    	        System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+
-    	        		"MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+
-    	        		"ID:"+(int)id+" "+
-    	        		"SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+
-    	        		"DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" ");
-    	    	System.out.println("DATA:"+data);
-    	        command.rewind();
-    	    }
-    	    //if (debug_com) {
-    	    String comdata ="";
-    	    CharBuffer chardata = data.asCharBuffer();
-    	    comdata = chardata.toString();
+			if (debug) {
+				char id = (char)command.getShort(LINDA_ID_OFFSET);
+				System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+
+						"MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+
+						"ID:"+(int)id+" "+
+						"SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+
+						"DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" ");
+				System.out.println("DATA:"+data);
+				command.rewind();
+			}
+			//if (debug_com) {
+			String comdata ="";
+			CharBuffer chardata = data.asCharBuffer();
+			comdata = chardata.toString();
 
-    	    //System.out.println("Com_data =>");
-    	    System.out.println(comdata);
-    	    //}
-    	    /***if (debug) {
+			//System.out.println("Com_data =>");
+			System.out.println(comdata);
+			//}
+			/***if (debug) {
     	    	System.out.print("header:");
     	    	for(int i=0;i<LINDA_HEADER_SIZE;i++) {
     	    		System.out.println(command.get(i));
@@ -258,73 +258,74 @@
     	    	}
     	    	data.rewind();
     	    }***/
-    	    
-    	    int rseq = command.getInt(LINDA_SEQ_OFFSET);
-    	    int mode = command.get(LINDA_MODE_OFFSET);
-    	    Integer a;
-    	    /***
+
+			int rseq = command.getInt(LINDA_SEQ_OFFSET);
+			int mode = command.get(LINDA_MODE_OFFSET);
+			Integer a;
+			/***
     	    if (debug) {
     	    	System.out.print("mode = ");
     	    	System.out.println(mode);
     	    	System.out.print("seq = ");
     	    	System.out.println(rseq);
     	    }***/
-    	    try {
-    	    	PSXReply r = seqHash.get((a = new Integer(rseq)));
-    		if (debug) {
-    			System.out.print("hash value = ");
-    			System.out.println(a.hashCode());
-    		}
-    		
-    		r.setAnswer(mode,command,data);
+			try {
+				PSXReply r = seqHash.get((a = new Integer(rseq)));
+				seqHash.put(a, null);
+				if (debug) {
+					System.out.print("hash value = ");
+					System.out.println(a.hashCode());
+				}
+
+				r.setAnswer(mode,command,data);
 
-    		if (r.callback != null ) {
-    		    r.callback.callback(data);
-    		}
-    	    } catch (NullPointerException e ) {
-    	    	if (debug) {
-    	    		System.out.println("hashed reply not found");
-    	    	}
-    		// can't happen
-    		return ;
-    	    }
-        }
+				if (r.callback != null ) {
+					r.callback.callback(data);
+				}
+			} catch (NullPointerException e ) {
+				if (debug) {
+					System.out.println("hashed reply not found");
+				}
+				// can't happen
+				return ;
+			}
+		}
 	}
 
 	private void chkServe(SocketChannel sock) 
-				    throws IOException {
-	int length;
-	ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE);
-	command.order(ByteOrder.BIG_ENDIAN);
+	throws IOException {
+		int length;
+		ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE);
+		command.order(ByteOrder.BIG_ENDIAN);
 
-	sock.read(command);
-	command.rewind();
-	length =  command.getInt(LINDA_DATA_LENGTH_OFFSET);
-	if (length>0) {
-	    ByteBuffer data = ByteBuffer.allocate(length);
-	    int read = length;
-	    if (debug) {
-	    	System.out.print("reading:");
-	    	System.out.println(length);
-	    }
+		sock.read(command);
+		command.rewind();
+		length =  command.getInt(LINDA_DATA_LENGTH_OFFSET);
+		if (length>0) {
+			ByteBuffer data = ByteBuffer.allocate(length);
+			int read = length;
+			if (debug) {
+				System.out.print("reading:");
+				System.out.println(length);
+			}
 
-	    data.order(ByteOrder.BIG_ENDIAN);
-	    while(read>0) {
-		read -= sock.read(data);
-	    }
-	    data.rewind();
+			data.order(ByteOrder.BIG_ENDIAN);
+			while(read>0) {
+				read -= sock.read(data);
+			}
+			data.rewind();
 
-	    if (debug) {
-	    	char id = (char)command.getShort(LINDA_ID_OFFSET);
-	        System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+
-	        		"MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+
-	        		"ID:"+(int)id+" "+
-	        		"SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+
-	        		"DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" ");
-	    	System.out.println("DATA:"+data);
-	        command.rewind();
-	    }
-	    /***if (debug) {
+			if (debug) {
+				char id = (char)command.getShort(LINDA_ID_OFFSET);
+				System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+
+						"MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+
+						"ID:"+(int)id+" "+
+						"SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+
+						"DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" ");
+				System.out.println("DATA:"+data);
+				command.rewind();
+			}
+			/***if (debug) {
 	    	System.out.print("header:");
 	    	for(int i=0;i<LINDA_HEADER_SIZE;i++) {
 	    		System.out.println(command.get(i));
@@ -335,38 +336,39 @@
 	    	}
 	    	data.rewind();
 	    }***/
-	    
-	    int rseq = command.getInt(LINDA_SEQ_OFFSET);
-	    int mode = command.get(LINDA_MODE_OFFSET);
-	    Integer a;
-	    /***
+
+			int rseq = command.getInt(LINDA_SEQ_OFFSET);
+			int mode = command.get(LINDA_MODE_OFFSET);
+			Integer a;
+			/***
 	    if (debug) {
 	    	System.out.print("mode = ");
 	    	System.out.println(mode);
 	    	System.out.print("seq = ");
 	    	System.out.println(rseq);
 	    }***/
-	    try {
-	    	PSXReply r = seqHash.get((a = new Integer(rseq)));
-		if (debug) {
-			System.out.print("hash value = ");
-			System.out.println(a.hashCode());
-		}
-		
-		r.setAnswer(mode,command,data);
+			try {
+				PSXReply r = seqHash.get((a = new Integer(rseq)));
+				seqHash.put(a, null);
+				if (debug) {
+					System.out.print("hash value = ");
+					System.out.println(a.hashCode());
+				}
+
+				r.setAnswer(mode,command,data);
 
-		if (r.callback != null ) {
-		    r.callback.callback(data);
+				if (r.callback != null ) {
+					r.callback.callback(data);
+				}
+			} catch (NullPointerException e ) {
+				if (debug) {
+					System.out.println("hashed reply not found");
+				}
+				// can't happen
+				return ;
+			}
 		}
-	    } catch (NullPointerException e ) {
-	    	if (debug) {
-	    		System.out.println("hashed reply not found");
-	    	}
-		// can't happen
-		return ;
-	    }
-    }
-    }
+	}
 }
 
 /* end */