changeset 138:337fdbffa693 default tip

Merge
author Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
date Sat, 01 Oct 2016 00:23:35 +0900
parents fe1fbfec7d01 (current diff) 77e60b6cdace (diff)
children
files src/parallel_execution/context.c src/parallel_execution/context.h src/parallel_execution/main.c src/parallel_execution/rb_tree.c
diffstat 6 files changed, 391 insertions(+), 154 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/allocate.c	Thu Sep 29 22:22:36 2016 +0900
+++ b/src/parallel_execution/allocate.c	Sat Oct 01 00:23:35 2016 +0900
@@ -4,3 +4,8 @@
     context->data[++context->dataNum] = context->heap;
     context->heap += context->data[Allocate]->allocate.size;
 }
+
+void metaAllocator(struct Context* context) {
+    context->mData[++context->metaDataNum] = context->heap;
+    context->heap += context->data[Allocate]->allocate.size;
+}
--- a/src/parallel_execution/context.c	Thu Sep 29 22:22:36 2016 +0900
+++ b/src/parallel_execution/context.c	Sat Oct 01 00:23:35 2016 +0900
@@ -55,13 +55,29 @@
 extern __code createTask2_stub(struct Context*);
 extern __code createTask3_stub(struct Context*);
 extern __code createTask4_stub(struct Context*);
+extern __code createTask5_stub(struct Context*);
+extern __code createTask6_stub(struct Context*);
+extern __code poll1_stub(struct Context*);
+extern __code poll2_stub(struct Context*);
+/* extern __code createTask3_stub(struct Context*); */
+/* extern __code createTask4_stub(struct Context*); */
 extern __code putQueue1_stub(struct Context*);
 extern __code putQueue2_stub(struct Context*);
 extern __code putQueue3_stub(struct Context*);
 extern __code putQueue4_stub(struct Context*);
-extern __code getQueue_stub(struct Context*);
+extern __code getTask_stub(struct Context*);
+extern __code getQueue1_stub(struct Context*);
+extern __code getInputData1_stub(struct Context*);
+extern __code getInputData2_stub(struct Context*);
+extern __code getOutputData1_stub(struct Context*);
+extern __code getOutputData2_stub(struct Context*);
+extern __code waitFor_stub(struct Context*);
 extern __code spawnTask_stub(struct Context*);
+extern __code checkTaskFinish1_stub(struct Context*);
+extern __code checkTaskFinish2_stub(struct Context*);
 extern __code twice_stub(struct Context*);
+extern __code add_stub(struct Context*);
+extern __code mult_stub(struct Context*);
 extern __code start_time_stub(struct Context*);
 extern __code end_time_stub(struct Context*);
 extern __code exit_code(struct Context*);
@@ -70,6 +86,7 @@
     context->heapLimit = sizeof(union Data)*ALLOCATE_SIZE;
     context->code = (__code(**) (struct Context*)) NEWN(ALLOCATE_SIZE, void*);
     context->data = NEWN(ALLOCATE_SIZE, union Data*);
+    context->mData = NEWN(ALLOCATE_SIZE, union MetaData*);
     context->heapStart = NEWN(context->heapLimit, char);
     context->heap = context->heapStart;
 
@@ -124,13 +141,27 @@
     context->code[CreateTask2]   = createTask2_stub;
     context->code[CreateTask3]   = createTask3_stub;
     context->code[CreateTask4]   = createTask4_stub;
+    context->code[CreateTask5]   = createTask5_stub;
+    context->code[CreateTask6]   = createTask6_stub;
+    context->code[Poll1]         = poll1_stub;
+    context->code[Poll2]         = poll2_stub;
     context->code[PutQueue1]     = putQueue1_stub;
     context->code[PutQueue2]     = putQueue2_stub;
     context->code[PutQueue3]     = putQueue3_stub;
     context->code[PutQueue4]     = putQueue4_stub;
-    context->code[GetQueue]      = getQueue_stub;
+    context->code[GetTask]       = getTask_stub;
+    context->code[GetQueue1]     = getQueue1_stub;
+    context->code[GetInputData1] = getInputData1_stub;
+    context->code[GetInputData2] = getInputData2_stub;
+    context->code[GetOutputData1] = getOutputData1_stub;
+    context->code[GetOutputData2] = getOutputData2_stub;
+    context->code[WaitFor]       = waitFor_stub;
     context->code[SpawnTask]     = spawnTask_stub;
