view example/test/main.cc @ 0:04e28d8d3c6f

first commit
author Daiki KINJYO <e085722@ie.u-ryukyu.ac.jp>
date Mon, 08 Nov 2010 01:23:25 +0900
parents
children 2f90ce50a0af
line wrap: on
line source

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "TaskManager.h"
#include "SchedTask.h"
#include "Func.h"
#include "WordGrep.h"
extern void task_init();

int all = 0;
int use_task_array = 1;
int use_compat = 0;
int array_task_num = 8;int spe_num = 1;
int num = 0;
/*
static int blocks = 48;
static int division = 1024*10;
static int division_int = 4096;
*/

static int blocks = 1;
static int division = 5;
static int division_int = 5;

//char *sstr;
int first_over_lap;
int over_lap;

#define READBUF 1024*16

const char *usr_help_str = "Usage: ./word_count [-a -c -s] [-cpu spe_num] [-file filename]\n";

SchedDefineTask1(RUN_TASK_BLOCKS,run16);

typedef struct {
	caddr_t file_mmap;
	off_t size;
}st_mmap_t;

/*与えられたsizeをfix_byte_sizeの倍数にする*/
static int fix_byte(int size,int fix_byte_size){
	size = (size/fix_byte_size)*fix_byte_size  + ((size%fix_byte_size)!= 0)*fix_byte_size;
	return size;
}

static st_mmap_t my_mmap(char *filename){
	/*マッピングしてその場所を返す*/
	int fd = -1;
	int map = MAP_PRIVATE;
	st_mmap_t st_mmap;
	struct stat sb;
	/*ファイルオープン*/
	if ((fd=open(filename,O_RDONLY,0666))==0) {
		fprintf(stderr,"can't open %s\n",filename);
	}
	/*ファイルが使われていないか*/
	if (fstat(fd,&sb)) {
		fprintf(stderr,"can't fstat %s\n",filename);
	}
	/*sizeをページングサイズの倍数にあわせる*/
	st_mmap.size = fix_byte(sb.st_size,division_int);
	/*マッピングしてその場所を返す*/
	st_mmap.file_mmap = (char*)mmap(NULL,st_mmap.size,PROT_READ,map,fd,(off_t)0);
	if (st_mmap.file_mmap == (caddr_t)-1) {
		perror(NULL);
		exit(0);
	}
	return st_mmap;
}

