Mercurial > hg > RemoteEditor > REPSessionManager
annotate rep/ServerMainLoop.java @ 459:66c4f6b29baf
fix getWriteQueue
author | one |
---|---|
date | Fri, 24 Sep 2010 03:24:06 +0900 |
parents | c22f6833d736 |
children | ef70109af810 |
rev | line source |
---|---|
390 | 1 package rep; |
2 | |
3 import java.io.IOException; | |
4 import java.net.InetSocketAddress; | |
5 import java.net.SocketException; | |
6 import java.nio.channels.ClosedChannelException; | |
7 import java.nio.channels.SelectionKey; | |
8 import java.util.LinkedList; | |
9 import java.util.Set; | |
10 import java.util.concurrent.BlockingQueue; | |
11 import java.util.concurrent.LinkedBlockingQueue; | |
12 | |
13 import rep.channel.REPLogger; | |
14 import rep.channel.REPSelectionKey; | |
15 import rep.channel.REPSelector; | |
16 import rep.channel.REPServerSocketChannel; | |
17 import rep.channel.REPSocketChannel; | |
18 import rep.gui.SessionManagerEvent; | |
19 import rep.gui.SessionManagerGUI; | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
20 import rep.handler.Editor; |
390 | 21 import rep.handler.FirstConnector; |
22 import rep.handler.REPNode; | |
23 | |
411 | 24 /** |
25 * @author kono | |
26 * Single Threaded Server Main Loop | |
27 * maintain multiple connections | |
28 * gui interface is provided. | |
29 * Protocols are handled by our manager. | |
412 | 30 * We believe this is a protocol-independent server. |
411 | 31 */ |
390 | 32 public class ServerMainLoop { |
33 | |
34 public static REPLogger logger = REPLogger.singleton(); | |
35 public SessionManager manager; | |
36 protected SessionManagerGUI gui; | |
37 protected REPSelector<REPCommand> selector; | |
38 private BlockingQueue<SessionManagerEvent> waitingEventQueue | |
39 = new LinkedBlockingQueue<SessionManagerEvent>(); | |
40 public String myHost; | |
41 private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); | |
42 protected int receive_port; | |
43 protected int parent_port; | |
44 protected static final int DEFAULT_PORT = 8766; | |
45 private SessionManagerEvent execAfterConnect = null; | |
430 | 46 private boolean running = true; |
390 | 47 |
48 | |
49 public void setReceivePort(int port) { | |
50 receive_port = port; | |
51 } | |
52 | |
53 void mainLoop(SessionManager sessionManager, int port, SessionManagerGUI gui) throws IOException, | |
54 SocketException, ClosedChannelException { | |
55 this.gui = gui; | |
56 manager = sessionManager; | |
57 receive_port = port; | |
58 serverInit(); | |
59 mainLoop(); | |
60 } | |
61 | |
62 public void mainLoop() throws IOException { | |
430 | 63 while(running){ |
399 | 64 manager.checkWaitingCommandInMerge(); |
390 | 65 if (checkInputEvent() || |
66 checkWaitingWrite()) { | |
420 | 67 continue; |
390 | 68 // try to do fair execution for waiting task |
420 | 69 //if(selector.selectNow() > 0) select(); |
70 //continue; | |
390 | 71 } |
72 // now we can wait for input packet or event | |
73 selector.select(); | |
74 select(); | |
75 } | |
76 } | |
77 | |
78 void serverInit() throws IOException, SocketException, | |
79 ClosedChannelException { | |
80 selector = REPSelector.<REPCommand>create(); | |
81 REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); | |
82 ssc.configureBlocking(false); // Selector requires this | |
83 ssc.socket().setReuseAddress(true); //reuse address 必須 | |
84 //getAllByNameで取れた全てのアドレスに対してbindする | |
85 try { | |
86 ssc.socket().bind(new InetSocketAddress("::",receive_port)); | |
87 } catch (SocketException e) { | |
391 | 88 // for some IPv6 implementation |
390 | 89 ssc.socket().bind(new InetSocketAddress(receive_port)); |
90 } | |
91 ssc.register(selector, SelectionKey.OP_ACCEPT,null); | |
92 } | |
93 | |
94 private boolean checkInputEvent() { | |
95 SessionManagerEvent e; | |
96 if((e = waitingEventQueue.poll())!=null){ | |
97 e.exec(manager); | |
98 return true; | |
99 } | |
100 return false; | |
101 } | |
430 | 102 |
103 public void serverStop() { | |
104 running = false; | |
105 selector.wakeup(); | |
106 } | |
390 | 107 |
411 | 108 /** |
109 * To avoid dead locks, we write a command one at a time | |
110 * during select(). | |
111 * @return | |
112 * @throws IOException | |
113 */ | |
390 | 114 private boolean checkWaitingWrite() throws IOException { |
115 PacketSet p = writeQueue.poll(); | |
116 if (p!=null) { | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
117 sendLog(p); |
390 | 118 p.channel.write(p.command); |
119 return true; | |
120 } | |
121 return false; | |
122 } | |
123 | |
411 | 124 /** |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
125 * Move all command to the editor from manager's writing queue to the editor's waiting queue |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
126 * @param editor |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
127 */ |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
128 public void getWriteQueue(Editor editor) { |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
129 LinkedList<PacketSet> w = new LinkedList<PacketSet>(); |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
130 for(PacketSet p:writeQueue) { |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
131 if (p.channel==editor) { |
459 | 132 editor.waitingCommandInMerge.addLast(p); |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
133 } else { |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
134 w.addLast(p); |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
135 } |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
136 } |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
137 writeQueue = w; |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
138 } |
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
139 /** |
411 | 140 * Debug message |
141 * @param p | |
142 */ | |
401 | 143 private void sendLog(PacketSet p) { |
144 REPNode to; | |
145 String s; | |
458
c22f6833d736
synchronize Editor's waiting queue and Manager's writing queue
one
parents:
450
diff
changeset
|
146 to = p.channel; // manager.editorList.editorByChannel(p.channel.channel); |
401 | 147 if (to==null) |
148 s = p.channel.toString(); | |
149 else | |
150 s = to.toString(); | |
151 logger.writeLog("writing: "+p.command+" to: " + s); | |
152 } | |
153 | |
390 | 154 |
399 | 155 public void close(REPSocketChannel<REPCommand> channel) { |
390 | 156 REPSelectionKey<REPCommand>key = channel.keyFor1(selector); |
157 REPNode handler = (REPNode)key.attachment(); | |
158 key.cancel(); | |
159 handler.cancel(channel); | |
160 // we have to remove session/editor | |
161 } | |
162 | |
411 | 163 /** |
164 * Main Select routing | |
165 * check incoming connection request and incoming packet | |
166 * A request is handled by a handler object which is attached | |
167 * to the SelectionKey. | |
168 * @throws IOException | |
169 */ | |
390 | 170 private void select() throws IOException { |
171 | |
172 Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); | |
173 for(REPSelectionKey<REPCommand> key : keys){ | |
174 if(key.isAcceptable()){ | |
175 /* | |
176 * Incoming connection. We don't know which, editor or | |
177 * session manager. Assign FirstConnector to distinguish. | |
178 */ | |
179 REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); | |
180 logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); | |
181 registerChannel(channel, new FirstConnector(manager,channel)); | |
182 } else if(key.isReadable()){ | |
183 /* | |
184 * Incoming packets are handled by a various forwarder. | |
185 * A handler throw IOException() in case of a trouble to | |
186 * close the channel. | |
187 */ | |
188 REPNode handler = (REPNode)key.attachment(); | |
189 try { | |
190 REPCommand command = key.channel1().read(); | |
191 handler.handle(command, key); | |
192 } catch (IOException e) { | |
193 key.cancel(); | |
194 handler.cancel(key.channel1()); | |
195 } | |
196 } | |
197 } | |
198 } | |
199 | |
200 public void registerChannel(REPSocketChannel<REPCommand> channel, REPNode handler) throws IOException { | |
201 if(channel == null) { | |
202 return; | |
203 } | |
204 // handler.setChannel(channel); | |
205 channel.configureBlocking(false); | |
206 channel.register(selector, SelectionKey.OP_READ, handler); | |
207 } | |
208 | |
209 public void setMyHostName(String localHostName) { | |
210 myHost = localHostName + receive_port; | |
211 setHostToEditor(myHost); | |
212 } | |
213 | |
214 public String myHost() { | |
215 return myHost; | |
216 } | |
217 | |
218 private void setHostToEditor(String myHost2) { | |
219 for(REPNode editor : manager.editorList.values()){ | |
220 if (editor.channel!=null) | |
221 editor.setHost(myHost2); | |
222 } | |
223 } | |
224 | |
225 | |
226 public void buttonPressed(SessionManagerEvent event) { | |
227 try { | |
228 waitingEventQueue.put(event); | |
229 } catch (InterruptedException e) {} | |
230 selector.wakeup(); | |
231 } | |
232 | |
233 public void syncExec(SessionManagerEvent event) { | |
234 try { | |
235 waitingEventQueue.put(event); | |
236 } catch (InterruptedException e) { | |
237 } | |
238 } | |
239 | |
240 public void addWriteQueue(PacketSet packetSet) { | |
241 writeQueue.addLast(packetSet); | |
242 assert(writeQueue.size()<SessionManager.packetLimit) ; | |
243 } | |
244 | |
245 public void setParentPort(int port) { | |
246 parent_port = port; | |
247 } | |
248 | |
249 public int getParentPort() { | |
250 return parent_port; | |
251 } | |
252 | |
253 public int getPort() { | |
254 return receive_port; | |
255 } | |
256 | |
257 public void execAfterConnect(SessionManagerEvent sessionManagerEvent) { | |
258 execAfterConnect = sessionManagerEvent; | |
259 } | |
260 | |
261 public void afterConnect() { | |
262 SessionManagerEvent e = execAfterConnect; | |
263 execAfterConnect = null; | |
264 if (e!=null) e.exec(manager); | |
265 } | |
266 | |
267 void removeChannel(SessionManager sessionManager, REPNode channel) { | |
268 REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector); | |
269 key.cancel(); | |
270 try { | |
271 channel.channel.close1(); | |
272 } catch (IOException e) { | |
273 } | |
274 } | |
275 | |
276 public String toString() { | |
277 return ""+myHost+":"+receive_port; | |
278 } | |
279 | |
280 | |
281 public void setGUI(SessionManagerGUI gui) { | |
282 this.gui = gui; | |
283 } | |
284 | |
285 public void setManager(SessionManager sessionManager) { | |
286 manager = sessionManager; | |
287 } | |
288 | |
289 } |