changeset 10:4e0ed81bea89

added ConcurrentClient
author shoshi
date Tue, 19 Apr 2011 19:12:16 +0900
parents 17ed97ca9960
children 85061e874775
files src/treecms/tree/cassandra/v1/util/ConcurrentCassandraClient.java src/treecms/tree/cassandra/v1/util/ConcurrentCassandraClientTest.java
diffstat 2 files changed, 94 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/src/treecms/tree/cassandra/v1/util/ConcurrentCassandraClient.java	Mon Apr 18 01:07:27 2011 +0900
+++ b/src/treecms/tree/cassandra/v1/util/ConcurrentCassandraClient.java	Tue Apr 19 19:12:16 2011 +0900
@@ -4,6 +4,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 import org.apache.cassandra.thrift.ColumnPath;
@@ -28,6 +29,16 @@
 		return client;
 	}
 	
+	public void disconnect()
+	{
+		m_service.shutdown();
+		try{
+			m_service.awaitTermination(Long.MAX_VALUE,TimeUnit.DAYS);
+		}catch(InterruptedException _e){
+			_e.printStackTrace();
+		}
+	}
+	
 	public Future<ColumnOrSuperColumn> get(final String _ks,final String _key,final ColumnPath _path,final ConsistencyLevel _level)
 	{
 		Callable<ColumnOrSuperColumn> task = new Callable<ColumnOrSuperColumn>(){
@@ -43,4 +54,20 @@
 		Future<ColumnOrSuperColumn> ret = m_service.submit(task);
 		return ret;	
 	}
+	
+	public Future<Boolean> insert(final String _ks,final String _key,final ColumnPath _path,final byte[] _value,final long _timestamp,final ConsistencyLevel _level)
+	{
+		Callable<Boolean> task = new Callable<Boolean>(){
+			@Override
+			public Boolean call() throws Exception
+			{
+				CassandraClientWrapper client = ((CassandraClientThread)Thread.currentThread()).getClientWrapper();
+				client.insert(_ks,_key,_path,_value,_timestamp,_level);
+				return true;
+			}
+		};
+		
+		Future<Boolean> ret = m_service.submit(task);
+		return ret;
+	}
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/treecms/tree/cassandra/v1/util/ConcurrentCassandraClientTest.java	Tue Apr 19 19:12:16 2011 +0900
@@ -0,0 +1,67 @@
+package treecms.tree.cassandra.v1.util;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+
+public class ConcurrentCassandraClientTest
+{
+	public static void main(String _args[]) throws InterruptedException, ExecutionException, InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException
+	{
+		String host = "misaka01.cr.ie.u-ryukyu.ac.jp";
+		int port = 9160;
+		int count = 1000;
+		
+		System.out.println("Read benchmark.");
+		ConcurrentCassandraClient conClient = ConcurrentCassandraClient.newInstance(host,port,7);
+		ColumnPath path = new ColumnPath();
+		path.column_family = "Standard1";
+		path.column = "fuga".getBytes();
+		
+		long start = System.currentTimeMillis();
+		for(int i = 0;i < count;i ++){
+			conClient.get("Keyspace1","hoge",path,ConsistencyLevel.ONE); 
+		}
+		conClient.disconnect();
+		System.out.println("concurrent client time = "+(System.currentTimeMillis()-start));
+		
+		TTransport tr = new TSocket(host,port);
+		Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(tr));
+		tr.open();
+		
+		start = System.currentTimeMillis();
+		for(int i = 0;i < count;i ++){
+			client.get("Keyspace1","hoge",path,ConsistencyLevel.ONE);
+		}
+		System.out.println("normal client time = "+(System.currentTimeMillis()-start));
+		
+		System.out.println("Write benchmark");
+		conClient = ConcurrentCassandraClient.newInstance(host,port,30);
+		
+		start = System.currentTimeMillis();
+		for(int i = 0;i < count;i ++){
+			conClient.insert("Keyspace1","hoge",path,"piga".getBytes(),System.currentTimeMillis()/1000,ConsistencyLevel.ONE);
+		}
+		conClient.disconnect();
+		System.out.println("concurrent client time = "+(System.currentTimeMillis()-start));
+		
+		start = System.currentTimeMillis();
+		for(int i = 0;i < count;i ++){
+			client.insert("Keyspace1","hoge",path,"piga".getBytes(),System.currentTimeMillis()/1000,ConsistencyLevel.ONE);
+		}
+		System.out.println("normal client time = "+(System.currentTimeMillis()-start));
+		
+	}
+}