Mercurial > hg > RemoteEditor > REPSessionManager
annotate rep/ServerMainLoop.java @ 485:cc262a519b8a
add dead lock detection (don't forget remove )
author | one |
---|---|
date | Wed, 20 Oct 2010 22:59:19 +0900 |
parents | d295e84c5e03 |
children | 5945266c970d |
rev | line source |
---|---|
390 | 1 package rep; |
2 | |
3 import java.io.IOException; | |
4 import java.net.InetSocketAddress; | |
5 import java.net.SocketException; | |
6 import java.nio.channels.ClosedChannelException; | |
7 import java.nio.channels.SelectionKey; | |
8 import java.util.LinkedList; | |
9 import java.util.Set; | |
10 import java.util.concurrent.BlockingQueue; | |
11 import java.util.concurrent.LinkedBlockingQueue; | |
12 | |
13 import rep.channel.REPLogger; | |
14 import rep.channel.REPSelectionKey; | |
15 import rep.channel.REPSelector; | |
16 import rep.channel.REPServerSocketChannel; | |
17 import rep.channel.REPSocketChannel; | |
18 import rep.gui.SessionManagerEvent; | |
19 import rep.gui.SessionManagerGUI; | |
20 import rep.handler.FirstConnector; | |
21 import rep.handler.REPNode; | |
22 | |
411 | 23 /** |
24 * @author kono | |
25 * Single Threaded Server Main Loop | |
26 * maintain multiple connections | |
27 * gui interface is provided. | |
28 * Protocols are handled by our manager. | |
412 | 29 * We believe this is an protocol independent server. |
411 | 30 */ |
390 | 31 public class ServerMainLoop { |
32 | |
33 public static REPLogger logger = REPLogger.singleton(); | |
34 public SessionManager manager; | |
35 protected SessionManagerGUI gui; | |
36 protected REPSelector<REPCommand> selector; | |
37 private BlockingQueue<SessionManagerEvent> waitingEventQueue | |
38 = new LinkedBlockingQueue<SessionManagerEvent>(); | |
39 public String myHost; | |
40 private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); | |
41 protected int receive_port; | |
42 protected int parent_port; | |
43 protected static final int DEFAULT_PORT = 8766; | |
44 private SessionManagerEvent execAfterConnect = null; | |
430 | 45 private boolean running = true; |
390 | 46 |
47 | |
48 public void setReceivePort(int port) { | |
49 receive_port = port; | |
50 } | |
51 | |
52 void mainLoop(SessionManager sessionManager, int port, SessionManagerGUI gui) throws IOException, | |
53 SocketException, ClosedChannelException { | |
54 this.gui = gui; | |
55 manager = sessionManager; | |
56 receive_port = port; | |
57 serverInit(); | |
58 mainLoop(); | |
59 } | |
60 | |
61 public void mainLoop() throws IOException { | |
485 | 62 int deadlock = 0; |
430 | 63 while(running){ |
399 | 64 manager.checkWaitingCommandInMerge(); |
390 | 65 if (checkInputEvent() || |
66 checkWaitingWrite()) { | |
420 | 67 continue; |
390 | 68 // try to do fair execution for waiting task |
420 | 69 //if(selector.selectNow() > 0) select(); |
70 //continue; | |
390 | 71 } |
72 // now we can wait for input packet or event | |
485 | 73 if (deadlock++>1000) deadlockDetected(); |
462 | 74 selector.select(1); |
485 | 75 if (select()) deadlock=0; |
390 | 76 } |
77 } | |
78 | |
485 | 79 public void deadlockDetected() throws IOException { |
80 } | |
81 | |
390 | 82 void serverInit() throws IOException, SocketException, |
83 ClosedChannelException { | |
84 selector = REPSelector.<REPCommand>create(); | |
85 REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); | |
86 ssc.configureBlocking(false); // Selector requires this | |
87 ssc.socket().setReuseAddress(true); //reuse address 必須 | |
88 //getAllByNameで取れた全てのアドレスに対してbindする | |
89 try { | |
90 ssc.socket().bind(new InetSocketAddress("::",receive_port)); | |
91 } catch (SocketException e) { | |
391 | 92 // for some IPv6 implementation |
390 | 93 ssc.socket().bind(new InetSocketAddress(receive_port)); |
94 } | |
95 ssc.register(selector, SelectionKey.OP_ACCEPT,null); | |
96 } | |
97 | |
98 private boolean checkInputEvent() { | |
99 SessionManagerEvent e; | |
100 if((e = waitingEventQueue.poll())!=null){ | |
101 e.exec(manager); | |
102 return true; | |
103 } | |
104 return false; | |
105 } | |
430 | 106 |
107 public void serverStop() { | |
108 running = false; | |
109 selector.wakeup(); | |
110 } | |
390 | 111 |
411 | 112 /** |
113 * To avoid dead locks, we write a command one at a time | |
114 * during select(). | |
115 * @return | |
116 * @throws IOException | |
117 */ | |
390 | 118 private boolean checkWaitingWrite() throws IOException { |
119 PacketSet p = writeQueue.poll(); | |
120 if (p!=null) { | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
121 sendLog(p); |
390 | 122 p.channel.write(p.command); |
123 return true; | |
124 } | |
125 return false; | |
126 } | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
127 /** |
411 | 128 * Debug message |
129 * @param p | |
130 */ | |
401 | 131 private void sendLog(PacketSet p) { |
132 REPNode to; | |
133 String s; | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
134 to = p.channel; // manager.editorList.editorByChannel(p.channel.channel); |
401 | 135 if (to==null) |
136 s = p.channel.toString(); | |
137 else | |
138 s = to.toString(); | |
139 logger.writeLog("writing: "+p.command+" to: " + s); | |
140 } | |
141 | |
390 | 142 |
399 | 143 public void close(REPSocketChannel<REPCommand> channel) { |
390 | 144 REPSelectionKey<REPCommand>key = channel.keyFor1(selector); |
145 REPNode handler = (REPNode)key.attachment(); | |
146 key.cancel(); | |
147 handler.cancel(channel); | |
148 // we have to remove session/editor | |
149 } | |
150 | |
411 | 151 /** |
152 * Main Select routing | |
153 * check incoming connection request and incoming packet | |
154 * A request is handled by a handler object which is attached | |
155 * to the SelectionKey. | |
485 | 156 * @return |
411 | 157 * @throws IOException |
158 */ | |
485 | 159 private boolean select() throws IOException { |
160 boolean flag = false; | |
390 | 161 Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); |
162 for(REPSelectionKey<REPCommand> key : keys){ | |
163 if(key.isAcceptable()){ | |
164 /* | |
165 * Incoming connection. We don't know which, editor or | |
166 * session manager. Assign FirstConnector to distinguish. | |
167 */ | |
168 REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); | |
169 logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); | |
170 registerChannel(channel, new FirstConnector(manager,channel)); | |
485 | 171 flag = true; |
390 | 172 } else if(key.isReadable()){ |
173 /* | |
174 * Incoming packets are handled by a various forwarder. | |
175 * A handler throw IOException() in case of a trouble to | |
176 * close the channel. | |
177 */ | |
178 REPNode handler = (REPNode)key.attachment(); | |
179 try { | |
180 REPCommand command = key.channel1().read(); | |
181 handler.handle(command, key); | |
485 | 182 flag = true; |
390 | 183 } catch (IOException e) { |
184 key.cancel(); | |
185 handler.cancel(key.channel1()); | |
186 } | |
187 } | |
188 } | |
485 | 189 return flag; |
390 | 190 } |
191 | |
192 public void registerChannel(REPSocketChannel<REPCommand> channel, REPNode handler) throws IOException { | |
193 if(channel == null) { | |
194 return; | |
195 } | |
196 // handler.setChannel(channel); | |
197 channel.configureBlocking(false); | |
198 channel.register(selector, SelectionKey.OP_READ, handler); | |
199 } | |
200 | |
201 public void setMyHostName(String localHostName) { | |
202 myHost = localHostName + receive_port; | |
203 setHostToEditor(myHost); | |
204 } | |
205 | |
206 public String myHost() { | |
207 return myHost; | |
208 } | |
209 | |
210 private void setHostToEditor(String myHost2) { | |
211 for(REPNode editor : manager.editorList.values()){ | |
212 if (editor.channel!=null) | |
213 editor.setHost(myHost2); | |
214 } | |
215 } | |
216 | |
217 | |
218 public void buttonPressed(SessionManagerEvent event) { | |
219 try { | |
220 waitingEventQueue.put(event); | |
221 } catch (InterruptedException e) {} | |
222 selector.wakeup(); | |
223 } | |
224 | |
225 public void syncExec(SessionManagerEvent event) { | |
226 try { | |
227 waitingEventQueue.put(event); | |
228 } catch (InterruptedException e) { | |
229 } | |
230 } | |
231 | |
232 public void addWriteQueue(PacketSet packetSet) { | |
233 writeQueue.addLast(packetSet); | |
234 assert(writeQueue.size()<SessionManager.packetLimit) ; | |
235 } | |
236 | |
237 public void setParentPort(int port) { | |
238 parent_port = port; | |
239 } | |
240 | |
241 public int getParentPort() { | |
242 return parent_port; | |
243 } | |
244 | |
245 public int getPort() { | |
246 return receive_port; | |
247 } | |
248 | |
249 public void execAfterConnect(SessionManagerEvent sessionManagerEvent) { | |
250 execAfterConnect = sessionManagerEvent; | |
251 } | |
252 | |
253 public void afterConnect() { | |
254 SessionManagerEvent e = execAfterConnect; | |
255 execAfterConnect = null; | |
256 if (e!=null) e.exec(manager); | |
257 } | |
258 | |
259 void removeChannel(SessionManager sessionManager, REPNode channel) { | |
260 REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector); | |
261 key.cancel(); | |
262 try { | |
263 channel.channel.close1(); | |
264 } catch (IOException e) { | |
265 } | |
266 } | |
267 | |
268 public String toString() { | |
269 return ""+myHost+":"+receive_port; | |
270 } | |
271 | |
272 | |
273 public void setGUI(SessionManagerGUI gui) { | |
274 this.gui = gui; | |
275 } | |
276 | |
277 public void setManager(SessionManager sessionManager) { | |
278 manager = sessionManager; | |
279 } | |
280 | |
281 } |