view parallel_processing/ppb_cond_queue/ppb_cond_queue.c @ 21:d137f1823794

fix (but cannot running)
author Masataka Kohagura <e085726@ie.u-ryukyu.ac.jp>
date Tue, 07 Jan 2014 14:15:51 +0900
parents a9534f217a0c
children
line wrap: on
line source

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdbool.h>
#include <pthread.h>
#include "ppb_queue.h"

#define THREAD_NUM 2
#define DATA_NUM 10
#define MAX_QUEUE_NUM 3
#define THREAD_DATA_NUM (DATA_NUM / THREAD_NUM)

extern void enqueue(queue_t, int);
extern void dequeue(queue_t, int);

typedef struct _queue {
    int values[MAX_QUEUE_NUM];
    volatile int remain;
    int rp, wp;
    pthread_mutex_t mutex;
    pthread_cond_t not_full;
    pthread_cond_t not_empty;
} queue_t;

typedef struct _thread_arg {
    int id;
    queue_t *queue;
} thread_arg_t;

void *
producer_func(void *arg)
{
    thread_arg_t *targ = (thread_arg_t*)arg;

    for (int i = 0; i < THREAD_DATA_NUM; i++) {
        int num = targ->id * THREAD_DATA_NUM + 1;
        enqueue(targ->queue, num);
        printf("[Producer %d] ==> %d \n", targ->id, num);
        sleep(rand() % 3);
    }
    enqueue(targ->queue, END_DATA);
    return 0;
}

void *
consumer_func(void *arg)
{
    thread_arg_t *targ = (thread_arg_t*)arg;
    int i;
    while (1) {
        dequeue(targ->queue, &i);
        if (i == END_DATA) break;
        printf("[Consumer %d] ==> %d \n", targ->id, i);
        sleep(rand() % 3);
    }
    return 0;
}

int
main()
{
    pthread_t producer[THREAD_NUM], consumer[THREAD_NUM];
    thread_arg_t ptarg[THREAD_NUM], ctarg[THREAD_NUM];
    queue_t queue;
    int i;

    /* initialize */
    queue.rp = queue.wp = 0;
    queue.remain = 0;
    pthread_mutex_init(&queue.mutex, NULL);
    pthread_cond_init(&queue.not_full, NULL);
    pthread_cond_init(&queue.not_empty, NULL);

    /* spawn Producer thread */
    for (i = 0; i < THREAD_NUM; ++i) {
        ptarg[i].id = i;
        ptarg[i].queue = &queue;
        pthread_create(&consumer[i], NULL, &producer_func, &ptarg[i]);
    }

    /* spawn Consumer thread */
    for (i = 0; i < THREAD_NUM; ++i) {
        ctarg[i].id = i;
        ctarg[i].queue = &queue;
        pthread_create(&consumer[i], NULL, &consumer_func, &ctarg[i]);
    }

    /* wait for running all thread */
    for (i = 0; i < THREAD_NUM; ++i)
        pthread_join(consumer[i], NULL);

    return 0;
}