Mercurial > hg > Gears > Gears
view src/parallel_execution/DebugTaskManagerImpl.cbc @ 1016:3e8d89f271e2 debugger
debugger branch
author | Takato Matsuoka <t.matsuoka@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Wed, 19 Jan 2022 17:51:07 +0900 |
parents | e6778c866876 |
children | a747a99ae1f2 |
line wrap: on
line source
// DebugTaskManagerImpl: implementation of the TaskManager interface whose
// CPU-range workers are created with createDebugWorker (see createWorkers).
#include "../context.h"
#interface "TaskManager.h"
#interface "Iterator.h"
#interface "Queue.h"
#interface "Worker.h"
#include <stdio.h>
#include <unistd.h>

// Prototype declarations.

// Default state callback: intentionally does nothing. It is installed as
// taskManagerImpl->statefunc by createDebugTaskManagerImpl.
static void defaultStatefunc(DebugTaskManagerImpl* debugTaskManagerImpl, struct DebugWorker* debugWorker , StateDB now,StateDB next,int flag) {
}
void createWorkers(struct Context* context, DebugTaskManagerImpl* taskManager);

// Creates the DebugTaskManager and initializes its enum slots and variables.
//
// context : context used for allocation; its taskManager field is set to the
//           newly created TaskManager.
// numCPU, numGPU, numIO : number of workers of each kind; they determine the
//           index ranges of the workers array (see the layout comment below).
// Returns the new TaskManager, whose taskManager field points at the
// DebugTaskManagerImpl.
TaskManager* createDebugTaskManagerImpl(struct Context* context, int numCPU, int numGPU, int numIO) {
    // printf("[Debug log] createDebugTaskManagerImpl in DebugTaskManager\n");
    // Create the TaskManager struct and bind the code-gear enums to the
    // function slots defined in the struct.
    struct TaskManager* taskManager = new TaskManager();
    taskManager->spawnTasks = C_spawnTasksDebugTaskManagerImpl;
    taskManager->spawn = C_spawnDebugTaskManagerImpl;
    taskManager->shutdown = C_shutdownDebugTaskManagerImpl;
    taskManager->incrementTaskCount = C_incrementTaskCountDebugTaskManagerImpl;
    taskManager->decrementTaskCount = C_decrementTaskCountDebugTaskManagerImpl;
    taskManager->setWaitTask = C_setWaitTaskDebugTaskManagerImpl;
    // Create the Impl struct and initialize the variables defined in it.
    struct DebugTaskManagerImpl* taskManagerImpl = new DebugTaskManagerImpl();
    // Worker index layout:
    // 0...numIO-1 IOProcessor
    // numIO...numIO+numGPU-1 GPUProcessor
    // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor
    taskManagerImpl->io = 0;
    taskManagerImpl->gpu = numIO;
    taskManagerImpl->cpu = numIO+numGPU;
    taskManagerImpl->maxCPU = numIO+numGPU+numCPU;
    taskManagerImpl->numWorker = taskManagerImpl->maxCPU;
    // The send indices start at the first GPU / first CPU worker and are
    // advanced by taskSend1 / taskSend2.
    taskManagerImpl->sendGPUWorkerIndex = taskManagerImpl->gpu;
    taskManagerImpl->sendCPUWorkerIndex = taskManagerImpl->cpu;
    taskManagerImpl->taskCount = 0;
    taskManagerImpl->loopCounter = 0;
    taskManagerImpl->mem = 0;
    taskManagerImpl->statefunc = defaultStatefunc;
    // NOTE(review): taskManagerImpl->mutex is unlocked in spawn/taskSend1/taskSend2,
    // but no pthread_mutex_init for it is visible in this file — confirm where
    // it is initialized.
    // context->taskManager is assigned before the workers are created;
    // presumably worker creation reads it — TODO confirm.
    context->taskManager = taskManager;
    createWorkers(context, taskManagerImpl);
    taskManager->taskManager = (union Data*)taskManagerImpl;
    return taskManager;
}

