view rep/handler/Forwarder.java @ 401:2cf5392b2a9f

add INSERT_USER, DELETE_USER JOIN_ACK fix
author one
date Tue, 25 Nov 2008 03:07:05 +0900
parents 19705f4b8015
children 8009dd7b2013
line wrap: on
line source

package rep.handler;

import java.io.IOException;

import rep.PacketSet;
import rep.REP;
import rep.REPCommand;
import rep.Session;
import rep.SessionManager;
import rep.channel.REPLogger;
import rep.channel.REPSelectionKey;
import rep.channel.REPSocketChannel;

/**
 * Forwards editor commands to the other Session Manager.
 * Only the basic send API is supported: a command handed to this node is
 * wrapped in a fresh {@code REPCommand} and put on the manager's write queue
 * for this node's channel. {@code manage} and {@code handle} are not meant to
 * be invoked on this class (both trip {@code assert false}).
 *
 * @author kono
 */
public class Forwarder extends REPNode  {
	// Value handed out by the next call to seq(); starts at 0.
	int seq = 0;
	// Debugging purpose, assert check only. Not referenced inside this class.
	final int limit=100;
	// Shared logger instance. Not referenced inside this class.
	final REPLogger ns = REPLogger.singleton();

	/**
	 * Creates a forwarder bound to the given manager and channel.
	 *
	 * @param manager session manager passed to the superclass
	 * @param channel channel passed to the superclass
	 */
	public Forwarder(SessionManager manager,
			REPSocketChannel<REPCommand> channel) {
		super(manager,channel);
	}

	/**
	 * Creates a forwarder with an explicit editor number.
	 *
	 * @param editorNo editor number passed to the superclass
	 * @param manager  session manager passed to the superclass
	 * @param channel  channel passed to the superclass
	 */
	public Forwarder(int editorNo, SessionManager manager,
			REPSocketChannel<REPCommand> channel) {
		super(editorNo,manager,channel);
	}

	/**
	 * Returns the current sequence number and then advances it by one.
	 *
	 * @return the sequence number before the increment
	 */
	public int seq() {
		return seq++;
	}

	/**
	 * Queues a command for writing on this node's channel.
	 * A new {@code REPCommand} is constructed from the argument before it is
	 * queued, so the queued object is distinct from the caller's instance.
	 *
	 * @param command command to send; must not be null (assert only)
	 */
	@Override
	public void send(REPCommand command) {
		assert(command!=null);
		assert(channel!=null);
		REPCommand copy = new REPCommand(command);
		manager.addWriteQueue(new PacketSet(channel,null,  copy));
	}
	
	/**
	 * Same as {@link #send(REPCommand)}, but the queued command additionally
	 * gets its SEQID set to the next value of {@link #seq()}.
	 *
	 * @param command command to send; must not be null (assert only)
	 */
	public void sendWithSeq(REPCommand command) {
		assert(command!=null);
		assert(channel!=null);
		REPCommand copy = new REPCommand(command);
		copy.setSEQID(seq());
		manager.addWriteQueue(new PacketSet(channel,null,  copy));
	}
	
	/**
	 * @return the channel this forwarder currently writes to
	 */
	public REPSocketChannel<REPCommand> getChannel() {
		return channel;
	}
	
	/**
	 * Replaces the channel this forwarder writes to.
	 *
	 * @param channel the new channel
	 */
	public void setChannel(REPSocketChannel<REPCommand> channel) {
		this.channel = channel;
	}

	/**
	 * Sends the given command unchanged via {@link #send(REPCommand)}.
	 *
	 * @param cmd command to send
	 */
	public void setQuit2(REPCommand cmd) {
		send(cmd);
	}

	/**
	 * Sets the node stored in the {@code next} field.
	 *
	 * @param next the forwarder to store as next
	 */
	public void setNext(Forwarder next) {
		this.next = next;
	}
	
