view src/main/java/jp/ac/u_ryukyu/treevnc/MulticastQueue.java @ 492:57e0d052b126

add blockedReadSendData
author oshiro
date Fri, 01 Feb 2019 16:45:56 +0900
parents 7ef4ac588459
children c4d1a275b7d5
line wrap: on
line source

package jp.ac.u_ryukyu.treevnc;

import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;

/**
 * Multicast queue: passes each item to multiple clients.
 *
 * Items are kept in a singly linked list of {@link Node}s. Every
 * {@link Client} holds its own position in that list, so each client reads
 * every item that is put after the client was registered.
 *
 * The queue itself never limits its total size; another (time out) thread
 * should be used to limit it.
 *
 * @param <T> type of the items passed through the queue
 */
public class MulticastQueue<T>
{

	/** Last node of the list; the next put item is linked after it. */
	Node<T> tail;

	/**
	 * Creates an empty queue. The list starts with a placeholder node that
	 * carries no item.
	 */
	public MulticastQueue()
	{
		tail = new Node<T>(null);
	}

	/**
	 * Tries to allocate a ByteBuffer and waits until heap is available.
	 *
	 * When the allocation fails with an OutOfMemoryError the calling thread
	 * waits on this queue's monitor and retries after it is woken up, e.g. by
	 * {@link #heapAvailable()}. An interrupt while waiting is logged and the
	 * allocation is retried.
	 *
	 * @param size capacity of the requested buffer in bytes
	 * @return a newly allocated buffer of the requested capacity, never null
	 */
	public ByteBuffer allocate(int size)
	{
		// Fast path: no lock is taken while the heap is large enough.
		ByteBuffer b = tryAllocate(size);
		if (b != null) {
			return b;
		}
		// wait() must be called while owning the monitor, otherwise it throws
		// IllegalMonitorStateException. Retrying under the same lock before
		// waiting ensures a heapAvailable() issued in between is not lost.
		synchronized (this) {
			while ((b = tryAllocate(size)) == null) {
				try {
					wait();
				} catch (InterruptedException e) {
					System.out.println("thread has interrupted the current thread.");
				}
			}
		}
		return b;
	}

	/**
	 * Performs one allocation attempt.
	 *
	 * @param size capacity of the requested buffer in bytes
	 * @return the buffer, or null if the heap is exhausted
	 */
	private ByteBuffer tryAllocate(int size)
	{
		try {
			return ByteBuffer.allocate(size);
		} catch (OutOfMemoryError e) {
			System.err.println("multicastqueue : wait for heap : " + e);
			return null;
		}
	}

	/**
	 * Wakes up all threads waiting on this queue's monitor, i.e. threads
	 * blocked in {@link #allocate(int)} or {@link #waitput(Object)}.
	 */
	public synchronized void heapAvailable() {
		notifyAll();
	}

	/**
	 * Puts an item to the queue; all client threads waiting for the next
	 * item start reading it.
	 *
	 * @param item the item to append
	 */
	public synchronized void put(T item)
	{
		Node<T> next = new Node<T>(item);
		tail.set(next);
		tail = next;
	}

	/**
	 * Puts an item to the queue like {@link #put(Object)} and then blocks
	 * until this queue's monitor is notified.
	 *
	 * NOTE(review): within this class only heapAvailable() notifies the
	 * monitor; presumably the sender notifies the queue when the send has
	 * completed - confirm against the callers.
	 *
	 * @param item the item to append
	 * @throws InterruptedException if the thread is interrupted while waiting
	 */
	public synchronized void waitput(T item) throws InterruptedException
	{
		Node<T> next = new Node<T>(item);
		tail.set(next);
		tail = next;
		wait(); //wait for send completion
	}

	/**
	 * Registers a new client. The client starts at the current tail, so it
	 * reads only the items that are put after this call.
	 *
	 * Synchronized because tail is written by put()/waitput() under the same
	 * lock.
	 *
	 * @return a client positioned at the current end of the queue
	 */
	public synchronized Client<T> newClient()
	{
		return new Client<T>(tail);
	}

	/**
	 * Reading position of one consumer of the queue.
	 *
	 * @author kono
	 * @param <T> type of the items read from the queue
	 */
	public static class Client<T>
	{
		/** Last node this client has consumed. */
		Node<T> node;

		Client(Node<T> tail)
		{
			node = tail;
		}

		/**
		 * Reads the next item; if it is not available yet, waits for it.
		 *
		 * The client waits on the CountDownLatch of its current node; linking
		 * the following node counts the latch down and releases all waiting
		 * clients. Nodes whose item is null are skipped. An interrupt while
		 * waiting is logged and the wait is resumed.
		 *
		 * @return the next non-null item
		 */
		public T poll()
		{
			T item = null;
			while (item == null) {
				Node<T> next;
				try {
					next = node.next();
				} catch (InterruptedException e) {
					System.out.println("thread has interrupted the current thread.");
					continue;
				}
				item = next.getItem();
				node = next;
			}
			return item;
		}
	}

	/**
	 * List element holding one item and the link to its successor.
	 *
	 * @param <T> type of the stored item
	 */
	static class Node<T>
	{
		private T item;
		private Node<T> next;
		/** Released once the successor has been linked by set(). */
		private final CountDownLatch latch;

		public Node(T item)
		{
			this.item = item;
			this.next = null;
			latch = new CountDownLatch(1);
		}

		/**
		 * @return the stored item, or null if there is none or it was cleared
		 */
		public T getItem() {
			return item;
		}

		/**
		 * Links the successor and releases every thread blocked in next().
		 *
		 * @param next the node following this one
		 */
		public void set(Node<T> next)
		{
			// Assign before counting down so that threads released from
			// await() observe the link.
			this.next = next;
			latch.countDown();
		}

		/**
		 * Waits until the successor has been linked.
		 *
		 * @return the node following this one
		 * @throws InterruptedException if the thread is interrupted while waiting
		 */
		public Node<T> next() throws InterruptedException
		{
			latch.await();
			return next;
		}

		/**
		 * Drops the reference to the item; Client.poll() skips such nodes.
		 */
		public void clear() {
			item = null;
		}
	}
}