view src/main/java/alice/daemon/IncomingUdpConnection.java @ 629:d0d1cd3dfc11 dispose

fix UDP conection
author Shinji KONO <kono@ie.u-ryukyu.ac.jp>
date Thu, 14 Sep 2017 20:04:48 +0900
parents 5a9b83c64ddf
children cbfdcecf7e3c
line wrap: on
line source

package alice.daemon;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;

import org.msgpack.MessagePack;
import org.msgpack.unpacker.Unpacker;

import alice.datasegment.Command;
import alice.datasegment.CommandType;
import alice.datasegment.DataSegment;
import alice.datasegment.DataSegmentManager;
import alice.datasegment.ReceiveData;
import alice.topology.manager.keeparive.RespondData;

public class IncomingUdpConnection extends IncomingTcpConnection {
    // receive Data set into LocalDataSegment now but need to set into MulticastDataSegment.
    // and this implement has problem. If over 65507 data receive, can not read.
    // but  Max data length is 65507 because of the max length of UDP payload

    public MulticastConnection receiver;
    public MulticastConnection sender;
    private static final MessagePack packer = new MessagePack();

    public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) {
        super(manager);
        receiver = r;
        sender = s;
        reverseKey = "multicast";
    }

    @Override
    public void run() {
        while (true){
            try {
                Command cmd = null;
                ReceiveData rData = null;
                // Max data length is 65507 because of the max length of UDP payload
                ByteBuffer receive = ByteBuffer.allocate(65507);
                receiver.receive(receive);
                Unpacker unpacker = packer.createBufferUnpacker(receive);
                receive.flip();
                CommandMessage msg = unpacker.read(CommandMessage.class);
                CommandType type = CommandType.getCommandTypeFromId(msg.type);
                int dataSize = unpacker.readInt();
                byte [] data = new byte[dataSize];
                switch (type) {
                case UPDATE:
                case PUT:
                    receive.get(data);
                    rData = new ReceiveData(data);
                    cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey);

                    if (msg.compressed){
                        getCompressedLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
                    } else {
                        getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
                    }
                    break;
                case PEEK:
                case TAKE:
                    cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null ,sender);
                    cmd.setQuickFlag(msg.quickFlag);
                    cmd.setCompressFlag(msg.compressed);

                    if (msg.compressed) {
                        getCompressedLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
                    } else {
                        getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
                    }
                    break;
                case REMOVE:
                    cmd = new Command(type, null, null, null, 0, 0, null, null, "");
                    getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd);
                    break;
                case REPLY:
                    cmd = manager.getAndRemoveCmd(msg.seq);
                    receive.get(data);
                    rData = new ReceiveData(data);

                    Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, "");
                    cmd.cs.ids.reply(cmd.receiver, rCmd);
                    break;
                case PING:
                    if (DataSegment.contains(reverseKey))
                        DataSegment.get(reverseKey).response(msg.key);
                    break;
                case RESPONSE:
                    rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis()));
                    DataSegment.getLocal().put(msg.key, rData, false);
                    break;
                default:
                    break;
                }

            } catch (ClosedChannelException e) {
                return;
            } catch (EOFException e) {
                return;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}