view rep/ServerMainLoop.java @ 391:3b0a5a55e3ee

24
author one@firefly.cr.ie.u-ryukyu.ac.jp
date Mon, 10 Nov 2008 22:25:14 +0900
parents aa07134fea32
children 19705f4b8015
line wrap: on
line source

package rep;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import rep.channel.REPLogger;
import rep.channel.REPSelectionKey;
import rep.channel.REPSelector;
import rep.channel.REPServerSocketChannel;
import rep.channel.REPSocketChannel;
import rep.gui.DoGUIUpdate;
import rep.gui.SessionManagerEvent;
import rep.gui.SessionManagerGUI;
import rep.handler.FirstConnector;
import rep.handler.REPNode;

public class ServerMainLoop  {

	public static REPLogger logger = REPLogger.singleton();
	public SessionManager manager;
	protected SessionManagerGUI gui;
	protected REPSelector<REPCommand> selector;
	protected List<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>();
	private BlockingQueue<SessionManagerEvent> waitingEventQueue 
								= new LinkedBlockingQueue<SessionManagerEvent>();
	public String myHost;
	private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>();
	protected int receive_port;
	protected int parent_port;
	protected static final int DEFAULT_PORT = 8766;
	private SessionManagerEvent execAfterConnect = null;


	public void setReceivePort(int port) {
		receive_port = port;
	}

	void mainLoop(SessionManager sessionManager, int port, SessionManagerGUI gui) throws IOException,
			SocketException, ClosedChannelException {
		this.gui = gui;
		manager = sessionManager;
		receive_port = port;
		serverInit();
		mainLoop();
	}
	
	public void mainLoop() throws IOException {
		while(true){
		    checkWaitingCommandInMerge();
			if (checkInputEvent() ||
			    checkWaitingWrite()) { 
				   // try to do fair execution for waiting task
				   if(selector.selectNow() > 0) select();
				   continue;
			}
			// now we can wait for input packet or event
			selector.select();
			select();
		}
	}

