Mercurial > hg > FederatedLinda
view src/fdl/test/topology/ring/RingMetaProtocolEngine.java @ 80:04bd4ae97e7c
RingTopology is finished.
author | one |
---|---|
date | Sun, 22 Nov 2009 13:59:05 +0900 |
parents | 805645cf5ec0 |
children | c001797f3fdb |
line wrap: on
line source
package fdl.test.topology.ring; import java.io.IOException; import java.nio.ByteBuffer; import java.text.SimpleDateFormat; import java.util.Date; import fdl.MetaLinda; import fdl.PSXLinda; import fdl.PSXReply; import fdl.test.topology.MetaProtocolEngine; /** * RingMetaProtocolEngine * * @author Kazuki Akamine * * Ring 接続実験用の MetaProtocolEngine * Ring 接続実験の具体的な処理を記述する。 * */ public class RingMetaProtocolEngine extends MetaProtocolEngine { private int relayNum; private static int relayId = 10; private Date startTime, endTime; private SimpleDateFormat DF = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); public RingMetaProtocolEngine(MetaLinda ml, String managerHostName, int relayNum) { super(ml, managerHostName); this.relayNum = relayNum; startTime = null; endTime = null; } @Override public void mainLoop() { // 接続処理 super.mainLoop(); // Ring 実験開始 relayTuple(relayId); } private void relayTuple(int tupleId) { while (true) { ByteBuffer data = receiveToken(tupleId); if (startTime == null) { startTime = new Date(); } if (relayNum == 0) { String token = new String(data.array()); if (!token.equals(lastToken)) { // TODO: 実験結果をManagerに伝えるなどの処理 sendResult(); System.out.println("[Ring] relay finished: " + token); // 実験終了を各ノードにリレーで伝える sendToken(tupleId, ByteBuffer.wrap(lastToken.getBytes())); } return; } sendToken(tupleId, data); System.out.println("[DEBUG] Relay..."); relayNum--; } } private void sendResult() { this.endTime = new Date(); Long resultTime = new Long(endTime.getTime() - startTime.getTime()); ByteBuffer data = ByteBuffer.wrap(resultTime.toString().getBytes()); manager.out(relayId, data); try { manager.sync(1); } catch (IOException e) { e.printStackTrace(); } } private ByteBuffer receiveToken(int tupleId) { PSXReply reply = psxLocal.in(tupleId); do { try { psxLocal.sync(1); } catch (IOException e) { e.printStackTrace(); } } while (!reply.ready()); ByteBuffer data = reply.getData(); return data; } private void sendToken(int tupleId, ByteBuffer data) { for (int i = 0; i < psxSendServers.size(); i++) { PSXLinda linda = psxSendServers.get(i); linda.out(tupleId, data); try { linda.sync(1); } catch (IOException e) { e.printStackTrace(); } } } }