# HG changeset patch # User sugi # Date 1420347061 -32400 # Node ID b7d02ea7985004e56b44c607d892030e0de96ccb # Parent 28627bb3eeda10673b81e503d4ef49016cd9c247 change multicast Data Segment API diff -r 28627bb3eeda -r b7d02ea79850 src/main/java/alice/datasegment/DataSegment.java --- a/src/main/java/alice/datasegment/DataSegment.java Sat Jan 03 19:43:36 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegment.java Sun Jan 04 13:51:01 2015 +0900 @@ -3,6 +3,7 @@ import java.util.concurrent.ConcurrentHashMap; import alice.daemon.IncomingTcpConnection; +import alice.datasegment.MulticastDataSegmentManager.SocketType; public class DataSegment { @@ -33,9 +34,10 @@ return manager; } - public static MulticastDataSegmentManager connectMulticast(String connectionKey ,String MCSTADDR, int port, String nis){ - MulticastDataSegmentManager manager = new MulticastDataSegmentManager(connectionKey ,MCSTADDR, port, nis); - register(connectionKey, manager); + public static MulticastDataSegmentManager connectMulticast(String connectionKey ,String MCSTADDR, int port, String nis, SocketType type){ + MulticastDataSegmentManager manager = new MulticastDataSegmentManager(connectionKey ,MCSTADDR, port, nis, type); + if (type!=SocketType.Receiver) + register(connectionKey, manager); return manager; } diff -r 28627bb3eeda -r b7d02ea79850 src/main/java/alice/datasegment/MulticastDataSegmentManager.java --- a/src/main/java/alice/datasegment/MulticastDataSegmentManager.java Sat Jan 03 19:43:36 2015 +0900 +++ b/src/main/java/alice/datasegment/MulticastDataSegmentManager.java Sun Jan 04 13:51:01 2015 +0900 @@ -17,7 +17,8 @@ public class MulticastDataSegmentManager extends RemoteDataSegmentManager { - public MulticastDataSegmentManager(String connectionKey ,final String MCASTADDR, final int port, final String nis) { + public enum SocketType{Sender, Receiver, Both}; + public MulticastDataSegmentManager(String connectionKey ,final String MCASTADDR, final int port, final String nis, SocketType type) { logger = Logger.getLogger(connectionKey); InetAddress mAddr; try { @@ -32,13 +33,16 @@ SocketAddress sAddrs = new InetSocketAddress(mAddr,port); connection = new MulticastConnection(dcs, sAddrs); // sender - IncomingUdpConnection in = new IncomingUdpConnection((MulticastConnection) connection, receiver, this); - in.setName("multicast-IncomingUdp"); - in.start(); - OutboundTcpConnection out = new OutboundTcpConnection(connection); // OutboundUdpConnection sender - out.setName(connectionKey+"OutboundUdp"); - out.start(); - + if (type !=SocketType.Sender) { + IncomingUdpConnection in = new IncomingUdpConnection((MulticastConnection) connection, receiver, this); + in.setName("multicast-IncomingUdp"); + in.start(); + } + if (type !=SocketType.Receiver) { + OutboundTcpConnection out = new OutboundTcpConnection(connection); // OutboundUdpConnection sender + out.setName(connectionKey+"OutboundUdp"); + out.start(); + } } catch (Exception e) { e.printStackTrace(); } diff -r 28627bb3eeda -r b7d02ea79850 src/test/java/alice/daemon/MulticastStartCodeSegment.java --- a/src/test/java/alice/daemon/MulticastStartCodeSegment.java Sat Jan 03 19:43:36 2015 +0900 +++ b/src/test/java/alice/daemon/MulticastStartCodeSegment.java Sun Jan 04 13:51:01 2015 +0900 @@ -1,15 +1,37 @@ package alice.daemon; import alice.codesegment.CodeSegment; +import alice.datasegment.DataSegment; +import alice.datasegment.MulticastDataSegmentManager.SocketType; public class MulticastStartCodeSegment extends CodeSegment { + private Config conf; + public MulticastStartCodeSegment(Config conf) { + this.conf = conf; + } + @Override public void run() { - MulticastIncrement cs = new MulticastIncrement(); - cs.num.setKey("multicast","num"); + DataSegment.connectMulticast("multicast",conf.MCSTADDR, conf.localPort, conf.nis, MulticastTest.type); + if (MulticastTest.type == SocketType.Both) { + // in this case "multicast" key mean local. + MulticastIncrement cs = new MulticastIncrement(); + cs.num.setKey("multicast","num"); - ods.put("multicast", "num", 0); + ods.put("multicast", "num", 0); + } else if (MulticastTest.type == SocketType.Receiver){ + DataSegment.connectMulticast("multicast1", "224.0.0.2", conf.localPort+1, conf.nis, SocketType.Sender); + ReceiveInteger cs = new ReceiveInteger(); + cs.num.setKey("num"); + } else if (MulticastTest.type == SocketType.Sender){ + DataSegment.connectMulticast("multicast1", "224.0.0.2", conf.localPort+1, conf.nis, SocketType.Receiver); + ReceiveString cs = new ReceiveString(); + cs.str.setKey("str"); + + for (int i=0;i < 11; i++) + ods.put("multicast", "num", i); + } } } diff -r 28627bb3eeda -r b7d02ea79850 src/test/java/alice/daemon/MulticastTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/test/java/alice/daemon/MulticastTest.java Sun Jan 04 13:51:01 2015 +0900 @@ -0,0 +1,12 @@ +package alice.daemon; + +import alice.datasegment.MulticastDataSegmentManager.SocketType; + +public class MulticastTest { + public static SocketType type = SocketType.Sender; + public static void main(String[] args){ + Config conf = new Config(args); + new MulticastStartCodeSegment(conf).execute(); + } + +} diff -r 28627bb3eeda -r b7d02ea79850 src/test/java/alice/daemon/ReceiveInteger.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/test/java/alice/daemon/ReceiveInteger.java Sun Jan 04 13:51:01 2015 +0900 @@ -0,0 +1,33 @@ +package alice.daemon; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.Receiver; + +public class ReceiveInteger extends CodeSegment{ + + public Receiver num = ids.create(CommandType.TAKE); + + @Override + public void run() { + int num = this.num.asInteger(); + System.out.println("[CodeSegment] "+num); + + if (num==10) { + ods.put("multicast1", "str", "finish"); + try { + synchronized (this) { + wait(2000); + } + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + System.exit(0); + } + ReceiveInteger cs = new ReceiveInteger(); + cs.num.setKey("num"); + + } + +} diff -r 28627bb3eeda -r b7d02ea79850 src/test/java/alice/daemon/ReceiveString.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/test/java/alice/daemon/ReceiveString.java Sun Jan 04 13:51:01 2015 +0900 @@ -0,0 +1,17 @@ +package alice.daemon; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.Receiver; + +public class ReceiveString extends CodeSegment { + + public Receiver str = ids.create(CommandType.TAKE); + + @Override + public void run() { + System.out.println(str.asString()); + System.exit(0); + } + +} diff -r 28627bb3eeda -r b7d02ea79850 src/test/java/alice/daemon/UdpTest.java --- a/src/test/java/alice/daemon/UdpTest.java Sat Jan 03 19:43:36 2015 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,12 +0,0 @@ -package alice.daemon; - -import alice.datasegment.DataSegment; - -public class UdpTest { - public static void main(String[] args){ - Config conf = new Config(args); - DataSegment.connectMulticast("multicast",conf.MCSTADDR, conf.localPort, conf.nis); - new MulticastStartCodeSegment().execute(); - } - -}