# HG changeset patch # User k118585 # Date 1310040789 -32400 # Node ID 6eb7d0c8f11d720b16661b772533d5ecc222de6b # Parent 089bd4510538b9c7259e7db76658dab8e6f5e967 added MulticastQueue diff -r 089bd4510538 -r 6eb7d0c8f11d src/test/MulticastQueue.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/test/MulticastQueue.java Thu Jul 07 21:13:09 2011 +0900 @@ -0,0 +1,129 @@ +package test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.concurrent.CountDownLatch; + +public class MulticastQueue +{ + public static void main(String args[]) throws IOException + { + int threads = 5; + final MulticastQueue queue = new MulticastQueue(); + + Runnable type2 = new Runnable(){ + + @Override + public void run() + { + Client client = queue.newClient(); + + for(;;){ + String str = client.poll(); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + System.out.println(Thread.currentThread().getName()+":"+str); + } + } + }; + + Runnable thread = new Runnable(){ + + @Override + public void run() + { + Client client = queue.newClient(); + + for(;;){ + String str = client.poll(); + System.out.println(Thread.currentThread().getName()+":"+str); + } + } + }; + + for(int i = 0;i < threads;i ++){ + new Thread(thread).start(); + } + new Thread(type2).start(); + + BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + for(;;){ + String str = br.readLine(); + queue.put(str); + } + } + + + Node tail; + + public MulticastQueue() + { + tail = new Node(null); + } + + public synchronized void put(T item) + { + Node next = new Node(item); + tail.set(next); + tail = next; + } + + public Client newClient() + { + return new Client(tail); + } + + static class Client + { + Node node; + + Client(Node tail) + { + node = tail; + } + + public T poll() + { + Node next = null; + + try { + next = node.next(); + }catch(InterruptedException _e){ + _e.printStackTrace(); + } + node = next; + return next.item; + } + } + + private static class Node + { + private T item; + private Node next; + private CountDownLatch latch; + + public Node(T item) + { + this.item = item; + this.next = null; + latch = new CountDownLatch(1); + } + + public void set(Node next) + { + this.next = next; + latch.countDown(); + } + + public Node next() throws InterruptedException + { + latch.await(); + return next; + } + } +}