view rep/SessionManager.java @ 122:790c8dd42a7b

*** empty log message ***
author kono
date Tue, 26 Aug 2008 16:33:47 +0900
parents 628e7ffeb097
children 5b1a0574b406
line wrap: on
line source

package rep;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.LinkedList;
import java.util.StringTokenizer;

import rep.xml.SessionXMLDecoder;
import rep.xml.SessionXMLEncoder;

//+-------+--------+--------+-------+--------+---------+------+
//| cmd   | session| editor | seqid | lineno | textsiz | text |
//|       | id     | id     |       |        |         |      |
//+-------+--------+--------+-------+--------+---------+------+
//o-------header section (network order)-------------o
/*int cmd;	// command
int sid;	// session ID : unique to the file being edited
int eid;	// editor ID : the owner editor's ID is 1; unique within a session
int seqno;	// sequence number : managed separately for each editor
int lineno;	// line number
int textsize;   // textsize : size of the text in bytes
byte[] text;*/

public class SessionManager implements ConnectionListener, REPActionListener{
	
	
	// Sessions known to this manager; created in sessionManagerNet().
	private SessionList sessionlist;
	//SocketChannel sessionchannel;
	// GUI front end; created in the constructor and started on its own thread by openWindow().
	private SessionManagerGUI gui;
	// NIO selector driving the event loop; opened by openSelector().
	private Selector selector;
	// Channels to other session managers (filled on SMCMD_SM_JOIN and in sm_join()).
	private SessionManagerList smList;
	// Identifier of this manager: local host name concatenated with temp_port (see setMyHostName()).
	private String myHost;
	// Starts as true; set to false by setMaxHost() when a host name greater than maxHost is seen.
	private boolean isMaster = true;
	// Editors registered with this manager on SMCMD_JOIN / SMCMD_PUT.
	private EditorList  ownEditorList;
	// Greatest host identifier seen so far, ordered by String.compareTo (see setMaxHost()).
	private String maxHost;
	//private boolean addressIsGlobal;
	//private SocketChannel sessionchannel;
	//private boolean co;
	// Listening port taken from the first command-line argument (set in main()).
	private static int temp_port;
	// Port used for outgoing connections in connectSession() (set in main()).
	private static int send_port;
	
	// Port used when no command-line arguments are given.
	static final int DEFAULT_PORT = 8766;
	
	/**
	 * Creates a session manager and its GUI.
	 *
	 * @param port not used by the constructor itself; the listening port is
	 *             passed again to {@code sessionManagerNet(int)}
	 */
	public SessionManager(int port) {
		this.gui = new SessionManagerGUI();
	}
	
	/**
	 * Opens the selector that {@code sessionManagerNet(int)} and
	 * {@code registerChannel(...)} register channels with.
	 *
	 * @throws IOException if the selector cannot be opened
	 */
	public void openSelector() throws IOException{
		final Selector opened = Selector.open();
		this.selector = opened;
	}
	
	/**
	 * Binds a non-blocking server socket to {@code port} and runs the selector
	 * event loop forever: new connections are accepted and registered for
	 * reading, and every readable channel has one command unpacked and handed
	 * to {@code manager(...)}.
	 *
	 * {@code openSelector()} must have been called first, because the server
	 * channel is registered with the {@code selector} field.
	 *
	 * @param port TCP port to listen on
	 * @throws InterruptedException declared for interface compatibility
	 * @throws IOException if the socket cannot be opened/bound or selection fails
	 */
	public void sessionManagerNet(int port) throws InterruptedException, IOException {
	
		ServerSocketChannel ssc = ServerSocketChannel.open();
		ssc.configureBlocking(false);
		
		// SO_REUSEADDR is required and has to be set before bind().
		ssc.socket().setReuseAddress(true);
		
		ssc.socket().bind(new InetSocketAddress(port));
		ssc.register(selector, SelectionKey.OP_ACCEPT);

		sessionlist = new SessionList();
		smList = new SessionManagerList();
		ownEditorList = new EditorList();
		
		while(true){
			selector.select();
			for(SelectionKey key : selector.selectedKeys()){
				if(key.isAcceptable()){
					// The key's channel is the server channel that became ready.
					ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
					// accept() may return null in non-blocking mode; registerChannel() ignores null.
					SocketChannel channel = serverChannel.accept();
					registerChannel (selector, channel, SelectionKey.OP_READ);
					
				}else if(key.isReadable()){
					
					SocketChannel channel = (SocketChannel)key.channel();
					// TODO: replace with getPacket()/putPacket() style access.
					REPPacketReceive receive = new REPPacketReceive(channel);
					receive.setkey(key);
					REPCommand receivedCommand = receive.unpackUConv();
					manager(channel, receivedCommand);
					
				}else if(key.isConnectable()){
					System.out.println("Connectable");
				}
			}
			// The selector never removes keys from the selected-key set by itself.
			// Without this, keys handled in this pass would be reported again on
			// every later select(), causing spurious accepts and reads.
			selector.selectedKeys().clear();
		}
	}
	
