Mercurial > hg > Database > Alice
annotate src/main/java/alice/daemon/IncomingUdpConnection.java @ 523:145c425db88d dispose
add CompressedLDSM
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 09 Apr 2015 18:36:26 +0900 |
parents | 118e150ac9f3 |
children | 30a74eee59c7 |
rev | line source |
---|---|
361 | 1 package alice.daemon; |
2 | |
365
8072df9130c6
IncomingUdpConnection have to improve. it same IncommingTcpConnection
sugi
parents:
364
diff
changeset
|
3 import java.io.EOFException; |
364 | 4 import java.io.IOException; |
5 import java.nio.ByteBuffer; | |
365
8072df9130c6
IncomingUdpConnection have to improve. it same IncommingTcpConnection
sugi
parents:
364
diff
changeset
|
6 import java.nio.channels.ClosedChannelException; |
364 | 7 |
361 | 8 import org.msgpack.unpacker.Unpacker; |
9 | |
10 import alice.codesegment.SingletonMessage; | |
364 | 11 import alice.datasegment.Command; |
12 import alice.datasegment.CommandType; | |
365
8072df9130c6
IncomingUdpConnection have to improve. it same IncommingTcpConnection
sugi
parents:
364
diff
changeset
|
13 import alice.datasegment.DataSegment; |
369
0c24894db37e
MulticastDataSegment's extends change from LocalDataSegment to RemoteDataSegment
sugi
parents:
365
diff
changeset
|
14 import alice.datasegment.DataSegmentManager; |
458 | 15 import alice.datasegment.ReceiveData; |
365
8072df9130c6
IncomingUdpConnection have to improve. it same IncommingTcpConnection
sugi
parents:
364
diff
changeset
|
16 import alice.topology.manager.keeparive.RespondData; |
361 | 17 |
18 public class IncomingUdpConnection extends IncomingTcpConnection { | |
// Purpose: reads CommandMessage datagrams from a MulticastConnection and dispatches each one by its CommandType.
454 | 19 // Received data is currently put into the LocalDataSegment, but it should be put into the MulticastDataSegment. |
20 // This implementation has a known problem: a message larger than 65507 bytes cannot be read. | |
443 | 21 // In practice the data length does not exceed 65507 bytes, because that is the maximum length of a UDP payload. |
454 | 22 |
// Connection that run() receives datagrams from.
23 public MulticastConnection receiver; | |
// Connection passed as the last constructor argument of PEEK/TAKE commands
// (presumably where the answer is sent back - TODO confirm).
24 public MulticastConnection sender; | |
25 | |
// Stores s as sender and r as receiver, hands manager to the superclass constructor,
// and sets reverseKey to "multicast".
26 public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) { | |
27 super(manager); | |
28 receiver = r; | |
29 sender = s; | |
30 reverseKey = "multicast"; | |
31 } | |
361 | 32 |
// Receive loop. It only ends by returning from a catch block below:
// ClosedChannelException and EOFException stop the loop; any other IOException
// is printed and the loop continues.
454 | 33 @Override |
34 public void run() { | |
35 while (true){ | |
36 try { | |
37 Command cmd = null; | |
458 | 38 ReceiveData rData = null; |
454 | 39 // A new buffer is allocated on every iteration; 65507 bytes is the maximum length of a UDP payload. |
467 | 40 ByteBuffer receive = ByteBuffer.allocate(65507); |
454 | 41 receiver.receive(receive); |
// NOTE(review): flip() is called after the unpacker has been created from the buffer;
// presumably the unpacker keeps a reference to this same buffer - confirm.
42 Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive); | |
43 receive.flip(); | |
44 CommandMessage msg = unpacker.read(CommandMessage.class); | |
45 CommandType type = CommandType.getCommandTypeFromId(msg.type); | |
46 switch (type) { | |
// UPDATE / PUT: read the payload that follows the header and run the command on the local key msg.key.
47 case UPDATE: | |
458 | 48 case PUT: |
// An int is read first and passed to getSerializedByteArray (presumably the payload length - confirm).
510 | 49 rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed, msg.serialized); |
458 | 50 cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); |
454 | 51 getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); |
52 break; | |
// PEEK / TAKE: no payload; the command carries msg.index, msg.seq and the sender connection.
53 case PEEK: | |
54 case TAKE: | |
55 cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null ,sender); | |
56 cmd.setQuickFlag(msg.quickFlag); | |
57 getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); | |
58 break; | |
// REMOVE: run a payload-less command on the local key msg.key.
59 case REMOVE: | |
60 cmd = new Command(type, null, null, null, 0, 0, null, null, ""); | |
61 getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); | |
62 break; | |
// REPLY: fetch (and remove) the command registered under msg.seq and deliver the received data to it.
// NOTE(review): the result of getAndRemoveCmd is used without a null check - confirm it cannot be null here.
63 case REPLY: | |
64 cmd = manager.getAndRemoveCmd(msg.seq); | |
510 | 65 rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed, msg.serialized); |
458 | 66 Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); |
454 | 67 cmd.cs.ids.reply(cmd.receiver, rCmd); |
68 break; | |
// PING: respond with msg.key, but only when DataSegment has an entry for reverseKey.
69 case PING: | |
471
be0b61986ff7
checking having DataSegmentManger before get DataSegmentManager
sugi
parents:
467
diff
changeset
|
70 if (DataSegment.contains(reverseKey)) |
be0b61986ff7
checking having DataSegmentManger before get DataSegmentManager
sugi
parents:
467
diff
changeset
|
71 DataSegment.get(reverseKey).response(msg.key); |
454 | 72 break; |
// RESPONSE: put a RespondData built from reverseKey and the current time into the local DataSegment under msg.key.
73 case RESPONSE: | |
458 | 74 rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis()), false, false); |
523
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
510
diff
changeset
|
75 DataSegment.getLocal().put(msg.key, rData, false); |
454 | 76 break; |
// Any other command type is ignored.
77 default: | |
78 break; | |
79 } | |
80 | |
// A closed channel or an EOF ends the loop by returning from run().
81 } catch (ClosedChannelException e) { | |
82 return; | |
83 } catch (EOFException e) { | |
84 return; | |
// Any other IOException is only printed; the loop then starts the next iteration.
85 } catch (IOException e) { | |
86 e.printStackTrace(); | |
87 } | |
88 } | |
89 } | |
364 | 90 |
361 | 91 } |