Mercurial > hg > FederatedLinda
view src/fdl/test/debug2/RoutingCallback.java @ 92:ea4ee892baf5
commit
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 22 Apr 2010 16:13:03 +0900 |
parents | |
children |
line wrap: on
line source
package fdl.test.debug2;

import java.nio.ByteBuffer;
import java.util.Date;

import fdl.PSXCallback;
import fdl.PSXReply;

/**
 * Callback registered for a single tuple id ({@link #tid}) of a node.
 *
 * <p>Each time a tuple arrives, {@link #callback(ByteBuffer)} either consumes it as a
 * control message (tree bookkeeping, {@code shutdown}, {@code print,<id>}) or forwards
 * the payload to every destination listed in {@code routing.route}. Except on shutdown,
 * the callback re-registers itself with {@code np.ml.in(tid.id, this)} before returning.
 */
class RoutingCallback implements PSXCallback {
    TupleId tid;
    Routing routing;
    NodeProperty np;
    TreeProperty tp;
    DebugProperty dp;

    /**
     * @param tid     tuple id this callback is registered for; selects the handling branch
     * @param routing routing entry whose {@code route} lists the destination ids to forward to
     * @param np      node state (local linda {@code ml}, {@code nodes} table, {@code manager}, node id)
     * @param tp      tree experiment state (left/right flags, loop counter, start/end time)
     * @param dp      debug/relay experiment state (relay counter and limit)
     */
    public RoutingCallback(TupleId tid, Routing routing, NodeProperty np, TreeProperty tp, DebugProperty dp) {
        this.tid = tid;
        this.routing = routing;
        this.np = np;
        this.tp = tp;
        this.dp = dp;
    }

    /**
     * Handles one incoming tuple for {@link #tid}.
     *
     * @param reply payload of the received tuple; its whole backing array is decoded as text
     */
    public void callback(ByteBuffer reply) {
        // Decode once and reuse the text for logging and command matching.
        String str = new String(reply.array());
        Debug.print("get: " + str);

        boolean handled;
        switch (tid) {
        case TREETOP:
        case TREELEFT:
        case TREERIGHT:
            handled = handleTree(reply, str);
            break;
        case RINGLEFT:
        case RINGRIGHT:
            handled = handleRing(str);
            break;
        default:
            handled = handleRelayCount();
            break;
        }

        // Anything not consumed above is ordinary data: pass it along the route.
        if (!handled) {
            forward(reply);
        }
    }

    /**
     * Processes a tuple that arrived on one of the tree ids.
     *
     * @return {@code true} if the tuple was fully handled, {@code false} if it still
     *         has to be forwarded down the route
     */
    private boolean handleTree(ByteBuffer reply, String str) {
        // Replace the locally stored BODY tuple with the payload that just arrived.
        np.ml.in(TupleId.BODY.id);
        np.ml.out(TupleId.BODY.id, reply);
        Debug.print("Update body: " + str);

        if (tid == TupleId.TREETOP) {
            return bounceAtTreeEnd(reply);
        }
        collectFromChild(reply);
        return true;
    }

    /**
     * At the end of the tree (this node's TREETOP routing has an empty route) the
     * payload is sent straight back to where it came from.
     *
     * @return {@code true} if this node is a tree end and the payload was sent back
     */
    private boolean bounceAtTreeEnd(ByteBuffer reply) {
        Routing top = np.nodes.get(tid);
        if (!top.route.isEmpty()) {
            return false;
        }
        top.linda.out(top.dstId, reply); // send it back to the sender
        np.ml.in(tid.id, this);
        Debug.print("UP! " + np.nodeId);
        return true;
    }

    /**
     * Records that the TREELEFT or TREERIGHT side has answered. Once both flags are
     * set, node 0 starts the next round or reports the result; any other node passes
     * the payload on through its TREETOP routing. Always re-registers the callback.
     */
    private void collectFromChild(ByteBuffer reply) {
        if (tid == TupleId.TREELEFT) {
            tp.leftFlag = true;
        } else if (tid == TupleId.TREERIGHT) {
            tp.rightFlag = true;
        }

        if (tp.leftFlag && tp.rightFlag) {
            Debug.print("UP");
            if (np.nodeId == 0) {
                finishTreeRound();
            } else {
                Routing top = np.nodes.get(TupleId.TREETOP);
                top.linda.out(top.dstId, reply);
            }
        }
        np.ml.in(tid.id, this);
    }

    /**
     * On node 0: starts another tree round while the loop counter is below
     * {@code tp.treeLoopNum}; otherwise sends the average time per round to the
     * manager under the START id.
     */
    private void finishTreeRound() {
        if (tp.treeCounter++ < tp.treeLoopNum) {
            tp.startTree(ByteBuffer.wrap(("" + tp.treeCounter).getBytes()));
        } else {
            ByteBuffer data = stampEndAndEncodeAverage((double) tp.treeLoopNum);
            np.manager.out(TupleId.START.id, data);
            Debug.print("Finish Tree");
        }
    }

