annotate src/treecms/tree/cassandra/v1/util/ConcurrentCassandraClient.java @ 11:85061e874775

commit
author shoshi
date Fri, 06 May 2011 00:42:57 +0900
parents 4e0ed81bea89
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
9
shoshi
parents:
diff changeset
1 package treecms.tree.cassandra.v1.util;
shoshi
parents:
diff changeset
2
shoshi
parents:
diff changeset
3 import java.util.concurrent.Callable;
11
shoshi
parents: 10
diff changeset
4 import java.util.List;
shoshi
parents: 10
diff changeset
5 import java.util.Map;
9
shoshi
parents:
diff changeset
6 import java.util.concurrent.ExecutorService;
shoshi
parents:
diff changeset
7 import java.util.concurrent.Executors;
shoshi
parents:
diff changeset
8 import java.util.concurrent.Future;
10
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
9 import java.util.concurrent.TimeUnit;
9
shoshi
parents:
diff changeset
10
shoshi
parents:
diff changeset
11 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
11
shoshi
parents: 10
diff changeset
12 import org.apache.cassandra.thrift.ColumnParent;
9
shoshi
parents:
diff changeset
13 import org.apache.cassandra.thrift.ColumnPath;
shoshi
parents:
diff changeset
14 import org.apache.cassandra.thrift.ConsistencyLevel;
11
shoshi
parents: 10
diff changeset
15 import org.apache.cassandra.thrift.Mutation;
shoshi
parents: 10
diff changeset
16 import org.apache.cassandra.thrift.SlicePredicate;
9
shoshi
parents:
diff changeset
17
shoshi
parents:
diff changeset
18 /**
shoshi
parents:
diff changeset
19 *
shoshi
parents:
diff changeset
20 * @author shoshi
shoshi
parents:
diff changeset
21 */
shoshi
parents:
diff changeset
22 public class ConcurrentCassandraClient
shoshi
parents:
diff changeset
23 {
shoshi
parents:
diff changeset
24 private ExecutorService m_service;
shoshi
parents:
diff changeset
25
shoshi
parents:
diff changeset
26 private ConcurrentCassandraClient(String _host,int _port,int _thCount)
shoshi
parents:
diff changeset
27 {
shoshi
parents:
diff changeset
28 m_service = Executors.newFixedThreadPool(_thCount,new CassandraClientThreadFactory(_host,_port));
shoshi
parents:
diff changeset
29 }
shoshi
parents:
diff changeset
30
shoshi
parents:
diff changeset
31 public static ConcurrentCassandraClient newInstance(String _host,int _port,int _thCount)
shoshi
parents:
diff changeset
32 {
shoshi
parents:
diff changeset
33 ConcurrentCassandraClient client = new ConcurrentCassandraClient(_host,_port,_thCount);
shoshi
parents:
diff changeset
34 return client;
shoshi
parents:
diff changeset
35 }
shoshi
parents:
diff changeset
36
10
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
37 public void disconnect()
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
38 {
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
39 m_service.shutdown();
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
40 try{
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
41 m_service.awaitTermination(Long.MAX_VALUE,TimeUnit.DAYS);
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
42 }catch(InterruptedException _e){
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
43 _e.printStackTrace();
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
44 }
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
45 }
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
46
9
shoshi
parents:
diff changeset
47 public Future<ColumnOrSuperColumn> get(final String _ks,final String _key,final ColumnPath _path,final ConsistencyLevel _level)
shoshi
parents:
diff changeset
48 {
shoshi
parents:
diff changeset
49 Callable<ColumnOrSuperColumn> task = new Callable<ColumnOrSuperColumn>(){
shoshi
parents:
diff changeset
50 @Override
shoshi
parents:
diff changeset
51 public ColumnOrSuperColumn call() throws Exception
shoshi
parents:
diff changeset
52 {
shoshi
parents:
diff changeset
53 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
shoshi
parents:
diff changeset
54 ColumnOrSuperColumn cors = client.get(_ks,_key,_path,_level);
shoshi
parents:
diff changeset
55 return cors;
shoshi
parents:
diff changeset
56 }
shoshi
parents:
diff changeset
57 };
shoshi
parents:
diff changeset
58
shoshi
parents:
diff changeset
59 Future<ColumnOrSuperColumn> ret = m_service.submit(task);
shoshi
parents:
diff changeset
60 return ret;
shoshi
parents:
diff changeset
61 }
10
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
62
11
shoshi
parents: 10
diff changeset
63 public Future<List<ColumnOrSuperColumn>> get_slice(final String _ks,final String _key,final ColumnParent _parent,final SlicePredicate _predicate,final ConsistencyLevel _level)
shoshi
parents: 10
diff changeset
64 {
shoshi
parents: 10
diff changeset
65 Callable<List<ColumnOrSuperColumn>> task = new Callable<List<ColumnOrSuperColumn>>(){
shoshi
parents: 10
diff changeset
66 @Override
shoshi
parents: 10
diff changeset
67 public List<ColumnOrSuperColumn> call() throws Exception
shoshi
parents: 10
diff changeset
68 {
shoshi
parents: 10
diff changeset
69 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
shoshi
parents: 10
diff changeset
70 List<ColumnOrSuperColumn> ret = client.get_slice(_ks,_key,_parent,_predicate,_level);
shoshi
parents: 10
diff changeset
71 return ret;
shoshi
parents: 10
diff changeset
72 }
shoshi
parents: 10
diff changeset
73 };
shoshi
parents: 10
diff changeset
74
shoshi
parents: 10
diff changeset
75 Future<List<ColumnOrSuperColumn>> ret = m_service.submit(task);
shoshi
parents: 10
diff changeset
76 return ret;
shoshi
parents: 10
diff changeset
77 }
shoshi
parents: 10
diff changeset
78
shoshi
parents: 10
diff changeset
79 public Future<?> batch_mutate(final String _ks,final Map<String,Map<String,List<Mutation>>> _mutation_map,final ConsistencyLevel _level)
shoshi
parents: 10
diff changeset
80 {
shoshi
parents: 10
diff changeset
81 Callable<Boolean> task = new Callable<Boolean>(){
shoshi
parents: 10
diff changeset
82 @Override
shoshi
parents: 10
diff changeset
83 public Boolean call() throws Exception
shoshi
parents: 10
diff changeset
84 {
shoshi
parents: 10
diff changeset
85 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
shoshi
parents: 10
diff changeset
86 client.batch_mutate(_ks,_mutation_map,_level);
shoshi
parents: 10
diff changeset
87 return true;
shoshi
parents: 10
diff changeset
88 }
shoshi
parents: 10
diff changeset
89 };
shoshi
parents: 10
diff changeset
90
shoshi
parents: 10
diff changeset
91 Future<?> ret = m_service.submit(task);
shoshi
parents: 10
diff changeset
92 return ret;
shoshi
parents: 10
diff changeset
93 }
shoshi
parents: 10
diff changeset
94
10
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
95 public Future<Boolean> insert(final String _ks,final String _key,final ColumnPath _path,final byte[] _value,final long _timestamp,final ConsistencyLevel _level)
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
96 {
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
97 Callable<Boolean> task = new Callable<Boolean>(){
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
98 @Override
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
99 public Boolean call() throws Exception
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
100 {
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
101 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
102 client.insert(_ks,_key,_path,_value,_timestamp,_level);
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
103 return true;
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
104 }
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
105 };
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
106
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
107 Future<Boolean> ret = m_service.submit(task);
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
108 return ret;
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
109 }
11
shoshi
parents: 10
diff changeset
110
shoshi
parents: 10
diff changeset
111 public Future<String> describe_cluster_name()
shoshi
parents: 10
diff changeset
112 {
shoshi
parents: 10
diff changeset
113 Callable<String> task = new Callable<String>(){
shoshi
parents: 10
diff changeset
114 @Override
shoshi
parents: 10
diff changeset
115 public String call() throws Exception
shoshi
parents: 10
diff changeset
116 {
shoshi
parents: 10
diff changeset
117 return null;
shoshi
parents: 10
diff changeset
118 }
shoshi
parents: 10
diff changeset
119 };
shoshi
parents: 10
diff changeset
120
shoshi
parents: 10
diff changeset
121 Future<String> ret = m_service.submit(task);
shoshi
parents: 10
diff changeset
122 return ret;
shoshi
parents: 10
diff changeset
123 }
9
shoshi
parents:
diff changeset
124 }