Mercurial > hg > Members > tatsuki > Alice
comparison src/main/java/alice/daemon/IncomingUdpConnection.java @ 365:8072df9130c6 multicast
IncomingUdpConnection has to be improved. It is the same as IncomingTcpConnection.
author | sugi |
---|---|
date | Sat, 17 May 2014 21:32:24 +0900 |
parents | 1494d44392a2 |
children | 0c24894db37e |
comparison
equal
deleted
inserted
replaced
364:1494d44392a2 | 365:8072df9130c6 |
---|---|
1 package alice.daemon; | 1 package alice.daemon; |
2 | 2 |
3 import java.io.EOFException; | |
3 import java.io.IOException; | 4 import java.io.IOException; |
4 import java.nio.ByteBuffer; | 5 import java.nio.ByteBuffer; |
6 import java.nio.channels.ClosedChannelException; | |
5 | 7 |
6 import org.msgpack.unpacker.Unpacker; | 8 import org.msgpack.unpacker.Unpacker; |
7 | 9 |
8 import alice.codesegment.SingletonMessage; | 10 import alice.codesegment.SingletonMessage; |
9 import alice.datasegment.Command; | 11 import alice.datasegment.Command; |
10 import alice.datasegment.CommandType; | 12 import alice.datasegment.CommandType; |
13 import alice.datasegment.DataSegment; | |
14 import alice.topology.HostMessage; | |
15 import alice.topology.manager.keeparive.RespondData; | |
16 import alice.topology.manager.reconnection.SendError; | |
11 | 17 |
12 public class IncomingUdpConnection extends IncomingTcpConnection { | 18 public class IncomingUdpConnection extends IncomingTcpConnection { |
13 public MulticastConnection mConnection; | 19 // Received data is currently put into the LocalDataSegment, but it needs to be put into the MulticastDataSegment. |
20 // This implementation also has a problem: a message larger than 4096 bytes cannot be read. | |
21 | |
22 public MulticastConnection receiver; | |
14 | 23 |
15 public IncomingUdpConnection(MulticastConnection mc) { | 24 public IncomingUdpConnection(MulticastConnection mc) { |
16 super(null, null, "multicast"); | 25 super(null, null, "multicast"); |
17 mConnection = mc; | 26 receiver = mc; |
18 } | 27 } |
19 | 28 |
20 @Override | 29 @Override |
21 public void run() { | 30 public void run() { |
22 ByteBuffer receive = ByteBuffer.allocate(4096); | |
23 while (true){ | 31 while (true){ |
24 try { | 32 try { |
25 mConnection.receive(receive); | 33 ByteBuffer receive = ByteBuffer.allocate(4096); |
34 receiver.receive(receive); | |
26 Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive); | 35 Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive); |
27 receive.flip(); | 36 receive.flip(); |
28 CommandMessage msg = unpacker.read(CommandMessage.class); | 37 CommandMessage msg = unpacker.read(CommandMessage.class); |
29 CommandType type = CommandType.getCommandTypeFromId(msg.type); | 38 CommandType type = CommandType.getCommandTypeFromId(msg.type); |
30 switch (type){ | 39 System.out.println(msg.val); |
40 switch (type) { | |
31 case UPDATE: | 41 case UPDATE: |
32 lmanager.getDataSegmentKey(msg.key) | 42 getLocalDataSegmentManager().getDataSegmentKey(msg.key) |
33 .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); | 43 .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); |
34 break; | 44 break; |
35 case PUT: | 45 case PUT: |
36 lmanager.getDataSegmentKey(msg.key) | 46 getLocalDataSegmentManager().getDataSegmentKey(msg.key) |
37 .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); | 47 .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); |
48 break; | |
49 case PEEK: | |
50 getLocalDataSegmentManager().getDataSegmentKey(msg.key) | |
51 .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); | |
52 break; | |
53 case TAKE: | |
54 getLocalDataSegmentManager().getDataSegmentKey(msg.key) | |
55 .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); | |
56 break; | |
57 case REMOVE: | |
58 getLocalDataSegmentManager().getDataSegmentKey(msg.key) | |
59 .runCommand(new Command(type, null, null, null, 0, 0, null, null, null)); | |
60 break; | |
61 case REPLY: | |
62 Command cmd = manager.getAndRemoveCmd(msg.seq); | |
63 cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); | |
64 cmd=null; | |
65 break; | |
66 case PING: | |
67 DataSegment.get(reverseKey).response(msg.key); | |
68 break; | |
69 case RESPONSE: | |
70 DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis())); | |
38 break; | 71 break; |
39 default: | 72 default: |
40 break; | 73 break; |
41 } | 74 } |
75 | |
76 } catch (ClosedChannelException e) { | |
77 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); | |
78 return; | |
79 } catch (EOFException e) { | |
80 new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute(); | |
81 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); | |
82 return; | |
42 } catch (IOException e) { | 83 } catch (IOException e) { |
43 e.printStackTrace(); | 84 e.printStackTrace(); |
44 } | 85 } |
45 } | 86 } |
46 } | 87 } |