/*タスク生成*/
static void run_tasks(SchedTask *manager, WordGrep *w, int task_count, HTaskPtr t_next, int size) {
	over_lap = first_over_lap;
	/*タスク数が1つのspeで処理する数より少なければその数を処理する*/
	if (task_count < array_task_num) {
		array_task_num = task_count;
	}
	
	/*task_arrayを使う場合*/
	if (use_task_array) {
		/*1度に処理できる量(1つのspeで処理できる量*spe数)*/
		int spl = spe_num * array_task_num;
		/*タスク数/spe数=loop回数*/
		int loop = (task_count + spl - 1) / spl;
		for (int i = 0; i < loop; i += 1) {
			
			if (spl > w->task_num) {/*1度に処理できる場合*/
				if (w->task_num >= spe_num) {/*それぞれのspeに均等にタスクを分ける*/
					array_task_num = w->task_num / spe_num;
				} else {/*spe数がタスク数よりも多い場合*/
					int task_num = w->task_num;
					for (int j = 0; j < task_num; j++) {
						HTask *h_exec = 0;
						int i = w->task_spwaned++;
					if (w->size < size + over_lap) {/*division_sizeがファイルサイズより小さい場合その分だけ処理する*/
						size = w->size;
						over_lap = 0;
					}
						/*オーバーラップした文字列を生成してタスク生成*/
						h_exec = manager->create_task(
								TASK_EXEC,/*Ececタスクを作る*/
								w->file_mmap + i*w->division_size,/*マッピングされたアドレス*/
								size+over_lap,/*division_size(in*/
								(memaddr)(w->o_data + i*w->out_size),/*出力先アドレス*/
								w->division_out_size);/*division_size(out*/
						if (all) {/*タスクをまとめて実行する場合*/
							w->t_print->wait_for(h_exec);
						} else {
							t_next->wait_for(h_exec);
						}
						h_exec->set_cpu(SPE_ANY);
						h_exec->spawn();
						/*終了した分サイズをタスク数を引く*/
						w->size -= size;
						w->task_num--;
					}
					return;
				}
			}
			
			HTask **task_array = (HTask**)manager->allocate(sizeof(HTask*)*spe_num);/*空のタスクの配列を作成*/
			Task **t_exec = (Task**)manager->allocate(sizeof(Task*)*spe_num);/*空のタスクの配列を作成*/
			
			for (int k = 0; k < spe_num; k++) {/*タスクの配列に空のタスクを用意する*/
				task_array[k] = manager->create_task_array(TASK_EXEC,array_task_num,0,1,1);
				t_exec[k] = 0;
				
				if (all) {/*タスクをまとめて実行する場合*/
					w->t_print->wait_for(task_array[k]);
				} else {
					t_next->wait_for(task_array[k]);
				}
			}
			for (int j = 0; j < array_task_num; j++) {/*タスクの配列にタスクを入れていく*/
				for (int k = 0; k < spe_num; k++) {
					int a = w->task_spwaned++;
					if (w->size < size + over_lap) {
						size = w->size;
						over_lap = 0;
					}
					
					/*オーバーラップした文字列を生成してタスク生成*/
					
					
					t_exec[k] = task_array[k]->next_task_array(TASK_EXEC,t_exec[k]);
					t_exec[k]->set_inData(0,w->file_mmap + a*w->division_size, size+over_lap);/*入力データ 0,マッピングされた場所,division_size*/
					t_exec[k]->set_outData(0,w->o_data + a*w->out_size, w->division_out_size);/*出力データ 0,マッピングする場所,division_size(out*/
					
					w->size -= size;
					w->task_num--;
				}
			}
			for (int k = 0; k < spe_num; k++) {/*実行する*/
				task_array[k]->spawn_task_array(t_exec[k]->next());
				task_array[k]->set_cpu(SPE_ANY);
				task_array[k]->spawn();
			}
		}
		return;
	}
	
	/*task_arrayを使わない場合*/
	for (int i = 0; i < task_count; i += array_task_num) {
		HTask *h_exec = 0;
		for (int j = 0; j < array_task_num; j++) {
			int i = w->task_spwaned++;
			if (w->size < size + over_lap) {
				size = w->size;
				over_lap = 0;
			}
			if (size==0) break;
			/*compatを使う場合(互換モード?*/
			if (use_compat) {
				/*オーバーラップした文字列を生成してタスク生成*/
				h_exec = manager->create_task(TASK_EXEC);
				h_exec->set_inData(0,w->file_mmap + i*w->division_size, size+over_lap);
				h_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size);
				if (all) {/*タスクをまとめて実行する場合*/
					w->t_print->wait_for(h_exec);
				} else {
					t_next->wait_for(h_exec);
				}
				/*タスク実行*/
				h_exec->set_cpu(SPE_ANY);
				h_exec->spawn();
			} else {/*compatを使わない場合*/
				/*オーバーラップした文字列を生成してタスク生成*/
				h_exec = manager->create_task(TASK_EXEC,
							w->file_mmap + i*w->division_size,
							size+over_lap,
							(memaddr)(w->o_data + i*w->out_size),
							w->division_out_size+over_lap);
				if (all) {/*タスクをまとめて実行する場合*/
					w->t_print->wait_for(h_exec);
				} else {
					t_next->wait_for(h_exec);
				}
				/*タスク実行*/
				h_exec->set_cpu(SPE_ANY);
				h_exec->spawn();
			}
			/*終了した分サイズをタスク数を引く*/
			w->size -= size;
			w->task_num--;
		}
	}
}

/**
*   このTaskは、PPE上で実行されるので、並列に実行されることはない
*   二つ実行されていて、Task が足りなくなることがないようにしている。
*/

