diff src/treecms/tree/cassandra/v1/CassandraForest.java @ 7:fc19e38b669b

added concurrent access client for cassandr
author shoshi
date Thu, 17 Mar 2011 23:24:08 +0900
parents 12604eb6b615
children f96193babac0
line wrap: on
line diff
--- a/src/treecms/tree/cassandra/v1/CassandraForest.java	Mon Mar 14 23:24:38 2011 +0900
+++ b/src/treecms/tree/cassandra/v1/CassandraForest.java	Thu Mar 17 23:24:08 2011 +0900
@@ -10,6 +10,7 @@
 import java.util.StringTokenizer;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -37,9 +38,9 @@
 import treecms.tree.id.AbstractRandomNodeID;
 
 /**
- * implementation of TreeCMS with Cassandra backend.
+ * Cassandra上で非破壊的木構造を実現するためのForestの実装です。
  * 
- * TreeCMSKS.NodeTable (table of all nodes)
+ * TreeCMSKS.NODETABLE (table of all nodes)
  * 
  * +---------------------------------------------+
  * +   Key   |   Col1   |  Col2  |  Col3  | ...  |
@@ -47,7 +48,7 @@
  * +  NodeID | Children | _attr1 | _attr2 | ...  |
  * +---------------------------------------------+
  * 
- * TreeCMSKS.TipTable (table of tip)
+ * TreeCMSKS.TIPTABLE (table of tip)
  * 
  * +--------------------+
  * +   Key   |   Col1   |
@@ -63,197 +64,47 @@
 	ExecutorService m_service;
 	
 	//column families.
-	static final String NODETABLE = "NodeTable";
-	static final String TIPTABLE = "TipTable";
+	static final String NODETABLE = "NODETABLE";
+	static final String TIPTABLE = "TIPTABLE";
 	
-	//reserved word.
-	static final String NODE = "Node";
-	static final String CHILDREN = "Children";
+	//reserved column.
+	static final byte[] TIPID = "TIPID".getBytes();
+	static final byte[] CHILDREN = "CHILDREN".getBytes();
 	static final char PREFIX = '_';
 	
-	//id table reserved
-	static final String TIPID = "TipID";
+	//cache
+	private ConcurrentHashMap<NodeID,CassandraNode> m_cache;
+	private ConcurrentHashMap<String,CassandraNode> m_tipCache;
 	
 	public CassandraForest(String _host,int _port,String _ks,int _threads)
 	{
-		m_service = Executors.newFixedThreadPool(_threads,new RequestSenderFactory(_host,_port,_ks));
+		m_service = Executors.newFixedThreadPool(_threads,new ClientThreadFactory(_host,_port));
+		m_cache = new ConcurrentHashMap<NodeID,CassandraNode>();
+		m_tipCache = new ConcurrentHashMap<String,CassandraNode>();
 	}
 
 	@Override
 	public Node get(NodeID _id)
 	{
-			
 		return new CassandraNode(this,_id);
 	}
 	
 	@Override
 	public Node create()
 	{
-		return createNode(null,null); //create new node
+		return createNode(null,null);
 	}
 	
 	public NodeData getNodeData(NodeID _id)
 	{
 		final NodeID id = _id;
-		
-		//create future task.
-		Callable<List<ColumnOrSuperColumn>> task = new Callable<List<ColumnOrSuperColumn>>(){
-			@Override
-			public List<ColumnOrSuperColumn> call() throws Exception
-			{
-				RequestSender sender = (RequestSender)Thread.currentThread();
-				List<ColumnOrSuperColumn> res = sender.get_slice(NODETABLE,id.toString(),ConsistencyLevel.ONE);
-				return res;
-			}
-		};
-		Future<List<ColumnOrSuperColumn>> future = m_service.submit(task);
-		
-		NodeData data = new NodeData();
-		try{
-			List<ColumnOrSuperColumn> slice = future.get();
-			
-			//iterate column
-			for(ColumnOrSuperColumn column : slice){
-				String name = new String(column.column.name);
-				
-				//if column name matches CHILDREN , deserialize value to child list.
-				if(name.equals(CHILDREN)){
-					List<Node> tmp = deserialize(new String(column.column.value));
-					data.add(tmp);
-				}else{
-					String key = name.substring(1); //size of prefix
-					data.set(key.getBytes(),column.column.value);
-				}
-			}
-		}catch(Exception _e){
-			_e.printStackTrace();
-		}
-		
-		return data;
-	}
-	
-	public List<CassandraNode> multiCreateNode(List<CassandraNode> _list)
-	{
-		final Map<String,Map<String,List<Mutation>>> mutationMap = new HashMap<String,Map<String,List<Mutation>>>();
-		
-		Map<String,List<Mutation>> nodeTable = new HashMap<String,List<Mutation>>();
-		Map<String,List<Mutation>> tipTable = new HashMap<String,List<Mutation>>();
-		for(CassandraNode node : _list){
-			LinkedList<Mutation> list = new LinkedList<Mutation>();
-			Mutation mut = new Mutation();
-			ColumnOrSuperColumn column = new ColumnOrSuperColumn();
-			mut.column_or_supercolumn = column;
-			column.column.name = CHILDREN.getBytes(); 
-			column.column.value = serialize(node.getData().list()).getBytes();
-			list.add(mut);
-			
-			for(byte[] key : node.getData().keys()){
-				mut = new Mutation();
-				column = new ColumnOrSuperColumn();
-				mut.column_or_supercolumn = column;
-				column.column.name = key;
-				column.column.value = node.getData().get(key);
-				
-				list.add(mut);
-			}
-			
-			nodeTable.put(node.getID().toString(),list);
-			
-			mut = new Mutation();
-			column = new ColumnOrSuperColumn();
-			column.column.name = TIPID.getBytes();
-			column.column.value = node.getID().getVersion().getBytes();
-			list = new LinkedList<Mutation>();
-			list.add(mut);
-			tipTable.put(node.getID().getUUID(),list);
-		}
-		mutationMap.put(NODETABLE,nodeTable);
-		mutationMap.put(TIPTABLE,tipTable);
-		
-		Runnable task = new Runnable(){
-			@Override
-			public void run()
-			{
-				RequestSender sender = (RequestSender)Thread.currentThread();
-				sender.batch_mutate(mutationMap,ConsistencyLevel.ONE);
-			}
-		};
-		
-		m_service.execute(task);
-		
-		return _list;
-	}
-	
-	/**
-	 * list serializer.
-	 * ex. list{"hoge","fuga"} -> "hoge,fuga" 
-	 * @param _list
-	 * @return selialized string
-	 */
-	public String serialize(List<Node> _list)
-	{
-		String prefix = "";
-		StringBuffer buf = new StringBuffer();
-		for(Node child : _list){
-			buf.append(prefix+child.getID().toString());
-			prefix = ",";
-		}
-		
-		return buf.toString();
-	}
-	
-	/**
-	 * string deserializer.
-	 * ex. "hoge,fuga" -> list{"hoge","fuga"}
-	 * @param _selialized
-	 * @return list
-	 */
-	public LinkedList<Node> deserialize(String _serialized) throws IllegalArgumentException
-	{
-		StringTokenizer tokens = new StringTokenizer(_serialized,",");
-		LinkedList<Node> res = new LinkedList<Node>();
-		
-		while(tokens.hasMoreElements()){
-			String tmp = tokens.nextToken();
-			StringTokenizer uuidAndVer = new StringTokenizer(tmp,"@");
-			
-			try{
-				NodeID id = createID(uuidAndVer.nextToken(),uuidAndVer.nextToken());
-				res.add(get(id));
-			}catch(Exception _e){
-				throw new IllegalArgumentException("unable to deserialize string ["+_serialized+"]",_e);
-			}
-		}
-		
-		return res;
+		return null;
 	}
 
 	public NodeID getTipID(String _uuid)
 	{
 		final String uuid = _uuid;
-		Callable<byte[]> task = new Callable<byte[]>(){
-			@Override
-			public byte[] call() throws Exception
-			{
-				RequestSender sender = (RequestSender)Thread.currentThread();
-				byte[] value = sender.get(NODETABLE,uuid,TIPID.getBytes(),ConsistencyLevel.ONE);
-				return value;
-			}
-		};
-		
-		Future<byte[]> future = m_service.submit(task);
-		
-		try {
-			byte[] value = future.get();
-			String id = new String(value);
-			StringTokenizer token = new StringTokenizer(id,"@");
-			NodeID nodeID = createID(token.nextToken(),token.nextToken());
-			return nodeID;
-		}catch(Exception _e){
-			_e.printStackTrace();
-		}
-		
-		return null; //not found.
+		return null;
 	}
 	
 	public Node createNode(NodeID _id,NodeData _data)
