changeset 13:30f97d776a3e

implements Alice daemon
author one
date Fri, 13 Jan 2012 19:04:59 +0900
parents c4d6ff56b9bf
children e3f1b21718b0
files src/alice/daemon/AcceptThread.java src/alice/daemon/CommandMessage.java src/alice/daemon/Connection.java src/alice/daemon/IncomingTcpConnection.java src/alice/daemon/OutboundTcpConnection.java src/alice/datasegment/Command.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegment.java src/alice/datasegment/SocketDataSegmentManager.java src/alice/test/codesegment/TestCodeSegment.java
diffstat 12 files changed, 216 insertions(+), 19 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/daemon/AcceptThread.java	Fri Jan 13 07:04:38 2012 +0900
+++ b/src/alice/daemon/AcceptThread.java	Fri Jan 13 19:04:59 2012 +0900
@@ -19,6 +19,8 @@
 			try {
 				Socket socket = ss.accept();
 				Connection connection = new Connection(socket);
+				new IncomingTcpConnection(connection).start();
+				new OutboundTcpConnection(connection).start();
 			} catch (IOException e) {
 				e.printStackTrace();
 			}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/daemon/CommandMessage.java	Fri Jan 13 19:04:59 2012 +0900
@@ -0,0 +1,21 @@
+package alice.daemon;
+
+import org.msgpack.annotation.Message;
+import org.msgpack.type.Value;
+
+@Message
+public class CommandMessage {
+	int type;
+	int index;
+	int seq;
+	String key;
+	Value val;
+	
+	public CommandMessage(int type, int index, int seq, String key, Value val) {
+		this.type = type;
+		this.index = index;
+		this.seq = seq;
+		this.key = key;
+		this.val = val;
+	}
+}
--- a/src/alice/daemon/Connection.java	Fri Jan 13 07:04:38 2012 +0900
+++ b/src/alice/daemon/Connection.java	Fri Jan 13 19:04:59 2012 +0900
@@ -1,11 +1,17 @@
 package alice.daemon;
 
 import java.net.Socket;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import alice.datasegment.Command;
 
 public class Connection {
 
+	public Socket socket;
+	public LinkedBlockingQueue<Command> sendQueue = new LinkedBlockingQueue<Command>();
+	
 	public Connection(Socket socket) {
-		// TODO Auto-generated constructor stub
+		this.socket = socket;
 	}
 
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/daemon/IncomingTcpConnection.java	Fri Jan 13 19:04:59 2012 +0900
@@ -0,0 +1,83 @@
+package alice.daemon;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.msgpack.MessagePack;
+
+import alice.datasegment.Command;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegment;
+import alice.datasegment.DataSegmentKey;
+import alice.datasegment.LocalDataSegmentManager;
+import alice.datasegment.SocketDataSegmentManager;
+
+public class IncomingTcpConnection extends Thread {
+	
+	public Connection connection;
+	public SocketDataSegmentManager manager;
+	
+	public IncomingTcpConnection(Connection connection) {
+		this.connection = connection;
+	}
+	
+	public void run() {
+		MessagePack msgpack = new MessagePack();
+		while (true) {
+			SocketChannel ch = connection.socket.getChannel();
+			ByteBuffer buf = ByteBuffer.allocateDirect(4); // for int
+			
+			try {
+				int allReadLen = 0;
+				do {
+					int readLen = ch.read(buf);
+					allReadLen += readLen;
+				} while (allReadLen < 4);
+				buf.rewind();
+				int msgLen = buf.getInt();
+				allReadLen = 0;
+				ByteBuffer msgBuf = ByteBuffer.allocateDirect(msgLen);
+				do {
+					int readLen = ch.read(msgBuf);
+					allReadLen += readLen;
+				} while (allReadLen < msgLen);
+				msgBuf.rewind();
+				CommandMessage msg = msgpack.read(msgBuf, CommandMessage.class);
+				CommandType type = CommandType.getCommandTypeFromId(msg.type);
+				LocalDataSegmentManager manager = (LocalDataSegmentManager)DataSegment.get("local"); 
+				DataSegmentKey dsKey = manager.getDataSegmentKey(msg.key);
+				switch (type) {
+				case UPDATE:
+					dsKey.addCommand(new Command(type, null, msg.val, 0, 0, null, null));
+					break;
+				case PUT:
+					dsKey.addCommand(new Command(type, null, msg.val, 0, 0, null, null));
+					break;
+				case PEEK:
+					//Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) {
+					dsKey.addCommand(new Command(type, null, null, msg.index, msg.seq, connection.sendQueue, null));
+					break;
+				case TAKE:
+					dsKey.addCommand(new Command(type, null, null, msg.index, msg.seq, connection.sendQueue, null));
+					break;	
+				case REMOVE:
+					dsKey.addCommand(new Command(type, null, null, 0, 0, null, null));
+					break;
+				case REPLY:
+					try {
+						manager.replyQueue.put(new Command(type, null, msg.val, msg.index, msg.seq, null, null));
+					} catch (InterruptedException e) {
+						e.printStackTrace();
+					}
+					break;
+				default:
+					break;
+				}
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
+		}
+	}
+	
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/daemon/OutboundTcpConnection.java	Fri Jan 13 19:04:59 2012 +0900
@@ -0,0 +1,41 @@
+package alice.daemon;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.msgpack.MessagePack;
+
+import alice.datasegment.Command;
+
+public class OutboundTcpConnection extends Thread {
+	
+	public Connection connection;
+	
+	public OutboundTcpConnection(Connection connection) {
+		this.connection = connection;
+	}
+	
+	public CommandMessage convert(Command cmd) {
+		return new CommandMessage(cmd.type.id, cmd.index, cmd.seq, null, cmd.val);
+	}
+	
+	public void run() {
+		MessagePack msgpack = new MessagePack();
+		while (true) {
+			try {
+				CommandMessage cmdMsg = convert(connection.sendQueue.take());
+				byte[] buf = msgpack.write(cmdMsg);
+				ByteBuffer buffer = ByteBuffer.allocateDirect(4 + buf.length);
+				buffer.putInt(buf.length);
+				buffer.put(buf);
+				connection.socket.getChannel().write(buffer);
+			} catch (InterruptedException e) {
+				e.printStackTrace();
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
+			
+		}
+	}
+	
+}
--- a/src/alice/datasegment/Command.java	Fri Jan 13 07:04:38 2012 +0900
+++ b/src/alice/datasegment/Command.java	Fri Jan 13 19:04:59 2012 +0900
@@ -1,25 +1,27 @@
 package alice.datasegment;
 
+import java.util.concurrent.BlockingQueue;
+
 import org.msgpack.type.Value;
 
 import alice.codesegment.CodeSegment;
 
 public class Command {
-	public CommandType cmdType;
+	public CommandType type;
 	public String argKey;
 	public Value val;
 	public int index;
 	public int seq;
-	public DataSegmentManager manager;
+	public BlockingQueue<Command> replyQueue;
 	public CodeSegment cs;
 	
-	public Command(CommandType cmdType, String argKey, Value val, int index, int seq, DataSegmentManager manager, CodeSegment cs) {
-		this.cmdType = cmdType;
+	public Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) {
+		this.type = cmdType;
 		this.argKey = argKey;
 		this.val = val;
 		this.index = index;
 		this.seq = seq;
-		this.manager = manager;
+		this.replyQueue = replyQueue;
 		this.cs = cs;
 	}
 	
--- a/src/alice/datasegment/DataSegmentKey.java	Fri Jan 13 07:04:38 2012 +0900
+++ b/src/alice/datasegment/DataSegmentKey.java	Fri Jan 13 19:04:59 2012 +0900
@@ -30,7 +30,7 @@
 				while (true) {
 					try {
 						Command cmd = cmdQueue.take();
-						switch (cmd.cmdType) {
+						switch (cmd.type) {
 						case UPDATE:
 							if (dataList.size() != 0) {
 								dataList.remove(0);
@@ -43,9 +43,9 @@
 							LinkedList<Command> removeList = new LinkedList<Command>();
 							for (Command waitCmd : waitList) {
 								if (waitCmd.index < index) {
-									waitCmd.manager.replyQueue.put(new Command(CommandType.REPLY, null, cmd.val, index, waitCmd.seq, null, null));
+									waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, cmd.val, index, waitCmd.seq, null, null));
 									removeList.add(waitCmd);
-									if (waitCmd.cmdType == CommandType.TAKE) { // delete data, if it run take cmd.
+									if (waitCmd.type == CommandType.TAKE) { // delete data, if it run take cmd.
 										dataList.remove(dsv);
 										break;
 									}
@@ -62,7 +62,7 @@
 							}
 							for (DataSegmentValue data : dataList) {
 								if (data.index > cmd.index) {
-									cmd.manager.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null));
+									cmd.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null));
 									break;
 								}
 							}
@@ -75,7 +75,7 @@
 							}
 							for (DataSegmentValue data : dataList) {
 								if (data.index > cmd.index) {
-									cmd.manager.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null));
+									cmd.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null));
 									dataList.remove(data);
 									break;
 								}
--- a/src/alice/datasegment/DataSegmentManager.java	Fri Jan 13 07:04:38 2012 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Fri Jan 13 19:04:59 2012 +0900
@@ -8,8 +8,8 @@
 import alice.codesegment.CodeSegment;
 
 public abstract class DataSegmentManager {
-	public ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
-	public ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();
+	protected ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
+	protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();
 	public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();  
 	
 	public abstract void put(String key, Value val);
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Fri Jan 13 07:04:38 2012 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Fri Jan 13 19:04:59 2012 +0900
@@ -26,11 +26,12 @@
 		}
 		
 	};
+	
 	public LocalDataSegmentManager() {
 		new Thread(replyThread).start();
 	}
 	
-	private DataSegmentKey getDataSegmentKey(String key) {
+	public DataSegmentKey getDataSegmentKey(String key) {
 		DataSegmentKey newDataSegmentKey = new DataSegmentKey();
 		DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
 		if (dataSegmentKey == null) {
@@ -43,20 +44,20 @@
 	@Override
 	public void put(String key, Value val) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.PUT, null, val, 0, 0, this, null));
+		dataSegmentKey.addCommand(new Command(CommandType.PUT, null, val, 0, 0, replyQueue, null));
 	}
 
 	@Override
 	public void update(String key, Value val) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, val, 0, 0, this, null));
