9
|
1 package treecms.tree.cassandra.v1.util;
|
|
2
|
|
3 import java.util.concurrent.Callable;
|
|
4 import java.util.concurrent.ExecutorService;
|
|
5 import java.util.concurrent.Executors;
|
|
6 import java.util.concurrent.Future;
|
10
|
7 import java.util.concurrent.TimeUnit;
|
9
|
8
|
|
9 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
|
|
10 import org.apache.cassandra.thrift.ColumnPath;
|
|
11 import org.apache.cassandra.thrift.ConsistencyLevel;
|
|
12
|
|
13 /**
|
|
14 *
|
|
15 * @author shoshi
|
|
16 */
|
|
17 public class ConcurrentCassandraClient
|
|
18 {
|
|
19 private ExecutorService m_service;
|
|
20
|
|
21 private ConcurrentCassandraClient(String _host,int _port,int _thCount)
|
|
22 {
|
|
23 m_service = Executors.newFixedThreadPool(_thCount,new CassandraClientThreadFactory(_host,_port));
|
|
24 }
|
|
25
|
|
26 public static ConcurrentCassandraClient newInstance(String _host,int _port,int _thCount)
|
|
27 {
|
|
28 ConcurrentCassandraClient client = new ConcurrentCassandraClient(_host,_port,_thCount);
|
|
29 return client;
|
|
30 }
|
|
31
|
10
|
32 public void disconnect()
|
|
33 {
|
|
34 m_service.shutdown();
|
|
35 try{
|
|
36 m_service.awaitTermination(Long.MAX_VALUE,TimeUnit.DAYS);
|
|
37 }catch(InterruptedException _e){
|
|
38 _e.printStackTrace();
|
|
39 }
|
|
40 }
|
|
41
|
9
|
42 public Future<ColumnOrSuperColumn> get(final String _ks,final String _key,final ColumnPath _path,final ConsistencyLevel _level)
|
|
43 {
|
|
44 Callable<ColumnOrSuperColumn> task = new Callable<ColumnOrSuperColumn>(){
|
|
45 @Override
|
|
46 public ColumnOrSuperColumn call() throws Exception
|
|
47 {
|
|
48 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
|
|
49 ColumnOrSuperColumn cors = client.get(_ks,_key,_path,_level);
|
|
50 return cors;
|
|
51 }
|
|
52 };
|
|
53
|
|
54 Future<ColumnOrSuperColumn> ret = m_service.submit(task);
|
|
55 return ret;
|
|
56 }
|
10
|
57
|
|
58 public Future<Boolean> insert(final String _ks,final String _key,final ColumnPath _path,final byte[] _value,final long _timestamp,final ConsistencyLevel _level)
|
|
59 {
|
|
60 Callable<Boolean> task = new Callable<Boolean>(){
|
|
61 @Override
|
|
62 public Boolean call() throws Exception
|
|
63 {
|
|
64 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
|
|
65 client.insert(_ks,_key,_path,_value,_timestamp,_level);
|
|
66 return true;
|
|
67 }
|
|
68 };
|
|
69
|
|
70 Future<Boolean> ret = m_service.submit(task);
|
|
71 return ret;
|
|
72 }
|
9
|
73 }
|