Mercurial > hg > Members > shoshi > TreeCMSv2
view src/treecms/tree/cassandra/v1/CassandraForest.java @ 6:12604eb6b615
added javadoc
author | shoshi |
---|---|
date | Mon, 14 Mar 2011 23:24:38 +0900 |
parents | f5ed85be5640 |
children | fc19e38b669b |
line wrap: on
line source
package treecms.tree.cassandra.v1; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; import java.util.StringTokenizer; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.ColumnOrSuperColumn; import org.apache.cassandra.thrift.ColumnParent; import org.apache.cassandra.thrift.ColumnPath; import org.apache.cassandra.thrift.ConsistencyLevel; import org.apache.cassandra.thrift.Mutation; import org.apache.cassandra.thrift.NotFoundException; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.thrift.SliceRange; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import treecms.api.Forest; import treecms.api.Node; import treecms.api.NodeData; import treecms.api.NodeID; import treecms.tree.id.AbstractRandomNodeID; /** * implementation of TreeCMS with Cassandra backend. * * TreeCMSKS.NodeTable (table of all nodes) * * +---------------------------------------------+ * + Key | Col1 | Col2 | Col3 | ... | * +---------------------------------------------+ * + NodeID | Children | _attr1 | _attr2 | ... | * +---------------------------------------------+ * * TreeCMSKS.TipTable (table of tip) * * +--------------------+ * + Key | Col1 | * +--------------------+ * + NodeID | version | * +--------------------+ * * @author shoshi */ public class CassandraForest implements Forest { ExecutorService m_service; //column families. static final String NODETABLE = "NodeTable"; static final String TIPTABLE = "TipTable"; //reserved word. static final String NODE = "Node"; static final String CHILDREN = "Children"; static final char PREFIX = '_'; //id table reserved static final String TIPID = "TipID"; public CassandraForest(String _host,int _port,String _ks,int _threads) { m_service = Executors.newFixedThreadPool(_threads,new RequestSenderFactory(_host,_port,_ks)); } @Override public Node get(NodeID _id) { return new CassandraNode(this,_id); } @Override public Node create() { return createNode(null,null); //create new node } public NodeData getNodeData(NodeID _id) { final NodeID id = _id; //create future task. Callable<List<ColumnOrSuperColumn>> task = new Callable<List<ColumnOrSuperColumn>>(){ @Override public List<ColumnOrSuperColumn> call() throws Exception { RequestSender sender = (RequestSender)Thread.currentThread(); List<ColumnOrSuperColumn> res = sender.get_slice(NODETABLE,id.toString(),ConsistencyLevel.ONE); return res; } }; Future<List<ColumnOrSuperColumn>> future = m_service.submit(task); NodeData data = new NodeData(); try{ List<ColumnOrSuperColumn> slice = future.get(); //iterate column for(ColumnOrSuperColumn column : slice){ String name = new String(column.column.name); //if column name matches CHILDREN , deserialize value to child list. if(name.equals(CHILDREN)){ List<Node> tmp = deserialize(new String(column.column.value)); data.add(tmp); }else{ String key = name.substring(1); //size of prefix data.set(key.getBytes(),column.column.value); } } }catch(Exception _e){ _e.printStackTrace(); } return data; } public List<CassandraNode> multiCreateNode(List<CassandraNode> _list) { final Map<String,Map<String,List<Mutation>>> mutationMap = new HashMap<String,Map<String,List<Mutation>>>(); Map<String,List<Mutation>> nodeTable = new HashMap<String,List<Mutation>>(); Map<String,List<Mutation>> tipTable = new HashMap<String,List<Mutation>>(); for(CassandraNode node : _list){ LinkedList<Mutation> list = new LinkedList<Mutation>(); Mutation mut = new Mutation(); ColumnOrSuperColumn column = new ColumnOrSuperColumn(); mut.column_or_supercolumn = column; column.column.name = CHILDREN.getBytes(); column.column.value = serialize(node.getData().list()).getBytes(); list.add(mut); for(byte[] key : node.getData().keys()){ mut = new Mutation(); column = new ColumnOrSuperColumn(); mut.column_or_supercolumn = column; column.column.name = key; column.column.value = node.getData().get(key); list.add(mut); } nodeTable.put(node.getID().toString(),list); mut = new Mutation(); column = new ColumnOrSuperColumn(); column.column.name = TIPID.getBytes(); column.column.value = node.getID().getVersion().getBytes(); list = new LinkedList<Mutation>(); list.add(mut); tipTable.put(node.getID().getUUID(),list); } mutationMap.put(NODETABLE,nodeTable); mutationMap.put(TIPTABLE,tipTable); Runnable task = new Runnable(){ @Override public void run() { RequestSender sender = (RequestSender)Thread.currentThread(); sender.batch_mutate(mutationMap,ConsistencyLevel.ONE); } }; m_service.execute(task); return _list; } /** * list serializer. * ex. list{"hoge","fuga"} -> "hoge,fuga" * @param _list * @return selialized string */ public String serialize(List<Node> _list) { String prefix = ""; StringBuffer buf = new StringBuffer(); for(Node child : _list){ buf.append(prefix+child.getID().toString()); prefix = ","; } return buf.toString(); } /** * string deserializer. * ex. "hoge,fuga" -> list{"hoge","fuga"} * @param _selialized * @return list */ public LinkedList<Node> deserialize(String _serialized) throws IllegalArgumentException { StringTokenizer tokens = new StringTokenizer(_serialized,","); LinkedList<Node> res = new LinkedList<Node>(); while(tokens.hasMoreElements()){ String tmp = tokens.nextToken(); StringTokenizer uuidAndVer = new StringTokenizer(tmp,"@"); try{ NodeID id = createID(uuidAndVer.nextToken(),uuidAndVer.nextToken()); res.add(get(id)); }catch(Exception _e){ throw new IllegalArgumentException("unable to deserialize string ["+_serialized+"]",_e); } } return res; } public NodeID getTipID(String _uuid) { final String uuid = _uuid; Callable<byte[]> task = new Callable<byte[]>(){ @Override public byte[] call() throws Exception { RequestSender sender = (RequestSender)Thread.currentThread(); byte[] value = sender.get(NODETABLE,uuid,TIPID.getBytes(),ConsistencyLevel.ONE); return value; } }; Future<byte[]> future = m_service.submit(task); try { byte[] value = future.get(); String id = new String(value); StringTokenizer token = new StringTokenizer(id,"@"); NodeID nodeID = createID(token.nextToken(),token.nextToken()); return nodeID; }catch(Exception _e){ _e.printStackTrace(); } return null; //not found. } public Node createNode(NodeID _id,NodeData _data) { final NodeData data = _data; final NodeID id = (_id != null) ? _id : createID(null,null); Callable<Boolean> task = new Callable<Boolean>(){ @Override public Boolean call() throws Exception { RequestSender sender = (RequestSender)Thread.currentThread(); //mutation map HashMap<String,Map<String,List<Mutation>>> map = new HashMap<String,Map<String,List<Mutation>>>(); /* * create mutation map for NODETABLE */ if(data != null){ LinkedList<Mutation> list = new LinkedList<Mutation>(); HashMap<String,List<Mutation>> info = new HashMap<String,List<Mutation>>(); Iterator<Node> itr = data.list().iterator(); /* * create CSV from child list. */ StringBuffer buffer = new StringBuffer(); for(String prefix = "";itr.hasNext();prefix = ","){ buffer.append(String.format("%s%s",prefix,itr.next().getID().toString())); } Mutation mutChildren = new Mutation(); ColumnOrSuperColumn children = new ColumnOrSuperColumn(); children.column.name = CHILDREN.getBytes(); children.column.value = buffer.toString().getBytes(); mutChildren.column_or_supercolumn = children; list.add(mutChildren); /* * */ for(byte[] key : data.keys()){ Mutation mut = new Mutation(); ColumnOrSuperColumn column = new ColumnOrSuperColumn(); column.column.name = key; column.column.value = data.get(key); mut.column_or_supercolumn = column; list.add(mut); } info.put(id.toString(),list); map.put(NODETABLE,info); } /* * create mutation map for NODEIDTABLE */ HashMap<String,List<Mutation>> idtable_mutations = new HashMap<String,List<Mutation>>(); LinkedList<Mutation> list = new LinkedList<Mutation>(); Mutation mutTipID = new Mutation(); ColumnOrSuperColumn tipID = new ColumnOrSuperColumn(); tipID.column.name = TIPID.getBytes(); tipID.column.value = id.getVersion().getBytes(); mutTipID.column_or_supercolumn = tipID; list.add(mutTipID); idtable_mutations.put(TIPTABLE,list); return sender.batch_mutate(map,ConsistencyLevel.ONE); } }; m_service.submit(task); return new CassandraNode(this,id); } public NodeID createID(String _uuid,String _version) { return new RandomNodeID(_uuid,_version); } class RandomNodeID extends AbstractRandomNodeID { String m_uuid; String m_version; public RandomNodeID(String _uuid,String _version) { m_uuid = (_uuid != null) ? _uuid : UUID.randomUUID().toString(); m_version = (_version != null) ? _version : Long.toHexString((new Random()).nextLong()); } @Override public NodeID create() { return new RandomNodeID(null,null); } @Override public NodeID update() { return new RandomNodeID(m_uuid,null); } @Override public String getUUID() { return m_uuid; } @Override public String getVersion() { return m_version; } } private static class RequestSender extends Thread { private int m_port; private String m_host,m_ks; private Cassandra.Client m_client; public RequestSender(Runnable _runnable,String _host,int _port,String _ks) throws TTransportException { super(_runnable); m_port = _port; m_host = _host; m_ks = _ks; connect(); } public void connect() throws TTransportException { TTransport tr = new TSocket(m_host,m_port); TProtocol proto = new TBinaryProtocol(tr); m_client = new Cassandra.Client(proto); tr.open(); } public static RequestSender newInstance(Runnable _runnable,String _host,int _port,String _ks) { RequestSender sender = null; try { sender = new RequestSender(_runnable,_host,_port,_ks); } catch (TTransportException _e) { _e.printStackTrace(); } return sender; } public byte[] get(String _cf,String _key,byte[] _name,ConsistencyLevel _lv) { byte[] ret = null; ColumnPath path = new ColumnPath(); path.column_family = _cf; path.column = _name; try { ColumnOrSuperColumn cors = m_client.get(m_ks,_key,path,_lv); ret = cors.column.value; }catch(NotFoundException _e){ System.out.println(String.format("column not found [%s][%s][%s]",_cf,_key,new String(_name))); }catch(Exception _e){ _e.printStackTrace(); } return ret; } public boolean insert(String _cf,String _key,byte[] _name,byte[] _value,ConsistencyLevel _lv) { ColumnPath path = new ColumnPath(); path.column_family = _cf; path.column = _name; try{ m_client.insert(m_ks,_key,path,_value,System.currentTimeMillis()/1000,_lv); return true; }catch(Exception _e){ _e.printStackTrace(); } return false; } public List<ColumnOrSuperColumn> get_slice(String _cf,String _key,ConsistencyLevel _lv) { List<ColumnOrSuperColumn> ret = null; SliceRange sr = new SliceRange(new byte[0],new byte[0],false,-1); SlicePredicate sp = new SlicePredicate(); sp.slice_range = sr; try { ret = m_client.get_slice(m_ks,_key,new ColumnParent(_cf),sp,_lv); }catch(Exception _e){ _e.printStackTrace(); } return ret; } public boolean batch_insert(String _cf,Map<String,List<ColumnOrSuperColumn>> _map,ConsistencyLevel _lv) { try{ m_client.batch_insert(m_ks,_cf,_map,_lv); return true; }catch(Exception _e){ _e.printStackTrace(); } return false; } public boolean batch_mutate(Map<String,Map<String,List<Mutation>>> _mutateMap,ConsistencyLevel _lv) { try { m_client.batch_mutate(m_ks,_mutateMap,_lv); return true; }catch(Exception _e){ _e.printStackTrace(); } return false; } public String toString() { return "[thread="+this.getName()+",host="+m_host+",port="+m_port+",ks="+m_ks+"]"; } } private class RequestSenderFactory implements ThreadFactory { private int m_port; private String m_host,m_ks; public RequestSenderFactory(String _host,int _port,String _ks) { m_host = _host; m_port = _port; m_ks = _ks; } @Override public Thread newThread(Runnable _runnable) { return RequestSender.newInstance(_runnable,m_host,m_port,m_ks); } } }