view src/treecms/tree/cassandra/v1/CassandraForest.java @ 4:f5ed85be5640

finished treecms.cassandra.v1 implementation (not tested yet)
author shoshi
date Thu, 24 Feb 2011 21:30:18 +0900
parents
children 12604eb6b615
line wrap: on
line source

package treecms.tree.cassandra.v1;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.StringTokenizer;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import treecms.api.Forest;
import treecms.api.Node;
import treecms.api.NodeData;
import treecms.api.NodeID;
import treecms.tree.id.RandomNodeID;

/**
 * implementation of TreeCMS with Cassandra backend.
 * 
 * TreeCMSKS.NodeTable (table of all nodes)
 * 
 * +---------------------------------------------+
 * +   Key   |   Col1   |  Col2  |  Col3  | ...  |
 * +---------------------------------------------+
 * +  NodeID | Children | _attr1 | _attr2 | ...  |
 * +---------------------------------------------+
 * 
 * TreeCMSKS.TipTable (table of tip, one row per node UUID)
 * 
 * +----------------------------+
 * +   Key     |     Col1       |
 * +----------------------------+
 * + node UUID | TipID(version) |
 * +----------------------------+
 * 
 * @author shoshi
 */

public class CassandraForest implements Forest
{
	ExecutorService m_service;
	
	//column families.
	static final String NODETABLE = "NodeTable";
	static final String TIPTABLE = "TipTable";
	
	//reserved word.
	static final String NODE = "Node";
	static final String CHILDREN = "Children";
	static final char PREFIX = '_';
	
	//id table reserved
	static final String TIPID = "TipID";
	
	/**
	 * Creates a forest whose requests run on a fixed-size pool of worker
	 * threads; the threads are produced by a RequestSenderFactory.
	 * 
	 * @param _host host name passed on to the thread factory
	 * @param _port port number passed on to the thread factory
	 * @param _ks keyspace name passed on to the thread factory
	 * @param _threads size of the fixed thread pool
	 */
	public CassandraForest(String _host,int _port,String _ks,int _threads)
	{
		ThreadFactory senderFactory = new RequestSenderFactory(_host,_port,_ks);
		m_service = Executors.newFixedThreadPool(_threads,senderFactory);
	}

	/**
	 * Returns a node object bound to this forest for the given ID.
	 * Only the object is constructed here; this method itself sends no request.
	 * 
	 * @param _id ID of the node
	 * @return a new CassandraNode for the ID
	 */
	@Override
	public Node get(NodeID _id)
	{
		Node node = new CassandraNode(this,_id);
		return node;
	}
	
	/**
	 * Creates a new node without a given ID and without data.
	 * 
	 * @return the node returned by createNode
	 */
	@Override
	public Node create()
	{
		NodeID noID = null;     //createNode generates an ID when none is given
		NodeData noData = null; //no attributes or children
		return createNode(noID,noData);
	}
	
	/**
	 * Reads all columns of the node's row in NODETABLE and converts them
	 * into a NodeData.
	 * 
	 * The column named CHILDREN is deserialized into the child list, every
	 * other column becomes an attribute. A leading PREFIX character is
	 * removed from attribute names; names without the prefix are kept as
	 * they are instead of losing their first character.
	 * 
	 * The request runs on a pool thread and this method blocks until it
	 * finishes. Failures are printed and whatever was read so far is returned.
	 * 
	 * @param _id ID of the node (its string form is the row key)
	 * @return the node data, empty when nothing could be read
	 */
	public NodeData getNodeData(NodeID _id)
	{
		final NodeID id = _id;
		
		//create future task.
		Callable<List<ColumnOrSuperColumn>> task = new Callable<List<ColumnOrSuperColumn>>(){
			@Override
			public List<ColumnOrSuperColumn> call() throws Exception
			{
				RequestSender sender = (RequestSender)Thread.currentThread();
				return sender.get_slice(NODETABLE,id.toString(),ConsistencyLevel.ONE);
			}
		};
		Future<List<ColumnOrSuperColumn>> future = m_service.submit(task);
		
		NodeData data = new NodeData();
		try{
			List<ColumnOrSuperColumn> slice = future.get();
			if(slice == null){
				return data; //request failed , get_slice already reported it.
			}
			
			//iterate column
			for(ColumnOrSuperColumn column : slice){
				//skip entries that carry no standard column.
				if(column == null || column.column == null || column.column.name == null){
					continue;
				}
				String name = new String(column.column.name);
				
				//if column name matches CHILDREN , deserialize value to child list.
				if(name.equals(CHILDREN)){
					if(column.column.value == null){
						continue;
					}
					List<Node> tmp = deserialize(new String(column.column.value));
					data.add(tmp);
				}else{
					//strip the prefix only when it is really there.
					boolean prefixed = name.length() > 0 && name.charAt(0) == PREFIX;
					String key = prefixed ? name.substring(1) : name;
					data.set(key.getBytes(),column.column.value);
				}
			}
		}catch(InterruptedException _e){
			//keep the interrupt visible to the caller.
			Thread.currentThread().interrupt();
			_e.printStackTrace();
		}catch(Exception _e){
			_e.printStackTrace();
		}
		
		return data;
	}
	
