Mercurial > hg > Database > Alice
annotate src/main/java/alice/daemon/IncomingTcpConnection.java @ 531:b6049fb123d8 dispose
resolve unzip, working TestRemoteAlice
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Sun, 03 May 2015 15:58:31 +0900 |
parents | 4aeebea0c9b5 |
children | 33f981dd91d2 |
rev | line source |
---|---|
345 | 1 package alice.daemon; |
2 | |
3 import java.io.EOFException; | |
4 import java.io.IOException; | |
5 import java.nio.channels.ClosedChannelException; | |
6 | |
527 | 7 import alice.datasegment.*; |
528
6ebddfac7ff6
delete RecieveData.setCompressFlag
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
527
diff
changeset
|
8 import org.msgpack.MessagePack; |
345 | 9 import org.msgpack.unpacker.Unpacker; |
10 | |
11 import alice.topology.manager.keeparive.RespondData; | |
12 | |
13 public class IncomingTcpConnection extends Thread { | |
419 | 14 |
480 | 15 private Connection connection; |
16 protected DataSegmentManager manager; | |
17 protected String reverseKey; | |
419 | 18 private LocalDataSegmentManager lmanager = DataSegment.getLocal(); |
529
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
19 private CompressedLocalDataSegmentManager compressedlmanager = DataSegment.getCompressedLocal(); |
528
6ebddfac7ff6
delete RecieveData.setCompressFlag
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
527
diff
changeset
|
20 private static final MessagePack packer = new MessagePack(); |
419 | 21 |
22 public IncomingTcpConnection(DataSegmentManager manager) { | |
23 this.manager = manager; | |
24 } | |
25 | |
26 public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { | |
27 this.manager = manager; | |
28 this.connection = connection; | |
29 this.reverseKey = reverseKey; | |
30 } | |
31 | |
32 public LocalDataSegmentManager getLocalDataSegmentManager(){ | |
33 return lmanager; | |
34 } | |
345 | 35 |
529
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
36 public CompressedLocalDataSegmentManager getCompressedLocalDataSegmentManager(){ |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
37 return compressedlmanager; |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
38 } |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
39 |
419 | 40 /** |
41 * pipeline thread for receiving | |
42 */ | |
43 public void run() { | |
44 Unpacker unpacker = null; | |
45 try { | |
528
6ebddfac7ff6
delete RecieveData.setCompressFlag
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
527
diff
changeset
|
46 unpacker = packer.createUnpacker(connection.socket.getInputStream()); |
419 | 47 } catch (IOException e) { |
48 e.printStackTrace(); | |
49 } | |
50 if (unpacker == null) { | |
51 return; | |
52 } | |
53 while (true) { | |
54 try { | |
447 | 55 Command cmd = null; |
458 | 56 ReceiveData rData = null; |
530 | 57 CommandMessage msg = unpacker.read(CommandMessage.class);///read header |
419 | 58 CommandType type = CommandType.getCommandTypeFromId(msg.type); |
59 switch (type) { | |
60 case UPDATE: | |
61 case PUT: | |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
62 System.out.println("in TCP PUT"); |
531
b6049fb123d8
resolve unzip, working TestRemoteAlice
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
530
diff
changeset
|
63 rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), msg.compressed, msg.dataSize);///read rData |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
64 |
488 | 65 if (msg.setTime) { |
66 rData.setTime = true; | |
67 rData.time = msg.time; | |
68 rData.depth = msg.depth; | |
69 } | |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
70 |
458 | 71 cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); |
530 | 72 cmd.setCompressFlag(msg.compressed); |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
73 |
530 | 74 if (rData.compressed()){ |
75 compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd); | |
76 } else { | |
77 lmanager.getDataSegmentKey(msg.key).runCommand(cmd); | |
78 } | |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
79 |
419 | 80 break; |
81 case PEEK: | |
82 case TAKE: | |
531
b6049fb123d8
resolve unzip, working TestRemoteAlice
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
530
diff
changeset
|
83 System.out.println("in TCP TAKE"); |
446 | 84 cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection); |
530 | 85 cmd.setCompressFlag(msg.compressed); |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
86 |
530 | 87 if (msg.compressed){ |
88 compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd); | |
89 } else { | |
90 lmanager.getDataSegmentKey(msg.key).runCommand(cmd); | |
91 } | |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
92 |
467 | 93 break; |
419 | 94 case REMOVE: |
446 | 95 cmd = new Command(type, null, null, null, 0, 0, null, null, ""); |
530 | 96 lmanager.getDataSegmentKey(msg.key).runCommand(cmd);//ToDo:fix |
419 | 97 break; |
98 case REPLY: | |
531
b6049fb123d8
resolve unzip, working TestRemoteAlice
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
530
diff
changeset
|
99 System.out.println("in TCP REPLY"); |
446 | 100 cmd = manager.getAndRemoveCmd(msg.seq); |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
101 |
531
b6049fb123d8
resolve unzip, working TestRemoteAlice
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
530
diff
changeset
|
102 rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), msg.compressed, msg.dataSize); |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
525
diff
changeset
|
103 |
467 | 104 Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); |
530 | 105 cmd.setCompressFlag(msg.compressed); |
452 | 106 cmd.cs.ids.reply(cmd.receiver, rCmd); |
419 | 107 break; |
108 case PING: | |
471
be0b61986ff7
checking having DataSegmentManger before get DataSegmentManager
sugi
parents:
467
diff
changeset
|
109 if (DataSegment.contains(reverseKey)) |
be0b61986ff7
checking having DataSegmentManger before get DataSegmentManager
sugi
parents:
467
diff
changeset
|
110 DataSegment.get(reverseKey).response(msg.key); |
419 | 111 break; |
112 case RESPONSE: | |
525
30a74eee59c7
working TestRemoteAlice
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
524
diff
changeset
|
113 rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis())); |
527 | 114 DataSegment.getLocal().put(msg.key, rData, null); |
419 | 115 break; |
116 default: | |
117 break; | |
118 } | |
119 } catch (ClosedChannelException e) { | |
120 return; | |
121 } catch (EOFException e) { | |
122 return; | |
123 } catch (IOException e) { | |
478 | 124 return; |
419 | 125 } |
126 } | |
443 | 127 } |
480 | 128 |
129 public void setReverseKey(String name) { | |
130 this.reverseKey = name; | |
131 setName(name+"-IncomingTcp"); | |
132 } | |
345 | 133 } |