diff rep/SessionManager.java @ 316:77f443f6dc9f

add session manager channel handler
author kono
date Tue, 07 Oct 2008 18:15:33 +0900
parents 20fb70068089
children c83a3faec487
line wrap: on
line diff
--- a/rep/SessionManager.java	Mon Oct 06 18:58:49 2008 +0900
+++ b/rep/SessionManager.java	Tue Oct 07 18:15:33 2008 +0900
@@ -17,8 +17,9 @@
 import rep.channel.REPSocketChannel;
 import rep.handler.PacketSet;
 import rep.handler.REPHandler;
-import rep.handler.REPHandlerImpl;
-import rep.handler.REPHandlerInMerge;
+import rep.handler.REPEditorHandler;
+import rep.handler.REPHandlerEditorInMerge;
+import rep.handler.REPSessionManagerHandler;
 import rep.channel.REPSelector;
 import rep.xml.SessionXMLDecoder;
 import rep.xml.SessionXMLEncoder;
@@ -52,14 +53,31 @@
 	// editorList は、sessionList に入っているeditorとは別なeditorのlistらしい。
 	private String maxHost;
 	private List<PacketSet> waitingCommandInMerge;
-	REPHandler normalHandler = new REPHandlerImpl(this);
-	REPHandler handlerInMerge =new REPHandlerInMerge(this); 
+	REPHandler normalHandler = new REPEditorHandler(this);
+	REPHandler handlerInMerge =new REPHandlerEditorInMerge(this); 
 	private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>();;
 	private String myHost;
 	private static int receive_port;
 	private static int parent_port;
 	static final int DEFAULT_PORT = 8766;
