Mercurial > hg > RemoteEditor > REPSessionManager
annotate rep/ServerMainLoop.java @ 458:c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
author | one |
---|---|
date | Fri, 24 Sep 2010 02:41:13 +0900 |
parents | 21cb16b7f3df |
children | 66c4f6b29baf |
rev | line source |
---|---|
390 | 1 package rep; |
2 | |
3 import java.io.IOException; | |
4 import java.net.InetSocketAddress; | |
5 import java.net.SocketException; | |
6 import java.nio.channels.ClosedChannelException; | |
7 import java.nio.channels.SelectionKey; | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
8 import java.util.Collection; |
390 | 9 import java.util.LinkedList; |
10 import java.util.Set; | |
11 import java.util.concurrent.BlockingQueue; | |
12 import java.util.concurrent.LinkedBlockingQueue; | |
13 | |
14 import rep.channel.REPLogger; | |
15 import rep.channel.REPSelectionKey; | |
16 import rep.channel.REPSelector; | |
17 import rep.channel.REPServerSocketChannel; | |
18 import rep.channel.REPSocketChannel; | |
19 import rep.gui.SessionManagerEvent; | |
20 import rep.gui.SessionManagerGUI; | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
21 import rep.handler.Editor; |
390 | 22 import rep.handler.FirstConnector; |
23 import rep.handler.REPNode; | |
24 | |
411 | 25 /** |
26 * @author kono | |
27 * Single Threaded Server Main Loop | |
28 * maintain multiple connections | |
29 * gui interface is provided. | |
30 * Protocols are handled by our manager. | |
412 | 31 * We believe this is an protocol independent server. |
411 | 32 */ |
390 | 33 public class ServerMainLoop { |
34 | |
35 public static REPLogger logger = REPLogger.singleton(); | |
36 public SessionManager manager; | |
37 protected SessionManagerGUI gui; | |
38 protected REPSelector<REPCommand> selector; | |
39 private BlockingQueue<SessionManagerEvent> waitingEventQueue | |
40 = new LinkedBlockingQueue<SessionManagerEvent>(); | |
41 public String myHost; | |
42 private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); | |
43 protected int receive_port; | |
44 protected int parent_port; | |
45 protected static final int DEFAULT_PORT = 8766; | |
46 private SessionManagerEvent execAfterConnect = null; | |
430 | 47 private boolean running = true; |
390 | 48 |
49 | |
50 public void setReceivePort(int port) { | |
51 receive_port = port; | |
52 } | |
53 | |
54 void mainLoop(SessionManager sessionManager, int port, SessionManagerGUI gui) throws IOException, | |
55 SocketException, ClosedChannelException { | |
56 this.gui = gui; | |
57 manager = sessionManager; | |
58 receive_port = port; | |
59 serverInit(); | |
60 mainLoop(); | |
61 } | |
62 | |
63 public void mainLoop() throws IOException { | |
430 | 64 while(running){ |
399 | 65 manager.checkWaitingCommandInMerge(); |
390 | 66 if (checkInputEvent() || |
67 checkWaitingWrite()) { | |
420 | 68 continue; |
390 | 69 // try to do fair execution for waiting task |
420 | 70 //if(selector.selectNow() > 0) select(); |
71 //continue; | |
390 | 72 } |
73 // now we can wait for input packet or event | |
74 selector.select(); | |
75 select(); | |
76 } | |
77 } | |
78 | |
79 void serverInit() throws IOException, SocketException, | |
80 ClosedChannelException { | |
81 selector = REPSelector.<REPCommand>create(); | |
82 REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); | |
83 ssc.configureBlocking(false); // Selector requires this | |
84 ssc.socket().setReuseAddress(true); //reuse address 必須 | |
85 //getAllByNameで取れた全てのアドレスに対してbindする | |
86 try { | |
87 ssc.socket().bind(new InetSocketAddress("::",receive_port)); | |
88 } catch (SocketException e) { | |
391 | 89 // for some IPv6 implementation |
390 | 90 ssc.socket().bind(new InetSocketAddress(receive_port)); |
91 } | |
92 ssc.register(selector, SelectionKey.OP_ACCEPT,null); | |
93 } | |
94 | |
95 private boolean checkInputEvent() { | |
96 SessionManagerEvent e; | |
97 if((e = waitingEventQueue.poll())!=null){ | |
98 e.exec(manager); | |
99 return true; | |
100 } | |
101 return false; | |
102 } | |
430 | 103 |
104 public void serverStop() { | |
105 running = false; | |
106 selector.wakeup(); | |
107 } | |
390 | 108 |
411 | 109 /** |
110 * To avoid dead locks, we write a command one at a time | |
111 * during select(). | |
112 * @return | |
113 * @throws IOException | |
114 */ | |
390 | 115 private boolean checkWaitingWrite() throws IOException { |
116 PacketSet p = writeQueue.poll(); | |
117 if (p!=null) { | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
118 sendLog(p); |
390 | 119 p.channel.write(p.command); |
120 return true; | |
121 } | |
122 return false; | |
123 } | |
124 | |
411 | 125 /** |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
126 * Move all command to the editor from manager's writing queue to the editor's waiting queue |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
127 * @param editor |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
128 */ |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
129 public void getWriteQueue(Editor editor) { |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
130 LinkedList<PacketSet> w = new LinkedList<PacketSet>(); |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
131 for(PacketSet p:writeQueue) { |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
132 if (p.channel==editor) { |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
133 editor.addWaitingCommand(p); |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
134 } else { |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
135 w.addLast(p); |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
136 } |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
137 } |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
138 writeQueue = w; |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
139 } |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
140 /** |
411 | 141 * Debug message |
142 * @param p | |
143 */ | |
401 | 144 private void sendLog(PacketSet p) { |
145 REPNode to; | |
146 String s; | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
147 to = p.channel; // manager.editorList.editorByChannel(p.channel.channel); |
401 | 148 if (to==null) |
149 s = p.channel.toString(); | |
150 else | |
151 s = to.toString(); | |
152 logger.writeLog("writing: "+p.command+" to: " + s); | |
153 } | |
154 | |
390 | 155 |
399 | 156 public void close(REPSocketChannel<REPCommand> channel) { |
390 | 157 REPSelectionKey<REPCommand>key = channel.keyFor1(selector); |
158 REPNode handler = (REPNode)key.attachment(); | |
159 key.cancel(); | |
160 handler.cancel(channel); | |
161 // we have to remove session/editor | |
162 } | |
163 | |
411 | 164 /** |
165 * Main Select routing | |
166 * check incoming connection request and incoming packet | |
167 * A request is handled by a handler object which is attached | |
168 * to the SelectionKey. | |
169 * @throws IOException | |
170 */ | |
390 | 171 private void select() throws IOException { |
172 | |
173 Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); | |
174 for(REPSelectionKey<REPCommand> key : keys){ | |
175 if(key.isAcceptable()){ | |
176 /* | |
177 * Incoming connection. We don't know which, editor or | |
178 * session manager. Assign FirstConnector to distinguish. | |
179 */ | |
180 REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); | |
181 logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); | |
182 registerChannel(channel, new FirstConnector(manager,channel)); | |
183 } else if(key.isReadable()){ | |
184 /* | |
185 * Incoming packets are handled by a various forwarder. | |
186 * A handler throw IOException() in case of a trouble to | |
187 * close the channel. | |
188 */ | |
189 REPNode handler = (REPNode)key.attachment(); | |
190 try { | |
191 REPCommand command = key.channel1().read(); | |
192 handler.handle(command, key); | |
193 } catch (IOException e) { | |
194 key.cancel(); | |
195 handler.cancel(key.channel1()); | |
196 } | |
197 } | |
198 } | |
199 } | |
200 | |
201 public void registerChannel(REPSocketChannel<REPCommand> channel, REPNode handler) throws IOException { | |
202 if(channel == null) { | |
203 return; | |
204 } | |
205 // handler.setChannel(channel); | |
206 channel.configureBlocking(false); | |
207 channel.register(selector, SelectionKey.OP_READ, handler); | |
208 } | |
209 | |
210 public void setMyHostName(String localHostName) { | |
211 myHost = localHostName + receive_port; | |
212 setHostToEditor(myHost); | |
213 } | |
214 | |
215 public String myHost() { | |
216 return myHost; | |
217 } | |
218 | |
219 private void setHostToEditor(String myHost2) { | |
220 for(REPNode editor : manager.editorList.values()){ | |
221 if (editor.channel!=null) | |
222 editor.setHost(myHost2); | |
223 } | |
224 } | |
225 | |
226 | |
227 public void buttonPressed(SessionManagerEvent event) { | |
228 try { | |
229 waitingEventQueue.put(event); | |
230 } catch (InterruptedException e) {} | |
231 selector.wakeup(); | |
232 } | |
233 | |
234 public void syncExec(SessionManagerEvent event) { | |
235 try { | |
236 waitingEventQueue.put(event); | |
237 } catch (InterruptedException e) { | |
238 } | |
239 } | |
240 | |
241 public void addWriteQueue(PacketSet packetSet) { | |
242 writeQueue.addLast(packetSet); | |
243 assert(writeQueue.size()<SessionManager.packetLimit) ; | |
244 } | |
245 | |
246 public void setParentPort(int port) { | |
247 parent_port = port; | |
248 } | |
249 | |
250 public int getParentPort() { | |
251 return parent_port; | |
252 } | |
253 | |
254 public int getPort() { | |
255 return receive_port; | |
256 } | |
257 | |
258 public void execAfterConnect(SessionManagerEvent sessionManagerEvent) { | |
259 execAfterConnect = sessionManagerEvent; | |
260 } | |
261 | |
262 public void afterConnect() { | |
263 SessionManagerEvent e = execAfterConnect; | |
264 execAfterConnect = null; | |
265 if (e!=null) e.exec(manager); | |
266 } | |
267 | |
268 void removeChannel(SessionManager sessionManager, REPNode channel) { | |
269 REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector); | |
270 key.cancel(); | |
271 try { | |
272 channel.channel.close1(); | |
273 } catch (IOException e) { | |
274 } | |
275 } | |
276 | |
277 public String toString() { | |
278 return ""+myHost+":"+receive_port; | |
279 } | |
280 | |
281 | |
282 public void setGUI(SessionManagerGUI gui) { | |
283 this.gui = gui; | |
284 } | |
285 | |
286 public void setManager(SessionManager sessionManager) { | |
287 manager = sessionManager; | |
288 } | |
289 | |
290 } |