1
|
1 package suikwasha.distributedalgorithm.algorithms.krs;
|
|
2
|
|
3 import java.nio.ByteBuffer;
|
|
4 import java.util.Iterator;
|
|
5
|
|
6 import fj.P;
|
|
7 import fj.P2;
|
|
8
|
|
9 import suikwasha.distributedalgorithm.framework.Algorithm;
|
|
10 import suikwasha.distributedalgorithm.framework.Context;
|
|
11 import suikwasha.distributedalgorithm.framework.Message;
|
|
12 import suikwasha.distributedalgorithm.framework.Port;
|
|
13
|
|
14 public class KorachRotemSantoroAlgorithm implements Algorithm
|
|
15 {
|
|
16 private final boolean isInitProcess;
|
|
17 private final boolean direction;
|
|
18 private final long num;
|
|
19 private long max;
|
|
20
|
|
21 public static final int MESSAGE_SIZE = 1 + 8; // size of (byte + long) in java
|
|
22
|
|
23 public KorachRotemSantoroAlgorithm(long _num,boolean _direction,boolean _isInitProcess)
|
|
24 {
|
|
25 num = _num;
|
|
26 max = -1;
|
|
27 direction = _direction;
|
|
28 isInitProcess = _isInitProcess;
|
|
29 }
|
|
30
|
|
31 /*
|
|
32 * message format.
|
|
33 * flag | long value
|
|
34 * 0xFF | 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF
|
|
35 */
|
|
36 public static ByteBuffer s(boolean _flagMax,long _value)
|
|
37 {
|
|
38 ByteBuffer b = ByteBuffer.allocate(MESSAGE_SIZE);
|
|
39 b.put((_flagMax) ? (byte)1 : (byte)0);
|
|
40 b.putLong(_value);
|
|
41 b.rewind();
|
|
42 return b;
|
|
43 }
|
|
44
|
|
45 public P2<Message,Port> RECV(Port... ports)
|
|
46 {
|
|
47 while(true){
|
|
48 for(Port p : ports){
|
|
49 Message message = p.tryReceive();
|
|
50 if(message != null){
|
|
51 return P.p(message,p);
|
|
52 }
|
|
53 }
|
|
54 }
|
|
55 }
|
|
56
|
|
57 public void execute(Context _c)
|
|
58 {
|
|
59 Iterator<Port> ports = _c.getPorts().iterator();
|
|
60 Port left,right;
|
|
61 if(direction){
|
|
62 left = ports.next();
|
|
63 right = ports.next();
|
|
64 }else{
|
|
65 right = ports.next();
|
|
66 left = ports.next();
|
|
67 }
|
|
68
|
|
69 Message newMessage,receivedMessage;
|
|
70
|
|
71 if(isInitProcess){
|
|
72 /*
|
|
73 * initial process starts from here.
|
|
74 */
|
|
75 max = num;
|
|
76 right.send(new Message(s(false,max)));
|
|
77 }else{
|
|
78 /*
|
|
79 * waiting process starts from here.
|
|
80 */
|
|
81
|
|
82 P2<Message,Port> ret = RECV(right,left);
|
|
83 receivedMessage = ret._1();
|
|
84 Port receivedPort = ret._2();
|
|
85
|
|
86 ByteBuffer b = receivedMessage.getMessage();
|
|
87 long value = b.getLong(1); // see message format above
|
|
88 max = Math.max(num,value);
|
|
89
|
|
90 newMessage = receivedMessage.newMessage(s(false,max));
|
|
91 if(receivedPort == right){
|
|
92 left.send(newMessage);
|
|
93 }else{
|
|
94 right.send(newMessage);
|
|
95 }
|
|
96 }
|
|
97
|
|
98 /*
|
|
99 * label : L in text book.
|
|
100 */
|
|
101 while(true){
|
|
102 P2<Message,Port> ret = RECV(right,left);
|
|
103
|
|
104 receivedMessage = ret._1();
|
|
105 Port x = ret._2();
|
|
106
|
|
107 ByteBuffer b = receivedMessage.getMessage();
|
|
108 byte flagMax = b.get();
|
|
109 long value = b.getLong();
|
|
110 if(max == value){
|
|
111 newMessage = receivedMessage.newMessage(s(true,max));
|
|
112 if(x == right){
|
|
113 left.send(newMessage);
|
|
114 }else{
|
|
115 right.send(newMessage);
|
|
116 }
|
|
117
|
|
118 // stop;
|
|
119 return;
|
|
120 }
|
|
121
|
|
122 if(flagMax == 1){
|
|
123 max = value;
|
|
124 newMessage = receivedMessage.newMessage(s(true,max));
|
|
125 if(x == right){
|
|
126 left.send(newMessage);
|
|
127 }else{
|
|
128 right.send(newMessage);
|
|
129 }
|
|
130
|
|
131 // stop;
|
|
132 return;
|
|
133 }
|
|
134
|
|
135 if(value > max){
|
|
136 max = value;
|
|
137 newMessage = receivedMessage.newMessage(s(false,max));
|
|
138 if(x == right){
|
|
139 left.send(newMessage);
|
|
140 }else{
|
|
141 right.send(newMessage);
|
|
142 }
|
|
143 }
|
|
144 }
|
|
145 }
|
|
146 }
|