changeset 1006:d3355697c87c

commit to pull
author ichikitakahiro <e165713@ie.u-ryukyu.ac.jp>
date Thu, 06 Jan 2022 22:53:24 +0900
parents 40817b3f91e2
children 111e313e883e
files src/parallel_execution/CMakeLists.txt src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc src/parallel_execution/examples/socketQueue/RemoteDGMQueue.cbc src/parallel_execution/examples/socketQueue/RemoteDGMQueue.h src/parallel_execution/examples/socketQueue/Remote_test.cbc src/parallel_execution/examples/socketQueue/TQueue.h
diffstat 6 files changed, 185 insertions(+), 117 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/CMakeLists.txt	Thu Dec 23 16:59:31 2021 +0900
+++ b/src/parallel_execution/CMakeLists.txt	Thu Jan 06 22:53:24 2022 +0900
@@ -263,4 +263,11 @@
   LDGM
   SOURCES
   examples/socketQueue/LocalDGMQueue.cbc  examples/socketQueue/Local_test.cbc AtomicReference.cbc SingleLinkedStack.cbc 
+)
+
+GearsCommand(
+  TARGET
+  socket_wc
+  SOURCES
+  examples/socketQueue/RemoteDGMQueue.cbc  examples/socketQueue/wordCount_Remote.cbc AtomicReference.cbc SingleLinkedStack.cbc
 )
\ No newline at end of file
--- a/src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc	Thu Dec 23 16:59:31 2021 +0900
+++ b/src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc	Thu Jan 06 22:53:24 2022 +0900
@@ -48,7 +48,7 @@
     hints.ai_socktype = SOCK_STREAM;
     hints.ai_family = PF_UNSPEC; //UNSPECはIPv4,6両方のうち使えるものを返す
     if((err = getaddrinfo(hostname, service, &hints, &res0)) != 0){
-        printf("error %d\n" , err);
+        printf("error %d : %s\n", err, gai_strerror(err));
         return 1;
     }
 
@@ -132,11 +132,11 @@
     struct Element* top = tQueue->top;
     struct Element* last = tQueue->last;
     struct Element* nextElement = top->next;
