Mercurial > hg > Gears > Gears
view src/parallel_execution/ModelChecking/MCWorker.cbc @ 897:70016b1cd1e2
goto mcMeta
author | anatofuz <anatofuz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 26 Jan 2021 15:45:50 +0900 |
parents | 1caa59b7f228 |
children | 04edc3e392bc |
line wrap: on
line source
#include "../../context.h" #include <stdio.h> #include <stdlib.h> #include <time.h> #interface "TaskManager.h" #interface "Worker.h" #interface "Iterator.h" #interface "Queue.h" #interface "SingleLinkedQueue.h" static void startWorker(Worker* worker); extern int lengthSingleLinkedQueue(struct SingleLinkedQueue* queue); extern Element* getElementByIdx(struct SingleLinkedQueue* queue, int idx); Worker* createMCWorker(struct Context* context, int id, Queue* queue) { struct Worker* worker = new Worker(); struct MCWorker* mcWorker = new MCWorker(); mcWorker->mcQueue = createSingleLinkedQueue(context); worker->worker = (union Data*)mcWorker; worker->tasks = queue; mcWorker->id = id; mcWorker->mcContext = NULL; mcWorker->loopCounter = 0; mcWorker->nextStep = C_startModelChecker; worker->taskReceive = C_taskReceiveMCWorker; worker->shutdown = C_shutdownMCWorker; srandom(time(NULL)); pthread_create(&worker->thread, NULL, (void*)&startWorker, worker); return worker; } static void startWorker(struct Worker* worker) { struct MCWorker* mcWorker = &worker->worker->MCWorker; mcWorker->context = NEW(struct Context); initContext(mcWorker->context); Gearef(mcWorker->context, Worker)->worker = (union Data*)worker; Gearef(mcWorker->context, Worker)->tasks = worker->tasks; goto meta(mcWorker->context, worker->taskReceive); } __code taskReceiveMCWorker(struct MCWorker* worker, struct Queue* tasks) { printf("task receive mc worker\n"); goto tasks->take(getTaskMCWorker); } __code startModelChecker(struct MCWorker* worker) { struct SingleLinkedQueue* mcQueue = (struct SingleLinkedQueue*)worker->mcQueue->queue; struct Element* elem = mcQueue->top; elem = elem->next; int length = lengthSingleLinkedQueue(mcQueue); int idx = random()%(length+1); // incase of multithread use random_r elem = getElementByIdx(mcQueue, idx); struct Context* ncontext = (struct Context*)elem->data; worker->mcContext = ncontext; goto meta(ncontext, ncontext->next); } #define __ncode __code __ncode mcMeta(struct Context* context, enum Code next) { struct MCWorker* mcworker = (struct MCWorker*) context->worker->worker; context->next = next; goto meta(mcworker->context, mcworker->nextStep); } __code getTaskMCWorker(struct MCWorker* mcWorker, struct Context* task, struct Worker* worker) { if (!task) { printf("mc worker take end\n"); goto startModelChecker(); // end thread } printf("mc worker take\n"); task->worker = worker; enum Code taskCg = task->next; struct Queue* mcQueue = mcWorker->mcQueue; goto mcQueue->put(task, taskReceiveMCWorker); } __code getTaskMCWorker_stub(struct Context* context) { MCWorker* mcWorker = (MCWorker*)GearImpl(context, Worker, worker); Worker* worker = &Gearef(context,Worker)->worker->Worker; struct Context* task = &Gearef(context, Queue)->data->Context; goto getTaskMCWorker(context, mcWorker, task, worker); } __code odgCommitMCWorker(struct MCWorker* worker, struct Context* task) { if (task->iterate) { struct Iterator* iterator = task->iterator; goto iterator->barrier(task, odgCommitMCWorker1, odgCommitMCWorker6); } else { goto odgCommitMCWorker1(); } } __code odgCommitMCWorker_stub(struct Context* context) { // switch worker context struct Context* workerContext = context->worker->worker->MCWorker.context; Gearef(workerContext, Worker)->worker = (union Data*)context->worker; Gearef(workerContext, Worker)->task = context; MCWorker* mcWorker = (MCWorker*)GearImpl(workerContext, Worker, worker); goto odgCommitMCWorker(workerContext, mcWorker, context); } __code odgCommitMCWorker1(struct MCWorker* worker, struct Context* task) { int i = worker->loopCounter; if (task->odg+i < task->maxOdg) { goto odgCommitMCWorker2(); } worker->loopCounter = 0; struct TaskManager* taskManager = task->taskManager; goto taskManager->decrementTaskCount(odgCommitMCWorker6); } __code odgCommitMCWorker2(struct MCWorker* worker, struct Context* task) { int i = worker->loopCounter; struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]); goto queue->isEmpty(odgCommitMCWorker3, odgCommitMCWorker5); } __code odgCommitMCWorker3(struct MCWorker* worker, struct Context* task) { int i = worker->loopCounter; struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]); goto queue->take(odgCommitMCWorker4); } __code odgCommitMCWorker4(struct MCWorker* worker, struct Context* task, struct Context* waitTask) { if (__sync_fetch_and_sub(&waitTask->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of waitTask->idgCount point) struct TaskManager* taskManager = waitTask->taskManager; goto taskManager->spawn(waitTask, odgCommitMCWorker2); } goto odgCommitMCWorker2(); } __code odgCommitMCWorker4_stub(struct Context* context) { MCWorker* mcWorker = (MCWorker*)GearImpl(context, Worker, worker); struct Context* task = Gearef(context, Worker)->task; struct Context* waitTask = &Gearef(context, Queue)->data->Context; goto odgCommitMCWorker4(context, mcWorker, task, waitTask); } __code odgCommitMCWorker5(struct MCWorker* worker, struct Context* task) { worker->loopCounter++; goto odgCommitMCWorker1(); } __code odgCommitMCWorker6(struct MCWorker* worker, struct Context* task) { struct Worker* taskWorker = task->worker; goto taskWorker->taskReceive(taskWorker->tasks); } __code shutdownMCWorker(struct MCWorker* worker) { goto exit_code(); }