changeset 198:f151dea22b2c working

add flip api
author sugi
date Tue, 19 Mar 2013 01:25:09 +0900
parents 2b28d3c16a58
children 15b68b65f8a4
files .settings/org.eclipse.core.resources.prefs src/alice/codesegment/InputDataSegment.java src/alice/datasegment/Command.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/DataSegmentValue.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/Receiver.java src/alice/test/codesegment/api/FlipCodeSegment.java src/alice/test/codesegment/api/FlipTest.java src/alice/test/codesegment/api/StartCodeSegment.java src/alice/test/codesegment/api/TakeCodeSegment.java src/alice/test/codesegment/api/TestApiAlice.java
diffstat 13 files changed, 150 insertions(+), 50 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/.settings/org.eclipse.core.resources.prefs	Tue Mar 19 01:25:09 2013 +0900
@@ -0,0 +1,2 @@
+eclipse.preferences.version=1
+encoding//src/alice/test/codesegment/local/bitonicsort/SortTest.java=UTF-8
--- a/src/alice/codesegment/InputDataSegment.java	Fri Mar 15 17:25:25 2013 +0900
+++ b/src/alice/codesegment/InputDataSegment.java	Tue Mar 19 01:25:09 2013 +0900
@@ -2,6 +2,8 @@
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.msgpack.type.Value;
+
 import alice.datasegment.CommandType;
 import alice.datasegment.DataSegment;
 import alice.datasegment.DataSegmentValue;
@@ -18,6 +20,8 @@
 	public CodeSegment cs;
 	private AtomicInteger count = new AtomicInteger(1); // 1 for no input data segments
 	private AtomicInteger keyCount = new AtomicInteger(0); // number of DataSegments
+
+	private DataSegmentValue dsv;
 	
 	public InputDataSegment(CodeSegment cs) {
 		this.cs = cs;
@@ -54,12 +58,24 @@
 	public void take(Receiver receiver, String key, int index) {
 		DataSegment.getLocal().take(receiver, key, index, cs);
 	}
+	
+	public void flip(Receiver receiver, String key){
+		
+	}
+	
+	public void flip(Receiver receiver, Value val, Object obj){
+		//int index = DataSegment.getLocal().getDataSegmentKey(receiver.key).getIndex();
+		//dsv.setValue(index, val, obj);
+		DataSegment.getLocal().flip(receiver.key, val, obj, dsv);
+		
+	}
 
 	public void reply(Receiver receiver, DataSegmentValue val) {
 		receiver.index = val.index;
 		receiver.val = val.val;
 		receiver.from = val.from;
 		receiver.obj = val.obj;
+		setDataSegmentValue(val);
 		receive();
 	}
 
@@ -88,5 +104,9 @@
 	public Receiver create(CommandType type) {
 		return new Receiver(this, type);
 	}
+	
+	private void setDataSegmentValue(DataSegmentValue dsv){
+		this.dsv = dsv;
+	}
 
 }
--- a/src/alice/datasegment/Command.java	Fri Mar 15 17:25:25 2013 +0900
+++ b/src/alice/datasegment/Command.java	Tue Mar 19 01:25:09 2013 +0900
@@ -1,7 +1,6 @@
 package alice.datasegment;
 
 import java.util.concurrent.BlockingQueue;
-
 import org.msgpack.type.Value;
 
 import alice.codesegment.CodeSegment;
@@ -20,9 +19,9 @@
 	public DataSegmentValue dsv;
 	
 	public Command(CommandType cmdType, int seq, DataSegmentValue dsv){
-		this.type = cmdType;
-		this.seq = seq;
-		this.dsv = dsv;
+		this.type=cmdType;
+		this.seq=seq;
+		this.dsv=dsv;
 	}
 	
 	public Command(CommandType cmdType, Receiver receiver, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
@@ -62,6 +61,8 @@
 	}
 	
 	
