comparison src/treecms/cassandra/util/ConcurrentCassandraClient.java @ 17:168deb591f21

commit
author shoshi
date Tue, 24 May 2011 00:33:12 +0900
parents src/treecms/tree/cassandra/v1/util/ConcurrentCassandraClient.java@85061e874775
children
comparison
equal deleted inserted replaced
16:bb9760760744 17:168deb591f21
1 package treecms.cassandra.util;
2
3 import java.util.concurrent.Callable;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Executors;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.TimeUnit;
10
11 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
12 import org.apache.cassandra.thrift.ColumnParent;
13 import org.apache.cassandra.thrift.ColumnPath;
14 import org.apache.cassandra.thrift.ConsistencyLevel;
15 import org.apache.cassandra.thrift.Mutation;
16 import org.apache.cassandra.thrift.SlicePredicate;
17
18 /**
19 *
20 * @author shoshi
21 */
22 public class ConcurrentCassandraClient
23 {
24 private ExecutorService m_service;
25
26 private ConcurrentCassandraClient(String _host,int _port,int _thCount)
27 {
28 m_service = Executors.newFixedThreadPool(_thCount,new CassandraClientThreadFactory(_host,_port));
29 }
30
31 public static ConcurrentCassandraClient newInstance(String _host,int _port,int _thCount)
32 {
33 ConcurrentCassandraClient client = new ConcurrentCassandraClient(_host,_port,_thCount);
34 return client;
35 }
36
37 public void disconnect()
38 {
39 m_service.shutdown();
40 try{
41 m_service.awaitTermination(Long.MAX_VALUE,TimeUnit.DAYS);
42 }catch(InterruptedException _e){
43 _e.printStackTrace();
44 }
45 }
46
47 public Future<ColumnOrSuperColumn> get(final String _ks,final String _key,final ColumnPath _path,final ConsistencyLevel _level)
48 {
49 Callable<ColumnOrSuperColumn> task = new Callable<ColumnOrSuperColumn>(){
50 @Override
51 public ColumnOrSuperColumn call() throws Exception
52 {
53 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
54 ColumnOrSuperColumn cors = client.get(_ks,_key,_path,_level);
55 return cors;
56 }
57 };
58
59 Future<ColumnOrSuperColumn> ret = m_service.submit(task);
60 return ret;
61 }
62
63 public Future<List<ColumnOrSuperColumn>> get_slice(final String _ks,final String _key,final ColumnParent _parent,final SlicePredicate _predicate,final ConsistencyLevel _level)
64 {
65 Callable<List<ColumnOrSuperColumn>> task = new Callable<List<ColumnOrSuperColumn>>(){
66 @Override
67 public List<ColumnOrSuperColumn> call() throws Exception
68 {
69 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
70 List<ColumnOrSuperColumn> ret = client.get_slice(_ks,_key,_parent,_predicate,_level);
71 return ret;
72 }
73 };
74
75 Future<List<ColumnOrSuperColumn>> ret = m_service.submit(task);
76 return ret;
77 }
78
79 public Future<?> batch_mutate(final String _ks,final Map<String,Map<String,List<Mutation>>> _mutation_map,final ConsistencyLevel _level)
80 {
81 Callable<Boolean> task = new Callable<Boolean>(){
82 @Override
83 public Boolean call() throws Exception
84 {
85 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
86 client.batch_mutate(_ks,_mutation_map,_level);
87 return true;
88 }
89 };
90
91 Future<?> ret = m_service.submit(task);
92 return ret;
93 }
94
95 public Future<Boolean> insert(final String _ks,final String _key,final ColumnPath _path,final byte[] _value,final long _timestamp,final ConsistencyLevel _level)
96 {
97 Callable<Boolean> task = new Callable<Boolean>(){
98 @Override
99 public Boolean call() throws Exception
100 {
101 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
102 client.insert(_ks,_key,_path,_value,_timestamp,_level);
103 return true;
104 }
105 };
106
107 Future<Boolean> ret = m_service.submit(task);
108 return ret;
109 }
110
111 public Future<String> describe_cluster_name()
112 {
113 Callable<String> task = new Callable<String>(){
114 @Override
115 public String call() throws Exception
116 {
117 return null;
118 }
119 };
120
121 Future<String> ret = m_service.submit(task);
122 return ret;
123 }
124 }