Mercurial > hg > Database > Alice
comparison src/main/java/alice/datasegment/RemoteDataSegmentManager.java @ 345:8f71c3e6f11d
Change directory structure Maven standard
author | sugi |
---|---|
date | Wed, 16 Apr 2014 18:26:07 +0900 |
parents | |
children | 388e7d4b0624 |
comparison
equal
deleted
inserted
replaced
344:9f97ec18f8c5 | 345:8f71c3e6f11d |
---|---|
1 package alice.datasegment; | |
2 | |
3 import java.io.IOException; | |
4 import java.net.InetSocketAddress; | |
5 import java.nio.channels.SocketChannel; | |
6 | |
7 import org.apache.log4j.Logger; | |
8 | |
9 import alice.codesegment.CodeSegment; | |
10 import alice.daemon.Connection; | |
11 import alice.daemon.IncomingTcpConnection; | |
12 import alice.daemon.OutboundTcpConnection; | |
13 import alice.topology.HostMessage; | |
14 import alice.topology.manager.reconnection.SendError; | |
15 | |
16 public class RemoteDataSegmentManager extends DataSegmentManager { | |
17 | |
18 Connection connection; | |
19 Logger logger; | |
20 | |
21 public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port, final boolean rFlag) { | |
22 logger = Logger.getLogger(connectionKey); | |
23 connection = new Connection(); | |
24 final RemoteDataSegmentManager manager = this; | |
25 new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start(); | |
26 new Thread("Connect-" + connectionKey) { | |
27 public void run() { | |
28 boolean connect = true; | |
29 do { | |
30 try { | |
31 SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port)); | |
32 connection.socket = sc.socket(); | |
33 connection.socket.setTcpNoDelay(true); | |
34 connect = false; | |
35 logger.info("Connect to " + connection.getInfoString()); | |
36 } catch (IOException e) { | |
37 try { | |
38 Thread.sleep(50); | |
39 } catch (InterruptedException e1) { | |
40 e1.printStackTrace(); | |
41 } | |
42 } | |
43 } while (connect&&!rFlag); | |
44 new IncomingTcpConnection(connection, manager, reverseKey).start(); | |
45 new OutboundTcpConnection(connection).start(); | |
46 // if connection failed need to stop these thread | |
47 if (connect){ | |
48 new SendError(new HostMessage(hostName, port)).execute(); | |
49 } | |
50 } | |
51 }.start(); | |
52 } | |
53 | |
54 /** | |
55 * send put command to target DataSegment | |
56 */ | |
57 @Override | |
58 public void put(String key, Object val) { | |
59 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); | |
60 connection.sendCommand(cmd); // put command on the transmission thread | |
61 if (logger.isDebugEnabled()) | |
62 logger.debug(cmd.getCommandString()); | |
63 } | |
64 | |
65 @Override | |
66 public void quickPut(String key, Object val) { | |
67 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); | |
68 connection.write(cmd); // put command is executed right now | |
69 if (logger.isDebugEnabled()) | |
70 logger.debug(cmd.getCommandString()); | |
71 } | |
72 | |
73 @Override | |
74 public void update(String key, Object val) { | |
75 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); | |
76 connection.sendCommand(cmd); | |
77 if (logger.isDebugEnabled()) | |
78 logger.debug(cmd.getCommandString()); | |
79 } | |
80 | |
81 @Override | |
82 public void quickUpdate(String key, Object val) { | |
83 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); | |
84 connection.write(cmd); | |
85 if (logger.isDebugEnabled()) | |
86 logger.debug(cmd.getCommandString()); | |
87 } | |
88 | |
89 @Override | |
90 public void take(Receiver receiver, CodeSegment cs) { | |
91 int seq = this.seq.getAndIncrement(); | |
92 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); | |
93 seqHash.put(seq, cmd); | |
94 connection.sendCommand(cmd); | |
95 if (logger.isDebugEnabled()) | |
96 logger.debug(cmd.getCommandString()); | |
97 } | |
98 | |
99 public void quickTake(Receiver receiver, CodeSegment cs) { | |
100 int seq = this.seq.getAndIncrement(); | |
101 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true); | |
102 seqHash.put(seq, cmd); | |
103 connection.write(cmd); | |
104 if (logger.isDebugEnabled()) | |
105 logger.debug(cmd.getCommandString()); | |
106 } | |
107 | |
108 @Override | |
109 public void peek(Receiver receiver, CodeSegment cs) { | |
110 int seq = this.seq.getAndIncrement(); | |
111 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); | |
112 seqHash.put(seq, cmd); | |
113 connection.sendCommand(cmd); | |
114 if (logger.isDebugEnabled()) | |
115 logger.debug(cmd.getCommandString()); | |
116 } | |
117 | |
118 public void quickPeek(Receiver receiver, CodeSegment cs) { | |
119 int seq = this.seq.getAndIncrement(); | |
120 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true); | |
121 seqHash.put(seq, cmd); | |
122 connection.write(cmd); | |
123 if (logger.isDebugEnabled()) | |
124 logger.debug(cmd.getCommandString()); | |
125 | |
126 } | |
127 | |
128 @Override | |
129 public void remove(String key) { | |
130 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null); | |
131 connection.sendCommand(cmd); | |
132 if (logger.isDebugEnabled()) | |
133 logger.debug(cmd.getCommandString()); | |
134 } | |
135 | |
136 @Override | |
137 public void finish() { | |
138 Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, null); | |
139 connection.sendCommand(cmd); | |
140 } | |
141 | |
142 @Override | |
143 public void close() { | |
144 Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null); | |
145 connection.sendCommand(cmd); | |
146 } | |
147 | |
148 @Override | |
149 public void ping(String returnKey) { | |
150 Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, null); | |
151 connection.write(cmd); | |
152 } | |
153 | |
154 @Override | |
155 public void response(String returnKey) { | |
156 Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, null); | |
157 connection.write(cmd); | |
158 } | |
159 | |
160 @Override | |
161 public void shutdown(String key) { | |
162 connection.close(); | |
163 } | |
164 | |
165 | |
166 } |