changeset 313:0585fd2410b8 single-insert-command

Single Insert Command worked.
author kono
date Sun, 05 Oct 2008 22:36:24 +0900
parents f39a8045175d
children edb373aa421e
files rep/Editor.java rep/REP.java rep/SessionManager.java rep/handler/REPHandlerInMerge.java rep/translater/Translater.java rep/translater/TranslaterImp1.java test/sematest/TestEditor.java test/sematest/TestSessionManager.java
diffstat 8 files changed, 116 insertions(+), 68 deletions(-) [+]
line wrap: on
line diff
--- a/rep/Editor.java	Sun Oct 05 11:39:18 2008 +0900
+++ b/rep/Editor.java	Sun Oct 05 22:36:24 2008 +0900
@@ -12,6 +12,7 @@
 public class Editor {
 	private int eid;            // unique id in a session
 	private int sid = -1 ;      // globally unique session id
+	private int seq = 0;
 	private REPSocketChannel<REPCommand> myChannel;
 	private String host;
 	private String file;
@@ -62,12 +63,9 @@
 		if(command.eid == eid){
 			if(checkReturnedCommand(command)){
 				//エディタからのコマンドが元のエディタに戻ってきた
-				//マージして送信
-				ArrayList<REPCommand> cmds = translater.catchOwnCommand(command);
-				//optimizer
-				//マージ中のエディタからの割り込み検知に使う
-				sentMergedList.addAll(cmds);
-				sendMergedCommand(cmds);
+				// START_MERGE を送る
+				REPCommand cmd = new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,"");
+				list.add(cmd);
 				return list;
 			}else{
 				//エディタからの新たな編集コマンド
@@ -96,15 +94,31 @@
 		return list;
 	}
 	
+	boolean merge(REPCommand command) {
+		REPCommand prev = translater.prev();
+		if(prev==null) return false;
+		assert(prev.eid==command.eid);
+		//マージして送信
+		ArrayList<REPCommand> cmds = translater.catchOwnCommand();
+		//optimizer
+		if (cmds.size()==0) 
+			return false; // no merge phase is necessary
+		//マージ中のエディタからの割り込み検知に使う
+		sentMergedList.addAll(cmds);
+		sendMergedCommand(cmds);
+		return true;
+	}
+	
 	private void sendMergedCommand(ArrayList<REPCommand> cmds) {
 		for(REPCommand mergeCommand : cmds){
 			mergeCommand.setEID(REP.MERGE_EID.id);
+			mergeCommand.setSEQID(seq());
 			writeQueue.add(mergeCommand);
 			assert(writeQueue.size()<limit);
 		}
 	}
 
-	private boolean checkReturnedCommand(REPCommand command) {
+	boolean checkReturnedCommand(REPCommand command) {
 		if(sentList.size() > 0){
 			if(sentList.get(0).seq == command.seq){
 				sentList.remove(0);
@@ -126,7 +140,7 @@
 		// select loop で送るしかない。
 		REPCommand cmd;
 		if (writeQueue.size()>0) {
-			cmd = writeQueue.remove(0);
+			cmd = new REPCommand(writeQueue.remove(0));
 			ns.writeLog("SessionManager write to "+myChannel+" cmd="+cmd);
 			myChannel.write(cmd);
 			return true;
@@ -199,5 +213,8 @@
 	public void setSID(int sessionID) {
 		sid   = sessionID;
 	}
+	public int seq() {
+		return seq++;
+	}
 
 }
--- a/rep/REP.java	Sun Oct 05 11:39:18 2008 +0900
+++ b/rep/REP.java	Sun Oct 05 22:36:24 2008 +0900
@@ -44,6 +44,7 @@
 	 SMCMD_QUIT_2 ( 78),
 
 
+	 SM_EID ( -1),
 	 MERGE_EID ( -2),
 	 SMCMD_CH_MASTER ( 79),
 	 SMCMD_UPDATE_UP ( 80),
--- a/rep/SessionManager.java	Sun Oct 05 11:39:18 2008 +0900
+++ b/rep/SessionManager.java	Sun Oct 05 22:36:24 2008 +0900
@@ -89,29 +89,44 @@
 		
 
 	}
+	
+	/*
+	 * We wrote everything in one thread, but we can assign
+	 * one thread for each communication channel and GUI event.
+	 */
 
 	public void mainLoop() throws IOException {
 		while(true){
-			SessionManagerEvent e;
-			while((e = waitingEventQueue.poll())!=null){
-				e.exec();
-			}
-			for(Session s:sessionList) {
-				for(Editor editor: s.getEditorList()) 
-					if (editor.doWaitingWrite()) break;
+			if (checkInputEvent() ||
+			    checkWaitingWrite() || 
+			    checkWaitingCommandInMerge()) {
+				   // try to do fair execution for waiting task
+				   if(selector.selectNow() > 0) select();
+				   continue;
 			}
-			// if there are waiting command during merge operation, do process it
-			if(checkWaitingCommandInMerge()){
-				if(selector.selectNow() > 0){
-					select();
-				}
-				continue;
-			}
+			// now we can wait for input packet or event
 			selector.select();
 			select();
 		}
 	}
 
+	private boolean checkInputEvent() {
+		SessionManagerEvent e;
+		if((e = waitingEventQueue.poll())!=null){
+			e.exec();
+			return true;
+		}
+		return false;
+	}
+
+	private boolean checkWaitingWrite() throws IOException {
+		for(Session s:sessionList) {
+			for(Editor editor: s.getEditorList()) 
+				if (editor.doWaitingWrite()) return true;
+		}
+		return false;
+	}
+
 	/**
 	 * Check waiting command in merge
 	 * @return true if there is a processed waiting command
@@ -387,28 +402,36 @@
 		case REPCMD_INSERT:
 		case REPCMD_NOP:
 		{
-			//sid から Session を取得
+			// sid から Session を取得
 			Session session = getSession(receivedCommand.sid);
 			if (session==null) throw new IOException();
-			//マージの処理と次のエディタへコマンドを送信する処理
+			// 次のエディタへコマンドを送信する処理
 			Editor editor = session.getEditor(channel);
 			boolean old = editor.isMerging();
 			session.translate(channel, receivedCommand);
-			boolean newState = editor.isMerging();
-			if (old!=newState) {
-				// prevEditor なのは変だと思うが...
+			if(editor.isMerging()!=old){
+				assert(old==false);
+				REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,receivedCommand.sid,editor.getEID(),editor.seq(),0,"");
+				editor.send(mergeEnd);
 				Editor prevEditor = session.getPrevEditor(editor);
+				setNormalState(prevEditor.getChannel(), session.getSID());
+			}
+			break;
+		}
+		case SMCMD_START_MERGE_ACK:
+		{
+			// sid から Session を取得
+			Session session = getSession(receivedCommand.sid);
+			if (session==null) throw new IOException();
+			// マージの処理と次のエディタへコマンドを送信する処理
+			Editor editor = session.getEditor(channel);
+			if (editor.merge(receivedCommand)) {
 				//マージ中のエディタはコマンドを受け取らない
-				// この代入は状態が変わったときだけ行えば良い。毎回、new するのは変
-				if(editor.isMerging()){
-					//Handlerを切り替える
-					setMergeState(prevEditor.getChannel(), session.getSID());
-				}else {
-					setNormalState(prevEditor.getChannel(), session.getSID());
-				}
+				Editor prevEditor = session.getPrevEditor(editor);
+				setMergeState(prevEditor.getChannel(), session.getSID());
 			}
+			break;
 		}
-		break;
 		case SMCMD_QUIT:
 		{
 			Session session = getSession(receivedCommand.sid);
@@ -459,16 +482,7 @@
 		return null;
 	}
 	
-	public Editor getEditor(REPSocketChannel<REPCommand> channel){
-		for(Editor editor : editorList){
-			if(editor.getChannel() == channel){
-				return editor;
-			}
-		}
-		return null;
-	}
-
-	private Session getSession(int sid) {
+	public Session getSession(int sid) {
 		for(Session session : sessionList){
 			if(session.getSID() == sid) return session;
 		}
--- a/rep/handler/REPHandlerInMerge.java	Sun Oct 05 11:39:18 2008 +0900
+++ b/rep/handler/REPHandlerInMerge.java	Sun Oct 05 22:36:24 2008 +0900
@@ -3,6 +3,7 @@
 import java.io.IOException;
 import rep.Editor;
 import rep.REPCommand;
+import rep.Session;
 import rep.SessionManager;
 import rep.channel.REPSelectionKey;
 import rep.channel.REPSocketChannel;
@@ -24,7 +25,8 @@
 		// if (manager.isMerging(command.sid()))...
 		//    同じchannelで、merge中のsessionは一つは限らない。
 		//    なので、sid をinstanceで持つのではだめ。
-		Editor editor = manager.getEditor(channel);
+		Session s = manager.getSession(command.sid);
+		Editor editor = s.getEditor(channel);
 		manager.addWaitingCommand(new PacketSet(channel, editor, command));
 	}
 
--- a/rep/translater/Translater.java	Sun Oct 05 11:39:18 2008 +0900
+++ b/rep/translater/Translater.java	Sun Oct 05 22:36:24 2008 +0900
@@ -18,7 +18,7 @@
 	 * but in this case, you can use also transReceiveCmd()
 	 * @param command which the editor sent.
 	 */
-	abstract public ArrayList<REPCommand> catchOwnCommand(REPCommand cmd);
+	abstract public ArrayList<REPCommand> catchOwnCommand();
 	
 	/**
 	 * Translate Command cmd that was received from SeMa.
--- a/rep/translater/TranslaterImp1.java	Sun Oct 05 11:39:18 2008 +0900
+++ b/rep/translater/TranslaterImp1.java	Sun Oct 05 22:36:24 2008 +0900
@@ -53,14 +53,10 @@
 	 * Dequeue command cmd that was returned.
 	 * @param cmd
 	 */
-	public ArrayList<REPCommand> catchOwnCommand(REPCommand cmd){
+	public ArrayList<REPCommand> catchOwnCommand(){
 		ArrayList<REPCommand> returnCmds = new ArrayList<REPCommand>();
 		ArrayList<REPCommand> cmds = new ArrayList<REPCommand>();
-		// ringである以上、戻ってきたコマンドは確実にキューsentCmdsの先頭にある事を期待している
-		REPCommand tmp = sentCmds.poll();
-		assert tmp.seq==cmd.seq;
-		assert cmd.eid==eid;
-
+		prev();
 				//スタック上にあるコマンドを全部undoコマンドにする
 		while ( !unMergedCmds.isEmpty() ){
 			REPCommand cmd0 = unMergedCmds.pop();
@@ -81,6 +77,10 @@
 		return returnCmds;
 	}
 
+	public REPCommand prev() {
+		return sentCmds.poll();
+	}
+
 	private REPCommand createUndo(REPCommand cmd){
 		String str = new String(cmd.string);
 		REPCommand retCmd = new REPCommand(cmd.cmd, cmd.sid, cmd.eid, cmd.seq, cmd.lineno, str);
--- a/test/sematest/TestEditor.java	Sun Oct 05 11:39:18 2008 +0900
+++ b/test/sematest/TestEditor.java	Sun Oct 05 22:36:24 2008 +0900
@@ -26,6 +26,7 @@
 	private InetSocketAddress semaIP;
 	private REPLogger ns;
 	private int seq = 0;
+	private int prevSeq = 0;
 	public Text text;
 	public LinkedList<REPCommand> cmds;
 	private int eid = 0;
@@ -34,7 +35,6 @@
 	boolean running = true;
 	long timeout = 1;
 	private String name;
-	private REPCommand nop = new REPCommand(REP.REPCMD_NOP,0,0,0,0,"");
 	private boolean inputLock=false;
 	public boolean detached=false;
 	public boolean master=false;
@@ -179,6 +179,7 @@
 			case SMCMD_JOIN:
 			case SMCMD_PUT:
 				sendCommand(cmd,seq++);
+				prevSeq = seq;
 				/*
 				 * To prevent confusion, stop user input until the ack
 				 */
@@ -194,7 +195,8 @@
 	}
 
 
-	private void sendCommand(REPCommand cmd,int seq) {
+	private void sendCommand(REPCommand cmd1,int seq) {
+		REPCommand cmd = new REPCommand(cmd1);
 		cmd.setSEQID(seq);
 		cmd.setEID(eid);
 		cmd.setSID(sid);
@@ -208,29 +210,32 @@
 		switch(cmd.cmd) {
 		case REPCMD_INSERT	:
 			text.insert(cmd.lineno, cmd.string);
+			if (cmd.eid==REP.MERGE_EID.id) break;
+			addNop();
 			sendCommand(cmd,cmd.seq);
-			// sendCommand(nop); session manager do this for me
 			break;
 		case REPCMD_INSERT_ACK	:
 			assert(false);
 			break;
 		case REPCMD_DELETE	:
 			String del = text.delete(cmd.lineno);
+			if (cmd.eid==REP.MERGE_EID.id) break;
+			addNop();
 			cmd.setString(del);
 			sendCommand(cmd,cmd.seq);
-			// sendCommand(nop); session manager do this for me
 			break;
 		 case REPCMD_DELETE_ACK	:
 				assert(false);
 			 break;
-		 case REPCMD_CLOSE	:
+		 case REPCMD_NOP		:
+ 			 if (cmd.eid==REP.MERGE_EID.id) break;
+			 addNop();
+			 sendCommand(cmd,cmd.seq);
+			 break;		 case REPCMD_CLOSE	:
 		 case REPCMD_CLOSE_2	:
 				assert(false);
 			 break;
-		 case REPCMD_NOP		:
-			 sendCommand(cmd,cmd.seq);
-			 sendCommand(nop,seq++);
-			 break;
+
 		 case SMCMD_JOIN_ACK	:
 			 sid = cmd.sid;
 			 eid = cmd.eid;
@@ -283,4 +288,12 @@
 		 	 break;
 		}
 	}
+
+	private void addNop() {
+		if (seq!=prevSeq) return;
+		// We haven't send any command, add nop before retransmition.
+		REPCommand nop = new REPCommand(REP.REPCMD_NOP, sid, eid, seq, 0, "");
+		sendCommand(nop,seq++);
+		prevSeq = seq;
+	}
 }
--- a/test/sematest/TestSessionManager.java	Sun Oct 05 11:39:18 2008 +0900
+++ b/test/sematest/TestSessionManager.java	Sun Oct 05 22:36:24 2008 +0900
@@ -80,11 +80,12 @@
 		if (i<ev1.length)
 			sm.syncExec(ev1[i]);
 		Runnable start = new Runnable() {
-			public void run() {		try {
-				sm.init(port1,gui);
-			} catch (IOException e) {
-			} catch (InterruptedException e) {
-			}
+			public void run() {		
+				try {
+					sm.init(port1,gui);
+				} catch (IOException e) {
+				} catch (InterruptedException e) {
+				}
 			}
 		};
 		new Thread(start).run();
@@ -105,7 +106,7 @@
 		 *    isSimulation=true     thread base simulation for PathFinder
 		 *    isSimulation=false    socket based communication mode
 		 */
-		REPServerSocketChannel.isSimulation = false;
+		REPServerSocketChannel.isSimulation = true;
 		TestSessionManager test = new TestSessionManager(1, 0, 2);
 		logger.setLogLevel(5);
 		test.startTest();