Mercurial > hg > FederatedLinda
view src/fdl/FederatedLinda.java @ 33:64071f8e2e0d
*** empty log message ***
author | kono |
---|---|
date | Sun, 24 Aug 2008 03:23:08 +0900 |
parents | 7e0f6f00763e |
children | e7c5958fd285 |
line wrap: on
line source
/* * @(#)FederatedLinda.java 1.1 06/04/01 * * Copyright 2006 Shinji KONO * PSX Lidna Trasport layer of PSX Linda library */ package fdl; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Hashtable; import java.util.Iterator; /** FederatedLinda * * @author Shinji Kono * * @param mytsid Tuple Space ID Initialize connection channel for a tuple space one instance for each Tuple space connection */ public class FederatedLinda { static FederatedLinda fdl = new FederatedLinda(); static int MAX_SEQUENCE = 2048; static boolean debug = false; public int tid; public int seq; public int qsize; public PSXLinda linda; public Selector selector; public PSXQueue q_top,q_end; public PSXReply r_top,r_end; public Hashtable<Integer,PSXReply> seqHash; public static FederatedLinda init() { return fdl; } private FederatedLinda() { try { selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } seqHash = new Hashtable<Integer, PSXReply>(); } public PSXLinda open(String _host,int _port) throws IOException { tid++; PSXLindaImpl newlinda = new PSXLindaImpl(this,tid,_host,_port); linda = newlinda.add(linda); return linda; } public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int mode, PSXCallback callback) { PSXQueue c = new PSXQueue(linda,id,mode,s,callback); if (q_top == null) { c = q_end = q_top = c; } else { q_end.next = c; q_end = c; } qsize++; if (mode != PSX.PSX_OUT) { PSXReply p = new PSXReply(PSX.PSX_REPLY,callback); p.seq = seq(p); c.setSeq(p.seq); if (r_top == null){ r_end = r_top = p; } else { r_end.next = p; r_end = p; } return p; } return null; } public int seq(PSXReply reply) { Integer s; do { seq++; if (seq>MAX_SEQUENCE) { seq = 0; } s = new Integer(seq); } while (seqHash.containsKey(s)); if (debug) { System.out.print("hash value = "); System.out.println(s.hashCode()); } seqHash.put(s,reply); seq++; return seq-1; } public Selector selector() { return selector; } public int sync() throws IOException { return sync(0); } public int sync(long mtimeout) throws IOException { int key_num = 0; while (q_top != null){ PSXQueue c = q_top; c.send(); q_top = c.next; qsize--; } try { if (selector.select(mtimeout)>0) { for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) { SelectionKey s = it.next(); it.remove(); try { if (!s.isReadable()) throw new IOException(); chkServe((SocketChannel)s.channel()); } catch (IOException e) { selector.selectedKeys().remove(s); System.err.println(""+s.channel()+" is closed."); } } } } catch (IOException e) { e.printStackTrace(); } catch (ClosedSelectorException e) { e.printStackTrace(); } return key_num; } private void chkServe(SocketChannel sock) throws IOException { ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); command.order(ByteOrder.BIG_ENDIAN); ByteBuffer data = PSX.receivePacket(sock, command); if (debug) { PSX.printCommand("chkServe:",command, data); } int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET); int mode = command.get(PSX.LINDA_MODE_OFFSET); PSXReply r = getReply(rseq); if (r==null) { System.err.println("Illegal answer sequence."); return; } r.setAnswer(mode,command,data); if (r.callback != null ) { r.callback.callback(data); } } private PSXReply getReply(int rseq) { Integer a; PSXReply r = seqHash.get((a = new Integer(rseq))); seqHash.remove(a); return r; } } /* end */