comparison src/main/java/fj/control/parallel/Strategy.java @ 0:fe80c1edf1be

add getLoop
author tatsuki
date Fri, 20 Mar 2015 21:04:03 +0900
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:fe80c1edf1be
1 package fj.control.parallel;
2
3 import fj.Effect;
4 import fj.F;
5 import fj.F2;
6 import fj.Function;
7 import fj.P;
8 import fj.P1;
9 import static fj.Function.compose;
10 import static fj.Function.curry;
11 import fj.data.Java;
12 import fj.data.List;
13 import fj.data.Array;
14 import fj.function.Effect1;
15
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CompletionService;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Future;
21 import java.util.concurrent.FutureTask;
22
23 /**
24 * Functional-style parallel evaluation strategies.
25 * A Strategy is a method of evaluating a product-1, yielding another product-1 from which the result of its evaluation
26 * can be retrieved at a later time.
27 * <p/>
28 *
29 * @version %build.number%
30 */
31 public final class Strategy<A> {
32
33 private final F<P1<A>, P1<A>> f;
34
35 private Strategy(final F<P1<A>, P1<A>> f) {
36 this.f = f;
37 }
38
39 /**
40 * Returns the functional representation of this Strategy, a function that evaluates a product-1.
41 *
42 * @return The function representing this strategy, which evaluates a product-1.
43 */
44 public F<P1<A>, P1<A>> f() {
45 return f;
46 }
47
48 /**
49 * Constructs a strategy from the given evaluation function.
50 *
51 * @param f The execution function for the strategy
52 * @return A strategy that uses the given function to evaluate product-1s.
53 */
54 public static <A> Strategy<A> strategy(final F<P1<A>, P1<A>> f) {
55 return new Strategy<A>(f);
56 }
57
58 /**
59 * Apply the strategy to the given product-1.
60 *
61 * @param a A P1 to evaluate according to this strategy.
62 * @return A P1 that yields the value from calling the given product-1.
63 */
64 public P1<A> par(final P1<A> a) {
65 return f().f(a);
66 }
67
68 /**
69 * Promotes a function to a concurrent function.
70 *
71 * @param f A function to promote to a concurrent function.
72 * @return A function that executes concurrently when called, yielding a Future value.
73 */
74 public <B> F<B, P1<A>> concurry(final F<B, A> f) {
75 return compose(f(), P1.<B, A>curry(f));
76 }
77
78 /**
79 * Promotes a function of arity-2 to a concurrent function.
80 *
81 * @param f The function to promote to a concurrent function.
82 * @return A function that executes concurrently when called, yielding a product-1 that returns the value.
83 */
84 public <B, C> F<B, F<C, P1<A>>> concurry(final F2<B, C, A> f) {
85 return new F<B, F<C, P1<A>>>() {
86 public F<C, P1<A>> f(final B b) {
87 return concurry(curry(f).f(b));
88 }
89 };
90 }
91
92 /**
93 * Waits for every Future in a list to obtain a value, and collects those values in a list.
94 *
95 * @param xs The list of Futures from which to get values.
96 * @return A list of values extracted from the Futures in the argument list.
97 */
98 public static <A> List<P1<A>> mergeAll(final List<Future<A>> xs) {
99 return xs.map(Strategy.<A>obtain());
100 }
101
102 /**
103 * Evaluates a list of product-1s in parallel.
104 *
105 * @param ps A list to evaluate in parallel.
106 * @return A list of the values of the product-1s in the argument.
107 */
108 public P1<List<A>> parList(final List<P1<A>> ps) {
109 return P1.sequence(ps.map(f()));
110 }
111
112 /**
113 * Maps the given function over the given list in parallel using this strategy.
114 *
115 * @param f A function to map over the given list in parallel.
116 * @param bs A list over which to map the given function in parallel.
117 * @return A product-1 that returns the list with all of its elements transformed by the given function.
118 */
119 public <B> P1<List<A>> parMap(final F<B, A> f, final List<B> bs) {
120 return P1.sequence(bs.map(concurry(f)));
121 }
122
123 /**
124 * Maps the given function over the given array in parallel using this strategy.
125 *
126 * @param f A function to map over the given array in parallel.
127 * @param bs An array over which to map the given function in parallel.
128 * @return A product-1 that returns the array with all of its elements transformed by the given function.
129 */
130 public <B> P1<Array<A>> parMap(final F<B, A> f, final Array<B> bs) {
131 return P1.sequence(bs.map(concurry(f)));
132 }
133
134 /**
135 * A strict version of parMap over lists.
136 * Maps the given function over the given list in parallel using this strategy,
137 * blocking the current thread until all values have been obtained.
138 *
139 * @param f A function to map over the given list in parallel.
140 * @param bs A list over which to map the given function in parallel.
141 * @return A list with all of its elements transformed by the given function.
142 */
143 public <B> List<A> parMap1(final F<B, A> f, final List<B> bs) {
144 return compose(P1.<List<A>>__1(), parMapList(f)).f(bs);
145 }
146
147 /**
148 * A strict version of parMap over arrays.
149 * Maps the given function over the given arrays in parallel using this strategy,
150 * blocking the current thread until all values have been obtained.
151 *
152 * @param f A function to map over the given array in parallel.
153 * @param bs An array over which to map the given function in parallel.
154 * @return An array with all of its elements transformed by the given function.
155 */
156 public <B> Array<A> parMap1(final F<B, A> f, final Array<B> bs) {
157 return compose(P1.<Array<A>>__1(), parMapArray(f)).f(bs);
158 }
159
160 /**
161 * Promotes a function to a parallel function on lists using this strategy.
162 *
163 * @param f A function to transform into a parallel function on lists.
164 * @return The function transformed into a parallel function on lists.
165 */
166 public <B> F<List<B>, P1<List<A>>> parMapList(final F<B, A> f) {
167 return new F<List<B>, P1<List<A>>>() {
168 public P1<List<A>> f(final List<B> as) {
169 return parMap(f, as);
170 }
171 };
172 }
173
174 /**
175 * First-class version of parMap on lists.
176 *
177 * @return A function that promotes another function to a parallel function on lists.
178 */
179 public <B> F<F<B, A>, F<List<B>, P1<List<A>>>> parMapList() {
180 return new F<F<B, A>, F<List<B>, P1<List<A>>>>() {
181 public F<List<B>, P1<List<A>>> f(final F<B, A> f) {
182 return parMapList(f);
183 }
184 };
185 }
186
187 /**
188 * First-class version of parMap1 on lists (parallel list functor).
189 *
190 * @return A function that promotes another function to a blocking parallel function on lists.
191 */
192 public <B> F<F<B, A>, F<List<B>, List<A>>> parMapList1() {
193 return new F<F<B, A>, F<List<B>, List<A>>>() {
194 public F<List<B>, List<A>> f(final F<B, A> f) {
195 return new F<List<B>, List<A>>() {
196 public List<A> f(final List<B> bs) {
197 return parMap1(f, bs);
198 }
199 };
200 }
201 };
202 }
203
204 /**
205 * Promotes a function to a parallel function on arrays using this strategy.
206 *
207 * @param f A function to transform into a parallel function on arrays.
208 * @return The function transformed into a parallel function on arrays.
209 */
210 public <B> F<Array<B>, P1<Array<A>>> parMapArray(final F<B, A> f) {
211 return new F<Array<B>, P1<Array<A>>>() {
212 public P1<Array<A>> f(final Array<B> as) {
213 return parMap(f, as);
214 }
215 };
216 }
217
218 /**
219 * First-class version of parMap on arrays.
220 *
221 * @return A function that promotes another function to a parallel function on arrays.
222 */
223 public <B> F<F<B, A>, F<Array<B>, P1<Array<A>>>> parMapArray() {
224 return new F<F<B, A>, F<Array<B>, P1<Array<A>>>>() {
225 public F<Array<B>, P1<Array<A>>> f(final F<B, A> f) {
226 return parMapArray(f);
227 }
228 };
229 }
230
231 /**
232 * First-class version of parMap1 on arrays (parallel array functor).
233 *
234 * @return A function that promotes another function to a blocking parallel function on arrays.
235 */
236 public <B> F<F<B, A>, F<Array<B>, Array<A>>> parMapArray1() {
237 return new F<F<B, A>, F<Array<B>, Array<A>>>() {
238 public F<Array<B>, Array<A>> f(final F<B, A> f) {
239 return new F<Array<B>, Array<A>>() {
240 public Array<A> f(final Array<B> bs) {
241 return parMap1(f, bs);
242 }
243 };
244 }
245 };
246 }
247
248 /**
249 * Binds the given function in parallel across the given list, using the given strategy, with a final join.
250 *
251 * @param s The strategy to use for parallelization.
252 * @param f The function to bind across the given list.
253 * @param as The list across which to bind the given function.
254 * @return A P1 containing the result of the parallel map operation after the final join.
255 */
256 public static <A, B> P1<List<B>> parFlatMap(final Strategy<List<B>> s,
257 final F<A, List<B>> f,
258 final List<A> as) {
259 return P1.fmap(List.<B>join()).f(s.parMap(f, as));
260 }
261
262 /**
263 * Binds the given function in parallel across the given array, using the given strategy, with a final join.
264 *
265 * @param s The strategy to use for parallelization.
266 * @param f The function to bind across the given array.
267 * @param as The array across which to bind the given function.
268 * @return A P1 containing the result of the parallel map operation after the final join.
269 */
270 public static <A, B> P1<Array<B>> parFlatMap(final Strategy<Array<B>> s,
271 final F<A, Array<B>> f,
272 final Array<A> as) {
273 return P1.fmap(Array.<B>join()).f(s.parMap(f, as));
274 }
275
276 /**
277 * Sequentially evaluates chunks (sub-sequences) of a list in parallel. Splits the list into chunks,
278 * evaluating the chunks simultaneously, but each chunk as a sequence.
279 *
280 * @param s The strategy to use for parallelization.
281 * @param chunkLength The length of each sequence.
282 * @param as The list to evaluate in parallel chunks.
283 * @return A product-1 containing the list of results extracted from the given list of product-1s.
284 */
285 public static <A> P1<List<A>> parListChunk(final Strategy<List<A>> s,
286 final int chunkLength,
287 final List<P1<A>> as) {
288 return P1.fmap(List.<A>join()).f(s.parList(as.partition(chunkLength).map(P1.<A>sequenceList())));
289 }
290
291 /**
292 * Zips together two lists in parallel using a given function, with this strategy.
293 * Calls the given function once for each corresponding pair in the lists, position-wise,
294 * passing elements from the first list to the first argument of the function, and elements from the second list
295 * to the second argument of the function, yielding a list of the results.
296 * If the lists are not of the same length, the remaining elements of the longer list are ignored.
297 *
298 * @param f The function of arity-2 with which to zip.
299 * @param bs A list to zip with the given function.
300 * @param cs A list to zip with the given function.
301 * @return The list of the results of calling the given function on corresponding elements of the given lists.
302 */
303 public <B, C> P1<List<A>> parZipWith(final F2<B, C, A> f, final List<B> bs, final List<C> cs) {
304 return P1.sequence(bs.zipWith(cs, concurry(f)));
305 }
306
307 /**
308 * Zips together two arrays in parallel using a given function, with this strategy.
309 * Calls the given function once for each corresponding pair in the arrays, position-wise,
310 * passing elements from the first array to the first argument of the function, and elements from the second array
311 * to the second argument of the function, yielding a array of the results.
312 * If the arrays are not of the same length, the remaining elements of the longer array are ignored.
313 *
314 * @param f The function of arity-2 with which to zip.
315 * @param bs A array to zip with the given function.
316 * @param cs A array to zip with the given function.
317 * @return The array of the results of calling the given function on corresponding elements of the given arrays.
318 */
319 public <B, C> P1<Array<A>> parZipWith(final F2<B, C, A> f, final Array<B> bs, final Array<C> cs) {
320 return P1.sequence(bs.zipWith(cs, concurry(f)));
321 }
322
323 /**
324 * Lifts a given function of arity-2 so that it zips together two lists in parallel,
325 * using this strategy, calling the function once for each corresponding pair in the lists, position-wise.
326 *
327 * @param f The function of arity-2 with which to zip.
328 * @return A transformation that zips two lists using the argument function, in parallel.
329 */
330 public <B, C> F2<List<B>, List<C>, P1<List<A>>> parZipListWith(final F2<B, C, A> f) {
331 return new F2<List<B>, List<C>, P1<List<A>>>() {
332 public P1<List<A>> f(final List<B> bs, final List<C> cs) {
333 return parZipWith(f, bs, cs);
334 }
335 };
336 }
337
338 /**
339 * Lifts a given function of arity-2 so that it zips together two arrays in parallel,
340 * using this strategy, calling the function once for each corresponding pair in the arrays, position-wise.
341 *
342 * @param f The function of arity-2 with which to zip.
343 * @return A transformation that zips two arrays using the argument function, in parallel.
344 */
345 public <B, C> F2<Array<B>, Array<C>, P1<Array<A>>> parZipArrayWith(final F2<B, C, A> f) {
346 return new F2<Array<B>, Array<C>, P1<Array<A>>>() {
347 public P1<Array<A>> f(final Array<B> bs, final Array<C> cs) {
348 return parZipWith(f, bs, cs);
349 }
350 };
351 }
352
353 /**
354 * Returns a function which returns a product-1 which waits for the given Future to obtain a value.
355 *
356 * @return A function which, given a Future, yields a product-1 that waits for it.
357 */
358 public static <A> F<Future<A>, P1<A>> obtain() {
359 return new F<Future<A>, P1<A>>() {
360 public P1<A> f(final Future<A> t) {
361 return obtain(t);
362 }
363 };
364 }
365
366 /**
367 * Provides a product-1 that waits for the given future to obtain a value.
368 *
369 * @param t A Future for which to wait.
370 * @return A product-1 that waits for the given future to obtain a value.
371 */
372 public static <A> P1<A> obtain(final Future<A> t) {
373 return new P1<A>() {
374 public A _1() {
375 try {
376 return t.get();
377 } catch (InterruptedException e) {
378 Thread.currentThread().interrupt();
379 throw new Error(e);
380 } catch (ExecutionException e) {
381 throw new Error(e);
382 }
383 }
384 };
385 }
386
387 /**
388 * Returns an Effect that waits for a given Future to obtain a value, discarding the value.
389 *
390 * @return An effect, which, given a Future, waits for it to obtain a value, discarding the value.
391 */
392 public static <A> Effect1<Future<A>> discard() {
393 return new Effect1<Future<A>>() {
394 public void f(final Future<A> a) {
395 Strategy.<A>obtain().f(a)._1();
396 }
397 };
398 }
399
400 /**
401 * Provides a simple parallelization strategy that creates, and discards, a new thread for
402 * every evaluation.
403 *
404 * @return a simple parallelization strategy that creates, and discards, a new thread for
405 * every evaluation.
406 */
407 public static <A> Strategy<A> simpleThreadStrategy() {
408 return strategy(new F<P1<A>, P1<A>>() {
409 public P1<A> f(final P1<A> p) {
410 final FutureTask<A> t = new FutureTask<A>(Java.<A>P1_Callable().f(p));
411 new Thread(t).start();
412 return obtain(t);
413 }
414 });
415 }
416
417 /**
418 * Provides a parallelization strategy that uses an ExecutorService to control the method and
419 * degree of parallelism.
420 *
421 * @param s The ExecutorService to use for scheduling evaluations.
422 * @return A Strategy that uses the provided ExecutorService to control the method and degree
423 * of parallelism.
424 */
425 public static <A> Strategy<A> executorStrategy(final ExecutorService s) {
426 return strategy(new F<P1<A>, P1<A>>() {
427 public P1<A> f(final P1<A> p) {
428 return obtain(s.submit(Java.<A>P1_Callable().f(p)));
429 }
430 });
431 }
432
433 /**
434 * Provides a parallelization strategy that uses a CompletionService to control the method and
435 * degree of parallelism, and where each parallel task's completion is registered with the service.
436 *
437 * @param s The CompletionService to use for scheduling evaluations and detect their completion.
438 * @return A Strategy that uses the provided CompletionService to control the method and degree of parallelism,
439 * and notifies the service of task completion.
440 */
441 public static <A> Strategy<A> completionStrategy(final CompletionService<A> s) {
442 return strategy(new F<P1<A>, P1<A>>() {
443 public P1<A> f(final P1<A> p) {
444 return obtain(s.submit(Java.<A>P1_Callable().f(p)));
445 }
446 });
447 }
448
449 /**
450 * Provides a strategy that performs sequential (non-concurrent) evaluation of its argument.
451 *
452 * @return A strategy that performs sequential (non-concurrent) evaluation of its argument.
453 */
454 public static <A> Strategy<A> seqStrategy() {
455 return strategy(new F<P1<A>, P1<A>>() {
456 public P1<A> f(final P1<A> a) {
457 return P.p(a._1());
458 }
459 });
460 }
461
462 /**
463 * Provides a strategy that performs no evaluation of its argument.
464 *
465 * @return A strategy that performs no evaluation of its argument.
466 */
467 public static <A> Strategy<A> idStrategy() {
468 return strategy(Function.<P1<A>>identity());
469 }
470
471 /**
472 * Maps the given bijective transformation across this strategy (Exponential Functor pattern).
473 *
474 * @param f A transformation from this strategy's codomain to the resulting strategy's codomain.
475 * @param g A transformation from the resulting strategy's domain to this strategy's domain.
476 * @return A new strategy that maps to this strategy and back again.
477 */
478 public <B> Strategy<B> xmap(final F<P1<A>, P1<B>> f, final F<P1<B>, P1<A>> g) {
479 return strategy(compose(f, compose(f(), g)));
480 }
481
482 /**
483 * Maps the given transformation across this strategy's domain (Invariant Functor pattern).
484 *
485 * @param f A transformation from this strategy's codomain to the resulting strategy's codomain.
486 * @return A new strategy that applies the given transformation after each application of this strategy.
487 */
488 public Strategy<A> map(final F<P1<A>, P1<A>> f) {
489 return xmap(f, Function.<P1<A>>identity());
490 }
491
492 /**
493 * Maps the given transformation across this strategy's codomain (Invariant Functor pattern).
494 *
495 * @param f A transformation from the resulting strategy's domain to this strategy's domain.
496 * @return A new strategy that applies the given transformation before each application of this strategy.
497 */
498 public Strategy<A> comap(final F<P1<A>, P1<A>> f) {
499 return xmap(Function.<P1<A>>identity(), f);
500 }
501
502 /**
503 * Provides an error-handling strategy. Captures any uncaught runtime errors encountered by this strategy and applies
504 * the given side-effect to them.
505 *
506 * @param e The effect that should handle errors.
507 * @return A strategy that captures any runtime errors with a side-effect.
508 */
509 public Strategy<A> errorStrategy(final Effect1<Error> e) {
510 return errorStrategy(this, e);
511 }
512
513 /**
514 * Provides an error-handling strategy. Captures any uncaught runtime errors encountered by the given strategy
515 * and applies the given side-effect to them.
516 *
517 * @param s The strategy to equip with an error-handling effect.
518 * @param e The effect that should handle errors.
519 * @return A strategy that captures any runtime errors with a side-effect.
520 */
521 public static <A> Strategy<A> errorStrategy(final Strategy<A> s, final Effect1<Error> e) {
522 return s.comap(new F<P1<A>, P1<A>>() {
523 public P1<A> f(final P1<A> a) {
524 return new P1<A>() {
525 public A _1() {
526 try {
527 return a._1();
528 } catch (Throwable t) {
529 final Error error = new Error(t);
530 e.f(error);
531 throw error;
532 }
533 }
534 };
535 }
536 });
537 }
538
539 /**
540 * Provides a normalising strategy that fully evaluates its Callable argument.
541 *
542 * @param s A non-normalising strategy to use for the evaluation.
543 * @return A new strategy that fully evaluates Callables, using the given strategy.
544 */
545 public static <A> Strategy<Callable<A>> callableStrategy(final Strategy<Callable<A>> s) {
546 return s.comap(new F<P1<Callable<A>>, P1<Callable<A>>>() {
547 public P1<Callable<A>> f(final P1<Callable<A>> a) {
548 return P1.curry(Callables.<A>normalise()).f(a._1());
549 }
550 });
551 }
552
553 }