changeset 6:c78a1cc2cd8f

implements Reply
author one
date Thu, 12 Jan 2012 13:19:04 +0900
parents 80375ae09a1f
children 352eb19d837d
files src/alice/codesegment/CodeSegmentManager.java src/alice/codesegment/Input.java src/alice/codesegment/Reply.java src/alice/datasegment/Command.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/LocalDataSegmentManager.java
diffstat 7 files changed, 103 insertions(+), 40 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/codesegment/CodeSegmentManager.java	Thu Jan 12 13:19:04 2012 +0900
@@ -0,0 +1,35 @@
+package alice.codesegment;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class CodeSegmentManager {
+	private static CodeSegmentManager instance = new CodeSegmentManager();
+	private LinkedBlockingQueue<Reply> replyQueue = new LinkedBlockingQueue<Reply>();
+	private ConcurrentHashMap<Integer, CodeSegment> seqHash = new ConcurrentHashMap<Integer, CodeSegment>();
+	
+	private CodeSegmentManager() {
+		Runnable replyThread = new Runnable() {
+
+			@Override
+			public void run() {
+				while (true) {
+					try {
+						Reply reply = replyQueue.take();
+						
+					} catch (InterruptedException e) {
+						e.printStackTrace();
+					}
+				}
+				
+			}
+		};
+	}
+	
+	public void create(CodeSegment cs) {
+	}
+	
+	public static CodeSegmentManager get() {
+		return instance;
+	}
+}
--- a/src/alice/codesegment/Input.java	Wed Jan 11 23:28:02 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,15 +0,0 @@
-package alice.codesegment;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-import alice.datasegment.CommandType;
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.FIELD)
-public @interface Input {
-	public CommandType type();
-	public int index();
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/codesegment/Reply.java	Thu Jan 12 13:19:04 2012 +0900
@@ -0,0 +1,15 @@
+package alice.codesegment;
+
+import org.msgpack.type.Value;
+
+public class Reply {
+	int seq;
+	int index;
+	Value val;
+	
+	public Reply(int seq, int index, Value val) {
+		this.seq = seq;
+		this.index = index;
+		this.val = val;
+	}
+}
--- a/src/alice/datasegment/Command.java	Wed Jan 11 23:28:02 2012 +0900
+++ b/src/alice/datasegment/Command.java	Thu Jan 12 13:19:04 2012 +0900
@@ -2,21 +2,19 @@
 
 import org.msgpack.type.Value;
 
-import alice.codesegment.CodeSegment;
-
 public class Command {
 	public CommandType cmdType;
 	public Value val;
 	public int index;
-	public CodeSegment cs;
 	public int seq;
+	public DataSegmentManager manager;
 
-	public Command(CommandType cmdType, Value val, int index, CodeSegment cs, int seq) {
-		this.cs = cs;
+	public Command(CommandType cmdType, Value val, int index, int seq, DataSegmentManager manager) {
 		this.cmdType = cmdType;
 		this.val = val;
 		this.index = index;
 		this.seq = seq;
+		this.manager = manager;
 	}
 	
 }
--- a/src/alice/datasegment/DataSegmentKey.java	Wed Jan 11 23:28:02 2012 +0900
+++ b/src/alice/datasegment/DataSegmentKey.java	Thu Jan 12 13:19:04 2012 +0900
@@ -4,6 +4,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import alice.codesegment.Reply;
 import alice.datasegment.Command; 
 
 public class DataSegmentKey {
@@ -36,12 +37,16 @@
 							}
 						case PUT:
 							int index = tailIndex.getAndIncrement();
-							dataList.add(new DataSegmentValue(index, cmd.val));
+							DataSegmentValue dsv = new DataSegmentValue(index, cmd.val); 
+							dataList.add(dsv);
 							// run waiting peek and take
 							for (Command waitCmd : waitList) {
 								if (waitCmd.index < index) {
-									// TODO: make and send reply msg
-									
+									waitCmd.manager.replyQueue.put(new Reply(waitCmd.seq, index, cmd.val));
+									if (waitCmd.cmdType == CommandType.TAKE) { // delete data, if it run take cmd.
+										dataList.remove(dsv);
+										break;
+									}
 								}
 							}
 							break;
@@ -52,30 +57,25 @@
 							}
 							for (DataSegmentValue data : dataList) {
 								if (data.index > cmd.index) {
-									// TODO: make and send reply msg
-									
+									cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val));
 									break;
 								}
 							}