	/**
	 * Writes all given nodes with one batch_mutate request.
	 * 
	 * For every node two rows are written:
	 * NODETABLE row (key = string form of the node ID) with the CHILDREN
	 * column and one PREFIX-ed column per attribute, and
	 * TIPTABLE row (key = UUID of the node) with the TIPID column holding
	 * the version.
	 * 
	 * The mutation map is built as row key -> column family -> mutations.
	 * When the same row key appears twice in the list the later node wins.
	 * The request is only queued on the pool; this method does not wait
	 * for it and does not report its result.
	 * 
	 * @param _list nodes to write
	 * @return the list that was passed in
	 */
	public List<CassandraNode> multiCreateNode(List<CassandraNode> _list)
	{
		final Map<String,Map<String,List<Mutation>>> mutationMap = new HashMap<String,Map<String,List<Mutation>>>();
		
		//one timestamp for the whole batch.
		final long timestamp = System.currentTimeMillis();
		
		for(CassandraNode node : _list){
			//node row : children + attributes
			List<Mutation> nodeColumns = new LinkedList<Mutation>();
			byte[] children = serialize(node.getData().list()).getBytes();
			nodeColumns.add(toColumnMutation(CHILDREN.getBytes(),children,timestamp));
			
			for(byte[] key : node.getData().keys()){
				nodeColumns.add(toColumnMutation(prefixAttributeName(key),node.getData().get(key),timestamp));
			}
			putRowMutations(mutationMap,node.getID().toString(),NODETABLE,nodeColumns);
			
			//tip row : uuid -> version
			List<Mutation> tipColumns = new LinkedList<Mutation>();
			tipColumns.add(toColumnMutation(TIPID.getBytes(),node.getID().getVersion().getBytes(),timestamp));
			putRowMutations(mutationMap,node.getID().getUUID(),TIPTABLE,tipColumns);
		}
		
		if(mutationMap.isEmpty()){
			return _list; //nothing to send.
		}
		
		Runnable task = new Runnable(){
			@Override
			public void run()
			{
				RequestSender sender = (RequestSender)Thread.currentThread();
				sender.batch_mutate(mutationMap,ConsistencyLevel.ONE);
			}
		};
		
		m_service.execute(task);
		
		return _list;
	}
	
	/**
	 * Wraps name , value and timestamp into a Mutation holding one standard column.
	 */
	private static Mutation toColumnMutation(byte[] _name,byte[] _value,long _timestamp)
	{
		ColumnOrSuperColumn holder = new ColumnOrSuperColumn();
		holder.column = new Column(_name,_value,_timestamp);
		
		Mutation mut = new Mutation();
		mut.column_or_supercolumn = holder;
		return mut;
	}
	
	/**
	 * Returns the attribute key with PREFIX put in front of it.
	 */
	private static byte[] prefixAttributeName(byte[] _key)
	{
		byte[] name = new byte[_key.length+1];
		name[0] = (byte)PREFIX;
		System.arraycopy(_key,0,name,1,_key.length);
		return name;
	}
	
