# HG changeset patch # User akahori # Date 1552107786 -32400 # Node ID ad49723367c2d20858192af6cb3a38affa81d1c0 # Parent a0be7c83fff8c44903f0aa4f561fcf485972a222 add priority diff -r a0be7c83fff8 -r ad49723367c2 src/main/java/christie/codegear/CodeGear.java --- a/src/main/java/christie/codegear/CodeGear.java Sat Mar 09 10:19:03 2019 +0900 +++ b/src/main/java/christie/codegear/CodeGear.java Sat Mar 09 14:03:06 2019 +0900 @@ -26,17 +26,17 @@ protected abstract void run(CodeGearManager cgm); - public void setup(CodeGearManager cgm){ + public void setup(CodeGearManager cgm, int priority){ this.cgm = cgm; - this.cge = new CodeGearExecutor(this, this.cgm); + this.cge = new CodeGearExecutor(this, this.cgm, priority); this.localDGM = cgm.getLocalDGM(); for (Field field : this.getClass().getDeclaredFields()) {//AnnotationからInputDataGearをセット if (field.isAnnotationPresent(Take.class)) { - Take ano = field.getAnnotation(Take.class); + //Take ano = field.getAnnotation(Take.class); setTakeCommand("local", field.getName(), new DataGear(field.getType())); } else if (field.isAnnotationPresent(Peek.class)) { - Peek ano = field.getAnnotation(Peek.class); + //Peek ano = field.getAnnotation(Peek.class); setPeekCommand("local", field.getName(), new DataGear(field.getType())); } else if (field.isAnnotationPresent(TakeFrom.class)) { TakeFrom ano = field.getAnnotation(TakeFrom.class); @@ -105,4 +105,5 @@ public CodeGearExecutor getCge() { return cge; } + } diff -r a0be7c83fff8 -r ad49723367c2 src/main/java/christie/codegear/CodeGearExecutor.java --- a/src/main/java/christie/codegear/CodeGearExecutor.java Sat Mar 09 10:19:03 2019 +0900 +++ b/src/main/java/christie/codegear/CodeGearExecutor.java Sat Mar 09 14:03:06 2019 +0900 @@ -3,14 +3,26 @@ public class CodeGearExecutor implements Runnable { CodeGear cg; CodeGearManager cgm; + private int priority = Thread.NORM_PRIORITY; - public CodeGearExecutor(CodeGear cg, CodeGearManager cgm){ + public CodeGearExecutor(CodeGear cg, CodeGearManager cgm, int priority){ this.cg = cg; this.cgm = cgm; + this.priority = priority; } @Override public void run() { cg.run(cgm); } + + public int getPriority() { + return priority; + } + + public void setPriority(int priority) { + this.priority = priority; + } + + } diff -r a0be7c83fff8 -r ad49723367c2 src/main/java/christie/codegear/CodeGearManager.java --- a/src/main/java/christie/codegear/CodeGearManager.java Sat Mar 09 10:19:03 2019 +0900 +++ b/src/main/java/christie/codegear/CodeGearManager.java Sat Mar 09 14:03:06 2019 +0900 @@ -31,6 +31,8 @@ this.localPort = localPort; daemon = new ChristieDaemon(localPort, this); daemon.listen(); + + } public LocalDataGearManager getLocalDGM(){ @@ -53,11 +55,15 @@ } public void submit(CodeGear cg){ - threadPoolExecutor.execute(cg.getCge()); + threadPoolExecutor.execute(PriorityThreadPoolExecutors.PriorityRunnable.of(cg.getCge(), cg.getCge().getPriority())); } public void setup(CodeGear cg){ - cg.setup(this); + setup(cg, Thread.NORM_PRIORITY); + } + + public void setup(CodeGear cg, int priority){ + cg.setup(this, priority); } public ConcurrentHashMap getCgmList() { diff -r a0be7c83fff8 -r ad49723367c2 src/main/java/christie/codegear/InputDataGear.java --- a/src/main/java/christie/codegear/InputDataGear.java Sat Mar 09 10:19:03 2019 +0900 +++ b/src/main/java/christie/codegear/InputDataGear.java Sat Mar 09 14:03:06 2019 +0900 @@ -4,6 +4,7 @@ import christie.annotation.PeekFrom; import christie.annotation.Take; import christie.annotation.TakeFrom; +import christie.datagear.DataGears; import christie.datagear.command.Command; import christie.datagear.dg.DataGear; @@ -17,6 +18,7 @@ * inputDataGearの待ち合わせの管理 */ public class InputDataGear { + public DataGears dataGears; public ConcurrentHashMap inputValue = new ConcurrentHashMap();//受け皿 public CodeGearManager cgm; public CodeGear cg; @@ -41,10 +43,10 @@ public void setInputs(String key, DataGear dg){ inputValue.put(key, dg); - count(); + decriment(); } - public synchronized void count(){//Commandが実行されるたびにデクリメント + public synchronized void decriment(){//Commandが実行されるたびにデクリメント if (count.decrementAndGet() == 0){ setInputValue(); submitCG(); diff -r a0be7c83fff8 -r ad49723367c2 src/main/java/christie/codegear/PriorityThreadPoolExecutors.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/codegear/PriorityThreadPoolExecutors.java Sat Mar 09 14:03:06 2019 +0900 @@ -0,0 +1,133 @@ +package christie.codegear; + +import java.util.Comparator; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + + +// https://stackoverflow.com/questions/807223/how-do-i-implement-task-prioritization-using-an-executorservice-in-java-5/42831172#42831172 +public class PriorityThreadPoolExecutors { + + public static ThreadPoolExecutor createThreadPool(int nThreads, int keepAliveTime) { + return new PriorityThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.MILLISECONDS); + } + private static class PriorityThreadPoolExecutor extends ThreadPoolExecutor { + private static final int DEFAULT_PRIORITY = 0; + private static AtomicLong instanceCounter = new AtomicLong(); + + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, + int keepAliveTime, TimeUnit unit) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue(10, + ComparableTask.comparatorByPriorityAndSequentialOrder())); + } + + @Override + public void execute(Runnable command) { + // If this is ugly then delegator pattern needed + if (command instanceof ComparableTask) //Already wrapped + super.execute(command); + else { + super.execute(newComparableRunnableFor(command)); + } + } + + private Runnable newComparableRunnableFor(Runnable runnable) { + return new ComparableRunnable(ensurePriorityRunnable(runnable)); + } + + @Override + protected RunnableFuture newTaskFor(Runnable runnable, T value) { + return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value); + } + + private PriorityRunnable ensurePriorityRunnable(Runnable runnable) { + return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable + : PriorityRunnable.of(runnable, DEFAULT_PRIORITY); + } + + private class ComparableFutureTask extends FutureTask implements ComparableTask { + private Long sequentialOrder = instanceCounter.getAndIncrement(); + private HasPriority hasPriority; + + public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) { + super(priorityRunnable, result); + this.hasPriority = priorityRunnable; + } + + @Override + public long getInstanceCount() { + return sequentialOrder; + } + + @Override + public int getPriority() { + return hasPriority.getPriority(); + } + } + + private static class ComparableRunnable implements Runnable, ComparableTask { + private Long instanceCount = instanceCounter.getAndIncrement(); + private HasPriority hasPriority; + private Runnable runnable; + + public ComparableRunnable(PriorityRunnable priorityRunnable) { + this.runnable = priorityRunnable; + this.hasPriority = priorityRunnable; + } + + @Override + public void run() { + runnable.run(); + } + + @Override + public int getPriority() { + return hasPriority.getPriority(); + } + + @Override + public long getInstanceCount() { + return instanceCount; + } + } + + private interface ComparableTask extends Runnable { + int getPriority(); + + long getInstanceCount(); + + static Comparator comparatorByPriorityAndSequentialOrder() { + return (o1, o2) -> { + int priorityResult = o2.getPriority() - o1.getPriority(); + return priorityResult != 0 ? priorityResult + : (int) (o1.getInstanceCount() - o2.getInstanceCount()); + }; + } + + } + + } + + public interface HasPriority{ + int getPriority(); + } + + + public interface PriorityRunnable extends Runnable, HasPriority{ + + public static PriorityRunnable of(Runnable runnable, int priority) { + return new PriorityRunnable() { + @Override + public void run() { + runnable.run(); + } + + @Override + public int getPriority() { + return priority; + } + }; + } + } + +} \ No newline at end of file diff -r a0be7c83fff8 -r ad49723367c2 src/main/java/christie/codegear/StartCodeGear.java --- a/src/main/java/christie/codegear/StartCodeGear.java Sat Mar 09 10:19:03 2019 +0900 +++ b/src/main/java/christie/codegear/StartCodeGear.java Sat Mar 09 14:03:06 2019 +0900 @@ -7,12 +7,13 @@ public abstract class StartCodeGear extends CodeGear{ static ConcurrentHashMap cgmList = new ConcurrentHashMap<>(); - static LinkedBlockingQueue taskQueue = new LinkedBlockingQueue(); - static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads + //static LinkedBlockingQueue taskQueue = new LinkedBlockingQueue(); + /*static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads Runtime.getRuntime().availableProcessors(), Integer.MAX_VALUE, // keepAliveTime TimeUnit.SECONDS, - taskQueue); + taskQueue);*/ + static ThreadPoolExecutor threadPoolExecutor = PriorityThreadPoolExecutors.createThreadPool(Runtime.getRuntime().availableProcessors(), Integer.MAX_VALUE); static int cgmCount = 1; public StartCodeGear(CodeGearManager cgm){