view TaskManager/ManyCore/FileMapReduce.cc @ 2059:f3e748c0e7ea draft

change parameter in create_task_array
author masa
date Sat, 30 Jan 2016 19:06:17 +0900
parents ce57fa0495a7
children b70758c358dc
line wrap: on
line source

#include "FileMapReduce.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/mman.h>

/* TODO:
 * On PS3, when the number of CPUs is 2 or more, the remainder is not computed.
 */

/* System task ids: SysTasks.h is included inside an anonymous enum so that
 * its entries become enumerators in this file. Presumably this is where
 * READ_TASK, RUN_TASK_BLOCKS and BREAD_RUN_TASK_BLOCKS come from — confirm
 * against SysTasks.h.
 */
enum {
#include "SysTasks.h"
};

/*
 * Set the default run configuration; init() may override most of these
 * from the command line.
 *
 * TASK_EXEC                task id used for the per-chunk map tasks
 * TASK_EXEC_DATA_PARALLEL  task id used by the iterate (-i) mode
 * TASK_PRINT               task id of the final task that waits for all work
 *
 * manager is currently unused. The MapReduce state (w) is not allocated
 * here; init() allocates it.
 */
FileMapReduce::FileMapReduce(TaskManager *manager,int TASK_EXEC,int TASK_EXEC_DATA_PARALLEL,int TASK_PRINT) {
    // Application-supplied task ids.
    this->TASK_EXEC = TASK_EXEC;
    this->TASK_EXEC_DATA_PARALLEL = TASK_EXEC_DATA_PARALLEL;
    this->TASK_PRINT = TASK_PRINT;

    // Dispatch strategy: task arrays by default, array_task_num tasks each.
    use_task_array = 1;
    use_task_creater = 0;
    use_compat = 0;
    use_iterate = 0;
    all = 0;
    array_task_num = 11;
    t_exec_num = 4;   // number of scheduler tasks run_start() spawns

    // How the input is loaded and split.
    read_type = MY_MMAP;
    division = 16;  // in KByte
    blocks = 48;    // becomes w->task_blocks in run_start() unless -a

    // CPU placement.
    spe_num = 1;
    spe_cpu = SPE_ANY;
    read_spe_cpu = IO_0;

    w = NULL;
    fmp_help_str = "[-a -c -s] [-cpu spe_num] [-g] [-file filename] [-br]\n";
}

/*
 * Destructor. Releases nothing: neither the MapReduce state allocated in
 * init() (w) nor the buffers run_start() obtains from the TaskManager.
 * NOTE(review): w comes from malloc() and is never freed in this file;
 * confirm who owns it before adding cleanup here.
 */
FileMapReduce::~FileMapReduce()
{
}

/*
 * Return the value that follows the option at argv[i].
 * argv is NULL-terminated. If the option is the last argument, report the
 * missing value, print the usage line built from help, and exit(1) instead
 * of reading past the terminator.
 */
static char *
fmp_option_value(char **argv, int i, const char *help)
{
    if (argv[i+1] == 0) {
        fprintf(stderr,"%s: option %s needs a value\n",argv[0],argv[i]);
        printf("Usage: %s ",argv[0]);
        puts(help);
        exit(1);
    }
    return argv[i+1];
}

/*
 * Parse command-line options into this object's settings, allocate the
 * MapReduce state (w), and return the input file name.
 *
 * Options: -file NAME, -division KB (bytes per task / 1024), -block N,
 * -anum N, -a, -c, -s, -t, -g, -any, -i, -br, -r. Unrecognized arguments
 * are ignored. argc is unused; argv must be NULL-terminated.
 *
 * Prints the usage line and exits with status 1 when no -file is given,
 * when an option that needs a value has none, or when w cannot be allocated.
 */
