diff src/treecms/tree/cassandra/v1/CassandraForest.java @ 4:f5ed85be5640

finished treecms.cassandra.v1 implementation (not tested yet)
author shoshi
date Thu, 24 Feb 2011 21:30:18 +0900
parents
children 12604eb6b615
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/treecms/tree/cassandra/v1/CassandraForest.java	Thu Feb 24 21:30:18 2011 +0900
@@ -0,0 +1,511 @@
+package treecms.tree.cassandra.v1;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import treecms.api.Forest;
+import treecms.api.Node;
+import treecms.api.NodeData;
+import treecms.api.NodeID;
+import treecms.tree.id.RandomNodeID;
+
+/**
+ * implementation of TreeCMS with Cassandra backend.
+ * 
+ * TreeCMSKS.NodeTable (table of all nodes)
+ * 
+ * +---------------------------------------------+
+ * +   Key   |   Col1   |  Col2  |  Col3  | ...  |
+ * +---------------------------------------------+
+ * +  NodeID | Children | _attr1 | _attr2 | ...  |
+ * +---------------------------------------------+
+ * 
+ * TreeCMSKS.TipTable (table of tip)
+ * 
+ * +--------------------+
+ * +   Key   |   Col1   |
+ * +--------------------+
+ * +  NodeID |  version |
+ * +--------------------+
+ * 
+ * @author shoshi
+ */
+
+public class CassandraForest implements Forest
+{
+	ExecutorService m_service;
+	
+	//column families.
+	static final String NODETABLE = "NodeTable";
+	static final String TIPTABLE = "TipTable";
+	
+	//reserved word.
+	static final String NODE = "Node";
+	static final String CHILDREN = "Children";
+	static final char PREFIX = '_';
+	
+	//id table reserved
+	static final String TIPID = "TipID";
+	
+	public CassandraForest(String _host,int _port,String _ks,int _threads)
+	{
+		m_service = Executors.newFixedThreadPool(_threads,new RequestSenderFactory(_host,_port,_ks));
+	}
+
+	@Override
+	public Node get(NodeID _id)
+	{
+			
+		return new CassandraNode(this,_id);
+	}
+	
+	@Override
+	public Node create()
+	{
+		return createNode(null,null); //create new node
+	}
+	
+	public NodeData getNodeData(NodeID _id)
+	{
+		final NodeID id = _id;
+		
+		//create future task.
+		Callable<List<ColumnOrSuperColumn>> task = new Callable<List<ColumnOrSuperColumn>>(){
+			@Override
+			public List<ColumnOrSuperColumn> call() throws Exception
+			{
+				RequestSender sender = (RequestSender)Thread.currentThread();
+				List<ColumnOrSuperColumn> res = sender.get_slice(NODETABLE,id.toString(),ConsistencyLevel.ONE);
+				return res;
+			}
+		};
+		Future<List<ColumnOrSuperColumn>> future = m_service.submit(task);
+		
+		NodeData data = new NodeData();
+		try{
+			List<ColumnOrSuperColumn> slice = future.get();
+			
+			//iterate column
+			for(ColumnOrSuperColumn column : slice){
+				String name = new String(column.column.name);
+				
+				//if column name matches CHILDREN , deserialize value to child list.
+				if(name.equals(CHILDREN)){
+					List<Node> tmp = deserialize(new String(column.column.value));
+					data.add(tmp);
+				}else{
+					String key = name.substring(1); //size of prefix
+					data.set(key.getBytes(),column.column.value);
+				}
+			}
+		}catch(Exception _e){
+			_e.printStackTrace();
+		}
+		
+		return data;
+	}
+	
+	public List<CassandraNode> multiCreateNode(List<CassandraNode> _list)
+	{
+		final Map<String,Map<String,List<Mutation>>> mutationMap = new HashMap<String,Map<String,List<Mutation>>>();
+		
+		Map<String,List<Mutation>> nodeTable = new HashMap<String,List<Mutation>>();
+		Map<String,List<Mutation>> tipTable = new HashMap<String,List<Mutation>>();
+		for(CassandraNode node : _list){
+			LinkedList<Mutation> list = new LinkedList<Mutation>();
+			Mutation mut = new Mutation();
+			ColumnOrSuperColumn column = new ColumnOrSuperColumn();
+			mut.column_or_supercolumn = column;
+			column.column.name = CHILDREN.getBytes(); 
+			column.column.value = serialize(node.getData().list()).getBytes();
+			list.add(mut);
+			
+			for(byte[] key : node.getData().keys()){
+				mut = new Mutation();
+				column = new ColumnOrSuperColumn();
+				mut.column_or_supercolumn = column;
+				column.column.name = key;
+				column.column.value = node.getData().get(key);
+				
+				list.add(mut);
+			}
+			
+			nodeTable.put(node.getID().toString(),list);
+			
+			mut = new Mutation();
+			column = new ColumnOrSuperColumn();
+			column.column.name = TIPID.getBytes();
+			column.column.value = node.getID().getVersion().getBytes();
+			list = new LinkedList<Mutation>();
+			list.add(mut);
+			tipTable.put(node.getID().getUUID(),list);
+		}
+		mutationMap.put(NODETABLE,nodeTable);
+		mutationMap.put(TIPTABLE,tipTable);
+		
+		Runnable task = new Runnable(){
+			@Override
+			public void run()
+			{
+				RequestSender sender = (RequestSender)Thread.currentThread();
+				sender.batch_mutate(mutationMap,ConsistencyLevel.ONE);
+			}
+		};
+		
+		m_service.execute(task);
+		
+		return _list;
+	}
+	
+	/**
+	 * list serializer.
+	 * ex. list{"hoge","fuga"} -> "hoge,fuga" 
+	 * @param _list
+	 * @return selialized string
+	 */
+	public String serialize(List<Node> _list)
+	{
+		String prefix = "";
+		StringBuffer buf = new StringBuffer();
+		for(Node child : _list){
+			buf.append(prefix+child.getID().toString());
+			prefix = ",";
+		}
+		
+		return buf.toString();
+	}
+	
+	/**
+	 * string deserializer.
+	 * ex. "hoge,fuga" -> list{"hoge","fuga"}
+	 * @param _selialized
+	 * @return list
+	 */
+	public LinkedList<Node> deserialize(String _serialized) throws IllegalArgumentException
+	{
+		StringTokenizer tokens = new StringTokenizer(_serialized,",");
+		LinkedList<Node> res = new LinkedList<Node>();
+		
+		while(tokens.hasMoreElements()){
+			String tmp = tokens.nextToken();
+			StringTokenizer uuidAndVer = new StringTokenizer(tmp,"@");
+			
+			try{
+				NodeID id = createID(uuidAndVer.nextToken(),uuidAndVer.nextToken());
+				res.add(get(id));
+			}catch(Exception _e){
+				throw new IllegalArgumentException("unable to deserialize string ["+_serialized+"]",_e);
+			}
+		}
+		
+		return res;
+	}
+
+	public NodeID getTipID(String _uuid)
+	{
+		final String uuid = _uuid;
+		Callable<byte[]> task = new Callable<byte[]>(){
+			@Override
+			public byte[] call() throws Exception
+			{
+				RequestSender sender = (RequestSender)Thread.currentThread();
+				byte[] value = sender.get(NODETABLE,uuid,TIPID.getBytes(),ConsistencyLevel.ONE);
+				return value;
+			}
+		};
+		
+		Future<byte[]> future = m_service.submit(task);
+		
+		try {
+			byte[] value = future.get();
+			String id = new String(value);
+			StringTokenizer token = new StringTokenizer(id,"@");
+			NodeID nodeID = createID(token.nextToken(),token.nextToken());
+			return nodeID;
+		}catch(Exception _e){
+			_e.printStackTrace();
+		}
+		
+		return null; //not found.
+	}
+	
+	public Node createNode(NodeID _id,NodeData _data)
+	{
+		final NodeData data = _data;
+		final NodeID id = (_id != null) ? _id : createID(null,null);
+		
+		Callable<Boolean> task = new Callable<Boolean>(){
+			@Override
+			public Boolean call() throws Exception
+			{
+				RequestSender sender = (RequestSender)Thread.currentThread();
+				
+				//mutation map
+				HashMap<String,Map<String,List<Mutation>>> map = new HashMap<String,Map<String,List<Mutation>>>();
+				
+				/*
+				 * create mutation map for NODETABLE
+				 */
+				if(data != null){
+					LinkedList<Mutation> list = new LinkedList<Mutation>();
+					HashMap<String,List<Mutation>> info = new HashMap<String,List<Mutation>>();
+					Iterator<Node> itr = data.list().iterator();
+					
+					/*
+					 * create CSV from child list.
+					 */
+					StringBuffer buffer = new StringBuffer();
+					for(String prefix = "";itr.hasNext();prefix = ","){
+						buffer.append(String.format("%s%s",prefix,itr.next().getID().toString()));
+					}
+					Mutation mutChildren = new Mutation();
+					ColumnOrSuperColumn children = new ColumnOrSuperColumn();
+					children.column.name = CHILDREN.getBytes();
+					children.column.value = buffer.toString().getBytes();
+					mutChildren.column_or_supercolumn = children;
+					list.add(mutChildren);
+					
+					/*
+					 *  
+					 */
+					for(byte[] key : data.keys()){
+						Mutation mut = new Mutation();
+						ColumnOrSuperColumn column = new ColumnOrSuperColumn();
+						column.column.name = key;
+						column.column.value = data.get(key);
+						mut.column_or_supercolumn = column;
+						list.add(mut);
+					}
+					info.put(id.toString(),list);
+					
+					map.put(NODETABLE,info);
+				}
+				
+				/*
+				 * create mutation map for NODEIDTABLE
+				 */
+				HashMap<String,List<Mutation>> idtable_mutations = new HashMap<String,List<Mutation>>();
+				LinkedList<Mutation> list = new LinkedList<Mutation>();
+				
+				Mutation mutTipID = new Mutation();
+				ColumnOrSuperColumn tipID = new ColumnOrSuperColumn();
+				tipID.column.name = TIPID.getBytes();
+				tipID.column.value = id.getVersion().getBytes();
+				mutTipID.column_or_supercolumn = tipID;
+				
+				list.add(mutTipID);
+				idtable_mutations.put(TIPTABLE,list);
+				
+				return sender.batch_mutate(map,ConsistencyLevel.ONE);
+			}
+		};
+		
+		m_service.submit(task);
+		
+		return new CassandraNode(this,id);
+	}
+	
+	public NodeID createID(String _uuid,String _version)
+	{
+		return new RandomNodeIDImpl(_uuid,_version);
+	}
+	
+	class RandomNodeIDImpl extends RandomNodeID
+	{
+		String m_uuid;
+		String m_version;
+		
+		public RandomNodeIDImpl(String _uuid,String _version)
+		{
+			m_uuid = (_uuid != null) ? _uuid : UUID.randomUUID().toString();
+			m_version = (_version != null) ? _version : Long.toHexString((new Random()).nextLong());
+		}
+
+		@Override
+		public NodeID create()
+		{
+			return new RandomNodeIDImpl(null,null);
+		}
+
+		@Override
+		public NodeID update()
+		{
+			return new RandomNodeIDImpl(m_uuid,null);
+		}
+
+		@Override
+		public String getUUID()
+		{
+			return m_uuid;
+		}
+
+		@Override
+		public String getVersion()
+		{
+			return m_version;
+		}
+	}
+	
+	private static class RequestSender extends Thread
+	{
+		private int m_port;
+		private String m_host,m_ks;
+		private Cassandra.Client m_client;
+		
+		public RequestSender(Runnable _runnable,String _host,int _port,String _ks) throws TTransportException
+		{
+			super(_runnable);
+			m_port = _port;
+			m_host = _host;
+			m_ks = _ks;
+			
+			connect();
+		}
+		
+		public void connect() throws TTransportException
+		{
+			TTransport tr = new TSocket(m_host,m_port);
+			TProtocol proto = new TBinaryProtocol(tr);
+			m_client = new Cassandra.Client(proto);
+			
+			tr.open();			
+		}
+		
+		public static RequestSender newInstance(Runnable _runnable,String _host,int _port,String _ks)
+		{
+			RequestSender sender = null;
+			try {
+				sender = new RequestSender(_runnable,_host,_port,_ks);
+			} catch (TTransportException _e) {
+				_e.printStackTrace();
+			}
+			
+			return sender;
+		}
+		
+		public byte[] get(String _cf,String _key,byte[] _name,ConsistencyLevel _lv)
+		{
+			byte[] ret = null;
+			
+			ColumnPath path = new ColumnPath();
+			path.column_family = _cf;
+			path.column = _name;
+			try {
+				ColumnOrSuperColumn cors = m_client.get(m_ks,_key,path,_lv);
+				ret = cors.column.value;
+			}catch(NotFoundException _e){
+				System.out.println(String.format("column not found [%s][%s][%s]",_cf,_key,new String(_name)));
+			}catch(Exception _e){
+				_e.printStackTrace();
+			}
+			
+			return ret;
+		}
+		
+		public boolean insert(String _cf,String _key,byte[] _name,byte[] _value,ConsistencyLevel _lv)
+		{	
+			ColumnPath path = new ColumnPath();
+			path.column_family = _cf;
+			path.column = _name;
+			
+			try{
+				m_client.insert(m_ks,_key,path,_value,System.currentTimeMillis()/1000,_lv);
+				return true;
+			}catch(Exception _e){
+				_e.printStackTrace();
+			}
+			
+			return false;
+		}
+		
+		public List<ColumnOrSuperColumn> get_slice(String _cf,String _key,ConsistencyLevel _lv)
+		{
+			List<ColumnOrSuperColumn> ret = null;
+			SliceRange sr = new SliceRange(new byte[0],new byte[0],false,-1);
+			SlicePredicate sp = new SlicePredicate();
+			sp.slice_range = sr;
+			
+			try {
+				ret = m_client.get_slice(m_ks,_key,new ColumnParent(_cf),sp,_lv);
+			}catch(Exception _e){
+				_e.printStackTrace();
+			}
+			
+			return ret;
+		}
+		
+		public boolean batch_insert(String _cf,Map<String,List<ColumnOrSuperColumn>> _map,ConsistencyLevel _lv)
+		{
+			try{
+				m_client.batch_insert(m_ks,_cf,_map,_lv);
+				return true;
+			}catch(Exception _e){
+				_e.printStackTrace();
+			}
+			
+			return false;
+		}
+		
+		public boolean batch_mutate(Map<String,Map<String,List<Mutation>>> _mutateMap,ConsistencyLevel _lv)
+		{
+			try {
+				m_client.batch_mutate(m_ks,_mutateMap,_lv);
+				return true;
+			}catch(Exception _e){
+				_e.printStackTrace();
+			}
+			
+			return false;
+		}
+		
+		public String toString()
+		{
+			return "[thread="+this.getName()+",host="+m_host+",port="+m_port+",ks="+m_ks+"]";
+		}
+	}
+	
+	private class RequestSenderFactory implements ThreadFactory
+	{
+		private int m_port;
+		private String m_host,m_ks;
+		
+		public RequestSenderFactory(String _host,int _port,String _ks)
+		{
+			m_host = _host;
+			m_port = _port;
+			m_ks = _ks;
+		}
+		
+		@Override
+		public Thread newThread(Runnable _runnable)
+		{
+			return RequestSender.newInstance(_runnable,m_host,m_port,m_ks);
+		}
+	}
+}