view src/treecms/tree/cassandra/v1/CassandraForest.java @ 7:fc19e38b669b

added concurrent access client for cassandr
author shoshi
date Thu, 17 Mar 2011 23:24:08 +0900
parents 12604eb6b615
children f96193babac0
line wrap: on
line source

package treecms.tree.cassandra.v1;

import java.util.HashMap;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.StringTokenizer;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import treecms.api.Forest;
import treecms.api.Node;
import treecms.api.NodeData;
import treecms.api.NodeID;
import treecms.tree.id.AbstractRandomNodeID;

/**
 * Cassandra上で非破壊的木構造を実現するためのForestの実装です。
 * 
 * TreeCMSKS.NODETABLE (table of all nodes)
 * 
 * +---------------------------------------------+
 * +   Key   |   Col1   |  Col2  |  Col3  | ...  |
 * +---------------------------------------------+
 * +  NodeID | Children | _attr1 | _attr2 | ...  |
 * +---------------------------------------------+
 * 
 * TreeCMSKS.TIPTABLE (table of tip)
 * 
 * +--------------------+
 * +   Key   |   Col1   |
 * +--------------------+
 * +  NodeID |  version |
 * +--------------------+
 * 
 * @author shoshi
 */

public class CassandraForest implements Forest
{
	ExecutorService m_service;
	
	//column families.
	static final String NODETABLE = "NODETABLE";
	static final String TIPTABLE = "TIPTABLE";
	
	//reserved column.
	static final byte[] TIPID = "TIPID".getBytes();
	static final byte[] CHILDREN = "CHILDREN".getBytes();
	static final char PREFIX = '_';
	
	//cache
	private ConcurrentHashMap<NodeID,CassandraNode> m_cache;
	private ConcurrentHashMap<String,CassandraNode> m_tipCache;
	
	public CassandraForest(String _host,int _port,String _ks,int _threads)
	{
		m_service = Executors.newFixedThreadPool(_threads,new ClientThreadFactory(_host,_port));
		m_cache = new ConcurrentHashMap<NodeID,CassandraNode>();
		m_tipCache = new ConcurrentHashMap<String,CassandraNode>();
	}

	@Override
	public Node get(NodeID _id)
	{
		return new CassandraNode(this,_id);
	}
	
	@Override
	public Node create()
	{
		return createNode(null,null);
	}
	
	public NodeData getNodeData(NodeID _id)
	{
		final NodeID id = _id;
		return null;
	}

	public NodeID getTipID(String _uuid)
	{
		final String uuid = _uuid;
		return null;
	}
	
	public Node createNode(NodeID _id,NodeData _data)
	{
		final NodeData data = _data;
		final NodeID id = (_id != null) ? _id : createID(null,null);
		
		Callable<Boolean> task = new Callable<Boolean>(){
			@Override
			public Boolean call() throws Exception
			{
				return true;
			}
		};
		
		m_service.submit(task);
		
		return new CassandraNode(this,id);
	}
	
	public NodeID createID(String _uuid,String _version)
	{
		return new RandomNodeID(_uuid,_version);
	}
	
	class RandomNodeID extends AbstractRandomNodeID
	{
		String m_uuid;
		String m_version;
		
		public RandomNodeID(String _uuid,String _version)
		{
			m_uuid = (_uuid != null) ? _uuid : UUID.randomUUID().toString();
			m_version = (_version != null) ? _version : Long.toHexString((new Random()).nextLong());
		}

		@Override
		public NodeID create()
		{
			return new RandomNodeID(null,null);
		}

		@Override
		public NodeID update()
		{
			return new RandomNodeID(m_uuid,null);
		}

		@Override
		public String getUUID()
		{
			return m_uuid;
		}

		@Override
		public String getVersion()
		{
			return m_version;
		}
	}

	@Override
	public Node create(NodeData data)
	{
		return null;
	}

	@Override
	public Node getTip(String uuid)
	{
		return null;
	}
}