// Creates the workers. The kind of Worker that is created depends on the
// environment (CPU, GPU, model checking, ...).
//
// Fills taskManager->workers, each worker getting its own queue from
// createSynchronizedQueue:
//   [0, gpu)      -> createCPUWorker
//   [gpu, cpu)    -> createCUDAWorker when USE_CUDAWorker is defined,
//                    otherwise createDebugWorker
//   [cpu, maxCPU) -> createDebugWorker
void createWorkers(struct Context* context, DebugTaskManagerImpl* taskManager) {
    // printf("[Debug log] createWorkers in DebugTaskManager\n");
    // i is shared by the three loops so each loop continues where the
    // previous one stopped.
    int i = 0;
    // presumably allocates maxCPU Worker* slots — TODO confirm against ALLOCATE_PTR_ARRAY
    taskManager->workers = (Worker**)ALLOCATE_PTR_ARRAY(context, Worker, taskManager->maxCPU);
    for (;i<taskManager->gpu;i++) {
        Queue* queue = createSynchronizedQueue(context);
        taskManager->workers[i] = (Worker*)createCPUWorker(context, i, queue);
    }
    for (;i<taskManager->cpu;i++) {
        Queue* queue = createSynchronizedQueue(context);
#ifdef USE_CUDAWorker
        taskManager->workers[i] = (Worker*)createCUDAWorker(context, i, queue,0);
#else
        taskManager->workers[i] = (Worker*)createDebugWorker(context, i, queue);
#endif
    }
    for (;i<taskManager->maxCPU;i++) {
        Queue* queue = createSynchronizedQueue(context);
        taskManager->workers[i] = (Worker*)createDebugWorker(context, i, queue);
    }
}

// Entry code gear that starts the spawnTasks sequence: stores the task list
// in the implementation and continues to the first pass
// (spawnTasksDebugTaskManagerImpl1).
__code spawnTasksDebugTaskManagerImpl(struct DebugTaskManagerImpl* taskManager, struct Element* taskList, __code next1(...)) {
    // printf("[Debug log] spawnTaskDebugTaskManagerImpl in DebugTaskManager\n");
    taskManager->taskList = taskList;
    goto spawnTasksDebugTaskManagerImpl1();
}

// First pass over the task list: hands each task to taskManager->setWaitTask,
// using this gear itself as the continuation, so it is re-entered once per
// list element. When the list is exhausted it continues to
// spawnTasksDebugTaskManagerImpl2.
__code spawnTasksDebugTaskManagerImpl1(struct DebugTaskManagerImpl* taskManagerImpl, struct TaskManager* taskManager) {
    // printf("[Debug log] spawnTasksDebugTaskManagerImpl1 in DebugTaskManager\n");
    // Case: the task list has no (more) tasks.
    if (taskManagerImpl->taskList == NULL) {
        goto spawnTasksDebugTaskManagerImpl2();
    }
    // Case: the task list has a task. Take the task out of taskList, advance
    // taskList to the next element, and continue to setWaitTask.
    struct Context* task = (struct Context*)taskManagerImpl->taskList->data;
    taskManagerImpl->taskList = taskManagerImpl->taskList->next;
    goto taskManager->setWaitTask(task, spawnTasksDebugTaskManagerImpl1);
}

