changeset 508:b7d02ea79850 dispose

change multicast Data Segment API
author sugi
date Sun, 04 Jan 2015 13:51:01 +0900
parents 28627bb3eeda
children 53d7cff1fe10
files src/main/java/alice/datasegment/DataSegment.java src/main/java/alice/datasegment/MulticastDataSegmentManager.java src/test/java/alice/daemon/MulticastStartCodeSegment.java src/test/java/alice/daemon/MulticastTest.java src/test/java/alice/daemon/ReceiveInteger.java src/test/java/alice/daemon/ReceiveString.java src/test/java/alice/daemon/UdpTest.java
diffstat 7 files changed, 104 insertions(+), 26 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/datasegment/DataSegment.java	Sat Jan 03 19:43:36 2015 +0900
+++ b/src/main/java/alice/datasegment/DataSegment.java	Sun Jan 04 13:51:01 2015 +0900
@@ -3,6 +3,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import alice.daemon.IncomingTcpConnection;
+import alice.datasegment.MulticastDataSegmentManager.SocketType;
 
 public class DataSegment {
 
@@ -33,9 +34,10 @@
         return manager;
     }
 
-    public static MulticastDataSegmentManager connectMulticast(String connectionKey ,String MCSTADDR, int port, String nis){
-        MulticastDataSegmentManager manager = new MulticastDataSegmentManager(connectionKey ,MCSTADDR,  port, nis);
-        register(connectionKey, manager);
+    public static MulticastDataSegmentManager connectMulticast(String connectionKey ,String MCSTADDR, int port, String nis, SocketType type){
+        MulticastDataSegmentManager manager = new MulticastDataSegmentManager(connectionKey ,MCSTADDR,  port, nis, type);
+        if (type!=SocketType.Receiver)
+            register(connectionKey, manager);
         return manager;
     }
 
--- a/src/main/java/alice/datasegment/MulticastDataSegmentManager.java	Sat Jan 03 19:43:36 2015 +0900
+++ b/src/main/java/alice/datasegment/MulticastDataSegmentManager.java	Sun Jan 04 13:51:01 2015 +0900
@@ -17,7 +17,8 @@
 
 public class MulticastDataSegmentManager extends RemoteDataSegmentManager {
 
-    public MulticastDataSegmentManager(String connectionKey ,final String MCASTADDR, final int port, final String nis) {
+    public enum SocketType{Sender, Receiver, Both};
+    public MulticastDataSegmentManager(String connectionKey ,final String MCASTADDR, final int port, final String nis, SocketType type) {
         logger = Logger.getLogger(connectionKey);
         InetAddress mAddr;
         try {
@@ -32,13 +33,16 @@
             SocketAddress sAddrs = new InetSocketAddress(mAddr,port);
             connection = new MulticastConnection(dcs, sAddrs); // sender
 
-            IncomingUdpConnection in = new IncomingUdpConnection((MulticastConnection) connection, receiver, this);
-            in.setName("multicast-IncomingUdp");
-            in.start();
-            OutboundTcpConnection out = new OutboundTcpConnection(connection); // OutboundUdpConnection sender
-            out.setName(connectionKey+"OutboundUdp");
-            out.start();
-
+            if (type !=SocketType.Sender) {
+                IncomingUdpConnection in = new IncomingUdpConnection((MulticastConnection) connection, receiver, this);
+                in.setName("multicast-IncomingUdp");
+                in.start();
+            }
+            if (type !=SocketType.Receiver) {
+                OutboundTcpConnection out = new OutboundTcpConnection(connection); // OutboundUdpConnection sender
+                out.setName(connectionKey+"OutboundUdp");
+                out.start();
+            }
         } catch (Exception e) {
             e.printStackTrace();
         }
--- a/src/test/java/alice/daemon/MulticastStartCodeSegment.java	Sat Jan 03 19:43:36 2015 +0900
+++ b/src/test/java/alice/daemon/MulticastStartCodeSegment.java	Sun Jan 04 13:51:01 2015 +0900
@@ -1,15 +1,37 @@
 package alice.daemon;
 
 import alice.codesegment.CodeSegment;
+import alice.datasegment.DataSegment;
+import alice.datasegment.MulticastDataSegmentManager.SocketType;
 
 public class MulticastStartCodeSegment extends CodeSegment {
 
+    private Config conf;
+    public MulticastStartCodeSegment(Config conf) {
+        this.conf = conf;
+    }
+
     @Override
     public void run() {
-        MulticastIncrement cs = new MulticastIncrement();
-        cs.num.setKey("multicast","num");
+        DataSegment.connectMulticast("multicast",conf.MCSTADDR, conf.localPort, conf.nis, MulticastTest.type);
+        if (MulticastTest.type == SocketType.Both) {
+            // in this case "multicast" key mean local.
+            MulticastIncrement cs = new MulticastIncrement();
+            cs.num.setKey("multicast","num");
 
-        ods.put("multicast", "num", 0);
+            ods.put("multicast", "num", 0);
+        } else if (MulticastTest.type == SocketType.Receiver){
+            DataSegment.connectMulticast("multicast1", "224.0.0.2", conf.localPort+1, conf.nis, SocketType.Sender);
+            ReceiveInteger cs = new ReceiveInteger();
+            cs.num.setKey("num");
+        } else if (MulticastTest.type == SocketType.Sender){
+            DataSegment.connectMulticast("multicast1", "224.0.0.2", conf.localPort+1, conf.nis, SocketType.Receiver);
+            ReceiveString cs = new ReceiveString();
+            cs.str.setKey("str");
+
+            for (int i=0;i < 11; i++)
+                ods.put("multicast", "num", i);
+        }
     }
 
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/test/java/alice/daemon/MulticastTest.java	Sun Jan 04 13:51:01 2015 +0900
@@ -0,0 +1,12 @@
+package alice.daemon;
+
+import alice.datasegment.MulticastDataSegmentManager.SocketType;
+
+public class MulticastTest {
+    public static SocketType type = SocketType.Sender;
+    public static void main(String[] args){
+        Config conf = new Config(args);
+        new MulticastStartCodeSegment(conf).execute();
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/test/java/alice/daemon/ReceiveInteger.java	Sun Jan 04 13:51:01 2015 +0900
@@ -0,0 +1,33 @@
+package alice.daemon;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.Receiver;
+
+public class ReceiveInteger extends CodeSegment{
+
+    public Receiver num = ids.create(CommandType.TAKE);
+
+    @Override
+    public void run() {
+        int num = this.num.asInteger();
+        System.out.println("[CodeSegment] "+num);
+
+        if (num==10) {
+            ods.put("multicast1", "str", "finish");
+            try {
+                synchronized (this) {
+                    wait(2000);
+                }
+            } catch (InterruptedException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+            System.exit(0);
+        }
+        ReceiveInteger cs = new ReceiveInteger();
+        cs.num.setKey("num");
+
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/test/java/alice/daemon/ReceiveString.java	Sun Jan 04 13:51:01 2015 +0900
@@ -0,0 +1,17 @@
+package alice.daemon;
+
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+import alice.datasegment.Receiver;
+
+public class ReceiveString extends CodeSegment {
+
+    public Receiver str = ids.create(CommandType.TAKE);
+
+    @Override
+    public void run() {
+        System.out.println(str.asString());
+        System.exit(0);
+    }
+
+}
--- a/src/test/java/alice/daemon/UdpTest.java	Sat Jan 03 19:43:36 2015 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,12 +0,0 @@
-package alice.daemon;
-
-import alice.datasegment.DataSegment;
-
-public class UdpTest {
-    public static void main(String[] args){
-        Config conf = new Config(args);
-        DataSegment.connectMulticast("multicast",conf.MCSTADDR, conf.localPort, conf.nis);
-        new MulticastStartCodeSegment().execute();
-    }
-
-}