changeset 1979:889472b0e6d5 draft

implement blocked read (not running)
author Masataka Kohagura <e085726@ie.u-ryukyu.ac.jp>
date Thu, 13 Mar 2014 02:43:35 +0900
parents 8fbe022126e1
children aa5fabf2d4b2
files example/word_count/WordCount.h example/word_count/main.cc
diffstat 2 files changed, 104 insertions(+), 37 deletions(-) [+]
line wrap: on
line diff
--- a/example/word_count/WordCount.h	Thu Mar 13 02:15:06 2014 +0900
+++ b/example/word_count/WordCount.h	Thu Mar 13 02:43:35 2014 +0900
@@ -1,32 +1,25 @@
 
 typedef struct wordCount {
     struct wordCount *self;
-    int fd;
-    int read_division_size;
-    int read_task_number;
-    int read_task_num;
-    int read_left_task_num;
-    int read_filesize;
-    int read_left_size;
-    int read_task_blocks;
-    char *read_text;
+    long fd;
+    long read_filesize;
     CPU_TYPE read_cpu;
 
-    int size;             // remaining file size
-    int division_size;    // for each word count task
-    int division_out_size;    
-    int out_size;    
-    int out_size_;
-    int task_num;         // remaining task count
-    int task_blocks;      // spawn task one at a time
-    int status_num; 
-    int task_spwaned;
+    long size;             // remaining file size
+    long division_size;    // for each word count task
+    long division_out_size;
+    long out_size;
+    long out_size_;
+    long task_num;         // remaining task count
+    long task_blocks;      // spawn task one at a time
+    long status_num;
+    long task_spawned;
     unsigned long long *o_data;
     unsigned long long *head_tail_flag;
-    int out_task_num;
-    int pad; 
+    long out_task_num;
+    long pad;
     char * file_mmap;
-    int file_size;
+    long file_size;
     HTaskPtr t_print;
     HTaskPtr t_exec;
 } WordCount, *WordCountPtr;
--- a/example/word_count/main.cc	Thu Mar 13 02:15:06 2014 +0900
+++ b/example/word_count/main.cc	Thu Mar 13 02:43:35 2014 +0900
@@ -29,6 +29,7 @@
 int array_task_num = 11;
 int spe_num = 1;
 CPU_TYPE spe_cpu = SPE_ANY;
+CPU_TYPE read_spe_cpu = SPE_ANY;
 const char *usr_help_str = "Usage: ./word_count [-a -c -s] [-cpu spe_num] [-g] [-file filename]\n";
 
 static double
@@ -114,7 +115,7 @@
         Task *t_exec = 0;
         HTask *h_exec = 0;
         for (int j = 0; j < array_task_num; j++) {
-            int i = w->task_spwaned++;
+            int i = w->task_spawned++;
             if (w->size < size) size = w->size;
             if (size==0) break;
             if (use_task_array) {
@@ -152,7 +153,7 @@
                 h_exec->iterate(array_task_num);
                 
                 w->task_num -= array_task_num;
-                w->task_spwaned += array_task_num-1;
+                w->task_spawned += array_task_num-1;
                 
                 if(w->size < 0) {
                     h_exec = manager->create_task(TASK_EXEC_DATA_PARALLEL);
@@ -161,7 +162,7 @@
                     h_exec->set_inData(1,w->o_data,w->out_size_);
                     h_exec->set_outData(0,w->file_mmap,w->file_size);
                     h_exec->set_outData(1,w->o_data,w->out_size_);
-                    h_exec->set_param(0,(long)w->task_spwaned);
+                    h_exec->set_param(0,(long)w->task_spawned);
                     h_exec->set_param(1,(long)w->division_size);
                     h_exec->set_param(2,(long)(size+w->size));
                     h_exec->set_param(3,(long)w->out_size);
@@ -171,7 +172,7 @@
                     h_exec->iterate(1);
 
                     w->task_num -= 1;
-                    w->task_spwaned += 1;
+                    w->task_spawned += 1;
                     array_task_num += 1;
                 }
                 break;
@@ -201,6 +202,49 @@
  *   二つ実行されていて、Task が足りなくなることがないようにしている。
  */
 
+SchedDefineTask1(BREAD_RUN_TASK_BLOCKS,bread_run16);
+
+static int
+bread_run16(SchedTask *manager, void *in, void *out)
+{
+    WordCount *w = *(WordCount **)in;
+
+    HTaskPtr t_read = manager->create_task(READ_TASK);
+    w->t_print->wait_for(t_read);
+    t_read->set_cpu(read_spe_cpu);
+    t_read->set_param(0,w->fd);
+
+    if (w->task_num < w->task_blocks) {
+        t_read->set_param(1,w->task_spawned*w->division_size);
+        t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size);
+
+        // last case
+        while (w->size >= w->division_size)
+            run_tasks(manager,w,w->task_num,t_read,w->t_print, w->division_size);
+        // remaining data
+        while (w->size>0)
+            run_tasks(manager,w,1,t_read,w->t_print, w->division_size);
+
+        t_read->set_param(2,w->task_spawned*w->division_size);
+        t_read->spawn();
+    } else {
+        HTaskPtr t_next = manager->create_task(BREAD_RUN_TASK_BLOCKS,
+                                               (memaddr)&w->self,sizeof(memaddr),0,0);
+        w->t_print->wait_for(t_next);
+
+        t_read->set_param(1,w->task_spawned*w->division_size);
+        t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size);
+
+        run_tasks(manager,w, w->task_blocks, t_read, t_next, w->division_size + w->extra_len);
+
+        t_read->set_param(2,w->task_spawned*w->division_size + w->extra_len);
+
+        t_read->spawn();
+        t_next->spawn();
+    }
+    return 0;
+}
+
 SchedDefineTask1(RUN_TASK_BLOCKS,run16);
 
 static int
