9
|
1 package treecms.tree.cassandra.v1.util;
|
|
2
|
|
3 import java.util.concurrent.Callable;
|
11
|
4 import java.util.List;
|
|
5 import java.util.Map;
|
9
|
6 import java.util.concurrent.ExecutorService;
|
|
7 import java.util.concurrent.Executors;
|
|
8 import java.util.concurrent.Future;
|
10
|
9 import java.util.concurrent.TimeUnit;
|
9
|
10
|
|
11 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
|
11
|
12 import org.apache.cassandra.thrift.ColumnParent;
|
9
|
13 import org.apache.cassandra.thrift.ColumnPath;
|
|
14 import org.apache.cassandra.thrift.ConsistencyLevel;
|
11
|
15 import org.apache.cassandra.thrift.Mutation;
|
|
16 import org.apache.cassandra.thrift.SlicePredicate;
|
9
|
17
|
|
18 /**
|
|
19 *
|
|
20 * @author shoshi
|
|
21 */
|
|
22 public class ConcurrentCassandraClient
|
|
23 {
|
|
24 private ExecutorService m_service;
|
|
25
|
|
26 private ConcurrentCassandraClient(String _host,int _port,int _thCount)
|
|
27 {
|
|
28 m_service = Executors.newFixedThreadPool(_thCount,new CassandraClientThreadFactory(_host,_port));
|
|
29 }
|
|
30
|
|
31 public static ConcurrentCassandraClient newInstance(String _host,int _port,int _thCount)
|
|
32 {
|
|
33 ConcurrentCassandraClient client = new ConcurrentCassandraClient(_host,_port,_thCount);
|
|
34 return client;
|
|
35 }
|
|
36
|
10
|
37 public void disconnect()
|
|
38 {
|
|
39 m_service.shutdown();
|
|
40 try{
|
|
41 m_service.awaitTermination(Long.MAX_VALUE,TimeUnit.DAYS);
|
|
42 }catch(InterruptedException _e){
|
|
43 _e.printStackTrace();
|
|
44 }
|
|
45 }
|
|
46
|
9
|
47 public Future<ColumnOrSuperColumn> get(final String _ks,final String _key,final ColumnPath _path,final ConsistencyLevel _level)
|
|
48 {
|
|
49 Callable<ColumnOrSuperColumn> task = new Callable<ColumnOrSuperColumn>(){
|
|
50 @Override
|
|
51 public ColumnOrSuperColumn call() throws Exception
|
|
52 {
|
|
53 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
|
|
54 ColumnOrSuperColumn cors = client.get(_ks,_key,_path,_level);
|
|
55 return cors;
|
|
56 }
|
|
57 };
|
|
58
|
|
59 Future<ColumnOrSuperColumn> ret = m_service.submit(task);
|
|
60 return ret;
|
|
61 }
|
10
|
62
|
11
|
63 public Future<List<ColumnOrSuperColumn>> get_slice(final String _ks,final String _key,final ColumnParent _parent,final SlicePredicate _predicate,final ConsistencyLevel _level)
|
|
64 {
|
|
65 Callable<List<ColumnOrSuperColumn>> task = new Callable<List<ColumnOrSuperColumn>>(){
|
|
66 @Override
|
|
67 public List<ColumnOrSuperColumn> call() throws Exception
|
|
68 {
|
|
69 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
|
|
70 List<ColumnOrSuperColumn> ret = client.get_slice(_ks,_key,_parent,_predicate,_level);
|
|
71 return ret;
|
|
72 }
|
|
73 };
|
|
74
|
|
75 Future<List<ColumnOrSuperColumn>> ret = m_service.submit(task);
|
|
76 return ret;
|
|
77 }
|
|
78
|
|
79 public Future<?> batch_mutate(final String _ks,final Map<String,Map<String,List<Mutation>>> _mutation_map,final ConsistencyLevel _level)
|
|
80 {
|
|
81 Callable<Boolean> task = new Callable<Boolean>(){
|
|
82 @Override
|
|
83 public Boolean call() throws Exception
|
|
84 {
|
|
85 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
|
|
86 client.batch_mutate(_ks,_mutation_map,_level);
|
|
87 return true;
|
|
88 }
|
|
89 };
|
|
90
|
|
91 Future<?> ret = m_service.submit(task);
|
|
92 return ret;
|
|
93 }
|
|
94
|
10
|
95 public Future<Boolean> insert(final String _ks,final String _key,final ColumnPath _path,final byte[] _value,final long _timestamp,final ConsistencyLevel _level)
|
|
96 {
|
|
97 Callable<Boolean> task = new Callable<Boolean>(){
|
|
98 @Override
|
|
99 public Boolean call() throws Exception
|
|
100 {
|
|
101 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
|
|
102 client.insert(_ks,_key,_path,_value,_timestamp,_level);
|
|
103 return true;
|
|
104 }
|
|
105 };
|
|
106
|
|
107 Future<Boolean> ret = m_service.submit(task);
|
|
108 return ret;
|
|
109 }
|
11
|
110
|
|
111 public Future<String> describe_cluster_name()
|
|
112 {
|
|
113 Callable<String> task = new Callable<String>(){
|
|
114 @Override
|
|
115 public String call() throws Exception
|
|
116 {
|
|
117 return null;
|
|
118 }
|
|
119 };
|
|
120
|
|
121 Future<String> ret = m_service.submit(task);
|
|
122 return ret;
|
|
123 }
|
9
|
124 }
|