changeset 323:d746c4486287

work
author one
date Mon, 23 Dec 2013 17:13:16 +0900
parents 4b5bf9cf1505
children 806cc010a5bd
files .settings/org.eclipse.core.resources.prefs src/alice/test/codesegment/local/StartCodeSegment.java src/alice/topology/manager/keeparive/CreateSchedule.java src/alice/topology/manager/keeparive/CreateTask.java src/alice/topology/manager/keeparive/RemoveTask.java src/alice/topology/manager/keeparive/RespondPing.java src/alice/topology/manager/keeparive/TaskExecuter.java
diffstat 7 files changed, 93 insertions(+), 47 deletions(-) [+]
line wrap: on
line diff
--- a/.settings/org.eclipse.core.resources.prefs	Tue Dec 17 19:46:29 2013 +0900
+++ b/.settings/org.eclipse.core.resources.prefs	Mon Dec 23 17:13:16 2013 +0900
@@ -4,4 +4,5 @@
 encoding//src/alice/test/codesegment/local/wordcount/WordCount.java=UTF-8
 encoding//src/alice/topology/manager/IncomingHosts.java=UTF-8
 encoding//src/alice/topology/manager/createABSIPList.java=UTF-8
+encoding//src/alice/topology/manager/keeparive/RemoveTask.java=UTF-8
 encoding//src/alice/topology/manager/keeparive/TaskExecuter.java=UTF-8
--- a/src/alice/test/codesegment/local/StartCodeSegment.java	Tue Dec 17 19:46:29 2013 +0900
+++ b/src/alice/test/codesegment/local/StartCodeSegment.java	Mon Dec 23 17:13:16 2013 +0900
@@ -8,19 +8,13 @@
 	public void run() {
 		System.out.println("run StartCodeSegment");
 		
-		//TestCodeSegment cs = new TestCodeSegment();
-		//cs.arg1.setKey("key1"); // unbound datasegment key1 is created and connect to cs.
+		TestCodeSegment cs = new TestCodeSegment();
+		cs.arg1.setKey("key1"); // unbound datasegment key1 is created and connect to cs.
 								// cs is waiting for local.key1
 		System.out.println("create TestCodeSegment");
 		
 		ods.update("local", "key1", "String data"); // bind string data to datasegment local.key1
 													// this startup TestCodeSegment.
-		
-		SingletonTestCodeSegment cs1 = SingletonTestCodeSegment.getInstance();
-		cs1.arg1.setKey("key2");
-		
-		// DataSegment.get("local").update
-		ods.update("local", "key2", "String data");
  	}
 
 }
--- a/src/alice/topology/manager/keeparive/CreateSchedule.java	Tue Dec 17 19:46:29 2013 +0900
+++ b/src/alice/topology/manager/keeparive/CreateSchedule.java	Mon Dec 23 17:13:16 2013 +0900
@@ -1,7 +1,5 @@
 package alice.topology.manager.keeparive;
 
-import java.util.LinkedList;
-
 import alice.codesegment.CodeSegment;
 import alice.datasegment.CommandType;
 import alice.datasegment.Receiver;
@@ -19,25 +17,7 @@
 	public void run() {
 		ListManager manager = info1.asClass(ListManager.class);
 		TaskInfo newTask = info2.asClass(TaskInfo.class);
-		TaskExecuter ps = TaskExecuter.getInstance();
-		LinkedList<TaskInfo> list = manager.getTaskList();
-		if (ps.getNowTask()!=null&&ps.getNowTask().getManagerKey().equals(newTask.getManagerKey())){
-			long postponeTime = ps.getNowTask().getSleepTime() - (System.currentTimeMillis() - ps.getTime());
-			TaskInfo nextTask;
-			if (list.size() != 0){
-				nextTask = list.getFirst();
-				nextTask.setSleepTime(postponeTime + nextTask.getSleepTime());
-			}
-			manager.addTask(newTask);
-			ps.interrupt();
-		} else {
-			for (TaskInfo info : list){
-				if (newTask.getManagerKey().equals(info.getManagerKey())){
-					list.remove(info);
-				}
-			}
-			manager.addTask(newTask);
-		}
+		manager.addTask(newTask);
 		ods.update("_WAITINGLIST", manager);
 		new CheckExistTask();
 	}
--- a/src/alice/topology/manager/keeparive/CreateTask.java	Tue Dec 17 19:46:29 2013 +0900
+++ b/src/alice/topology/manager/keeparive/CreateTask.java	Mon Dec 23 17:13:16 2013 +0900
@@ -24,7 +24,7 @@
 		for (String manager : list){
 			int i = 5;
 			TaskInfo task = new TaskInfo(TaskType.PING);
-			task.setInfo(manager, "_RESPONCE", i * 1000);
+			task.setInfo(manager, "_FORM_"+manager, i * 1000);
 			ods.put("_TASKINFO", task);
 			i +=5;
 		}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/topology/manager/keeparive/RemoveTask.java	Mon Dec 23 17:13:16 2013 +0900