@@ -265,65 +116,7 @@
 			@Override
 			public Boolean call() throws Exception
 			{
-				RequestSender sender = (RequestSender)Thread.currentThread();
-				
-				//mutation map
-				HashMap<String,Map<String,List<Mutation>>> map = new HashMap<String,Map<String,List<Mutation>>>();
-				
-				/*
-				 * create mutation map for NODETABLE
-				 */
-				if(data != null){
-					LinkedList<Mutation> list = new LinkedList<Mutation>();
-					HashMap<String,List<Mutation>> info = new HashMap<String,List<Mutation>>();
-					Iterator<Node> itr = data.list().iterator();
-					
-					/*
-					 * create CSV from child list.
-					 */
-					StringBuffer buffer = new StringBuffer();
-					for(String prefix = "";itr.hasNext();prefix = ","){
-						buffer.append(String.format("%s%s",prefix,itr.next().getID().toString()));
-					}
-					Mutation mutChildren = new Mutation();
-					ColumnOrSuperColumn children = new ColumnOrSuperColumn();
-					children.column.name = CHILDREN.getBytes();
-					children.column.value = buffer.toString().getBytes();
-					mutChildren.column_or_supercolumn = children;
-					list.add(mutChildren);
-					
-					/*
-					 *  
-					 */
-					for(byte[] key : data.keys()){
-						Mutation mut = new Mutation();
-						ColumnOrSuperColumn column = new ColumnOrSuperColumn();
-						column.column.name = key;
-						column.column.value = data.get(key);
-						mut.column_or_supercolumn = column;
-						list.add(mut);
-					}
-					info.put(id.toString(),list);
-					
-					map.put(NODETABLE,info);
-				}
-				
-				/*
-				 * create mutation map for NODEIDTABLE
-				 */
-				HashMap<String,List<Mutation>> idtable_mutations = new HashMap<String,List<Mutation>>();
-				LinkedList<Mutation> list = new LinkedList<Mutation>();
-				
-				Mutation mutTipID = new Mutation();
-				ColumnOrSuperColumn tipID = new ColumnOrSuperColumn();
-				tipID.column.name = TIPID.getBytes();
-				tipID.column.value = id.getVersion().getBytes();
-				mutTipID.column_or_supercolumn = tipID;
-				
-				list.add(mutTipID);
-				idtable_mutations.put(TIPTABLE,list);
-				
-				return sender.batch_mutate(map,ConsistencyLevel.ONE);
+				return true;
 			}
 		};
 		
