changeset 384:bcdf5476b8e4

restructured-version
author one@firefly.cr.ie.u-ryukyu.ac.jp
date Mon, 10 Nov 2008 22:16:37 +0900
parents 6d48db302b07
children 1fca50ce3508
files rep/Session.java rep/SessionManager.java rep/handler/Dispatcher.java rep/handler/Editor.java rep/handler/FirstConnector.java rep/handler/NullForwarder.java rep/handler/REPNode.java test/sematest/TestSessionManager.java
diffstat 8 files changed, 46 insertions(+), 322 deletions(-) [+]
line wrap: on
line diff
--- a/rep/Session.java	Mon Nov 10 22:13:40 2008 +0900
+++ b/rep/Session.java	Mon Nov 10 22:16:37 2008 +0900
@@ -144,7 +144,7 @@
 	public REPNode getForwarder(REPSocketChannel<REPCommand> channel) {
 		REPNode f = first;
 		while(f.channel!=channel) f = f.next;
-		SessionManager.logger.writeLog("getFirstForwarder="+f.next+"=>"+f.next.channel);
+		ServerMainLoop.logger.writeLog("getFirstForwarder="+f.next+"=>"+f.next.channel);
 		return f.next;
 	}
 
@@ -201,7 +201,7 @@
 				break;
 			}
 		}
-		SessionManager.logger.writeLog(log);
+		ServerMainLoop.logger.writeLog(log);
 	}
 
 }
--- a/rep/SessionManager.java	Mon Nov 10 22:13:40 2008 +0900
+++ b/rep/SessionManager.java	Mon Nov 10 22:16:37 2008 +0900
@@ -4,36 +4,25 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.xml.sax.SAXException;
 
 
 
 import rep.channel.REPLogger;
-import rep.channel.REPServerSocketChannel;
 import rep.channel.REPSocketChannel;
 import rep.gui.CloseButtonEvent;
-import rep.gui.DoGUIUpdate;
 import rep.gui.SelectButtonEvent;
 import rep.gui.SessionManagerEvent;
 import rep.gui.SessionManagerEventListener;
 import rep.gui.SessionManagerGUI;
 import rep.gui.SessionManagerGUIimpl;
-import rep.handler.Dispatcher;
 import rep.handler.Editor;
 import rep.handler.REPNode;
 import rep.handler.FirstConnector;
 import rep.handler.Forwarder;
-import rep.channel.REPSelector;
 import rep.xml.SessionXMLDecoder;
 import rep.xml.SessionXMLEncoder;
-import rep.channel.REPSelectionKey;
 
 /*
 	+-------+--------+--------+-------+--------+---------+------+
@@ -53,32 +42,16 @@
 	byte[] text;
 */
 
