changeset 267:d041069bc7fe

add \TaskManager.cbc
author mir3636
date Sun, 29 Jan 2017 21:26:27 +0900
parents ffcd80cc3a83
children 378ce6f74f4b
files src/parallel_execution/TaskManager.cbc
diffstat 1 files changed, 140 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/TaskManager.cbc	Sun Jan 29 21:26:27 2017 +0900
@@ -0,0 +1,140 @@
+#include "../context.h"
+#include "../stack.h"
+#include "../queue.h"
+#include "../worker.h"
+#include "../origin_cs.h"
+#include <stdio.h>
+
+void createWorkers(struct Context* context, TaskManager* taskManeger, TaskManagerImpl* taskManagerImpl);
+
+TaskManager** createTaskManager(struct Context* context, int numCPU, int numGPU, int numIO) {
+    struct TaskManager* taskManager = new TaskManager();
+    // 0...numIO-1 IOProcessor 
+    // numIO...numIO+numGPU-1 GPUProcessor
+    // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor
+    taskManager->io = 0;
+    taskManager->gpu = numIO;
+    taskManager->cpu = numIO+numGPU;
+    taskManager->maxCPU = numIO+numGPU+numCPU;
+    taskManager->createTask = C_createTask;
+    taskManager->spawn = C_spawnTaskManager;
+    taskManager->shutdown  = C_shutdownTaskManager;
+    struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl();
+    taskManager->taskManager = (union Data*)taskManagerImpl;
+    taskManagerImpl -> activeQueue = &createSingleLinkedQueue(context)->Queue;
+    taskManagerImpl -> taskQueue = &createSingleLinkedQueue(context)->Queue;
+    taskManagerImpl -> numWorker = taskManager->maxCPU;
+    createWorkers(context, taskManager, taskManagerImpl);
+    return taskManager;
+}
+
+void createWorkers(struct Context* context, TaskManager* taskManager, TaskManagerImpl* taskManagerImpl) {
+    int i = 0;
+    taskManagerImpl->workers = (Worker**)ALLOC_ARRAY(context, Worker, taskManager->maxCPU);
+    for (;i<taskManager->gpu;i++) {
+        Queue* queue = &createSynchronizedQueue(context)->Queue;
+        taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue);
+    }
+    for (;i<taskManager->cpu;i++) {
+#ifdef USE_CUDA
+#else
+        Queue* queue = &createSynchronizedQueue(context)->Queue;
+        taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue);
+#endif        
+    }
+    for (;i<taskManager->maxCPU;i++) {
+        Queue* queue = &createSynchronizedQueue(context)->Queue;
+        taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue);
+    }
+}
+
+__code createTask(struct TaskManager* taskManager) {
+    taskManager->context = NEW(struct Context);
+    initContext(taskManager->context);
+    goto C_setWorker(...);
+}
+
+__code setWorker(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
+    task->workerId = taskManager->sendWorkerIndex;
+    if(++taskManager->sendWorkerIndex >= taskManager->numWorker) {
+        taskManager->sendWorkerIndex = 0;
+    }
+    goto next(...);
+}
+
+__code spawnTaskManager(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
+    if (task->idgCount == 0) {
+        // enqueue activeQueue
+        queue->queue = (union Data*)taskManager->activeQueue;
+    } else {
+        // enqueue waitQueue
+        queue->queue = (union Data*)taskManager->taskQueue;
+    }
+    queue->data = (union Data*)task;
+    queue->next = C_spawnTaskManager1;
+    goto meta(context, queue->queue->Queue.put);
+}
+
+__code spawnTaskManager_stub(struct Context* context) {
+    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+    pthread_mutex_lock(&taskManager->mutex);
+    goto spawnTaskManager(context,
+                          taskManager,
+                          Gearef(context, Queue),
+                          Gearef(context, TaskManager)->context,
+                          Gearef(context, TaskManager)->next);
+}
+
+
+__code spawnTaskManager1(struct TaskManagerImpl* taskManager) {
+    pthread_mutex_unlock(&taskManager->mutex);
+    goto C_taskSend(...);
+}
+
+__code spawnTaskManager1_stub(struct Context* context) {
+    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+    goto spawnTaskManager1(context,
+                           taskManager);
+}
+
+__code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue) {
+    queue->queue = (union Data*)taskManager->activeQueue;
+    queue->next = C_taskSend1;
+    goto meta(context, taskManager->activeQueue->take);
+}
+
+__code taskSend1(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
+    struct Queue* tasks = taskManager->workers[task->workerId]->tasks;
+    queue->queue = (union Data*)tasks;
+    queue->data = (union Data*)task;
+    queue->next = next;
+    pthread_cond_signal(&taskManager->workers[task->workerId]->worker->CPUWorker.cond);
+    goto meta(context, tasks->put);
+}
+
+__code taskSend1_stub(struct Context* context) {
+    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+    goto taskSend1(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next);
+}
+
+__code shutdownTaskManager(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, __code next(...)) {
+    int i = loopCounter->i;
+    if (taskManager->cpu <= i && i < taskManager->maxCPU) {
+        struct Queue* tasks = taskManagerImpl->workers[i]->tasks;
+        queue->queue = (union Data*)tasks;
+        queue->data = NULL;
+        queue->next = next;
+        goto meta(context, tasks->put);
+        pthread_join(taskManagerImpl->workers[i]->worker->CPUWorker.thread, NULL);
+        loopCounter->i++;
+        goto C_shutdownTaskManager(...);
+    }
+
+    loopCounter->i = 0;
+    goto meta(context, taskManager->next);
+}
+
+__code shutdownTaskManager_stub(struct Context* context) {
+    TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+    goto shutdownTaskManager(context, Gearef(context, LoopCounter), Gearef(context, TaskManager), taskManagerImpl, Gearef(context, Queue), Gearef(context, TaskManager)->next);
+}