changeset 355:98607350f7d1

*** empty log message ***
author kono
date Fri, 17 Oct 2008 22:11:34 +0900
parents 6ea3aa6c795f
children b18c24dcc5d2
files rep/FirstConnector.java rep/Forwarder.java rep/SessionManager.java rep/SessionManagerList.java test/sematest/TestInterManagerSession.java
diffstat 5 files changed, 238 insertions(+), 103 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rep/FirstConnector.java	Fri Oct 17 22:11:34 2008 +0900
@@ -0,0 +1,75 @@
+package rep;
+
+import java.io.IOException;
+
+import rep.channel.REPSelectionKey;
+import rep.channel.REPSocketChannel;
+
+public class FirstConnector extends Forwarder {
+
+	public FirstConnector(SessionManager manager) {
+		super(manager);
+	}
+	
+	public void cancel(REPSocketChannel<REPCommand> socketChannel) {
+		manager.remove(socketChannel);
+	}
+
+	public void handle(REPSelectionKey<REPCommand> key) throws Exception {
+		/*
+		 * 接続要求は、EditorかSlave Editorで、
+		 *    join, put, sm_join
+		 * が来る。それ以外はエラー。master もありか?
+		 *      sm_join_ack
+		 */
+		Forwarder fw;
+		REPSocketChannel<REPCommand> channel = key.channel1();
+		REPCommand command = channel.read();
+		SessionManager.logger.writeLog("FirstConnector: command = " + command);
+		switch(command.cmd) {
+		case SMCMD_JOIN: 
+		{
+			//どのSessionにも属さないエディタをリストに追加
+			//エディタとchannelは1対1 (ではない)
+			//エディタが新しくputする場合は新しくソケットを作る
+			//   1対1でない場合は、multiplexerを挿めば良い
+			// ここのeditorList はsessionのとは別物
+			Editor editor = new Editor(manager,-1,channel);
+			editor.setHost(manager.myHost);
+			manager.editorList.add(editor);
+			fw = editor;
+			break;
+		}
+		case SMCMD_PUT:
+		{
+			// put の場合でも、eid は、masterまで聞きにいく必要が
+			// ある。
+			Editor editor = new Editor(manager,0,channel);
+			editor.setHost(manager.myHost);
+			manager.editorList.add(editor);
+			fw = editor;
+			break;
+		}
+		case SMCMD_SM_JOIN:
+		{
+			fw = new Forwarder(manager);
+			manager.smList.addWaitingSessionManager(fw, command);
+			break;
+		}
+		case SMCMD_SM_JOIN_ACK:
+			manager.setSessionManagerID(command.sid);
+			fw = new Forwarder(manager);
+			break;
+		default: throw new IOException();
+		}
+		//myHost を設定。
+		//立ち上げ時にやるとlocalhostしか取れない
+		if(manager.myHost == null) manager.setMyHostName(getLocalHostName());
+
+		fw.setMode(command.cmd);
+		manager.registerChannel(channel, fw);
+		manager.sessionManage(fw, command);
+	
+	}
+
+}
--- a/rep/Forwarder.java	Fri Oct 17 18:40:08 2008 +0900
+++ b/rep/Forwarder.java	Fri Oct 17 22:11:34 2008 +0900
@@ -16,6 +16,7 @@
 	final int limit=100;
 	REPLogger ns = REPLogger.singleton();
 	SessionManager manager;
+	public REP mode = null;
 	
 	public Forwarder(SessionManager manager) {
 		this.manager = manager;
@@ -85,4 +86,17 @@
 		} else throw new IOException();
 	}
 
+	public void setMode(REP cmd) {
+		mode = cmd;
+	}
+
+	public boolean isEditor() {
+		return mode==REP.SMCMD_JOIN||mode==REP.SMCMD_PUT;
+	}
+	
+	public boolean isForwarder() {
+		return mode==REP.SMCMD_SM_JOIN||mode==REP.SMCMD_SM_JOIN_ACK;
+	}
+
+
 }
\ No newline at end of file
--- a/rep/SessionManager.java	Fri Oct 17 18:40:08 2008 +0900
+++ b/rep/SessionManager.java	Fri Oct 17 22:11:34 2008 +0900
@@ -62,8 +62,7 @@
 	static final int DEFAULT_PORT = 8766;
 	private static final int packetLimit = 200;
 	SessionXMLDecoder decoder = new SessionXMLDecoder();
