view src/parallel_execution/SynchronizedQueue.cbc @ 549:05fd14d8edbe

add update_context.pl
author anatofuz <anatofuz@cr.ie.u-ryukyu.ac.jp>
date Wed, 13 Nov 2019 16:04:34 +0900
parents 98c5235b3ecb
children
line wrap: on
line source

#include "../context.h"
#interface "Queue.h"
#interface "Atomic.h"

#include <stdio.h>

/*
 * Non-blocking queue based on the paper "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" (https://www.research.ibm.com/people/m/michael/podc-1996.pdf).
 */

/*
 * Builds a Queue interface object backed by a SynchronizedQueue.
 *
 * The implementation starts out with one placeholder Element that both
 * `top` and `last` refer to and whose `next` is NULL; the atomic helper
 * comes from createAtomicReference(context). The returned Queue has its
 * take/put/isEmpty/clear slots set to the SynchronizedQueue code gears.
 */
Queue* createSynchronizedQueue(struct Context* context) {
    struct Queue* queue = new Queue();
    struct SynchronizedQueue* impl = new SynchronizedQueue();

    // One placeholder node shared by both ends of the empty queue.
    impl->top = new Element();
    impl->last = impl->top;
    impl->last->next = NULL;
    impl->atomic = createAtomicReference(context);

    // Publish the implementation and its code gears through the interface.
    queue->clear = C_clearSynchronizedQueue;
    queue->isEmpty = C_isEmptySynchronizedQueue;
    queue->put = C_putSynchronizedQueue;
    queue->take = C_takeSynchronizedQueue;
    queue->queue = (union Data*)impl;
    return queue;
}

/*
 * Empties the queue by promoting the current tail node to be the new
 * placeholder node referenced by `top`.
 *
 * `top` is deliberately never set to NULL: takeSynchronizedQueue and
 * isEmptySynchronizedQueue read `top->next` without a NULL check, and
 * createSynchronizedQueue sets the queue up so that `top` always refers to
 * a placeholder node.
 *
 * Steps:
 *  - If the observed tail already has a successor, `last` is lagging behind
 *    a linked element; try to advance `last` and run this gear again.
 *  - Otherwise try to swing `top` from its observed value to the observed
 *    tail. On success control continues to `next`; on failure this gear is
 *    run again with fresh snapshots.
 *
 * NOTE(review): checkAndSet is assumed to be a compare-and-swap taking
 * (address, expected, new value, success continuation, failure continuation),
 * as its other uses in this file suggest - confirm against Atomic.h.
 */
__code clearSynchronizedQueue(struct SynchronizedQueue* queue, __code next(...)) {
    struct Element* top = queue->top;
    struct Element* last = queue->last;
    struct Element* nextElement = last->next;
    struct Atomic* atomic = queue->atomic;
    if (nextElement != NULL) {
        // Tail is lagging: help it forward so no linked element survives the clear.
        goto atomic->checkAndSet(&queue->last, last, nextElement, clearSynchronizedQueue, clearSynchronizedQueue);
    }
    // The observed tail has no successor, so using it as the placeholder leaves the queue empty.
    goto atomic->checkAndSet(&queue->top, top, last, next(...), clearSynchronizedQueue);
}

/*
 * Appends `data` to the tail of the queue without taking a lock.
 *
 * A new Element holding `data` is allocated, then the gear snapshots
 * `queue->last` and that node's `next`:
 *  - if `queue->last` changed between the two reads, the gear starts over;
 *  - if the observed tail has no successor, it tries to install the new
 *    element as `last->next`; success continues to `next`, failure starts
 *    the gear over;
 *  - otherwise the tail is lagging, so it tries to advance `queue->last` to
 *    the successor and starts over whether or not that attempt succeeds.
 *
 * `queue->last` is not advanced here after a successful link; that is left
 * to the "lagging tail" branches of later put/take operations.
 *
 * NOTE(review): every restart re-enters this gear from the top, so a fresh
 * Element is allocated on each attempt.
 * NOTE(review): checkAndSet is assumed to be a compare-and-swap taking
 * (address, expected, new value, success continuation, failure continuation)
 * - confirm against Atomic.h.
 */
