comparison parallel_processing/ppb_cond_queue/ppb_cond_queue.c @ 21:d137f1823794

fix (but it cannot run yet)
author Masataka Kohagura <e085726@ie.u-ryukyu.ac.jp>
date Tue, 07 Jan 2014 14:15:51 +0900
parents a9534f217a0c
children
comparison
equal deleted inserted replaced
20:28cac8b199cb 21:d137f1823794
1 #include <stdio.h> 1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <stdbool.h>
2 #include <pthread.h> 5 #include <pthread.h>
3 #include <unistd.h> 6 #include "ppb_queue.h"
4 7
5 #define MAX_THREAD_NUM 2 8 #define THREAD_NUM 2
6 #define THREAD_NUM 5 9 #define DATA_NUM 10
10 #define MAX_QUEUE_NUM 3
11 #define THREAD_DATA_NUM (DATA_NUM / THREAD_NUM)
7 12
8 pthread_mutex_t mutex; 13 extern void enqueue(queue_t, int);
9 pthread_cond_t cond; 14 extern void dequeue(queue_t, int);
10 int thread_num = 0; 15
16 typedef struct _queue {
17 int values[MAX_QUEUE_NUM];
18 volatile int remain;
19 int rp, wp;
20 pthread_mutex_t mutex;
21 pthread_cond_t not_full;
22 pthread_cond_t not_empty;
23 } queue_t;
24
25 typedef struct _thread_arg {
26 int id;
27 queue_t *queue;
28 } thread_arg_t;
11 29
12 void * 30 void *
13 thread_func(void *arg) 31 producer_func(void *arg)
14 { 32 {
15 long id = (long)arg; 33 thread_arg_t *targ = (thread_arg_t*)arg;
16 34
17 pthread_mutex_lock(&mutex); 35 for (int i = 0; i < THREAD_DATA_NUM; i++) {
18 while (thread_num >= MAX_THREAD_NUM) 36 int num = targ->id * THREAD_DATA_NUM + 1;
19 pthread_cond_wait(&cond, &mutex); 37 enqueue(targ->queue, num);
20 thread_num++; 38 printf("[Producer %d] ==> %d \n", targ->id, num);
21 pthread_mutex_unlock(&mutex); 39 sleep(rand() % 3);
40 }
41 enqueue(targ->queue, END_DATA);
42 return 0;
43 }
22 44
23 printf("Thread %ld started.\n", id); 45 void *
24 sleep(1); 46 consumer_func(void *arg)
25 printf("Thread %ld finished.\n", id); 47 {
26 48 thread_arg_t *targ = (thread_arg_t*)arg;
27 pthread_mutex_lock(&mutex); 49 int i;
28 thread_num--; 50 while (1) {
29 pthread_cond_signal(&cond); 51 dequeue(targ->queue, &i);
30 pthread_mutex_unlock(&mutex); 52 if (i == END_DATA) break;
31 53 printf("[Consumer %d] ==> %d \n", targ->id, i);
54 sleep(rand() % 3);
55 }
32 return 0; 56 return 0;
33 } 57 }
34 58
35 int 59 int
36 main() 60 main()
37 { 61 {
38 long i; 62 pthread_t producer[THREAD_NUM], consumer[THREAD_NUM];
39 pthread_t handle[THREAD_NUM]; 63 thread_arg_t ptarg[THREAD_NUM], ctarg[THREAD_NUM];
64 queue_t queue;
65 int i;
40 66
41 /* initialize */ 67 /* initialize */
42 pthread_mutex_init(&mutex, NULL); 68 queue.rp = queue.wp = 0;
43 pthread_cond_init(&cond, NULL); 69 queue.remain = 0;
70 pthread_mutex_init(&queue.mutex, NULL);
71 pthread_cond_init(&queue.not_full, NULL);
72 pthread_cond_init(&queue.not_empty, NULL);
44 73
45 /* spawn thread a number of THREAD_NUM */ 74 /* spawn Producer thread */
46 for (i = 0; i < THREAD_NUM; ++i) 75 for (i = 0; i < THREAD_NUM; ++i) {
47 pthread_create(&handle[i], NULL, &thread_func, (void*)i); 76 ptarg[i].id = i;
77 ptarg[i].queue = &queue;
78 pthread_create(&consumer[i], NULL, &producer_func, &ptarg[i]);
79 }
80
81 /* spawn Consumer thread */
82 for (i = 0; i < THREAD_NUM; ++i) {
83 ctarg[i].id = i;
84 ctarg[i].queue = &queue;
85 pthread_create(&consumer[i], NULL, &consumer_func, &ctarg[i]);
86 }
48 87
49 /* wait for running all thread */ 88 /* wait for running all thread */
50 for (i = 0; i < THREAD_NUM; ++i) 89 for (i = 0; i < THREAD_NUM; ++i)
51 pthread_join(handle[i], NULL); 90 pthread_join(consumer[i], NULL);
52
53 /* destroy mutex*/
54 pthread_cond_destroy(&cond);
55 91
56 return 0; 92 return 0;
57 } 93 }