-
-	boolean smjoin_mode;
+	private Forwarder sm_join_channel;
 
 	public static void main(String[] args) throws InterruptedException, IOException {
 		
@@ -208,7 +207,7 @@
 			if(key.isAcceptable()){
 				REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker());
 				logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel);
-				registerChannel (channel, SelectionKey.OP_READ,new Forwarder(this));
+				registerChannel (channel, new FirstConnector(this));
 				channel = null;
 
 			}else if(key.isReadable()){
@@ -223,18 +222,29 @@
 		}
 	}
 	
-	void registerChannel(REPSocketChannel<REPCommand> channel, int ops,Forwarder handler) throws IOException {
+	void registerChannel(REPSocketChannel<REPCommand> channel,Forwarder handler) throws IOException {
 		if(channel == null) {
 			return;
 		}
 		handler.setChannel(channel);
 		channel.configureBlocking(false);
-		channel.register(selector, ops, handler);
+		channel.register(selector, SelectionKey.OP_READ, handler);
 	}
 
 
 	void cancel_sm_join() {
-		smjoin_mode=false;
+		removeChannel(sm_join_channel);
+		sm_join_channel=null;
+	}
+
+
+	private void removeChannel(Forwarder sm_join_channel) {
+		REPSelectionKey<REPCommand> key = sm_join_channel.channel.keyFor1(selector);
+		key.cancel();
+		try {
+			sm_join_channel.channel.close();
+		} catch (IOException e) {
+		}
 	}
 
 
@@ -294,7 +304,16 @@
 		}
 	}
 
+
+	/**
+	 * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては
+	 * ならない。複数のSession Managerにjoinすることは出来ない。(NATを実装するまでは)。
+	 * @param host
+	 */
 	public void connectSession(String host) {
+		if (sm_join_channel!=null) return;
+		if (!sessionList.isEmpty()) return;
+		if (!smList.isMaster()) return;
 		int port = parent_port;
 		InetSocketAddress addr = new InetSocketAddress(host, port);
 		try {
@@ -303,14 +322,14 @@
 			sessionchannel.connect(addr);
 			while(!sessionchannel.finishConnect());
 			Forwarder sm = new Forwarder(this);
-			registerChannel(sessionchannel, SelectionKey.OP_READ,sm);
+			registerChannel(sessionchannel, sm);
 			sm_join(sm);
 		}catch (IOException e) {
 		}
 	}
 	
 	private void sm_join(Forwarder channel){
-		smjoin_mode = true;
+		sm_join_channel = channel;
 		//SM_JOINコマンドを生成。
 		REPCommand command = new REPCommand();
 		command.setCMD(REP.SMCMD_SM_JOIN);
@@ -321,8 +340,9 @@
 		setMyHostName(channel.getLocalHostName());
 		
 		//XMLを生成。送信コマンドにセット。
-		SessionXMLEncoder encoder = new SessionXMLEncoder(sessionList);
-		String string = encoder.sessionListToXML();
+		//SessionXMLEncoder encoder = new SessionXMLEncoder(sessionList);
+		//String string = encoder.sessionListToXML();
+		String string = myHost;
 		command.setString(string);
 		
 		//SM_JOINコマンドを送信。
@@ -439,25 +459,15 @@
 	}
 
 