-public class SessionManager implements SessionManagerEventListener{
-	static public REPLogger logger = REPLogger.singleton();
-
-	SessionList sessionList;           
-	private SessionManagerGUI gui;
-	// Main nio.Selector of this server
-	private REPSelector<REPCommand> selector;
+public class SessionManager extends ServerMainLoop 
+		implements SessionManagerEventListener {
+	SessionList sessionList = new SessionList();           
 	// Known Session Manager List, At most one parent. No parent means master.
-	SessionManagerList smList;
+	SessionManagerList smList = new SessionManagerList();
 	// Known Editor list. Connected Editor has a channel. 
 	// Session Manager Channel may have dummy editors.
-	EditorList editorList;
-	// Commands for busy editor are kept in this queue.
-	private List<PacketSet> waitingCommandInMerge;
-	// Command from gui. Synchronization is required.
-	private BlockingQueue<SessionManagerEvent> waitingEventQueue 
-		= new LinkedBlockingQueue<SessionManagerEvent>();;
-	// host name of this server. One of connecting SocketChannel's hostname
-	public String myHost;
-	// Single threaded write queueu. To avoid dead lock with too many writes.
-	private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>();
-	private int receive_port;
-	private int parent_port;
-	static final int DEFAULT_PORT = 8766;
+	EditorList editorList = new EditorList();
 	// Queue limit for debugging purpose.
-	private static final int packetLimit = 200;
+	static final int packetLimit = 200;
 
 	// globalSessionID = SessionManagerID * MAXID + localSessionID
 	private static final int MAXID = 10000;
@@ -88,12 +61,12 @@
 	private REPNode sm_join_channel;
 	// Routing table for session and session manager.
 	private RoutingTable routingTable = new RoutingTable(this);
-	private SessionManagerEvent execAfterConnect = null;;
-
+	
+	static public REPLogger logger = REPLogger.singleton();
+	
 	public static void main(String[] args) throws InterruptedException, IOException {
-		
-		int port = DEFAULT_PORT;
-		int port_s = DEFAULT_PORT;
+		int port =ServerMainLoop.DEFAULT_PORT;
+		int port_s = ServerMainLoop.DEFAULT_PORT;
 		//System.setProperty("file.encoding", "UTF-8");
 		if(args.length > 0){
 			if (args.length!=2) {
@@ -110,174 +83,8 @@
 		sm.init(port,new SessionManagerGUIimpl(sm));
 	}
 
-	public void setReceivePort(int port) {
-		receive_port = port;
-	}
-
 	public void init(int port, SessionManagerGUI gui) throws IOException, InterruptedException {
-		this.gui = gui;
-		init(port);
-		mainLoop(); 
-	}
-
-	private void init(int port) throws InterruptedException, IOException {
-		selector = REPSelector.<REPCommand>create();	
-		REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker());
-		ssc.configureBlocking(false);       // Selector requires this
-		ssc.socket().setReuseAddress(true);	//reuse address 必須
-		//getAllByNameで取れた全てのアドレスに対してbindする
-		ssc.socket().bind(new InetSocketAddress(port));
-		ssc.register(selector, SelectionKey.OP_ACCEPT, 
-				new Dispatcher(this));  // FirstConnector?
-
-		sessionList = new SessionList();
-		smList = new SessionManagerList();
-		editorList = new EditorList();
-		waitingCommandInMerge = new LinkedList<PacketSet>();
-		
-
-	}
-	
-	/*
-	 * The main loop.
-	 *     Check incoming events and waiting writes.
-	 *     Do select and call select() to check in coming packets.
-	 * We wrote everything in one thread, but we can assign
-	 * one thread for each communication channel and GUI event.
-	 */
-	public void mainLoop() throws IOException {
-		while(true){
-		    checkWaitingCommandInMerge();
-			if (checkInputEvent() ||
-			    checkWaitingWrite()) { 
-				   // try to do fair execution for waiting task
-				   if(selector.selectNow() > 0) select();
-				   continue;
-			}
-			// now we can wait for input packet or event
-			selector.select();
-			select();
-		}
-	}
-
-	/*
-	 * Synchronize GUI event in the main loop.
-	 */
-	private boolean checkInputEvent() {
-		SessionManagerEvent e;
-		if((e = waitingEventQueue.poll())!=null){
-			e.exec(this);
-			return true;
-		}
-		return false;
-	}
-
-	/*
-	 * Write a packet during the main loop.
-	 */
-	private boolean checkWaitingWrite() throws IOException {
-		PacketSet p = writeQueue.poll();
-		if (p!=null) {
-			p.channel.write(p.command);
-			return true;
-		}
-		return false;
-	}
-
-	/**
-	 * Check waiting command in merge
-	 * @return true if there is a processed waiting command
-	 * @throws IOException
-	 */
-	private void checkWaitingCommandInMerge() {
-		List<PacketSet> w = waitingCommandInMerge;
-		waitingCommandInMerge = new LinkedList<PacketSet>();
-		for(PacketSet p: w) {
-			REPNode e = p.getEditor();
-			if(e.isMerging()) { // still merging do nothing
-				waitingCommandInMerge.add(p);
-			} else {
-				try {
-					if (sessionManage(e, p.command)) { // we don't need this
-						assert false;
-						return;
-					}
-					e.manage(p.command);
-				} catch (Exception e1) {
-					// should be e.close()?
-					close(p.channel);
-				}		
-			}
-		}
-	}
-
-	/*
-	 * If we have waiting write commands, further sent commands also
-	 * wait to avoid out of order packet sending.
-	 */
-	public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) {
-		for(PacketSet p:waitingCommandInMerge) {
-			if (p.channel==c) {
-				return true;
-			}
-		}
-		return false;
-	}
-	
-	/*
-	 * Close a channel in case of exception or close. 
-	 */
-	private void close(REPSocketChannel<REPCommand> channel) {
-		REPSelectionKey<REPCommand>key = channel.keyFor1(selector);
-		REPNode handler = (REPNode)key.attachment();
-		key.cancel();
-		handler.cancel(channel);
-		// we have to remove session/enditor
-	}
-
-
-	/*
-	 * Do select operation on the Selector. Each key has a forwarder.
-	 * A forwarder can be a firstConnector, a forwarder for Session Manager
-	 * or an Editor.
-	 */
-	private void select() throws IOException {
-		
-		Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1();
-		for(REPSelectionKey<REPCommand> key : keys){
-			if(key.isAcceptable()){
-				/*
-				 * Incoming connection. We don't know which, editor or
-				 * session manager. Assign FirstConnector to distinguish.
-				 */
-				REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker());
-				logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel);
-				registerChannel(channel, new FirstConnector(this));
-				channel = null;
-			}else if(key.isReadable()){
-				/*
-				 * Incoming packets are handled by a various forwarder.
-				 * A hadler throw IOException() in case of a trouble to
-				 * close the channel.
-				 */
-				REPNode handler = (REPNode)key.attachment();
-				try {
-					handler.handle(key);
-				} catch (IOException e) {
-					key.cancel();
-					handler.cancel(key.channel1());
-				}
-			}
-		}
-	}
-	
-	public void registerChannel(REPSocketChannel<REPCommand> channel,REPNode handler) throws IOException {
-		if(channel == null) {
-			return;
-		}
-		handler.setChannel(channel);
-		channel.configureBlocking(false);
-		channel.register(selector, SelectionKey.OP_READ, handler);
+		mainLoop(this, port, gui); 
 	}
 
 	/*
@@ -285,56 +92,11 @@
 	 */
 	private void cancel_sm_join() {
 		logger.writeLog("Loop detected "+this);
-		removeChannel(sm_join_channel);
+		removeChannel(this, sm_join_channel);
 		sm_join_channel=null;
 	}
 
 
