changeset 361:60eee1fb0fd3 multicast

create sender with udp
author sugi
date Thu, 15 May 2014 18:29:30 +0900
parents 6cf08aebfc31
children 5b706d682cba
files src/main/java/alice/daemon/IncomingUdpConnection.java src/main/java/alice/daemon/MulticastConnection.java src/main/java/alice/daemon/OutboundTcpConnection.java src/main/java/alice/datasegment/MulticastDataSegmentManager.java
diffstat 4 files changed, 56 insertions(+), 13 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/alice/daemon/IncomingUdpConnection.java	Thu May 15 18:29:30 2014 +0900
@@ -0,0 +1,20 @@
+package alice.daemon;
+
+import org.msgpack.unpacker.Unpacker;
+
+import alice.codesegment.SingletonMessage;
+
+public class IncomingUdpConnection extends IncomingTcpConnection {
+	public MulticastConnection mConnection;
+
+	public IncomingUdpConnection(MulticastConnection mc) {
+		super(null, null, "multicast");
+		mConnection = mc;
+	}
+	
+	private Unpacker getUnpacker() {
+		//Unpacker unpacker = SingletonMessage.getInstance().createUnpacker(); 
+		return null;
+		
+	}
+}
--- a/src/main/java/alice/daemon/MulticastConnection.java	Thu May 15 15:44:22 2014 +0900
+++ b/src/main/java/alice/daemon/MulticastConnection.java	Thu May 15 18:29:30 2014 +0900
@@ -12,12 +12,12 @@
 	private DatagramChannel dc;
 	private SocketAddress sAddr;
 
-
 	public MulticastConnection(DatagramChannel d, SocketAddress s) {
 		dc = d;
 		sAddr = s;
 	}
 
+	// may need to add infomation who send on ds.
 	@Override
 	public synchronized void write(Command cmd){
 		CommandMessage cmdMsg = cmd.convert();
@@ -41,5 +41,13 @@
 			e.printStackTrace();
 		}
 	}
+	
+	public void receive(ByteBuffer receiveData){
+		try {
+			dc.receive(receiveData);
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+	}
 
 }
--- a/src/main/java/alice/daemon/OutboundTcpConnection.java	Thu May 15 15:44:22 2014 +0900
+++ b/src/main/java/alice/daemon/OutboundTcpConnection.java	Thu May 15 18:29:30 2014 +0900
@@ -1,6 +1,5 @@
 package alice.daemon;
 
-import java.io.IOException;
 import alice.datasegment.Command;
 
 public class OutboundTcpConnection extends Thread {
@@ -11,8 +10,6 @@
 		this.connection = connection;
 	}
 	
-	
-	
 	/**
 	 * pipeline thread for transmission
 	 */
@@ -22,7 +19,7 @@
 				Command cmd = connection.sendQueue.take();
 				switch (cmd.type) {
 				case CLOSE:
-					connection.socket.close();
+					connection.close();
 					return;
 				case FINISH:
 					System.exit(0);
@@ -33,8 +30,6 @@
 				connection.write(cmd);
 			} catch (InterruptedException e) {
 				e.printStackTrace();
-			} catch (IOException e) {
-				e.printStackTrace();
 			}
 		}
 	}
--- a/src/main/java/alice/datasegment/MulticastDataSegmentManager.java	Thu May 15 15:44:22 2014 +0900
+++ b/src/main/java/alice/datasegment/MulticastDataSegmentManager.java	Thu May 15 18:29:30 2014 +0900
@@ -2,27 +2,48 @@
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
+import java.net.SocketAddress;
 import java.net.StandardProtocolFamily;
 import java.net.StandardSocketOptions;
 import java.nio.channels.DatagramChannel;
+
 import org.apache.log4j.Logger;
 
 import alice.codesegment.CodeSegment;
-import alice.daemon.Connection;
+import alice.daemon.IncomingUdpConnection;
+import alice.daemon.MulticastConnection;
+import alice.daemon.OutboundTcpConnection;
 
 public class MulticastDataSegmentManager extends DataSegmentManager {
-	Connection connection;
+	MulticastConnection sender;
 	Logger logger;
 	
 	public MulticastDataSegmentManager(final String MCASTADDR, final int port, final String nis) {
 		logger = Logger.getLogger("multicast");
-		connection = new Connection();
-		DatagramChannel dc =  createDatagramChannel(MCASTADDR, port, nis);
+		InetAddress mAddr;
+		try {
+			mAddr = InetAddress.getByName(MCASTADDR);
+
+			DatagramChannel dcr =  createDatagramChannel(mAddr, port, nis);
+			dcr.bind(new InetSocketAddress(port));
+			SocketAddress sAddrr = new InetSocketAddress(mAddr,port);
+			MulticastConnection receiver = new MulticastConnection(dcr, sAddrr);
+			new IncomingUdpConnection(receiver).start();
+			
+			DatagramChannel dcs =  createDatagramChannel(mAddr, port, nis);
+			SocketAddress sAddrs = new InetSocketAddress(mAddr,port);
+			sender = new MulticastConnection(dcs, sAddrs);
+			new OutboundTcpConnection(sender).start(); // OutboundUdpConnection sender
+			
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
 		
 	}
 	
-	private DatagramChannel createDatagramChannel(String MCASTADDR, int port, String nis) {
+	private DatagramChannel createDatagramChannel(InetAddress group, int port, String nis) {
 		DatagramChannel dc = null;
 		NetworkInterface ni;
 		try {
@@ -39,7 +60,6 @@
 			dc = DatagramChannel.open(StandardProtocolFamily.INET);
 			dc.setOption(StandardSocketOptions.SO_REUSEADDR, true);
 			dc.setOption(StandardSocketOptions.IP_MULTICAST_IF, ni);
-			InetAddress group = InetAddress.getByName(MCASTADDR);
 			dc.join(group, ni);
 		} catch (Exception e) {
 			e.printStackTrace();