	/**
	 * Stores the mutations of one column family under the given row key ,
	 * creating the row entry when it does not exist yet.
	 */
	private static void putRowMutations(Map<String,Map<String,List<Mutation>>> _map,String _rowKey,String _cf,List<Mutation> _mutations)
	{
		Map<String,List<Mutation>> row = _map.get(_rowKey);
		if(row == null){
			row = new HashMap<String,List<Mutation>>();
			_map.put(_rowKey,row);
		}
		row.put(_cf,_mutations);
	}
	
	/**
	 * Joins the string forms of the node IDs with commas.
	 * ex. list{"hoge","fuga"} -> "hoge,fuga"
	 * @param _list nodes whose IDs are written in iteration order
	 * @return serialized string , empty for an empty list
	 */
	public String serialize(List<Node> _list)
	{
		StringBuilder joined = new StringBuilder();
		boolean first = true;
		
		for(Iterator<Node> itr = _list.iterator();itr.hasNext();){
			Node child = itr.next();
			String idText = child.getID().toString();
			
			//separator goes in front of every element except the first.
			if(!first){
				joined.append(",");
			}
			joined.append(idText);
			first = false;
		}
		
		return joined.toString();
	}
	
	/**
	 * Turns a comma separated list of IDs back into nodes.
	 * Every entry is split at '@' ; the first two parts are handed to
	 * createID as uuid and version and the node is fetched with get.
	 * ex. "hoge,fuga" -> list{"hoge","fuga"}
	 * @param _serialized serialized string
	 * @return list of nodes in the order of the entries
	 * @throws IllegalArgumentException when an entry can not be converted
	 */
	public LinkedList<Node> deserialize(String _serialized) throws IllegalArgumentException
	{
		StringTokenizer entries = new StringTokenizer(_serialized,",");
		LinkedList<Node> nodes = new LinkedList<Node>();
		
		while(entries.hasMoreTokens()){
			String entry = entries.nextToken();
			StringTokenizer parts = new StringTokenizer(entry,"@");
			
			try{
				String uuid = parts.nextToken();
				String version = parts.nextToken();
				NodeID id = createID(uuid,version);
				Node node = get(id);
				nodes.add(node);
			}catch(Exception _e){
				throw new IllegalArgumentException("unable to deserialize string ["+_serialized+"]",_e);
			}
		}
		
		return nodes;
	}

	/**
	 * Looks up the tip of a node : reads the TIPID column of the row
	 * keyed by the UUID in TIPTABLE.
	 * 
	 * The stored value is the version (that is what the create methods of
	 * this class write) , so the ID is built from the given UUID and the
	 * stored version. A value of the form "uuid@version" is accepted too.
	 * 
	 * The request runs on a pool thread and this method blocks until it
	 * finishes. Failures are printed.
	 * 
	 * @param _uuid UUID of the node
	 * @return ID of the tip , or null when it is not found or the request failed
	 */
	public NodeID getTipID(String _uuid)
	{
		final String uuid = _uuid;
		Callable<byte[]> task = new Callable<byte[]>(){
			@Override
			public byte[] call() throws Exception
			{
				RequestSender sender = (RequestSender)Thread.currentThread();
				return sender.get(TIPTABLE,uuid,TIPID.getBytes(),ConsistencyLevel.ONE);
			}
		};
		
		Future<byte[]> future = m_service.submit(task);
		
		try {
			byte[] value = future.get();
			if(value == null || value.length == 0){
				return null; //not found.
			}
			
			String stored = new String(value);
			if(stored.indexOf('@') >= 0){
				//full id form "uuid@version".
				StringTokenizer token = new StringTokenizer(stored,"@");
				return createID(token.nextToken(),token.nextToken());
			}
			
			//version only.
			return createID(uuid,stored);
		}catch(InterruptedException _e){
			//keep the interrupt visible to the caller.
			Thread.currentThread().interrupt();
			_e.printStackTrace();
		}catch(Exception _e){
			_e.printStackTrace();
		}
		
		return null; //not found.
	}
	
