diff src/main/java/alice/datasegment/RemoteDataSegmentManager.java @ 345:8f71c3e6f11d

Change directory structure Maven standard
author sugi
date Wed, 16 Apr 2014 18:26:07 +0900
parents
children 388e7d4b0624
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Wed Apr 16 18:26:07 2014 +0900
@@ -0,0 +1,166 @@
+package alice.datasegment;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+
+import org.apache.log4j.Logger;
+
+import alice.codesegment.CodeSegment;
+import alice.daemon.Connection;
+import alice.daemon.IncomingTcpConnection;
+import alice.daemon.OutboundTcpConnection;
+import alice.topology.HostMessage;
+import alice.topology.manager.reconnection.SendError;
+
+public class RemoteDataSegmentManager extends DataSegmentManager {
+	
+	Connection connection;
+	Logger logger;
+	
+	public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port, final boolean rFlag) {
+		logger = Logger.getLogger(connectionKey);
+		connection = new Connection();
+		final RemoteDataSegmentManager manager = this;
+		new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start();
+		new Thread("Connect-" + connectionKey) {
+			public void run() {
+				boolean connect = true;
+				do {
+					try {
+						SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port));
+						connection.socket = sc.socket();
+						connection.socket.setTcpNoDelay(true);
+						connect = false;
+						logger.info("Connect to " + connection.getInfoString());
+					} catch (IOException e) {
+						try {
+							Thread.sleep(50);
+						} catch (InterruptedException e1) {
+							e1.printStackTrace();
+						}
+					}
+				} while (connect&&!rFlag);
+				new IncomingTcpConnection(connection, manager, reverseKey).start();
+				new OutboundTcpConnection(connection).start();
+				// if connection failed need to stop these thread 
+				if (connect){
+					new SendError(new HostMessage(hostName, port)).execute();
+				}
+			}
+		}.start();
+	}
+	
+	/**
+	 * send put command to target DataSegment
+	 */
+	@Override
+	public void put(String key, Object val) {
+		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
+		connection.sendCommand(cmd); // put command on the transmission thread
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+	
+	@Override
+	public void quickPut(String key, Object val) {
+		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
+		connection.write(cmd); // put command is executed right now
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+	
+	@Override
+	public void update(String key, Object val) {
+		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
+		connection.sendCommand(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+	
+	@Override
+	public void quickUpdate(String key, Object val) {
+		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
+		connection.write(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+
+	@Override
+	public void take(Receiver receiver, CodeSegment cs) {
+		int seq = this.seq.getAndIncrement();
+		Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+		seqHash.put(seq, cmd);
+		connection.sendCommand(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+	
+	public void quickTake(Receiver receiver, CodeSegment cs) {
+		int seq = this.seq.getAndIncrement();
+		Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);
+		seqHash.put(seq, cmd);
+		connection.write(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+
+	@Override
+	public void peek(Receiver receiver, CodeSegment cs) {
+		int seq = this.seq.getAndIncrement();
+		Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+		seqHash.put(seq, cmd);
+		connection.sendCommand(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+	
+	public void quickPeek(Receiver receiver, CodeSegment cs) {
+		int seq = this.seq.getAndIncrement();
+		Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);
+		seqHash.put(seq, cmd);
+		connection.write(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+		
+	}
+
+	@Override
+	public void remove(String key) {
+		Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null);
+		connection.sendCommand(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+
+	@Override
+	public void finish() {
+		Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, null);
+		connection.sendCommand(cmd);
+	}
+
+	@Override
+	public void close() {
+		Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null);
+		connection.sendCommand(cmd);
+	}
+
+	@Override
+	public void ping(String returnKey) {
+		Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, null);
+		connection.write(cmd);
+	}
+	
+	@Override
+	public void response(String returnKey) {
+		Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, null);
+		connection.write(cmd);
+	}
+
+	@Override
+	public void shutdown(String key) {
+		connection.close();
+	}
+
+
+}