Mercurial > hg > Members > tatsuki > functionaljava-master > core
comparison src/main/java/fj/control/parallel/Strategy.java @ 0:fe80c1edf1be
add getLoop
author | tatsuki |
---|---|
date | Fri, 20 Mar 2015 21:04:03 +0900 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:fe80c1edf1be |
---|---|
1 package fj.control.parallel; | |
2 | |
3 import fj.Effect; | |
4 import fj.F; | |
5 import fj.F2; | |
6 import fj.Function; | |
7 import fj.P; | |
8 import fj.P1; | |
9 import static fj.Function.compose; | |
10 import static fj.Function.curry; | |
11 import fj.data.Java; | |
12 import fj.data.List; | |
13 import fj.data.Array; | |
14 import fj.function.Effect1; | |
15 | |
16 import java.util.concurrent.Callable; | |
17 import java.util.concurrent.CompletionService; | |
18 import java.util.concurrent.ExecutionException; | |
19 import java.util.concurrent.ExecutorService; | |
20 import java.util.concurrent.Future; | |
21 import java.util.concurrent.FutureTask; | |
22 | |
23 /** | |
24 * Functional-style parallel evaluation strategies. | |
25 * A Strategy is a method of evaluating a product-1, yielding another product-1 from which the result of its evaluation | |
26 * can be retrieved at a later time. | |
27 * <p/> | |
28 * | |
29 * @version %build.number% | |
30 */ | |
31 public final class Strategy<A> { | |
32 | |
33 private final F<P1<A>, P1<A>> f; | |
34 | |
35 private Strategy(final F<P1<A>, P1<A>> f) { | |
36 this.f = f; | |
37 } | |
38 | |
39 /** | |
40 * Returns the functional representation of this Strategy, a function that evaluates a product-1. | |
41 * | |
42 * @return The function representing this strategy, which evaluates a product-1. | |
43 */ | |
44 public F<P1<A>, P1<A>> f() { | |
45 return f; | |
46 } | |
47 | |
48 /** | |
49 * Constructs a strategy from the given evaluation function. | |
50 * | |
51 * @param f The execution function for the strategy | |
52 * @return A strategy that uses the given function to evaluate product-1s. | |
53 */ | |
54 public static <A> Strategy<A> strategy(final F<P1<A>, P1<A>> f) { | |
55 return new Strategy<A>(f); | |
56 } | |
57 | |
58 /** | |
59 * Apply the strategy to the given product-1. | |
60 * | |
61 * @param a A P1 to evaluate according to this strategy. | |
62 * @return A P1 that yields the value from calling the given product-1. | |
63 */ | |
64 public P1<A> par(final P1<A> a) { | |
65 return f().f(a); | |
66 } | |
67 | |
68 /** | |
69 * Promotes a function to a concurrent function. | |
70 * | |
71 * @param f A function to promote to a concurrent function. | |
72 * @return A function that executes concurrently when called, yielding a Future value. | |
73 */ | |
74 public <B> F<B, P1<A>> concurry(final F<B, A> f) { | |
75 return compose(f(), P1.<B, A>curry(f)); | |
76 } | |
77 | |
78 /** | |
79 * Promotes a function of arity-2 to a concurrent function. | |
80 * | |
81 * @param f The function to promote to a concurrent function. | |
82 * @return A function that executes concurrently when called, yielding a product-1 that returns the value. | |
83 */ | |
84 public <B, C> F<B, F<C, P1<A>>> concurry(final F2<B, C, A> f) { | |
85 return new F<B, F<C, P1<A>>>() { | |
86 public F<C, P1<A>> f(final B b) { | |
87 return concurry(curry(f).f(b)); | |
88 } | |
89 }; | |
90 } | |
91 | |
92 /** | |
93 * Waits for every Future in a list to obtain a value, and collects those values in a list. | |
94 * | |
95 * @param xs The list of Futures from which to get values. | |
96 * @return A list of values extracted from the Futures in the argument list. | |
97 */ | |
98 public static <A> List<P1<A>> mergeAll(final List<Future<A>> xs) { | |
99 return xs.map(Strategy.<A>obtain()); | |
100 } | |
101 | |
102 /** | |
103 * Evaluates a list of product-1s in parallel. | |
104 * | |
105 * @param ps A list to evaluate in parallel. | |
106 * @return A list of the values of the product-1s in the argument. | |
107 */ | |
108 public P1<List<A>> parList(final List<P1<A>> ps) { | |
109 return P1.sequence(ps.map(f())); | |
110 } | |
111 | |
112 /** | |
113 * Maps the given function over the given list in parallel using this strategy. | |
114 * | |
115 * @param f A function to map over the given list in parallel. | |
116 * @param bs A list over which to map the given function in parallel. | |
117 * @return A product-1 that returns the list with all of its elements transformed by the given function. | |
118 */ | |
119 public <B> P1<List<A>> parMap(final F<B, A> f, final List<B> bs) { | |
120 return P1.sequence(bs.map(concurry(f))); | |
121 } | |
122 | |
123 /** | |
124 * Maps the given function over the given array in parallel using this strategy. | |
125 * | |
126 * @param f A function to map over the given array in parallel. | |
127 * @param bs An array over which to map the given function in parallel. | |
128 * @return A product-1 that returns the array with all of its elements transformed by the given function. | |
129 */ | |
130 public <B> P1<Array<A>> parMap(final F<B, A> f, final Array<B> bs) { | |
131 return P1.sequence(bs.map(concurry(f))); | |
132 } | |
133 | |
134 /** | |
135 * A strict version of parMap over lists. | |
136 * Maps the given function over the given list in parallel using this strategy, | |
137 * blocking the current thread until all values have been obtained. | |
138 * | |
139 * @param f A function to map over the given list in parallel. | |
140 * @param bs A list over which to map the given function in parallel. | |
141 * @return A list with all of its elements transformed by the given function. | |
142 */ | |
143 public <B> List<A> parMap1(final F<B, A> f, final List<B> bs) { | |
144 return compose(P1.<List<A>>__1(), parMapList(f)).f(bs); | |
145 } | |
146 | |
147 /** | |
148 * A strict version of parMap over arrays. | |
149 * Maps the given function over the given arrays in parallel using this strategy, | |
150 * blocking the current thread until all values have been obtained. | |
151 * | |
152 * @param f A function to map over the given array in parallel. | |
153 * @param bs An array over which to map the given function in parallel. | |
154 * @return An array with all of its elements transformed by the given function. | |
155 */ | |
156 public <B> Array<A> parMap1(final F<B, A> f, final Array<B> bs) { | |
157 return compose(P1.<Array<A>>__1(), parMapArray(f)).f(bs); | |
158 } | |
159 | |
160 /** | |
161 * Promotes a function to a parallel function on lists using this strategy. | |
162 * | |
163 * @param f A function to transform into a parallel function on lists. | |
164 * @return The function transformed into a parallel function on lists. | |
165 */ | |
166 public <B> F<List<B>, P1<List<A>>> parMapList(final F<B, A> f) { | |
167 return new F<List<B>, P1<List<A>>>() { | |
168 public P1<List<A>> f(final List<B> as) { | |
169 return parMap(f, as); | |
170 } | |
171 }; | |
172 } | |
173 | |
174 /** | |
175 * First-class version of parMap on lists. | |
176 * | |
177 * @return A function that promotes another function to a parallel function on lists. | |
178 */ | |
179 public <B> F<F<B, A>, F<List<B>, P1<List<A>>>> parMapList() { | |
180 return new F<F<B, A>, F<List<B>, P1<List<A>>>>() { | |
181 public F<List<B>, P1<List<A>>> f(final F<B, A> f) { | |
182 return parMapList(f); | |
183 } | |
184 }; | |
185 } | |
186 | |
187 /** | |
188 * First-class version of parMap1 on lists (parallel list functor). | |
189 * | |
190 * @return A function that promotes another function to a blocking parallel function on lists. | |
191 */ | |
192 public <B> F<F<B, A>, F<List<B>, List<A>>> parMapList1() { | |
193 return new F<F<B, A>, F<List<B>, List<A>>>() { | |
194 public F<List<B>, List<A>> f(final F<B, A> f) { | |
195 return new F<List<B>, List<A>>() { | |
196 public List<A> f(final List<B> bs) { | |
197 return parMap1(f, bs); | |
198 } | |
199 }; | |
200 } | |
201 }; | |
202 } | |
203 | |
204 /** | |
205 * Promotes a function to a parallel function on arrays using this strategy. | |
206 * | |
207 * @param f A function to transform into a parallel function on arrays. | |
208 * @return The function transformed into a parallel function on arrays. | |
209 */ | |
210 public <B> F<Array<B>, P1<Array<A>>> parMapArray(final F<B, A> f) { | |
211 return new F<Array<B>, P1<Array<A>>>() { | |
212 public P1<Array<A>> f(final Array<B> as) { | |
213 return parMap(f, as); | |
214 } | |
215 }; | |
216 } | |
217 | |
218 /** | |
219 * First-class version of parMap on arrays. | |
220 * | |
221 * @return A function that promotes another function to a parallel function on arrays. | |
222 */ | |
223 public <B> F<F<B, A>, F<Array<B>, P1<Array<A>>>> parMapArray() { | |
224 return new F<F<B, A>, F<Array<B>, P1<Array<A>>>>() { | |
225 public F<Array<B>, P1<Array<A>>> f(final F<B, A> f) { | |
226 return parMapArray(f); | |
227 } | |
228 }; | |
229 } | |
230 | |
231 /** | |
232 * First-class version of parMap1 on arrays (parallel array functor). | |
233 * | |
234 * @return A function that promotes another function to a blocking parallel function on arrays. | |
235 */ | |
236 public <B> F<F<B, A>, F<Array<B>, Array<A>>> parMapArray1() { | |
237 return new F<F<B, A>, F<Array<B>, Array<A>>>() { | |
238 public F<Array<B>, Array<A>> f(final F<B, A> f) { | |
239 return new F<Array<B>, Array<A>>() { | |
240 public Array<A> f(final Array<B> bs) { | |
241 return parMap1(f, bs); | |
242 } | |
243 }; | |
244 } | |
245 }; | |
246 } | |
247 | |
248 /** | |
249 * Binds the given function in parallel across the given list, using the given strategy, with a final join. | |
250 * | |
251 * @param s The strategy to use for parallelization. | |
252 * @param f The function to bind across the given list. | |
253 * @param as The list across which to bind the given function. | |
254 * @return A P1 containing the result of the parallel map operation after the final join. | |
255 */ | |
256 public static <A, B> P1<List<B>> parFlatMap(final Strategy<List<B>> s, | |
257 final F<A, List<B>> f, | |
258 final List<A> as) { | |
259 return P1.fmap(List.<B>join()).f(s.parMap(f, as)); | |
260 } | |
261 | |
262 /** | |
263 * Binds the given function in parallel across the given array, using the given strategy, with a final join. | |
264 * | |
265 * @param s The strategy to use for parallelization. | |
266 * @param f The function to bind across the given array. | |
267 * @param as The array across which to bind the given function. | |
268 * @return A P1 containing the result of the parallel map operation after the final join. | |
269 */ | |
270 public static <A, B> P1<Array<B>> parFlatMap(final Strategy<Array<B>> s, | |
271 final F<A, Array<B>> f, | |
272 final Array<A> as) { | |
273 return P1.fmap(Array.<B>join()).f(s.parMap(f, as)); | |
274 } | |
275 | |
276 /** | |
277 * Sequentially evaluates chunks (sub-sequences) of a list in parallel. Splits the list into chunks, | |
278 * evaluating the chunks simultaneously, but each chunk as a sequence. | |
279 * | |
280 * @param s The strategy to use for parallelization. | |
281 * @param chunkLength The length of each sequence. | |
282 * @param as The list to evaluate in parallel chunks. | |
283 * @return A product-1 containing the list of results extracted from the given list of product-1s. | |
284 */ | |
285 public static <A> P1<List<A>> parListChunk(final Strategy<List<A>> s, | |
286 final int chunkLength, | |
287 final List<P1<A>> as) { | |
288 return P1.fmap(List.<A>join()).f(s.parList(as.partition(chunkLength).map(P1.<A>sequenceList()))); | |
289 } | |
290 | |
291 /** | |
292 * Zips together two lists in parallel using a given function, with this strategy. | |
293 * Calls the given function once for each corresponding pair in the lists, position-wise, | |
294 * passing elements from the first list to the first argument of the function, and elements from the second list | |
295 * to the second argument of the function, yielding a list of the results. | |
296 * If the lists are not of the same length, the remaining elements of the longer list are ignored. | |
297 * | |
298 * @param f The function of arity-2 with which to zip. | |
299 * @param bs A list to zip with the given function. | |
300 * @param cs A list to zip with the given function. | |
301 * @return The list of the results of calling the given function on corresponding elements of the given lists. | |
302 */ | |
303 public <B, C> P1<List<A>> parZipWith(final F2<B, C, A> f, final List<B> bs, final List<C> cs) { | |
304 return P1.sequence(bs.zipWith(cs, concurry(f))); | |
305 } | |
306 | |
307 /** | |
308 * Zips together two arrays in parallel using a given function, with this strategy. | |
309 * Calls the given function once for each corresponding pair in the arrays, position-wise, | |
310 * passing elements from the first array to the first argument of the function, and elements from the second array | |
311 * to the second argument of the function, yielding a array of the results. | |
312 * If the arrays are not of the same length, the remaining elements of the longer array are ignored. | |
313 * | |
314 * @param f The function of arity-2 with which to zip. | |
315 * @param bs A array to zip with the given function. | |
316 * @param cs A array to zip with the given function. | |
317 * @return The array of the results of calling the given function on corresponding elements of the given arrays. | |
318 */ | |
319 public <B, C> P1<Array<A>> parZipWith(final F2<B, C, A> f, final Array<B> bs, final Array<C> cs) { | |
320 return P1.sequence(bs.zipWith(cs, concurry(f))); | |
321 } | |
322 | |
323 /** | |
324 * Lifts a given function of arity-2 so that it zips together two lists in parallel, | |
325 * using this strategy, calling the function once for each corresponding pair in the lists, position-wise. | |
326 * | |
327 * @param f The function of arity-2 with which to zip. | |
328 * @return A transformation that zips two lists using the argument function, in parallel. | |
329 */ | |
330 public <B, C> F2<List<B>, List<C>, P1<List<A>>> parZipListWith(final F2<B, C, A> f) { | |
331 return new F2<List<B>, List<C>, P1<List<A>>>() { | |
332 public P1<List<A>> f(final List<B> bs, final List<C> cs) { | |
333 return parZipWith(f, bs, cs); | |
334 } | |
335 }; | |
336 } | |
337 | |
338 /** | |
339 * Lifts a given function of arity-2 so that it zips together two arrays in parallel, | |
340 * using this strategy, calling the function once for each corresponding pair in the arrays, position-wise. | |
341 * | |
342 * @param f The function of arity-2 with which to zip. | |
343 * @return A transformation that zips two arrays using the argument function, in parallel. | |
344 */ | |
345 public <B, C> F2<Array<B>, Array<C>, P1<Array<A>>> parZipArrayWith(final F2<B, C, A> f) { | |
346 return new F2<Array<B>, Array<C>, P1<Array<A>>>() { | |
347 public P1<Array<A>> f(final Array<B> bs, final Array<C> cs) { | |
348 return parZipWith(f, bs, cs); | |
349 } | |
350 }; | |
351 } | |
352 | |
353 /** | |
354 * Returns a function which returns a product-1 which waits for the given Future to obtain a value. | |
355 * | |
356 * @return A function which, given a Future, yields a product-1 that waits for it. | |
357 */ | |
358 public static <A> F<Future<A>, P1<A>> obtain() { | |
359 return new F<Future<A>, P1<A>>() { | |
360 public P1<A> f(final Future<A> t) { | |
361 return obtain(t); | |
362 } | |
363 }; | |
364 } | |
365 | |
366 /** | |
367 * Provides a product-1 that waits for the given future to obtain a value. | |
368 * | |
369 * @param t A Future for which to wait. | |
370 * @return A product-1 that waits for the given future to obtain a value. | |
371 */ | |
372 public static <A> P1<A> obtain(final Future<A> t) { | |
373 return new P1<A>() { | |
374 public A _1() { | |
375 try { | |
376 return t.get(); | |
377 } catch (InterruptedException e) { | |
378 Thread.currentThread().interrupt(); | |
379 throw new Error(e); | |
380 } catch (ExecutionException e) { | |
381 throw new Error(e); | |
382 } | |
383 } | |
384 }; | |
385 } | |
386 | |
387 /** | |
388 * Returns an Effect that waits for a given Future to obtain a value, discarding the value. | |
389 * | |
390 * @return An effect, which, given a Future, waits for it to obtain a value, discarding the value. | |
391 */ | |
392 public static <A> Effect1<Future<A>> discard() { | |
393 return new Effect1<Future<A>>() { | |
394 public void f(final Future<A> a) { | |
395 Strategy.<A>obtain().f(a)._1(); | |
396 } | |
397 }; | |
398 } | |
399 | |
400 /** | |
401 * Provides a simple parallelization strategy that creates, and discards, a new thread for | |
402 * every evaluation. | |
403 * | |
404 * @return a simple parallelization strategy that creates, and discards, a new thread for | |
405 * every evaluation. | |
406 */ | |
407 public static <A> Strategy<A> simpleThreadStrategy() { | |
408 return strategy(new F<P1<A>, P1<A>>() { | |
409 public P1<A> f(final P1<A> p) { | |
410 final FutureTask<A> t = new FutureTask<A>(Java.<A>P1_Callable().f(p)); | |
411 new Thread(t).start(); | |
412 return obtain(t); | |
413 } | |
414 }); | |
415 } | |
416 | |
417 /** | |
418 * Provides a parallelization strategy that uses an ExecutorService to control the method and | |
419 * degree of parallelism. | |
420 * | |
421 * @param s The ExecutorService to use for scheduling evaluations. | |
422 * @return A Strategy that uses the provided ExecutorService to control the method and degree | |
423 * of parallelism. | |
424 */ | |
425 public static <A> Strategy<A> executorStrategy(final ExecutorService s) { | |
426 return strategy(new F<P1<A>, P1<A>>() { | |
427 public P1<A> f(final P1<A> p) { | |
428 return obtain(s.submit(Java.<A>P1_Callable().f(p))); | |
429 } | |
430 }); | |
431 } | |
432 | |
433 /** | |
434 * Provides a parallelization strategy that uses a CompletionService to control the method and | |
435 * degree of parallelism, and where each parallel task's completion is registered with the service. | |
436 * | |
437 * @param s The CompletionService to use for scheduling evaluations and detect their completion. | |
438 * @return A Strategy that uses the provided CompletionService to control the method and degree of parallelism, | |
439 * and notifies the service of task completion. | |
440 */ | |
441 public static <A> Strategy<A> completionStrategy(final CompletionService<A> s) { | |
442 return strategy(new F<P1<A>, P1<A>>() { | |
443 public P1<A> f(final P1<A> p) { | |
444 return obtain(s.submit(Java.<A>P1_Callable().f(p))); | |
445 } | |
446 }); | |
447 } | |
448 | |
449 /** | |
450 * Provides a strategy that performs sequential (non-concurrent) evaluation of its argument. | |
451 * | |
452 * @return A strategy that performs sequential (non-concurrent) evaluation of its argument. | |
453 */ | |
454 public static <A> Strategy<A> seqStrategy() { | |
455 return strategy(new F<P1<A>, P1<A>>() { | |
456 public P1<A> f(final P1<A> a) { | |
457 return P.p(a._1()); | |
458 } | |
459 }); | |
460 } | |
461 | |
462 /** | |
463 * Provides a strategy that performs no evaluation of its argument. | |
464 * | |
465 * @return A strategy that performs no evaluation of its argument. | |
466 */ | |
467 public static <A> Strategy<A> idStrategy() { | |
468 return strategy(Function.<P1<A>>identity()); | |
469 } | |
470 | |
471 /** | |
472 * Maps the given bijective transformation across this strategy (Exponential Functor pattern). | |
473 * | |
474 * @param f A transformation from this strategy's codomain to the resulting strategy's codomain. | |
475 * @param g A transformation from the resulting strategy's domain to this strategy's domain. | |
476 * @return A new strategy that maps to this strategy and back again. | |
477 */ | |
478 public <B> Strategy<B> xmap(final F<P1<A>, P1<B>> f, final F<P1<B>, P1<A>> g) { | |
479 return strategy(compose(f, compose(f(), g))); | |
480 } | |
481 | |
482 /** | |
483 * Maps the given transformation across this strategy's domain (Invariant Functor pattern). | |
484 * | |
485 * @param f A transformation from this strategy's codomain to the resulting strategy's codomain. | |
486 * @return A new strategy that applies the given transformation after each application of this strategy. | |
487 */ | |
488 public Strategy<A> map(final F<P1<A>, P1<A>> f) { | |
489 return xmap(f, Function.<P1<A>>identity()); | |
490 } | |
491 | |
492 /** | |
493 * Maps the given transformation across this strategy's codomain (Invariant Functor pattern). | |
494 * | |
495 * @param f A transformation from the resulting strategy's domain to this strategy's domain. | |
496 * @return A new strategy that applies the given transformation before each application of this strategy. | |
497 */ | |
498 public Strategy<A> comap(final F<P1<A>, P1<A>> f) { | |
499 return xmap(Function.<P1<A>>identity(), f); | |
500 } | |
501 | |
502 /** | |
503 * Provides an error-handling strategy. Captures any uncaught runtime errors encountered by this strategy and applies | |
504 * the given side-effect to them. | |
505 * | |
506 * @param e The effect that should handle errors. | |
507 * @return A strategy that captures any runtime errors with a side-effect. | |
508 */ | |
509 public Strategy<A> errorStrategy(final Effect1<Error> e) { | |
510 return errorStrategy(this, e); | |
511 } | |
512 | |
513 /** | |
514 * Provides an error-handling strategy. Captures any uncaught runtime errors encountered by the given strategy | |
515 * and applies the given side-effect to them. | |
516 * | |
517 * @param s The strategy to equip with an error-handling effect. | |
518 * @param e The effect that should handle errors. | |
519 * @return A strategy that captures any runtime errors with a side-effect. | |
520 */ | |
521 public static <A> Strategy<A> errorStrategy(final Strategy<A> s, final Effect1<Error> e) { | |
522 return s.comap(new F<P1<A>, P1<A>>() { | |
523 public P1<A> f(final P1<A> a) { | |
524 return new P1<A>() { | |
525 public A _1() { | |
526 try { | |
527 return a._1(); | |
528 } catch (Throwable t) { | |
529 final Error error = new Error(t); | |
530 e.f(error); | |
531 throw error; | |
532 } | |
533 } | |
534 }; | |
535 } | |
536 }); | |
537 } | |
538 | |
539 /** | |
540 * Provides a normalising strategy that fully evaluates its Callable argument. | |
541 * | |
542 * @param s A non-normalising strategy to use for the evaluation. | |
543 * @return A new strategy that fully evaluates Callables, using the given strategy. | |
544 */ | |
545 public static <A> Strategy<Callable<A>> callableStrategy(final Strategy<Callable<A>> s) { | |
546 return s.comap(new F<P1<Callable<A>>, P1<Callable<A>>>() { | |
547 public P1<Callable<A>> f(final P1<Callable<A>> a) { | |
548 return P1.curry(Callables.<A>normalise()).f(a._1()); | |
549 } | |
550 }); | |
551 } | |
552 | |
553 } |