	/**
	 * @return the node stored in the {@code next} field
	 */
	public REPNode getNextForwarder() {
		return next;
	}

	/**
	 * Not expected to be called on this class: fails an assertion when
	 * assertions are enabled.
	 *
	 * @param command ignored
	 * @return always true (reached only when assertions are disabled)
	 */
	public boolean manage(REPCommand command) {
		assert false;
		return true;
	}

	/**
	 * @return "Forwarder:" followed by the string form of the channel
	 */
	@Override
	public String toString(){
		return ("Forwarder:" + channel);
	}
	
	/**
	 * @return the local host name as reported by the channel
	 */
	public String getLocalHostName() {
		return channel.getLocalHostName();
	}

	/**
	 * Asks the manager to remove the given channel.
	 *
	 * @param socketChannel channel to remove
	 */
	@Override
	public void cancel(REPSocketChannel<REPCommand> socketChannel) {
		manager.remove(socketChannel);
	}

	/**
	 * Not expected to be called on this class: fails an assertion when
	 * assertions are enabled, otherwise does nothing.
	 *
	 * @param command ignored
	 * @param key     ignored
	 * @throws IOException declared by the overridden method; never thrown here
	 */
	@Override
	public void handle(REPCommand command, REPSelectionKey<REPCommand> key) throws IOException {
		assert false;
	}


	/**
	 * @return always false in this class
	 */
	public boolean isMerging() {
		return false;
	}

	/**
	 * Connect to the session which has an active connection (forwarder or
	 * editor).
	 * If this node is direct and its EID equals the command's eid, the node is
	 * added to the session and {@link #joinAck(REPCommand, int)} is called;
	 * otherwise the command is turned into SMCMD_SELECT_ACK. In both cases the
	 * command's string is then set to the session name and the command is sent.
	 *
	 * @param sendCommand command to reply with; its cmd and string are modified
	 * @param session     the selected session
	 */
	public void selectSession(REPCommand sendCommand,Session session) {
		// we have a selected session.
		if (isDirect()&&getEID()==sendCommand.eid) {
			// Found directly connected joined editor. Send join_ack().
			// we have one more point to send JOIN_ACK to the editor.
			session.addForwarder(this);
			joinAck(sendCommand, session.getSID());
		} else {
			// We have a session, but joined editor is on the other sm.
			// SELECT_ACK is sent to the session ring to
			// find out the joined editor to send JOIN_ACK.
			sendCommand.setCMD(REP.SMCMD_SELECT_ACK);
		}
		sendCommand.string = session.getName();
		// NOTE(review): in the direct branch, joinAck() as defined in this class
		// has already sent sendCommand once (before the session name was set),
		// so this send queues the command a second time. Confirm that the
		// duplicate is intended by the protocol.
		send(sendCommand);

	}

	/**
	 * Notifies the manager of an update for the session, then sends the command
	 * as SMCMD_JOIN_ACK. When {@code manager.sync} is set, a second command
	 * built from it with cmd SMCMD_SYNC is passed to {@code sendSync}.
	 *
	 * @param sendCommand command to send; its cmd is changed to SMCMD_JOIN_ACK
	 * @param sid         session id passed to {@code manager.sendUpdate}
	 */
	@Override
	public void joinAck(REPCommand sendCommand, int sid) {
		manager.sendUpdate(sid);
		sendCommand.setCMD(REP.SMCMD_JOIN_ACK);
		send(sendCommand);
		if (manager.sync) {
			REPCommand sync = new REPCommand(sendCommand);
			sync.setCMD(REP.SMCMD_SYNC);
			sendSync(sync);
		}
	}

	/**
	 * No-op in this class.
	 *
	 * @param command ignored
	 */
	@Override
	public void forwardedCommandManage(REPCommand command) {
		
	}

	/**
	 * No-op in this class.
	 */
	@Override
	public void checkWaitingCommandInMerge() {
		
	}

}