__code putSynchronizedQueue(struct SynchronizedQueue* queue, union Data* data, __code next(...)) {
    Element* element = new Element();
    element->data = data;
    element->next = NULL;
    // Snapshot the tail and its successor, then re-check that the tail did not move in between.
    Element* last = queue->last;
    Element* nextElement = last->next;
    if (last != queue->last) {
        goto putSynchronizedQueue();
    }
    if (nextElement == NULL) {
        // Observed tail is the real end of the list: try to link the new element after it.
        struct Atomic* atomic = queue->atomic;
        goto atomic->checkAndSet(&last->next, nextElement, element, next(...), putSynchronizedQueue);
    } else {
        // Tail pointer is behind the real end: help advance it, then retry.
        struct Atomic* atomic = queue->atomic;
        goto atomic->checkAndSet(&queue->last, last, nextElement, putSynchronizedQueue, putSynchronizedQueue);
    }
}

/*
 * Removes the element at the head of the queue and hands its data to `next`
 * (via takeSynchronizedQueue1).
 *
 * The gear snapshots `queue->top`, `queue->last` and `top->next`:
 *  - if `queue->top` changed between the reads, it starts over;
 *  - if head and tail are the same node and that node has a successor, the
 *    tail is lagging: it tries to advance `queue->last` and starts over;
 *  - if head and tail are the same node with no successor (nothing to take),
 *    control reaches the final jump and the gear simply starts over, i.e. it
 *    keeps retrying until an element is available;
 *  - otherwise it tries to advance `queue->top` to the successor; success
 *    continues to takeSynchronizedQueue1, failure starts the gear over.
 *
 * NOTE(review): checkAndSet is assumed to be a compare-and-swap taking
 * (address, expected, new value, success continuation, failure continuation)
 * - confirm against Atomic.h.
 */
__code takeSynchronizedQueue(struct SynchronizedQueue* queue, __code next(union Data* data, ...)) {
    struct Element* top = queue->top;
    struct Element* last = queue->last;
    struct Element* nextElement = top->next;
    // Discard the snapshot if the head moved while it was being taken.
    if (top != queue->top) {
        goto takeSynchronizedQueue();
    }
    if (top == last) {
        if (nextElement != NULL) {
            // Tail pointer is behind the real end: help advance it, then retry.
            struct Atomic* atomic = queue->atomic;
            goto atomic->checkAndSet(&queue->last, last, nextElement, takeSynchronizedQueue, takeSynchronizedQueue);
        }
    } else {
        // Claim the head by moving `top` to its successor; the successor carries the data to return.
        struct Atomic* atomic = queue->atomic;
        goto atomic->checkAndSet(&queue->top, top, nextElement, takeSynchronizedQueue1, takeSynchronizedQueue);
    }
    // Reached only when head == tail and there is no successor: retry.
    goto takeSynchronizedQueue();
}

/*
 * Second half of take: reads the data stored in `nextElement` and passes it
 * to the `next` continuation.
 *
 * `nextElement` is supplied by takeSynchronizedQueue1_stub.
 */
__code takeSynchronizedQueue1(struct SynchronizedQueue* queue, __code next(union Data* data, ...), struct Element* nextElement) {
    data = nextElement->data;
    goto next(data, ...);
}

/*
 * Hand-written stub that unpacks the arguments of takeSynchronizedQueue1
 * from the context.
 *
 * The queue implementation, the `next` code and the address of the output
 * data slot are read from the Queue gear; `nextElement` is taken from the
 * Atomic gear's `newData` field.
 *
 * NOTE(review): `newData` is presumably the "new value" that
 * takeSynchronizedQueue handed to checkAndSet (the element that follows the
 * old head) - confirm against the Atomic interface.
 */
__code takeSynchronizedQueue1_stub(struct Context* context) {
	SynchronizedQueue* queue = (SynchronizedQueue*)GearImpl(context, Queue, queue);
	enum Code next = Gearef(context, Queue)->next;
	Data** O_data = &Gearef(context, Queue)->data;
	goto takeSynchronizedQueue1(context,
                                queue,
                                next,
                                O_data,
                                (struct Element*)Gearef(context, Atomic)->newData);
}

/*
 * Branches on whether the queue currently looks empty.
 *
 * The gear snapshots `queue->top`, `queue->last` and `top->next`, and starts
 * over if `queue->top` changed between the reads. The queue is treated as
 * empty when head and tail are the same node and that node has no successor:
 * control then continues to `whenEmpty`, otherwise to `next`.
 *
 * The queue itself is not modified here.
 */
__code isEmptySynchronizedQueue(struct SynchronizedQueue* queue, __code next(...), __code whenEmpty(...)) {
    struct Element* top = queue->top;
    struct Element* last = queue->last;
    struct Element* nextElement = top->next;
    // Discard the snapshot if the head moved while it was being taken.
    if (top != queue->top) {
        goto isEmptySynchronizedQueue();
    }
    if (top == last && nextElement == NULL) {
        goto whenEmpty(...);
    }
    goto next(...);
}