Mercurial > hg > Members > shoshi > TreeCMSv2
view src/treecms/tree/cassandra/v1/util/CassandraClientWrapper.java @ 9:17ed97ca9960
commit
author | shoshi |
---|---|
date | Mon, 18 Apr 2011 01:07:27 +0900 |
parents | src/treecms/tree/cassandra/v1/ClientWrapper.java@fc19e38b669b |
children | 85061e874775 |
line wrap: on
line source
package treecms.tree.cassandra.v1.util;

import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

/**
 * Wrapper around Cassandra.Client that makes the client easier to use.
 * <p>
 * Every request is attempted once and, if it fails, retried up to the configured
 * number of times. When a request fails with a TTransportException the wrapper
 * closes the transport and tries to open a new connection before the next attempt.
 * <p>
 * This class is NOT thread-safe.
 * <p>
 * The wrapper is written against the Cassandra 0.6.x Thrift API.
 * @author shoshi
 */
final class CassandraClientWrapper
{
	private final String m_host;
	private final int m_port;
	private final int m_retryCount;

	private TTransport m_tr;
	private Cassandra.Client m_client; // null while disconnected (see disconnect())

	/**
	 * Initializes the wrapper and connects immediately.
	 * @param _host Cassandra host name
	 * @param _port Cassandra port number
	 * @param _retryCount number of times a failed request is retried (total attempts = 1 + _retryCount)
	 * @throws TTransportException if the initial connection cannot be opened
	 */
	public CassandraClientWrapper(String _host,int _port,int _retryCount) throws TTransportException
	{
		m_host = _host;
		m_port = _port;
		m_retryCount = _retryCount;

		connect();
	}

	/**
	 * Opens a new transport to m_host:m_port and creates a client on top of it.
	 * The fields are only updated after the transport has been opened, so a failed
	 * connect leaves m_tr and m_client untouched.
	 * @throws TTransportException if the transport cannot be opened
	 */
	private void connect() throws TTransportException
	{
		TTransport tr = new TSocket(m_host,m_port);
		Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(tr));

		tr.open();

