view 2018/2018_01_09/slide.md @ 30:d2e208431fbf

Update
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Thu, 18 Jan 2018 03:47:13 +0900
parents
children
line wrap: on
line source

title: Gears OS による並列処理
author: Tatsuki IHA
profile:
lang: Japanese
code-engine: coderay

## 研究目的
- 当研究室では  処理の単位を Code Gear、 データの単位を Data Gear を用いて 信頼性が高い並列処理を行う Gears OS を開発している
- Gears OS では Task を Code Gear と実行するときに必要な Input Data Gear と出力するための Output Data Gear の組で表現される。 Input Data Gear/Output Data Gear によって依存関係が決定し、それにそって並列実行を行う.
- 信頼性の確保はモデルチェック、検証等を使用して行う。この信頼性のための計算は通常の計算とは別の階層のメタ計算として記述する。
- また、 メタ計算は信頼性の他に CPU, GPU などの実行環境の切り替え, データ拡張等の柔軟性を提供する。
- 本研究では、 Gears OS の並列処理機構の実装を行う。また、並列処理の検証をメタ計算として記述することで、 並列処理の信頼性を保証する。

## 今週
- BoundedBuffer(producer-consumer)
  - context の wait 機能を作りたい
  - Semaphore
  - spinlock

## BoundedBuffer
- producer 側は buffer に put し続ける
  - buffer の上限を超えるなら, take されるまで待つ
- consumer 側は buffer を take し続ける
  - buffer に要素がないなら, put されるまで待つ
- よくある例だと 上限と空かどうかの2つの semaphore を持って実装される

``` c
mutex buffer_mutex;
semaphore fillCount = 0;
semaphore emptyCount = BUFFER_SIZE;

procedure producer() 
{
    while (true) 
    {
        item = produceItem();
        down(emptyCount);
        down(buffer_mutex);
        putItemIntoBuffer(item);
        up(buffer_mutex);
        up(fillCount);
    }
}

procedure consumer() 
{
    while (true) 
    {
        down(fillCount);
        down(buffer_mutex);
        item = removeItemFromBuffer();
        up(buffer_mutex);
        up(emptyCount);
        consumeItem(item);
    }
}
```

## boundedBuffer
- CbC の実装
- semaphore 2つと binary semaphore を用意

``` c
struct Buffer {
    union Data* buffer;
    union Data* data;
    enum Code put;
    enum Code take;
    enum Code next;
} Buffer;
struct BoundedBuffer {
    struct Element* top;
    struct Element* last;
    struct Semaphore* fullCount;
    struct Semaphore* emptyCount;
    struct Semaphore* lock;
} BoundedBuffer;

Buffer* createBoundedBuffer(struct Context* context, int size) {
    struct Buffer* buffer = new Buffer();
    struct BoundedBuffer* boundedBuffer = new BoundedBuffer();
    boundedBuffer->top = new Element();
    boundedBuffer->top->next = NULL;
    boundedBuffer->last = boundedBuffer->top;
    boundedBuffer->fullCount = createSemaphoreImpl(context, 0);
    boundedBuffer->emptyCount = createSemaphoreImpl(context, size);
    boundedBuffer->lock = createSemaphoreImpl(context, 1); // binary semaphore
    buffer->buffer = (union Data*)boundedBuffer;
    buffer->take = C_takeBoundedBuffer;
    buffer->put = C_putBoundedBuffer;
    return buffer;
}
```

## boundedBuffer
- put の実装

``` c
__code putBoundedBuffer(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
    struct Semaphore* semaphore = buffer->emptyCount;
    goto semaphore->p(putBoundedBuffer1);
}

__code putBoundedBuffer1(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
    struct Semaphore* semaphore = buffer->lock;
    goto semaphore->p(putBoundedBuffer2);
}

__code putBoundedBuffer2(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
    struct Element* element = new Element();
    element->data = data;
    element->next = NULL;
    struct Element* last = buffer->last;
    last->next = element;
    buffer->last = element;
    struct Semaphore* semaphore = buffer->lock;
    goto semaphore->v(putBoundedBuffer3);
}

__code putBoundedBuffer3(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
    struct Semaphore* semaphore = buffer->fullCount;
    goto semaphore->v(putBoundedBuffer4);
}

__code putBoundedBuffer4(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
    goto next(...);
}
```

## boundedBuffer
- take

