Mercurial > hg > Database > Alice
comparison src/alice/datasegment/DataSegmentManager.java @ 14:e3f1b21718b0
implements RemoteDataSegment
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Sun, 15 Jan 2012 00:56:25 +0900 |
parents | 30f97d776a3e |
children | 72dd27d952b0 |
comparison
equal
deleted
inserted
replaced
13:30f97d776a3e | 14:e3f1b21718b0 |
---|---|
1 package alice.datasegment; | 1 package alice.datasegment; |
2 | 2 |
3 import java.util.concurrent.ConcurrentHashMap; | 3 import java.util.concurrent.ConcurrentHashMap; |
4 import java.util.concurrent.LinkedBlockingQueue; | 4 import java.util.concurrent.LinkedBlockingQueue; |
5 import java.util.concurrent.atomic.AtomicInteger; | |
5 | 6 |
6 import org.msgpack.type.Value; | 7 import org.msgpack.type.Value; |
7 | 8 |
8 import alice.codesegment.CodeSegment; | 9 import alice.codesegment.CodeSegment; |
9 | 10 |
10 public abstract class DataSegmentManager { | 11 public abstract class DataSegmentManager { |
12 | |
11 protected ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); | 13 protected ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); |
12 protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); | 14 protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); |
13 public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); | 15 public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); |
16 protected AtomicInteger seq = new AtomicInteger(1); | |
17 protected Runnable replyThread = new Runnable() { | |
18 | |
19 @Override | |
20 public void run() { | |
21 while (true) { | |
22 try { | |
23 Command reply = replyQueue.take(); | |
24 Command cmd = seqHash.get(reply.seq); | |
25 cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val)); | |
26 } catch (InterruptedException e) { | |
27 e.printStackTrace(); | |
28 } | |
29 } | |
30 } | |
31 | |
32 }; | |
14 | 33 |
15 public abstract void put(String key, Value val); | 34 public abstract void put(String key, Value val); |
16 public abstract void update(String key, Value val); | 35 public abstract void update(String key, Value val); |
17 public void take(String argKey, String key, CodeSegment cs) { | 36 public void take(String argKey, String key, CodeSegment cs) { |
18 take(argKey, key, 0, cs); | 37 take(argKey, key, 0, cs); |