序
本文主要研究一下HystrixThreadPool
HystrixThreadPool
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixThreadPool.java
/** * ThreadPool used to executed {@link HystrixCommand#run()} on separate threads when configured to do so with {@link HystrixCommandProperties#executionIsolationStrategy()}. ** Typically each {@link HystrixCommandGroupKey} has its own thread-pool so that any one group of commands can not starve others from being able to run. *
* A {@link HystrixCommand} can be configured with a thread-pool explicitly by injecting a {@link HystrixThreadPoolKey} or via the * {@link HystrixCommandProperties#executionIsolationThreadPoolKeyOverride()} otherwise it * will derive a {@link HystrixThreadPoolKey} from the injected {@link HystrixCommandGroupKey}. *
* The pool should be sized large enough to handle normal healthy traffic but small enough that it will constrain concurrent execution if backend calls become latent. *
* For more information see the Github Wiki: https://github.com/Netflix/Hystrix/wiki/Configuration#wiki-ThreadPool and https://github.com/Netflix/Hystrix/wiki/How-it-Works#wiki-Isolation */public interface HystrixThreadPool { /** * Implementation of {@link ThreadPoolExecutor}. * * @return ThreadPoolExecutor */ public ExecutorService getExecutor(); public Scheduler getScheduler(); public Scheduler getScheduler(Func0
shouldInterruptThread); /** * Mark when a thread begins executing a command. */ public void markThreadExecution(); /** * Mark when a thread completes executing a command. */ public void markThreadCompletion(); /** * Mark when a command gets rejected from the threadpool */ public void markThreadRejection(); /** * Whether the queue will allow adding an item to it. * * This allows dynamic control of the max queueSize versus whatever the actual max queueSize is so that dynamic changes can be done via property changes rather than needing an app * restart to adjust when commands should be rejected from queuing up. * * @return boolean whether there is space on the queue */ public boolean isQueueSpaceAvailable(); //......}
这个接口主要是定义getExecutor、getScheduler方法，以及一系列的mark方法（markThreadExecution、markThreadCompletion、markThreadRejection），另外还有用于判断队列是否还有空间的isQueueSpaceAvailable方法
HystrixThreadPoolDefault
/** * @ExcludeFromJavadoc * @ThreadSafe */ /* package */static class HystrixThreadPoolDefault implements HystrixThreadPool { private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class); private final HystrixThreadPoolProperties properties; private final BlockingQueuequeue; private final ThreadPoolExecutor threadPool; private final HystrixThreadPoolMetrics metrics; private final int queueSize; public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) { this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults); HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); this.queueSize = properties.maxQueueSize().get(); this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, concurrencyStrategy.getThreadPool(threadPoolKey, properties), properties); this.threadPool = this.metrics.getThreadPool(); this.queue = this.threadPool.getQueue(); /* strategy: HystrixMetricsPublisherThreadPool */ HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties); } @Override public ThreadPoolExecutor getExecutor() { touchConfig(); return threadPool; } @Override public Scheduler getScheduler() { //by default, interrupt underlying threads on timeout return getScheduler(new Func0 () { @Override public Boolean call() { return true; } }); } @Override public Scheduler getScheduler(Func0 shouldInterruptThread) { touchConfig(); return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread); } // allow us to change things via fast-properties by setting it each time private void touchConfig() { final int dynamicCoreSize = properties.coreSize().get(); final int configuredMaximumSize = properties.maximumSize().get(); int dynamicMaximumSize = properties.actualMaximumSize(); final boolean 
allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get(); boolean maxTooLow = false; if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) { //if user sets maximum < core (or defaults get us there), we need to maintain invariant of core <= maximum dynamicMaximumSize = dynamicCoreSize; maxTooLow = true; } // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed. if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) { if (maxTooLow) { logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + configuredMaximumSize + ". Maximum size will be set to " + dynamicMaximumSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); } threadPool.setCorePoolSize(dynamicCoreSize); threadPool.setMaximumPoolSize(dynamicMaximumSize); } threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES); } @Override public void markThreadExecution() { metrics.markThreadExecution(); } @Override public void markThreadCompletion() { metrics.markThreadCompletion(); } @Override public void markThreadRejection() { metrics.markThreadRejection(); } /** * Whether the threadpool queue has space available according to the queueSizeRejectionThreshold
settings. * * Note that thequeueSize
is an final instance variable on HystrixThreadPoolDefault, and not looked up dynamically. * The data structure is static, so this does not make sense as a dynamic lookup. * ThequeueSizeRejectionThreshold
can be dynamic (up toqueueSize
), so that should * still get checked on each invocation. ** If a SynchronousQueue implementation is used (
maxQueueSize
<= 0), it always returns 0 as the size so this would always return true. */ @Override public boolean isQueueSpaceAvailable() { if (queueSize <= 0) { // we don't have a queue so we won't look for space but instead // let the thread-pool reject or not return true; } else { return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get(); } } }
- getExecutor方法返回的是从HystrixThreadPoolMetrics取得的ThreadPoolExecutor；每次调用都会先执行touchConfig，按最新的属性值刷新threadPool的corePoolSize、maximumPoolSize和keepAliveTime
- getScheduler方法以get命名不太恰当，因为它每次调用都会new一个HystrixContextScheduler，而不是返回已有的实例
- mark方法主要是委托给HystrixThreadPoolMetrics
HystrixThreadPoolProperties
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixThreadPoolProperties.java
/** * Properties for instances of {@link HystrixThreadPool}. ** Default implementation of methods uses Archaius (https://github.com/Netflix/archaius) * * Note a change in behavior in 1.5.7. Prior to that version, the configuration for 'coreSize' was used to control * both coreSize and maximumSize. This is a fixed-size threadpool that can never give up an unused thread. In 1.5.7+, * the values can diverge, and if you set coreSize < maximumSize, threads can be given up (subject to the keep-alive * time) * * It is OK to leave maximumSize unset using any version of Hystrix. If you do, then maximum size will default to * core size and you'll have a fixed-size threadpool. * * If you accidentally set maximumSize < coreSize, then maximum will be raised to coreSize * (this prioritizes keeping extra threads around rather than inducing threadpool rejections) */public abstract class HystrixThreadPoolProperties { /* defaults */ static int default_coreSize = 10; // core size of thread pool static int default_maximumSize = 10; // maximum size of thread pool static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive static int default_maxQueueSize = -1; // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject) // -1 turns it off and makes us use SynchronousQueue static boolean default_allow_maximum_size_to_diverge_from_core_size = false; //should the maximumSize config value get read and used in configuring the threadPool //turning this on should be a conscious decision by the user, so we default it to false static int default_queueSizeRejectionThreshold = 5; // number of items in queue static int default_threadPoolRollingNumberStatisticalWindow = 10000; // milliseconds for rolling number static int default_threadPoolRollingNumberStatisticalWindowBuckets = 10; // number of buckets in rolling number (10 1-second buckets) private final HystrixProperty
corePoolSize; private final HystrixProperty maximumPoolSize; private final HystrixProperty keepAliveTime; private final HystrixProperty maxQueueSize; private final HystrixProperty queueSizeRejectionThreshold; private final HystrixProperty allowMaximumSizeToDivergeFromCoreSize; private final HystrixProperty threadPoolRollingNumberStatisticalWindowInMilliseconds; private final HystrixProperty threadPoolRollingNumberStatisticalWindowBuckets; //......}
主要是配置corePoolSize、maximumPoolSize、keepAliveTime、maxQueueSize、queueSizeRejectionThreshold、allowMaximumSizeToDivergeFromCoreSize、threadPoolRollingNumberStatisticalWindowInMilliseconds、threadPoolRollingNumberStatisticalWindowBuckets
HystrixThreadPoolMetrics
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixThreadPoolMetrics.java
/** * Used by {@link HystrixThreadPool} to record metrics. */public class HystrixThreadPoolMetrics extends HystrixMetrics { //...... /** * Invoked each time a thread is executed. */ public void markThreadExecution() { concurrentExecutionCount.incrementAndGet(); } /** * Invoked each time a thread completes. */ public void markThreadCompletion() { concurrentExecutionCount.decrementAndGet(); } /** * Invoked each time a command is rejected from the thread-pool */ public void markThreadRejection() { concurrentExecutionCount.decrementAndGet(); } //......}
这几个mark方法都是对concurrentExecutionCount做计数：markThreadExecution加1，markThreadCompletion和markThreadRejection减1。
小结
HystrixThreadPool主要是定义getExecutor、getScheduler方法，以及一系列的mark方法和isQueueSpaceAvailable方法。这些mark方法委托给HystrixThreadPoolMetrics，对当前并发执行数（concurrentExecutionCount）做增减统计。