changeset 369:0c24894db37e multicast

MulticastDataSegment's extends change from LocalDataSegment to RemoteDataSegment
author sugi
date Tue, 20 May 2014 15:00:21 +0900
parents f38cafa457c9
children ce3ad64bacd3
files src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/daemon/IncomingUdpConnection.java src/main/java/alice/datasegment/DataSegmentManager.java src/main/java/alice/datasegment/MulticastDataSegmentManager.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java src/test/java/alice/daemon/MulticastIncrement.java src/test/java/alice/daemon/MulticastStartCodeSegment.java
diffstat 7 files changed, 23 insertions(+), 51 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Sat May 17 21:37:17 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Tue May 20 15:00:21 2014 +0900
@@ -23,6 +23,10 @@
 	public String reverseKey;
 	private LocalDataSegmentManager lmanager = DataSegment.getLocal();
 
+	public IncomingTcpConnection(DataSegmentManager manager) {
+		this.manager = manager;
+	}
+	
 	public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) {
 		this.manager = manager;
 		this.connection = connection;
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java	Sat May 17 21:37:17 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingUdpConnection.java	Tue May 20 15:00:21 2014 +0900
@@ -11,19 +11,20 @@
 import alice.datasegment.Command;
 import alice.datasegment.CommandType;
 import alice.datasegment.DataSegment;
-import alice.topology.HostMessage;
+import alice.datasegment.DataSegmentManager;
 import alice.topology.manager.keeparive.RespondData;
-import alice.topology.manager.reconnection.SendError;
 
 public class IncomingUdpConnection extends IncomingTcpConnection {
 	// receive Data set into LocalDataSegment now but need to set into MulticastDataSegment.
 	// and this implement has problem. If over 4096 data receive, can not read.
 	
 	public MulticastConnection receiver;
+	public MulticastConnection sender;
 
-	public IncomingUdpConnection(MulticastConnection mc) {
-		super(null, null, "multicast");
-		receiver = mc;
+	public IncomingUdpConnection(MulticastConnection s, MulticastConnection r, DataSegmentManager manager) {
+		super(manager);
+		receiver = r;
+		sender = s;
 	}
 	
 	@Override
@@ -36,7 +37,6 @@
 				receive.flip();
 				CommandMessage msg = unpacker.read(CommandMessage.class);
 				CommandType type = CommandType.getCommandTypeFromId(msg.type);
-				System.out.println(msg.val);
 				switch (type) {
 				case UPDATE:
 					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
@@ -48,11 +48,11 @@
 					break;
 				case PEEK:
 					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
-						.runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
+						.runCommand(new Command(type, null, null, null, msg.index, msg.seq, sender, null, null, msg.flag));
 					break;
 				case TAKE:
 					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
-						.runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
+						.runCommand(new Command(type, null, null, null, msg.index, msg.seq, sender, null, null, msg.flag));
 					break;	
 				case REMOVE:
 					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
@@ -74,11 +74,8 @@
 				}
 				
 			} catch (ClosedChannelException e) {
-				connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
 				return;
 			} catch (EOFException e) {
-				new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute();
-				connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
 				return;
 			} catch (IOException e) {
 				e.printStackTrace();
--- a/src/main/java/alice/datasegment/DataSegmentManager.java	Sat May 17 21:37:17 2014 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentManager.java	Tue May 20 15:00:21 2014 +0900
@@ -52,8 +52,8 @@
 		
 	public abstract void put(String key, Object val);
 	public abstract void update(String key, Object val);
+	public abstract void peek(Receiver receiver, CodeSegment cs);
 	public abstract void take(Receiver receiver, CodeSegment cs);
-	public abstract void peek(Receiver receiver, CodeSegment cs);
 	
 	public abstract void quickPut(String key, Object val);
 	public abstract void quickUpdate(String key, Object val);
--- a/src/main/java/alice/datasegment/MulticastDataSegmentManager.java	Sat May 17 21:37:17 2014 +0900
+++ b/src/main/java/alice/datasegment/MulticastDataSegmentManager.java	Tue May 20 15:00:21 2014 +0900
@@ -15,12 +15,9 @@
 import alice.daemon.MulticastConnection;
 import alice.daemon.OutboundTcpConnection;
 
-public class MulticastDataSegmentManager extends LocalDataSegmentManager {
-	private MulticastConnection sender;
-	private Logger logger;
+public class MulticastDataSegmentManager extends RemoteDataSegmentManager {
 	
 	public MulticastDataSegmentManager(String connectionKey ,final String MCASTADDR, final int port, final String nis) {
-		super.setReverseKey(connectionKey);
 		logger = Logger.getLogger(connectionKey);
 		InetAddress mAddr;
 		try {
@@ -30,12 +27,13 @@
 			dcr.bind(new InetSocketAddress(port));
 			SocketAddress sAddrr = new InetSocketAddress(mAddr,port);
 			MulticastConnection receiver = new MulticastConnection(dcr, sAddrr);
-			new IncomingUdpConnection(receiver).start();
 			
 			DatagramChannel dcs =  createDatagramChannel(mAddr, port, nis);
 			SocketAddress sAddrs = new InetSocketAddress(mAddr,port);
-			sender = new MulticastConnection(dcs, sAddrs);
-			new OutboundTcpConnection(sender).start(); // OutboundUdpConnection sender
+			connection = new MulticastConnection(dcs, sAddrs); // sender 
+
+			new IncomingUdpConnection((MulticastConnection) connection, receiver, this).start();
+			new OutboundTcpConnection(connection).start(); // OutboundUdpConnection sender
 			
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -65,34 +63,5 @@
 			e.printStackTrace();
 		}
 		return dc;
-	}
-	
-	@Override
-	public void put(String key, Object val) {
-		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
-		sender.sendCommand(cmd); // put command on the transmission thread
-		if (logger.isDebugEnabled())
-			logger.debug(cmd.getCommandString());
-	}
-
-	@Override
-	public void update(String key, Object val) {
-		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
-		sender.sendCommand(cmd);
-		if (logger.isDebugEnabled())
-			logger.debug(cmd.getCommandString());
-	}
-
-	@Override
-	public void quickPut(String key, Object val) {
-		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
-		sender.write(cmd); // put command is executed right now
-	}
-
-	@Override
-	public void quickUpdate(String key, Object val) {
-		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
-		sender.write(cmd);
-	}
-
+	}	
 }
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Sat May 17 21:37:17 2014 +0900
+++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Tue May 20 15:00:21 2014 +0900
@@ -18,6 +18,8 @@
 	Connection connection;
 	Logger logger;
 	
+	public RemoteDataSegmentManager(){}
+	
 	public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port, final boolean rFlag) {
 		logger = Logger.getLogger(connectionKey);
 		connection = new Connection();
--- a/src/test/java/alice/daemon/MulticastIncrement.java	Sat May 17 21:37:17 2014 +0900
+++ b/src/test/java/alice/daemon/MulticastIncrement.java	Tue May 20 15:00:21 2014 +0900
@@ -14,7 +14,7 @@
 		System.out.println("[CodeSegment] " + num++);
 		if (num == 10) System.exit(0);
 		MulticastIncrement cs = new MulticastIncrement();
-		cs.num.setKey("num");
+		cs.num.setKey("multicast", "num");
 		
 		ods.put("multicast", "num", num);
 		
--- a/src/test/java/alice/daemon/MulticastStartCodeSegment.java	Sat May 17 21:37:17 2014 +0900
+++ b/src/test/java/alice/daemon/MulticastStartCodeSegment.java	Tue May 20 15:00:21 2014 +0900
@@ -7,7 +7,7 @@
 	@Override
 	public void run() {
 		MulticastIncrement cs = new MulticastIncrement();
-		cs.num.setKey("num");
+		cs.num.setKey("multicast","num");
 		
 		ods.put("multicast", "num", 0);
 	}