-	boolean sessionManage(Forwarder forwarder, REPCommand receivedCommand) throws ClosedChannelException,
+	boolean sessionManage(Forwarder forwarder, REPCommand command) throws ClosedChannelException,
 			IOException, SAXException {
-		switch(receivedCommand.cmd){
+		switch(command.cmd){
 	
 		// Session Manager Command
 	
 		case SMCMD_JOIN:
 		{
-			//どのSessionにも属さないエディタをリストに追加
-			//エディタとchannelは1対1 (ではない)
-			//エディタが新しくputする場合は新しくソケットを作る
-			// ここのeditorList はsessionのとは別物
-			Editor editor1 = new Editor(this,-1,forwarder.channel);
-			registerChannel(forwarder.channel,SelectionKey.OP_READ,editor1);
-			editor1.setHost(myHost);
-			editorList.add(editor1);
-	
 			updateGUI();
-	
 		}
 	
 		break;
@@ -473,18 +483,15 @@
 			// 自分の親に作ってもらう必要がある。自分が親なら自分で作って良い。
 			
 			int sid = sessionList.size();
-			Editor editor = new Editor(this,0, forwarder.channel);
-			registerChannel(forwarder.channel,SelectionKey.OP_READ,editor);
-			editorList.add(editor);
-			editor.setHost(myHost);
-			Session session = new Session(sid, receivedCommand.string, editor);
+			Editor editor = (Editor) forwarder;
+			Session session = new Session(sid, command.string, (Editor)forwarder);
 			session.hasOwner(true);
 			sessionList.add(session);
 	
 			updateGUI();
 	
 			//エディタにAckを送信
-			REPCommand sendCommand = new REPCommand(receivedCommand);
+			REPCommand sendCommand = new REPCommand(command);
 			sendCommand.setCMD(REP.SMCMD_PUT_ACK);
 			sendCommand.setEID(editor.getEID());
 			sendCommand.setSID(session.getSID());
@@ -493,10 +500,10 @@
 			//他のSessionManagerへSessionの追加を報告
 			//親に送って、親から子へ
 			SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session);
-			REPCommand command = new REPCommand();
-			command.setSID(session.getSID());
-			command.setString(sessionEncoder.sessionListToXML());
-			command.setCMD(REP.SMCMD_UPDATE);
+			REPCommand command1 = new REPCommand();
+			command1.setSID(session.getSID());
+			command1.setString(sessionEncoder.sessionListToXML());
+			command1.setCMD(REP.SMCMD_UPDATE);
 			smList.sendToSlaves( command);
 	
 		}
@@ -510,19 +517,19 @@
 			//他のSessionManagerをエディタとしてSessionに追加
 			Forwarder next = new Forwarder(this);
 			next.setChannel(forwarder.channel);
-			Session session = getSession(receivedCommand.sid);
+			Session session = getSession(command.sid);
 			session.addForwarder(next);
 	
 			if(session.hasOwner()){
 				//このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す
-				REPCommand sendCommand = new REPCommand(receivedCommand);
+				REPCommand sendCommand = new REPCommand(command);
 				sendCommand.setCMD(REP.SMCMD_SELECT_ACK);
 				sendCommand.setEID(next.getEID());
 				next.send(sendCommand);
 			}else{
 				//オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する
 				Forwarder owner = session.getOwner();
-				owner.send(receivedCommand);
+				owner.send(command);
 			}
 		}
 	
@@ -530,20 +537,20 @@
 	
 		case SMCMD_SELECT_ACK:
 		{
-			String hostport = receivedCommand.string;
+			String hostport = command.string;
 			Forwarder editor1 = getEditor(hostport);
 	
 			if(editor1 != null) {
 				//host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する
-				REPCommand command = new REPCommand();
-				command.setCMD(REP.SMCMD_JOIN_ACK);
-				command.setSID(receivedCommand.sid);
-				command.setEID(receivedCommand.eid);
+				REPCommand command1 = new REPCommand();
+				command1.setCMD(REP.SMCMD_JOIN_ACK);
+				command1.setSID(command.sid);
+				command1.setEID(command.eid);
 				editor1.send(command);
 	
 			}else{
 				//自分が送信したコマンドでなければ、次のSessionManagerへ中継する
-				smList.sendToSlaves(receivedCommand);
+				smList.sendToSlaves(command);
 			}
 		}
 	