	/**
	 * Stores a node and registers it as the tip of its UUID.
	 * 
	 * When data is given , the NODETABLE row (key = string form of the ID)
	 * receives the CHILDREN column and one PREFIX-ed column per attribute.
	 * In every case the TIPTABLE row (key = UUID) receives the TIPID column
	 * holding the version. Both rows go out in one batch_mutate request whose
	 * map is built as row key -> column family -> mutations.
	 * 
	 * The request is only submitted to the pool ; this method does not wait
	 * for it and its result is not checked.
	 * 
	 * @param _id ID of the node , a new ID is generated when null
	 * @param _data data of the node , may be null
	 * @return a CassandraNode for the (given or generated) ID
	 */
	public Node createNode(NodeID _id,NodeData _data)
	{
		final NodeData data = _data;
		final NodeID id = (_id != null) ? _id : createID(null,null);
		
		Callable<Boolean> task = new Callable<Boolean>(){
			@Override
			public Boolean call() throws Exception
			{
				RequestSender sender = (RequestSender)Thread.currentThread();
				
				//one timestamp for all columns of this request.
				long timestamp = System.currentTimeMillis();
				
				//mutation map
				HashMap<String,Map<String,List<Mutation>>> map = new HashMap<String,Map<String,List<Mutation>>>();
				
				/*
				 * mutations for NODETABLE
				 */
				if(data != null){
					LinkedList<Mutation> nodeColumns = new LinkedList<Mutation>();
					
					//child list as CSV.
					byte[] children = serialize(data.list()).getBytes();
					nodeColumns.add(mutationOf(CHILDREN.getBytes(),children,timestamp));
					
					//attributes , column name = PREFIX + key.
					for(byte[] key : data.keys()){
						byte[] name = new byte[key.length+1];
						name[0] = (byte)PREFIX;
						System.arraycopy(key,0,name,1,key.length);
						nodeColumns.add(mutationOf(name,data.get(key),timestamp));
					}
					
					rowOf(map,id.toString()).put(NODETABLE,nodeColumns);
				}
				
				/*
				 * mutations for TIPTABLE
				 */
				LinkedList<Mutation> tipColumns = new LinkedList<Mutation>();
				tipColumns.add(mutationOf(TIPID.getBytes(),id.getVersion().getBytes(),timestamp));
				rowOf(map,id.getUUID()).put(TIPTABLE,tipColumns);
				
				return sender.batch_mutate(map,ConsistencyLevel.ONE);
			}
			
			//wraps name , value and timestamp into a Mutation holding one standard column.
			private Mutation mutationOf(byte[] _name,byte[] _value,long _timestamp)
			{
				ColumnOrSuperColumn holder = new ColumnOrSuperColumn();
				holder.column = new Column(_name,_value,_timestamp);
				
				Mutation mut = new Mutation();
				mut.column_or_supercolumn = holder;
				return mut;
			}
			
			//returns the column family map of a row key , creating it when missing.
			private Map<String,List<Mutation>> rowOf(Map<String,Map<String,List<Mutation>>> _map,String _rowKey)
			{
				Map<String,List<Mutation>> row = _map.get(_rowKey);
				if(row == null){
					row = new HashMap<String,List<Mutation>>();
					_map.put(_rowKey,row);
				}
				return row;
			}
		};
		
		m_service.submit(task);
		
		return new CassandraNode(this,id);
	}
	
	/**
	 * Builds a node ID from the given parts.
	 * A part that is null is generated by RandomNodeIDImpl.
	 * 
	 * @param _uuid UUID of the node , may be null
	 * @param _version version of the node , may be null
	 * @return the new ID
	 */
	public NodeID createID(String _uuid,String _version)
	{
		NodeID id = new RandomNodeIDImpl(_uuid,_version);
		return id;
	}
	
	/**
	 * Node ID made of a UUID and a version string.
	 * Parts that are not supplied are generated randomly : the UUID with
	 * UUID.randomUUID() , the version as the hex form of a random long.
	 */
	class RandomNodeIDImpl extends RandomNodeID
	{
		String m_uuid;
		String m_version;
		
