spring cloud hystrix实践

徐键
介绍

hystrix是spring cloud的熔断降级组件,由netflix公司开源,通过命令模式结合rxjava框架实现,命令模式封装了用户具体业务,使用rxjava对命令的执行结果进行统计,根据统计结果按一定策略执行熔断降级,避免造成应用失败雪崩。
执行流程如下图: 流程说明:
1.每次调用创建一个新的HystrixCommand,把依赖调用封装在run()方法中.
2:执行execute()/queue做同步或异步调用.
3:判断熔断器(circuit-breaker)是否打开,如果打开跳到步骤8,进行降级策略,如果关闭进入后续步骤.
4:判断线程池/队列/信号量是否跑满,如果跑满进入降级步骤8,否则继续后续步骤.
5:调用HystrixCommand的run方法.运行依赖逻辑
5a:依赖逻辑调用超时,进入步骤8.
6:判断逻辑是否调用成功
6a:返回成功调用结果
6b:调用出错,进入步骤8.
7:计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态.
8:getFallback()降级逻辑.
  以下四种情况将触发getFallback调用:
(1):run()方法抛出非HystrixBadRequestException异常。
(2):run()方法调用超时
(3):熔断器开启拦截调用
(4):线程池/队列/信号量是否跑满
8a:没有实现getFallback的Command将直接抛出异常
8b:fallback降级逻辑调用成功直接返回
8c:降级逻辑调用失败抛出异常
9:返回执行成功结果

官方文档有详细的使用示例:
https://github.com/Netflix/Hystrix/wiki/How-To-Use

使用

    快递业务涉及很多外部对接,为了各个对接接口的隔离和失败的降级防止雪崩,所以引入了hystrix作为降级组件。 使用非常方便

1.引入依赖:

<dependency>  
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-hystrix</artifactId>
    <version>1.4.5.RELEASE</version>
</dependency>  

2.启动了增加annotation @EnableCircuitBreaker

3.需要隔离降级的方法增加注解:

@HystrixCommand(groupKey= "yunda",commandKey="scanningOrder",threadPoolKey="scanningOrder-thread",
            threadPoolProperties = {@HystrixProperty(name="maximumSize",value = "20")},
            commandProperties = {@HystrixProperty(name="execution.isolation.thread.timeoutInMilliseconds",value = "3000")},
            fallbackMethod="scanningOrderFallback",
            ignoreExceptions={BusinessException.class}
    )

    spring cloud使用aop方式将包含@HystrixCommand注解的方法进行代理,包装了一层HystrixCommond并做了扩展。 该注解上可以配置hystrix各项参数
配置说明:
groupKey指定命令分组,同一个分组使用一个线程池。
commandKey指定命令名称。
threadPoolKey指定代码执行的具体线程名称。
commandProperties 可配置命令执行及指标统计相关参数:
execution.isolation.thread.timeoutInMilliseconds 该参数配置业务代码超时时间(默认1000ms),执行超过该时间会触发降级方法,没有降级方法将抛出HystrixRuntimeException
metrics.rollingStats.timeInMilliseconds参数可配置执行结果统计的滑动窗口时间 默认10000ms
metrics.rollingStats.numBuckets参数可配置滑动窗口包含多少段 默认10
circuitBreaker.requestVolumeThreshold参数可配置触发熔断的请求量阈值 默认20
circuitBreaker.errorThresholdPercentage可配置触发熔断的失败比率,默认50%
circuitBreaker.sleepWindowInMilliseconds可配置触发熔断到恢复的时间窗,默认5s
threadPoolProperties 可配置线程池相关参数,hystrix默认使用线程池进行业务隔离,核心线程数和最大线程数默认都是10个线程,并且使用SynchronizedQueue,即默认限制了最大并发数为10
fallbackMethod指定了降级的方法。
ignoreExceptions指定哪些异常是不需要降级的,比如我们需要给前端返回一个BusinessException,就不需要降级。
核心的配置上面都介绍了,更具体的可以看代码里的HystrixCommandProperties和HystrixThreadPoolProperties两个类。

使用过程中遇到一个问题: 当代码执行异常触发降级之后,降级的方法也是返回一个具体Exception,最终抛出的是对应的Exception 而当方法超时触发降级并且降级的方法也是返回一个具体Exception,最终抛出的却是HystrixRuntimeException 查看代码之后发现commond都是返回了HystrixRuntimeException,而实现aop的HystrixCommandAspect中对HystrixRuntimeException 做了处理,如果是命令执行失败类型(HystrixRuntimeException.FailureType.COMMAND_EXCEPTION),将抛出具体异常 即降级中的Exception,失败类型不是命令执行失败类型(比如超时),将直接抛出HystrixRuntimeException,这样就不能返回我们需要返回的降级方法的异常了,只能通过修改具体实现来解决了。