+	
+
 	public String getCommandString() {
 		String csName = "null";
 		if (cs != null) {
--- a/src/alice/datasegment/DataSegmentKey.java	Fri Mar 15 17:25:25 2013 +0900
+++ b/src/alice/datasegment/DataSegmentKey.java	Tue Mar 19 01:25:09 2013 +0900
@@ -17,8 +17,8 @@
 	private ArrayList<Command> waitList = new ArrayList<Command>();
 	private AtomicInteger tailIndex = new AtomicInteger(1);
 	
-	public AtomicInteger getTailIndex(){
-		return tailIndex;
+	public int getIndex(){
+		return tailIndex.getAndIncrement();
 	}
 	
 	public ArrayList<DataSegmentValue> getDataList(){
@@ -40,7 +40,8 @@
 				Command waitCmd = iter.next();
 				if (waitCmd.index < index) {
 					try {
-						waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, cmd.obj, index, waitCmd.seq, null, null, cmd.reverseKey));
+						//waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, cmd.obj, index, waitCmd.seq, null, null, cmd.reverseKey));
+						waitCmd.replyQueue.put(new Command(CommandType.REPLY, waitCmd.seq, dsv));
 					} catch (InterruptedException e) {
 						e.printStackTrace();
 					}
@@ -61,7 +62,8 @@
 			for (DataSegmentValue data : dataList) {
 				if (data.index > cmd.index) {
 					try {
-						cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
+						//cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
+						cmd.replyQueue.put(new Command(CommandType.REPLY, cmd.seq, data));
 					} catch (InterruptedException e) {
 						e.printStackTrace();
 					}
@@ -82,7 +84,8 @@
 				DataSegmentValue data = iter.next();
 				if (data.index > cmd.index) {
 					try {
-						cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
+						//cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
+						cmd.replyQueue.put(new Command(CommandType.REPLY, cmd.seq, data));
 					} catch (InterruptedException e) {
 						e.printStackTrace();
 					}
@@ -95,9 +98,24 @@
 				waitList.add(cmd);
 			break;
 		case FLIP:
-			index = tailIndex.getAndIncrement();
-			dataList.set(0, new DataSegmentValue(index, cmd.val, cmd.obj, cmd.reverseKey));
-			// need to check waitList 
+			index = cmd.dsv.index;
+			// need to check waitList
+			for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
+				Command waitCmd = iter.next();
+				if (waitCmd.index < index) {
+					try {
+						//waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, cmd.obj, index, waitCmd.seq, null, null, cmd.reverseKey));
+						waitCmd.replyQueue.put(new Command(CommandType.REPLY, waitCmd.seq, cmd.dsv));
+					} catch (InterruptedException e) {
+						e.printStackTrace();
+					}
+					iter.remove();
+					if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
+						dataList.remove(cmd.dsv);
+						break;
+					}
+				}
+			}
 			break;
 		case REMOVE:
 			// TODO: implements later
--- a/src/alice/datasegment/DataSegmentManager.java	Fri Mar 15 17:25:25 2013 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Tue Mar 19 01:25:09 2013 +0900
@@ -28,7 +28,8 @@
 						continue;
 					}
 					seqHash.remove(reply.seq);
-					cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.obj, reply.reverseKey));
+					//cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.obj, reply.reverseKey));
+					cmd.cs.ids.reply(cmd.receiver, reply.dsv);
 					if (logger.isDebugEnabled())
 						logger.debug(reply.getCommandString() + " " + cmd.getCommandString());
 				} catch (InterruptedException e) {
--- a/src/alice/datasegment/DataSegmentValue.java	Fri Mar 15 17:25:25 2013 +0900
+++ b/src/alice/datasegment/DataSegmentValue.java	Tue Mar 19 01:25:09 2013 +0900
@@ -21,12 +21,11 @@
 		this.val = val;
 		this.from = reverseKey;
 	}
-
-	public void setDataSegmentValue(int index, Value val, Object obj, String reverseKey){
+	
+	public synchronized void setValue(int index, Value val, Object obj){
 		this.index = index;
 		this.val = val;
 		this.obj = obj;
-		this.from = reverseKey;
 	}
-	
+
 }
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Fri Mar 15 17:25:25 2013 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Tue Mar 19 01:25:09 2013 +0900
@@ -136,18 +136,12 @@
 		
 	}
 
-	public void flip(String key, Value val) {
+	public void flip(String key ,Value val, Object obj, DataSegmentValue dsv) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		Command cmd = new Command(CommandType.FLIP, null, key, val, 0, 0, replyQueue, null, reverseKey);
-		addCommand(dataSegmentKey, cmd);
-		if (logger.isDebugEnabled())
-			logger.debug(cmd.getCommandString());
-	}
-	
-	public void flip(String key, Object val) {
-		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		Command cmd = new Command(CommandType.FLIP, null, key, val, 0, 0, replyQueue, null, reverseKey);
-		addCommand(dataSegmentKey, cmd);
+		int index = dataSegmentKey.getIndex();
+		dsv.setValue(index, val, obj);
+		Command cmd = new Command(CommandType.FLIP, 0, dsv);
+		//addCommand(dataSegmentKey, cmd);
 		if (logger.isDebugEnabled())
 			logger.debug(cmd.getCommandString());
 	}
--- a/src/alice/datasegment/Receiver.java	Fri Mar 15 17:25:25 2013 +0900
+++ b/src/alice/datasegment/Receiver.java	Tue Mar 19 01:25:09 2013 +0900
@@ -4,7 +4,6 @@
 
 import org.msgpack.type.ArrayValue;
 import org.msgpack.type.Value;
-import org.msgpack.type.ValueFactory;
 import org.msgpack.type.ValueType;
 
 import alice.codesegment.InputDataSegment;
@@ -32,26 +31,12 @@
 		ids.regist();
 	}
 	
