changeset 494:d8b2036c6942

BoundedBuffer implments Buffer interface
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Sun, 31 Dec 2017 02:40:08 +0900
parents 82f0c49750f1
children 2e7ea81e5943
files src/parallel_execution/Buffer.h src/parallel_execution/CMakeLists.txt src/parallel_execution/context.h src/parallel_execution/examples/bitonicSort/bitonicSort.cbc src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc src/parallel_execution/examples/boundedBuffer/consumer.cbc src/parallel_execution/examples/boundedBuffer/initBuffer.cbc src/parallel_execution/examples/boundedBuffer/initQueue.cbc src/parallel_execution/examples/boundedBuffer/main.cbc src/parallel_execution/examples/boundedBuffer/producer.cbc src/parallel_execution/examples/twice/main.cbc
diffstat 12 files changed, 105 insertions(+), 83 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/Buffer.h	Sun Dec 31 02:40:08 2017 +0900
@@ -0,0 +1,7 @@
+typedef struct Buffer<Impl>{
+        union Data* buffer;
+        union Data* data;
+        __code put(Impl* buffer, union Data* data, __code next(...));
+        __code take(Impl* buffer, __code next(union Data*, ...));
+        __code next(...);
+} Buffer;
--- a/src/parallel_execution/CMakeLists.txt	Sun Dec 31 01:36:18 2017 +0900
+++ b/src/parallel_execution/CMakeLists.txt	Sun Dec 31 02:40:08 2017 +0900
@@ -138,5 +138,5 @@
   TARGET
       boundedBuffer
   SOURCES
-      examples/boundedBuffer/main.cbc examples/boundedBuffer/initQueue.cbc examples/boundedBuffer/SemaphoreImpl.cbc examples/boundedBuffer/BoundedBuffer.cbc examples/boundedBuffer/consumer.cbc examples/boundedBuffer/producer.cbc CPUWorker.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc MultiDimIterator.cbc AtomicReference.cbc
+      examples/boundedBuffer/main.cbc examples/boundedBuffer/initBuffer.cbc examples/boundedBuffer/SemaphoreImpl.cbc examples/boundedBuffer/BoundedBuffer.cbc examples/boundedBuffer/consumer.cbc examples/boundedBuffer/producer.cbc CPUWorker.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc MultiDimIterator.cbc AtomicReference.cbc
 )
--- a/src/parallel_execution/context.h	Sun Dec 31 01:36:18 2017 +0900
+++ b/src/parallel_execution/context.h	Sun Dec 31 02:40:08 2017 +0900
@@ -400,6 +400,13 @@
         union Data* body;
         int hash;
     } Memory;
