changeset 190:a85ff8dc16c1 working

add Object data
author one
date Thu, 07 Mar 2013 21:27:00 +0900
parents d2f5c885a367
children 60051454e443
files src/alice/codesegment/InputDataSegment.java src/alice/codesegment/OutputDataSegment.java src/alice/datasegment/Command.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/DataSegmentValue.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/Receiver.java src/alice/test/codesegment/local/StartCodeSegment.java src/alice/test/codesegment/local/TestCodeSegment.java
diffstat 10 files changed, 83 insertions(+), 16 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/codesegment/InputDataSegment.java	Thu Mar 07 12:31:05 2013 +0900
+++ b/src/alice/codesegment/InputDataSegment.java	Thu Mar 07 21:27:00 2013 +0900
@@ -59,6 +59,7 @@
 		receiver.index = val.index;
 		receiver.val = val.val;
 		receiver.from = val.from;
+		receiver.obj = val.obj;
 		receive();
 	}
 
--- a/src/alice/codesegment/OutputDataSegment.java	Thu Mar 07 12:31:05 2013 +0900
+++ b/src/alice/codesegment/OutputDataSegment.java	Thu Mar 07 21:27:00 2013 +0900
@@ -31,11 +31,7 @@
 	}
 	
 	public <T> void put(String key, T val) {
-		try {
-			DataSegment.getLocal().put(key, SingletonMessage.getInstance().unconvert(val));
-		} catch (IOException e) {
-			e.printStackTrace();
-		}
+		DataSegment.getLocal().put(key, val);
 	}
 	
 	public void update(String key, Value val) {
--- a/src/alice/datasegment/Command.java	Thu Mar 07 12:31:05 2013 +0900
+++ b/src/alice/datasegment/Command.java	Thu Mar 07 21:27:00 2013 +0900
@@ -16,6 +16,7 @@
 	public BlockingQueue<Command> replyQueue;
 	public CodeSegment cs;
 	public String reverseKey;
+	public Object obj;
 	
 	public Command(CommandType cmdType, Receiver receiver, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
 		this.type = cmdType;
@@ -28,6 +29,30 @@
 		this.cs = cs;
 		this.reverseKey = reverseKey;
 	}
+	public Command(CommandType cmdType, Receiver receiver, String key, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
+		this.type = cmdType;
+		this.receiver = receiver;
+		this.key = key;
+		this.obj = obj;
+		this.index = index;
+		this.seq = seq;
+		this.replyQueue = replyQueue;
+		this.cs = cs;
+		this.reverseKey = reverseKey;
+	}
+	
+	public Command(CommandType cmdType, Receiver receiver, String key, Value val, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
+		this.type = cmdType;
+		this.receiver = receiver;
+		this.key = key;
+		this.val = val;
+		this.index = index;
+		this.seq = seq;
+		this.replyQueue = replyQueue;
+		this.cs = cs;
+		this.reverseKey = reverseKey;
+	}
+	
 	
 	public String getCommandString() {
 		String csName = "null";
--- a/src/alice/datasegment/DataSegmentKey.java	Thu Mar 07 12:31:05 2013 +0900
+++ b/src/alice/datasegment/DataSegmentKey.java	Thu Mar 07 21:27:00 2013 +0900
@@ -33,14 +33,14 @@
 			}
 		case PUT:
 			int index = tailIndex.getAndIncrement();
-			DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); 
+			DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj,cmd.reverseKey); 
 			dataList.add(dsv);
 			// Process waiting peek and take commands
 			for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
 				Command waitCmd = iter.next();
 				if (waitCmd.index < index) {
 					try {
-						waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey));
+						waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, cmd.obj, index, waitCmd.seq, null, null, cmd.reverseKey));
 					} catch (InterruptedException e) {
 						e.printStackTrace();
 					}
@@ -61,7 +61,7 @@
 			for (DataSegmentValue data : dataList) {
 				if (data.index > cmd.index) {
 					try {
-						cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
+						cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
 					} catch (InterruptedException e) {
 						e.printStackTrace();
 					}
@@ -82,7 +82,7 @@
 				DataSegmentValue data = iter.next();
 				if (data.index > cmd.index) {
 					try {
-						cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
+						cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from));
 					} catch (InterruptedException e) {
 						e.printStackTrace();
 					}
@@ -96,7 +96,7 @@
 			break;
 		case FLIP:
 			index = tailIndex.getAndIncrement();
-			dataList.get(0).setDataSegmentValue(index, cmd.val, cmd.reverseKey);
+			dataList.get(0).setDataSegmentValue(index, cmd.val, cmd.obj, cmd.reverseKey);
 			// need to check waitList 
 			break;
 		case REMOVE:
--- a/src/alice/datasegment/DataSegmentManager.java	Thu Mar 07 12:31:05 2013 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Thu Mar 07 21:27:00 2013 +0900
@@ -28,7 +28,7 @@
 						continue;
 					}
 					seqHash.remove(reply.seq);