		m_tr = tr;
		m_client = client;
	}

	/**
	 * Closes the current transport and marks the wrapper as disconnected
	 * by clearing m_client.
	 */
	private void disconnect()
	{
		if(m_tr != null){
			m_tr.close();
		}
		m_client = null;
	}

	/**
	 * Drops the current connection and opens a new one.
	 * @return true if the new connection was established, false otherwise.
	 *         On failure m_client stays null, so the next request attempt
	 *         fails with IllegalStateException.
	 */
	private boolean reconnect()
	{
		disconnect();
		try{
			connect();
			return true;
		}catch(TTransportException _e){
			_e.printStackTrace();
			return false;
		}
	}

	/**
	 * Common handling for a failed request attempt: reports the exception and,
	 * when it is a TTransportException (treated here as a sign that the connection
	 * was lost), tries to reconnect so that the next attempt uses a fresh connection.
	 * @param _e the exception thrown by the failed attempt
	 */
	private void exceptionHandler(Exception _e)
	{
		_e.printStackTrace();
		if(_e instanceof TTransportException){
			reconnect();
		}
	}

	/**
	 * Verifies that a client is available before a request is sent.
	 * @throws IllegalStateException if the wrapper is currently disconnected
	 */
	private void ensureConnected()
	{
		if(m_client == null){
			throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
		}
	}

	/**
	 * Wrapper for Cassandra.Client.get with retry.
	 * InvalidRequestException and NotFoundException are rethrown immediately
	 * instead of being retried; other failures are retried up to m_retryCount
	 * times and the exception of the last attempt is propagated.
	 * @throws IllegalStateException if the wrapper is disconnected and could not reconnect
	 */
	public ColumnOrSuperColumn get(String _ks,String _key,ColumnPath _path,ConsistencyLevel _level)
		throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException
	{
		for(int i = 0;i < m_retryCount;i ++){
			ensureConnected();
			try{
				return m_client.get(_ks,_key,_path,_level);
			}catch(InvalidRequestException _e){
				throw _e;
			}catch(NotFoundException _e){
				throw _e;
			}catch(Exception _e){
				exceptionHandler(_e);
			}
		}

		// Last attempt: let whatever it throws reach the caller.
		ensureConnected();
		return m_client.get(_ks,_key,_path,_level);
	}

	/**
	 * Wrapper for Cassandra.Client.get_slice with retry.
	 * InvalidRequestException is rethrown immediately; other failures are retried
	 * up to m_retryCount times and the exception of the last attempt is propagated.
	 * @throws IllegalStateException if the wrapper is disconnected and could not reconnect
	 */
	public List<ColumnOrSuperColumn> get_slice(String _ks,String _key,ColumnParent _parent,SlicePredicate _predicate,ConsistencyLevel _level)
		throws InvalidRequestException, UnavailableException, TimedOutException, TException
	{
		for(int i = 0;i < m_retryCount;i ++){
			ensureConnected();
			try{
				return m_client.get_slice(_ks,_key,_parent,_predicate,_level);
			}catch(InvalidRequestException _e){
				throw _e;
			}catch(Exception _e){
				exceptionHandler(_e);
			}
		}

		// Last attempt: let whatever it throws reach the caller.
		ensureConnected();
		return m_client.get_slice(_ks,_key,_parent,_predicate,_level);
	}

	/**
	 * Wrapper for Cassandra.Client.get_range_slices with retry.
	 * InvalidRequestException is rethrown immediately; other failures are retried
	 * up to m_retryCount times and the exception of the last attempt is propagated.
	 * @throws IllegalStateException if the wrapper is disconnected and could not reconnect
	 */
	public List<KeySlice> get_range_slices(String _ks,ColumnParent _parent,SlicePredicate _predicate,KeyRange _range,ConsistencyLevel _level)
		throws InvalidRequestException, UnavailableException, TimedOutException, TException
	{
		for(int i = 0;i < m_retryCount;i ++){
			ensureConnected();
			try{
				return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level);
			}catch(InvalidRequestException _e){
				throw _e;
			}catch(Exception _e){
				exceptionHandler(_e);
			}
		}

		// Last attempt: let whatever it throws reach the caller.
		ensureConnected();
		return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level);
	}

	/**
	 * Wrapper for Cassandra.Client.insert with retry.
	 * Returns as soon as one attempt succeeds, so a successful write is sent only once.
	 * InvalidRequestException is rethrown immediately; other failures are retried
	 * up to m_retryCount times and the exception of the last attempt is propagated.
	 * @throws IllegalStateException if the wrapper is disconnected and could not reconnect
	 */
	public void insert(String _ks,String _key,ColumnPath _path,byte[] _value,long _timestamp,ConsistencyLevel _level)
		throws InvalidRequestException, UnavailableException, TimedOutException, TException
	{
		for(int i = 0;i < m_retryCount;i ++){
			ensureConnected();
			try{
				m_client.insert(_ks,_key,_path,_value,_timestamp,_level);
				return; // success: do not send the same write again
			}catch(InvalidRequestException _e){
				throw _e;
			}catch(Exception _e){
				exceptionHandler(_e);
			}
		}

		// Last attempt: let whatever it throws reach the caller.
		ensureConnected();
		m_client.insert(_ks,_key,_path,_value,_timestamp,_level);
	}

	/**
	 * Wrapper for Cassandra.Client.batch_mutate with retry.
	 * Returns as soon as one attempt succeeds, so a successful batch is sent only once.
	 * InvalidRequestException is rethrown immediately; other failures are retried
	 * up to m_retryCount times and the exception of the last attempt is propagated.
	 * @throws IllegalStateException if the wrapper is disconnected and could not reconnect
	 */
	public void batch_mutate(String _ks,Map<String,Map<String,List<Mutation>>> _mutation_map,ConsistencyLevel _level)
		throws InvalidRequestException, UnavailableException, TimedOutException, TException
	{
		for(int i = 0;i < m_retryCount;i ++){
			ensureConnected();
			try{
				m_client.batch_mutate(_ks,_mutation_map,_level);
				return; // success: do not send the same batch again
			}catch(InvalidRequestException _e){
				throw _e;
			}catch(Exception _e){
				exceptionHandler(_e);
			}
		}

		// Last attempt: let whatever it throws reach the caller.
		ensureConnected();
		m_client.batch_mutate(_ks,_mutation_map,_level);
	}
}