annotate src/test/MulticastQueue.java @ 52:6eb7d0c8f11d

added MulticastQueue
author k118585
date Thu, 07 Jul 2011 21:13:09 +0900
parents
children 9250cacee347
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
52
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
1 package test;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
2
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
3 import java.io.BufferedReader;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
4 import java.io.IOException;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
5 import java.io.InputStreamReader;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
6 import java.util.concurrent.CountDownLatch;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
7
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
8 public class MulticastQueue<T>
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
9 {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
10 public static void main(String args[]) throws IOException
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
11 {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
12 int threads = 5;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
13 final MulticastQueue<String> queue = new MulticastQueue<String>();
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
14
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
15 Runnable type2 = new Runnable(){
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
16
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
17 @Override
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
18 public void run()
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
19 {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
20 Client<String> client = queue.newClient();
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
21
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
22 for(;;){
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
23 String str = client.poll();
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
24 try {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
25 Thread.sleep(10000);
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
26 } catch (InterruptedException e) {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
27 // TODO Auto-generated catch block
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
28 e.printStackTrace();
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
29 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
30 System.out.println(Thread.currentThread().getName()+":"+str);
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
31 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
32 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
33 };
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
34
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
35 Runnable thread = new Runnable(){
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
36
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
37 @Override
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
38 public void run()
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
39 {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
40 Client<String> client = queue.newClient();
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
41
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
42 for(;;){
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
43 String str = client.poll();
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
44 System.out.println(Thread.currentThread().getName()+":"+str);
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
45 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
46 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
47 };
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
48
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
49 for(int i = 0;i < threads;i ++){
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
50 new Thread(thread).start();
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
51 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
52 new Thread(type2).start();
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
53
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
54 BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
55 for(;;){
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
56 String str = br.readLine();
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
57 queue.put(str);
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
58 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
59 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
60
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
61
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
62 Node<T> tail;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
63
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
64 public MulticastQueue()
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
65 {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
66 tail = new Node<T>(null);
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
67 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
68
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
69 public synchronized void put(T item)
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
70 {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
71 Node<T> next = new Node<T>(item);
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
72 tail.set(next);
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
73 tail = next;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
74 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
75
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
76 public Client<T> newClient()
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
77 {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
78 return new Client<T>(tail);
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
79 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
80
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
81 static class Client<T>
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
82 {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
83 Node<T> node;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
84
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
85 Client(Node<T> tail)
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
86 {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
87 node = tail;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
88 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
89
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
90 public T poll()
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
91 {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
92 Node<T> next = null;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
93
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
94 try {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
95 next = node.next();
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
96 }catch(InterruptedException _e){
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
97 _e.printStackTrace();
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
98 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
99 node = next;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
100 return next.item;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
101 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
102 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
103
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
104 private static class Node<T>
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
105 {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
106 private T item;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
107 private Node<T> next;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
108 private CountDownLatch latch;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
109
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
110 public Node(T item)
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
111 {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
112 this.item = item;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
113 this.next = null;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
114 latch = new CountDownLatch(1);
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
115 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
116
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
117 public void set(Node<T> next)
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
118 {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
119 this.next = next;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
120 latch.countDown();
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
121 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
122
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
123 public Node<T> next() throws InterruptedException
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
124 {
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
125 latch.await();
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
126 return next;
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
127 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
128 }
6eb7d0c8f11d added MulticastQueue
k118585
parents:
diff changeset
129 }