changeset 18:72dd27d952b0

change InputDataSegment API
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Sun, 15 Jan 2012 16:03:11 +0900
parents bb075e103cd3
children e7867328a2fb
files src/alice/codesegment/InputDataSegment.java src/alice/daemon/AcceptThread.java src/alice/datasegment/Command.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/DataSegmentReceiver.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegmentManager.java src/alice/test/codesegment/RemoteIncrement.java src/alice/test/codesegment/RemoteStartCodeSegment.java src/alice/test/codesegment/StartCodeSegment.java src/alice/test/codesegment/TestCodeSegment.java
diffstat 12 files changed, 81 insertions(+), 55 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/codesegment/InputDataSegment.java	Sun Jan 15 15:18:01 2012 +0900
+++ b/src/alice/codesegment/InputDataSegment.java	Sun Jan 15 16:03:11 2012 +0900
@@ -1,14 +1,13 @@
 package alice.codesegment;
 
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import alice.datasegment.DataSegment;
+import alice.datasegment.DataSegmentReceiver;
 import alice.datasegment.DataSegmentValue;
 
 public class InputDataSegment {
 	
-	private ConcurrentHashMap<String, DataSegmentValue> inputDataSegments = new ConcurrentHashMap<String, DataSegmentValue>();
 	private CodeSegment cs;
 	private AtomicInteger count = new AtomicInteger(1); // for execute()
 	
@@ -16,26 +15,27 @@
 		this.cs = cs;
 	}
 	
-	public void peek(String argKey, String managerKey, String key) {
-		peek(argKey, managerKey, key, 0);
+	public void peek(DataSegmentReceiver receiver, String managerKey, String key) {
+		peek(receiver, managerKey, key, 0);
 	}
 	
-	public void peek(String argKey, String managerKey, String key, int index) {
-		DataSegment.get(managerKey).peek(argKey, key, index, cs);
+	public void peek(DataSegmentReceiver receiver, String managerKey, String key, int index) {
+		DataSegment.get(managerKey).peek(receiver, key, index, cs);
 		count.getAndIncrement();
 	}
 
-	public void take(String argKey,String managerKey, String key) {
-		take(argKey, managerKey, key, 0);
+	public void take(DataSegmentReceiver receiver, String managerKey, String key) {
+		take(receiver, managerKey, key, 0);
 	}
 	
-	public void take(String argKey, String managerKey, String key, int index) {
-		DataSegment.get(managerKey).take(argKey, key, index, cs);
+	public void take(DataSegmentReceiver receiver, String managerKey, String key, int index) {
+		DataSegment.get(managerKey).take(receiver, key, index, cs);
 		count.getAndIncrement();
 	}
 
-	public void reply(String key, DataSegmentValue val) {
-		inputDataSegments.put(key, val);
+	public void reply(DataSegmentReceiver receiver, DataSegmentValue val) {
+		receiver.index = val.index;
+		receiver.val = val.val;
 		execute();
 	}
 	
@@ -48,8 +48,5 @@
 			}
 		}
 	}
-	
-	public DataSegmentValue get(String argKey) {
-		return inputDataSegments.get(argKey);
-	}
+
 }
--- a/src/alice/daemon/AcceptThread.java	Sun Jan 15 15:18:01 2012 +0900
+++ b/src/alice/daemon/AcceptThread.java	Sun Jan 15 16:03:11 2012 +0900
@@ -4,7 +4,6 @@
 import java.net.ServerSocket;
 import java.net.Socket;
 
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 import alice.datasegment.DataSegment;
--- a/src/alice/datasegment/Command.java	Sun Jan 15 15:18:01 2012 +0900
+++ b/src/alice/datasegment/Command.java	Sun Jan 15 16:03:11 2012 +0900
@@ -9,16 +9,16 @@
 public class Command {
 	public CommandType type;
 	public String key;
-	public String argKey;
+	public DataSegmentReceiver receiver;
 	public Value val;
 	public int index;
 	public int seq;
 	public BlockingQueue<Command> replyQueue;
 	public CodeSegment cs;
 	
-	public Command(CommandType cmdType, String argKey, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) {
+	public Command(CommandType cmdType, DataSegmentReceiver receiver, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) {
 		this.type = cmdType;
-		this.argKey = argKey;
+		this.receiver = receiver;
 		this.key = key;
 		this.val = val;
 		this.index = index;
--- a/src/alice/datasegment/DataSegmentKey.java	Sun Jan 15 15:18:01 2012 +0900
+++ b/src/alice/datasegment/DataSegmentKey.java	Sun Jan 15 16:03:11 2012 +0900
@@ -2,7 +2,6 @@
 
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
--- a/src/alice/datasegment/DataSegmentManager.java	Sun Jan 15 15:18:01 2012 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Sun Jan 15 16:03:11 2012 +0900
@@ -22,7 +22,7 @@
 				try {
 					Command reply = replyQueue.take();
 					Command cmd = seqHash.get(reply.seq);
-					cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val));
+					cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val));
 				} catch (InterruptedException e) {
 					e.printStackTrace();
 				}
