view src/main/java/suikwasha/distributedalgorithm/algorithms/krs/KorachRotemSantoroAlgorithm.java @ 1:d24bcb819032

trying to add Selector
author suikwasha
date Fri, 19 Oct 2012 23:48:11 +0900
parents
children 8e1f63faa2fd
line wrap: on
line source

package suikwasha.distributedalgorithm.algorithms.krs;

import java.nio.ByteBuffer;
import java.util.Iterator;

import fj.P;
import fj.P2;

import suikwasha.distributedalgorithm.framework.Algorithm;
import suikwasha.distributedalgorithm.framework.Context;
import suikwasha.distributedalgorithm.framework.Message;
import suikwasha.distributedalgorithm.framework.Port;

public class KorachRotemSantoroAlgorithm implements Algorithm
{
	private final boolean isInitProcess;
	private final boolean direction;
	private final long num;
	private long max;
	
	public static final int MESSAGE_SIZE = 1 + 8; // size of (byte + long) in java
	
	public KorachRotemSantoroAlgorithm(long _num,boolean _direction,boolean _isInitProcess)
	{
		num = _num;
		max = -1;
		direction = _direction;
		isInitProcess = _isInitProcess;
	}
	
	/*
	 * message format.
	 * flag | long value 
	 * 0xFF | 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF
	 */
	public static ByteBuffer s(boolean _flagMax,long _value)
	{
		ByteBuffer b = ByteBuffer.allocate(MESSAGE_SIZE); 
		b.put((_flagMax) ? (byte)1 : (byte)0);
		b.putLong(_value);
		b.rewind();
		return b;
	}
	
	public P2<Message,Port> RECV(Port... ports)
	{
		while(true){
			for(Port p : ports){
				Message message = p.tryReceive();
				if(message != null){
					return P.p(message,p);
				}
				try{
					Thread.sleep(1);
				}catch(Exception _e){
					_e.printStackTrace();
				}
			}
		}
		
	}

	public void execute(Context _c)
	{
		Iterator<Port> ports = _c.getPorts().iterator();
		Port left,right;
		if(direction){
			left = ports.next();
			right = ports.next();
		}else{
			right = ports.next();
			left = ports.next();
		}
		
		Message newMessage,receivedMessage;
		
		if(isInitProcess){
			/*
			 * initial process starts from here.
			 */
			max = num;
			right.send(new Message(s(false,max)));
		}else{
			/*
			 * waiting process starts from here.
			 */
			
			P2<Message,Port> ret = RECV(right,left);
			receivedMessage = ret._1();
			Port receivedPort = ret._2();
			
			ByteBuffer b = receivedMessage.getMessage();
			long value = b.getLong(1); // see message format above
			max = Math.max(num,value);
			
			newMessage = receivedMessage.newMessage(s(false,max));
			if(receivedPort == right){
				left.send(newMessage);
			}else{
				right.send(newMessage);
			}
		}
		
		/*
		 * label : L in text book.
		 */
		while(true){
			P2<Message,Port> ret = RECV(right,left);
			
			receivedMessage = ret._1();
			Port x = ret._2();
			
			ByteBuffer b = receivedMessage.getMessage();
			byte flagMax = b.get();
			long value = b.getLong();
			if(max == value){
				newMessage = receivedMessage.newMessage(s(true,max));
				if(x == right){
					left.send(newMessage);
				}else{
					right.send(newMessage);
				}
				
				// stop;
				return;
			}
			
			if(flagMax == 1){
				max = value;
				newMessage = receivedMessage.newMessage(s(true,max));
				if(x == right){
					left.send(newMessage);
				}else{
					right.send(newMessage);
				}
				
				// stop;
				return;
			}
			
			if(value > max){
				max = value;
				newMessage = receivedMessage.newMessage(s(false,max));
				if(x == right){
					left.send(newMessage);
				}else{
					right.send(newMessage);
				}
			}
		}
	}
}