345
|
1 package alice.datasegment;
|
|
2
|
|
3 import java.util.concurrent.ConcurrentHashMap;
|
|
4 import java.util.concurrent.LinkedBlockingQueue;
|
|
5 import java.util.concurrent.atomic.AtomicInteger;
|
|
6
|
|
7 import org.apache.log4j.Logger;
|
|
8
|
|
9 import alice.codesegment.CodeSegment;
|
|
10
|
|
11 public abstract class DataSegmentManager {
|
|
12
|
|
13 protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();
|
|
14 protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();
|
|
15 protected AtomicInteger seq = new AtomicInteger(1); // waiting for PUT or UPDATE at unique sequence number
|
|
16 // but it doesn't need for Local
|
|
17
|
|
18 protected Runnable replyThread = new Runnable() {
|
|
19 Logger logger = Logger.getLogger("reply");
|
|
20 @Override
|
|
21 public void run() {
|
|
22 while (true) {
|
|
23 try {
|
|
24 Command reply = replyQueue.take();
|
|
25 Command cmd = getAndRemoveCmd(reply.seq);
|
|
26 if (cmd == null) {
|
|
27 logger.warn("conflict sequence number");
|
|
28 continue;
|
|
29 }
|
|
30 cmd.cs.ids.reply(cmd.receiver, reply);
|
|
31 if (logger.isDebugEnabled())
|
|
32 logger.debug(reply.getCommandString() + " " + cmd.getCommandString());
|
|
33 } catch (InterruptedException e) {
|
|
34 e.printStackTrace();
|
|
35 }
|
|
36 }
|
|
37 }
|
|
38
|
|
39 };
|
|
40
|
|
41 public Command getAndRemoveCmd(int index){
|
|
42 return seqHash.remove(index);
|
|
43 }
|
|
44
|
|
45 public void addReplyCommand(Command cmd) {
|
|
46 try {
|
|
47 replyQueue.put(cmd);
|
|
48 } catch (InterruptedException e) {
|
|
49 e.printStackTrace();
|
|
50 }
|
|
51 }
|
|
52
|
|
53 public abstract void put(String key, Object val);
|
|
54 public abstract void update(String key, Object val);
|
|
55 public abstract void take(Receiver receiver, CodeSegment cs);
|
|
56 public abstract void peek(Receiver receiver, CodeSegment cs);
|
|
57
|
|
58 public abstract void quickPut(String key, Object val);
|
|
59 public abstract void quickUpdate(String key, Object val);
|
|
60 public abstract void quickPeek(Receiver receiver, CodeSegment cs);
|
|
61 public abstract void quickTake(Receiver receiver, CodeSegment cs);
|
|
62
|
|
63 public abstract void remove(String key);
|
|
64 public abstract void shutdown(String key);
|
|
65 public abstract void close();
|
|
66 public abstract void finish();
|
|
67
|
|
68 public abstract void ping(String returnKey);
|
|
69 public abstract void response(String returnKey);
|
|
70
|
|
71 }
|