changeset 264:b4690114a0cd

refactor API
author sugi
date Tue, 13 Aug 2013 06:02:55 +0900
parents 106cc1cfa595
children 41312b857f34
files src/alice/codesegment/InputDataSegment.java src/alice/codesegment/OutputDataSegment.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegmentManager.java
diffstat 5 files changed, 99 insertions(+), 141 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/codesegment/InputDataSegment.java	Tue Aug 06 02:14:40 2013 +0900
+++ b/src/alice/codesegment/InputDataSegment.java	Tue Aug 13 06:02:55 2013 +0900
@@ -26,7 +26,7 @@
 	public void quickPeek(Receiver receiver) {
 		cs.list.add(receiver);
 		if (receiver.managerKey==null){
-			DataSegment.getLocal().quickPeek(receiver, cs);
+			DataSegment.getLocal().peek(receiver, cs);
 		} else {
 			DataSegment.get(receiver.managerKey).quickPeek(receiver ,cs);
 		}
--- a/src/alice/codesegment/OutputDataSegment.java	Tue Aug 06 02:14:40 2013 +0900
+++ b/src/alice/codesegment/OutputDataSegment.java	Tue Aug 13 06:02:55 2013 +0900
@@ -1,11 +1,5 @@
 package alice.codesegment;
 
-import java.io.IOException;
-
-
-import org.msgpack.type.Value;
-import org.msgpack.type.ValueFactory;
-
 import alice.datasegment.DataSegment;
 import alice.datasegment.Receiver;
 
@@ -17,70 +11,31 @@
 
 	public void flip(Receiver receiver) {
 		receiver.managerKey=null;
-		DataSegment.getLocal().putObject(receiver.key, receiver.getObj());
-	}
-
-	public void put(String key, String val) {
-		DataSegment.getLocal().putObject(key, val);
+		DataSegment.getLocal().put(receiver.key, receiver.getObj());
 	}
 
 	public void put(String key, byte[] val) {
-		DataSegment.getLocal().putObject(key, val);
-	}
-
-	public void put(String key, int val) {
-		DataSegment.getLocal().putObject(key, val);
+		DataSegment.getLocal().put(key, val);
 	}
 
 	public <T> void put(String key, T val) {
-		DataSegment.getLocal().putObject(key, val);
-	}
-
-	public void update(String key, String val) {
-		DataSegment.getLocal().updateObject(key, val);
+		DataSegment.getLocal().put(key, val);
 	}
 
 	public void update(String key, byte[] val) {
-		DataSegment.getLocal().updateObject(key, val);
-	}
-
-	public void update(String key, int val) {
-		DataSegment.getLocal().updateObject(key, val);
+		DataSegment.getLocal().update(key, val);
 	}
 
 	public <T> void update(String key, T val) {
-		DataSegment.getLocal().updateObject(key, val);
+		DataSegment.getLocal().update(key, val);
 	}
 
-
 	/**
 	 * for remote
 	 */
-
-	public void put(String managerKey, String key, Value val) {
-		DataSegment.get(managerKey).put(key, val);
-	}
-
-	public void put(String managerKey, String key, String val) {
-		if (!managerKey.equals("local")){
-			DataSegment.get(managerKey).put(key, ValueFactory.createRawValue(val));
-		} else {
-			put(key, val);
-		}
-
-	}
-
 	public void put(String managerKey, String key, byte[] val) {
 		if (!managerKey.equals("local")){
-			DataSegment.get(managerKey).put(key, ValueFactory.createRawValue(val, true));
-		} else {
-			put(key, val);
-		}
-	}
-
-	public void put(String managerKey, String key, int val) {
-		if (!managerKey.equals("local")){
-			DataSegment.get(managerKey).put(key, ValueFactory.createIntegerValue(val));
+			DataSegment.get(managerKey).put(key,val);
 		} else {
 			put(key, val);
 		}
@@ -88,39 +43,15 @@
 
 	public <T> void put(String managerKey, String key, T val) {
 		if (!managerKey.equals("local")){
-			try {
-				DataSegment.get(managerKey).put(key, SingletonMessage.getInstance().unconvert(val));
-			} catch (IOException e) {
-				e.printStackTrace();
-			}
+			DataSegment.get(managerKey).put(key, val);
 		} else {
 			put(key, val);
 		}
 	}
 
-	public void update(String managerKey, String key, Value val) {
-		DataSegment.get(managerKey).update(key, val);
-	}
-
-	public void update(String managerKey, String key, String val) {
-		if (!managerKey.equals("local")){
-			DataSegment.get(managerKey).update(key, ValueFactory.createRawValue(val));
-		} else {
-			update(key, val);
-		}
-	}
-
 	public void update(String managerKey, String key, byte[] val) {
 		if (!managerKey.equals("local")){
-			DataSegment.get(managerKey).update(key, ValueFactory.createRawValue(val, true));
-		} else {
-			update(key, val);
-		}
-	}
-
-	public void update(String managerKey, String key, int val) {
-		if (!managerKey.equals("local")){
-			DataSegment.get(managerKey).update(key, ValueFactory.createIntegerValue(val));
+			DataSegment.get(managerKey).update(key, val);
 		} else {
 			update(key, val);
 		}
@@ -128,11 +59,7 @@
 
 	public <T> void update(String managerKey, String key, T val) {
 		if (!managerKey.equals("local")){
-			try {
-				DataSegment.get(managerKey).update(key, SingletonMessage.getInstance().unconvert(val));
-			} catch (IOException e) {
-				e.printStackTrace();
-			}
+			DataSegment.get(managerKey).update(key, val);
 		} else {
 			update(key, val);
 		}
@@ -157,9 +84,5 @@
 	public void close(String managerKey) {
 		DataSegment.get(managerKey).close();
 	}
-
-
-
-
-
+	
 }
--- a/src/alice/datasegment/DataSegmentManager.java	Tue Aug 06 02:14:40 2013 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Tue Aug 13 06:02:55 2013 +0900
@@ -5,7 +5,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.log4j.Logger;
-import org.msgpack.type.Value;
 
 import alice.codesegment.CodeSegment;
 
@@ -51,19 +50,25 @@
 		}
 	}
 	
-	public abstract void put(String key, Value val);
-	public abstract void update(String key, Value val);
+	public abstract void put(String key, byte[] val);
+	public abstract void put(String key, Object val);
+	public abstract void update(String key, byte[] val);
+	public abstract void update(String key, Object val);
 	public abstract void take(Receiver receiver, CodeSegment cs);
 	public abstract void peek(Receiver receiver, CodeSegment cs);
+
 	public abstract void remove(String key);
 	public abstract void close();
 	public abstract void finish();
 	
-	public abstract void quickPut(String key, Value val);
-	public abstract void quickUpdate(String key, Value val);
+	public abstract void quickPut(String key, byte[] val);
+	public abstract void quickPut(String key, Object val);
+	public abstract void quickUpdate(String key, byte[] val);
+	public abstract void quickUpdate(String key, Object val);
 	
 	public abstract void quickPeek(Receiver receiver, CodeSegment cs);
 	public abstract void quickTake(Receiver receiver, CodeSegment cs);
 
 	
+		
 }
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Tue Aug 06 02:14:40 2013 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Tue Aug 13 06:02:55 2013 +0900
@@ -6,8 +6,6 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
-import org.msgpack.type.Value;
-
 import alice.codesegment.CodeSegment;
 
 public class LocalDataSegmentManager extends DataSegmentManager {
@@ -62,7 +60,12 @@
 	}
 	
 	@Override
