Mercurial > hg > Gears > Gears
view src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc @ 1032:793b21a8ea12
fix include
author | Shinji KONO <kono@ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 24 Oct 2023 12:25:40 +0900 |
parents | 635ccc391642 |
children |
line wrap: on
line source
#interface "TQueue.h" #interface "Atomic.h" #interface "FileString.h" #interface "DataTransfer.h" #impl "TQueue.h" as "LocalDGMQueue.h" #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <netdb.h> #define BUF_SIZE 1024 /* * Non-blocking queue of Paper: Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms(https://www.research.ibm.com/people/m/michael/podc-1996.pdf). */ TQueue* createLocalDGMQueue(struct Context* context, char* sNum) { struct TQueue* tQueue = new TQueue(); struct LocalDGMQueue* LocalDGMQueue = new LocalDGMQueue(); LocalDGMQueue->top = new Element(); // allocate a free node LocalDGMQueue->top->next = NULL; LocalDGMQueue->last = LocalDGMQueue->top; LocalDGMQueue->atomic = createAtomicReference(context); LocalDGMQueue->socket = createSocketLocalDGMQueue(sNum); tQueue->tQueue = (union Data*)LocalDGMQueue; tQueue->take = C_takeLocalDGMQueue; tQueue->put = C_putLocalDGMQueue; tQueue->isEmpty = C_isEmptyLocalDGMQueue; tQueue->clear = C_clearLocalDGMQueue; tQueue->getData = C_getDataLocalDGMQueue; return tQueue; } int* createSocketLocalDGMQueue(char* sNum){ int w_addr, c_sock; struct sockaddr_in a_addr; char *hostname = "Localhost"; char *service = sNum; struct addrinfo hints, *res0, *res; int err; memset(&hints, 0, sizeof(hints)); hints.ai_socktype = SOCK_STREAM; hints.ai_family = PF_UNSPEC; //UNSPECはIPv4,6両方のうち使えるものを返す if((err = getaddrinfo(hostname, service, &hints, &res0)) != 0){ printf("error %d : %s\n", err, gai_strerror(err)); return 1; } for (res=res0; res!=NULL; res=res->ai_next){ w_addr = socket(res->ai_family, res->ai_socktype, res->ai_protocol); //AF_INETは古い break; } if (w_addr == -1) { printf("socket error\n"); return -1; } struct in_addr { u_int32_t s_addr; }; memset(&a_addr, 0, sizeof(struct sockaddr_in)); a_addr.sin_family = PF_UNSPEC; a_addr.sin_port = res->ai_addr; a_addr.sin_addr.s_addr = res->ai_addrlen; if (bind(w_addr, res->ai_addr, res->ai_addrlen) == -1) { printf("bind error\n"); close(w_addr); return -1; } if (listen(w_addr, 3) == -1) { printf("listen error\n"); close(w_addr); return -1; } printf("Waiting connect...\n"); c_sock = accept(w_addr, NULL, NULL); if (c_sock == -1) { printf("accept error\n"); close(w_addr); return -1; } printf("Connected!!\n"); return c_sock; } __code clearLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(...)) { struct Element* top = tQueue->top; struct Atomic* atomic = tQueue->atomic; goto atomic->checkAndSet(&tQueue->top, top, NULL, next(...), clearLocalDGMQueue); } __code getTranceLocalDGMQueue(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)){ printf("get"); goto next(...); } __code putLocalDGMQueue(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)) { Element* element = new Element(); element->data = data; element->next = NULL; Element* last = tQueue->last; Element* nextElement = last->next; if (last != tQueue->last) { goto putLocalDGMQueue(); } if (nextElement == NULL) { struct Atomic* atomic = tQueue->atomic; goto atomic->checkAndSet(&last->next, nextElement, element, next(...), putLocalDGMQueue); //書き換え } else { struct Atomic* atomic = tQueue->atomic; goto atomic->checkAndSet(&tQueue->last, last, nextElement, putLocalDGMQueue, putLocalDGMQueue); } } __code takeLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(union Data* data, ...)) { struct Element* top = tQueue->top; struct Element* last = tQueue->last; struct Element* nextElement = top->next; if (top != tQueue->top) { //top!= Queue->top 割り込まれたらループ? goto takeLocalDGMQueue(); } if (top == last) { //top = lastつまり要素が一つの場合。 if (nextElement != NULL) { //nextがヌルでなければ、Queueのラストをlastからelementに置き換える。 struct Atomic* atomic = tQueue->atomic; goto atomic->checkAndSet(&tQueue->last, last, nextElement, takeLocalDGMQueue, takeLocalDGMQueue); } } else { struct Atomic* atomic = tQueue->atomic; goto atomic->checkAndSet(&tQueue->top, top, nextElement, takeLocalDGMQueue1, takeLocalDGMQueue); } goto takeLocalDGMQueue(); } __code takeLocalDGMQueue1(struct LocalDGMQueue* tQueue, __code next(union Data* data, ...), struct Element* nextElement) { data = nextElement->data; Gearef(context, TQueue)->data = data; goto next(data, ...); } __code takeLocalDGMQueue1_stub(struct Context* context) { LocalDGMQueue* tQueue = (LocalDGMQueue*)GearImpl(context, TQueue, tQueue); enum Code next = Gearef(context, TQueue)->next; Data** O_data = &Gearef(context, TQueue)->data; goto takeLocalDGMQueue1(context, tQueue, next, O_data, (struct Element*)Gearef(context, Atomic)->newData); } __code isEmptyLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(...), __code whenEmpty(...)) { struct Element* top = tQueue->top; struct Element* last = tQueue->last; struct Element* nextElement = top->next; if (top != tQueue->top) { goto isEmptyLocalDGMQueue(); } if (top == last && nextElement == NULL) { goto whenEmpty(...); } goto next(...); } __code getDataLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(...), __code whenEOF(...)){ int recv_size, send_size; char recv_buf[BUF_SIZE], send_buf; /* クライアントから文字列を受信 */ union Data* recv_data; recv_size = read(tQueue->socket, recv_data, sizeof(union Data)); //printf("[%s] [%d]\n", fileString->str, fileString->size); if (recv_size == -1) { printf("recv error\n"); goto exit_code(); } if (recv_size == 0) { /* 受信サイズが0の場合は相手が接続閉じていると判断 */ printf("connection ended\n"); } /* 受信した文字列を表示 */ /* 文字列が"finish"ならクライアントとの接続終了 */ FileString* fileString = NEW(FileString); fileString = recv_data; if (strcmp(fileString->str, "finish") == 0) { /* 接続終了を表す0を送信 */ send_buf = 0; send_size = write(tQueue->socket, &send_buf, 1); if (send_size == -1) { printf("send error\n"); } close(tQueue->buffer); goto whenEOF(...); } else { /* "finish"以外の場合はクライアントとの接続を継続 */ send_buf = 1; send_size = write(tQueue->socket, &send_buf, 1); if (send_size == -1) { printf("send error\n"); } } Gearef(context, TQueue)->data = recv_data; goto putLocalDGMQueue(recv_data, next); } __code sendDataLocalDGMQueue(struct RemoteDGMQueue* tQueue, struct FileString* string, __code next(...)){ }