changeset 363:1a8856580d38

*** empty log message ***
author kono
date Mon, 20 Oct 2008 03:03:28 +0900
parents f0bd158dace6
children c965ef2b5fd6
files rep/Forwarder.java rep/SessionManager.java
diffstat 2 files changed, 125 insertions(+), 54 deletions(-) [+]
line wrap: on
line diff
--- a/rep/Forwarder.java	Sun Oct 19 23:05:59 2008 +0900
+++ b/rep/Forwarder.java	Mon Oct 20 03:03:28 2008 +0900
@@ -79,10 +79,10 @@
 		if (manager.sessionManage(this, command)) return;
 		
 		Session s = manager.getSession(command.sid);
+		if (s==null) throw new IOException();
 		Forwarder editor = s.getFirstForwarder();
-		if (editor!=null) {
-			editor.manage(command);
-		} else throw new IOException();
+		if (editor==null) throw new IOException();
+		editor.manage(command);
 	}
 
 	public void setMode(REP cmd) {
--- a/rep/SessionManager.java	Sun Oct 19 23:05:59 2008 +0900
+++ b/rep/SessionManager.java	Mon Oct 20 03:03:28 2008 +0900
@@ -46,27 +46,37 @@
 public class SessionManager implements SessionManagerEventListener{
 	static public REPLogger logger = REPLogger.singleton();
 
-	SessionList sessionList;
+	SessionList sessionList;           
 	private SessionManagerGUI gui;
+	// Main nio.Selector of this server
 	private REPSelector<REPCommand> selector;
+	// Known Session Manager List, At most one parent. No parent means master.
 	SessionManagerList smList;
+	// Known Editor list. Connected Editor has a channel. 
+	// Session Manager Channel may have dummy editors.
 	EditorList editorList;
-	// editorList は、sessionList に入っているeditorとは別なeditorのlistらしい。
-	// private String maxHost;
+	// Commands for busy editor are kept in this queue.
 	private List<PacketSet> waitingCommandInMerge;
-	private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>();;
+	// Command from gui. Synchronization is required.
+	private BlockingQueue<SessionManagerEvent> waitingEventQueue 
+		= new LinkedBlockingQueue<SessionManagerEvent>();;
+	// host name of this server. One of connecting SocketChannel's hostname
 	String myHost;
+	// Single threaded write queueu. To avoid dead lock with too many writes.
 	private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>();
 	private int receive_port;
 	private int parent_port;
 	static final int DEFAULT_PORT = 8766;
+	// Queue limit for debugging purpose.
 	private static final int packetLimit = 200;
 
+	// globalSessionID = SessionManagerID * MAXID + localSessionID
 	private static final int MAXID = 10000;
 	SessionXMLDecoder decoder = new SessionXMLDecoder();
 	SessionXMLEncoder encoder = new SessionXMLEncoder();
+	// SocketChannel for our parent. At most one parent is allowed.
 	private Forwarder sm_join_channel;
-
+	// Routing table for session and session manager.
 	private RoutingTable routingTable = new RoutingTable();
 
 	public static void main(String[] args) throws InterruptedException, IOException {
@@ -75,40 +85,35 @@
 		int port_s = DEFAULT_PORT;
 		//System.setProperty("file.encoding", "UTF-8");
 		if(args.length > 0){
+			if (args.length!=2) {
+				logger.writeLog("Usage: sessionManager our_port parent_port");
+				return;
+			}
 			port = Integer.parseInt(args[0]);
 			port_s = Integer.parseInt(args[1]);
 		}
 		SessionManager sm = new SessionManager();
 		sm.setReceivePort(port);
 		sm.setParentPort(port_s);
+		// Ok start main loop
 		sm.init(port,new SessionManagerGUIimpl(sm));
-		
-
 	}
 
-	
 	public void setReceivePort(int port) {
 		receive_port = port;
 	}
 
-
-	public void openSelector() throws IOException{
-		selector = REPSelector.<REPCommand>create();
-	}
-
 	public void init(int port, SessionManagerGUI gui) throws IOException, InterruptedException {
 		this.gui = gui;
-		openSelector();
 		init(port);
 		mainLoop(); 
 	}
 
-
 	private void init(int port) throws InterruptedException, IOException {
-	
+		selector = REPSelector.<REPCommand>create();	
 		REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker());
-		ssc.configureBlocking(false);	//reuse address 必須
-		ssc.socket().setReuseAddress(true);
+		ssc.configureBlocking(false);       // Selector requires this
+		ssc.socket().setReuseAddress(true);	//reuse address 必須
 		//getAllByNameで取れた全てのアドレスに対してbindする
 		ssc.socket().bind(new InetSocketAddress(port));
 		ssc.register(selector, SelectionKey.OP_ACCEPT, 
@@ -123,10 +128,12 @@
 	}
 	
 	/*
+	 * The main loop.
+	 *     Check incoming events and waiting writes.
+	 *     Do select and call select() to check in coming packets.
 	 * We wrote everything in one thread, but we can assign
 	 * one thread for each communication channel and GUI event.
 	 */
-
 	public void mainLoop() throws IOException {
 		while(true){
 		    checkWaitingCommandInMerge();
@@ -142,6 +149,9 @@
 		}
 	}
 
+	/*
+	 * Synchronize GUI event in the main loop.
+	 */
 	private boolean checkInputEvent() {
 		SessionManagerEvent e;
 		if((e = waitingEventQueue.poll())!=null){
@@ -151,6 +161,9 @@
 		return false;
 	}
 
+	/*
+	 * Write a packet during the main loop.
+	 */
 	private boolean checkWaitingWrite() throws IOException {
 		PacketSet p = writeQueue.poll();
 		if (p!=null) {
@@ -186,7 +199,23 @@
 			}
 		}
 	}
+
+	/*
+	 * If we have waiting write commands, further sent commands also
+	 * wait to avoid out of order packet sending.
+	 */
+	public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) {
+		for(PacketSet p:waitingCommandInMerge) {
+			if (p.channel==c) {
+				return true;
+			}
+		}
+		return false;
+	}
 	
+	/*
+	 * Close a channel in case of exception or close. 
+	 */
 	private void close(REPSocketChannel<REPCommand> channel) {
 		REPSelectionKey<REPCommand>key = channel.keyFor1(selector);
 		REPHandler handler = (REPHandler)key.attachment();
@@ -196,26 +225,30 @@
 	}
 
 
-	public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) {
-		for(PacketSet p:waitingCommandInMerge) {
-			if (p.channel==c) {
-				return true;
-			}
-		}
-		return false;
-	}
-
+	/*
+	 * Do select operation on the Selector. Each key has a forwarder.
+	 * A forwarder can be a firstConnector, a forwarder for Session Manager
+	 * or an Editor.
+	 */
 	private void select() throws IOException {
 		
 		Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1();
 		for(REPSelectionKey<REPCommand> key : keys){
 			if(key.isAcceptable()){
+				/*
+				 * Incoming connection. We don't know which, editor or
+				 * session manager. Assign FirstConnector to distinguish.
+				 */
 				REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker());
 				logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel);
-				registerChannel (channel, new FirstConnector(this));
+				registerChannel(channel, new FirstConnector(this));
 				channel = null;
-
 			}else if(key.isReadable()){
+				/*
+				 * Incoming packets are handled by a various forwarder.
+				 * A hadler throw IOException() in case of a trouble to
+				 * close the channel.
+				 */
 				REPHandler handler = (REPHandler)(key.attachment());
 				try {
 					handler.handle(key);
@@ -236,18 +269,20 @@
 		channel.register(selector, SelectionKey.OP_READ, handler);
 	}
 
-
-	void cancel_sm_join() {
+	/*
+	 * After loop detection, we give up session manager join.
+	 */
+	private void cancel_sm_join() {
 		removeChannel(sm_join_channel);
 		sm_join_channel=null;
 	}
 
 
-	private void removeChannel(Forwarder sm_join_channel) {
-		REPSelectionKey<REPCommand> key = sm_join_channel.channel.keyFor1(selector);
+	private void removeChannel(Forwarder channel) {
+		REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector);
 		key.cancel();
 		try {
-			sm_join_channel.channel.close();
+			channel.channel.close();
 		} catch (IOException e) {
 		}
 	}
@@ -266,9 +301,6 @@
 
 	void setMyHostName(String localHostName) {
 		myHost = localHostName + receive_port;
-//		if(maxHost == null) {
-//			maxHost = myHost;
-//		}
 		setHostToEditor(myHost);
 	}
 
@@ -281,6 +313,7 @@
 
 
 	/**
+	 * GUI から、呼ばれて、Session Managerに接続する。
 	 * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては
 	 * ならない。複数のSession Managerにjoinすることは出来ない。(NATを実装するまでは)。
 	 * @param host
@@ -290,10 +323,13 @@
 		if (!sessionList.isEmpty()) return;
 		if (!smList.isMaster()) return;
 		int port = parent_port;
+		/*
+		 * IPv6 対応では、複数のアドレスを取って、それのすべてに接続を試す必要が
+		 * ある。
+		 */
 		InetSocketAddress addr = new InetSocketAddress(host, port);
 		try {
 			REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker());
-
 			sessionchannel.connect(addr);
 			while(!sessionchannel.finishConnect());
 			Forwarder sm = new Forwarder(this);
@@ -303,6 +339,10 @@
 		}
 	}
 	