    /**
     * Processes a tuple that arrived on one of the ring ids, consuming the
     * {@code shutdown} and {@code print,<id>} commands.
     *
     * @return {@code true} if the tuple was a command and has been handled,
     *         {@code false} if it is ordinary data to be forwarded
     */
    private boolean handleRing(String str) {
        if (str.equals("shutdown")) {
            Debug.print("get shutdown command id: " + np.nodeId);
            if (np.nodeId != 0) {
                Routing next = np.nodes.get(tid.getMirrorId());
                next.linda.out(next.dstId, ByteBuffer.wrap("shutdown".getBytes()));
                np.ml.fdl.queueExec();
            } else {
                Debug.print("shutdown reaches last node!");
            }
            // The callback is intentionally not re-registered here.
            np.running = false;
            return true;
        }

        // split() drops trailing empty strings, so an all-comma payload yields an
        // empty array; guard the index instead of throwing.
        String[] commands = str.split(",");
        if (commands.length > 0 && commands[0].equals("print")) {
            appendDebugReport(str, commands);
            return true;
        }
        return false;
    }

    /**
     * Handles {@code print,<id>}: reads tuple {@code <id>} from the local linda,
     * appends a {@code <debug>} element describing this node to the message, and
     * sends the result to the manager (node 0) or on to the mirror-side neighbour.
     */
    private void appendDebugReport(String str, String[] commands) {
        Debug.print("Get Debug Message: print");
        // NOTE(review): a "print" message without a numeric id still throws here
        // (ArrayIndexOutOfBounds/NumberFormatException), as before.
        int printId = Integer.parseInt(commands[1]);
        PSXReply rep = np.ml.rd(printId);
        np.ml.sync(1);

        String report = str
                + "<debug id=\"" + np.nodeId + "\"><host>" + np.localHostName + ":" + np.localPort
                + "</host><data id=\"" + printId + "\">";
        if (rep.ready()) {
            report += new String(rep.data.array());
        }
        report += "</data></debug>\n";

        np.ml.in(tid.id, this);
        if (np.nodeId == 0) {
            // end of the experiment
            np.manager.out(TupleId.DEBUG.id, ByteBuffer.wrap(report.getBytes()));
        } else {
            Routing next = np.nodes.get(tid.getMirrorId());
            next.linda.out(next.dstId, ByteBuffer.wrap(report.getBytes()));
        }
    }

    /**
     * Counts relays on node 0 and reports the average relay time to the manager once
     * {@code dp.relayNum} is reached.
     *
     * <p>NOTE(review): this is only called for tuple ids other than the tree and ring
     * ids, yet it requires {@code tid == TupleId.RINGLEFT}, so the condition can never
     * be true and the relay counter is never advanced. It looks like this check was
     * meant to run for RINGLEFT tuples that are neither "shutdown" nor "print" —
     * confirm the intent (and that {@code tp.startTime} is set for the relay
     * experiment) before moving it.
     *
     * @return {@code true} if the final result was reported and the callback re-registered
     */
    private boolean handleRelayCount() {
        if (np.nodeId == 0 && tid == TupleId.RINGLEFT) {
            dp.relayCounter++;
            Debug.print("" + dp.relayCounter + " relay");
            if (dp.relayCounter >= dp.relayNum) {
                // end of the experiment
                ByteBuffer data = stampEndAndEncodeAverage((double) dp.relayNum);
                np.manager.out(TupleId.DEBUG.id, data);
                np.ml.in(tid.id, this);
                return true;
            }
        }
        return false;
    }

    /**
     * Sets {@code tp.endTime} to now and returns the elapsed milliseconds since
     * {@code tp.startTime} divided by {@code iterations}, encoded as decimal text.
     */
    private ByteBuffer stampEndAndEncodeAverage(double iterations) {
        tp.endTime = new Date();
        double resultTime = (tp.endTime.getTime() - tp.startTime.getTime()) / iterations;
        // Double.toString yields the same text as the deprecated new Double(x).toString().
        return ByteBuffer.wrap(Double.toString(resultTime).getBytes());
    }

    /**
     * Sends the payload to every destination in {@code routing.route}, clearing the
     * matching tree flag for TREELEFT/TREERIGHT destinations, then re-registers the
     * callback.
     */
    private void forward(ByteBuffer reply) {
        for (Integer dstId : routing.route) {
            // presumably TupleId.id is a primitive int, making these value comparisons — confirm
            if (dstId == TupleId.TREELEFT.id) {
                tp.leftFlag = false;
            } else if (dstId == TupleId.TREERIGHT.id) {
                tp.rightFlag = false;
            }
            Routing dst = np.nodes.get(TupleId.getTupleIdFromId(dstId));
            dst.linda.out(dst.dstId, reply);
        }
        np.ml.in(tid.id, this);
    }
}