changeset 398:fc4fcd441700

Fix spanwTasks
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Tue, 29 Aug 2017 01:01:44 +0900
parents c43ec0e3fa84
children 394e38952c80
files src/parallel_execution/TaskManager.cbc src/parallel_execution/TaskManagerImpl.cbc src/parallel_execution/context.h src/parallel_execution/examples/calc/calc.cbc src/parallel_execution/generate_context.pl src/parallel_execution/generate_stub.pl
diffstat 6 files changed, 125 insertions(+), 97 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/TaskManager.cbc	Thu Aug 24 17:23:41 2017 +0900
+++ b/src/parallel_execution/TaskManager.cbc	Tue Aug 29 01:01:44 2017 +0900
@@ -4,7 +4,7 @@
     struct Context** tasks;
     __code spawn(Impl* taskManager, struct Queue* queue, struct Context* task, __code next(...));
     __code spawnTasks(Impl* taskManagerImpl, struct Context** tasks, __code next1(...), struct TaskManager* taskManager);
-    __code shutdown(struct LoopCounter* loopCounter, struct TaskManager* taskManager, Impl* taskManagerImpl, struct Queue* queue, __code next(...));
+    __code shutdown(Impl* taskManagerImpl, __code next(...), struct TaskManager* taskManager, struct Queue* queue);
     __code next(...);
     __code next1(...);
     int worker;
--- a/src/parallel_execution/TaskManagerImpl.cbc	Thu Aug 24 17:23:41 2017 +0900
+++ b/src/parallel_execution/TaskManagerImpl.cbc	Tue Aug 29 01:01:44 2017 +0900
@@ -17,7 +17,6 @@
     taskManager->spawn = C_spawnTaskManagerImpl;
     taskManager->shutdown  = C_shutdownTaskManagerImpl;
     struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl();
-    taskManagerImpl -> activeQueue = createSingleLinkedQueue(context);
     taskManagerImpl -> taskQueue = createSingleLinkedQueue(context);
     taskManagerImpl -> numWorker = taskManager->maxCPU;
     taskManagerImpl -> loopCounter = new LoopCounter();
@@ -53,71 +52,91 @@
     }
 }
 
-__code spawnTasksTaskManagerImpl(struct TaskManagerImpl* taskManagerImpl, struct Context** tasks, __code next1(...), struct TaskManager* taskManager) {
-    int i = taskManagerImpl->loopCounter->i;
-    if(i < GET_SIZE(tasks)) {
-        struct Context* task = tasks[i];
-        taskManagerImpl->loopCounter->i++;
-        task->taskManager = &taskManager->taskManager->TaskManager;
-        taskManager->context = task;
-        taskManager->next = C_spawnTasksTaskManagerImpl;
-        goto meta(context, C_setWaitTask);
-    }
-    taskManagerImpl->loopCounter->i = 0;
-    goto meta(context, C_spawnTasksTaskManagerImpl1);
+__code spawnTasksTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Queue* tasks, __code next1(...), struct Queue* queue) {
+    queue->queue = (union Data*)tasks;
+    queue->next      = C_spawnTasksTaskManagerImpl1;
+    queue->whenEmpty = C_spawnTasksTaskManagerImpl3;
+    goto meta(context, tasks->isEmpty);
 }
 
