diff example/word_count/main.cc @ 946:a8b6ee80c108

unify word count examples....
author Shinji KONO <kono@ie.u-ryukyu.ac.jp>
date Sun, 01 Aug 2010 21:05:35 +0900
parents 45c141669de7
children 6a8941ee8294
line wrap: on
line diff
--- a/example/word_count/main.cc	Sun Aug 01 19:29:27 2010 +0900
+++ b/example/word_count/main.cc	Sun Aug 01 21:05:35 2010 +0900
@@ -7,11 +7,17 @@
 #include <fcntl.h>
 #include <unistd.h>
 #include "TaskManager.h"
+#include "SchedTask.h"
 #include "Func.h"
+#include "WordCount.h"
 
 extern void task_init();
 
-const char *usr_help_str = "Usage: ./word_count [-cpu spe_num] [-file filename]\n";
+int all = 0;
+int use_task_array = 1;
+int use_compat = 0;
+
+const char *usr_help_str = "Usage: ./word_count [-a -c -s] [-cpu spe_num] [-file filename]\n";
 
 typedef struct {
     caddr_t file_mmap;
@@ -19,8 +25,9 @@
 } st_mmap_t;
 
 
+
 /*与えられたsizeをfix_byte_sizeの倍数にする(丸め込むっていうのかな?)*/
-int
+static int
 fix_byte(int size,int fix_byte_size)
 {
     size = (size/fix_byte_size)*fix_byte_size  + ((size%fix_byte_size)!= 0)*fix_byte_size;
@@ -29,7 +36,7 @@
 }
 
 
-st_mmap_t
+static st_mmap_t
 my_mmap(char *filename)
 {
 
@@ -65,115 +72,158 @@
 
 }
 
+static void
+run_tasks(SchedTask *manager, WordCount *w, int task_count, HTaskPtr t_next, int size) 
+{
+  
+  int array_task_num = 8;
+  if (task_count < array_task_num) {
+    array_task_num = task_count;
+  }
+  
+  for (int i = 0; i < task_count; i += array_task_num) {
+        
+    HTask *task_array;
+    if (use_task_array) {
+	task_array = manager->create_task_array(TASK_EXEC,array_task_num,0,1,1);
+	if (!all) t_next->wait_for(task_array);
+    }
+    
+    Task *t_exec = 0;
+    HTask *h_exec;
+    for (int j = 0; j < array_task_num; j++) {
+	int i = w->task_spwaned++;
+	if (use_task_array) {
+	    t_exec = task_array->next_task_array(TASK_EXEC,t_exec);
+	    t_exec->set_inData(0,w->file_mmap + i*w->division_size, size);
+	    t_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size);
+	} else if (use_compat) {
+	    h_exec = manager->create_task(TASK_EXEC);
+	    h_exec->set_inData(0,w->file_mmap + i*w->division_size, size);
+	    h_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size);
+	    h_exec->set_cpu(SPE_ANY);
+	    h_exec->spawn();
+	} else {
+	    h_exec = manager->create_task(TASK_EXEC,
+		(memaddr)(w->file_mmap + i*w->division_size), size,
+		(memaddr)(w->o_data + i*w->out_size), w->division_out_size);
+	    h_exec->set_cpu(SPE_ANY);
+	    h_exec->spawn();
+	}
+	w->size -= size;
+	w->task_num--;
+    }
+    if (!use_task_array) {
+	task_array->spawn_task_array(t_exec->next());
+	task_array->set_cpu(SPE_ANY);
+	task_array->spawn();
+    } else {
+	if (!all) t_next->wait_for(h_exec);
+    }
+  } 
+}
 
