view src/treecms/cassandra/util/ConcurrentCassandraClient.java @ 17:168deb591f21

commit
author shoshi
date Tue, 24 May 2011 00:33:12 +0900
parents src/treecms/tree/cassandra/v1/util/ConcurrentCassandraClient.java@85061e874775
children
line wrap: on
line source

package treecms.cassandra.util;

import java.util.concurrent.Callable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;

/**
 * 
 * @author shoshi
 */
public class ConcurrentCassandraClient
{
	private ExecutorService m_service;
	
	private ConcurrentCassandraClient(String _host,int _port,int _thCount)
	{
		m_service = Executors.newFixedThreadPool(_thCount,new CassandraClientThreadFactory(_host,_port));
	}

	public static ConcurrentCassandraClient newInstance(String _host,int _port,int _thCount)
	{
		ConcurrentCassandraClient client = new ConcurrentCassandraClient(_host,_port,_thCount);
		return client;
	}
	
	public void disconnect()
	{
		m_service.shutdown();
		try{
			m_service.awaitTermination(Long.MAX_VALUE,TimeUnit.DAYS);
		}catch(InterruptedException _e){
			_e.printStackTrace();
		}
	}
	
	public Future<ColumnOrSuperColumn> get(final String _ks,final String _key,final ColumnPath _path,final ConsistencyLevel _level)
	{
		Callable<ColumnOrSuperColumn> task = new Callable<ColumnOrSuperColumn>(){
			@Override
			public ColumnOrSuperColumn call() throws Exception
			{
				CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
				ColumnOrSuperColumn cors = client.get(_ks,_key,_path,_level);
				return cors;
			}
		};
		
		Future<ColumnOrSuperColumn> ret = m_service.submit(task);
		return ret;	
	}
	
	public Future<List<ColumnOrSuperColumn>> get_slice(final String _ks,final String _key,final ColumnParent _parent,final SlicePredicate _predicate,final ConsistencyLevel _level)
	{
		Callable<List<ColumnOrSuperColumn>> task = new Callable<List<ColumnOrSuperColumn>>(){
			@Override
			public List<ColumnOrSuperColumn> call() throws Exception
			{
				CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
				List<ColumnOrSuperColumn> ret = client.get_slice(_ks,_key,_parent,_predicate,_level);
				return ret;
			}
		};
		
		Future<List<ColumnOrSuperColumn>> ret = m_service.submit(task);
		return ret;
	}
	
	public Future<?> batch_mutate(final String _ks,final Map<String,Map<String,List<Mutation>>> _mutation_map,final ConsistencyLevel _level)
	{
		Callable<Boolean> task = new Callable<Boolean>(){
			@Override
			public Boolean call() throws Exception
			{
				CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
				client.batch_mutate(_ks,_mutation_map,_level);
				return true;
			}
		};
		
		Future<?> ret = m_service.submit(task);
		return ret;
	}
	
	public Future<Boolean> insert(final String _ks,final String _key,final ColumnPath _path,final byte[] _value,final long _timestamp,final ConsistencyLevel _level)
	{
		Callable<Boolean> task = new Callable<Boolean>(){
			@Override
			public Boolean call() throws Exception
			{
				CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
				client.insert(_ks,_key,_path,_value,_timestamp,_level);
				return true;
			}
		};
		
		Future<Boolean> ret = m_service.submit(task);
		return ret;
	}
	
	public Future<String> describe_cluster_name()
	{
		Callable<String> task = new Callable<String>(){
			@Override
			public String call() throws Exception
			{
				return null;
			}
		};
		
		Future<String> ret = m_service.submit(task);
		return ret;
	}
}