	/**
	 * Switches {@code channel} to non-blocking mode and registers it with
	 * {@code selector} for the given interest set. A {@code null} channel is
	 * silently ignored.
	 *
	 * @param selector selector to register with
	 * @param channel  channel to register, may be {@code null}
	 * @param ops      interest set, e.g. {@code SelectionKey.OP_READ}
	 * @throws IOException if the channel cannot be configured or registered
	 */
	private synchronized void registerChannel(Selector selector, SelectableChannel channel, int ops) throws IOException {
		if(channel != null) {
			channel.configureBlocking(false);
			// Wake the selector up before registering.
			selector.wakeup();
			channel.register(selector, ops);
		}
	}

	/**
	 * Dispatches one received command according to its {@code cmd} field.
	 * Session-manager commands (SMCMD_*) are handled here or in a private
	 * helper; any other command is either stored as an undo command (negative
	 * {@code seq}) or forwarded to the next editor of the session.
	 *
	 * @param channel         channel the command arrived on
	 * @param receivedCommand the command; {@code null} is ignored
	 */
	private void manager(SocketChannel channel, REPCommand receivedCommand) {
		if(receivedCommand == null) return;
		Editor editor;
		REPCommand sendCommand = receivedCommand.clone();
		REPPacketSend send = new REPPacketSend(channel);
		
		switch(receivedCommand.cmd){
		
		case REP.SMCMD_JOIN:
			editor = new Editor(channel);
			editor.setHost(myHost);
			int tempeid = ownEditorList.addEditor(editor);
			gui.setComboEditor(tempeid, channel);
			break;
			
		case REP.SMCMD_JOIN_ACK:
			// Nothing to do at the moment.
			break;
			
		case REP.SMCMD_PUT:
			handlePut(channel, receivedCommand, sendCommand);
			break;
			
		case REP.SMCMD_SELECT:
			handleSelect(channel, receivedCommand, sendCommand, send);
			break;
			
		case REP.SMCMD_SELECT_ACK:
			handleSelectAck(channel, receivedCommand);
			break;
			
		case REP.SMCMD_SM_JOIN:
			handleSmJoin(channel, receivedCommand, send);
			break;
			
		case REP.SMCMD_SM_JOIN_ACK:
			// Build the SessionList from the XML and re-evaluate maxHost.
			SessionList joinedList = new SessionXMLDecoder().decode(receivedCommand.string);
			announceIfMaxHostChanged(channel, joinedList.getMaxHost());
			break;
			
		case REP.SMCMD_UPDATE:
			handleUpdate(channel, receivedCommand);
			break;
			
		case REP.SMCMD_UPDATE_ACK:
			handleUpdateAck(channel, receivedCommand);
			break;
			
		case REP.SMCMD_CH_MASTER:
			announceIfMaxHostChanged(channel, receivedCommand.string);
			break;
			
		case REP.SMCMD_GET_UNDO_ACK:
			editor = ownEditorList.getEditor(channel);
			// Guard against a channel with no registered editor, as the
			// default branch below already does.
			if(editor != null) {
				editor.addUndoCommand(receivedCommand);
			}
			break;
			
		default:
			editor = ownEditorList.getEditor(channel);
			if(receivedCommand.seq < 0){
				// Negative sequence numbers are kept as undo commands, not forwarded.
				if(editor != null) {
					editor.addUndoCommand(receivedCommand);
				}
				break;
			}
			sessionlist.sendToNextEditor(channel, receivedCommand);
			break;
		}
	}