@@ -372,141 +165,16 @@
 			return m_version;
 		}
 	}
-	
-	private static class RequestSender extends Thread
+
+	@Override
+	public Node create(NodeData data)
 	{
-		private int m_port;
-		private String m_host,m_ks;
-		private Cassandra.Client m_client;
-		
-		public RequestSender(Runnable _runnable,String _host,int _port,String _ks) throws TTransportException
-		{
-			super(_runnable);
-			m_port = _port;
-			m_host = _host;
-			m_ks = _ks;
-			
-			connect();
-		}
-		
-		public void connect() throws TTransportException
-		{
-			TTransport tr = new TSocket(m_host,m_port);
-			TProtocol proto = new TBinaryProtocol(tr);
-			m_client = new Cassandra.Client(proto);
-			
-			tr.open();			
-		}
-		
-		public static RequestSender newInstance(Runnable _runnable,String _host,int _port,String _ks)
-		{
-			RequestSender sender = null;
-			try {
-				sender = new RequestSender(_runnable,_host,_port,_ks);
-			} catch (TTransportException _e) {
-				_e.printStackTrace();
-			}
-			
-			return sender;
-		}
-		
-		public byte[] get(String _cf,String _key,byte[] _name,ConsistencyLevel _lv)
-		{
-			byte[] ret = null;
-			
-			ColumnPath path = new ColumnPath();
-			path.column_family = _cf;
-			path.column = _name;
-			try {
-				ColumnOrSuperColumn cors = m_client.get(m_ks,_key,path,_lv);
-				ret = cors.column.value;
-			}catch(NotFoundException _e){
-				System.out.println(String.format("column not found [%s][%s][%s]",_cf,_key,new String(_name)));
-			}catch(Exception _e){
-				_e.printStackTrace();
-			}
-			
-			return ret;
-		}
-		
-		public boolean insert(String _cf,String _key,byte[] _name,byte[] _value,ConsistencyLevel _lv)
-		{	
-			ColumnPath path = new ColumnPath();
-			path.column_family = _cf;
-			path.column = _name;
-			
-			try{
-				m_client.insert(m_ks,_key,path,_value,System.currentTimeMillis()/1000,_lv);
-				return true;
-			}catch(Exception _e){
-				_e.printStackTrace();
-			}
-			
-			return false;
-		}
-		
-		public List<ColumnOrSuperColumn> get_slice(String _cf,String _key,ConsistencyLevel _lv)
-		{
-			List<ColumnOrSuperColumn> ret = null;
-			SliceRange sr = new SliceRange(new byte[0],new byte[0],false,-1);
-			SlicePredicate sp = new SlicePredicate();
-			sp.slice_range = sr;
-			
-			try {
-				ret = m_client.get_slice(m_ks,_key,new ColumnParent(_cf),sp,_lv);
-			}catch(Exception _e){
-				_e.printStackTrace();
-			}
-			
-			return ret;
-		}
-		
-		public boolean batch_insert(String _cf,Map<String,List<ColumnOrSuperColumn>> _map,ConsistencyLevel _lv)
-		{
-			try{
-				m_client.batch_insert(m_ks,_cf,_map,_lv);
-				return true;
-			}catch(Exception _e){
-				_e.printStackTrace();
-			}
-			
-			return false;
-		}
-		
-		public boolean batch_mutate(Map<String,Map<String,List<Mutation>>> _mutateMap,ConsistencyLevel _lv)
-		{
-			try {
-				m_client.batch_mutate(m_ks,_mutateMap,_lv);
-				return true;
-			}catch(Exception _e){
-				_e.printStackTrace();
-			}
-			
-			return false;
-		}
-		
-		public String toString()
-		{
-			return "[thread="+this.getName()+",host="+m_host+",port="+m_port+",ks="+m_ks+"]";
-		}
+		return null;
 	}
-	
-	private class RequestSenderFactory implements ThreadFactory
+
+	@Override
+	public Node getTip(String uuid)
 	{
-		private int m_port;
-		private String m_host,m_ks;
-		
-		public RequestSenderFactory(String _host,int _port,String _ks)
-		{
-			m_host = _host;
-			m_port = _port;
-			m_ks = _ks;
-		}
-		
-		@Override
-		public Thread newThread(Runnable _runnable)
-		{
-			return RequestSender.newInstance(_runnable,m_host,m_port,m_ks);
-		}
+		return null;
 	}
 }