+							waitList.add(cmd);
 							break;
 						case TAKE:
 							if (cmd.index >= tailIndex.get()) {
 								waitList.add(cmd);
 								break;
 							}
-							boolean waitFlag = true;
 							for (DataSegmentValue data : dataList) {
 								if (data.index > cmd.index) {
-									// TODO: make and send reply msg
-									
-									
+									cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val));
 									dataList.remove(data);
-									waitFlag = false;
 									break;
 								}
 							}
-							if (waitFlag)
-								waitList.add(cmd);
+							waitList.add(cmd);
 							break;
 						case REMOVE:
 							// TODO: implements later
@@ -88,7 +88,7 @@
 				}
 			}
 		};
-		keyThread.run();
+		new Thread(keyThread).start();
 	};
 	
 }
--- a/src/alice/datasegment/DataSegmentManager.java	Wed Jan 11 23:28:02 2012 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Thu Jan 12 13:19:04 2012 +0900
@@ -1,12 +1,17 @@
 package alice.datasegment;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.msgpack.type.Value;
 
 import alice.codesegment.CodeSegment;
+import alice.codesegment.Reply;
 
 public abstract class DataSegmentManager {
-	ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
+	public ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
+	public ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();
+	public LinkedBlockingQueue<Reply> replyQueue = new LinkedBlockingQueue<Reply>();  
 	
 	public abstract void put(String key, Value val);
 	public abstract void update(String key, Value val);
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Wed Jan 11 23:28:02 2012 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Thu Jan 12 13:19:04 2012 +0900
@@ -5,11 +5,32 @@
 import org.msgpack.type.Value;
 
 import alice.codesegment.CodeSegment;
+import alice.codesegment.Reply;
 import alice.datasegment.CommandType;
 
 public class LocalDataSegmentManager extends DataSegmentManager {
 	
-	private AtomicInteger seq = new AtomicInteger(1); 
+	private AtomicInteger seq = new AtomicInteger(1);
+	private Runnable replyThread = new Runnable() {
+
+		@Override
+		public void run() {
+			while (true) {
+				try {
+					Reply reply = replyQueue.take();
+					
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+				
+			}
+			
+		}
+		
+	};
+	public LocalDataSegmentManager() {
+		new Thread(replyThread).start();
+	}
 	
 	private DataSegmentKey getDataSegmentKey(String key) {
 		DataSegmentKey newDataSegmentKey = new DataSegmentKey();
@@ -23,33 +44,37 @@
 	@Override
 	public void put(String key, Value val) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.PUT, val, 0, null, 0));
+		dataSegmentKey.addCommand(new Command(CommandType.PUT, val, 0, 0, this));
 	}
 
 	@Override
 	public void update(String key, Value val) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.UPDATE, val, 0, null, 0));
+		dataSegmentKey.addCommand(new Command(CommandType.UPDATE, val, 0, 0, this));
 	}
 
 	@Override
 	public void take(String key, int index, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		int seq = this.seq.getAndIncrement();
-		dataSegmentKey.addCommand(new Command(CommandType.TAKE, null, index, cs, seq));
+		Command cmd = new Command(CommandType.TAKE, null, index, seq, this);
+		seqHash.put(seq, cmd);
+		dataSegmentKey.addCommand(cmd);
 	}
 
 	@Override
 	public void peek(String key, int index, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		int seq = this.seq.getAndIncrement();
-		dataSegmentKey.addCommand(new Command(CommandType.PEEK, null, index, cs, seq));
+		Command cmd = new Command(CommandType.PEEK, null, index, seq, this);
+		seqHash.put(seq, cmd);
+		dataSegmentKey.addCommand(cmd);
 	}
 
 	@Override
 	public void remove(String key) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, 0, null, 0));
+		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, 0, 0, this));
 	}
 
 }