@@ -238,22 +282,46 @@
 static void
 run_start(TaskManager *manager, char *filename)
 {
-    HTaskPtr t_print;
+    long   fd = (long)manager->allocate(sizeof(long));
+    struct stat *sb = (struct stat*)manager->allocate(sizeof(struct stat));
+    HTaskPtr t_exec;
+
+    if ((fd=open(filename,O_RDONLY,0666))==0) {
+        fprintf(stderr,"can't open %s\n",filename);
+        return ;
+    }
+
+    if (fstat(fd,sb)) {
+        fprintf(stderr,"can't fstat %s\n",filename);
+        return ;
+    }
 
-    st_mmap_t st_mmap;
-    st_mmap = my_mmap(filename);
-    WordCount *w = (WordCount*)manager->allocate(sizeof(WordCount));
-    // bzero(w,sizeof(WordCount));
+    WordCountPtr w = (WordCountPtr)manager->allocate(sizeof(WordCount));
+
+    w->self = w;
+    w->fd = fd;
+    w->read_filesize = sb->st_size;
+
+
+    if (block_read_flag == 1) {
+        printf("[block read mode]\n");
+        w->file_mmap = (char*)manager->allocate(w->read_filesize);
+    }else {
+        printf("[mmap mode]\n");
+        my_mmap(filename, w);
+    }
+
+    HTaskPtr t_print;
 
     //w->task_blocks = blocks;
     w->self = w;
-    w->task_spwaned = 0;
+    w->task_spawned = 0;
 
     /*sizeはdivision_sizeの倍数にしている。*/
     w->size = w->file_size = st_mmap.size;
     w->file_mmap = st_mmap.file_mmap;
     printf("w %lx\n",(long)w);
-    
+
     /* 1task分のデータサイズ(byte) */
     if (w->size >= 1024*division) {
         w->division_size = 1024 * division;/*16kbyte*/
@@ -261,7 +329,7 @@
         w->division_size = w->size;
     }
 
-    printf("dvision_size %d\n",w->division_size);
+    printf("dvision_size %ld\n",w->division_size);
 
     /* "word num" and "line num" */
     w->status_num = 2;
@@ -277,8 +345,8 @@
     }
 
     w->out_task_num = out_task_num;
-    printf("task_num %d\n",w->task_num);
-    printf("out_task_num %d\n",w->out_task_num);
+    printf("task_num %ld\n",w->task_num);
+    printf("out_task_num %ld\n",w->out_task_num);
 
     /* out用のdivision_size. statusが2つなので、あわせて16byteになるように、long long(4byte)を使用 */
 
@@ -297,8 +365,14 @@
     for(int i=0;i<4;i++) {
         /* Task を task_blocks ずつ起動する Task */
         /* serialize されていると仮定する... */
-        HTaskPtr t_exec = manager->create_task(RUN_TASK_BLOCKS,
-                                               (memaddr)&w->self,sizeof(memaddr),0,0);
+        if (block_read_flag == 1) {
+            t_exec = manager->create_task(BREAD_RUN_TASK_BLOCKS,
+                                                   (memaddr)&w->self,sizeof(memaddr),0,0);
+        }else {
+            t_exec = manager->create_task(RUN_TASK_BLOCKS,
+                                                   (memaddr)&w->self,sizeof(memaddr),0,0);
+        }
+
         t_print->wait_for(t_exec);
         //    t_exec->iterate(4);
         t_exec->spawn();