comparison 2018/2018_01_09/slide.md @ 30:d2e208431fbf

Update
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Thu, 18 Jan 2018 03:47:13 +0900
parents
children
comparison
equal deleted inserted replaced
29:791013bd4429 30:d2e208431fbf
1 title: Gears OS による並列処理
2 author: Tatsuki IHA
3 profile:
4 lang: Japanese
5 code-engine: coderay
6
7 ## 研究目的
8 - 当研究室では 処理の単位を Code Gear、 データの単位を Data Gear を用いて 信頼性が高い並列処理を行う Gears OS を開発している
9 - Gears OS では Task を Code Gear と実行するときに必要な Input Data Gear と出力するための Output Data Gear の組で表現される。 Input Data Gear/Output Data Gear によって依存関係が決定し、それにそって並列実行を行う.
10 - 信頼性の確保はモデルチェック、検証等を使用して行う。この信頼性のための計算は通常の計算とは別の階層のメタ計算として記述する。
11 - また、 メタ計算は信頼性の他に CPU, GPU などの実行環境の切り替え, データ拡張等の柔軟性を提供する。
12 - 本研究では、 Gears OS の並列処理機構の実装を行う。また、並列処理の検証をメタ計算として記述することで、 並列処理の信頼性を保証する。
13
14 ## 今週
15 - BoundedBuffer(producer-consumer)
16 - context の wait 機能を作りたい
17 - Semaphore
18 - spinlock
19
20 ## BoundedBuffer
21 - producer 側は buffer に put し続ける
22 - buffer の上限を超えるなら, take されるまで待つ
23 - consumer 側は buffer を take し続ける
24 - buffer に要素がないなら, put されるまで待つ
25 - よくある例だと 上限と空かどうかの2つの semaphore を持って実装される
26
27 ``` c
28 mutex buffer_mutex;
29 semaphore fillCount = 0;
30 semaphore emptyCount = BUFFER_SIZE;
31
32 procedure producer()
33 {
34 while (true)
35 {
36 item = produceItem();
37 down(emptyCount);
38 down(buffer_mutex);
39 putItemIntoBuffer(item);
40 up(buffer_mutex);
41 up(fillCount);
42 }
43 }
44
45 procedure consumer()
46 {
47 while (true)
48 {
49 down(fillCount);
50 down(buffer_mutex);
51 item = removeItemFromBuffer();
52 up(buffer_mutex);
53 up(emptyCount);
54 consumeItem(item);
55 }
56 }
57 ```
58
59 ## boundedBuffer
60 - CbC の実装
61 - semaphore 2つと binary semaphore を用意
62
63 ``` c
64 struct Buffer {
65 union Data* buffer;
66 union Data* data;
67 enum Code put;
68 enum Code take;
69 enum Code next;
70 } Buffer;
71 struct BoundedBuffer {
72 struct Element* top;
73 struct Element* last;
74 struct Semaphore* fullCount;
75 struct Semaphore* emptyCount;
76 struct Semaphore* lock;
77 } BoundedBuffer;
78
79 Buffer* createBoundedBuffer(struct Context* context, int size) {
80 struct Buffer* buffer = new Buffer();
81 struct BoundedBuffer* boundedBuffer = new BoundedBuffer();
82 boundedBuffer->top = new Element();
83 boundedBuffer->top->next = NULL;
84 boundedBuffer->last = boundedBuffer->top;
85 boundedBuffer->fullCount = createSemaphoreImpl(context, 0);
86 boundedBuffer->emptyCount = createSemaphoreImpl(context, size);
87 boundedBuffer->lock = createSemaphoreImpl(context, 1); // binary semaphore
88 buffer->buffer = (union Data*)boundedBuffer;
89 buffer->take = C_takeBoundedBuffer;
90 buffer->put = C_putBoundedBuffer;
91 return buffer;
92 }
93 ```
94
95 ## boundedBuffer
96 - put の実装
97
98 ``` c
99 __code putBoundedBuffer(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
100 struct Semaphore* semaphore = buffer->emptyCount;
101 goto semaphore->p(putBoundedBuffer1);
102 }
103
104 __code putBoundedBuffer1(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
105 struct Semaphore* semaphore = buffer->lock;
106 goto semaphore->p(putBoundedBuffer2);
107 }
108
109 __code putBoundedBuffer2(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
110 struct Element* element = new Element();
111 element->data = data;
112 element->next = NULL;
113 struct Element* last = buffer->last;
114 last->next = element;
115 buffer->last = element;
116 struct Semaphore* semaphore = buffer->lock;
117 goto semaphore->v(putBoundedBuffer3);
118 }
119
120 __code putBoundedBuffer3(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
121 struct Semaphore* semaphore = buffer->fullCount;
122 goto semaphore->v(putBoundedBuffer4);
123 }
124
125 __code putBoundedBuffer4(struct BoundedBuffer* buffer, union Data* data, __code next(...)) {
126 goto next(...);
127 }
128 ```
129
130 ## boundedBuffer
131 - take
132
133 ``` c
134 __code takeBoundedBuffer(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
135 struct Semaphore* semaphore = buffer->fullCount;
136 goto semaphore->p(takeBoundedBuffer1);
137 }
138
139 __code takeBoundedBuffer1(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
140 struct Semaphore* semaphore = buffer->lock;
141 goto semaphore->p(takeBoundedBuffer2);
142 }
143
144 __code takeBoundedBuffer2(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
145 struct Element* top = buffer->top;
146 struct Element* nextElement = top->next;
147 data = nextElement->data;
148 *O_data =data;
149 buffer->top = nextElement;
150 struct Semaphore* semaphore = buffer->lock;
151 goto semaphore->v(takeBoundedBuffer3);
152 }
153
154 __code takeBoundedBuffer3(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
155 struct Semaphore* semaphore = buffer->emptyCount;
156 goto semaphore->v(takeBoundedBuffer4);
157 }
158
159 __code takeBoundedBuffer4(struct BoundedBuffer* buffer, __code next(union Data* data, ...)) {
160 goto next(data, ...);
161 }
162 ```
163
164 ## Semaphore
165 - Gears の Semaphore を実装してみました
166 - p,v 命令
167 - p は 資源の取得, 取得できない場合, thread はwaitする
168 - v は 資源の開放, 待っているThread があったら, signal する
169 - Gears の thread は context なので, wait する際は semaphore の waitContextQueue に context をput する
170 - wait したあとは, Worker に処理を任せる
171 - signal の際は, waitContextQueue から1つ take して, TaskManager に spawn する
172
173 ## Semaphore
174 <img src="./pictures/semaphoreSequence.svg" alt="message" width="1000">
175
176 ## Semaphore
177 - create
178
179 ``` c
180 struct Semaphore {
181 union Data* semaphore;
182 enum Code p;
183 enum Code v;
184 enum Code next;
185 } Semaphore;
186 struct SemaphoreImpl {
187 int value;
188 struct Lock* lock;
189 struct Queue* waitThreadQueue;
190 } SemaphoreImpl;
191
192 Semaphore* createSemaphoreImpl(struct Context* context, int n) {
193 struct Semaphore* semaphore = new Semaphore();
194 struct SemaphoreImpl* semaphoreImpl = new SemaphoreImpl();
195 semaphore->semaphore = (union Data*)semaphoreImpl;
196 semaphoreImpl->value = n;
197 semaphoreImpl->waitThreadQueue = createSingleLinkedQueue(context);
198 semaphoreImpl->lock = createSpinLock(context);
199 semaphore->p = C_pOperationSemaphoreImpl;
200 semaphore->v = C_vOperationSemaphoreImpl;
201 return semaphore;
202 }
203 ```
204
205 ## Semaphore
206 - pOperation
207
208 ``` c
209 __code pOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
210 struct Lock* lock = semaphore->lock;
211 goto lock->doLock(pOperationSemaphoreImpl1);
212 }
213
214 __code pOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) {
215 if(semaphore->value == 0) {
216 context->next= C_pOperationSemaphoreImpl;
217 struct Queue* queue = semaphore->waitThreadQueue;
218 goto queue->put(context, pOperationSemaphoreImpl2); // put this context(thread, process)
219 }
220 semaphore->value--;
221 struct Lock* lock = semaphore->lock;
222 goto lock->doUnlock(next(...));
223 }
224
225 __code pOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...)) {
226 struct Lock* lock = semaphore->lock;
227 goto lock->doUnlock(pOperationSemaphoreImpl3);
228 }
229
230 __code pOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, struct Worker* worker, __code next(...)) {
231 goto worker->taskReceive(); // goto shceduler
232 }
233
234 __code pOperationSemaphoreImpl3_stub(struct Context* context) {
235 // switch worker context
236 struct Context* workerContext = context->worker->worker->CPUWorker.context;
237 SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore);
238 goto pOperationSemaphoreImpl3(workerContext,
239 semaphoreImpl,
240 context->worker,
241 Gearef(context, Semaphore)->next);
242 }
243 ```
244
245 ## Semaphore
246 - vOperation
247
248 ```
249 __code vOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
250 struct Lock* lock = semaphore->lock;
251 goto lock->doLock(vOperationSemaphoreImpl1);
252 }
253
254 __code vOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) {
255 semaphore->value++;
256 struct Queue* queue = semaphore->waitThreadQueue;
257 goto queue->isEmpty(vOperationSemaphoreImpl2, vOperationSemaphoreImpl4);
258 }
259
260 __code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...)) {
261 struct Queue* queue = semaphore->waitThreadQueue;
262 goto queue->take(vOperationSemaphoreImpl3);
263 }
264
265 __code vOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, struct Context* waitTask, __code next(...)) {
266 struct TaskManager* taskManager = waitTask->taskManager;
267 goto taskManager->spawn(waitTask, vOperationSemaphoreImpl4); //notify
268 }
269
270 __code vOperationSemaphoreImpl3_stub(struct Context* context) {
271 SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore);
272 struct Context* waitTask = &Gearef(context, Queue)->data->Context;
273 goto vOperationSemaphoreImpl3(context,
274 semaphoreImpl,
275 waitTask,
276 Gearef(context, Semaphore)->next);
277 }
278
279 __code vOperationSemaphoreImpl4(struct SemaphoreImpl* semaphore, __code next(...)) {
280 struct Lock* lock = semaphore->lock;
281 goto lock->doUnlock(next(...));
282 }
283 ```