7
|
1 package treecms.tree.cassandra.v1;
|
|
2
|
|
3 import java.util.List;
|
|
4 import java.util.Map;
|
|
5 import org.apache.cassandra.thrift.Cassandra;
|
|
6 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
|
|
7 import org.apache.cassandra.thrift.ColumnParent;
|
|
8 import org.apache.cassandra.thrift.ColumnPath;
|
|
9 import org.apache.cassandra.thrift.ConsistencyLevel;
|
|
10 import org.apache.cassandra.thrift.InvalidRequestException;
|
|
11 import org.apache.cassandra.thrift.KeyRange;
|
|
12 import org.apache.cassandra.thrift.KeySlice;
|
|
13 import org.apache.cassandra.thrift.Mutation;
|
|
14 import org.apache.cassandra.thrift.NotFoundException;
|
|
15 import org.apache.cassandra.thrift.SlicePredicate;
|
|
16 import org.apache.cassandra.thrift.TimedOutException;
|
|
17 import org.apache.cassandra.thrift.UnavailableException;
|
|
18 import org.apache.thrift.TException;
|
|
19 import org.apache.thrift.protocol.TBinaryProtocol;
|
|
20 import org.apache.thrift.transport.TSocket;
|
|
21 import org.apache.thrift.transport.TTransport;
|
|
22 import org.apache.thrift.transport.TTransportException;
|
|
23
|
|
24 /**
|
|
25 * Cassandra.Clientのラッパークラスです。Cassandra.Clientを使いやすくするための機能を提供します。
|
|
26 * 接続済みのCassandra.Clientの接続が切断されたときに再接続を行います。このクラスはスレッドセーフでは有りません。
|
|
27 *
|
|
28 * このラッパークラスはCassandra 0.6.xのAPIをベースに作成します。
|
|
29 * @author shoshi
|
|
30 */
|
|
31 final class ClientWrapper
|
|
32 {
|
|
33 private String m_host;
|
|
34 private int m_port;
|
|
35 private int m_retryCount;
|
|
36
|
|
37 private TTransport m_tr;
|
|
38 private Cassandra.Client m_client;
|
|
39
|
|
40 /**
|
|
41 * コンストラクタです。初期化して接続します。
|
|
42 * @param _host Cassandraのホスト名
|
|
43 * @param _port Cassandraのポート番号
|
|
44 * @param _retryCount リクエストが失敗した場合リトライする回数
|
|
45 * @throws TTransportException
|
|
46 */
|
|
47 public ClientWrapper(String _host,int _port,int _retryCount) throws TTransportException
|
|
48 {
|
|
49 m_host = _host;
|
|
50 m_port = _port;
|
|
51 m_retryCount = _retryCount;
|
|
52
|
|
53 connect();
|
|
54 }
|
|
55
|
|
56 /**
|
|
57 * Cassandraに接続します。
|
|
58 * @throws TTransportException
|
|
59 */
|
|
60 private void connect() throws TTransportException
|
|
61 {
|
|
62 Cassandra.Client client;
|
|
63 TTransport tr = new TSocket(m_host,m_port);
|
|
64 client = new Cassandra.Client(new TBinaryProtocol(tr));
|
|
65
|
|
66 tr.open();
|
|
67
|
|
68 m_tr = tr;
|
|
69 m_client = client;
|
|
70 }
|
|
71
|
|
72 /**
|
|
73 * Cassandraへの接続を切ります。
|
|
74 */
|
|
75 private void disconnect()
|
|
76 {
|
|
77 m_tr.close();
|
|
78 m_client = null;
|
|
79 }
|
|
80
|
|
81 /**
|
|
82 * Cassandraへ再接続を行います。
|
|
83 * @return 再接続が成功した場合true
|
|
84 */
|
|
85 private boolean reconnect()
|
|
86 {
|
|
87 disconnect();
|
|
88 try {
|
|
89 connect();
|
|
90 }catch(TTransportException _e){
|
|
91 _e.printStackTrace();
|
|
92 }
|
|
93 return true;
|
|
94 }
|
|
95
|
|
96 /**
|
|
97 * ここで共通する例外処理(再接続など)を行う
|
|
98 * @param _e 例外
|
|
99 */
|
|
100 private void exceptionHandler(Exception _e)
|
|
101 {
|
|
102 _e.printStackTrace();
|
|
103 }
|
|
104
|
|
105 /**
|
|
106 * Cassandra.Client.getのラッパーメソッド
|
|
107 */
|
|
108 public ColumnOrSuperColumn get(String _ks,String _key,ColumnPath _path,ConsistencyLevel _level) throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException
|
|
109 {
|
|
110 for(int i = 0;i < m_retryCount;i ++){
|
|
111 if(m_client == null){
|
|
112 throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
|
|
113 }
|
|
114
|
|
115 try {
|
|
116 ColumnOrSuperColumn cors = m_client.get(_ks,_key,_path,_level);
|
|
117 return cors;
|
|
118 }catch(Exception _e){
|
|
119 exceptionHandler(_e);
|
|
120 }
|
|
121 }
|
|
122 return m_client.get(_ks,_key,_path,_level);
|
|
123 }
|
|
124
|
|
125 /**
|
|
126 * Cassandra.Client.get_sliceのラッパーメソッド
|
|
127 */
|
|
128 public List<ColumnOrSuperColumn> get_slice(String _ks,String _key,ColumnParent _parent,SlicePredicate _predicate,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
|
|
129 {
|
|
130 for(int i = 0;i < m_retryCount;i ++){
|
|
131 if(m_client == null){
|
|
132 throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
|
|
133 }
|
|
134
|
|
135 try {
|
|
136 List<ColumnOrSuperColumn> list = m_client.get_slice(_ks,_key,_parent,_predicate,_level);
|
|
137 return list;
|
|
138 }catch(Exception _e){
|
|
139 exceptionHandler(_e);
|
|
140 }
|
|
141 }
|
|
142 return m_client.get_slice(_ks,_key,_parent,_predicate,_level);
|
|
143 }
|
|
144
|
|
145 /**
|
|
146 * Cassandra.Client.get_range_slicesのラッパーメソッド
|
|
147 */
|
|
148 public List<KeySlice> get_range_slices(String _ks,ColumnParent _parent,SlicePredicate _predicate,KeyRange _range,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
|
|
149 {
|
|
150 for(int i = 0;i < m_retryCount;i ++){
|
|
151 if(m_client == null){
|
|
152 throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
|
|
153 }
|
|
154
|
|
155 try {
|
|
156 return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level);
|
|
157 }catch(Exception _e){
|
|
158 exceptionHandler(_e);
|
|
159 }
|
|
160 }
|
|
161 return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level);
|
|
162 }
|
|
163
|
|
164 /**
|
|
165 * Cassandra.Client.insertのラッパーメソッド
|
|
166 */
|
|
167 public void insert(String _ks,String _key,ColumnPath _path,byte[] _value,long _timestamp,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
|
|
168 {
|
|
169 for(int i = 1;i < m_retryCount;i ++){
|
|
170 if(m_client == null){
|
|
171 throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
|
|
172 }
|
|
173
|
|
174 try{
|
|
175 m_client.insert(_ks,_key,_path,_value,_timestamp,_level);
|
|
176 }catch(Exception _e){
|
|
177 exceptionHandler(_e);
|
|
178 }
|
|
179 }
|
|
180 m_client.insert(_ks,_key,_path,_value,_timestamp,_level);
|
|
181 return;
|
|
182 }
|
|
183
|
|
184 /**
|
|
185 * Cassandra.Client.batch_mutateのラッパーメソッド
|
|
186 */
|
|
187 public void batch_mutate(String _ks,Map<String,Map<String,List<Mutation>>> _mutation_map,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
|
|
188 {
|
|
189 for(int i = 1;i < m_retryCount;i ++){
|
|
190 if(m_client == null){
|
|
191 throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port);
|
|
192 }
|
|
193
|
|
194 try{
|
|
195 m_client.batch_mutate(_ks,_mutation_map,_level);
|
|
196 }catch(Exception _e){
|
|
197 exceptionHandler(_e);
|
|
198 }
|
|
199 }
|
|
200 m_client.batch_mutate(_ks,_mutation_map,_level);
|
|
201 return;
|
|
202 }
|
|
203 }
|