Mercurial > hg > RemoteEditor > REPSessionManager
annotate rep/ServerMainLoop.java @ 491:5945266c970d
before unMergedCmds fix , deadlockTimer API
author | one |
---|---|
date | Sat, 23 Oct 2010 12:34:46 +0900 |
parents | cc262a519b8a |
children |
rev | line source |
---|---|
390 | 1 package rep; |
2 | |
3 import java.io.IOException; | |
4 import java.net.InetSocketAddress; | |
5 import java.net.SocketException; | |
6 import java.nio.channels.ClosedChannelException; | |
7 import java.nio.channels.SelectionKey; | |
8 import java.util.LinkedList; | |
9 import java.util.Set; | |
10 import java.util.concurrent.BlockingQueue; | |
11 import java.util.concurrent.LinkedBlockingQueue; | |
12 | |
13 import rep.channel.REPLogger; | |
14 import rep.channel.REPSelectionKey; | |
15 import rep.channel.REPSelector; | |
16 import rep.channel.REPServerSocketChannel; | |
17 import rep.channel.REPSocketChannel; | |
18 import rep.gui.SessionManagerEvent; | |
19 import rep.gui.SessionManagerGUI; | |
20 import rep.handler.FirstConnector; | |
21 import rep.handler.REPNode; | |
22 | |
411 | 23 /** |
24 * @author kono | |
25 * Single Threaded Server Main Loop | |
26 * maintain multiple connections | |
27 * gui interface is provided. | |
28 * Protocols are handled by our manager. | |
412 | 29 * We believe this is a protocol-independent server. |
411 | 30 */ |
390 | 31 public class ServerMainLoop { |
32 | |
33 public static REPLogger logger = REPLogger.singleton(); | |
34 public SessionManager manager; | |
35 protected SessionManagerGUI gui; | |
36 protected REPSelector<REPCommand> selector; | |
37 private BlockingQueue<SessionManagerEvent> waitingEventQueue | |
38 = new LinkedBlockingQueue<SessionManagerEvent>(); | |
39 public String myHost; | |
40 private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); | |
41 protected int receive_port; | |
42 protected int parent_port; | |
43 protected static final int DEFAULT_PORT = 8766; | |
44 private SessionManagerEvent execAfterConnect = null; | |
430 | 45 private boolean running = true; |
491 | 46 // dead lock detection in msec, -1 means no deadlock detection |
47 public int deadlockTime = -1; | |
390 | 48 |
49 | |
50 public void setReceivePort(int port) { | |
51 receive_port = port; | |
52 } | |
53 | |
54 void mainLoop(SessionManager sessionManager, int port, SessionManagerGUI gui) throws IOException, | |
55 SocketException, ClosedChannelException { | |
56 this.gui = gui; | |
57 manager = sessionManager; | |
58 receive_port = port; | |
59 serverInit(); | |
60 mainLoop(); | |
61 } | |
62 | |
63 public void mainLoop() throws IOException { | |
491 | 64 int deadlock = deadlockTime; |
430 | 65 while(running){ |
399 | 66 manager.checkWaitingCommandInMerge(); |
390 | 67 if (checkInputEvent() || |
68 checkWaitingWrite()) { | |
420 | 69 continue; |
390 | 70 // try to do fair execution for waiting task |
420 | 71 //if(selector.selectNow() > 0) select(); |
72 //continue; | |
390 | 73 } |
74 // now we can wait for input packet or event | |
491 | 75 if (deadlock-- ==0) deadlockDetected(); |
462 | 76 selector.select(1); |
491 | 77 if (select()) deadlock=deadlockTime; |
390 | 78 } |
79 } | |
80 | |
485 | 81 public void deadlockDetected() throws IOException { |
82 } | |
83 | |
390 | 84 void serverInit() throws IOException, SocketException, |
85 ClosedChannelException { | |
86 selector = REPSelector.<REPCommand>create(); | |
87 REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); | |
88 ssc.configureBlocking(false); // Selector requires this | |
89 ssc.socket().setReuseAddress(true); //reuse address 必須 | |
90 //getAllByNameで取れた全てのアドレスに対してbindする | |
91 try { | |
92 ssc.socket().bind(new InetSocketAddress("::",receive_port)); | |
93 } catch (SocketException e) { | |
391 | 94 // for some IPv6 implementation |
390 | 95 ssc.socket().bind(new InetSocketAddress(receive_port)); |
96 } | |
97 ssc.register(selector, SelectionKey.OP_ACCEPT,null); | |
98 } | |
99 | |
100 private boolean checkInputEvent() { | |
101 SessionManagerEvent e; | |
102 if((e = waitingEventQueue.poll())!=null){ | |
103 e.exec(manager); | |
104 return true; | |
105 } | |
106 return false; | |
107 } | |
430 | 108 |
109 public void serverStop() { | |
110 running = false; | |
111 selector.wakeup(); | |
112 } | |
390 | 113 |
411 | 114 /** |
115 * To avoid dead locks, we write a command one at a time | |
116 * during select(). | |
117 * @return | |
118 * @throws IOException | |
119 */ | |
390 | 120 private boolean checkWaitingWrite() throws IOException { |
121 PacketSet p = writeQueue.poll(); | |
122 if (p!=null) { | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
123 sendLog(p); |
390 | 124 p.channel.write(p.command); |
125 return true; | |
126 } | |
127 return false; | |
128 } | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
129 /** |
411 | 130 * Debug message |
131 * @param p | |
132 */ | |
401 | 133 private void sendLog(PacketSet p) { |
134 REPNode to; | |
135 String s; | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
136 to = p.channel; // manager.editorList.editorByChannel(p.channel.channel); |
401 | 137 if (to==null) |
138 s = p.channel.toString(); | |
139 else | |
140 s = to.toString(); | |
141 logger.writeLog("writing: "+p.command+" to: " + s); | |
142 } | |
143 | |
390 | 144 |
399 | 145 public void close(REPSocketChannel<REPCommand> channel) { |
390 | 146 REPSelectionKey<REPCommand>key = channel.keyFor1(selector); |
147 REPNode handler = (REPNode)key.attachment(); | |
148 key.cancel(); | |
149 handler.cancel(channel); | |
150 // we have to remove session/editor | |
151 } | |
152 | |
411 | 153 /** |
154 * Main Select routing | |
155 * check incoming connection request and incoming packet | |
156 * A request is handled by a handler object which is attached | |
157 * to the SelectionKey. | |
485 | 158 * @return |
411 | 159 * @throws IOException |
160 */ | |
485 | 161 private boolean select() throws IOException { |
162 boolean flag = false; | |
390 | 163 Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); |
164 for(REPSelectionKey<REPCommand> key : keys){ | |
165 if(key.isAcceptable()){ | |
166 /* | |
167 * Incoming connection. We don't know which, editor or | |
168 * session manager. Assign FirstConnector to distinguish. | |
169 */ | |
170 REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); | |
171 logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); | |
172 registerChannel(channel, new FirstConnector(manager,channel)); | |
485 | 173 flag = true; |
390 | 174 } else if(key.isReadable()){ |
175 /* | |
176 * Incoming packets are handled by a various forwarder. | |
177 * A handler throw IOException() in case of a trouble to | |
178 * close the channel. | |
179 */ | |
180 REPNode handler = (REPNode)key.attachment(); | |
181 try { | |
182 REPCommand command = key.channel1().read(); | |
183 handler.handle(command, key); | |
485 | 184 flag = true; |
390 | 185 } catch (IOException e) { |
186 key.cancel(); | |
187 handler.cancel(key.channel1()); | |
188 } | |
189 } | |
190 } | |
485 | 191 return flag; |
390 | 192 } |
193 | |
194 public void registerChannel(REPSocketChannel<REPCommand> channel, REPNode handler) throws IOException { | |
195 if(channel == null) { | |
196 return; | |
197 } | |
198 // handler.setChannel(channel); | |
199 channel.configureBlocking(false); | |
200 channel.register(selector, SelectionKey.OP_READ, handler); | |
201 } | |
202 | |
203 public void setMyHostName(String localHostName) { | |
204 myHost = localHostName + receive_port; | |
205 setHostToEditor(myHost); | |
206 } | |
207 | |
208 public String myHost() { | |
209 return myHost; | |
210 } | |
211 | |
212 private void setHostToEditor(String myHost2) { | |
213 for(REPNode editor : manager.editorList.values()){ | |
214 if (editor.channel!=null) | |
215 editor.setHost(myHost2); | |
216 } | |
217 } | |
218 | |
219 | |
220 public void buttonPressed(SessionManagerEvent event) { | |
221 try { | |
222 waitingEventQueue.put(event); | |
223 } catch (InterruptedException e) {} | |
224 selector.wakeup(); | |
225 } | |
226 | |
227 public void syncExec(SessionManagerEvent event) { | |
228 try { | |
229 waitingEventQueue.put(event); | |
230 } catch (InterruptedException e) { | |
231 } | |
232 } | |
233 | |
234 public void addWriteQueue(PacketSet packetSet) { | |
235 writeQueue.addLast(packetSet); | |
236 assert(writeQueue.size()<SessionManager.packetLimit) ; | |
237 } | |
238 | |
239 public void setParentPort(int port) { | |
240 parent_port = port; | |
241 } | |
242 | |
243 public int getParentPort() { | |
244 return parent_port; | |
245 } | |
246 | |
247 public int getPort() { | |
248 return receive_port; | |
249 } | |
250 | |
251 public void execAfterConnect(SessionManagerEvent sessionManagerEvent) { | |
252 execAfterConnect = sessionManagerEvent; | |
253 } | |
254 | |
255 public void afterConnect() { | |
256 SessionManagerEvent e = execAfterConnect; | |
257 execAfterConnect = null; | |
258 if (e!=null) e.exec(manager); | |
259 } | |
260 | |
261 void removeChannel(SessionManager sessionManager, REPNode channel) { | |
262 REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector); | |
263 key.cancel(); | |
264 try { | |
265 channel.channel.close1(); | |
266 } catch (IOException e) { | |
267 } | |
268 } | |
269 | |
270 public String toString() { | |
271 return ""+myHost+":"+receive_port; | |
272 } | |
273 | |
274 | |
275 public void setGUI(SessionManagerGUI gui) { | |
276 this.gui = gui; | |
277 } | |
278 | |
279 public void setManager(SessionManager sessionManager) { | |
280 manager = sessionManager; | |
281 } | |
491 | 282 |
283 public void setDeadlockTime(int t) { | |
284 deadlockTime = t; | |
285 } | |
390 | 286 |
287 } |