Mercurial > hg > Database > Alice
diff src/main/java/alice/datasegment/RemoteDataSegmentManager.java @ 345:8f71c3e6f11d
Change directory structure Maven standard
author | sugi |
---|---|
date | Wed, 16 Apr 2014 18:26:07 +0900 |
parents | |
children | 388e7d4b0624 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Wed Apr 16 18:26:07 2014 +0900 @@ -0,0 +1,166 @@ +package alice.datasegment; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; + +import org.apache.log4j.Logger; + +import alice.codesegment.CodeSegment; +import alice.daemon.Connection; +import alice.daemon.IncomingTcpConnection; +import alice.daemon.OutboundTcpConnection; +import alice.topology.HostMessage; +import alice.topology.manager.reconnection.SendError; + +public class RemoteDataSegmentManager extends DataSegmentManager { + + Connection connection; + Logger logger; + + public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port, final boolean rFlag) { + logger = Logger.getLogger(connectionKey); + connection = new Connection(); + final RemoteDataSegmentManager manager = this; + new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start(); + new Thread("Connect-" + connectionKey) { + public void run() { + boolean connect = true; + do { + try { + SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port)); + connection.socket = sc.socket(); + connection.socket.setTcpNoDelay(true); + connect = false; + logger.info("Connect to " + connection.getInfoString()); + } catch (IOException e) { + try { + Thread.sleep(50); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } while (connect&&!rFlag); + new IncomingTcpConnection(connection, manager, reverseKey).start(); + new OutboundTcpConnection(connection).start(); + // if connection failed need to stop these thread + if (connect){ + new SendError(new HostMessage(hostName, port)).execute(); + } + } + }.start(); + } + + /** + * send put command to target DataSegment + */ + @Override + public void put(String key, Object val) { + Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); + connection.sendCommand(cmd); // put command on the transmission thread + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void quickPut(String key, Object val) { + Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); + connection.write(cmd); // put command is executed right now + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void update(String key, Object val) { + Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); + connection.sendCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void quickUpdate(String key, Object val) { + Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); + connection.write(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void take(Receiver receiver, CodeSegment cs) { + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + seqHash.put(seq, cmd); + connection.sendCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + public void quickTake(Receiver receiver, CodeSegment cs) { + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true); + seqHash.put(seq, cmd); + connection.write(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void peek(Receiver receiver, CodeSegment cs) { + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + seqHash.put(seq, cmd); + connection.sendCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + public void quickPeek(Receiver receiver, CodeSegment cs) { + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true); + seqHash.put(seq, cmd); + connection.write(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + + } + + @Override + public void remove(String key) { + Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null); + connection.sendCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + + @Override + public void finish() { + Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, null); + connection.sendCommand(cmd); + } + + @Override + public void close() { + Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null); + connection.sendCommand(cmd); + } + + @Override + public void ping(String returnKey) { + Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, null); + connection.write(cmd); + } + + @Override + public void response(String returnKey) { + Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, null); + connection.write(cmd); + } + + @Override + public void shutdown(String key) { + connection.close(); + } + + +}