+		dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, val, 0, 0, replyQueue, null));
 	}
 
 	@Override
 	public void take(String argKey, String key, int index, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.TAKE, argKey, null, index, seq, this, cs);
+		Command cmd = new Command(CommandType.TAKE, argKey, null, index, seq, replyQueue, cs);
 		seqHash.put(seq, cmd);
 		dataSegmentKey.addCommand(cmd);
 	}
@@ -65,7 +66,7 @@
 	public void peek(String argKey, String key, int index, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.PEEK, argKey, null, index, seq, this, cs);
+		Command cmd = new Command(CommandType.PEEK, argKey, null, index, seq, replyQueue, cs);
 		seqHash.put(seq, cmd);
 		dataSegmentKey.addCommand(cmd);
 	}
@@ -73,7 +74,7 @@
 	@Override
 	public void remove(String key) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, 0, 0, this, null));
+		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, 0, 0, replyQueue, null));
 	}
 
 }
--- a/src/alice/datasegment/RemoteDataSegment.java	Fri Jan 13 07:04:38 2012 +0900
+++ b/src/alice/datasegment/RemoteDataSegment.java	Fri Jan 13 19:04:59 2012 +0900
@@ -35,4 +35,5 @@
 		// TODO Auto-generated method stub
 		
 	}
