view src/remoteeditor/editors/REPEditor.java @ 213:bea1625524fe default tip

when you put and join to SessionManager, this plugin open a window for inputting address and port of SessionManager
author kazz
date Mon, 20 Dec 2010 14:14:41 +0900
parents 58c0e5c876b8
children
line wrap: on
line source

package remoteeditor.editors;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.util.LinkedList;
import rep.REP;
import rep.REPCommand;
import rep.REPCommandPacker;
import rep.channel.REPSelectionKey;
import rep.channel.REPSelector;
import rep.channel.REPSocketChannel;


public class REPEditor extends Thread implements REPTextListener{
	
	private REPSocketChannel<REPCommand> channel;
	REPSelector<REPCommand> selector;
	private boolean running = true;
	private boolean inputLock = false;
	private long timeout = 1;
	private int syncCounter = 0;
	private LinkedList<REPCommand> userCommand = new LinkedList<REPCommand>();
	private LinkedList<Runnable> runners = new LinkedList<Runnable>();
	private String name = "test";
	private int seq;
	private int eid;
	private int sid;
	private REPText repText;
	private boolean hasInputLock = true;
	private boolean master;
	private boolean syncEnable = true;
	private REPCommand quit = null;
	private String hostname = "localhost";
	private int port = 8766;
	
	public REPEditor(REPText repText, boolean master){
		this.repText = repText;
		this.master = master;
		if(master){
			userCommand.add(new REPCommand(REP.SMCMD_PUT,0,0,0,0,name +"-file"));
		}else{
			userCommand.add(new REPCommand(REP.SMCMD_JOIN, 0, 0, 0, 0, name));
		}

		repText.addTextListener(this);
	}
	
	public REPEditor(REPText repText, boolean master, String hostname, int port) {
		this(repText, master);
		this.hostname = hostname;
		this.port = port;
	}
	
	public void textDeleted(REPTextEvent event) {
		Logger.print(event.getText());
		addUserInput(new REPCommand(REP.REPCMD_DELETE_USER, 0, 0, 0, event.getLineno(), event.getText()));
	}

	public void textInserted(REPTextEvent event) {
		Logger.print(event.getText());
		addUserInput(new REPCommand(REP.REPCMD_INSERT_USER, 0, 0, 0, event.getLineno(), event.getText()));
	}

	private void addUserInput(final REPCommand command) {
		Runnable runner = new Runnable(){
			public void run(){
				userCommand.add(command);
				timeout = 1;
			}
		};
		synchronized(runners){
			runners.add(runner);
		}
		if(selector != null){
			selector.wakeup();
		}
	}
	
	public void run(){
		/*
		 * Create Socket and connect to the session manager
		 */
		try {
			channel = REPSocketChannel.<REPCommand>create(new REPCommandPacker());
		} catch (IOException e) {
			e.printStackTrace();
			return;
		}
		try {
			InetSocketAddress semaIP = new InetSocketAddress(hostname, port);
			while (!channel.connect(semaIP)){
				Logger.print("SeMa not listen to socket yet, wait");
			}
		} catch (IOException e) {
			e.printStackTrace();
			return;
		}
		/*
		 * Start editor main loop
		 *         public REPCommand(REP cmd,int sid,int eid, int seq, int lineno,  String string) 
		 */
		try {
			mainloop();
		} catch (IOException e) {
		}
	}

	/*
	 * Editor main loop with input lock
	 */
	void mainloop() throws IOException {
		
		channel.configureBlocking(false);
		selector = REPSelector.create();
		channel.register(selector, SelectionKey.OP_READ);
		while(running) {
			
			synchronized(runners){
				for(Runnable runner : runners){
					runner.run();
				}
				runners.clear();
			}
			
//			if(inputLock){
			if (repText.isMerging()) {
				// No user input during merge mode (optional)
				if (selector.select(0)>0) {
					handle(channel.read());
				}
				continue;
			} else if (selector.select(timeout)<=0) {
				if (syncCounter>0) {
					syncText(); // send the master editor buffer to clients. 
				}
				userInput();
			}
			// selector(timeout) returns 0, but it may contain readable channel..
			for(REPSelectionKey<REPCommand> key : selector.selectedKeys1()) {
				REPSocketChannel<REPCommand> ch = key.channel1();
				handle(ch.read());
			}
		}
	}