@@ -551,68 +558,51 @@
 		case SMCMD_SM_JOIN:
 	
 		{
-			// SM_JOIN中にSMCMD_SM_JOINが来たら、自分のSM_JOINを
-			// 取り消す。
-			if (smjoin_mode) cancel_sm_join();
+			// SM_JOIN中にSMCMD_SM_JOINが来たら、これはループなので、
+			///自分のSM_JOINを取り消す。
+			if (sm_join_channel!=null) cancel_sm_join();
 			// SMCMD_SM_JOIN は、master まで上昇する。
 			//    masterでなければ、自分のparentに転送する。
-			if(smList.isMaster()) {
+			if(isMaster()) {
 				//    master であれば、SessionManager IDを決めて、
 				//    自分のsmList に登録
-				int sid = smList.addNewSessionManager(receivedCommand);
-				//SessionListからXMLを生成。
-				//joinしてきたSessionManagerに対してACKを送信。
-				SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList);
-				REPCommand sendCommand = new REPCommand();
+				Forwarder sm;
+				int psid = command.eid;
+				if (forwarder.sid!=-1) {
+				// すでに channelはSessionManager Idを持っていて、
+				// direct link ではないので、
+				// channel を持たないForwarderとして登録する
+					sm = new Forwarder(this);
+				} else {
+					sm = forwarder;
+				}
+				int sid = smList.addNewSessionManager(sm,command);		
+				REPCommand sendCommand = makeREPCommandWithSessionList(REP.SMCMD_SM_JOIN_ACK);
+				// command.eid==smList.sesionManagerID() の場合は、
+				// 待っている自分の下のsessionManagerにsidをassignする必要がある。
 				sendCommand.setSID(sid); // new Session manager ID
 				// 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた
 				// Session manager IDを使う。
-				sendCommand.setEID(receivedCommand.eid);
-				sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK);
-				sendCommand.setString(sessionlistEncoder.sessionListToXML());
-				smList.sendToSlaves(sendCommand);
-				break;
+				sendCommand.setEID(psid);
+				send_sm_join_ack(psid, sid, sendCommand);
+			} else {
+				if (forwarder.sid==-1) {
+					// direct link の場合は、識別のために、EIDに直上の
+					// smid を入れておく。
+					command.setEID(smList.sessionManagerID());
+				}
+				smList.sendToMaster(command);
 			}
-			// 
-	
-			//XMLからSessionListオブジェクトを生成する。
-			//SessionList receivedSessionList = decoder.decode(receivedCommand.string);
-	
-			//myHost を設定。
-			//立ち上げ時にやるとlocalhostしか取れない
-			if(myHost == null) setMyHostName(forwarder.getLocalHostName());
-	
-			//maxHost を設定。
-			//			if(setMaxHost(channel, receivedSessionList.getMaxHost())){
-			//				REPCommand sendCommand = new REPCommand();
-			//				sendCommand.setCMD(REP.SMCMD_CH_MASTER);
-			//				sendCommand.setString(maxHost);
-			//				smList.sendExcept(channel, sendCommand);
-			//			}
-	
-	
 		}
 		break;
 	
-		case SMCMD_SM_JOIN_ACK:
-	
-			//XMLからSessionListオブジェクトを生成。
-			//SessionList receivedSessionList2 = decoder.decode(receivedCommand.string);
-	
-			//maxHostを決定。
-			//			if(setMaxHost(channel, receivedSessionList2.getMaxHost())){
-			//				REPCommand sendCommand = new REPCommand();
-			//				sendCommand.setCMD(REP.SMCMD_CH_MASTER);
-			//				sendCommand.setString(maxHost);
-			//				smList.sendExcept(channel, sendCommand);
-			//			}
-	
-	
+		case SMCMD_SM_JOIN_ACK:				
+			send_sm_join_ack(command.eid, command.sid, command);
 			break;
 	
 		case SMCMD_UPDATE:
 		{
-			SessionList receivedSessionList3 = decoder.decode(receivedCommand.string);
+			SessionList receivedSessionList3 = decoder.decode(command.string);
 	
 			//UPDATEコマンドにより送られてきたSessionの情報を追加する
 			LinkedList<Session> list = receivedSessionList3.getList();
@@ -622,7 +612,7 @@
 			}
 	
 			//他のSessionManagerへ中継する
-			smList.sendToSlaves(receivedCommand);
+			smList.sendToSlaves(command);
 	
 			updateGUI();
 		}
@@ -630,20 +620,20 @@
 	
 		case SMCMD_UPDATE_ACK:
 		{
-			if(!hasSession(receivedCommand.sid)) {
+			if(!hasSession(command.sid)) {
 				// accept new Session
 				// ここで初めてsession id が決まる。
 				// このコマンドは、master session manager が出すはず
 				Forwarder sm = new Forwarder(this);
 				sm.setChannel(forwarder.channel);
-				Session session = new Session(receivedCommand.sid,receivedCommand.string,null);
+				Session session = new Session(command.sid,command.string,null);
 				session.addForwarder(sm);
 	
 				sessionList.add(session);
 				
 				updateGUI();
 			}
-			smList.sendToSlaves(receivedCommand);
+			smList.sendToSlaves(command);
 		}
 			break;
 	
@@ -666,4 +656,36 @@
 		return true;
 	}
 
