Mercurial > hg > Gears > Gears
changeset 1000:d8142d91bc71
tweak
author | ichikitakahiro <e165713@ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 23 Dec 2021 16:59:01 +0900 |
parents | 705fad3712cd |
children | 40817b3f91e2 |
files | src/parallel_execution/examples/socketQueue/DataTransfer.h src/parallel_execution/examples/socketQueue/DataTransferImpl.cbc src/parallel_execution/examples/socketQueue/DataTransferImpl.h src/parallel_execution/examples/socketQueue/FileString.h src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc src/parallel_execution/examples/socketQueue/LocalDGMQueue.h src/parallel_execution/examples/socketQueue/Local_test.cbc src/parallel_execution/examples/socketQueue/RemoteDGMQueue.cbc src/parallel_execution/examples/socketQueue/RemoteDGMQueue.h src/parallel_execution/examples/socketQueue/Remote_test.cbc src/parallel_execution/examples/socketQueue/TQueue.h |
diffstat | 11 files changed, 636 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/examples/socketQueue/DataTransfer.h Thu Dec 23 16:59:01 2021 +0900 @@ -0,0 +1,8 @@ +typedef struct DataTransfer<>{ + union Data* dataTransfer; + struct Element* element; + + __code getData(Impl* dataTransfer, __code next(...)); + __code putData(Impl* dataTransfer, Type* element, __code next(...)); + __code next(...); +} DataTransfer;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/examples/socketQueue/DataTransferImpl.cbc Thu Dec 23 16:59:01 2021 +0900 @@ -0,0 +1,103 @@ +#include "../../../context.h" +#interface "DataTransfer.h" +#impl "DataTransfer.h" as "DataTransfer.h" + + +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <netdb.h> +#define BUF_SIZR 1024 + + +// ---- +// typedef struct DataTransferImpl <> impl DataTransfer{ +// int* socket; +// } DataTransfer; +// ---- + +DataTransfer* createDataTransferImpl(struct Context* context) { + struct DataTransfer* dataTransfer = new DataTransfer(); + struct DataTransferImpl* data_transfer_impl = new DataTransferImpl(); + dataTransfer->dataTransfer = (union Data*)data_transfer_impl; + dataTransfer->element = NULL; + data_transfer_impl->socket = createSocket(); + dataTransfer->getData = C_getDataDataTransferImpl; + dataTransfer->putData = C_putDataDataTransferImpl; + return dataTransfer; +} + +int* createSocket(){ + int w_addr, c_sock; + struct sockaddr_in a_addr; + char *hostname = "Localhost"; + char *service = "8080"; + struct addrinfo hints, *res0, *res; + int err; + + memset(&hints, 0, sizeof(hints)); + hints.ai_socktype = SOCK_STREAM; + hints.ai_family = PF_UNSPEC; //UNSPECはIPv4,6両方のうち使えるものを返す + if((err = getaddrinfo(hostname, service, &hints, &res0)) != 0){ + printf("error %d\n" , err); + return 1; + } + + for (res=res0; res!=NULL; res=res->ai_next){ + w_addr = socket(res->ai_family, res->ai_socktype, res->ai_protocol); //AF_INETは古い + break; + } + + if (w_addr == -1) { + printf("socket error\n"); + return -1; + } + + struct in_addr { + u_int32_t s_addr; + }; + + memset(&a_addr, 0, sizeof(struct sockaddr_in)); + + a_addr.sin_family = PF_UNSPEC; + a_addr.sin_port = res->ai_addr; + a_addr.sin_addr.s_addr = res->ai_addrlen; + + if (bind(w_addr, res->ai_addr, res->ai_addrlen) == -1) { + printf("bind error\n"); + close(w_addr); + return -1; + } + + if (listen(w_addr, 3) == -1) { + printf("listen error\n"); + close(w_addr); + return -1; + } + + + printf("Waiting connect...\n"); + c_sock = accept(w_addr, NULL, NULL); + if (c_sock == -1) { + printf("accept error\n"); + close(w_addr); + return -1; + } + printf("Connected!!\n"); + return c_sock; +} + + +__code getData(struct DataTransferImpl* dataTransfer, __code next(...)) { + printf("getData"); + goto next(...); +} + +__code putData(struct DataTransferImpl* dataTransfer, struct DataTransfer* element, __code next(...)) { + + goto next(...); +} +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/examples/socketQueue/DataTransferImpl.h Thu Dec 23 16:59:01 2021 +0900 @@ -0,0 +1,3 @@ +typedef struct DataTransferImpl <> impl DataTransfer{ + int* socket; +} DataTransferImpl;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/examples/socketQueue/FileString.h Thu Dec 23 16:59:01 2021 +0900 @@ -0,0 +1,4 @@ +typedef struct FileString <> { + char str[1024]; + int size; +} FileString;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc Thu Dec 23 16:59:01 2021 +0900 @@ -0,0 +1,223 @@ +#include "../../../context.h" +#interface "TQueue.h" +#interface "Atomic.h" +#interface "FileString.h" +#interface "DataTransfer.h" +#impl "TQueue.h" as "LocalDGMQueue.h" + +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <netdb.h> +#define BUF_SIZE 1024 + +/* + * Non-blocking queue of Paper: Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms(https://www.research.ibm.com/people/m/michael/podc-1996.pdf). + */ + +TQueue* createLocalDGMQueue(struct Context* context) { + struct TQueue* tQueue = new TQueue(); + struct LocalDGMQueue* LocalDGMQueue = new LocalDGMQueue(); + LocalDGMQueue->top = new Element(); // allocate a free node + LocalDGMQueue->top->next = NULL; + LocalDGMQueue->last = LocalDGMQueue->top; + LocalDGMQueue->atomic = createAtomicReference(context); + LocalDGMQueue->socket = createSocket(); + + tQueue->tQueue = (union Data*)LocalDGMQueue; + tQueue->take = C_takeLocalDGMQueue; + tQueue->put = C_putLocalDGMQueue; + tQueue->isEmpty = C_isEmptyLocalDGMQueue; + tQueue->clear = C_clearLocalDGMQueue; + tQueue->getData = C_getDataLocalDGMQueue; + return tQueue; +} + +int* createSocket(){ + int w_addr, c_sock; + struct sockaddr_in a_addr; + char *hostname = "Localhost"; + char *service = "8080"; + struct addrinfo hints, *res0, *res; + int err; + + memset(&hints, 0, sizeof(hints)); + hints.ai_socktype = SOCK_STREAM; + hints.ai_family = PF_UNSPEC; //UNSPECはIPv4,6両方のうち使えるものを返す + if((err = getaddrinfo(hostname, service, &hints, &res0)) != 0){ + printf("error %d\n" , err); + return 1; + } + + for (res=res0; res!=NULL; res=res->ai_next){ + w_addr = socket(res->ai_family, res->ai_socktype, res->ai_protocol); //AF_INETは古い + break; + } + + if (w_addr == -1) { + printf("socket error\n"); + return -1; + } + + struct in_addr { + u_int32_t s_addr; + }; + + memset(&a_addr, 0, sizeof(struct sockaddr_in)); + + a_addr.sin_family = PF_UNSPEC; + a_addr.sin_port = res->ai_addr; + a_addr.sin_addr.s_addr = res->ai_addrlen; + + if (bind(w_addr, res->ai_addr, res->ai_addrlen) == -1) { + printf("bind error\n"); + close(w_addr); + return -1; + } + + if (listen(w_addr, 3) == -1) { + printf("listen error\n"); + close(w_addr); + return -1; + } + + printf("Waiting connect...\n"); + c_sock = accept(w_addr, NULL, NULL); + if (c_sock == -1) { + printf("accept error\n"); + close(w_addr); + return -1; + } + printf("Connected!!\n"); + return c_sock; +} + + + + +__code clearLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(...)) { + struct Element* top = tQueue->top; + struct Atomic* atomic = tQueue->atomic; + goto atomic->checkAndSet(&tQueue->top, top, NULL, next(...), clearLocalDGMQueue); +} + +__code getTrance(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)){ + printf("get"); + goto next(...); +} + +__code putLocalDGMQueue(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)) { + Element* element = new Element(); + element->data = data; + element->next = NULL; + Element* last = tQueue->last; + Element* nextElement = last->next; + if (last != tQueue->last) { + goto putLocalDGMQueue(); + } + if (nextElement == NULL) { + struct Atomic* atomic = tQueue->atomic; + goto atomic->checkAndSet(&last->next, nextElement, element, getDataLocalDGMQueue, putLocalDGMQueue); //書き換え + } else { + struct Atomic* atomic = tQueue->atomic; + goto atomic->checkAndSet(&tQueue->last, last, nextElement, putLocalDGMQueue, putLocalDGMQueue); + } +} + + +__code takeLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(union Data* data, ...)) { + struct Element* top = tQueue->top; + struct Element* last = tQueue->last; + struct Element* nextElement = top->next; + if (top != tQueue->top) { + goto takeLocalDGMQueue(); + } + if (top == last) { + if (nextElement != NULL) { + struct Atomic* atomic = tQueue->atomic; + goto atomic->checkAndSet(&tQueue->last, last, nextElement, takeLocalDGMQueue, takeLocalDGMQueue); + } + } else { + struct Atomic* atomic = tQueue->atomic; + goto atomic->checkAndSet(&tQueue->top, top, nextElement, takeLocalDGMQueue1, takeLocalDGMQueue); + } + goto takeLocalDGMQueue(); +} + +__code takeLocalDGMQueue1(struct LocalDGMQueue* tQueue, __code next(union Data* data, ...), struct Element* nextElement) { + data = nextElement->data; + Gearef(context, TQueue)->data = data; + goto next(data, ...); +} + +__code takeLocalDGMQueue1_stub(struct Context* context) { + LocalDGMQueue* tQueue = (LocalDGMQueue*)GearImpl(context, TQueue, tQueue); + enum Code next = Gearef(context, TQueue)->next; + Data** O_data = &Gearef(context, TQueue)->data; + goto takeLocalDGMQueue1(context, + tQueue, + next, + O_data, + (struct Element*)Gearef(context, Atomic)->newData); +} + +__code isEmptyLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(...), __code whenEmpty(...)) { + struct Element* top = tQueue->top; + struct Element* last = tQueue->last; + struct Element* nextElement = top->next; + if (top != tQueue->top) { + goto isEmptyLocalDGMQueue(); + } + if (top == last && nextElement == NULL) { + goto whenEmpty(...); + } + goto next(...); +} + +__code getData(struct LocalDGMQueue* tQueue, __code next(...)){ + int recv_size, send_size; + char recv_buf[BUF_SIZE], send_buf; + + /* クライアントから文字列を受信 */ + recv_size = recv(tQueue->socket, recv_buf, BUF_SIZE, 0); + if (recv_size == -1) { + printf("recv error\n"); + goto exit_code(); + } + if (recv_size == 0) { + /* 受信サイズが0の場合は相手が接続閉じていると判断 */ + printf("connection ended\n"); + } + + /* 受信した文字列を表示 */ + printf("%s\n", recv_buf); + + /**/ + /* 文字列が"finish"ならクライアントとの接続終了 */ + if (strcmp(recv_buf, "finish") == 0) { + /* 接続終了を表す0を送信 */ + send_buf = 0; + send_size = send(tQueue->socket, &send_buf, 1, 0); + if (send_size == -1) { + printf("send error\n"); + } + close(tQueue->buffer); + goto next(...); + } else { + /* "finish"以外の場合はクライアントとの接続を継続 */ + send_buf = 1; + send_size = send(tQueue->socket, &send_buf, 1, 0); + if (send_size == -1) { + printf("send error\n"); + } + } + + FileString* string = NEW(FileString); + memcpy(string->str, recv_buf, sizeof(recv_buf)); + Gearef(context, TQueue)->data = string; + + goto putLocalDGMQueue(string, next); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/examples/socketQueue/LocalDGMQueue.h Thu Dec 23 16:59:01 2021 +0900 @@ -0,0 +1,11 @@ +typedef struct LocalDGMQueue <> impl TQueue { + struct Element* top; + struct Element* last; + struct Atomic* atomic; + + int* socket; + char* buffer; + char* send_buf; + +} LocalDGMQueue; +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/examples/socketQueue/Local_test.cbc Thu Dec 23 16:59:01 2021 +0900 @@ -0,0 +1,44 @@ +#include "../../../context.h" +#interface "TQueue.h" +#interface "Integer.h" +#interface "FileString.h" +#interface "DataTransfer.h" + +__code Task1(TQueue* localDGMQueue){ + Integer* integer = NEW(Integer); + integer->value = "22"; + goto localDGMQueue->getData(Task2); +} + +__code Task1_stub(struct Context* context){ + TQueue* localDGMQueue = createLocalDGMQueue(context); + goto Task1(context, localDGMQueue); +} + +__code Task2(TQueue* localDGMQueue){ + printf("Task2\n"); + goto localDGMQueue->take(Task3); +} + +__code Task2_stub(struct Context* context){ + TQueue* localDGMQueue = (struct TQueue*)Gearef(context, TQueue)->tQueue; + goto Task2(context, localDGMQueue); +} + + +__code Task3(TQueue* localDGMQueue, FileString* string){ + printf("take[%s]\n", string->str); + printf("Task3_exit\n"); + goto localDGMQueue->take(exit_code); +} + +__code Task3_stub(struct Context* context){ + TQueue* localDGMQueue = (struct TQueue*)Gearef(context, TQueue)->tQueue; + FileString* string = Gearef(context, TQueue)->data; + goto Task3(context, localDGMQueue, string); +} + +int main(int argc, char** argv) { + printf("main\n"); + goto Task1(); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/examples/socketQueue/RemoteDGMQueue.cbc Thu Dec 23 16:59:01 2021 +0900 @@ -0,0 +1,191 @@ +#include "../../../context.h" +#interface "Queue.h" +#interface "Atomic.h" +#impl "Queue.h" as "RemoteDGMQueue.h" + +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <netdb.h> +#define BUF_SIZR 1024 + + + +/* + * Non-blocking queue of Paper: Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms(https://www.research.ibm.com/people/m/michael/podc-1996.pdf). + */ + +Queue* createRemoteDGMQueue(struct Context* context) { + struct Queue* queue = new Queue(); + struct RemoteDGMQueue* remoteDGMQueue = new RemoteDGMQueue(); + remoteDGMQueue->top = new Element(); // allocate a free node + remoteDGMQueue->top->next = NULL; + remoteDGMQueue->last = remoteDGMQueue->top; + remoteDGMQueue->atomic = createAtomicReference(context); + remoteDGMQueue->c_sock = createSocket(); + remoteDGMQueue->w_sock = NULL; + + queue->queue = (union Data*)remoteDGMQueue; + queue->take = C_takeRemoteDGMQueue; + queue->put = C_putRemoteDGMQueue; + queue->isEmpty = C_isEmptyRemoteDGMQueue; + queue->clear = C_clearRemoteDGMQueue; + queue->getData = C_getDataRemoteDGMQueue; + return queue; +} + +int* createSocket(){ + int w_addr, c_sock; + struct sockaddr_in a_addr; + char *hostname = "Localhost"; + char *service = "8080"; + struct addrinfo hints, *res0, *res; + int err; + + memset(&hints, 0, sizeof(hints)); + hints.ai_socktype = SOCK_STREAM; + hints.ai_family = PF_UNSPEC; //UNSPECはIPv4,6両方のうち使えるものを返す + if((err = getaddrinfo(hostname, service, &hints, &res0)) != 0){ + printf("error %d\n" , err); + return 1; + } + + for (res=res0; res!=NULL; res=res->ai_next){ + w_addr = socket(res->ai_family, res->ai_socktype, res->ai_protocol); //AF_INETは古い + break; + } + + if (w_addr == -1) { + printf("socket error\n"); + return -1; + } + + struct in_addr { + u_int32_t s_addr; + }; + + memset(&a_addr, 0, sizeof(struct sockaddr_in)); + + a_addr.sin_family = PF_UNSPEC; + a_addr.sin_port = res->ai_addr; + a_addr.sin_addr.s_addr = res->ai_addrlen; + + if (bind(w_addr, res->ai_addr, res->ai_addrlen) == -1) { + printf("bind error\n"); + close(w_addr); + return -1; + } + + if (listen(w_addr, 3) == -1) { + printf("listen error\n"); + close(w_addr); + return -1; + } + + + printf("Waiting connect...\n"); + c_sock = accept(w_addr, NULL, NULL); + if (c_sock == -1) { + printf("accept error\n"); + close(w_addr); + return -1; + } + printf("Connected!!\n"); + return c_sock; +} + + +__code clearRemoteDGMQueue(struct RemoteDGMQueue* queue, __code next(...)) { + struct Element* top = queue->top; + struct Atomic* atomic = queue->atomic; + goto atomic->checkAndSet(&queue->top, top, NULL, next(...), clearRemoteDGMQueue); +} + +__code putRemoteDGMQueue(struct RemoteDGMQueue* queue, union Data* data, __code next(...)) { + Element* element = new Element(); + element->data = data; + element->next = NULL; + Element* last = queue->last; + Element* nextElement = last->next; + if (last != queue->last) { + goto putRemoteDGMQueue(); + } + if (nextElement == NULL) { + struct Atomic* atomic = queue->atomic; + goto atomic->checkAndSet(&last->next, nextElement, element, next(...), putRemoteDGMQueue); + } else { + struct Atomic* atomic = queue->atomic; + goto atomic->checkAndSet(&queue->last, last, nextElement, putRemoteDGMQueue, putRemoteDGMQueue); + } +} + + +__code takeRemoteDGMQueue(struct RemoteDGMQueue* queue, __code next(union Data* data, ...)) { + struct Element* top = queue->top; + struct Element* last = queue->last; + struct Element* nextElement = top->next; + if (top != queue->top) { + goto takeRemoteDGMQueue(); + } + if (top == last) { + if (nextElement != NULL) { + struct Atomic* atomic = queue->atomic; + goto atomic->checkAndSet(&queue->last, last, nextElement, takeRemoteDGMQueue, takeRemoteDGMQueue); + } + } else { + struct Atomic* atomic = queue->atomic; + goto atomic->checkAndSet(&queue->top, top, nextElement, takeRemoteDGMQueue1, takeRemoteDGMQueue); + } + goto takeRemoteDGMQueue(); +} + +__code takeRemoteDGMQueue1(struct RemoteDGMQueue* queue, __code next(union Data* data, ...), struct Element* nextElement) { + data = nextElement->data; + goto next(data, ...); +} + +__code takeRemoteDGMQueue1_stub(struct Context* context) { + RemoteDGMQueue* queue = (RemoteDGMQueue*)GearImpl(context, Queue, queue); + enum Code next = Gearef(context, Queue)->next; + Data** O_data = &Gearef(context, Queue)->data; + goto takeRemoteDGMQueue1(context, + queue, + next, + O_data, + (struct Element*)Gearef(context, Atomic)->newData); +} + +__code isEmptyRemoteDGMQueue(struct RemoteDGMQueue* queue, __code next(...), __code whenEmpty(...)) { + struct Element* top = queue->top; + struct Element* last = queue->last; + struct Element* nextElement = top->next; + if (top != queue->top) { + goto isEmptyRemoteDGMQueue(); + } + if (top == last && nextElement == NULL) { + goto whenEmpty(...); + } + goto next(...); +} + + +__code connectToLocal(struct RemoteDGMQueue* queue, __code next(...)){ + printf("Waiting connect...\n"); + queue->c_sock = accept(queue->w_sock, NULL, NULL); + if (queue->c_sock == -1) { + printf("accept error\n"); + close(queue->w_sock); + return -1; + } + printf("Connected!!\n"); + close(queue->w_sock); + + goto next(...); +} + +__code connectToRemote(struct RemoteDGMQueue* queue, __code next(...)){ + +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/examples/socketQueue/RemoteDGMQueue.h Thu Dec 23 16:59:01 2021 +0900 @@ -0,0 +1,7 @@ +typedef struct RemoteDGMQueue <> impl Queue { + struct Element* top; + struct Element* last; + struct Atomic* atomic; + int* w_sock; + int* c_sock; +} RemoteDGMQueue;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/examples/socketQueue/Remote_test.cbc Thu Dec 23 16:59:01 2021 +0900 @@ -0,0 +1,29 @@ +#include "../../../context.h" +#interface "Queue.h" +#interface "Integer.h" + + +__code Task1(Queue* remoteDGMQueue){ + Integer* integer = NEW(Integer); + integer->value = "22"; + goto remoteDGMQueue->put(integer, Task2); +} + +__code Task1_stub(struct Context* context){ + Queue* remoteDGMQueue = createRemoteDGMQueue(context); + goto Task1(context, remoteDGMQueue); +} + +__code Task2(Queue* remoteDGMQueue){ + printf("Task2\n"); +} + +__code Task2_stub(struct Context* context){ + Queue* remoteDGMQueue = (struct Queue*)Gearef(context, Queue)->queue; + goto Task2(context, remoteDGMQueue); +} + +int main(int argc, char** argv) { + printf("main\n"); + goto Task1(); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/examples/socketQueue/TQueue.h Thu Dec 23 16:59:01 2021 +0900 @@ -0,0 +1,13 @@ +typedef struct TQueue<>{ + union Data* tQueue; + union Data* data; + + __code whenEmpty(...); + __code clear(Impl* tQueue, __code next(...)); + __code put(Impl* tQueue, union Data* data, __code next(...)); + __code take(Impl* tQueue, __code next(union Data* data, ...)); + __code isEmpty(Impl* tQueue, __code next(...), __code whenEmpty(...)); + + __code getData(Impl* tQueue, __code next(...)); + __code next(...); +} TQueue;