diff rep/SessionManager.java @ 355:98607350f7d1

*** empty log message ***
author kono
date Fri, 17 Oct 2008 22:11:34 +0900
parents 0d47ff22ee0e
children b18c24dcc5d2
line wrap: on
line diff
--- a/rep/SessionManager.java	Fri Oct 17 18:40:08 2008 +0900
+++ b/rep/SessionManager.java	Fri Oct 17 22:11:34 2008 +0900
@@ -62,8 +62,7 @@
 	static final int DEFAULT_PORT = 8766;
 	private static final int packetLimit = 200;
 	SessionXMLDecoder decoder = new SessionXMLDecoder();
-
-	boolean smjoin_mode;
+	private Forwarder sm_join_channel;
 
 	public static void main(String[] args) throws InterruptedException, IOException {
 		
@@ -208,7 +207,7 @@
 			if(key.isAcceptable()){
 				REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker());
 				logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel);
-				registerChannel (channel, SelectionKey.OP_READ,new Forwarder(this));
+				registerChannel (channel, new FirstConnector(this));
 				channel = null;
 
 			}else if(key.isReadable()){
@@ -223,18 +222,29 @@
 		}
 	}
 	
-	void registerChannel(REPSocketChannel<REPCommand> channel, int ops,Forwarder handler) throws IOException {
+	void registerChannel(REPSocketChannel<REPCommand> channel,Forwarder handler) throws IOException {
 		if(channel == null) {
 			return;
 		}
 		handler.setChannel(channel);
 		channel.configureBlocking(false);
-		channel.register(selector, ops, handler);
+		channel.register(selector, SelectionKey.OP_READ, handler);
 	}
 
 
 	void cancel_sm_join() {
-		smjoin_mode=false;
+		removeChannel(sm_join_channel);
+		sm_join_channel=null;
+	}
+
+
+	private void removeChannel(Forwarder sm_join_channel) {
+		REPSelectionKey<REPCommand> key = sm_join_channel.channel.keyFor1(selector);
+		key.cancel();
+		try {
+			sm_join_channel.channel.close();
+		} catch (IOException e) {
+		}
 	}
 
 
@@ -294,7 +304,16 @@
 		}
 	}
 
+
+	/**
+	 * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては
+	 * ならない。複数のSession Managerにjoinすることは出来ない。(NATを実装するまでは)。
+	 * @param host
+	 */
 	public void connectSession(String host) {
+		if (sm_join_channel!=null) return;
+		if (!sessionList.isEmpty()) return;
+		if (!smList.isMaster()) return;
 		int port = parent_port;
 		InetSocketAddress addr = new InetSocketAddress(host, port);
 		try {
@@ -303,14 +322,14 @@
 			sessionchannel.connect(addr);
 			while(!sessionchannel.finishConnect());
 			Forwarder sm = new Forwarder(this);
-			registerChannel(sessionchannel, SelectionKey.OP_READ,sm);
+			registerChannel(sessionchannel, sm);
 			sm_join(sm);
 		}catch (IOException e) {
 		}
 	}
 	
 	private void sm_join(Forwarder channel){
-		smjoin_mode = true;
+		sm_join_channel = channel;
 		//SM_JOINコマンドを生成。
 		REPCommand command = new REPCommand();
 		command.setCMD(REP.SMCMD_SM_JOIN);
@@ -321,8 +340,9 @@
 		setMyHostName(channel.getLocalHostName());
 		
 		//XMLを生成。送信コマンドにセット。
-		SessionXMLEncoder encoder = new SessionXMLEncoder(sessionList);
-		String string = encoder.sessionListToXML();
+		//SessionXMLEncoder encoder = new SessionXMLEncoder(sessionList);
+		//String string = encoder.sessionListToXML();
+		String string = myHost;
 		command.setString(string);
 		
 		//SM_JOINコマンドを送信。
@@ -439,25 +459,15 @@
 	}
 
 