char*
FileMapReduce::init(int argc, char **argv)
{
    char *filename = 0;
    for (int i = 1; argv[i]; ++i) {
        if (strcmp(argv[i], "-file") == 0) {
            filename = fmp_option_value(argv, i, fmp_help_str);
            i++;
        } else if (strcmp(argv[i], "-division") == 0) {
            division = atoi(fmp_option_value(argv, i, fmp_help_str));
            i++;
        } else if (strcmp(argv[i], "-block") == 0) {
            blocks = atoi(fmp_option_value(argv, i, fmp_help_str));
            i++;
        } else if (strcmp(argv[i], "-a") == 0) {
            // create task all at once
            all = 1;
        } else if (strcmp(argv[i], "-c") == 0) {
            use_task_array = 0;
            use_compat = 1;
        } else if (strcmp(argv[i], "-s") == 0) {
            use_task_array = 0;
            use_compat = 0;
        } else if (strcmp(argv[i], "-t") == 0) {
            use_task_creater = 1;
            use_task_array = 0;
            use_compat = 0;
        } else if (strcmp(argv[i], "-anum") == 0) {
            array_task_num = atoi(fmp_option_value(argv, i, fmp_help_str));
            i++;
        } else if (strcmp(argv[i], "-g") == 0) {
            spe_cpu = GPU_0;
        } else if (strcmp(argv[i], "-any") == 0) {
            spe_cpu = ANY_ANY;
        } else if (strcmp(argv[i], "-i") == 0) {
            use_iterate = 1;
            use_task_array = 0;
            t_exec_num = 1;
        } else if (strcmp(argv[i], "-br") == 0) {
            read_type = BLOCKED_READ;
        } else if (strcmp(argv[i], "-r") == 0) {
            read_type = MY_READ;
        }
        /* -cpu is currently disabled:
           else if (strcmp(argv[i], "-cpu") == 0) {
            spe_num = atoi(argv[i+1]);
            i++;
            if (spe_num==0) spe_num = 1;
            } else {
            fprintf(stderr,"%s\n",fmp_help_str);
            exit (0);
            }*/
    }
    if (filename==0) {
        printf("Usage: %s ",argv[0]);
        puts(fmp_help_str);
        exit(1);
    }

    // Shared state handed to every task; run_start() fills in its fields.
    this->w = (MapReducePtr)malloc(sizeof(MapReduce));
    if (this->w == NULL) {
        perror("malloc");
        exit(1);
    }
    return filename;
}

/*
 * Round size up to the next multiple of fix_byte_size. A value that is
 * already a multiple is returned unchanged. The function takes and returns
 * long so that file sizes above 2 GiB (e.g. an off_t st_size) are not
 * truncated on LP64 platforms. fix_byte_size must be positive.
 */
static long
fix_byte(long size, long fix_byte_size)
{
    long rounded = (size / fix_byte_size) * fix_byte_size;
    if (size % fix_byte_size != 0)
        rounded += fix_byte_size;   // partial block: bump to the next boundary
    return rounded;
}

/*
 * MY_READ mode: load the whole file into a buffer obtained from
 * manager->allocate(), reading with pread() in chunks of at most 1 GiB.
 *
 * filename  file to read; it is opened here and closed before returning
 * w         w->read_filesize must already hold the number of bytes to read;
 *           the buffer is stored in w->file_mmap
 * manager   allocator for the buffer
 *
 * Exits with status 1 if the file cannot be opened, the buffer cannot be
 * allocated, or a read fails or hits end-of-file early.
 */
void
FileMapReduce::my_read(const char *filename, MapReduce *w, TaskManager *manager)
{
    // open() reports failure with -1, not 0.
    int fd = open(filename, O_RDONLY);
    if (fd < 0) {
        fprintf(stderr,"can't open %s\n",filename);
        perror(NULL);
        exit(1);
    }

    const long total = w->read_filesize;
    w->file_mmap = (char*)manager->allocate(total);
    if (w->file_mmap == NULL && total > 0) {
        fprintf(stderr,"can't allocate %ld bytes for %s\n",total,filename);
        close(fd);
        exit(1);
    }

    const long one_read_size = 1024L * 1024 * 1024;  // at most 1 GiB per pread()

    long offset = 0;
    while (offset < total) {
        long chunk = total - offset;
        if (chunk > one_read_size) chunk = one_read_size;
        ssize_t got = pread(fd, w->file_mmap + offset, (size_t)chunk, (off_t)offset);
        if (got <= 0) {
            // got == 0: file shrank after it was stat'ed; got < 0: I/O error.
            fprintf(stderr,"can't read %s\n",filename);
            if (got < 0) perror(NULL);
            close(fd);
            exit(1);
        }
        offset += got;  // pread() may return fewer bytes than requested
    }

    // The data has been copied into the buffer, so the descriptor is no longer needed.
    close(fd);
}

