view example/word_count3/main.cc @ 619:0decff4e867b

RenewTask removal
author Shinji KONO <kono@ie.u-ryukyu.ac.jp>
date Sun, 15 Nov 2009 02:02:30 +0900
parents 7d9d209bdc82
children 60aa3f241b10
line wrap: on
line source

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "TaskManager.h"
#include "Func.h"

#define TASKBUFF (1)

extern void task_init(void);

static TaskManager *manager;

static int count = 1;
static char *file = NULL;

const char *usr_help_str = "Usage: ./post [-cpu spe_num] [-count N]\n\
  -count  Number of print \"Hello, World!!\"";


struct task_list_t {
  HTaskPtr task;
  task_list_t *next;
};

typedef struct {
  caddr_t file_mmap;
  off_t size;
} st_mmap_t;


typedef struct {

  /*16task毎に回す為に必要な変数*/
  task_list_t *task_list;
  HTaskPtr run_wait_task;
  int *task_num;
  int *sum_task_num;

  /*dataを分割する際に必要な変数*/
  int *file_size;
  int *division_size;
  int *division_out_size;
  int *status_num;
  int *out_task_num;
  int *out_size;
  int *sum_i;
  int *word_flag;
  caddr_t file_mmap;
  unsigned long long *o_data;
  
} data_list_t;

/*与えられたsizeをfix_byte_sizeの倍数にする(丸め込むっていうのかな?)*/
int
fix_byte(int size,int fix_byte_size)
{
    size = (size/fix_byte_size)*fix_byte_size  + ((size%fix_byte_size)!= 0)*fix_byte_size;
    
    return size;
}

void
create_data(char *filename, data_list_t *data)
{

  data->file_size = (int*)manager->allocate(sizeof(int));
  data->division_size = (int*)manager->allocate(sizeof(int));
  data->division_out_size = (int*)manager->allocate(sizeof(int));
  data->status_num = (int*)manager->allocate(sizeof(int));
  data->out_size = (int*)manager->allocate(sizeof(int));
  data->out_task_num = (int*)manager->allocate(sizeof(int));
  data->task_num = (int*)manager->allocate(sizeof(int));
  data->sum_task_num = (int*)manager->allocate(sizeof(int));
  data->sum_i = (int*)manager->allocate(sizeof(int));
  *data->sum_i = 0;
  data->word_flag = (int*)manager->allocate(sizeof(int));
  *data->word_flag = 0;
  data->run_wait_task = (HTaskPtr)manager->allocate(sizeof(HTask));
  data->run_wait_task = manager->create_task(WAIT_TASK);



  /*マッピングだよ!*/
  int fd = -1;
  int map = MAP_PRIVATE;
  off_t size;
  struct stat sb;
  
  if ((fd=open(filename,O_RDONLY,0666))==0) {
    fprintf(stderr,"can't open %s\n",filename);
    }
  
  if (fstat(fd,&sb)) {
	fprintf(stderr,"can't fstat %s\n",filename);
  }
  
  printf("file size %d\n",(int)sb.st_size);
  
  /*sizeをページングサイズの倍数にあわせる*/
  size = fix_byte(sb.st_size,4096);
  
  printf("fix 4096byte file size %d\n",(int)size);
  
  data->file_mmap = (char*)mmap(NULL,size,PROT_READ,map,fd,(off_t)0);
  if (data->file_mmap == (caddr_t)-1) {
    fprintf(stderr,"Can't mmap file\n");
    perror(NULL);
    exit(0);
    }
  
  if (size >= 4096*4) {
    *data->division_size = 4096 * 4; /*16kbyte*/
  }
  else {
    *data->division_size = size;
  }
  
  *data->file_size = size;

  /* status が 2つなので、8 * 2 = 16; 8 は unsinged long long*/
  *data->division_out_size = 16;
  
  /*"word num" and "line num"*/
  *data->status_num = 2;
  *data->sum_task_num = size / (*data->division_size);

  if (*data->sum_task_num < TASKBUFF) {
    *data->task_num = *data->sum_task_num;
  }
  else {
    *data->task_num = TASKBUFF;
  }

  *data->out_task_num = *data->sum_task_num + ( (*data->division_size) * (*data->sum_task_num) < size);
  
  *data->out_size = (*data->division_out_size) * (*data->out_task_num);
  data->o_data = (unsigned long long*)manager->allocate(*data->out_size);
  

}

void
func2(SchedTask *s, void *p, void *q)
{
  data_list_t *data = (data_list_t*)p;

  int in_off_set = 0;
  int out_off_set = 0;

  if (*data->file_size > 0) {

    in_off_set = (*data->sum_i) * (*data->division_size);
    out_off_set = (*data->sum_i) * (*data->status_num);
    *data->division_size = *data->file_size;

    HTaskPtr task = (HTaskPtr)manager->allocate(sizeof(HTask));
    task = manager->create_task(EXEC_TASK);
    task->add_inData(data->file_mmap + in_off_set, *data->division_size);
    task->add_outData(data->o_data + out_off_set, *data->division_out_size);
    task->add_param(*data->division_size);
    task->add_param(*data->word_flag);
    data->run_wait_task->wait_for(task);
    task->set_cpu(SPE_ANY);
    task->spawn();

  }

  data->run_wait_task->spawn();

}