-	boolean sessionManage(Forwarder forwarder, REPCommand receivedCommand) throws ClosedChannelException,
+	boolean sessionManage(Forwarder forwarder, REPCommand command) throws ClosedChannelException,
 			IOException, SAXException {
-		switch(receivedCommand.cmd){
+		switch(command.cmd){
 	
 		// Session Manager Command
 	
 		case SMCMD_JOIN:
 		{
-			//どのSessionにも属さないエディタをリストに追加
-			//エディタとchannelは1対1 (ではない)
-			//エディタが新しくputする場合は新しくソケットを作る
-			// ここのeditorList はsessionのとは別物
-			Editor editor1 = new Editor(this,-1,forwarder.channel);
-			registerChannel(forwarder.channel,SelectionKey.OP_READ,editor1);
-			editor1.setHost(myHost);
-			editorList.add(editor1);
-	
 			updateGUI();
-	
 		}
 	
 		break;
@@ -473,18 +483,15 @@
 			// 自分の親に作ってもらう必要がある。自分が親なら自分で作って良い。
 			
 			int sid = sessionList.size();
-			Editor editor = new Editor(this,0, forwarder.channel);
-			registerChannel(forwarder.channel,SelectionKey.OP_READ,editor);
-			editorList.add(editor);
-			editor.setHost(myHost);
-			Session session = new Session(sid, receivedCommand.string, editor);
+			Editor editor = (Editor) forwarder;
+			Session session = new Session(sid, command.string, (Editor)forwarder);
 			session.hasOwner(true);
 			sessionList.add(session);
 	
 			updateGUI();
 	
 			//エディタにAckを送信
-			REPCommand sendCommand = new REPCommand(receivedCommand);
+			REPCommand sendCommand = new REPCommand(command);
 			sendCommand.setCMD(REP.SMCMD_PUT_ACK);
 			sendCommand.setEID(editor.getEID());
 			sendCommand.setSID(session.getSID());
@@ -493,10 +500,10 @@
 			//他のSessionManagerへSessionの追加を報告
 			//親に送って、親から子へ
 			SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session);
-			REPCommand command = new REPCommand();
-			command.setSID(session.getSID());
-			command.setString(sessionEncoder.sessionListToXML());
-			command.setCMD(REP.SMCMD_UPDATE);
+			REPCommand command1 = new REPCommand();
+			command1.setSID(session.getSID());
+			command1.setString(sessionEncoder.sessionListToXML());
+			command1.setCMD(REP.SMCMD_UPDATE);
 			smList.sendToSlaves( command);
 	
 		}
@@ -510,19 +517,19 @@
 			//他のSessionManagerをエディタとしてSessionに追加
 			Forwarder next = new Forwarder(this);
 			next.setChannel(forwarder.channel);
-			Session session = getSession(receivedCommand.sid);
+			Session session = getSession(command.sid);
 			session.addForwarder(next);
 	
 			if(session.hasOwner()){
 				//このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す
-				REPCommand sendCommand = new REPCommand(receivedCommand);
+				REPCommand sendCommand = new REPCommand(command);
 				sendCommand.setCMD(REP.SMCMD_SELECT_ACK);
 				sendCommand.setEID(next.getEID());
 				next.send(sendCommand);
 			}else{
 				//オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する
 				Forwarder owner = session.getOwner();
-				owner.send(receivedCommand);
+				owner.send(command);
 			}
 		}
 	
@@ -530,20 +537,20 @@
 	
 		case SMCMD_SELECT_ACK:
 		{
-			String hostport = receivedCommand.string;
+			String hostport = command.string;
 			Forwarder editor1 = getEditor(hostport);
 	
 			if(editor1 != null) {
 				//host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する
-				REPCommand command = new REPCommand();
-				command.setCMD(REP.SMCMD_JOIN_ACK);
-				command.setSID(receivedCommand.sid);
-				command.setEID(receivedCommand.eid);
+				REPCommand command1 = new REPCommand();
+				command1.setCMD(REP.SMCMD_JOIN_ACK);
+				command1.setSID(command.sid);
+				command1.setEID(command.eid);
 				editor1.send(command);
 	
 			}else{
 				//自分が送信したコマンドでなければ、次のSessionManagerへ中継する
-				smList.sendToSlaves(receivedCommand);
+				smList.sendToSlaves(command);
 			}
 		}
 	
@@ -551,68 +558,51 @@
 		case SMCMD_SM_JOIN:
 	
 		{
-			// SM_JOIN中にSMCMD_SM_JOINが来たら、自分のSM_JOINを
-			// 取り消す。
-			if (smjoin_mode) cancel_sm_join();
+			// SM_JOIN中にSMCMD_SM_JOINが来たら、これはループなので、
+			///自分のSM_JOINを取り消す。
+			if (sm_join_channel!=null) cancel_sm_join();
 			// SMCMD_SM_JOIN は、master まで上昇する。
 			//    masterでなければ、自分のparentに転送する。
-			if(smList.isMaster()) {
+			if(isMaster()) {
 				//    master であれば、SessionManager IDを決めて、
 				//    自分のsmList に登録
-				int sid = smList.addNewSessionManager(receivedCommand);
-				//SessionListからXMLを生成。
-				//joinしてきたSessionManagerに対してACKを送信。
-				SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList);
-				REPCommand sendCommand = new REPCommand();
+				Forwarder sm;
+				int psid = command.eid;
+				if (forwarder.sid!=-1) {
+				// すでに channelはSessionManager Idを持っていて、
+				// direct link ではないので、
+				// channel を持たないForwarderとして登録する
+					sm = new Forwarder(this);
+				} else {
+					sm = forwarder;
+				}
+				int sid = smList.addNewSessionManager(sm,command);		
+				REPCommand sendCommand = makeREPCommandWithSessionList(REP.SMCMD_SM_JOIN_ACK);
+				// command.eid==smList.sesionManagerID() の場合は、
+				// 待っている自分の下のsessionManagerにsidをassignする必要がある。
 				sendCommand.setSID(sid); // new Session manager ID
 				// 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた
 				// Session manager IDを使う。
-				sendCommand.setEID(receivedCommand.eid);
-				sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK);
-				sendCommand.setString(sessionlistEncoder.sessionListToXML());
-				smList.sendToSlaves(sendCommand);
-				break;
+				sendCommand.setEID(psid);
+				send_sm_join_ack(psid, sid, sendCommand);
+			} else {
+				if (forwarder.sid==-1) {
+					// direct link の場合は、識別のために、EIDに直上の
+					// smid を入れておく。
+					command.setEID(smList.sessionManagerID());
+				}
+				smList.sendToMaster(command);
 			}
