view src/alice/datasegment/RemoteDataSegmentManager.java @ 14:e3f1b21718b0

implements RemoteDataSegment
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Sun, 15 Jan 2012 00:56:25 +0900
parents src/alice/datasegment/RemoteDataSegment.java@30f97d776a3e
children 72dd27d952b0
line wrap: on
line source

package alice.datasegment;

import org.msgpack.type.Value;

import alice.codesegment.CodeSegment;
import alice.daemon.Connection;
import alice.daemon.IncomingTcpConnection;
import alice.daemon.OutboundTcpConnection;

/**
 * A {@link DataSegmentManager} in which every operation is packaged as a
 * {@link Command} and handed to {@link Connection#sendCommand}.
 *
 * <p>The fields {@code seq}, {@code seqHash}, {@code replyQueue} and
 * {@code replyThread} used below are not declared in this class; they are
 * presumably inherited from {@link DataSegmentManager} (confirm there).
 */
public class RemoteDataSegmentManager extends DataSegmentManager {

	/** Connection through which all commands built by this manager are sent. */
	Connection connection;

	/**
	 * Stores the connection and starts, in this order, the incoming connection
	 * handler, the outbound connection handler and a thread running
	 * {@code replyThread}.
	 *
	 * <p>NOTE(review): {@code this} is passed to the incoming handler, and all
	 * three are started, before the constructor has returned.
	 *
	 * @param connection the connection used for sending commands
	 */
	public RemoteDataSegmentManager(Connection connection) {
		this.connection = connection;

		IncomingTcpConnection incoming = new IncomingTcpConnection(connection, this);
		incoming.start();

		OutboundTcpConnection outbound = new OutboundTcpConnection(connection);
		outbound.start();

		Thread replyRunner = new Thread(replyThread);
		replyRunner.start();
	}

	/**
	 * Sends a {@code PUT} command carrying {@code key} and {@code val}.
	 */
	@Override
	public void put(String key, Value val) {
		sendWithoutReply(CommandType.PUT, key, val);
	}

	/**
	 * Sends an {@code UPDATE} command carrying {@code key} and {@code val}.
	 */
	@Override
	public void update(String key, Value val) {
		sendWithoutReply(CommandType.UPDATE, key, val);
	}

	/**
	 * Registers and sends a {@code TAKE} command; see
	 * {@link #sendAwaitingReply} for the bookkeeping performed.
	 */
	@Override
	public void take(String argKey, String key, int index, CodeSegment cs) {
		sendAwaitingReply(CommandType.TAKE, argKey, key, index, cs);
	}

	/**
	 * Registers and sends a {@code PEEK} command; see
	 * {@link #sendAwaitingReply} for the bookkeeping performed.
	 */
	@Override
	public void peek(String argKey, String key, int index, CodeSegment cs) {
		sendAwaitingReply(CommandType.PEEK, argKey, key, index, cs);
	}

	/**
	 * Sends a {@code REMOVE} command carrying only {@code key}.
	 */
	@Override
	public void remove(String key) {
		sendWithoutReply(CommandType.REMOVE, key, null);
	}

	/**
	 * Builds a command with no argument key, index 0, sequence number 0,
	 * no reply queue and no code segment, then sends it.
	 */
	private void sendWithoutReply(CommandType type, String key, Value val) {
		Command oneWay = new Command(type, null, key, val, 0, 0, null, null);
		connection.sendCommand(oneWay);
	}

	/**
	 * Draws the next sequence number, builds a value-less command bound to
	 * {@code replyQueue} and {@code cs}, records it in {@code seqHash} under
	 * that sequence number, and only then sends it.
	 */
	private void sendAwaitingReply(CommandType type, String argKey, String key, int index, CodeSegment cs) {
		int sequenceNumber = this.seq.getAndIncrement();
		Command request = new Command(type, argKey, key, null, index, sequenceNumber, replyQueue, cs);
		// Record the command before sending it, as the original code did.
		seqHash.put(sequenceNumber, request);
		connection.sendCommand(request);
	}

}