+	/**
+	 * channel に SMCMD_SM_JOIN command を送る。
+	 * @param channel
+	 */
 	private void sm_join(Forwarder channel){
 		sm_join_channel = channel;
 		//SM_JOINコマンドを生成。
@@ -319,8 +359,7 @@
 		
 		//SM_JOINコマンドを送信。
 		channel.send(command);
-		//SessionManagerのListに追加。
-		
+		// ack を受け取ったら、SessionManagerのListに追加。ここではやらない。
 	}
 	
 	/* 
@@ -334,7 +373,7 @@
 		
 		Editor editor = (Editor)event.getEditor();
 		if(editor == null){
-			logger.writeLog("SessionManager.selectSession():editor = " + editor);
+			logger.writeLog("Error SessionManager.selectSession(): editor = " + editor);
 			return;
 		}
 		if (editor.hasSession()) return;
@@ -344,17 +383,21 @@
 
 	/*
 	 * Select Session Protocol handler
+	 *    called from GUI or incoming SMCMD_SELECT command.
 	 */
 	private void selectSession(int sid, Session session, int eid, Forwarder editor) {
 		if(session.hasOwner()){
+			// we have selected session.
 			REPCommand sendCommand = new REPCommand();
 			if (editor.isDirect()&&editor.getEID()==eid) {
+				// Found directly connected joined editor. Send join_ack().
 				session.addForwarder(editor);
 				sendUpdate(session.getSID());
 				sendCommand.setCMD(REP.SMCMD_JOIN_ACK);
 			} else {
+				// We have a session, but joined editor is on the other sm.
 				// SELECT_ACK is sent to the session ring to
-				// find out joined editor
+				// find out the joined editor.
 				sendCommand.setCMD(REP.SMCMD_SELECT_ACK);
 				// Do not directly addForwarder(forwarder). It may be
 				// shared among sessions.
@@ -362,16 +405,17 @@
 				f.setChannel(editor.channel); // incoming channel
 				f.setHost(myHost);
 				f.setSID(sid);
-				session.addForwarder(f);
+				session.addForwarder(f); // f.next is set up here.
 			}
 			sendCommand.setEID(editor.getEID());
 			sendCommand.setSID(sid);
 			sendCommand.string = session.getName();
 			editor.send(sendCommand);
 		}else {
-			// session searching 
+			// session searching continue...
 			Forwarder next = routingTable.toSession(sid);
 			
+			// create dummy editor for this session
 			Forwarder f = new Editor(this, false, makeID(editorList.newEid()));
 			f.setChannel(editor.channel); // incoming channel
 			f.setNext(next);
@@ -379,6 +423,7 @@
 			f.setSID(sid);
 			session.setFirstForwarder(f);
 			
+			// pass the select command to the next path.
 			REPCommand command = new REPCommand();
 			command.setCMD(REP.SMCMD_SELECT);
 			command.setSID(sid);
@@ -388,6 +433,9 @@
 		}
 	}
 