-			// 
-	
-			//XMLからSessionListオブジェクトを生成する。
-			//SessionList receivedSessionList = decoder.decode(receivedCommand.string);
-	
-			//myHost を設定。
-			//立ち上げ時にやるとlocalhostしか取れない
-			if(myHost == null) setMyHostName(forwarder.getLocalHostName());
-	
-			//maxHost を設定。
-			//			if(setMaxHost(channel, receivedSessionList.getMaxHost())){
-			//				REPCommand sendCommand = new REPCommand();
-			//				sendCommand.setCMD(REP.SMCMD_CH_MASTER);
-			//				sendCommand.setString(maxHost);
-			//				smList.sendExcept(channel, sendCommand);
-			//			}
-	
-	
 		}
 		break;
 	
-		case SMCMD_SM_JOIN_ACK:
-	
-			//XMLからSessionListオブジェクトを生成。
-			//SessionList receivedSessionList2 = decoder.decode(receivedCommand.string);
-	
-			//maxHostを決定。
-			//			if(setMaxHost(channel, receivedSessionList2.getMaxHost())){
-			//				REPCommand sendCommand = new REPCommand();
-			//				sendCommand.setCMD(REP.SMCMD_CH_MASTER);
-			//				sendCommand.setString(maxHost);
-			//				smList.sendExcept(channel, sendCommand);
-			//			}
-	
-	
+		case SMCMD_SM_JOIN_ACK:				
+			send_sm_join_ack(command.eid, command.sid, command);
 			break;
 	
 		case SMCMD_UPDATE:
 		{
-			SessionList receivedSessionList3 = decoder.decode(receivedCommand.string);
+			SessionList receivedSessionList3 = decoder.decode(command.string);
 	
 			//UPDATEコマンドにより送られてきたSessionの情報を追加する
 			LinkedList<Session> list = receivedSessionList3.getList();
@@ -622,7 +612,7 @@
 			}
 	
 			//他のSessionManagerへ中継する
-			smList.sendToSlaves(receivedCommand);
+			smList.sendToSlaves(command);
 	
 			updateGUI();
 		}
@@ -630,20 +620,20 @@
 	
 		case SMCMD_UPDATE_ACK:
 		{
-			if(!hasSession(receivedCommand.sid)) {
+			if(!hasSession(command.sid)) {
 				// accept new Session
 				// ここで初めてsession id が決まる。
 				// このコマンドは、master session manager が出すはず
 				Forwarder sm = new Forwarder(this);
 				sm.setChannel(forwarder.channel);
-				Session session = new Session(receivedCommand.sid,receivedCommand.string,null);
+				Session session = new Session(command.sid,command.string,null);
 				session.addForwarder(sm);
 	
 				sessionList.add(session);
 				
 				updateGUI();
 			}
-			smList.sendToSlaves(receivedCommand);
+			smList.sendToSlaves(command);
 		}
 			break;
 	
@@ -666,4 +656,36 @@
 		return true;
 	}
 
+
+	void send_sm_join_ack(int psid, int sid,REPCommand sendCommand) {
+		if (psid==smList.sessionManagerID()) {
+			// 直下のsessionManagerにIDを割り振る必要がある。
+			smList.assignSessionManagerIDtoWaitingSM(sid);
+			// ここで smList に一つだけ追加されるので
+			// 待っている最初のsm一つにだけ、sm_joinが新たに送られる。
+		}
+		smList.sendToSlaves(sendCommand);
+	}
+
+
+	private REPCommand makeREPCommandWithSessionList(REP cmd) {
+		//SessionListからXMLを生成。
+		//joinしてきたSessionManagerに対してACKを送信。
+		SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList);
+		REPCommand sendCommand = new REPCommand();
+		sendCommand.setCMD(cmd);
+		sendCommand.setString(sessionlistEncoder.sessionListToXML());
+		return sendCommand;
+	}
+
+
+	public boolean isMaster() {
+		return smList.isMaster();
+	}
+
+
+	public void setSessionManagerID(int sid) {
+		smList.setSessionManagerID(sid);		
+	}
+
 }