changeset 1000:d8142d91bc71

tweak
author ichikitakahiro <e165713@ie.u-ryukyu.ac.jp>
date Thu, 23 Dec 2021 16:59:01 +0900
parents 705fad3712cd
children 40817b3f91e2
files src/parallel_execution/examples/socketQueue/DataTransfer.h src/parallel_execution/examples/socketQueue/DataTransferImpl.cbc src/parallel_execution/examples/socketQueue/DataTransferImpl.h src/parallel_execution/examples/socketQueue/FileString.h src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc src/parallel_execution/examples/socketQueue/LocalDGMQueue.h src/parallel_execution/examples/socketQueue/Local_test.cbc src/parallel_execution/examples/socketQueue/RemoteDGMQueue.cbc src/parallel_execution/examples/socketQueue/RemoteDGMQueue.h src/parallel_execution/examples/socketQueue/Remote_test.cbc src/parallel_execution/examples/socketQueue/TQueue.h
diffstat 11 files changed, 636 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/examples/socketQueue/DataTransfer.h	Thu Dec 23 16:59:01 2021 +0900
@@ -0,0 +1,8 @@
+typedef struct DataTransfer<>{
+    union Data* dataTransfer;
+    struct Element* element;
+
+    __code getData(Impl* dataTransfer, __code next(...));
+    __code putData(Impl* dataTransfer, Type* element, __code next(...));
+    __code next(...);
+} DataTransfer;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/examples/socketQueue/DataTransferImpl.cbc	Thu Dec 23 16:59:01 2021 +0900
@@ -0,0 +1,103 @@
+#include "../../../context.h"
+#interface "DataTransfer.h"
+#impl "DataTransfer.h" as "DataTransfer.h"
+
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <netdb.h>
+#define BUF_SIZR 1024 
+
+
+// ----
+// typedef struct DataTransferImpl <> impl DataTransfer{
+//     int* socket;
+// } DataTransfer;
+// ----
+
+DataTransfer* createDataTransferImpl(struct Context* context) {
+    struct DataTransfer* dataTransfer  = new DataTransfer();
+    struct DataTransferImpl* data_transfer_impl = new DataTransferImpl();
+    dataTransfer->dataTransfer = (union Data*)data_transfer_impl;
+    dataTransfer->element = NULL;
+    data_transfer_impl->socket = createSocket();
+    dataTransfer->getData = C_getDataDataTransferImpl;
+    dataTransfer->putData = C_putDataDataTransferImpl;
+    return dataTransfer;
+}
+
+int* createSocket(){
+    int w_addr, c_sock;
+    struct sockaddr_in a_addr;
+    char *hostname = "Localhost";
+    char *service = "8080";
+    struct addrinfo hints, *res0, *res;
+    int err;
+
+    memset(&hints, 0, sizeof(hints));
+    hints.ai_socktype = SOCK_STREAM;
+    hints.ai_family = PF_UNSPEC; //UNSPECはIPv4,6両方のうち使えるものを返す
+    if((err = getaddrinfo(hostname, service, &hints, &res0)) != 0){
+        printf("error %d\n" , err);
+        return 1;
+    }
+
+    for (res=res0; res!=NULL; res=res->ai_next){
+        w_addr = socket(res->ai_family, res->ai_socktype, res->ai_protocol); //AF_INETは古い
+        break;
+    }
+
+    if (w_addr == -1) {
+        printf("socket error\n");
+        return -1;
+    }
+
+    struct in_addr {
+      u_int32_t s_addr;
+    };
+
+    memset(&a_addr, 0, sizeof(struct sockaddr_in));
+
+    a_addr.sin_family = PF_UNSPEC;
+    a_addr.sin_port = res->ai_addr;
+    a_addr.sin_addr.s_addr = res->ai_addrlen;
+
+    if (bind(w_addr, res->ai_addr, res->ai_addrlen) == -1) {
+        printf("bind error\n");
+        close(w_addr);
+        return -1;
+    }
+
+    if (listen(w_addr, 3) == -1) {
+        printf("listen error\n");
+        close(w_addr);
+        return -1;
+    }   
+
+
+    printf("Waiting connect...\n");
+    c_sock = accept(w_addr, NULL, NULL);
+    if (c_sock == -1) {
+        printf("accept error\n");
+        close(w_addr);
+        return -1;
+    }
+    printf("Connected!!\n");
+    return c_sock;
+}
+
+
+__code getData(struct DataTransferImpl* dataTransfer, __code next(...)) {
+    printf("getData");
+    goto next(...);
+}
+
+__code putData(struct DataTransferImpl* dataTransfer, struct DataTransfer* element, __code next(...)) {
+
+    goto next(...);
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/examples/socketQueue/DataTransferImpl.h	Thu Dec 23 16:59:01 2021 +0900
@@ -0,0 +1,3 @@
+typedef struct DataTransferImpl <> impl DataTransfer{
+    int* socket;
+} DataTransferImpl;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/examples/socketQueue/FileString.h	Thu Dec 23 16:59:01 2021 +0900
@@ -0,0 +1,4 @@
+typedef struct FileString <> {
+  char str[1024];
+  int size;
+} FileString;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc	Thu Dec 23 16:59:01 2021 +0900
@@ -0,0 +1,223 @@
+#include "../../../context.h"
+#interface "TQueue.h"
+#interface "Atomic.h"
+#interface "FileString.h"
+#interface "DataTransfer.h"
+#impl "TQueue.h" as "LocalDGMQueue.h"
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <netdb.h>
+#define BUF_SIZE 1024 
+
+/*
+ * Non-blocking queue of Paper: Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms(https://www.research.ibm.com/people/m/michael/podc-1996.pdf).
+ */
+
+TQueue* createLocalDGMQueue(struct Context* context) {
+    struct TQueue* tQueue = new TQueue();
+    struct LocalDGMQueue* LocalDGMQueue = new LocalDGMQueue();
+    LocalDGMQueue->top = new Element(); // allocate a free node
+    LocalDGMQueue->top->next = NULL;
+    LocalDGMQueue->last = LocalDGMQueue->top;
+    LocalDGMQueue->atomic = createAtomicReference(context);
+    LocalDGMQueue->socket = createSocket(); 
+
+    tQueue->tQueue = (union Data*)LocalDGMQueue;
+    tQueue->take  = C_takeLocalDGMQueue;
+    tQueue->put  = C_putLocalDGMQueue;
+    tQueue->isEmpty = C_isEmptyLocalDGMQueue;
+    tQueue->clear = C_clearLocalDGMQueue;
+    tQueue->getData = C_getDataLocalDGMQueue;
+    return tQueue;
+}
+
+int* createSocket(){
+    int w_addr, c_sock;
+    struct sockaddr_in a_addr;
+    char *hostname = "Localhost";
+    char *service = "8080";
+    struct addrinfo hints, *res0, *res;
+    int err;
+
+    memset(&hints, 0, sizeof(hints));
+    hints.ai_socktype = SOCK_STREAM;
+    hints.ai_family = PF_UNSPEC; //UNSPECはIPv4,6両方のうち使えるものを返す
+    if((err = getaddrinfo(hostname, service, &hints, &res0)) != 0){
+        printf("error %d\n" , err);
+        return 1;
+    }
+
+    for (res=res0; res!=NULL; res=res->ai_next){
+        w_addr = socket(res->ai_family, res->ai_socktype, res->ai_protocol); //AF_INETは古い
+        break;
+    }
+
+    if (w_addr == -1) {
+        printf("socket error\n");
+        return -1;
+    }
+
+    struct in_addr {
+      u_int32_t s_addr;
+    };
+
+    memset(&a_addr, 0, sizeof(struct sockaddr_in));
+
+    a_addr.sin_family = PF_UNSPEC;
+    a_addr.sin_port = res->ai_addr;
+    a_addr.sin_addr.s_addr = res->ai_addrlen;
+
+    if (bind(w_addr, res->ai_addr, res->ai_addrlen) == -1) {
+        printf("bind error\n");
+        close(w_addr);
+        return -1;
+    }
+
+    if (listen(w_addr, 3) == -1) {
+        printf("listen error\n");
+        close(w_addr);
+        return -1;
+    }   
+
+    printf("Waiting connect...\n");
+    c_sock = accept(w_addr, NULL, NULL);
+    if (c_sock == -1) {
+        printf("accept error\n");
+        close(w_addr);
+        return -1;
+    }
+    printf("Connected!!\n");
+    return c_sock;
+}
+
+
+
+
+__code clearLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(...)) {
+    struct Element* top = tQueue->top;
+    struct Atomic* atomic = tQueue->atomic;
+    goto atomic->checkAndSet(&tQueue->top, top, NULL, next(...), clearLocalDGMQueue);
+}
+
+__code getTrance(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)){
+    printf("get");
+    goto next(...);
+}
+
+__code putLocalDGMQueue(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)) {
+    Element* element = new Element();
+    element->data = data;
+    element->next = NULL;
+    Element* last = tQueue->last;
+    Element* nextElement = last->next;
+    if (last != tQueue->last) {
+        goto putLocalDGMQueue();
+    }
+    if (nextElement == NULL) {
+        struct Atomic* atomic = tQueue->atomic;
+        goto atomic->checkAndSet(&last->next, nextElement, element, getDataLocalDGMQueue, putLocalDGMQueue); //書き換え
+    } else {
+        struct Atomic* atomic = tQueue->atomic;
+        goto atomic->checkAndSet(&tQueue->last, last, nextElement, putLocalDGMQueue, putLocalDGMQueue);
+    }
+}
+
+
+__code takeLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(union Data* data, ...)) {
+    struct Element* top = tQueue->top;
+    struct Element* last = tQueue->last;
+    struct Element* nextElement = top->next;
+    if (top != tQueue->top) {
+        goto takeLocalDGMQueue();
+    }
+    if (top == last) {
+        if (nextElement != NULL) {
+            struct Atomic* atomic = tQueue->atomic;
+            goto atomic->checkAndSet(&tQueue->last, last, nextElement, takeLocalDGMQueue, takeLocalDGMQueue);
+        }
+    } else {
+        struct Atomic* atomic = tQueue->atomic;
+        goto atomic->checkAndSet(&tQueue->top, top, nextElement, takeLocalDGMQueue1, takeLocalDGMQueue);
+    }
+    goto takeLocalDGMQueue();
+}
+
+__code takeLocalDGMQueue1(struct LocalDGMQueue* tQueue, __code next(union Data* data, ...), struct Element* nextElement) {
+    data = nextElement->data;
+    Gearef(context, TQueue)->data = data;
+    goto next(data, ...);
+}
+
+__code takeLocalDGMQueue1_stub(struct Context* context) {
+	LocalDGMQueue* tQueue = (LocalDGMQueue*)GearImpl(context, TQueue, tQueue);
+	enum Code next = Gearef(context, TQueue)->next;
+	Data** O_data = &Gearef(context, TQueue)->data;
+	goto takeLocalDGMQueue1(context,
+                                tQueue,
+                                next,
+                                O_data,
+                                (struct Element*)Gearef(context, Atomic)->newData);
+}
+
+__code isEmptyLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(...), __code whenEmpty(...)) {
+    struct Element* top = tQueue->top;
+    struct Element* last = tQueue->last;
+    struct Element* nextElement = top->next;
+    if (top != tQueue->top) {
+        goto isEmptyLocalDGMQueue();
+    }
+    if (top == last && nextElement == NULL) {
+        goto whenEmpty(...);
+    }
+    goto next(...);
+}
+
+__code getData(struct LocalDGMQueue* tQueue, __code next(...)){
+    int recv_size, send_size;
+    char recv_buf[BUF_SIZE], send_buf;
+
+    /* クライアントから文字列を受信 */
+    recv_size = recv(tQueue->socket, recv_buf, BUF_SIZE, 0);
+    if (recv_size == -1) {
+        printf("recv error\n");
+        goto exit_code();
+    }
+    if (recv_size == 0) {
+        /* 受信サイズが0の場合は相手が接続閉じていると判断 */
+        printf("connection ended\n");
+    }
+        
+    /* 受信した文字列を表示 */
+    printf("%s\n", recv_buf);
+
+    /**/
+    /* 文字列が"finish"ならクライアントとの接続終了 */
+    if (strcmp(recv_buf, "finish") == 0) {
+        /* 接続終了を表す0を送信 */
+        send_buf = 0;
+        send_size = send(tQueue->socket, &send_buf, 1, 0);
+        if (send_size == -1) {
+            printf("send error\n");
+        }
+        close(tQueue->buffer);
+        goto next(...);
+    } else {
+        /* "finish"以外の場合はクライアントとの接続を継続 */
+        send_buf = 1;
+        send_size = send(tQueue->socket, &send_buf, 1, 0);
+        if (send_size == -1) {
+            printf("send error\n");
+        }
+    }
+
+    FileString* string = NEW(FileString);
+    memcpy(string->str, recv_buf, sizeof(recv_buf));
+    Gearef(context, TQueue)->data = string;
+
+    goto putLocalDGMQueue(string, next);
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/examples/socketQueue/LocalDGMQueue.h	Thu Dec 23 16:59:01 2021 +0900
@@ -0,0 +1,11 @@
+typedef struct LocalDGMQueue <> impl TQueue {
+  struct Element* top;
+  struct Element* last;
+  struct Atomic* atomic;
+  
+  int* socket;
+  char* buffer;
+  char* send_buf;
+
+} LocalDGMQueue;
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/examples/socketQueue/Local_test.cbc	Thu Dec 23 16:59:01 2021 +0900
@@ -0,0 +1,44 @@
+#include "../../../context.h"
+#interface "TQueue.h"
+#interface "Integer.h"
+#interface "FileString.h"
+#interface "DataTransfer.h"
+
+__code Task1(TQueue* localDGMQueue){ 
+    Integer* integer = NEW(Integer);
+    integer->value = "22"; 
+    goto localDGMQueue->getData(Task2); 
+}
+
+__code Task1_stub(struct Context* context){
+    TQueue* localDGMQueue = createLocalDGMQueue(context);
+    goto Task1(context, localDGMQueue);
+}
+
+__code Task2(TQueue* localDGMQueue){
+    printf("Task2\n");
+    goto localDGMQueue->take(Task3);
+}
+
+__code Task2_stub(struct Context* context){
+    TQueue* localDGMQueue = (struct TQueue*)Gearef(context, TQueue)->tQueue;
+    goto Task2(context, localDGMQueue);
+}
+
+
+__code Task3(TQueue* localDGMQueue, FileString* string){
+    printf("take[%s]\n", string->str);
+    printf("Task3_exit\n");
+    goto localDGMQueue->take(exit_code);
+}
+
+__code Task3_stub(struct Context* context){
+    TQueue* localDGMQueue = (struct TQueue*)Gearef(context, TQueue)->tQueue;
+    FileString* string = Gearef(context, TQueue)->data;
+    goto Task3(context, localDGMQueue, string);
+}
+
+int main(int argc, char** argv) {
+    printf("main\n");
+    goto Task1();
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/examples/socketQueue/RemoteDGMQueue.cbc	Thu Dec 23 16:59:01 2021 +0900
@@ -0,0 +1,191 @@
+#include "../../../context.h"
+#interface "Queue.h"
+#interface "Atomic.h"
+#impl "Queue.h" as "RemoteDGMQueue.h"
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <netdb.h>
+#define BUF_SIZR 1024 
+
+
+
+/*
+ * Non-blocking queue of Paper: Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms(https://www.research.ibm.com/people/m/michael/podc-1996.pdf).
+ */
+
+Queue* createRemoteDGMQueue(struct Context* context) {
+    struct Queue* queue = new Queue();
+    struct RemoteDGMQueue* remoteDGMQueue = new RemoteDGMQueue();
+    remoteDGMQueue->top = new Element(); // allocate a free node
+    remoteDGMQueue->top->next = NULL;
+    remoteDGMQueue->last = remoteDGMQueue->top;
+    remoteDGMQueue->atomic = createAtomicReference(context);
+    remoteDGMQueue->c_sock = createSocket();
+    remoteDGMQueue->w_sock = NULL; 
+
+    queue->queue = (union Data*)remoteDGMQueue;
+    queue->take  = C_takeRemoteDGMQueue;
+    queue->put  = C_putRemoteDGMQueue;
+    queue->isEmpty = C_isEmptyRemoteDGMQueue;
+    queue->clear = C_clearRemoteDGMQueue;
+    queue->getData = C_getDataRemoteDGMQueue;
+    return queue;
+}
+
+int* createSocket(){
+    int w_addr, c_sock;
+    struct sockaddr_in a_addr;
+    char *hostname = "Localhost";
+    char *service = "8080";
+    struct addrinfo hints, *res0, *res;
+    int err;
+
+    memset(&hints, 0, sizeof(hints));
+    hints.ai_socktype = SOCK_STREAM;
+    hints.ai_family = PF_UNSPEC; //UNSPECはIPv4,6両方のうち使えるものを返す
+    if((err = getaddrinfo(hostname, service, &hints, &res0)) != 0){
+        printf("error %d\n" , err);
+        return 1;
+    }
+
+    for (res=res0; res!=NULL; res=res->ai_next){
+        w_addr = socket(res->ai_family, res->ai_socktype, res->ai_protocol); //AF_INETは古い
+        break;
+    }
+
+    if (w_addr == -1) {
+        printf("socket error\n");
+        return -1;
+    }
+
+    struct in_addr {
+      u_int32_t s_addr;
+    };
+
+    memset(&a_addr, 0, sizeof(struct sockaddr_in));
+
+    a_addr.sin_family = PF_UNSPEC;
+    a_addr.sin_port = res->ai_addr;
+    a_addr.sin_addr.s_addr = res->ai_addrlen;
+
+    if (bind(w_addr, res->ai_addr, res->ai_addrlen) == -1) {
+        printf("bind error\n");
+        close(w_addr);
+        return -1;
+    }
+
+    if (listen(w_addr, 3) == -1) {
+        printf("listen error\n");
+        close(w_addr);
+        return -1;
+    }   
+
+
+    printf("Waiting connect...\n");
+    c_sock = accept(w_addr, NULL, NULL);
+    if (c_sock == -1) {
+        printf("accept error\n");
+        close(w_addr);
+        return -1;
+    }
+    printf("Connected!!\n");
+    return c_sock;
+}
+
+
+__code clearRemoteDGMQueue(struct RemoteDGMQueue* queue, __code next(...)) {
+    struct Element* top = queue->top;
+    struct Atomic* atomic = queue->atomic;
+    goto atomic->checkAndSet(&queue->top, top, NULL, next(...), clearRemoteDGMQueue);
+}
+
+__code putRemoteDGMQueue(struct RemoteDGMQueue* queue, union Data* data, __code next(...)) {
+    Element* element = new Element();
+    element->data = data;
+    element->next = NULL;
+    Element* last = queue->last;
+    Element* nextElement = last->next;
+    if (last != queue->last) {
+        goto putRemoteDGMQueue();
+    }
+    if (nextElement == NULL) {
+        struct Atomic* atomic = queue->atomic;
+        goto atomic->checkAndSet(&last->next, nextElement, element, next(...), putRemoteDGMQueue);
+    } else {
+        struct Atomic* atomic = queue->atomic;
+        goto atomic->checkAndSet(&queue->last, last, nextElement, putRemoteDGMQueue, putRemoteDGMQueue);
+    }
+}
+
+
+__code takeRemoteDGMQueue(struct RemoteDGMQueue* queue, __code next(union Data* data, ...)) {
+    struct Element* top = queue->top;
+    struct Element* last = queue->last;
+    struct Element* nextElement = top->next;
+    if (top != queue->top) {
+        goto takeRemoteDGMQueue();
+    }
+    if (top == last) {
+        if (nextElement != NULL) {
+            struct Atomic* atomic = queue->atomic;
+            goto atomic->checkAndSet(&queue->last, last, nextElement, takeRemoteDGMQueue, takeRemoteDGMQueue);
+        }
+    } else {
+        struct Atomic* atomic = queue->atomic;
+        goto atomic->checkAndSet(&queue->top, top, nextElement, takeRemoteDGMQueue1, takeRemoteDGMQueue);
+    }
+    goto takeRemoteDGMQueue();
+}
+
+__code takeRemoteDGMQueue1(struct RemoteDGMQueue* queue, __code next(union Data* data, ...), struct Element* nextElement) {
+    data = nextElement->data;
+    goto next(data, ...);
+}
+
+__code takeRemoteDGMQueue1_stub(struct Context* context) {
+	RemoteDGMQueue* queue = (RemoteDGMQueue*)GearImpl(context, Queue, queue);
+	enum Code next = Gearef(context, Queue)->next;
+	Data** O_data = &Gearef(context, Queue)->data;
+	goto takeRemoteDGMQueue1(context,
+                                queue,
+                                next,
+                                O_data,
+                                (struct Element*)Gearef(context, Atomic)->newData);
+}
+
+__code isEmptyRemoteDGMQueue(struct RemoteDGMQueue* queue, __code next(...), __code whenEmpty(...)) {
+    struct Element* top = queue->top;
+    struct Element* last = queue->last;
+    struct Element* nextElement = top->next;
+    if (top != queue->top) {
+        goto isEmptyRemoteDGMQueue();
+    }
+    if (top == last && nextElement == NULL) {
+        goto whenEmpty(...);
+    }
+    goto next(...);
+}
+
+
+__code connectToLocal(struct RemoteDGMQueue* queue, __code next(...)){
+        printf("Waiting connect...\n");
+        queue->c_sock = accept(queue->w_sock, NULL, NULL);
+        if (queue->c_sock == -1) {
+            printf("accept error\n");
+            close(queue->w_sock);
+            return -1;
+        }
+        printf("Connected!!\n");
+        close(queue->w_sock);
+
+    goto next(...);
+}
+
+__code connectToRemote(struct RemoteDGMQueue* queue, __code next(...)){
+    
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/examples/socketQueue/RemoteDGMQueue.h	Thu Dec 23 16:59:01 2021 +0900
@@ -0,0 +1,7 @@
+typedef struct RemoteDGMQueue <> impl Queue {
+  struct Element* top;
+  struct Element* last;
+  struct Atomic* atomic;
+  int* w_sock;
+  int* c_sock;
+} RemoteDGMQueue;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/examples/socketQueue/Remote_test.cbc	Thu Dec 23 16:59:01 2021 +0900
@@ -0,0 +1,29 @@
+#include "../../../context.h"
+#interface "Queue.h"
+#interface "Integer.h"
+
+
+__code Task1(Queue* remoteDGMQueue){ 
+    Integer* integer = NEW(Integer);
+    integer->value = "22";   
+    goto remoteDGMQueue->put(integer, Task2); 
+}
+
+__code Task1_stub(struct Context* context){
+    Queue* remoteDGMQueue = createRemoteDGMQueue(context);
+    goto Task1(context, remoteDGMQueue);
+}
+
+__code Task2(Queue* remoteDGMQueue){
+    printf("Task2\n");
+}
+
+__code Task2_stub(struct Context* context){
+    Queue* remoteDGMQueue = (struct Queue*)Gearef(context, Queue)->queue;
+    goto Task2(context, remoteDGMQueue);
+}
+
+int main(int argc, char** argv) {
+    printf("main\n");
+    goto Task1();
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/examples/socketQueue/TQueue.h	Thu Dec 23 16:59:01 2021 +0900
@@ -0,0 +1,13 @@
+typedef struct TQueue<>{
+    union Data* tQueue;
+    union Data* data;
+
+    __code whenEmpty(...);
+    __code clear(Impl* tQueue, __code next(...));
+    __code put(Impl* tQueue, union Data* data, __code next(...));
+    __code take(Impl* tQueue, __code next(union Data* data, ...));
+    __code isEmpty(Impl* tQueue, __code next(...), __code whenEmpty(...));
+
+    __code getData(Impl* tQueue, __code next(...));
+    __code next(...);
+} TQueue;