-					cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.reverseKey));
+					cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.obj, reply.reverseKey));
 					if (logger.isDebugEnabled())
 						logger.debug(reply.getCommandString() + " " + cmd.getCommandString());
 				} catch (InterruptedException e) {
--- a/src/alice/datasegment/DataSegmentValue.java	Thu Mar 07 12:31:05 2013 +0900
+++ b/src/alice/datasegment/DataSegmentValue.java	Thu Mar 07 21:27:00 2013 +0900
@@ -7,16 +7,25 @@
 	public int index;
 	public Value val;
 	public String from;
+	public Object obj;
 	
-	public DataSegmentValue(int index, Value val, String reverseKey) {
+	public DataSegmentValue(int index, Value val, Object obj,String reverseKey) {
+		this.index = index;
+		this.val = val;
+		this.from = reverseKey;
+		this.obj = obj;
+	}
+	
+	public DataSegmentValue(int index, Value val,String reverseKey) {
 		this.index = index;
 		this.val = val;
 		this.from = reverseKey;
 	}
-	
-	public void setDataSegmentValue(int index, Value val, String reverseKey){
+
+	public void setDataSegmentValue(int index, Value val, Object obj,String reverseKey){
 		this.index = index;
 		this.val = val;
+		this.obj = obj;
 		this.from = reverseKey;
 	}
 	
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Thu Mar 07 12:31:05 2013 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Thu Mar 07 21:27:00 2013 +0900
@@ -68,6 +68,14 @@
 			logger.debug(cmd.getCommandString());
 	}
 	
+	public void put(String key, Object obj) {
+		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
+		Command cmd = new Command(CommandType.PUT, null, key, obj, 0, 0, replyQueue, null, reverseKey);
+		addCommand(dataSegmentKey, cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+	
 	/**
 	 * Enqueue update command to the queue of each DataSegment key
 	 */
@@ -79,6 +87,14 @@
 		if (logger.isDebugEnabled())
 			logger.debug(cmd.getCommandString());
 	}
+	
+	public void update(String key, Object val) {
+		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
+		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, replyQueue, null, reverseKey);
+		addCommand(dataSegmentKey, cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
 
 	@Override
 	public void take(Receiver receiver, String key, int index, CodeSegment cs) {
@@ -128,4 +144,12 @@
 			logger.debug(cmd.getCommandString());
 	}
 	
+	public void flip(String key, Object val) {
+		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
+		Command cmd = new Command(CommandType.FLIP, null, key, val, 0, 0, replyQueue, null, reverseKey);
+		addCommand(dataSegmentKey, cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+	
 }
--- a/src/alice/datasegment/Receiver.java	Thu Mar 07 12:31:05 2013 +0900
+++ b/src/alice/datasegment/Receiver.java	Thu Mar 07 21:27:00 2013 +0900
@@ -19,6 +19,7 @@
 	public InputDataSegment ids;
 	public int index;
 	public Value val;
+	public Object obj;
 	public String from;
 	public CommandType type;
 	
--- a/src/alice/test/codesegment/local/StartCodeSegment.java	Thu Mar 07 12:31:05 2013 +0900
+++ b/src/alice/test/codesegment/local/StartCodeSegment.java	Thu Mar 07 21:27:00 2013 +0900
@@ -1,6 +1,9 @@
 package alice.test.codesegment.local;
 
+import java.util.Random;
+
 import alice.codesegment.CodeSegment;
+import alice.test.codesegment.local.bitonicsort.DataList;
 
 public class StartCodeSegment extends CodeSegment {
 
@@ -17,8 +20,14 @@
 		ods.update("local", "key1", 0);  // bind string data to datasegment local.key1
 										 // this startup TestCodeSegment.  
 		*/
+		DataList list = new DataList();
+		int size = 10;
+		for (int i = 0; i < size; i++){
+			Random rnd = new Random();
+			list.table.add(rnd.nextInt(100000));
+		}
 		t = System.currentTimeMillis();
-		ods.put("local", "key1", 0);
+		ods.put("key1", list);
 		//ods.put("local", "key1", 1);
 		//ods.put("local", "key1", 2);
 		//ods.put("local", "key1", 3);
--- a/src/alice/test/codesegment/local/TestCodeSegment.java	Thu Mar 07 12:31:05 2013 +0900
+++ b/src/alice/test/codesegment/local/TestCodeSegment.java	Thu Mar 07 21:27:00 2013 +0900
@@ -19,10 +19,12 @@
 	
 	@Override
 	public void run() {
-		if(count > 10000){
+		if(count > 1){
 			System.out.println(System.currentTimeMillis() - StartCodeSegment.t);
 			System.exit(1);
 		}
+		//System.out.println(arg1.obj);
+		//System.out.println(arg1.val);
 		
 		//ods.update("key1",arg1.asInteger()+1);
 		new TestCodeSegment();