+    context->code[CheckTaskFinish1] = checkTaskFinish1_stub;
+    context->code[CheckTaskFinish2] = checkTaskFinish2_stub;
     context->code[Twice]         = twice_stub;
+    context->code[Add]           = add_stub;
+    context->code[Mult]          = mult_stub;
     context->code[StartTime]     = start_time_stub;
     context->code[EndTime]       = end_time_stub;
     context->code[Exit]          = exit_code;
@@ -175,6 +206,7 @@
 
     context->dataNum = Queue;
     
+    context->metaDataNum = 0;
     context->node_stack = stack_init(sizeof(struct Node*), 100);
     context->code_stack = stack_init(sizeof(enum Code), 100);
 }
--- a/src/parallel_execution/context.h	Thu Sep 29 22:22:36 2016 +0900
+++ b/src/parallel_execution/context.h	Sat Oct 01 00:23:35 2016 +0900
@@ -66,13 +66,27 @@
     CreateTask2,
     CreateTask3,
     CreateTask4,
+    CreateTask5,
+    CreateTask6,
+    Poll1,
+    Poll2,
     PutQueue1,
     PutQueue2,
     PutQueue3,
     PutQueue4,
-    GetQueue,
+    GetTask,
+    GetQueue1,
+    GetInputData1,
+    GetInputData2,
+    GetOutputData1,
+    GetOutputData2,
+    WaitFor,
     SpawnTask,
+    CheckTaskFinish1,
+    CheckTaskFinish2,
     Twice,
+    Add,
+    Mult,
     StartTime,
     EndTime,
     Exit,
@@ -110,7 +124,9 @@
     stack_ptr code_stack;
     stack_ptr node_stack;
     int dataNum;
+    int metaDataNum;
     union Data **data;
+    union MetaData **mData;
 };
 
 union Data {
@@ -136,15 +152,18 @@
 #endif
     struct Task {
         enum Code code;
-        int key;
-        struct Queue* waitMe;
-        struct Queue* waitI;
+        int iKeys[2];
+        int oKeys[1];
         int idsCount;
+        union Data* iargs[2];
+        union Data* oargs[1];
     } task;
     struct Queue {
         struct Element* first;
         struct Element* last;
         int count;
+        enum Code next;
+        enum Code emptyNext;
     } queue;
     struct Element {
         union Data* data;
@@ -183,13 +202,13 @@
         enum Code next;
         long size;
     } allocate;
-    struct OutPutDataSegments {
-        union Data **data;
-    } ods;
+    struct Integer {
+        int value;
+        int mDataOffset;
+    } integer;
 };
 
 union MetaData {
     struct Queue waitMeTasks;
-    struct Queue waitI;
 };
 
--- a/src/parallel_execution/dependency.c	Thu Sep 29 22:22:36 2016 +0900
+++ b/src/parallel_execution/dependency.c	Sat Oct 01 00:23:35 2016 +0900
@@ -3,40 +3,31 @@
 #include "origin_cs.h"
 
 __code meta_waitFor(struct Context* context, struct Queue* queue, enum Code next) {
-    context->data[Queue] = (union Data *)queue;
+    context->data[Queue] = (union Data*)queue;
     goto (context->code[next])(context);
 }
 