-void
+/**
+ *   このTaskは、PPE上で実行されるので、並列に実行されることはない
+ *   二つ実行されていて、Task が足りなくなることがないようにしている。
+ */
+
+SchedDefineTask1(RUN_TASK_BLOCKS,run16);
+
+static int
+run16(SchedTask *manager, void *in, void *out)
+{
+    WordCount *w = *(WordCount **)in;
+   
+    if (w->task_num < w->task_blocks) {
+	if (w->size >= w->division_size) 
+	    run_tasks(manager,w,w->task_num, w->t_print, w->division_size);
+	while (w->size>0) 
+	    run_tasks(manager,w,1, w->t_print, w->size);
+	// printf("run16 last %d\n",w->task_num);
+    } else {
+	HTaskPtr t_next = manager->create_task(RUN_TASK_BLOCKS,
+	    (memaddr)&w->self,sizeof(memaddr),0,0);
+	w->t_print->wait_for(t_next);
+
+	run_tasks(manager,w, w->task_blocks, t_next, w->division_size);
+
+	t_next->spawn();
+	// printf("run16 next %d\n",w->task_num);
+    }
+    return 0;
+}
+
+
+static int blocks = 48;
+static int division = 16; // in Kbyte
+
+static void
 run_start(TaskManager *manager, char *filename)
 {
-    HTaskPtr t_exec;
     HTaskPtr t_print;
 
     st_mmap_t st_mmap;
     st_mmap = my_mmap(filename);
+    WordCount *w = (WordCount*)manager->allocate(sizeof(WordCount));
+    // bzero(w,sizeof(WordCount));
 
+    w->self = w;
+    w->task_blocks = blocks;
+    w->task_spwaned = 0;
 
     /*sizeはdivision_sizeの倍数にしている。*/
-    int size = st_mmap.size;
-    char *file_mmap = st_mmap.file_mmap;
+    w->size = st_mmap.size;
+    w->file_mmap = st_mmap.file_mmap;
 
     /* 1task分のデータサイズ(byte) */
-    int division_size;
-    if (size >= 1024*16) {
-	division_size = 1024 * 16;/*16kbyte*/
-    }
-    else {
-	division_size = size;
+    if (w->size >= 1024*division) {
+	w->division_size = 1024 * division;/*16kbyte*/
+    } else {
+	w->division_size = w->size;
     }
 
-    printf("dvision_size %d\n",division_size);
+    printf("dvision_size %d\n",w->division_size);
 
     /* "word num" and "line num" */
-    int status_num = 2;
+    w->status_num = 2;
     /* taskの数 */
-    int task_num = size / division_size;
-    int out_task_num = task_num + (division_size*task_num < size);
+    w->task_num = w->size / w->division_size;
+    w->task_num = w->task_num + (w->division_size*w->task_num < w->size);
+    int out_task_num = w->task_num;
 
-    printf("task_num %d\n",task_num);
+    w->out_task_num = out_task_num;
+    printf("task_num %d\n",w->task_num);
+    printf("out_task_num %d\n",w->out_task_num);
 
     /* out用のdivision_size. statusが2つなので、あわせて16byteになるように、long long(8byte)を使用 */
-    int division_out_size = 16;
-    /* out用のデータのサイズ。*/
-    int out_size = division_out_size*out_task_num;
-    unsigned long long *o_data = (unsigned long long*)manager->allocate(out_size);
 
-
+    w->division_out_size = sizeof(unsigned long long)*4;
+    int out_size = w->division_out_size*out_task_num;
+    w->o_data = (unsigned long long *)manager->allocate(out_size);
+    w->out_size = 4;
     printf("out size %d\n",out_size);
 
     /*各SPEの結果を合計して出力するタスク*/
 
-    t_print = manager->create_task(TASK_PRINT);
-    t_print->set_inData(0,o_data, out_size);
-    t_print->set_param(0,out_task_num);
-    t_print->set_param(1,status_num);
-
-    /*各SPEに処理するデータを割り振る。*/
+    t_print = manager->create_task(TASK_PRINT,
+	(memaddr)&w->self,sizeof(memaddr),0,0);
 
-    /*渡すデータの最後が文字かどうか。(スペース、改行以外)*/
-    int word_flag = 0;
-    int index = 0;
-    int array_task_num = 32;
-    for (int i = 0; i < task_num; i += array_task_num) {
-	if (task_num-i < array_task_num) array_task_num = task_num-i;
-	HTask *task_main = manager->create_task_array(TASK_EXEC,array_task_num,2,1,1);
-	t_print->wait_for(task_main);
-	int j ;
-	Task *t_exec = 0;
-	for(j = 0; j < array_task_num ; j++) {
-	    t_exec = task_main->next_task_array(TASK_EXEC,t_exec);
-
-	    // t_exec = manager->create_task(TASK_EXEC);
-	    t_exec->set_param(0,division_size);
-	    t_exec->set_param(1,word_flag);
-
-	    //printf("%c",file_mmap[index*division_size]);
+    w->t_print = t_print;
 
-	    t_exec->set_inData(0,&file_mmap[index*division_size], division_size);
-	    t_exec->set_outData(0,&o_data[index*status_num], division_out_size);
-
-	    word_flag = ((file_mmap[(index+1)*division_size-1] != 0x20) && (file_mmap[(index+1)*division_size-1] != 0x0A)); 
-
-	    size -= division_size;
-	    index++;
-
-       }
-
-       task_main->spawn_task_array(t_exec->next());
-       task_main->set_cpu(SPE_ANY);
-       task_main->spawn();
-
-    }
-
-    //printf("size %d\n",size);
-
-    while (size>0) {
-
-	t_exec = manager->create_task(TASK_EXEC);
-	t_exec->set_param(0,size);
-	t_exec->set_param(1,word_flag);
-	t_exec->set_inData(0,file_mmap + index*division_size, size);
-	t_exec->set_outData(0,o_data + index*status_num, division_out_size);
-	t_exec->set_cpu(SPE_ANY);
+    for(int i = 0;i<2;i++) {
+	/* Task を task_blocks ずつ起動する Task */
+        /* serialize されていると仮定する... */
+	HTaskPtr t_exec = manager->create_task(RUN_TASK_BLOCKS,
+	    (memaddr)&w->self,sizeof(memaddr),0,0);
 	t_print->wait_for(t_exec);
 	t_exec->spawn();
-	index++;
-	size -= division_size;
     }
 
-    //printf("size %d\n", st_mmap.size - index*division_size);
-
     t_print->spawn();
 }
 
-char*
+static char*
 init(int argc, char **argv)
 {
     
@@ -182,16 +232,30 @@
     for (int i = 1; argv[i]; ++i) {	
 	if (strcmp(argv[i], "-file") == 0) {
 	    filename = argv[i+1];
+	} else if (strcmp(argv[i], "-division") == 0) {
+	    division = atoi(argv[i+1]);
+	} else if (strcmp(argv[i], "-block") == 0) {
+	    blocks = atoi(argv[i+1]);
+	} else if (strcmp(argv[i], "-a") == 0) {
+	    // create task all at once
+	    all = 1;
+	} else if (strcmp(argv[i], "-c") == 0) {
+	    use_task_array = 0;
+	    use_compat = 1;
+	} else if (strcmp(argv[i], "-s") == 0) {
+	    use_task_array = 0;
+	    use_compat = 0;
 	}
     }
     if (filename==0) {
-        printf("usage: %s [-count 10] -file filename\n",argv[0]);
+        puts(usr_help_str);
 	exit(1);
     }
     
     return filename;
 }
 
+
 int
 TMmain(TaskManager *manager, int argc, char *argv[])
 {
@@ -208,3 +272,5 @@
 
     return 0;
 }
+
+/* end */