changeset 365:c432755c3555

distributed session debug continue... SELECT/SELECT_ACK loop
author kono
date Mon, 20 Oct 2008 17:00:42 +0900
parents c965ef2b5fd6
children e391433fa9f1
files rep/FirstConnector.java rep/Forwarder.java rep/SessionManager.java rep/SessionManagerList.java test/sematest/TestInterManagerSession.java
diffstat 5 files changed, 85 insertions(+), 47 deletions(-) [+]
line wrap: on
line diff
--- a/rep/FirstConnector.java	Mon Oct 20 13:44:34 2008 +0900
+++ b/rep/FirstConnector.java	Mon Oct 20 17:00:42 2008 +0900
@@ -61,10 +61,12 @@
 			manager.smList.addWaitingSessionManager(fw, command);
 			break;
 		}
-//		case SMCMD_SM_JOIN_ACK:
-//			manager.setSessionManagerID(command.sid);
-//			fw = new Forwarder(manager);
-//			break;
+		case SMCMD_SM_JOIN_ACK:
+			manager.setSessionManagerID(command.sid);
+			manager.afterConnect();
+			fw = new Forwarder(manager);
+			manager.setParent(fw);
+			break;
 		default: throw new IOException();
 		}
 		//myHost を設定。
--- a/rep/Forwarder.java	Mon Oct 20 13:44:34 2008 +0900
+++ b/rep/Forwarder.java	Mon Oct 20 17:00:42 2008 +0900
@@ -26,6 +26,7 @@
 	
 	public void send(REPCommand command) {
 		assert(command!=null);
+		assert(channel!=null);
 		REPCommand c = new REPCommand(command);
 		manager.addWriteQueue(new PacketSet(channel,null,  c));
 	}
--- a/rep/SessionManager.java	Mon Oct 20 13:44:34 2008 +0900
+++ b/rep/SessionManager.java	Mon Oct 20 17:00:42 2008 +0900
@@ -78,6 +78,7 @@
 	private Forwarder sm_join_channel;
 	// Routing table for session and session manager.
 	private RoutingTable routingTable = new RoutingTable();
