view test/editortest/TestEditor2.java @ 416:b7f42fc75a36

(no commit message)
author one
date Wed, 31 Dec 2008 14:47:39 +0900
parents 648c676bf9df
children
line wrap: on
line source

package test.editortest;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.util.LinkedList;
import java.util.List;

import rep.REP;
import rep.REPCommand;
import rep.REPCommandPacker;
import rep.SessionManager;
import rep.channel.REPLogger;
import rep.channel.REPSelectionKey;
import rep.channel.REPSelector;
import rep.channel.REPSocketChannel;
import test.Text;


/**
 * @author kono
 *	Basic Remote Editor client implementation
 *     should support multi-session
 *     currently multi-session requires new channel, that is
 *     only one session for this editor.
 */
public class TestEditor2 extends Thread{
	private InetSocketAddress semaIP;
	private REPLogger ns;
	private int seq = 0;
	public Text text;
	public LinkedList<REPCommand> cmds;
	private int eid = 0;
	private int sid = 0;
	REPSocketChannel<REPCommand> channel;
	REPCommand nop = new REPCommand(REP.REPCMD_NOP, 0, 0, 0, 0, "");
	boolean running = true;
	long timeout = 1;
	private String name;
	private boolean inputLock=false;
	public boolean detached=false;
	public boolean master=false;
	REPCommand quit=null;
	private int syncCounter=0;
	private boolean hasInputLock=false;
	private int port;
	private REPSelector<REPCommand> selector;
	private boolean syncEnable=true;
	private LogTarget logTarget;
	private REPText repText;
	private REPSimpleEditor simpleEditor;


	public TestEditor2(String name, String _host,int _port, boolean master){
		super(name);
		LinkedList<REPCommand>cmdList = new LinkedList<REPCommand>();
		String[] txts = {
			"aaa", "bbb", // "ccc", "ddd", "eee",
		};	
		port = _port;
		semaIP = new InetSocketAddress(_host, _port);
		ns = REPLogger.singleton();
		this.name = name;
		cmds = cmdList;
		if (master) {
			this.master=true;
			text = new Text(txts);
			cmds.add(new REPCommand(REP.SMCMD_PUT,0,0,0,0,name+"-file"));
			cmds.add(new REPCommand(REP.REPCMD_INSERT_USER,0,0,0,0,"m0"));
			cmds.add(new REPCommand(REP.REPCMD_DELETE_USER,0,0,0,0,"m0"));
			cmds.add(new REPCommand(REP.SMCMD_QUIT,0,0,0,0,""));
		} else {
			text = new Text(new String[0]);
			cmds.add(new REPCommand(REP.SMCMD_JOIN,0,0,0,0,name));
			cmds.add(new REPCommand(REP.REPCMD_INSERT_USER,0,0,0,0,"c0"));
			cmds.add(new REPCommand(REP.REPCMD_DELETE_USER,0,0,0,0,"c0"));
		}
		
		simpleEditor = new REPSimpleEditor();
		simpleEditor.setVisible(true);
		simpleEditor.setButtonEnabled(false);
		simpleEditor.setTestEditor(this);
		simpleEditor.setText(text);
		repText = simpleEditor.getREPText();
		logTarget = simpleEditor;
	}
	
	public TestEditor2(String name, String _host,int _port, boolean master,
			String[] txts,LinkedList<REPCommand> cmdList){
		super(name);
		port = _port;
		semaIP = new InetSocketAddress(_host, _port);
		ns = REPLogger.singleton();
		this.name = name;
		cmds = cmdList;
		if (master) {
			this.master=true;
			text = new Text(txts);
		} else {
			text = new Text(new String[0]);
		}
	}

	public void run(){
		/*
		 * Create Socket and connect to the session manager
		 */
		try {
			channel = REPSocketChannel.<REPCommand>create(new REPCommandPacker());
		} catch (IOException e) {	return;	}

		ns.writeLog("try to connect to SessionManager whose ip is "+semaIP+" "+name, 1);
		try {
			while (!channel.connect(semaIP)){
				ns.writeLog("SeMa not listen to socket yet, wait", 1);
			}
		} catch (IOException e) { return; }
		ns.writeLog("successes to connect "+name);
		/*
		 * Start editor main loop
		 *         public REPCommand(REP cmd,int sid,int eid, int seq, int lineno,  String string) 
		 */
		try {
			mainloop();
		} catch (IOException e) {
		}
	}

	/*
	 * Editor main loop with input lock
	 */
	private void mainloop() throws IOException {
		
		channel.configureBlocking(false);
		REPSelector<REPCommand> selector = REPSelector.create();
		channel.register(selector, SelectionKey.OP_READ);
		this.selector = selector;
		while(running) {
			if (inputLock) {
				// No user input during merge mode (optional)
				if (selector.select(0)>0) {
					handle(channel.read());
				}
				continue;
			} else if (selector.select(timeout)<=0) {
				if (syncCounter>0) {
					syncText(); // send the master editor buffer to clients. 
				}
				userInput();
			}
			// selector(timeout) returns 0, but it may contain readable channel..
			for(REPSelectionKey<REPCommand> key : selector.selectedKeys1()) {
				REPSocketChannel<REPCommand> ch = key.channel1();
				handle(ch.read());
			}
		}
	}

	private void syncText() {
		/*
		 * Send delete/insert one at a time to synchronize
		 * all clients. SYNC is requested by the session manager.
		 */
		if (syncCounter>text.size()) {
			SessionManager.logger.writeLog("Sync Completed.");
			syncCounter=0;
		} else {
			if (inputLock) return;
			int i=syncCounter-1;
			REPCommand del = new REPCommand(REP.REPCMD_DELETE_USER,sid,eid,0,i, text.get(i));
			REPCommand ins = new REPCommand(REP.REPCMD_INSERT_USER,sid,eid,0,i, text.get(i));
			sendCommand(del);
			sendCommand(ins);
			syncCounter++;
		}
	}