static int run16(SchedTask *manager, void *in, void *out){
	WordGrep *w = *(WordGrep **)in;
	/*タスク数が1ブロックで処理できる数よりも少ない場合*/
	if (w->task_num <= w->task_blocks) {/*if (!(w->size > w->division_size))でも同じ*/
		if (w->size == w->division_size) {
			run_tasks(manager,w,w->task_num, w->t_print, w->division_size);
		} else {
			run_tasks(manager,w,1, w->t_print, w->size);
		}
	} else {
		HTaskPtr t_next = manager->create_task(RUN_TASK_BLOCKS,(memaddr)&w->self,sizeof(memaddr),0,0);
		w->t_print->wait_for(t_next);
		run_tasks(manager,w, w->task_blocks, t_next, w->division_size);
		t_next->spawn();
	}
	return 0;
}



static void run_start(TaskManager *manager, char *filename){
	HTaskPtr t_print;
	st_mmap_t st_mmap;
	st_mmap = my_mmap(filename);
	WordGrep *w = (WordGrep*)manager->allocate(sizeof(WordGrep));
	w->self = w;
	w->task_spwaned = 0;
	
	/*sizeはdivision_sizeの倍数にしている。*/
	w->size = w->file_size = st_mmap.size;
	w->file_mmap = st_mmap.file_mmap;
	/* 1task分のデータサイズ(byte) */
	if (w->size >= division) {
		w->division_size = division;
	} else {
		w->division_size = w->size;
	}
	/* "word num" and "line num" */
	w->status_num = 2;
	/* taskの数 */
	w->task_num = w->size / w->division_size;
	w->task_num = w->task_num + (w->division_size*w->task_num < w->size);
	
	
	int out_task_num = w->task_num;
	w->out_task_num = out_task_num;
	
	if(!all) {
		w->task_blocks = blocks;
	} else {
		w->task_blocks = w->task_num;
	}
	
	
	
	/* out用のdivision_size. statusが2つなので、最大16byteになるように*/
	w->division_out_size = sizeof(unsigned long long)*4;
	int out_size = w->division_out_size*out_task_num;
	w->o_data = (int *)manager->allocate(out_size);
	w->out_size = 4;
	
	
	
	
	/*各SPEの結果を合計して出力するタスク*/
	t_print = manager->create_task(TASK_PRINT,(memaddr)&w->self,sizeof(memaddr),0,0);
	w->t_print = t_print;
	for(int i = 0;i<1;i++) {
		/* Task を task_blocks ずつ起動する Task */
		/* serialize されていると仮定する... */
		HTaskPtr t_exec = manager->create_task(RUN_TASK_BLOCKS,(memaddr)&w->self,sizeof(memaddr),0,0);
		t_print->wait_for(t_exec);
		t_exec->spawn();
	}
	t_print->spawn();
}

static char* init(int argc, char **argv){
	char *filename = 0;
	for (int i = 1; argv[i]; ++i) {	
		if (strcmp(argv[i], "-file") == 0) {
			filename = argv[i+1];
		} else if (strcmp(argv[i], "-division") == 0) {
			division = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-block") == 0) {
			blocks = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-a") == 0) {
			all = 1;
		} else if (strcmp(argv[i], "-c") == 0) {
			use_task_array = 0;
			use_compat = 1;
		} else if (strcmp(argv[i], "-s") == 0) {
			use_task_array = 0;
			use_compat = 0;
		} else if (strcmp(argv[i], "-anum") == 0) {
			array_task_num = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-cpu") == 0) {
			spe_num = atoi(argv[i+1]);
		}
	}
	first_over_lap = 2;
	over_lap = first_over_lap;
	if (filename==0) {
		puts(usr_help_str);
		exit(1);
	}
	return filename;
}


int TMmain(TaskManager *manager, int argc, char *argv[]){
	char *filename = 0;
	//引数の入力
	filename = init(argc, argv);
	if (filename < 0) {
		return -1;
	}
	task_init();
	run_start(manager, filename);
	return 0;
}