Mercurial > hg > Gears > Gears
changeset 1006:d3355697c87c
commit to pull
author | ichikitakahiro <e165713@ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 06 Jan 2022 22:53:24 +0900 |
parents | 40817b3f91e2 |
children | 111e313e883e |
files | src/parallel_execution/CMakeLists.txt src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc src/parallel_execution/examples/socketQueue/RemoteDGMQueue.cbc src/parallel_execution/examples/socketQueue/RemoteDGMQueue.h src/parallel_execution/examples/socketQueue/Remote_test.cbc src/parallel_execution/examples/socketQueue/TQueue.h |
diffstat | 6 files changed, 185 insertions(+), 117 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/CMakeLists.txt Thu Dec 23 16:59:31 2021 +0900 +++ b/src/parallel_execution/CMakeLists.txt Thu Jan 06 22:53:24 2022 +0900 @@ -263,4 +263,11 @@ LDGM SOURCES examples/socketQueue/LocalDGMQueue.cbc examples/socketQueue/Local_test.cbc AtomicReference.cbc SingleLinkedStack.cbc +) + +GearsCommand( + TARGET + socket_wc + SOURCES + examples/socketQueue/RemoteDGMQueue.cbc examples/socketQueue/wordCount_Remote.cbc AtomicReference.cbc SingleLinkedStack.cbc ) \ No newline at end of file
--- a/src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc Thu Dec 23 16:59:31 2021 +0900 +++ b/src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc Thu Jan 06 22:53:24 2022 +0900 @@ -48,7 +48,7 @@ hints.ai_socktype = SOCK_STREAM; hints.ai_family = PF_UNSPEC; //UNSPECはIPv4,6両方のうち使えるものを返す if((err = getaddrinfo(hostname, service, &hints, &res0)) != 0){ - printf("error %d\n" , err); + printf("error %d : %s\n", err, gai_strerror(err)); return 1; } @@ -132,11 +132,11 @@ struct Element* top = tQueue->top; struct Element* last = tQueue->last; struct Element* nextElement = top->next; - if (top != tQueue->top) { + if (top != tQueue->top) { //top!= Queue->top 割り込まれたらループ? goto takeLocalDGMQueue(); } - if (top == last) { - if (nextElement != NULL) { + if (top == last) { //top = lastつまり要素が一つの場合。 + if (nextElement != NULL) { //nextがヌルでなければ、Queueのラストをlastからelementに置き換える。 struct Atomic* atomic = tQueue->atomic; goto atomic->checkAndSet(&tQueue->last, last, nextElement, takeLocalDGMQueue, takeLocalDGMQueue); } @@ -216,8 +216,12 @@ } FileString* string = NEW(FileString); - memcpy(string->str, recv_buf, sizeof(recv_buf)); + memcpy(string->str, recv_buf, sizeof(string->str)); Gearef(context, TQueue)->data = string; goto putLocalDGMQueue(string, next); } + +__code sendData(struct RemoteDGMQueue* tQueue, struct FileString* string, __code next(...)){ +} +
--- a/src/parallel_execution/examples/socketQueue/RemoteDGMQueue.cbc Thu Dec 23 16:59:31 2021 +0900 +++ b/src/parallel_execution/examples/socketQueue/RemoteDGMQueue.cbc Thu Jan 06 22:53:24 2022 +0900 @@ -1,7 +1,9 @@ #include "../../../context.h" -#interface "Queue.h" +#interface "TQueue.h" #interface "Atomic.h" -#impl "Queue.h" as "RemoteDGMQueue.h" +#interface "FileString.h" +#interface "DataTransfer.h" +#impl "TQueue.h" as "RemoteDGMQueue.h" #include <sys/socket.h> #include <netinet/in.h> @@ -10,37 +12,34 @@ #include <string.h> #include <unistd.h> #include <netdb.h> -#define BUF_SIZR 1024 - - +#define BUF_SIZE 1024 /* * Non-blocking queue of Paper: Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms(https://www.research.ibm.com/people/m/michael/podc-1996.pdf). */ -Queue* createRemoteDGMQueue(struct Context* context) { - struct Queue* queue = new Queue(); - struct RemoteDGMQueue* remoteDGMQueue = new RemoteDGMQueue(); - remoteDGMQueue->top = new Element(); // allocate a free node - remoteDGMQueue->top->next = NULL; - remoteDGMQueue->last = remoteDGMQueue->top; - remoteDGMQueue->atomic = createAtomicReference(context); - remoteDGMQueue->c_sock = createSocket(); - remoteDGMQueue->w_sock = NULL; +TQueue* createRemoteDGMQueue(struct Context* context) { + struct TQueue* tQueue = new TQueue(); + struct RemoteDGMQueue* RemoteDGMQueue = new RemoteDGMQueue(); + RemoteDGMQueue->top = new Element(); // allocate a free node + RemoteDGMQueue->top->next = NULL; + RemoteDGMQueue->last = RemoteDGMQueue->top; + RemoteDGMQueue->atomic = createAtomicReference(context); + RemoteDGMQueue->socket = createSocket(); - queue->queue = (union Data*)remoteDGMQueue; - queue->take = C_takeRemoteDGMQueue; - queue->put = C_putRemoteDGMQueue; - queue->isEmpty = C_isEmptyRemoteDGMQueue; - queue->clear = C_clearRemoteDGMQueue; - queue->getData = C_getDataRemoteDGMQueue; - return queue; + tQueue->tQueue = (union Data*)RemoteDGMQueue; + tQueue->take = C_takeRemoteDGMQueue; + tQueue->put = C_putRemoteDGMQueue; + tQueue->isEmpty = C_isEmptyRemoteDGMQueue; + tQueue->clear = C_clearRemoteDGMQueue; + tQueue->getData = C_getDataRemoteDGMQueue; + return tQueue; } int* createSocket(){ - int w_addr, c_sock; + int sock; struct sockaddr_in a_addr; - char *hostname = "Localhost"; + char *hostname = "localhost"; char *service = "8080"; struct addrinfo hints, *res0, *res; int err; @@ -49,120 +48,111 @@ hints.ai_socktype = SOCK_STREAM; hints.ai_family = PF_UNSPEC; //UNSPECはIPv4,6両方のうち使えるものを返す if((err = getaddrinfo(hostname, service, &hints, &res0)) != 0){ - printf("error %d\n" , err); + printf("error %d : %s\n", err, gai_strerror(err)); return 1; } - for (res=res0; res!=NULL; res=res->ai_next){ - w_addr = socket(res->ai_family, res->ai_socktype, res->ai_protocol); //AF_INETは古い + for (res=res0; res!=NULL; res=res->ai_next) { + sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (sock < 0) { + continue; + } + + if (connect(sock, res->ai_addr, res->ai_addrlen) != 0) { + close(sock); + continue; + } break; } - if (w_addr == -1) { - printf("socket error\n"); - return -1; - } - - struct in_addr { - u_int32_t s_addr; - }; - - memset(&a_addr, 0, sizeof(struct sockaddr_in)); - - a_addr.sin_family = PF_UNSPEC; - a_addr.sin_port = res->ai_addr; - a_addr.sin_addr.s_addr = res->ai_addrlen; + if (res == NULL) { + /* 有効な接続が出来なかった */ + printf("failed\n"); - if (bind(w_addr, res->ai_addr, res->ai_addrlen) == -1) { - printf("bind error\n"); - close(w_addr); - return -1; + return 1; } + + freeaddrinfo(res0); - if (listen(w_addr, 3) == -1) { - printf("listen error\n"); - close(w_addr); - return -1; - } - - - printf("Waiting connect...\n"); - c_sock = accept(w_addr, NULL, NULL); - if (c_sock == -1) { - printf("accept error\n"); - close(w_addr); - return -1; - } printf("Connected!!\n"); - return c_sock; + return sock; } -__code clearRemoteDGMQueue(struct RemoteDGMQueue* queue, __code next(...)) { - struct Element* top = queue->top; - struct Atomic* atomic = queue->atomic; - goto atomic->checkAndSet(&queue->top, top, NULL, next(...), clearRemoteDGMQueue); + + +__code clearRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(...)) { + struct Element* top = tQueue->top; + struct Atomic* atomic = tQueue->atomic; + goto atomic->checkAndSet(&tQueue->top, top, NULL, next(...), clearRemoteDGMQueue); } -__code putRemoteDGMQueue(struct RemoteDGMQueue* queue, union Data* data, __code next(...)) { +__code getTrance(struct RemoteDGMQueue* tQueue, union Data* data, __code next(...)){ + printf("get"); + goto next(...); +} + +__code putRemoteDGMQueue(struct RemoteDGMQueue* tQueue, union Data* data, __code next(...)) { Element* element = new Element(); element->data = data; element->next = NULL; - Element* last = queue->last; + Element* last = tQueue->last; Element* nextElement = last->next; - if (last != queue->last) { + if (last != tQueue->last) { goto putRemoteDGMQueue(); } if (nextElement == NULL) { - struct Atomic* atomic = queue->atomic; - goto atomic->checkAndSet(&last->next, nextElement, element, next(...), putRemoteDGMQueue); + struct Atomic* atomic = tQueue->atomic; + Gearef(context, TQueue)->data = element->data; + goto atomic->checkAndSet(&last->next, nextElement, element, sendDataRemoteDGMQueue, putRemoteDGMQueue); //書き換え } else { - struct Atomic* atomic = queue->atomic; - goto atomic->checkAndSet(&queue->last, last, nextElement, putRemoteDGMQueue, putRemoteDGMQueue); + struct Atomic* atomic = tQueue->atomic; + goto atomic->checkAndSet(&tQueue->last, last, nextElement, putRemoteDGMQueue, putRemoteDGMQueue); } } -__code takeRemoteDGMQueue(struct RemoteDGMQueue* queue, __code next(union Data* data, ...)) { - struct Element* top = queue->top; - struct Element* last = queue->last; +__code takeRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(union Data* data, ...)) { + struct Element* top = tQueue->top; + struct Element* last = tQueue->last; struct Element* nextElement = top->next; - if (top != queue->top) { + if (top != tQueue->top) { goto takeRemoteDGMQueue(); } if (top == last) { if (nextElement != NULL) { - struct Atomic* atomic = queue->atomic; - goto atomic->checkAndSet(&queue->last, last, nextElement, takeRemoteDGMQueue, takeRemoteDGMQueue); + struct Atomic* atomic = tQueue->atomic; + goto atomic->checkAndSet(&tQueue->last, last, nextElement, takeRemoteDGMQueue, takeRemoteDGMQueue); } } else { - struct Atomic* atomic = queue->atomic; - goto atomic->checkAndSet(&queue->top, top, nextElement, takeRemoteDGMQueue1, takeRemoteDGMQueue); + struct Atomic* atomic = tQueue->atomic; + goto atomic->checkAndSet(&tQueue->top, top, nextElement, takeRemoteDGMQueue1, takeRemoteDGMQueue); } goto takeRemoteDGMQueue(); } -__code takeRemoteDGMQueue1(struct RemoteDGMQueue* queue, __code next(union Data* data, ...), struct Element* nextElement) { +__code takeRemoteDGMQueue1(struct RemoteDGMQueue* tQueue, __code next(union Data* data, ...), struct Element* nextElement) { data = nextElement->data; + Gearef(context, TQueue)->data = data; goto next(data, ...); } __code takeRemoteDGMQueue1_stub(struct Context* context) { - RemoteDGMQueue* queue = (RemoteDGMQueue*)GearImpl(context, Queue, queue); - enum Code next = Gearef(context, Queue)->next; - Data** O_data = &Gearef(context, Queue)->data; + RemoteDGMQueue* tQueue = (RemoteDGMQueue*)GearImpl(context, TQueue, tQueue); + enum Code next = Gearef(context, TQueue)->next; + Data** O_data = &Gearef(context, TQueue)->data; goto takeRemoteDGMQueue1(context, - queue, + tQueue, next, O_data, (struct Element*)Gearef(context, Atomic)->newData); } -__code isEmptyRemoteDGMQueue(struct RemoteDGMQueue* queue, __code next(...), __code whenEmpty(...)) { - struct Element* top = queue->top; - struct Element* last = queue->last; +__code isEmptyRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(...), __code whenEmpty(...)) { + struct Element* top = tQueue->top; + struct Element* last = tQueue->last; struct Element* nextElement = top->next; - if (top != queue->top) { + if (top != tQueue->top) { goto isEmptyRemoteDGMQueue(); } if (top == last && nextElement == NULL) { @@ -171,21 +161,54 @@ goto next(...); } +__code sendData(struct RemoteDGMQueue* tQueue, FileString* string, __code next(...)){ + char send_buf[BUF_SIZE], recv_buf; + int send_size, recv_size; -__code connectToLocal(struct RemoteDGMQueue* queue, __code next(...)){ - printf("Waiting connect...\n"); - queue->c_sock = accept(queue->w_sock, NULL, NULL); - if (queue->c_sock == -1) { - printf("accept error\n"); - close(queue->w_sock); - return -1; - } - printf("Connected!!\n"); - close(queue->w_sock); + + /* サーバーに送る文字列を取得 */ + memcpy(send_buf, string->str, sizeof(send_buf)); + + /* 文字列を送信 */ + send_size = send(tQueue->socket, send_buf, strlen(send_buf) + 1, 0); + if (send_size == -1) { + printf("send error\n"); + close(tQueue->socket); + goto exit_code(); + } + /* サーバーからの応答を受信 */ + recv_size = recv(tQueue->socket, &recv_buf, 1, 0); + if (recv_size == -1) { + printf("recv error\n"); + close(tQueue->socket); + goto exit_code(); + } + + if (recv_size == 0) { + /* 受信サイズが0の場合は相手が接続閉じていると判断 */ + printf("connection ended\n"); + close(tQueue->socket); + goto exit_code(); + } + + /* 応答が0の場合はデータ送信終了 */ + if (recv_buf == 0) { + printf("Finish connection\n"); + close(tQueue->socket); + goto exit_code(); + } goto next(...); } -__code connectToRemote(struct RemoteDGMQueue* queue, __code next(...)){ - -} \ No newline at end of file + +__code sendDataRemoteDGMQueue_stub(struct Context* context) { + RemoteDGMQueue* tQueue = (RemoteDGMQueue*)GearImpl(context, TQueue, tQueue); + FileString* string = Gearef(context, TQueue)->data; + enum Code next = Gearef(context, TQueue)->next; + goto sendDataRemoteDGMQueue(context, tQueue, string, next); +} + +__code getData(struct RemoteDGMQueue* tQueue, __code next(...)){ + +}
--- a/src/parallel_execution/examples/socketQueue/RemoteDGMQueue.h Thu Dec 23 16:59:31 2021 +0900 +++ b/src/parallel_execution/examples/socketQueue/RemoteDGMQueue.h Thu Jan 06 22:53:24 2022 +0900 @@ -1,7 +1,11 @@ -typedef struct RemoteDGMQueue <> impl Queue { +typedef struct RemoteDGMQueue <> impl TQueue { struct Element* top; struct Element* last; struct Atomic* atomic; - int* w_sock; - int* c_sock; + + int* socket; + char* buffer; + char* send_buf; + } RemoteDGMQueue; +
--- a/src/parallel_execution/examples/socketQueue/Remote_test.cbc Thu Dec 23 16:59:31 2021 +0900 +++ b/src/parallel_execution/examples/socketQueue/Remote_test.cbc Thu Jan 06 22:53:24 2022 +0900 @@ -1,28 +1,56 @@ #include "../../../context.h" -#interface "Queue.h" +#interface "TQueue.h" #interface "Integer.h" - +#interface "FileString.h" +#interface "DataTransfer.h" -__code Task1(Queue* remoteDGMQueue){ - Integer* integer = NEW(Integer); - integer->value = "22"; - goto remoteDGMQueue->put(integer, Task2); +__code Task1(TQueue* remoteDGMQueue){ + FileString* string = NEW(FileString); + char word[1024] = "first"; + memcpy(string->str, word, sizeof(word)); + goto remoteDGMQueue->put(string, Task2); } __code Task1_stub(struct Context* context){ - Queue* remoteDGMQueue = createRemoteDGMQueue(context); + TQueue* remoteDGMQueue = createRemoteDGMQueue(context); goto Task1(context, remoteDGMQueue); } -__code Task2(Queue* remoteDGMQueue){ +__code Task2(TQueue* remoteDGMQueue){ printf("Task2\n"); + FileString* string = NEW(FileString); + char word[1024] = "secound"; + memcpy(string->str, word, sizeof(word)); + goto remoteDGMQueue->put(string, Task3); } __code Task2_stub(struct Context* context){ - Queue* remoteDGMQueue = (struct Queue*)Gearef(context, Queue)->queue; + TQueue* remoteDGMQueue = (struct TQueue*)Gearef(context, TQueue)->tQueue; goto Task2(context, remoteDGMQueue); } +__code Task3(TQueue* remoteDGMQueue){ + printf("Task3\n"); + FileString* string = NEW(FileString); + char word[1024] = "finish"; + memcpy(string->str, word, sizeof(word)); + goto remoteDGMQueue->put(string, Task4); ; +} + +__code Task3_stub(struct Context* context){ + TQueue* remoteDGMQueue = (struct TQueue*)Gearef(context, TQueue)->tQueue; + goto Task3(context, remoteDGMQueue); +} + +__code Task4(TQueue* remoteDGMQueue){ + goto exit_code(); +} + +__code Task4_stub(struct Context* context){ + TQueue* remoteDGMQueue = (struct TQueue*)Gearef(context, TQueue)->tQueue; + goto Task4(context, remoteDGMQueue); +} + int main(int argc, char** argv) { printf("main\n"); goto Task1();
--- a/src/parallel_execution/examples/socketQueue/TQueue.h Thu Dec 23 16:59:31 2021 +0900 +++ b/src/parallel_execution/examples/socketQueue/TQueue.h Thu Jan 06 22:53:24 2022 +0900 @@ -1,6 +1,7 @@ typedef struct TQueue<>{ union Data* tQueue; union Data* data; + struct FileString* string; __code whenEmpty(...); __code clear(Impl* tQueue, __code next(...)); @@ -8,6 +9,7 @@ __code take(Impl* tQueue, __code next(union Data* data, ...)); __code isEmpty(Impl* tQueue, __code next(...), __code whenEmpty(...)); + __code sendData(Impl* tQueue, FileString* string, __code next(...)); __code getData(Impl* tQueue, __code next(...)); __code next(...); } TQueue;