comparison src/main/java/alice/datasegment/LocalDataSegmentManager.java @ 345:8f71c3e6f11d

Change directory structure Maven standard
author sugi
date Wed, 16 Apr 2014 18:26:07 +0900
parents
children 388e7d4b0624
comparison
equal deleted inserted replaced
344:9f97ec18f8c5 345:8f71c3e6f11d
1 package alice.datasegment;
2
3 import java.util.concurrent.ConcurrentHashMap;
4 import java.util.concurrent.LinkedBlockingQueue;
5 import java.util.concurrent.ThreadPoolExecutor;
6 import java.util.concurrent.TimeUnit;
7
8 import org.apache.log4j.Logger;
9
10 import alice.codesegment.CodeSegment;
11
12 public class LocalDataSegmentManager extends DataSegmentManager {
13
14 private String reverseKey = "local";
15 private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
16 private Logger logger = Logger.getLogger("local");
17
18 private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads
19 Runtime.getRuntime().availableProcessors(),
20 Integer.MAX_VALUE, // keepAliveTime
21 TimeUnit.SECONDS,
22 new LinkedBlockingQueue<Runnable>());
23
24 public LocalDataSegmentManager() {
25 new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
26 }
27
28 private class RunCommand implements Runnable {
29
30 DataSegmentKey key;
31 Command cmd;
32
33 public RunCommand(DataSegmentKey key, Command cmd) {
34 this.key = key;
35 this.cmd = cmd;
36 }
37
38 @Override
39 public void run() {
40 key.runCommand(cmd);
41 }
42
43 }
44
45 public void submitCommand(DataSegmentKey key, Command cmd) {
46 dataSegmentExecutor.execute(new RunCommand(key, cmd));
47 }
48
49 public DataSegmentKey getDataSegmentKey(String key) {
50 DataSegmentKey dsKey = dataSegments.get(key);
51 if (dsKey != null)
52 return dsKey;
53 if (key == null)
54 return null;
55 DataSegmentKey newDataSegmentKey = new DataSegmentKey();
56 DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
57 if (dataSegmentKey == null) {
58 dataSegmentKey = newDataSegmentKey;
59 }
60 return dataSegmentKey;
61 }
62
63 @Override
64 public void put(String key, Object val) {
65 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
66 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey);
67 dataSegmentKey.runCommand(cmd);
68 if (logger.isDebugEnabled())
69 logger.debug(cmd.getCommandString());
70 }
71
72 @Override
73 public void quickPut(String key, Object val) {
74 put(key, val);
75 }
76
77 /**
78 * Enqueue update command to the queue of each DataSegment key
79 */
80
81 @Override
82 public void update(String key, Object val) {
83 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
84 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey);
85 dataSegmentKey.runCommand(cmd);
86 if (logger.isDebugEnabled())
87 logger.debug(cmd.getCommandString());
88 }
89
90
91 @Override
92 public void quickUpdate(String key, Object val) {
93 update(key, val);
94 }
95
96
97
98 @Override
99 public void take(Receiver receiver, CodeSegment cs) {
100 DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
101 int seq = this.seq.getAndIncrement();
102 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
103 dataSegmentKey.runCommand(cmd);
104 if (logger.isDebugEnabled())
105 logger.debug(cmd.getCommandString());
106 }
107
108 @Override
109 public void quickTake(Receiver receiver, CodeSegment cs) {
110 take(receiver, cs);
111 }
112
113 @Override
114 public void peek(Receiver receiver, CodeSegment cs) {
115 DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
116 int seq = this.seq.getAndIncrement();
117 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
118 dataSegmentKey.runCommand(cmd);
119 if (logger.isDebugEnabled())
120 logger.debug(cmd.getCommandString());
121 }
122
123 @Override
124 public void quickPeek(Receiver receiver, CodeSegment cs) {
125 peek(receiver, cs);
126 }
127
128
129 @Override
130 public void remove(String key) {
131 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
132 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
133 dataSegmentKey.runCommand(cmd);
134 if (logger.isDebugEnabled())
135 logger.debug(cmd.getCommandString());
136 }
137
138 @Override public void finish() {
139 System.exit(0);
140 }
141
142 @Override
143 public void close() {
144
145 }
146
147 public void recommand(Receiver receiver, CodeSegment cs) {
148 DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
149 int seq = this.seq.getAndIncrement();
150 Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
151 dataSegmentKey.runCommand(cmd);
152 if (logger.isDebugEnabled())
153 logger.debug(cmd.getCommandString());
154
155 }
156
157 @Override
158 public void ping(String returnKey) {
159
160 }
161
162 @Override
163 public void response(String returnKey) {
164
165 }
166
167 @Override
168 public void shutdown(String key) {
169
170 }
171
172 }