+	/*
+	 * Create and send UPDATE command.
+	 */
 	private void sendUpdate(int sid) {
 		REPCommand command = makeREPCommandWithSessionList(REP.SMCMD_UPDATE);
 		command.setSID(sid);
@@ -395,14 +443,20 @@
 		smList.sendToMaster(command);
 	}
 
+	/*
+	 * Create new editor in this sessin manager. A dummy editor
+	 * is created also.
+	 */
 	public Editor newEditor(REPSocketChannel<REPCommand> channel) {
 		int eid =  makeID(editorList.newEid());
 		Editor editor = new Editor(this, eid, channel);
 		editorList.add(editor);
 		return editor;
 	}
-	
-	
+
+	/*
+	 * Create new session.
+	 */
 	public Session newSession(Forwarder master) {
 		int sid= makeID(sessionList.newSessionID());
 		Session session = new Session(sid, master);
@@ -414,6 +468,9 @@
 		waitingCommandInMerge.add(set);
 	}
 
+	/*
+	 * Synchronize GUI command in this session manager.
+	 */
 	public void buttonPressed(SessionManagerEvent event) {
 		try {
 			waitingEventQueue.put(event);
@@ -421,6 +478,10 @@
 		selector.wakeup();
 	}
 	
+	/*
+	 * Execute incoming event during the initialization for
+	 * testing purpose.
+	 */
 	public void syncExec(SessionManagerEvent event) {
 		try {
 			waitingEventQueue.put(event);
@@ -428,6 +489,9 @@
 		}
 	}
 
+	/*
+	 * GUI command interface for close session.
+	 */
 	public void closeSession(SessionManagerEvent event) {
 		Session session = ((CloseButtonEvent) event).getSession();
 		session.closeSession();
@@ -435,6 +499,9 @@
 		updateGUI();
 	}
 
+	/*
+	 * Remove editors which has the cannel.
+	 */
 	public void remove(REPSocketChannel<REPCommand> channel) {
 		int i = 0;
 		for(Session s:sessionList.values()) {
@@ -629,6 +696,12 @@
 	}
 
 
+	/**
+     * UPDATE/UPDATE_ACKにより送られてきたSessionの情報を追加する
+	 * @param command
+	 * @return
+	 * @throws IOException
+	 */
 	private String mergeUpdate(REPCommand command) throws IOException {
 		SessionList receivedSessionList;
 		try {
@@ -636,9 +709,9 @@
 		} catch (SAXException e) {
 			throw new IOException();
 		}
-		// UPDATE/UPDATE_ACKにより送られてきたSessionの情報を追加する
+		// 受け取った情報と自分の情報を混ぜる。
+		sessionList.merge(receivedSessionList);
 		//XMLを生成。送信コマンドにセット。
-		sessionList.merge(receivedSessionList);
 		return encoder.sessionListToXML(sessionList);
 
 	}
@@ -646,11 +719,9 @@
 	/*
 	 * id has SessionManager ID part
 	 */
-
 	private int makeID(int newid) {
 		return newid+smList.sessionManagerID()*MAXID;
 	}
-
 	
 	private int getSMID(int id) {
 		return id/MAXID;