view rep/SessionManager.java @ 146:20beee6ca31a

*** empty log message ***
author pin
date Wed, 27 Aug 2008 23:14:39 +0900
parents 0bf7f8d0f5f7
children 6a5fe529b192
line wrap: on
line source

package rep;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import rep.channel.REPServerSocketChannel;
import rep.channel.REPSocketChannel;
import rep.channel.SelectionKeySimulator;
import rep.channel.SelectorSimulator;
import rep.handler.PacketSet;
import rep.handler.REPHandler;
import rep.simulator.REPSelector;
import rep.xml.SessionXMLDecoder;
import rep.xml.SessionXMLEncoder;

//+-------+--------+--------+-------+--------+---------+------+
//| cmd   | session| editor | seqid | lineno | textsiz | text |
//|       | id     | id     |       |        |         |      |
//+-------+--------+--------+-------+--------+---------+------+
//o-------header section (network order)-------------o
/*int cmd;	// command
int sid;	// session ID : uniqu to editing file
int eid;	// editor ID : owner editor ID = 1。Session に対して unique
int seqno;	// Sequence number : sequence number はエディタごとに管理
int lineno;	// line number
int textsize;   // textsize : bytesize
byte[] text;*/

public class SessionManager implements ConnectionListener, REPActionListener{
	
	
	private SessionList sessionlist;
	private List<Session> sessionList;
	private SessionManagerGUI gui;
	private Selector selector;
	private SessionManagerList smList;
	private String myHost;
	private boolean isMaster = true;
	private EditorList  ownEditorList;
	private List<Editor> editorList;
	private String maxHost;
	//private Set<SelectionKey> sessionList;
	private static int temp_port;
	private static int send_port;
	
	static final int DEFAULT_PORT = 8766;
	
	public SessionManager(int port) {
		gui = new SessionManagerGUI();
	}
	
	public void openSelector() throws IOException{
		selector = REPSelector.open();
	}
	
	public void mainLoop(int port) throws InterruptedException, IOException {
	
		REPServerSocketChannel<REPCommand> ssc = new REPServerSocketChannel<REPCommand>().create();
		
		ssc.configureBlocking(false);	//reuse address 必須
		
		ssc.socket().setReuseAddress(true);
		
		ssc.socket().bind(new InetSocketAddress(port));
		ssc.register(selector, SelectionKey.OP_ACCEPT);

		
		sessionlist = new SessionList();
		sessionList = new LinkedList<Session>();
		smList = new SessionManagerList();
		ownEditorList = new EditorList();
		editorList = new LinkedList<Editor>();
		
		while(true){
			selector.select();
			select();
		}
	}

	private void select() throws IOException {
		for(SelectionKeySimulator key : selector.selectedKeys()){
			if(key.isAcceptable()){
				/*** serverChannelはenableになったSelectionKeyのchannel ***/
				REPServerSocketChannel serverChannel = (REPServerSocketChannel)key.channel();
				REPSocketChannel channel = serverChannel.accept();	//keyからchannelを取って、accept 
				registerChannel (selector, channel, SelectionKey.OP_READ);
				channel = null;

				
			}else if(key.isReadable()){
				
//				REPSocketChannel<REPCommand> channel = (REPSocketChannel<REPCommand>)key.channel();
//				REPPacketReceive receive = new REPPacketReceive(channel);
//				receive.setkey(key);
//				REPCommand receivedCommand = receive.unpackUConv();
//				manage(channel, receivedCommand);
				
				REPHandler handler = (REPHandler)key.attachment();
				handler.handle(key);
				
			}else if(key.isConnectable()){
				System.out.println("Connectable");
			}
		}
	}
	
	private synchronized void registerChannel(Selector selector, SelectableChannel channel, int ops) throws IOException {
		if(channel == null) {
			return;
		}
		channel.configureBlocking(false);
		selector.wakeup();
		channel.register(selector, ops);
	}

