changeset 79:805645cf5ec0

topology modifired
author one
date Tue, 17 Nov 2009 18:19:39 +0900
parents 4fd2d1094bb9
children 04bd4ae97e7c
files src/fdl/test/topology/MetaProtocolEngine.java src/fdl/test/topology/NodeManager.java src/fdl/test/topology/RingTopologyManager.java src/fdl/test/topology/TopologyManagerEngine.java src/fdl/test/topology/ring/FDLindaRingNode.java src/fdl/test/topology/ring/RingMetaProtocolEngine.java
diffstat 6 files changed, 110 insertions(+), 46 deletions(-) [+]
line wrap: on
line diff
--- a/src/fdl/test/topology/MetaProtocolEngine.java	Tue Nov 17 18:18:37 2009 +0900
+++ b/src/fdl/test/topology/MetaProtocolEngine.java	Tue Nov 17 18:19:39 2009 +0900
@@ -24,14 +24,14 @@
 
 public class MetaProtocolEngine implements MetaEngine {
 	// Fields
-	private static String lastToken = "null";
-	private static int port = 10000;
-	private static int manageId = 60000;
-	private PSXLinda manager, psxLocal;
-	private ArrayList<PSXLinda> psxSendServers;
-	private MetaLinda fdlMeta;
-	private String managerHostName;
-	private String localHostName;
+	protected static String lastToken = "null";
+	protected static int port = 10000;
+	protected static int manageId = 60000;
+	protected PSXLinda manager, psxLocal;
+	protected ArrayList<PSXLinda> psxSendServers;
+	protected MetaLinda fdlMeta;
+	protected String managerHostName;
+	protected String localHostName;
 	
 	// Constructor
 	public MetaProtocolEngine(MetaLinda ml, String managerHostName) {
@@ -51,12 +51,12 @@
 		initSendServer();
 	}
 	
-	private void initSendServer() {
+	protected void initSendServer() {
 		sendLocalHostName();
 		connectSendServers();
 	}
 
-	private void initTopologyManager() {
+	protected void initTopologyManager() {
 		// Connect to TopologyManager Server
 		try {
 			manager = fdlMeta.open(managerHostName, port);
@@ -67,7 +67,7 @@
 		}
 	}
 	
-	private void sendLocalHostName() {
+	protected void sendLocalHostName() {
 		// TopologyManager に自分のホストネームを送信して、起動を伝える
 		ByteBuffer local;
 		local = ByteBuffer.wrap(localHostName.getBytes());
@@ -79,7 +79,7 @@
 		}
 	}
 	
-	private void connectSendServers() {
+	protected void connectSendServers() {
 		// TopologyManager から、送られてくる ConnectServer の hostName を取得して接続
 		System.out.println("[DEBUG] MethodCall connectSendServers()");
 		while (true) {
@@ -101,7 +101,7 @@
 		}
 	}
 	
-	private void connectSendServer(String hostName) {
+	protected void connectSendServer(String hostName) {
 		PSXLinda linda;
 		try {
 			linda = fdlMeta.open(hostName, port);
--- a/src/fdl/test/topology/NodeManager.java	Tue Nov 17 18:18:37 2009 +0900
+++ b/src/fdl/test/topology/NodeManager.java	Tue Nov 17 18:19:39 2009 +0900
@@ -15,7 +15,7 @@
 	private int manageId;
 	private ArrayList<NodeManager> waitingNodes;
 	private ArrayList<NodeManager> waitedNodes;
-	private ArrayList<NodeManager> sendNodes;
+//	private ArrayList<NodeManager> sendNodes;
 
 	public NodeManager(MetaLinda ml, int port, int manageId) {
 		this.port = port;
@@ -24,7 +24,7 @@
 		hostName = null;
 		waitingNodes = new ArrayList<NodeManager>();
 		waitedNodes = new ArrayList<NodeManager>();
-		sendNodes = new ArrayList<NodeManager>();
+//		sendNodes = new ArrayList<NodeManager>();
 	}
 	
 	public void addConnection(NodeManager node) {
@@ -35,7 +35,7 @@
 	public void finishConnection(NodeManager node) {
 		waitingNodes.remove(node);
 		node.waitedNodes.remove(this);
-		sendNodes.add(node);
+//		sendNodes.add(node);
 	}
 	
 	public void startUp(String hostName) {
--- a/src/fdl/test/topology/RingTopologyManager.java	Tue Nov 17 18:18:37 2009 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,15 +0,0 @@
-package fdl.test.topology;
-
-import java.io.IOException;
-
-public class RingTopologyManager extends TopologyManager {
-	public RingTopologyManager(int port) throws IOException {
-		//super(port);
-	}
-
-	void makeTopology() {
-		
-
-	}
-
-}
--- a/src/fdl/test/topology/TopologyManagerEngine.java	Tue Nov 17 18:18:37 2009 +0900
+++ b/src/fdl/test/topology/TopologyManagerEngine.java	Tue Nov 17 18:19:39 2009 +0900
@@ -19,13 +19,13 @@
 */
 
 public class TopologyManagerEngine implements MetaEngine {
-	private static String lastToken = "null";
-	private static int port = 10000;
-	private static int manageId = 60000;
-	private PSXLinda manager;
-	private MetaLinda fdlMeta;
-	private int nodeNum;
-	private NodeManager[] nodes;
+	protected static String lastToken = "null";
+	protected static int port = 10000;
+	protected static int manageId = 60000;
+	protected PSXLinda manager;
+	protected MetaLinda fdlMeta;
+	protected int nodeNum; // 後々は、nodelist から自動で取得したい
+	protected NodeManager[] nodes;
 		
 	// Constructor
 	public TopologyManagerEngine(MetaLinda ml, int nodeNum) {
@@ -42,11 +42,23 @@
 		makeTopology();
 	}
 	
-	private void makeTopology() {
+	protected void makeTopology() {
+		makeConnection();
+		acceptNewNode();
+		sendLastToken();
+	}
+	
+	// これを継承して、 Topology を形成
+	// 最終的には外部XMLを読み込んで接続するようにする
+	protected void makeConnection() {
 		// 接続を定義
 		nodes[0].addConnection(nodes[1]);
 		nodes[1].addConnection(nodes[0]);
+	}
+	
+	protected void acceptNewNode() {
 		// 起動状況を確認しつつ、接続命令を送信
+		// nodes の数だけ新規参入 node を待つ。
 		for (int i = 0; i < nodes.length; i++) {
 			PSXReply reply;
 			reply = manager.in(manageId);
@@ -61,10 +73,13 @@
 			String hostName = new String(data.array());
 			System.out.println("[DEBUG] GetNodeName: " + hostName);
 			nodes[i].startUp(hostName);
-		}
+		}	
+	}
+		
+	protected void sendLastToken() {
 		ByteBuffer lastTokenBB = ByteBuffer.wrap(lastToken.getBytes());
 		for (int i = 0; i < nodes.length; i++) {
-			// TODO: 参加ノードに実験開始を告知する ("null"を送る)
+			// 参加ノードに実験開始を告知する ("null"を送る)
 			nodes[i].linda.out(manageId, lastTokenBB);
 			System.out.println("[DEBUG] SendMsg: " + lastToken + " to " + nodes[i].hostName);
 
@@ -75,5 +90,5 @@
 			}
 		}
 	}
-
+	
 }
--- a/src/fdl/test/topology/ring/FDLindaRingNode.java	Tue Nov 17 18:18:37 2009 +0900
+++ b/src/fdl/test/topology/ring/FDLindaRingNode.java	Tue Nov 17 18:19:39 2009 +0900
@@ -18,6 +18,8 @@
 	// Fields
 	private static int localPort = 10000;
 	private static String managerHostName;
+	private static String relayNumString;
+	private static int relayNum;
 	private final static String usageString
 		= "Usage: FDLindaRingNode -manager SERVERNAME";
 
@@ -26,13 +28,17 @@
 		for (int i = 0; i < args.length; i++) {
 			if ("-manager".equals(args[i])) {
 				managerHostName = args[++i];
+			} else if ("-relay".equals(args[i])) {
+				relayNumString = args[++i];
 			} else {
 				System.err.println(usageString);
 			}
 		}
+		relayNum = Integer.parseInt(relayNumString);
+		
 		try {
 			FDLindaNode node = new FDLindaNode(localPort);
-			MetaEngine me = new MetaProtocolEngine(node.getMetaLinda(), managerHostName);
+			MetaEngine me = new RingMetaProtocolEngine(node.getMetaLinda(), managerHostName, relayNum);
 			node.setMetaEngine(me);
 			node.mainLoop();
 		} catch (IOException e) {
--- a/src/fdl/test/topology/ring/RingMetaProtocolEngine.java	Tue Nov 17 18:18:37 2009 +0900
+++ b/src/fdl/test/topology/ring/RingMetaProtocolEngine.java	Tue Nov 17 18:19:39 2009 +0900
@@ -1,10 +1,15 @@
 package fdl.test.topology.ring;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 import fdl.MetaLinda;
+import fdl.PSXLinda;
+import fdl.PSXReply;
 import fdl.test.topology.MetaProtocolEngine;
 
 /**
-* MetaProtocolEngine
+* RingMetaProtocolEngine
 *
 * @author Kazuki Akamine
 * 
@@ -14,9 +19,62 @@
 */
 
 public class RingMetaProtocolEngine extends MetaProtocolEngine {
-
-	public RingMetaProtocolEngine(MetaLinda ml, String managerHostName) {
+	private int relayNum;
+	private static int relayId = 10;
+	
+	public RingMetaProtocolEngine(MetaLinda ml, String managerHostName, int relayNum) {
 		super(ml, managerHostName);
+		this.relayNum = relayNum;
+	}
+	
+	@Override public void mainLoop() {
+		// 接続処理
+		super.mainLoop();
+		// Ring 実験開始
+		relayTuple(relayId);
+	}
+	
+	private void relayTuple(int tupleId) {
+		while (true) {
+			ByteBuffer data = receiveToken(tupleId);
+			if (relayNum == 0) {
+				String token = new String(data.array());
+				if (!token.equals(lastToken)) {
+					// TODO: 実験結果をManagerに伝えるなどの処理
+					System.out.println("[Ring] relay finished: " + token);
+					// 実験終了を各ノードにリレーで伝える
+					sendToken(tupleId, ByteBuffer.wrap(lastToken.getBytes()));
+				}
+				return;
+			}
+			sendToken(tupleId, data);
+			relayNum--;
+		}
+	}
+	
+	private ByteBuffer receiveToken(int tupleId) {
+		PSXReply reply = psxLocal.in(tupleId);
+		do {
+			try {
+				psxLocal.sync(1);
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
+		} while (!reply.ready());
+		ByteBuffer data = reply.getData();
+		return data;
+	}
+	
+	private void sendToken(int tupleId, ByteBuffer data) {
+		for (int i = 0; i < psxSendServers.size(); i++) {
+			PSXLinda linda = psxSendServers.get(i);
+			linda.out(tupleId, data);
+			try {
+				linda.sync(1);
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
+		}
 	}
 
 }