Mercurial > hg > Members > shoshi > TreeCMSv2
comparison src/treecms/cassandra/util/ConcurrentCassandraClient.java @ 17:168deb591f21
commit
author | shoshi |
---|---|
date | Tue, 24 May 2011 00:33:12 +0900 |
parents | src/treecms/tree/cassandra/v1/util/ConcurrentCassandraClient.java@85061e874775 |
children |
comparison
equal
deleted
inserted
replaced
16:bb9760760744 | 17:168deb591f21 |
---|---|
1 package treecms.cassandra.util; | |
2 | |
3 import java.util.concurrent.Callable; | |
4 import java.util.List; | |
5 import java.util.Map; | |
6 import java.util.concurrent.ExecutorService; | |
7 import java.util.concurrent.Executors; | |
8 import java.util.concurrent.Future; | |
9 import java.util.concurrent.TimeUnit; | |
10 | |
11 import org.apache.cassandra.thrift.ColumnOrSuperColumn; | |
12 import org.apache.cassandra.thrift.ColumnParent; | |
13 import org.apache.cassandra.thrift.ColumnPath; | |
14 import org.apache.cassandra.thrift.ConsistencyLevel; | |
15 import org.apache.cassandra.thrift.Mutation; | |
16 import org.apache.cassandra.thrift.SlicePredicate; | |
17 | |
18 /** | |
19 * | |
20 * @author shoshi | |
21 */ | |
22 public class ConcurrentCassandraClient | |
23 { | |
24 private ExecutorService m_service; | |
25 | |
26 private ConcurrentCassandraClient(String _host,int _port,int _thCount) | |
27 { | |
28 m_service = Executors.newFixedThreadPool(_thCount,new CassandraClientThreadFactory(_host,_port)); | |
29 } | |
30 | |
31 public static ConcurrentCassandraClient newInstance(String _host,int _port,int _thCount) | |
32 { | |
33 ConcurrentCassandraClient client = new ConcurrentCassandraClient(_host,_port,_thCount); | |
34 return client; | |
35 } | |
36 | |
37 public void disconnect() | |
38 { | |
39 m_service.shutdown(); | |
40 try{ | |
41 m_service.awaitTermination(Long.MAX_VALUE,TimeUnit.DAYS); | |
42 }catch(InterruptedException _e){ | |
43 _e.printStackTrace(); | |
44 } | |
45 } | |
46 | |
47 public Future<ColumnOrSuperColumn> get(final String _ks,final String _key,final ColumnPath _path,final ConsistencyLevel _level) | |
48 { | |
49 Callable<ColumnOrSuperColumn> task = new Callable<ColumnOrSuperColumn>(){ | |
50 @Override | |
51 public ColumnOrSuperColumn call() throws Exception | |
52 { | |
53 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper(); | |
54 ColumnOrSuperColumn cors = client.get(_ks,_key,_path,_level); | |
55 return cors; | |
56 } | |
57 }; | |
58 | |
59 Future<ColumnOrSuperColumn> ret = m_service.submit(task); | |
60 return ret; | |
61 } | |
62 | |
63 public Future<List<ColumnOrSuperColumn>> get_slice(final String _ks,final String _key,final ColumnParent _parent,final SlicePredicate _predicate,final ConsistencyLevel _level) | |
64 { | |
65 Callable<List<ColumnOrSuperColumn>> task = new Callable<List<ColumnOrSuperColumn>>(){ | |
66 @Override | |
67 public List<ColumnOrSuperColumn> call() throws Exception | |
68 { | |
69 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper(); | |
70 List<ColumnOrSuperColumn> ret = client.get_slice(_ks,_key,_parent,_predicate,_level); | |
71 return ret; | |
72 } | |
73 }; | |
74 | |
75 Future<List<ColumnOrSuperColumn>> ret = m_service.submit(task); | |
76 return ret; | |
77 } | |
78 | |
79 public Future<?> batch_mutate(final String _ks,final Map<String,Map<String,List<Mutation>>> _mutation_map,final ConsistencyLevel _level) | |
80 { | |
81 Callable<Boolean> task = new Callable<Boolean>(){ | |
82 @Override | |
83 public Boolean call() throws Exception | |
84 { | |
85 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper(); | |
86 client.batch_mutate(_ks,_mutation_map,_level); | |
87 return true; | |
88 } | |
89 }; | |
90 | |
91 Future<?> ret = m_service.submit(task); | |
92 return ret; | |
93 } | |
94 | |
95 public Future<Boolean> insert(final String _ks,final String _key,final ColumnPath _path,final byte[] _value,final long _timestamp,final ConsistencyLevel _level) | |
96 { | |
97 Callable<Boolean> task = new Callable<Boolean>(){ | |
98 @Override | |
99 public Boolean call() throws Exception | |
100 { | |
101 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper(); | |
102 client.insert(_ks,_key,_path,_value,_timestamp,_level); | |
103 return true; | |
104 } | |
105 }; | |
106 | |
107 Future<Boolean> ret = m_service.submit(task); | |
108 return ret; | |
109 } | |
110 | |
111 public Future<String> describe_cluster_name() | |
112 { | |
113 Callable<String> task = new Callable<String>(){ | |
114 @Override | |
115 public String call() throws Exception | |
116 { | |
117 return null; | |
118 } | |
119 }; | |
120 | |
121 Future<String> ret = m_service.submit(task); | |
122 return ret; | |
123 } | |
124 } |