	public void manage(REPSocketChannel<REPCommand> channel, REPCommand receivedCommand) {
		if(receivedCommand == null) return;
		//Editor editor;
		Session session;
		REPCommand sendCommand = new REPCommand(receivedCommand);
		REPPacketSend send = new REPPacketSend(channel);
		//SessionXMLEncoder encoder = new SessionXMLEncoder();

		switch(receivedCommand.cmd){

		case REP.SMCMD_JOIN:
//			editor = new Editor(channel);
//			editor.setHost(myHost);
//			int tempeid = ownEditorList.addEditor(editor);
//			gui.setComboEditor(tempeid, channel);
			
			editorList.add(new Editor(editorList.size(), channel));

			break;

		case REP.SMCMD_JOIN_ACK:

			break;

		case REP.SMCMD_PUT:
//			editor = new Editor(channel);
//			editor.setHost(myHost);
//			ownEditorList.addEditor(editor);
//			editor.setEID(1);
//			editor.setName(receivedCommand.string);
//			session = new Session(editor);
//			session.setOwner(true);
//			session.addEditor(editor);
//			sessionlist.addSession(session);
//			gui.setComboSession(session.getSID(), session.getName());
//			gui.setComboEditor(editor.getEID(), editor.getChannel());
//			session.addToRoutingTable(editor);
//			sendCommand.setCMD(REP.SMCMD_PUT_ACK);
//			sendCommand.setEID(1);
//			sendCommand.setSID(session.getSID());
//			editor.send(sendCommand);

//			SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session);
//			REPCommand command = new REPCommand();
//			command.setSID(session.getSID());
//			command.setString(sessionEncoder.sessionListToXML());

//			command.setCMD(REP.SMCMD_UPDATE);
//			smList.sendExcept(channel, command);

			//エディタのリストに追加
			editorList.add(new Editor(editorList.size(), channel));

			//Sessionを生成
			int sid = sessionList.size();
			sessionList.add(new Session(sid, new Editor(0, channel)));

			break;

		case REP.SMCMD_SELECT:
//			editor = new Editor(channel);
//
//			session = sessionlist.getSession(receivedCommand.sid);
//
//			if(session.isOwner()){
//				int eid2 = session.addEditor(editor);
//				editor.setEID(eid2);
//				sendCommand.setCMD(REP.SMCMD_SELECT_ACK);
//				sendCommand.setEID(eid2);
//				send.send(sendCommand);
//			}else {
//				Editor master = session.getMaster();
//				master.send(receivedCommand);
//				session.addEditor(editor);
//			}
			
			Editor editor = getEditor(channel);
			sessionList.get(receivedCommand.sid).addEditor(editor);

			break;

		case REP.SMCMD_SELECT_ACK:

			String hostport = receivedCommand.string;
			Editor editor2 = ownEditorList.getEditor(hostport);
			if(editor2 != null) {
				REPCommand command2 = new REPCommand();
				command2.setCMD(REP.SMCMD_JOIN_ACK);
				command2.setSID(receivedCommand.sid);
				command2.setEID(receivedCommand.eid);
				editor2.send(command2);
			}else{
				smList.sendExcept(channel, receivedCommand);
			}

			//receivedCommand.setCMD(REP.SMCMD_JOIN_ACK);
			//receivedCommand.setEID(receivedCommand.eid);
			//session = sessionlist.getSession(receivedCommand.sid);
			//session.sendToEditor(receivedCommand);
			//Editor editor3 = session3.getEditorList().get(0);
			//REPPacketSend send = new REPPacketSend(editor3.getChannel());
			//send.send(repCmd);
			break;

		case REP.SMCMD_SM_JOIN:

			//SessionManagerのリストへ追加
			smList.add(channel);

			//XMLからSessionListオブジェクトを生成する。
			SessionXMLDecoder decoder = new SessionXMLDecoder();
			SessionList receivedSessionList = decoder.decode(receivedCommand.string);

			//SessionListへ追加し変換テーブルを生成する。
			sessionlist.update(channel, receivedSessionList);

			//myHost を設定。
			if(myHost == null) setMyHostName(getLocalHostName(channel));

			//maxHost を設定。
			if(setMaxHost(channel, receivedSessionList.getMaxHost())){
				sendCommand = new REPCommand();
				sendCommand.setCMD(REP.SMCMD_CH_MASTER);
				sendCommand.setString(maxHost);
				smList.sendExcept(channel, sendCommand);
			}

			//SessionListからXMLを生成。
			//joinしてきたSessionManagerに対してACKを送信。
			SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionlist);
			sendCommand = new REPCommand();
			sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK);
			sendCommand.setString(sessionlistEncoder.sessionListToXML());
			send.send(sendCommand);

