# HG changeset patch # User sugi # Date 1400146170 -32400 # Node ID 60eee1fb0fd326259e0c338200803cf70b75789b # Parent 6cf08aebfc31c048259806327d41fd70c17c550c create sender with udp diff -r 6cf08aebfc31 -r 60eee1fb0fd3 src/main/java/alice/daemon/IncomingUdpConnection.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/daemon/IncomingUdpConnection.java Thu May 15 18:29:30 2014 +0900 @@ -0,0 +1,20 @@ +package alice.daemon; + +import org.msgpack.unpacker.Unpacker; + +import alice.codesegment.SingletonMessage; + +public class IncomingUdpConnection extends IncomingTcpConnection { + public MulticastConnection mConnection; + + public IncomingUdpConnection(MulticastConnection mc) { + super(null, null, "multicast"); + mConnection = mc; + } + + private Unpacker getUnpacker() { + //Unpacker unpacker = SingletonMessage.getInstance().createUnpacker(); + return null; + + } +} diff -r 6cf08aebfc31 -r 60eee1fb0fd3 src/main/java/alice/daemon/MulticastConnection.java --- a/src/main/java/alice/daemon/MulticastConnection.java Thu May 15 15:44:22 2014 +0900 +++ b/src/main/java/alice/daemon/MulticastConnection.java Thu May 15 18:29:30 2014 +0900 @@ -12,12 +12,12 @@ private DatagramChannel dc; private SocketAddress sAddr; - public MulticastConnection(DatagramChannel d, SocketAddress s) { dc = d; sAddr = s; } + // may need to add infomation who send on ds. @Override public synchronized void write(Command cmd){ CommandMessage cmdMsg = cmd.convert(); @@ -41,5 +41,13 @@ e.printStackTrace(); } } + + public void receive(ByteBuffer receiveData){ + try { + dc.receive(receiveData); + } catch (IOException e) { + e.printStackTrace(); + } + } } diff -r 6cf08aebfc31 -r 60eee1fb0fd3 src/main/java/alice/daemon/OutboundTcpConnection.java --- a/src/main/java/alice/daemon/OutboundTcpConnection.java Thu May 15 15:44:22 2014 +0900 +++ b/src/main/java/alice/daemon/OutboundTcpConnection.java Thu May 15 18:29:30 2014 +0900 @@ -1,6 +1,5 @@ package alice.daemon; -import java.io.IOException; import alice.datasegment.Command; public class OutboundTcpConnection extends Thread { @@ -11,8 +10,6 @@ this.connection = connection; } - - /** * pipeline thread for transmission */ @@ -22,7 +19,7 @@ Command cmd = connection.sendQueue.take(); switch (cmd.type) { case CLOSE: - connection.socket.close(); + connection.close(); return; case FINISH: System.exit(0); @@ -33,8 +30,6 @@ connection.write(cmd); } catch (InterruptedException e) { e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); } } } diff -r 6cf08aebfc31 -r 60eee1fb0fd3 src/main/java/alice/datasegment/MulticastDataSegmentManager.java --- a/src/main/java/alice/datasegment/MulticastDataSegmentManager.java Thu May 15 15:44:22 2014 +0900 +++ b/src/main/java/alice/datasegment/MulticastDataSegmentManager.java Thu May 15 18:29:30 2014 +0900 @@ -2,27 +2,48 @@ import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.NetworkInterface; +import java.net.SocketAddress; import java.net.StandardProtocolFamily; import java.net.StandardSocketOptions; import java.nio.channels.DatagramChannel; + import org.apache.log4j.Logger; import alice.codesegment.CodeSegment; -import alice.daemon.Connection; +import alice.daemon.IncomingUdpConnection; +import alice.daemon.MulticastConnection; +import alice.daemon.OutboundTcpConnection; public class MulticastDataSegmentManager extends DataSegmentManager { - Connection connection; + MulticastConnection sender; Logger logger; public MulticastDataSegmentManager(final String MCASTADDR, final int port, final String nis) { logger = Logger.getLogger("multicast"); - connection = new Connection(); - DatagramChannel dc = createDatagramChannel(MCASTADDR, port, nis); + InetAddress mAddr; + try { + mAddr = InetAddress.getByName(MCASTADDR); + + DatagramChannel dcr = createDatagramChannel(mAddr, port, nis); + dcr.bind(new InetSocketAddress(port)); + SocketAddress sAddrr = new InetSocketAddress(mAddr,port); + MulticastConnection receiver = new MulticastConnection(dcr, sAddrr); + new IncomingUdpConnection(receiver).start(); + + DatagramChannel dcs = createDatagramChannel(mAddr, port, nis); + SocketAddress sAddrs = new InetSocketAddress(mAddr,port); + sender = new MulticastConnection(dcs, sAddrs); + new OutboundTcpConnection(sender).start(); // OutboundUdpConnection sender + + } catch (Exception e) { + e.printStackTrace(); + } } - private DatagramChannel createDatagramChannel(String MCASTADDR, int port, String nis) { + private DatagramChannel createDatagramChannel(InetAddress group, int port, String nis) { DatagramChannel dc = null; NetworkInterface ni; try { @@ -39,7 +60,6 @@ dc = DatagramChannel.open(StandardProtocolFamily.INET); dc.setOption(StandardSocketOptions.SO_REUSEADDR, true); dc.setOption(StandardSocketOptions.IP_MULTICAST_IF, ni); - InetAddress group = InetAddress.getByName(MCASTADDR); dc.join(group, ni); } catch (Exception e) { e.printStackTrace();