		/**
		 * @param _uuid UUID to use , generated when null
		 * @param _version version to use , generated when null
		 */
		public RandomNodeIDImpl(String _uuid,String _version)
		{
			if(_uuid == null){
				m_uuid = UUID.randomUUID().toString();
			}else{
				m_uuid = _uuid;
			}
			
			if(_version == null){
				long seed = (new Random()).nextLong();
				m_version = Long.toHexString(seed);
			}else{
				m_version = _version;
			}
		}

		/**
		 * @return an ID with a new UUID and a new version
		 */
		@Override
		public NodeID create()
		{
			NodeID fresh = new RandomNodeIDImpl(null,null);
			return fresh;
		}

		/**
		 * @return an ID with the same UUID as this one and a new version
		 */
		@Override
		public NodeID update()
		{
			NodeID next = new RandomNodeIDImpl(m_uuid,null);
			return next;
		}

		/**
		 * @return the UUID part
		 */
		@Override
		public String getUUID()
		{
			return this.m_uuid;
		}

		/**
		 * @return the version part
		 */
		@Override
		public String getVersion()
		{
			return this.m_version;
		}
	}
	
	/**
	 * Pool thread that owns one Thrift connection to Cassandra.
	 * 
	 * Tasks running on this thread obtain it with Thread.currentThread()
	 * and call the request methods below. The connection is opened in the
	 * constructor and closed when the thread's run() returns.
	 * All request methods report failures by printing them and returning
	 * null or false ; they do not throw.
	 */
	private static class RequestSender extends Thread
	{
		private int m_port;
		private String m_host,m_ks;
		private Cassandra.Client m_client;
		private TTransport m_transport; //kept so that the connection can be closed.
		
		/**
		 * @param _runnable work of the thread
		 * @param _host host to connect to
		 * @param _port port to connect to
		 * @param _ks keyspace used by all requests
		 * @throws TTransportException when the connection can not be opened
		 */
		public RequestSender(Runnable _runnable,String _host,int _port,String _ks) throws TTransportException
		{
			super(_runnable);
			m_port = _port;
			m_host = _host;
			m_ks = _ks;
			
			connect();
		}
		
		/**
		 * Opens a connection to the configured host and port.
		 * A connection opened before is closed first.
		 * 
		 * @throws TTransportException when the transport can not be opened
		 */
		public void connect() throws TTransportException
		{
			close();
			
			TTransport tr = new TSocket(m_host,m_port);
			TProtocol proto = new TBinaryProtocol(tr);
			m_client = new Cassandra.Client(proto);
			m_transport = tr;
			
			tr.open();			
		}
		
		/**
		 * Closes the connection when one is open ; does nothing otherwise.
		 */
		public void close()
		{
			TTransport tr = m_transport;
			m_transport = null;
			if(tr != null && tr.isOpen()){
				tr.close();
			}
		}
		
		/**
		 * Runs the work of the thread and closes the connection afterwards ,
		 * also when the work ends with an exception.
		 */
		@Override
		public void run()
		{
			try{
				super.run();
			}finally{
				close();
			}
		}
		
		/**
		 * Creates a connected sender.
		 * 
		 * @return the sender , or null when the connection could not be opened
		 */
		public static RequestSender newInstance(Runnable _runnable,String _host,int _port,String _ks)
		{
			RequestSender sender = null;
			try {
				sender = new RequestSender(_runnable,_host,_port,_ks);
			} catch (TTransportException _e) {
				_e.printStackTrace();
			}
			
			return sender;
		}
		
		/**
		 * Reads the value of one column.
		 * 
		 * @param _cf column family
		 * @param _key row key
		 * @param _name column name
		 * @param _lv consistency level
		 * @return the value , or null when the column is not found or the request failed
		 */
		public byte[] get(String _cf,String _key,byte[] _name,ConsistencyLevel _lv)
		{
			byte[] ret = null;
			
			ColumnPath path = new ColumnPath();
			path.column_family = _cf;
			path.column = _name;
			try {
				ColumnOrSuperColumn cors = m_client.get(m_ks,_key,path,_lv);
				//a result without standard column has no value to return.
				if(cors != null && cors.column != null){
					ret = cors.column.value;
				}
			}catch(NotFoundException _e){
				System.out.println(String.format("column not found [%s][%s][%s]",_cf,_key,new String(_name)));
			}catch(Exception _e){
				_e.printStackTrace();
			}
			
			return ret;
		}
		