+
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/datasegment/SocketDataSegmentManager.java	Fri Jan 13 19:04:59 2012 +0900
@@ -0,0 +1,39 @@
+package alice.datasegment;
+
+import org.msgpack.type.Value;
+
+import alice.codesegment.CodeSegment;
+
+public class SocketDataSegmentManager extends DataSegmentManager {
+
+	@Override
+	public void put(String key, Value val) {
+		// TODO Auto-generated method stub
+
+	}
+
+	@Override
+	public void update(String key, Value val) {
+		// TODO Auto-generated method stub
+
+	}
+
+	@Override
+	public void take(String argKey, String key, int index, CodeSegment cs) {
+		// TODO Auto-generated method stub
+
+	}
+
+	@Override
+	public void peek(String argKey, String key, int index, CodeSegment cs) {
+		// TODO Auto-generated method stub
+
+	}
+
+	@Override
+	public void remove(String key) {
+		// TODO Auto-generated method stub
+
+	}
+
+}
--- a/src/alice/test/codesegment/TestCodeSegment.java	Fri Jan 13 07:04:38 2012 +0900
+++ b/src/alice/test/codesegment/TestCodeSegment.java	Fri Jan 13 19:04:59 2012 +0900
@@ -13,6 +13,7 @@
 		DataSegmentValue data = ids.get("arg1");
 		System.out.println("index = " + data.index);
 		System.out.println("data = " + data.val);
+		System.out.println(data.val.getType());
 		
 		if (data.index == 10) {
 			System.exit(0);