	/** Handles SMCMD_PUT: creates a new owned session for the sending editor, acknowledges it and announces it to the other session managers. */
	private void handlePut(SocketChannel channel, REPCommand receivedCommand, REPCommand sendCommand) {
		Editor editor = new Editor(channel);
		editor.setHost(myHost);
		ownEditorList.addEditor(editor);
		// The editor that puts a session is its owner and gets editor ID 1.
		editor.setEID(1);
		editor.setName(receivedCommand.string);
		Session session = new Session(editor);
		session.setOwner(true);
		session.addEditor(editor);
		sessionlist.addSession(session);
		gui.setComboSession(session.getSID(), session.getName());
		gui.setComboEditor(editor.getEID(), editor.getChannel());
		session.addToRoutingTable(editor);
		sendCommand.setCMD(REP.SMCMD_PUT_ACK);
		sendCommand.setEID(1);
		sendCommand.setSID(session.getSID());
		editor.send(sendCommand);
		
		// Tell every other session manager about the new session.
		SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session);
		REPCommand command = new REPCommand();
		command.setSID(session.getSID());
		command.setString(sessionEncoder.sessionListToXML());
		command.setCMD(REP.SMCMD_UPDATE);
		smList.sendExcept(channel, command);
	}

	/** Handles SMCMD_SELECT: adds the sender to the session and either acknowledges directly (owned session) or forwards the request to the session's master. */
	private void handleSelect(SocketChannel channel, REPCommand receivedCommand, REPCommand sendCommand, REPPacketSend send) {
		Editor editor = new Editor(channel);
		Session session = sessionlist.getSession(receivedCommand.sid);
		if(session == null) {
			// An unknown session id would otherwise end the event loop with a NullPointerException.
			System.err.println("SMCMD_SELECT for unknown session id " + receivedCommand.sid);
			return;
		}
		
		if(session.isOwner()){
			int eid2 = session.addEditor(editor);
			editor.setEID(eid2);
			sendCommand.setCMD(REP.SMCMD_SELECT_ACK);
			sendCommand.setEID(eid2);
			send.send(sendCommand);
		}else {
			Editor master = session.getMaster();
			master.send(receivedCommand);
			session.addEditor(editor);
		}
	}

	/** Handles SMCMD_SELECT_ACK: sends SMCMD_JOIN_ACK to the local editor named in the command, or forwards the command to the other session managers. */
	private void handleSelectAck(SocketChannel channel, REPCommand receivedCommand) {
		String hostport = receivedCommand.string;
		Editor target = ownEditorList.getEditor(hostport);
		if(target != null) {
			REPCommand joinAck = new REPCommand();
			joinAck.setCMD(REP.SMCMD_JOIN_ACK);
			joinAck.setSID(receivedCommand.sid);
			joinAck.setEID(receivedCommand.eid);
			target.send(joinAck);
		}else{
			smList.sendExcept(channel, receivedCommand);
		}
	}

	/** Handles SMCMD_SM_JOIN: records the joining manager, merges its session list, acknowledges with the merged list and forwards the update to the other managers. */
	private void handleSmJoin(SocketChannel channel, REPCommand receivedCommand, REPPacketSend send) {
		// Add the sender to the list of session managers.
		smList.add(channel);
		
		// Build a SessionList object from the received XML.
		SessionXMLDecoder decoder = new SessionXMLDecoder();
		SessionList receivedSessionList = decoder.decode(receivedCommand.string);
		
		// Merge it into our SessionList (this also builds the translation table).
		sessionlist.update(channel, receivedSessionList);
		
		// Set myHost if it is not known yet.
		if(myHost == null) setMyHostName(getLocalHostName(channel));
		
		// Re-evaluate maxHost.
		announceIfMaxHostChanged(channel, receivedSessionList.getMaxHost());
		
		// Generate XML from our SessionList and send it as the ACK
		// to the session manager that joined.
		SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionlist);
		REPCommand ack = new REPCommand();
		ack.setCMD(REP.SMCMD_SM_JOIN_ACK);
		ack.setString(sessionlistEncoder.sessionListToXML());
		send.send(ack);
		
		// Send SMCMD_UPDATE with the received list to the other session managers.
		REPCommand update = new REPCommand();
		update.setCMD(REP.SMCMD_UPDATE);
		update.setString(receivedCommand.string);
		smList.sendExcept(channel, update);
	}

	/** Handles SMCMD_UPDATE: merges the received session list, forwards the command to the other managers and lists the received sessions in the GUI. */
	private void handleUpdate(SocketChannel channel, REPCommand receivedCommand) {
		SessionXMLDecoder decoder = new SessionXMLDecoder();
		SessionList receivedSessionList = decoder.decode(receivedCommand.string);
		
		// Merge it into our SessionList (this also builds the translation table).
		sessionlist.update(channel, receivedSessionList);
		
		smList.sendExcept(channel, receivedCommand);
		
		for(Session received : receivedSessionList.getList()){
			gui.setComboSession(received.getSID(), received.getName());
		}
	}

	/** Handles SMCMD_UPDATE_ACK: adds a session when the received sid exceeds the number of known sessions, then forwards the command to the slaves. */
	private void handleUpdateAck(SocketChannel channel, REPCommand receivedCommand) {
		if(receivedCommand.sid > sessionlist.getList().size()){
			Editor editor = new Editor(channel);
			editor.setName(receivedCommand.string);
			
			Session session = new Session(editor);
			session.addEditor(editor);
			
			sessionlist.addSession(session);
			
			gui.setComboSession(session.getSID(), session.getName());
		}
		smList.sendToSlave(receivedCommand);
	}

	/** Offers {@code host} to setMaxHost() and, when maxHost changed, sends SMCMD_CH_MASTER with the new maxHost to every manager except {@code channel}. */
	private void announceIfMaxHostChanged(SocketChannel channel, String host) {
		if(setMaxHost(channel, host)){
			REPCommand notice = new REPCommand();
			notice.setCMD(REP.SMCMD_CH_MASTER);
			notice.setString(maxHost);
			smList.sendExcept(channel, notice);
		}
	}

	/**
	 * Returns the command kind that reverses {@code cmd}: insert and delete map
	 * to each other, replace maps to itself.
	 *
	 * @param cmd a REP command code
	 * @return the reversing command code, or 0 for any other command
	 */
	private int reverseCmd(int cmd) {
		switch(cmd){
		case REP.REPCMD_INSERT:
			return REP.REPCMD_DELETE;
		case REP.REPCMD_DELETE:
			return REP.REPCMD_INSERT;
		case REP.REPCMD_REPLACE:
			return REP.REPCMD_REPLACE;
		default:
			return 0;
		}
	}

	/**
	 * Round-trips {@code string} through the UTF-16 charset and returns the
	 * decoded text.
	 *
	 * The previous implementation started from a {@code null} accumulator, so
	 * the result began with the literal text "null". It also read the encoder's
	 * byte-order mark as the first character, which dropped the last character
	 * of the input. Decoding with the same charset consumes the byte-order mark
	 * and returns the full text.
	 *
	 * @param string text to convert, may be {@code null}
	 * @return the text after UTF-16 encoding and decoding, or {@code null} when
	 *         {@code string} is {@code null}
	 */
	private String setUTF16(String string) {
		if(string == null) {
			return null;
		}
		Charset utf16 = Charset.forName("UTF-16");
		ByteBuffer encoded = utf16.encode(string);
		return utf16.decode(encoded).toString();
	}

	/**
	 * Offers {@code host} as a candidate for the greatest host identifier.
	 * Identifiers are ordered with {@link String#compareTo(String)}. When
	 * {@code host} is greater than the current {@code maxHost}, it becomes the
	 * new {@code maxHost}, this manager stops being master and
	 * {@code channel} is recorded as the master's channel.
	 *
	 * @param channel channel the candidate was received on
	 * @param host    candidate host identifier; {@code null} is ignored
	 * @return {@code true} if {@code maxHost} was replaced by {@code host}
	 */
	private boolean setMaxHost(SocketChannel channel, String host) {
		if(maxHost == null) {
			// Start from our own identifier (may itself still be null).
			maxHost = myHost;
			sessionlist.setMaxHost(maxHost);
		}
		if(host == null) {
			// Nothing to compare against; previously this threw a NullPointerException.
			return false;
		}
		// A known host beats an unknown (null) maxHost; otherwise compare lexicographically.
		if(maxHost == null || host.compareTo(maxHost) > 0){
			maxHost = host;
			sessionlist.setMaxHost(maxHost);
			setMaster(false, channel);
			return true;
		}
		return false;
	}

	/**
	 * Sets this manager's identifier to {@code localHostName} directly followed
	 * by the listening port, initialises {@code maxHost} with it when no
	 * {@code maxHost} is known yet, and passes the identifier to the list of
	 * own editors.
	 *
	 * @param localHostName host name of the local end of a connection
	 */
	private void setMyHostName(String localHostName) {
		final String identifier = localHostName + temp_port;
		this.myHost = identifier;
		if(this.maxHost == null) {
			this.maxHost = identifier;
			sessionlist.setMaxHost(this.maxHost);
		}
		ownEditorList.setHost(identifier);
	}

	/**
	 * Records whether this manager is the master, prints the new state and
	 * registers {@code channel} as the master channel in the manager list.
	 *
	 * @param b       new value of the master flag
	 * @param channel channel handed to {@code smList.setMaster}
	 */
	private void setMaster(boolean b, SocketChannel channel) {
		this.isMaster = b;
		String status = "isMaster = " + b;
		System.out.println(status);
		smList.setMaster(channel);
	}

	/**
	 * Program entry point.
	 *
	 * Usage: {@code SessionManager [listenPort [connectPort]]}. Either port
	 * falls back to {@link #DEFAULT_PORT} when its argument is missing.
	 * Previously a single argument caused an
	 * {@code ArrayIndexOutOfBoundsException} because the second argument was
	 * read unconditionally.
	 *
	 * @param args optional listening port and outgoing-connection port
	 * @throws InterruptedException propagated from the event loop
	 * @throws IOException if the selector or server socket cannot be set up
	 * @throws NumberFormatException if a given port is not a valid integer
	 */
	public static void main(String[] args) throws InterruptedException, IOException {
		int port = DEFAULT_PORT;
		int port_s = DEFAULT_PORT;
		if(args.length > 0){
			port = Integer.parseInt(args[0]);
		}
		if(args.length > 1){
			port_s = Integer.parseInt(args[1]);
		}
		temp_port = port;
		send_port = port_s;
		SessionManager sm = new SessionManager(port);
		sm.openSelector();
		sm.openWindow();
		sm.sessionManagerNet(port);
	}

	/**
	 * Starts the GUI on a new thread and then subscribes this manager to the
	 * GUI's connection and action events.
	 */
	private void openWindow() {
		new Thread(gui).start();
		gui.addConnectionListener(this);
		gui.addREPActionListener(this);
	}

	/**
	 * Connects to the session manager at {@code host} on {@code send_port},
	 * registers the connection with the selector for reading and sends it an
	 * SM_JOIN command. I/O failures are printed, and the half-opened channel is
	 * closed so the socket does not leak.
	 *
	 * @param host host name or address of the remote session manager
	 */
	private void connectSession(String host) {
		InetSocketAddress addr = new InetSocketAddress(host, send_port);
		SocketChannel sessionchannel = null;
		try {
			sessionchannel = SocketChannel.open();
			// A blocking connect returns only once the connection is established
			// (or throws), so no finishConnect() polling is needed afterwards.
			sessionchannel.configureBlocking(true);
			sessionchannel.connect(addr);
			// Blank line kept so the console output stays as before.
			System.out.println("");
			registerChannel(selector, sessionchannel, SelectionKey.OP_READ);
			
			sm_join(sessionchannel);
			
		}catch (IOException e) {
			e.printStackTrace();
			// The failure happened before sm_join() stored the channel anywhere,
			// so it is safe to release it here.
			if(sessionchannel != null) {
				try {
					sessionchannel.close();
				}catch (IOException closeError) {
					closeError.printStackTrace();
				}
			}
		}
	}
	
	/**
	 * Sends an SM_JOIN command carrying this manager's session list (as XML)
	 * over {@code channel} and then adds the channel to the list of session
	 * managers.
	 *
	 * @param channel connected channel to the remote session manager
	 */
	private void sm_join(SocketChannel channel){
		REPCommand joinCommand = new REPCommand();
		joinCommand.setCMD(REP.SMCMD_SM_JOIN);
		
		// Set our host name before the session list is encoded.
		setMyHostName(getLocalHostName(channel));
		
		// Generate the XML and attach it to the outgoing command.
		joinCommand.setString(new SessionXMLEncoder(sessionlist).sessionListToXML());
		
		// Send the SM_JOIN command.
		new REPPacketSend(channel).send(joinCommand);
		
		// Remember the channel in the list of session managers.
		smList.add(channel);
	}

	/**
	 * Returns the host name of the local address {@code channel}'s socket is
	 * bound to.
	 *
	 * @param channel channel whose local endpoint is queried
	 * @return the local host name
	 */
	private String getLocalHostName(SocketChannel channel) {
		return channel.socket().getLocalAddress().getHostName();
	}

