view src/fdl/test/topology/ring/RingMetaProtocolEngine.java @ 81:c001797f3fdb

connect bug fix
author one
date Mon, 23 Nov 2009 20:39:39 +0900
parents 04bd4ae97e7c
children
line wrap: on
line source

package fdl.test.topology.ring;

import java.io.IOException;
import java.nio.ByteBuffer;
//import java.text.SimpleDateFormat;
import java.util.Date;

import fdl.MetaLinda;
import fdl.PSXLinda;
import fdl.PSXReply;
import fdl.test.topology.MetaProtocolEngine;

/**
* RingMetaProtocolEngine
*
* @author Kazuki Akamine
* 
* Ring 接続実験用の MetaProtocolEngine
* Ring 接続実験の具体的な処理を記述する。
*  
*/

public class RingMetaProtocolEngine extends MetaProtocolEngine {
	private int relayNum;
	private static int relayId = 10;
	private Date startTime, endTime;
	//private SimpleDateFormat DF = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
	public RingMetaProtocolEngine(MetaLinda ml, String managerHostName, int relayNum) {
		super(ml, managerHostName);
		this.relayNum = relayNum;
		startTime = null;
		endTime = null;
	}
	
	@Override public void mainLoop() {
		// 接続処理
		super.mainLoop();
		// Ring 実験開始
		relayTuple(relayId);
	}
	
	private void relayTuple(int tupleId) {
		while (true) {
			ByteBuffer data = receiveToken(tupleId);
			if (startTime == null) {
				startTime = new Date(); 
			}
			if (relayNum == 0) {
				String token = new String(data.array());
				if (!token.equals(lastToken)) {
					// TODO: 実験結果をManagerに伝えるなどの処理
					sendResult();
					System.out.println("[Ring] relay finished: " + token);
					// 実験終了を各ノードにリレーで伝える
					sendToken(tupleId, ByteBuffer.wrap(lastToken.getBytes()));
				}
				return;
			}
			sendToken(tupleId, data);
			System.out.println("[DEBUG] Relay...");
			relayNum--;
		}
	}
	
	private void sendResult() {
		this.endTime = new Date();
		Long resultTime = new Long(endTime.getTime() - startTime.getTime()); 
		ByteBuffer data = ByteBuffer.wrap(resultTime.toString().getBytes());
		manager.out(relayId, data);
		try {
			manager.sync(1);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private ByteBuffer receiveToken(int tupleId) {
		PSXReply reply = psxLocal.in(tupleId);
		do {
			try {
				psxLocal.sync(1);
			} catch (IOException e) {
				e.printStackTrace();
			}
		} while (!reply.ready());
		ByteBuffer data = reply.getData();
		return data;
	}
	
	private void sendToken(int tupleId, ByteBuffer data) {
		for (int i = 0; i < psxSendServers.size(); i++) {
			PSXLinda linda = psxSendServers.get(i);
			linda.out(tupleId, data);
			try {
				linda.sync(1);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

}