view src/CPUWorker.cbc @ 590:9146d6017f18 default tip

hg mv parallel_execution/* ..
author anatofuz <anatofuz@cr.ie.u-ryukyu.ac.jp>
date Thu, 16 Jan 2020 15:12:06 +0900
parents src/parallel_execution/CPUWorker.cbc@a517b11c37f7
children
line wrap: on
line source

#include "../context.h"
#interface "TaskManager.h"
#interface "Worker.h"
#interface "Iterator.h"
#interface "Queue.h"

static void startWorker(Worker* worker);

Worker* createCPUWorker(struct Context* context, int id, Queue* queue) {
    struct Worker* worker = new Worker();
    struct CPUWorker* cpuWorker = new CPUWorker();
    worker->worker = (union Data*)cpuWorker;
    worker->tasks = queue;
    cpuWorker->id = id;
    cpuWorker->loopCounter = 0;
    worker->taskReceive = C_taskReceiveCPUWorker;
    worker->shutdown = C_shutdownCPUWorker;
    pthread_create(&worker->thread, NULL, (void*)&startWorker, worker);
    return worker;
}

static void startWorker(struct Worker* worker) {
    struct CPUWorker* cpuWorker = &worker->worker->CPUWorker;
    cpuWorker->context = NEW(struct Context);
    initContext(cpuWorker->context);
    Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker;
    Gearef(cpuWorker->context, Worker)->tasks = worker->tasks;
    goto meta(cpuWorker->context, worker->taskReceive);
}

__code taskReceiveCPUWorker(struct CPUWorker* worker, struct Queue* tasks) {
    goto tasks->take(getTaskCPUWorker);
}

__code getTaskCPUWorker(struct CPUWorker* cpuWorker, struct Context* task, struct Worker* worker) {
    if (!task) {
        goto worker->shutdown(); // end thread
    }
    task->worker = worker;
    enum Code taskCg = task->next;
    task->next = C_odgCommitCPUWorker; // commit outputDG after task exec
    goto meta(task, taskCg); // switch task context
}

__code getTaskCPUWorker_stub(struct Context* context) {
    CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker);
    Worker* worker = &Gearef(context,Worker)->worker->Worker;
    struct Context* task = &Gearef(context, Queue)->data->Context;
    goto getTaskCPUWorker(context, cpuWorker, task, worker);
}

__code odgCommitCPUWorker(struct CPUWorker* worker, struct Context* task) {
    if (task->iterate) {
        struct Iterator* iterator = task->iterator;
        goto iterator->barrier(task, odgCommitCPUWorker1, odgCommitCPUWorker6);
    } else {
        goto odgCommitCPUWorker1();
    }
}

__code odgCommitCPUWorker_stub(struct Context* context) {
    // switch worker context
    struct Context* workerContext = context->worker->worker->CPUWorker.context;
    Gearef(workerContext, Worker)->worker = (union Data*)context->worker;
    Gearef(workerContext, Worker)->task = context;
    CPUWorker* cpuWorker = (CPUWorker*)GearImpl(workerContext, Worker, worker);
    goto odgCommitCPUWorker(workerContext,
                            cpuWorker,
                            context);
}

__code odgCommitCPUWorker1(struct CPUWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    if (task->odg+i < task->maxOdg) {
        goto odgCommitCPUWorker2();
    }
    worker->loopCounter = 0;
    struct TaskManager* taskManager = task->taskManager;
    goto taskManager->decrementTaskCount(odgCommitCPUWorker6);
}

__code odgCommitCPUWorker2(struct CPUWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
    goto queue->isEmpty(odgCommitCPUWorker3, odgCommitCPUWorker5);
}

__code odgCommitCPUWorker3(struct CPUWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
    goto queue->take(odgCommitCPUWorker4);
}

__code odgCommitCPUWorker4(struct CPUWorker* worker, struct Context* task, struct Context* waitTask) {
    if (__sync_fetch_and_sub(&waitTask->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of waitTask->idgCount point)
        struct TaskManager* taskManager = waitTask->taskManager;
        goto taskManager->spawn(waitTask, odgCommitCPUWorker2);
    }
    goto odgCommitCPUWorker2();
}

__code odgCommitCPUWorker4_stub(struct Context* context) {
    CPUWorker* cpuWorker     = (CPUWorker*)GearImpl(context, Worker, worker);
    struct Context* task     = Gearef(context, Worker)->task;
    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
    goto odgCommitCPUWorker4(context,
                             cpuWorker,
                             task,
                             waitTask);
}

__code odgCommitCPUWorker5(struct CPUWorker* worker, struct Context* task) {
    worker->loopCounter++;
    goto odgCommitCPUWorker1();
}

__code odgCommitCPUWorker6(struct CPUWorker* worker, struct Context* task) {
    struct Worker* taskWorker = task->worker;
    goto taskWorker->taskReceive(taskWorker->tasks);
}

__code shutdownCPUWorker(struct CPUWorker* worker) {
    goto exit_code();
}