+	private SessionManagerEvent execAfterConnect = null;;
 
 	public static void main(String[] args) throws InterruptedException, IOException {
 		
@@ -292,7 +293,18 @@
 	void updateGUI() {
 		//リストのコピーをGUIに渡す
 		LinkedList<Session> sList = new LinkedList<Session>(sessionList.values());
-		LinkedList<Editor> eList = new LinkedList<Editor>(editorList.values());
+		LinkedList<Editor> eList;
+		if (false) { 
+			// local editor only 
+			eList = new LinkedList<Editor>();
+			for(Editor e:editorList.values()) {
+				if (getSMID(e.eid)==smList.sessionManagerID()) {
+					eList.add(e);
+				}
+			}
+		} else {
+			eList = new LinkedList<Editor>(editorList.values());
+		}
 		//GUIに反映
 		Runnable doRun = new DoGUIUpdate(sList, eList, gui);
 		gui.invokeLater(doRun);
@@ -332,7 +344,7 @@
 			REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker());
 			sessionchannel.connect(addr);
 			while(!sessionchannel.finishConnect());
-			Forwarder sm = new Forwarder(this);
+			Forwarder sm = new FirstConnector(this);
 			registerChannel(sessionchannel, sm);
 			sm_join(sm);
 		}catch (IOException e) {
@@ -374,14 +386,12 @@
 	public void selectSession(SelectButtonEvent event) throws IOException {
 		int sid = event.getSID();
 		Session session = sessionList.get(sid);
-		
+		if (session==null) throw new IOException();
 		Editor editor = (Editor)event.getEditor();
-		if(editor == null){
-			logger.writeLog("Error SessionManager.selectSession(): editor = " + editor);
-			return;
-		}
 		if (editor.hasSession()) return;
-
+		// assert(getSMID(editor.eid)==smList.sessionManagerID());
+		// assert(editor.channel!=null);
+		editor.setSID(sid); // mark as selected
 		selectSession(sid, session, editor.getEID(), editor);
 	}
 
@@ -390,7 +400,7 @@
 	 *    called from GUI or incoming SMCMD_SELECT command.
 	 */
 	private void selectSession(int sid, Session session, int eid, Forwarder editor) {
-		if(session.hasOwner()){
+		if(session.hasOwner() && editor.channel!=null){
 			// we have selected session.
 			REPCommand sendCommand = new REPCommand();
 			if (editor.isDirect()&&editor.getEID()==eid) {
@@ -398,6 +408,7 @@
 				session.addForwarder(editor);
 				sendUpdate(session.getSID());
 				sendCommand.setCMD(REP.SMCMD_JOIN_ACK);
+				return;
 			} else {
 				// We have a session, but joined editor is on the other sm.
 				// SELECT_ACK is sent to the session ring to
@@ -416,17 +427,20 @@
 			sendCommand.string = session.getName();
 			editor.send(sendCommand);
 		}else {
-			// session searching continue...
-			Forwarder next = routingTable.toSession(sid);
-			
-			// create dummy editor for this session
-			Forwarder f = new Editor(this, false, makeID(editorList.newEid()));
-			f.setChannel(editor.channel); // incoming channel
-			f.setNext(next);
-			f.setHost(myHost);
-			f.setSID(sid);
-			session.setFirstForwarder(f);
-			
+			Forwarder next;
+			if (editor.channel==null) {
+				next = routingTable.toSessionManager(getSMID(eid));
+			} else {
+				// session searching continue...
+				next = routingTable.toSession(sid);
+				// create dummy editor for this session
+				Forwarder f = new Editor(this, false, makeID(editorList.newEid()));
+				f.setChannel(editor.channel); // incoming channel
+				f.setNext(next);
+				f.setHost(myHost);
+				f.setSID(sid);
+				session.setFirstForwarder(f);
+			}
 			// pass the select command to the next path.
 			REPCommand command = new REPCommand();
 			command.setCMD(REP.SMCMD_SELECT);
@@ -620,8 +634,8 @@
 		{
 			Session session = sessionList.get(command.sid);
 			if (session==null) {
-				sessionList.put(command.sid,
-						new Session(command.sid, command.string,null));
+				session = new Session(command.sid, command.string,null);
+				sessionList.put(command.sid,session);
 			}
 			selectSession(command.sid, session, command.eid, forwarder);
 		}
@@ -794,5 +808,18 @@
 		return sessionList.get(sid);
 	}
 
+	public void execAfterConnect(SessionManagerEvent sessionManagerEvent) {
+		execAfterConnect  = sessionManagerEvent;
+	}
+
+	public void afterConnect() {
+		if (execAfterConnect!=null) execAfterConnect.exec(this);
+		execAfterConnect = null;
+	}
+
+	public void setParent(Forwarder fw) {
+		smList.setParent(fw);
+	}
+
 
 }
--- a/rep/SessionManagerList.java	Mon Oct 20 13:44:34 2008 +0900
+++ b/rep/SessionManagerList.java	Mon Oct 20 17:00:42 2008 +0900
@@ -67,6 +67,10 @@
 		assert false;
 	}
 
+	public void setParent(Forwarder fw) {
+		parent = fw;
+	}
+
 
 
 
--- a/test/sematest/TestInterManagerSession.java	Mon Oct 20 13:44:34 2008 +0900
+++ b/test/sematest/TestInterManagerSession.java	Mon Oct 20 17:00:42 2008 +0900
@@ -29,12 +29,6 @@
 	private SessionManagerEvent ev2[] = {
 			new SessionManagerEvent() {
 				public void exec(SessionManager manager) {	
-					for(TestEditor editor:editors) {
-						if(editor.getPort()==manager.getPort()) {
-							logger.writeLog("Start client "+editor);
-							editor.start();
-						}
-					}
 					int i = sessionManagers.length;
 					for(SessionManager slave:slaveSessionManagers) {
 						if (slave.getParentPort()==masterPort) {
@@ -47,12 +41,6 @@
 			new SessionManagerEvent() {
 				public void exec(SessionManager manager) {	
 					manager.connectSession(host);
-					for(TestEditor editor:editors) {
-						if(editor.getPort()==manager.getPort()) {
-							logger.writeLog("Start client "+editor);
-							editor.start();
-						}
-					}
 				}
 			},
 			new SessionManagerEvent() {
@@ -63,17 +51,33 @@
 							manager.getPort());
 					}
 					manager.connectSession(host);
-					for(TestEditor editor:editors) {
-						if(editor.getPort()==manager.getPort()) {
-							logger.writeLog("Start client "+editor);
-							editor.start();		
-							editors[0].setCommand(editorStartCmds);
-						}
-					}
+					manager.execAfterConnect(
+							new SessionManagerEvent() {
+								public void exec(SessionManager manager) {
+									for(SessionManager m:sessionManagers) {
+										startEditor(m);
+									}
+									for(SessionManager m:slaveSessionManagers) {
+										startEditor(m);
+									}
+									editors[0].setCommand(editorStartCmds);
+								}
+							}
+					);
 				}
+
 			}
 	};
 
+	private void startEditor(SessionManager m) {
+		for(TestEditor editor:editors) {
+			if(editor.getPort()==m.getPort()) {
+				logger.writeLog("Start client "+editor);
+				editor.start();		
+			}
+		}
+	}
+	
 	/*
 	 * Create all editors, master session managers and slave session 
 	 * managers with specified port. All instances are not started yet.
@@ -111,7 +115,7 @@
 		//cmds.add(new REPCommand(REP.SMCMD_JOIN,0,0,0,0,"Editor0-file"));
 		cmds.add(new REPCommand(REP.SMCMD_PUT,0,0,0,0,"Editor0-file"));
 		cmds.add(new REPCommand(REP.REPCMD_INSERT,0,0,0,0,"m0"));
-		cmds.add(new REPCommand(REP.REPCMD_DELETE,0,0,0,0,"m0"));
+		//cmds.add(new REPCommand(REP.REPCMD_DELETE,0,0,0,0,"m0"));
 		editorStartCmds = cmds;
 		LinkedList<REPCommand>nullcmds = new LinkedList<REPCommand>();
 		editors[0].setCommand(nullcmds);
@@ -129,7 +133,7 @@
 	protected void startTest() {
 		int i = 0;
 		for(SessionManager master:sessionManagers) {
-			logger.writeLog("Start master "+master);
+			logger.writeLog("Start master "+i+" "+master);
 			i = startSessionManager(master,i, masterPort + i);
 		}
 	}