# HG changeset patch # User shoshi # Date 1303207936 -32400 # Node ID 4e0ed81bea8947058777e69505af3baef69916e9 # Parent 17ed97ca9960ea8c6282b56b76ca5ecaa2155969 added ConcurrentClient diff -r 17ed97ca9960 -r 4e0ed81bea89 src/treecms/tree/cassandra/v1/util/ConcurrentCassandraClient.java --- a/src/treecms/tree/cassandra/v1/util/ConcurrentCassandraClient.java Mon Apr 18 01:07:27 2011 +0900 +++ b/src/treecms/tree/cassandra/v1/util/ConcurrentCassandraClient.java Tue Apr 19 19:12:16 2011 +0900 @@ -4,6 +4,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.cassandra.thrift.ColumnOrSuperColumn; import org.apache.cassandra.thrift.ColumnPath; @@ -28,6 +29,16 @@ return client; } + public void disconnect() + { + m_service.shutdown(); + try{ + m_service.awaitTermination(Long.MAX_VALUE,TimeUnit.DAYS); + }catch(InterruptedException _e){ + _e.printStackTrace(); + } + } + public Future get(final String _ks,final String _key,final ColumnPath _path,final ConsistencyLevel _level) { Callable task = new Callable(){ @@ -43,4 +54,20 @@ Future ret = m_service.submit(task); return ret; } + + public Future insert(final String _ks,final String _key,final ColumnPath _path,final byte[] _value,final long _timestamp,final ConsistencyLevel _level) + { + Callable task = new Callable(){ + @Override + public Boolean call() throws Exception + { + CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper(); + client.insert(_ks,_key,_path,_value,_timestamp,_level); + return true; + } + }; + + Future ret = m_service.submit(task); + return ret; + } } diff -r 17ed97ca9960 -r 4e0ed81bea89 src/treecms/tree/cassandra/v1/util/ConcurrentCassandraClientTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/treecms/tree/cassandra/v1/util/ConcurrentCassandraClientTest.java Tue Apr 19 19:12:16 2011 +0900 @@ -0,0 +1,67 @@ +package treecms.tree.cassandra.v1.util; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.ColumnPath; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.NotFoundException; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; + +public class ConcurrentCassandraClientTest +{ + public static void main(String _args[]) throws InterruptedException, ExecutionException, InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException + { + String host = "misaka01.cr.ie.u-ryukyu.ac.jp"; + int port = 9160; + int count = 1000; + + System.out.println("Read benchmark."); + ConcurrentCassandraClient conClient = ConcurrentCassandraClient.newInstance(host,port,7); + ColumnPath path = new ColumnPath(); + path.column_family = "Standard1"; + path.column = "fuga".getBytes(); + + long start = System.currentTimeMillis(); + for(int i = 0;i < count;i ++){ + conClient.get("Keyspace1","hoge",path,ConsistencyLevel.ONE); + } + conClient.disconnect(); + System.out.println("concurrent client time = "+(System.currentTimeMillis()-start)); + + TTransport tr = new TSocket(host,port); + Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(tr)); + tr.open(); + + start = System.currentTimeMillis(); + for(int i = 0;i < count;i ++){ + client.get("Keyspace1","hoge",path,ConsistencyLevel.ONE); + } + System.out.println("normal client time = "+(System.currentTimeMillis()-start)); + + System.out.println("Write benchmark"); + conClient = ConcurrentCassandraClient.newInstance(host,port,30); + + start = System.currentTimeMillis(); + for(int i = 0;i < count;i ++){ + conClient.insert("Keyspace1","hoge",path,"piga".getBytes(),System.currentTimeMillis()/1000,ConsistencyLevel.ONE); + } + conClient.disconnect(); + System.out.println("concurrent client time = "+(System.currentTimeMillis()-start)); + + start = System.currentTimeMillis(); + for(int i = 0;i < count;i ++){ + client.insert("Keyspace1","hoge",path,"piga".getBytes(),System.currentTimeMillis()/1000,ConsistencyLevel.ONE); + } + System.out.println("normal client time = "+(System.currentTimeMillis()-start)); + + } +}