Mercurial > hg > Members > shoshi > TreeCMSv2
diff src/treecms/tree/cassandra/v1/CassandraForest.java @ 7:fc19e38b669b
added concurrent access client for cassandr
author | shoshi |
---|---|
date | Thu, 17 Mar 2011 23:24:08 +0900 |
parents | 12604eb6b615 |
children | f96193babac0 |
line wrap: on
line diff
--- a/src/treecms/tree/cassandra/v1/CassandraForest.java Mon Mar 14 23:24:38 2011 +0900 +++ b/src/treecms/tree/cassandra/v1/CassandraForest.java Thu Mar 17 23:24:08 2011 +0900 @@ -10,6 +10,7 @@ import java.util.StringTokenizer; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -37,9 +38,9 @@ import treecms.tree.id.AbstractRandomNodeID; /** - * implementation of TreeCMS with Cassandra backend. + * Cassandra上で非破壊的木構造を実現するためのForestの実装です。 * - * TreeCMSKS.NodeTable (table of all nodes) + * TreeCMSKS.NODETABLE (table of all nodes) * * +---------------------------------------------+ * + Key | Col1 | Col2 | Col3 | ... | @@ -47,7 +48,7 @@ * + NodeID | Children | _attr1 | _attr2 | ... | * +---------------------------------------------+ * - * TreeCMSKS.TipTable (table of tip) + * TreeCMSKS.TIPTABLE (table of tip) * * +--------------------+ * + Key | Col1 | @@ -63,197 +64,47 @@ ExecutorService m_service; //column families. - static final String NODETABLE = "NodeTable"; - static final String TIPTABLE = "TipTable"; + static final String NODETABLE = "NODETABLE"; + static final String TIPTABLE = "TIPTABLE"; - //reserved word. - static final String NODE = "Node"; - static final String CHILDREN = "Children"; + //reserved column. + static final byte[] TIPID = "TIPID".getBytes(); + static final byte[] CHILDREN = "CHILDREN".getBytes(); static final char PREFIX = '_'; - //id table reserved - static final String TIPID = "TipID"; + //cache + private ConcurrentHashMap<NodeID,CassandraNode> m_cache; + private ConcurrentHashMap<String,CassandraNode> m_tipCache; public CassandraForest(String _host,int _port,String _ks,int _threads) { - m_service = Executors.newFixedThreadPool(_threads,new RequestSenderFactory(_host,_port,_ks)); + m_service = Executors.newFixedThreadPool(_threads,new ClientThreadFactory(_host,_port)); + m_cache = new ConcurrentHashMap<NodeID,CassandraNode>(); + m_tipCache = new ConcurrentHashMap<String,CassandraNode>(); } @Override public Node get(NodeID _id) { - return new CassandraNode(this,_id); } @Override public Node create() { - return createNode(null,null); //create new node + return createNode(null,null); } public NodeData getNodeData(NodeID _id) { final NodeID id = _id; - - //create future task. - Callable<List<ColumnOrSuperColumn>> task = new Callable<List<ColumnOrSuperColumn>>(){ - @Override - public List<ColumnOrSuperColumn> call() throws Exception - { - RequestSender sender = (RequestSender)Thread.currentThread(); - List<ColumnOrSuperColumn> res = sender.get_slice(NODETABLE,id.toString(),ConsistencyLevel.ONE); - return res; - } - }; - Future<List<ColumnOrSuperColumn>> future = m_service.submit(task); - - NodeData data = new NodeData(); - try{ - List<ColumnOrSuperColumn> slice = future.get(); - - //iterate column - for(ColumnOrSuperColumn column : slice){ - String name = new String(column.column.name); - - //if column name matches CHILDREN , deserialize value to child list. - if(name.equals(CHILDREN)){ - List<Node> tmp = deserialize(new String(column.column.value)); - data.add(tmp); - }else{ - String key = name.substring(1); //size of prefix - data.set(key.getBytes(),column.column.value); - } - } - }catch(Exception _e){ - _e.printStackTrace(); - } - - return data; - } - - public List<CassandraNode> multiCreateNode(List<CassandraNode> _list) - { - final Map<String,Map<String,List<Mutation>>> mutationMap = new HashMap<String,Map<String,List<Mutation>>>(); - - Map<String,List<Mutation>> nodeTable = new HashMap<String,List<Mutation>>(); - Map<String,List<Mutation>> tipTable = new HashMap<String,List<Mutation>>(); - for(CassandraNode node : _list){ - LinkedList<Mutation> list = new LinkedList<Mutation>(); - Mutation mut = new Mutation(); - ColumnOrSuperColumn column = new ColumnOrSuperColumn(); - mut.column_or_supercolumn = column; - column.column.name = CHILDREN.getBytes(); - column.column.value = serialize(node.getData().list()).getBytes(); - list.add(mut); - - for(byte[] key : node.getData().keys()){ - mut = new Mutation(); - column = new ColumnOrSuperColumn(); - mut.column_or_supercolumn = column; - column.column.name = key; - column.column.value = node.getData().get(key); - - list.add(mut); - } - - nodeTable.put(node.getID().toString(),list); - - mut = new Mutation(); - column = new ColumnOrSuperColumn(); - column.column.name = TIPID.getBytes(); - column.column.value = node.getID().getVersion().getBytes(); - list = new LinkedList<Mutation>(); - list.add(mut); - tipTable.put(node.getID().getUUID(),list); - } - mutationMap.put(NODETABLE,nodeTable); - mutationMap.put(TIPTABLE,tipTable); - - Runnable task = new Runnable(){ - @Override - public void run() - { - RequestSender sender = (RequestSender)Thread.currentThread(); - sender.batch_mutate(mutationMap,ConsistencyLevel.ONE); - } - }; - - m_service.execute(task); - - return _list; - } - - /** - * list serializer. - * ex. list{"hoge","fuga"} -> "hoge,fuga" - * @param _list - * @return selialized string - */ - public String serialize(List<Node> _list) - { - String prefix = ""; - StringBuffer buf = new StringBuffer(); - for(Node child : _list){ - buf.append(prefix+child.getID().toString()); - prefix = ","; - } - - return buf.toString(); - } - - /** - * string deserializer. - * ex. "hoge,fuga" -> list{"hoge","fuga"} - * @param _selialized - * @return list - */ - public LinkedList<Node> deserialize(String _serialized) throws IllegalArgumentException - { - StringTokenizer tokens = new StringTokenizer(_serialized,","); - LinkedList<Node> res = new LinkedList<Node>(); - - while(tokens.hasMoreElements()){ - String tmp = tokens.nextToken(); - StringTokenizer uuidAndVer = new StringTokenizer(tmp,"@"); - - try{ - NodeID id = createID(uuidAndVer.nextToken(),uuidAndVer.nextToken()); - res.add(get(id)); - }catch(Exception _e){ - throw new IllegalArgumentException("unable to deserialize string ["+_serialized+"]",_e); - } - } - - return res; + return null; } public NodeID getTipID(String _uuid) { final String uuid = _uuid; - Callable<byte[]> task = new Callable<byte[]>(){ - @Override - public byte[] call() throws Exception - { - RequestSender sender = (RequestSender)Thread.currentThread(); - byte[] value = sender.get(NODETABLE,uuid,TIPID.getBytes(),ConsistencyLevel.ONE); - return value; - } - }; - - Future<byte[]> future = m_service.submit(task); - - try { - byte[] value = future.get(); - String id = new String(value); - StringTokenizer token = new StringTokenizer(id,"@"); - NodeID nodeID = createID(token.nextToken(),token.nextToken()); - return nodeID; - }catch(Exception _e){ - _e.printStackTrace(); - } - - return null; //not found. + return null; } public Node createNode(NodeID _id,NodeData _data) @@ -265,65 +116,7 @@ @Override public Boolean call() throws Exception { - RequestSender sender = (RequestSender)Thread.currentThread(); - - //mutation map - HashMap<String,Map<String,List<Mutation>>> map = new HashMap<String,Map<String,List<Mutation>>>(); - - /* - * create mutation map for NODETABLE - */ - if(data != null){ - LinkedList<Mutation> list = new LinkedList<Mutation>(); - HashMap<String,List<Mutation>> info = new HashMap<String,List<Mutation>>(); - Iterator<Node> itr = data.list().iterator(); - - /* - * create CSV from child list. - */ - StringBuffer buffer = new StringBuffer(); - for(String prefix = "";itr.hasNext();prefix = ","){ - buffer.append(String.format("%s%s",prefix,itr.next().getID().toString())); - } - Mutation mutChildren = new Mutation(); - ColumnOrSuperColumn children = new ColumnOrSuperColumn(); - children.column.name = CHILDREN.getBytes(); - children.column.value = buffer.toString().getBytes(); - mutChildren.column_or_supercolumn = children; - list.add(mutChildren); - - /* - * - */ - for(byte[] key : data.keys()){ - Mutation mut = new Mutation(); - ColumnOrSuperColumn column = new ColumnOrSuperColumn(); - column.column.name = key; - column.column.value = data.get(key); - mut.column_or_supercolumn = column; - list.add(mut); - } - info.put(id.toString(),list); - - map.put(NODETABLE,info); - } - - /* - * create mutation map for NODEIDTABLE - */ - HashMap<String,List<Mutation>> idtable_mutations = new HashMap<String,List<Mutation>>(); - LinkedList<Mutation> list = new LinkedList<Mutation>(); - - Mutation mutTipID = new Mutation(); - ColumnOrSuperColumn tipID = new ColumnOrSuperColumn(); - tipID.column.name = TIPID.getBytes(); - tipID.column.value = id.getVersion().getBytes(); - mutTipID.column_or_supercolumn = tipID; - - list.add(mutTipID); - idtable_mutations.put(TIPTABLE,list); - - return sender.batch_mutate(map,ConsistencyLevel.ONE); + return true; } }; @@ -372,141 +165,16 @@ return m_version; } } - - private static class RequestSender extends Thread + + @Override + public Node create(NodeData data) { - private int m_port; - private String m_host,m_ks; - private Cassandra.Client m_client; - - public RequestSender(Runnable _runnable,String _host,int _port,String _ks) throws TTransportException - { - super(_runnable); - m_port = _port; - m_host = _host; - m_ks = _ks; - - connect(); - } - - public void connect() throws TTransportException - { - TTransport tr = new TSocket(m_host,m_port); - TProtocol proto = new TBinaryProtocol(tr); - m_client = new Cassandra.Client(proto); - - tr.open(); - } - - public static RequestSender newInstance(Runnable _runnable,String _host,int _port,String _ks) - { - RequestSender sender = null; - try { - sender = new RequestSender(_runnable,_host,_port,_ks); - } catch (TTransportException _e) { - _e.printStackTrace(); - } - - return sender; - } - - public byte[] get(String _cf,String _key,byte[] _name,ConsistencyLevel _lv) - { - byte[] ret = null; - - ColumnPath path = new ColumnPath(); - path.column_family = _cf; - path.column = _name; - try { - ColumnOrSuperColumn cors = m_client.get(m_ks,_key,path,_lv); - ret = cors.column.value; - }catch(NotFoundException _e){ - System.out.println(String.format("column not found [%s][%s][%s]",_cf,_key,new String(_name))); - }catch(Exception _e){ - _e.printStackTrace(); - } - - return ret; - } - - public boolean insert(String _cf,String _key,byte[] _name,byte[] _value,ConsistencyLevel _lv) - { - ColumnPath path = new ColumnPath(); - path.column_family = _cf; - path.column = _name; - - try{ - m_client.insert(m_ks,_key,path,_value,System.currentTimeMillis()/1000,_lv); - return true; - }catch(Exception _e){ - _e.printStackTrace(); - } - - return false; - } - - public List<ColumnOrSuperColumn> get_slice(String _cf,String _key,ConsistencyLevel _lv) - { - List<ColumnOrSuperColumn> ret = null; - SliceRange sr = new SliceRange(new byte[0],new byte[0],false,-1); - SlicePredicate sp = new SlicePredicate(); - sp.slice_range = sr; - - try { - ret = m_client.get_slice(m_ks,_key,new ColumnParent(_cf),sp,_lv); - }catch(Exception _e){ - _e.printStackTrace(); - } - - return ret; - } - - public boolean batch_insert(String _cf,Map<String,List<ColumnOrSuperColumn>> _map,ConsistencyLevel _lv) - { - try{ - m_client.batch_insert(m_ks,_cf,_map,_lv); - return true; - }catch(Exception _e){ - _e.printStackTrace(); - } - - return false; - } - - public boolean batch_mutate(Map<String,Map<String,List<Mutation>>> _mutateMap,ConsistencyLevel _lv) - { - try { - m_client.batch_mutate(m_ks,_mutateMap,_lv); - return true; - }catch(Exception _e){ - _e.printStackTrace(); - } - - return false; - } - - public String toString() - { - return "[thread="+this.getName()+",host="+m_host+",port="+m_port+",ks="+m_ks+"]"; - } + return null; } - - private class RequestSenderFactory implements ThreadFactory + + @Override + public Node getTip(String uuid) { - private int m_port; - private String m_host,m_ks; - - public RequestSenderFactory(String _host,int _port,String _ks) - { - m_host = _host; - m_port = _port; - m_ks = _ks; - } - - @Override - public Thread newThread(Runnable _runnable) - { - return RequestSender.newInstance(_runnable,m_host,m_port,m_ks); - } + return null; } }