Mercurial > hg > Members > shoshi > distributedalgorithm
view src/main/java/suikwasha/distributedalgorithm/algorithms/krs/KorachRotemSantoroAlgorithm.java @ 2:8e1f63faa2fd default tip
added Franklin's Algorithm
author | suikwasha |
---|---|
date | Tue, 23 Oct 2012 16:49:26 +0900 |
parents | d24bcb819032 |
children |
line wrap: on
line source
package suikwasha.distributedalgorithm.algorithms.krs; import java.nio.ByteBuffer; import java.util.Iterator; import fj.P; import fj.P2; import suikwasha.distributedalgorithm.framework.Algorithm; import suikwasha.distributedalgorithm.framework.Context; import suikwasha.distributedalgorithm.framework.Message; import suikwasha.distributedalgorithm.framework.Port; public class KorachRotemSantoroAlgorithm implements Algorithm { private final boolean isInitProcess; private final boolean direction; private final long num; private long max; public static final int MESSAGE_SIZE = 1 + 8; // size of (byte + long) in java public KorachRotemSantoroAlgorithm(long _num,boolean _direction,boolean _isInitProcess) { num = _num; max = -1; direction = _direction; isInitProcess = _isInitProcess; } /* * message format. * flag | long value * 0xFF | 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF */ public static ByteBuffer s(boolean _flagMax,long _value) { ByteBuffer b = ByteBuffer.allocate(MESSAGE_SIZE); b.put((_flagMax) ? (byte)1 : (byte)0); b.putLong(_value); b.rewind(); return b; } public P2<Message,Port> RECV(Port... ports) { while(true){ for(Port p : ports){ Message message = p.tryReceive(); if(message != null){ return P.p(message,p); } } } } public void execute(Context _c) { Iterator<Port> ports = _c.getPorts().iterator(); Port left,right; if(direction){ left = ports.next(); right = ports.next(); }else{ right = ports.next(); left = ports.next(); } Message newMessage,receivedMessage; if(isInitProcess){ /* * initial process starts from here. */ max = num; right.send(new Message(s(false,max))); }else{ /* * waiting process starts from here. */ P2<Message,Port> ret = RECV(right,left); receivedMessage = ret._1(); Port receivedPort = ret._2(); ByteBuffer b = receivedMessage.getMessage(); long value = b.getLong(1); // see message format above max = Math.max(num,value); newMessage = receivedMessage.newMessage(s(false,max)); if(receivedPort == right){ left.send(newMessage); }else{ right.send(newMessage); } } /* * label : L in text book. */ while(true){ P2<Message,Port> ret = RECV(right,left); receivedMessage = ret._1(); Port x = ret._2(); ByteBuffer b = receivedMessage.getMessage(); byte flagMax = b.get(); long value = b.getLong(); if(max == value){ newMessage = receivedMessage.newMessage(s(true,max)); if(x == right){ left.send(newMessage); }else{ right.send(newMessage); } // stop; return; } if(flagMax == 1){ max = value; newMessage = receivedMessage.newMessage(s(true,max)); if(x == right){ left.send(newMessage); }else{ right.send(newMessage); } // stop; return; } if(value > max){ max = value; newMessage = receivedMessage.newMessage(s(false,max)); if(x == right){ left.send(newMessage); }else{ right.send(newMessage); } } } } }