comparison src/alice/datasegment/DataSegmentManager.java @ 14:e3f1b21718b0

implements RemoteDataSegment
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Sun, 15 Jan 2012 00:56:25 +0900
parents 30f97d776a3e
children 72dd27d952b0
comparison
equal deleted inserted replaced
13:30f97d776a3e 14:e3f1b21718b0
1 package alice.datasegment; 1 package alice.datasegment;
2 2
3 import java.util.concurrent.ConcurrentHashMap; 3 import java.util.concurrent.ConcurrentHashMap;
4 import java.util.concurrent.LinkedBlockingQueue; 4 import java.util.concurrent.LinkedBlockingQueue;
5 import java.util.concurrent.atomic.AtomicInteger;
5 6
6 import org.msgpack.type.Value; 7 import org.msgpack.type.Value;
7 8
8 import alice.codesegment.CodeSegment; 9 import alice.codesegment.CodeSegment;
9 10
10 public abstract class DataSegmentManager { 11 public abstract class DataSegmentManager {
12
11 protected ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); 13 protected ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
12 protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); 14 protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();
13 public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); 15 public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();
16 protected AtomicInteger seq = new AtomicInteger(1);
17 protected Runnable replyThread = new Runnable() {
18
19 @Override
20 public void run() {
21 while (true) {
22 try {
23 Command reply = replyQueue.take();
24 Command cmd = seqHash.get(reply.seq);
25 cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val));
26 } catch (InterruptedException e) {
27 e.printStackTrace();
28 }
29 }
30 }
31
32 };
14 33
15 public abstract void put(String key, Value val); 34 public abstract void put(String key, Value val);
16 public abstract void update(String key, Value val); 35 public abstract void update(String key, Value val);
17 public void take(String argKey, String key, CodeSegment cs) { 36 public void take(String argKey, String key, CodeSegment cs) {
18 take(argKey, key, 0, cs); 37 take(argKey, key, 0, cs);