diff src/main/java/christie/codegear/PriorityThreadPoolExecutors.java @ 196:ad49723367c2

add priority
author akahori
date Sat, 09 Mar 2019 14:03:06 +0900
parents
children ef5aad739292
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/codegear/PriorityThreadPoolExecutors.java	Sat Mar 09 14:03:06 2019 +0900
@@ -0,0 +1,133 @@
+package christie.codegear;
+
+import java.util.Comparator;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+// https://stackoverflow.com/questions/807223/how-do-i-implement-task-prioritization-using-an-executorservice-in-java-5/42831172#42831172
+public class PriorityThreadPoolExecutors {
+
+    public static ThreadPoolExecutor createThreadPool(int nThreads, int keepAliveTime) {
+        return new PriorityThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.MILLISECONDS);
+    }
+    private static class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
+        private static final int DEFAULT_PRIORITY = 0;
+        private static AtomicLong instanceCounter = new AtomicLong();
+
+        public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
+                                int keepAliveTime, TimeUnit unit) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
+                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            // If this is ugly then delegator pattern needed
+            if (command instanceof ComparableTask) //Already wrapped
+                super.execute(command);
+            else {
+                super.execute(newComparableRunnableFor(command));
+            }
+        }
+
+        private Runnable newComparableRunnableFor(Runnable runnable) {
+            return new ComparableRunnable(ensurePriorityRunnable(runnable));
+        }
+
+        @Override
+        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
+        }
+
+        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
+            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
+                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
+        }
+
+        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
+            private Long sequentialOrder = instanceCounter.getAndIncrement();
+            private HasPriority hasPriority;
+
+            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
+                super(priorityRunnable, result);
+                this.hasPriority = priorityRunnable;
+            }
+
+            @Override
+            public long getInstanceCount() {
+                return sequentialOrder;
+            }
+
+            @Override
+            public int getPriority() {
+                return hasPriority.getPriority();
+            }
+        }
+
+        private static class ComparableRunnable implements Runnable, ComparableTask {
+            private Long instanceCount = instanceCounter.getAndIncrement();
+            private HasPriority hasPriority;
+            private Runnable runnable;
+
+            public ComparableRunnable(PriorityRunnable priorityRunnable) {
+                this.runnable = priorityRunnable;
+                this.hasPriority = priorityRunnable;
+            }
+
+            @Override
+            public void run() {
+                runnable.run();
+            }
+
+            @Override
+            public int getPriority() {
+                return hasPriority.getPriority();
+            }
+
+            @Override
+            public long getInstanceCount() {
+                return instanceCount;
+            }
+        }
+
+        private interface ComparableTask extends Runnable {
+            int getPriority();
+
+            long getInstanceCount();
+
+            static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
+                return (o1, o2) -> {
+                    int priorityResult = o2.getPriority() - o1.getPriority();
+                    return priorityResult != 0 ? priorityResult
+                            : (int) (o1.getInstanceCount() - o2.getInstanceCount());
+                };
+            }
+
+        }
+
+    }
+
+    public interface HasPriority{
+        int getPriority();
+    }
+
+
+    public interface PriorityRunnable extends Runnable, HasPriority{
+
+        public static PriorityRunnable of(Runnable runnable, int priority) {
+            return new PriorityRunnable() {
+                @Override
+                public void run() {
+                    runnable.run();
+                }
+
+                @Override
+                public int getPriority() {
+                    return priority;
+                }
+            };
+        }
+    }
+
+}
\ No newline at end of file