// Hand-written stub for spawnTasksDebugTaskManagerImpl1: reads the
// implementation and the TaskManager out of the context and jumps to the gear.
__code spawnTasksDebugTaskManagerImpl1_stub(struct Context* context) {
    DebugTaskManagerImpl* taskManagerImpl = (DebugTaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    TaskManager* taskManager = &Gearef(context, TaskManager)->taskManager->TaskManager;
    goto spawnTasksDebugTaskManagerImpl1(context, taskManagerImpl, taskManager);
}

// Rewinds the implementation's taskList to the taskList argument (the first
// pass advanced it to NULL) and continues to the second pass
// (spawnTasksDebugTaskManagerImpl3).
__code spawnTasksDebugTaskManagerImpl2(struct DebugTaskManagerImpl* taskManager, struct Element* taskList, __code next1(...)) {
    // printf("[Debug log] spawnTasksDebugTaskManagerImpl2 DebugTaskManager\n");
    taskManager->taskList = taskList;
    goto spawnTasksDebugTaskManagerImpl3();
}

// Second pass over the task list: hands each task to taskManager->spawn,
// using this gear itself as the continuation. When the list is exhausted it
// prints a trace message and continues to next1.
__code spawnTasksDebugTaskManagerImpl3(struct DebugTaskManagerImpl* taskManagerImpl, __code next1(...), struct TaskManager* taskManager) {
    // printf("[Debug log] spawnTasksDebugTaskManagerImpl3 in DebugTaskManager\n");
    if (taskManagerImpl->taskList == NULL) {
        // struct Queue* tasks = taskManagerImpl->workers[0]->tasks;
        // printf("put NULL\n");
        printf("execute spawnDebugTaskManagerImpl3\n");
        goto next1(...);
    }
    struct Context* task = (struct Context*)taskManagerImpl->taskList->data;
    taskManagerImpl->taskList = taskManagerImpl->taskList->next;
    goto taskManager->spawn(task, spawnTasksDebugTaskManagerImpl3);
}

// Hand-written stub for spawnTasksDebugTaskManagerImpl3: reads the
// implementation, the next1 continuation and the TaskManager out of the
// context and jumps to the gear.
__code spawnTasksDebugTaskManagerImpl3_stub(struct Context* context) {
    DebugTaskManagerImpl* taskManagerImpl = (DebugTaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    enum Code next1 = Gearef(context, TaskManager)->next1;
    TaskManager* taskManager = &Gearef(context, TaskManager)->taskManager->TaskManager;
    goto spawnTasksDebugTaskManagerImpl3(context, taskManagerImpl, next1, taskManager);
}

// Puts the task into queues.
//
// Uses taskManager->loopCounter as the offset i from task->idg. While
// task->idg + i is below task->maxIdg, the task is put on the queue obtained
// with GET_WAIT_LIST from task->data[task->idg + i], and this gear is
// re-entered as the continuation. After the last one, loopCounter is reset
// and control continues to incrementTaskCountDebugTaskManagerImpl.
__code setWaitTaskDebugTaskManagerImpl(struct DebugTaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
    // printf("[Debug log] setWaitTaskDebugTaskManagerImpl in DebugTaskManager\n");
    int i = taskManager->loopCounter;
    if (task->idg+i < task->maxIdg) {
        struct Queue* queue = GET_WAIT_LIST(task->data[task->idg + i]);
        // Advance before leaving: the loop state lives in the implementation,
        // not in a local, because each iteration is a separate gear invocation.
        taskManager->loopCounter++;
        goto queue->put(task, setWaitTaskDebugTaskManagerImpl);
    }
    taskManager->loopCounter = 0;
    goto incrementTaskCountDebugTaskManagerImpl();
}

// Atomically adds 1 to taskManager->taskCount, then continues to next.
__code incrementTaskCountDebugTaskManagerImpl(struct DebugTaskManagerImpl* taskManager, __code next(...)) {
    // printf("[Debug log] incrementTaskCountDebugTaskManagerImpl in DebugTaskManager\n");
    __sync_fetch_and_add(&taskManager->taskCount, 1);
    goto next(...);
}

// Atomically subtracts 1 from taskManager->taskCount, then continues to next.
__code decrementTaskCountDebugTaskManagerImpl(struct DebugTaskManagerImpl* taskManager, __code next(...)) {
    // printf("[Debug log] decrementTaskCountDebugTaskManagerImpl in DebugTaskManager\n");
    __sync_fetch_and_sub(&taskManager->taskCount, 1);
    goto next(...);
}

// Spawns one task.
//
// Records the TaskManager in the task. If task->idgCount is 0 the task is
// either handed to its iterator (when it has one and task->iterate is 0) or
// sent to a worker via taskSend. Otherwise nothing is sent and control
// continues to next.
//
// NOTE(review): every path out of this gear unlocks taskManagerImpl->mutex
// (directly, or in taskSend1/taskSend2), but no matching pthread_mutex_lock
// is visible in this file — confirm the lock is taken by the caller.
__code spawnDebugTaskManagerImpl(struct DebugTaskManagerImpl* taskManagerImpl, struct Context* task, __code next(...), struct TaskManager* taskManager) {
    // printf("[Debug log] spawnDebugTaskManagerImpl in DebugTaskManager\n");
    task->taskManager = taskManager;
    if (task->idgCount == 0) {
        // iterator task is normal task until spawned
        if (task->iterator != NULL && task->iterate == 0) {
            pthread_mutex_unlock(&taskManagerImpl->mutex);
            struct Iterator* iterator = task->iterator;
            // NOTE(review): with the layout set in createDebugTaskManagerImpl,
            // cpu - gpu equals numGPU — confirm this is the intended count.
            goto iterator->exec(task, taskManagerImpl->cpu - taskManagerImpl->gpu, next(...));
        }
        goto taskSend();
    }
    pthread_mutex_unlock(&taskManagerImpl->mutex);
    goto next(...);
}

// Hand-written stub for spawnDebugTaskManagerImpl: reads the implementation,
// the task, the next continuation and the TaskManager out of the context and
// jumps to the gear.
__code spawnDebugTaskManagerImpl_stub(struct Context* context) {
    DebugTaskManagerImpl* taskManagerImpl = (DebugTaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    struct Context* task = Gearef(context, TaskManager)->task;
    TaskManager* taskManager = &Gearef(context, TaskManager)->taskManager->TaskManager;
    goto spawnDebugTaskManagerImpl(context, taskManagerImpl, task, Gearef(context, TaskManager)->next, taskManager);
}

// Chooses the worker range for the task: taskSend1 (GPU range) when
// task->gpu is set, taskSend2 (CPU range) otherwise.
__code taskSend(struct DebugTaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
    // printf("[Debug log] taskSend in DebugTaskManager\n");
    // set workerId
    if (task->gpu) {
        goto taskSend1();
    } else {
        goto taskSend2();
    }
}

// Sends the task to a worker in the GPU index range [gpu, cpu), round-robin:
// the current sendGPUWorkerIndex is used and the index is advanced, wrapping
// back to gpu once it reaches cpu. The mutex is unlocked before the task is
// put on that worker's tasks queue; control then continues to next.
// NOTE(review): when gpu == cpu (numGPU == 0) the initial index equals cpu,
// i.e. the first slot of the CPU range — confirm this fallback is intended.
__code taskSend1(struct DebugTaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
    int workerId = taskManager->sendGPUWorkerIndex;
    if (++taskManager->sendGPUWorkerIndex >= taskManager->cpu) {
        taskManager->sendGPUWorkerIndex = taskManager->gpu;
    }
    pthread_mutex_unlock(&taskManager->mutex);
    struct Queue* queue = taskManager->workers[workerId]->tasks;
    // printf("tasks->put taskSend1\n");
    goto queue->put(task, next(...));
}

// Sends the task to a worker in the CPU index range [cpu, maxCPU),
// round-robin: the current sendCPUWorkerIndex is used and the index is
// advanced, wrapping back to cpu once it reaches maxCPU. The mutex is
// unlocked, a trace message is printed, and the task is put on that worker's
// tasks queue; control then continues to next.
// NOTE(review): when cpu == maxCPU (numCPU == 0) workerId equals maxCPU,
// which looks like one past the last workers slot — confirm callers always
// pass numCPU >= 1.
__code taskSend2(struct DebugTaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
    // printf("[Debug log] taskSend2 in DebugTaskManager\n");
    int workerId = taskManager->sendCPUWorkerIndex;
    if (++taskManager->sendCPUWorkerIndex >= taskManager->maxCPU) {
        taskManager->sendCPUWorkerIndex = taskManager->cpu;
    }
    pthread_mutex_unlock(&taskManager->mutex);
    struct Queue* queue = taskManager->workers[workerId]->tasks;
    printf("execute taskSend2\n");
    goto queue->put(task, next(...));
}

// Shutdown, step 1: waits until taskCount reaches 0, re-entering itself after
// a 1000-microsecond sleep while it is non-zero. Then puts NULL on the tasks
// queue of worker number loopCounter and continues to
// shutdownDebugTaskManagerImpl1, which advances loopCounter and comes back
// here. Once every worker has received NULL, loopCounter is reset and control
// continues to shutdownDebugTaskManagerImpl2.
// presumably a NULL task tells the worker to exit — TODO confirm in the Worker implementation
__code shutdownDebugTaskManagerImpl(struct DebugTaskManagerImpl* taskManager, __code next(...)) {
    // printf("[Debug log] shutdownDebugTaskManagerImpl in DebugTaskManager\n");
    if (taskManager->taskCount != 0) {
        usleep(1000);
        goto shutdownDebugTaskManagerImpl();
    }
    int i = taskManager->loopCounter;
    if (i < taskManager->numWorker) {
        struct Queue* tasks = taskManager->workers[i]->tasks;
        goto tasks->put(NULL, shutdownDebugTaskManagerImpl1);
    }
    taskManager->loopCounter = 0;
    goto shutdownDebugTaskManagerImpl2();
}

// Shutdown, loop step: advances loopCounter to the next worker and returns to
// shutdownDebugTaskManagerImpl.
__code shutdownDebugTaskManagerImpl1(struct DebugTaskManagerImpl* taskManager, __code next(...)) {
    // printf("[Debug log] shutdownDebugTaskManagerImpl1 in DebugTaskManager\n");
    taskManager->loopCounter++;
    goto shutdownDebugTaskManagerImpl();
}

// Shutdown, step 2: joins the thread of worker number loopCounter and
// re-enters itself until all numWorker threads have been joined, then resets
// loopCounter and continues to next.
__code shutdownDebugTaskManagerImpl2(struct DebugTaskManagerImpl* taskManager, __code next(...)) {
    // printf("[Debug log] shutdownDebugTaskManagerImpl2 in DebugTaskManager\n");
    int i = taskManager->loopCounter;
    if (i < taskManager->numWorker) {
        pthread_join(taskManager->workers[i]->thread, NULL);
        taskManager->loopCounter++;
        goto shutdownDebugTaskManagerImpl2();
    }
    taskManager->loopCounter = 0;
    goto next(...);
}