changeset 244:d1567718f12c

Fix error
author Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
date Wed, 25 Jan 2017 01:48:47 +0900
parents 9f3f8ed6ed9f
children 308368406fe7
files src/parallel_execution/context.h src/parallel_execution/taskManager.c src/parallel_execution/worker.c
diffstat 3 files changed, 26 insertions(+), 51 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/context.h	Tue Jan 24 18:39:42 2017 +0900
+++ b/src/parallel_execution/context.h	Wed Jan 25 01:48:47 2017 +0900
@@ -40,7 +40,7 @@
     struct Meta* meta = (struct Meta*)context->heap;\
     context->heap += sizeof(struct Meta);\
     union Data* data = context->heap; \
-    context->heap += sizeof(struct dseg)*len; \
+    context->heap += sizeof(struct dseg *)*len; \
     meta->type = D_##dseg; \
     meta->size = len; \
     data; })
@@ -116,13 +116,14 @@
         pthread_mutex_t mutex;
         struct Queue* activeQueue;
         struct Queue* taskQueue;
-        struct Worker* workers;
+        struct Worker** workers;
     } TaskManagerImpl;
     struct Worker {
         union Data* worker;
         enum Code taskReceive;
         enum Code shutdown;
         enum Code next;
+        struct Queue* tasks;
     } Worker;
     struct CPUWorker {
         pthread_t thread;
@@ -130,7 +131,6 @@
         pthread_cond_t cond;
         struct Context* context;
         int id;
-        struct Queue* tasks;
     } CPUWorker;
 #ifdef USE_CUDA
     struct CudaWorker {
--- a/src/parallel_execution/taskManager.c	Tue Jan 24 18:39:42 2017 +0900
+++ b/src/parallel_execution/taskManager.c	Wed Jan 25 01:48:47 2017 +0900
@@ -31,7 +31,7 @@
 void createWorkers(Context* context, TaskManager* taskManager) {
     int i = 0;
     TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context,TaskManager,taskManager);
-    taskManagerImpl->workers = (Worker*)ALLOC_ARRAY(context,Worker,taskManager->maxCPU);
+    taskManagerImpl->workers = (Worker**)ALLOC_ARRAY(context, Worker, taskManager->maxCPU);
     for (;i<taskManager->gpu;i++) {
         Queue* queue = &createSynchronizedQueue(context)->Queue;
         taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue);
@@ -59,17 +59,17 @@
     goto createTask(context,Gearef(context,TaskManager));
 }
 