		/**
		 * Writes one column , stamped with the current time in milliseconds.
		 * 
		 * @param _cf column family
		 * @param _key row key
		 * @param _name column name
		 * @param _value column value
		 * @param _lv consistency level
		 * @return true when the request succeeded
		 */
		public boolean insert(String _cf,String _key,byte[] _name,byte[] _value,ConsistencyLevel _lv)
		{	
			ColumnPath path = new ColumnPath();
			path.column_family = _cf;
			path.column = _name;
			
			try{
				//milliseconds , so that two writes within one second get different timestamps.
				m_client.insert(m_ks,_key,path,_value,System.currentTimeMillis(),_lv);
				return true;
			}catch(Exception _e){
				_e.printStackTrace();
			}
			
			return false;
		}
		
		/**
		 * Reads the columns of one row over the whole name range.
		 * 
		 * @param _cf column family
		 * @param _key row key
		 * @param _lv consistency level
		 * @return the columns , or null when the request failed
		 */
		public List<ColumnOrSuperColumn> get_slice(String _cf,String _key,ConsistencyLevel _lv)
		{
			List<ColumnOrSuperColumn> ret = null;
			//empty start and finish = whole range ; count must not be negative.
			SliceRange sr = new SliceRange(new byte[0],new byte[0],false,Integer.MAX_VALUE);
			SlicePredicate sp = new SlicePredicate();
			sp.slice_range = sr;
			
			try {
				ret = m_client.get_slice(m_ks,_key,new ColumnParent(_cf),sp,_lv);
			}catch(Exception _e){
				_e.printStackTrace();
			}
			
			return ret;
		}
		
		/**
		 * Sends a batch_insert request for one row key.
		 * 
		 * @param _cf passed to batch_insert as its key argument
		 * @param _map column family -> columns
		 * @param _lv consistency level
		 * @return true when the request succeeded
		 */
		public boolean batch_insert(String _cf,Map<String,List<ColumnOrSuperColumn>> _map,ConsistencyLevel _lv)
		{
			try{
				m_client.batch_insert(m_ks,_cf,_map,_lv);
				return true;
			}catch(Exception _e){
				_e.printStackTrace();
			}
			
			return false;
		}
		
		/**
		 * Sends a batch_mutate request.
		 * 
		 * @param _mutateMap mutation map handed to the client unchanged
		 * @param _lv consistency level
		 * @return true when the request succeeded
		 */
		public boolean batch_mutate(Map<String,Map<String,List<Mutation>>> _mutateMap,ConsistencyLevel _lv)
		{
			try {
				m_client.batch_mutate(m_ks,_mutateMap,_lv);
				return true;
			}catch(Exception _e){
				_e.printStackTrace();
			}
			
			return false;
		}
		
		/**
		 * @return thread name , host , port and keyspace of this sender
		 */
		public String toString()
		{
			return "[thread="+this.getName()+",host="+m_host+",port="+m_port+",ks="+m_ks+"]";
		}
	}
	
	/**
	 * Thread factory of the request pool.
	 * Every thread it hands out is a RequestSender created for the
	 * host , port and keyspace given at construction.
	 */
	private class RequestSenderFactory implements ThreadFactory
	{
		private int m_port;
		private String m_host;
		private String m_ks;
		
		/**
		 * @param _host host name for the senders
		 * @param _port port number for the senders
		 * @param _ks keyspace name for the senders
		 */
		public RequestSenderFactory(String _host,int _port,String _ks)
		{
			this.m_host = _host;
			this.m_port = _port;
			this.m_ks = _ks;
		}
		
		/**
		 * @param _runnable work of the new thread
		 * @return whatever RequestSender.newInstance returns for the stored settings
		 */
		@Override
		public Thread newThread(Runnable _runnable)
		{
			Thread sender = RequestSender.newInstance(_runnable,this.m_host,this.m_port,this.m_ks);
			return sender;
		}
	}
}