view src/parallel_execution/ModelChecking/MCWorker.cbc @ 897:70016b1cd1e2

goto mcMeta
author anatofuz <anatofuz@cr.ie.u-ryukyu.ac.jp>
date Tue, 26 Jan 2021 15:45:50 +0900
parents 1caa59b7f228
children 04edc3e392bc
line wrap: on
line source

#include "../../context.h"
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#interface "TaskManager.h"
#interface "Worker.h"
#interface "Iterator.h"
#interface "Queue.h"
#interface "SingleLinkedQueue.h"

// Thread entry point handed to pthread_create() by createMCWorker(); defined below.
static void startWorker(Worker* worker);
// Helpers defined outside this file. startModelChecker() calls the first to get the
// queue length and the second to fetch the element at a chosen index.
extern int lengthSingleLinkedQueue(struct SingleLinkedQueue* queue);
extern Element* getElementByIdx(struct SingleLinkedQueue* queue, int idx);

// Builds a Worker whose implementation is an MCWorker and starts its thread.
//   context : passed to createSingleLinkedQueue() to build the model-checking queue
//   id      : stored as the MCWorker's id
//   queue   : stored as worker->tasks, the queue the worker takes tasks from
// Returns the new Worker; pthread_create() has already been asked to run startWorker()
// on it.
// NOTE(review): the return value of pthread_create() is ignored, and srandom() reseeds
// the shared random() state on every call — confirm both are intended.
Worker* createMCWorker(struct Context* context, int id, Queue* queue) {
    struct Worker* worker = new Worker();
    struct MCWorker* mcWorker = new MCWorker();
    mcWorker->mcQueue = createSingleLinkedQueue(context);

    // State private to the MCWorker implementation.
    mcWorker->id = id;
    mcWorker->mcContext = NULL;
    mcWorker->loopCounter = 0;
    mcWorker->nextStep = C_startModelChecker;

    // Wiring of the generic Worker interface.
    worker->worker = (union Data*)mcWorker;
    worker->tasks = queue;
    worker->taskReceive = C_taskReceiveMCWorker;
    worker->shutdown = C_shutdownMCWorker;

    // Seed random() (used by startModelChecker) before the worker thread is launched.
    srandom(time(NULL));
    pthread_create(&worker->thread, NULL, (void*)&startWorker, worker);
    return worker;
}

// Thread entry for an MCWorker.
// Allocates and initialises a Context owned by this worker, records the worker and its
// task queue in that context's Worker data gear, then transfers control to the worker's
// taskReceive code gear through meta.
static void startWorker(struct Worker* worker) {
    struct MCWorker* mcWorker = &worker->worker->MCWorker;

    struct Context* workerContext = NEW(struct Context);
    mcWorker->context = workerContext;
    initContext(workerContext);

    // Arguments the first code gear will pick up from the Worker data gear.
    Gearef(workerContext, Worker)->worker = (union Data*)worker;
    Gearef(workerContext, Worker)->tasks = worker->tasks;

    goto meta(workerContext, worker->taskReceive);
}

// Code gear installed as worker->taskReceive by createMCWorker().
// Logs a trace line, then asks the task queue for its next element, naming
// getTaskMCWorker as the continuation.
//   worker : the MCWorker implementation (not used in this gear)
//   tasks  : queue to take the next task from
__code taskReceiveMCWorker(struct MCWorker* worker, struct Queue* tasks) {
    printf("task receive mc worker\n");
    goto tasks->take(getTaskMCWorker);
}


// Code gear installed as mcWorker->nextStep by createMCWorker(), so mcMeta dispatches
// here. Picks one element of the model-checking queue at random, treats its data as a
// Context, records it as the worker's current mcContext and resumes that context at
// the gear saved in its `next` field.
//   worker : the MCWorker whose mcQueue holds the collected contexts
__code startModelChecker(struct MCWorker* worker) {
    struct SingleLinkedQueue* mcQueue =  (struct SingleLinkedQueue*)worker->mcQueue->queue;
    int length = lengthSingleLinkedQueue(mcQueue);
    // NOTE(review): idx ranges over 0..length inclusive. Confirm that getElementByIdx()
    // returns a real element (non-NULL, with non-NULL data) for every value in that
    // range, including when the queue is empty; elem is dereferenced unchecked below.
    int idx = random()%(length+1); // incase of multithread use random_r
    struct Element* elem = getElementByIdx(mcQueue, idx);
    struct Context* ncontext = (struct Context*)elem->data;
    worker->mcContext = ncontext;
    goto meta(ncontext, ncontext->next);
}

// Alias for __code, used to declare mcMeta below.
// NOTE(review): presumably spelled differently so that source-level tooling that looks
// for the __code keyword leaves mcMeta alone — confirm against the build scripts.
#define __ncode __code


// Meta-level hook for contexts driven by an MCWorker.
// Saves the gear the context was about to run in context->next, then switches to the
// MCWorker's own context and runs the gear stored in its nextStep field.
//   context : the context requesting a transition
//   next    : the code gear that context wanted to run next
__ncode mcMeta(struct Context* context, enum Code next) {
    context->next = next;
    struct MCWorker* checker = (struct MCWorker*) context->worker->worker;
    goto meta(checker->context, checker->nextStep);
}

