changeset 2047:de89da997e07 draft

add FileMapReduce
author Nozomi
date Wed, 27 Jan 2016 19:09:33 +0900
parents 476fc75a5e17
children 6796d85f3d6b
files TaskManager/ManyCore/FileMapReduce.cc TaskManager/ManyCore/FileMapReduce.h example/word_count/main.cc
diffstat 3 files changed, 418 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/TaskManager/ManyCore/FileMapReduce.cc	Wed Jan 27 19:09:33 2016 +0900
@@ -0,0 +1,412 @@
+#include "FileMapReduce.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include "TaskManager.h"
+#include "SchedTask.h"
+#include "Func.h"
+#include "WordCount.h"
+
+/* ;TODO
+ * PS3でCPU数が2以上の時に、あまりが計算されてない
+ */
+
+extern void task_init();
+void TMend(TaskManager *);
+static double st_time;
+static double ed_time;
+int all = 0;
+int use_task_array = 1;
+int use_task_creater = 0;
+int use_compat = 0;
+int use_iterate = 0;
+int array_task_num = 11;
+int spe_num = 1;
+int read_type = MY_MMAP;
+int t_exec_num = 4;
+CPU_TYPE spe_cpu = SPE_ANY;
+CPU_TYPE read_spe_cpu = IO_0;
+
+const char *usr_help_str = "Usage: ./word_count [-a -c -s] [-cpu spe_num] [-g] [-file filename] [-br]\n";
+
+static double
+getTime() {
+    struct timeval tv;
+    gettimeofday(&tv, NULL);
+    return tv.tv_sec + (double)tv.tv_usec*1e-6;
+}
+
+typedef struct {
+    caddr_t file_mmap;
+    off_t size;
+} st_mmap_t;
+
+/*与えられたsizeをfix_byte_sizeの倍数にする(丸め込むっていうのかな?)*/
+static int 
+fix_byte(int size,int fix_byte_size)
+{
+    size = (size/fix_byte_size)*fix_byte_size  + ((size%fix_byte_size)!= 0)*fix_byte_size;
+
+    return size;
+}
+
+static void
+my_read(char *filename, WordCount *w, TaskManager *manager)
+{
+    long fd = w->fd;
+    long r_filesize = w->read_filesize;
+
+    if ((fd=open(filename,O_RDONLY,0666))==0) {
+        fprintf(stderr,"can't open %s\n",filename);
+    }
+
+    w->file_mmap = (char*)manager->allocate(w->read_filesize);
+
+    long one_read_size = 1024 * 1024 * 1024;  // 1GB
+
+    for (int i = 0; 0 < r_filesize; i++) {
+        if (r_filesize > one_read_size) {
+            pread(fd, w->file_mmap + i*one_read_size, one_read_size,i*one_read_size);
+        }else if ((r_filesize < one_read_size) && (r_filesize != 0)) {
+            pread(fd, w->file_mmap + i*one_read_size, r_filesize,i*one_read_size);
+        }
+        r_filesize -= one_read_size;
+    }
+
+    if (w->file_mmap == (caddr_t)-1) {
+        fprintf(stderr,"Can't mmap file\n");
+        perror(NULL);
+        exit(0);
+    }
+
+    return ;
+}
+
+static void
+my_mmap(char *filename, WordCount *w)
+{
+    /*マッピングだよ!*/
+    int map = MAP_PRIVATE;
+    st_mmap_t st_mmap;
+    struct stat sb;
+    long fd = w->fd;
+
+    if ((fd=open(filename,O_RDONLY,0666))==0) {
+        fprintf(stderr,"can't open %s\n",filename);
+    }
+
+    if (fstat(fd,&sb)) {
+        fprintf(stderr,"can't fstat %s\n",filename);
+    }
+
+    st_mmap.size = fix_byte(sb.st_size,4096);
+
+    //madvise(w->file_mmap, w->read_filesize, POSIX_MADV_NORMAL);
+    w->file_mmap = (char*)mmap(NULL,st_mmap.size,PROT_WRITE|PROT_READ,map,fd,(off_t)0);
+
+    if (st_mmap.file_mmap == (caddr_t)-1) {
+        fprintf(stderr,"Can't mmap file\n");
+        perror(NULL);
+        exit(0);
+    }
+
+    return ;
+}
+
+static void
+run_tasks(SchedTask *manager, WordCount *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size)
+{
+
+    if (task_count < array_task_num) {
+        array_task_num = task_count;
+        if (task_count<=0) return;
+    }
+    for (int i = 0; i < task_count; i += array_task_num) {
+        HTask *task_array;
+        if (use_task_array) {
+            int task_num = (w->size+size-1)/size;
+            if (task_num>array_task_num) task_num = array_task_num;
+            task_array = manager->create_task_array(TASK_EXEC,task_num,1,1,1);
+            if (t_read != 0) task_array->wait_for(t_read);
+            if (!all) { 
+                t_next->wait_for(task_array);
+            } else {
+                w->t_print->wait_for(task_array);
+            }
+        }
+
+        Task *t_exec = 0;
+        HTask *h_exec = 0;
+        for (int j = 0; j < array_task_num; j++) {
+            int i = w->task_spawned++;
+            if (w->size < size) size = w->size;
+            if (size==0) break;
+            if (use_task_array) {
+                t_exec = task_array->next_task_array(TASK_EXEC,t_exec);
+                t_exec->set_param(0,(long)0);
+                t_exec->set_param(1,(long)0);
+                t_exec->set_param(2,(long)size);
+                t_exec->set_param(3,(long)w->division_out_size);
+                t_exec->set_inData(0,w->file_mmap + i*w->division_size, size);
+                t_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size);
+            } else if (use_compat) {
+                h_exec = manager->create_task(TASK_EXEC);
+                t_exec->set_param(0,(long)0);
+                t_exec->set_param(1,(long)0);
+                t_exec->set_param(2,(long)size);
+                t_exec->set_param(3,(long)w->division_out_size);
+                h_exec->set_inData(0,w->file_mmap + i*w->division_size, size);
+                h_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size);
+                
+                t_next->wait_for(h_exec);
+
+                h_exec->set_cpu(spe_cpu);
+                h_exec->spawn();
+            } else if (use_iterate) {
+                array_task_num = w->task_num;
+                use_iterate = 0;
+                use_compat = 1;
+                
+                w->size -= size*array_task_num;
+                if(w->size < 0) {
+                    array_task_num -= 1;
+                    w->size += size;
+                }
+                h_exec = manager->create_task(TASK_EXEC);
+                h_exec->flip();
+                h_exec->set_inData(0,w->file_mmap,w->file_size);
+                h_exec->set_inData(1,w->o_data,w->out_size_);
+                h_exec->set_outData(0,w->file_mmap,w->file_size);
+                h_exec->set_outData(1,w->o_data,w->out_size_);
+                h_exec->set_param(0,(long)i);
+                h_exec->set_param(1,(long)w->division_size);
+                h_exec->set_param(2,(long)size);
+                h_exec->set_param(3,(long)w->out_size);
+                
+                t_next->wait_for(h_exec);
+                h_exec->set_cpu(spe_cpu);
+                h_exec->iterate(array_task_num);
+                
+                w->task_num -= array_task_num;
+                w->task_spawned += array_task_num-1;
+                
+                break;
+            } else {
+                h_exec = manager->create_task(TASK_EXEC,
+                                              (memaddr)(w->file_mmap + i*w->division_size), size,
+                                              (memaddr)(w->o_data + i*w->out_size), w->division_out_size);
+                t_exec->set_param(0,(long)0);
+                t_exec->set_param(1,(long)0);
+                t_exec->set_param(2,(long)size);
+                t_exec->set_param(3,(long)w->division_out_size);
+                t_next->wait_for(h_exec);
+                h_exec->set_cpu(spe_cpu);
+                h_exec->spawn();
+            }
+            w->size -= size;
+            w->task_num--;
+        }
+        if (use_task_array) {
+            task_array->spawn_task_array(t_exec->next());
+            task_array->set_cpu(spe_cpu);
+            task_array->spawn();
+        } else {
+            //if (!all) t_next->wait_for(h_exec);
+        }
+    } 
+}
+
+/**
+ *   このTaskは、PPE上で実行されるので、並列に実行されることはない
+ *   二つ実行されていて、Task が足りなくなることがないようにしている。
+ */
+
+SchedDefineTask1(BREAD_RUN_TASK_BLOCKS,bread_run16);
+
+static int
+bread_run16(SchedTask *manager, void *in, void *out)
+{
+    WordCount *w = *(WordCount **)in;
+
+    HTaskPtr t_read = manager->create_task(READ_TASK);
+    w->t_print->wait_for(t_read);
+    t_read->set_cpu(read_spe_cpu);
+    t_read->set_param(0,w->fd);
+
+    if (w->task_num < w->task_blocks) {
+        t_read->set_param(1,w->task_spawned*w->division_size);
+        t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size);
+
+        // last case
+        while (w->size >= w->division_size)
+            run_tasks(manager,w,w->task_num,t_read,w->t_print, w->division_size);
+        // remaining data
+        while (w->size>0)
+            run_tasks(manager,w,1,t_read,w->t_print, w->division_size);
+
+        t_read->set_param(2,w->task_spawned*w->division_size);
+        t_read->spawn();
+    } else {
+        HTaskPtr t_next = manager->create_task(BREAD_RUN_TASK_BLOCKS,
+                                               (memaddr)&w->self,sizeof(memaddr),0,0);
+        w->t_print->wait_for(t_next);
+
+        t_read->set_param(1,w->task_spawned*w->division_size);
+        t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size);
+
+        run_tasks(manager,w, w->task_blocks, t_read, t_next, w->division_size);
+
+        t_read->set_param(2,w->task_spawned*w->division_size);
+
+        t_read->spawn();
+        t_next->spawn();
+    }
+    return 0;
+}
+
+SchedDefineTask1(RUN_TASK_BLOCKS,run16);
+
+static int
+run16(SchedTask *manager, void *in, void *out)
+{
+    WordCount *w = *(WordCount **)in;
+
+    if(use_iterate) {
+        run_tasks(manager, w, w->task_num, 0, w->t_print, w->division_size);
+    } else if (w->task_num < w->task_blocks) {
+        // last case
+        while (w->size >= w->division_size)
+            run_tasks(manager,w,w->task_num,0,w->t_print, w->division_size);
+        // remaining data
+        while (w->size>0)
+            run_tasks(manager,w,1,0, w->t_print, w->size);
+        // printf("run16 last %d\n",w->task_num);
+    } else {
+        HTaskPtr t_next = manager->create_task(RUN_TASK_BLOCKS,
+                                               (memaddr)&w->self,sizeof(memaddr),0,0);
+        w->t_print->wait_for(t_next);
+
+        run_tasks(manager,w, w->task_blocks,0, t_next, w->division_size);
+
+        t_next->spawn();
+        // printf("run16 next %d\n",w->task_num);
+    }
+    return 0;
+}
+
+static int blocks = 48;
+//static int blocks = 31 * 6 * 24;
+static int division = 16; // in Kbyte
+
+static void
+run_start(TaskManager *manager, char *filename)
+{
+    long   fd = (long)manager->allocate(sizeof(long));
+    struct stat *sb = (struct stat*)manager->allocate(sizeof(struct stat));
+    HTaskPtr t_exec;
+
+    if ((fd=open(filename,O_RDONLY,0666))==0) {
+        fprintf(stderr,"can't open %s\n",filename);
+        return ;
+    }
+
+    if (fstat(fd,sb)) {
+        fprintf(stderr,"can't fstat %s\n",filename);
+        return ;
+    }
+
+    WordCountPtr w = (WordCountPtr)manager->allocate(sizeof(WordCount));
+
+    w->self = w;
+    w->fd = fd;
+    w->read_filesize = sb->st_size;
+
+
+    if (read_type == BLOCKED_READ) {
+        printf("[blocked read mode]\n");
+        w->file_mmap = (char*)manager->allocate(w->read_filesize);
+    }else if (read_type == MY_READ) {
+        printf("[single read mode]\n");
+        my_read(filename, w, manager);
+    }else if(read_type == MY_MMAP) {
+        printf("[mmap mode]\n");
+        my_mmap(filename, w);
+    }else if(read_type == BLOCKED_MMAP) {
+        printf("[blocked mmap mode]\n");
+        my_mmap(filename, w);
+    }
+
+    HTaskPtr t_print;
+
+    //w->task_blocks = blocks;
+    w->self = w;
+    w->task_spawned = 0;
+
+    w->size = w->file_size = w->read_filesize;
+    printf("w %lx\n",(long)w);
+
+    /* 1task分のデータサイズ(byte) */
+    if (w->size >= 1024*division) {
+        w->division_size = 1024 * division;/*16kbyte*/
+    } else {
+        w->division_size = w->size;
+    }
+
+    printf("division_size %ld\n",w->division_size);
+
+    /* "word num" and "line num" */
+    w->status_num = 2;
+    /* taskの数 */
+    w->task_num = w->size / w->division_size;
+    w->task_num = w->task_num + (w->division_size*w->task_num < w->size);
+    int out_task_num = w->task_num;
+
+    if(!all) {
+        w->task_blocks = blocks;
+    } else {
+        w->task_blocks = w->task_num;
+    }
+
+    w->out_task_num = out_task_num;
+    printf("task_num %ld\n",w->task_num);
+    printf("out_task_num %ld\n",w->out_task_num);
+
+    /* out用のdivision_size. statusが2つなので、あわせて16byteになるように、long long(4byte)を使用 */
+
+    w->division_out_size = sizeof(unsigned long long)*4;
+    int out_size = w->division_out_size*out_task_num;
+    w->o_data = (unsigned long long *)manager->allocate(out_size);
+    w->out_size_ = out_size;
+    w->out_size = 4;
+    printf("out size %d\n",out_size);
+
+    /*各SPEの結果を合計して出力するタスク*/
+
+    t_print = manager->create_task(TASK_PRINT,
+                                   (memaddr)&w->self,sizeof(memaddr),0,0);
+    w->t_print = t_print;    
+    for(int i=0;i<t_exec_num;i++) {
+        /* Task を task_blocks ずつ起動する Task */
+        /* serialize されていると仮定する... */
+        if (read_type == BLOCKED_READ) {
+            t_exec = manager->create_task(BREAD_RUN_TASK_BLOCKS,
+                                                   (memaddr)&w->self,sizeof(memaddr),0,0);
+        }else {
+            t_exec = manager->create_task(RUN_TASK_BLOCKS,
+                                                   (memaddr)&w->self,sizeof(memaddr),0,0);
+        }
+
+        t_print->wait_for(t_exec);
+        //    t_exec->iterate(4);
+        t_exec->spawn();
+    }
+    t_print->spawn();
+}
+/* end */
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/TaskManager/ManyCore/FileMapReduce.h	Wed Jan 27 19:09:33 2016 +0900
@@ -0,0 +1,4 @@
+class FileMapReduce {
+    FileMapReduce(TaskManager *manager,const char* filename,int TASK_EXEC,int TASK_PRINT);
+    void start();
+}
--- a/example/word_count/main.cc	Wed Jan 27 18:55:25 2016 +0900
+++ b/example/word_count/main.cc	Wed Jan 27 19:09:33 2016 +0900
@@ -484,6 +484,8 @@
     task_init();
     st_time = getTime();
     run_start(manager, filename);
+    // FileMapReduce fmp = new FileMapReduce(manager, filename, TASK_EXEC, TASK_PRINT);
+    // fmp->start();
     manager->set_TMend(TMend);
     return 0;
 }