view src/main/java/alice/datasegment/MulticastDataSegmentManager.java @ 366:abc54fa0c81b multicast

MulticastDataSegment's extend class change from DataSegmentManager from LocalDataSegmentManager
author sugi
date Sat, 17 May 2014 21:34:01 +0900
parents 1494d44392a2
children 0c24894db37e
line wrap: on
line source

package alice.datasegment;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.nio.channels.DatagramChannel;

import org.apache.log4j.Logger;

import alice.daemon.IncomingUdpConnection;
import alice.daemon.MulticastConnection;
import alice.daemon.OutboundTcpConnection;

public class MulticastDataSegmentManager extends LocalDataSegmentManager {
	private MulticastConnection sender;
	private Logger logger;
	
	public MulticastDataSegmentManager(String connectionKey ,final String MCASTADDR, final int port, final String nis) {
		super.setReverseKey(connectionKey);
		logger = Logger.getLogger(connectionKey);
		InetAddress mAddr;
		try {
			mAddr = InetAddress.getByName(MCASTADDR);

			DatagramChannel dcr =  createDatagramChannel(mAddr, port, nis);
			dcr.bind(new InetSocketAddress(port));
			SocketAddress sAddrr = new InetSocketAddress(mAddr,port);
			MulticastConnection receiver = new MulticastConnection(dcr, sAddrr);
			new IncomingUdpConnection(receiver).start();
			
			DatagramChannel dcs =  createDatagramChannel(mAddr, port, nis);
			SocketAddress sAddrs = new InetSocketAddress(mAddr,port);
			sender = new MulticastConnection(dcs, sAddrs);
			new OutboundTcpConnection(sender).start(); // OutboundUdpConnection sender
			
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}
	
	private DatagramChannel createDatagramChannel(InetAddress group, int port, String nis) {
		DatagramChannel dc = null;
		NetworkInterface ni;
		try {
			ni = NetworkInterface.getByName(nis);
			if (ni==null) {
				System.err.println("Can't open network interface "+nis);
				throw new IOException();
			}
			if (!ni.supportsMulticast()) {
				System.err.println("Network interface does not support multicast"+nis);
				throw new IOException();
			}
			
			dc = DatagramChannel.open(StandardProtocolFamily.INET);
			dc.setOption(StandardSocketOptions.SO_REUSEADDR, true);
			dc.setOption(StandardSocketOptions.IP_MULTICAST_IF, ni);
			dc.join(group, ni);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return dc;
	}
	
	@Override
	public void put(String key, Object val) {
		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
		sender.sendCommand(cmd); // put command on the transmission thread
		if (logger.isDebugEnabled())
			logger.debug(cmd.getCommandString());
	}

	@Override
	public void update(String key, Object val) {
		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
		sender.sendCommand(cmd);
		if (logger.isDebugEnabled())
			logger.debug(cmd.getCommandString());
	}

	@Override
	public void quickPut(String key, Object val) {
		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
		sender.write(cmd); // put command is executed right now
	}

	@Override
	public void quickUpdate(String key, Object val) {
		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
		sender.write(cmd);
	}

}