A Look at HystrixThreadPool
Published: 2019-06-29

This post takes a look at HystrixThreadPool.

HystrixThreadPool

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixThreadPool.java

/**
 * ThreadPool used to executed {@link HystrixCommand#run()} on separate threads when configured to do so with {@link HystrixCommandProperties#executionIsolationStrategy()}.
 * <p>
 * Typically each {@link HystrixCommandGroupKey} has its own thread-pool so that any one group of commands can not starve others from being able to run.
 * <p>
 * A {@link HystrixCommand} can be configured with a thread-pool explicitly by injecting a {@link HystrixThreadPoolKey} or via the
 * {@link HystrixCommandProperties#executionIsolationThreadPoolKeyOverride()} otherwise it
 * will derive a {@link HystrixThreadPoolKey} from the injected {@link HystrixCommandGroupKey}.
 * <p>
 * The pool should be sized large enough to handle normal healthy traffic but small enough that it will constrain concurrent execution if backend calls become latent.
 * <p>
 * For more information see the Github Wiki: https://github.com/Netflix/Hystrix/wiki/Configuration#wiki-ThreadPool and https://github.com/Netflix/Hystrix/wiki/How-it-Works#wiki-Isolation
 */
public interface HystrixThreadPool {

    /**
     * Implementation of {@link ThreadPoolExecutor}.
     *
     * @return ThreadPoolExecutor
     */
    public ExecutorService getExecutor();

    public Scheduler getScheduler();

    public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread);

    /**
     * Mark when a thread begins executing a command.
     */
    public void markThreadExecution();

    /**
     * Mark when a thread completes executing a command.
     */
    public void markThreadCompletion();

    /**
     * Mark when a command gets rejected from the threadpool
     */
    public void markThreadRejection();

    /**
     * Whether the queue will allow adding an item to it.
     * <p>
     * This allows dynamic control of the max queueSize versus whatever the actual max queueSize is so that dynamic changes can be done via property changes rather than needing an app
     * restart to adjust when commands should be rejected from queuing up.
     *
     * @return boolean whether there is space on the queue
     */
    public boolean isQueueSpaceAvailable();

    //......
}

The interface mainly defines the getExecutor and getScheduler methods, a set of mark methods (markThreadExecution, markThreadCompletion, markThreadRejection), and isQueueSpaceAvailable.

HystrixThreadPoolDefault

    /**
     * @ExcludeFromJavadoc
     * @ThreadSafe
     */
    /* package */static class HystrixThreadPoolDefault implements HystrixThreadPool {
        private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);

        private final HystrixThreadPoolProperties properties;
        private final BlockingQueue<Runnable> queue;
        private final ThreadPoolExecutor threadPool;
        private final HystrixThreadPoolMetrics metrics;
        private final int queueSize;

        public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
            this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
            HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            this.queueSize = properties.maxQueueSize().get();

            this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                    concurrencyStrategy.getThreadPool(threadPoolKey, properties),
                    properties);
            this.threadPool = this.metrics.getThreadPool();
            this.queue = this.threadPool.getQueue();

            /* strategy: HystrixMetricsPublisherThreadPool */
            HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
        }

        @Override
        public ThreadPoolExecutor getExecutor() {
            touchConfig();
            return threadPool;
        }

        @Override
        public Scheduler getScheduler() {
            //by default, interrupt underlying threads on timeout
            return getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return true;
                }
            });
        }

        @Override
        public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
            touchConfig();
            return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
        }

        // allow us to change things via fast-properties by setting it each time
        private void touchConfig() {
            final int dynamicCoreSize = properties.coreSize().get();
            final int configuredMaximumSize = properties.maximumSize().get();
            int dynamicMaximumSize = properties.actualMaximumSize();
            final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
            boolean maxTooLow = false;

            if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
                //if user sets maximum < core (or defaults get us there), we need to maintain invariant of core <= maximum
                dynamicMaximumSize = dynamicCoreSize;
                maxTooLow = true;
            }

            // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
            if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
                if (maxTooLow) {
                    logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is trying to set coreSize = " +
                            dynamicCoreSize + " and maximumSize = " + configuredMaximumSize + ". Maximum size will be set to " +
                            dynamicMaximumSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
                }
                threadPool.setCorePoolSize(dynamicCoreSize);
                threadPool.setMaximumPoolSize(dynamicMaximumSize);
            }

            threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
        }

        @Override
        public void markThreadExecution() {
            metrics.markThreadExecution();
        }

        @Override
        public void markThreadCompletion() {
            metrics.markThreadCompletion();
        }

        @Override
        public void markThreadRejection() {
            metrics.markThreadRejection();
        }

        /**
         * Whether the threadpool queue has space available according to the <code>queueSizeRejectionThreshold</code> settings.
         *
         * Note that the <code>queueSize</code> is an final instance variable on HystrixThreadPoolDefault, and not looked up dynamically.
         * The data structure is static, so this does not make sense as a dynamic lookup.
         * The <code>queueSizeRejectionThreshold</code> can be dynamic (up to <code>queueSize</code>), so that should
         * still get checked on each invocation.
         * <p>
         * If a SynchronousQueue implementation is used (<code>maxQueueSize</code> <= 0), it always returns 0 as the size so this would always return true.
         */
        @Override
        public boolean isQueueSpaceAvailable() {
            if (queueSize <= 0) {
                // we don't have a queue so we won't look for space but instead
                // let the thread-pool reject or not
                return true;
            } else {
                return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
            }
        }
    }

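  • getExecutor returns the ThreadPoolExecutor held by HystrixThreadPoolMetrics; each call first runs touchConfig, which re-reads the properties and refreshes the pool's core size, maximum size, and keep-alive time.
  • The name getScheduler is a slight misnomer, because each call constructs a new HystrixContextScheduler instead of returning an existing one.
  • The mark methods simply delegate to HystrixThreadPoolMetrics.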
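
The refresh-on-access idea in touchConfig can be illustrated with a plain JDK ThreadPoolExecutor. The following is only a minimal sketch, not Hystrix code: the class name ResizingPoolSketch and its update method are hypothetical stand-ins for the dynamic properties, and the sketch assumes that core and maximum size are always allowed to diverge. Unlike the listing above, it also orders the two setter calls so that core never exceeds maximum in between, since newer JDKs reject such an intermediate state with an IllegalArgumentException.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified model of "re-apply the configuration on every access". */
public class ResizingPoolSketch {
    private final ThreadPoolExecutor threadPool;
    // Stand-ins for dynamic properties (assumption: plain fields instead of Archaius).
    private int coreSize;
    private int maximumSize;

    public ResizingPoolSketch(int coreSize, int maximumSize) {
        this.coreSize = coreSize;
        this.maximumSize = maximumSize;
        this.threadPool = new ThreadPoolExecutor(
                coreSize, Math.max(coreSize, maximumSize),
                1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>());
    }

    /** Simulates a property change at runtime. */
    public synchronized void update(int coreSize, int maximumSize) {
        this.coreSize = coreSize;
        this.maximumSize = maximumSize;
    }

    /** Returns the pool after refreshing its sizes from the current settings. */
    public synchronized ThreadPoolExecutor getExecutor() {
        touchConfig();
        return threadPool;
    }

    private void touchConfig() {
        int core = coreSize;
        // Maintain the invariant core <= maximum by raising maximum if needed.
        int max = Math.max(core, maximumSize);
        if (threadPool.getCorePoolSize() == core && threadPool.getMaximumPoolSize() == max) {
            return; // nothing changed, skip the setters
        }
        if (max >= threadPool.getMaximumPoolSize()) {
            // Growing: raise the ceiling first, then the core size.
            threadPool.setMaximumPoolSize(max);
            threadPool.setCorePoolSize(core);
        } else {
            // Shrinking: lower the core size first, then the ceiling.
            threadPool.setCorePoolSize(core);
            threadPool.setMaximumPoolSize(max);
        }
    }
}
```

In this sketch, calling update(5, 3) and then getExecutor() yields a pool with both core and maximum size at 5, mirroring the rule that a maximum below core is raised to core.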

HystrixThreadPoolProperties

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixThreadPoolProperties.java

/**
 * Properties for instances of {@link HystrixThreadPool}.
 * <p>
 * Default implementation of methods uses Archaius (https://github.com/Netflix/archaius)
 *
 * Note a change in behavior in 1.5.7. Prior to that version, the configuration for 'coreSize' was used to control
 * both coreSize and maximumSize. This is a fixed-size threadpool that can never give up an unused thread. In 1.5.7+,
 * the values can diverge, and if you set coreSize < maximumSize, threads can be given up (subject to the keep-alive
 * time)
 *
 * It is OK to leave maximumSize unset using any version of Hystrix. If you do, then maximum size will default to
 * core size and you'll have a fixed-size threadpool.
 *
 * If you accidentally set maximumSize < coreSize, then maximum will be raised to coreSize
 * (this prioritizes keeping extra threads around rather than inducing threadpool rejections)
 */
public abstract class HystrixThreadPoolProperties {

    /* defaults */
    static int default_coreSize = 10;            // core size of thread pool
    static int default_maximumSize = 10;         // maximum size of thread pool
    static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive
    static int default_maxQueueSize = -1;        // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
                                                 // -1 turns it off and makes us use SynchronousQueue
    static boolean default_allow_maximum_size_to_diverge_from_core_size = false; //should the maximumSize config value get read and used in configuring the threadPool
                                                                                 //turning this on should be a conscious decision by the user, so we default it to false

    static int default_queueSizeRejectionThreshold = 5; // number of items in queue
    static int default_threadPoolRollingNumberStatisticalWindow = 10000; // milliseconds for rolling number
    static int default_threadPoolRollingNumberStatisticalWindowBuckets = 10; // number of buckets in rolling number (10 1-second buckets)

    private final HystrixProperty<Integer> corePoolSize;
    private final HystrixProperty<Integer> maximumPoolSize;
    private final HystrixProperty<Integer> keepAliveTime;
    private final HystrixProperty<Integer> maxQueueSize;
    private final HystrixProperty<Integer> queueSizeRejectionThreshold;
    private final HystrixProperty<Boolean> allowMaximumSizeToDivergeFromCoreSize;

    private final HystrixProperty<Integer> threadPoolRollingNumberStatisticalWindowInMilliseconds;
    private final HystrixProperty<Integer> threadPoolRollingNumberStatisticalWindowBuckets;

    //......
}

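This class holds the thread-pool configuration: corePoolSize, maximumPoolSize, keepAliveTime, maxQueueSize, queueSizeRejectionThreshold, allowMaximumSizeToDivergeFromCoreSize, threadPoolRollingNumberStatisticalWindowInMilliseconds, and threadPoolRollingNumberStatisticalWindowBuckets. Of these, maxQueueSize is fixed once the pool is created (the default of -1 means a SynchronousQueue), so queueSizeRejectionThreshold is the knob that can be changed at runtime to limit queuing.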
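
How maxQueueSize and queueSizeRejectionThreshold interact can be sketched with JDK queues alone. This is a hedged illustration rather than the library's implementation: QueueSpaceSketch, getQueue, and setQueueSizeRejectionThreshold are hypothetical names, and the choice of LinkedBlockingQueue for a positive maxQueueSize is an assumption made for the example (the source above only states that -1 selects a SynchronousQueue).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

/** Hypothetical model of a fixed queue capacity plus a dynamic rejection threshold. */
public class QueueSpaceSketch {
    private final int maxQueueSize;                 // fixed at construction time
    private final BlockingQueue<Runnable> queue;
    private volatile int queueSizeRejectionThreshold; // may change at runtime

    public QueueSpaceSketch(int maxQueueSize, int queueSizeRejectionThreshold) {
        this.maxQueueSize = maxQueueSize;
        this.queueSizeRejectionThreshold = queueSizeRejectionThreshold;
        if (maxQueueSize <= 0) {
            // No real queue: tasks are handed directly to a thread or rejected by the pool.
            this.queue = new SynchronousQueue<Runnable>();
        } else {
            // Assumption for this sketch: a bounded queue of the configured capacity.
            this.queue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
        }
    }

    public BlockingQueue<Runnable> getQueue() {
        return queue;
    }

    public void setQueueSizeRejectionThreshold(int threshold) {
        this.queueSizeRejectionThreshold = threshold;
    }

    /** Same decision rule as isQueueSpaceAvailable in the listing above. */
    public boolean isQueueSpaceAvailable() {
        if (maxQueueSize <= 0) {
            return true; // let the thread pool itself accept or reject
        }
        return queue.size() < queueSizeRejectionThreshold;
    }
}
```

With a capacity of 10 and a threshold of 2, the sketch reports no space once two tasks are queued, even though the queue could physically hold more; raising the threshold at runtime makes space available again without rebuilding the queue.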

HystrixThreadPoolMetrics

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixThreadPoolMetrics.java

/**
 * Used by {@link HystrixThreadPool} to record metrics.
 */
public class HystrixThreadPoolMetrics extends HystrixMetrics {
    //......

    /**
     * Invoked each time a thread is executed.
     */
    public void markThreadExecution() {
        concurrentExecutionCount.incrementAndGet();
    }

    /**
     * Invoked each time a thread completes.
     */
    public void markThreadCompletion() {
        concurrentExecutionCount.decrementAndGet();
    }

    /**
     * Invoked each time a command is rejected from the thread-pool
     */
    public void markThreadRejection() {
        concurrentExecutionCount.decrementAndGet();
    }

    //......
}

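These mark methods maintain a single counter, concurrentExecutionCount: markThreadExecution increments it, while markThreadCompletion and markThreadRejection decrement it.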
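
The counting pattern above can be modeled with an AtomicInteger. This is a simplified sketch under stated assumptions: ConcurrencyCounterSketch and its run helper are hypothetical, and only the execution/completion pair is modeled, to show why the decrement belongs in a finally block so the count returns to its previous value even when the task throws.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of a concurrent-execution gauge; not Hystrix code. */
public class ConcurrencyCounterSketch {
    private final AtomicInteger concurrentExecutionCount = new AtomicInteger();

    /** Called when a thread starts running a task. */
    public void markThreadExecution() {
        concurrentExecutionCount.incrementAndGet();
    }

    /** Called when a thread finishes running a task. */
    public void markThreadCompletion() {
        concurrentExecutionCount.decrementAndGet();
    }

    public int getCurrentConcurrentExecutionCount() {
        return concurrentExecutionCount.get();
    }

    /** Runs the task with the counter incremented for the duration of the call. */
    public void run(Runnable task) {
        markThreadExecution();
        try {
            task.run();
        } finally {
            // Always balance the increment, even if the task throws.
            markThreadCompletion();
        }
    }
}
```

While a task passed to run is executing, the gauge reads one higher than before; once run returns or throws, it is back to its earlier value.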

Summary

HystrixThreadPool mainly defines the getExecutor and getScheduler methods plus a set of mark methods. The mark methods exist to keep count-based metrics for the pool.

Reposted from: https://my.oschina.net/go4it/blog/1835383