	void serverInit() throws IOException, SocketException,
			ClosedChannelException {
		selector = REPSelector.<REPCommand>create();	
		REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker());
		ssc.configureBlocking(false);       // Selector requires this
		ssc.socket().setReuseAddress(true);	//reuse address 必須
		//getAllByNameで取れた全てのアドレスに対してbindする
		try {
			ssc.socket().bind(new InetSocketAddress("::",receive_port));
		} catch (SocketException e) {
			// for some IPv6 implementation
			ssc.socket().bind(new InetSocketAddress(receive_port));
		}
		ssc.register(selector, SelectionKey.OP_ACCEPT,null); 
	}

	private boolean checkInputEvent() {
		SessionManagerEvent e;
		if((e = waitingEventQueue.poll())!=null){
			e.exec(manager);
			return true;
		}
		return false;
	}

	private boolean checkWaitingWrite() throws IOException {
		PacketSet p = writeQueue.poll();
		if (p!=null) {
			logger.writeLog("writing: "+p.command+" to: "
					+manager.editorList.editorByChannel(p.channel));
			p.channel.write(p.command);
			return true;
		}
		return false;
	}

	/**
	 * Check waiting command in merge
	 * @return true if there is a processed waiting command
	 * @throws IOException
	 */
	public void checkWaitingCommandInMerge() {
		List<PacketSet> w = waitingCommandInMerge;
		waitingCommandInMerge = new LinkedList<PacketSet>();
		for(PacketSet p: w) {
			REPNode e = p.getEditor();
			if(e.isMerging()) { // still merging do nothing
				waitingCommandInMerge.add(p);
			} else {
				try {
//					if (manager.sessionManage(e, p.command)) { // we don't need this
//						assert false;
//						return;
//					}
					e.manage(p.command);
				} catch (Exception e1) {
					// should be e.close()?
					close(p.channel);
				}		
			}
		}
	}


	public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) {
		for(PacketSet p:waitingCommandInMerge) {
			if (p.channel==c) {
				return true;
			}
		}
		return false;
	}

	private void close(REPSocketChannel<REPCommand> channel) {
		REPSelectionKey<REPCommand>key = channel.keyFor1(selector);
		REPNode handler = (REPNode)key.attachment();
		key.cancel();
		handler.cancel(channel);
		// we have to remove session/editor
	}

	private void select() throws IOException {
		
		Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1();
		for(REPSelectionKey<REPCommand> key : keys){
			if(key.isAcceptable()){
				/*
				 * Incoming connection. We don't know which, editor or
				 * session manager. Assign FirstConnector to distinguish.
				 */
				REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker());
				logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel);
				registerChannel(channel, new FirstConnector(manager,channel));
			} else if(key.isReadable()){
				/*
				 * Incoming packets are handled by a various forwarder.
				 * A handler throw IOException() in case of a trouble to
				 * close the channel.
				 */
				REPNode handler = (REPNode)key.attachment();
				try {
					REPCommand command = key.channel1().read();
					handler.handle(command, key);
				} catch (IOException e) {
					key.cancel();
					handler.cancel(key.channel1());
				}
			}
		}
	}

	public void registerChannel(REPSocketChannel<REPCommand> channel, REPNode handler) throws IOException {
		if(channel == null) {
			return;
		}
		// handler.setChannel(channel);
		channel.configureBlocking(false);
		channel.register(selector, SelectionKey.OP_READ, handler);
	}

	protected void updateGUI() {
		//リストのコピーをGUIに渡す
		LinkedList<Session> sList = new LinkedList<Session>(manager.sessionList.values());
		LinkedList<REPNode> eList;
		if (false) { 
			// local editor only 
			eList = new LinkedList<REPNode>();
			for(REPNode e:manager.editorList.values()) {
				if (manager.getSMID(e.eid)==manager.smList.sessionManagerID()) {
					eList.add(e);
				}
			}
		} else {
			eList = new LinkedList<REPNode>(manager.editorList.values());
		}
		//GUIに反映
		Runnable doRun = new DoGUIUpdate(sList, eList, gui);
		gui.invokeLater(doRun);
	}

	public void setMyHostName(String localHostName) {
		myHost = localHostName + receive_port;
		setHostToEditor(myHost);
	}

	public String myHost() {
		return myHost;
	}
	
	private void setHostToEditor(String myHost2) {
		for(REPNode editor : manager.editorList.values()){
			if (editor.channel!=null)
				editor.setHost(myHost2);
		}
	}

	public void addWaitingCommand(PacketSet set) {
		waitingCommandInMerge.add(set);
	}

	public void buttonPressed(SessionManagerEvent event) {
		try {
			waitingEventQueue.put(event);
		} catch (InterruptedException e) {}
		selector.wakeup();
	}

	public void syncExec(SessionManagerEvent event) {
		try {
			waitingEventQueue.put(event);
		} catch (InterruptedException e) {
		}
	}

	public void addWriteQueue(PacketSet packetSet) {
		writeQueue.addLast(packetSet);
		assert(writeQueue.size()<SessionManager.packetLimit) ;
	}

	public void setParentPort(int port) {
		parent_port = port;
	}

	public int getParentPort() {
		return parent_port;
	}

	public int getPort() {
		return receive_port;
	}

	public void execAfterConnect(SessionManagerEvent sessionManagerEvent) {
		execAfterConnect  = sessionManagerEvent;
	}

	public void afterConnect() {
		SessionManagerEvent e = execAfterConnect;
		execAfterConnect = null;
		if (e!=null) e.exec(manager);
	}

	void removeChannel(SessionManager sessionManager, REPNode channel) {
		REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector);
		key.cancel();
		try {
			channel.channel.close1();
		} catch (IOException e) {
		}
	}

	public String toString() {
		return ""+myHost+":"+receive_port;
	}


	public void setGUI(SessionManagerGUI gui) {
		this.gui = gui;
	}

	public void setManager(SessionManager sessionManager) {
		manager = sessionManager;
	}

}