view rep/handler/Editor.java @ 444:97593c486db6

blocking in-editor packet/out-editor packet in merging
author one
date Mon, 13 Sep 2010 14:52:11 +0900
parents bd086be276d2
children 22a741c1fa2d
line wrap: on
line source

package rep.handler;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import rep.PacketSet;
import rep.REP;
import rep.REPCommand;
import rep.ServerMainLoop;
import rep.SessionManager;
import rep.channel.REPSelectionKey;
import rep.channel.REPSocketChannel;
import rep.optimizers.*;

public class Editor extends Forwarder {

	private Translator translator;
	// REPCommands we are going to send to the next editor
	private LinkedList<REPCommand> sentList = new LinkedList<REPCommand>();
	protected LinkedList<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>();
	private REPCommand quit2=null;
	private REPCommand preMergeCommand;
	private boolean merging;
	public static boolean noMergeMode=false;
	static final boolean doOptimize = true;

	public Editor(SessionManager manager,int editorNo){
		// no translator case
		super(manager, null);
	}

	public Editor(int editorNo, SessionManager manager,REPSocketChannel<REPCommand> channel){
		super(editorNo,manager,channel);
		eid = editorNo;
		REPCommandOptimizer optimizer;
		if (doOptimize) optimizer = new DeleteInsertOptimizer(); //タカノがつくったおぷてぃまいざ
		else            optimizer = new NullOptimizer();         //なにもしないけどOptimizer.
		translator = new Translator(eid,optimizer);
	}
	
	/*
	 *  Merge Protocol
	(1) Editor CommandをSession Ring 上に流し、それが戻って来るまでに、他のEditorから
	    受け取った Editor Command をキューに入れておく。
	(2) 戻って来たタイミングで、キュー上のEditor Commandを、eid とCommandの
	    順序を基にソートする。(self merge)
	(3) 他のEditorにソートのタイミングを与えるために、Editor Command の
	    ack を、もう一周させる。
	(4) 他のEditorのCommandを受け取ってから、ack が来るまでのCommandをキューに
		入れておき、ack が来たら、eid とCommandの順序を基にソートする。(other merge)

		Editor には、ソートした編集結果になるように、それまで行なった編集をUndo
		して、ソートした編集結果を適用する。Undo が無駄な動作をしないように最適化する。
		
		handle()   
		   セッションの処理 
		   manage()
		       編集コマンドは translate() へ
		       		一周して来た編集コマンドのACKは廃棄 (merge queue から削除)
		          一周して来た自分のコマンドならself merge
		       		他のエディタの編集コマンドのACK->other merge
		       			それ以外は、そのまま実行、merge queue へ格納
		          merge は checkReturnedCommand() から
		             startMerge() へ 
		        			まず、接続されている Editor に START_MERGE を送る
		        			邪魔されないように、他のcommand は block する
		   manager()		
		      START_MERGE_ACK が来たら、translator.mergeAck() で教えて、
		   		merge()->
		   			translator.checkOwnCommand() へ 
		   				ここで、sort されて、Merge Command をEditorへ送信 
		   			checkEndMerge()から
		   				endMerge() が呼ばれる。
		   					自分のエディタにEND_MERGE で Merge終了を通知 
		   					自分のコマンドは、ACKに変えて送信 (3)
		   					それ以外は、そのまま送信 (一周させる)
		       
	 */

	public void translate(REPCommand command){				
		switch(command.cmd) {
		case REPCMD_INSERT_ACK:
		case REPCMD_DELETE_ACK:
			if (waitingRequired(command,null)) return;
			if (command.eid==eid) {
				// Second Phase が終わって同期が終了。
				checkAck(command);
				// SessionManager.logger.writeLog("Complete "+command);
				checkQuit();
				return;
			}
			checkReturnedCommand(command);
			return;
		case REPCMD_INSERT_USER:
			command.cmd = REP.REPCMD_INSERT;
			userEditorCommand(command);
			return;
		case REPCMD_DELETE_USER:
			command.cmd = REP.REPCMD_DELETE;
			userEditorCommand(command);
			return;
		case REPCMD_INSERT:
		case REPCMD_DELETE:
			if (command.eid == REP.MERGE_EID.id){
				//マージコマンドが返ってきた
				if(translator.checkMergeConflict(command)){
					//マージ中にエディタからの割り込みがあった場合
					translator.getMergeAgain(this);
				}
				checkEndMerge();
				return;
			} else if (command.eid == eid){ 
				// 編集コマンドが一周して来た
				if (waitingRequired(command,null)) return;
				checkReturnedCommand(command);
				return;
			}

			//他のエディタからの編集コマンド
			if (waitingRequired(command,null)) return;
			translator.transReceiveCmd(next,command);

			sendEditorCommand(command);
			return;
		default:
			assert(false);
		}
	}