// Code gear reached after taskReceiveMCWorker takes an element from the task queue.
// A NULL task switches the worker to startModelChecker; otherwise the task is tagged
// with its worker, appended to the MCWorker's private queue, and the gear loops back
// to taskReceiveMCWorker to take the next one.
//   mcWorker : the MCWorker implementation that owns mcQueue
//   task     : context taken from the task queue, or NULL
//   worker   : the Worker recorded on the task as its owner
__code getTaskMCWorker(struct MCWorker* mcWorker, struct Context* task, struct Worker* worker) {
    if (!task) {
        printf("mc worker take end\n");
        // Nothing was taken: stop collecting and start the model-checking phase.
        goto startModelChecker();
    }
    printf("mc worker take\n");
    task->worker = worker;
    struct Queue* mcQueue = mcWorker->mcQueue;
    goto mcQueue->put(task, taskReceiveMCWorker);
}

__code getTaskMCWorker_stub(struct Context* context) {
    MCWorker* mcWorker = (MCWorker*)GearImpl(context, Worker, worker);
    Worker* worker = &Gearef(context,Worker)->worker->Worker;
    struct Context* task = &Gearef(context, Queue)->data->Context;
    goto getTaskMCWorker(context, mcWorker, task, worker);
}

// First step of committing a task's output data gears.
// When task->iterate is not set, continues directly to odgCommitMCWorker1. Otherwise
// hands the task to its iterator's barrier, passing odgCommitMCWorker1 and
// odgCommitMCWorker6 as the two continuations.
//   worker : the MCWorker implementation
//   task   : the context whose outputs are being committed
__code odgCommitMCWorker(struct MCWorker* worker, struct Context* task) {
    if (!task->iterate) {
        goto odgCommitMCWorker1();
    }
    struct Iterator* iterator = task->iterator;
    goto iterator->barrier(task, odgCommitMCWorker1, odgCommitMCWorker6);
}

__code odgCommitMCWorker_stub(struct Context* context) {
    // switch worker context
    struct Context* workerContext = context->worker->worker->MCWorker.context;
    Gearef(workerContext, Worker)->worker = (union Data*)context->worker;
    Gearef(workerContext, Worker)->task = context;
    MCWorker* mcWorker = (MCWorker*)GearImpl(workerContext, Worker, worker);
    goto odgCommitMCWorker(workerContext,
                            mcWorker,
                            context);
}

// Loop head of the output-data-gear commit.
// worker->loopCounter is the offset from task->odg of the slot being processed. While
// that slot index is below task->maxOdg, continues to odgCommitMCWorker2. Once it is
// not, resets the counter and asks the task manager to decrement its task count,
// continuing to odgCommitMCWorker6.
__code odgCommitMCWorker1(struct MCWorker* worker, struct Context* task) {
    int offset = worker->loopCounter;
    if (task->odg + offset >= task->maxOdg) {
        // Every output slot has been visited.
        worker->loopCounter = 0;
        struct TaskManager* taskManager = task->taskManager;
        goto taskManager->decrementTaskCount(odgCommitMCWorker6);
    }
    goto odgCommitMCWorker2();
}

// Looks up the wait list (via GET_WAIT_LIST) of the data gear at slot
// task->odg + worker->loopCounter and asks whether it is empty, passing
// odgCommitMCWorker3 and odgCommitMCWorker5 as the two continuations.
// NOTE(review): presumably odgCommitMCWorker3 runs when entries remain and
// odgCommitMCWorker5 when the list is empty — confirm against the Queue interface.
__code odgCommitMCWorker2(struct MCWorker* worker, struct Context* task) {
    int offset = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg + offset]);
    goto queue->isEmpty(odgCommitMCWorker3, odgCommitMCWorker5);
}

// Takes one entry from the wait list of the data gear at slot
// task->odg + worker->loopCounter and continues to odgCommitMCWorker4.
__code odgCommitMCWorker3(struct MCWorker* worker, struct Context* task) {
    int offset = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg + offset]);
    goto queue->take(odgCommitMCWorker4);
}

// Handles one task that was waiting on the data gear just committed.
// Atomically decrements waitTask->idgCount. If the value before the decrement was 1,
// the waiting task is handed to its task manager's spawn. In either case the next gear
// is odgCommitMCWorker2, directly or as spawn's continuation.
//   worker   : the MCWorker implementation
//   task     : the task whose outputs are being committed
//   waitTask : the task taken from the wait list
__code odgCommitMCWorker4(struct MCWorker* worker, struct Context* task, struct Context* waitTask) {
    // __sync_fetch_and_sub returns the value idgCount held before the subtraction.
    if (__sync_fetch_and_sub(&waitTask->idgCount, 1) != 1) {
        goto odgCommitMCWorker2();
    }
    struct TaskManager* taskManager = waitTask->taskManager;
    goto taskManager->spawn(waitTask, odgCommitMCWorker2);
}

__code odgCommitMCWorker4_stub(struct Context* context) {
    MCWorker* mcWorker     = (MCWorker*)GearImpl(context, Worker, worker);
    struct Context* task     = Gearef(context, Worker)->task;
    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
    goto odgCommitMCWorker4(context,
                             mcWorker,
                             task,
                             waitTask);
}

// Advances worker->loopCounter to the next output slot and returns to the loop head,
// odgCommitMCWorker1.
__code odgCommitMCWorker5(struct MCWorker* worker, struct Context* task) {
    worker->loopCounter += 1;
    goto odgCommitMCWorker1();
}

// Final step of the commit sequence: continues to the taskReceive gear of the Worker
// recorded on the task, passing that worker's task queue.
//   worker : the MCWorker implementation (not used in this gear)
//   task   : the task whose commit has finished
__code odgCommitMCWorker6(struct MCWorker* worker, struct Context* task) {
    struct Worker* taskWorker = task->worker;
    goto taskWorker->taskReceive(taskWorker->tasks);
}

// Code gear installed as worker->shutdown by createMCWorker(); continues to exit_code.
//   worker : the MCWorker implementation (not used in this gear)
__code shutdownMCWorker(struct MCWorker* worker) {
    goto exit_code();
}