changeset 14:e3f1b21718b0

implements RemoteDataSegment
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Sun, 15 Jan 2012 00:56:25 +0900
parents 30f97d776a3e
children 45e98e74db96
files src/alice/daemon/AcceptThread.java src/alice/daemon/AliceDaemon.java src/alice/daemon/Config.java src/alice/daemon/Connection.java src/alice/daemon/IncomingTcpConnection.java src/alice/daemon/OutboundTcpConnection.java src/alice/datasegment/Command.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegment.java src/alice/datasegment/RemoteDataSegmentManager.java src/alice/test/codesegment/RemoteIncrement.java src/alice/test/codesegment/RemoteStartCodeSegment.java src/alice/test/codesegment/TestAlice.java src/alice/test/codesegment/TestLocalAlice.java src/alice/test/codesegment/TestRemoteAlice.java
diffstat 17 files changed, 214 insertions(+), 95 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/daemon/AcceptThread.java	Fri Jan 13 19:04:59 2012 +0900
+++ b/src/alice/daemon/AcceptThread.java	Sun Jan 15 00:56:25 2012 +0900
@@ -4,6 +4,8 @@
 import java.net.ServerSocket;
 import java.net.Socket;
 
+import alice.datasegment.DataSegment;
+
 public class AcceptThread extends Thread {
 
 	private ServerSocket ss;
@@ -19,7 +21,7 @@
 			try {
 				Socket socket = ss.accept();
 				Connection connection = new Connection(socket);
-				new IncomingTcpConnection(connection).start();
+				new IncomingTcpConnection(connection, DataSegment.get("local")).start();
 				new OutboundTcpConnection(connection).start();
 			} catch (IOException e) {
 				e.printStackTrace();
--- a/src/alice/daemon/AliceDaemon.java	Fri Jan 13 19:04:59 2012 +0900
+++ b/src/alice/daemon/AliceDaemon.java	Sun Jan 15 00:56:25 2012 +0900
@@ -11,8 +11,8 @@
 	private Config conf;
 	private AcceptThread acceptThread;
 	
-	public AliceDaemon(String[] args) {
-		this.conf = new Config(args);
+	public AliceDaemon(Config conf) {
+		this.conf = conf;
 	}
 	
 	public void listen() {
--- a/src/alice/daemon/Config.java	Fri Jan 13 19:04:59 2012 +0900
+++ b/src/alice/daemon/Config.java	Sun Jan 15 00:56:25 2012 +0900
@@ -3,11 +3,20 @@
 public class Config {
 
 	public int port = 10000;
+	public String hostname;
+	public int connectPort = 10000;
+	public String key;
 
 	public Config(String[] args) {
 		for (int i = 0; i< args.length; i++) {
 			if ("-p".equals(args[i])) {
 				port = Integer.parseInt(args[++i]);
+			} else if ("-h".equals(args[i])) {
+				hostname = args[++i];
+			} else if ("-cp".equals(args[i])) {
+				connectPort = Integer.parseInt(args[++i]);
+			} else if ("-key".equals(args[i])) {
+				
 			}
 		}
 	}
--- a/src/alice/daemon/Connection.java	Fri Jan 13 19:04:59 2012 +0900
+++ b/src/alice/daemon/Connection.java	Sun Jan 15 00:56:25 2012 +0900
@@ -13,5 +13,13 @@
 	public Connection(Socket socket) {
 		this.socket = socket;
 	}
+	
+	public void sendCommand(Command cmd) {
+		try {
+			sendQueue.put(cmd);
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		}
+	}
 
 }
--- a/src/alice/daemon/IncomingTcpConnection.java	Fri Jan 13 19:04:59 2012 +0900
+++ b/src/alice/daemon/IncomingTcpConnection.java	Sun Jan 15 00:56:25 2012 +0900
@@ -10,15 +10,16 @@
 import alice.datasegment.CommandType;
 import alice.datasegment.DataSegment;
 import alice.datasegment.DataSegmentKey;
+import alice.datasegment.DataSegmentManager;
 import alice.datasegment.LocalDataSegmentManager;
-import alice.datasegment.SocketDataSegmentManager;
 
 public class IncomingTcpConnection extends Thread {
 	
 	public Connection connection;
-	public SocketDataSegmentManager manager;
+	public DataSegmentManager manager;
 	
-	public IncomingTcpConnection(Connection connection) {
+	public IncomingTcpConnection(Connection connection, DataSegmentManager manager) {
+		this.manager = manager;
 		this.connection = connection;
 	}
 	
@@ -45,28 +46,28 @@
 				msgBuf.rewind();
 				CommandMessage msg = msgpack.read(msgBuf, CommandMessage.class);
 				CommandType type = CommandType.getCommandTypeFromId(msg.type);
-				LocalDataSegmentManager manager = (LocalDataSegmentManager)DataSegment.get("local"); 
-				DataSegmentKey dsKey = manager.getDataSegmentKey(msg.key);
+				LocalDataSegmentManager lmanager = (LocalDataSegmentManager)DataSegment.get("local"); 
+				DataSegmentKey dsKey = lmanager.getDataSegmentKey(msg.key);
 				switch (type) {
 				case UPDATE:
-					dsKey.addCommand(new Command(type, null, msg.val, 0, 0, null, null));
+					dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null));
 					break;
 				case PUT:
-					dsKey.addCommand(new Command(type, null, msg.val, 0, 0, null, null));
+					dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null));
 					break;
 				case PEEK:
 					//Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) {
-					dsKey.addCommand(new Command(type, null, null, msg.index, msg.seq, connection.sendQueue, null));
+					dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null));
 					break;
 				case TAKE:
-					dsKey.addCommand(new Command(type, null, null, msg.index, msg.seq, connection.sendQueue, null));
+					dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null));
 					break;	
 				case REMOVE:
-					dsKey.addCommand(new Command(type, null, null, 0, 0, null, null));
+					dsKey.addCommand(new Command(type, null, null, null, 0, 0, null, null));
 					break;
 				case REPLY:
 					try {
-						manager.replyQueue.put(new Command(type, null, msg.val, msg.index, msg.seq, null, null));
+						manager.replyQueue.put(new Command(type, null, null, msg.val, msg.index, msg.seq, null, null));
 					} catch (InterruptedException e) {
 						e.printStackTrace();
 					}
--- a/src/alice/daemon/OutboundTcpConnection.java	Fri Jan 13 19:04:59 2012 +0900
+++ b/src/alice/daemon/OutboundTcpConnection.java	Sun Jan 15 00:56:25 2012 +0900
@@ -16,7 +16,7 @@
 	}
 	
 	public CommandMessage convert(Command cmd) {
-		return new CommandMessage(cmd.type.id, cmd.index, cmd.seq, null, cmd.val);
+		return new CommandMessage(cmd.type.id, cmd.index, cmd.seq, cmd.key, cmd.val);
 	}
 	
 	public void run() {
--- a/src/alice/datasegment/Command.java	Fri Jan 13 19:04:59 2012 +0900
+++ b/src/alice/datasegment/Command.java	Sun Jan 15 00:56:25 2012 +0900
@@ -8,6 +8,7 @@
 
 public class Command {
 	public CommandType type;
+	public String key;
 	public String argKey;
 	public Value val;
 	public int index;
@@ -15,7 +16,7 @@
 	public BlockingQueue<Command> replyQueue;
 	public CodeSegment cs;
 	
-	public Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) {
+	public Command(CommandType cmdType, String argKey, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) {
 		this.type = cmdType;
 		this.argKey = argKey;
 		this.val = val;
--- a/src/alice/datasegment/DataSegmentKey.java	Fri Jan 13 19:04:59 2012 +0900
+++ b/src/alice/datasegment/DataSegmentKey.java	Sun Jan 15 00:56:25 2012 +0900
@@ -43,7 +43,7 @@
 							LinkedList<Command> removeList = new LinkedList<Command>();
 							for (Command waitCmd : waitList) {
 								if (waitCmd.index < index) {
-									waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, cmd.val, index, waitCmd.seq, null, null));
+									waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null));
 									removeList.add(waitCmd);
 									if (waitCmd.type == CommandType.TAKE) { // delete data, if it run take cmd.
 										dataList.remove(dsv);
@@ -62,7 +62,7 @@
 							}
 							for (DataSegmentValue data : dataList) {
 								if (data.index > cmd.index) {
-									cmd.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null));
+									cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null));
 									break;
 								}
 							}
@@ -75,7 +75,7 @@
 							}
 							for (DataSegmentValue data : dataList) {
 								if (data.index > cmd.index) {
-									cmd.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null));
+									cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null));
 									dataList.remove(data);
 									break;
 								}
