# HG changeset patch # User Masataka Kohagura # Date 1389071751 -32400 # Node ID d137f1823794577fa4bdd5a8c814fa5d7f93370d # Parent 28cac8b199cbb047b960202c15fa2116dd978e4d fix (but cannot running) diff -r 28cac8b199cb -r d137f1823794 parallel_processing/ppb_cond_queue/ppb_cond_queue.c --- a/parallel_processing/ppb_cond_queue/ppb_cond_queue.c Tue Jan 07 00:37:27 2014 +0900 +++ b/parallel_processing/ppb_cond_queue/ppb_cond_queue.c Tue Jan 07 14:15:51 2014 +0900 @@ -1,57 +1,93 @@ #include +#include +#include +#include #include -#include +#include "ppb_queue.h" + +#define THREAD_NUM 2 +#define DATA_NUM 10 +#define MAX_QUEUE_NUM 3 +#define THREAD_DATA_NUM (DATA_NUM / THREAD_NUM) -#define MAX_THREAD_NUM 2 -#define THREAD_NUM 5 +extern void enqueue(queue_t, int); +extern void dequeue(queue_t, int); -pthread_mutex_t mutex; -pthread_cond_t cond; -int thread_num = 0; +typedef struct _queue { + int values[MAX_QUEUE_NUM]; + volatile int remain; + int rp, wp; + pthread_mutex_t mutex; + pthread_cond_t not_full; + pthread_cond_t not_empty; +} queue_t; + +typedef struct _thread_arg { + int id; + queue_t *queue; +} thread_arg_t; void * -thread_func(void *arg) +producer_func(void *arg) { - long id = (long)arg; - - pthread_mutex_lock(&mutex); - while (thread_num >= MAX_THREAD_NUM) - pthread_cond_wait(&cond, &mutex); - thread_num++; - pthread_mutex_unlock(&mutex); + thread_arg_t *targ = (thread_arg_t*)arg; - printf("Thread %ld started.\n", id); - sleep(1); - printf("Thread %ld finished.\n", id); + for (int i = 0; i < THREAD_DATA_NUM; i++) { + int num = targ->id * THREAD_DATA_NUM + 1; + enqueue(targ->queue, num); + printf("[Producer %d] ==> %d \n", targ->id, num); + sleep(rand() % 3); + } + enqueue(targ->queue, END_DATA); + return 0; +} - pthread_mutex_lock(&mutex); - thread_num--; - pthread_cond_signal(&cond); - pthread_mutex_unlock(&mutex); - +void * +consumer_func(void *arg) +{ + thread_arg_t *targ = (thread_arg_t*)arg; + int i; + while (1) { + dequeue(targ->queue, &i); + if (i == END_DATA) break; + printf("[Consumer %d] ==> %d \n", targ->id, i); + sleep(rand() % 3); + } return 0; } int main() { - long i; - pthread_t handle[THREAD_NUM]; + pthread_t producer[THREAD_NUM], consumer[THREAD_NUM]; + thread_arg_t ptarg[THREAD_NUM], ctarg[THREAD_NUM]; + queue_t queue; + int i; /* initialize */ - pthread_mutex_init(&mutex, NULL); - pthread_cond_init(&cond, NULL); + queue.rp = queue.wp = 0; + queue.remain = 0; + pthread_mutex_init(&queue.mutex, NULL); + pthread_cond_init(&queue.not_full, NULL); + pthread_cond_init(&queue.not_empty, NULL); - /* spawn thread a number of THREAD_NUM */ - for (i = 0; i < THREAD_NUM; ++i) - pthread_create(&handle[i], NULL, &thread_func, (void*)i); + /* spawn Producer thread */ + for (i = 0; i < THREAD_NUM; ++i) { + ptarg[i].id = i; + ptarg[i].queue = &queue; + pthread_create(&consumer[i], NULL, &producer_func, &ptarg[i]); + } + + /* spawn Consumer thread */ + for (i = 0; i < THREAD_NUM; ++i) { + ctarg[i].id = i; + ctarg[i].queue = &queue; + pthread_create(&consumer[i], NULL, &consumer_func, &ctarg[i]); + } /* wait for running all thread */ for (i = 0; i < THREAD_NUM; ++i) - pthread_join(handle[i], NULL); - - /* destroy mutex*/ - pthread_cond_destroy(&cond); + pthread_join(consumer[i], NULL); return 0; } diff -r 28cac8b199cb -r d137f1823794 parallel_processing/ppb_cond_queue/ppb_queue.h --- a/parallel_processing/ppb_cond_queue/ppb_queue.h Tue Jan 07 00:37:27 2014 +0900 +++ b/parallel_processing/ppb_cond_queue/ppb_queue.h Tue Jan 07 14:15:51 2014 +0900 @@ -7,6 +7,7 @@ #define MAX_QUEUE_NUM 10 #endif /* MAX_QUEUE_NUM */ +/* typedef struct _queue { int values[MAX_QUEUE_NUM]; volatile int remain; @@ -15,11 +16,12 @@ pthread_cond_t not_full; pthread_cond_t not_empty; } queue_t; +*/ void enqueue(queue_t *q, int v) { pthread_mutex_lock(&q->mutex); while (q->remain == MAX_QUEUE_NUM) - pthread_cond_wait(&q->not_fill, &q->mutex); + pthread_cond_wait(&q->not_full, &q->mutex); q->values[q->wp] = v; q->wp++; q->remain++; if (q->wp == MAX_QUEUE_NUM) q->wp = 0;