-	private void removeChannel(REPNode channel) {
-		REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector);
-		key.cancel();
-		try {
-			channel.channel.close1();
-		} catch (IOException e) {
-		}
-	}
-
-
-	void updateGUI() {
-		//リストのコピーをGUIに渡す
-		LinkedList<Session> sList = new LinkedList<Session>(sessionList.values());
-		LinkedList<REPNode> eList;
-		if (false) { 
-			// local editor only 
-			eList = new LinkedList<REPNode>();
-			for(REPNode e:editorList.values()) {
-				if (getSMID(e.eid)==smList.sessionManagerID()) {
-					eList.add(e);
-				}
-			}
-		} else {
-			eList = new LinkedList<REPNode>(editorList.values());
-		}
-		//GUIに反映
-		Runnable doRun = new DoGUIUpdate(sList, eList, gui);
-		gui.invokeLater(doRun);
-	}
-
-
-
-	public void setMyHostName(String localHostName) {
-		myHost = localHostName + receive_port;
-		setHostToEditor(myHost);
-	}
-
-	private void setHostToEditor(String myHost2) {
-		for(REPNode editor : editorList.values()){
-			if (editor.channel!=null)
-				editor.setHost(myHost2);
-		}
-	}
-
-
 	/**
 	 * GUI から、呼ばれて、Session Managerに接続する。
 	 * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては
@@ -507,31 +269,6 @@
 		return session;
 	}
 
-	public void addWaitingCommand(PacketSet set) {
-		waitingCommandInMerge.add(set);
-	}
-
-	/*
-	 * Synchronize GUI command in this session manager.
-	 */
-	public void buttonPressed(SessionManagerEvent event) {
-		try {
-			waitingEventQueue.put(event);
-		} catch (InterruptedException e) {}
-		selector.wakeup();
-	}
-	
-	/*
-	 * Execute incoming event during the initialization for
-	 * testing purpose.
-	 */
-	public void syncExec(SessionManagerEvent event) {
-		try {
-			waitingEventQueue.put(event);
-		} catch (InterruptedException e) {
-		}
-	}
-
 	/*
 	 * GUI command interface for close session.
 	 */
