view src/test/MulticastQueue.java @ 145:8bdbed4c4505

add template.txt
author e085711
date Mon, 05 Sep 2011 06:03:14 +0900
parents 9250cacee347
children
line wrap: on
line source

package test;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.CountDownLatch;

public class MulticastQueue<T>
{
	public static void main(String args[]) throws IOException
	{
		int threads = 5;
		final MulticastQueue<String> queue = new MulticastQueue<String>();
		
		Runnable type2 = new Runnable(){

			public void run()
			{
				Client<String> client = queue.newClient();
				
				for(;;){
					String str = client.poll();
					try {
						Thread.sleep(10000);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName()+":"+str);
				}
			}
		};
		
		Runnable thread = new Runnable(){

			public void run()
			{
				Client<String> client = queue.newClient();
				
				for(;;){
					String str = client.poll();
					System.out.println(Thread.currentThread().getName()+":"+str);
				}
			}
		};
		
		for(int i = 0;i < threads;i ++){
			new Thread(thread).start();
		}
		new Thread(type2).start();
		
		BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
		for(;;){
			String str = br.readLine();
			queue.put(str);
		}
	}
	
	
	Node<T> tail;
	
	public MulticastQueue()
	{
		tail = new Node<T>(null);
	}
	
	public synchronized void put(T item)
	{
		Node<T> next = new Node<T>(item);
		tail.set(next);
		tail = next;
	}
	
	public Client<T> newClient()
	{
		return new Client<T>(tail);
	}
	
	static class Client<T>
	{
		Node<T> node;
		
		Client(Node<T> tail)
		{
			node = tail;
		}
		
		public T poll()
		{
			Node<T> next = null;
			
			try {
				next = node.next();
			}catch(InterruptedException _e){
				_e.printStackTrace();
			}
			node = next;
			return next.item;
		}
	}
	
	private static class Node<T>
	{
		private T item;
		private Node<T> next;
		private CountDownLatch latch;
		
		public Node(T item)
		{
			this.item = item;
			this.next = null;
			latch = new CountDownLatch(1);
		}
		
		public void set(Node<T> next)
		{
			this.next = next;
			latch.countDown();
		}
		
		public Node<T> next() throws InterruptedException
		{
			latch.await();
			return next;
		}
	}
}