view src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc @ 1032:793b21a8ea12

fix include
author Shinji KONO <kono@ie.u-ryukyu.ac.jp>
date Tue, 24 Oct 2023 12:25:40 +0900
parents 635ccc391642
children
line wrap: on
line source

#interface "TQueue.h"
#interface "Atomic.h"
#interface "FileString.h"
#interface "DataTransfer.h"
#impl "TQueue.h" as "LocalDGMQueue.h"

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netdb.h>
#define BUF_SIZE 1024 

/*
 * Non-blocking queue of Paper: Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms(https://www.research.ibm.com/people/m/michael/podc-1996.pdf).
 */

TQueue* createLocalDGMQueue(struct Context* context, char* sNum) {
    struct TQueue* tQueue = new TQueue();
    struct LocalDGMQueue* LocalDGMQueue = new LocalDGMQueue();
    LocalDGMQueue->top = new Element(); // allocate a free node
    LocalDGMQueue->top->next = NULL;
    LocalDGMQueue->last = LocalDGMQueue->top;
    LocalDGMQueue->atomic = createAtomicReference(context);
    LocalDGMQueue->socket = createSocketLocalDGMQueue(sNum); 

    tQueue->tQueue = (union Data*)LocalDGMQueue;
    tQueue->take  = C_takeLocalDGMQueue;
    tQueue->put  = C_putLocalDGMQueue;
    tQueue->isEmpty = C_isEmptyLocalDGMQueue;
    tQueue->clear = C_clearLocalDGMQueue;
    tQueue->getData = C_getDataLocalDGMQueue;
    return tQueue;
}

int* createSocketLocalDGMQueue(char* sNum){
    int w_addr, c_sock;
    struct sockaddr_in a_addr;
    char *hostname = "Localhost";
    char *service = sNum;
    struct addrinfo hints, *res0, *res;
    int err;

    memset(&hints, 0, sizeof(hints));
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_family = PF_UNSPEC; //UNSPECはIPv4,6両方のうち使えるものを返す
    if((err = getaddrinfo(hostname, service, &hints, &res0)) != 0){
        printf("error %d : %s\n", err, gai_strerror(err));
        return 1;
    }

    for (res=res0; res!=NULL; res=res->ai_next){
        w_addr = socket(res->ai_family, res->ai_socktype, res->ai_protocol); //AF_INETは古い
        break;
    }

    if (w_addr == -1) {
        printf("socket error\n");
        return -1;
    }

    struct in_addr {
      u_int32_t s_addr;
    };

    memset(&a_addr, 0, sizeof(struct sockaddr_in));

    a_addr.sin_family = PF_UNSPEC;
    a_addr.sin_port = res->ai_addr;
    a_addr.sin_addr.s_addr = res->ai_addrlen;

    if (bind(w_addr, res->ai_addr, res->ai_addrlen) == -1) {
        printf("bind error\n");
        close(w_addr);
        return -1;
    }

    if (listen(w_addr, 3) == -1) {
        printf("listen error\n");
        close(w_addr);
        return -1;
    }   

    printf("Waiting connect...\n");
    c_sock = accept(w_addr, NULL, NULL);
    if (c_sock == -1) {
        printf("accept error\n");
        close(w_addr);
        return -1;
    }
    printf("Connected!!\n");
    return c_sock;
}




__code clearLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(...)) {
    struct Element* top = tQueue->top;
    struct Atomic* atomic = tQueue->atomic;
    goto atomic->checkAndSet(&tQueue->top, top, NULL, next(...), clearLocalDGMQueue);
}

__code getTranceLocalDGMQueue(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)){
    printf("get");
    goto next(...);
}

__code putLocalDGMQueue(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)) {
    Element* element = new Element();
    element->data = data;
    element->next = NULL;
    Element* last = tQueue->last;
    Element* nextElement = last->next;
    if (last != tQueue->last) {
        goto putLocalDGMQueue();
    }
    if (nextElement == NULL) {
        struct Atomic* atomic = tQueue->atomic;
        goto atomic->checkAndSet(&last->next, nextElement, element, next(...), putLocalDGMQueue); //書き換え
    } else {
        struct Atomic* atomic = tQueue->atomic;
        goto atomic->checkAndSet(&tQueue->last, last, nextElement, putLocalDGMQueue, putLocalDGMQueue);
    }
}


