changeset 2051:b79a250b4f99 draft

FileMapReduce (no compile error)
author masa
date Thu, 28 Jan 2016 17:23:45 +0900
parents 26dd777ba95d
children cc1ea3933551
files TaskManager/ManyCore/FileMapReduce.cc TaskManager/ManyCore/FileMapReduce.h TaskManager/kernel/sys_task/Read.cc TaskManager/kernel/sys_task/Read.h TaskManager/kernel/sys_task/SysTasks.h TaskManager/kernel/sys_task/systask_register.cc example/word_count/Func.h example/word_count/main.cc example/word_count/ppe/Read.cc example/word_count/ppe/Read.h example/word_count/task_init.cc
diffstat 11 files changed, 182 insertions(+), 608 deletions(-) [+]
line wrap: on
line diff
--- a/TaskManager/ManyCore/FileMapReduce.cc	Thu Jan 28 15:43:36 2016 +0900
+++ b/TaskManager/ManyCore/FileMapReduce.cc	Thu Jan 28 17:23:45 2016 +0900
@@ -2,6 +2,7 @@
 
 #include <stdio.h>
 #include <stdlib.h>
+#include <string.h>
 #include <sys/stat.h>
 #include <fcntl.h>
 #include <unistd.h>
@@ -12,62 +13,92 @@
  * PS3でCPU数が2以上の時に、あまりが計算されてない
  */
 
-extern void task_init();
-void TMend(TaskManager *);
-int all = 0;
-int use_task_array = 1;
-int use_task_creater = 0;
-int use_compat = 0;
-int use_iterate = 0;
-int array_task_num = 11;
-int spe_num = 1;
-int read_type = MY_MMAP;
-int t_exec_num = 4;
-CPU_TYPE spe_cpu = SPE_ANY;
-CPU_TYPE read_spe_cpu = IO_0;
+enum {
+#include "SysTasks.h"
+};
 
-const char *usr_help_str = "Usage: ./word_count [-a -c -s] [-cpu spe_num] [-g] [-file filename] [-br]\n";
-
-FileMapReduce::FileMapReduce(TaskManager *manager,const char* filename,int TASK_EXEC,int TASK_PRINT) {
-    task_init();
-    run_start(manager,filename);
+FileMapReduce::FileMapReduce(TaskManager *manager,int TASK_EXEC,int TASK_EXEC_DATA_PARALLEL,int TASK_PRINT) {
+    all = 0;
+    use_task_array = 1;
+    use_task_creater = 0;
+    use_compat = 0;
+    use_iterate = 0;
+    array_task_num = 11;
+    spe_num = 1;
+    read_type = MY_MMAP;
+    t_exec_num = 4;
+    spe_cpu = SPE_ANY;
+    read_spe_cpu = IO_0;
+    blocks = 48;
+    division = 16;  // in KByte
+    this->TASK_EXEC = TASK_EXEC;
+    this->TASK_EXEC_DATA_PARALLEL = TASK_EXEC_DATA_PARALLEL;
+    this->TASK_PRINT = TASK_PRINT;
+    fmp_help_str = "[-a -c -s] [-cpu spe_num] [-g] [-file filename] [-br]\n";
 }
 
 FileMapReduce::~FileMapReduce() {
 
 }
 
-#ifdef __CERIUM_GPU__
-#include "GpuScheduler.h"
-#endif
-#ifdef __CERIUM_CUDA__
-#include "CudaScheduler.h"
-#endif
-
-/* 必ずこの位置に書いて */
-SchedExternTask(READ_TASK);
-SchedExternTask(BREAD_RUN_TASK_BLOCKS);
-SchedExternTask(Exec);
-SchedExternTask(Print);
-SchedExternTask(RUN_TASK_BLOCKS);
+char*
+FileMapReduce::init(int argc, char **argv)
+{
+    char *filename = 0;
+    for (int i = 1; argv[i]; ++i) {
+        if (strcmp(argv[i], "-file") == 0) {
+            filename = argv[i+1]; i++;
+        } else if (strcmp(argv[i], "-division") == 0) {
+            division = atoi(argv[i+1]);
+            i++;
+        } else if (strcmp(argv[i], "-block") == 0) {
+            blocks = atoi(argv[i+1]);
+            i++;
+        } else if (strcmp(argv[i], "-a") == 0) {
+            // create task all at once
+            all = 1;
+        } else if (strcmp(argv[i], "-c") == 0) {
+            use_task_array = 0;
+            use_compat = 1;
+        } else if (strcmp(argv[i], "-s") == 0) {
+            use_task_array = 0;
+            use_compat = 0;
+        } else if (strcmp(argv[i], "-t") == 0) {
+            use_task_creater = 1;
+            use_task_array = 0;
+            use_compat = 0;
+        } else if (strcmp(argv[i], "-anum") == 0) {
+            array_task_num = atoi(argv[i+1]);
+            i++;
+        } else if (strcmp(argv[i], "-g") == 0) {
+            spe_cpu = GPU_0;
+        } else if (strcmp(argv[i], "-any") == 0) {
+            spe_cpu = ANY_ANY;
+        } else if (strcmp(argv[i], "-i") == 0) {
+            use_iterate = 1;
+            use_task_array = 0;
+            t_exec_num = 1;
+        } else if (strcmp(argv[i], "-br") == 0) {
+            read_type = BLOCKED_READ;
+        } else if (strcmp(argv[i], "-r") == 0) {
+            read_type = MY_READ;
+        }
+        /* else if (strcmp(argv[i], "-cpu") == 0) {
+            spe_num = atoi(argv[i+1]);
+            i++;
+            if (spe_num==0) spe_num = 1;
+            } else {
+            fprintf(stderr,"%s\n",fmp_help_str);
+            exit (0);
+            }*/
+    }
+    if (filename==0) {
+        printf("Usage: %s ",argv[0]);
+        puts(fmp_help_str);
+        exit(1);
+    }
 
-void
-task_init(void)
-{
-#ifdef __CERIUM_GPU__
-    GpuSchedRegister(TASK_EXEC, "gpu/Exec.cl", "wordcount");
-#endif
-#ifdef __CERIUM_CUDA__
-    CudaSchedRegister(TASK_EXEC, "cuda/Exec.ptx", "wordcount");
-#endif
-
-    SchedRegisterTask(TASK_EXEC, Exec);
-
-    SchedRegister(READ_TASK);
-    SchedRegister(BREAD_RUN_TASK_BLOCKS);
-
-    SchedRegisterTask(TASK_PRINT, Print);
-    SchedRegister(RUN_TASK_BLOCKS);
+    return filename;
 }
 
 /*与えられたsizeをfix_byte_sizeの倍数にする(丸め込むっていうのかな?)*/
@@ -79,8 +110,8 @@
     return size;
 }
 
-static void
-my_read(const char *filename, WordCount *w, TaskManager *manager)
+void
+FileMapReduce::my_read(const char *filename, MapReduce *w, TaskManager *manager)
 {
     long fd = w->fd;
     long r_filesize = w->read_filesize;
@@ -109,8 +140,8 @@
     }
 }
 
