# HG changeset patch # User sugi # Date 1384850384 -32400 # Node ID a8255a831ade7d673f43cb47d9212a766ea0e39c # Parent 52bb813ed52e01e6467011c636e26402333a4f9e implement ping api diff -r 52bb813ed52e -r a8255a831ade src/alice/codesegment/OutputDataSegment.java --- a/src/alice/codesegment/OutputDataSegment.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/codesegment/OutputDataSegment.java Tue Nov 19 17:39:44 2013 +0900 @@ -85,4 +85,15 @@ DataSegment.get(managerKey).close(); } + /** + * "key" is not remote DataSegment's key. + * "Ping Response" return in this "key" + * + * @param managerKey + * @param key + */ + public void ping(String managerKey ,String returnKey) { + DataSegment.get(managerKey).ping(returnKey); + } + } diff -r 52bb813ed52e -r a8255a831ade src/alice/daemon/IncomingTcpConnection.java --- a/src/alice/daemon/IncomingTcpConnection.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/daemon/IncomingTcpConnection.java Tue Nov 19 17:39:44 2013 +0900 @@ -62,6 +62,12 @@ cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); cmd=null; break; + case PING: + DataSegment.get(reverseKey).response(msg.key); + break; + case RESPONSE: + DataSegment.getLocal().put(msg.key, System.currentTimeMillis()); + break; default: break; } diff -r 52bb813ed52e -r a8255a831ade src/alice/datasegment/CommandType.java --- a/src/alice/datasegment/CommandType.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/datasegment/CommandType.java Tue Nov 19 17:39:44 2013 +0900 @@ -10,7 +10,9 @@ REMOVE, REPLY, CLOSE, - FINISH; + FINISH, + PING, + RESPONSE; public int id; public static HashMap hash = new HashMap(); diff -r 52bb813ed52e -r a8255a831ade src/alice/datasegment/DataSegmentKey.java --- a/src/alice/datasegment/DataSegmentKey.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Tue Nov 19 17:39:44 2013 +0900 @@ -25,7 +25,7 @@ case PUT: int index = tailIndex; tailIndex++; - DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj,cmd.reverseKey); + DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj, cmd.reverseKey); dataList.add(dsv); // Process waiting peek and take commands for (Iterator iter = waitList.iterator(); iter.hasNext(); ) { diff -r 52bb813ed52e -r a8255a831ade src/alice/datasegment/DataSegmentManager.java --- a/src/alice/datasegment/DataSegmentManager.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Tue Nov 19 17:39:44 2013 +0900 @@ -10,7 +10,7 @@ public abstract class DataSegmentManager { - protected ConcurrentHashMap seqHash = new ConcurrentHashMap(); //TODO Over Head + protected ConcurrentHashMap seqHash = new ConcurrentHashMap(); protected LinkedBlockingQueue replyQueue = new LinkedBlockingQueue(); protected AtomicInteger seq = new AtomicInteger(1); // waiting for PUT or UPDATE at unique sequence number // but it doesn't need for Local @@ -49,23 +49,22 @@ e.printStackTrace(); } } - - + public abstract void put(String key, Object val); public abstract void update(String key, Object val); public abstract void take(Receiver receiver, CodeSegment cs); public abstract void peek(Receiver receiver, CodeSegment cs); + + public abstract void quickPut(String key, Object val); + public abstract void quickUpdate(String key, Object val); + public abstract void quickPeek(Receiver receiver, CodeSegment cs); + public abstract void quickTake(Receiver receiver, CodeSegment cs); public abstract void remove(String key); public abstract void close(); public abstract void finish(); - public abstract void quickPut(String key, Object val); - public abstract void quickUpdate(String key, Object val); - - public abstract void quickPeek(Receiver receiver, CodeSegment cs); - public abstract void quickTake(Receiver receiver, CodeSegment cs); - - + public abstract void ping(String returnKey); + public abstract void response(String returnKey); } diff -r 52bb813ed52e -r a8255a831ade src/alice/datasegment/LocalDataSegmentManager.java --- a/src/alice/datasegment/LocalDataSegmentManager.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Tue Nov 19 17:39:44 2013 +0900 @@ -153,4 +153,14 @@ } + @Override + public void ping(String returnKey) { + + } + + @Override + public void response(String returnKey) { + + } + } diff -r 52bb813ed52e -r a8255a831ade src/alice/datasegment/RemoteDataSegmentManager.java --- a/src/alice/datasegment/RemoteDataSegmentManager.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/datasegment/RemoteDataSegmentManager.java Tue Nov 19 17:39:44 2013 +0900 @@ -144,5 +144,17 @@ connection.sendCommand(cmd); } + @Override + public void ping(String returnKey) { + Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, null); + connection.write(cmd); + } + + @Override + public void response(String returnKey) { + Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, null); + connection.write(cmd); + } + } diff -r 52bb813ed52e -r a8255a831ade src/alice/topology/manager/keeparive/PingScheduler.java --- a/src/alice/topology/manager/keeparive/PingScheduler.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/topology/manager/keeparive/PingScheduler.java Tue Nov 19 17:39:44 2013 +0900 @@ -59,7 +59,7 @@ if (interruptFlag){ interruptFlag = false; } else { - //ods.put("_SENDPING", nowTask); + ods.put("_SENDPING", nowTask.getManagerKey()); } } } catch (InterruptedException e) { diff -r 52bb813ed52e -r a8255a831ade src/alice/topology/manager/keeparive/SendPing.java --- a/src/alice/topology/manager/keeparive/SendPing.java Tue Nov 19 15:51:52 2013 +0900 +++ b/src/alice/topology/manager/keeparive/SendPing.java Tue Nov 19 17:39:44 2013 +0900 @@ -13,8 +13,8 @@ @Override public void run(){ - TaskInfo info = taskInfo.asClass(TaskInfo.class); - ods.ping(info.getManagerKey()); + String managerKey = taskInfo.asString(); + ods.ping(managerKey , "UUID"); //new RespondPing(); } }