Mercurial > hg > FederatedLinda
view src/fdl/test/TestWaitRd.java @ 97:0ea086f0e96f fuchita
main loop modification, for easy meta engine addition.
add comments.
author | one |
---|---|
date | Wed, 26 May 2010 10:49:50 +0900 |
parents | 96c63bc659d4 |
children |
line wrap: on
line source
package fdl.test; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import fdl.FDLindaServ; import fdl.FederatedLinda; import fdl.PSX; import fdl.PSXLinda; import fdl.PSXReply; public class TestWaitRd { public FederatedLinda fdl; public FDLindaServ fds; public static final int PORT = 10000; class LindaServer implements Runnable { public int id; public LindaServer(int id) { this.id = id; } public void run() { String[] args = {/* "-d",*/"-p",Integer.toString(PORT)}; FDLindaServ.main(args); } } class Sender implements Runnable { public String id; public Sender(int id) { this.id = "Sender"+id; } public void run() { String[] args = {id}; sleep(2000); main(args); } public void main(String[] arg) { try { PSXLinda psx = openLinda(id); sleep(1000); sendData(psx,1,0); psx.sync(1000); in_wait(psx,1); sendData(psx,1,1); psx.sync(1000); in_wait(psx,1); sendData(psx,1,2); psx.sync(1000); in_wait(psx,1); for(int i=3;i<10;i++) { sendData(psx,1,i); psx.sync(1000); in_wait(psx,1); } sleep(1000); } catch (IOException e) { e.printStackTrace(); } } } class Reader implements Runnable { public String id; public Reader(int id) { this.id = "Reader"+id; } public void run() { String[] args = {id}; sleep(2000); main(args); } public void main(String[] arg) { final String name = arg[0]; try { PSXLinda psx = openLinda(id); PSXReply reply = psx.waitRd(1); read_wait(psx, reply,name+": First read"); PSXReply reply1 = psx.waitRd(1); read_wait(psx, reply1,name+": 2nd read"); for(int i=3;i<10;i++) { PSXReply replyn = psx.waitRd(1); read_wait(psx, replyn,name+": "+i+"th read"); } ByteBuffer data = ByteBuffer.allocate(10); psx.out(PSX.META_STOP, data); psx.sync(1); } catch (IOException e) { System.err.println(name+": Communication failure."); } } } public synchronized void sleep(int time) { try { wait(time); } catch (InterruptedException e) { e.printStackTrace(); } } public void read_wait(PSXLinda psx, PSXReply reply, String mesg) throws IOException { while(!reply.ready()) psx.sync(10); System.out.println(mesg); System.out.println(reply.getData().getInt()); System.out.println(""); } public PSXLinda openLinda(String name) throws IOException { FederatedLinda fdl; PSXLinda psx; PSXReply r; String host; InetSocketAddress localAddress; int port = PORT; try { localAddress = new InetSocketAddress(InetAddress.getLocalHost(), port); host = localAddress.getHostName(); } catch (UnknownHostException e) { host = "localhost"; } fdl = FederatedLinda.init(); psx = fdl.open(host,port); r = psx.in(65535); fdl.sync(1); System.out.println(name+": Connected."); int cnt=0; while(!r.ready()) { // psx.sync(1000); psx.sync(10); System.out.println(name+": Waiting...."+(cnt++)); } print_id(r); return psx; } public ByteBuffer sendData(PSXLinda psx,int id, int n) { ByteBuffer data = ByteBuffer.allocate(10); data.putInt(n); data.flip(); psx.out(id,data); return data; } public void in_wait(PSXLinda psx, int i) throws IOException { PSXReply r = psx.in(i); while(! r.ready()) { psx.sync(10); } return; } public void print_id (PSXReply ans) throws IOException { ByteBuffer r = ans.getData(); System.out.print("ID = "); System.out.write(r.array()); System.out.println(""); } public static void main(String[] arg) throws InterruptedException { TestWaitRd me = new TestWaitRd(); me.test1(); } public void test1() throws InterruptedException { Thread l1 = new Thread(new LindaServer(1)); Thread r1 = new Thread(new Reader(1)); Thread r2 = new Thread(new Reader(2)); Thread s1 = new Thread(new Sender(1)); l1.start(); s1.start(); r1.start(); r2.start(); s1.join(); r1.join(); r2.join(); l1.join(); } }