@@ -557,12 +294,6 @@
 	}
 
 
-	public void addWriteQueue(PacketSet packetSet) {
-		writeQueue.addLast(packetSet);
-		assert(writeQueue.size()<packetLimit) ;
-	}
-
-
 	public void remove(Editor editor) {
 		Session s = sessionList.get(editor.getSID());
 		if (s==null) {
@@ -583,18 +314,6 @@
 		sendUpdate(s0.getSID());
 	}
 
-	public void setParentPort(int port) {
-		parent_port = port;
-	}
-	public int getParentPort() {
-		return parent_port;
-	}
-	
-	public int getPort() {
-		return receive_port;
-	}
-
-
 	public boolean sessionManage(REPNode forwarder, REPCommand command) throws ClosedChannelException,
 			IOException {
 		switch(command.cmd){
@@ -822,7 +541,7 @@
 		return newid+smList.sessionManagerID()*MAXID;
 	}
 	
-	private int getSMID(int id) {
+	int getSMID(int id) {
 		return id/MAXID;
 	}
 
@@ -882,15 +601,6 @@
 		return sessionList.get(sid);
 	}
 
-	public void execAfterConnect(SessionManagerEvent sessionManagerEvent) {
-		execAfterConnect  = sessionManagerEvent;
-	}
-
-	public void afterConnect() {
-		if (execAfterConnect!=null) execAfterConnect.exec(this);
-		execAfterConnect = null;
-	}
-
 	public void setParent(REPNode fw) {
 		smList.setParent(fw);
 	}
@@ -898,12 +608,6 @@
 	public String toString() {
 		int myId = 0;
 		if (smList!=null) myId = smList.sessionManagerID();
-		return "rep.SessionManager-"+myId+"@"+myHost+":"+receive_port; 
+		return "rep.SessionManager-"+myId+"@"+super.toString(); 
 	}
-
-	public void addWaitingSessionManager(REPNode fw, REPCommand command) {
-		smList.addWaitingSessionManager(fw, command) ;
-	}
-
-
 }
--- a/rep/handler/Dispatcher.java	Mon Nov 10 22:13:40 2008 +0900
+++ b/rep/handler/Dispatcher.java	Mon Nov 10 22:16:37 2008 +0900
@@ -3,6 +3,7 @@
 import java.io.IOException;
 
 import rep.REPCommand;
+import rep.ServerMainLoop;
 import rep.Session;
 import rep.SessionManager;
 import rep.channel.REPSelectionKey;
@@ -42,7 +43,7 @@
 		 */
 		REPSocketChannel<REPCommand> channel = key.channel1();
 		REPCommand command = channel.read();
-		SessionManager.logger.writeLog("REPHandlerImpl.handle() : command = " + command);
+		ServerMainLoop.logger.writeLog("REPHandlerImpl.handle() : command = " + command);
 		if (manager.sessionManage(this, command)) return;
 		
 		distpatchToEditor(channel, command);
--- a/rep/handler/Editor.java	Mon Nov 10 22:13:40 2008 +0900
+++ b/rep/handler/Editor.java	Mon Nov 10 22:16:37 2008 +0900
@@ -7,6 +7,7 @@
 import rep.PacketSet;
 import rep.REP;
 import rep.REPCommand;
+import rep.ServerMainLoop;
 import rep.SessionManager;
 import rep.channel.REPSelectionKey;
 import rep.channel.REPSocketChannel;
@@ -89,7 +90,7 @@
 		if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) {
 			String err = "Editor.checkReturnedCommand() : command = " + command + " prev=";
 			err += prev==null?"null":prev.toString();
-			SessionManager.logger.writeLog(err);
+			ServerMainLoop.logger.writeLog(err);
 			assert(false);
 		}
 