	private void userEditorCommand(REPCommand command) {
		//エディタからの新たな編集コマンド
		if (next==this) return; // singleton case
		translator.transSendCmd(command);
		sendEditorCommand(command);
		return;
	}
	
//	private void checkDouble(List<REPCommand> sentList) {
//		if (sentList.size()==0) return;
//		int count = 0;
//		REPCommand f = sentList.get(0); 
//		for(REPCommand c:sentList) {
//			if (c.eid==f.eid&&c.seq==f.seq) {
//				count++;
//			}
//		}
//		assert(count==1);
//		if (true) return;
//		count = 0;
//		for(PacketSet c:waitingCommandInMerge) {
//			for(REPCommand g:sentList) { 
//				if (c.command.eid==g.eid&&c.command.seq==g.seq) {
//					count++;
//				}
//			}
//		}
//		assert(count==0);
//	}

	private boolean waitingRequired(REPCommand command, REPSocketChannel<REPCommand> channel) {
		if (hasWaitingCommand()) {
			// We cannot do this operation before watingCommandQueue.
			addWaitingCommand(new PacketSet(channel, this, new REPCommand(command)));
			return true;
		} else if (isMerging()) { 
			addWaitingCommand(new PacketSet(channel, this, new REPCommand(command)));
			return true;
		} 
		//ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting");
		return false;
	}

    public void addWaitingCommand(PacketSet set) {
//		if (preMergeCommand!=null) {
//			if (preMergeCommand.eid==set.command.eid
//					&& preMergeCommand.seq==set.command.seq) {
//				assert(false);
//			}
//		}
		waitingCommandInMerge.add(set);
	}

	/**
	 * 他のエディタへのコマンドの送信
	 * @param command
	 * 
	 * sendList にキープする必要がある。 
	 */
	private void sendEditorCommand(REPCommand command) {
		REPCommand keep = new REPCommand(command);
		sentList.add(keep);
		//ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
		assert(sentList.size()<limit);
		if (command.cmd==REP.REPCMD_DELETE) {
			// delete のundo用の文字列は、外に出す意味はない
			command.string=null;
		}
		next.send(command);
	}

	boolean merge(REPCommand command) {
		//マージして送信
		return translator.catchOwnCommand(this, command);
	}

	@Override
	public  List<REPCommand> getSentList() {
		return sentList;
	}

	/**
	 * 一周して来たcommandの処理。
	 * 
	 *   INSERT/DELETEを受け取った時に、sentListに登録
	 *    INSERT_ACK/DELETE_ACKが来たら一周。そこで、Mergeする。
	 *    
	 *    自分が出したINSERT/DELETEが戻って来たら、ACKに変更して、Merge。 
	 * 
	 * 途中から参加した場合、自分が受けとってないcommandのACKが先に来ることが
	 * ある。それは、無視して良い。
	 * @param command
	 */
	void checkReturnedCommand(REPCommand command) {
		startMerge(command);
		return;
	}