/*
 * MY_MMAP / BLOCKED_MMAP mode: map the whole file privately and store the
 * mapping in w->file_mmap. The mapping length is the file size rounded up
 * to a multiple of 4096 with fix_byte().
 *
 * The descriptor opened here is closed once the mapping exists; a mapping
 * stays valid after its descriptor is closed.
 *
 * Exits with status 1 if the file cannot be opened, stat'ed or mapped.
 */
void
FileMapReduce::my_mmap(const char *filename, MapReduce *w)
{
    // open() reports failure with -1, not 0.
    int fd = open(filename, O_RDONLY);
    if (fd < 0) {
        fprintf(stderr,"can't open %s\n",filename);
        perror(NULL);
        exit(1);
    }

    struct stat sb;
    if (fstat(fd,&sb)) {
        fprintf(stderr,"can't fstat %s\n",filename);
        perror(NULL);
        close(fd);
        exit(1);
    }

    // With MAP_PRIVATE, writes go to private copy-on-write pages and never
    // reach the file, so PROT_WRITE is allowed on a read-only descriptor.
    off_t map_size = fix_byte(sb.st_size,4096);
    void *addr = mmap(NULL,map_size,PROT_WRITE|PROT_READ,MAP_PRIVATE,fd,(off_t)0);

    // Check the value mmap() actually returned (the old code tested an
    // uninitialized local instead).
    if (addr == MAP_FAILED) {
        fprintf(stderr,"Can't mmap file\n");
        perror(NULL);
        exit(1);
    }
    close(fd);

    w->file_mmap = (char*)addr;
}

/*
 * Free-function entry point used by the scheduler tasks in this file
 * (bread_run16, run16). It forwards every argument unchanged to the
 * FileMapReduce instance recorded in w->fmp by run_start().
 */
void
run_tasks(SchedTask *manager, MapReduce *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size)
{
    return w->fmp->run_tasks(manager,
                             w,
                             task_count,
                             t_read,
                             t_next,
                             size);
}

/*
 * Dispatch up to task_count map tasks over the input that has not been
 * handed out yet (w->size bytes remain, starting at chunk w->task_spawned).
 *
 * manager     scheduler used to create the tasks
 * w           shared state; w->size, w->remain_task and w->task_spawned
 *             advance for every task dispatched
 * task_count  maximum number of tasks to dispatch in this call
 * t_read      if non-null, a READ_TASK that every dispatched task waits for
 * t_next      task made to wait for the dispatched tasks (for task arrays
 *             with `all` set, w->t_print is used instead)
 * size        bytes per task; the last task is clamped to what remains
 *
 * The strategy is chosen by flags set in init():
 *  - task arrays (default);
 *  - single tasks with set_inData/set_outData (use_compat, -c);
 *  - one iterated data-parallel task (use_iterate, -i), which switches to
 *    use_compat for a trailing partial chunk;
 *  - single tasks created with inline in/out data (-s / -t).
 *
 * Note: array_task_num is lowered to task_count when task_count is smaller,
 * and that change persists on the object.
 */