-	// for same key 
-	
 	public void flip(Value val){
-		DataSegment.getLocal().flip(this.key, val);
-	}
-	
-	public void flip(int val){
-		DataSegment.getLocal().flip(this.key, ValueFactory.createIntegerValue(val));
+		ids.flip(this, val, null);
 	}
 	
-	public void flip(String val){
-		DataSegment.getLocal().flip(this.key, ValueFactory.createRawValue(val));
-	}
-	
-	public void flip(byte[] val){
-		DataSegment.getLocal().flip(this.key, ValueFactory.createRawValue(val, true));
-	}
-	
-	public <T> void flip(T val) {
-		DataSegment.getLocal().flip(key, val);
+	public void flip(Object obj) {
+		ids.flip(this, null, obj);
 	}
 	
 	public void setKey(String managerKey, String key) {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/codesegment/api/FlipCodeSegment.java	Tue Mar 19 01:25:09 2013 +0900
@@ -0,0 +1,26 @@
+package alice.test.codesegment.api;
+
+import alice.codesegment.CodeSegment;
+
+public class FlipCodeSegment extends CodeSegment{
+	
+	private String key;
+	public FlipCodeSegment(String _key){
+		this.key = _key;
+	}
+	
+	@Override
+	public void run() {
+		Integer num = new Integer(0);
+		ods.put(key, num, false);
+		//System.out.println("Key is " +key);
+		new FlipTest(key);
+		//for (int i = 0; i < 1000000; i++){
+		//ods.put(key, num, false);
+		//System.out.println("put");
+		//}
+		
+		
+	}
+	
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/codesegment/api/FlipTest.java	Tue Mar 19 01:25:09 2013 +0900
@@ -0,0 +1,48 @@
+package alice.test.codesegment.api;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.Receiver;
+
+public class FlipTest extends CodeSegment{
+	
+	private Receiver arg1 = ids.create(CommandType.PEEK);
+	public static long t = 0;
+	public static boolean flag = false;
+	public static int count = 0;
+	
+	public FlipTest(String key){
+		arg1.setKey(key);
+	}
+	
+	public FlipTest(String key, int index){
+		arg1.setKey(key, index);
+	}
+
+	@Override
+	public void run() {
+		if (flag){
+			System.out.println(System.currentTimeMillis() - t);
+			System.out.println(arg1.obj);
+			if (count >= 100) System.exit(0);
+			flag = false;
+			count++;
+			new FlipCodeSegment(Long.toString(t)).execute();
+			
+		} else {
+			t = System.currentTimeMillis();
+			
+			for (int i = 0;i<1000000;i++){
+				Integer num = i;
+				arg1.flip(num);
+				//System.out.println("flip");
+				//ods.update(arg1.key, num, false);
+			}
+
+			flag = true;
+			new FlipTest(arg1.key);
+			//new FlipTest(arg1.key ,1000000);
+		}
+	}
+
+}
--- a/src/alice/test/codesegment/api/StartCodeSegment.java	Fri Mar 15 17:25:25 2013 +0900
+++ b/src/alice/test/codesegment/api/StartCodeSegment.java	Tue Mar 19 01:25:09 2013 +0900
@@ -24,8 +24,10 @@
 				
 			} else if ("-take".equals(args[i])){
 				new PutCodeSegment().execute();
-				new TakeCodeSegment();
+				new TakeCodeSegment("num");
 				
+			} else if ("-flip".equals(args[i])){
+				new FlipCodeSegment("key").execute();
 			}
 			
 			
--- a/src/alice/test/codesegment/api/TakeCodeSegment.java	Fri Mar 15 17:25:25 2013 +0900
+++ b/src/alice/test/codesegment/api/TakeCodeSegment.java	Tue Mar 19 01:25:09 2013 +0900
@@ -8,12 +8,12 @@
 	
 	Receiver ds1 = ids.create(CommandType.TAKE);
 	
-	public TakeCodeSegment(){
-		this.ds1.setKey("num");
+	public TakeCodeSegment(String key){
+		this.ds1.setKey(key);
 	}	
 	@Override
 	public void run() {
-		new TakeCodeSegment();
+		new TakeCodeSegment(ds1.key);
 		
 	}
 }
--- a/src/alice/test/codesegment/api/TestApiAlice.java	Fri Mar 15 17:25:25 2013 +0900
+++ b/src/alice/test/codesegment/api/TestApiAlice.java	Tue Mar 19 01:25:09 2013 +0900
@@ -1,11 +1,15 @@
 package alice.test.codesegment.api;
 
+import alice.daemon.AliceDaemon;
+import alice.daemon.Config;
+
 public class TestApiAlice {
 
 	/**
 	 * @param args
 	 */
 	public static void main(String[] args) {
+		new AliceDaemon(new Config(args)).listen();
 		new StartCodeSegment(args).execute();
 	}