Mercurial > hg > Database > Alice
view src/main/java/alice/topology/manager/keeparive/TaskExecuter.java @ 599:3284428f525e dispose
add MetaCodeSegment & MetaDSM
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Mon, 15 Feb 2016 01:16:35 +0900 |
parents | 28627bb3eeda |
children |
line wrap: on
line source
package alice.topology.manager.keeparive; import alice.codesegment.CodeSegment; import alice.codesegment.MetaCodeSegment; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; import alice.datasegment.Receiver; public class TaskExecuter extends MetaCodeSegment { private Receiver info = ids.create(CommandType.TAKE); private TaskInfo nowTask; private boolean sleepFlag = false; private boolean skipFlag = false; private long startTime = 0; private long remainingTime = 0; private static TaskExecuter instance = new TaskExecuter(); private TaskExecuter() {} public static TaskExecuter getInstance() { return instance; } public void setKey() { ids.init(); info.setKey("_SCHEDULER"); } @Override public synchronized void run() { ListManager list = info.asClass(ListManager.class); if (list.getTaskList().size() == 0){ TaskInfo task = new TaskInfo(TaskType.CREATE); task.setSleepTime(3000); list.addTask(task); ods.update("_SCHEDULER", list); remainingTime = 0; } else { nowTask = list.getTaskList().poll(); if (nowTask.getType() !=TaskType.PING) ods.update("_SCHEDULER", list); if (skipFlag) { skipFlag = false; nowTask.setSleepTime(remainingTime + nowTask.getSleepTime()); remainingTime = 0; } startTime = System.currentTimeMillis(); if (nowTask.getSleepTime() != 0){ sleepFlag = true; try { wait(nowTask.getSleepTime()); } catch (InterruptedException e) { e.printStackTrace(); } sleepFlag = false; } if (!skipFlag){ // ping or close if (nowTask.getType() == TaskType.PING) { ods.ping(nowTask.getManagerKey(), nowTask.getReturnKey()); TaskInfo task = new TaskInfo(TaskType.CLOSE); task.setInfo(nowTask.getManagerKey(), 10 * 1000); list.addTask(task); ods.update("_SCHEDULER", list); new RespondPing(nowTask.getReturnKey()); } else if (nowTask.getType() == TaskType.CLOSE) { // no response from the Remote DataSegment. So close this connection. if (DataSegment.contains(nowTask.getManagerKey())) { DataSegment.get(nowTask.getManagerKey()).shutdown(); System.out.println(nowTask.getManagerKey() +" IS SHOTDOWN"); } else { System.out.println(nowTask.getManagerKey() +" IS ALREADY SHOTDOWN"); } } else if (nowTask.getType() == TaskType.CREATE) { new CreateTask(); } } nowTask = null; startTime = 0; } setKey(); } public synchronized void skip() { if (sleepFlag){ skipFlag = true; if (startTime == 0){ remainingTime = nowTask.getSleepTime(); } else { remainingTime = nowTask.getSleepTime() - (System.currentTimeMillis() - startTime); } nowTask = null; notify(); } } public synchronized void ignore() { if (sleepFlag){ skipFlag = true; remainingTime = 0; nowTask = null; notify(); } } public synchronized TaskInfo getNowTask() { return nowTask; } // only use in ListManagerTest public synchronized void setNowTask(TaskInfo info) { nowTask = info; } public synchronized long getRemainingTime() { if (sleepFlag) { if (startTime !=0) { return nowTask.getSleepTime() - (System.currentTimeMillis() - startTime); } else { return nowTask.getSleepTime(); } } else { return remainingTime; } } public synchronized boolean compareAndSkip(TaskInfo task) { if (nowTask != null){ if (nowTask.getType().equals(task.getType()) && nowTask.getManagerKey().equals(task.getManagerKey())){ skip(); return true; } } return false; } }