view rep/handler/Translator.java @ 464:295c257ac073

ack list separation
author one
date Fri, 01 Oct 2010 18:48:40 +0900
parents 629211b606e4
children c83af820eb62
line wrap: on
line source

package rep.handler;

import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeSet;

import rep.REP;
import rep.REPCommand;
import rep.SessionManager;
import rep.channel.REPLogger;
import rep.optimizers.REPCommandOptimizer;

public class Translator {
	public int eid;
	
	public REPCommandOptimizer optimizer;
	private LinkedList<REPCommand> unMergedCmds;
	private LinkedList<REPCommand> sentMergedList;
	private TreeSet<REPCommand> sortedEditCmds;
	boolean mergeAgain;
	public REPLogger logger = SessionManager.logger;
	boolean merge_mode = false;

	public Translator(int _eid,REPCommandOptimizer opt){
		eid = _eid;
		optimizer = opt;
		unMergedCmds = new LinkedList<REPCommand>();
		mergeAgain = false;
		sentMergedList = new LinkedList<REPCommand>();
	}

	/**
	 * New command from an editor
	 * The command is sent to the next editor
	 * @param cmd
	 * @return translated command.
	 */
	public REPCommand transSendCmd(REPCommand cmd){
		assert(cmd.eid==eid);
		unMergedCmds.addLast(cmd);
		
		//マージ中にユーザから割り込みがあった場合
		if(isMerging()){
			mergeAgain = true;
		}
		
		return cmd;
	}
	
	/**
	 * My command is returned from the session ring, and START_MERGE_ACK
	 * is returned. At this
	 * stage my writeQueue is empty, our editor is waiting for me.
	 * Start merge process.
	 * @param cmd
	 */
	public boolean catchOwnCommand(REPNode editor, REPCommand prev){
		logger.writeLog("beforeMerge"+eid+":"+unMergedCmds);
		LinkedList<REPCommand> output = new LinkedList<REPCommand>();
		// merge queue上にあるコマンドを全部undoコマンドするのと同時に
		// sort したコマンド列を生成する
		for( REPCommand cmd0 : unMergedCmds) {
			output.add( createUndo(cmd0) );
		}

		sortedEditCmds = new TreeSet<REPCommand>(new REPCommandComparator(1));
		logger.writeLog("sentList"+eid+":"+editor.getSentList());
		for( REPCommand cmd0 : editor.getSentList()) {
			if (cmd0.cmd==REP.REPCMD_INSERT || cmd0.cmd==REP.REPCMD_DELETE)
				sortedEditCmds.add(cmd0);
		}
		logger.writeLog("sortedMerge"+eid+":"+sortedEditCmds);
		// logger.writeLog("Ediotr"+eid+" Merge:: sorted sent list => Eid="+eid+cmds+" ack="+prev);
		output.addAll(sortedEditCmds);
		// ACKが来たものは必ず先頭
		
		// unMerged command のdeleteのundo string は、この時点で使えない。
		// Editor 側から送り返して来たものを使う必要がある。
		unMergedCmds.clear();
		logger.writeLog("outputMerge"+eid+":"+output);
		return optimizedSend(editor,output);
	}

	/**
	 * Received all merge command ack
	 */
	public void endMerge() {
		sortedEditCmds = null;
		unMergedCmds = new LinkedList<REPCommand>();
	}
	/**
	 * Sent optimized merged command list
	 * @param editor 
	 * @param output
	 * @return if any sent commands output 
	 */
	public boolean optimizedSend(REPNode editor, LinkedList<REPCommand> output) {
		/*
		 * Optimized send の場合は、unMergedCommand のつじつまを合わせる必要がある。
		 */
		sentMergedList.clear();
		List<REPCommand> output1 = optimizer.optimize(output);
		if (output1.size()==0) {
			merge_mode = false;
			return false;
		}
		for(REPCommand c:output1) {
			REPCommand m = new REPCommand(c);
			m.setEID(REP.MERGE_EID.id);
			m.setSEQID(editor.seq());
			sentMergedList.addLast(m);
			editor.sendToEditor(m);
		}
		logger.writeLog("OptimizedOutputMerge"+eid+":"+sentMergedList);
		merge_mode = true;
		return true;
	}
	
