changeset 20:a0fd653d1121

Debug Client and Meta Engine for logging.
author kono
date Tue, 19 Aug 2008 06:26:20 +0900
parents 0243987383b7
children fac6e0073b1a
files src/fdl/ComDebug.java src/fdl/ComDebug_Client.java src/fdl/CommDebugHook.java src/fdl/FDLindaServ.java src/fdl/MetaEngine.java
diffstat 5 files changed, 84 insertions(+), 168 deletions(-) [+]
line wrap: on
line diff
--- a/src/fdl/ComDebug.java	Tue Aug 19 05:33:32 2008 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,134 +0,0 @@
-package fdl;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-/*
- * こういう実装ではなくて、ldserv 中に、メタ
- * プロトコル用のエンジンを作った方が良い。そのためには、ldserve 内部
- * から、PSX Linda APIを呼び出せる仕組みが必要。Thread にすると、
- * 同期が面倒なので、Thread にしたくないが、Engine の書き方は、
- * Main Loop 的にしたい。少し変だが、Meta Engine 側の PSX sync から
- * ldserve のMain Loop を呼び出すか?
- */
-
-public class ComDebug {
-	static final boolean debug = true;
-	//public static int seq = 0;
-	public static Hashtable<String, Integer> Com_Hashtable  = new Hashtable<String, Integer>();  
-	public static LinkedList<SocketChannel> Report_Channellist = new LinkedList<SocketChannel>();
-	
-	ComDebug(){
-		
-	}
-
-	public static void Report(LinkedList<SocketChannel> reportCh_list, ByteBuffer command, String report_txt) throws IOException {
-		//レポートするチャンネルが0ならreturn
-		if(reportCh_list.isEmpty()) {
-			return;
-		}
-		//dataをセット
-		ByteBuffer data = ByteBuffer.allocateDirect(24+(report_txt).length()*2);
-		data.clear();  // position = 0 
-		for(int i=0;i<report_txt.length();i++) {
-			data.putChar(report_txt.charAt(i));
-		}
-    	data.flip();    // limit = current position, position = 0
-    	
-		//commandをセット
-    	PSX.setReportCommand(command, report_txt);
-
-		//送信
-    	TupleSpace io = new TupleSpace();
-		Iterator <SocketChannel> it = reportCh_list.iterator();
-		while(it.hasNext()) {
-		    io.send(it.next(), command, data);
-		}
-	}
-
-	private static String getRemoteHostAndPort(SocketChannel channel) {
-		String socketString = channel.socket().getRemoteSocketAddress().toString();
-		String[] split = socketString.split("/");
-		int length = split.length;
-		String hostAndPort = split[length-1];
-		split = hostAndPort.split(":");
-		String host = split[0];
-		String port = split[1];
-		int portnum = Integer.parseInt(port);
-		try {
-			InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), portnum);
-			host = address.getHostName().toString();
-            return (host +":"+port);
-        }
-        catch( UnknownHostException e ){
-        	return hostAndPort;
-        }		
-	}
-	
-	private static String getLocalHostAndPort(SocketChannel channel) {
-		String socketString = channel.socket().getLocalSocketAddress().toString();
-		String[] split = socketString.split("/");
-		int length = split.length;
-		String hostAndPort = split[length-1];
-		split = hostAndPort.split(":");
-		String host = split[0];
-		String port = split[1];
-		try {
-            InetAddress localhost = InetAddress.getLocalHost();
-            host = localhost.getHostName();            
-            return (host +":"+port);
-        }
-        catch( UnknownHostException e ){
-        	return (host +":"+port);
-        }
-	}
-	
-	public static String Com_inc(SelectionKey key, Hashtable<String, Integer> comlist,int mode, int id, int seq, String sendtext) {
-		//通信ログ Hostname:port 'mode' =number  形式でインクリメント
-		int cnt = 0;
-		SocketChannel ch = (SocketChannel) key.channel();
-		if (sendtext==null) sendtext="none";
-		
-		String remoteString = getRemoteHostAndPort(ch);
-		String localString =  getLocalHostAndPort(ch);
-
-		String ComKey = localString + "--" + remoteString + " " + (char)mode;
-		if(comlist.containsKey(ComKey)){
-			cnt = comlist.get(ComKey);
-		}
-		cnt++;
-		comlist.put(ComKey, cnt);
-		long time = System.currentTimeMillis();
-		return (time+" "+ComKey+"="+cnt+" id="+id+" seq="+seq+" data="+sendtext);
-	}
-	
-	public static void addChannel(SelectionKey key, LinkedList<SocketChannel> reportCh_list) {
-		SocketChannel repch = (SocketChannel) key.channel();
-		reportCh_list.add(repch);
-	}
-	
-	public static void delChannel(SelectionKey key, LinkedList<SocketChannel> reportCh_list) {
-		SocketChannel repch = (SocketChannel) key.channel();
-		reportCh_list.remove(repch);
-	}
-
-	
-	public void reportCh_remove(SelectionKey key, LinkedList<SocketChannel> reportCh_list) throws IOException {
-		//レポートチャンネルが0ならreturn
-		if(reportCh_list.isEmpty()) {
-			return;
-		}else {
-			System.out.println("ComDebug Report Channel remove :"+key.channel());
-			delChannel(key,reportCh_list);
-		}
-	}
-	
-}
--- a/src/fdl/ComDebug_Client.java	Tue Aug 19 05:33:32 2008 +0900
+++ b/src/fdl/ComDebug_Client.java	Tue Aug 19 06:26:20 2008 +0900
@@ -2,6 +2,8 @@
 
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
 
 /*
  * それぞれのLinda Serverのmeta protocolに接続して、順々にモニタデータを
@@ -11,60 +13,96 @@
 
 	static int id;
 	static final boolean debug = false;
+	ByteBuffer nullBuffer = ByteBuffer.allocate(0);
+	PSXCallback debugCallback ; 
+
+	FederatedLinda fdl;
+	PSXLindaInterface psx;
+	LinkedList<String> hosts = new LinkedList<String>();
+	LinkedList<Integer> ports = new LinkedList<Integer>();
+	LinkedList<PSXLindaInterface> psxs = new LinkedList<PSXLindaInterface>();
 
 	public static void main(String[] args) {
-		FederatedLinda fdl;
-		PSXLindaInterface psx;
-		String host  = "localhost";
-		int port = 10000;
+		ComDebug_Client com = new ComDebug_Client();
+		com.run(args);
+	}
+
+	public void run(String[] args) {
 		int connect_num = 1;
-		final String usages = "usage: ComDebug_Client [-h host -p port]";
-
+		final String usages = "usage: ComDebug_Client [-h host -p port]*";
 		//引数判定
 		try {
 			if (args.length < 2) {
 				System.err.println(usages);
 			}
-				
+
 			for (int i=0; i<args.length; ++i) {
 				if("-h".equals(args[i])) {
-					host = (String)(args[++i]);
-					System.err.println("host = "+host);
+					hosts.add(args[++i]);
+					System.err.println("host = "+hosts.getLast());
 				} else {
 					//System.err.println(usages);
 				}
 				if("-p".equals(args[i])) {
-					port = Integer.parseInt(args[++i]);
-					System.err.println("port = "+port);
+					ports.add(Integer.parseInt(args[++i]));
+					System.err.println("port = "+ports.getLast());
 				} else {
 					//System.err.println(usages);
 				}
 			}
+			if (hosts.size()!=ports.size()) {
+				System.err.println("-h and -p pair is always necessary.");
+				System.exit(1);
+			}
 		} catch (NumberFormatException e) {
 			e.printStackTrace();
 		}
-		
-		
+
+
 		try {
-			PSXReply r;
-		    fdl = FederatedLinda.init();
-		    psx = fdl.open(host,port);
-		    r = psx.in(65535);
-			fdl.sync(1);
-			if(debug == true){
-			    System.out.println("PSXReply =>"+r.toString());
-		    }
+			fdl = FederatedLinda.init();
+			for(int i=0;i<hosts.size();i++) {
+				String host = hosts.get(i);
+				int port = ports.get(i);
+
+				psx = fdl.open(host,port);
+				psxs.add(psx);
+				class MyCallBack implements PSXCallback {
+					PSXLindaInterface psx;
+					public MyCallBack(PSXLindaInterface psx) {
+						this.psx = psx;
+					}
+					@Override
+					public void callback(ByteBuffer reply) {
+						int p = psxs.indexOf(psx);
+						System.out.println("PSXReply("+p+") =>"+reply);
+					}
+				}
+				psx.in(65535, new MyCallBack(psx)); 
 
-		    System.out.println("COM_DEBUG Connected.["+host+":"+port+"]");
-		    psx.in(PSX.PRIVILEGED_ID_START+connect_num);
-		    connect_num++;
-		    while(true) {
-		    	fdl.sync(1000);
-		    }
+				System.out.println("COM_DEBUG Connected.["+host+":"+port+"]");
+				psx.out(PSX.META_MONITOR, nullBuffer, 0);
+				debugCallback = 
+					new PSXCallback() {
+					@Override
+					public void callback(ByteBuffer reply) {
+						System.out.println(PSX.getdataString(reply));
+						psx.out(PSX.META_MONITOR, nullBuffer, 0);
+						psx.in(PSX.META_MONITOR_DATA,debugCallback); 
+					}
+				};
+				psx.out(PSX.META_MONITOR, nullBuffer, 0);
+				psx.in(PSX.META_MONITOR_DATA,debugCallback); 
+
+				connect_num++;
+			}
+			while(true) {
+				fdl.sync(1000);
+			}
 		} catch (IOException nfex) {
 			nfex.printStackTrace();
-		    System.out.println("Faild.");
-		    return;
+			System.out.println("Faild.");
+			return;
 		}
 	}
 }
--- a/src/fdl/CommDebugHook.java	Tue Aug 19 05:33:32 2008 +0900
+++ b/src/fdl/CommDebugHook.java	Tue Aug 19 06:26:20 2008 +0900
@@ -65,7 +65,7 @@
 	
 	public ByteBuffer getLog() {
 		String log = logs.poll();
-		if (log==null) return ByteBuffer.allocate(0);
+		if (log==null) return null;
 		return PSX.string2ByteBuffer(log);
 	}
 
--- a/src/fdl/FDLindaServ.java	Tue Aug 19 05:33:32 2008 +0900
+++ b/src/fdl/FDLindaServ.java	Tue Aug 19 06:26:20 2008 +0900
@@ -18,6 +18,7 @@
 	public int port = DEF_PORT;
 	private AbstractSelector selector;
 	private ServerSocketChannel ssChannel;
+	public TupleSpace tupleSpace;
 	
 	public static void main(final String[] args) {
 		final String usages = "usage: FDLindaServ [-p port]";
@@ -46,8 +47,10 @@
 	}
 	
 	private void mainLoop() {
+		MetaLinda ml = new MetaLinda(tupleSpace, this);
+		MetaEngine me = new MetaEngine(ml);
 		while(true) {
-			checkTuple();
+			me.mainLoop();
 		}
 	}
 
@@ -63,7 +66,7 @@
 		//ssChannel.socket().setReuseAddress(true);
 		System.out.println("Server: litening at "+ssChannel);
 		//セレクタにチャンネルを登録
-        TupleSpace tupleSpace = new TupleSpace();
+        tupleSpace = new TupleSpace();
 		ssChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler(tupleSpace));
 
 
--- a/src/fdl/MetaEngine.java	Tue Aug 19 05:33:32 2008 +0900
+++ b/src/fdl/MetaEngine.java	Tue Aug 19 06:26:20 2008 +0900
@@ -12,6 +12,10 @@
 	boolean running = true;
 	CommDebugHook commDebug;
 
+	public MetaEngine(MetaLinda meta) {
+		this.meta = meta;
+	}
+	
 	PSXCallback monitor_callback_start =
 			new PSXCallback() {public void callback(ByteBuffer reply) { 
 				meta.ts.hook = commDebug = new CommDebugHook();
@@ -20,8 +24,13 @@
 				meta.in(PSX.META_MONITOR,monitor_callback);
 	}};
 	PSXCallback monitor_callback =
-		new PSXCallback() {public void callback(ByteBuffer reply) { 
-			ByteBuffer data = commDebug.getLog();
+		new PSXCallback() {public void callback(ByteBuffer reply) {
+			ByteBuffer data;
+			do {
+				data = commDebug.getLog();
+				if (data!=null) break;
+				meta.sync();
+			} while (true);
 			meta.out(PSX.META_MONITOR_DATA, data, data.limit());
 			meta.in(PSX.META_MONITOR,monitor_callback);
 		}};