			//その他の SessionManager に対して SMCMD_UPDATEを 送信。
			sendCommand = new REPCommand();
			sendCommand.setCMD(REP.SMCMD_UPDATE);
			sendCommand.setString(receivedCommand.string);
			smList.sendExcept(channel, sendCommand);

			//その他のSessionManagerに対してSMCMD_SM_JOINを送信。
			//sendCommand = new REPCommand();
			//sendCommand.setCMD(REP.SMCMD_SM_JOIN);
			//sendCommand.setString(receivedCommand.string);
			//smList.sendExcept(channel, sendCommand);

			if(isMaster){
			}else {
			}

			break;

		case REP.SMCMD_SM_JOIN_ACK:

			//XMLからSessionListオブジェクトを生成。
			SessionXMLDecoder decoder2 = new SessionXMLDecoder();
			SessionList receivedSessionList2 = decoder2.decode(receivedCommand.string);

			//maxHostを決定。
			if(setMaxHost(channel, receivedSessionList2.getMaxHost())){
				sendCommand = new REPCommand();
				sendCommand.setCMD(REP.SMCMD_CH_MASTER);
				sendCommand.setString(maxHost);
				smList.sendExcept(channel, sendCommand);
			}

			if(isMaster){
			}else{
			}

			break;

		case REP.SMCMD_UPDATE:

			SessionXMLDecoder decoder3 = new SessionXMLDecoder();
			SessionList receivedSessionList3 = decoder3.decode(receivedCommand.string);

			//SessionListへ追加し変換テーブルを生成する。
			sessionlist.update(channel, receivedSessionList3);

			smList.sendExcept(channel, receivedCommand);

			for(Session session3 : receivedSessionList3.getList()){
				gui.setComboSession(session3.getSID(), session3.getName());
			}

			//SessionのownerのEditor
			//editor = new Editor(channel);
			//editor.setName(receivedCommand.string);



			//session = new Session(editor);
			//session.addEditor(editor);

			//sessionlist.addSession(session);

			//gui.setComboSession(session.getSID(), session.getName());

			//if(isMaster){
			//	receivedCommand.setCMD(REP.SMCMD_UPDATE_ACK);
			//	smList.sendToSlave(receivedCommand);
			//}else{
			//	receivedCommand.setCMD(REP.SMCMD_UPDATE);
			//	smList.sendToMaster(receivedCommand);
			//}
			break;

		case REP.SMCMD_UPDATE_ACK:
			if(receivedCommand.sid > sessionlist.getList().size()){
				editor = new Editor(channel);
				editor.setName(receivedCommand.string);

				session = new Session(editor);
				session.addEditor(editor);

				sessionlist.addSession(session);

				gui.setComboSession(session.getSID(), session.getName());
			}
			smList.sendToSlave(receivedCommand);
			break;

//			case REP.REPCMD_READ:
//			//sessionlist.sendCmd(channel, repCmd);
//			break;

		case REP.SMCMD_CH_MASTER:
			//maxHost を設定。
			if(setMaxHost(channel, receivedCommand.string)){
				sendCommand = new REPCommand();
				sendCommand.setCMD(REP.SMCMD_CH_MASTER);
				sendCommand.setString(maxHost);
				smList.sendExcept(channel, sendCommand);
			}
			break;

		case REP.SMCMD_GET_UNDO_ACK:
			editor = ownEditorList.getEditor(channel);
			break;

