Mercurial > hg > Members > shoshi > TreeCMSv2
comparison src/treecms/tree/cassandra/v1/ClientWrapper.java @ 7:fc19e38b669b
added concurrent access client for cassandra
author | shoshi |
---|---|
date | Thu, 17 Mar 2011 23:24:08 +0900 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
6:12604eb6b615 | 7:fc19e38b669b |
---|---|
1 package treecms.tree.cassandra.v1; | |
2 | |
3 import java.util.List; | |
4 import java.util.Map; | |
5 import org.apache.cassandra.thrift.Cassandra; | |
6 import org.apache.cassandra.thrift.ColumnOrSuperColumn; | |
7 import org.apache.cassandra.thrift.ColumnParent; | |
8 import org.apache.cassandra.thrift.ColumnPath; | |
9 import org.apache.cassandra.thrift.ConsistencyLevel; | |
10 import org.apache.cassandra.thrift.InvalidRequestException; | |
11 import org.apache.cassandra.thrift.KeyRange; | |
12 import org.apache.cassandra.thrift.KeySlice; | |
13 import org.apache.cassandra.thrift.Mutation; | |
14 import org.apache.cassandra.thrift.NotFoundException; | |
15 import org.apache.cassandra.thrift.SlicePredicate; | |
16 import org.apache.cassandra.thrift.TimedOutException; | |
17 import org.apache.cassandra.thrift.UnavailableException; | |
18 import org.apache.thrift.TException; | |
19 import org.apache.thrift.protocol.TBinaryProtocol; | |
20 import org.apache.thrift.transport.TSocket; | |
21 import org.apache.thrift.transport.TTransport; | |
22 import org.apache.thrift.transport.TTransportException; | |
23 | |
24 /** | |
25 * Cassandra.Clientのラッパークラスです。Cassandra.Clientを使いやすくするための機能を提供します。 | |
26 * 接続済みのCassandra.Clientの接続が切断されたときに再接続を行います。このクラスはスレッドセーフでは有りません。 | |
27 * | |
28 * このラッパークラスはCassandra 0.6.xのAPIをベースに作成します。 | |
29 * @author shoshi | |
30 */ | |
31 final class ClientWrapper | |
32 { | |
33 private String m_host; | |
34 private int m_port; | |
35 private int m_retryCount; | |
36 | |
37 private TTransport m_tr; | |
38 private Cassandra.Client m_client; | |
39 | |
40 /** | |
41 * コンストラクタです。初期化して接続します。 | |
42 * @param _host Cassandraのホスト名 | |
43 * @param _port Cassandraのポート番号 | |
44 * @param _retryCount リクエストが失敗した場合リトライする回数 | |
45 * @throws TTransportException | |
46 */ | |
47 public ClientWrapper(String _host,int _port,int _retryCount) throws TTransportException | |
48 { | |
49 m_host = _host; | |
50 m_port = _port; | |
51 m_retryCount = _retryCount; | |
52 | |
53 connect(); | |
54 } | |
55 | |
56 /** | |
57 * Cassandraに接続します。 | |
58 * @throws TTransportException | |
59 */ | |
60 private void connect() throws TTransportException | |
61 { | |
62 Cassandra.Client client; | |
63 TTransport tr = new TSocket(m_host,m_port); | |
64 client = new Cassandra.Client(new TBinaryProtocol(tr)); | |
65 | |
66 tr.open(); | |
67 | |
68 m_tr = tr; | |
69 m_client = client; | |
70 } | |
71 | |
72 /** | |
73 * Cassandraへの接続を切ります。 | |
74 */ | |
75 private void disconnect() | |
76 { | |
77 m_tr.close(); | |
78 m_client = null; | |
79 } | |
80 | |
81 /** | |
82 * Cassandraへ再接続を行います。 | |
83 * @return 再接続が成功した場合true | |
84 */ | |
85 private boolean reconnect() | |
86 { | |
87 disconnect(); | |
88 try { | |
89 connect(); | |
90 }catch(TTransportException _e){ | |
91 _e.printStackTrace(); | |
92 } | |
93 return true; | |
94 } | |
95 | |
96 /** | |
97 * ここで共通する例外処理(再接続など)を行う | |
98 * @param _e 例外 | |
99 */ | |
100 private void exceptionHandler(Exception _e) | |
101 { | |
102 _e.printStackTrace(); | |
103 } | |
104 | |
105 /** | |
106 * Cassandra.Client.getのラッパーメソッド | |
107 */ | |
108 public ColumnOrSuperColumn get(String _ks,String _key,ColumnPath _path,ConsistencyLevel _level) throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException | |
109 { | |
110 for(int i = 0;i < m_retryCount;i ++){ | |
111 if(m_client == null){ | |
112 throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); | |
113 } | |
114 | |
115 try { | |
116 ColumnOrSuperColumn cors = m_client.get(_ks,_key,_path,_level); | |
117 return cors; | |
118 }catch(Exception _e){ | |
119 exceptionHandler(_e); | |
120 } | |
121 } | |
122 return m_client.get(_ks,_key,_path,_level); | |
123 } | |
124 | |
125 /** | |
126 * Cassandra.Client.get_sliceのラッパーメソッド | |
127 */ | |
128 public List<ColumnOrSuperColumn> get_slice(String _ks,String _key,ColumnParent _parent,SlicePredicate _predicate,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException | |
129 { | |
130 for(int i = 0;i < m_retryCount;i ++){ | |
131 if(m_client == null){ | |
132 throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); | |
133 } | |
134 | |
135 try { | |
136 List<ColumnOrSuperColumn> list = m_client.get_slice(_ks,_key,_parent,_predicate,_level); | |
137 return list; | |
138 }catch(Exception _e){ | |
139 exceptionHandler(_e); | |
140 } | |
141 } | |
142 return m_client.get_slice(_ks,_key,_parent,_predicate,_level); | |
143 } | |
144 | |
145 /** | |
146 * Cassandra.Client.get_range_slicesのラッパーメソッド | |
147 */ | |
148 public List<KeySlice> get_range_slices(String _ks,ColumnParent _parent,SlicePredicate _predicate,KeyRange _range,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException | |
149 { | |
150 for(int i = 0;i < m_retryCount;i ++){ | |
151 if(m_client == null){ | |
152 throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); | |
153 } | |
154 | |
155 try { | |
156 return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level); | |
157 }catch(Exception _e){ | |
158 exceptionHandler(_e); | |
159 } | |
160 } | |
161 return m_client.get_range_slices(_ks,_parent,_predicate,_range,_level); | |
162 } | |
163 | |
164 /** | |
165 * Cassandra.Client.insertのラッパーメソッド | |
166 */ | |
167 public void insert(String _ks,String _key,ColumnPath _path,byte[] _value,long _timestamp,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException | |
168 { | |
169 for(int i = 1;i < m_retryCount;i ++){ | |
170 if(m_client == null){ | |
171 throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); | |
172 } | |
173 | |
174 try{ | |
175 m_client.insert(_ks,_key,_path,_value,_timestamp,_level); | |
176 }catch(Exception _e){ | |
177 exceptionHandler(_e); | |
178 } | |
179 } | |
180 m_client.insert(_ks,_key,_path,_value,_timestamp,_level); | |
181 return; | |
182 } | |
183 | |
184 /** | |
185 * Cassandra.Client.batch_mutateのラッパーメソッド | |
186 */ | |
187 public void batch_mutate(String _ks,Map<String,Map<String,List<Mutation>>> _mutation_map,ConsistencyLevel _level) throws InvalidRequestException, UnavailableException, TimedOutException, TException | |
188 { | |
189 for(int i = 1;i < m_retryCount;i ++){ | |
190 if(m_client == null){ | |
191 throw new IllegalStateException("Cassandra.Client is disconnected. "+m_host+":"+m_port); | |
192 } | |
193 | |
194 try{ | |
195 m_client.batch_mutate(_ks,_mutation_map,_level); | |
196 }catch(Exception _e){ | |
197 exceptionHandler(_e); | |
198 } | |
199 } | |
200 m_client.batch_mutate(_ks,_mutation_map,_level); | |
201 return; | |
202 } | |
203 } |