-__code setRunWorker(struct Context* context, TaskManagerImpl* taskManager, enum Code next) {
-    task->workerId = taskManagerImpl->sendWorkerIndex;
-    if(++taskManagerImpl->sendWorkerIndex >= taskManagerImpl->numWorker) {
-        taskManagerImpl->sendWorkerIndex = 0;
+__code setRunWorker(struct Context* context, TaskManagerImpl* taskManager, Context* task, enum Code next) {
+    task->workerId = taskManager->sendWorkerIndex;
+    if(++taskManager->sendWorkerIndex >= taskManager->numWorker) {
+        taskManager->sendWorkerIndex = 0;
     }
     goto meta(context, next);
 }
 
 __code setRunWorker_stub(struct Context* context) {
     TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto createTask(context, taskManager, Gearef(context, TaskManager)->next);
+    goto setRunWorker(context, taskManager, Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next);
 }
 
 __code spawnTaskManager(struct Context* context, struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, enum Code next) {
@@ -103,12 +103,12 @@
 
 __code spawnTaskManager1_stub(struct Context* context) {
     TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto spawnTaskManager(context,
+    goto spawnTaskManager1(context,
                           taskManager,
                           Gearef(context, TaskManager)->next);
 }
 
-__code taskSend(struct Context* context, TaskManagerImpl* taskManager) {
+__code taskSend(struct Context* context, TaskManagerImpl* taskManager, Queue* queue) {
     queue->queue = (union Data*)taskManager->activeQueue;
     queue->next = C_taskSend1;
     goto meta(context, taskManager->activeQueue->put);
@@ -116,51 +116,27 @@
 
 __code taskSend_stub(struct Context* context) {
     TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto taskSend(context, taskManager);
+    goto taskSend(context, taskManager, Gearef(context, Queue));
 }
 
-__code taskSend1(struct Context* context, TaskManagerImpl* taskManager, struct Context* task, enum Code next) {
+__code taskSend1(struct Context* context, TaskManagerImpl* taskManager, Queue* queue, struct Context* task, enum Code next) {
     struct Queue* tasks = taskManager->workers[task->workerId]->tasks;
-    queue->queue = tasks;
-    queue->data = (union Data*)taskManager->context;
+    queue->queue = (union Data*)tasks;
+    queue->data = (union Data*)task;
     queue->next = next;
-    pthread_cond_signal(&taskManager->workers[task->workerId]->cond);
+    pthread_cond_signal(&taskManager->workers[task->workerId]->worker->CPUWorker.cond);
     goto meta(context, tasks->put);
 }
 
 __code taskSend1_stub(struct Context* context) {
     TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto taskSend(context, taskManager, Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next);
+    goto taskSend1(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next);
 }
 
-__code createWorker1(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker) {
+__code shutdownTaskManager(struct Context* context, struct LoopCounter* loopCounter, TaskManager* taskManager, TaskManagerImpl* taskManagerImpl) {
     int i = loopCounter->i;
-
-    if (i < worker->id) {
-        struct Context* worker_context = &worker->contexts[i];
-        worker_context->next = C_getTask1;
-        worker_context->data[D_Tree] = context->data[D_Tree];
-        // worker_context->data[D_ActiveQueue] = context->data[D_ActiveQueue];                                                            
-        pthread_create(&worker_context->thread, NULL, (void*)&start_code, worker_context);
-        worker_context->thread_num = i;
-        loopCounter->i++;
-
-        goto meta(context, C_createWorker1);
-    }
-
-    loopCounter->i = 0;
-    goto meta(context, C_taskManager);
-}
-
-__code createWorker1_stub(struct Context* context) {
-    goto createWorker1(context, &context->data[D_LoopCounter]->LoopCounter, &context->data[D_Worker]->Worker);
-}
-
-__code shutdownTaskManager(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker, struct TaskManager* taskManager) {
-    int i = loopCounter->i;
-
-    if (i < worker->id) {
-        pthread_join(worker->contexts[i].thread, NULL);
+    if (taskManager->cpu <= i && i < taskManager->maxCPU) {
+        pthread_join(taskManagerImpl->workers[i]->worker->CPUWorker.thread, NULL);
         loopCounter->i++;
 
         goto meta(context, C_shutdownTaskManager);
@@ -171,5 +147,6 @@
 }
 
 __code shutdownTaskManager_stub(struct Context* context) {
-    goto shutdownTaskManager(context, &context->data[D_LoopCounter]->LoopCounter, &context->data[D_Worker]->Worker, Gearef(context, TaskManager));
+    TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+    goto shutdownTaskManager(context, Gearef(context, LoopCounter), Gearef(context, TaskManager), taskManagerImpl);
 }
--- a/src/parallel_execution/worker.c	Tue Jan 24 18:39:42 2017 +0900
+++ b/src/parallel_execution/worker.c	Wed Jan 25 01:48:47 2017 +0900
@@ -9,7 +9,7 @@
     struct Worker* worker = ALLOC(context, Worker);
     struct CPUWorker* cpuWorker = ALLOC(context, CPUWorker);
     worker->worker = (union Data*)cpuWorker;
-    cpuWorker->tasks = queue;
+    worker->tasks = queue;
     cpuWorker->id = id;
     worker->taskReceive = C_taskReceiveWorker;
     worker->shutdown = C_shutdownWorker;
@@ -25,15 +25,14 @@
     goto meta(cpuWorker->context, C_taskReceiveWorker);
 }
 
-__code taskReceiveWorker(struct Context* context, CPUWorker* worker) {
+__code taskReceiveWorker(struct Context* context, Worker* worker) {
     Queue* queue = worker->tasks;
     queue->next = C_getTask1;
     goto meta(context, queue->take);
 }
 
 __code taskReceiveWorker_stub(struct Context* context) {
-    CPUWorker* worker = (CPUWorker *)GearImpl(context, Worker, worker);
-    goto taskReceiveWorker(context,worker);
+    goto taskReceiveWorker(context, Gearef(context, Worker));
 }
 
 __code getTask1(struct Context* context, Worker* worker, struct Context* task) {
@@ -45,8 +44,7 @@
 
 __code getTask1_stub(struct Context* context) {
     Worker* worker = Gearef(context,Worker);
-    CPUWorker* cpuWorker = (CPUWorker *)GearImpl(context, Worker, worker);
-    Context* task = &cpuWorker->tasks->data->context; 
+    Context* task = &worker->tasks->data->context; 
     goto getTask1(context,worker,task);
 }