	/*
	 * Simulate User Input
	 */
	private void userInput() {
		REPCommand cmd = cmds.poll();
		if (cmd!=null) {
			Logger.print(logTarget, "User Input : " + cmd);
			switch(cmd.cmd) {
			case REPCMD_INSERT_USER:
				text.insert(cmd.lineno, cmd.string);
				repText.insert(cmd.lineno, cmd.string);
				sendCommand(cmd);
				break;
			case REPCMD_DELETE_USER:
				String del = text.delete(cmd.lineno);
				del = repText.delete(cmd.lineno);
				cmd.setString(del);
				sendCommand(cmd);
				break;
			case SMCMD_QUIT:
				/*
				 * start termination phase 1 by the master editor.
				 * after this command do not send any user input.
				 * clients simply disconnect from the session manager.
				 */
				cmds.clear();
				cmd.eid = -1;
				quit = cmd;
				break;
			case SMCMD_JOIN:
			case SMCMD_PUT:
				sendCommand(cmd);
				/*
				 * To prevent confusion, stop user input until the ack
				 */
				inputLock = true; // wait until ACK
				break;
			default:
				assert(false);
			}
		} else {
			if(syncCounter==0) {
			// no more command to send, and we don't have syncCounter
				timeout = 0;
				if (quit!=null) {
					if (quit.eid==-1)
						sendCommand(quit);
					else 
						forwardCommand(quit);
					quit=null;
				}
			}
		}
	}


	private void sendCommand(REPCommand cmd1) {
		REPCommand cmd = new REPCommand(cmd1);
		cmd.setSEQID(seq++);
		cmd.setEID(eid);
		cmd.setSID(sid);
		ns.writeLog(name +" send "+cmd);
		channel.write(cmd);
	}

	private void forwardCommand(REPCommand cmd1) {
		REPCommand cmd = new REPCommand(cmd1);
		ns.writeLog(name +" forward "+cmd);
		//Logger.print(logTarget, "write : " + cmd);
		channel.write(cmd);
	}

	private void handle(REPCommand cmd) {
		if (cmd==null) return;
		ns.writeLog(name +": read "+cmd + " textsize="+text.size());
		Logger.print(logTarget, "read : " + cmd);
		switch(cmd.cmd) {
		case REPCMD_INSERT	:
			if (cmd.eid!=eid) {
				text.insert(cmd.lineno, cmd.string);
				repText.insert(cmd.lineno, cmd.string);
			}
			forwardCommand(cmd);
			break;
		case REPCMD_DELETE	:
			if (cmd.eid!=eid) {
				String del=""; 
				if(cmd.lineno<text.size()) {
					del = text.delete(cmd.lineno);
					del = repText.delete(cmd.lineno);
				}
				cmd.setString(del);
			}
			forwardCommand(cmd);
			break;
		case REPCMD_NOP		:
		case REPCMD_INSERT_ACK		:
		case REPCMD_DELETE_ACK		:
			forwardCommand(cmd);
			break;		 
		case REPCMD_CLOSE	:
		case REPCMD_CLOSE_2	:
			assert(false);
			break;

		case SMCMD_JOIN_ACK	:
			sid = cmd.sid;
			eid = cmd.eid;
			name += "(eid="+eid+",sid="+sid+")";
			inputLock = false;
			break;
		case SMCMD_PUT_ACK	:
			sid = cmd.sid;
			eid = cmd.eid;
			name += "(eid="+eid+",sid="+sid+")";
			inputLock = false;
			break;
		case SMCMD_QUIT		:
			if (cmd.eid!=eid)
				quit = cmd;
			else // eid=-1 means do not forward but send it.
				quit = new REPCommand(REP.SMCMD_QUIT_2, 
						sid, -1, seq, 0, "");
			timeout=1;
			// stop input processing after this command
			cmds.clear();
			break;
		case SMCMD_START_MERGE :
			// lock user input during merge (optional)
			inputLock = hasInputLock;
			cmd.cmd = REP.SMCMD_START_MERGE_ACK;
			sendCommand(cmd);
			break;
		case SMCMD_END_MERGE :
			inputLock = false;
			break;
		case SMCMD_QUIT_2 :
			if (cmd.eid!=eid) {
				forwardCommand(cmd);
			} else {
				cmd.cmd = REP.SMCMD_QUIT_2_ACK;
				sendCommand(cmd);
			}
			running = false;
			break;
		case SMCMD_SYNC:
			// start contents sync with newly joined editor
			cmd.cmd = REP.SMCMD_SYNC_ACK;
			forwardCommand(cmd);
			//if (cmd.eid==eid) {
			if (master && syncEnable ) {
				syncCounter = 1;
				timeout = 1;
			}
			break;
		default:
			assert(false);
		break;
		}
	}


	public int getPort() {
		return port;
	}

	public synchronized void setCommand(LinkedList<REPCommand> cmds) {
		this.cmds = cmds;
		timeout=1;
		if(selector!=null) selector.wakeup();
	}

	public void setText(List<String> list) {
		text = new Text(list);
	}

	public void setLogTarget(LogTarget target) {
		logTarget = target;
	}

	public void setREPText(REPText repText) {
		this.repText = repText;
	}

	public void addUserInput(REPCommand command) {
		cmds.add(command);
		if(selector!=null) selector.wakeup();
	}
}