	private void userInput() {
		REPCommand command = userCommand.poll();
		if(command != null){
			switch(command.cmd){
			case REPCMD_DELETE_USER:
				sendCommand(command);
				break;
			case REPCMD_INSERT_USER:
				sendCommand(command);
				break;
			case SMCMD_PUT:
			case SMCMD_JOIN:
				sendCommand(command);
				break;
			}
		}else{
			if(syncCounter == 0){
				timeout = 0;
			}
		}
	}

	private void handle(REPCommand cmd) {
		if (cmd==null) return;
		switch(cmd.cmd) {
		case REPCMD_INSERT	:
			if (cmd.eid!=eid) {
				repText.insert(cmd.lineno, cmd.string);
			}
			forwardCommand(cmd);
			break;
		case REPCMD_DELETE	:
			if (cmd.eid!=eid) {
				String del=""; 
				if(cmd.lineno<repText.size()) {
					del = repText.delete(cmd.lineno);
				}
				cmd.setString(del);
			}
			forwardCommand(cmd);
			break;
		case REPCMD_NOP		:
		case REPCMD_MERGE_MARK		:
		case REPCMD_INSERT_ACK		:
		case REPCMD_DELETE_ACK		:
			forwardCommand(cmd);
			break;		 
		case REPCMD_CLOSE	:
		case REPCMD_CLOSE_2	:
			assert(false);
			break;

		case SMCMD_JOIN_ACK	:
			sid = cmd.sid;
			eid = cmd.eid;
			setName(name+eid);
			name += "(sid="+sid+")";
			inputLock = false;
			break;
		case SMCMD_PUT_ACK	:
			sid = cmd.sid;
			eid = cmd.eid;
			setName(name+eid);
			name += "(sid="+sid+")";
			inputLock = false;
			break;
		case SMCMD_QUIT		:
			if (cmd.eid!=eid)
				quit = cmd;
			else // eid=-1 means do not forward but send it.
				quit = new REPCommand(REP.SMCMD_QUIT_2, 
						sid, -1, seq, 0, "");
			timeout=1;
			if (quit.eid==-1)
				sendCommand(quit);
			else 
				forwardCommand(quit);
			quit=null;
			//close connection user
			
			break;
		case SMCMD_START_MERGE :
			// lock user input during merge (optional)
			inputLock = hasInputLock;
			cmd.cmd = REP.SMCMD_START_MERGE_ACK;
			sendCommand(cmd);
			break;
		case SMCMD_END_MERGE :
			inputLock = false;
			break;
			// master editor changes QUIT_2 to QUIT_2_ACK
			// Session manager should do this
		case SMCMD_QUIT_2 :
			if (cmd.eid!=eid) {
				forwardCommand(cmd);
			} else {
				cmd.cmd = REP.SMCMD_QUIT_2_ACK;
				sendCommand(cmd);
			}
			running = false;
			break;
		case SMCMD_SYNC:
			// start contents sync with newly joined editor
			cmd.cmd = REP.SMCMD_SYNC_ACK;
			forwardCommand(cmd);
			//if (cmd.eid==eid) {
			if (master && syncEnable ) {
				syncCounter = 1;
				timeout = 1;
			}
			break;
		default:
			assert(false);
		break;
		}
	}
	
	private void forwardCommand(REPCommand command) {
		REPCommand cmd = new REPCommand(command);
		channel.write(cmd);
	}

	private void sendCommand(REPCommand command) {
		REPCommand cmd = new REPCommand(command);
		cmd.setSEQID(seq++);
		cmd.setEID(eid);
		cmd.setSID(sid);
		channel.write(cmd);
	}

	private void syncText() {
		Logger.print();
		if(syncCounter>repText.size()){
			syncCounter = 0;
		}else {
			if(inputLock) return;
			int i = syncCounter - 1;
			REPCommand del = new REPCommand(REP.REPCMD_DELETE_USER, sid, eid, 0, i, repText.get(i));
			REPCommand ins = new REPCommand(REP.REPCMD_INSERT_USER, sid, eid, 0, i, repText.get(i));
			sendCommand(del);
			sendCommand(ins);
			syncCounter++;
		}
	}

}