# HG changeset patch # User sugi # Date 1400136262 -32400 # Node ID 6cf08aebfc31c048259806327d41fd70c17c550c # Parent 11ba40caa93ba580707bd77a8467a7fb1b44a697 add MulticastConnection and DataSegmentManager diff -r 11ba40caa93b -r 6cf08aebfc31 src/main/java/alice/daemon/MulticastConnection.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/daemon/MulticastConnection.java Thu May 15 15:44:22 2014 +0900 @@ -0,0 +1,45 @@ +package alice.daemon; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; + +import alice.codesegment.SingletonMessage; +import alice.datasegment.Command; + +public class MulticastConnection extends Connection { + private DatagramChannel dc; + private SocketAddress sAddr; + + + public MulticastConnection(DatagramChannel d, SocketAddress s) { + dc = d; + sAddr = s; + } + + @Override + public synchronized void write(Command cmd){ + CommandMessage cmdMsg = cmd.convert(); + ByteBuffer buffer; + try { + buffer = ByteBuffer.wrap(SingletonMessage.getInstance().write(cmdMsg)); + while (buffer.hasRemaining()){ + dc.send(buffer, sAddr); + } + } catch (IOException e) { + e.printStackTrace(); + } + + } + + @Override + public void close(){ + try { + dc.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + +} diff -r 11ba40caa93b -r 6cf08aebfc31 src/main/java/alice/datasegment/MulticastDataSegmentManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/datasegment/MulticastDataSegmentManager.java Thu May 15 15:44:22 2014 +0900 @@ -0,0 +1,106 @@ +package alice.datasegment; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.StandardProtocolFamily; +import java.net.StandardSocketOptions; +import java.nio.channels.DatagramChannel; +import org.apache.log4j.Logger; + +import alice.codesegment.CodeSegment; +import alice.daemon.Connection; + +public class MulticastDataSegmentManager extends DataSegmentManager { + Connection connection; + Logger logger; + + public MulticastDataSegmentManager(final String MCASTADDR, final int port, final String nis) { + logger = Logger.getLogger("multicast"); + connection = new Connection(); + DatagramChannel dc = createDatagramChannel(MCASTADDR, port, nis); + + } + + private DatagramChannel createDatagramChannel(String MCASTADDR, int port, String nis) { + DatagramChannel dc = null; + NetworkInterface ni; + try { + ni = NetworkInterface.getByName(nis); + if (ni==null) { + System.err.println("Can't open network interface "+nis); + throw new IOException(); + } + if (!ni.supportsMulticast()) { + System.err.println("Network interface does not support multicast"+nis); + throw new IOException(); + } + + dc = DatagramChannel.open(StandardProtocolFamily.INET); + dc.setOption(StandardSocketOptions.SO_REUSEADDR, true); + dc.setOption(StandardSocketOptions.IP_MULTICAST_IF, ni); + InetAddress group = InetAddress.getByName(MCASTADDR); + dc.join(group, ni); + } catch (Exception e) { + e.printStackTrace(); + } + return dc; + } + + @Override + public void put(String key, Object val) { + + } + + @Override + public void update(String key, Object val) { + + } + + @Override + public void take(Receiver receiver, CodeSegment cs) {} + @Override + public void peek(Receiver receiver, CodeSegment cs) {} + + @Override + public void quickPut(String key, Object val) { + + } + + @Override + public void quickUpdate(String key, Object val) { + + } + + @Override + public void quickPeek(Receiver receiver, CodeSegment cs) {} + @Override + public void quickTake(Receiver receiver, CodeSegment cs) {} + + @Override + public void remove(String key) { + + } + + @Override + public void shutdown() { + + } + + @Override + public void close() { + + } + + @Override + public void finish() { + + } + + @Override + public void ping(String returnKey) {} + + @Override + public void response(String returnKey) {} + +}