__code takeLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(union Data* data, ...)) {
    struct Element* top = tQueue->top;
    struct Element* last = tQueue->last;
    struct Element* nextElement = top->next;
    if (top != tQueue->top) { //top!= Queue->top 割り込まれたらループ?
        goto takeLocalDGMQueue();
    }
    if (top == last) {    //top = lastつまり要素が一つの場合。
        if (nextElement != NULL) {  //nextがヌルでなければ、Queueのラストをlastからelementに置き換える。
            struct Atomic* atomic = tQueue->atomic;
            goto atomic->checkAndSet(&tQueue->last, last, nextElement, takeLocalDGMQueue, takeLocalDGMQueue);
        }
    } else {
        struct Atomic* atomic = tQueue->atomic;
        goto atomic->checkAndSet(&tQueue->top, top, nextElement, takeLocalDGMQueue1, takeLocalDGMQueue);
    }
    goto takeLocalDGMQueue();
}

__code takeLocalDGMQueue1(struct LocalDGMQueue* tQueue, __code next(union Data* data, ...), struct Element* nextElement) {
    data = nextElement->data;
    Gearef(context, TQueue)->data = data;
    goto next(data, ...);
}

__code takeLocalDGMQueue1_stub(struct Context* context) {
	LocalDGMQueue* tQueue = (LocalDGMQueue*)GearImpl(context, TQueue, tQueue);
	enum Code next = Gearef(context, TQueue)->next;
	Data** O_data = &Gearef(context, TQueue)->data;
	goto takeLocalDGMQueue1(context,
                                tQueue,
                                next,
                                O_data,
                                (struct Element*)Gearef(context, Atomic)->newData);
}

__code isEmptyLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(...), __code whenEmpty(...)) {
    struct Element* top = tQueue->top;
    struct Element* last = tQueue->last;
    struct Element* nextElement = top->next;
    if (top != tQueue->top) {
        goto isEmptyLocalDGMQueue();
    }
    if (top == last && nextElement == NULL) {
        goto whenEmpty(...);
    }
    goto next(...);
}

__code getDataLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(...), __code whenEOF(...)){
    int recv_size, send_size;
    char recv_buf[BUF_SIZE], send_buf;

    /* クライアントから文字列を受信 */
    union Data* recv_data;
    recv_size = read(tQueue->socket, recv_data, sizeof(union Data));
    //printf("[%s] [%d]\n", fileString->str, fileString->size);
    if (recv_size == -1) {
        printf("recv error\n");
        goto exit_code();
    }
    if (recv_size == 0) {
        /* 受信サイズが0の場合は相手が接続閉じていると判断 */
        printf("connection ended\n");
    }
        
    /* 受信した文字列を表示 */


    /* 文字列が"finish"ならクライアントとの接続終了 */

    FileString* fileString = NEW(FileString);
    fileString = recv_data;
    if (strcmp(fileString->str, "finish") == 0) {
       /* 接続終了を表す0を送信 */
        send_buf = 0;
        send_size = write(tQueue->socket, &send_buf, 1);
        if (send_size == -1) {
            printf("send error\n");
        }
        close(tQueue->buffer);
        goto whenEOF(...);
    } else {
        /* "finish"以外の場合はクライアントとの接続を継続 */
        send_buf = 1;
        send_size = write(tQueue->socket, &send_buf, 1);
        if (send_size == -1) {
            printf("send error\n");
        }
    }

    Gearef(context, TQueue)->data = recv_data;
    goto putLocalDGMQueue(recv_data, next);
}

__code sendDataLocalDGMQueue(struct RemoteDGMQueue* tQueue, struct FileString* string, __code next(...)){
}