# HG changeset patch # User sugi # Date 1400565621 -32400 # Node ID 0c24894db37eb6ac4fbcdcb713215629592a9b4b # Parent f38cafa457c97e90e6c8d63015b7389b15d94878 MulticastDataSegment's extends change from LocalDataSegment to RemoteDataSegment diff -r f38cafa457c9 -r 0c24894db37e src/main/java/alice/daemon/IncomingTcpConnection.java --- a/src/main/java/alice/daemon/IncomingTcpConnection.java Sat May 17 21:37:17 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Tue May 20 15:00:21 2014 +0900 @@ -23,6 +23,10 @@ public String reverseKey; private LocalDataSegmentManager lmanager = DataSegment.getLocal(); + public IncomingTcpConnection(DataSegmentManager manager) { + this.manager = manager; + } + public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { this.manager = manager; this.connection = connection; diff -r f38cafa457c9 -r 0c24894db37e src/main/java/alice/daemon/IncomingUdpConnection.java --- a/src/main/java/alice/daemon/IncomingUdpConnection.java Sat May 17 21:37:17 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingUdpConnection.java Tue May 20 15:00:21 2014 +0900 @@ -11,19 +11,20 @@ import alice.datasegment.Command; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; -import alice.topology.HostMessage; +import alice.datasegment.DataSegmentManager; import alice.topology.manager.keeparive.RespondData; -import alice.topology.manager.reconnection.SendError; public class IncomingUdpConnection extends IncomingTcpConnection { // receive Data set into LocalDataSegment now but need to set into MulticastDataSegment. // and this implement has problem. If over 4096 data receive, can not read. public MulticastConnection receiver; + public MulticastConnection sender; - public IncomingUdpConnection(MulticastConnection mc) { - super(null, null, "multicast"); - receiver = mc; + public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) { + super(manager); + receiver = r; + sender = s; } @Override @@ -36,7 +37,6 @@ receive.flip(); CommandMessage msg = unpacker.read(CommandMessage.class); CommandType type = CommandType.getCommandTypeFromId(msg.type); - System.out.println(msg.val); switch (type) { case UPDATE: getLocalDataSegmentManager().getDataSegmentKey(msg.key) @@ -48,11 +48,11 @@ break; case PEEK: getLocalDataSegmentManager().getDataSegmentKey(msg.key) - .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); + .runCommand(new Command(type, null, null, null, msg.index, msg.seq, sender, null, null, msg.flag)); break; case TAKE: getLocalDataSegmentManager().getDataSegmentKey(msg.key) - .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); + .runCommand(new Command(type, null, null, null, msg.index, msg.seq, sender, null, null, msg.flag)); break; case REMOVE: getLocalDataSegmentManager().getDataSegmentKey(msg.key) @@ -74,11 +74,8 @@ } } catch (ClosedChannelException e) { - connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); return; } catch (EOFException e) { - new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute(); - connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); return; } catch (IOException e) { e.printStackTrace(); diff -r f38cafa457c9 -r 0c24894db37e src/main/java/alice/datasegment/DataSegmentManager.java --- a/src/main/java/alice/datasegment/DataSegmentManager.java Sat May 17 21:37:17 2014 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentManager.java Tue May 20 15:00:21 2014 +0900 @@ -52,8 +52,8 @@ public abstract void put(String key, Object val); public abstract void update(String key, Object val); + public abstract void peek(Receiver receiver, CodeSegment cs); public abstract void take(Receiver receiver, CodeSegment cs); - public abstract void peek(Receiver receiver, CodeSegment cs); public abstract void quickPut(String key, Object val); public abstract void quickUpdate(String key, Object val); diff -r f38cafa457c9 -r 0c24894db37e src/main/java/alice/datasegment/MulticastDataSegmentManager.java --- a/src/main/java/alice/datasegment/MulticastDataSegmentManager.java Sat May 17 21:37:17 2014 +0900 +++ b/src/main/java/alice/datasegment/MulticastDataSegmentManager.java Tue May 20 15:00:21 2014 +0900 @@ -15,12 +15,9 @@ import alice.daemon.MulticastConnection; import alice.daemon.OutboundTcpConnection; -public class MulticastDataSegmentManager extends LocalDataSegmentManager { - private MulticastConnection sender; - private Logger logger; +public class MulticastDataSegmentManager extends RemoteDataSegmentManager { public MulticastDataSegmentManager(String connectionKey ,final String MCASTADDR, final int port, final String nis) { - super.setReverseKey(connectionKey); logger = Logger.getLogger(connectionKey); InetAddress mAddr; try { @@ -30,12 +27,13 @@ dcr.bind(new InetSocketAddress(port)); SocketAddress sAddrr = new InetSocketAddress(mAddr,port); MulticastConnection receiver = new MulticastConnection(dcr, sAddrr); - new IncomingUdpConnection(receiver).start(); DatagramChannel dcs = createDatagramChannel(mAddr, port, nis); SocketAddress sAddrs = new InetSocketAddress(mAddr,port); - sender = new MulticastConnection(dcs, sAddrs); - new OutboundTcpConnection(sender).start(); // OutboundUdpConnection sender + connection = new MulticastConnection(dcs, sAddrs); // sender + + new IncomingUdpConnection((MulticastConnection) connection, receiver, this).start(); + new OutboundTcpConnection(connection).start(); // OutboundUdpConnection sender } catch (Exception e) { e.printStackTrace(); @@ -65,34 +63,5 @@ e.printStackTrace(); } return dc; - } - - @Override - public void put(String key, Object val) { - Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); - sender.sendCommand(cmd); // put command on the transmission thread - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); - } - - @Override - public void update(String key, Object val) { - Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); - sender.sendCommand(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); - } - - @Override - public void quickPut(String key, Object val) { - Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); - sender.write(cmd); // put command is executed right now - } - - @Override - public void quickUpdate(String key, Object val) { - Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); - sender.write(cmd); - } - + } } diff -r f38cafa457c9 -r 0c24894db37e src/main/java/alice/datasegment/RemoteDataSegmentManager.java --- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Sat May 17 21:37:17 2014 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Tue May 20 15:00:21 2014 +0900 @@ -18,6 +18,8 @@ Connection connection; Logger logger; + public RemoteDataSegmentManager(){} + public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port, final boolean rFlag) { logger = Logger.getLogger(connectionKey); connection = new Connection(); diff -r f38cafa457c9 -r 0c24894db37e src/test/java/alice/daemon/MulticastIncrement.java --- a/src/test/java/alice/daemon/MulticastIncrement.java Sat May 17 21:37:17 2014 +0900 +++ b/src/test/java/alice/daemon/MulticastIncrement.java Tue May 20 15:00:21 2014 +0900 @@ -14,7 +14,7 @@ System.out.println("[CodeSegment] " + num++); if (num == 10) System.exit(0); MulticastIncrement cs = new MulticastIncrement(); - cs.num.setKey("num"); + cs.num.setKey("multicast", "num"); ods.put("multicast", "num", num); diff -r f38cafa457c9 -r 0c24894db37e src/test/java/alice/daemon/MulticastStartCodeSegment.java --- a/src/test/java/alice/daemon/MulticastStartCodeSegment.java Sat May 17 21:37:17 2014 +0900 +++ b/src/test/java/alice/daemon/MulticastStartCodeSegment.java Tue May 20 15:00:21 2014 +0900 @@ -7,7 +7,7 @@ @Override public void run() { MulticastIncrement cs = new MulticastIncrement(); - cs.num.setKey("num"); + cs.num.setKey("multicast","num"); ods.put("multicast", "num", 0); }