changeset 7:352eb19d837d

implements reply of LocalDataSegment
author one
date Thu, 12 Jan 2012 13:48:34 +0900
parents c78a1cc2cd8f
children 78b415d019de
files src/alice/codesegment/CodeSegmentManager.java src/alice/codesegment/InputDataSegment.java src/alice/codesegment/Reply.java src/alice/datasegment/Command.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/Reply.java src/alice/test/codesegment/TestCodeSegment.java
diffstat 9 files changed, 53 insertions(+), 46 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/codesegment/CodeSegmentManager.java	Thu Jan 12 13:19:04 2012 +0900
+++ b/src/alice/codesegment/CodeSegmentManager.java	Thu Jan 12 13:48:34 2012 +0900
@@ -3,6 +3,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import alice.datasegment.Reply;
+
 public class CodeSegmentManager {
 	private static CodeSegmentManager instance = new CodeSegmentManager();
 	private LinkedBlockingQueue<Reply> replyQueue = new LinkedBlockingQueue<Reply>();
--- a/src/alice/codesegment/InputDataSegment.java	Thu Jan 12 13:19:04 2012 +0900
+++ b/src/alice/codesegment/InputDataSegment.java	Thu Jan 12 13:48:34 2012 +0900
@@ -16,22 +16,25 @@
 		this.cs = cs;
 	}
 	
-	public void peek(String managerKey, String key) {
-		peek(managerKey, key, 0);
+	public void peek(String argKey, String managerKey, String key) {
+		peek(argKey, managerKey, key, 0);
 	}
 	
-	public void peek(String managerKey, String key, int index) {
-		DataSegment.get(managerKey).peek(key, index, cs);
+	public void peek(String argKey, String managerKey, String key, int index) {
+		DataSegment.get(managerKey).peek(argKey, key, index, cs);
 		count.getAndIncrement();
 	}
 
-	public void take(String managerKey, String key) {
-		take(managerKey, key, 0);
+	public void take(String argKey,String managerKey, String key) {
+		take(argKey, managerKey, key, 0);
 	}
 	
-	public void take(String managerKey, String key, int index) {
-		DataSegment.get(managerKey).take(key, index, cs);
+	public void take(String argKey, String managerKey, String key, int index) {
+		DataSegment.get(managerKey).take(argKey, key, index, cs);
 		count.getAndIncrement();
 	}
-	
+
+	public void reply(String key, DataSegmentValue val) {
+		inputDataSegments.put(key, val);
+	}
 }
--- a/src/alice/codesegment/Reply.java	Thu Jan 12 13:19:04 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,15 +0,0 @@
-package alice.codesegment;
-
-import org.msgpack.type.Value;
-
-public class Reply {
-	int seq;
-	int index;
-	Value val;
-	
-	public Reply(int seq, int index, Value val) {
-		this.seq = seq;
-		this.index = index;
-		this.val = val;
-	}
-}
--- a/src/alice/datasegment/Command.java	Thu Jan 12 13:19:04 2012 +0900
+++ b/src/alice/datasegment/Command.java	Thu Jan 12 13:48:34 2012 +0900
@@ -2,19 +2,25 @@
 
 import org.msgpack.type.Value;
 
+import alice.codesegment.CodeSegment;
+
 public class Command {
 	public CommandType cmdType;
+	public String argKey;
 	public Value val;
 	public int index;
 	public int seq;
 	public DataSegmentManager manager;
-
-	public Command(CommandType cmdType, Value val, int index, int seq, DataSegmentManager manager) {
+	public CodeSegment cs;
+	
+	public Command(CommandType cmdType, String argKey, Value val, int index, int seq, DataSegmentManager manager, CodeSegment cs) {
 		this.cmdType = cmdType;
+		this.argKey = argKey;
 		this.val = val;
 		this.index = index;
 		this.seq = seq;
 		this.manager = manager;
+		this.cs = cs;
 	}
 	
 }
--- a/src/alice/datasegment/DataSegmentKey.java	Thu Jan 12 13:19:04 2012 +0900
+++ b/src/alice/datasegment/DataSegmentKey.java	Thu Jan 12 13:48:34 2012 +0900
@@ -4,7 +4,6 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import alice.codesegment.Reply;
 import alice.datasegment.Command; 
 
 public class DataSegmentKey {
--- a/src/alice/datasegment/DataSegmentManager.java	Thu Jan 12 13:19:04 2012 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Thu Jan 12 13:48:34 2012 +0900
@@ -6,7 +6,6 @@
 import org.msgpack.type.Value;
 
 import alice.codesegment.CodeSegment;
-import alice.codesegment.Reply;
 
 public abstract class DataSegmentManager {
 	public ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
@@ -15,14 +14,14 @@
 	
 	public abstract void put(String key, Value val);
 	public abstract void update(String key, Value val);
-	public void take(String key, CodeSegment cs) {
-		take(key, 0, cs);
+	public void take(String argKey, String key, CodeSegment cs) {
+		take(argKey, key, 0, cs);
 	}
-	public abstract void take(String key, int index, CodeSegment cs);
-	public void peek(String key, CodeSegment cs) {
-		peek(key, 0, cs);
+	public abstract void take(String argKey, String key, int index, CodeSegment cs);
+	public void peek(String argKey, String key, CodeSegment cs) {
+		peek(argKey, key, 0, cs);
 	}
-	public abstract void peek(String key, int index, CodeSegment cs);
+	public abstract void peek(String argKey, String key, int index, CodeSegment cs);
 	public abstract void remove(String key);
 	
 }
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Thu Jan 12 13:19:04 2012 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Thu Jan 12 13:48:34 2012 +0900
@@ -5,7 +5,6 @@
 import org.msgpack.type.Value;
 
 import alice.codesegment.CodeSegment;
-import alice.codesegment.Reply;
 import alice.datasegment.CommandType;
 
 public class LocalDataSegmentManager extends DataSegmentManager {
@@ -18,13 +17,12 @@
 			while (true) {
 				try {
 					Reply reply = replyQueue.take();
-					
+					Command cmd = seqHash.get(reply.seq);
+					cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val));
 				} catch (InterruptedException e) {
 					e.printStackTrace();
 				}
-				
 			}
-			
 		}
 		
 	};
@@ -44,29 +42,29 @@
 	@Override
 	public void put(String key, Value val) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.PUT, val, 0, 0, this));
+		dataSegmentKey.addCommand(new Command(CommandType.PUT, null, val, 0, 0, this, null));
 	}
 
 	@Override
 	public void update(String key, Value val) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.UPDATE, val, 0, 0, this));