--- a/src/alice/datasegment/DataSegmentManager.java	Fri Jan 13 19:04:59 2012 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Sun Jan 15 00:56:25 2012 +0900
@@ -2,15 +2,34 @@
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.msgpack.type.Value;
 
 import alice.codesegment.CodeSegment;
 
 public abstract class DataSegmentManager {
+	
 	protected ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
 	protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();
-	public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();  
+	public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();
+	protected AtomicInteger seq = new AtomicInteger(1);
+	protected Runnable replyThread = new Runnable() {
+
+		@Override
+		public void run() {
+			while (true) {
+				try {
+					Command reply = replyQueue.take();
+					Command cmd = seqHash.get(reply.seq);
+					cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val));
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+			}
+		}
+		
+	};
 	
 	public abstract void put(String key, Value val);
 	public abstract void update(String key, Value val);
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Fri Jan 13 19:04:59 2012 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Sun Jan 15 00:56:25 2012 +0900
@@ -1,7 +1,5 @@
 package alice.datasegment;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.msgpack.type.Value;
 
 import alice.codesegment.CodeSegment;
@@ -9,24 +7,6 @@
 
 public class LocalDataSegmentManager extends DataSegmentManager {
 	
-	private AtomicInteger seq = new AtomicInteger(1);
-	private Runnable replyThread = new Runnable() {
-
-		@Override
-		public void run() {
-			while (true) {
-				try {
-					Command reply = replyQueue.take();
-					Command cmd = seqHash.get(reply.seq);
-					cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val));
-				} catch (InterruptedException e) {
-					e.printStackTrace();
-				}
-			}
-		}
-		
-	};
-	
 	public LocalDataSegmentManager() {
 		new Thread(replyThread).start();
 	}
@@ -44,20 +24,20 @@
 	@Override
 	public void put(String key, Value val) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.PUT, null, val, 0, 0, replyQueue, null));
+		dataSegmentKey.addCommand(new Command(CommandType.PUT, null, null, val, 0, 0, replyQueue, null));
 	}
 
 	@Override
 	public void update(String key, Value val) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, val, 0, 0, replyQueue, null));
+		dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, null, val, 0, 0, replyQueue, null));
 	}
 
 	@Override
 	public void take(String argKey, String key, int index, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.TAKE, argKey, null, index, seq, replyQueue, cs);
+		Command cmd = new Command(CommandType.TAKE, argKey, null, null, index, seq, replyQueue, cs);
 		seqHash.put(seq, cmd);
 		dataSegmentKey.addCommand(cmd);
 	}
@@ -66,7 +46,7 @@
 	public void peek(String argKey, String key, int index, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.PEEK, argKey, null, index, seq, replyQueue, cs);
+		Command cmd = new Command(CommandType.PEEK, argKey, null, null, index, seq, replyQueue, cs);
 		seqHash.put(seq, cmd);
 		dataSegmentKey.addCommand(cmd);
 	}
@@ -74,7 +54,7 @@
 	@Override
 	public void remove(String key) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, 0, 0, replyQueue, null));
+		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, null, 0, 0, replyQueue, null));
 	}
 
 }