void
FileMapReduce::run_tasks(SchedTask *manager, MapReduce *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size)
{
    if (task_count < array_task_num) {
        array_task_num = task_count;
        if (task_count<=0) return;
    }
    int out_size = w->division_out_size / sizeof(*w->o_data);  // o_data elements per task

    for (int i = 0; i < task_count; i += array_task_num) {
        // All input handed out: stop here. Otherwise an empty task array would
        // be created (t_exec stays null at spawn) or size could be 0 below.
        if (w->size <= 0 || size <= 0) break;

        // Never dispatch more than task_count tasks in total. Without this cap
        // the last batch overshot whenever task_count was not a multiple of
        // array_task_num (48 blocks in batches of 11 gave 55 tasks). In
        // blocked-read mode those extra tasks ran on data t_read had not loaded.
        int batch = task_count - i;
        if (batch > array_task_num) batch = array_task_num;
        if (batch <= 0) break;  // array_task_num <= 0 would otherwise loop forever

        HTask *task_array = 0;
        if (use_task_array) {
            int task_num = (w->size+size-1)/size;
            if (task_num > batch) task_num = batch;
            task_array = manager->create_task_array(TASK_EXEC,task_num,5,1,1);
            if (t_read != 0) task_array->wait_for(t_read);
            if (!all) {
                t_next->wait_for(task_array);
            } else {
                w->t_print->wait_for(task_array);
            }
        }

        Task *t_exec = 0;
        HTask *h_exec = 0;
        for (int j = 0; j < batch; j++) {
            if (w->size < size) size = w->size;   // last, partial chunk
            if (size <= 0) break;
            // Count the task only once we know it will be dispatched.
            int idx = w->task_spawned++;
            if (use_task_array) {
                t_exec = task_array->next_task_array(TASK_EXEC,t_exec);
                t_exec->set_param(0,(long)0);
                t_exec->set_param(1,(long)0);
                t_exec->set_param(2,(long)size);
                t_exec->set_param(3,(long)w->division_out_size);
                t_exec->set_param(4,(long)w);
                t_exec->set_inData(0,w->file_mmap + idx*w->division_size, size);
                t_exec->set_outData(0,w->o_data + idx*out_size, w->division_out_size);
            } else if (use_compat) {
                // t_exec is null outside task-array mode: configure h_exec.
                h_exec = manager->create_task(TASK_EXEC);
                h_exec->set_param(0,(long)0);
                h_exec->set_param(1,(long)0);
                h_exec->set_param(2,(long)size);
                h_exec->set_param(3,(long)w->division_out_size);
                h_exec->set_param(4,(long)w);
                h_exec->set_inData(0,w->file_mmap + idx*w->division_size, size);
                h_exec->set_outData(0,w->o_data + idx*out_size, w->division_out_size);
                if (t_read != 0) h_exec->wait_for(t_read);
                t_next->wait_for(h_exec);
                h_exec->set_cpu(spe_cpu);
                h_exec->spawn();
            } else if (use_iterate) {
                // One data-parallel task covers all full chunks. A trailing partial
                // chunk is left for a later pass, which runs in use_compat mode.
                array_task_num = w->remain_task;
                use_iterate = 0;
                use_compat = 1;
                w->size -= size*array_task_num;
                if(w->size < 0) {
                    array_task_num -= 1;
                    w->size += size;
                }
                h_exec = manager->create_task(TASK_EXEC_DATA_PARALLEL);
                h_exec->flip();
                h_exec->set_inData(0,w->file_mmap,w->file_size);
                h_exec->set_inData(1,w->o_data,w->division_out_size*w->remain_task);
                h_exec->set_outData(0,w->file_mmap,w->file_size);
                h_exec->set_outData(1,w->o_data,w->division_out_size*w->remain_task);
                h_exec->set_param(0,(long)idx);
                h_exec->set_param(1,(long)w->division_size);
                h_exec->set_param(2,(long)size);
                h_exec->set_param(3,(long)out_size);
                h_exec->set_param(4,(long)w);
                if (t_read != 0) h_exec->wait_for(t_read);
                t_next->wait_for(h_exec);
                h_exec->set_cpu(spe_cpu);
                h_exec->iterate(array_task_num);
                w->remain_task -= array_task_num;
                w->task_spawned += array_task_num-1;  // idx already counted one
                break;
            } else {
                // t_exec is null outside task-array mode: configure h_exec.
                h_exec = manager->create_task(TASK_EXEC,
                                              (memaddr)(w->file_mmap + idx*w->division_size), size,
                                              (memaddr)(w->o_data + idx*out_size), w->division_out_size);
                h_exec->set_param(0,(long)0);
                h_exec->set_param(1,(long)0);
                h_exec->set_param(2,(long)size);
                h_exec->set_param(3,(long)w->division_out_size);
                h_exec->set_param(4,(long)w);
                if (t_read != 0) h_exec->wait_for(t_read);
                t_next->wait_for(h_exec);
                h_exec->set_cpu(spe_cpu);
                h_exec->spawn();
            }
            w->size -= size;
            w->remain_task--;
        }
        if (use_task_array) {
            // Non-null here: the guards above ensure at least one task was added.
            task_array->spawn_task_array(t_exec->next());
            task_array->set_cpu(spe_cpu);
            task_array->spawn();
        }
    }
}

