Mercurial > hg > Members > shoshi > TreeCMSv2
view src/treecms/cassandra/util/CassandraClientWrapper.java @ 17:168deb591f21
commit
author | shoshi |
---|---|
date | Tue, 24 May 2011 00:33:12 +0900 |
parents | src/treecms/tree/cassandra/v1/util/CassandraClientWrapper.java@85061e874775 |
children |
line wrap: on
line source
package treecms.cassandra.util; import java.util.List; import java.util.Map; import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.ColumnOrSuperColumn; import org.apache.cassandra.thrift.ColumnParent; import org.apache.cassandra.thrift.ColumnPath; import org.apache.cassandra.thrift.ConsistencyLevel; import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.thrift.KeyRange; import org.apache.cassandra.thrift.KeySlice; import org.apache.cassandra.thrift.Mutation; import org.apache.cassandra.thrift.NotFoundException; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.thrift.TimedOutException; import org.apache.cassandra.thrift.UnavailableException; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; /** * Cassandra.Clientのラッパークラスです。Cassandra.Clientを使いやすくするための機能を提供します。 * 接続済みのCassandra.Clientの接続が切断されたときに再接続を行います。このクラスはスレッドセーフでは有りません。 * * このラッパークラスはCassandra 0.6.xのAPIをベースに作成します。 * @author shoshi */ final class CassandraClientWrapper { private String m_host; private int m_port; private int m_retryCount; private TTransport m_tr; private Cassandra.Client m_client; /** * コンストラクタです。初期化して接続します。 * @param _host Cassandraのホスト名 * @param _port Cassandraのポート番号 * @param _retryCount リクエストが失敗した場合リトライする回数 * @throws TTransportException */ public CassandraClientWrapper(String _host,int _port,int _retryCount) throws TTransportException { m_host = _host; m_port = _port; m_retryCount = _retryCount; connect(); } /** * Cassandraに接続します。 * @throws TTransportException */ private void connect() throws TTransportException { Cassandra.Client client; TTransport tr = new TSocket(m_host,m_port); client = new Cassandra.Client(new TBinaryProtocol(tr)); m_tr = tr; m_tr.open(); m_client = client; } /** * ここで共通する例外処理(再接続など)を行う * @param _e 例外 */ private void exceptionHandler(Exception _e) { _e.printStackTrace(); } /** * Cassandra.Client.getのラッパーメソッド */ public ColumnOrSuperColumn get(String _ks,String _key,ColumnPath _path,ConsistencyLevel _level) throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException { for(int i = 0;i < m_retryCount;i ++){ if(m_client == null){ throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); } try { ColumnOrSuperColumn cors = m_client.get(_ks,_key,_path,_level); return cors; }catch(Exception _e){ exceptionHandler(_e); } } return m_client.get(_ks,_key,_path,_level); } /** * Cassandra.Client.get_sliceのラッパーメソッド */ public List<ColumnOrSuperColumn> get_slice(String _ks,String _key,ColumnParent _parent,SlicePredicate _predicate,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException { for(int i = 0;i < m_retryCount;i ++){ if(m_client == null){ throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); } try { List<ColumnOrSuperColumn> list = m_client.get_slice(_ks,_key,_parent,_predicate,_level); return list; }catch(Exception _e){ exceptionHandler(_e); } } return m_client.get_slice(_ks,_key,_parent,_predicate,_level); } /** * Cassandra.Client.get_range_slicesのラッパーメソッド */ public List<KeySlice> get_range_slices(String _ks,ColumnParent _parent,SlicePredicate _predicate,KeyRange _range,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException { for(int i = 0;i < m_retryCount;i ++){ if(m_client == null){ throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); } try { return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level); }catch(Exception _e){ exceptionHandler(_e); } } return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level); } /** * describe_cluster_name の ラッパーメソッド * @return cluster name * @throws TException */ public String describe_cluster_name() throws TException { for(int i = 0;i < m_retryCount;i ++){ if(m_client == null){ throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); } try { return m_client.describe_cluster_name(); }catch(Exception _e){ exceptionHandler(_e); } } return m_client.describe_cluster_name(); } /** * Cassandra.Client.insertのラッパーメソッド */ public void insert(String _ks,String _key,ColumnPath _path,byte[] _value,long _timestamp,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException { for(int i = 1;i < m_retryCount;i ++){ if(m_client == null){ throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); } try{ m_client.insert(_ks,_key,_path,_value,_timestamp,_level); }catch(Exception _e){ exceptionHandler(_e); } } m_client.insert(_ks,_key,_path,_value,_timestamp,_level); return; } /** * Cassandra.Client.batch_mutateのラッパーメソッド */ public void batch_mutate(String _ks,Map<String,Map<String,List<Mutation>>> _mutation_map,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException { for(int i = 1;i < m_retryCount;i ++){ if(m_client == null){ throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); } try{ m_client.batch_mutate(_ks,_mutation_map,_level); }catch(Exception _e){ exceptionHandler(_e); } } m_client.batch_mutate(_ks,_mutation_map,_level); return; } }