annotate src/main/java/alice/daemon/IncomingTcpConnection.java @ 452:f68d103498e0 dispose

refactor (InputDataSegment holder class changed)
author sugi
date Tue, 28 Oct 2014 17:24:16 +0900
parents 09a80f83c605
children b004f62b83e5
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
345
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
1 package alice.daemon;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
2
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
3 import java.io.EOFException;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
4 import java.io.IOException;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
5 import java.nio.channels.ClosedChannelException;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
6
443
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
7 import org.msgpack.unpacker.MessagePackUnpacker;
345
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
8 import org.msgpack.unpacker.Unpacker;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
9
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
10 import alice.codesegment.SingletonMessage;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
11 import alice.datasegment.Command;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
12 import alice.datasegment.CommandType;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
13 import alice.datasegment.DataSegment;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
14 import alice.datasegment.DataSegmentManager;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
15 import alice.datasegment.LocalDataSegmentManager;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
16 import alice.topology.manager.keeparive.RespondData;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
17
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
18 public class IncomingTcpConnection extends Thread {
419
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
19
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
20 public Connection connection;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
21 public DataSegmentManager manager;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
22 public String reverseKey;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
23 private LocalDataSegmentManager lmanager = DataSegment.getLocal();
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
24
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
25 public IncomingTcpConnection(DataSegmentManager manager) {
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
26 this.manager = manager;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
27 }
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
28
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
29 public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) {
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
30 this.manager = manager;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
31 this.connection = connection;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
32 this.reverseKey = reverseKey;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
33 }
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
34
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
35 public LocalDataSegmentManager getLocalDataSegmentManager(){
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
36 return lmanager;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
37 }
345
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
38
419
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
39 /**
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
40 * pipeline thread for receiving
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
41 */
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
42 public void run() {
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
43 Unpacker unpacker = null;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
44 try {
445
86b74532e66c change Protocol
sugi
parents: 443
diff changeset
45 unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream());
419
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
46 } catch (IOException e) {
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
47 e.printStackTrace();
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
48 }
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
49 if (unpacker == null) {
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
50 return;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
51 }
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
52 while (true) {
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
53 try {
447
d30451d1882f fix IncomingUdpConnection and passed UdpTest
sugi
parents: 446
diff changeset
54 Command cmd = null;
443
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
55 byte[] val = null;
419
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
56 CommandMessage msg = unpacker.read(CommandMessage.class);
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
57 CommandType type = CommandType.getCommandTypeFromId(msg.type);
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
58 switch (type) {
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
59 case UPDATE:
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
60 case PUT:
443
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
61 val = getSerializedByteArray(unpacker);
446
a91890dff56e refactor
sugi
parents: 445
diff changeset
62 cmd = new Command(type, null, null, val, 0, 0, null, null, reverseKey);
452
f68d103498e0 refactor (InputDataSegment holder class changed)
sugi
parents: 449
diff changeset
63 // these flags express DataSegment status
f68d103498e0 refactor (InputDataSegment holder class changed)
sugi
parents: 449
diff changeset
64 cmd.setCompressFlag(msg.compressed);
f68d103498e0 refactor (InputDataSegment holder class changed)
sugi
parents: 449
diff changeset
65 cmd.setSerializeFlag(msg.serialized);
446
a91890dff56e refactor
sugi
parents: 445
diff changeset
66 lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
419
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
67 break;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
68 case PEEK:
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
69 case TAKE:
446
a91890dff56e refactor
sugi
parents: 445
diff changeset
70 cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection);
449
09a80f83c605 add Serialized and compressed flag on protocol header
sugi
parents: 447
diff changeset
71 cmd.setQuickFlag(msg.quickFlag);
446
a91890dff56e refactor
sugi
parents: 445
diff changeset
72 lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
419
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
73 break;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
74 case REMOVE:
446
a91890dff56e refactor
sugi
parents: 445
diff changeset
75 cmd = new Command(type, null, null, null, 0, 0, null, null, "");
a91890dff56e refactor
sugi
parents: 445
diff changeset
76 lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
419
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
77 break;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
78 case REPLY:
446
a91890dff56e refactor
sugi
parents: 445
diff changeset
79 cmd = manager.getAndRemoveCmd(msg.seq);
443
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
80 val = getSerializedByteArray(unpacker);
452
f68d103498e0 refactor (InputDataSegment holder class changed)
sugi
parents: 449
diff changeset
81 Command rCmd = new Command(type, null, null, val, msg.index, msg.seq, null, null, "");
f68d103498e0 refactor (InputDataSegment holder class changed)
sugi
parents: 449
diff changeset
82 rCmd.setCompressFlag(msg.compressed);
f68d103498e0 refactor (InputDataSegment holder class changed)
sugi
parents: 449
diff changeset
83 rCmd.setSerializeFlag(msg.serialized);
f68d103498e0 refactor (InputDataSegment holder class changed)
sugi
parents: 449
diff changeset
84 cmd.cs.ids.reply(cmd.receiver, rCmd);
419
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
85 break;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
86 case PING:
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
87 DataSegment.get(reverseKey).response(msg.key);
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
88 break;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
89 case RESPONSE:
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
90 DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()));
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
91 break;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
92 default:
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
93 break;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
94 }
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
95 } catch (ClosedChannelException e) {
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
96 connection.putConnectionInfo();
446
a91890dff56e refactor
sugi
parents: 445
diff changeset
97 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, reverseKey));
419
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
98 return;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
99 } catch (EOFException e) {
446
a91890dff56e refactor
sugi
parents: 445
diff changeset
100 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, reverseKey));
419
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
101 return;
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
102 } catch (IOException e) {
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
103 e.printStackTrace();
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
104 }
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
105 }
443
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
106
419
aefbe41fcf12 change tab to space
sugi
parents: 417
diff changeset
107 }
345
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
108
443
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
109 private byte[] getSerializedByteArray(Unpacker unpacker) {
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
110 int len;
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
111 byte[] b = null;
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
112 try {
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
113 len = unpacker.readInt();
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
114 b = ((MessagePackUnpacker) unpacker).getSerializedByteArray(len);
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
115 } catch (IOException e) {
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
116 e.printStackTrace();
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
117 }
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
118 return b;
2f2623484b77 change protocol
sugi
parents: 419
diff changeset
119 }
345
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
120 }