view src/main/java/alice/daemon/IncomingUdpConnection.java @ 369:0c24894db37e multicast

MulticastDataSegment's extends change from LocalDataSegment to RemoteDataSegment
author sugi
date Tue, 20 May 2014 15:00:21 +0900
parents 8072df9130c6
children 2f2623484b77
line wrap: on
line source

package alice.daemon;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;

import org.msgpack.unpacker.Unpacker;

import alice.codesegment.SingletonMessage;
import alice.datasegment.Command;
import alice.datasegment.CommandType;
import alice.datasegment.DataSegment;
import alice.datasegment.DataSegmentManager;
import alice.topology.manager.keeparive.RespondData;

public class IncomingUdpConnection extends IncomingTcpConnection {
	// receive Data set into LocalDataSegment now but need to set into MulticastDataSegment.
	// and this implement has problem. If over 4096 data receive, can not read.
	
	public MulticastConnection receiver;
	public MulticastConnection sender;

	public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) {
		super(manager);
		receiver = r;
		sender = s;
	}
	
	@Override
	public void run() {
		while (true){
			try {				
				ByteBuffer receive = ByteBuffer.allocate(4096); 
				receiver.receive(receive);
				Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive);
				receive.flip();
				CommandMessage msg = unpacker.read(CommandMessage.class);
				CommandType type = CommandType.getCommandTypeFromId(msg.type);
				switch (type) {
				case UPDATE:
					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
						.runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
					break;
				case PUT:
					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
						.runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
					break;
				case PEEK:
					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
						.runCommand(new Command(type, null, null, null, msg.index, msg.seq, sender, null, null, msg.flag));
					break;
				case TAKE:
					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
						.runCommand(new Command(type, null, null, null, msg.index, msg.seq, sender, null, null, msg.flag));
					break;	
				case REMOVE:
					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
						.runCommand(new Command(type, null, null, null, 0, 0, null, null, null));
					break;
				case REPLY:
					Command cmd = manager.getAndRemoveCmd(msg.seq);
					cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null));
					cmd=null;
					break;
				case PING:
					DataSegment.get(reverseKey).response(msg.key);
					break;
				case RESPONSE:
					DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()));
					break;
				default:
					break;
				}
				
			} catch (ClosedChannelException e) {
				return;
			} catch (EOFException e) {
				return;
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

}