view src/fdl/test/debug/MetaProtocolEngine.java @ 85:d0d8aeaebccf

add routing table
author one
date Mon, 08 Feb 2010 11:07:57 +0900
parents c0575f877591
children c0591636a71a
line wrap: on
line source

package fdl.test.debug;

import java.io.*;
import java.net.InetAddress;
import java.net.UnknownHostException;

import java.nio.ByteBuffer;
import java.util.*;

import javax.xml.parsers.*;

import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

import fdl.*;

/**
* MetaProtocolEngine
*
* @author Kazuki Akamine
* 
* 接続する機能までを実装した MetaEngine
* これを継承して、具体的な処理を書く
*  
*/

public class MetaProtocolEngine implements MetaEngine {
	// Fields
	public static final int DEFAULTPORT = 10000;
	public static final int MANAGE = 60000;
	public static final int DEBUG = 61000;
	
	private MetaLinda ml;
	private String localHostName;
	private int localPort;
	private PSXLinda manager;
	private String managerHostName;
	private int managerPort = DEFAULTPORT;
	private boolean running = true;
	private boolean connected = false;
	private boolean debugConnected = false;
	private int nodeId;
	private HashMap<Integer, Routing> nodes;
	
	// Callback class
	class AcceptXMLCallback implements PSXCallback {
		int tid;
		
		private DocumentBuilderFactory dbFactory = null;
		private DocumentBuilder docBuilder = null;
		protected Document document;
		
		public AcceptXMLCallback(int tid) {
			this.tid = tid;
			dbFactory = DocumentBuilderFactory.newInstance();
			try {
				docBuilder = dbFactory.newDocumentBuilder();
			} catch (ParserConfigurationException e) {
				e.printStackTrace();
			}

		}
		public void callback(ByteBuffer reply) {
			String xml = new String(reply.array());
			print(xml);
			parseXML(xml);
			
			ml.in(tid, this);
		}
		@SuppressWarnings("deprecation")
		protected void parseXML(String xml) {
			Document doc = null;
			try {
				doc = docBuilder.parse(new StringBufferInputStream(xml));
			} catch (SAXException e) {
				e.printStackTrace();
			} catch (IOException e) {
				e.printStackTrace();
			}
			
			Element root = doc.getDocumentElement();
			if(root.getTagName().equals("connections")) {
				nodeId = new Integer(root.getAttribute("id")).intValue();
				NodeList connections = root.getElementsByTagName("connection");
				for (int i = 0; i < connections.getLength(); i++) {
					Element connection = (Element)connections.item(i);
					Element host = (Element)connection.getElementsByTagName("host").item(0);
					Element port = (Element)connection.getElementsByTagName("port").item(0);
					Element t = (Element)connection.getElementsByTagName("tid").item(0);
					int srcId = new Integer(connection.getAttribute("id")).intValue();
					String dstHostName = host.getTextContent();
					int dstPort = new Integer(port.getAttribute("id")).intValue();
					int dstId = new Integer(t.getAttribute("id")).intValue();
					try {
						PSXLindaImpl linda = (PSXLindaImpl) ml.open(dstHostName, dstPort);
						Routing r = new Routing(linda, dstId);
						nodes.put(new Integer(srcId), r);
						ml.in(srcId, new RoutingCallback(srcId, r));
					} catch (IOException e) {
						e.printStackTrace();
					}
				}
			} else if (root.getTagName().equals("routing")) {
				print("Routing xml received!");

				NodeList routing = root.getElementsByTagName("source");
				for (int i = 0; i < routing.getLength(); i++) {
					Element src = (Element) routing.item(i);
					Integer srcId = new Integer(src.getAttribute("id"));
					Routing r = nodes.get(srcId);
					NodeList dest = src.getElementsByTagName("dest");
					for (int j = 0; j < dest.getLength(); j++) {
						Element dst = (Element) dest.item(j);
						Integer dstId = new Integer(dst.getAttribute("id"));
						r.route.add(dstId);
					}
				}
				
			}
			if (tid == MANAGE) connected = true;
			else if (tid == DEBUG) debugConnected = true;
			if (connected && debugConnected) {
				sendLocalHostName();
				print("Send local host name");
				connected = debugConnected = false;
			}
		}
		
	} 

	private class RoutingCallback implements PSXCallback {
		int tid;
		Routing routing;
		public RoutingCallback(int tid, Routing routing) {
			this.tid = tid;
			this.routing = routing;
		} 
		
		public void callback(ByteBuffer reply) {
			Iterator<Integer> it = routing.route.iterator();
			while (it.hasNext()) {
				Integer dstId = it.next();
				Routing r = nodes.get(dstId);
				r.linda.out(r.dstId, reply);
				try {
					r.linda.sync(1);
				} catch (IOException e) {
					e.printStackTrace();
				}
				ml.in(tid, this);
			}
		}
		
	}
	
	// Constructor
	public MetaProtocolEngine(int port, MetaLinda ml, String managerHostName, int managerPort) {
		this.ml = ml;
		this.localPort = port;
		this.managerHostName = managerHostName;
		this.managerPort = managerPort;
		this.nodes = new HashMap<Integer, Routing>();
		try {
			this.localHostName = InetAddress.getLocalHost().getHostName();
		} catch (UnknownHostException e) {
			e.printStackTrace();
		}
		manager = connectServer(managerHostName, managerPort);
		sendLocalHostName();
	}

	public void mainLoop() {
		initPoller();
		while (running) {
			ml.sync();
		}
	}
	
	protected void initPoller() {
		ml.in(MANAGE, new AcceptXMLCallback(MANAGE));
		ml.in(DEBUG, new AcceptXMLCallback(DEBUG));
	}
	
	protected void sendLocalHostName() {
		// TopologyManager に自分のホストネームを送信して、起動を伝える
		ByteBuffer local;
		local = ByteBuffer.wrap((localHostName + ":" + new Integer(localPort).toString()).getBytes());
		manager.out(MANAGE, local);
		try {
			manager.sync(1);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	protected PSXLinda connectServer(String hostName, int port) {
		PSXLinda linda = null;
		boolean connectPSX = true;
		while (connectPSX) {
			try {
				linda = ml.open(hostName, port);
				connectPSX = false;
			} catch (IOException e) {
				try {
					Thread.sleep(40);
				} catch (InterruptedException e1) {
				}
			}
		}
		print("Connect to " + hostName);
		return linda;
	}
	
	void print(String str) {
		System.err.println("[DEBUG] " + localHostName + ": " + str);
	}
	
}