345
|
1 package alice.daemon;
|
|
2
|
|
3 import java.io.EOFException;
|
|
4 import java.io.IOException;
|
|
5 import java.nio.channels.ClosedChannelException;
|
|
6
|
443
|
7 import org.msgpack.unpacker.MessagePackUnpacker;
|
345
|
8 import org.msgpack.unpacker.Unpacker;
|
|
9
|
|
10 import alice.codesegment.SingletonMessage;
|
|
11 import alice.datasegment.Command;
|
|
12 import alice.datasegment.CommandType;
|
|
13 import alice.datasegment.DataSegment;
|
|
14 import alice.datasegment.DataSegmentManager;
|
|
15 import alice.datasegment.LocalDataSegmentManager;
|
|
16 import alice.topology.manager.keeparive.RespondData;
|
|
17
|
|
18 public class IncomingTcpConnection extends Thread {
|
419
|
19
|
|
20 public Connection connection;
|
|
21 public DataSegmentManager manager;
|
|
22 public String reverseKey;
|
|
23 private LocalDataSegmentManager lmanager = DataSegment.getLocal();
|
|
24
|
|
25 public IncomingTcpConnection(DataSegmentManager manager) {
|
|
26 this.manager = manager;
|
|
27 }
|
|
28
|
|
29 public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) {
|
|
30 this.manager = manager;
|
|
31 this.connection = connection;
|
|
32 this.reverseKey = reverseKey;
|
|
33 }
|
|
34
|
|
35 public LocalDataSegmentManager getLocalDataSegmentManager(){
|
|
36 return lmanager;
|
|
37 }
|
345
|
38
|
419
|
39 /**
|
|
40 * pipeline thread for receiving
|
|
41 */
|
|
42 public void run() {
|
|
43 Unpacker unpacker = null;
|
|
44 try {
|
445
|
45 unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream());
|
419
|
46 } catch (IOException e) {
|
|
47 e.printStackTrace();
|
|
48 }
|
|
49 if (unpacker == null) {
|
|
50 return;
|
|
51 }
|
|
52 while (true) {
|
|
53 try {
|
447
|
54 Command cmd = null;
|
443
|
55 byte[] val = null;
|
419
|
56 CommandMessage msg = unpacker.read(CommandMessage.class);
|
|
57 CommandType type = CommandType.getCommandTypeFromId(msg.type);
|
|
58 switch (type) {
|
|
59 case UPDATE:
|
|
60 case PUT:
|
443
|
61 val = getSerializedByteArray(unpacker);
|
446
|
62 cmd = new Command(type, null, null, val, 0, 0, null, null, reverseKey);
|
452
|
63 // these flags express DataSegment status
|
|
64 cmd.setCompressFlag(msg.compressed);
|
|
65 cmd.setSerializeFlag(msg.serialized);
|
446
|
66 lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
|
419
|
67 break;
|
|
68 case PEEK:
|
|
69 case TAKE:
|
446
|
70 cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection);
|
449
|
71 cmd.setQuickFlag(msg.quickFlag);
|
446
|
72 lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
|
419
|
73 break;
|
|
74 case REMOVE:
|
446
|
75 cmd = new Command(type, null, null, null, 0, 0, null, null, "");
|
|
76 lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
|
419
|
77 break;
|
|
78 case REPLY:
|
446
|
79 cmd = manager.getAndRemoveCmd(msg.seq);
|
443
|
80 val = getSerializedByteArray(unpacker);
|
452
|
81 Command rCmd = new Command(type, null, null, val, msg.index, msg.seq, null, null, "");
|
|
82 rCmd.setCompressFlag(msg.compressed);
|
|
83 rCmd.setSerializeFlag(msg.serialized);
|
|
84 cmd.cs.ids.reply(cmd.receiver, rCmd);
|
419
|
85 break;
|
|
86 case PING:
|
|
87 DataSegment.get(reverseKey).response(msg.key);
|
|
88 break;
|
|
89 case RESPONSE:
|
|
90 DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()));
|
|
91 break;
|
|
92 default:
|
|
93 break;
|
|
94 }
|
|
95 } catch (ClosedChannelException e) {
|
|
96 connection.putConnectionInfo();
|
446
|
97 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, reverseKey));
|
419
|
98 return;
|
|
99 } catch (EOFException e) {
|
446
|
100 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, reverseKey));
|
419
|
101 return;
|
|
102 } catch (IOException e) {
|
|
103 e.printStackTrace();
|
|
104 }
|
|
105 }
|
443
|
106
|
419
|
107 }
|
345
|
108
|
443
|
109 private byte[] getSerializedByteArray(Unpacker unpacker) {
|
|
110 int len;
|
|
111 byte[] b = null;
|
|
112 try {
|
|
113 len = unpacker.readInt();
|
|
114 b = ((MessagePackUnpacker) unpacker).getSerializedByteArray(len);
|
|
115 } catch (IOException e) {
|
|
116 e.printStackTrace();
|
|
117 }
|
|
118 return b;
|
|
119 }
|
345
|
120 }
|