Mercurial > hg > RemoteEditor > REPSessionManager
annotate rep/ServerMainLoop.java @ 481:607f1dfe2b80
add comment
author | Shinji KONO <kono@ie.u-ryukyu.ac.jp> |
---|---|
date | Fri, 15 Oct 2010 19:47:53 +0900 |
parents | d295e84c5e03 |
children | cc262a519b8a |
rev | line source |
---|---|
390 | 1 package rep; |
2 | |
3 import java.io.IOException; | |
4 import java.net.InetSocketAddress; | |
5 import java.net.SocketException; | |
6 import java.nio.channels.ClosedChannelException; | |
7 import java.nio.channels.SelectionKey; | |
8 import java.util.LinkedList; | |
9 import java.util.Set; | |
10 import java.util.concurrent.BlockingQueue; | |
11 import java.util.concurrent.LinkedBlockingQueue; | |
12 | |
13 import rep.channel.REPLogger; | |
14 import rep.channel.REPSelectionKey; | |
15 import rep.channel.REPSelector; | |
16 import rep.channel.REPServerSocketChannel; | |
17 import rep.channel.REPSocketChannel; | |
18 import rep.gui.SessionManagerEvent; | |
19 import rep.gui.SessionManagerGUI; | |
20 import rep.handler.FirstConnector; | |
21 import rep.handler.REPNode; | |
22 | |
411 | 23 /** |
24 * @author kono | |
25 * Single Threaded Server Main Loop | |
26 * maintain multiple connections | |
27 * gui interface is provided. | |
28 * Protocols are handled by our manager. | |
412 | 29 * We believe this is a protocol-independent server. |
411 | 30 */ |
390 | 31 public class ServerMainLoop { |
32 | |
33 public static REPLogger logger = REPLogger.singleton(); | |
34 public SessionManager manager; | |
35 protected SessionManagerGUI gui; | |
36 protected REPSelector<REPCommand> selector; | |
37 private BlockingQueue<SessionManagerEvent> waitingEventQueue | |
38 = new LinkedBlockingQueue<SessionManagerEvent>(); | |
39 public String myHost; | |
40 private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); | |
41 protected int receive_port; | |
42 protected int parent_port; | |
43 protected static final int DEFAULT_PORT = 8766; | |
44 private SessionManagerEvent execAfterConnect = null; | |
430 | 45 private boolean running = true; |
390 | 46 |
47 | |
48 public void setReceivePort(int port) { | |
49 receive_port = port; | |
50 } | |
51 | |
52 void mainLoop(SessionManager sessionManager, int port, SessionManagerGUI gui) throws IOException, | |
53 SocketException, ClosedChannelException { | |
54 this.gui = gui; | |
55 manager = sessionManager; | |
56 receive_port = port; | |
57 serverInit(); | |
58 mainLoop(); | |
59 } | |
60 | |
61 public void mainLoop() throws IOException { | |
430 | 62 while(running){ |
399 | 63 manager.checkWaitingCommandInMerge(); |
390 | 64 if (checkInputEvent() || |
65 checkWaitingWrite()) { | |
420 | 66 continue; |
390 | 67 // try to do fair execution for waiting task |
420 | 68 //if(selector.selectNow() > 0) select(); |
69 //continue; | |
390 | 70 } |
71 // now we can wait for input packet or event | |
462 | 72 selector.select(1); |
390 | 73 select(); |
74 } | |
75 } | |
76 | |
77 void serverInit() throws IOException, SocketException, | |
78 ClosedChannelException { | |
79 selector = REPSelector.<REPCommand>create(); | |
80 REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); | |
81 ssc.configureBlocking(false); // Selector requires this | |
82 ssc.socket().setReuseAddress(true); //reuse address 必須 | |
83 //getAllByNameで取れた全てのアドレスに対してbindする | |
84 try { | |
85 ssc.socket().bind(new InetSocketAddress("::",receive_port)); | |
86 } catch (SocketException e) { | |
391 | 87 // for some IPv6 implementation |
390 | 88 ssc.socket().bind(new InetSocketAddress(receive_port)); |
89 } | |
90 ssc.register(selector, SelectionKey.OP_ACCEPT,null); | |
91 } | |
92 | |
93 private boolean checkInputEvent() { | |
94 SessionManagerEvent e; | |
95 if((e = waitingEventQueue.poll())!=null){ | |
96 e.exec(manager); | |
97 return true; | |
98 } | |
99 return false; | |
100 } | |
430 | 101 |
102 public void serverStop() { | |
103 running = false; | |
104 selector.wakeup(); | |
105 } | |
390 | 106 |
411 | 107 /** |
108 * To avoid dead locks, we write a command one at a time | |
109 * during select(). | |
110 * @return | |
111 * @throws IOException | |
112 */ | |
390 | 113 private boolean checkWaitingWrite() throws IOException { |
114 PacketSet p = writeQueue.poll(); | |
115 if (p!=null) { | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
116 sendLog(p); |
390 | 117 p.channel.write(p.command); |
118 return true; | |
119 } | |
120 return false; | |
121 } | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
122 /** |
411 | 123 * Debug message |
124 * @param p | |
125 */ | |
401 | 126 private void sendLog(PacketSet p) { |
127 REPNode to; | |
128 String s; | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
129 to = p.channel; // manager.editorList.editorByChannel(p.channel.channel); |
401 | 130 if (to==null) |
131 s = p.channel.toString(); | |
132 else | |
133 s = to.toString(); | |
134 logger.writeLog("writing: "+p.command+" to: " + s); | |
135 } | |
136 | |
390 | 137 |
399 | 138 public void close(REPSocketChannel<REPCommand> channel) { |
390 | 139 REPSelectionKey<REPCommand>key = channel.keyFor1(selector); |
140 REPNode handler = (REPNode)key.attachment(); | |
141 key.cancel(); | |
142 handler.cancel(channel); | |
143 // we have to remove session/editor | |
144 } | |
145 | |
411 | 146 /** |
147 * Main Select routing | |
148 * check incoming connection request and incoming packet | |
149 * A request is handled by a handler object which is attached | |
150 * to the SelectionKey. | |
151 * @throws IOException | |
152 */ | |
390 | 153 private void select() throws IOException { |
154 | |
155 Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); | |
156 for(REPSelectionKey<REPCommand> key : keys){ | |
157 if(key.isAcceptable()){ | |
158 /* | |
159 * Incoming connection. We don't know which, editor or | |
160 * session manager. Assign FirstConnector to distinguish. | |
161 */ | |
162 REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); | |
163 logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); | |
164 registerChannel(channel, new FirstConnector(manager,channel)); | |
165 } else if(key.isReadable()){ | |
166 /* | |
167 * Incoming packets are handled by a various forwarder. | |
168 * A handler throw IOException() in case of a trouble to | |
169 * close the channel. | |
170 */ | |
171 REPNode handler = (REPNode)key.attachment(); | |
172 try { | |
173 REPCommand command = key.channel1().read(); | |
174 handler.handle(command, key); | |
175 } catch (IOException e) { | |
176 key.cancel(); | |
177 handler.cancel(key.channel1()); | |
178 } | |
179 } | |
180 } | |
181 } | |
182 | |
183 public void registerChannel(REPSocketChannel<REPCommand> channel, REPNode handler) throws IOException { | |
184 if(channel == null) { | |
185 return; | |
186 } | |
187 // handler.setChannel(channel); | |
188 channel.configureBlocking(false); | |
189 channel.register(selector, SelectionKey.OP_READ, handler); | |
190 } | |
191 | |
192 public void setMyHostName(String localHostName) { | |
193 myHost = localHostName + receive_port; | |
194 setHostToEditor(myHost); | |
195 } | |
196 | |
197 public String myHost() { | |
198 return myHost; | |
199 } | |
200 | |
201 private void setHostToEditor(String myHost2) { | |
202 for(REPNode editor : manager.editorList.values()){ | |
203 if (editor.channel!=null) | |
204 editor.setHost(myHost2); | |
205 } | |
206 } | |
207 | |
208 | |
209 public void buttonPressed(SessionManagerEvent event) { | |
210 try { | |
211 waitingEventQueue.put(event); | |
212 } catch (InterruptedException e) {} | |
213 selector.wakeup(); | |
214 } | |
215 | |
216 public void syncExec(SessionManagerEvent event) { | |
217 try { | |
218 waitingEventQueue.put(event); | |
219 } catch (InterruptedException e) { | |
220 } | |
221 } | |
222 | |
223 public void addWriteQueue(PacketSet packetSet) { | |
224 writeQueue.addLast(packetSet); | |
225 assert(writeQueue.size()<SessionManager.packetLimit) ; | |
226 } | |
227 | |
228 public void setParentPort(int port) { | |
229 parent_port = port; | |
230 } | |
231 | |
232 public int getParentPort() { | |
233 return parent_port; | |
234 } | |
235 | |
236 public int getPort() { | |
237 return receive_port; | |
238 } | |
239 | |
240 public void execAfterConnect(SessionManagerEvent sessionManagerEvent) { | |
241 execAfterConnect = sessionManagerEvent; | |
242 } | |
243 | |
244 public void afterConnect() { | |
245 SessionManagerEvent e = execAfterConnect; | |
246 execAfterConnect = null; | |
247 if (e!=null) e.exec(manager); | |
248 } | |
249 | |
250 void removeChannel(SessionManager sessionManager, REPNode channel) { | |
251 REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector); | |
252 key.cancel(); | |
253 try { | |
254 channel.channel.close1(); | |
255 } catch (IOException e) { | |
256 } | |
257 } | |
258 | |
259 public String toString() { | |
260 return ""+myHost+":"+receive_port; | |
261 } | |
262 | |
263 | |
264 public void setGUI(SessionManagerGUI gui) { | |
265 this.gui = gui; | |
266 } | |
267 | |
268 public void setManager(SessionManager sessionManager) { | |
269 manager = sessionManager; | |
270 } | |
271 | |
272 } |