changeset 1013:9ca9c36a4633

tweak LDGM
author ichikitakahiro <e165713@ie.u-ryukyu.ac.jp>
date Sat, 15 Jan 2022 16:13:59 +0900
parents a4e57a764cac
children a9c630cc1c65
files src/parallel_execution/examples/socketQueue/FileString.h src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc src/parallel_execution/examples/socketQueue/Local_test.cbc src/parallel_execution/examples/socketQueue/RemoteDGMQueue.cbc src/parallel_execution/examples/socketQueue/TQueue.h
diffstat 5 files changed, 26 insertions(+), 11 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/examples/socketQueue/FileString.h	Fri Jan 14 03:33:29 2022 +0900
+++ b/src/parallel_execution/examples/socketQueue/FileString.h	Sat Jan 15 16:13:59 2022 +0900
@@ -1,4 +1,4 @@
 typedef struct FileString <> {
   char str[1024];
-  int size;
+  int* size;
 } FileString;
--- a/src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc	Fri Jan 14 03:33:29 2022 +0900
+++ b/src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc	Sat Jan 15 16:13:59 2022 +0900
@@ -120,7 +120,7 @@
     }
     if (nextElement == NULL) {
         struct Atomic* atomic = tQueue->atomic;
-        goto atomic->checkAndSet(&last->next, nextElement, element, getDataLocalDGMQueue, putLocalDGMQueue); //書き換え
+        goto atomic->checkAndSet(&last->next, nextElement, element, next(...), putLocalDGMQueue); //書き換え
     } else {
         struct Atomic* atomic = tQueue->atomic;
         goto atomic->checkAndSet(&tQueue->last, last, nextElement, putLocalDGMQueue, putLocalDGMQueue);
@@ -177,13 +177,13 @@
     goto next(...);
 }
 
-__code getData(struct LocalDGMQueue* tQueue, __code next(...)){
+__code getData(struct LocalDGMQueue* tQueue, __code next(...), __code whenEOF(...)){
     int recv_size, send_size;
     char recv_buf[BUF_SIZE], send_buf;
 
     /* クライアントから文字列を受信 */
     FileString* fileString = NEW(FileString);
-    recv(tQueue->socket, fileString, sizeof(struct FileString), 0);
+    recv_size = recv(tQueue->socket, fileString, sizeof(struct FileString), 0);
     printf("[%s] [%d]\n", fileString->str, fileString->size);
     if (recv_size == -1) {
         printf("recv error\n");
@@ -206,7 +206,7 @@
             printf("send error\n");
         }
         close(tQueue->buffer);
-        goto next(...);
+        goto whenEOF(...);
     } else {
         /* "finish"以外の場合はクライアントとの接続を継続 */
         send_buf = 1;
--- a/src/parallel_execution/examples/socketQueue/Local_test.cbc	Fri Jan 14 03:33:29 2022 +0900
+++ b/src/parallel_execution/examples/socketQueue/Local_test.cbc	Sat Jan 15 16:13:59 2022 +0900
@@ -6,16 +6,24 @@
 char* service_num;
 
 __code Task1(TQueue* localDGMQueue){ 
-    Integer* integer = NEW(Integer);
-    integer->value = "22"; 
-    goto localDGMQueue->getData(Task2); 
+    goto gData();
 }
 
 __code Task1_stub(struct Context* context){
     TQueue* localDGMQueue = createLocalDGMQueue(context, service_num);
+    Gearef(context, TQueue)->tQueue = localDGMQueue;
     goto Task1(context, localDGMQueue);
 }
 
+__code gData(TQueue* localDGMQueue){
+    goto localDGMQueue->getData(Task2, getEOF); 
+}
+
+__code gData_stub(struct Context* context){
+    TQueue* localDGMQueue = (struct TQueue*)Gearef(context, TQueue)->tQueue;
+    goto gData(context, localDGMQueue);
+}
+
 __code Task2(TQueue* localDGMQueue){
     goto localDGMQueue->take(Task3);
 }
@@ -28,7 +36,8 @@
 
 __code Task3(TQueue* localDGMQueue, FileString* string){
     printf("take[%s] [num:%d]\n", string->str, string->size);
-    goto localDGMQueue->isEmpty(noEmp, whenEmp);
+    goto gData();
+    //goto localDGMQueue->isEmpty(noEmp, whenEmp);
 }
 
 __code Task3_stub(struct Context* context){
@@ -46,6 +55,11 @@
     goto Task2();
 }
 
+__code getEOF(TQueue* localDGMQueue){
+    printf("EOF & end\n");
+    goto exit_code();
+}
+
 void init(int argc, char** argv) {
     for (int i = 1; argv[i]; ++i) {
         if (strcmp(argv[i], "-sn") == 0){
--- a/src/parallel_execution/examples/socketQueue/RemoteDGMQueue.cbc	Fri Jan 14 03:33:29 2022 +0900
+++ b/src/parallel_execution/examples/socketQueue/RemoteDGMQueue.cbc	Sat Jan 15 16:13:59 2022 +0900
@@ -209,6 +209,6 @@
 	goto sendDataRemoteDGMQueue(context, tQueue, string, next);
 }
 
-__code getData(struct RemoteDGMQueue* tQueue, __code next(...)){
+__code getData(struct RemoteDGMQueue* tQueue, __code next(...), __code whenEOF(...)){
 
 }
--- a/src/parallel_execution/examples/socketQueue/TQueue.h	Fri Jan 14 03:33:29 2022 +0900
+++ b/src/parallel_execution/examples/socketQueue/TQueue.h	Sat Jan 15 16:13:59 2022 +0900
@@ -4,12 +4,13 @@
     struct FileString* string;
 
     __code whenEmpty(...);
+    __code whenEOF(...);
     __code clear(Impl* tQueue, __code next(...));
     __code put(Impl* tQueue, union Data* data, __code next(...));
     __code take(Impl* tQueue, __code next(union Data* data, ...));
     __code isEmpty(Impl* tQueue, __code next(...), __code whenEmpty(...));
 
     __code sendData(Impl* tQueue, FileString* string, __code next(...));
-    __code getData(Impl* tQueue, __code next(...));
+    __code getData(Impl* tQueue, __code next(...), __code whenEOF(...));
     __code next(...);
 } TQueue;