Mercurial > hg > Members > shoshi > TreeCMSv2
diff src/treecms/tree/cassandra/v1/CassandraForest.java @ 4:f5ed85be5640
finished treecms.cassandra.v1 implementation (not tested yet)
author | shoshi |
---|---|
date | Thu, 24 Feb 2011 21:30:18 +0900 |
parents | |
children | 12604eb6b615 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/treecms/tree/cassandra/v1/CassandraForest.java Thu Feb 24 21:30:18 2011 +0900 @@ -0,0 +1,511 @@ +package treecms.tree.cassandra.v1; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.StringTokenizer; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; + +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.ColumnParent; +import org.apache.cassandra.thrift.ColumnPath; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.Mutation; +import org.apache.cassandra.thrift.NotFoundException; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.SliceRange; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import treecms.api.Forest; +import treecms.api.Node; +import treecms.api.NodeData; +import treecms.api.NodeID; +import treecms.tree.id.RandomNodeID; + +/** + * implementation of TreeCMS with Cassandra backend. + * + * TreeCMSKS.NodeTable (table of all nodes) + * + * +---------------------------------------------+ + * + Key | Col1 | Col2 | Col3 | ... | + * +---------------------------------------------+ + * + NodeID | Children | _attr1 | _attr2 | ... | + * +---------------------------------------------+ + * + * TreeCMSKS.TipTable (table of tip) + * + * +--------------------+ + * + Key | Col1 | + * +--------------------+ + * + NodeID | version | + * +--------------------+ + * + * @author shoshi + */ + +public class CassandraForest implements Forest +{ + ExecutorService m_service; + + //column families. + static final String NODETABLE = "NodeTable"; + static final String TIPTABLE = "TipTable"; + + //reserved word. + static final String NODE = "Node"; + static final String CHILDREN = "Children"; + static final char PREFIX = '_'; + + //id table reserved + static final String TIPID = "TipID"; + + public CassandraForest(String _host,int _port,String _ks,int _threads) + { + m_service = Executors.newFixedThreadPool(_threads,new RequestSenderFactory(_host,_port,_ks)); + } + + @Override + public Node get(NodeID _id) + { + + return new CassandraNode(this,_id); + } + + @Override + public Node create() + { + return createNode(null,null); //create new node + } + + public NodeData getNodeData(NodeID _id) + { + final NodeID id = _id; + + //create future task. + Callable<List<ColumnOrSuperColumn>> task = new Callable<List<ColumnOrSuperColumn>>(){ + @Override + public List<ColumnOrSuperColumn> call() throws Exception + { + RequestSender sender = (RequestSender)Thread.currentThread(); + List<ColumnOrSuperColumn> res = sender.get_slice(NODETABLE,id.toString(),ConsistencyLevel.ONE); + return res; + } + }; + Future<List<ColumnOrSuperColumn>> future = m_service.submit(task); + + NodeData data = new NodeData(); + try{ + List<ColumnOrSuperColumn> slice = future.get(); + + //iterate column + for(ColumnOrSuperColumn column : slice){ + String name = new String(column.column.name); + + //if column name matches CHILDREN , deserialize value to child list. + if(name.equals(CHILDREN)){ + List<Node> tmp = deserialize(new String(column.column.value)); + data.add(tmp); + }else{ + String key = name.substring(1); //size of prefix + data.set(key.getBytes(),column.column.value); + } + } + }catch(Exception _e){ + _e.printStackTrace(); + } + + return data; + } + + public List<CassandraNode> multiCreateNode(List<CassandraNode> _list) + { + final Map<String,Map<String,List<Mutation>>> mutationMap = new HashMap<String,Map<String,List<Mutation>>>(); + + Map<String,List<Mutation>> nodeTable = new HashMap<String,List<Mutation>>(); + Map<String,List<Mutation>> tipTable = new HashMap<String,List<Mutation>>(); + for(CassandraNode node : _list){ + LinkedList<Mutation> list = new LinkedList<Mutation>(); + Mutation mut = new Mutation(); + ColumnOrSuperColumn column = new ColumnOrSuperColumn(); + mut.column_or_supercolumn = column; + column.column.name = CHILDREN.getBytes(); + column.column.value = serialize(node.getData().list()).getBytes(); + list.add(mut); + + for(byte[] key : node.getData().keys()){ + mut = new Mutation(); + column = new ColumnOrSuperColumn(); + mut.column_or_supercolumn = column; + column.column.name = key; + column.column.value = node.getData().get(key); + + list.add(mut); + } + + nodeTable.put(node.getID().toString(),list); + + mut = new Mutation(); + column = new ColumnOrSuperColumn(); + column.column.name = TIPID.getBytes(); + column.column.value = node.getID().getVersion().getBytes(); + list = new LinkedList<Mutation>(); + list.add(mut); + tipTable.put(node.getID().getUUID(),list); + } + mutationMap.put(NODETABLE,nodeTable); + mutationMap.put(TIPTABLE,tipTable); + + Runnable task = new Runnable(){ + @Override + public void run() + { + RequestSender sender = (RequestSender)Thread.currentThread(); + sender.batch_mutate(mutationMap,ConsistencyLevel.ONE); + } + }; + + m_service.execute(task); + + return _list; + } + + /** + * list serializer. + * ex. list{"hoge","fuga"} -> "hoge,fuga" + * @param _list + * @return selialized string + */ + public String serialize(List<Node> _list) + { + String prefix = ""; + StringBuffer buf = new StringBuffer(); + for(Node child : _list){ + buf.append(prefix+child.getID().toString()); + prefix = ","; + } + + return buf.toString(); + } + + /** + * string deserializer. + * ex. "hoge,fuga" -> list{"hoge","fuga"} + * @param _selialized + * @return list + */ + public LinkedList<Node> deserialize(String _serialized) throws IllegalArgumentException + { + StringTokenizer tokens = new StringTokenizer(_serialized,","); + LinkedList<Node> res = new LinkedList<Node>(); + + while(tokens.hasMoreElements()){ + String tmp = tokens.nextToken(); + StringTokenizer uuidAndVer = new StringTokenizer(tmp,"@"); + + try{ + NodeID id = createID(uuidAndVer.nextToken(),uuidAndVer.nextToken()); + res.add(get(id)); + }catch(Exception _e){ + throw new IllegalArgumentException("unable to deserialize string ["+_serialized+"]",_e); + } + } + + return res; + } + + public NodeID getTipID(String _uuid) + { + final String uuid = _uuid; + Callable<byte[]> task = new Callable<byte[]>(){ + @Override + public byte[] call() throws Exception + { + RequestSender sender = (RequestSender)Thread.currentThread(); + byte[] value = sender.get(NODETABLE,uuid,TIPID.getBytes(),ConsistencyLevel.ONE); + return value; + } + }; + + Future<byte[]> future = m_service.submit(task); + + try { + byte[] value = future.get(); + String id = new String(value); + StringTokenizer token = new StringTokenizer(id,"@"); + NodeID nodeID = createID(token.nextToken(),token.nextToken()); + return nodeID; + }catch(Exception _e){ + _e.printStackTrace(); + } + + return null; //not found. + } + + public Node createNode(NodeID _id,NodeData _data) + { + final NodeData data = _data; + final NodeID id = (_id != null) ? _id : createID(null,null); + + Callable<Boolean> task = new Callable<Boolean>(){ + @Override + public Boolean call() throws Exception + { + RequestSender sender = (RequestSender)Thread.currentThread(); + + //mutation map + HashMap<String,Map<String,List<Mutation>>> map = new HashMap<String,Map<String,List<Mutation>>>(); + + /* + * create mutation map for NODETABLE + */ + if(data != null){ + LinkedList<Mutation> list = new LinkedList<Mutation>(); + HashMap<String,List<Mutation>> info = new HashMap<String,List<Mutation>>(); + Iterator<Node> itr = data.list().iterator(); + + /* + * create CSV from child list. + */ + StringBuffer buffer = new StringBuffer(); + for(String prefix = "";itr.hasNext();prefix = ","){ + buffer.append(String.format("%s%s",prefix,itr.next().getID().toString())); + } + Mutation mutChildren = new Mutation(); + ColumnOrSuperColumn children = new ColumnOrSuperColumn(); + children.column.name = CHILDREN.getBytes(); + children.column.value = buffer.toString().getBytes(); + mutChildren.column_or_supercolumn = children; + list.add(mutChildren); + + /* + * + */ + for(byte[] key : data.keys()){ + Mutation mut = new Mutation(); + ColumnOrSuperColumn column = new ColumnOrSuperColumn(); + column.column.name = key; + column.column.value = data.get(key); + mut.column_or_supercolumn = column; + list.add(mut); + } + info.put(id.toString(),list); + + map.put(NODETABLE,info); + } + + /* + * create mutation map for NODEIDTABLE + */ + HashMap<String,List<Mutation>> idtable_mutations = new HashMap<String,List<Mutation>>(); + LinkedList<Mutation> list = new LinkedList<Mutation>(); + + Mutation mutTipID = new Mutation(); + ColumnOrSuperColumn tipID = new ColumnOrSuperColumn(); + tipID.column.name = TIPID.getBytes(); + tipID.column.value = id.getVersion().getBytes(); + mutTipID.column_or_supercolumn = tipID; + + list.add(mutTipID); + idtable_mutations.put(TIPTABLE,list); + + return sender.batch_mutate(map,ConsistencyLevel.ONE); + } + }; + + m_service.submit(task); + + return new CassandraNode(this,id); + } + + public NodeID createID(String _uuid,String _version) + { + return new RandomNodeIDImpl(_uuid,_version); + } + + class RandomNodeIDImpl extends RandomNodeID + { + String m_uuid; + String m_version; + + public RandomNodeIDImpl(String _uuid,String _version) + { + m_uuid = (_uuid != null) ? _uuid : UUID.randomUUID().toString(); + m_version = (_version != null) ? _version : Long.toHexString((new Random()).nextLong()); + } + + @Override + public NodeID create() + { + return new RandomNodeIDImpl(null,null); + } + + @Override + public NodeID update() + { + return new RandomNodeIDImpl(m_uuid,null); + } + + @Override + public String getUUID() + { + return m_uuid; + } + + @Override + public String getVersion() + { + return m_version; + } + } + + private static class RequestSender extends Thread + { + private int m_port; + private String m_host,m_ks; + private Cassandra.Client m_client; + + public RequestSender(Runnable _runnable,String _host,int _port,String _ks) throws TTransportException + { + super(_runnable); + m_port = _port; + m_host = _host; + m_ks = _ks; + + connect(); + } + + public void connect() throws TTransportException + { + TTransport tr = new TSocket(m_host,m_port); + TProtocol proto = new TBinaryProtocol(tr); + m_client = new Cassandra.Client(proto); + + tr.open(); + } + + public static RequestSender newInstance(Runnable _runnable,String _host,int _port,String _ks) + { + RequestSender sender = null; + try { + sender = new RequestSender(_runnable,_host,_port,_ks); + } catch (TTransportException _e) { + _e.printStackTrace(); + } + + return sender; + } + + public byte[] get(String _cf,String _key,byte[] _name,ConsistencyLevel _lv) + { + byte[] ret = null; + + ColumnPath path = new ColumnPath(); + path.column_family = _cf; + path.column = _name; + try { + ColumnOrSuperColumn cors = m_client.get(m_ks,_key,path,_lv); + ret = cors.column.value; + }catch(NotFoundException _e){ + System.out.println(String.format("column not found [%s][%s][%s]",_cf,_key,new String(_name))); + }catch(Exception _e){ + _e.printStackTrace(); + } + + return ret; + } + + public boolean insert(String _cf,String _key,byte[] _name,byte[] _value,ConsistencyLevel _lv) + { + ColumnPath path = new ColumnPath(); + path.column_family = _cf; + path.column = _name; + + try{ + m_client.insert(m_ks,_key,path,_value,System.currentTimeMillis()/1000,_lv); + return true; + }catch(Exception _e){ + _e.printStackTrace(); + } + + return false; + } + + public List<ColumnOrSuperColumn> get_slice(String _cf,String _key,ConsistencyLevel _lv) + { + List<ColumnOrSuperColumn> ret = null; + SliceRange sr = new SliceRange(new byte[0],new byte[0],false,-1); + SlicePredicate sp = new SlicePredicate(); + sp.slice_range = sr; + + try { + ret = m_client.get_slice(m_ks,_key,new ColumnParent(_cf),sp,_lv); + }catch(Exception _e){ + _e.printStackTrace(); + } + + return ret; + } + + public boolean batch_insert(String _cf,Map<String,List<ColumnOrSuperColumn>> _map,ConsistencyLevel _lv) + { + try{ + m_client.batch_insert(m_ks,_cf,_map,_lv); + return true; + }catch(Exception _e){ + _e.printStackTrace(); + } + + return false; + } + + public boolean batch_mutate(Map<String,Map<String,List<Mutation>>> _mutateMap,ConsistencyLevel _lv) + { + try { + m_client.batch_mutate(m_ks,_mutateMap,_lv); + return true; + }catch(Exception _e){ + _e.printStackTrace(); + } + + return false; + } + + public String toString() + { + return "[thread="+this.getName()+",host="+m_host+",port="+m_port+",ks="+m_ks+"]"; + } + } + + private class RequestSenderFactory implements ThreadFactory + { + private int m_port; + private String m_host,m_ks; + + public RequestSenderFactory(String _host,int _port,String _ks) + { + m_host = _host; + m_port = _port; + m_ks = _ks; + } + + @Override + public Thread newThread(Runnable _runnable) + { + return RequestSender.newInstance(_runnable,m_host,m_port,m_ks); + } + } +}