view src/parallel_execution/ModelChecking/MCWorker.cbc @ 885:12cf168d6d3f

...
author anatofuz <anatofuz@cr.ie.u-ryukyu.ac.jp>
date Mon, 25 Jan 2021 16:07:03 +0900
parents 084639a31aaf
children ece622492758
line wrap: on
line source

#include "../../context.h"
#interface "TaskManager.h"
#interface "Worker.h"
#interface "Iterator.h"
#interface "Queue.h"
#interface "SingleLinkedQueue.h"

// Thread entry routine; createMCWorker() hands it to pthread_create().
static void startWorker(Worker* worker);

// Build a Worker whose implementation is an MCWorker and start its thread.
//
//   context  passed to createSingleLinkedQueue() to build the worker-private mcQueue
//   id       stored in mcWorker->id
//   queue    stored in worker->tasks
//
// Returns the new Worker; its thread has already been created with
// startWorker as the entry routine.
// NOTE(review): the result of pthread_create() is not checked.
Worker* createMCWorker(struct Context* context, int id, Queue* queue) {
    struct Worker* worker = new Worker();
    struct MCWorker* mcWorker = new MCWorker();

    // MCWorker-private state
    mcWorker->id = id;
    mcWorker->loopCounter = 0;
    mcWorker->mcQueue = createSingleLinkedQueue(context);

    // generic Worker fields
    worker->tasks = queue;
    worker->worker = (union Data*)mcWorker;
    worker->shutdown = C_shutdownMCWorker;
    worker->taskReceive = C_taskReceiveMCWorker;

    // Keep this last: startWorker reads worker->worker, worker->tasks and
    // worker->taskReceive as soon as the thread runs.
    pthread_create(&worker->thread, NULL, (void*)&startWorker, worker);
    return worker;
}

// Thread body of an MCWorker.
// Allocates and initialises a Context owned by this worker, stores the
// Worker and its task queue in that context's Worker data gear, then jumps
// to the worker's taskReceive code gear on the new context.
static void startWorker(struct Worker* worker) {
    struct MCWorker* mcWorker = &worker->worker->MCWorker;
    struct Context* workerContext = NEW(struct Context);

    // record the context on the MCWorker before initialising it
    mcWorker->context = workerContext;
    initContext(workerContext);

    Gearef(workerContext, Worker)->tasks = worker->tasks;
    Gearef(workerContext, Worker)->worker = (union Data*)worker;
    goto meta(workerContext, worker->taskReceive);
}

// Code gear: take the next entry from `tasks` and continue in
// getTaskMCWorker. `worker` is not used here.
__code taskReceiveMCWorker(struct MCWorker* worker, struct Queue* tasks) {
    goto tasks->take(getTaskMCWorker);
}

// Code gear: take one entry from the worker's private mcQueue through
// takeSingleLinkedQueue, with taskReceiveMCWorker as the continuation.
// NOTE(review): no jump to this gear is visible in this part of the file;
// confirm where it is entered from.
__code startModelChecker(struct MCWorker* worker) {
    struct Queue* mcQueue =  worker->mcQueue;
    goto takeSingleLinkedQueue(mcQueue, taskReceiveMCWorker);
}

// Code gear: continuation of taskReceiveMCWorker's tasks->take(); handles
// one task handed to this worker.
//
//   mcWorker  the MCWorker implementation whose mcQueue receives the task
//   task      the task context to handle; NULL means "stop"
//   worker    the Worker wrapper, recorded on the task
//
// A NULL task jumps to worker->shutdown(). Otherwise the task is not run
// here: it is tagged with its worker and appended to mcWorker->mcQueue,
// and control continues in taskReceiveMCWorker to fetch the next task.
__code getTaskMCWorker(struct MCWorker* mcWorker, struct Context* task, struct Worker* worker) {
    if (!task) {
        goto worker->shutdown(); // end thread
    }
    task->worker = worker;
    struct Queue* mcQueue = mcWorker->mcQueue;
    goto mcQueue->put(task, taskReceiveMCWorker);
}

// Hand-written stub for getTaskMCWorker: reads the gear's arguments out of
// `context` and forwards them.
__code getTaskMCWorker_stub(struct Context* context) {
    // implementation (MCWorker) and wrapper (Worker) both come from the Worker data gear
    MCWorker* mcWorker = (MCWorker*)GearImpl(context, Worker, worker);
    Worker* worker = &Gearef(context,Worker)->worker->Worker;
    // the task is read from the Queue data gear's `data` field
    // (presumably the result of the preceding take -- confirm against Queue)
    struct Context* task = &Gearef(context, Queue)->data->Context;
    goto getTaskMCWorker(context, mcWorker, task, worker);
}