指标统计

hystrix熔断决策及dashboard展示都是依赖于指标的统计,基于不同的HystrixEventStream实现发射不同的事件流。
类图: HystrixCommandMetrics中的指标流如下:

//统计滚动窗口总请求数 失败请求数及失败比率用于熔断判断
private HealthCountsStream healthCountsStream;  
//统计滚动窗口时间内各分段时间的命令执行结果
private final RollingCommandEventCounterStream rollingCommandEventCounterStream;  
//汇总命令所有时间执行结果
private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream;  
//滚动窗口不同占比请求的命令耗时统计
private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream;  
//滚动窗口不同占比请求的用户代码耗时统计
private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream;  
//滚动窗口最大并发统计
private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream;  

以上的统计除了rollingCommandMaxConcurrencyStream是基于HystrixCommandStartStream,其他都是基于HystrixCommandCompletionStream,选择一个统计流看看具体实现,比如RollingCommandEventCounterStream。 类图:
看下基类的BucketedCounterStream的Observable实现

this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {  
    @Override
    public Observable<Bucket> call() {
        return inputEventStream
                .observe()
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
                .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
                .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
    }
});

inputEventStream是上面的HystrixCommandCompletionStream命令完成事件流,通过rxjava的window api按每秒打开一个窗口做处理,
rxjava window api:
https://mcxiaoke.gitbooks.io/rxdocs/content/operators/Window.html
处理方法是:

this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {  
    @Override
    public Observable<Bucket> call(Observable<Event> eventBucket) {
        return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
    }
};

appendRawEventToBucket在HystrixCommandMetrics定义:

public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = new Func2<long[], HystrixCommandCompletion, long[]>() {  
    @Override
    public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) {
        ExecutionResult.EventCounts eventCounts = execution.getEventCounts();
        for (HystrixEventType eventType: ALL_EVENT_TYPES) {
            switch (eventType) {
                case EXCEPTION_THROWN: break; //this is just a sum of other anyway - don't do the work here
                default:
                    initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType);
                    break;
            }
        }
        return initialCountArray;
    }
};

做的处理就是对命令的执行结果转换为了Long数组,不同执行结果次数存于不同下标。 而BucketedRollingCounterStream继承于BucketedCounterStream,实现的Observable如下:

this.sourceStream = bucketedStream      //stream broken up into buckets  
    .window(numBuckets, 1)          //emit overlapping windows of buckets
    .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            isSourceCurrentlySubscribed.set(true);
        }
    })
    .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
            isSourceCurrentlySubscribed.set(false);
        }
    })
    .share()                        //multiple subscribers should get same data
    .onBackpressureDrop();          //if there are slow consumers, data should not buffer

也是window api发射numBuckets(默认10)之后新开窗口并且skip了1,即默认窗口时间10s分10段,每次达到10段新开窗口并skip最后1s的窗口,从而达到了滚动的效果。对每秒的window Observable做reduceWindowToSummary操作,实现如下:

Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {  
    @Override
    public Observable<Output> call(Observable<Bucket> window) {
        return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
    }
};

使用rxjava 的scan api对每一项应用了reduceBucket操作,scan api定义:
https://mcxiaoke.gitbooks.io/rxdocs/content/operators/Scan.html
reduceBucket定义在HystrixCommandMetrics的bucketAggregator:

public static final Func2<long[], long[], long[]> bucketAggregator = new Func2<long[], long[], long[]>() {  
    @Override
    public long[] call(long[] cumulativeEvents, long[] bucketEventCounts) {
        for (HystrixEventType eventType: ALL_EVENT_TYPES) {
            switch (eventType) {
                case EXCEPTION_THROWN:
                    for (HystrixEventType exceptionEventType: HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES) {
                        cumulativeEvents[eventType.ordinal()] += bucketEventCounts[exceptionEventType.ordinal()];
                    }
                    break;
                default:
                    cumulativeEvents[eventType.ordinal()] += bucketEventCounts[eventType.ordinal()];
                    break;
            }
        }
        return cumulativeEvents;
    }
};

实现了bucket里面的不同执行结果次数的相加,做到了滚动窗口内每个桶内各项执行结果的统计。
总体指标流程如下图: 实现涉及到很多rxjava api 可看文档:
https://mcxiaoke.gitbooks.io/rxdocs/content/Intro.html
由于本人能力有限,如有不对之处,欢迎指正。