view src/parallel_execution/DebugTaskManagerImpl.cbc @ 1016:3e8d89f271e2 debugger

debugger branch
author Takato Matsuoka <t.matsuoka@cr.ie.u-ryukyu.ac.jp>
date Wed, 19 Jan 2022 17:51:07 +0900
parents e6778c866876
children a747a99ae1f2
line wrap: on
line source

#include "../context.h"
#interface "TaskManager.h"
#interface "Iterator.h"
#interface "Queue.h" 
#interface "Worker.h" 

#include <stdio.h>
#include <unistd.h>

// Default state callback and forward declarations.
// defaultStatefunc has an empty body; createDebugTaskManagerImpl installs it as
// the initial value of DebugTaskManagerImpl.statefunc.
static void defaultStatefunc(DebugTaskManagerImpl* debugTaskManagerImpl, struct DebugWorker* debugWorker , StateDB now,StateDB next,int flag) {
}
// Forward declaration: createWorkers is defined further down but is called by
// the constructor that precedes it.
void createWorkers(struct Context* context, DebugTaskManagerImpl* taskManager);

// Build a TaskManager whose implementation is a DebugTaskManagerImpl.
//
// context : context the objects are created in; its taskManager field is
//           pointed at the new manager before the workers are created.
// numCPU, numGPU, numIO : number of worker slots of each kind.
// Returns the new TaskManager interface object.
//
// Worker slots are laid out back to back:
//   [0, numIO)                            IO processors
//   [numIO, numIO+numGPU)                 GPU processors
//   [numIO+numGPU, numIO+numGPU+numCPU)   CPU processors
TaskManager* createDebugTaskManagerImpl(struct Context* context, int numCPU, int numGPU, int numIO) {
    // printf("[Debug log] createDebugTaskManagerImpl in DebugTaskManager\n");
    // Slot boundaries derived from the requested worker counts.
    int gpuBegin = numIO;
    int cpuBegin = numIO+numGPU;
    int slotEnd = numIO+numGPU+numCPU;

    // Interface object: bind each interface entry to this implementation's code gear enum.
    struct TaskManager* taskManager = new TaskManager();
    taskManager->setWaitTask = C_setWaitTaskDebugTaskManagerImpl;
    taskManager->decrementTaskCount = C_decrementTaskCountDebugTaskManagerImpl;
    taskManager->incrementTaskCount = C_incrementTaskCountDebugTaskManagerImpl;
    taskManager->shutdown = C_shutdownDebugTaskManagerImpl;
    taskManager->spawn = C_spawnDebugTaskManagerImpl;
    taskManager->spawnTasks = C_spawnTasksDebugTaskManagerImpl;

    // Implementation object: slot boundaries, dispatch cursors and counters.
    struct DebugTaskManagerImpl* taskManagerImpl = new DebugTaskManagerImpl();
    taskManagerImpl->io = 0;
    taskManagerImpl->gpu = gpuBegin;
    taskManagerImpl->cpu = cpuBegin;
    taskManagerImpl->maxCPU = slotEnd;
    taskManagerImpl->numWorker = slotEnd;
    // Dispatch cursors start at the first slot of their own range.
    taskManagerImpl->sendGPUWorkerIndex = gpuBegin;
    taskManagerImpl->sendCPUWorkerIndex = cpuBegin;
    taskManagerImpl->taskCount = 0;
    taskManagerImpl->loopCounter = 0;
    taskManagerImpl->mem = 0;
    taskManagerImpl->statefunc = defaultStatefunc;

    // The context learns about the manager before any worker is created;
    // the implementation pointer is attached only afterwards.
    context->taskManager = taskManager;
    createWorkers(context, taskManagerImpl);
    taskManager->taskManager = (union Data*)taskManagerImpl;
    return taskManager;
}

// Fill taskManager->workers with one worker per slot, each with its own
// synchronized queue. Which worker kind is created depends on the slot range
// and on the build configuration (USE_CUDAWorker).
void createWorkers(struct Context* context, DebugTaskManagerImpl* taskManager) {
    // printf("[Debug log] createWorkers in DebugTaskManager\n");
    taskManager->workers = (Worker**)ALLOCATE_PTR_ARRAY(context, Worker, taskManager->maxCPU);
    int slot = 0;

    // Slots [0, gpu): CPU workers.
    while (slot < taskManager->gpu) {
        Queue* taskQueue = createSynchronizedQueue(context);
        taskManager->workers[slot] = (Worker*)createCPUWorker(context, slot, taskQueue);
        slot++;
    }

    // Slots [gpu, cpu): CUDA workers when built with USE_CUDAWorker, debug workers otherwise.
    while (slot < taskManager->cpu) {
        Queue* taskQueue = createSynchronizedQueue(context);
#ifdef USE_CUDAWorker
        taskManager->workers[slot] = (Worker*)createCUDAWorker(context, slot, taskQueue, 0);
#else
        taskManager->workers[slot] = (Worker*)createDebugWorker(context, slot, taskQueue);
#endif
        slot++;
    }

    // Slots [cpu, maxCPU): debug workers.
    while (slot < taskManager->maxCPU) {
        Queue* taskQueue = createSynchronizedQueue(context);
        taskManager->workers[slot] = (Worker*)createDebugWorker(context, slot, taskQueue);
        slot++;
    }
}