@@ -33,14 +33,14 @@
 	
 	public abstract void put(String key, Value val);
 	public abstract void update(String key, Value val);
-	public void take(String argKey, String key, CodeSegment cs) {
-		take(argKey, key, 0, cs);
+	public void take(DataSegmentReceiver receiver, String key, CodeSegment cs) {
+		take(receiver, key, 0, cs);
 	}
-	public abstract void take(String argKey, String key, int index, CodeSegment cs);
-	public void peek(String argKey, String key, CodeSegment cs) {
-		peek(argKey, key, 0, cs);
+	public abstract void take(DataSegmentReceiver receiver, String key, int index, CodeSegment cs);
+	public void peek(DataSegmentReceiver receiver, String key, CodeSegment cs) {
+		peek(receiver, key, 0, cs);
 	}
-	public abstract void peek(String argKey, String key, int index, CodeSegment cs);
+	public abstract void peek(DataSegmentReceiver receiver, String key, int index, CodeSegment cs);
 	public abstract void remove(String key);
 	
 }
--- a/src/alice/datasegment/DataSegmentReceiver.java	Sun Jan 15 15:18:01 2012 +0900
+++ b/src/alice/datasegment/DataSegmentReceiver.java	Sun Jan 15 16:03:11 2012 +0900
@@ -1,5 +1,34 @@
 package alice.datasegment;
 
+import org.msgpack.type.Value;
+
+import alice.codesegment.InputDataSegment;
+
 public class DataSegmentReceiver {
+	public InputDataSegment ids;
+	public int index;
+	public Value val;
+	public CommandType type;
+	
+	
+	public DataSegmentReceiver(InputDataSegment ids, CommandType type) {
+		this.ids = ids;
+		this.type = type;
+	}
+	
+	public void setKey(String managerKey, String key) {
+		setKey(managerKey, key, 0);
+	}
 
+	public void setKey(String managerKey, String key, int index) {
+		switch (type) {
+		case PEEK:
+			ids.peek(this, managerKey, key, index);
+			break;
+		case TAKE:
+			ids.take(this, managerKey, key, index);
+			break;
+		}
+	}
+	
 }
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Sun Jan 15 15:18:01 2012 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Sun Jan 15 16:03:11 2012 +0900
@@ -37,19 +37,19 @@
 	}
 
 	@Override
-	public void take(String argKey, String key, int index, CodeSegment cs) {
+	public void take(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.TAKE, argKey, null, null, index, seq, replyQueue, cs);
+		Command cmd = new Command(CommandType.TAKE, receiver, null, null, index, seq, replyQueue, cs);
 		seqHash.put(seq, cmd);
 		dataSegmentKey.addCommand(cmd);
 	}
 
 	@Override
-	public void peek(String argKey, String key, int index, CodeSegment cs) {
+	public void peek(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.PEEK, argKey, null, null, index, seq, replyQueue, cs);
+		Command cmd = new Command(CommandType.PEEK, receiver, null, null, index, seq, replyQueue, cs);
 		seqHash.put(seq, cmd);
 		dataSegmentKey.addCommand(cmd);
 	}
--- a/src/alice/datasegment/RemoteDataSegmentManager.java	Sun Jan 15 15:18:01 2012 +0900
+++ b/src/alice/datasegment/RemoteDataSegmentManager.java	Sun Jan 15 16:03:11 2012 +0900
@@ -29,17 +29,17 @@
 	}
 
 	@Override
-	public void take(String argKey, String key, int index, CodeSegment cs) {
+	public void take(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) {
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.TAKE, argKey, key, null, index, seq, replyQueue, cs);
+		Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs);
 		seqHash.put(seq, cmd);
 		connection.sendCommand(cmd);		
 	}
 
 	@Override
