view src/treecms/cassandra/util/CassandraClientWrapper.java @ 17:168deb591f21

commit
author shoshi
date Tue, 24 May 2011 00:33:12 +0900
parents src/treecms/tree/cassandra/v1/util/CassandraClientWrapper.java@85061e874775
children
line wrap: on
line source

package treecms.cassandra.util;

import java.util.List;
import java.util.Map;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

/**
 * Cassandra.Clientのラッパークラスです。Cassandra.Clientを使いやすくするための機能を提供します。
 * 接続済みのCassandra.Clientの接続が切断されたときに再接続を行います。このクラスはスレッドセーフでは有りません。
 * 
 * このラッパークラスはCassandra 0.6.xのAPIをベースに作成します。
 * @author shoshi
 */
final class CassandraClientWrapper
{
	private String m_host;
	private int m_port;
	private int m_retryCount;
	
	private TTransport m_tr;
	private Cassandra.Client m_client;
	
	/**
	 * コンストラクタです。初期化して接続します。
	 * @param _host Cassandraのホスト名
	 * @param _port Cassandraのポート番号
	 * @param _retryCount リクエストが失敗した場合リトライする回数
	 * @throws TTransportException
	 */
	public CassandraClientWrapper(String _host,int _port,int _retryCount) throws TTransportException
	{
		m_host = _host;
		m_port = _port;
		m_retryCount = _retryCount;
		
		connect();
	}
	
	/**
	 * Cassandraに接続します。
	 * @throws TTransportException
	 */
	private void connect() throws TTransportException
	{
		Cassandra.Client client;
		TTransport tr = new TSocket(m_host,m_port);
		client = new Cassandra.Client(new TBinaryProtocol(tr));	
		
		m_tr = tr;
		m_tr.open();
		
		m_client = client;
	}
	
	/**
	 * ここで共通する例外処理(再接続など)を行う
	 * @param _e 例外
	 */
	private void exceptionHandler(Exception _e)
	{
		_e.printStackTrace();
	}
	
	/**
	 * Cassandra.Client.getのラッパーメソッド
	 */
	public ColumnOrSuperColumn get(String _ks,String _key,ColumnPath _path,ConsistencyLevel _level) throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException
	{
		for(int i = 0;i < m_retryCount;i ++){
			if(m_client == null){
				throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
			}
			
			try {
				ColumnOrSuperColumn cors = m_client.get(_ks,_key,_path,_level);
				return cors;
			}catch(Exception _e){
				exceptionHandler(_e);
			}
		}
		return m_client.get(_ks,_key,_path,_level);
	}
	
	/**
	 * Cassandra.Client.get_sliceのラッパーメソッド
	 */
	public List<ColumnOrSuperColumn> get_slice(String _ks,String _key,ColumnParent _parent,SlicePredicate _predicate,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
	{
		for(int i = 0;i < m_retryCount;i ++){
			if(m_client == null){
				throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
			}
			
			try {
				List<ColumnOrSuperColumn> list = m_client.get_slice(_ks,_key,_parent,_predicate,_level);
				return list;
			}catch(Exception _e){
				exceptionHandler(_e);
			}
		}
		return m_client.get_slice(_ks,_key,_parent,_predicate,_level);
	}
	
	/**
	 * Cassandra.Client.get_range_slicesのラッパーメソッド
	 */
	public List<KeySlice> get_range_slices(String _ks,ColumnParent _parent,SlicePredicate _predicate,KeyRange _range,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
	{
		for(int i = 0;i < m_retryCount;i ++){
			if(m_client == null){
				throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
			}
			
			try {
				return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level);
			}catch(Exception _e){
				exceptionHandler(_e);
			}
		}
		return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level);
	}
	
	/**
	 * describe_cluster_name の ラッパーメソッド
	 * @return cluster name
	 * @throws TException
	 */
	public String describe_cluster_name() throws TException
	{
		for(int i = 0;i < m_retryCount;i ++){
			if(m_client == null){
				throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
			}
			
			try {
				return m_client.describe_cluster_name();
			}catch(Exception _e){
				exceptionHandler(_e);
			}
		}
		return m_client.describe_cluster_name();
	}
	
	/**
	 * Cassandra.Client.insertのラッパーメソッド
	 */
	public void insert(String _ks,String _key,ColumnPath _path,byte[] _value,long _timestamp,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
	{
		for(int i = 1;i < m_retryCount;i ++){
			if(m_client == null){
				throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
			}
			
			try{
				m_client.insert(_ks,_key,_path,_value,_timestamp,_level);
			}catch(Exception _e){
				exceptionHandler(_e);
			}
		}
		m_client.insert(_ks,_key,_path,_value,_timestamp,_level);
		return;
	}
	
	/**
	 * Cassandra.Client.batch_mutateのラッパーメソッド
	 */
	public void batch_mutate(String _ks,Map<String,Map<String,List<Mutation>>> _mutation_map,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
	{
		for(int i = 1;i < m_retryCount;i ++){
			if(m_client == null){
				throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
			}
			
			try{
				m_client.batch_mutate(_ks,_mutation_map,_level);
			}catch(Exception _e){
				exceptionHandler(_e);
			}
		}
		m_client.batch_mutate(_ks,_mutation_map,_level);
		return;
	}
}