-static void
-my_mmap(const char *filename, WordCount *w)
+void
+FileMapReduce::my_mmap(const char *filename, MapReduce *w)
 {
     /*マッピングだよ!*/
     int map = MAP_PRIVATE;
@@ -137,8 +168,14 @@
     }
 }
 
-static void
-run_tasks(SchedTask *manager, WordCount *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size)
+void
+run_tasks(SchedTask *manager, MapReduce *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size)
+{
+    w->fmp->run_tasks(manager,w,task_count,t_read,t_next,size);
+}
+
+void
+FileMapReduce::run_tasks(SchedTask *manager, MapReduce *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size)
 {
     if (task_count < array_task_num) {
         array_task_num = task_count;
@@ -192,7 +229,7 @@
                     array_task_num -= 1;
                     w->size += size;
                 }
-                h_exec = manager->create_task(TASK_EXEC);
+                h_exec = manager->create_task(TASK_EXEC_DATA_PARALLEL);
                 h_exec->flip();
                 h_exec->set_inData(0,w->file_mmap,w->file_size);
                 h_exec->set_inData(1,w->o_data,w->out_size_);
@@ -243,11 +280,11 @@
 static int
 bread_run16(SchedTask *manager, void *in, void *out)
 {
-    WordCount *w = *(WordCount **)in;
+    MapReduce *w = *(MapReduce **)in;
 
     HTaskPtr t_read = manager->create_task(READ_TASK);
     w->t_print->wait_for(t_read);
-    t_read->set_cpu(read_spe_cpu);
+    t_read->set_cpu(w->fmp->read_spe_cpu);
     t_read->set_param(0,w->fd);
 
     if (w->task_num < w->task_blocks) {
@@ -286,9 +323,9 @@
 static int
 run16(SchedTask *manager, void *in, void *out)
 {
-    WordCount *w = *(WordCount **)in;
+    MapReduce *w = *(MapReduce **)in;
 
-    if(use_iterate) {
+    if(w->fmp->use_iterate) {
         run_tasks(manager, w, w->task_num, 0, w->t_print, w->division_size);
     } else if (w->task_num < w->task_blocks) {
         // last case
@@ -311,11 +348,8 @@
     return 0;
 }
 
-static int blocks = 48;
-static int division = 16; // in Kbyte
-
-static void
-run_start(TaskManager *manager, const char *filename)
+void
+FileMapReduce::run_start(TaskManager *manager, const char *filename)
 {
     long   fd = (long)manager->allocate(sizeof(long));
     struct stat *sb = (struct stat*)manager->allocate(sizeof(struct stat));
@@ -331,13 +365,13 @@
         return ;
     }
 
-    WordCountPtr w = (WordCountPtr)manager->allocate(sizeof(WordCount));
+    MapReducePtr w = (MapReducePtr)manager->allocate(sizeof(MapReduce));
 
     w->self = w;
+    w->fmp = this;
     w->fd = fd;
     w->read_filesize = sb->st_size;
 
-
     if (read_type == BLOCKED_READ) {
         printf("[blocked read mode]\n");
         w->file_mmap = (char*)manager->allocate(w->read_filesize);
@@ -417,10 +451,3 @@
     }
     t_print->spawn();
 }
-
-void
-start()
-{
-    t_exec->spawn();
-    t_print->spawn();
-}
--- a/TaskManager/ManyCore/FileMapReduce.h	Thu Jan 28 15:43:36 2016 +0900
+++ b/TaskManager/ManyCore/FileMapReduce.h	Thu Jan 28 17:23:45 2016 +0900
@@ -4,8 +4,11 @@
 #include "SchedTask.h"
 #include "HTask.h"
 
-typedef struct wordCount {
-    struct wordCount *self;
+class FileMapReduce;
+
+typedef struct mapReduce {
+    struct mapReduce *self;
+    FileMapReduce *fmp;
     long fd;
     long read_filesize;
     CPU_TYPE read_cpu;
@@ -27,16 +30,7 @@
     long file_size;
     HTaskPtr t_print;
     HTaskPtr t_exec;
-} WordCount, *WordCountPtr;
-
-enum {
-#include "SysTasks.h"
-    BREAD_RUN_TASK_BLOCKS,
-    READ_TASK,
-    TASK_EXEC,
-    RUN_TASK_BLOCKS,
-    TASK_PRINT,
-};
+} MapReduce, *MapReducePtr;
 
 // Read Type
 enum {
@@ -59,18 +53,38 @@
 class FileMapReduce {
 public:
     /* constructor */
-    FileMapReduce(TaskManager *manager,const char* filename,int TASK_EXEC,int TASK_PRINT);
+    FileMapReduce(TaskManager *manager,int TASK_EXEC,int TASK_EXEC_DATA_PARALLEL,int TASK_PRINT);
     ~FileMapReduce();
 
     /* User API */
-    static void run_start(TaskManager *manager, const char *filename);
-    void start();
+    void run_start(TaskManager *manager, const char *filename);
+    void run_tasks(SchedTask *manager, MapReduce *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size);
+    char* init(int argc, char **argv);
 
 private:
-    static void my_read(const char *filename, WordCount *w, TaskManager *manager);
-    static void my_mmap(const char *filename, WordCount *w);
+    void my_read(const char *filename, MapReduce *w, TaskManager *manager);
+    void my_mmap(const char *filename, MapReduce *w);
+    void task_init(void);
 
     /* variable */
-    HTaskPtr t_exec;
-    HTaskPtr t_print;
+    int all;
+    int use_task_array;
+    int use_task_creater ;
+    int use_compat ;
+    int array_task_num ;
+    int spe_num ;
+    int read_type ;
+    int t_exec_num ;
+    CPU_TYPE spe_cpu ;
+    int TASK_EXEC;
+    int TASK_EXEC_DATA_PARALLEL;
+    int TASK_PRINT;
+    int blocks;
+    int division; // in Kbyte
+public:
+    CPU_TYPE read_spe_cpu;
+    int use_iterate;
+    int task_exec_id;
+    int task_print_id;
+    const char *fmp_help_str;
 };
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/TaskManager/kernel/sys_task/Read.cc	Thu Jan 28 17:23:45 2016 +0900
@@ -0,0 +1,28 @@
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/stat.h>
+#include <iostream>
+#include <vector>
+#include <cstdlib>
+
+#include "SchedTask.h"
+#include "SysFunc.h"
+
+/* これは必須 */
+SchedDefineTask1(READ_TASK,read_task);
+
+static int
+read_task(SchedTask *s, void *rbuf, void *wbuf)
+{
+    long fd = (long)s->get_param(0);
+    long start_read_position = (long)s->get_param(1);
+    long end_read_position = (long)s->get_param(2);
+    char *read_text = (char*)s->get_output(wbuf,0);
+
+    long read_size = end_read_position - start_read_position;
+
+    pread(fd, read_text, read_size , start_read_position);
+
+    return 0;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/TaskManager/kernel/sys_task/Read.h	Thu Jan 28 17:23:45 2016 +0900
@@ -0,0 +1,7 @@
+#ifndef INCLUDED_TASK_HELLO
+#define INCLUDED_TASK_HELLO
+
+#include "SchedTask.h"
+
+
+#endif
--- a/TaskManager/kernel/sys_task/SysTasks.h	Thu Jan 28 15:43:36 2016 +0900
+++ b/TaskManager/kernel/sys_task/SysTasks.h	Thu Jan 28 17:23:45 2016 +0900
@@ -2,4 +2,7 @@
 FinishTask,
 ShowTime,
 StartProfile,
+READ_TASK,
+BREAD_RUN_TASK_BLOCKS,
+RUN_TASK_BLOCKS,
 #define Dummy StartTask
--- a/TaskManager/kernel/sys_task/systask_register.cc	Thu Jan 28 15:43:36 2016 +0900
+++ b/TaskManager/kernel/sys_task/systask_register.cc	Thu Jan 28 17:23:45 2016 +0900
@@ -3,12 +3,16 @@
 
 SchedExternTask(StartTask);
 SchedExternTask(FinishTask);
-// SchedExternTask(TaskArray);
+SchedExternTask(READ_TASK);
+SchedExternTask(BREAD_RUN_TASK_BLOCKS);
+SchedExternTask(RUN_TASK_BLOCKS);
 
 void
 systask_register()
 {
     SchedRegister(StartTask);
     SchedRegister(FinishTask);
-//     SchedRegister(TaskArray);
+    SchedRegister(READ_TASK);
+    SchedRegister(BREAD_RUN_TASK_BLOCKS);
+    SchedRegister(RUN_TASK_BLOCKS);
 }
--- a/example/word_count/Func.h	Thu Jan 28 15:43:36 2016 +0900
+++ b/example/word_count/Func.h	Thu Jan 28 17:23:45 2016 +0900
@@ -1,22 +1,6 @@
 enum {
 #include "SysTasks.h"
-    BREAD_RUN_TASK_BLOCKS,
-    READ_TASK,
     TASK_EXEC,
     TASK_EXEC_DATA_PARALLEL,
-    RUN_TASK_BLOCKS,
     TASK_PRINT,
 };
-
-// Read Type
-enum {
-    MY_MMAP,
-    MY_READ,
-    BLOCKED_READ,
-    BLOCKED_MMAP,
-};
-
-#define DATA_NUM 16
-#define ADD_NUM 26
-
-#define DATA_ID 0
--- a/example/word_count/main.cc	Thu Jan 28 15:43:36 2016 +0900
+++ b/example/word_count/main.cc	Thu Jan 28 17:23:45 2016 +0900
@@ -1,16 +1,9 @@
 #include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/mman.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-#include <unistd.h>
 #include <sys/time.h>
 #include "TaskManager.h"
 #include "SchedTask.h"
 #include "Func.h"
-#include "WordCount.h"
+#include "FileMapReduce.h"
 
 /* ;TODO
  * PS3でCPU数が2以上の時に、あまりが計算されてない
@@ -20,19 +13,7 @@
 void TMend(TaskManager *);
 static double st_time;
 static double ed_time;
-int all = 0;
-int use_task_array = 1;
-int use_task_creater = 0;
-int use_compat = 0;
-int use_iterate = 0;
-int array_task_num = 11;
-int spe_num = 1;
-int read_type = MY_MMAP;
-int t_exec_num = 4;
-CPU_TYPE spe_cpu = SPE_ANY;
-CPU_TYPE read_spe_cpu = IO_0;
-
-const char *usr_help_str = "Usage: ./word_count [-a -c -s] [-cpu spe_num] [-g] [-file filename] [-br]\n";
+const char* usr_help_str = "";
 
 static double
 getTime() {
@@ -41,441 +22,13 @@
     return tv.tv_sec + (double)tv.tv_usec*1e-6;
 }
 
-typedef struct {
-    caddr_t file_mmap;
-    off_t size;
-} st_mmap_t;
-
-/*与えられたsizeをfix_byte_sizeの倍数にする(丸め込むっていうのかな?)*/
-static int 
-fix_byte(int size,int fix_byte_size)
-{
-    size = (size/fix_byte_size)*fix_byte_size  + ((size%fix_byte_size)!= 0)*fix_byte_size;
-
-    return size;
-}
-
-static void
-my_read(char *filename, WordCount *w, TaskManager *manager)
-{
-    long fd = w->fd;
-    long r_filesize = w->read_filesize;
-
-    if ((fd=open(filename,O_RDONLY,0666))==0) {
-        fprintf(stderr,"can't open %s\n",filename);
-    }
-
-    w->file_mmap = (char*)manager->allocate(w->read_filesize);
-
-    long one_read_size = 1024 * 1024 * 1024;  // 1GB
-
-    for (int i = 0; 0 < r_filesize; i++) {
-        if (r_filesize > one_read_size) {
-            pread(fd, w->file_mmap + i*one_read_size, one_read_size,i*one_read_size);
-        }else if ((r_filesize < one_read_size) && (r_filesize != 0)) {
-            pread(fd, w->file_mmap + i*one_read_size, r_filesize,i*one_read_size);
-        }
-        r_filesize -= one_read_size;
-    }
-
-    if (w->file_mmap == (caddr_t)-1) {
-        fprintf(stderr,"Can't mmap file\n");
-        perror(NULL);
-        exit(0);
-    }
-
-    return ;
-}
-
-static void
-my_mmap(char *filename, WordCount *w)
-{
-    /*マッピングだよ!*/
-    int map = MAP_PRIVATE;
-    st_mmap_t st_mmap;
-    struct stat sb;
-    long fd = w->fd;
-
-    if ((fd=open(filename,O_RDONLY,0666))==0) {
-        fprintf(stderr,"can't open %s\n",filename);
-    }
-
-    if (fstat(fd,&sb)) {
-        fprintf(stderr,"can't fstat %s\n",filename);
-    }
-
-    st_mmap.size = fix_byte(sb.st_size,4096);
-
-    //madvise(w->file_mmap, w->read_filesize, POSIX_MADV_NORMAL);
-    w->file_mmap = (char*)mmap(NULL,st_mmap.size,PROT_WRITE|PROT_READ,map,fd,(off_t)0);
-
-    if (st_mmap.file_mmap == (caddr_t)-1) {
-        fprintf(stderr,"Can't mmap file\n");
-        perror(NULL);
-        exit(0);
-    }
-
-    return ;
-}
-
-static void
-run_tasks(SchedTask *manager, WordCount *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size)
-{
-
-    if (task_count < array_task_num) {
-        array_task_num = task_count;
-        if (task_count<=0) return;
-    }
-    for (int i = 0; i < task_count; i += array_task_num) {
-        HTask *task_array;
-        if (use_task_array) {
-            int task_num = (w->size+size-1)/size;
-            if (task_num>array_task_num) task_num = array_task_num;
-            task_array = manager->create_task_array(TASK_EXEC,task_num,1,1,1);
-            if (t_read != 0) task_array->wait_for(t_read);
-            if (!all) { 
-                t_next->wait_for(task_array);
-            } else {
-                w->t_print->wait_for(task_array);
-            }
-        }
-
-        Task *t_exec = 0;
-        HTask *h_exec = 0;
-        for (int j = 0; j < array_task_num; j++) {
-            int i = w->task_spawned++;
-            if (w->size < size) size = w->size;
-            if (size==0) break;
-            if (use_task_array) {
-                t_exec = task_array->next_task_array(TASK_EXEC,t_exec);
-                t_exec->set_param(0,(long)0);
-                t_exec->set_param(1,(long)0);
-                t_exec->set_param(2,(long)size);
-                t_exec->set_param(3,(long)w->division_out_size);
-                t_exec->set_inData(0,w->file_mmap + i*w->division_size, size);
-                t_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size);
-            } else if (use_compat) {
-                h_exec = manager->create_task(TASK_EXEC);
-                t_exec->set_param(0,(long)0);
-                t_exec->set_param(1,(long)0);
-                t_exec->set_param(2,(long)size);
-                t_exec->set_param(3,(long)w->division_out_size);
-                h_exec->set_inData(0,w->file_mmap + i*w->division_size, size);
-                h_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size);
-                
-                t_next->wait_for(h_exec);
-
-                h_exec->set_cpu(spe_cpu);
-                h_exec->spawn();
-            } else if (use_iterate) {
-                array_task_num = w->task_num;
-                use_iterate = 0;
-                use_compat = 1;
-                
-                w->size -= size*array_task_num;
-                if(w->size < 0) {
-                    array_task_num -= 1;
-                    w->size += size;
-                }
-                h_exec = manager->create_task(TASK_EXEC_DATA_PARALLEL);
-                h_exec->flip();
-                h_exec->set_inData(0,w->file_mmap,w->file_size);
-                h_exec->set_inData(1,w->o_data,w->out_size_);
-                h_exec->set_outData(0,w->file_mmap,w->file_size);
-                h_exec->set_outData(1,w->o_data,w->out_size_);
-                h_exec->set_param(0,(long)i);
-                h_exec->set_param(1,(long)w->division_size);
-                h_exec->set_param(2,(long)size);
-                h_exec->set_param(3,(long)w->out_size);
-                
-                t_next->wait_for(h_exec);
-                h_exec->set_cpu(spe_cpu);
-                h_exec->iterate(array_task_num);
-                
-                w->task_num -= array_task_num;
-                w->task_spawned += array_task_num-1;
-                
-                break;
-            } else {
-                h_exec = manager->create_task(TASK_EXEC,
-                                              (memaddr)(w->file_mmap + i*w->division_size), size,
-                                              (memaddr)(w->o_data + i*w->out_size), w->division_out_size);
-                t_exec->set_param(0,(long)0);
-                t_exec->set_param(1,(long)0);
-                t_exec->set_param(2,(long)size);
-                t_exec->set_param(3,(long)w->division_out_size);
-                t_next->wait_for(h_exec);
-                h_exec->set_cpu(spe_cpu);
-                h_exec->spawn();
-            }
-            w->size -= size;
-            w->task_num--;
-        }
-        if (use_task_array) {
-            task_array->spawn_task_array(t_exec->next());
-            task_array->set_cpu(spe_cpu);
-            task_array->spawn();
-        } else {
-            //if (!all) t_next->wait_for(h_exec);
-        }
-    } 
-}
-
-/**
- *   このTaskは、PPE上で実行されるので、並列に実行されることはない
- *   二つ実行されていて、Task が足りなくなることがないようにしている。
- */
-
-SchedDefineTask1(BREAD_RUN_TASK_BLOCKS,bread_run16);
-
-static int
-bread_run16(SchedTask *manager, void *in, void *out)
-{
-    WordCount *w = *(WordCount **)in;
-
-    HTaskPtr t_read = manager->create_task(READ_TASK);
-    w->t_print->wait_for(t_read);
-    t_read->set_cpu(read_spe_cpu);
-    t_read->set_param(0,w->fd);
-
-    if (w->task_num < w->task_blocks) {
-        t_read->set_param(1,w->task_spawned*w->division_size);
-        t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size);
-
-        // last case
-        while (w->size >= w->division_size)
-            run_tasks(manager,w,w->task_num,t_read,w->t_print, w->division_size);
-        // remaining data
-        while (w->size>0)
-            run_tasks(manager,w,1,t_read,w->t_print, w->division_size);
-
-        t_read->set_param(2,w->task_spawned*w->division_size);
-        t_read->spawn();
-    } else {
-        HTaskPtr t_next = manager->create_task(BREAD_RUN_TASK_BLOCKS,
-                                               (memaddr)&w->self,sizeof(memaddr),0,0);
-        w->t_print->wait_for(t_next);
-
-        t_read->set_param(1,w->task_spawned*w->division_size);
-        t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size);
-
-        run_tasks(manager,w, w->task_blocks, t_read, t_next, w->division_size);
-
-        t_read->set_param(2,w->task_spawned*w->division_size);
-
-        t_read->spawn();
-        t_next->spawn();
-    }
-    return 0;
-}
-
-SchedDefineTask1(RUN_TASK_BLOCKS,run16);
-
-static int
-run16(SchedTask *manager, void *in, void *out)
-{
-    WordCount *w = *(WordCount **)in;
-
-    if(use_iterate) {
-        run_tasks(manager, w, w->task_num, 0, w->t_print, w->division_size);
-    } else if (w->task_num < w->task_blocks) {
-        // last case
-        while (w->size >= w->division_size)
-            run_tasks(manager,w,w->task_num,0,w->t_print, w->division_size);
-        // remaining data
-        while (w->size>0)
-            run_tasks(manager,w,1,0, w->t_print, w->size);
-        // printf("run16 last %d\n",w->task_num);
-    } else {
-        HTaskPtr t_next = manager->create_task(RUN_TASK_BLOCKS,
-                                               (memaddr)&w->self,sizeof(memaddr),0,0);
-        w->t_print->wait_for(t_next);
-
-        run_tasks(manager,w, w->task_blocks,0, t_next, w->division_size);
-
-        t_next->spawn();
-        // printf("run16 next %d\n",w->task_num);
-    }
-    return 0;
-}
-
-static int blocks = 48;
-//static int blocks = 31 * 6 * 24;
-static int division = 16; // in Kbyte
-
-static void
-run_start(TaskManager *manager, char *filename)
-{
-    long   fd = (long)manager->allocate(sizeof(long));
-    struct stat *sb = (struct stat*)manager->allocate(sizeof(struct stat));
-    HTaskPtr t_exec;
-
-    if ((fd=open(filename,O_RDONLY,0666))==0) {
-        fprintf(stderr,"can't open %s\n",filename);
-        return ;
-    }
-
-    if (fstat(fd,sb)) {
-        fprintf(stderr,"can't fstat %s\n",filename);
-        return ;
-    }
-
-    WordCountPtr w = (WordCountPtr)manager->allocate(sizeof(WordCount));
-
-    w->self = w;
-    w->fd = fd;
-    w->read_filesize = sb->st_size;
-
-
-    if (read_type == BLOCKED_READ) {
-        printf("[blocked read mode]\n");
-        w->file_mmap = (char*)manager->allocate(w->read_filesize);
-    }else if (read_type == MY_READ) {
-        printf("[single read mode]\n");
-        my_read(filename, w, manager);
-    }else if(read_type == MY_MMAP) {
-        printf("[mmap mode]\n");
-        my_mmap(filename, w);
-    }else if(read_type == BLOCKED_MMAP) {
-        printf("[blocked mmap mode]\n");
-        my_mmap(filename, w);
-    }
-
-    HTaskPtr t_print;
-
-    //w->task_blocks = blocks;
-    w->self = w;
-    w->task_spawned = 0;
-
-    w->size = w->file_size = w->read_filesize;
-    printf("w %lx\n",(long)w);
-
-    /* 1task分のデータサイズ(byte) */
-    if (w->size >= 1024*division) {
-        w->division_size = 1024 * division;/*16kbyte*/
-    } else {
-        w->division_size = w->size;
-    }
-
-    printf("division_size %ld\n",w->division_size);
-
-    /* "word num" and "line num" */
-    w->status_num = 2;
-    /* taskの数 */
-    w->task_num = w->size / w->division_size;
-    w->task_num = w->task_num + (w->division_size*w->task_num < w->size);
-    int out_task_num = w->task_num;
-
-    if(!all) {
-        w->task_blocks = blocks;
-    } else {
-        w->task_blocks = w->task_num;
-    }
-
-    w->out_task_num = out_task_num;
-    printf("task_num %ld\n",w->task_num);
-    printf("out_task_num %ld\n",w->out_task_num);
-
-    /* out用のdivision_size. statusが2つなので、あわせて16byteになるように、long long(4byte)を使用 */
-
-    w->division_out_size = sizeof(unsigned long long)*4;
-    int out_size = w->division_out_size*out_task_num;
-    w->o_data = (unsigned long long *)manager->allocate(out_size);
-    w->out_size_ = out_size;
-    w->out_size = 4;
-    printf("out size %d\n",out_size);
-
-    /*各SPEの結果を合計して出力するタスク*/
-
-    t_print = manager->create_task(TASK_PRINT,
-                                   (memaddr)&w->self,sizeof(memaddr),0,0);
-    w->t_print = t_print;    
-    for(int i=0;i<t_exec_num;i++) {
-        /* Task を task_blocks ずつ起動する Task */
-        /* serialize されていると仮定する... */
-        if (read_type == BLOCKED_READ) {
-            t_exec = manager->create_task(BREAD_RUN_TASK_BLOCKS,
-                                                   (memaddr)&w->self,sizeof(memaddr),0,0);
-        }else {
-            t_exec = manager->create_task(RUN_TASK_BLOCKS,
-                                                   (memaddr)&w->self,sizeof(memaddr),0,0);
-        }
-
-        t_print->wait_for(t_exec);
-        //    t_exec->iterate(4);
-        t_exec->spawn();
-    }
-    t_print->spawn();
-}
-
-static char*
-init(int argc, char **argv)
-{
-
-    char *filename = 0;
-
-    for (int i = 1; argv[i]; ++i) {
-        if (strcmp(argv[i], "-file") == 0) {
-            filename = argv[i+1]; i++;
-        } else if (strcmp(argv[i], "-division") == 0) {
-            division = atoi(argv[i+1]);
-            i++;
-        } else if (strcmp(argv[i], "-block") == 0) {
-            blocks = atoi(argv[i+1]);
-            i++;
-        } else if (strcmp(argv[i], "-a") == 0) {
-            // create task all at once
-            all = 1;
-        } else if (strcmp(argv[i], "-c") == 0) {
-            use_task_array = 0;
-            use_compat = 1;
-        } else if (strcmp(argv[i], "-s") == 0) {
-            use_task_array = 0;
-            use_compat = 0;
-        } else if (strcmp(argv[i], "-t") == 0) {
-            use_task_creater = 1;
-            use_task_array = 0;
-            use_compat = 0;
-        } else if (strcmp(argv[i], "-anum") == 0) {
-            array_task_num = atoi(argv[i+1]);
-            i++;
-        } else if (strcmp(argv[i], "-g") == 0) {
-            spe_cpu = GPU_0;
-        } else if (strcmp(argv[i], "-any") == 0) {
-            spe_cpu = ANY_ANY;
-        } else if (strcmp(argv[i], "-i") == 0) {
-            use_iterate = 1;
-            use_task_array = 0;
-            t_exec_num = 1;
-        } else if (strcmp(argv[i], "-br") == 0) {
-            read_type = BLOCKED_READ;
-        } else if (strcmp(argv[i], "-r") == 0) {
-            read_type = MY_READ;
-        }
-        /* else if (strcmp(argv[i], "-cpu") == 0) {
-            spe_num = atoi(argv[i+1]);
-            i++;
-            if (spe_num==0) spe_num = 1;
-            } else {
-            fprintf(stderr,"%s\n",usr_help_str);
-            exit (0);
-            }*/
-    }
-    if (filename==0) {
-        puts(usr_help_str);
-        exit(1);
-    }
-
-    return filename;
-}
-
-
 int
 TMmain(TaskManager *manager, int argc, char *argv[])
 {
 
     char *filename = 0;
-    filename = init(argc, argv);
+    FileMapReduce *fmp = new FileMapReduce(manager,TASK_EXEC,TASK_EXEC_DATA_PARALLEL,TASK_PRINT);
+    filename = fmp->init(argc, argv);
 
     if (filename < 0) {
         return -1;
@@ -483,8 +36,7 @@
 
     task_init();
     st_time = getTime();
-    run_start(manager, filename);
-    // FileMapReduce fmp = new FileMapReduce(manager, filename, TASK_EXEC, TASK_PRINT);
+    fmp->run_start(manager, filename);
     // fmp->start();
     manager->set_TMend(TMend);
     return 0;
--- a/example/word_count/ppe/Read.cc	Thu Jan 28 15:43:36 2016 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,30 +0,0 @@
-#include <stdio.h>
-#include <string.h>
-#include <unistd.h>
-#include <sys/stat.h>
-#include <iostream>
-#include <vector>
-#include <cstdlib>
-
-#include "SchedTask.h"
-#include "Print.h"
-#include "Func.h"
-#include "WordCount.h"
-
-/* これは必須 */
-SchedDefineTask1(READ_TASK,read_task);
-
-static int
-read_task(SchedTask *s, void *rbuf, void *wbuf)
-{
-    long fd = (long)s->get_param(0);
-    long start_read_position = (long)s->get_param(1);
-    long end_read_position = (long)s->get_param(2);
-    char *read_text = (char*)s->get_output(wbuf,0);
-
-    long read_size = end_read_position - start_read_position;
-
-    pread(fd, read_text, read_size , start_read_position);
-
-    return 0;
-}
--- a/example/word_count/ppe/Read.h	Thu Jan 28 15:43:36 2016 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,7 +0,0 @@
-#ifndef INCLUDED_TASK_HELLO
-#define INCLUDED_TASK_HELLO
-
-#include "SchedTask.h"
-
-
-#endif
--- a/example/word_count/task_init.cc	Thu Jan 28 15:43:36 2016 +0900
+++ b/example/word_count/task_init.cc	Thu Jan 28 17:23:45 2016 +0900
@@ -8,12 +8,9 @@
 #endif
 
 /* 必ずこの位置に書いて */
-SchedExternTask(READ_TASK);
-SchedExternTask(BREAD_RUN_TASK_BLOCKS);
 SchedExternTask(Exec);
 SchedExternTask(Exec_Data_Parallel);
 SchedExternTask(Print);
-SchedExternTask(RUN_TASK_BLOCKS);
 
 /**
  * この関数は ../spe/spe-main と違って
@@ -34,10 +31,5 @@
 
     SchedRegisterTask(TASK_EXEC, Exec);
     SchedRegisterTask(TASK_EXEC_DATA_PARALLEL, Exec);
-
-    SchedRegister(READ_TASK);
-    SchedRegister(BREAD_RUN_TASK_BLOCKS);
-
     SchedRegisterTask(TASK_PRINT, Print);
-    SchedRegister(RUN_TASK_BLOCKS);
 }