-    if (top != tQueue->top) {
+    if (top != tQueue->top) { //top!= Queue->top 割り込まれたらループ?
         goto takeLocalDGMQueue();
     }
-    if (top == last) {
-        if (nextElement != NULL) {
+    if (top == last) {    //top = lastつまり要素が一つの場合。
+        if (nextElement != NULL) {  //nextがヌルでなければ、Queueのラストをlastからelementに置き換える。
             struct Atomic* atomic = tQueue->atomic;
             goto atomic->checkAndSet(&tQueue->last, last, nextElement, takeLocalDGMQueue, takeLocalDGMQueue);
         }
@@ -216,8 +216,12 @@
     }
 
     FileString* string = NEW(FileString);
-    memcpy(string->str, recv_buf, sizeof(recv_buf));
+    memcpy(string->str, recv_buf, sizeof(string->str));
     Gearef(context, TQueue)->data = string;
 
     goto putLocalDGMQueue(string, next);
 }
+
+__code sendData(struct RemoteDGMQueue* tQueue, struct FileString* string, __code next(...)){
+}
+
--- a/src/parallel_execution/examples/socketQueue/RemoteDGMQueue.cbc	Thu Dec 23 16:59:31 2021 +0900
+++ b/src/parallel_execution/examples/socketQueue/RemoteDGMQueue.cbc	Thu Jan 06 22:53:24 2022 +0900
@@ -1,7 +1,9 @@
 #include "../../../context.h"
-#interface "Queue.h"
+#interface "TQueue.h"
 #interface "Atomic.h"
-#impl "Queue.h" as "RemoteDGMQueue.h"
+#interface "FileString.h"
+#interface "DataTransfer.h"
+#impl "TQueue.h" as "RemoteDGMQueue.h"
 
 #include <sys/socket.h>
 #include <netinet/in.h>
@@ -10,37 +12,34 @@
 #include <string.h>
 #include <unistd.h>
 #include <netdb.h>
-#define BUF_SIZR 1024 
-
-
+#define BUF_SIZE 1024 
 
 /*
  * Non-blocking queue of Paper: Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms(https://www.research.ibm.com/people/m/michael/podc-1996.pdf).
  */
 
-Queue* createRemoteDGMQueue(struct Context* context) {
-    struct Queue* queue = new Queue();
-    struct RemoteDGMQueue* remoteDGMQueue = new RemoteDGMQueue();
-    remoteDGMQueue->top = new Element(); // allocate a free node
-    remoteDGMQueue->top->next = NULL;
-    remoteDGMQueue->last = remoteDGMQueue->top;
-    remoteDGMQueue->atomic = createAtomicReference(context);
-    remoteDGMQueue->c_sock = createSocket();
-    remoteDGMQueue->w_sock = NULL; 
+TQueue* createRemoteDGMQueue(struct Context* context) {
+    struct TQueue* tQueue = new TQueue();
+    struct RemoteDGMQueue* RemoteDGMQueue = new RemoteDGMQueue();
+    RemoteDGMQueue->top = new Element(); // allocate a free node
+    RemoteDGMQueue->top->next = NULL;
+    RemoteDGMQueue->last = RemoteDGMQueue->top;
+    RemoteDGMQueue->atomic = createAtomicReference(context);
+    RemoteDGMQueue->socket = createSocket(); 
 
-    queue->queue = (union Data*)remoteDGMQueue;
-    queue->take  = C_takeRemoteDGMQueue;
-    queue->put  = C_putRemoteDGMQueue;
-    queue->isEmpty = C_isEmptyRemoteDGMQueue;
-    queue->clear = C_clearRemoteDGMQueue;
-    queue->getData = C_getDataRemoteDGMQueue;
-    return queue;
+    tQueue->tQueue = (union Data*)RemoteDGMQueue;
+    tQueue->take  = C_takeRemoteDGMQueue;
+    tQueue->put  = C_putRemoteDGMQueue;
+    tQueue->isEmpty = C_isEmptyRemoteDGMQueue;
+    tQueue->clear = C_clearRemoteDGMQueue;
+    tQueue->getData = C_getDataRemoteDGMQueue;
+    return tQueue;
 }
 
 int* createSocket(){
-    int w_addr, c_sock;
+    int sock;
     struct sockaddr_in a_addr;
-    char *hostname = "Localhost";
+    char *hostname = "localhost";
     char *service = "8080";
     struct addrinfo hints, *res0, *res;
     int err;
@@ -49,120 +48,111 @@
     hints.ai_socktype = SOCK_STREAM;
     hints.ai_family = PF_UNSPEC; //UNSPECはIPv4,6両方のうち使えるものを返す
     if((err = getaddrinfo(hostname, service, &hints, &res0)) != 0){
-        printf("error %d\n" , err);
+        printf("error %d : %s\n", err, gai_strerror(err));
         return 1;
     }
 
-    for (res=res0; res!=NULL; res=res->ai_next){
-        w_addr = socket(res->ai_family, res->ai_socktype, res->ai_protocol); //AF_INETは古い
+    for (res=res0; res!=NULL; res=res->ai_next) {
+        sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+        if (sock < 0) {
+          continue;
+        }
+
+        if (connect(sock, res->ai_addr, res->ai_addrlen) != 0) {
+            close(sock);
+            continue;
+        }
         break;
     }
 
-    if (w_addr == -1) {
-        printf("socket error\n");
-        return -1;
-    }
-
-    struct in_addr {
-      u_int32_t s_addr;
-    };
-
-    memset(&a_addr, 0, sizeof(struct sockaddr_in));
-
-    a_addr.sin_family = PF_UNSPEC;
-    a_addr.sin_port = res->ai_addr;
-    a_addr.sin_addr.s_addr = res->ai_addrlen;
+    if (res == NULL) {
+    /* 有効な接続が出来なかった */
+       printf("failed\n");
 
-    if (bind(w_addr, res->ai_addr, res->ai_addrlen) == -1) {
-        printf("bind error\n");
-        close(w_addr);
-        return -1;
+       return 1;
     }
+    
+    freeaddrinfo(res0);
 
-    if (listen(w_addr, 3) == -1) {
-        printf("listen error\n");
-        close(w_addr);
-        return -1;
-    }   
-
-
-    printf("Waiting connect...\n");
-    c_sock = accept(w_addr, NULL, NULL);
-    if (c_sock == -1) {
-        printf("accept error\n");
-        close(w_addr);
-        return -1;
-    }
     printf("Connected!!\n");
-    return c_sock;
+    return sock;
 }
 
 
-__code clearRemoteDGMQueue(struct RemoteDGMQueue* queue, __code next(...)) {
-    struct Element* top = queue->top;
-    struct Atomic* atomic = queue->atomic;
-    goto atomic->checkAndSet(&queue->top, top, NULL, next(...), clearRemoteDGMQueue);
+
+
+__code clearRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(...)) {
+    struct Element* top = tQueue->top;
+    struct Atomic* atomic = tQueue->atomic;
+    goto atomic->checkAndSet(&tQueue->top, top, NULL, next(...), clearRemoteDGMQueue);
 }
 
-__code putRemoteDGMQueue(struct RemoteDGMQueue* queue, union Data* data, __code next(...)) {
+__code getTrance(struct RemoteDGMQueue* tQueue, union Data* data, __code next(...)){
+    printf("get");
+    goto next(...);
+}
+
+__code putRemoteDGMQueue(struct RemoteDGMQueue* tQueue, union Data* data, __code next(...)) {
     Element* element = new Element();
     element->data = data;
     element->next = NULL;
-    Element* last = queue->last;
+    Element* last = tQueue->last;
     Element* nextElement = last->next;
-    if (last != queue->last) {
+    if (last != tQueue->last) {
         goto putRemoteDGMQueue();
     }
     if (nextElement == NULL) {
-        struct Atomic* atomic = queue->atomic;
-        goto atomic->checkAndSet(&last->next, nextElement, element, next(...), putRemoteDGMQueue);
+        struct Atomic* atomic = tQueue->atomic;
+        Gearef(context, TQueue)->data = element->data;
+        goto atomic->checkAndSet(&last->next, nextElement, element, sendDataRemoteDGMQueue, putRemoteDGMQueue); //書き換え
     } else {
-        struct Atomic* atomic = queue->atomic;
-        goto atomic->checkAndSet(&queue->last, last, nextElement, putRemoteDGMQueue, putRemoteDGMQueue);
+        struct Atomic* atomic = tQueue->atomic;
+        goto atomic->checkAndSet(&tQueue->last, last, nextElement, putRemoteDGMQueue, putRemoteDGMQueue);
     }
 }
 
 
-__code takeRemoteDGMQueue(struct RemoteDGMQueue* queue, __code next(union Data* data, ...)) {
-    struct Element* top = queue->top;
-    struct Element* last = queue->last;
+__code takeRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(union Data* data, ...)) {
+    struct Element* top = tQueue->top;
+    struct Element* last = tQueue->last;
     struct Element* nextElement = top->next;
-    if (top != queue->top) {
+    if (top != tQueue->top) {
         goto takeRemoteDGMQueue();
     }
     if (top == last) {
         if (nextElement != NULL) {
-            struct Atomic* atomic = queue->atomic;
-            goto atomic->checkAndSet(&queue->last, last, nextElement, takeRemoteDGMQueue, takeRemoteDGMQueue);
+            struct Atomic* atomic = tQueue->atomic;
+            goto atomic->checkAndSet(&tQueue->last, last, nextElement, takeRemoteDGMQueue, takeRemoteDGMQueue);
         }
     } else {
-        struct Atomic* atomic = queue->atomic;
-        goto atomic->checkAndSet(&queue->top, top, nextElement, takeRemoteDGMQueue1, takeRemoteDGMQueue);
+        struct Atomic* atomic = tQueue->atomic;
+        goto atomic->checkAndSet(&tQueue->top, top, nextElement, takeRemoteDGMQueue1, takeRemoteDGMQueue);
     }
     goto takeRemoteDGMQueue();
 }
 
-__code takeRemoteDGMQueue1(struct RemoteDGMQueue* queue, __code next(union Data* data, ...), struct Element* nextElement) {
+__code takeRemoteDGMQueue1(struct RemoteDGMQueue* tQueue, __code next(union Data* data, ...), struct Element* nextElement) {
     data = nextElement->data;
+    Gearef(context, TQueue)->data = data;
     goto next(data, ...);
 }
 
 __code takeRemoteDGMQueue1_stub(struct Context* context) {
-	RemoteDGMQueue* queue = (RemoteDGMQueue*)GearImpl(context, Queue, queue);
-	enum Code next = Gearef(context, Queue)->next;
-	Data** O_data = &Gearef(context, Queue)->data;
+	RemoteDGMQueue* tQueue = (RemoteDGMQueue*)GearImpl(context, TQueue, tQueue);
+	enum Code next = Gearef(context, TQueue)->next;
+	Data** O_data = &Gearef(context, TQueue)->data;
 	goto takeRemoteDGMQueue1(context,
-                                queue,
+                                tQueue,
                                 next,
                                 O_data,
                                 (struct Element*)Gearef(context, Atomic)->newData);
 }
 
-__code isEmptyRemoteDGMQueue(struct RemoteDGMQueue* queue, __code next(...), __code whenEmpty(...)) {
-    struct Element* top = queue->top;
-    struct Element* last = queue->last;
+__code isEmptyRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(...), __code whenEmpty(...)) {
+    struct Element* top = tQueue->top;
+    struct Element* last = tQueue->last;
     struct Element* nextElement = top->next;
-    if (top != queue->top) {
+    if (top != tQueue->top) {
         goto isEmptyRemoteDGMQueue();
     }
     if (top == last && nextElement == NULL) {
@@ -171,21 +161,54 @@
     goto next(...);
 }
 
+__code sendData(struct RemoteDGMQueue* tQueue, FileString* string, __code next(...)){
+    char send_buf[BUF_SIZE], recv_buf;
+    int send_size, recv_size;
 
-__code connectToLocal(struct RemoteDGMQueue* queue, __code next(...)){
-        printf("Waiting connect...\n");
-        queue->c_sock = accept(queue->w_sock, NULL, NULL);
-        if (queue->c_sock == -1) {
-            printf("accept error\n");
-            close(queue->w_sock);
-            return -1;
-        }
-        printf("Connected!!\n");
-        close(queue->w_sock);
+
+    /* サーバーに送る文字列を取得 */
+    memcpy(send_buf, string->str, sizeof(send_buf));
+
+    /* 文字列を送信 */
+    send_size = send(tQueue->socket, send_buf, strlen(send_buf) + 1, 0);
+    if (send_size == -1) {
+        printf("send error\n");
+        close(tQueue->socket);
+        goto exit_code();
+    }
 
+        /* サーバーからの応答を受信 */
+    recv_size = recv(tQueue->socket, &recv_buf, 1, 0);
+    if (recv_size == -1) {
+        printf("recv error\n");
+        close(tQueue->socket);
+        goto exit_code();
+    }
+
+    if (recv_size == 0) {
+        /* 受信サイズが0の場合は相手が接続閉じていると判断 */
+        printf("connection ended\n");
+        close(tQueue->socket);
+        goto exit_code(); 
+    }
+
+        /* 応答が0の場合はデータ送信終了 */
+    if (recv_buf == 0) {
+        printf("Finish connection\n");
+        close(tQueue->socket);
+        goto exit_code();
+    }
     goto next(...);
 }
 
-__code connectToRemote(struct RemoteDGMQueue* queue, __code next(...)){
-    
-}
\ No newline at end of file
+
+__code sendDataRemoteDGMQueue_stub(struct Context* context) {
+	RemoteDGMQueue* tQueue = (RemoteDGMQueue*)GearImpl(context, TQueue, tQueue);
+	FileString* string = Gearef(context, TQueue)->data;
+	enum Code next = Gearef(context, TQueue)->next;
+	goto sendDataRemoteDGMQueue(context, tQueue, string, next);
+}
+
+__code getData(struct RemoteDGMQueue* tQueue, __code next(...)){
+
+}
--- a/src/parallel_execution/examples/socketQueue/RemoteDGMQueue.h	Thu Dec 23 16:59:31 2021 +0900
+++ b/src/parallel_execution/examples/socketQueue/RemoteDGMQueue.h	Thu Jan 06 22:53:24 2022 +0900
@@ -1,7 +1,11 @@
-typedef struct RemoteDGMQueue <> impl Queue {
+typedef struct RemoteDGMQueue <> impl TQueue {
   struct Element* top;
   struct Element* last;
   struct Atomic* atomic;
-  int* w_sock;
-  int* c_sock;
+  
+  int* socket;
+  char* buffer;
+  char* send_buf;
+
 } RemoteDGMQueue;
+
--- a/src/parallel_execution/examples/socketQueue/Remote_test.cbc	Thu Dec 23 16:59:31 2021 +0900
+++ b/src/parallel_execution/examples/socketQueue/Remote_test.cbc	Thu Jan 06 22:53:24 2022 +0900
@@ -1,28 +1,56 @@
 #include "../../../context.h"
-#interface "Queue.h"
+#interface "TQueue.h"
 #interface "Integer.h"
-
+#interface "FileString.h"
+#interface "DataTransfer.h"
 
-__code Task1(Queue* remoteDGMQueue){ 
-    Integer* integer = NEW(Integer);
-    integer->value = "22";   
-    goto remoteDGMQueue->put(integer, Task2); 
+__code Task1(TQueue* remoteDGMQueue){ 
+    FileString* string = NEW(FileString);
+    char word[1024] = "first";
+    memcpy(string->str, word, sizeof(word));
+    goto remoteDGMQueue->put(string, Task2); 
 }
 
 __code Task1_stub(struct Context* context){
-    Queue* remoteDGMQueue = createRemoteDGMQueue(context);
+    TQueue* remoteDGMQueue = createRemoteDGMQueue(context);
     goto Task1(context, remoteDGMQueue);
 }
 
-__code Task2(Queue* remoteDGMQueue){
+__code Task2(TQueue* remoteDGMQueue){
     printf("Task2\n");
+    FileString* string = NEW(FileString);
+    char word[1024] = "secound";
+    memcpy(string->str, word, sizeof(word));
+    goto remoteDGMQueue->put(string, Task3); 
 }
 
 __code Task2_stub(struct Context* context){
-    Queue* remoteDGMQueue = (struct Queue*)Gearef(context, Queue)->queue;
+    TQueue* remoteDGMQueue = (struct TQueue*)Gearef(context, TQueue)->tQueue;
     goto Task2(context, remoteDGMQueue);
 }
 
+__code Task3(TQueue* remoteDGMQueue){
+    printf("Task3\n");
+    FileString* string = NEW(FileString);
+    char word[1024] = "finish";
+    memcpy(string->str, word, sizeof(word));
+    goto remoteDGMQueue->put(string, Task4); ;
+}
+
+__code Task3_stub(struct Context* context){
+    TQueue* remoteDGMQueue = (struct TQueue*)Gearef(context, TQueue)->tQueue;
+    goto Task3(context, remoteDGMQueue);
+}
+
+__code Task4(TQueue* remoteDGMQueue){
+    goto exit_code();
+}
+
+__code Task4_stub(struct Context* context){
+    TQueue* remoteDGMQueue = (struct TQueue*)Gearef(context, TQueue)->tQueue;
+    goto Task4(context, remoteDGMQueue);
+}
+
 int main(int argc, char** argv) {
     printf("main\n");
     goto Task1();
--- a/src/parallel_execution/examples/socketQueue/TQueue.h	Thu Dec 23 16:59:31 2021 +0900
+++ b/src/parallel_execution/examples/socketQueue/TQueue.h	Thu Jan 06 22:53:24 2022 +0900
@@ -1,6 +1,7 @@
 typedef struct TQueue<>{
     union Data* tQueue;
     union Data* data;
+    struct FileString* string;
 
     __code whenEmpty(...);
     __code clear(Impl* tQueue, __code next(...));
@@ -8,6 +9,7 @@
     __code take(Impl* tQueue, __code next(union Data* data, ...));
     __code isEmpty(Impl* tQueue, __code next(...), __code whenEmpty(...));
 
+    __code sendData(Impl* tQueue, FileString* string, __code next(...));
     __code getData(Impl* tQueue, __code next(...));
     __code next(...);
 } TQueue;