view test/channeltest/testSeMaSlave.java @ 382:4b87f89b3afd

REP Session Manager (Java version) new structure
author one@firefly.cr.ie.u-ryukyu.ac.jp
date Mon, 10 Nov 2008 22:07:45 +0900
parents c5be84d53c7f
children
line wrap: on
line source

package test.channeltest;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.LinkedList;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

import rep.channel.REPLogger;
import rep.channel.REPPack;
import rep.channel.REPSelectionKey;
import rep.channel.REPSelector;
import rep.channel.REPServerSocketChannel;
import rep.channel.REPSocketChannel;

/**
 * Test "slave" session manager that runs as its own thread.
 *
 * <p>The slave connects to a master session manager, listens on its own
 * address for clients, and then relays traffic in both directions:
 * <ul>
 *   <li>a packet that does not look like a master packet is forwarded to the
 *       master and remembered together with the channel it came from;</li>
 *   <li>a packet matching {@code ^SeMa\d.*} is treated as coming from the
 *       master and is looped back to every remembered client whose original
 *       packet text occurs inside it.</li>
 * </ul>
 */
public class testSeMaSlave extends Thread{

	/** Address this slave binds its server socket to. */
	SocketAddress ownIP;
	/** Address of the master session manager this slave connects to. */
	SocketAddress masterIP;
	/**
	 * Main-loop flag. run() only reads it, so it can only be cleared from
	 * another thread; volatile makes that write visible to the loop.
	 */
	volatile boolean running=true;
	/** Logger obtained from REPLogger.singleton(). */
	REPLogger ns;
	/** One entry per forwarded client packet: the packet text and its source channel. */
	LinkedList<ClientInfo> cis;
	
	/**
	 * @param name  thread name; also used as the prefix of relayed messages
	 * @param oname host name this slave listens on
	 * @param oport port this slave listens on
	 * @param mname host name of the master session manager
	 * @param mport port of the master session manager
	 */
	public testSeMaSlave(String name, String oname,int oport, String mname,int  mport){
		super(name);
		ownIP = new InetSocketAddress(oname,oport);
		masterIP = new InetSocketAddress(mname,mport);
		cis = new LinkedList<ClientInfo>();
		ns = REPLogger.singleton();
	}

	/** Intentionally empty; kept so existing callers continue to compile. */
	public void init(){
		
	}

	/**
	 * Connects to the master, binds the listening socket, registers both with
	 * the selector and then dispatches selected keys until {@link #running}
	 * becomes false. An IOException during setup, select or accept ends the
	 * loop and is printed; read/write failures on individual channels are
	 * handled per channel.
	 */
	@Override
	public void run() {
		REPSelector<String> selector;
		REPSocketChannel<String> masterCH ;
		try {
			REPPack<String> pack = new StringPacker();
			REPServerSocketChannel<String> scs = REPServerSocketChannel.<String>open(pack);
			selector = REPSelector.create();
			masterCH = connectToMaster(selector);
			scs.socket().bind(ownIP);
			scs.configureBlocking(false);
			scs.register(selector, SelectionKey.OP_ACCEPT,null);
			masterCH.configureBlocking(false);
			masterCH.register(selector, SelectionKey.OP_READ,null);

			ns.writeLog("Slave SessionManager starts main routin.", 1);

			/* Main Loop */
			while(running){

				selector.select(); 

				for(REPSelectionKey<String> key : selector.selectedKeys1()){
					if(key.isAcceptable()){
						acceptClient(key, pack, selector);
					}else if(key.isReadable()){
						handleReadable(key, masterCH);
					}
				}
			}
		} catch (IOException e1) {
			e1.printStackTrace();
		}

	}

	/**
	 * Accepts a pending client connection and registers it for reads.
	 * Does nothing when the accept yields no channel.
	 *
	 * @throws IOException propagated to run(), which terminates the main loop
	 */
	private void acceptClient(REPSelectionKey<String> key, REPPack<String> pack,
			REPSelector<String> selector) throws IOException {
		REPSocketChannel<String> channel = key.accept(pack);
		if(channel==null) return;
		channel.configureBlocking(false);
		channel.register(selector, SelectionKey.OP_READ, null);
		ns.writeLog("accepts a client.", 1);
	}

	/**
	 * Reads one packet from the key's channel and routes it. Master packets
	 * are recognised by their text prefix, not by channel identity.
	 * On an IOException the key is cancelled and the affected channel is logged.
	 */
	private void handleReadable(REPSelectionKey<String> key, REPSocketChannel<String> masterCH) {
		REPSocketChannel<String> channel = null;
		try {
			channel = key.channel1();
			String packet = channel.read();
			if (packet==null) return;
			if (packet.matches("^SeMa\\d.*")){
				ns.writeLog("receives String from master ==> `"+packet+"\'", 1);
				relayToClients(packet);
			}else{
				ns.writeLog("receives String==> `"+packet+"\'", 1);
				masterCH.write(this.getName()+": pass packet`"+packet+"\'");
				cis.add(new ClientInfo(packet, channel));
			}
		}catch (IOException e1) {
			// Report the channel that failed (previously the logger itself was printed).
			ns.writeLog("channel "+channel+" closed.");
			key.cancel();
		}
	}

	/**
	 * Loops a master packet back to every remembered client whose original
	 * packet text occurs in it. The client text is compared literally: it is
	 * client-supplied data and must not be interpreted as a regular expression.
	 * A client whose channel can no longer be written to is dropped from
	 * {@link #cis}; the remaining clients are still served and the master's
	 * selection key is left alone.
	 */
	private void relayToClients(String packet) {
		LinkedList<ClientInfo> dead = new LinkedList<ClientInfo>();
		for (ClientInfo ci: cis){
			if (!packet.contains(ci.str)) continue;
			try {
				send(ci.channel, this.getName()+": loopback master ==>`"+packet+"\'");
			} catch (IOException e) {
				ns.writeLog("channel "+ci.channel+" closed.");
				dead.add(ci);
			}
		}
		// Removed after the loop so the list is not modified while iterating.
		cis.removeAll(dead);
	}

	/** Writes a single message to the given channel. */
	private void send(REPSocketChannel<String> target, String message) throws IOException {
		target.write(message);
	}
	
	/**
	 * Creates a channel and retries connect() until the master accepts,
	 * yielding the CPU between attempts.
	 *
	 * @param _selector currently unused
	 * @return the channel connected to {@link #masterIP}
	 * @throws IOException if creating or connecting the channel fails
	 */
	private REPSocketChannel<String> connectToMaster(Selector _selector) throws IOException {
		REPSocketChannel<String> channel = REPSocketChannel.<String>create(new StringPacker());
		ns.writeLog("is connecting to masterSeMa whose ip is"+masterIP, 1);
		while(!channel.connect(masterIP)){
			ns.writeLog("SeMa not listen to socket yet, wait", 1);
			Thread.yield();
		}
		ns.writeLog("connecting was successful.", 1);

		return channel;
	}
}
/**
 * Simple holder pairing a string with the channel it is associated with.
 * Both fields are package-visible and assigned once in the constructor.
 */
class ClientInfo{
	/** Channel associated with {@link #str}. */
	REPSocketChannel<String> channel;
	/** Text stored alongside the channel. */
	String str;

	/**
	 * @param _str     text to store
	 * @param _channel channel to store
	 */
	ClientInfo(String _str, REPSocketChannel<String> _channel){
		this.channel = _channel;
		this.str = _str;
	}
}