12
|
1 package jp.ac.u_ryukyu.treevnc;
|
3
|
2
|
74
|
3 import java.nio.ByteBuffer;
|
518
|
4 import java.nio.ByteOrder;
|
3
|
5 import java.util.concurrent.CountDownLatch;
|
|
6
|
|
/**
 * Multicast queue: passes each item to multiple clients.
 *
 * <p>Items are kept in a singly linked list of {@link Node}s. The queue only
 * remembers the current tail; every {@link Client} holds its own position in
 * the list and advances independently, so each client registered before an
 * item was put receives that item.
 *
 * <p>The queue itself never limits its size. As the original author notes,
 * another (time-out) thread should be used to limit the total size.
 *
 * @param <T> type of the items carried by the queue
 */
public class MulticastQueue<T>
{

    /** Most recently appended node; written and read while holding this queue's monitor. */
    Node<T> tail;

    /**
     * Creates an empty queue whose tail is a placeholder node carrying no item.
     */
    public MulticastQueue()
    {
        tail = new Node<T>(null);
    }

    /**
     * Tries to allocate a heap {@link ByteBuffer} of the given size, and
     * waits until heap is available when the allocation fails.
     *
     * <p>When {@code ByteBuffer.allocate} throws {@link OutOfMemoryError}, the
     * calling thread waits on this queue's monitor and retries after it is
     * woken up (for example by {@link #heapAvailable()}).
     *
     * @param size capacity of the requested buffer in bytes
     * @return the allocated buffer, never {@code null}
     */
    public ByteBuffer allocate(int size)
    {
        ByteBuffer b = null;
        while (true) {
            try {
                b = ByteBuffer.allocate(size);
            } catch (OutOfMemoryError e) {
                b = null;
                System.err.println("multicastqueue : wait for heap : " + e);
            }
            if (b != null) {
                break;
            }
            try {
                // Object.wait() requires the caller to own the monitor; without
                // this synchronized block it throws IllegalMonitorStateException.
                // The allocation above is intentionally kept outside the lock.
                synchronized (this) {
                    wait();
                }
            } catch (InterruptedException e) {
                // Interrupts are reported and the allocation is retried;
                // the interrupt status is not restored (original behaviour).
                System.out.println("thread has interrupted the current thread.");
            }
        }
        return b;
    }

    /**
     * Wakes up every thread waiting on this queue's monitor, i.e. threads
     * blocked in {@link #allocate(int)} or {@link #waitput(Object)}.
     */
    public synchronized void heapAvailable() {
        notifyAll();
    }

    /**
     * Puts an item to the queue; all client threads waiting for the next
     * item start reading it.
     *
     * @param item item to append; a {@code null} item is skipped by
     *             {@link Client#poll()}
     */
    public synchronized void put(T item)
    {
        Node<T> next = new Node<T>(item);
        tail.set(next);
        tail = next;
    }

    /**
     * Puts an item to the queue like {@link #put(Object)} and then blocks the
     * caller on this queue's monitor.
     *
     * <p>The caller resumes once another thread notifies this queue
     * ({@link #heapAvailable()} does so). The original comment describes this
     * as waiting for send completion; the notifying side is not part of this
     * class.
     *
     * @param item item to append
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public synchronized void waitput(T item) throws InterruptedException
    {
        Node<T> next = new Node<T>(item);
        tail.set(next);
        tail = next;
        wait(); // wait for send completion
    }

    /**
     * Registers a new client positioned at the current tail, so it receives
     * the items put after this call.
     *
     * <p>Synchronized so that {@code tail} is read under the same monitor
     * that guards its updates in {@link #put(Object)} and
     * {@link #waitput(Object)}.
     *
     * @return a new client reading from this queue
     */
    public synchronized Client<T> newClient()
    {
        return new Client<T>(tail);
    }

    /**
     * Reader side of the queue. Each client keeps its own cursor into the
     * linked list of nodes.
     *
     * @author kono
     * @param <T> type of the items read
     */
    public static class Client<T>
    {
        /** Last node this client has consumed (initially the tail at registration time). */
        Node<T> node;

        Client(Node<T> tail)
        {
            node = tail;
        }

        /**
         * Reads the next item; if it is not available yet, waits for it.
         *
         * <p>The client waits on the {@link CountDownLatch} of its current
         * node; linking a successor counts the latch down and releases every
         * waiting client. Nodes whose item is {@code null} (never set, or
         * cleared through {@link Node#clear()}) are skipped.
         *
         * @return the next non-null item
         */
        public T poll()
        {
            Node<T> next = null;
            T item = null;
            do {
                try {
                    next = node.next();
                } catch (InterruptedException e) {
                    // Report the interrupt and retry from the same node;
                    // item is still null here, so the loop continues.
                    System.out.println("thread has interrupted the current thread.");
                    continue;
                }
                item = next.getItem();
                node = next;
            } while (item == null);
            return item;
        }
    }

    /**
     * List cell holding one item, the link to its successor and the latch
     * that is released once the successor has been linked.
     *
     * @param <T> type of the item held
     */
    static class Node<T>
    {
        private T item;
        private Node<T> next;
        private final CountDownLatch latch;

        /**
         * @param item item carried by this node, may be {@code null}
         */
        public Node(T item)
        {
            this.item = item;
            this.next = null;
            latch = new CountDownLatch(1);
        }

        /**
         * @return the item of this node, or {@code null} if none was set or
         *         it has been cleared
         */
        public T getItem() {
            return item;
        }

        /**
         * Links the successor node and releases all threads blocked in
         * {@link #next()}.
         *
         * @param next node that follows this one
         */
        public void set(Node<T> next)
        {
            this.next = next;
            latch.countDown();
        }

        /**
         * Blocks until a successor has been linked by {@link #set(Node)}.
         *
         * @return the successor node
         * @throws InterruptedException if interrupted while waiting
         */
        public Node<T> next() throws InterruptedException
        {
            latch.await();
            return next;
        }

        /**
         * Drops the reference to the item; a cleared node is skipped by
         * {@link Client#poll()}.
         */
        public void clear() {
            item = null;
        }
    }
}
|