+		dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, val, 0, 0, this, null));
 	}
 
 	@Override
-	public void take(String key, int index, CodeSegment cs) {
+	public void take(String argKey, String key, int index, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.TAKE, null, index, seq, this);
+		Command cmd = new Command(CommandType.TAKE, argKey, null, index, seq, this, cs);
 		seqHash.put(seq, cmd);
 		dataSegmentKey.addCommand(cmd);
 	}
 
 	@Override
-	public void peek(String key, int index, CodeSegment cs) {
+	public void peek(String argKey, String key, int index, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.PEEK, null, index, seq, this);
+		Command cmd = new Command(CommandType.PEEK, argKey, null, index, seq, this, cs);
 		seqHash.put(seq, cmd);
 		dataSegmentKey.addCommand(cmd);
 	}
@@ -74,7 +72,7 @@
 	@Override
 	public void remove(String key) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, 0, 0, this));
+		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, 0, 0, this, null));
 	}
 
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/datasegment/Reply.java	Thu Jan 12 13:48:34 2012 +0900
@@ -0,0 +1,15 @@
+package alice.datasegment;
+
+import org.msgpack.type.Value;
+
+public class Reply {
+	int seq;
+	int index;
+	Value val;
+	
+	public Reply(int seq, int index, Value val) {
+		this.seq = seq;
+		this.index = index;
+		this.val = val;
+	}
+}
--- a/src/alice/test/codesegment/TestCodeSegment.java	Thu Jan 12 13:19:04 2012 +0900
+++ b/src/alice/test/codesegment/TestCodeSegment.java	Thu Jan 12 13:48:34 2012 +0900
@@ -10,7 +10,7 @@
 
 	@Override
 	public void prepare() {
-		ids.peek("local", "key1");
+		ids.peek("arg1", "local", "key1");
 	}
 	
 	@Override