diff src/main/java/alice/daemon/IncomingTcpConnection.java @ 345:8f71c3e6f11d

Change directory structure Maven standard
author sugi
date Wed, 16 Apr 2014 18:26:07 +0900
parents
children 11ba40caa93b
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Wed Apr 16 18:26:07 2014 +0900
@@ -0,0 +1,101 @@
+package alice.daemon;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+
+import org.msgpack.unpacker.Unpacker;
+
+import alice.codesegment.SingletonMessage;
+import alice.datasegment.Command;
+import alice.datasegment.CommandType;
+import alice.datasegment.DataSegment;
+import alice.datasegment.DataSegmentKey;
+import alice.datasegment.DataSegmentManager;
+import alice.datasegment.LocalDataSegmentManager;
+import alice.topology.HostMessage;
+import alice.topology.manager.keeparive.RespondData;
+import alice.topology.manager.reconnection.SendError;
+
+public class IncomingTcpConnection extends Thread {
+	
+	public Connection connection;
+	public DataSegmentManager manager;
+	public String reverseKey;
+	private LocalDataSegmentManager lmanager = DataSegment.getLocal();
+
+	public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) {
+		this.manager = manager;
+		this.connection = connection;
+		this.reverseKey = reverseKey;
+	}
+	
+	/**
+	 * pipeline thread for receiving
+	 */
+	public void run() {
+		Unpacker unpacker = this.getUnpacker();
+		if (unpacker == null) {
+			return;
+		}
+		while (true) {
+			try {
+				CommandMessage msg = unpacker.read(CommandMessage.class);
+				CommandType type = CommandType.getCommandTypeFromId(msg.type);
+				switch (type) {
+				case UPDATE:
+					getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
+					break;
+				case PUT:
+					getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
+					break;
+				case PEEK:
+					getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
+					break;
+				case TAKE:
+					getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
+					break;	
+				case REMOVE:
+					getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, 0, 0, null, null, null));
+					break;
+				case REPLY:
+					Command cmd = manager.getAndRemoveCmd(msg.seq);
+					cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null));
+					cmd=null;
+					break;
+				case PING:
+					DataSegment.get(reverseKey).response(msg.key);
+					break;
+				case RESPONSE:
+					DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()));
+					break;
+				default:
+					break;
+				}
+			} catch (ClosedChannelException e) {
+				connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
+				return;
+			} catch (EOFException e) {
+				new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute();
+				connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
+				return;
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
+		}
+	}
+
+	private Unpacker getUnpacker() {
+		Unpacker unpacker = null;
+		try {
+			unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream());
+		} catch (IOException e2) {
+			e2.printStackTrace();
+		}
+		return unpacker;
+	}
+
+	private DataSegmentKey getDataSegmentKey(CommandMessage msg) {
+		return lmanager.getDataSegmentKey(msg.key);
+	}
+}