view rep/Editor.java @ 364:c965ef2b5fd6

*** empty log message ***
author kono
date Mon, 20 Oct 2008 13:44:34 +0900
parents 034acadc0cdc
children e391433fa9f1
line wrap: on
line source

package rep;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import rep.channel.REPSelectionKey;
import rep.channel.REPSocketChannel;
import rep.handler.PacketSet;
import rep.optimizers.*;
import rep.translator.Translator;
import rep.translator.TranslatorImpl;

public class Editor extends Forwarder {

	private Translator translator;
	private List<REPCommand> sentList = new LinkedList<REPCommand>();
	// REPCommands we are going to send to the next editor
	//private REPCommand quit2 = null;
	private SessionManager manager;
	private REPCommand quit2=null;

	public Editor(SessionManager manager,boolean doOptimize,int editorNo){
		super(manager);
		this.manager = manager;
		eid = editorNo;
		REPCommandOptimizer optimizer;
		if (doOptimize) optimizer = new DeleteInsertOptimizer(); //タカノがつくったおぷてぃまいざ
		else            optimizer = new NullOptimizer();         //なにもしないけどOptimizer.
		translator = new TranslatorImpl(eid,optimizer);
		
	}

	public Editor(SessionManager manager,int editorNo, REPSocketChannel<REPCommand> channel){
		this(manager,false,editorNo);
		this.channel = channel;
	}

	public void translate(REPCommand command){

		if(command.eid == eid){
			//エディタからの新たな編集コマンド
			if (next==this) return; // singleton case
			translator.transSendCmd(command);
			sentList.add(new REPCommand(command));
			assert(sentList.size()<limit);
			next.send(command);
			return;
		}else if(command.eid == REP.MERGE_EID.id){
			//マージコマンドが返ってきた
			if(translator.checkMergeConflict(command)){
				//マージ中にエディタからの割り込みがあった場合
				translator.getMergeAgain(this);
			}
			endMerge();
		}else if(command.eid == next.getEID()){
			// 次のEditorで一周するコマンドが来た
			if(next==this) return; // singleton case
			((Editor) next).checkReturnedCommand(command);
		} else {
			//他のエディタからの編集コマンド
			assert (command.eid!=REP.MERGE_EID.id && command.eid!=eid );
			if (manager.hasWaitingCommand(channel)) {
				// We cannot do this operation before watingCommandQueue.
				manager.addWaitingCommand(new PacketSet(channel, this, command));
				return;
			}		
			if(!isMerging()) {
				translator.transReceiveCmd(next,command);
				return;
			}
			manager.addWaitingCommand(new PacketSet(getChannel(), this, new REPCommand(command)));
		}
		return;
	}

	boolean merge(REPCommand command) {
		//マージして送信
		return translator.catchOwnCommand(this);
	}

	void checkReturnedCommand(REPCommand command) {
		REPCommand prev = sentList.remove(0);
		if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) {
			String err = "Editor.checkReturnedCommand() : command = " + command + " prev=";
			err += prev==null?"null":prev.toString();
			SessionManager.logger.writeLog(err);
			assert(false);
		}

		// START_MERGE を送る
		REPCommand cmd = new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,"");
		send(cmd);
		// Session Manager 側で、このeditorへの他のeditorからの
		// 入力を止めて、merge にそなえる。merge は、eidtor 側から
		// ACKが来てから始まる。
		translator.startMerge(cmd);
		return;
	}

	@Override
	public void setQuit2(REPCommand cmd) {
		quit2 = cmd;
		checkQuit();
		// do not send quit2 until we received all pending
		// command
	}

	@Override
	public void setEID(int eid) {
		this.eid = eid;
		translator.setEid(eid);
	}
	
	public String toString(){
		return ("Editor eid="+eid+" sid="+sid+" " + host  + ":" + file);
	}

	public boolean isMerging() {
		return translator.isMerging();
	}

	

	void endMerge() {
		if(translator.isMerging()) return;
		REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,eid,sid,seq(),0,"");
		send(mergeEnd);				
		if (quit2!=null) checkQuit();
	}

	private boolean checkQuit() {
		if (sentList.size()==0&&!isMerging()) {
			send(quit2);
			quit2 = null;
			return true;
		}
		return false;
	}

	@Override
	public boolean manage(REPCommand receivedCommand) {
		
		
		switch(receivedCommand.cmd){
		// Editor Command
		
		case REPCMD_DELETE:
		case REPCMD_INSERT:
		case REPCMD_NOP:
		{
			translate(receivedCommand);
			break;
		}

		case SMCMD_START_MERGE_ACK:
		{
			// マージの処理と次のエディタへコマンドを送信する処理
			translator.mergeAck();
			if (!merge(receivedCommand)) {
				// nothing to do, send END_MERGE
				endMerge();
			}
			break;
		}
		case SMCMD_QUIT:
		{
			next.send(receivedCommand);
			break;
		}
		case SMCMD_QUIT_2:
		{
			// QUIT_2 is returned.
			if (receivedCommand.eid!=eid) {
				// stop this editor unless this is the start, starter will stopped
				// by QUIT_2_ACK
				manager.remove(this);
			}
			// don't send quit_2 directly to the editor until all pending
			// merge is processed.
			next.setQuit2(receivedCommand);
			break;
		}
		case SMCMD_QUIT_2_ACK:
		{
			manager.remove(this);
			break;
		}
		default:
			return false;
		}
		return true;
	}
	

	@Override
	public void handle(REPSelectionKey<REPCommand> key) throws IOException {
		REPSocketChannel<REPCommand> channel = key.channel1();
		REPCommand command = channel.read();
		SessionManager.logger.writeLog("REPHandlerImpl.handle() read : command = " + command +" from "+channel);
		if (manager.sessionManage(this, command)) return;
		manage(command);
	}

	@Override
	public void cancel(REPSocketChannel<REPCommand> socketChannel) {
		manager.remove(socketChannel);
	}

	public boolean isMaster() {
		return mode==REP.SMCMD_PUT;
	}
}