annotate src/treecms/tree/cassandra/v1/util/ConcurrentCassandraClient.java @ 10:4e0ed81bea89

added ConcurrentClient
author shoshi
date Tue, 19 Apr 2011 19:12:16 +0900
parents 17ed97ca9960
children 85061e874775
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
9
shoshi
parents:
diff changeset
1 package treecms.tree.cassandra.v1.util;
shoshi
parents:
diff changeset
2
shoshi
parents:
diff changeset
3 import java.util.concurrent.Callable;
shoshi
parents:
diff changeset
4 import java.util.concurrent.ExecutorService;
shoshi
parents:
diff changeset
5 import java.util.concurrent.Executors;
shoshi
parents:
diff changeset
6 import java.util.concurrent.Future;
10
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
7 import java.util.concurrent.TimeUnit;
9
shoshi
parents:
diff changeset
8
shoshi
parents:
diff changeset
9 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
shoshi
parents:
diff changeset
10 import org.apache.cassandra.thrift.ColumnPath;
shoshi
parents:
diff changeset
11 import org.apache.cassandra.thrift.ConsistencyLevel;
shoshi
parents:
diff changeset
12
shoshi
parents:
diff changeset
13 /**
shoshi
parents:
diff changeset
14 *
shoshi
parents:
diff changeset
15 * @author shoshi
shoshi
parents:
diff changeset
16 */
shoshi
parents:
diff changeset
17 public class ConcurrentCassandraClient
shoshi
parents:
diff changeset
18 {
shoshi
parents:
diff changeset
19 private ExecutorService m_service;
shoshi
parents:
diff changeset
20
shoshi
parents:
diff changeset
21 private ConcurrentCassandraClient(String _host,int _port,int _thCount)
shoshi
parents:
diff changeset
22 {
shoshi
parents:
diff changeset
23 m_service = Executors.newFixedThreadPool(_thCount,new CassandraClientThreadFactory(_host,_port));
shoshi
parents:
diff changeset
24 }
shoshi
parents:
diff changeset
25
shoshi
parents:
diff changeset
26 public static ConcurrentCassandraClient newInstance(String _host,int _port,int _thCount)
shoshi
parents:
diff changeset
27 {
shoshi
parents:
diff changeset
28 ConcurrentCassandraClient client = new ConcurrentCassandraClient(_host,_port,_thCount);
shoshi
parents:
diff changeset
29 return client;
shoshi
parents:
diff changeset
30 }
shoshi
parents:
diff changeset
31
10
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
32 public void disconnect()
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
33 {
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
34 m_service.shutdown();
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
35 try{
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
36 m_service.awaitTermination(Long.MAX_VALUE,TimeUnit.DAYS);
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
37 }catch(InterruptedException _e){
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
38 _e.printStackTrace();
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
39 }
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
40 }
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
41
9
shoshi
parents:
diff changeset
42 public Future<ColumnOrSuperColumn> get(final String _ks,final String _key,final ColumnPath _path,final ConsistencyLevel _level)
shoshi
parents:
diff changeset
43 {
shoshi
parents:
diff changeset
44 Callable<ColumnOrSuperColumn> task = new Callable<ColumnOrSuperColumn>(){
shoshi
parents:
diff changeset
45 @Override
shoshi
parents:
diff changeset
46 public ColumnOrSuperColumn call() throws Exception
shoshi
parents:
diff changeset
47 {
shoshi
parents:
diff changeset
48 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
shoshi
parents:
diff changeset
49 ColumnOrSuperColumn cors = client.get(_ks,_key,_path,_level);
shoshi
parents:
diff changeset
50 return cors;
shoshi
parents:
diff changeset
51 }
shoshi
parents:
diff changeset
52 };
shoshi
parents:
diff changeset
53
shoshi
parents:
diff changeset
54 Future<ColumnOrSuperColumn> ret = m_service.submit(task);
shoshi
parents:
diff changeset
55 return ret;
shoshi
parents:
diff changeset
56 }
10
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
57
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
58 public Future<Boolean> insert(final String _ks,final String _key,final ColumnPath _path,final byte[] _value,final long _timestamp,final ConsistencyLevel _level)
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
59 {
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
60 Callable<Boolean> task = new Callable<Boolean>(){
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
61 @Override
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
62 public Boolean call() throws Exception
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
63 {
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
64 CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
65 client.insert(_ks,_key,_path,_value,_timestamp,_level);
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
66 return true;
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
67 }
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
68 };
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
69
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
70 Future<Boolean> ret = m_service.submit(task);
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
71 return ret;
4e0ed81bea89 added ConcurrentClient
shoshi
parents: 9
diff changeset
72 }
9
shoshi
parents:
diff changeset
73 }