view src/alice/datasegment/RemoteDataSegmentManager.java @ 299:48de3510fb00

Scheduler has bug
author sugi
date Mon, 04 Nov 2013 13:09:14 +0900
parents f5d7654b90ff
children a8255a831ade
line wrap: on
line source

package alice.datasegment;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

import org.apache.log4j.Logger;
import alice.codesegment.CodeSegment;
import alice.daemon.Connection;
import alice.daemon.IncomingTcpConnection;
import alice.daemon.OutboundTcpConnection;
import alice.topology.HostMessage;
import alice.topology.manager.reconnection.SendError;

public class RemoteDataSegmentManager extends DataSegmentManager {
	
	Connection connection;
	Logger logger;
	
	public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port, final boolean rFlag) {
		logger = Logger.getLogger(connectionKey);
		connection = new Connection();
		final RemoteDataSegmentManager manager = this;
		new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start();
		new Thread("Connect-" + connectionKey) {
			public void run() {
				boolean connect = true;
				do {
					try {
						SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port));
						connection.socket = sc.socket();
						connection.socket.setTcpNoDelay(true);
						connect = false;
						logger.info("Connect to " + connection.getInfoString());
					} catch (IOException e) {
						try {
							Thread.sleep(50);
						} catch (InterruptedException e1) {
							e1.printStackTrace();
						}
					}
				} while (connect&&!rFlag);
				new IncomingTcpConnection(connection, manager, reverseKey).start();
				new OutboundTcpConnection(connection).start();
				// if connection failed need to stop these thread 
				if (connect){
					new SendError(new HostMessage(hostName, port)).execute();
				}
			}
		}.start();
	}
	
	/**
	 * send put command to target DataSegment
	 */
	@Override
	public void put(String key, Object val) {
		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
		connection.sendCommand(cmd); // put command on the transmission thread
		if (logger.isDebugEnabled())
			logger.debug(cmd.getCommandString());
	}
	
	@Override
	public void quickPut(String key, Object val) {
		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
		connection.write(cmd); // put command is executed right now
		if (logger.isDebugEnabled())
			logger.debug(cmd.getCommandString());
	}
	
	@Override
	public void update(String key, Object val) {
		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
		connection.sendCommand(cmd);
		if (logger.isDebugEnabled())
			logger.debug(cmd.getCommandString());
	}
	
	@Override
	public void quickUpdate(String key, Object val) {
		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
		connection.write(cmd);
		if (logger.isDebugEnabled())
			logger.debug(cmd.getCommandString());
	}

	@Override
	public void take(Receiver receiver, CodeSegment cs) {
		int seq = this.seq.getAndIncrement();
		Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
		seqHash.put(seq, cmd);
		connection.sendCommand(cmd);
		if (logger.isDebugEnabled())
			logger.debug(cmd.getCommandString());
	}
	
	public void quickTake(Receiver receiver, CodeSegment cs) {
		int seq = this.seq.getAndIncrement();
		Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);
		seqHash.put(seq, cmd);
		connection.write(cmd);
		if (logger.isDebugEnabled())
			logger.debug(cmd.getCommandString());
	}

	@Override
	public void peek(Receiver receiver, CodeSegment cs) {
		int seq = this.seq.getAndIncrement();
		Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
		seqHash.put(seq, cmd);
		connection.sendCommand(cmd);
		if (logger.isDebugEnabled())
			logger.debug(cmd.getCommandString());
	}
	
	public void quickPeek(Receiver receiver, CodeSegment cs) {
		int seq = this.seq.getAndIncrement();
		Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);
		seqHash.put(seq, cmd);
		connection.write(cmd);
		if (logger.isDebugEnabled())
			logger.debug(cmd.getCommandString());
		
	}

	@Override
	public void remove(String key) {
		Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null);
		connection.sendCommand(cmd);
		if (logger.isDebugEnabled())
			logger.debug(cmd.getCommandString());
	}

	@Override
	public void finish() {
		Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, null);
		connection.sendCommand(cmd);
	}

	@Override
	public void close() {
		Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null);
		connection.sendCommand(cmd);
	}


}