// Entry point of the spawnTasks sequence.
// Records the incoming taskList on the manager and continues to
// spawnTasksDebugTaskManagerImpl1, which walks that list.
// NOTE(review): the jump names no arguments; the target's inputs are presumably
// resolved from the context by its stub - confirm against the generator.
__code spawnTasksDebugTaskManagerImpl(struct DebugTaskManagerImpl* taskManager, struct Element* taskList, __code next1(...)) {
    // printf("[Debug log] spawnTaskDebugTaskManagerImpl in DebugTaskManager\n");
    taskManager->taskList = taskList;
    goto spawnTasksDebugTaskManagerImpl1();
}

// First pass over the task list: hands every task to the manager's setWaitTask
// entry, one task per invocation, naming itself as the continuation so the
// walk resumes here afterwards. When the list is exhausted it continues to
// spawnTasksDebugTaskManagerImpl2.
__code spawnTasksDebugTaskManagerImpl1(struct DebugTaskManagerImpl* taskManagerImpl, struct TaskManager* taskManager) {
    // printf("[Debug log] spawnTasksDebugTaskManagerImpl1 in DebugTaskManager\n");
    // The task list has no task left.
    if (taskManagerImpl->taskList == NULL) {
        goto spawnTasksDebugTaskManagerImpl2();
    }
    // The task list still has a task: take the head task, advance taskList to
    // the following element, and continue to setWaitTask.
    struct Context* task = (struct Context*)taskManagerImpl->taskList->data;
    taskManagerImpl->taskList = taskManagerImpl->taskList->next;
    goto taskManager->setWaitTask(task, spawnTasksDebugTaskManagerImpl1);
}

