comparison rep/ServerMainLoop.java @ 399:19705f4b8015

waitingCommandInMerge
author one
date Mon, 24 Nov 2008 23:11:51 +0900
parents 3b0a5a55e3ee
children 2cf5392b2a9f
comparison
equal deleted inserted replaced
398:7de83b6a34e7 399:19705f4b8015
4 import java.net.InetSocketAddress; 4 import java.net.InetSocketAddress;
5 import java.net.SocketException; 5 import java.net.SocketException;
6 import java.nio.channels.ClosedChannelException; 6 import java.nio.channels.ClosedChannelException;
7 import java.nio.channels.SelectionKey; 7 import java.nio.channels.SelectionKey;
8 import java.util.LinkedList; 8 import java.util.LinkedList;
9 import java.util.List;
10 import java.util.Set; 9 import java.util.Set;
11 import java.util.concurrent.BlockingQueue; 10 import java.util.concurrent.BlockingQueue;
12 import java.util.concurrent.LinkedBlockingQueue; 11 import java.util.concurrent.LinkedBlockingQueue;
13 12
14 import rep.channel.REPLogger; 13 import rep.channel.REPLogger;
26 25
27 public static REPLogger logger = REPLogger.singleton(); 26 public static REPLogger logger = REPLogger.singleton();
28 public SessionManager manager; 27 public SessionManager manager;
29 protected SessionManagerGUI gui; 28 protected SessionManagerGUI gui;
30 protected REPSelector<REPCommand> selector; 29 protected REPSelector<REPCommand> selector;
31 protected List<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>();
32 private BlockingQueue<SessionManagerEvent> waitingEventQueue 30 private BlockingQueue<SessionManagerEvent> waitingEventQueue
33 = new LinkedBlockingQueue<SessionManagerEvent>(); 31 = new LinkedBlockingQueue<SessionManagerEvent>();
34 public String myHost; 32 public String myHost;
35 private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); 33 private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>();
36 protected int receive_port; 34 protected int receive_port;
52 mainLoop(); 50 mainLoop();
53 } 51 }
54 52
55 public void mainLoop() throws IOException { 53 public void mainLoop() throws IOException {
56 while(true){ 54 while(true){
57 checkWaitingCommandInMerge(); 55 manager.checkWaitingCommandInMerge();
58 if (checkInputEvent() || 56 if (checkInputEvent() ||
59 checkWaitingWrite()) { 57 checkWaitingWrite()) {
60 // try to do fair execution for waiting task 58 // try to do fair execution for waiting task
61 if(selector.selectNow() > 0) select(); 59 if(selector.selectNow() > 0) select();
62 continue; 60 continue;
101 return true; 99 return true;
102 } 100 }
103 return false; 101 return false;
104 } 102 }
105 103
106 /** 104
107 * Check waiting command in merge 105 public void close(REPSocketChannel<REPCommand> channel) {
108 * @return true if there is a processed waiting command
109 * @throws IOException
110 */
111 public void checkWaitingCommandInMerge() {
112 List<PacketSet> w = waitingCommandInMerge;
113 waitingCommandInMerge = new LinkedList<PacketSet>();
114 for(PacketSet p: w) {
115 REPNode e = p.getEditor();
116 if(e.isMerging()) { // still merging do nothing
117 waitingCommandInMerge.add(p);
118 } else {
119 try {
120 // if (manager.sessionManage(e, p.command)) { // we don't need this
121 // assert false;
122 // return;
123 // }
124 e.manage(p.command);
125 } catch (Exception e1) {
126 // should be e.close()?
127 close(p.channel);
128 }
129 }
130 }
131 }
132
133
134 public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) {
135 for(PacketSet p:waitingCommandInMerge) {
136 if (p.channel==c) {
137 return true;
138 }
139 }
140 return false;
141 }
142
143 private void close(REPSocketChannel<REPCommand> channel) {
144 REPSelectionKey<REPCommand>key = channel.keyFor1(selector); 106 REPSelectionKey<REPCommand>key = channel.keyFor1(selector);
145 REPNode handler = (REPNode)key.attachment(); 107 REPNode handler = (REPNode)key.attachment();
146 key.cancel(); 108 key.cancel();
147 handler.cancel(channel); 109 handler.cancel(channel);
148 // we have to remove session/editor 110 // we have to remove session/editor
221 if (editor.channel!=null) 183 if (editor.channel!=null)
222 editor.setHost(myHost2); 184 editor.setHost(myHost2);
223 } 185 }
224 } 186 }
225 187
226 public void addWaitingCommand(PacketSet set) {
227 waitingCommandInMerge.add(set);
228 }
229 188
230 public void buttonPressed(SessionManagerEvent event) { 189 public void buttonPressed(SessionManagerEvent event) {
231 try { 190 try {
232 waitingEventQueue.put(event); 191 waitingEventQueue.put(event);
233 } catch (InterruptedException e) {} 192 } catch (InterruptedException e) {}