changeset 365:8072df9130c6 multicast

IncomingUdpConnection have to improve. it same IncommingTcpConnection
author sugi
date Sat, 17 May 2014 21:32:24 +0900
parents 1494d44392a2
children abc54fa0c81b
files src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/daemon/IncomingUdpConnection.java
diffstat 2 files changed, 54 insertions(+), 9 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Fri May 16 17:39:33 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Sat May 17 21:32:24 2014 +0900
@@ -21,7 +21,7 @@
 	public Connection connection;
 	public DataSegmentManager manager;
 	public String reverseKey;
-	protected LocalDataSegmentManager lmanager = DataSegment.getLocal();
+	private LocalDataSegmentManager lmanager = DataSegment.getLocal();
 
 	public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) {
 		this.manager = manager;
@@ -29,6 +29,10 @@
 		this.reverseKey = reverseKey;
 	}
 	
+	public LocalDataSegmentManager getLocalDataSegmentManager(){
+		return lmanager;
+	}
+	
 	/**
 	 * pipeline thread for receiving
 	 */
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java	Fri May 16 17:39:33 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingUdpConnection.java	Sat May 17 21:32:24 2014 +0900
@@ -1,44 +1,85 @@
 package alice.daemon;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 
 import org.msgpack.unpacker.Unpacker;
 
 import alice.codesegment.SingletonMessage;
 import alice.datasegment.Command;
 import alice.datasegment.CommandType;
+import alice.datasegment.DataSegment;
+import alice.topology.HostMessage;
+import alice.topology.manager.keeparive.RespondData;
+import alice.topology.manager.reconnection.SendError;
 
 public class IncomingUdpConnection extends IncomingTcpConnection {
-	public MulticastConnection mConnection;
+	// receive Data set into LocalDataSegment now but need to set into MulticastDataSegment.
+	// and this implement has problem. If over 4096 data receive, can not read.
+	
+	public MulticastConnection receiver;
 
 	public IncomingUdpConnection(MulticastConnection mc) {
 		super(null, null, "multicast");
-		mConnection = mc;
+		receiver = mc;
 	}
 	
 	@Override
 	public void run() {
-		ByteBuffer receive = ByteBuffer.allocate(4096);
 		while (true){
-			try {
-				mConnection.receive(receive);
+			try {				
+				ByteBuffer receive = ByteBuffer.allocate(4096); 
+				receiver.receive(receive);
 				Unpacker unpacker = SingletonMessage.getInstance().createBufferUnpacker(receive);
 				receive.flip();
 				CommandMessage msg = unpacker.read(CommandMessage.class);
 				CommandType type = CommandType.getCommandTypeFromId(msg.type);
-				switch (type){
+				System.out.println(msg.val);
+				switch (type) {
 				case UPDATE:
-					lmanager.getDataSegmentKey(msg.key)
+					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
 						.runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
 					break;
 				case PUT:
-					lmanager.getDataSegmentKey(msg.key)
+					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
 						.runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey));
 					break;
+				case PEEK:
+					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
+						.runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
+					break;
+				case TAKE:
+					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
+						.runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag));
+					break;	
+				case REMOVE:
+					getLocalDataSegmentManager().getDataSegmentKey(msg.key)
+						.runCommand(new Command(type, null, null, null, 0, 0, null, null, null));
+					break;
+				case REPLY:
+					Command cmd = manager.getAndRemoveCmd(msg.seq);
+					cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null));
+					cmd=null;
+					break;
+				case PING:
+					DataSegment.get(reverseKey).response(msg.key);
+					break;
+				case RESPONSE:
+					DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()));
+					break;
 				default:
 					break;
 				}
+				
+			} catch (ClosedChannelException e) {
+				connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
+				return;
+			} catch (EOFException e) {
+				new SendError(new HostMessage(connection.socket.getInetAddress().getHostName(), connection.socket.getPort())).execute();
+				connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
+				return;
 			} catch (IOException e) {
 				e.printStackTrace();
 			}