-	public void put(String key, Value val) {
+	public void put(String key, byte[] val) {
+		put(key, val);
+	}
+	
+	@Override
+	public void put(String key, Object val) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey);
 		dataSegmentKey.runCommand(cmd);
@@ -70,19 +73,27 @@
 			logger.debug(cmd.getCommandString());
 	}
 	
-	public void putObject(String key, Object obj) {
-		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		Command cmd = new Command(CommandType.PUT, null, key, obj, 0, 0, null, null, reverseKey);
-		dataSegmentKey.runCommand(cmd);
-		if (logger.isDebugEnabled())
-			logger.debug(cmd.getCommandString());
+	@Override
+	public void quickPut(String key, byte[] val) {
+		put(key, val);
+	}
+	
+	@Override
+	public void quickPut(String key, Object val) {
+		put(key, val);
 	}
 	
 	/**
 	 * Enqueue update command to the queue of each DataSegment key
 	 */
+	
 	@Override
-	public void update(String key, Value val) {
+	public void update(String key, byte[] val) {
+		update(key, val);
+	}
+	
+	@Override
+	public void update(String key, Object val) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey);
 		dataSegmentKey.runCommand(cmd);
@@ -90,14 +101,18 @@
 			logger.debug(cmd.getCommandString());
 	}
 	