-	
+
+	public static void main(String[] args) throws InterruptedException, IOException {
+		
+		int port = DEFAULT_PORT;
+		int port_s = DEFAULT_PORT;
+		//System.setProperty("file.encoding", "UTF-8");
+		if(args.length > 0){
+			port = Integer.parseInt(args[0]);
+			port_s = Integer.parseInt(args[1]);
+		}
+		receive_port = port;
+		parent_port = port_s;
+		SessionManager sm = new SessionManager();
+		sm.init(port,new SessionManagerGUIimpl(sm));
+		
+
+	}
+
 	
 	public void openSelector() throws IOException{
 		selector = REPSelector.<REPCommand>create();
@@ -121,7 +139,7 @@
 
 	private boolean checkWaitingWrite() throws IOException {
 		for(Session s:sessionList) {
-			for(Editor editor: s.getEditorList()) 
+			for(Forwarder editor: s.getEditorList()) 
 				if (editor.doWaitingWrite()) return true;
 		}
 		return false;
@@ -184,11 +202,26 @@
 	public void manage(REPSocketChannel<REPCommand> channel, REPCommand receivedCommand) throws IOException {
 		if(receivedCommand == null) return;
 		//Session session;
-		REPCommand sendCommand = new REPCommand(receivedCommand);
 		REPSocketChannel<REPCommand> send = channel;
 
 		switch(receivedCommand.cmd){
 
+		case REPCMD_DELETE:
+		case REPCMD_INSERT:
+		case REPCMD_NOP:
+		{
+			// sid から Session を取得
+			Session session = getSession(receivedCommand.sid);
+			if (session==null) throw new IOException();
+			// 次のエディタへコマンドを送信する処理
+			Editor editor = session.getEditor(channel);
+			if (editor.isMerging()) {
+				addWaitingCommand(new PacketSet(channel, editor, receivedCommand));
+				break;
+			}
+			editor.translate(session.getNextEditor(editor), receivedCommand);
+			break;
+		}
 		case SMCMD_JOIN:
 		{
 			//どのSessionにも属さないエディタをリストに追加
@@ -225,6 +258,7 @@
 			updateGUI();
 
 			//エディタにAckを送信
+			REPCommand sendCommand = new REPCommand(receivedCommand);
 			sendCommand.setCMD(REP.SMCMD_PUT_ACK);
 			sendCommand.setEID(editor.getEID());
 			sendCommand.setSID(session.getSID());
@@ -252,12 +286,13 @@
 
 			if(session.hasOwner()){
 				//このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す
+				REPCommand sendCommand = new REPCommand(receivedCommand);
 				sendCommand.setCMD(REP.SMCMD_SELECT_ACK);
 				sendCommand.setEID(editor.getEID());
 				editor.send(sendCommand);
 			}else{
 				//オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する
-				Editor owner = session.getOwner();
+				Forwarder owner = session.getOwner();
 				owner.send(receivedCommand);
 			}
 		}
@@ -267,7 +302,7 @@
 		case SMCMD_SELECT_ACK:
 		{
 			String hostport = receivedCommand.string;
-			Editor editor = getEditor(hostport);
+			Forwarder editor = getEditor(hostport);
 
 			if(editor != null) {
 				//host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する
@@ -288,6 +323,11 @@
 		case SMCMD_SM_JOIN:
 
 		{
+			// このchannelの相手は、SessionManager なので、
+			// 特別なhandlerを接続する必要がある
+			channel.register(selector, SelectionKey.OP_READ, 
+					new REPSessionManagerHandler(this));
+			
 			//SessionManagerのリストへ追加
 			smList.add(channel);
 
@@ -301,7 +341,7 @@
 
 			//maxHost を設定。
 			if(setMaxHost(channel, receivedSessionList.getMaxHost())){
-				sendCommand = new REPCommand();
+				REPCommand sendCommand = new REPCommand();
 				sendCommand.setCMD(REP.SMCMD_CH_MASTER);
 				sendCommand.setString(maxHost);
 				smList.sendExcept(channel, sendCommand);
@@ -310,7 +350,7 @@
 			//SessionListからXMLを生成。
 			//joinしてきたSessionManagerに対してACKを送信。
 			SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList);
-			sendCommand = new REPCommand();
+			REPCommand sendCommand = new REPCommand();
 			sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK);
 			sendCommand.setString(sessionlistEncoder.sessionListToXML());
 			send.write(sendCommand);
@@ -332,7 +372,7 @@
 
 			//maxHostを決定。
 			if(setMaxHost(channel, receivedSessionList2.getMaxHost())){
-				sendCommand = new REPCommand();
+				REPCommand sendCommand = new REPCommand();
 				sendCommand.setCMD(REP.SMCMD_CH_MASTER);
 				sendCommand.setString(maxHost);
 				smList.sendExcept(channel, sendCommand);
@@ -390,7 +430,7 @@
 		{
 			//maxHost を設定。
 			if(setMaxHost(channel, receivedCommand.string)){
-				sendCommand = new REPCommand();
+				REPCommand sendCommand = new REPCommand();
 				sendCommand.setCMD(REP.SMCMD_CH_MASTER);
 				sendCommand.setString(maxHost);
 				smList.sendExcept(channel, sendCommand);
@@ -398,27 +438,6 @@
 		}
 			break;
 
-		case REPCMD_DELETE:
-		case REPCMD_INSERT:
-		case REPCMD_NOP:
-		{
-			// sid から Session を取得
-			Session session = getSession(receivedCommand.sid);
-			if (session==null) throw new IOException();
-			// 次のエディタへコマンドを送信する処理
-			Editor editor = session.getEditor(channel);
-			Editor.TranslatorResult r = editor.translate(session.getNextEditor(editor), receivedCommand);
-			if(r==Editor.TranslatorResult.MERGE_END) {
-				endMerge(receivedCommand, session, editor);
-			} else if (r==Editor.TranslatorResult.START_MERGE) {
-				// マージ中のエディタはコマンドを受け取らない
-				// ここで止めることによって、SMCMD_START_MERGE_ACK
-				// が来た時には、editor.writeQueue はemptyになる
-				Editor prevEditor = session.getPrevEditor(editor);
-				setMergeState(prevEditor.getChannel(), session.getSID());
-			}
-			break;
-		}
 		case SMCMD_START_MERGE_ACK:
 		{
 			// sid から Session を取得
@@ -458,8 +477,6 @@
 			Editor editor) {
 		REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,receivedCommand.sid,editor.getEID(),editor.seq(),0,"");
 		editor.send(mergeEnd);
-		Editor prevEditor = session.getPrevEditor(editor);
-		setNormalState(prevEditor.getChannel(), session.getSID());
 	}
 
 	private void updateGUI() {
@@ -471,17 +488,7 @@
 		gui.invokeLater(doRun);
 	}
 
-	private void setNormalState(REPSocketChannel<REPCommand> channel, int sid) {
-		SelectionKey key = channel.keyFor(selector);
-		key.attach(normalHandler);
-	}
-
-	private void setMergeState(REPSocketChannel<REPCommand> channel, int sid) {
-		SelectionKey key = channel.keyFor(selector);
-		key.attach(handlerInMerge);
-	}
-
-	private Editor getEditor(String hostport) {
+	private Forwarder getEditor(String hostport) {
 		for(Editor editor : editorList){
 			if(editor.getHost() == hostport){
 				return editor;
@@ -490,11 +497,11 @@
 		return null;
 	}
 	
-	public Session getSession(int sid) {
+	public Session getSession(int sid) throws IOException {
 		for(Session session : sessionList){
 			if(session.getSID() == sid) return session;
 		}
-		return null;
+		throw new IOException();
 	}
 
 	private boolean setMaxHost(REPSocketChannel<REPCommand> channel, String maxHost2) {
@@ -520,23 +527,6 @@
 		}
 	}
 
-	public static void main(String[] args) throws InterruptedException, IOException {
-		
-		int port = DEFAULT_PORT;
-		int port_s = DEFAULT_PORT;
-		//System.setProperty("file.encoding", "UTF-8");
-		if(args.length > 0){
-			port = Integer.parseInt(args[0]);
-			port_s = Integer.parseInt(args[1]);
-		}
-		receive_port = port;
-		parent_port = port_s;
-		SessionManager sm = new SessionManager();
-		sm.init(port,new SessionManagerGUIimpl(sm));
-		
-
-	}
-
 	public void connectSession(String host) {
 		int port = DEFAULT_PORT;
 		port = parent_port;
@@ -584,7 +574,7 @@
 		return host;
 	}
 
-	public void selectSession(SelectButtonEvent event) {
+	public void selectSession(SelectButtonEvent event) throws IOException {
 		REPSocketChannel<REPCommand> channel = event.getEditorChannel();
 		int sid = event.getSID();
 		int eid = event.getEID();
@@ -613,7 +603,7 @@
 			session = getSession(sid);
 			session.addEditor(editor);
 			
-			Editor owner = session.getOwner();
+			Forwarder owner = session.getOwner();
 			
 			REPCommand command = new REPCommand();
 			command.setCMD(REP.SMCMD_SELECT);