	private boolean checkAck(REPCommand command) {
		assert(!merging);
		REPCommand prev = sentList.pollFirst();
		if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) {
			String err = "Editor eid="+eid+" checkReturnedCommand() : command = " + command + " prev="+
				(prev==null?"null":prev)+" sentList=";
			err += sentList;
			ServerMainLoop.logger.writeLog(err);
			assert(false);
		}
		return true;
	}

	private void startMerge(REPCommand command) {
		preMergeCommand = new REPCommand(command);
		preMergeCommand.string = "";
		// merge は必須だが、EditorのCommand実装をテストするには邪魔なので、off に出来るようにする。
		if (noMergeMode) {
			checkQuit();
			endMerge();
			return;
		}
		// START_MERGE を送る
		//    送らないで良い場合もある?
		REPCommand cmd = new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,"");
		send(cmd);
		merging = true;
		// Session Manager 側で、このeditorへの他のeditorからの
		// 入力を止めて、merge にそなえる。merge は、eidtor 側から
		// ACKが来てから始まる。
		translator.startMerge(command);
	}

	@Override
	public void setQuit2(REPCommand cmd) {
		quit2 = cmd;
		checkQuit();
		// do not send quit2 until we received all pending
		// command
	}

	@Override
	public void setEID(int eid) {
		this.eid = eid;
		if (translator!=null)
			translator.setEid(eid);
	}
	
	public String toString(){
		return ("Editor eid="+eid+" sid="+sid+" " + host  + ":" + file);
	}

	public boolean isMerging() {
		return translator.isMerging();
	}


	void checkEndMerge() {
		if (merging) {
			if (translator.isMerging()) return;
			endMerge();
			merging = false;
		}
		if (quit2!=null) checkQuit();
	}


	private void endMerge() {
		REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,sid,eid,seq(),0,"");
		send(mergeEnd);
		sentList.remove(0);
		if (preMergeCommand.eid==eid) {
			// First Phase End, send ACK
			REPCommand keep = new REPCommand(preMergeCommand);
			switch(keep.cmd) {
			case REPCMD_INSERT: keep.cmd = REP.REPCMD_INSERT_ACK;break;
			case REPCMD_DELETE: keep.cmd = REP.REPCMD_DELETE_ACK;break;
			default: assert(false);
			}
			sentList.add(keep);
			//ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
			assert(sentList.size()<limit);
			next.send(keep);
		} else {
			next.send(preMergeCommand);
		}
		preMergeCommand = null;
	}

	private boolean checkQuit() {
		if (quit2!=null && sentList.size()==0&&!isMerging()) {
			send(quit2);
			quit2 = null;
			return true;
		}
		return false;
	}

	@Override
	public boolean manage(REPCommand command) {
		
		
		switch(command.cmd){
		// Editor Command

		case REPCMD_DELETE:
		case REPCMD_INSERT:
		case REPCMD_DELETE_USER:
		case REPCMD_INSERT_USER:
		case REPCMD_DELETE_ACK:
		case REPCMD_INSERT_ACK:
		{
			translate(command);
			break;
		}

		case SMCMD_START_MERGE_ACK:
		{
			// マージの処理と次のエディタへコマンドを送信する処理
			translator.mergeAck();
			if (!merge(preMergeCommand)) {
				// nothing to do, send END_MERGE
				checkEndMerge();
			}
			break;
		}
		
		case SMCMD_SYNC:
			if (isMaster()) 
				send(command);
			else
				next.send(command);
			
		case SMCMD_QUIT:
		{
			next.send(command);
			break;
		}
		case SMCMD_QUIT_2:
		{
			// QUIT_2 is returned.
			if (command.eid!=eid) {
				// stop this editor unless this is the start, starter will stopped
				// by QUIT_2_ACK
				manager.remove(this);
			}
			// don't send quit_2 directly to the editor until all pending
			// merge is processed.
			//   this does not work in distributed case.
			if (next.isDirect())
				next.setQuit2(command);
			else
				next.send(command);
			break;
		}
		case SMCMD_QUIT_2_ACK:
		{
			manager.remove(this);
			break;
		}
		default:
			assert false;
			return false;
		}
		return true;
	}
	
	/**
	 * send command to the editor
	 *   called from another Editor instance such as next.send(command)
	 */
	@Override
	public void send(REPCommand command) {
		if (command.eid == REP.MERGE_EID.id || 
				command.cmd==REP.SMCMD_END_MERGE ||
				!waitingRequired(command,channel)) {
			super.send(command);
		}
	}
	
	@Override
	public void handle(REPCommand command, REPSelectionKey<REPCommand> key) throws IOException {
		//ServerMainLoop.logger.writeLog("Manager "+manager.getId()+" read : command = " + command 
				// +" from "+manager.editorList.editorByChannel(channel));
		if (command.cmd==REP.SMCMD_JOIN||command.cmd==REP.SMCMD_PUT) {
			// assert false;
			// 一つのエディタ上に複数のセッションが作られた場合。
			// 若干問題があるらしい
			next = new Forwarder(manager,next.channel);
			REPNode first = new FirstConnector(manager,channel);
			first.handle(command, key);
			key.attach(new Dispatcher(manager,channel));
			return;
		}
		if (manager.sessionManage(this, command)) return;
		manage(command);
	}

	@Override
	public void cancel(REPSocketChannel<REPCommand> socketChannel) {
		manager.remove(socketChannel);
	}

	public boolean isMaster() {
		return mode==REP.SMCMD_PUT;
	}

	
	/* Handle special case first, usually these cases
	 * are handled in the next Editor in a session manager, but 
	 * it is forwarded here.
	 */
	public void forwardedCommandManage(REPCommand command) {
		if (command.cmd==REP.SMCMD_QUIT_2) {
			// we have to wait next editor's finishing before sending this.
			// this is odd, but the editor itself does not know it's merging
			// state. Only this session manager knows it.
			setQuit2(command);
			return;
		} 
		send(command);
	}

	/**
	 * Check waiting command in merge
	 * @return true if there is a processed waiting command
	 * @throws IOException
	 */
	public void checkWaitingCommandInMerge() {
		if (translator==null||isMerging()) return;
		LinkedList<PacketSet> w = waitingCommandInMerge;
		waitingCommandInMerge = new LinkedList<PacketSet>();
		while(w.size()>0) {
			if (isMerging()) {
				w.addAll(waitingCommandInMerge);
				waitingCommandInMerge = w;
				return;
			}
			PacketSet p = w.remove(0);
			try {
				if (p.channel!=null)
					send(p.command);
				else
					manage(p.command);
			} catch (Exception e1) {
				assert false;
				manager.close(p.channel);
				return;
			}
		}
	}


	public boolean hasWaitingCommand() {
		return waitingCommandInMerge.size()>0;
	}
}