+
+	void send_sm_join_ack(int psid, int sid,REPCommand sendCommand) {
+		if (psid==smList.sessionManagerID()) {
+			// 直下のsessionManagerにIDを割り振る必要がある。
+			smList.assignSessionManagerIDtoWaitingSM(sid);
+			// ここで smList に一つだけ追加されるので
+			// 待っている最初のsm一つにだけ、sm_joinが新たに送られる。
+		}
+		smList.sendToSlaves(sendCommand);
+	}
+
+
+	private REPCommand makeREPCommandWithSessionList(REP cmd) {
+		//SessionListからXMLを生成。
+		//joinしてきたSessionManagerに対してACKを送信。
+		SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList);
+		REPCommand sendCommand = new REPCommand();
+		sendCommand.setCMD(cmd);
+		sendCommand.setString(sessionlistEncoder.sessionListToXML());
+		return sendCommand;
+	}
+
+
+	public boolean isMaster() {
+		return smList.isMaster();
+	}
+
+
+	public void setSessionManagerID(int sid) {
+		smList.setSessionManagerID(sid);		
+	}
+
 }
--- a/rep/SessionManagerList.java	Fri Oct 17 18:40:08 2008 +0900
+++ b/rep/SessionManagerList.java	Fri Oct 17 22:11:34 2008 +0900
@@ -8,17 +8,12 @@
 	private LinkedList<Forwarder> list = new LinkedList<Forwarder>();
 	private int mySMID;
 	private Forwarder parent=null;
+	private LinkedList<Forwarder> waiting= new LinkedList<Forwarder>();
 
 	public void add(Forwarder channel) {
 		list.add(channel);
 	}
 
-	public void sendUpdate(int sessionID, String string) {
-		for(Forwarder f : list){
-			f.send(new REPCommand(REP.SMCMD_UPDATE, 0, mySMID, 0, 0, string));
-		}
-	}
-
 	public void setMaster(Forwarder f) {
 		this.parent = f;
 	}
@@ -37,10 +32,12 @@
 		return parent==null;
 	}
 
-	public int addNewSessionManager(REPCommand receivedCommand) {
-		return mySMID;
-		// TODO Auto-generated method stub
-		
+	public int addNewSessionManager(Forwarder fw,REPCommand receivedCommand) {
+		list.add(fw);
+		int sid = list.size();
+		fw.setSID(sid);
+		fw.setName(receivedCommand.string);
+		return sid;
 	}
 
 	public boolean isSessionManagerChannel(REPSocketChannel<REPCommand> channel) {
@@ -50,6 +47,32 @@
 		return false;
 	}
 
+	public void setSessionManagerID(int sid) {
+		mySMID = sid;
+	}
+	
+	public int sessionManagerID() {
+		return mySMID;
+	}
+
+	public void addWaitingSessionManager(Forwarder fw, REPCommand command) {
+		// SID assign 待ちのSessionManager Channelを登録する
+		waiting.add(fw);
+		
+	}
+
+	public void assignSessionManagerIDtoWaitingSM(int sid) {
+		// 待っていたSession Manager ChannelにSession IDを登録し,Session Manager List
+		// に登録する。この次のsm_join_ackでSIDが確定する。
+		Forwarder waiter;
+		if ((waiter=waiting.poll())!=null) {
+			waiter.setSID(sid);
+			list.add(waiter);
+			return;
+		}
+		assert false;
+	}
+
 
 
 
--- a/test/sematest/TestInterManagerSession.java	Fri Oct 17 18:40:08 2008 +0900
+++ b/test/sematest/TestInterManagerSession.java	Fri Oct 17 22:11:34 2008 +0900
@@ -103,7 +103,8 @@
 		 * Define pending command and set null command for now.
 		 */
 		LinkedList<REPCommand>cmds = new LinkedList<REPCommand>();
-		cmds.add(new REPCommand(REP.SMCMD_PUT,0,0,0,0,"Editor0-file"));
+		cmds.add(new REPCommand(REP.SMCMD_JOIN,0,0,0,0,"Editor0-file"));
+		//cmds.add(new REPCommand(REP.SMCMD_PUT,0,0,0,0,"Editor0-file"));
 		cmds.add(new REPCommand(REP.REPCMD_INSERT,0,0,0,0,"m0"));
 		cmds.add(new REPCommand(REP.REPCMD_DELETE,0,0,0,0,"m0"));
 		editorStartCmds = cmds;