__code spawnTasksDebugTaskManagerImpl1_stub(struct Context* context) {
    DebugTaskManagerImpl* taskManagerImpl = (DebugTaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    TaskManager* taskManager = &Gearef(context, TaskManager)->taskManager->TaskManager;
    goto spawnTasksDebugTaskManagerImpl1(context, taskManagerImpl, taskManager);
}

// Bridge between the two passes: the first pass has advanced
// taskManager->taskList to NULL, so it is assigned from the taskList argument
// again before continuing to spawnTasksDebugTaskManagerImpl3.
// NOTE(review): taskList here is presumably still the original list head held
// in the context - confirm.
__code spawnTasksDebugTaskManagerImpl2(struct DebugTaskManagerImpl* taskManager, struct Element* taskList, __code next1(...)) {
    // printf("[Debug log] spawnTasksDebugTaskManagerImpl2 DebugTaskManager\n");
    taskManager->taskList = taskList;
    goto spawnTasksDebugTaskManagerImpl3();
}

// Second pass over the task list: hands every task to the manager's spawn
// entry, one task per invocation, naming itself as the continuation. When the
// list is exhausted it prints a trace line and continues to next1.
__code spawnTasksDebugTaskManagerImpl3(struct DebugTaskManagerImpl* taskManagerImpl, __code next1(...), struct TaskManager* taskManager) {
    // printf("[Debug log] spawnTasksDebugTaskManagerImpl3 in DebugTaskManager\n");
    if (taskManagerImpl->taskList == NULL) {
        // struct Queue* tasks = taskManagerImpl->workers[0]->tasks;
        // printf("put NULL\n");
        // NOTE(review): unlike most other traces in this file this one is active.
        printf("execute spawnDebugTaskManagerImpl3\n");
        goto next1(...);
    }
    // Take the head task and advance the list before spawning it.
    struct Context* task = (struct Context*)taskManagerImpl->taskList->data;
    taskManagerImpl->taskList = taskManagerImpl->taskList->next;
    goto taskManager->spawn(task, spawnTasksDebugTaskManagerImpl3);
}

__code spawnTasksDebugTaskManagerImpl3_stub(struct Context* context) {
    DebugTaskManagerImpl* taskManagerImpl = (DebugTaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    enum Code next1 = Gearef(context, TaskManager)->next1;
    TaskManager* taskManager = &Gearef(context, TaskManager)->taskManager->TaskManager;
    goto spawnTasksDebugTaskManagerImpl3(context, taskManagerImpl, next1, taskManager);
}

// Puts the task into wait-list queues, one queue per invocation.
// loopCounter is the offset from task->idg; for every index in
// [task->idg, task->maxIdg) the queue obtained with GET_WAIT_LIST from
// task->data[index] receives the task, and the gear names itself as the
// continuation so the next index is handled afterwards. Once the range is
// exhausted loopCounter is reset to 0 and control continues to
// incrementTaskCountDebugTaskManagerImpl.
// NOTE(review): the entries idg..maxIdg-1 of task->data are presumably the
// task's input data gears - confirm.
__code setWaitTaskDebugTaskManagerImpl(struct DebugTaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
    // printf("[Debug log] setWaitTaskDebugTaskManagerImpl in DebugTaskManager\n");
    int i = taskManager->loopCounter;
    if (task->idg+i < task->maxIdg) {
        struct Queue* queue = GET_WAIT_LIST(task->data[task->idg + i]);
        // Advance the offset before leaving; this gear is re-entered after the put.
        taskManager->loopCounter++;
        goto queue->put(task, setWaitTaskDebugTaskManagerImpl);
    }
    taskManager->loopCounter = 0;
    goto incrementTaskCountDebugTaskManagerImpl();
}

// Atomically adds 1 to taskManager->taskCount (GCC __sync builtin), then
// continues to next.
__code incrementTaskCountDebugTaskManagerImpl(struct DebugTaskManagerImpl* taskManager, __code next(...)) {
    // printf("[Debug log] incrementTaskCountDebugTaskManagerImpl in DebugTaskManager\n");
    __sync_fetch_and_add(&taskManager->taskCount, 1);
    goto next(...);
}

// Atomically subtracts 1 from taskManager->taskCount (GCC __sync builtin),
// then continues to next. The shutdown gear waits for this counter to reach 0.
__code decrementTaskCountDebugTaskManagerImpl(struct DebugTaskManagerImpl* taskManager, __code next(...)) {
    // printf("[Debug log] decrementTaskCountDebugTaskManagerImpl in DebugTaskManager\n");
    __sync_fetch_and_sub(&taskManager->taskCount, 1);
    goto next(...);
}

// Spawns a single task.
// Always records the owning TaskManager on the task. Then:
//   - idgCount != 0: unlocks the mutex and continues to next without sending
//     the task to a worker;
//   - idgCount == 0 and the task has an iterator that has not iterated yet
//     (iterate == 0): unlocks the mutex and continues to the iterator's exec
//     entry, passing cpu - gpu (which equals the numGPU given to the
//     constructor, since gpu = numIO and cpu = numIO + numGPU);
//   - idgCount == 0 otherwise: continues to taskSend.
// NOTE(review): no pthread_mutex_lock or pthread_mutex_init on this mutex is
// visible in this file; confirm where the lock is taken, because unlocking a
// mutex that is not held is undefined for default mutexes.
__code spawnDebugTaskManagerImpl(struct DebugTaskManagerImpl* taskManagerImpl, struct Context* task, __code next(...), struct TaskManager* taskManager) {
    // printf("[Debug log] spawnDebugTaskManagerImpl in DebugTaskManager\n");
    task->taskManager = taskManager;
    if (task->idgCount == 0) {
        // iterator task is normal task until spawned
        if (task->iterator != NULL && task->iterate == 0) {
            pthread_mutex_unlock(&taskManagerImpl->mutex);
            struct Iterator* iterator = task->iterator;
            goto iterator->exec(task, taskManagerImpl->cpu - taskManagerImpl->gpu, next(...));
        }
        goto taskSend();
    }
    pthread_mutex_unlock(&taskManagerImpl->mutex);
    goto next(...);
}

__code spawnDebugTaskManagerImpl_stub(struct Context* context) {
    DebugTaskManagerImpl* taskManagerImpl = (DebugTaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    struct Context* task = Gearef(context, TaskManager)->task;
    TaskManager* taskManager = &Gearef(context, TaskManager)->taskManager->TaskManager;
    goto spawnDebugTaskManagerImpl(context,
                              taskManagerImpl,
                              task,
                              Gearef(context, TaskManager)->next,
                              taskManager);
}


// Chooses the dispatch path for a task: taskSend1 when task->gpu is non-zero,
// taskSend2 otherwise.
__code taskSend(struct DebugTaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
    // printf("[Debug log] taskSend in DebugTaskManager\n");
    // set workerId
    if (task->gpu) {
        goto taskSend1();
    } else {
        goto taskSend2();
    }
}

// Sends the task to a worker in the GPU slot range.
// The current sendGPUWorkerIndex selects the worker; the index is then advanced
// and wraps back to gpu once it reaches cpu, so successive tasks cycle through
// the slots [gpu, cpu). After unlocking the mutex the task is put into the
// chosen worker's task queue, continuing to next.
// NOTE(review): when the GPU range is empty (gpu == cpu) the selected index is
// cpu, i.e. the first slot of the CPU range - confirm this fallback is intended.
__code taskSend1(struct DebugTaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
    int workerId = taskManager->sendGPUWorkerIndex;
    if (++taskManager->sendGPUWorkerIndex >= taskManager->cpu) {
        taskManager->sendGPUWorkerIndex = taskManager->gpu;
    }
    pthread_mutex_unlock(&taskManager->mutex);
    struct Queue* queue = taskManager->workers[workerId]->tasks;
    // printf("tasks->put taskSend1\n");
    goto queue->put(task, next(...));
}

// Sends the task to a worker in the CPU slot range.
// The current sendCPUWorkerIndex selects the worker; the index is then advanced
// and wraps back to cpu once it reaches maxCPU, so successive tasks cycle
// through the slots [cpu, maxCPU). After unlocking the mutex it prints a trace
// line and puts the task into the chosen worker's task queue, continuing to next.
__code taskSend2(struct DebugTaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
    // printf("[Debug log] taskSend2 in DebugTaskManager\n");
    int workerId = taskManager->sendCPUWorkerIndex;
    if (++taskManager->sendCPUWorkerIndex >= taskManager->maxCPU) {
        taskManager->sendCPUWorkerIndex = taskManager->cpu;
    }
    pthread_mutex_unlock(&taskManager->mutex);
    struct Queue* queue = taskManager->workers[workerId]->tasks;
    // NOTE(review): unlike most other traces in this file this one is active.
    printf("execute taskSend2\n");
    goto queue->put(task, next(...));
}

// First shutdown phase.
// While taskCount is non-zero it sleeps for 1000 microseconds and re-enters
// itself. Once the count is 0 it uses loopCounter as a worker index and puts
// NULL into one worker's task queue per invocation, continuing to
// shutdownDebugTaskManagerImpl1 (which advances loopCounter and comes back
// here). After all numWorker workers have been handled, loopCounter is reset
// and control continues to shutdownDebugTaskManagerImpl2.
// NOTE(review): the NULL entry is presumably the signal for a worker to stop -
// confirm in the worker implementation.
__code shutdownDebugTaskManagerImpl(struct DebugTaskManagerImpl* taskManager, __code next(...)) {
    // printf("[Debug log] shutdownDebugTaskManagerImpl in DebugTaskManager\n");
    if (taskManager->taskCount != 0) {
        usleep(1000);
        goto shutdownDebugTaskManagerImpl();
    }
    int i = taskManager->loopCounter;
    if (i < taskManager->numWorker) {
        struct Queue* tasks = taskManager->workers[i]->tasks;
        goto tasks->put(NULL, shutdownDebugTaskManagerImpl1);
    }
    taskManager->loopCounter = 0;
    goto shutdownDebugTaskManagerImpl2();
}

// Loop step of the first shutdown phase: advances loopCounter to the next
// worker and returns to shutdownDebugTaskManagerImpl.
__code shutdownDebugTaskManagerImpl1(struct DebugTaskManagerImpl* taskManager, __code next(...)) {
    // printf("[Debug log] shutdownDebugTaskManagerImpl1 in DebugTaskManager\n");
    taskManager->loopCounter++;
    goto shutdownDebugTaskManagerImpl();
}

// Second shutdown phase: joins the worker threads, one per invocation, using
// loopCounter as the worker index and re-entering itself after each join.
// When all numWorker threads have been joined, loopCounter is reset to 0 and
// control continues to next. The return value of pthread_join is not checked.
__code shutdownDebugTaskManagerImpl2(struct DebugTaskManagerImpl* taskManager, __code next(...)) {
    // printf("[Debug log] shutdownDebugTaskManagerImpl2 in DebugTaskManager\n");
    int i = taskManager->loopCounter;
    if (i < taskManager->numWorker) {
        pthread_join(taskManager->workers[i]->thread, NULL);
        taskManager->loopCounter++;
        goto shutdownDebugTaskManagerImpl2();
    }
    taskManager->loopCounter = 0;
    goto next(...);
}