--- a/src/alice/datasegment/RemoteDataSegment.java	Fri Jan 13 19:04:59 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,39 +0,0 @@
-package alice.datasegment;
-
-import org.msgpack.type.Value;
-
-import alice.codesegment.CodeSegment;
-
-public class RemoteDataSegment extends DataSegmentManager {
-
-	@Override
-	public void put(String key, Value val) {
-		// TODO Auto-generated method stub
-		
-	}
-
-	@Override
-	public void update(String key, Value val) {
-		// TODO Auto-generated method stub
-		
-	}
-
-	@Override
-	public void take(String argKey, String key, int index, CodeSegment cs) {
-		// TODO Auto-generated method stub
-		
-	}
-
-	@Override
-	public void peek(String argKey, String key, int index, CodeSegment cs) {
-		// TODO Auto-generated method stub
-		
-	}
-
-	@Override
-	public void remove(String key) {
-		// TODO Auto-generated method stub
-		
-	}
-
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/datasegment/RemoteDataSegmentManager.java	Sun Jan 15 00:56:25 2012 +0900
@@ -0,0 +1,52 @@
+package alice.datasegment;
+
+import org.msgpack.type.Value;
+
+import alice.codesegment.CodeSegment;
+import alice.daemon.Connection;
+import alice.daemon.IncomingTcpConnection;
+import alice.daemon.OutboundTcpConnection;
+
+public class RemoteDataSegmentManager extends DataSegmentManager {
+	
+	Connection connection;
+	
+	public RemoteDataSegmentManager(Connection connection) {
+		this.connection = connection;
+		new IncomingTcpConnection(connection, this).start();
+		new OutboundTcpConnection(connection).start();
+		new Thread(replyThread).start();
+	}
+	
+	@Override
+	public void put(String key, Value val) {
+		connection.sendCommand(new Command(CommandType.PUT, null, key, val, 0, 0, null, null));
+	}
+
+	@Override
+	public void update(String key, Value val) {
+		connection.sendCommand(new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null));
+	}
+
+	@Override
+	public void take(String argKey, String key, int index, CodeSegment cs) {
+		int seq = this.seq.getAndIncrement();
+		Command cmd = new Command(CommandType.TAKE, argKey, key, null, index, seq, replyQueue, cs);
+		seqHash.put(seq, cmd);
+		connection.sendCommand(cmd);		
+	}
+
+	@Override
+	public void peek(String argKey, String key, int index, CodeSegment cs) {
+		int seq = this.seq.getAndIncrement();
+		Command cmd = new Command(CommandType.PEEK, argKey, key, null, index, seq, replyQueue, cs);
+		seqHash.put(seq, cmd);
+		connection.sendCommand(cmd);
+	}
+
+	@Override
+	public void remove(String key) {
+		connection.sendCommand(new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null));
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/codesegment/RemoteIncrement.java	Sun Jan 15 00:56:25 2012 +0900
@@ -0,0 +1,24 @@
+package alice.test.codesegment;
+
+import org.msgpack.type.ValueFactory;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.DataSegmentValue;
+
+public class RemoteIncrement extends CodeSegment {
+
+	@Override
+	public void run() {
+		DataSegmentValue data = ids.get("num");
+		int num = data.val.asIntegerValue().getInt();
+		System.out.println(num++);
+		if (num == 10) System.exit(0);
+
+		CodeSegment cs = new RemoteIncrement();
+		cs.ids.take("num", "remote", "num");
+		cs.ids.execute();
+		
+		ods.put("local", "num", ValueFactory.createIntegerValue(num));
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/codesegment/RemoteStartCodeSegment.java	Sun Jan 15 00:56:25 2012 +0900
@@ -0,0 +1,20 @@
+package alice.test.codesegment;
+
+import org.msgpack.type.Value;
+import org.msgpack.type.ValueFactory;
+
+import alice.codesegment.CodeSegment;
+
+public class RemoteStartCodeSegment extends CodeSegment {
+
+	@Override
+	public void run() {
+		CodeSegment cs = new RemoteIncrement();
+		cs.ids.take("num", "remote", "num");
+		cs.ids.execute();
+		
+		Value num = ValueFactory.createIntegerValue(0);
+		ods.put("local", "num", num);
+	}
+
+}
--- a/src/alice/test/codesegment/TestAlice.java	Fri Jan 13 19:04:59 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,11 +0,0 @@
-package alice.test.codesegment;
-
-import alice.codesegment.CodeSegment;
-
-public class TestAlice {
-	public static void main(String args[]) {
-		CodeSegment cs = new StartCodeSegment();
-		cs.ids.execute();
-	}
-	
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/codesegment/TestLocalAlice.java	Sun Jan 15 00:56:25 2012 +0900
@@ -0,0 +1,11 @@
+package alice.test.codesegment;
+
+import alice.codesegment.CodeSegment;
+
+public class TestLocalAlice {
+	public static void main(String args[]) {
+		CodeSegment cs = new StartCodeSegment();
+		cs.ids.execute();
+	}
+	
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/codesegment/TestRemoteAlice.java	Sun Jan 15 00:56:25 2012 +0900
@@ -0,0 +1,42 @@
+package alice.test.codesegment;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+
+import alice.codesegment.CodeSegment;
+import alice.daemon.AliceDaemon;
+import alice.daemon.Config;
+import alice.daemon.Connection;
+import alice.datasegment.DataSegment;
+import alice.datasegment.RemoteDataSegmentManager;
+
+public class TestRemoteAlice {
+
+	public static void main(String[] args) {
+		Config conf = new Config(args);
+		AliceDaemon daemon = new AliceDaemon(conf);
+		daemon.listen();
+		boolean connect = true;
+		do {
+			try {
+				SocketChannel sc = SocketChannel.open(new InetSocketAddress(conf.hostname, conf.connectPort));
+				Connection connection = new Connection(sc.socket());
+				RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connection);
+				DataSegment.regist(conf.key, manager);
+				connect = false;
+				System.out.println("connected");
+			} catch (IOException e) {
+				try {
+					Thread.sleep(500);
+				} catch (InterruptedException e1) {
+					e1.printStackTrace();
+				}
+			}
+		} while (connect);
+		
+		CodeSegment cs = new RemoteStartCodeSegment();
+		cs.ids.execute();
+	}
+
+}