-	public void updateObject(String key, Object val) {
-		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey);
-		dataSegmentKey.runCommand(cmd);
-		if (logger.isDebugEnabled())
-			logger.debug(cmd.getCommandString());
+	@Override
+	public void quickUpdate(String key, byte[] val) {
+		update(key, val);
+	}
+	
+	@Override
+	public void quickUpdate(String key, Object val) {
+		update(key, val);
 	}
 
+	
+	
 	@Override
 	public void take(Receiver receiver, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
@@ -109,6 +124,11 @@
 	}
 
 	@Override
+	public void quickTake(Receiver receiver, CodeSegment cs) {
+		take(receiver, cs);		
+	}
+	
+	@Override
 	public void peek(Receiver receiver, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
 		int seq = this.seq.getAndIncrement();
@@ -117,6 +137,12 @@
 		if (logger.isDebugEnabled())
 			logger.debug(cmd.getCommandString());
 	}
+	
+	@Override
+	public void quickPeek(Receiver receiver, CodeSegment cs) {
+		peek(receiver, cs);
+	}
+
 
 	@Override
 	public void remove(String key) {
@@ -146,26 +172,4 @@
 		
 	}
 
-	@Override
-	public void quickPeek(Receiver receiver, CodeSegment cs) {
-		
-	}
-
-	@Override
-	public void quickTake(Receiver receiver, CodeSegment cs) {
-		
-	}
-
-	@Override
-	public void quickPut(String key, Value val) {
-
-	}
-
-	@Override
-	public void quickUpdate(String key, Value val) {
-		
-	}
-
-	
-
 }
--- a/src/alice/datasegment/RemoteDataSegmentManager.java	Tue Aug 06 02:14:40 2013 +0900
+++ b/src/alice/datasegment/RemoteDataSegmentManager.java	Tue Aug 13 06:02:55 2013 +0900
@@ -5,8 +5,6 @@
 import java.nio.channels.SocketChannel;
 
 import org.apache.log4j.Logger;
-import org.msgpack.type.Value;
-
 import alice.codesegment.CodeSegment;
 import alice.daemon.Connection;
 import alice.daemon.IncomingTcpConnection;
@@ -50,7 +48,15 @@
 	 * send put command to target DataSegment
 	 */
 	@Override
-	public void put(String key, Value val) {
+	public void put(String key, byte[] val) {
+		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
+		connection.sendCommand(cmd); // put command on the transmission thread
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+	
+	@Override
+	public void put(String key, Object val) {
 		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
 		connection.sendCommand(cmd); // put command on the transmission thread
 		if (logger.isDebugEnabled())
@@ -58,7 +64,15 @@
 	}
 
 	@Override
-	public void quickPut(String key, Value val) {
+	public void quickPut(String key, byte[] val) {
+		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
+		connection.write(cmd); // put command is executed right now
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+	
+	@Override
+	public void quickPut(String key, Object val) {
 		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
 		connection.write(cmd); // put command is executed right now
 		if (logger.isDebugEnabled())
@@ -66,7 +80,7 @@
 	}
 	
 	@Override
-	public void update(String key, Value val) {
+	public void update(String key, byte[] val) {
 		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
 		connection.sendCommand(cmd);
 		if (logger.isDebugEnabled())
@@ -74,12 +88,27 @@
 	}
 	
 	@Override
-	public void quickUpdate(String key, Value val) {
+	public void update(String key, Object val) {
+		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
+		connection.sendCommand(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+	
+	@Override
+	public void quickUpdate(String key, byte[] val) {
 		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
 		connection.write(cmd);
 		if (logger.isDebugEnabled())
 			logger.debug(cmd.getCommandString());
-		
+	}
+	
+	@Override
+	public void quickUpdate(String key, Object val) {
+		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
+		connection.write(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
 	}
 
 	@Override
@@ -92,7 +121,6 @@
 			logger.debug(cmd.getCommandString());
 	}
 	
-	@Override
 	public void quickTake(Receiver receiver, CodeSegment cs) {
 		int seq = this.seq.getAndIncrement();
 		Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);
@@ -112,8 +140,6 @@
 			logger.debug(cmd.getCommandString());
 	}
 	
-
-	@Override
 	public void quickPeek(Receiver receiver, CodeSegment cs) {
 		int seq = this.seq.getAndIncrement();
 		Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);