# HG changeset patch # User one # Date 1387786396 -32400 # Node ID d746c4486287bcf369eb73bd7ea6d418fcc53267 # Parent 4b5bf9cf15058723f7394dd4613e68db2efe2191 work diff -r 4b5bf9cf1505 -r d746c4486287 .settings/org.eclipse.core.resources.prefs --- a/.settings/org.eclipse.core.resources.prefs Tue Dec 17 19:46:29 2013 +0900 +++ b/.settings/org.eclipse.core.resources.prefs Mon Dec 23 17:13:16 2013 +0900 @@ -4,4 +4,5 @@ encoding//src/alice/test/codesegment/local/wordcount/WordCount.java=UTF-8 encoding//src/alice/topology/manager/IncomingHosts.java=UTF-8 encoding//src/alice/topology/manager/createABSIPList.java=UTF-8 +encoding//src/alice/topology/manager/keeparive/RemoveTask.java=UTF-8 encoding//src/alice/topology/manager/keeparive/TaskExecuter.java=UTF-8 diff -r 4b5bf9cf1505 -r d746c4486287 src/alice/test/codesegment/local/StartCodeSegment.java --- a/src/alice/test/codesegment/local/StartCodeSegment.java Tue Dec 17 19:46:29 2013 +0900 +++ b/src/alice/test/codesegment/local/StartCodeSegment.java Mon Dec 23 17:13:16 2013 +0900 @@ -8,19 +8,13 @@ public void run() { System.out.println("run StartCodeSegment"); - //TestCodeSegment cs = new TestCodeSegment(); - //cs.arg1.setKey("key1"); // unbound datasegment key1 is created and connect to cs. + TestCodeSegment cs = new TestCodeSegment(); + cs.arg1.setKey("key1"); // unbound datasegment key1 is created and connect to cs. // cs is waiting for local.key1 System.out.println("create TestCodeSegment"); ods.update("local", "key1", "String data"); // bind string data to datasegment local.key1 // this startup TestCodeSegment. - - SingletonTestCodeSegment cs1 = SingletonTestCodeSegment.getInstance(); - cs1.arg1.setKey("key2"); - - // DataSegment.get("local").update - ods.update("local", "key2", "String data"); } } diff -r 4b5bf9cf1505 -r d746c4486287 src/alice/topology/manager/keeparive/CreateSchedule.java --- a/src/alice/topology/manager/keeparive/CreateSchedule.java Tue Dec 17 19:46:29 2013 +0900 +++ b/src/alice/topology/manager/keeparive/CreateSchedule.java Mon Dec 23 17:13:16 2013 +0900 @@ -1,7 +1,5 @@ package alice.topology.manager.keeparive; -import java.util.LinkedList; - import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.Receiver; @@ -19,25 +17,7 @@ public void run() { ListManager manager = info1.asClass(ListManager.class); TaskInfo newTask = info2.asClass(TaskInfo.class); - TaskExecuter ps = TaskExecuter.getInstance(); - LinkedList list = manager.getTaskList(); - if (ps.getNowTask()!=null&&ps.getNowTask().getManagerKey().equals(newTask.getManagerKey())){ - long postponeTime = ps.getNowTask().getSleepTime() - (System.currentTimeMillis() - ps.getTime()); - TaskInfo nextTask; - if (list.size() != 0){ - nextTask = list.getFirst(); - nextTask.setSleepTime(postponeTime + nextTask.getSleepTime()); - } - manager.addTask(newTask); - ps.interrupt(); - } else { - for (TaskInfo info : list){ - if (newTask.getManagerKey().equals(info.getManagerKey())){ - list.remove(info); - } - } - manager.addTask(newTask); - } + manager.addTask(newTask); ods.update("_WAITINGLIST", manager); new CheckExistTask(); } diff -r 4b5bf9cf1505 -r d746c4486287 src/alice/topology/manager/keeparive/CreateTask.java --- a/src/alice/topology/manager/keeparive/CreateTask.java Tue Dec 17 19:46:29 2013 +0900 +++ b/src/alice/topology/manager/keeparive/CreateTask.java Mon Dec 23 17:13:16 2013 +0900 @@ -24,7 +24,7 @@ for (String manager : list){ int i = 5; TaskInfo task = new TaskInfo(TaskType.PING); - task.setInfo(manager, "_RESPONCE", i * 1000); + task.setInfo(manager, "_FORM_"+manager, i * 1000); ods.put("_TASKINFO", task); i +=5; } diff -r 4b5bf9cf1505 -r d746c4486287 src/alice/topology/manager/keeparive/RemoveTask.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/topology/manager/keeparive/RemoveTask.java Mon Dec 23 17:13:16 2013 +0900 @@ -0,0 +1,36 @@ +package alice.topology.manager.keeparive; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.Receiver; + +public class RemoveTask extends CodeSegment { + private Receiver info1 = ids.create(CommandType.TAKE); // list + private Receiver info2 = ids.create(CommandType.TAKE); // remove task + + public RemoveTask(){ + info1.setKey("_WAITINGLIST"); + info2.setKey("_REMOVETASK"); + } + + @Override + public void run() { + System.out.println(); + ListManager list = info1.asClass(ListManager.class); // explicit cast + TaskInfo task = info2.asClass(TaskInfo.class); + TaskExecuter exec = TaskExecuter.getInstance(); + if (exec.compareNowTask(task)){ + // case: remove task is state countdown. + exec.skip(); + } else { + // case: remove task is still in the waiting queue. + for (TaskInfo t: list.getTaskList()){ + if (t.getType().equals(task.getType())){ + list.getTaskList().remove(t); + break; + } + } + } + ods.update("_WAITINGLIST", list); + } +} diff -r 4b5bf9cf1505 -r d746c4486287 src/alice/topology/manager/keeparive/RespondPing.java --- a/src/alice/topology/manager/keeparive/RespondPing.java Tue Dec 17 19:46:29 2013 +0900 +++ b/src/alice/topology/manager/keeparive/RespondPing.java Mon Dec 23 17:13:16 2013 +0900 @@ -2,10 +2,12 @@ import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; +import alice.datasegment.DataSegment; import alice.datasegment.Receiver; public class RespondPing extends CodeSegment{ private Receiver respond = ids.create(CommandType.TAKE); + private long pingedTime = System.currentTimeMillis(); public RespondPing(String key) { respond.setKey(key); @@ -16,10 +18,27 @@ RespondData d = respond.asClass(RespondData.class); System.out.print("ping from "+d.from); System.out.println(" Recieved time "+d.time); - TaskInfo task = new TaskInfo(TaskType.PING); - task.setInfo(d.from, "_RESPONCE", 5 * 1000); - ods.put("_TASKINFO", task); - + if (pingedTime - d.time > 60 * 1000){ + // need check, this connection is alive. may be close + DataSegment.get(d.from).close(); + } else { + System.out.println("alive"); + // if nowTask is close d.from's socket cancel. + // if not remove close task in the Queue. + TaskExecuter exec = TaskExecuter.getInstance(); + TaskInfo task = new TaskInfo(TaskType.CLOSE); + task.setInfo(d.from, null, 0); + if (exec.compareNowTask(task)){ + // case: remove task is state countdown. + exec.skip(); + } else { + // case: remove task is still in the waiting queue. + ods.put("_REMOVETASK",task); + new RemoveTask(); + } + TaskInfo task2 = new TaskInfo(TaskType.PING); + task2.setInfo(d.from, "_FORM_"+d.from, 5 * 1000); + ods.put("_TASKINFO", task2); + } } - } diff -r 4b5bf9cf1505 -r d746c4486287 src/alice/topology/manager/keeparive/TaskExecuter.java --- a/src/alice/topology/manager/keeparive/TaskExecuter.java Tue Dec 17 19:46:29 2013 +0900 +++ b/src/alice/topology/manager/keeparive/TaskExecuter.java Mon Dec 23 17:13:16 2013 +0900 @@ -8,8 +8,9 @@ public class TaskExecuter extends CodeSegment{ private Receiver info = ids.create(CommandType.TAKE); private TaskInfo nowTask; - private boolean interruptFlag = false; - private long time = 0; + private boolean skipFlag = false; + private long startTime = 0; + private long remainingTime = 0; private static TaskExecuter instance = new TaskExecuter(); private TaskExecuter(){} @@ -26,21 +27,23 @@ public synchronized void run(){ ListManager list = info.asClass(ListManager.class); if (list.getTaskList().size() == 0){ + remainingTime = 0; ods.update("_WAITINGLIST", list); setKey(); return; } nowTask = list.getTaskList().poll(); ods.update("_WAITINGLIST", list); - time = System.currentTimeMillis(); + if (skipFlag){ + nowTask.setSleepTime(nowTask.getSleepTime() + remainingTime); + } + startTime = System.currentTimeMillis(); if (nowTask.getSleepTime()!=0){ try { this.wait(nowTask.getSleepTime()); } catch (InterruptedException e){} } - if (interruptFlag){ - interruptFlag = false; - } else { + if (!skipFlag){ if (nowTask.getType() == TaskType.PING){ ods.ping(nowTask.getManagerKey(), nowTask.getReturnKey()); TaskInfo task = new TaskInfo(TaskType.CLOSE); @@ -48,15 +51,18 @@ ods.put("_TASKINFO", task); new RespondPing(nowTask.getReturnKey()); } else { + // no response from the Remote DataSegment. So close this connection. DataSegment.get(nowTask.getManagerKey()).close(); - } - + } } nowTask = null; setKey(); } - public synchronized void interrupt(){ - interruptFlag = true; + + public synchronized void skip(){ + skipFlag = true; + remainingTime = nowTask.getSleepTime() - (System.currentTimeMillis() - startTime); + nowTask = null; notify(); } @@ -65,7 +71,17 @@ } public long getTime(){ - return time; + return startTime; } -} + + public boolean compareNowTask(TaskInfo task){ + if (nowTask != null){ + if (nowTask.getType().equals(task.getType()) + && nowTask.getManagerKey().equals(task.getManagerKey())){ + return true; + } + } + return false; + } +} \ No newline at end of file