-	public void peek(String argKey, String key, int index, CodeSegment cs) {
+	public void peek(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) {
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.PEEK, argKey, key, null, index, seq, replyQueue, cs);
+		Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs);
 		seqHash.put(seq, cmd);
 		connection.sendCommand(cmd);
 	}
--- a/src/alice/test/codesegment/RemoteIncrement.java	Sun Jan 15 15:18:01 2012 +0900
+++ b/src/alice/test/codesegment/RemoteIncrement.java	Sun Jan 15 16:03:11 2012 +0900
@@ -3,19 +3,21 @@
 import org.msgpack.type.ValueFactory;
 
 import alice.codesegment.CodeSegment;
-import alice.datasegment.DataSegmentValue;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegmentReceiver;
 
 public class RemoteIncrement extends CodeSegment {
 
+	public DataSegmentReceiver num = new DataSegmentReceiver(ids, CommandType.TAKE);
+
 	@Override
 	public void run() {
-		DataSegmentValue data = ids.get("num");
-		int num = data.val.asIntegerValue().getInt();
+		int num = this.num.val.asIntegerValue().getInt();
 		System.out.println("[CodeSegment] " + num++);
 		if (num == 10) System.exit(0);
 
-		CodeSegment cs = new RemoteIncrement();
-		cs.ids.take("num", "remote", "num");
+		RemoteIncrement cs = new RemoteIncrement();
+		cs.num.setKey("remote", "num");
 		cs.ids.execute();
 		
 		ods.put("local", "num", ValueFactory.createIntegerValue(num));
--- a/src/alice/test/codesegment/RemoteStartCodeSegment.java	Sun Jan 15 15:18:01 2012 +0900
+++ b/src/alice/test/codesegment/RemoteStartCodeSegment.java	Sun Jan 15 16:03:11 2012 +0900
@@ -6,11 +6,11 @@
 import alice.codesegment.CodeSegment;
 
 public class RemoteStartCodeSegment extends CodeSegment {
-
+	
 	@Override
 	public void run() {
-		CodeSegment cs = new RemoteIncrement();
-		cs.ids.take("num", "remote", "num");
+		RemoteIncrement cs = new RemoteIncrement();
+		cs.num.setKey("remote", "num");
 		cs.ids.execute();
 		
 		Value num = ValueFactory.createIntegerValue(0);
--- a/src/alice/test/codesegment/StartCodeSegment.java	Sun Jan 15 15:18:01 2012 +0900
+++ b/src/alice/test/codesegment/StartCodeSegment.java	Sun Jan 15 16:03:11 2012 +0900
@@ -11,8 +11,8 @@
 	public void run() {
 		System.out.println("run StartCodeSegment");
 		
-		CodeSegment cs = new TestCodeSegment();
-		cs.ids.peek("arg1", "local", "key1");
+		TestCodeSegment cs = new TestCodeSegment();
+		cs.arg1.setKey("local", "key1");
 		cs.ids.execute();
 		System.out.println("create TestCodeSegment");
 		
--- a/src/alice/test/codesegment/TestCodeSegment.java	Sun Jan 15 15:18:01 2012 +0900
+++ b/src/alice/test/codesegment/TestCodeSegment.java	Sun Jan 15 16:03:11 2012 +0900
@@ -4,26 +4,26 @@
 import org.msgpack.type.ValueFactory;
 
 import alice.codesegment.CodeSegment;
-import alice.datasegment.DataSegmentValue;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegmentReceiver;
 
 public class TestCodeSegment extends CodeSegment {
 	
-	DataSegmentValue arg1;
+	DataSegmentReceiver arg1 = new DataSegmentReceiver(ids, CommandType.PEEK);
 	
 	@Override
 	public void run() {
-		DataSegmentValue data = ids.get("arg1");
-		System.out.println("index = " + data.index);
-		System.out.println("data = " + data.val);
-		System.out.println(data.val.getType());
+		System.out.println("index = " + arg1.index);
+		System.out.println("data = " + arg1.val);
+		System.out.println(arg1.val.getType());
 		
-		if (data.index == 10) {
+		if (arg1.index == 10) {
 			System.exit(0);
 			return;
 		}
 		
-		CodeSegment cs = new TestCodeSegment();
-		cs.ids.peek("arg1", "local", "key1", data.index);
+		TestCodeSegment cs = new TestCodeSegment();
+		cs.arg1.setKey("local", "key1", arg1.index);
 		cs.ids.execute();
 		
 		Value val = ValueFactory.createRawValue("String data");