	private REPCommand createUndo(REPCommand cmd){
		REPCommand retCmd = new REPCommand(cmd);
		if (cmd.cmd==REP.REPCMD_INSERT) {
			retCmd.cmd=REP.REPCMD_DELETE;
			retCmd.string="";
		}
		else if (cmd.cmd==REP.REPCMD_DELETE) retCmd.cmd=REP.REPCMD_INSERT;
		return retCmd;
	}

	class REPCommandComparator implements Comparator<REPCommand>{
		int base;
		REPCommandComparator(int base) {
			this.base = base;
		}
		public int compare(REPCommand o1, REPCommand o2) {
			int eid1 = o1.eid-base; if (eid1<0) eid1 += Integer.MAX_VALUE;
			int eid2 = o2.eid-base; if (eid2<0) eid2 += Integer.MAX_VALUE;
			if ( eid1<eid2 ) return -1;
			if ( eid1>eid2 ) return 1;
			if ( o1.seq<o2.seq ) return -1;
			if ( o1.seq>o2.seq ) return 1;
			// assert(false);  // this can happen in MergedAgain case
			return 0;
		}
	}
	
	/**
	 * Translate Command  that was received from SeMa.
	 * @param cmd the command to be translated.
	 * @return translated command.
	 */
	public void transReceiveCmd(REPNode nextEditor,REPCommand cmd){
		assert (cmd.eid != eid);
		unMergedCmds.addFirst(cmd);
	}

	public void setEid(int _eid){
		eid = _eid;
	}

	public boolean checkMergeConflict(REPCommand command) {
		unMergedCmds.addFirst(command);
		
		REPCommand prev = sentMergedList.getFirst();
		if (prev.seq==command.seq) {
			// logger.writeLog("Input eid="+eid+"SentMergedList = "+sentMergedList);
			sentMergedList.removeFirst();
		} 
		//  previous merge command may be returned

		if(sentMergedList.size()==0 && !mergeAgain) {
			merge_mode=false;
		}
		return mergeAgain;
	}

	public void getMergeAgain(REPNode editor) {
		if (sentMergedList.size()>0) return; //  wait for previous merge completion
		
		LinkedList<REPCommand> returnCommand = new LinkedList<REPCommand>();
		LinkedList<REPCommand> merge = new LinkedList<REPCommand>();
		LinkedList<REPCommand> conflict = new LinkedList<REPCommand>();
		for(REPCommand command : unMergedCmds) {
			returnCommand.add(createUndo(command));
		}
		for(REPCommand command : unMergedCmds) {
			if(command.eid == REP.MERGE_EID.id){
				merge.addLast(command);
				sortedEditCmds.add(command);
			}
		}
		for(REPCommand command : unMergedCmds){
			if(command.eid == eid){
				command.eid = REP.MERGE_EID.id;
				conflict.addLast(command);
			}
		}
		unMergedCmds.clear();
		returnCommand.addAll(merge);
		returnCommand.addAll(conflict);
		logger.writeLog("MergeAgain "+eid+" ret="+returnCommand.size());
		mergeAgain = false;
		optimizedSend(editor, returnCommand);
	}
//
//	public boolean isFinished() {
//		if(unMergedCmds.size() > 0) return false;
//		if(sentMergedList.size() > 0) return false;
//		return true;
//	}

	public boolean isMerging() {
		return merge_mode;
	}

	public void startMerge(REPCommand cmd) {
	}

	/**
	 *  receive SMCMD_START_MERGE_ACK
	 */
	public void mergeAck() {
		logger.writeLog("Editor"+eid+": START MERGE "+
				((unMergedCmds.size()>0)?" and top of unMergedCmds = "+ unMergedCmds.getLast():""));
		merge_mode = true;
	}




}