comparison src/treecms/tree/cassandra/v1/ClientWrapper.java @ 7:fc19e38b669b

added concurrent access client for Cassandra
author shoshi
date Thu, 17 Mar 2011 23:24:08 +0900
parents
children
comparison
equal deleted inserted replaced
6:12604eb6b615 7:fc19e38b669b
1 package treecms.tree.cassandra.v1;
2
3 import java.util.List;
4 import java.util.Map;
5 import org.apache.cassandra.thrift.Cassandra;
6 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
7 import org.apache.cassandra.thrift.ColumnParent;
8 import org.apache.cassandra.thrift.ColumnPath;
9 import org.apache.cassandra.thrift.ConsistencyLevel;
10 import org.apache.cassandra.thrift.InvalidRequestException;
11 import org.apache.cassandra.thrift.KeyRange;
12 import org.apache.cassandra.thrift.KeySlice;
13 import org.apache.cassandra.thrift.Mutation;
14 import org.apache.cassandra.thrift.NotFoundException;
15 import org.apache.cassandra.thrift.SlicePredicate;
16 import org.apache.cassandra.thrift.TimedOutException;
17 import org.apache.cassandra.thrift.UnavailableException;
18 import org.apache.thrift.TException;
19 import org.apache.thrift.protocol.TBinaryProtocol;
20 import org.apache.thrift.transport.TSocket;
21 import org.apache.thrift.transport.TTransport;
22 import org.apache.thrift.transport.TTransportException;
23
24 /**
25 * Cassandra.Clientのラッパークラスです。Cassandra.Clientを使いやすくするための機能を提供します。
26 * 接続済みのCassandra.Clientの接続が切断されたときに再接続を行います。このクラスはスレッドセーフでは有りません。
27 *
28 * このラッパークラスはCassandra 0.6.xのAPIをベースに作成します。
29 * @author shoshi
30 */
31 final class ClientWrapper
32 {
33 private String m_host;
34 private int m_port;
35 private int m_retryCount;
36
37 private TTransport m_tr;
38 private Cassandra.Client m_client;
39
40 /**
41 * コンストラクタです。初期化して接続します。
42 * @param _host Cassandraのホスト名
43 * @param _port Cassandraのポート番号
44 * @param _retryCount リクエストが失敗した場合リトライする回数
45 * @throws TTransportException
46 */
47 public ClientWrapper(String _host,int _port,int _retryCount) throws TTransportException
48 {
49 m_host = _host;
50 m_port = _port;
51 m_retryCount = _retryCount;
52
53 connect();
54 }
55
56 /**
57 * Cassandraに接続します。
58 * @throws TTransportException
59 */
60 private void connect() throws TTransportException
61 {
62 Cassandra.Client client;
63 TTransport tr = new TSocket(m_host,m_port);
64 client = new Cassandra.Client(new TBinaryProtocol(tr));
65
66 tr.open();
67
68 m_tr = tr;
69 m_client = client;
70 }
71
72 /**
73 * Cassandraへの接続を切ります。
74 */
75 private void disconnect()
76 {
77 m_tr.close();
78 m_client = null;
79 }
80
81 /**
82 * Cassandraへ再接続を行います。
83 * @return 再接続が成功した場合true
84 */
85 private boolean reconnect()
86 {
87 disconnect();
88 try {
89 connect();
90 }catch(TTransportException _e){
91 _e.printStackTrace();
92 }
93 return true;
94 }
95
96 /**
97 * ここで共通する例外処理(再接続など)を行う
98 * @param _e 例外
99 */
100 private void exceptionHandler(Exception _e)
101 {
102 _e.printStackTrace();
103 }
104
105 /**
106 * Cassandra.Client.getのラッパーメソッド
107 */
108 public ColumnOrSuperColumn get(String _ks,String _key,ColumnPath _path,ConsistencyLevel _level) throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException
109 {
110 for(int i = 0;i < m_retryCount;i ++){
111 if(m_client == null){
112 throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
113 }
114
115 try {
116 ColumnOrSuperColumn cors = m_client.get(_ks,_key,_path,_level);
117 return cors;
118 }catch(Exception _e){
119 exceptionHandler(_e);
120 }
121 }
122 return m_client.get(_ks,_key,_path,_level);
123 }
124
125 /**
126 * Cassandra.Client.get_sliceのラッパーメソッド
127 */
128 public List<ColumnOrSuperColumn> get_slice(String _ks,String _key,ColumnParent _parent,SlicePredicate _predicate,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
129 {
130 for(int i = 0;i < m_retryCount;i ++){
131 if(m_client == null){
132 throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
133 }
134
135 try {
136 List<ColumnOrSuperColumn> list = m_client.get_slice(_ks,_key,_parent,_predicate,_level);
137 return list;
138 }catch(Exception _e){
139 exceptionHandler(_e);
140 }
141 }
142 return m_client.get_slice(_ks,_key,_parent,_predicate,_level);
143 }
144
145 /**
146 * Cassandra.Client.get_range_slicesのラッパーメソッド
147 */
148 public List<KeySlice> get_range_slices(String _ks,ColumnParent _parent,SlicePredicate _predicate,KeyRange _range,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
149 {
150 for(int i = 0;i < m_retryCount;i ++){
151 if(m_client == null){
152 throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
153 }
154
155 try {
156 return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level);
157 }catch(Exception _e){
158 exceptionHandler(_e);
159 }
160 }
161 return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level);
162 }
163
164 /**
165 * Cassandra.Client.insertのラッパーメソッド
166 */
167 public void insert(String _ks,String _key,ColumnPath _path,byte[] _value,long _timestamp,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
168 {
169 for(int i = 1;i < m_retryCount;i ++){
170 if(m_client == null){
171 throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
172 }
173
174 try{
175 m_client.insert(_ks,_key,_path,_value,_timestamp,_level);
176 }catch(Exception _e){
177 exceptionHandler(_e);
178 }
179 }
180 m_client.insert(_ks,_key,_path,_value,_timestamp,_level);
181 return;
182 }
183
184 /**
185 * Cassandra.Client.batch_mutateのラッパーメソッド
186 */
187 public void batch_mutate(String _ks,Map<String,Map<String,List<Mutation>>> _mutation_map,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
188 {
189 for(int i = 1;i < m_retryCount;i ++){
190 if(m_client == null){
191 throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
192 }
193
194 try{
195 m_client.batch_mutate(_ks,_mutation_map,_level);
196 }catch(Exception _e){
197 exceptionHandler(_e);
198 }
199 }
200 m_client.batch_mutate(_ks,_mutation_map,_level);
201 return;
202 }
203 }