annotate src/alice/datasegment/RemoteDataSegmentManager.java @ 254:2ec10cfa8cc3

refactor
author sugi
date Mon, 01 Jul 2013 20:00:07 +0900
parents 32e7d5271477
children b4690114a0cd
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
12
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
1 package alice.datasegment;
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
2
23
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
3 import java.io.IOException;
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
4 import java.net.InetSocketAddress;
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
5 import java.nio.channels.SocketChannel;
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
6
25
50c75cb3de60 implements TopologyNode
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 23
diff changeset
7 import org.apache.log4j.Logger;
12
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
8 import org.msgpack.type.Value;
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
9
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
10 import alice.codesegment.CodeSegment;
14
e3f1b21718b0 implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 13
diff changeset
11 import alice.daemon.Connection;
e3f1b21718b0 implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 13
diff changeset
12 import alice.daemon.IncomingTcpConnection;
e3f1b21718b0 implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 13
diff changeset
13 import alice.daemon.OutboundTcpConnection;
12
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
14
14
e3f1b21718b0 implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 13
diff changeset
15 public class RemoteDataSegmentManager extends DataSegmentManager {
e3f1b21718b0 implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 13
diff changeset
16
e3f1b21718b0 implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 13
diff changeset
17 Connection connection;
39
3155337e754e add logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 34
diff changeset
18 Logger logger;
14
e3f1b21718b0 implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 13
diff changeset
19
28
98ab26e09a98 Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 27
diff changeset
20 public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) {
39
3155337e754e add logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 34
diff changeset
21 logger = Logger.getLogger(connectionKey);
23
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
22 connection = new Connection();
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
23 final RemoteDataSegmentManager manager = this;
28
98ab26e09a98 Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 27
diff changeset
24 new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start();
98ab26e09a98 Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 27
diff changeset
25 new Thread("Connect-" + connectionKey) {
23
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
26 public void run() {
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
27 boolean connect = true;
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
28 do {
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
29 try {
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
30 SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port));
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
31 connection.socket = sc.socket();
63
498d1d2524d3 change getDataSegmentKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 58
diff changeset
32 connection.socket.setTcpNoDelay(true);
23
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
33 connect = false;
39
3155337e754e add logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 34
diff changeset
34 logger.info("Connect to " + connection.getInfoString());
23
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
35 } catch (IOException e) {
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
36 try {
54
27a64e771c4c change connection wait time
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 41
diff changeset
37 Thread.sleep(50);
23
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
38 } catch (InterruptedException e1) {
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
39 e1.printStackTrace();
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
40 }
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
41 }
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
42 } while (connect);
28
98ab26e09a98 Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 27
diff changeset
43 new IncomingTcpConnection(connection, manager, reverseKey).start();
23
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
44 new OutboundTcpConnection(connection).start();
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
45 }
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
46 }.start();
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
47 }
54bf607118ae change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 20
diff changeset
48
58
ebdcab7b9b04 add comment
one
parents: 54
diff changeset
49 /**
ebdcab7b9b04 add comment
one
parents: 54
diff changeset
50 * send put command to target DataSegment
ebdcab7b9b04 add comment
one
parents: 54
diff changeset
51 */
12
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
52 @Override
132
1044a79ce4ef delete cs from OutputCodeSegment
sugi
parents: 71
diff changeset
53 public void put(String key, Value val) {
1044a79ce4ef delete cs from OutputCodeSegment
sugi
parents: 71
diff changeset
54 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
58
ebdcab7b9b04 add comment
one
parents: 54
diff changeset
55 connection.sendCommand(cmd); // put command on the transmission thread
71
a3a2605e16a2 change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
56 if (logger.isDebugEnabled())
68
d4c7f7b1096b remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 66
diff changeset
57 logger.debug(cmd.getCommandString());
12
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
58 }
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
59
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
60 @Override
252
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
61 public void quickPut(String key, Value val) {
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
62 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
63 connection.write(cmd); // put command is executed right now
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
64 if (logger.isDebugEnabled())
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
65 logger.debug(cmd.getCommandString());
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
66 }
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
67
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
68 @Override
132
1044a79ce4ef delete cs from OutputCodeSegment
sugi
parents: 71
diff changeset
69 public void update(String key, Value val) {
1044a79ce4ef delete cs from OutputCodeSegment
sugi
parents: 71
diff changeset
70 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
39
3155337e754e add logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 34
diff changeset
71 connection.sendCommand(cmd);
71
a3a2605e16a2 change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
72 if (logger.isDebugEnabled())
68
d4c7f7b1096b remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 66
diff changeset
73 logger.debug(cmd.getCommandString());
12
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
74 }
252
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
75
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
76 @Override
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
77 public void quickUpdate(String key, Value val) {
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
78 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
79 connection.write(cmd);
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
80 if (logger.isDebugEnabled())
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
81 logger.debug(cmd.getCommandString());
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
82
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
83 }
12
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
84
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
85 @Override
254
2ec10cfa8cc3 refactor
sugi
parents: 253
diff changeset
86 public void take(Receiver receiver, CodeSegment cs) {
14
e3f1b21718b0 implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 13
diff changeset
87 int seq = this.seq.getAndIncrement();
254
2ec10cfa8cc3 refactor
sugi
parents: 253
diff changeset
88 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
14
e3f1b21718b0 implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 13
diff changeset
89 seqHash.put(seq, cmd);
39
3155337e754e add logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 34
diff changeset
90 connection.sendCommand(cmd);
71
a3a2605e16a2 change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
91 if (logger.isDebugEnabled())
68
d4c7f7b1096b remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 66
diff changeset
92 logger.debug(cmd.getCommandString());
12
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
93 }
252
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
94
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
95 @Override
254
2ec10cfa8cc3 refactor
sugi
parents: 253
diff changeset
96 public void quickTake(Receiver receiver, CodeSegment cs) {
252
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
97 int seq = this.seq.getAndIncrement();
254
2ec10cfa8cc3 refactor
sugi
parents: 253
diff changeset
98 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);
252
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
99 seqHash.put(seq, cmd);
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
100 connection.write(cmd);
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
101 if (logger.isDebugEnabled())
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
102 logger.debug(cmd.getCommandString());
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
103 }
12
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
104
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
105 @Override
254
2ec10cfa8cc3 refactor
sugi
parents: 253
diff changeset
106 public void peek(Receiver receiver, CodeSegment cs) {
14
e3f1b21718b0 implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 13
diff changeset
107 int seq = this.seq.getAndIncrement();
254
2ec10cfa8cc3 refactor
sugi
parents: 253
diff changeset
108 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
14
e3f1b21718b0 implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 13
diff changeset
109 seqHash.put(seq, cmd);
e3f1b21718b0 implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 13
diff changeset
110 connection.sendCommand(cmd);
71
a3a2605e16a2 change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
111 if (logger.isDebugEnabled())
68
d4c7f7b1096b remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 66
diff changeset
112 logger.debug(cmd.getCommandString());
12
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
113 }
252
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
114
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
115
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
116 @Override
254
2ec10cfa8cc3 refactor
sugi
parents: 253
diff changeset
117 public void quickPeek(Receiver receiver, CodeSegment cs) {
252
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
118 int seq = this.seq.getAndIncrement();
254
2ec10cfa8cc3 refactor
sugi
parents: 253
diff changeset
119 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);
252
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
120 seqHash.put(seq, cmd);
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
121 connection.write(cmd);
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
122 if (logger.isDebugEnabled())
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
123 logger.debug(cmd.getCommandString());
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
124
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
125 }
12
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
126
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
127 @Override
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
128 public void remove(String key) {
39
3155337e754e add logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 34
diff changeset
129 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null);
3155337e754e add logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 34
diff changeset
130 connection.sendCommand(cmd);
71
a3a2605e16a2 change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
131 if (logger.isDebugEnabled())
68
d4c7f7b1096b remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 66
diff changeset
132 logger.debug(cmd.getCommandString());
12
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
133 }
41
f9334781344a add close api
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 40
diff changeset
134
f9334781344a add close api
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 40
diff changeset
135 @Override
30
b5a21baf0b07 implements RingTopology
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 28
diff changeset
136 public void finish() {
39
3155337e754e add logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 34
diff changeset
137 Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, null);
3155337e754e add logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 34
diff changeset
138 connection.sendCommand(cmd);
30
b5a21baf0b07 implements RingTopology
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 28
diff changeset
139 }
13
30f97d776a3e implements Alice daemon
one
parents: 12
diff changeset
140
41
f9334781344a add close api
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 40
diff changeset
141 @Override
f9334781344a add close api
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 40
diff changeset
142 public void close() {
f9334781344a add close api
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 40
diff changeset
143 Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null);
f9334781344a add close api
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 40
diff changeset
144 connection.sendCommand(cmd);
f9334781344a add close api
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 40
diff changeset
145 }
f9334781344a add close api
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 40
diff changeset
146
252
b78f52865b8d no use Queue API
sugi
parents: 132
diff changeset
147
12
c4d6ff56b9bf unite Command and Reply
one
parents:
diff changeset
148 }