changeset 122:a086857e1812

implement dependency example. do not work
author Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
date Thu, 25 Aug 2016 03:23:29 +0900
parents bb8be1820302
children 4ff6f093b695
files src/parallel_execution/context.c src/parallel_execution/context.h src/parallel_execution/dependency.c src/parallel_execution/main.c src/parallel_execution/rb_tree.c src/parallel_execution/worker.c
diffstat 6 files changed, 204 insertions(+), 51 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/context.c	Tue Jul 26 13:06:46 2016 +0900
+++ b/src/parallel_execution/context.c	Thu Aug 25 03:23:29 2016 +0900
@@ -49,15 +49,29 @@
 extern __code createData2_stub(struct Context*);
 extern __code createTask1_stub(struct Context*);
 extern __code createTask2_stub(struct Context*);
+extern __code createTask3_stub(struct Context*);
+extern __code createTask4_stub(struct Context*);
+extern __code poll1_stub(struct Context*);
+extern __code poll2_stub(struct Context*);
 /* extern __code createTask3_stub(struct Context*); */
 /* extern __code createTask4_stub(struct Context*); */
 extern __code putQueue1_stub(struct Context*);
 extern __code putQueue2_stub(struct Context*);
 extern __code putQueue3_stub(struct Context*);
 extern __code putQueue4_stub(struct Context*);
-extern __code getQueue_stub(struct Context*);
+extern __code initWorker_stub(struct Context*);
+extern __code getQueue1_stub(struct Context*);
+extern __code getInputData1_stub(struct Context*);
+extern __code getInputData2_stub(struct Context*);
+extern __code getOutputData1_stub(struct Context*);
+extern __code getOutputData2_stub(struct Context*);
+extern __code waitFor_stub(struct Context*);
 extern __code spawnTask_stub(struct Context*);
+extern __code checkTaskFinish1_stub(struct Context*);
+extern __code checkTaskFinish2_stub(struct Context*);
 extern __code twice_stub(struct Context*);
+extern __code add_stub(struct Context*);
+extern __code mult_stub(struct Context*);
 extern __code start_time_stub(struct Context*);
 extern __code end_time_stub(struct Context*);
 extern __code exit_code(struct Context*);
@@ -114,15 +128,25 @@
     context->code[CreateData2]   = createData2_stub;
     context->code[CreateTask1]   = createTask1_stub;
     context->code[CreateTask2]   = createTask2_stub;
-    /* context->code[CreateTask3]   = createTask3_stub; */
-    /* context->code[CreateTask4]   = createTask4_stub; */
+    context->code[CreateTask3]   = createTask3_stub;
+    context->code[CreateTask4]   = createTask4_stub;
     context->code[PutQueue1]     = putQueue1_stub;
     context->code[PutQueue2]     = putQueue2_stub;
     context->code[PutQueue3]     = putQueue3_stub;
     context->code[PutQueue4]     = putQueue4_stub;
-    context->code[GetQueue]      = getQueue_stub;
+    context->code[InitWorker]    = initWorker_stub;
+    context->code[GetQueue1]     = getQueue1_stub;
+    context->code[GetInputData1] = getInputData1_stub;
+    context->code[GetInputData2] = getInputData2_stub;
+    context->code[GetOutputData1] = getOutputData1_stub;
+    context->code[GetOutputData2] = getOutputData2_stub;
+    context->code[WaitFor]       = waitFor_stub;
     context->code[SpawnTask]     = spawnTask_stub;
+    context->code[CheckTaskFinish1] = checkTaskFinish1_stub;
+    context->code[CheckTaskFinish2] = checkTaskFinish2_stub;
     context->code[Twice]         = twice_stub;
+    context->code[Add]           = add_stub;
+    context->code[Mult]          = mult_stub;
     context->code[StartTime]     = start_time_stub;
     context->code[EndTime]       = end_time_stub;
     context->code[Exit]          = exit_code;
--- a/src/parallel_execution/context.h	Tue Jul 26 13:06:46 2016 +0900
+++ b/src/parallel_execution/context.h	Thu Aug 25 03:23:29 2016 +0900
@@ -60,13 +60,27 @@
     CreateData2,
     CreateTask1,
     CreateTask2,
+    CreateTask3,
+    CreateTask4,
+    Poll1,
+    Poll2,
     PutQueue1,
     PutQueue2,
     PutQueue3,
     PutQueue4,
-    GetQueue,
+    InitWorker,
+    GetQueue1,
+    GetInputData1,
+    GetInputData2,
+    GetOutputData1,
+    GetOutputData2,
+    WaitFor,
     SpawnTask,
+    CheckTaskFinish1,
+    CheckTaskFinish2,
     Twice,
+    Add,
+    Mult,
     StartTime,
     EndTime,
     Exit,
