view TaskManager/Cell/CellTaskManagerImpl.cc @ 640:a909c50081c2

SimpeTask on Cell worked.
author Shinji KONO <kono@ie.u-ryukyu.ac.jp>
date Fri, 20 Nov 2009 23:12:34 +0900
parents 671fca057ad3
children 7c9ded1ea750
line wrap: on
line source

#define DEBUG
#include "error.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "CellTaskManagerImpl.h"
#include "CellTaskListInfo.h"
#include "HTaskInfo.h"
#include "SchedTask.h"
#include "MainScheduler.h"
#include "types.h"

CellTaskManagerImpl::~CellTaskManagerImpl()
{
    delete speThreads;
    delete [] speTaskList;
    delete [] speTaskList_bg;
    /**
     * bufferManager は
     * ppeManager のなかで delete してもらう
     */
    // delete bufferManager;
    delete [] flag_sendTaskList;

    delete ppeManager;
}

void
CellTaskManagerImpl::init()
{
    spe_running = 0;
    taskListImpl = new CellTaskListInfo;
    taskListImpl->init(machineNum*2);

    activeTaskQueue = new HTaskInfo();

    htaskImpl = activeTaskQueue ; // any HTaskInfo

    speThreads = new SpeThreads(machineNum);
    speThreads->init();

    speTaskList  = new TaskListPtr[machineNum];
    speTaskList_bg  = new TaskListPtr[machineNum];

    for (int i = 0; i < machineNum; i++) {
	speTaskList[i] = taskListImpl->create();
	speTaskList_bg[i] = taskListImpl->create();
    }

    flag_sendTaskList = new int[machineNum];
    for (int i = 0; i < machineNum; i++) {
	flag_sendTaskList[i] = 1;
    } 
    // PPE 側の管理をする Manager
    ppeManager = new FifoTaskManagerImpl(machineNum);
    // 大半のTaskQueueInfoは、共有される
    MainScheduler *mscheduler = new MainScheduler;
    ppeManager->init(mscheduler, this);
    
    ppeManager->get_scheduler()->set_manager(this);

    schedTaskManager = new SchedTask();
    schedTaskManager->init(0,0,0,ppeManager->get_scheduler());
}

void
CellTaskManagerImpl::append_activeTask(HTaskPtr task)
{
    if (task->cpu_type == CPU_PPE) {
        ppeManager->append_activeTask(task);
    } else {
        activeTaskQueue->addLast(task);
    }
}

// SPE_ANY が指定されていた時に
// これをインクリメントしつつ呼ぶことにする。
// 乱数使ってもいいけどさ。
int cur_anySpeid = 0;

/**
 * ActiveTaskQueue から Task を
 * 各 SPE に渡す TaskList に入れる
 *
 * ここの activeTaskQueue は FifoTaskManagerImpl のと意味が違い、
 * spe に渡される Task だけ入っている
 */
void
CellTaskManagerImpl::set_runTaskList()
{
    // ここ...直すかな
    TaskListPtr list;
    
    TaskPtr task;
    int speid;

    while (HTaskPtr htask = activeTaskQueue->poll()) {

	if (htask->cpu_type == SPE_ANY) {
	    speid = cur_anySpeid++;
	    cur_anySpeid = (cur_anySpeid < machineNum)
		? cur_anySpeid : 0;
	} else {
	    // -1 してるのは
	    // htask->cpu_type - CPU_SPE で
	    // SPE0 = 1, SPE1 = 2, ... SPE5 = 6 ってなってるので
	    // 配列的 (SPE0 = arr[0], SPE1 = arr[1]) にするため
	    speid = htask->cpu_type - CPU_SPE - 1;

	    // SPU の数以上が指定されていれば
	    // とりあえず MAX_USE_SPE_NUM (実際に動く SPE の最大数) で
	    // あまり求めてそれを使うことにする。
	    // ここで判定するもんでもないか?
	    if (speid >= machineNum) {
		speid %= machineNum;
	    }
	}

	list = speTaskList_bg[speid];

	if (list->length >= TASK_MAX_SIZE) {
	    TaskListPtr newList = taskListImpl->create();
	    newList = TaskListInfo::append(newList, speTaskList_bg[speid]);
	    speTaskList_bg[speid] = newList;
	    list = newList;
	}

	task = &list->tasks[list->length++];
	TaskPtr stask = (TaskPtr) task;
	*stask = *(TaskPtr) htask;
    }
}

void
CellTaskManagerImpl::run()
{
    TaskListPtr ppeTaskList = NULL;
    MailQueuePtr ppeMail = NULL;

    // PPE 側で動く TaskList です
    // FifoTaskManagerImpl::run と上手く合うように
    // こんなことやってますよ
    //
    // 本来は、別 thread で動かすべきだろう...
    ppeTaskList = ppeManager->get_runTaskList();
    if (!ppeTaskList) {
	goto cont;
    }

    // SPE からの Mailbox Check は
    // PPE 側の schedule から抜けて来たときに行う
    // (speThreads で Blocking Mailbox read と
    // セマフォとか使ってやってもいいが、今はこの方式で)
    //
    // すべてのspe task が finish task を待つ場合は、ppeTaskList の
    // 判定だけで十分だが、そうでない場合は、spe の task が残っているか
    // どうかを調べる必要がある。
    //
    do {
	ppeMail = ppeManager->schedule(ppeTaskList);
    cont:
	ppeTaskList = mail_check(ppeMail);
    } while (ppeTaskList || spe_running >0);
}