		default:
			//sid から Session を取得
			session = getSession(receivedCommand.sid);
			//マージの処理と次のエディタへコマンドを送信する処理
			session.translate(channel, receivedCommand);
		break;
		}
	}

	private Editor getEditor(REPSocketChannel<REPCommand> channel) {
		// TODO Auto-generated method stub
		for(Editor editor : editorList){
			if(editor.getChannel() == channel){
				return editor;
			}
		}
		return null;
	}

	private Session getSession(int sid) {
		for(Session session : sessionList){
			if(session.getSID() == sid) return session;
		}
		return null;
	}

	private boolean setMaxHost(REPSocketChannel channel, String maxHost2) {
		// TODO Auto-generated method stub
		return false;
	}

	private int reverseCmd(int cmd) {
		int kindOfCmd = 0;
		switch(cmd){
		case REP.REPCMD_INSERT:
			kindOfCmd = REP.REPCMD_DELETE;
			break;
		case REP.REPCMD_DELETE:
			kindOfCmd = REP.REPCMD_INSERT;
			break;
		case REP.REPCMD_REPLACE:
			kindOfCmd = REP.REPCMD_REPLACE;
			break;
		}
		return kindOfCmd;
	}



	private void setMyHostName(String localHostName) {
		myHost = localHostName + temp_port;
		if(maxHost == null) {
			maxHost = myHost;
			sessionlist.setMaxHost(maxHost);
		}
		ownEditorList.setHost(myHost);
	}

	private void setMaster(boolean b, REPSocketChannel channel) {
		isMaster = b;
		System.out.println("isMaster = " + b);
		smList.setMaster(channel);
	}

	public static void main(String[] args) throws InterruptedException, IOException {
		int port = DEFAULT_PORT;
		int port_s = DEFAULT_PORT;
		//System.setProperty("file.encoding", "UTF-8");
		if(args.length > 0){
			port = Integer.parseInt(args[0]);
			port_s = Integer.parseInt(args[1]);
		}
		temp_port = port;
		send_port = port_s;
		SessionManager sm = new SessionManager(port);
		sm.openSelector();
		sm.openWindow();
		sm.mainLoop(port);
	}

	private void openWindow() {
		Thread th = new Thread( gui ); 
		th.start();
		//System.out.println(sessionmanagerGUI.toString());
		gui.addConnectionListener(this);
		gui.addREPActionListener(this);
	}

	private void connectSession(String host) {
		int port = DEFAULT_PORT;
		port = send_port;
		InetSocketAddress addr = new InetSocketAddress(host, port);
		try {
			REPSocketChannel sessionchannel = new REPSocketChannel<REPCommand>().create();
			sessionchannel.configureBlocking(true);
			sessionchannel.connect(addr);
			while(!sessionchannel.finishConnect()){
				System.out.print("test afro");
			}
			System.out.println("");
			registerChannel(selector, sessionchannel, SelectionKey.OP_READ);
			
			sm_join(sessionchannel);
			
		}catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void sm_join(REPSocketChannel channel){
		
		//SM_JOINコマンドを生成。
		REPCommand command = new REPCommand();
		command.setCMD(REP.SMCMD_SM_JOIN);
		
		//hostnameをセット。
		setMyHostName(getLocalHostName(channel));
		
		//XMLを生成。送信コマンドにセット。
		SessionXMLEncoder encoder = new SessionXMLEncoder(sessionlist);
		String string = encoder.sessionListToXML();
		command.setString(string);
		
		//SM_JOINコマンドを送信。
		REPPacketSend send = new REPPacketSend(channel);
		send.send(command);
		
		//SessionManagerのListに追加。
		smList.add(channel);
	}

	private String getLocalHostName(REPSocketChannel channel) {
		String host = null;
		host = channel.socket().getLocalAddress().getHostName();
		return host;
	}

	public void connectionOccured(ConnectionEvent event) {
		connectSession(event.getHost());
	}

	public void ActionOccured(REPActionEvent event) {
		
		
		/*** 書き直し ***/
		REPSocketChannel channel = event.getEditorChannel();
		int sid = event.getSID();
		Session session = sessionlist.getSession(sid);
		if(session.isOwner()){
			int eid = session.addEditor(new Editor(channel));
			REPCommand sendCommand = new REPCommand();
			sendCommand.setCMD(REP.SMCMD_JOIN_ACK);
			sendCommand.setEID(eid);
			sendCommand.setSID(sid);
			REPPacketSend sender = new REPPacketSend(channel);
			sender.send(sendCommand);
		}else {
			REPSocketChannel editorChannel = event.getEditorChannel();
			sid = event.getSID();
			Editor editor = new Editor(editorChannel);
			editor.setHost(myHost);
			session = sessionlist.getSession(sid);
			session.addEditor(editor);
			
			Editor owner = session.getMaster();
			
			REPCommand command = new REPCommand();
			command.setCMD(REP.SMCMD_SELECT);
			command.setSID(sid);
			command.setString(editor.getHost() + ":" + editor.getPort());
			owner.send(command);
		}
		
		
	}

	public void undo() {
		ownEditorList.undoAllEditors();
		System.out.println("Undo!");
	}

	public void addWaitingCommand(PacketSet set) {
		// TODO Auto-generated method stub
		
	}
}