-__code spawnTasksTaskManagerImpl_stub(struct Context* context) {
-    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto spawnTasksTaskManagerImpl(context,
-                               taskManager,
-                               Gearef(context, TaskManager)->tasks,
-                               Gearef(context, TaskManager)->next1,
-                               Gearef(context, TaskManager));
-}
-
-__code spawnTasksTaskManagerImpl1(struct TaskManagerImpl* taskManagerImpl, struct Context** tasks, __code next1(...), struct TaskManager* taskManager) {
-    int i = taskManagerImpl->loopCounter->i;
-    if(i < GET_SIZE(tasks)) {
-        struct Context* task = tasks[i];
-        taskManagerImpl->loopCounter->i++;
-        taskManager->context = task;
-        taskManager->next = C_spawnTasksTaskManagerImpl1;
-        goto meta(context, C_spawnTaskManagerImpl);
-    }
-    taskManagerImpl->loopCounter->i = 0;
-    goto meta(context, next1);
+__code spawnTasksTaskManagerImpl1(struct TaskManagerImpl* taskManager, struct Queue* tasks, __code next1(...), struct Queue* queue) {
+    queue->queue = (union Data*)tasks;
+    queue->next      = C_spawnTasksTaskManagerImpl2;
+    goto meta(context, tasks->take);
 }
 
 __code spawnTasksTaskManagerImpl1_stub(struct Context* context) {
-    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto spawnTasksTaskManagerImpl1(context,
-                               taskManager,
-                               Gearef(context, TaskManager)->tasks,
-                               Gearef(context, TaskManager)->next1,
-                               Gearef(context, TaskManager));
+	TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+	Queue* tasks = Gearef(context, TaskManager)->tasks;
+	enum Code next1 = Gearef(context, TaskManager)->next1;
+	Queue* queue = Gearef(context, Queue);
+	goto spawnTasksTaskManagerImpl1(context, taskManager, tasks, next1, queue);
+} 
+
+__code spawnTasksTaskManagerImpl2(struct TaskManagerImpl* taskManagerImpl, struct Context* task, struct TaskManager* taskManager) {
+    task->taskManager = &taskManager->taskManager->TaskManager;
+    taskManager->context = task;
+    taskManager->next = C_spawnTasksTaskManagerImpl;
+    goto meta(context, C_setWaitTask);
+}
+
+__code spawnTasksTaskManagerImpl2_stub(struct Context* context) {
+	TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+	Context* task = (struct Context*)Gearef(context, Queue)->data;
+	TaskManager* taskManager = Gearef(context, TaskManager);
+	goto spawnTasksTaskManagerImpl2(context, taskManagerImpl, task, taskManager);
+} 
+
+__code spawnTasksTaskManagerImpl3(struct TaskManagerImpl* taskManager, __code next1(...), struct Queue* queue) {
+    queue->queue     = (union Data*)taskManager->taskQueue;
+    queue->next      = C_spawnTasksTaskManagerImpl4;
+    queue->whenEmpty = next1;
+    goto meta(context, taskManager->taskQueue->isEmpty);
 }
 
-__code setWaitTask(struct Queue* queue, struct LoopCounter* loopCounter, struct Context* task, __code next(...)) {
-    int i = loopCounter->i;
+__code spawnTasksTaskManagerImpl4(struct TaskManagerImpl* taskManager, __code next1(...), struct Queue* queue) {
+    queue->queue = (union Data*)taskManager->taskQueue;
+    queue->next      = C_spawnTasksTaskManagerImpl5;
+    goto meta(context, taskManager->taskQueue->take);
+}
+
+__code spawnTasksTaskManagerImpl5(struct TaskManagerImpl* taskManagerImpl, struct Context* task, struct TaskManager* taskManager) {
+    taskManager->context = task;
+    taskManager->next = C_spawnTasksTaskManagerImpl3;
+    goto meta(context, C_spawnTaskManagerImpl);
+}
+
+__code spawnTasksTaskManagerImpl5_stub(struct Context* context) {
+	TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+	Context* task = (struct Context*)Gearef(context, Queue)->data;
+	TaskManager* taskManager = Gearef(context, TaskManager);
+	goto spawnTasksTaskManagerImpl5(context, taskManagerImpl, task, taskManager);
+} 
+
+__code setWaitTask(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...), struct Queue* queue) {
+    int i = taskManager->loopCounter->i;
     if(task->idg+i < task->maxIdg) {
         queue->queue = (Data *)GET_WAIT_LIST(task->data[task->idg + i]);
+        queue->data  = (Data *)task;
         queue->next  = C_setWaitTask;
-        queue->data  = (Data *)task;
-        loopCounter->i++;
+        taskManager->loopCounter->i++;
         goto meta(context, queue->queue->Queue.put);
     }
-    loopCounter->i = 0;
-    goto next(...);
+    taskManager->loopCounter->i = 0;
+    queue->queue = (Data *)taskManager->taskQueue;
+    queue->data  = (Data *)task;
+    queue->next  = next;
+    goto meta(context, taskManager->taskQueue->put);
 }
 
 __code setWaitTask_stub(struct Context* context) {
+    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
     struct Context* task = Gearef(context, TaskManager)->context;
     goto setWaitTask(context,
-                     Gearef(context, Queue),
-                     Gearef(task, LoopCounter),
+                     taskManager,
                      task,
-                     Gearef(context, TaskManager)->next);
+                     Gearef(context, TaskManager)->next,
+                     Gearef(context, Queue));
 }
 
 __code spawnTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Iterator* iterator, struct Context* task, __code next(...)) {
@@ -167,8 +186,8 @@
             Gearef(context, TaskManager)->next);
 }
 