/**
 * SPE からのメールをチェックする
 *
 * @param [mail_list]
 *        PPE 側で動く Scheduler からのメールリスト
 *        この中で PPE 側の mail check も行う
 *
 * @return PPE Scheduler に対してのメール。
 *         次に実行する TaskList のアドレスや、終了コマンドを送る
 */

TaskListPtr
CellTaskManagerImpl::mail_check(MailQueuePtr mail_list)
{
    // PPE Scheduler からの mail check
    ppeManager->mail_check(mail_list, waitTaskQueue);

    do {
	memaddr data;

	// SPE Scheduler からの mail check
	for (int id = 0; id < machineNum; id++) {	    
	    while (speThreads->check_mail(id, 1, &data)) {				
		/**
		 * MY_SPE_STATUS_READY: SPE が持ってた Task 全て終了
		 * MY_SPE_NOP: 特に意味のないコマンド
		 * それ以外:終了したタスク(PPEにあるのでアドレス
		 *
		 * MY_SPE_NOP が 0 なので、
		 * 下のように data > MY_SPE_NOP とかしています。
		 * 一目でよくわからない書き方なんで、直したいところですが。。。
		 */
		// 名前あとでちゃんと決めよう => MY_SPE_... とかじゃなくて
		if (data == (memaddr)MY_SPE_STATUS_READY) {
		    //__debug_ppe("[SPE %d] finish\n", id);
		    flag_sendTaskList[id] = 1;
		    spe_running--;
		} else if (data == (memaddr)MY_SPE_COMMAND_MALLOC) {
		    //__debug_ppe("[PPE] MALLOC COMMAND from [SPE %d]\n", id);

		    /**
		     * info[0] = alloc_id; (CellScheduler::mainMem_alloc 参照)
		     * info[1] = alloc_addr;
		     */
		    memaddr alloc_info[2];
		    long alloc_size;
		    long command;
		    
		    speThreads->get_mail(id, 2, alloc_info);
		    command = (long)alloc_info[0];
		    alloc_size = (long)alloc_info[1];

		    
		    alloc_info[1] = (memaddr)allocate(alloc_size);
		    /*
		     * allocate された領域は今の SPE buffer にリンクとして接続する
		     * ここでは TaskList を allocate(new) して登録してやろうか
		     */

		    //__debug_ppe("[PPE] MALLOCED 0x%lx from [SPE %d]\n", alloc_info[1],id);
		    // 今のところ何もしてない。どうも、この allocate を free 
		    // するのは、SPE task が返した値を見て行うらしい。それは、
		    // 忘れやすいのではないか?
		    speThreads->add_output_tasklist(command, alloc_info[1], alloc_size);

		    speThreads->send_mail(id, 2, alloc_info);
		} else if (data > (memaddr)MY_SPE_NOP) {
		    //__debug_ppe("[PPE] recv from [SPE %d] : 0x%x\n", id, data);
		    HTaskPtr task = (HTaskPtr)data;
		    task->post_func(schedTaskManager, task->post_arg1, task->post_arg2);
		    check_task_finish(task);
		}
	    }
	}

	// 依存関係を満たしたタスクをアクティブに
	wakeup_waitTask();

	// SPE に送る TaskList の準備
	set_runTaskList();

	// TaskList 待ちの SPE に TaskList を送る
	for (int i = 0; i < machineNum; i++)  {
	    if (flag_sendTaskList[i] == 1 && speTaskList_bg[i]->length >= 1 ) {
		send_taskList(i);
		spe_running++;
	    }
	}

	// 現在の FifoTaskManager の仕様では
	// ・PPE で実行するタスクが無くなれば終了する
	// であり、この場合もし SPE にタスクが残っていても
	// メインループから抜けてプログラム終了となってしまうので
	// ここでストップかけてます。
    } while (ppeManager->activeTaskQueue->empty() && !waitTaskQueue->empty());
	
    return ppeManager->get_runTaskList();
}

/**
 * 条件を満たしたら SPE に TaskList を送信する
 * 条件1. SPE が持ってた TaskList を終了して、次の TaskList を待ってる
 * 条件2. SPE に送る TaskList に Task がある
 *
 * SPE で実行終了した TaskList [speTaskList] と
 * これから実行する TaskList [speTaskList_bg] のバッファを入れ替える
 * ついでに実行終了したやつは clear しておく。
 */
void
CellTaskManagerImpl::send_taskList(int id)
{
    TaskListPtr tmp;

    tmp = speTaskList[id];
    speTaskList[id]  = speTaskList_bg[id];
    speTaskList_bg[id] = tmp;

    taskListImpl->clear_taskList(speTaskList_bg[id]);

    speThreads->send_mail(id, 1, (memaddr *)&speTaskList[id]);
    flag_sendTaskList[id] = 0;
}


#ifdef __CERIUM_CELL__
TaskManagerImpl*
create_impl(int num)
{
    return new CellTaskManagerImpl(num);
}
#endif // __CERIUM_CELL