+    struct Buffer {
+        union Data* buffer;
+        union Data* data;
+        enum Code put;
+        enum Code take;
+        enum Code next;
+    } Buffer;
     struct BoundedBuffer {
         struct Element* top;
         struct Element* last;
--- a/src/parallel_execution/examples/bitonicSort/bitonicSort.cbc	Sun Dec 31 01:36:18 2017 +0900
+++ b/src/parallel_execution/examples/bitonicSort/bitonicSort.cbc	Sun Dec 31 02:40:08 2017 +0900
@@ -23,7 +23,7 @@
     // loopCounter->tree = createRedBlackTree(context);
     loopCounter->i = 0;
     taskManager->taskManager = (union Data*)createTaskManagerImpl(context, cpu_num, gpu_num, 0);
-    goto meta(context, C_code1);
+    goto code1();
 }
 
 __code initDataGears_stub(struct Context* context) {
--- a/src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc	Sun Dec 31 01:36:18 2017 +0900
+++ b/src/parallel_execution/examples/boundedBuffer/BoundedBuffer.cbc	Sun Dec 31 02:40:08 2017 +0900
@@ -1,9 +1,9 @@
 #include "../../../context.h"
-#interface "Queue.h"
+#interface "Buffer.h"
 #interface "Semaphore.h"
 
-Queue* createBoundedBuffer(struct Context* context, int size) {
-    struct Queue* queue = new Queue();
+Buffer* createBoundedBuffer(struct Context* context, int size) {
+    struct Buffer* buffer = new Buffer();
     struct BoundedBuffer* boundedBuffer = new BoundedBuffer();
     boundedBuffer->top = new Element();
     boundedBuffer->top->next = NULL;
@@ -11,66 +11,65 @@
     boundedBuffer->fullCount = createSemaphoreImpl(context, 0);
     boundedBuffer->emptyCount = createSemaphoreImpl(context, size);
     boundedBuffer->lock = createSemaphoreImpl(context, 1); // binary semaphore
-    queue->queue = (union Data*)boundedBuffer;
-    queue->take = C_takeBoundedBuffer;
-    queue->put = C_putBoundedBuffer;
-    // queue->isEmpty = C_isEmptyBoundedBuffer;
-    // queue->clear = C_clearBoundedBuffer;
-    return queue;
+    buffer->buffer = (union Data*)boundedBuffer;
+    buffer->take = C_takeBoundedBuffer;
+    buffer->put = C_putBoundedBuffer;
+    return buffer;
 }
 
-__code putBoundedBuffer(struct BoundedBuffer* queue, union Data* data, __code next(...)) {
-    struct Semaphore* sem = queue->emptyCount;
+__code putBoundedBuffer(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
+    struct Semaphore* sem = buffer->emptyCount;
     goto sem->p(putBoundedBuffer1);
 }
 
-__code putBoundedBuffer1(struct BoundedBuffer* queue, union Data* data, __code next(...)) {
-    struct Semaphore* sem = queue->lock;
+__code putBoundedBuffer1(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
+    struct Semaphore* sem = buffer->lock;
     goto sem->p(putBoundedBuffer2);
 }
 
-__code putBoundedBuffer2(struct BoundedBuffer* queue, union Data* data, __code next(...)) {
+__code putBoundedBuffer2(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
     struct Element* element = new Element();
     element->data = data;
     element->next = NULL;
-    struct Element* last = queue->last;
+    struct Element* last = buffer->last;
     last->next = element;
-    struct Semaphore* sem = queue->lock;
+    printf("put\n");
+    struct Semaphore* sem = buffer->lock;
     goto sem->v(putBoundedBuffer3);
 }
 
-__code putBoundedBuffer3(struct BoundedBuffer* queue, union Data* data, __code next(...)) {
-    struct Semaphore* sem = queue->fullCount;
+__code putBoundedBuffer3(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
+    struct Semaphore* sem = buffer->fullCount;
     goto sem->v(putBoundedBuffer4);
 }
 
-__code putBoundedBuffer4(struct BoundedBuffer* queue, union Data* data, __code next(...)) {
-    goto next(data, ...);
+__code putBoundedBuffer4(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
+    goto next(...);
 }
-__code takeBoundedBuffer(struct BoundedBuffer* queue, __code next(union Data* data, ...)) {
-    struct Semaphore* sem = queue->fullCount;
+__code takeBoundedBuffer(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
+    struct Semaphore* sem = buffer->fullCount;
     goto sem->p(takeBoundedBuffer1);
 }
 
-__code takeBoundedBuffer1(struct BoundedBuffer* queue, __code next(union Data* data, ...)) {
-    struct Semaphore* sem = queue->lock;
+__code takeBoundedBuffer1(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
+    struct Semaphore* sem = buffer->lock;
     goto sem->p(takeBoundedBuffer2);
 }
 
-__code takeBoundedBuffer2(struct BoundedBuffer* queue, __code next(union Data* data, ...)) {
-    struct Element* top = queue->top;
+__code takeBoundedBuffer2(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
+    struct Element* top = buffer->top;
     struct Element* nextElement = top->next;
     data = nextElement->data;
-    queue->top = nextElement;
-    struct  Semaphore* sem = queue->lock;
+    buffer->top = nextElement;
+    struct  Semaphore* sem = buffer->lock;
     goto sem->v(takeBoundedBuffer3);
 }
 
-__code takeBoundedBuffer3(struct BoundedBuffer* queue, __code next(union Data* data, ...)) {
-    struct Semaphore* sem = queue->emptyCount;
+__code takeBoundedBuffer3(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
+    struct Semaphore* sem = buffer->emptyCount;
     goto sem->v(takeBoundedBuffer4);
 }
 
-__code takeBoundedBuffer4(struct BoundedBuffer* queue, __code next(union Data* data, ...)) {
+__code takeBoundedBuffer4(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
     goto next(data, ...);
 }
--- a/src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc	Sun Dec 31 01:36:18 2017 +0900
+++ b/src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc	Sun Dec 31 02:40:08 2017 +0900
@@ -50,15 +50,29 @@
     pthread_mutex_lock(&semaphore->mutex);
     semaphore->value++;
     struct Queue* queue = semaphore->waitThreadQueue;
-    goto queue->take(vOperationSemaphoreImpl1);                        
+    goto queue->isEmpty(vOperationSemaphoreImpl1, vOperationSemaphoreImpl3);
+}
+
+__code vOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) {
+    struct Queue* queue = semaphore->waitThreadQueue;
+    goto queue->take(vOperationSemaphoreImpl2);
 }
 
-__code vOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...), struct Context* waitTask) {
+__code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, struct Context* waitTask, __code next(...)) {
     struct TaskManager* taskManager = waitTask->taskManager;
-    goto taskManager->spawn(waitTask, vOperationSemaphoreImpl2); //notify
+    goto taskManager->spawn(waitTask, vOperationSemaphoreImpl3); //notify
 }
 
-__code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...), struct Context* waitTask) {
+__code vOperationSemaphoreImpl2_stub(struct Context* context) {
+    SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore);
+    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
+    goto vOperationSemaphoreImpl2(context,
+                                  semaphoreImpl,
+                                  waitTask,
+                                  Gearef(context, Semaphore)->next);
+}
+
+__code vOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, __code next(...)) {
     pthread_mutex_unlock(&semaphore->mutex);
     goto next(...);
 }
--- a/src/parallel_execution/examples/boundedBuffer/consumer.cbc	Sun Dec 31 01:36:18 2017 +0900
+++ b/src/parallel_execution/examples/boundedBuffer/consumer.cbc	Sun Dec 31 02:40:08 2017 +0900
@@ -1,25 +1,25 @@
 #include "../../../context.h"
 #include <stdio.h>
-#interface "Queue.h"
+#interface "Buffer.h"
 
-__code consumer(struct Queue* queue, __code next(...)) {
-    goto queue->take(consumer1);
+__code consumer(struct Buffer* buffer, __code next(...)) {
+    goto buffer->take(consumer1);
 }
 
 __code consumer_stub(struct Context* context) {
     goto consumer(context,
-                  &context->data[context->idg]->Queue,
+                  &context->data[context->idg]->Buffer,
                   context->next);
 }
 
-__code consumer1(struct Queue* queue, __code next(...), struct Node* node) {
+__code consumer1(struct Buffer* buffer, __code next(...), struct Node* node) {
     printf("getData %d\n", node->value->Int);
     goto consumer();
 }
 
 __code consumer1_stub(struct Context* context) {
     goto consumer1(context,
-                   &context->data[context->idg]->Queue,
+                   &context->data[context->idg]->Buffer,
                    context->next,
-                   &Gearef(context, Queue)->data->Node);
+                   &Gearef(context, Buffer)->data->Node);
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/examples/boundedBuffer/initBuffer.cbc	Sun Dec 31 02:40:08 2017 +0900
@@ -0,0 +1,13 @@
+#include "../../../context.h"
+
+__code initBuffer(__code next(struct Buffer* output, ...)) {
+    struct Buffer* output = *O_output;
+    goto next(output, ...);
+}
+
+__code initBuffer_stub(struct Context* context) {
+    struct Buffer** O_output = (struct Buffer**)&context->data[context->odg];
+    goto initBuffer(context,
+                  context->next,
+                  O_output);
+}
--- a/src/parallel_execution/examples/boundedBuffer/initQueue.cbc	Sun Dec 31 01:36:18 2017 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,13 +0,0 @@
-#include "../../../context.h"
-
-__code initQueue(__code next(struct Queue* output, ...)) {
-    struct Queue* output = *O_output;
-    goto next(output, ...);
-}
-
-__code initQueue_stub(struct Context* context) {
-    struct Queue** O_output = (struct Queue**)&context->data[context->odg];
-    goto initQueue(context,
-                  context->next,
-                  O_output);
-}
--- a/src/parallel_execution/examples/boundedBuffer/main.cbc	Sun Dec 31 01:36:18 2017 +0900
+++ b/src/parallel_execution/examples/boundedBuffer/main.cbc	Sun Dec 31 02:40:08 2017 +0900
@@ -9,7 +9,7 @@
 
 int cpu_num = 1;
 int length = 100;
-int queue_size = 10;
+int buffer_size = 10;
 int gpu_num = 0;
 int CPU_ANY = -1;
 int CPU_CUDA = -1;
@@ -23,7 +23,7 @@
     // loopCounter->tree = createRedBlackTree(context);
     loopCounter->i = 0;
     taskManager->taskManager = (union Data*)createTaskManagerImpl(context, cpu_num, gpu_num, 0);
-    goto meta(context, C_code1);
+    goto code1();
 }
 
 __code initDataGears_stub(struct Context* context) {
@@ -37,28 +37,23 @@
     printf("cpus:\t\t%d\n", cpu_num);
     printf("gpus:\t\t%d\n", gpu_num);
     printf("length:\t\t%d\n", length);
-    /* puts("queue"); */
-    /* print_queue(context->data[ActiveQueue]->queue.first); */
-    /* puts("tree"); */
-    /* print_tree(context->data[Tree]->tree.root); */
-    /* puts("result"); */
+    goto createTask1();
+}
 
-    //time->next = C_code2;
-    goto meta(context, C_createTask1);
-    //goto meta(context, C_start_time);
+__code code2(struct TaskManager* taskManager) {
+    goto taskManager->shutdown(exit_code);
 }
 
-__code code1_stub(struct Context* context) {
-    goto code1(context, Gearef(context, Timer));
+__code code2_stub(struct Context* context) {
+    goto code2(context, &Gearef(context, TaskManager)->taskManager->TaskManager);
 }
 
-
-__code createTask1(struct LoopCounter* loopCounter, struct TaskManager* taskManager) {
-    struct Queue* queue = createBoundedBuffer(context, queue_size);
-    par goto producer(queue, __exit);
-    par goto consumer(queue, __exit);
-    par goto initQueue(queue, __exit);
-    goto createTask1();
+__code createTask1(struct TaskManager* taskManager) {
+    struct Buffer* buffer = createBoundedBuffer(context, buffer_size);
+    par goto producer(buffer, __exit);
+    par goto consumer(buffer, __exit);
+    par goto initBuffer(buffer, __exit);
+    goto code2();
 }
 
 void init(int argc, char** argv) {
@@ -67,8 +62,8 @@
             cpu_num = (int)atoi(argv[i+1]);
         else if (strcmp(argv[i], "-l") == 0)
             length = (int)atoi(argv[i+1]);
-        else if (strcmp(argv[i], "-queueSize") == 0)
-            queue_size = (int)atoi(argv[i+1]);
+        else if (strcmp(argv[i], "-bufferSize") == 0)
+            buffer_size = (int)atoi(argv[i+1]);
         else if (strcmp(argv[i], "-cuda") == 0) {
             gpu_num = 1;
             CPU_CUDA = 0;
--- a/src/parallel_execution/examples/boundedBuffer/producer.cbc	Sun Dec 31 01:36:18 2017 +0900
+++ b/src/parallel_execution/examples/boundedBuffer/producer.cbc	Sun Dec 31 02:40:08 2017 +0900
@@ -1,15 +1,15 @@
 #include "../../../context.h"
-#interface "Queue.h"
+#interface "Buffer.h"
 
-__code producer(struct Queue* queue, __code next(...)) {
+__code producer(struct Buffer* buffer, __code next(...)) {
     Node* node = new Node();
     node->value = (union Data*)new Int();
     node->value->Int = 10;
-    goto queue->put(node, producer);
+    goto buffer->put(node, producer);
 }
 
 __code producer_stub(struct Context* context) {
     goto producer(context,
-                  &context->data[context->idg]->Queue,
+                  &context->data[context->idg]->Buffer,
                   context->next);
 }
--- a/src/parallel_execution/examples/twice/main.cbc	Sun Dec 31 01:36:18 2017 +0900
+++ b/src/parallel_execution/examples/twice/main.cbc	Sun Dec 31 02:40:08 2017 +0900
@@ -23,7 +23,7 @@
     // loopCounter->tree = createRedBlackTree(context);
     loopCounter->i = 0;
     taskManager->taskManager = (union Data*)createTaskManagerImpl(context, cpu_num, gpu_num, 0);
-    goto meta(context, C_code1);
+    goto code1();
 }
 
 __code initDataGears_stub(struct Context* context) {