changeset 288:f1b0cc555b6e

Add odgCommit
author Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
date Mon, 06 Feb 2017 04:04:25 +0900
parents 6b099d73949c
children 3d70e21a3902
files doc/dependency.graffle src/parallel_execution/CPUWorker.cbc src/parallel_execution/SynchronizedQueue.cbc src/parallel_execution/TaskManagerImpl.cbc src/parallel_execution/context.h src/parallel_execution/main.cbc src/parallel_execution/twice.cbc
diffstat 7 files changed, 52 insertions(+), 11 deletions(-) [+]
line wrap: on
line diff
Binary file doc/dependency.graffle has changed
--- a/src/parallel_execution/CPUWorker.cbc	Mon Feb 06 00:59:39 2017 +0900
+++ b/src/parallel_execution/CPUWorker.cbc	Mon Feb 06 04:04:25 2017 +0900
@@ -38,7 +38,7 @@
     if (!task)
         return; // end thread
     task->worker = worker;
-    context->next = C_taskReceiveWorker; // set CG after task exec
+    task->next = C_odgCommit; // set CG after task exec
     goto meta(task, task->next);
 }
 
@@ -48,6 +48,46 @@
     goto getTask(context, worker, task);
 }
 
+__code odgCommit(struct LoopCounter* loopCounter, struct Queue* queue, struct Context* task) {
+    int i = loopCounter->i ;
+    if(task->odg + i < task->maxOdg) {
+        queue->queue = (union Data*)GET_WAIT_LIST(task->data[task->odg+i]);
+        queue->next = C_odgCommit1;
+        goto meta(context, queue->queue->Queue.take);
+    }
+    loopCounter->i = 0;
+    goto meta(context, C_taskReceiveWorker);
+}
+
+__code odgCommit_stub(struct Context* context) {
+    struct Context* workerContext = context->worker->worker->CPUWorker.context;
+    goto odgCommit(workerContext,
+                   Gearef(workerContext, LoopCounter),
+                   Gearef(workerContext, Queue),
+                   context);
+}
+
+__code odgCommit1(struct TaskManager* taskManager, struct Context* task) {
+    if(__sync_fetch_and_sub(&task->idgCount, 1)) {
+        if(task->idgCount == 0) {
+            taskManager->taskManager = (union Data*)task->taskManager;
+            taskManager->context = task;
+            taskManager->next = C_odgCommit;
+            goto meta(context, task->taskManager->spawn);
+        }
+    } else {
+        goto meta(context, C_odgCommit1);
+    }
+}
+
+__code odgCommit1_stub(struct Context* context) {
+    struct Context* task = &Gearef(context, Queue)->data->Context;
+    goto odgCommit1(context,
+                    Gearef(context, TaskManager),
+                    task);
+                 
+}
+
 #ifdef USE_CUDA
 __code twiceGpu() {
     cuMemcpyHtoDAsync(context,context,context,context->stream);
@@ -62,4 +102,3 @@
 
 __code shutdownWorker(struct CPUWorker* worker) {
 }
-
--- a/src/parallel_execution/SynchronizedQueue.cbc	Mon Feb 06 00:59:39 2017 +0900
+++ b/src/parallel_execution/SynchronizedQueue.cbc	Mon Feb 06 04:04:25 2017 +0900
@@ -46,7 +46,7 @@
     goto meta(context, C_putSynchronizedQueue1);
 }
 
-__code putSynchronizedQueue1(struct SynchronizedQueue* queue, union Data* data, struct Semaphore* semaphore, __code next(...)) {
+__code putSynchronizedQueue1(struct SynchronizedQueue* queue, struct Semaphore* semaphore, __code next(...)) {
     semaphore->semaphore = (union Data*)queue->queueCount;
     semaphore->next = next;
     goto meta(context, queue->queueCount->v);
--- a/src/parallel_execution/TaskManagerImpl.cbc	Mon Feb 06 00:59:39 2017 +0900
+++ b/src/parallel_execution/TaskManagerImpl.cbc	Mon Feb 06 04:04:25 2017 +0900
@@ -48,6 +48,7 @@
 __code createTask(struct TaskManager* taskManager) {
     taskManager->context = NEW(struct Context);
     initContext(taskManager->context);
+    taskManager->context->taskManager = taskManager;
     goto meta(context, C_setWorker);
 }
 
--- a/src/parallel_execution/context.h	Mon Feb 06 00:59:39 2017 +0900
+++ b/src/parallel_execution/context.h	Mon Feb 06 04:04:25 2017 +0900
@@ -45,9 +45,9 @@
     meta->size = len; \
     data; })
 
-#define GET_TYPE(dseg) ({ \
-    struct Meta* meta = (struct Meta*)(((void*)dseg) - sizeof(struct Meta));\
-    meta->type; })
+#define GET_META(dseg) ((struct Meta*)(((void*)dseg) - sizeof(struct Meta)))
+#define GET_TYPE(dseg) (GET_META(dseg)->type)
+#define GET_WAIT_LIST(dseg) (GET_META(dseg)->wait)
 
 #define Gearef(context, t) (&(context)->data[D_##t]->t)
 
@@ -77,6 +77,7 @@
     int dataNum;
     int idgCount; //number of waiting dataGear
     int odg;
+    int maxOdg;
     int workerId;
     union Data **data;
 };
@@ -125,6 +126,7 @@
         enum Code shutdown;
         enum Code next;
         struct Queue* tasks;
+        struct TaskManager* taskManager;
     } Worker;
     struct CPUWorker {
         pthread_t thread;
--- a/src/parallel_execution/main.cbc	Mon Feb 06 00:59:39 2017 +0900
+++ b/src/parallel_execution/main.cbc	Mon Feb 06 04:04:25 2017 +0900
@@ -143,6 +143,7 @@
     task->data[task->dataNum] = (union Data*)loopCounter2;
     task->data[task->dataNum+1] = (union Data*)array;
     task->odg = task->dataNum + 2;
+    task->maxOdg = task->odg;
     taskManager->next = C_createTask1;
     loopCounter->i++;
     goto meta(context, taskManager->taskManager->TaskManager.spawn);
--- a/src/parallel_execution/twice.cbc	Mon Feb 06 00:59:39 2017 +0900
+++ b/src/parallel_execution/twice.cbc	Mon Feb 06 04:04:25 2017 +0900
@@ -2,7 +2,7 @@
 
 #include "../context.h"
 
-__code twice(struct LoopCounter* loopCounter, int index, int prefix, int* array, struct Context* workerContext) {
+__code twice(struct LoopCounter* loopCounter, int index, int prefix, int* array) {
     int i = loopCounter->i;
     if (i < prefix) {
         array[i+index*prefix] = array[i+index*prefix]*2;
@@ -12,17 +12,15 @@
     }
 
     loopCounter->i = 0;
-    goto meta(workerContext, workerContext->next);
+    goto meta(context, context->next);
 }
 
 __code twice_stub(struct Context* context) {
-    struct Context* workerContext = context->worker->worker->CPUWorker.context;
     struct LoopCounter* loopCounter = &context->data[context->dataNum]->LoopCounter;
     struct Array* array = &context->data[context->dataNum+1]->Array;
     goto twice(context,
                loopCounter,
                array->index,
                array->prefix,
-               array->array,
-               workerContext);
+               array->array);
 }