/**
 *   (Original note) This Task runs on the PPE, so it is never executed in
 *   parallel. Two of them are kept running so that the workers never run
 *   short of Tasks.
 *   NOTE(review): run_start() spawns t_exec_num copies (4 by default in the
 *   constructor), not two — confirm which is intended.
 */

SchedDefineTask1(BREAD_RUN_TASK_BLOCKS,bread_run16);

/*
 * One scheduling step of BLOCKED_READ mode. `in` carries a pointer to the
 * MapReduce state (see w->self in run_start()).
 *
 * Each step creates one READ_TASK (t_read) for the next region of
 * w->file_mmap and dispatches the map tasks for that region with
 * run_tasks(). run_tasks() receives t_read so the map tasks can wait for it.
 * If this is not the last step, the function also chains a successor
 * BREAD_RUN_TASK_BLOCKS task.
 *
 * The order of the set_param() calls around run_tasks() matters.
 * Param 1 is computed from task_spawned before dispatch; param 2 is
 * computed after run_tasks() has advanced it. They are presumably the
 * start and end byte offsets of the read — confirm against READ_TASK.
 */
static int
bread_run16(SchedTask *manager, void *in, void *out)
{
    MapReduce *w = *(MapReduce **)in;

    // TASK_PRINT (w->t_print) must not run before this read has finished.
    HTaskPtr t_read = manager->create_task(READ_TASK);
    w->t_print->wait_for(t_read);
    t_read->set_cpu(w->fmp->read_spe_cpu);
    t_read->set_param(0,w->fd);

    if (w->remain_task < w->task_blocks) {
        t_read->set_param(1,w->task_spawned*w->division_size);
        // NOTE(review): this length is task_blocks chunks even in the last
        // step, which can extend past the read_filesize-byte buffer that
        // run_start() allocates; confirm READ_TASK writes only
        // param2 - param1 bytes.
        t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size);

        // last case
        while (w->size >= w->division_size)
            run_tasks(manager,w,w->remain_task,t_read,w->t_print, w->division_size);
        // remaining data
        while (w->size>0)
            run_tasks(manager,w,1,t_read,w->t_print, w->division_size);

        // Taken after dispatch, so it reflects the chunks handed out above.
        t_read->set_param(2,w->task_spawned*w->division_size);
        t_read->spawn();
    } else {
        // More blocks follow: the successor step is one more task TASK_PRINT waits for.
        HTaskPtr t_next = manager->create_task(BREAD_RUN_TASK_BLOCKS,
                                               (memaddr)&w->self,sizeof(memaddr),0,0);
        w->t_print->wait_for(t_next);

        t_read->set_param(1,w->task_spawned*w->division_size);
        t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size);

        run_tasks(manager,w, w->task_blocks, t_read, t_next, w->division_size);

        t_read->set_param(2,w->task_spawned*w->division_size);

        t_read->spawn();
        t_next->spawn();
    }
    return 0;
}

SchedDefineTask1(RUN_TASK_BLOCKS,run16);

/*
 * One scheduling step for the in-memory read modes (used whenever read_type
 * is not BLOCKED_READ). `in` carries a pointer to the MapReduce state.
 *
 *  - iterate mode: hand all remaining work to run_tasks() at once.
 *  - at least task_blocks tasks left: dispatch one block of task_blocks
 *    tasks and chain a successor RUN_TASK_BLOCKS task (which TASK_PRINT
 *    also waits for) to handle the next block.
 *  - fewer than task_blocks left: dispatch everything that remains,
 *    including a trailing partial chunk, and do not reschedule.
 */
static int
run16(SchedTask *manager, void *in, void *out)
{
    MapReduce *mr = *(MapReduce **)in;

    if (mr->fmp->use_iterate) {
        run_tasks(manager, mr, mr->remain_task, 0, mr->t_print, mr->division_size);
        return 0;
    }

    if (mr->remain_task >= mr->task_blocks) {
        HTaskPtr successor = manager->create_task(RUN_TASK_BLOCKS,
                                                  (memaddr)&mr->self,sizeof(memaddr),0,0);
        mr->t_print->wait_for(successor);

        run_tasks(manager, mr, mr->task_blocks, 0, successor, mr->division_size);

        successor->spawn();
        return 0;
    }

    // Final step: whole chunks first, then whatever bytes are left over.
    while (mr->size >= mr->division_size)
        run_tasks(manager, mr, mr->remain_task, 0, mr->t_print, mr->division_size);
    while (mr->size > 0)
        run_tasks(manager, mr, 1, 0, mr->t_print, mr->size);
    return 0;
}

