comparison src/main/java/alice/daemon/IncomingUdpConnection.java @ 365:8072df9130c6 multicast

IncomingUdpConnection has to be improved. It is the same as IncomingTcpConnection.
author sugi
date Sat, 17 May 2014 21:32:24 +0900
parents 1494d44392a2
children 0c24894db37e
comparison
equal deleted inserted replaced
364:1494d44392a2 365:8072df9130c6
1 package alice.daemon; 1 package alice.daemon;
2 2
3 import java.io.EOFException;
3 import java.io.IOException; 4 import java.io.IOException;
4 import java.nio.ByteBuffer; 5 import java.nio.ByteBuffer;
6 import java.nio.channels.ClosedChannelException;
5 7
6 import org.msgpack.unpacker.Unpacker; 8 import org.msgpack.unpacker.Unpacker;
7 9
8 import alice.codesegment.SingletonMessage; 10 import alice.codesegment.SingletonMessage;
9 import alice.datasegment.Command; 11 import alice.datasegment.Command;
10 import alice.datasegment.CommandType; 12 import alice.datasegment.CommandType;
13 import alice.datasegment.DataSegment;
14 import alice.topology.HostMessage;
15 import alice.topology.manager.keeparive.RespondData;
16 import alice.topology.manager.reconnection.SendError;
11 17
12 public class IncomingUdpConnection extends IncomingTcpConnection { 18 public class IncomingUdpConnection extends IncomingTcpConnection {
13 public MulticastConnection mConnection; 19 // Received data is currently stored in the LocalDataSegment, but it needs to be stored in a MulticastDataSegment.
20 // This implementation also has a problem: if more than 4096 bytes are received, the data cannot be read.
21
22 public MulticastConnection receiver;
14 23
15 public IncomingUdpConnection(MulticastConnection mc) { 24 public IncomingUdpConnection(MulticastConnection mc) {
16 super(null, null, "multicast"); 25 super(null, null, "multicast");
17 mConnection = mc; 26 receiver = mc;
18 } 27 }
19 28
20 @Override 29 @Override
21 public void run() { 30 public void run() {
22 ByteBuffer receive = ByteBuffer.allocate(4096);
23 while (true){ 31 while (true){
24 try { 32 try {
25 mConnection.receive(receive); 33 ByteBuffer receive = ByteBuffer.allocate(4096);
34 receiver.receive(receive);
26 Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive); 35 Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive);
27 receive.flip(); 36 receive.flip();
28 CommandMessage msg = unpacker.read(CommandMessage.class); 37 CommandMessage msg = unpacker.read(CommandMessage.class);
29 CommandType type = CommandType.getCommandTypeFromId(msg.type); 38 CommandType type = CommandType.getCommandTypeFromId(msg.type);
30 switch (type){ 39 System.out.println(msg.val);
40 switch (type) {
31 case UPDATE: 41 case UPDATE:
32 lmanager.getDataSegmentKey(msg.key) 42 getLocalDataSegmentManager().getDataSegmentKey(msg.key)
33 .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); 43 .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
34 break; 44 break;
35 case PUT: 45 case PUT:
36 lmanager.getDataSegmentKey(msg.key) 46 getLocalDataSegmentManager().getDataSegmentKey(msg.key)
37 .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); 47 .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
48 break;
49 case PEEK:
50 getLocalDataSegmentManager().getDataSegmentKey(msg.key)
51 .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
52 break;
53 case TAKE:
54 getLocalDataSegmentManager().getDataSegmentKey(msg.key)
55 .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
56 break;
57 case REMOVE:
58 getLocalDataSegmentManager().getDataSegmentKey(msg.key)
59 .runCommand(new Command(type, null, null, null, 0, 0, null, null, null));
60 break;
61 case REPLY:
62 Command cmd = manager.getAndRemoveCmd(msg.seq);
63 cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null));
64 cmd=null;
65 break;
66 case PING:
67 DataSegment.get(reverseKey).response(msg.key);
68 break;
69 case RESPONSE:
70 DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()));
38 break; 71 break;
39 default: 72 default:
40 break; 73 break;
41 } 74 }
75
76 } catch (ClosedChannelException e) {
77 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
78 return;
79 } catch (EOFException e) {
80 new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute();
81 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
82 return;
42 } catch (IOException e) { 83 } catch (IOException e) {
43 e.printStackTrace(); 84 e.printStackTrace();
44 } 85 }
45 } 86 }
46 } 87 }