view src/main/java/alice/topology/manager/keeparive/TaskExecuter.java @ 599:3284428f525e dispose

add MetaCodeSegment & MetaDSM
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Mon, 15 Feb 2016 01:16:35 +0900
parents 28627bb3eeda
children
line wrap: on
line source

package alice.topology.manager.keeparive;

import alice.codesegment.CodeSegment;
import alice.codesegment.MetaCodeSegment;
import alice.datasegment.CommandType;
import alice.datasegment.DataSegment;
import alice.datasegment.Receiver;

public class TaskExecuter extends MetaCodeSegment {
    private Receiver info = ids.create(CommandType.TAKE);
    private TaskInfo nowTask;
    private boolean sleepFlag = false;
    private boolean skipFlag = false;
    private long startTime = 0;
    private long remainingTime = 0;
    private static TaskExecuter instance = new TaskExecuter();

    private TaskExecuter() {}
    public static TaskExecuter getInstance() {
        return instance;
    }

    public void setKey() {
        ids.init();
        info.setKey("_SCHEDULER");
    }

    @Override
    public synchronized void run()  {
        ListManager list = info.asClass(ListManager.class);
        if (list.getTaskList().size() == 0){
            TaskInfo task = new TaskInfo(TaskType.CREATE);
            task.setSleepTime(3000);
            list.addTask(task);
            ods.update("_SCHEDULER", list);
            remainingTime = 0;
        } else {
            nowTask = list.getTaskList().poll();
            if (nowTask.getType() !=TaskType.PING)
                ods.update("_SCHEDULER", list);
            if (skipFlag) {
                skipFlag = false;
                nowTask.setSleepTime(remainingTime + nowTask.getSleepTime());
                remainingTime = 0;
            }
            startTime = System.currentTimeMillis();
            if (nowTask.getSleepTime() != 0){
                sleepFlag = true;
                try {
                    wait(nowTask.getSleepTime());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                sleepFlag = false;
            }

            if (!skipFlag){
                // ping or close
                if (nowTask.getType() == TaskType.PING) {
                    ods.ping(nowTask.getManagerKey(), nowTask.getReturnKey());
                    TaskInfo task = new TaskInfo(TaskType.CLOSE);
                    task.setInfo(nowTask.getManagerKey(), 10 * 1000);
                    list.addTask(task);
                    ods.update("_SCHEDULER", list);
                    new RespondPing(nowTask.getReturnKey());
                } else if (nowTask.getType() == TaskType.CLOSE) {
                    // no response from the Remote DataSegment. So close this connection.
                    if (DataSegment.contains(nowTask.getManagerKey())) {
                        DataSegment.get(nowTask.getManagerKey()).shutdown();
                        System.out.println(nowTask.getManagerKey() +" IS SHOTDOWN");
                    } else {
                        System.out.println(nowTask.getManagerKey() +" IS ALREADY SHOTDOWN");
                    }
                } else if (nowTask.getType() == TaskType.CREATE) {
                    new CreateTask();
                }
            }
            nowTask = null;
            startTime = 0;
        }
        setKey();
    }

    public synchronized void skip() {
        if (sleepFlag){
            skipFlag = true;
            if (startTime == 0){
                remainingTime = nowTask.getSleepTime();
            } else {
                remainingTime = nowTask.getSleepTime() - (System.currentTimeMillis() - startTime);
            }
            nowTask = null;
            notify();
        }
    }

    public synchronized void ignore() {
        if (sleepFlag){
            skipFlag = true;
            remainingTime = 0;
            nowTask = null;
            notify();
        }
    }

    public synchronized TaskInfo getNowTask() {
        return nowTask;
    }

    // only use in ListManagerTest
    public synchronized void setNowTask(TaskInfo info) {
        nowTask = info;
    }

    public synchronized long getRemainingTime() {
        if (sleepFlag) {
            if (startTime !=0) {
                return nowTask.getSleepTime() - (System.currentTimeMillis() - startTime);
            } else {
                return nowTask.getSleepTime();
            }
        } else {
            return remainingTime;
        }
    }

    public synchronized boolean compareAndSkip(TaskInfo task) {
        if (nowTask != null){
            if (nowTask.getType().equals(task.getType())
                    && nowTask.getManagerKey().equals(task.getManagerKey())){
                skip();
                return true;
            }
        }
        return false;
    }

}