@@ -0,0 +1,36 @@
+package alice.topology.manager.keeparive;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.Receiver;
+
+public class RemoveTask extends CodeSegment {
+	private Receiver info1 = ids.create(CommandType.TAKE); // list
+	private Receiver info2 = ids.create(CommandType.TAKE); // remove task
+
+	public RemoveTask(){
+		info1.setKey("_WAITINGLIST");
+		info2.setKey("_REMOVETASK");
+	}
+
+	@Override
+	public void run() {
+		System.out.println();
+		ListManager list = info1.asClass(ListManager.class); // explicit cast
+		TaskInfo task = info2.asClass(TaskInfo.class);
+		TaskExecuter exec = TaskExecuter.getInstance();
+		if (exec.compareNowTask(task)){
+			// case: remove task is state countdown.
+			exec.skip();
+		} else {
+			// case: remove task is still in the waiting queue.
+			for (TaskInfo t: list.getTaskList()){
+				if (t.getType().equals(task.getType())){
+					list.getTaskList().remove(t);
+					break;
+				}
+			}
+		}
+		ods.update("_WAITINGLIST", list);
+	}
+}
--- a/src/alice/topology/manager/keeparive/RespondPing.java	Tue Dec 17 19:46:29 2013 +0900
+++ b/src/alice/topology/manager/keeparive/RespondPing.java	Mon Dec 23 17:13:16 2013 +0900
@@ -2,10 +2,12 @@
 
 import alice.codesegment.CodeSegment;
 import alice.datasegment.CommandType;
+import alice.datasegment.DataSegment;
 import alice.datasegment.Receiver;
 
 public class RespondPing extends CodeSegment{
 	private Receiver respond = ids.create(CommandType.TAKE);
+	private long pingedTime = System.currentTimeMillis();
 	
 	public RespondPing(String key) {
 		respond.setKey(key);
@@ -16,10 +18,27 @@
 		RespondData d = respond.asClass(RespondData.class);
 		System.out.print("ping from "+d.from);
 		System.out.println(" Recieved time "+d.time);
-		TaskInfo task = new TaskInfo(TaskType.PING);
-		task.setInfo(d.from, "_RESPONCE", 5 * 1000);
-		ods.put("_TASKINFO", task);
-		
+		if (pingedTime - d.time > 60 * 1000){
+			// need check, this connection is alive. may be close 
+			DataSegment.get(d.from).close();
+		} else {
+			System.out.println("alive");
+			// if nowTask is close d.from's socket cancel. 
+			// if not remove close task in the Queue.
+			TaskExecuter exec = TaskExecuter.getInstance();
+			TaskInfo task = new TaskInfo(TaskType.CLOSE);
+			task.setInfo(d.from, null, 0);
+			if (exec.compareNowTask(task)){
+				// case: remove task is state countdown.
+				exec.skip();
+			} else {
+				// case: remove task is still in the waiting queue.
+				ods.put("_REMOVETASK",task);
+				new RemoveTask();
+			}
+			TaskInfo task2 = new TaskInfo(TaskType.PING);
+			task2.setInfo(d.from, "_FORM_"+d.from, 5 * 1000);
+			ods.put("_TASKINFO", task2);
+		}
 	}
-
 }
--- a/src/alice/topology/manager/keeparive/TaskExecuter.java	Tue Dec 17 19:46:29 2013 +0900
+++ b/src/alice/topology/manager/keeparive/TaskExecuter.java	Mon Dec 23 17:13:16 2013 +0900
@@ -8,8 +8,9 @@
 public class TaskExecuter extends CodeSegment{
 	private Receiver info = ids.create(CommandType.TAKE);
 	private TaskInfo nowTask;
-	private boolean interruptFlag = false;
-	private long time = 0;
+	private boolean skipFlag = false;
+	private long startTime = 0;
+	private long remainingTime = 0;
 	private static TaskExecuter instance = new TaskExecuter();
 
 	private TaskExecuter(){}
@@ -26,21 +27,23 @@
 	public synchronized void run(){
 		ListManager list = info.asClass(ListManager.class);
 		if (list.getTaskList().size() == 0){
+			remainingTime = 0;
 			ods.update("_WAITINGLIST", list);
 			setKey();
 			return;
 		}
 		nowTask = list.getTaskList().poll();
 		ods.update("_WAITINGLIST", list);
-		time = System.currentTimeMillis();
+		if (skipFlag){
+			nowTask.setSleepTime(nowTask.getSleepTime() + remainingTime);
+		}
+		startTime = System.currentTimeMillis();
 		if (nowTask.getSleepTime()!=0){
 			try {
 				this.wait(nowTask.getSleepTime());
 			} catch (InterruptedException e){}
 		}
-		if (interruptFlag){				
-			interruptFlag = false;
-		} else {
+		if (!skipFlag){
 			if (nowTask.getType() == TaskType.PING){
 				ods.ping(nowTask.getManagerKey(), nowTask.getReturnKey());
 				TaskInfo task = new TaskInfo(TaskType.CLOSE);
@@ -48,15 +51,18 @@
 				ods.put("_TASKINFO", task);
 				new RespondPing(nowTask.getReturnKey());
 			} else {
+				// no response from the Remote DataSegment. So close this connection.
 				DataSegment.get(nowTask.getManagerKey()).close();
-			}
-			
+			}		
 		}
 		nowTask = null;
 		setKey();
 	}
-	public synchronized void interrupt(){
-		interruptFlag = true;
+	
+	public synchronized void skip(){
+		skipFlag = true;
+		remainingTime = nowTask.getSleepTime() - (System.currentTimeMillis() - startTime);
+		nowTask = null;
 		notify();
 	}
 
@@ -65,7 +71,17 @@
 	}
 
 	public long getTime(){
-		return time;
+		return startTime;
 	}
-}
+	
+	public boolean compareNowTask(TaskInfo task){
+		if (nowTask != null){
+			if (nowTask.getType().equals(task.getType())
+					&& nowTask.getManagerKey().equals(task.getManagerKey())){
+				return true;
+			} 
+		}
+		return false;
+	}
 
+}
\ No newline at end of file