view src/alice/daemon/IncomingTcpConnection.java @ 16:433e601a8e28

network bug fix
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Sun, 15 Jan 2012 12:17:30 +0900
parents e3f1b21718b0
children bb075e103cd3
line wrap: on
line source

package alice.daemon;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import org.msgpack.MessagePack;

import alice.datasegment.Command;
import alice.datasegment.CommandType;
import alice.datasegment.DataSegment;
import alice.datasegment.DataSegmentKey;
import alice.datasegment.DataSegmentManager;
import alice.datasegment.LocalDataSegmentManager;

/**
 * Receiver thread for one TCP connection.
 *
 * <p>Each message on the wire is a 4-byte length prefix (read with
 * {@link ByteBuffer#getInt()}, i.e. big-endian by default) followed by that
 * many bytes of MessagePack data that decode to a {@code CommandMessage}.
 * Every decoded message is turned into a {@link Command} and handed either to
 * the matching key of the "local" data segment manager or, for replies, to
 * {@code manager.replyQueue}.
 *
 * <p>The thread ends when the peer closes the stream, when the socket fails,
 * when a length prefix is invalid, or when the thread is interrupted.
 */
public class IncomingTcpConnection extends Thread {
	
	/** Size in bytes of the length prefix that precedes every message. */
	private static final int HEADER_SIZE = 4;
	
	public Connection connection;
	public DataSegmentManager manager;
	
	/**
	 * @param connection connection whose socket channel is read and whose
	 *                   send queue receives the answers to PEEK/TAKE requests
	 * @param manager    manager whose reply queue receives incoming REPLY commands
	 */
	public IncomingTcpConnection(Connection connection, DataSegmentManager manager) {
		this.manager = manager;
		this.connection = connection;
	}
	
	/**
	 * Reads length-prefixed messages from the connection until the stream
	 * ends or fails, dispatching each one as a {@link Command}.
	 */
	@Override
	public void run() {
		MessagePack msgpack = new MessagePack();
		SocketChannel ch = connection.socket.getChannel();
		// Allocated once and reused: direct buffers are costly to create.
		ByteBuffer headerBuf = ByteBuffer.allocateDirect(HEADER_SIZE);
		
		while (!Thread.currentThread().isInterrupted()) {
			ByteBuffer msgBuf;
			try {
				msgBuf = readFrame(ch, headerBuf);
			} catch (IOException e) {
				// A failed socket read will not recover; retrying would only
				// spin forever printing the same exception.
				e.printStackTrace();
				return;
			}
			if (msgBuf == null) return; // end of stream or unusable length prefix
			
			CommandMessage msg;
			try {
				msg = msgpack.read(msgBuf, CommandMessage.class);
				// rewind() rather than flip(): flip() would cut the limit down to
				// the current position, leaving nothing to read if the first
				// decode did not advance this buffer's position.
				msgBuf.rewind();
				System.out.println(msgpack.read(msgBuf));
			} catch (IOException e) {
				// The whole frame has already been consumed, so the stream is
				// still aligned on the next length prefix: skip this message only.
				e.printStackTrace();
				continue;
			}
			dispatch(msg);
		}
	}
	
	/**
	 * Reads one complete frame (length prefix plus payload) from the channel.
	 *
	 * @param ch        channel to read from
	 * @param headerBuf reusable buffer of {@link #HEADER_SIZE} bytes
	 * @return the payload, flipped and ready for reading, or {@code null} if
	 *         the stream ended or the length prefix was negative
	 * @throws IOException if reading from the channel fails
	 */
	private static ByteBuffer readFrame(SocketChannel ch, ByteBuffer headerBuf) throws IOException {
		headerBuf.clear();
		if (!readFully(ch, headerBuf)) return null;
		headerBuf.flip();
		int msgLen = headerBuf.getInt();
		if (msgLen < 0) {
			// The length comes from the peer; a negative value would make
			// allocateDirect throw IllegalArgumentException.
			System.err.println("IncomingTcpConnection: invalid message length " + msgLen);
			return null;
		}
		ByteBuffer msgBuf = ByteBuffer.allocateDirect(msgLen);
		if (!readFully(ch, msgBuf)) return null;
		msgBuf.flip();
		return msgBuf;
	}
	
	/**
	 * Reads from the channel until the buffer has no space remaining.
	 *
	 * @return {@code true} if the buffer was filled, {@code false} if the
	 *         channel reached end of stream first
	 * @throws IOException if reading from the channel fails
	 */
	private static boolean readFully(SocketChannel ch, ByteBuffer buf) throws IOException {
		while (buf.hasRemaining()) {
			if (ch.read(buf) < 0) return false;
		}
		return true;
	}
	
	/**
	 * Converts a decoded message into a {@link Command} and routes it.
	 * UPDATE/PUT/PEEK/TAKE/REMOVE go to the key's command queue in the
	 * "local" manager; REPLY goes to {@code manager.replyQueue}; any other
	 * type is ignored.
	 */
	private void dispatch(CommandMessage msg) {
		CommandType type = CommandType.getCommandTypeFromId(msg.type);
		if (type == null) {
			// NOTE(review): getCommandTypeFromId is defined elsewhere; if it can
			// return null for an unknown id, switching on it would throw a
			// NullPointerException and kill this thread. Confirm and adjust.
			System.err.println("IncomingTcpConnection: unknown command type id " + msg.type);
			return;
		}
		LocalDataSegmentManager lmanager = (LocalDataSegmentManager)DataSegment.get("local");
		DataSegmentKey dsKey = lmanager.getDataSegmentKey(msg.key);
		switch (type) {
		case UPDATE:
		case PUT:
			// Carries a value only; no reply is requested.
			dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null));
			break;
		case PEEK:
		case TAKE:
			// The answer is delivered through this connection's send queue.
			dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null));
			break;
		case REMOVE:
			dsKey.addCommand(new Command(type, null, null, null, 0, 0, null, null));
			break;
		case REPLY:
			try {
				manager.replyQueue.put(new Command(type, null, null, msg.val, msg.index, msg.seq, null, null));
			} catch (InterruptedException e) {
				e.printStackTrace();
				// Restore the flag so the read loop in run() sees it and stops.
				Thread.currentThread().interrupt();
			}
			break;
		default:
			break;
		}
	}
	
}