/*
 * Open filename, load it according to read_type, compute how it is split
 * into tasks, and spawn t_exec_num scheduler tasks plus the TASK_PRINT task
 * that waits for them.
 *
 * Requires init() to have allocated w. Prints a message and returns without
 * spawning anything if w is missing, or if the file cannot be opened,
 * stat'ed, or is empty. An empty file would make division_size 0 and the
 * task-count computation a division by zero.
 *
 * On success the descriptor opened here is stored in w->fd and left open;
 * bread_run16 passes it to READ_TASK in blocked-read mode.
 */
void
FileMapReduce::run_start(TaskManager *manager, const char *filename)
{
    if (w == NULL) {
        fprintf(stderr,"run_start: init() has not been called\n");
        return ;
    }

    // open() reports failure with -1, not 0.
    long fd = open(filename,O_RDONLY);
    if (fd < 0) {
        fprintf(stderr,"can't open %s\n",filename);
        return ;
    }

    struct stat sb;
    if (fstat(fd,&sb)) {
        fprintf(stderr,"can't fstat %s\n",filename);
        close(fd);
        return ;
    }

    if (sb.st_size == 0) {
        fprintf(stderr,"%s is empty\n",filename);
        close(fd);
        return ;
    }

    w->self = w;
    w->fmp = this;
    w->fd = fd;
    w->read_filesize = sb.st_size;

    if (read_type == BLOCKED_READ) {
        printf("[blocked read mode]\n");
        // Only allocated here; presumably filled later by the READ_TASKs that bread_run16 spawns.
        w->file_mmap = (char*)manager->allocate(w->read_filesize);
    }else if (read_type == MY_READ) {
        printf("[single read mode]\n");
        my_read(filename, w, manager);
    }else if(read_type == MY_MMAP) {
        printf("[mmap mode]\n");
        my_mmap(filename, w);
    }else if(read_type == BLOCKED_MMAP) {
        printf("[blocked mmap mode]\n");
        my_mmap(filename, w);
    }

    //w->task_blocks = blocks;
    w->task_spawned = 0;

    w->size = w->file_size = w->read_filesize;
    printf("w %lx\n",(long)w);

    /* Data size of one task, in bytes (division is in KByte; 16 by default). */
    if (w->size >= 1024*division) {
        w->division_size = 1024 * division;
    } else {
        w->division_size = w->size;
    }

    printf("division_size %ld\n",w->division_size);

    /* Number of tasks: ceil(size / division_size). */
    w->task_num = w->size / w->division_size;
    w->task_num = w->task_num + (w->division_size*w->task_num < w->size);
    w->remain_task = w->task_num;

    // Tasks dispatched per scheduler step; with -a everything goes in one step.
    if(!all) {
        w->task_blocks = blocks;
    } else {
        w->task_blocks = w->remain_task;
    }

    printf("task_num %ld\n",w->remain_task);

    /* Output area: division_out_size bytes per task (long avoids int overflow on large inputs). */
    w->division_out_size = division_out_size;
    long out_size = w->division_out_size*w->remain_task;
    w->o_data = (unsigned long long *)manager->allocate(out_size);
    printf("out size %ld\n",out_size);

    /* Task that sums the results of all workers and prints them. */
    HTaskPtr t_print = manager->create_task(TASK_PRINT,
                                            (memaddr)&w->self,sizeof(memaddr),0,0);
    w->t_print = t_print;
    for(int i=0;i<t_exec_num;i++) {
        /* A task that starts Tasks task_blocks at a time. */
        /* Assumes these are serialized... */
        HTaskPtr t_exec;
        if (read_type == BLOCKED_READ) {
            t_exec = manager->create_task(BREAD_RUN_TASK_BLOCKS,
                                          (memaddr)&w->self,sizeof(memaddr),0,0);
        }else {
            t_exec = manager->create_task(RUN_TASK_BLOCKS,
                                          (memaddr)&w->self,sizeof(memaddr),0,0);
        }

        t_print->wait_for(t_exec);
        t_exec->spawn();
    }
    t_print->spawn();
}