changeset 21:d137f1823794

fix (but cannot running)
author Masataka Kohagura <e085726@ie.u-ryukyu.ac.jp>
date Tue, 07 Jan 2014 14:15:51 +0900
parents 28cac8b199cb
children 508b47c8f4d8
files parallel_processing/ppb_cond_queue/ppb_cond_queue.c parallel_processing/ppb_cond_queue/ppb_queue.h
diffstat 2 files changed, 72 insertions(+), 34 deletions(-) [+]
line wrap: on
line diff
--- a/parallel_processing/ppb_cond_queue/ppb_cond_queue.c	Tue Jan 07 00:37:27 2014 +0900
+++ b/parallel_processing/ppb_cond_queue/ppb_cond_queue.c	Tue Jan 07 14:15:51 2014 +0900
@@ -1,57 +1,93 @@
 #include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdbool.h>
 #include <pthread.h>
-#include <unistd.h>
+#include "ppb_queue.h"
+
+#define THREAD_NUM 2
+#define DATA_NUM 10
+#define MAX_QUEUE_NUM 3
+#define THREAD_DATA_NUM (DATA_NUM / THREAD_NUM)
 
-#define MAX_THREAD_NUM 2
-#define THREAD_NUM 5
+extern void enqueue(queue_t, int);
+extern void dequeue(queue_t, int);
 
-pthread_mutex_t mutex;
-pthread_cond_t  cond;
-int thread_num = 0;
+typedef struct _queue {
+    int values[MAX_QUEUE_NUM];
+    volatile int remain;
+    int rp, wp;
+    pthread_mutex_t mutex;
+    pthread_cond_t not_full;
+    pthread_cond_t not_empty;
+} queue_t;
+
+typedef struct _thread_arg {
+    int id;
+    queue_t *queue;
+} thread_arg_t;
 
 void *
-thread_func(void *arg)
+producer_func(void *arg)
 {
-    long id = (long)arg;
-
-    pthread_mutex_lock(&mutex);
-    while (thread_num >= MAX_THREAD_NUM)
-        pthread_cond_wait(&cond, &mutex);
-    thread_num++;
-    pthread_mutex_unlock(&mutex);
+    thread_arg_t *targ = (thread_arg_t*)arg;
 
-    printf("Thread %ld started.\n", id);
-    sleep(1);
-    printf("Thread %ld finished.\n", id);
+    for (int i = 0; i < THREAD_DATA_NUM; i++) {
+        int num = targ->id * THREAD_DATA_NUM + 1;
+        enqueue(targ->queue, num);
+        printf("[Producer %d] ==> %d \n", targ->id, num);
+        sleep(rand() % 3);
+    }
+    enqueue(targ->queue, END_DATA);
+    return 0;
+}
 
-    pthread_mutex_lock(&mutex);
-    thread_num--;
-    pthread_cond_signal(&cond);
-    pthread_mutex_unlock(&mutex);
-
+void *
+consumer_func(void *arg)
+{
+    thread_arg_t *targ = (thread_arg_t*)arg;
+    int i;
+    while (1) {
+        dequeue(targ->queue, &i);
+        if (i == END_DATA) break;
+        printf("[Consumer %d] ==> %d \n", targ->id, i);
+        sleep(rand() % 3);
+    }
     return 0;
 }
 
 int
 main()
 {
-    long i;
-    pthread_t handle[THREAD_NUM];
+    pthread_t producer[THREAD_NUM], consumer[THREAD_NUM];
+    thread_arg_t ptarg[THREAD_NUM], ctarg[THREAD_NUM];
+    queue_t queue;
+    int i;
 
     /* initialize */
-    pthread_mutex_init(&mutex, NULL);
-    pthread_cond_init(&cond, NULL);
+    queue.rp = queue.wp = 0;
+    queue.remain = 0;
+    pthread_mutex_init(&queue.mutex, NULL);
+    pthread_cond_init(&queue.not_full, NULL);
+    pthread_cond_init(&queue.not_empty, NULL);
 
-    /* spawn thread a number of THREAD_NUM */
-    for (i = 0; i < THREAD_NUM; ++i)
-        pthread_create(&handle[i], NULL, &thread_func, (void*)i);
+    /* spawn Producer thread */
+    for (i = 0; i < THREAD_NUM; ++i) {
+        ptarg[i].id = i;
+        ptarg[i].queue = &queue;
+        pthread_create(&consumer[i], NULL, &producer_func, &ptarg[i]);
+    }
+
+    /* spawn Consumer thread */
+    for (i = 0; i < THREAD_NUM; ++i) {
+        ctarg[i].id = i;
+        ctarg[i].queue = &queue;
+        pthread_create(&consumer[i], NULL, &consumer_func, &ctarg[i]);
+    }
 
     /* wait for running all thread */
     for (i = 0; i < THREAD_NUM; ++i)
-        pthread_join(handle[i], NULL);
-
-    /* destroy mutex*/
-    pthread_cond_destroy(&cond);
+        pthread_join(consumer[i], NULL);
 
     return 0;
 }
--- a/parallel_processing/ppb_cond_queue/ppb_queue.h	Tue Jan 07 00:37:27 2014 +0900
+++ b/parallel_processing/ppb_cond_queue/ppb_queue.h	Tue Jan 07 14:15:51 2014 +0900
@@ -7,6 +7,7 @@
 #define MAX_QUEUE_NUM 10
 #endif /* MAX_QUEUE_NUM */
 
+/*
 typedef struct _queue {
     int values[MAX_QUEUE_NUM];
     volatile int remain;
@@ -15,11 +16,12 @@
     pthread_cond_t not_full;
     pthread_cond_t not_empty;
 } queue_t;
+*/
 
 void enqueue(queue_t *q, int v) {
     pthread_mutex_lock(&q->mutex);
     while (q->remain == MAX_QUEUE_NUM)
-        pthread_cond_wait(&q->not_fill, &q->mutex);
+        pthread_cond_wait(&q->not_full, &q->mutex);
     q->values[q->wp] = v;
     q->wp++; q->remain++;
     if (q->wp == MAX_QUEUE_NUM) q->wp = 0;