Mercurial > hg > Database > Alice
comparison src/main/java/alice/datasegment/LocalDataSegmentManager.java @ 345:8f71c3e6f11d
Change directory structure Maven standard
author | sugi |
---|---|
date | Wed, 16 Apr 2014 18:26:07 +0900 |
parents | |
children | 388e7d4b0624 |
comparison
equal
deleted
inserted
replaced
344:9f97ec18f8c5 | 345:8f71c3e6f11d |
---|---|
1 package alice.datasegment; | |
2 | |
3 import java.util.concurrent.ConcurrentHashMap; | |
4 import java.util.concurrent.LinkedBlockingQueue; | |
5 import java.util.concurrent.ThreadPoolExecutor; | |
6 import java.util.concurrent.TimeUnit; | |
7 | |
8 import org.apache.log4j.Logger; | |
9 | |
10 import alice.codesegment.CodeSegment; | |
11 | |
12 public class LocalDataSegmentManager extends DataSegmentManager { | |
13 | |
14 private String reverseKey = "local"; | |
15 private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); | |
16 private Logger logger = Logger.getLogger("local"); | |
17 | |
18 private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads | |
19 Runtime.getRuntime().availableProcessors(), | |
20 Integer.MAX_VALUE, // keepAliveTime | |
21 TimeUnit.SECONDS, | |
22 new LinkedBlockingQueue<Runnable>()); | |
23 | |
24 public LocalDataSegmentManager() { | |
25 new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); | |
26 } | |
27 | |
28 private class RunCommand implements Runnable { | |
29 | |
30 DataSegmentKey key; | |
31 Command cmd; | |
32 | |
33 public RunCommand(DataSegmentKey key, Command cmd) { | |
34 this.key = key; | |
35 this.cmd = cmd; | |
36 } | |
37 | |
38 @Override | |
39 public void run() { | |
40 key.runCommand(cmd); | |
41 } | |
42 | |
43 } | |
44 | |
45 public void submitCommand(DataSegmentKey key, Command cmd) { | |
46 dataSegmentExecutor.execute(new RunCommand(key, cmd)); | |
47 } | |
48 | |
49 public DataSegmentKey getDataSegmentKey(String key) { | |
50 DataSegmentKey dsKey = dataSegments.get(key); | |
51 if (dsKey != null) | |
52 return dsKey; | |
53 if (key == null) | |
54 return null; | |
55 DataSegmentKey newDataSegmentKey = new DataSegmentKey(); | |
56 DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); | |
57 if (dataSegmentKey == null) { | |
58 dataSegmentKey = newDataSegmentKey; | |
59 } | |
60 return dataSegmentKey; | |
61 } | |
62 | |
63 @Override | |
64 public void put(String key, Object val) { | |
65 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); | |
66 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey); | |
67 dataSegmentKey.runCommand(cmd); | |
68 if (logger.isDebugEnabled()) | |
69 logger.debug(cmd.getCommandString()); | |
70 } | |
71 | |
72 @Override | |
73 public void quickPut(String key, Object val) { | |
74 put(key, val); | |
75 } | |
76 | |
77 /** | |
78 * Enqueue update command to the queue of each DataSegment key | |
79 */ | |
80 | |
81 @Override | |
82 public void update(String key, Object val) { | |
83 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); | |
84 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey); | |
85 dataSegmentKey.runCommand(cmd); | |
86 if (logger.isDebugEnabled()) | |
87 logger.debug(cmd.getCommandString()); | |
88 } | |
89 | |
90 | |
91 @Override | |
92 public void quickUpdate(String key, Object val) { | |
93 update(key, val); | |
94 } | |
95 | |
96 | |
97 | |
98 @Override | |
99 public void take(Receiver receiver, CodeSegment cs) { | |
100 DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); | |
101 int seq = this.seq.getAndIncrement(); | |
102 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); | |
103 dataSegmentKey.runCommand(cmd); | |
104 if (logger.isDebugEnabled()) | |
105 logger.debug(cmd.getCommandString()); | |
106 } | |
107 | |
108 @Override | |
109 public void quickTake(Receiver receiver, CodeSegment cs) { | |
110 take(receiver, cs); | |
111 } | |
112 | |
113 @Override | |
114 public void peek(Receiver receiver, CodeSegment cs) { | |
115 DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); | |
116 int seq = this.seq.getAndIncrement(); | |
117 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); | |
118 dataSegmentKey.runCommand(cmd); | |
119 if (logger.isDebugEnabled()) | |
120 logger.debug(cmd.getCommandString()); | |
121 } | |
122 | |
123 @Override | |
124 public void quickPeek(Receiver receiver, CodeSegment cs) { | |
125 peek(receiver, cs); | |
126 } | |
127 | |
128 | |
129 @Override | |
130 public void remove(String key) { | |
131 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); | |
132 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); | |
133 dataSegmentKey.runCommand(cmd); | |
134 if (logger.isDebugEnabled()) | |
135 logger.debug(cmd.getCommandString()); | |
136 } | |
137 | |
138 @Override public void finish() { | |
139 System.exit(0); | |
140 } | |
141 | |
142 @Override | |
143 public void close() { | |
144 | |
145 } | |
146 | |
147 public void recommand(Receiver receiver, CodeSegment cs) { | |
148 DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); | |
149 int seq = this.seq.getAndIncrement(); | |
150 Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); | |
151 dataSegmentKey.runCommand(cmd); | |
152 if (logger.isDebugEnabled()) | |
153 logger.debug(cmd.getCommandString()); | |
154 | |
155 } | |
156 | |
157 @Override | |
158 public void ping(String returnKey) { | |
159 | |
160 } | |
161 | |
162 @Override | |
163 public void response(String returnKey) { | |
164 | |
165 } | |
166 | |
167 @Override | |
168 public void shutdown(String key) { | |
169 | |
170 } | |
171 | |
172 } |