Mercurial > hg > Database > Alice
annotate src/main/java/alice/daemon/IncomingTcpConnection.java @ 627:f5365a5e6672 dispose
don't use unpacker
author | nozomi |
---|---|
date | Thu, 14 Sep 2017 18:00:15 +0900 |
parents | 4835788cbb7e |
children | 71dfb0051be0 |
rev | line source |
---|---|
345 | 1 package alice.daemon; |
2 | |
3 import java.io.EOFException; | |
4 import java.io.IOException; | |
5 import java.nio.channels.ClosedChannelException; | |
6 | |
527 | 7 import alice.datasegment.*; |
528
6ebddfac7ff6
delete RecieveData.setCompressFlag
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
527
diff
changeset
|
8 import org.msgpack.MessagePack; |
345 | 9 import org.msgpack.unpacker.Unpacker; |
10 | |
11 import alice.topology.manager.keeparive.RespondData; | |
12 | |
13 public class IncomingTcpConnection extends Thread { | |
419 | 14 |
480 | 15 private Connection connection; |
16 protected DataSegmentManager manager; | |
17 protected String reverseKey; | |
419 | 18 private LocalDataSegmentManager lmanager = DataSegment.getLocal(); |
529
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
19 private CompressedLocalDataSegmentManager compressedlmanager = DataSegment.getCompressedLocal(); |
528
6ebddfac7ff6
delete RecieveData.setCompressFlag
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
527
diff
changeset
|
20 private static final MessagePack packer = new MessagePack(); |
419 | 21 |
22 public IncomingTcpConnection(DataSegmentManager manager) { | |
23 this.manager = manager; | |
24 } | |
25 | |
26 public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { | |
27 this.manager = manager; | |
28 this.connection = connection; | |
29 this.reverseKey = reverseKey; | |
30 } | |
31 | |
32 public LocalDataSegmentManager getLocalDataSegmentManager(){ | |
33 return lmanager; | |
34 } | |
345 | 35 |
529
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
36 public CompressedLocalDataSegmentManager getCompressedLocalDataSegmentManager(){ |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
37 return compressedlmanager; |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
38 } |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
39 |
419 | 40 /** |
41 * pipeline thread for receiving | |
42 */ | |
43 public void run() { | |
44 Unpacker unpacker = null; | |
45 try { | |
528
6ebddfac7ff6
delete RecieveData.setCompressFlag
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
527
diff
changeset
|
46 unpacker = packer.createUnpacker(connection.socket.getInputStream()); |
419 | 47 } catch (IOException e) { |
48 e.printStackTrace(); | |
49 } | |
50 if (unpacker == null) { | |
51 return; | |
52 } | |
53 while (true) { | |
54 try { | |
447 | 55 Command cmd = null; |
458 | 56 ReceiveData rData = null; |
530 | 57 CommandMessage msg = unpacker.read(CommandMessage.class);///read header |
419 | 58 CommandType type = CommandType.getCommandTypeFromId(msg.type); |
59 switch (type) { | |
60 case UPDATE: | |
61 case PUT: | |
555
2af387211a85
add zippedDataSize to ReceiveData
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
554
diff
changeset
|
62 int dataSize = unpacker.readInt(); |
627 | 63 //byte[] data = unpacker.getSerializedByteArray(dataSize); |
64 byte[] data = new byte[dataSize]; | |
65 connection.socket.getInputStream().read(data); | |
66 rData = new ReceiveData(data, msg.compressed, msg.dataSize); | |
625
4835788cbb7e
fix TestRemote default arguments
Shinji KONO <kono@ie.u-ryukyu.ac.jp>
parents:
574
diff
changeset
|
67 |
4835788cbb7e
fix TestRemote default arguments
Shinji KONO <kono@ie.u-ryukyu.ac.jp>
parents:
574
diff
changeset
|
68 /* |
4835788cbb7e
fix TestRemote default arguments
Shinji KONO <kono@ie.u-ryukyu.ac.jp>
parents:
574
diff
changeset
|
69 rData = new ReceiveData(new byte[dataSize], msg.compressed, msg.dataSize); |
4835788cbb7e
fix TestRemote default arguments
Shinji KONO <kono@ie.u-ryukyu.ac.jp>
parents:
574
diff
changeset
|
70 try { |
4835788cbb7e
fix TestRemote default arguments
Shinji KONO <kono@ie.u-ryukyu.ac.jp>
parents:
574
diff
changeset
|
71 unpacker.readPayload(rData.getMessagePack()); |
4835788cbb7e
fix TestRemote default arguments
Shinji KONO <kono@ie.u-ryukyu.ac.jp>
parents:
574
diff
changeset
|
72 } catch ( ); |
4835788cbb7e
fix TestRemote default arguments
Shinji KONO <kono@ie.u-ryukyu.ac.jp>
parents:
574
diff
changeset
|
73 */ |
4835788cbb7e
fix TestRemote default arguments
Shinji KONO <kono@ie.u-ryukyu.ac.jp>
parents:
574
diff
changeset
|
74 |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
75 |
488 | 76 if (msg.setTime) { |
554
1dc473a637c6
add setter/getter to setTime of ReceivedData
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
77 rData.setTimes(msg.time, true, msg.depth); |
560
72ef96ba4195
add zeppedDataSize in CommandMessage
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
555
diff
changeset
|
78 } |
561
f1777341c5a2
add setZipped to CommandMessage&ReceiveData
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
560
diff
changeset
|
79 if (msg.setZepped){ |
f1777341c5a2
add setZipped to CommandMessage&ReceiveData
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
560
diff
changeset
|
80 rData.setZipped(msg.zippedDataSize, true); |
560
72ef96ba4195
add zeppedDataSize in CommandMessage
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
555
diff
changeset
|
81 } else { |
561
f1777341c5a2
add setZipped to CommandMessage&ReceiveData
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
560
diff
changeset
|
82 rData.setZipped(dataSize, true); |
488 | 83 } |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
84 |
458 | 85 cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); |
530 | 86 cmd.setCompressFlag(msg.compressed); |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
87 |
530 | 88 if (rData.compressed()){ |
89 compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd); | |
90 } else { | |
91 lmanager.getDataSegmentKey(msg.key).runCommand(cmd); | |
92 } | |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
93 |
419 | 94 break; |
95 case PEEK: | |
96 case TAKE: | |
446 | 97 cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection); |
530 | 98 cmd.setCompressFlag(msg.compressed); |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
99 |
530 | 100 if (msg.compressed){ |
101 compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd); | |
102 } else { | |
103 lmanager.getDataSegmentKey(msg.key).runCommand(cmd); | |
104 } | |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
105 |
467 | 106 break; |
419 | 107 case REMOVE: |
446 | 108 cmd = new Command(type, null, null, null, 0, 0, null, null, ""); |
530 | 109 lmanager.getDataSegmentKey(msg.key).runCommand(cmd);//ToDo:fix |
419 | 110 break; |
111 case REPLY: | |
446 | 112 cmd = manager.getAndRemoveCmd(msg.seq); |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
532
diff
changeset
|
113 |
574
ea21af9a4762
delete serializeFlag, fix MessagePack pack&unpack
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
561
diff
changeset
|
114 rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed, msg.dataSize); |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
115 |
467 | 116 Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); |
530 | 117 cmd.setCompressFlag(msg.compressed); |
452 | 118 cmd.cs.ids.reply(cmd.receiver, rCmd); |
419 | 119 break; |
120 case PING: | |
471
be0b61986ff7
checking having DataSegmentManger before get DataSegmentManager
sugi
parents:
467
diff
changeset
|
121 if (DataSegment.contains(reverseKey)) |
be0b61986ff7
checking having DataSegmentManger before get DataSegmentManager
sugi
parents:
467
diff
changeset
|
122 DataSegment.get(reverseKey).response(msg.key); |
419 | 123 break; |
124 case RESPONSE: | |
525
30a74eee59c7
working TestRemoteAlice
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
524
diff
changeset
|
125 rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis())); |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
532
diff
changeset
|
126 DataSegment.getLocal().put(msg.key, rData, false); |
419 | 127 break; |
128 default: | |
129 break; | |
130 } | |
131 } catch (ClosedChannelException e) { | |
132 return; | |
133 } catch (EOFException e) { | |
134 return; | |
135 } catch (IOException e) { | |
478 | 136 return; |
419 | 137 } |
138 } | |
443 | 139 } |
480 | 140 |
141 public void setReverseKey(String name) { | |
142 this.reverseKey = name; | |
143 setName(name+"-IncomingTcp"); | |
144 } | |
345 | 145 } |