345
|
1 package alice.daemon;
|
|
2
|
|
3 import java.io.EOFException;
|
|
4 import java.io.IOException;
|
|
5 import java.nio.channels.ClosedChannelException;
|
|
6
|
|
7 import org.msgpack.unpacker.Unpacker;
|
|
8
|
|
9 import alice.codesegment.SingletonMessage;
|
|
10 import alice.datasegment.Command;
|
|
11 import alice.datasegment.CommandType;
|
|
12 import alice.datasegment.DataSegment;
|
|
13 import alice.datasegment.DataSegmentKey;
|
|
14 import alice.datasegment.DataSegmentManager;
|
|
15 import alice.datasegment.LocalDataSegmentManager;
|
|
16 import alice.topology.HostMessage;
|
|
17 import alice.topology.manager.keeparive.RespondData;
|
|
18 import alice.topology.manager.reconnection.SendError;
|
|
19
|
|
20 public class IncomingTcpConnection extends Thread {
|
|
21
|
|
22 public Connection connection;
|
|
23 public DataSegmentManager manager;
|
|
24 public String reverseKey;
|
|
25 private LocalDataSegmentManager lmanager = DataSegment.getLocal();
|
|
26
|
|
27 public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) {
|
|
28 this.manager = manager;
|
|
29 this.connection = connection;
|
|
30 this.reverseKey = reverseKey;
|
|
31 }
|
|
32
|
|
33 /**
|
|
34 * pipeline thread for receiving
|
|
35 */
|
|
36 public void run() {
|
|
37 Unpacker unpacker = this.getUnpacker();
|
|
38 if (unpacker == null) {
|
|
39 return;
|
|
40 }
|
|
41 while (true) {
|
|
42 try {
|
|
43 CommandMessage msg = unpacker.read(CommandMessage.class);
|
|
44 CommandType type = CommandType.getCommandTypeFromId(msg.type);
|
|
45 switch (type) {
|
|
46 case UPDATE:
|
|
47 getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
|
|
48 break;
|
|
49 case PUT:
|
|
50 getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
|
|
51 break;
|
|
52 case PEEK:
|
|
53 getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
|
|
54 break;
|
|
55 case TAKE:
|
|
56 getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
|
|
57 break;
|
|
58 case REMOVE:
|
|
59 getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, 0, 0, null, null, null));
|
|
60 break;
|
|
61 case REPLY:
|
|
62 Command cmd = manager.getAndRemoveCmd(msg.seq);
|
|
63 cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null));
|
|
64 cmd=null;
|
|
65 break;
|
|
66 case PING:
|
|
67 DataSegment.get(reverseKey).response(msg.key);
|
|
68 break;
|
|
69 case RESPONSE:
|
|
70 DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()));
|
|
71 break;
|
|
72 default:
|
|
73 break;
|
|
74 }
|
|
75 } catch (ClosedChannelException e) {
|
|
76 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
|
|
77 return;
|
|
78 } catch (EOFException e) {
|
|
79 new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute();
|
|
80 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
|
|
81 return;
|
|
82 } catch (IOException e) {
|
|
83 e.printStackTrace();
|
|
84 }
|
|
85 }
|
|
86 }
|
|
87
|
|
88 private Unpacker getUnpacker() {
|
|
89 Unpacker unpacker = null;
|
|
90 try {
|
|
91 unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream());
|
|
92 } catch (IOException e2) {
|
|
93 e2.printStackTrace();
|
|
94 }
|
|
95 return unpacker;
|
|
96 }
|
|
97
|
|
98 private DataSegmentKey getDataSegmentKey(CommandMessage msg) {
|
|
99 return lmanager.getDataSegmentKey(msg.key);
|
|
100 }
|
|
101 }
|