``` c
__code takeBoundedBuffer(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
    struct Semaphore* semaphore = buffer->fullCount;
    goto semaphore->p(takeBoundedBuffer1);
}

__code takeBoundedBuffer1(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
    struct Semaphore* semaphore = buffer->lock;
    goto semaphore->p(takeBoundedBuffer2);
}

__code takeBoundedBuffer2(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
    struct Element* top = buffer->top;
    struct Element* nextElement = top->next;
    data = nextElement->data;
    *O_data =data;
    buffer->top = nextElement;
    struct Semaphore* semaphore = buffer->lock;
    goto semaphore->v(takeBoundedBuffer3);
}

__code takeBoundedBuffer3(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
    struct Semaphore* semaphore = buffer->emptyCount;
    goto semaphore->v(takeBoundedBuffer4);
}

__code takeBoundedBuffer4(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
    goto next(data, ...);
}
```

## Semaphore
- Gears の Semaphore を実装してみました
- p,v 命令
  - p は 資源の取得, 取得できない場合, thread はwaitする
  - v は 資源の開放, 待っているThread があったら, signal する
- Gears の thread は context なので, wait する際は semaphore の waitContextQueue に context をput する
  - wait したあとは, Worker に処理を任せる
- signal の際は, waitContextQueue から1つ take して, TaskManager に spawn する

## Semaphore
<img src="./pictures/semaphoreSequence.svg" alt="message" width="1000">

## Semaphore
- create

``` c
struct Semaphore {
    union Data* semaphore;
    enum Code p;
    enum Code v;
    enum Code next;
} Semaphore;
struct SemaphoreImpl {
    int value;
    struct Lock* lock;
    struct Queue* waitThreadQueue;
} SemaphoreImpl;

Semaphore* createSemaphoreImpl(struct Context* context, int n) {
    struct Semaphore* semaphore = new Semaphore();
    struct SemaphoreImpl* semaphoreImpl = new SemaphoreImpl();
    semaphore->semaphore = (union Data*)semaphoreImpl;
    semaphoreImpl->value =  n;
    semaphoreImpl->waitThreadQueue = createSingleLinkedQueue(context);
    semaphoreImpl->lock = createSpinLock(context);
    semaphore->p = C_pOperationSemaphoreImpl;
    semaphore->v = C_vOperationSemaphoreImpl;
    return semaphore;
}
```

## Semaphore
- pOperation

``` c
__code pOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
    struct Lock* lock = semaphore->lock;
    goto lock->doLock(pOperationSemaphoreImpl1);
}

__code pOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) {
    if(semaphore->value == 0) {
        context->next= C_pOperationSemaphoreImpl;
        struct Queue* queue = semaphore->waitThreadQueue;
        goto queue->put(context, pOperationSemaphoreImpl2); // put this context(thread, process)
    }
    semaphore->value--;
    struct Lock* lock = semaphore->lock;
    goto lock->doUnlock(next(...));
}

__code pOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...)) {
    struct Lock* lock = semaphore->lock;
    goto lock->doUnlock(pOperationSemaphoreImpl3);
}

__code pOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, struct Worker* worker, __code next(...)) {
    goto worker->taskReceive(); // goto shceduler
}

__code pOperationSemaphoreImpl3_stub(struct Context* context) {
    // switch worker context
    struct Context* workerContext = context->worker->worker->CPUWorker.context;
    SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore);
    goto pOperationSemaphoreImpl3(workerContext,
            semaphoreImpl,
            context->worker,
            Gearef(context, Semaphore)->next);
}
```

## Semaphore
- vOperation

```
__code vOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
    struct Lock* lock = semaphore->lock;
    goto lock->doLock(vOperationSemaphoreImpl1);
}

__code vOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) {
    semaphore->value++;
    struct Queue* queue = semaphore->waitThreadQueue;
    goto queue->isEmpty(vOperationSemaphoreImpl2, vOperationSemaphoreImpl4);
}

__code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...)) {
    struct Queue* queue = semaphore->waitThreadQueue;
    goto queue->take(vOperationSemaphoreImpl3);
}

__code vOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, struct Context* waitTask, __code next(...)) {
    struct TaskManager* taskManager = waitTask->taskManager;
    goto taskManager->spawn(waitTask, vOperationSemaphoreImpl4); //notify
}

__code vOperationSemaphoreImpl3_stub(struct Context* context) {
    SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore);
    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
    goto vOperationSemaphoreImpl3(context,
            semaphoreImpl,
            waitTask,
            Gearef(context, Semaphore)->next);
}

__code vOperationSemaphoreImpl4(struct SemaphoreImpl* semaphore, __code next(...)) {
    struct Lock* lock = semaphore->lock;
    goto lock->doUnlock(next(...));
}
```