changeset 308:a8255a831ade

implement ping api
author sugi
date Tue, 19 Nov 2013 17:39:44 +0900
parents 52bb813ed52e
children 797267843126
files src/alice/codesegment/OutputDataSegment.java src/alice/daemon/IncomingTcpConnection.java src/alice/datasegment/CommandType.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegmentManager.java src/alice/topology/manager/keeparive/PingScheduler.java src/alice/topology/manager/keeparive/SendPing.java
diffstat 9 files changed, 55 insertions(+), 15 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/codesegment/OutputDataSegment.java	Tue Nov 19 15:51:52 2013 +0900
+++ b/src/alice/codesegment/OutputDataSegment.java	Tue Nov 19 17:39:44 2013 +0900
@@ -85,4 +85,15 @@
 		DataSegment.get(managerKey).close();
 	}
 
+	/**
+	 * "key" is not remote DataSegment's key.
+	 * "Ping Response" return in this "key" 
+	 * 
+	 * @param managerKey
+	 * @param key
+	 */
+	public void ping(String managerKey ,String returnKey) {
+		DataSegment.get(managerKey).ping(returnKey);
+	}
+
 }
--- a/src/alice/daemon/IncomingTcpConnection.java	Tue Nov 19 15:51:52 2013 +0900
+++ b/src/alice/daemon/IncomingTcpConnection.java	Tue Nov 19 17:39:44 2013 +0900
@@ -62,6 +62,12 @@
 					cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null));
 					cmd=null;
 					break;
+				case PING:
+					DataSegment.get(reverseKey).response(msg.key);
+					break;
+				case RESPONSE:
+					DataSegment.getLocal().put(msg.key, System.currentTimeMillis());
+					break;
 				default:
 					break;
 				}
--- a/src/alice/datasegment/CommandType.java	Tue Nov 19 15:51:52 2013 +0900
+++ b/src/alice/datasegment/CommandType.java	Tue Nov 19 17:39:44 2013 +0900
@@ -10,7 +10,9 @@
 	REMOVE,
 	REPLY,
 	CLOSE,
-	FINISH;
+	FINISH, 
+	PING,
+	RESPONSE;
 	
 	public int id;
 	public static HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>();
--- a/src/alice/datasegment/DataSegmentKey.java	Tue Nov 19 15:51:52 2013 +0900
+++ b/src/alice/datasegment/DataSegmentKey.java	Tue Nov 19 17:39:44 2013 +0900
@@ -25,7 +25,7 @@
 		case PUT:
 			int index = tailIndex;
 			tailIndex++;
-			DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj,cmd.reverseKey); 
+			DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj, cmd.reverseKey); 
 			dataList.add(dsv);
 			// Process waiting peek and take commands
 			for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
--- a/src/alice/datasegment/DataSegmentManager.java	Tue Nov 19 15:51:52 2013 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Tue Nov 19 17:39:44 2013 +0900
@@ -10,7 +10,7 @@
 
 public abstract class DataSegmentManager {
 	
-	protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); //TODO Over Head
+	protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();
 	protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();
 	protected AtomicInteger seq = new AtomicInteger(1); // waiting for PUT or UPDATE at unique sequence number
 														// but it doesn't need for Local
@@ -49,23 +49,22 @@
 			e.printStackTrace();
 		}
 	}
-	
-	
+		
 	public abstract void put(String key, Object val);
 	public abstract void update(String key, Object val);
 	public abstract void take(Receiver receiver, CodeSegment cs);
 	public abstract void peek(Receiver receiver, CodeSegment cs);
+	
+	public abstract void quickPut(String key, Object val);
+	public abstract void quickUpdate(String key, Object val);
+	public abstract void quickPeek(Receiver receiver, CodeSegment cs);
+	public abstract void quickTake(Receiver receiver, CodeSegment cs);
 
 	public abstract void remove(String key);
 	public abstract void close();
 	public abstract void finish();
 	
-	public abstract void quickPut(String key, Object val);
-	public abstract void quickUpdate(String key, Object val);
-	
-	public abstract void quickPeek(Receiver receiver, CodeSegment cs);
-	public abstract void quickTake(Receiver receiver, CodeSegment cs);
-
-	
+	public abstract void ping(String returnKey);
+	public abstract void response(String returnKey);
 		
 }
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Tue Nov 19 15:51:52 2013 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Tue Nov 19 17:39:44 2013 +0900
@@ -153,4 +153,14 @@
 		
 	}
 
+	@Override
+	public void ping(String returnKey) {
+		
+	}
+
+	@Override
+	public void response(String returnKey) {
+				
+	}
+
 }
--- a/src/alice/datasegment/RemoteDataSegmentManager.java	Tue Nov 19 15:51:52 2013 +0900
+++ b/src/alice/datasegment/RemoteDataSegmentManager.java	Tue Nov 19 17:39:44 2013 +0900
@@ -144,5 +144,17 @@
 		connection.sendCommand(cmd);
 	}
 
+	@Override
+	public void ping(String returnKey) {
+		Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, null);
+		connection.write(cmd);
+	}
+	
+	@Override
+	public void response(String returnKey) {
+		Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, null);
+		connection.write(cmd);
+	}
+
 
 }
--- a/src/alice/topology/manager/keeparive/PingScheduler.java	Tue Nov 19 15:51:52 2013 +0900
+++ b/src/alice/topology/manager/keeparive/PingScheduler.java	Tue Nov 19 17:39:44 2013 +0900
@@ -59,7 +59,7 @@
 				if (interruptFlag){				
 					interruptFlag = false;				
 				} else {
-					//ods.put("_SENDPING", nowTask);
+					ods.put("_SENDPING", nowTask.getManagerKey());
 				}
 			}
 		} catch (InterruptedException e) {
--- a/src/alice/topology/manager/keeparive/SendPing.java	Tue Nov 19 15:51:52 2013 +0900
+++ b/src/alice/topology/manager/keeparive/SendPing.java	Tue Nov 19 17:39:44 2013 +0900
@@ -13,8 +13,8 @@
 
 	@Override
 	public void run(){
-		TaskInfo info = taskInfo.asClass(TaskInfo.class);
-		ods.ping(info.getManagerKey());
+		String managerKey = taskInfo.asString();
+		ods.ping(managerKey , "UUID");
 		//new RespondPing();
 	}
 }