// Code gear: entry point of the odgCommitMCWorker1..6 sequence for `task`.
// If task->iterate is set, control goes through the task iterator's
// barrier, passing odgCommitMCWorker1 and odgCommitMCWorker6 as its two
// continuations; otherwise it goes directly to odgCommitMCWorker1.
__code odgCommitMCWorker(struct MCWorker* worker, struct Context* task) {
    if (task->iterate) {
        struct Iterator* iterator = task->iterator;
        goto iterator->barrier(task, odgCommitMCWorker1, odgCommitMCWorker6);
    } else {
        goto odgCommitMCWorker1();
    }
}

// Hand-written stub for odgCommitMCWorker.
// `context` here is the task's context; the gear itself is run on the
// context owned by the task's worker.
__code odgCommitMCWorker_stub(struct Context* context) {
    // switch worker context
    struct Context* workerContext = context->worker->worker->MCWorker.context;
    // store the worker and the task in the worker context's Worker data gear;
    // these stores come before the GearImpl() read on the same data gear below
    Gearef(workerContext, Worker)->worker = (union Data*)context->worker;
    Gearef(workerContext, Worker)->task = context;
    MCWorker* mcWorker = (MCWorker*)GearImpl(workerContext, Worker, worker);
    goto odgCommitMCWorker(workerContext,
                            mcWorker,
                            context);
}

// Code gear: loop head of the commit sequence.
// worker->loopCounter is the offset from task->odg. While task->odg + i is
// below task->maxOdg, continue with odgCommitMCWorker2 for that index.
// Once the range is exhausted, reset the counter and ask the task's
// TaskManager to decrement its task count, continuing in odgCommitMCWorker6.
__code odgCommitMCWorker1(struct MCWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    if (task->odg+i < task->maxOdg) {
        goto odgCommitMCWorker2();
    }
    worker->loopCounter = 0;
    struct TaskManager* taskManager = task->taskManager;
    goto taskManager->decrementTaskCount(odgCommitMCWorker6);
}

// Code gear: look up the wait list (GET_WAIT_LIST) of the data gear at
// task->data[task->odg + loopCounter] and branch on whether it is empty.
// Continuations are odgCommitMCWorker3 and odgCommitMCWorker5; judging by
// what those gears do, 3 is presumably the "has entries" branch and 5 the
// "empty" branch -- confirm against Queue's isEmpty.
__code odgCommitMCWorker2(struct MCWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
    goto queue->isEmpty(odgCommitMCWorker3, odgCommitMCWorker5);
}

// Code gear: take one entry from the wait list of the data gear at
// task->data[task->odg + loopCounter] and continue in odgCommitMCWorker4.
__code odgCommitMCWorker3(struct MCWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
    goto queue->take(odgCommitMCWorker4);
}

// Code gear: handle one waiting task taken from a wait list.
// Atomically decrements waitTask->idgCount. If this was the last
// outstanding count, waitTask is spawned through its TaskManager; either
// way control returns to odgCommitMCWorker2 to re-check the same wait list.
__code odgCommitMCWorker4(struct MCWorker* worker, struct Context* task, struct Context* waitTask) {
    // __sync_fetch_and_sub returns the value held BEFORE the subtraction,
    // so "== 1" means the counter has just reached 0.
    if (__sync_fetch_and_sub(&waitTask->idgCount, 1) == 1) {
        struct TaskManager* taskManager = waitTask->taskManager;
        goto taskManager->spawn(waitTask, odgCommitMCWorker2);
    }
    // reached only when the branch above was not taken: a CbC goto does not return
    goto odgCommitMCWorker2();
}

// Hand-written stub for odgCommitMCWorker4: the worker and the current task
// come from the Worker data gear, the waiting task from the Queue data
// gear's `data` field (presumably the result of the preceding take --
// confirm against Queue).
__code odgCommitMCWorker4_stub(struct Context* context) {
    MCWorker* mcWorker     = (MCWorker*)GearImpl(context, Worker, worker);
    struct Context* task     = Gearef(context, Worker)->task;
    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
    goto odgCommitMCWorker4(context,
                             mcWorker,
                             task,
                             waitTask);
}

// Code gear: advance worker->loopCounter to the next index and go back to
// the loop head, odgCommitMCWorker1.
__code odgCommitMCWorker5(struct MCWorker* worker, struct Context* task) {
    worker->loopCounter++;
    goto odgCommitMCWorker1();
}

// Code gear: end of the commit sequence. Jumps to the taskReceive gear of
// the Worker recorded in task->worker, passing that worker's task queue.
// The `worker` parameter is not used.
__code odgCommitMCWorker6(struct MCWorker* worker, struct Context* task) {
    struct Worker* taskWorker = task->worker;
    goto taskWorker->taskReceive(taskWorker->tasks);
}

// Code gear installed as worker->shutdown by createMCWorker; it jumps
// straight to exit_code. `worker` is not used.
__code shutdownMCWorker(struct MCWorker* worker) {
    goto exit_code();
}