view src/alice/datasegment/RemoteDataSegmentManager.java @ 25:50c75cb3de60

implements TopologyNode
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 17 Jan 2012 02:40:27 +0900
parents 54bf607118ae
children f54dcbebde3a
line wrap: on
line source

package alice.datasegment;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

import org.apache.log4j.Logger;
import org.msgpack.type.Value;

import alice.codesegment.CodeSegment;
import alice.daemon.Connection;
import alice.daemon.IncomingTcpConnection;
import alice.daemon.OutboundTcpConnection;

public class RemoteDataSegmentManager extends DataSegmentManager {
	
	Connection connection;
	Logger logger = Logger.getLogger(RemoteDataSegmentManager.class);
	
	// TODO: delete this constructor later
	@Deprecated
	public RemoteDataSegmentManager(Connection connection) {
		this.connection = connection;
		new IncomingTcpConnection(connection, this).start();
		new OutboundTcpConnection(connection).start();
		new Thread(replyThread, "RemoteDataSegmentManager-"
		+ connection.socket.getInetAddress().getHostName()
		+ ":" + connection.socket.getPort()).start();
	}
	
	public RemoteDataSegmentManager(String key, final String hostName, final int port) {
		connection = new Connection();
		final RemoteDataSegmentManager manager = this;
		new Thread(replyThread, "RemoteDataSegmentManager-"
				+ connection.socket.getInetAddress().getHostName()
				+ ":" + connection.socket.getPort()).start();
		new Thread("Connect-" + key) {
			public void run() {
				boolean connect = true;
				do {
					try {
						SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port));
						connection.socket = sc.socket();
						connect = false;
						logger.info("Connect to "
								+ connection.socket.getInetAddress().getHostName()
								+ ":" + connection.socket.getPort());
					} catch (IOException e) {
						try {
							Thread.sleep(500);
						} catch (InterruptedException e1) {
							e1.printStackTrace();
						}
					}
				} while (connect);
				new IncomingTcpConnection(connection, manager).start();
				new OutboundTcpConnection(connection).start();
			}
		}.start();
	}
	
	@Override
	public void put(String key, Value val) {
		connection.sendCommand(new Command(CommandType.PUT, null, key, val, 0, 0, null, null));
	}

	@Override
	public void update(String key, Value val) {
		connection.sendCommand(new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null));
	}

	@Override
	public void take(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) {
		int seq = this.seq.getAndIncrement();
		Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs);
		seqHash.put(seq, cmd);
		connection.sendCommand(cmd);		
	}

	@Override
	public void peek(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) {
		int seq = this.seq.getAndIncrement();
		Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs);
		seqHash.put(seq, cmd);
		connection.sendCommand(cmd);
	}

	@Override
	public void remove(String key) {
		connection.sendCommand(new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null));
	}

}