void
func1(SchedTask *s, void *p, void *q)
{

  /*receive*/
  HTaskPtr wait_task = manager->create_task(WAIT_TASK);
  data_list_t *data = (data_list_t*)p;


  if (*data->sum_task_num < *data->task_num) {
    *data->task_num = *data->sum_task_num;
  }


  /*run*/
  task_list_t *t = data->task_list;

  for (;t != NULL; t = t->next) {
    wait_task->wait_for(t->task);
    t->task->spawn();
  }

  task_list_t *now = NULL;
  task_list_t *next = (task_list_t*)manager->allocate(sizeof(task_list_t));
  next = NULL;

  *data->sum_task_num -= *data->task_num;
  if (*data->sum_task_num < TASKBUFF) {
    *data->task_num =  *data->sum_task_num;
  } else {
    *data->task_num =  TASKBUFF;
  }


  /*for (int i = 0; i < *data->task_num; i++) {  

    now = (task_list_t*)manager->allocate(sizeof(task_list_t));
    now->task = (HTaskPtr)manager->allocate(sizeof(HTask));
    now->task = manager->create_task(HELLO_TASK);
    now->task->set_cpu(SPE_ANY);
    now->next = next;

    next = now;

    }*/


  int in_off_set = 0;
  int out_off_set = 0;
  char next_word = 0;

  for (int i = 0; i < *data->task_num; i++) {  

    in_off_set = (*data->sum_i) * (*data->division_size);
    out_off_set = (*data->sum_i) * (*data->status_num);
    next_word = data->file_mmap[(*data->sum_i + 1) * (*data->division_size) - 1];

    now = (task_list_t*)manager->allocate(sizeof(task_list_t));
    now->task = (HTaskPtr)manager->allocate(sizeof(HTask));
    now->task = manager->create_task(EXEC_TASK);
    now->task->add_inData(data->file_mmap + in_off_set, *data->division_size);
    now->task->add_outData(data->o_data + out_off_set, *data->division_out_size);
    now->task->add_param(*data->division_size);
    now->task->add_param(*data->word_flag);
    now->task->set_cpu(SPE_ANY);
    now->next = next;

    next = now;

    *data->word_flag = ((next_word != 0x20) && (next_word != 0x0A));
    *data->file_size -= *data->division_size;
    *data->sum_i += 1;

  }



  data->task_list = now;
  
  
  if (*data->sum_task_num > 0) {
    wait_task->set_post(func1,p, 0);
  }

  /* すべての task が終了するのを待つ。*/
  else {
    wait_task->set_post(func2,p, 0);
  }

  /* 16task が終了するのを待つ */
  wait_task->spawn();
    
}


int
init(int argc, char **argv)
{
    for (int i = 1; argv[i]; ++i) {
	if (strcmp(argv[i], "-count") == 0) {
            count = atoi(argv[++i]);
        } else if (strcmp(argv[i], "-file") == 0) {
	  file = argv[++i];
	}


    }

    return 0;
}

void
run_init(TaskManager *manager)
{

  data_list_t *data = (data_list_t*)manager->allocate(sizeof(data_list_t));;

  /*data create from input file*/
  create_data(file,data);

  HTaskPtr t_print = manager->create_task(PRINT_TASK);
  t_print->add_inData(data->o_data, *data->out_size);
  t_print->add_param(*data->out_task_num);
  t_print->add_param(*data->status_num);
  t_print->wait_for(data->run_wait_task);

  /*task create*/
  task_list_t *now = NULL;
  task_list_t *next = (task_list_t*)manager->allocate(sizeof(task_list_t));
  next = NULL;

  int in_off_set = 0;
  int out_off_set = 0;
  char next_word = 0;

  for (int i = 0; i < *data->task_num; i++) {  

    in_off_set = (*data->sum_i) * (*data->division_size);
    out_off_set = (*data->sum_i) * (*data->status_num);
    next_word = data->file_mmap[(*data->sum_i + 1) * (*data->division_size) - 1];

    now = (task_list_t*)manager->allocate(sizeof(task_list_t));
    now->task = (HTaskPtr)manager->allocate(sizeof(HTask));
    now->task = manager->create_task(EXEC_TASK);
    now->task->add_inData(data->file_mmap + in_off_set, *data->division_size);
    now->task->add_outData(data->o_data + out_off_set, *data->division_out_size);
    now->task->add_param(*data->division_size);
    now->task->add_param(*data->word_flag);
    now->task->set_cpu(SPE_ANY);
    now->next = next;

    next = now;

    *data->word_flag = ((next_word != 0x20) && (next_word != 0x0A));
    *data->file_size -= *data->division_size;
    *data->sum_i += 1;

  }

  data->task_list = now;

  func1(0, (void*)data, 0);

  t_print->spawn();

}

int
TMmain(TaskManager *manager_, int argc, char *argv[])
{
    manager = manager_;

    if (init(argc, argv) < 0) {
	return -1;
    }

    task_init();
    run_init(manager);

    return 0;
}