-__code shutdownTaskManagerImpl(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, __code next(...)) {
-    int i = loopCounter->i;
+__code shutdownTaskManagerImpl(struct TaskManagerImpl* taskManagerImpl, __code next(...), struct TaskManager* taskManager, struct Queue* queue) {
+    int i = taskManagerImpl->loopCounter->i;
     if (taskManager->cpu <= i && i < taskManager->maxCPU) {
         struct Queue* tasks = taskManagerImpl->workers[i]->tasks;
         queue->queue = (union Data*)tasks;
@@ -177,23 +196,27 @@
         goto meta(context, tasks->put);
     }
 
-    loopCounter->i = 0;
+    taskManagerImpl->loopCounter->i = 0;
     goto meta(context, next);
 }
 
 __code shutdownTaskManagerImpl_stub(struct Context* context) {
     TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto shutdownTaskManagerImpl(context, Gearef(context, LoopCounter), &Gearef(context, TaskManager)->taskManager->TaskManager, taskManagerImpl, Gearef(context, Queue), Gearef(context, TaskManager)->next);
+    goto shutdownTaskManagerImpl(context,
+                                 taskManagerImpl,
+                                 Gearef(context, TaskManager)->next,
+                                 &Gearef(context, TaskManager)->taskManager->TaskManager,
+                                 Gearef(context, Queue));
 }
 
-__code shutdownTaskManagerImpl1(struct LoopCounter* loopCounter, TaskManagerImpl* taskManagerImpl) {
-    int i = loopCounter->i;
-    pthread_join(taskManagerImpl->workers[i]->worker->CPUWorker.thread, NULL);
-    loopCounter->i++;
+__code shutdownTaskManagerImpl1(TaskManagerImpl* taskManager) {
+    int i = taskManager->loopCounter->i;
+    pthread_join(taskManager->workers[i]->worker->CPUWorker.thread, NULL);
+    taskManager->loopCounter->i++;
     goto meta(context, C_shutdownTaskManagerImpl);
 }
 
 __code shutdownTaskManagerImpl1_stub(struct Context* context) {
     TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto shutdownTaskManagerImpl1(context, Gearef(context, LoopCounter), taskManagerImpl);
+    goto shutdownTaskManagerImpl1(context, taskManagerImpl);
 }
--- a/src/parallel_execution/context.h	Thu Aug 24 17:23:41 2017 +0900
+++ b/src/parallel_execution/context.h	Tue Aug 29 01:01:44 2017 +0900
@@ -88,7 +88,7 @@
     int odg;
     int maxOdg;
     int workerId;
-    struct Context** contexts;
+    struct Queue* tasks;
 #ifdef USE_CUDAWorker
     int num_exec;
     CUmodule module;
@@ -126,13 +126,13 @@
 #endif
         union Data* taskManager;
         enum Code spawn;      // start NEW context on the worker
-        enum Code spawnTasks; // start NEW contexts on the worker
+        enum Code spawnTasks; // start NEW tasks on the worker
         enum Code shutdown;
         enum Code next;
         enum Code next1;
         enum Code task;
         struct Context* context;
-        struct Context** tasks;
+        struct Queue* tasks;
         union Data* data;
         int worker;
         int cpu;
@@ -333,6 +333,7 @@
 extern __code start_code(struct Context* context);
 extern __code exit_code(struct Context* context);
 extern __code meta(struct Context* context, enum Code next);
+extern __code par_meta(struct Context* context, enum Code spawns, enum Code next);
 extern void initContext(struct Context* context);
 
 #endif
--- a/src/parallel_execution/examples/calc/calc.cbc	Thu Aug 24 17:23:41 2017 +0900
+++ b/src/parallel_execution/examples/calc/calc.cbc	Tue Aug 29 01:01:44 2017 +0900
@@ -105,8 +105,6 @@
 }
 
 __code createTask2(struct LoopCounter* loopCounter, struct TaskManager* taskManager) {
-    struct Context** tasks = (struct Context**)ALLOC_ARRAY(context, Context, 3);
-
     Integer* integer1 = &ALLOCATE_DATA_GEAR(context, Integer)->Integer;
     Integer* integer2 = &ALLOCATE_DATA_GEAR(context, Integer)->Integer;
     Integer* integer3 = &ALLOCATE_DATA_GEAR(context, Integer)->Integer;
@@ -118,7 +116,7 @@
 
     par goto initIntegerDataGears(integer2, integer4, integer5, __exit);
 
-    goto crateTask1();
+    goto createTask1();
 }
 
 void init(int argc, char** argv) {
--- a/src/parallel_execution/generate_context.pl	Thu Aug 24 17:23:41 2017 +0900
+++ b/src/parallel_execution/generate_context.pl	Tue Aug 29 01:01:44 2017 +0900
@@ -150,17 +150,13 @@
     context->data = NEWN(ALLOCATE_SIZE, union Data*);
     context->heapStart = NEWN(context->heapLimit, char);
     context->heap = context->heapStart;
-
     // context->codeNum = Exit;
 
 $code_init
 
 #include "dataGearInit.c"
-
-    // context->data[D_ActiveQueue] = createSynchronizedQueue(context);
-    // context->data[D_WaitQueue]   = createSynchronizedQueue(context);
-
     context->dataNum = D_Worker;
+    context->tasks = createSingleLinkedQueue(context);
 }
 EOFEOF
 
@@ -169,11 +165,10 @@
 
 my $meta_call = <<"EOFEOF";
 __code par_meta(struct Context* context, enum Code spawns, enum Code next) {
-    if (context->contexts != NULL) {
-        context->contexts = NULL;
-        goto (context->code[spawns])(context);
-    }
-    goto (context->code[next])(context);
+    Gearef(context, Queue)->queue = (union Data*)context->tasks;
+    Gearef(context, Queue)->whenEmpty = next;
+    Gearef(context, Queue)->next = spawns;
+    goto (context->code[context->tasks->isEmpty])(context);
 }
 
 __code meta(struct Context* context, enum Code next) {
--- a/src/parallel_execution/generate_stub.pl	Thu Aug 24 17:23:41 2017 +0900
+++ b/src/parallel_execution/generate_stub.pl	Tue Aug 29 01:01:44 2017 +0900
@@ -327,7 +327,7 @@
                     if ($args =~ s/^(\s)*\_\_code\s+(\w+)\(([^)]*)\)//) {
                         my $next = $2;
                         my @args = split(/,/,$3);
-                        if ( &generateStubArgs($codeGearName, $next, "Code", $next, $interface,0) ) {
+                        if (&generateStubArgs($codeGearName, $next, "Code", $next, $interface,0) ) {
                             $newArgs .= "enum Code $next";
                         }
                         # analyze continuation arguments
@@ -422,22 +422,19 @@
                 my $outputCount = $codeGear{$codeGearName}->{'output'};
                 if (! $inParGoto) {
                     $inParGoto = 1;
-                    my $initTasks = << "EOFEOF";
-                    ${prev}struct Context** tasks = (struct Context**)ALLOC_ARRAY(context, Context, ?);
-                    ${prev}int taskCount = 0;
-EOFEOF
-                    print $fd $initTasks;
+                    print $fd "${prev}struct SingleLinkedQueue* queue = &context->tasks->queue->SingleLinkedQueue;\n";
+                    print $fd "${prev}struct Element* element;\n";
+                    print $fd "${prev}struct Context* task;\n";
                 }
-
                 my $initTask = << "EOFEOF";
-                ${prev}struct Context* task = NEW(struct Context);
-                ${prev}initContext(task);
-                ${prev}task->next = C_$codeGearName;
-                ${prev}task->idgCount = $inputCount;
-                ${prev}task->idg = task->dataNum;
-                ${prev}task->maxIdg = task->idg + $inputCount;
-                ${prev}task->odg = task->maxIdg;
-                ${prev}task->maxOdg = task->odg + $outputCount;
+${prev}task = NEW(struct Context);
+${prev}initContext(task);
+${prev}task->next = C_$codeGearName;
+${prev}task->idgCount = $inputCount;
+${prev}task->idg = task->dataNum;
+${prev}task->maxIdg = task->idg + $inputCount;
+${prev}task->odg = task->maxIdg;
+${prev}task->maxOdg = task->odg + $outputCount;
 EOFEOF
                 print $fd $initTask;
                 for my $i (0..$inputCount-1) {
@@ -447,9 +444,20 @@
                 for my $i (0..$outputCount-1) {
                     print $fd "${prev}task->data[task->odg+$i] = (union Data*)@dataGears[$inputCount+$i];\n";
                 }
-
-                print $fd "${prev}tasks[taskCount] = task;\n";
-                print $fd "${prev}taskCount++;\n";
+                my $putTask = << "EOFEOF";
+${prev}element = &ALLOCATE(context, Element)->Element;
+${prev}element->next = NULL;
+${prev}element->data = (union Data*)task;
+${prev}if (queue->last) {
+${prev}    Element* last = queue->last;
+${prev}    last->next  = element;
+${prev}    queue->last = element;
+${prev}} else {
+${prev}    queue->top  = element;
+${prev}    queue->last = element;
+${prev}}
+EOFEOF
+                print $fd $putTask;
                 next;
             } elsif (/^(.*)goto (\w+)\((.*)\);/) {
                 # handling goto statement  
@@ -470,25 +478,28 @@
                         print $fd "\t*O_$arg = $v;\n";
                     }
                     if ($inParGoto) {
-                        print $fd "${prev}taskManager->contexts = tasks;\n";
-                        print $fd "${prev}goto par_meta(context, Gearef(context, TaskManager)->taskManager->TaskManager.spawnTasks, $next);\n";
+                        print $fd "${prev}taskManager->tasks = context->tasks;\n";
+                        print $fd "${prev}taskManager->next1 = C_$next;\n";
+                        print $fd "${prev}goto par_meta(context, Gearef(context, TaskManager)->taskManager->TaskManager.spawnTasks, C_$next);\n";
                     } else {
                         print $fd "${prev}goto meta(context, $next);\n";
                     }
                     next;
                 }
                 if ($inParGoto) {
-                    print $fd "${prev}taskManager->contexts = tasks;\n";
-                    print $fd "${prev}goto par_meta(context, Gearef(context, TaskManager)->taskManager->TaskManager.spawnTasks, $next);\n";
+                    print $fd "${prev}taskManager->tasks = context->tasks;\n";
+                    print $fd "${prev}taskManager->next1 = C_$next;\n";
+                    print $fd "${prev}goto par_meta(context, Gearef(context, TaskManager)->taskManager->TaskManager.spawnTasks, C_$next);\n";
                     next;
                 }
+            } elsif(/^}/) {
+                $inParGoto = 0;
             }
             else {
                 s/new\s+(\w+)\(\)/\&ALLOCATE(context, \1)->\1/g;   # replacing new
             }
             # gather type name and type
         } elsif (/^}/) {
-            $inParGoto = 0;
             $inStub = 0;
             $inTypedef = 0;
         }