changeset 269:5170539348ec

rename TaskManagerImpl.cbc
author mir3636
date Sun, 29 Jan 2017 22:15:32 +0900
parents 378ce6f74f4b
children b6ed4b2a5d9d
files src/parallel_execution/TaskManager.cbc src/parallel_execution/TaskManagerImpl.cbc src/parallel_execution/generate_stub.pl
diffstat 3 files changed, 156 insertions(+), 142 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/TaskManager.cbc	Sun Jan 29 21:30:58 2017 +0900
+++ b/src/parallel_execution/TaskManager.cbc	Sun Jan 29 22:15:32 2017 +0900
@@ -1,140 +1,14 @@
-#include "../context.h"
-#include "../stack.h"
-#include "../queue.h"
-#include "../worker.h"
-#include "../origin_cs.h"
-#include <stdio.h>
-
-void createWorkers(struct Context* context, TaskManager* taskManeger, TaskManagerImpl* taskManagerImpl);
-
-TaskManager** createTaskManager(struct Context* context, int numCPU, int numGPU, int numIO) {
-    struct TaskManager* taskManager = new TaskManager();
-    // 0...numIO-1 IOProcessor 
-    // numIO...numIO+numGPU-1 GPUProcessor
-    // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor
-    taskManager->io = 0;
-    taskManager->gpu = numIO;
-    taskManager->cpu = numIO+numGPU;
-    taskManager->maxCPU = numIO+numGPU+numCPU;
-    taskManager->createTask = C_createTask;
-    taskManager->spawn = C_spawnTaskManager;
-    taskManager->shutdown  = C_shutdownTaskManager;
-    struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl();
-    taskManager->taskManager = (union Data*)taskManagerImpl;
-    taskManagerImpl -> activeQueue = &createSingleLinkedQueue(context)->Queue;
-    taskManagerImpl -> taskQueue = &createSingleLinkedQueue(context)->Queue;
-    taskManagerImpl -> numWorker = taskManager->maxCPU;
-    createWorkers(context, taskManager, taskManagerImpl);
-    return taskManager;
-}
-
-void createWorkers(struct Context* context, TaskManager* taskManager, TaskManagerImpl* taskManagerImpl) {
-    int i = 0;
-    taskManagerImpl->workers = (Worker**)ALLOC_ARRAY(context, Worker, taskManager->maxCPU);
-    for (;i<taskManager->gpu;i++) {
-        Queue* queue = &createSynchronizedQueue(context)->Queue;
-        taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue);
-    }
-    for (;i<taskManager->cpu;i++) {
-#ifdef USE_CUDA
-#else
-        Queue* queue = &createSynchronizedQueue(context)->Queue;
-        taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue);
-#endif        
-    }
-    for (;i<taskManager->maxCPU;i++) {
-        Queue* queue = &createSynchronizedQueue(context)->Queue;
-        taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue);
-    }
-}
-
-__code createTask(struct TaskManager* taskManager) {
-    taskManager->context = NEW(struct Context);
-    initContext(taskManager->context);
-    goto C_setWorker(...);
-}
-
-__code setWorker(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
-    task->workerId = taskManager->sendWorkerIndex;
-    if(++taskManager->sendWorkerIndex >= taskManager->numWorker) {
-        taskManager->sendWorkerIndex = 0;
-    }
-    goto next(...);
-}
-
-__code spawnTaskManager(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
-    if (task->idgCount == 0) {
-        // enqueue activeQueue
-        queue->queue = (union Data*)taskManager->activeQueue;
-    } else {
-        // enqueue waitQueue
-        queue->queue = (union Data*)taskManager->taskQueue;
-    }
-    queue->data = (union Data*)task;
-    queue->next = C_spawnTaskManager1;
-    goto meta(context, queue->queue->Queue.put);
-}
-
-__code spawnTaskManager_stub(struct Context* context) {
-    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    pthread_mutex_lock(&taskManager->mutex);
-    goto spawnTaskManager(context,
-                          taskManager,
-                          Gearef(context, Queue),
-                          Gearef(context, TaskManager)->context,
-                          Gearef(context, TaskManager)->next);
-}
-
-
-__code spawnTaskManager1(struct TaskManagerImpl* taskManager) {
-    pthread_mutex_unlock(&taskManager->mutex);
-    goto C_taskSend(...);
-}
-
-__code spawnTaskManager1_stub(struct Context* context) {
-    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto spawnTaskManager1(context,
-                           taskManager);
-}
-
-__code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue) {
-    queue->queue = (union Data*)taskManager->activeQueue;
-    queue->next = C_taskSend1;
-    goto meta(context, taskManager->activeQueue->take);
-}
-
-__code taskSend1(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
-    struct Queue* tasks = taskManager->workers[task->workerId]->tasks;
-    queue->queue = (union Data*)tasks;
-    queue->data = (union Data*)task;
-    queue->next = next;
-    pthread_cond_signal(&taskManager->workers[task->workerId]->worker->CPUWorker.cond);
-    goto meta(context, tasks->put);
-}
-
-__code taskSend1_stub(struct Context* context) {
-    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto taskSend1(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next);
-}
-
-__code shutdownTaskManager(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, __code next(...)) {
-    int i = loopCounter->i;
-    if (taskManager->cpu <= i && i < taskManager->maxCPU) {
-        struct Queue* tasks = taskManagerImpl->workers[i]->tasks;
-        queue->queue = (union Data*)tasks;
-        queue->data = NULL;
-        queue->next = next;
-        goto meta(context, tasks->put);
-        pthread_join(taskManagerImpl->workers[i]->worker->CPUWorker.thread, NULL);
-        loopCounter->i++;
-        goto C_shutdownTaskManager(...);
-    }
-
-    loopCounter->i = 0;
-    goto meta(context, taskManager->next);
-}
-
-__code shutdownTaskManager_stub(struct Context* context) {
-    TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto shutdownTaskManager(context, Gearef(context, LoopCounter), Gearef(context, TaskManager), taskManagerImpl, Gearef(context, Queue), Gearef(context, TaskManager)->next);
-}
+typedef struct TaskMabager<Impl>{
+    union Data* taskManager;
+    __code createTask(struct TaskManager* taskManager);
+    __code spawn(Impl* taskManager, struct Queue* queue, struct Context* task, __code next(...));
+    __code shutdown(struct LoopCounter* loopCounter, struct TaskManager* taskManager, Impl* taskManagerImpl, struct Queue* queue, __code next(...));
+    __code next(...);
+    __code task(...);
+    struct Context* context;
+    int worker;
+    int cpu;
+    int gpu;
+    int io;
+    int maxCPU;
+} TaskManager;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/TaskManagerImpl.cbc	Sun Jan 29 22:15:32 2017 +0900
@@ -0,0 +1,140 @@
+#include "../context.h"
+#include "../stack.h"
+#include "../queue.h"
+#include "../worker.h"
+#include "../origin_cs.h"
+#include <stdio.h>
+
+void createWorkers(struct Context* context, TaskManager* taskManeger, TaskManagerImpl* taskManagerImpl);
+
+TaskManager** createTaskManager(struct Context* context, int numCPU, int numGPU, int numIO) {
+    struct TaskManager* taskManager = new TaskManager();
+    // 0...numIO-1 IOProcessor 
+    // numIO...numIO+numGPU-1 GPUProcessor
+    // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor
+    taskManager->io = 0;
+    taskManager->gpu = numIO;
+    taskManager->cpu = numIO+numGPU;
+    taskManager->maxCPU = numIO+numGPU+numCPU;
+    taskManager->createTask = C_createTask;
+    taskManager->spawn = C_spawnTaskManager;
+    taskManager->shutdown  = C_shutdownTaskManager;
+    struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl();
+    taskManager->taskManager = (union Data*)taskManagerImpl;
+    taskManagerImpl -> activeQueue = &createSingleLinkedQueue(context)->Queue;
+    taskManagerImpl -> taskQueue = &createSingleLinkedQueue(context)->Queue;
+    taskManagerImpl -> numWorker = taskManager->maxCPU;
+    createWorkers(context, taskManager, taskManagerImpl);
+    return taskManager;
+}
+
+void createWorkers(struct Context* context, TaskManager* taskManager, TaskManagerImpl* taskManagerImpl) {
+    int i = 0;
+    taskManagerImpl->workers = (Worker**)ALLOC_ARRAY(context, Worker, taskManager->maxCPU);
+    for (;i<taskManager->gpu;i++) {
+        Queue* queue = &createSynchronizedQueue(context)->Queue;
+        taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue);
+    }
+    for (;i<taskManager->cpu;i++) {
+#ifdef USE_CUDA
+#else
+        Queue* queue = &createSynchronizedQueue(context)->Queue;
+        taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue);
+#endif        
+    }
+    for (;i<taskManager->maxCPU;i++) {
+        Queue* queue = &createSynchronizedQueue(context)->Queue;
+        taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue);
+    }
+}
+
+__code createTask(struct TaskManager* taskManager) {
+    taskManager->context = NEW(struct Context);
+    initContext(taskManager->context);
+    goto C_setWorker(...);
+}
+
+__code setWorker(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
+    task->workerId = taskManager->sendWorkerIndex;
+    if(++taskManager->sendWorkerIndex >= taskManager->numWorker) {
+        taskManager->sendWorkerIndex = 0;
+    }
+    goto next(...);
+}
+
+__code spawnTaskManager(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
+    if (task->idgCount == 0) {
+        // enqueue activeQueue
+        queue->queue = (union Data*)taskManager->activeQueue;
+    } else {
+        // enqueue waitQueue
+        queue->queue = (union Data*)taskManager->taskQueue;
+    }
+    queue->data = (union Data*)task;
+    queue->next = C_spawnTaskManager1;
+    goto meta(context, queue->queue->Queue.put);
+}
+
+__code spawnTaskManager_stub(struct Context* context) {
+    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+    pthread_mutex_lock(&taskManager->mutex);
+    goto spawnTaskManager(context,
+                          taskManager,
+                          Gearef(context, Queue),
+                          Gearef(context, TaskManager)->context,
+                          Gearef(context, TaskManager)->next);
+}
+
+
+__code spawnTaskManager1(struct TaskManagerImpl* taskManager) {
+    pthread_mutex_unlock(&taskManager->mutex);
+    goto C_taskSend(...);
+}
+
+__code spawnTaskManager1_stub(struct Context* context) {
+    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+    goto spawnTaskManager1(context,
+                           taskManager);
+}
+
+__code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue) {
+    queue->queue = (union Data*)taskManager->activeQueue;
+    queue->next = C_taskSend1;
+    goto meta(context, taskManager->activeQueue->take);
+}
+
+__code taskSend1(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
+    struct Queue* tasks = taskManager->workers[task->workerId]->tasks;
+    queue->queue = (union Data*)tasks;
+    queue->data = (union Data*)task;
+    queue->next = next;
+    pthread_cond_signal(&taskManager->workers[task->workerId]->worker->CPUWorker.cond);
+    goto meta(context, tasks->put);
+}
+
+__code taskSend1_stub(struct Context* context) {
+    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+    goto taskSend1(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next);
+}
+
+__code shutdownTaskManager(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, __code next(...)) {
+    int i = loopCounter->i;
+    if (taskManager->cpu <= i && i < taskManager->maxCPU) {
+        struct Queue* tasks = taskManagerImpl->workers[i]->tasks;
+        queue->queue = (union Data*)tasks;
+        queue->data = NULL;
+        queue->next = next;
+        goto meta(context, tasks->put);
+        pthread_join(taskManagerImpl->workers[i]->worker->CPUWorker.thread, NULL);
+        loopCounter->i++;
+        goto C_shutdownTaskManager(...);
+    }
+
+    loopCounter->i = 0;
+    goto meta(context, taskManager->next);
+}
+
+__code shutdownTaskManager_stub(struct Context* context) {
+    TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+    goto shutdownTaskManager(context, Gearef(context, LoopCounter), Gearef(context, TaskManager), taskManagerImpl, Gearef(context, Queue), Gearef(context, TaskManager)->next);
+}
--- a/src/parallel_execution/generate_stub.pl	Sun Jan 29 21:30:58 2017 +0900
+++ b/src/parallel_execution/generate_stub.pl	Sun Jan 29 22:15:32 2017 +0900
@@ -53,12 +53,12 @@
                 $dataGear{$name} = $_;
                 $var{$name} = {};
                 $code{$name} = {};
-            } elsif (/^(\w+)\* create(\w+)\(/) {
+            } elsif (/^(\w+)(\*)+ create(\w+)\(/) {
                 if (defined $interface) {
                    die "duplicate interface $interface\n"; 
                 }
                 $interface = $1;
-                $implementation = $2;
+                $implementation = $3;
                 if ( -f "$interface.cbc") {
                     &getDataGear("$interface.cbc");
                 }