-__code waitFor1(struct Context* context, struct Task* master, struct Task* slave, struct Element* element) {
+__code waitFor(struct Context* context, struct Task* slave, struct Element* element, union Data* data) {
+    struct Integer* integer   = (struct Integer *)data;
+    struct Queue* waitMeTasks = (struct Queue *)(integer + integer-> mDataOffset / sizeof(struct Integer));
     element->data = (union Data *)slave;
-    // enqueue waitMe
-    goto meta_waitFor(context, master->waitMe, PutQueue1);
-}
-
-__code waitFor1_stub(struct Context* context) {
-    // next think
+    goto meta_waitFor(context, waitMeTasks, PutQueue1);
 }
 
-//__code waitFor2(struct Context* context, struct Task* master, struct Task* slave, struct Element* element) {
-//    element->task = master;
-//    task->next = context->next;
-//    // enqueue waitI
-//    goto meta_waitFor(context, task->waitI, PutQueue1);
-//}
-//
-//__code waitFor2_stub(struct Context* context) {
-//    // next think
-//}
+__code waitFor_stub(struct Context* context) {
+    goto waitFor(context,
+            &context->data[context->dataNum-2]->task,
+            &context->data[Element]->element,
+            context->data[Node]->node.value);
+}
 
 __code meta_spawnTask(struct Context* context, struct Queue* queue, enum Code next) {
     context->data[Queue] = (union Data *)queue;
     goto (context->code[next])(context);
 }
 
-__code spawnTask(struct Context* context, struct Task* task, struct Element* element, struct Queue* activeQueue, struct Queue* waitQueue) {
-    //printf("spawn Task\n");
-    element->data = (union Data *)task;
-    if (task->waitI->count == task->idsCount) {
+__code spawnTask(struct Context* context, struct Task* task, struct Queue* activeQueue, struct Queue* waitQueue) {
+    if (task->idsCount == 0) {
         //printf("put ActiveQueue\n");
         // enqueue activeQueue
         goto meta_spawnTask(context, activeQueue, PutQueue1);
@@ -49,23 +40,68 @@
 
 __code spawnTask_stub(struct Context* context) {
     goto spawnTask(context,
-            &context->data[context->dataNum-2]->task,
-            &context->data[Element]->element,
+            (struct Task *)(context->data[Element]->element.data),
             &context->data[ActiveQueue]->queue,
             &context->data[WaitQueue]->queue);
 }
 
-__code taskA(struct Context* context) {
-    printf("TaskA\n");
+__code meta_checkTaskFinish1(struct Context* context, struct Queue* queue, enum Code next) {
+    context->data[Queue] = (union Data *)queue;
+    goto (context->code[next])(context);
+}
+
+__code checkTaskFinish1(struct Context* context, union Data* data) {
+    struct Integer* integer = (struct Integer *)data;
+    struct Queue* waitMeTasks = (struct Queue *)(integer + integer->mDataOffset / sizeof(struct Integer));
+    context->next = CheckTaskFinish2;
+    waitMeTasks->emptyNext = GetTask;
+    goto meta_checkTaskFinish1(context, waitMeTasks, GetQueue1);
+}
+
+__code checkTaskFinish1_stub(struct Context* context) {
+    goto checkTaskFinish1(context,
+                          ((struct Task*)(context->data[Element]->element.data))->oargs[0]);
+}
+
+/*
+ * ready input data Gear for waitme task
+ */
+__code checkTaskFinish2(struct Context* context, struct Task* task) {
+    // using cas?
+    if (__sync_fetch_and_sub(&task->idsCount, 1)) {
+        goto meta(context, GetQueue1);
+    } else {
+        goto meta(context, CheckTaskFinish2);
+    }
+}
+
+__code checkTaskFinish2_stub(struct Context* context) {
+    goto checkTaskFinish2(context,
+                          (struct Task*)(context->data[Element]->element.data));
+}
+
+__code add(struct Context* context, struct Integer* input1, struct Integer* input2, struct Integer* output) {
+    output->value = input1->value + input2->value;
+    printf("%d + %d = %d\n", input1->value, input2->value, output->value);
     goto meta(context, context->next);
 }
 
-__code taskB(struct Context* context) {
-    printf("TaskB\n");
+__code add_stub(struct Context* context) {
+    goto add(context,
+            (struct Integer*)((struct Task*)(context->data[Element]->element.data))->iargs[0],
+            (struct Integer*)((struct Task*)(context->data[Element]->element.data))->iargs[1],
+            (struct Integer*)((struct Task*)(context->data[Element]->element.data))->oargs[0]);
+}
+
+__code mult(struct Context* context, struct Integer* input1, struct Integer* input2, struct Integer* output) {
+    output->value = input1->value * input2->value;
+    printf("%d * %d = %d\n", input1->value, input2->value, output->value);
     goto meta(context, context->next);
 }
 
-__code taskC(struct Context* context) {
-    printf("TaskC\n");
-    goto meta(context, context->next);
+__code mult_stub(struct Context* context) {
+    goto mult(context,
+             (struct Integer*)((struct Task*)(context->data[Element]->element.data))->iargs[0],
+             (struct Integer*)((struct Task*)(context->data[Element]->element.data))->iargs[1],
+             (struct Integer*)((struct Task*)(context->data[Element]->element.data))->oargs[0]);
 }
--- a/src/parallel_execution/main.c	Thu Sep 29 22:22:36 2016 +0900
+++ b/src/parallel_execution/main.c	Sat Oct 01 00:23:35 2016 +0900
@@ -6,6 +6,7 @@
 
 extern __code initContext(struct Context* context);
 extern void allocator(struct Context* context);
+extern void metaAllocator(struct Context* context);
 
 int cpu_num = 1;
 int length = 102400;
@@ -14,7 +15,7 @@
 
 void print_queue(struct Element* element) {
     while (element) {
-        printf("%d\n", ((struct Task *)(element->data))->key);
+        //printf("%d\n", ((struct Task *)(element->data))->key);
         element = element->next;
     }
 }
@@ -51,15 +52,15 @@
 __code code2(struct Context* context, struct Array* array, struct LoopCounter* loopCounter) {
     int i = loopCounter->i;
 
-    if (i < length) {
-           //printf("%d\n", array->array[i]);
-        if (array->array[i] == (i*2)) {
-            loopCounter->i++;
-            goto meta(context, Code2);
-        } else
-            puts("wrong result");
+    //if (i < length) {
+    //       //printf("%d\n", array->array[i]);
+    //    if (array->array[i] == (i*2)) {
+    //        loopCounter->i++;
+    //        goto meta(context, Code2);
+    //    } else
+    //        puts("wrong result");
 
-    }
+    //}
 
     goto meta(context, Exit);
 }
@@ -68,13 +69,13 @@
     goto code2(context, &context->data[Node]->node.value->array, &context->data[LoopCounter]->loopCounter);
 }
 
+/*
 __code createData1(struct Context* context, struct Allocate* allocate, struct LoopCounter* loopCounter) {
     int i = loopCounter->i;
 
     if ((length/split*i) < length) {
         allocate->size = sizeof(struct Array);
         allocator(context);
-
         goto meta(context, CreateData2);
     }
 
@@ -106,6 +107,44 @@
             &context->data[LoopCounter]->loopCounter,
             &context->data[context->dataNum]->array,
             &context->data[Node]->node);
+} */
+
+__code createData1(struct Context* context, struct Allocate* allocate, struct LoopCounter* loopCounter) {
+    int i = loopCounter->i;
+
+    if (i < 4) {
+        allocate->size = sizeof(struct Queue);
+        metaAllocator(context);
+        allocate->size = sizeof(struct Integer);
+        allocator(context);
+        goto meta(context, CreateData2);
+    }
+
+    goto meta(context, CreateTask1);
+}
+
+__code createData1_stub(struct Context* context) {
+    goto createData1(context, &context->data[Allocate]->allocate, &context->data[LoopCounter]->loopCounter);
+}
+
+__code createData2(struct Context* context, struct LoopCounter* loopCounter, struct Integer* integer, struct Node* node) {
+    int i = loopCounter->i;
+    integer->value = i;
+    integer->mDataOffset = -sizeof(struct Queue);
+    node->key = i;
+    node->value = (union Data*)integer;
+
+    context->next = CreateData1;
+    loopCounter->i++;
+
+    goto meta(context, PutTree);
+}
+
+__code createData2_stub(struct Context* context) {
+    goto createData2(context,
+            &context->data[LoopCounter]->loopCounter,
+            &context->data[context->dataNum]->integer,
+            &context->data[Node]->node);
 }
 
 __code createTask1(struct Context* context, struct Allocate* allocate) {
@@ -118,18 +157,55 @@
     goto createTask1(context, &context->data[Allocate]->allocate);
 }
 
-__code createTask2(struct Context* context, struct Allocate* allocate) {
-    allocate->size = sizeof(struct Queue);
-    allocator(context);
-    goto meta(context, CreateTask3);
+/*
+   __code createTask2(struct Context* context, struct LoopCounter* loopCounter, struct Task* task, struct Element* element) {
+   int i = loopCounter->i;
+
+   task->code = Twice;
+   task->key = i;
+
+   element->data = (union Data *)task;
+
+   context->next = CreateData1;
+   loopCounter->i++;
+
+   goto meta(context, SpawnTask);
+   }
+
+   __code createTask2_stub(struct Context* context) {
+   goto createTask2(context,
+   &context->data[LoopCounter]->loopCounter,
+   &context->data[context->dataNum]->task,
+   &context->data[Element]->element);
+   }
+*/
+
+
+__code createTask2(struct Context* context, struct LoopCounter* loopCounter, struct Task* task, struct Element* element) {
+    int i = loopCounter->i;
+
+    task->code = Mult;
+    task->iKeys[0] = i-1;
+    task->iKeys[1] = i-2;
+    task->idsCount = 1;
+    loopCounter->i -= 2;
+
+    element->data = (union Data*)task;
+
+    context->next = CreateTask3;
+
+    goto meta(context, SpawnTask);
 }
 
 __code createTask2_stub(struct Context* context) {
-    goto createTask2(context, &context->data[Allocate]->allocate);
+    goto createTask2(context,
+            &context->data[LoopCounter]->loopCounter,
+            &context->data[context->dataNum]->task,
+            &context->data[Element]->element);
 }
 
 __code createTask3(struct Context* context, struct Allocate* allocate) {
-    allocate->size = sizeof(struct Queue);
+    allocate->size = sizeof(struct Task);
     allocator(context);
     goto meta(context, CreateTask4);
 }
@@ -138,92 +214,55 @@
     goto createTask3(context, &context->data[Allocate]->allocate);
 }
 
-__code meta_createTask4(struct Context* context, struct Queue* activeQueue, enum Code next) {
-    context->data[Queue] = (union Data *)activeQueue;
-    goto (context->code[next])(context);
-}
 
-__code createTask4(struct Context* context, struct LoopCounter* loopCounter, struct Task* task, struct Queue* waitMe, struct Queue* waitI, struct Element* element, struct Queue* activeQueue) {
+__code createTask4(struct Context* context, struct LoopCounter* loopCounter, struct Task* task, struct Node* node) {
     int i = loopCounter->i;
 
-    waitMe->first = 0;
-    waitMe->last = 0;
-    waitMe->count = 0;
-
-    waitI->first = 0;
-    waitI->last = 0;
-    waitI->count = 0;
+    task->code = Add;
+    task->oKeys[0] = i;
+    task->iKeys[0] = i-1;
+    task->iKeys[1] = i-2;
+    task->idsCount = 0;
+    loopCounter->i -= 2;
+    context->next = CreateTask5;
 
-    task->code = Twice;
-    task->key = i;
-    task->waitMe = waitMe;
-    task->waitI = waitI;
-    task->idsCount = 0;
-
-    element->data = (union Data *)task;
-
-    context->next = CreateData1;
-    loopCounter->i++;
-
-    goto meta(context, SpawnTask);
+    node->key = task->oKeys[0];
+    goto meta(context, Get);
 }
 
 __code createTask4_stub(struct Context* context) {
     goto createTask4(context,
             &context->data[LoopCounter]->loopCounter,
-            &context->data[context->dataNum-2]->task,
-            &context->data[context->dataNum-1]->queue,
-            &context->data[context->dataNum]->queue,
-            &context->data[Element]->element,
-            &context->data[ActiveQueue]->queue);
+            &context->data[context->dataNum]->task,
+            &context->data[Node]->node
+            );
+}
+
+__code createTask5(struct Context* context, struct Task* task) {
+    context->next = CreateTask6;
+    goto meta(context, WaitFor);
 }
 
+__code createTask5_stub(struct Context* context) {
+    goto createTask5(context,
+            &context->data[context->dataNum]->task
+            );
+}
 
-//__code createTask4(struct Context* context, struct LoopCounter* loopCounter, struct Task* task, struct Queue* waitMe, struct OdsQueue* waitI, struct Element* element, struct Queue* activeQueue) {
-//    int i = loopCounter->i;
-//
-//    task->code = TaskC;
-//    task->key = i;
-//    task->waitMe = waitMe;
-//    task->waitI = waitI;
-//    task->idsCount = 1;
-//
-//    element->data = (union Data *)task;
-//
-//    context->next = CreateData1;
-//    loopCounter->i++;
-//
-//    goto meta_createTask2(context, activeQueue, PutQueue1);
-//}
-//
-//__code createTask4_stub(struct Context* context) {
-//    goto createTask2(context,
-//            &context->data[LoopCounter]->loopCounter,
-//            &context->data[context->dataNum]->task,
-//            &context->data[context->dataNum-1]->queue,
-//            &context->data[context->dataNum-2]->odsQueue,
-//            &context->data[Element]->element,
-//            &context->data[ActiveQueue]->queue);
-//}
-//
-//__code createTask5(struct Context* context, struct LoopCounter* loopCounter, struct Task* task, struct Task* slave, struct Element* element) {
-//    int i = loopCounter->i;
-//
-//    task->code = TaskB;
-//    task->key = i;
-//    element->data = (union Data *)task;
-//
-//    context->next = CreateData1;
-//    loopCounter->i++;
-//
-//    goto meta(context, WaitFor1);
-//}
-//
-//__code createTask5_stub(struct Context* context) {
-//    goto createTask2(context,
-//            &context->data[context->dataNum]->task,
-//            &context->data[Element]->element);
-//}
+__code createTask6(struct Context* context, struct LoopCounter* loopCounter, struct Task* task, struct Element* element) {
+    element->data = (union Data *)task;
+    loopCounter->i = 0;
+    context->next = Code1;
+    goto meta(context, SpawnTask);
+}
+
+__code createTask6_stub(struct Context* context) {
+    goto createTask6(context,
+            &context->data[LoopCounter]->loopCounter,
+            &context->data[context->dataNum-1]->task,
+            &context->data[Element]->element
+            );
+}
 
 __code putQueue1(struct Context* context, struct Allocate* allocate) {
     allocate->size = sizeof(struct Element);
@@ -289,9 +328,11 @@
 
     if (i < worker->num) {
         struct Context* worker_context = &worker->contexts[i];
-        worker_context->next = GetQueue;
+        worker_context->next = GetTask;
         worker_context->data[Tree] = context->data[Tree];
         worker_context->data[ActiveQueue] = context->data[ActiveQueue];
+        worker_context->data[WaitQueue] = context->data[WaitQueue];
+        worker_context->data[Queue] = context->data[ActiveQueue];
         pthread_create(&worker_context->thread, NULL, (void*)&start_code, worker_context);
         worker_context->thread_num = i;
         loopCounter->i++;
@@ -307,25 +348,54 @@
     goto createWorker(context, &context->data[LoopCounter]->loopCounter, &context->data[Worker]->worker);
 }
 
-__code taskManager(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker) {
+__code taskManager(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker, struct Queue* waitQueue, struct Queue* ActiveQueue) {
+    if (waitQueue->first) {
+       goto meta(context, Poll1);
+    }
     int i = loopCounter->i;
-
     if (i < worker->num) {
         pthread_join(worker->contexts[i].thread, NULL);
         loopCounter->i++;
 
         goto meta(context, TaskManager);
     }
-
     loopCounter->i = 0;
-
     struct Time *t = &context->data[Time]->time;
     t->next = Code2;
     goto meta(context, EndTime);
 }
 
 __code taskManager_stub(struct Context* context) {
-    goto taskManager(context, &context->data[LoopCounter]->loopCounter, &context->data[Worker]->worker);
+    goto taskManager(context, 
+                     &context->data[LoopCounter]->loopCounter,
+                     &context->data[Worker]->worker,
+                     &context->data[WaitQueue]->queue,
+                     &context->data[ActiveQueue]->queue);
+}
+
+__code meta_poll1(struct Context* context, struct Queue* queue, enum Code next) {
+    context->data[Queue] = (union Data *)queue;
+    goto (context->code[next])(context);
+}
+
+__code poll1(struct Context* context, struct Queue* waitQueue) {
+    usleep(100);
+    context->next = Poll2;
+    goto meta_poll1(context, waitQueue, GetQueue1);
+}
+
+__code poll1_stub(struct Context* context) {
+    goto poll1(context,
+            &context->data[WaitQueue]->queue);
+}
+
+__code poll2(struct Context* context) {
+    context->next = TaskManager;
+    goto meta(context, SpawnTask);
+}
+
+__code poll2_stub(struct Context* context) {
+    goto poll2(context);
 }
 
 void init(int argc, char** argv) {
--- a/src/parallel_execution/worker.c	Thu Sep 29 22:22:36 2016 +0900
+++ b/src/parallel_execution/worker.c	Sat Oct 01 00:23:35 2016 +0900
@@ -3,30 +3,105 @@
 #include "context.h"
 #include "origin_cs.h"
 
-__code getQueue(struct Context* context, struct Queue* queue, struct Node* node) {
-    if (queue->first == 0)
-        return;
+__code meta_getTask(struct Context* context, struct Queue* queue, enum Code next) {
+    context->data[Queue] = (union Data *)queue;
+    goto (context->code[next])(context);
+}
 
-    struct Element* first = queue->first;
-    if (__sync_bool_compare_and_swap(&queue->first, first, first->next)) {
-        queue->count--;
+__code getTask(struct Context* context, struct Queue* activeQueue) {
+    context->next = GetInputData1;
+    activeQueue->emptyNext = GetTask;
+    goto meta_getTask(context, activeQueue, GetQueue1);
+}
 
-        context->next = GetQueue;
-        stack_push(context->code_stack, &context->next);
-
-        context->next = ((struct Task *)(first->data))->code;
-        node->key = ((struct Task *)(first->data))->key;
+__code getTask_stub(struct Context* context) {
+    goto getTask(context, &context->data[ActiveQueue]->queue);
+}
 
-        struct Traverse *t = &context->data[Traverse]->traverse;
-        t->next = GetQueue;
-        goto meta(context, Get);
+__code getQueue1(struct Context* context, struct Queue* queue, struct Element* element) {
+    if (queue->first == 0)
+        goto meta(context, queue->emptyNext);
+
+    element->data = queue->first->data;
+    if (__sync_bool_compare_and_swap(&queue->first, queue->first, queue->next)) {
+        queue->count--;
+        goto meta(context, context->next);
     } else {
-        goto meta(context, GetQueue);
+        goto meta(context, GetQueue1);
     }
 }
 
-__code getQueue_stub(struct Context* context) {
-    goto getQueue(context, &context->data[ActiveQueue]->queue, &context->data[Node]->node);
+__code getQueue1_stub(struct Context* context) {
+    goto getQueue1(context, &context->data[Queue]->queue, &context->data[Element]->element);
+}
+
+__code getInputData1(struct Context* context, struct Task* task, struct Node* node, struct LoopCounter* loopCounter) {
+    int i = loopCounter->i;
+    if(i < sizeof(task->iKeys) / sizeof(int)) {
+        context->next = GetInputData2;
+        node->key = task->iKeys[i];
+        struct Traverse *t = &context->data[Traverse]->traverse;
+        t->next = GetQueue1;
+        goto meta(context, Get);
+    }
+    loopCounter->i = 0;
+    goto meta(context, GetOutputData1);
+}
+
+__code getInputData1_stub(struct Context* context) {
+    goto getInputData1(context,
+            (struct Task*)(context->data[Element]->element.data),
+            &context->data[Node]->node,
+            &context->data[LoopCounter]->loopCounter);
+}
+
+__code getInputData2(struct Context* context, struct Task* task, union Data* data, struct LoopCounter* loopCounter) {
+    int i = loopCounter->i;
+    task->iargs[i] = data;
+    loopCounter->i++;
+    goto meta(context, GetInputData1);
+}
+
+__code getInputData2_stub(struct Context* context) {
+    goto getInputData2(context,
+            (struct Task*)(context->data[Element]->element.data),
+            context->data[Node]->node.value,
+            &context->data[LoopCounter]->loopCounter);
+}
+
+__code getOutputData1(struct Context* context, struct Task* task, struct Node* node, struct LoopCounter* loopCounter) {
+    int i = loopCounter->i;
+    if(i < sizeof(task->oKeys)/ sizeof(int)) {
+        context->next = GetOutputData2;
+        node->key = task->oKeys[i];
+        struct Traverse *t = &context->data[Traverse]->traverse;
+        t->next = GetQueue1;
+        goto meta(context, Get);
+    }
+    context->next = CheckTaskFinish1;
+    loopCounter->i = 0;
+    goto meta(context, task->code);
+}
+
+__code getOutputData1_stub(struct Context* context) {
+    goto getOutputData1(context,
+            (struct Task*)(context->data[Element]->element.data),
+            &context->data[Node]->node,
+            &context->data[LoopCounter]->loopCounter);
+}
+
+__code getOutputData2(struct Context* context, struct Task* task, union Data* data, struct LoopCounter* loopCounter) {
+    int i = loopCounter->i;
+    task->oargs[i] = data;
+    loopCounter->i++;
+    goto meta(context, GetOutputData1);
+}
+
+__code getOutputData2_stub(struct Context* context) {
+    goto getOutputData2(context,
+            (struct Task*)(context->data[Element]->element.data),
+            context->data[Node]->node.value,
+            &context->data[LoopCounter]->loopCounter);
 }
 
 #ifdef USE_CUDA