@@ -133,8 +147,10 @@
     struct Task {
         enum Code code;
         int iKeys[2];
-        int oKeys[2];
+        int oKeys[1];
         int idsCount;
+        union Data* iargs[2];
+        union Data* oargs[1];
     } task;
     struct Queue {
         struct Element* first;
--- a/src/parallel_execution/dependency.c	Tue Jul 26 13:06:46 2016 +0900
+++ b/src/parallel_execution/dependency.c	Thu Aug 25 03:23:29 2016 +0900
@@ -3,13 +3,13 @@
 #include "origin_cs.h"
 
 __code meta_waitFor(struct Context* context, struct Queue* queue, enum Code next) {
-    context->data[Queue] = (union Data *)queue;
+    context->data[Queue] = (union Data*)queue;
     goto (context->code[next])(context);
 }
 
 __code waitFor(struct Context* context, struct Task* slave, struct Element* element, union Data* data) {
-    struct Integer integer = (struct Integer *)data;
-    struct Queue waitMeTasks = (struct Queue *)(integer + integer-> mDataOffset);
+    struct Integer* integer   = (struct Integer *)data;
+    struct Queue* waitMeTasks = (struct Queue *)(integer + integer-> mDataOffset);
     element->data = (union Data *)slave;
     goto meta_waitFor(context, waitMeTasks, PutQueue1);
 }
@@ -37,7 +37,7 @@
 
 __code spawnTask_stub(struct Context* context) {
     goto spawnTask(context,
-            &context->data[context->dataNum]->task,
+            (struct Task *)(context->data[Element]->element.data),
             &context->data[Element]->element,
             &context->data[ActiveQueue]->queue,
             &context->data[WaitQueue]->queue);
@@ -48,25 +48,34 @@
     goto (context->code[next])(context);
 }
 
-__code checkTaskFinish1(struct Context* context, struct Data* data) {
-    struct Integer integer = (struct Integer *)data;
-    struct Queue waitMeTasks = (struct Queue *)(integer + integer->mDataOffset);
-    context->next = check_task_finish2;
-    goto meta_check_task_finish1(context, waitMeTasks, GetQueue1);
+__code checkTaskFinish1(struct Context* context, union Data* data) {
+    struct Integer* integer = (struct Integer *)data;
+    struct Queue* waitMeTasks = (struct Queue *)(integer + integer->mDataOffset);
+    context->next = CheckTaskFinish2;
+    goto meta_checkTaskFinish1(context, waitMeTasks, GetQueue1);
 }
 
 __code checkTaskFinish1_stub(struct Context* context) {
+    goto checkTaskFinish1(context,
+                          ((struct Task*)(context->data[Element]->element.data))->oargs[0]);
 }
 
 /*
  * ready input data Gear for waitme task
  */
-__code checkTaskFinish2(struct Context* context) {
-    struct Task task = ((struct Task) element -> data);
-    task->idsCount--;
+__code checkTaskFinish2(struct Context* context, struct Task* task) {
+    // using cas?
+    //
+    if (__sync_fetch_and_sub(&task->idsCount, 1)) {
+        goto meta(context, GetQueue1);
+    } else {
+        goto meta(context, CheckTaskFinish2);
+    }
 }
 
 __code checkTaskFinish2_stub(struct Context* context) {
+    goto checkTaskFinish1(context,
+                          (struct Task*)(context->data[Element]->element.data));
 }
 
 __code add(struct Context* context, struct Integer* input1, struct Integer* input2, struct Integer* output) {
@@ -74,7 +83,21 @@
     goto meta(context, context->next);
 }
 
+__code add_stub(struct Context* context) {
+    goto add(context,
+            (struct Integer*)((struct Task*)(context->data[Element]->element.data))->iargs[0],
+            (struct Integer*)((struct Task*)(context->data[Element]->element.data))->iargs[1],
+            (struct Integer*)((struct Task*)(context->data[Element]->element.data))->oargs[0]);
+}
+
 __code mult(struct Context* context, struct Integer* input1, struct Integer* input2, struct Integer* output) {
     output->value = input1->value * input2->value;
     goto meta(context, context->next);
 }
+
+__code mult_stub(struct Context* context) {
+    goto mult(context,
+             (struct Integer*)((struct Task*)(context->data[Element]->element.data))->iargs[0],
+             (struct Integer*)((struct Task*)(context->data[Element]->element.data))->iargs[1],
+             (struct Integer*)((struct Task*)(context->data[Element]->element.data))->oargs[0]);
+}
--- a/src/parallel_execution/main.c	Tue Jul 26 13:06:46 2016 +0900
+++ b/src/parallel_execution/main.c	Thu Aug 25 03:23:29 2016 +0900
@@ -6,6 +6,7 @@
 
 extern __code initContext(struct Context* context);
 extern void allocator(struct Context* context);
+extern void metaAllocator(struct Context* context);
 
 int cpu_num = 1;
 int length = 1024;
@@ -14,7 +15,7 @@
 
 void print_queue(struct Element* element) {
     while (element) {
-        printf("%d\n", ((struct Task *)(element->data))->key);
+        //printf("%d\n", ((struct Task *)(element->data))->key);
         element = element->next;
     }
 }
@@ -180,7 +181,7 @@
 */
 
 
-__code createTask2(struct Context* context, struct LoopCounter* loopCounter, struct Task* task, struct Element* element, struct Queue* activeQueue) {
+__code createTask2(struct Context* context, struct LoopCounter* loopCounter, struct Task* task, struct Element* element) {
     int i = loopCounter->i;
 
     task->code = Mult;
@@ -189,7 +190,7 @@
     task->idsCount = 1;
     loopCounter->i -= 2;
 
-    element->data = (union Data *)task;
+    element->data = (union Data*)task;
 
     context->next = CreateTask3;
 
@@ -223,18 +224,18 @@
     task->iKeys[1] = i-2;
     loopCounter->i -= 2;
     element->data = (union Data *)task;
-    context->next = WaitFor1;
+    context->next = WaitFor;
 
-    node->key = okeys[0];
+    node->key = task->oKeys[0];
     goto meta(context, Get);
 }
 
 __code createTask4_stub(struct Context* context) {
-    goto createTask3(context,
+    goto createTask4(context,
             &context->data[LoopCounter]->loopCounter,
             &context->data[context->dataNum]->task,
-            &context->data[Node]->node,
-            &context->data[Element]->element);
+            &context->data[Element]->element,
+            &context->data[Node]->node);
 }
 
 __code putQueue1(struct Context* context, struct Allocate* allocate) {
@@ -301,12 +302,12 @@
 
     if (i < worker->num) {
         struct Context* worker_context = &worker->contexts[i];
-        worker_context->next = GetQueue2;
+        worker_context->next = InitWorker;
         worker_context->data[Tree] = context->data[Tree];
         worker_context->data[ActiveQueue] = context->data[ActiveQueue];
         worker_context->data[WaitQueue] = context->data[WaitQueue];
         worker_context->data[Queue] = context->data[ActiveQueue];
-        pthread_create(&worker_context->thread, NULL, (void*)&getQueue1, worker_context);
+        pthread_create(&worker_context->thread, NULL, (void*)&start_code, worker_context);
         worker_context->thread_num = i;
         loopCounter->i++;
 
@@ -321,25 +322,53 @@
     goto createWorker(context, &context->data[LoopCounter]->loopCounter, &context->data[Worker]->worker);
 }
 
-__code taskManager(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker) {
+__code taskManager(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker, struct Queue* waitQueue, struct Queue* ActiveQueue) {
+    if (waitQueue->first) {
+        goto meta(context, Poll1);
+    }
     int i = loopCounter->i;
-
     if (i < worker->num) {
         pthread_join(worker->contexts[i].thread, NULL);
         loopCounter->i++;
 
         goto meta(context, TaskManager);
     }
-
     loopCounter->i = 0;
-
     struct Time *t = &context->data[Time]->time;
     t->next = Code2;
     goto meta(context, EndTime);
 }
 
 __code taskManager_stub(struct Context* context) {
-    goto taskManager(context, &context->data[LoopCounter]->loopCounter, &context->data[Worker]->worker);
+    goto taskManager(context, 
+                     &context->data[LoopCounter]->loopCounter,
+                     &context->data[Worker]->worker,
+                     &context->data[WaitQueue]->queue,
+                     &context->data[ActiveQueue]->queue);
+}
+
+__code meta_poll1(struct Context* context, struct Queue* queue, enum Code next) {
+    context->data[Queue] = (union Data *)queue;
+    goto (context->code[next])(context);
+}
+
+__code poll1(struct Context* context, struct Queue* waitQueue) {
+    context->next = Poll2;
+    goto meta_poll1(context, waitQueue, GetQueue1);
+}
+
+__code poll1_stub(struct Context* context) {
+    goto poll1(context,
+            &context->data[WaitQueue]->queue);
+}
+
+__code poll2(struct Context* context) {
+    context->next = TaskManager;
+    goto meta(context, SpawnTask);
+}
+
+__code poll2_stub(struct Context* context) {
+    goto poll2(context);
 }
 
 void init(int argc, char** argv) {
--- a/src/parallel_execution/rb_tree.c	Tue Jul 26 13:06:46 2016 +0900
+++ b/src/parallel_execution/rb_tree.c	Thu Aug 25 03:23:29 2016 +0900
@@ -333,8 +333,7 @@
     if (traverse->current)
         goto meta(context, Search);
 
-    stack_pop(context->code_stack, &context->next);
-    goto meta(context, context->next);
+    goto meta(context, traverse->next);
 }
 
 __code search_stub(struct Context* context) {
--- a/src/parallel_execution/worker.c	Tue Jul 26 13:06:46 2016 +0900
+++ b/src/parallel_execution/worker.c	Thu Aug 25 03:23:29 2016 +0900
@@ -3,37 +3,99 @@
 #include "context.h"
 #include "origin_cs.h"
 
+__code initWorker(struct Context* context) {
+    context->next = GetInputData1;
+    goto meta(context, GetQueue1);
+}
+
+__code initWorker_stub(struct Context* context) {
+    goto initWorker(context);
+}
+
 __code getQueue1(struct Context* context, struct Queue* queue, struct Element* element) {
     if (queue->first == 0)
         return;
 
     element = queue->first;
-    if (__sync_bool_compare_and_swap(&queue->first, first, first->next)) {
+    if (__sync_bool_compare_and_swap(&queue->first, element, element->next)) {
         queue->count--;
-        goto meta(context, context->next)
+        goto meta(context, context->next);
     } else {
-        goto meta(context, GetQueue);
+        goto meta(context, GetQueue1);
     }
 }
 
 __code getQueue1_stub(struct Context* context) {
-    goto getQueue(context, &context->data[Queue]->queue, &context->data[Element]->element);
+    goto getQueue1(context, &context->data[Queue]->queue, &context->data[Element]->element);
+}
+
+__code getInputData1(struct Context* context, struct Task* task, struct Node* node, struct LoopCounter* loopCounter) {
+    int i = loopCounter->i;
+    if(i < sizeof(task->iKeys)/ sizeof(int)) {
+        context->next = GetInputData2;
+        node->key = task->iKeys[i];
+        struct Traverse *t = &context->data[Traverse]->traverse;
+        t->next = GetQueue1;
+        goto meta(context, Get);
+    }
+    loopCounter->i = 0;
+    goto meta(context, GetOutputData1);
+}
+
+__code getInputData1_stub(struct Context* context) {
+    goto getInputData1(context,
+            (struct Task*)(context->data[Element]->element.data),
+            &context->data[Node]->node,
+            &context->data[LoopCounter]->loopCounter);
+}
+
+__code getInputData2(struct Context* context, struct Task* task, union Data* data, struct LoopCounter* loopCounter) {
+    int i = loopCounter->i;
+    task->iargs[i] = data;
+    loopCounter->i++;
+    goto meta(context, GetInputData1);
 }
 
-__code getQueue2(struct Context* context, struct Element* element, struct Node* node) {
-    context->next = GetQueue;
-    stack_push(context->code_stack, &context->next);
-
-    context->next = ((struct Task *)(element->data))->code;
-    node->key = ((struct Task *)(element->data))->key;
-
-    struct Traverse *t = &context->data[Traverse]->traverse;
-    t->next = GetQueue;
-    goto meta(context, Get);
+__code getInputData2_stub(struct Context* context) {
+    goto getInputData2(context,
+            (struct Task*)(context->data[Element]->element.data),
+            context->data[Node]->node.value,
+            &context->data[LoopCounter]->loopCounter);
 }
 
-__code getQueue2_stub(struct Context* context) {
-    goto getQueue(context, &context->data[Element]->element, &context->data[Node]->node);
+__code getOutputData1(struct Context* context, struct Task* task, struct Node* node, struct LoopCounter* loopCounter) {
+    int i = loopCounter->i;
+    if(i < sizeof(task->oKeys)/ sizeof(int)) {
+        context->next = GetOutputData2;
+        node->key = task->oKeys[i];
+        struct Traverse *t = &context->data[Traverse]->traverse;
+        t->next = GetQueue1;
+        goto meta(context, Get);
+    }
+    context->next = CheckTaskFinish1;
+    loopCounter->i = 0;
+    goto meta(context, task->code);
+}
+
+__code getOutputData1_stub(struct Context* context) {
+    goto getOutputData1(context,
+            (struct Task*)(context->data[Element]->element.data),
+            &context->data[Node]->node,
+            &context->data[LoopCounter]->loopCounter);
+}
+
+__code getOutputData2(struct Context* context, struct Task* task, union Data* data, struct LoopCounter* loopCounter) {
+    int i = loopCounter->i;
+    task->oargs[i] = data;
+    loopCounter->i++;
+    goto meta(context, GetOutputData1);
+}
+
+__code getOutputData2_stub(struct Context* context) {
+    goto getOutputData2(context,
+            (struct Task*)(context->data[Element]->element.data),
+            context->data[Node]->node.value,
+            &context->data[LoopCounter]->loopCounter);
 }
 
 #ifdef USE_CUDA