view src/main/java/alice/daemon/IncomingTcpConnection.java @ 359:11ba40caa93b multicast

Remove unneeded method
author sugi
date Thu, 15 May 2014 15:43:23 +0900
parents 8f71c3e6f11d
children 3c9446fa4073
line wrap: on
line source

package alice.daemon;

import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;

import org.msgpack.unpacker.Unpacker;

import alice.codesegment.SingletonMessage;
import alice.datasegment.Command;
import alice.datasegment.CommandType;
import alice.datasegment.DataSegment;
import alice.datasegment.DataSegmentManager;
import alice.datasegment.LocalDataSegmentManager;
import alice.topology.HostMessage;
import alice.topology.manager.keeparive.RespondData;
import alice.topology.manager.reconnection.SendError;

/**
 * Receiver thread for a single TCP connection.
 *
 * <p>Reads {@link CommandMessage} objects from the connection's socket input
 * stream and turns each one into an action on the local data segments, on the
 * pending-command table of {@link #manager}, or on the data segment registered
 * under {@link #reverseKey}.
 */
public class IncomingTcpConnection extends Thread {
	
	/** Connection whose socket is read; also used to send replies and CLOSE. */
	public Connection connection;
	/** Manager queried for the pending command that a REPLY message answers. */
	public DataSegmentManager manager;
	/** Key attached to incoming UPDATE/PUT commands and used to look up the segment answering PING. */
	public String reverseKey;
	private final LocalDataSegmentManager lmanager = DataSegment.getLocal();

	/**
	 * @param connection connection to receive from
	 * @param manager    manager holding commands that wait for a REPLY
	 * @param reverseKey key identifying the sending side of this connection
	 */
	public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) {
		this.manager = manager;
		this.connection = connection;
		this.reverseKey = reverseKey;
	}
	
	/**
	 * Receive loop: reads one message at a time and dispatches it until the
	 * stream fails. Any I/O failure ends the thread after a CLOSE command has
	 * been handed to the connection; an EOF additionally reports the remote
	 * host through {@link SendError}.
	 */
	@Override
	public void run() {
		Unpacker unpacker = this.getUnpacker();
		if (unpacker == null) {
			// The input stream could not be obtained; nothing to receive from.
			return;
		}
		while (true) {
			try {
				CommandMessage msg = unpacker.read(CommandMessage.class);
				if (msg == null) {
					// read() may yield null (e.g. for a nil value); nothing to dispatch.
					continue;
				}
				CommandType type = CommandType.getCommandTypeFromId(msg.type);
				if (type == null) {
					// Unknown type id: switching on null would throw and kill this thread.
					continue;
				}
				dispatch(type, msg);
			} catch (ClosedChannelException e) {
				sendClose();
				return;
			} catch (EOFException e) {
				new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute();
				sendClose();
				return;
			} catch (IOException e) {
				// Retrying on a failed stream would spin forever (the same error
				// is raised on every read), so treat it as terminal like the
				// other failure branches.
				e.printStackTrace();
				sendClose();
				return;
			}
		}
	}

	/**
	 * Performs the action associated with one received message.
	 *
	 * @param type command type decoded from {@code msg.type}; never null
	 * @param msg  the received message
	 * @throws IOException if a handler reports an I/O failure
	 */
	private void dispatch(CommandType type, CommandMessage msg) throws IOException {
		switch (type) {
		case UPDATE:
		case PUT:
			// Value-carrying commands: run locally, tagged with the sender's key.
			lmanager.getDataSegmentKey(msg.key)
				.runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
			break;
		case PEEK:
		case TAKE:
			// Read requests: pass the connection along with index, seq and flag.
			lmanager.getDataSegmentKey(msg.key)
				.runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
			break;
		case REMOVE:
			lmanager.getDataSegmentKey(msg.key)
				.runCommand(new Command(type, null, null, null, 0, 0, null, null, null));
			break;
		case REPLY:
			Command cmd = manager.getAndRemoveCmd(msg.seq);
			if (cmd == null) {
				// No pending command for this sequence number (unknown or
				// already answered); drop the reply instead of crashing.
				break;
			}
			cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null));
			break;
		case PING:
			DataSegment.get(reverseKey).response(msg.key);
			break;
		case RESPONSE:
			DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()));
			break;
		default:
			break;
		}
	}

	/** Hands a CLOSE command to the connection. */
	private void sendClose() {
		connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
	}

	/**
	 * Creates an unpacker over the connection's socket input stream.
	 *
	 * @return the unpacker, or {@code null} if the input stream could not be obtained
	 */
	private Unpacker getUnpacker() {
		try {
			return SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream());
		} catch (IOException e) {
			e.printStackTrace();
			return null;
		}
	}
}