1
|
1 package suikwasha.distributedalgorithm.algorithms.krs;
|
|
2
|
|
3 import java.nio.ByteBuffer;
|
|
4 import java.util.Iterator;
|
|
5
|
|
6 import fj.P;
|
|
7 import fj.P2;
|
|
8
|
|
9 import suikwasha.distributedalgorithm.framework.Algorithm;
|
|
10 import suikwasha.distributedalgorithm.framework.Context;
|
|
11 import suikwasha.distributedalgorithm.framework.Message;
|
|
12 import suikwasha.distributedalgorithm.framework.Port;
|
|
13
|
|
14 public class KorachRotemSantoroAlgorithm implements Algorithm
|
|
15 {
|
|
16 private final boolean isInitProcess;
|
|
17 private final boolean direction;
|
|
18 private final long num;
|
|
19 private long max;
|
|
20
|
|
21 public static final int MESSAGE_SIZE = 1 + 8; // size of (byte + long) in java
|
|
22
|
|
23 public KorachRotemSantoroAlgorithm(long _num,boolean _direction,boolean _isInitProcess)
|
|
24 {
|
|
25 num = _num;
|
|
26 max = -1;
|
|
27 direction = _direction;
|
|
28 isInitProcess = _isInitProcess;
|
|
29 }
|
|
30
|
|
31 /*
|
|
32 * message format.
|
|
33 * flag | long value
|
|
34 * 0xFF | 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF 0xFF
|
|
35 */
|
|
36 public static ByteBuffer s(boolean _flagMax,long _value)
|
|
37 {
|
|
38 ByteBuffer b = ByteBuffer.allocate(MESSAGE_SIZE);
|
|
39 b.put((_flagMax) ? (byte)1 : (byte)0);
|
|
40 b.putLong(_value);
|
|
41 b.rewind();
|
|
42 return b;
|
|
43 }
|
|
44
|
|
45 public P2<Message,Port> RECV(Port... ports)
|
|
46 {
|
|
47 while(true){
|
|
48 for(Port p : ports){
|
|
49 Message message = p.tryReceive();
|
|
50 if(message != null){
|
|
51 return P.p(message,p);
|
|
52 }
|
|
53 try{
|
|
54 Thread.sleep(1);
|
|
55 }catch(Exception _e){
|
|
56 _e.printStackTrace();
|
|
57 }
|
|
58 }
|
|
59 }
|
|
60
|
|
61 }
|
|
62
|
|
63 public void execute(Context _c)
|
|
64 {
|
|
65 Iterator<Port> ports = _c.getPorts().iterator();
|
|
66 Port left,right;
|
|
67 if(direction){
|
|
68 left = ports.next();
|
|
69 right = ports.next();
|
|
70 }else{
|
|
71 right = ports.next();
|
|
72 left = ports.next();
|
|
73 }
|
|
74
|
|
75 Message newMessage,receivedMessage;
|
|
76
|
|
77 if(isInitProcess){
|
|
78 /*
|
|
79 * initial process starts from here.
|
|
80 */
|
|
81 max = num;
|
|
82 right.send(new Message(s(false,max)));
|
|
83 }else{
|
|
84 /*
|
|
85 * waiting process starts from here.
|
|
86 */
|
|
87
|
|
88 P2<Message,Port> ret = RECV(right,left);
|
|
89 receivedMessage = ret._1();
|
|
90 Port receivedPort = ret._2();
|
|
91
|
|
92 ByteBuffer b = receivedMessage.getMessage();
|
|
93 long value = b.getLong(1); // see message format above
|
|
94 max = Math.max(num,value);
|
|
95
|
|
96 newMessage = receivedMessage.newMessage(s(false,max));
|
|
97 if(receivedPort == right){
|
|
98 left.send(newMessage);
|
|
99 }else{
|
|
100 right.send(newMessage);
|
|
101 }
|
|
102 }
|
|
103
|
|
104 /*
|
|
105 * label : L in text book.
|
|
106 */
|
|
107 while(true){
|
|
108 P2<Message,Port> ret = RECV(right,left);
|
|
109
|
|
110 receivedMessage = ret._1();
|
|
111 Port x = ret._2();
|
|
112
|
|
113 ByteBuffer b = receivedMessage.getMessage();
|
|
114 byte flagMax = b.get();
|
|
115 long value = b.getLong();
|
|
116 if(max == value){
|
|
117 newMessage = receivedMessage.newMessage(s(true,max));
|
|
118 if(x == right){
|
|
119 left.send(newMessage);
|
|
120 }else{
|
|
121 right.send(newMessage);
|
|
122 }
|
|
123
|
|
124 // stop;
|
|
125 return;
|
|
126 }
|
|
127
|
|
128 if(flagMax == 1){
|
|
129 max = value;
|
|
130 newMessage = receivedMessage.newMessage(s(true,max));
|
|
131 if(x == right){
|
|
132 left.send(newMessage);
|
|
133 }else{
|
|
134 right.send(newMessage);
|
|
135 }
|
|
136
|
|
137 // stop;
|
|
138 return;
|
|
139 }
|
|
140
|
|
141 if(value > max){
|
|
142 max = value;
|
|
143 newMessage = receivedMessage.newMessage(s(false,max));
|
|
144 if(x == right){
|
|
145 left.send(newMessage);
|
|
146 }else{
|
|
147 right.send(newMessage);
|
|
148 }
|
|
149 }
|
|
150 }
|
|
151 }
|
|
152 }
|