changeset 196:ad49723367c2

add priority
author akahori
date Sat, 09 Mar 2019 14:03:06 +0900
parents a0be7c83fff8
children 4d8f90e8a92c
files src/main/java/christie/codegear/CodeGear.java src/main/java/christie/codegear/CodeGearExecutor.java src/main/java/christie/codegear/CodeGearManager.java src/main/java/christie/codegear/InputDataGear.java src/main/java/christie/codegear/PriorityThreadPoolExecutors.java src/main/java/christie/codegear/StartCodeGear.java
diffstat 6 files changed, 167 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/christie/codegear/CodeGear.java	Sat Mar 09 10:19:03 2019 +0900
+++ b/src/main/java/christie/codegear/CodeGear.java	Sat Mar 09 14:03:06 2019 +0900
@@ -26,17 +26,17 @@
 
     protected abstract void run(CodeGearManager cgm);
 
-    public void setup(CodeGearManager cgm){
+    public void setup(CodeGearManager cgm, int priority){
         this.cgm = cgm;
-        this.cge = new CodeGearExecutor(this, this.cgm);
+        this.cge = new CodeGearExecutor(this, this.cgm, priority);
         this.localDGM = cgm.getLocalDGM();
 
         for (Field field : this.getClass().getDeclaredFields()) {//AnnotationからInputDataGearをセット
             if (field.isAnnotationPresent(Take.class)) {
-                Take ano = field.getAnnotation(Take.class);
+                //Take ano = field.getAnnotation(Take.class);
                 setTakeCommand("local", field.getName(), new DataGear(field.getType()));
             } else if (field.isAnnotationPresent(Peek.class)) {
-                Peek ano = field.getAnnotation(Peek.class);
+                //Peek ano = field.getAnnotation(Peek.class);
                 setPeekCommand("local", field.getName(), new DataGear(field.getType()));
             } else if (field.isAnnotationPresent(TakeFrom.class)) {
                 TakeFrom ano = field.getAnnotation(TakeFrom.class);
@@ -105,4 +105,5 @@
     public CodeGearExecutor getCge() {
         return cge;
     }
+
 }
--- a/src/main/java/christie/codegear/CodeGearExecutor.java	Sat Mar 09 10:19:03 2019 +0900
+++ b/src/main/java/christie/codegear/CodeGearExecutor.java	Sat Mar 09 14:03:06 2019 +0900
@@ -3,14 +3,26 @@
 public class CodeGearExecutor implements Runnable {
     CodeGear cg;
     CodeGearManager cgm;
+    private int priority = Thread.NORM_PRIORITY;
 
-    public CodeGearExecutor(CodeGear cg, CodeGearManager cgm){
+    public CodeGearExecutor(CodeGear cg, CodeGearManager cgm, int priority){
         this.cg = cg;
         this.cgm = cgm;
+        this.priority = priority;
     }
 
     @Override
     public void run() {
         cg.run(cgm);
     }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public void setPriority(int priority) {
+        this.priority = priority;
+    }
+
+
 }
--- a/src/main/java/christie/codegear/CodeGearManager.java	Sat Mar 09 10:19:03 2019 +0900
+++ b/src/main/java/christie/codegear/CodeGearManager.java	Sat Mar 09 14:03:06 2019 +0900
@@ -31,6 +31,8 @@
         this.localPort = localPort;
         daemon = new ChristieDaemon(localPort, this);
         daemon.listen();
+
+
     }
 
     public LocalDataGearManager getLocalDGM(){
@@ -53,11 +55,15 @@
     }
 
     public void submit(CodeGear cg){
-        threadPoolExecutor.execute(cg.getCge());
+        threadPoolExecutor.execute(PriorityThreadPoolExecutors.PriorityRunnable.of(cg.getCge(), cg.getCge().getPriority()));
     }
 
     public void setup(CodeGear cg){
-        cg.setup(this);
+        setup(cg, Thread.NORM_PRIORITY);
+    }
+
+    public void setup(CodeGear cg, int priority){
+        cg.setup(this, priority);
     }
 
     public ConcurrentHashMap<Integer, CodeGearManager> getCgmList() {
--- a/src/main/java/christie/codegear/InputDataGear.java	Sat Mar 09 10:19:03 2019 +0900
+++ b/src/main/java/christie/codegear/InputDataGear.java	Sat Mar 09 14:03:06 2019 +0900
@@ -4,6 +4,7 @@
 import christie.annotation.PeekFrom;
 import christie.annotation.Take;
 import christie.annotation.TakeFrom;
+import christie.datagear.DataGears;
 import christie.datagear.command.Command;
 import christie.datagear.dg.DataGear;
 
@@ -17,6 +18,7 @@
  * inputDataGearの待ち合わせの管理
  */
 public class InputDataGear {
+    public DataGears dataGears;
     public ConcurrentHashMap<String, DataGear> inputValue = new ConcurrentHashMap<String, DataGear>();//受け皿
     public CodeGearManager cgm;
     public CodeGear cg;
@@ -41,10 +43,10 @@
 
     public void setInputs(String key, DataGear dg){
         inputValue.put(key, dg);
-        count();
+        decriment();
     }
 
-    public synchronized void count(){//Commandが実行されるたびにデクリメント
+    public synchronized void decriment(){//Commandが実行されるたびにデクリメント
         if (count.decrementAndGet() == 0){
             setInputValue();
             submitCG();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/codegear/PriorityThreadPoolExecutors.java	Sat Mar 09 14:03:06 2019 +0900
@@ -0,0 +1,133 @@
+package christie.codegear;
+
+import java.util.Comparator;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+// https://stackoverflow.com/questions/807223/how-do-i-implement-task-prioritization-using-an-executorservice-in-java-5/42831172#42831172
+public class PriorityThreadPoolExecutors {
+
+    public static ThreadPoolExecutor createThreadPool(int nThreads, int keepAliveTime) {
+        return new PriorityThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.MILLISECONDS);
+    }
+    private static class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
+        private static final int DEFAULT_PRIORITY = 0;
+        private static AtomicLong instanceCounter = new AtomicLong();
+
+        public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
+                                int keepAliveTime, TimeUnit unit) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
+                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            // If this is ugly then delegator pattern needed
+            if (command instanceof ComparableTask) //Already wrapped
+                super.execute(command);
+            else {
+                super.execute(newComparableRunnableFor(command));
+            }
+        }
+
+        private Runnable newComparableRunnableFor(Runnable runnable) {
+            return new ComparableRunnable(ensurePriorityRunnable(runnable));
+        }
+
+        @Override
+        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
+        }
+
+        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
+            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
+                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
+        }
+
+        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
+            private Long sequentialOrder = instanceCounter.getAndIncrement();
+            private HasPriority hasPriority;
+
+            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
+                super(priorityRunnable, result);
+                this.hasPriority = priorityRunnable;
+            }
+
+            @Override
+            public long getInstanceCount() {
+                return sequentialOrder;
+            }
+
+            @Override
+            public int getPriority() {
+                return hasPriority.getPriority();
+            }
+        }
+
+        private static class ComparableRunnable implements Runnable, ComparableTask {
+            private Long instanceCount = instanceCounter.getAndIncrement();
+            private HasPriority hasPriority;
+            private Runnable runnable;
+
+            public ComparableRunnable(PriorityRunnable priorityRunnable) {
+                this.runnable = priorityRunnable;
+                this.hasPriority = priorityRunnable;
+            }
+
+            @Override
+            public void run() {
+                runnable.run();
+            }
+
+            @Override
+            public int getPriority() {
+                return hasPriority.getPriority();
+            }
+
+            @Override
+            public long getInstanceCount() {
+                return instanceCount;
+            }
+        }
+
+        private interface ComparableTask extends Runnable {
+            int getPriority();
+
+            long getInstanceCount();
+
+            static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
+                return (o1, o2) -> {
+                    int priorityResult = o2.getPriority() - o1.getPriority();
+                    return priorityResult != 0 ? priorityResult
+                            : (int) (o1.getInstanceCount() - o2.getInstanceCount());
+                };
+            }
+
+        }
+
+    }
+
+    public interface HasPriority{
+        int getPriority();
+    }
+
+
+    public interface PriorityRunnable extends Runnable, HasPriority{
+
+        public static PriorityRunnable of(Runnable runnable, int priority) {
+            return new PriorityRunnable() {
+                @Override
+                public void run() {
+                    runnable.run();
+                }
+
+                @Override
+                public int getPriority() {
+                    return priority;
+                }
+            };
+        }
+    }
+
+}
\ No newline at end of file
--- a/src/main/java/christie/codegear/StartCodeGear.java	Sat Mar 09 10:19:03 2019 +0900
+++ b/src/main/java/christie/codegear/StartCodeGear.java	Sat Mar 09 14:03:06 2019 +0900
@@ -7,12 +7,13 @@
 
 public abstract class StartCodeGear extends CodeGear{
     static ConcurrentHashMap<Integer, CodeGearManager> cgmList = new ConcurrentHashMap<>();
-    static LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
-    static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads
+    //static LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
+    /*static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads
             Runtime.getRuntime().availableProcessors(),
             Integer.MAX_VALUE, // keepAliveTime
             TimeUnit.SECONDS,
-            taskQueue);
+            taskQueue);*/
+    static ThreadPoolExecutor threadPoolExecutor = PriorityThreadPoolExecutors.createThreadPool(Runtime.getRuntime().availableProcessors(), Integer.MAX_VALUE);
     static int cgmCount = 1;
 
     public StartCodeGear(CodeGearManager cgm){