@@ -207,7 +208,7 @@
 	public void handle(REPSelectionKey<REPCommand> key) throws IOException {
 		REPSocketChannel<REPCommand> channel = key.channel1();
 		REPCommand command = channel.read();
-		SessionManager.logger.writeLog("REPHandlerImpl.handle() read : command = " + command +" from "+channel);
+		ServerMainLoop.logger.writeLog("REPHandlerImpl.handle() read : command = " + command +" from "+channel);
 		if (manager.sessionManage(this, command)) return;
 		manage(command);
 	}
--- a/rep/handler/FirstConnector.java	Mon Nov 10 22:13:40 2008 +0900
+++ b/rep/handler/FirstConnector.java	Mon Nov 10 22:16:37 2008 +0900
@@ -3,6 +3,7 @@
 import java.io.IOException;
 
 import rep.REPCommand;
+import rep.ServerMainLoop;
 import rep.Session;
 import rep.SessionManager;
 import rep.channel.REPSelectionKey;
@@ -28,7 +29,7 @@
 		REPNode fw;
 		REPSocketChannel<REPCommand> channel = key.channel1();
 		REPCommand command = channel.read();
-		SessionManager.logger.writeLog("FirstConnector: command = " + command);
+		ServerMainLoop.logger.writeLog("FirstConnector: command = " + command);
 		switch(command.cmd) {
 		case SMCMD_JOIN: 
 		{
@@ -37,7 +38,7 @@
 			//エディタが新しくputする場合は新しくソケットを作る
 			//   1対1でない場合は、multiplexerを挿めば良い
 			REPNode editor = manager.newEditor(channel);
-			editor.setHost(manager.myHost);
+			editor.setHost(manager.myHost());
 			command.eid = editor.eid;
 			command.sid = -1;
 			editor.setSID(-1);
@@ -74,9 +75,9 @@
 		}
 		//myHost を設定。
 		//立ち上げ時にやるとlocalhostしか取れない
-		if(manager.myHost == null) manager.setMyHostName(getLocalHostName());
+		if(manager.myHost() == null) manager.setMyHostName(getLocalHostName());
 		fw.setMode(command.cmd);
-		fw.setHost(manager.myHost);
+		fw.setHost(manager.myHost());
 		manager.registerChannel(channel, fw);
 		manager.sessionManage(fw, command);
 	
--- a/rep/handler/NullForwarder.java	Mon Nov 10 22:13:40 2008 +0900
+++ b/rep/handler/NullForwarder.java	Mon Nov 10 22:16:37 2008 +0900
@@ -7,6 +7,11 @@
 import rep.channel.REPSelectionKey;
 import rep.channel.REPSocketChannel;
 
+/**
+ * @author kono
+ *  No connection
+ *    if a manager.parent is set to this, the manager is a master. 
+ */
 public class NullForwarder extends Forwarder {
 
 	public NullForwarder(SessionManager manager) {
--- a/rep/handler/REPNode.java	Mon Nov 10 22:13:40 2008 +0900
+++ b/rep/handler/REPNode.java	Mon Nov 10 22:16:37 2008 +0900
@@ -7,10 +7,21 @@
 import rep.channel.REPSelectionKey;
 import rep.channel.REPSocketChannel;
 
+/**
+ * @author kono
+ *    Abstract class for all REP node
+ *    Sub classes:
+ *        FirstConnector     waiting first connection and determines its type.
+ *        Editor             editor direct connect or no connection (master/slave)
+ *        Forwarder          send command to the other session manager 
+ *                                 base class for other communication node
+ *        Dispatcher         Session Manager entry, dispatch commands to editors.
+ *        NullForwarder      REP node with no connection
+ */
 public abstract class REPNode {
 
-	public int eid; // globally unique
-	public int sid=-1; // globally unique
+	public int eid;        // globally unique (contains SessionManagerID)
+	public int sid=-1;     // globally unique
 	public String host;
 	public String file;
 	public REP mode;
--- a/test/sematest/TestSessionManager.java	Mon Nov 10 22:13:40 2008 +0900
+++ b/test/sematest/TestSessionManager.java	Mon Nov 10 22:16:37 2008 +0900
@@ -1,6 +1,7 @@
 package test.sematest;
 
 import java.io.IOException;
+
 import rep.SessionManager;
 import rep.channel.REPLogger;
 import rep.channel.REPServerSocketChannel;