//	private String getSocketString(SocketChannel sessionchannel) {
//		SocketAddress socket = sessionchannel.socket().getRemoteSocketAddress();
//		//String inetAddressString = sessionchannel.socket().getInetAddress().toString();
//		StringTokenizer stn = new StringTokenizer(socket.toString(), "/");
//		String socketString = null;
//		while(stn.hasMoreTokens()){
//			socketString = stn.nextToken();
//			//System.out.println(socketString);
//		}
//		return socketString;
//	}

	/**
	 * Connection callback: connects to the session manager on the host named
	 * by the event.
	 *
	 * @param event event carrying the target host
	 */
	public void connectionOccured(ConnectionEvent event) {
		String targetHost = event.getHost();
		connectSession(targetHost);
	}

	/**
	 * Action callback: joins the editor named by the event to the session named
	 * by the event.
	 *
	 * If this manager owns the session, the editor is added and answered
	 * directly with SMCMD_JOIN_ACK. Otherwise the editor is added locally and an
	 * SMCMD_SELECT carrying "host:port" of the editor is sent to the session's
	 * master. A request for an unknown session id is reported and ignored
	 * instead of failing with a {@code NullPointerException}.
	 *
	 * @param event event carrying the editor channel and the session id
	 */
	public void ActionOccured(REPActionEvent event) {
		SocketChannel channel = event.getEditorChannel();
		int sid = event.getSID();
		Session session = sessionlist.getSession(sid);
		if(session == null) {
			System.err.println("No session with id " + sid + "; join request ignored.");
			return;
		}
		
		if(session.isOwner()){
			int eid = session.addEditor(new Editor(channel));
			REPCommand sendCommand = new REPCommand();
			sendCommand.setCMD(REP.SMCMD_JOIN_ACK);
			sendCommand.setEID(eid);
			sendCommand.setSID(sid);
			REPPacketSend sender = new REPPacketSend(channel);
			sender.send(sendCommand);
		}else {
			Editor editor = new Editor(channel);
			editor.setHost(myHost);
			session.addEditor(editor);
			
			Editor owner = session.getMaster();
			
			REPCommand command = new REPCommand();
			command.setCMD(REP.SMCMD_SELECT);
			command.setSID(sid);
			command.setString(editor.getHost() + ":" + editor.getPort());
			owner.send(command);
		}
	}

	/**
	 * Asks every editor in the own-editor list to undo and reports it on
	 * standard output.
	 */
	public void undo() {
		this.ownEditorList.undoAllEditors();
		System.out.println("Undo!");
	}
}