Soul网关中的Hystrix熔断插件(二)

在上一篇文章中,体验到了soulhystrix插件熔断的功能,今天来分析其中执行过程和相关原理。

HystrixPlugin插件中就是主要的核心处理逻辑:

protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
       //省略了其他代码
       //构建command对象
        Command command = fetchCommand(hystrixHandle, exchange, chain);
        return Mono.create(s -> {
            //执行具体请求
            Subscription sub = command.fetchObservable().subscribe(s::success,
                    s::error, s::success);
            s.onCancel(sub::unsubscribe);
            //熔断器打开了
            if (command.isCircuitBreakerOpen()) {
                log.error("hystrix execute have circuitBreaker is Open! groupKey:{},commandKey:{}", hystrixHandle.getGroupKey(), hystrixHandle.getCommandKey());
            }
        }).doOnError(throwable -> { //错误信息处理
            log.error("hystrix execute exception:", throwable);
            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
            //执行下一个插件
            chain.execute(exchange);
        }).then();
    }

通过上面的执行逻辑,可以得到两个个关键点:

  • command对象的构建;
  • command.fetchObservable().subscribe()方法的执行;

command对象构建过程:

    private Command fetchCommand(final HystrixHandle hystrixHandle, final ServerWebExchange exchange, final SoulPluginChain chain) {
        //信号量模式
        if (hystrixHandle.getExecutionIsolationStrategy() == HystrixIsolationModeEnum.SEMAPHORE.getCode()) {
            return new HystrixCommand(HystrixBuilder.build(hystrixHandle),
                exchange, chain, hystrixHandle.getCallBackUri());
        }
        //线程池模式
        return new HystrixCommandOnThread(HystrixBuilder.buildForHystrixCommand(hystrixHandle),
            exchange, chain, hystrixHandle.getCallBackUri());
    }

Hystrix提供了两种隔离模式:一种是信号量,一种是线程池。本篇文章先分析基于信号量的隔离模式。

HystrixCommand继承了HystrixObservableCommand,这个类来自于com.netflix,就是Hystrix熔断器的类。

public class HystrixCommand extends HystrixObservableCommand<Void> implements Command 

HystrixCommand中,有重要的实现方法:

@Override
    protected Observable<Void> construct() {
        return RxReactiveStreams.toObservable(chain.execute(exchange));
    }

	//任务执行失败或者熔断打开的时候被执行
    @Override
    protected Observable<Void> resumeWithFallback() {
        return RxReactiveStreams.toObservable(doFallback());
    }

    private Mono<Void> doFallback() {
        if (isFailedExecution()) {
            log.error("hystrix execute have error: ", getExecutionException());
        }
        final Throwable exception = getExecutionException();
        return doFallback(exchange, exception);
    }

	//调用 subscribe 方法后 执行注事件
    @Override
    public Observable<Void> fetchObservable() {
        return this.toObservable();
    }

    @Override
    public URI getCallBackUri() {
        return callBackUri;
    }
  • construct + fetchObservable 负责注册请求任务事件,其中 fetchObservablesoul 自定义接口 Command 的方法,为了提供统一的 API doExecutor 调用。
  • 事件由 HystrixPlugin.doExecutor() 方法中的 subscribe 调用真正发起执行
  • resumeWithFallback 触发 Hrstrix fallback 机制,接口 Command default 方法 doFallback 负责真正执行 fallback 逻辑。

最后,本文主要分析了 HystrixPlugin doExecutor 的执行流程,关于hystrix熔断本身的机制并没有数量清楚,后续还需要再去看一看。

参考链接:

Soul网关中的Hystrix熔断插件(一)

本篇文章要体验Soul网关中对Hystrix熔断插件的支持。当系统承接的流量太大时,为防止这些大流量将系统压垮,常常考虑使用熔断机制,将请求断开,以保护系统。

参考之前的流程,启动soul-adminsoul网关,以及业务系统(本次的测试演示使用的是soul-examples-http)。注意在soul网关中,加入hystrix插件。

        <!-- soul hystrix plugin start-->
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>soul-spring-boot-starter-plugin-hystrix</artifactId>
            <version>${project.version}</version>
        </dependency>
        <!-- soul hystrix plugin end-->

启动soul-admin后,在管理控制台也需要手动打开hystrix插件。

开启后,在hystrix插件中新建一个选择器:

然后创建相应的选择器规则:

规则中的字段含义:

  • 跳闸最小请求数量 :最小的请求量,至少要达到这个量才会触发熔断
  • 错误百分比阀值 : 这段时间内,发生异常的百分比。
  • 最大并发量 : 最大的并发量
  • 跳闸休眠时间(ms) :熔断以后恢复的时间。
  • 分组Key: 一般设置为:contextPath,如果不设置,默认值就是业务系统的contextPath
  • 命令Key: 一般设置为具体的路径接口。

以上准备工作就做完了,现在开始压测,压测工具使用的是SuperBenchmarker。压测命令如下,并发请求15,压10s

sb -u http://localhost:9195/http/order/findById?id=123 -c 15 -N 10

得到的结果信息如下:

---------------Finished!----------------
Finished at 2021/2/4 20:33:48 (took 00:00:13.6597429)
Status 500:    52239
Status 200:    18

RPS: 4673.3 (requests/second)
Max: 72ms
Min: 0ms
Avg: 0.1ms

  50%   below 0ms
  60%   below 0ms
  70%   below 0ms
  80%   below 0ms
  90%   below 0ms
  95%   below 0ms
  98%   below 2ms
  99%   below 3ms
99.9%   below 8ms

可以看到其中有很多请求都是返回的500,这是后端网关主动断开的请求。查看后端网关的日志信息:

2021-02-04 20:33:47.629  INFO 18504 --- [work-threads-17] o.d.soul.plugin.base.AbstractSoulPlugin  : hystrix rule success match , rule name :hystrix-rule-test
2021-02-04 20:33:47.629 ERROR 18504 --- [work-threads-17] o.d.soul.plugin.hystrix.HystrixPlugin    : hystrix execute have circuitBreaker is Open! groupKey:/http,commandKey:/order/findById
2021-02-04 20:33:47.629  INFO 18504 --- [work-threads-18] o.d.soul.plugin.base.AbstractSoulPlugin  : hystrix selector success match , selector name :hystrix
2021-02-04 20:33:47.629  INFO 18504 --- [work-threads-18] o.d.soul.plugin.base.AbstractSoulPlugin  : hystrix rule success match , rule name :hystrix-rule-test
2021-02-04 20:33:47.629 ERROR 18504 --- [work-threads-18] o.d.soul.plugin.hystrix.HystrixPlugin    : hystrix execute have circuitBreaker is Open! groupKey:/http,commandKey:/order/findById
2021-02-04 20:33:47.629  INFO 18504 --- [-work-threads-1] o.d.soul.plugin.base.AbstractSoulPlugin  : hystrix selector success match , selector name :hystrix
2021-02-04 20:33:47.629  INFO 18504 --- [-work-threads-1] o.d.soul.plugin.base.AbstractSoulPlugin  : hystrix rule success match , rule name :hystrix-rule-test
2021-02-04 20:33:47.629  INFO 18504 --- [-work-threads-2] o.d.soul.plugin.base.AbstractSoulPlugin  : hystrix selector success match , selector name :hystrix
2021-02-04 20:33:47.629  INFO 18504 --- [-work-threads-2] o.d.soul.plugin.base.AbstractSoulPlugin  : hystrix rule success match , rule name :hystrix-rule-test
2021-02-04 20:33:47.629 ERROR 18504 --- [-work-threads-2] o.d.soul.plugin.hystrix.HystrixPlugin    : hystrix execute have circuitBreaker is Open! groupKey:/http,commandKey:/order/findById
2021-02-04 20:33:47.630 ERROR 18504 --- [-work-threads-1] o.d.soul.plugin.hystrix.HystrixPlugin    : hystrix execute have circuitBreaker is Open! groupKey:/http,commandKey:/order/findById
2021-02-04 20:33:47.630  INFO 18504 --- [-work-threads-3] o.d.soul.plugin.base.AbstractSoulPlugin  : hystrix selector success match , selector name :hystrix

日志信息中明确记录了Error信息:hystrix execute have circuitBreaker is Open!。在业务系统无法接受请求时便主动断开(我们配置的规则中最大并发量是1)。这里就测试出了Soul网关中Hystrix熔断器的作用。

我们再试试把hystrix插件关掉再测一下:

$ sb -u http://localhost:9195/http/order/findById?id=123 -c 15 -N 10
Starting at 2021/2/4 20:44:11
[Press C to stop the test]
29871   (RPS: 2229.8)
---------------Finished!----------------
Finished at 2021/2/4 20:44:25 (took 00:00:13.5005204)
Status 200:    29871

RPS: 2691.4 (requests/second)
Max: 62ms
Min: 0ms
Avg: 1.2ms

  50%   below 1ms
  60%   below 1ms
  70%   below 1ms
  80%   below 1ms
  90%   below 2ms
  95%   below 3ms
  98%   below 4ms
  99%   below 4ms
99.9%   below 9ms

这一次的请求就都成功了。

最后,本文通过实际的案例体验了soul网关中的熔断插件hystrix。明天开始分析原理~

Soul网关中的Sofa插件执行原理(二)

在上一篇文章中,我们通过跟踪源码的方式理解了Sofa插件的执行原理,将发起的http请求转化为sofa的泛化调用,但是有个关键的地方没有展开讲:就是服务的配置信息是怎么来的?以及代理对象是怎么得到的?

本文的分析思路和之前的Apache Dubbo是一样的。

 public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {
     //获取服务
        ConsumerConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getPath());
        if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterfaceId())) {
            ApplicationConfigCache.getInstance().invalidate(metaData.getServiceName());
            reference = ApplicationConfigCache.getInstance().initRef(metaData);
        }
     	//获取代理对象
     GenericService genericService = reference.refer();
     
     //省略了其他代码
     
        genericService.$invoke(metaData.getMethodName(), pair.getLeft(), pair.getRight());
        return Mono.fromFuture(future.thenApply(ret -> {
            //省略了其他代码
        })).onErrorMap(SoulException::new);
    }

这篇文章,我们就来解决这个问题。先说结论:服务配置信息的来源是soul-admin同步到网关。

整个流程涉及到的类和方法如下:

soul-admin端:

  • WebsocketCollector:onMessage()
  • SyncDataServiceImpl:syncAll();
  • MetaDataService:syncData();

soul-bootstrap网关端:

  • SoulWebsocketClient: onMessage(),handleResult();
  • WebsocketDataHandler:executor();
  • AbstractDataHandler:handle();
  • MetaDataHandler:doRefresh();
  • SofaMetaDataSubscriber:onSubscribe();
  • ApplicationConfigCache:initRef(),build();

在本文的测试中,soul-admin端和soul-bootstrap网关端之间的数据同步是通过websocket进行。所以在网关启动的时候会进行数据同步操作。

soul-bootstrap网关端前面几个类主要处理同步的数据类型,最终流转到ApplicationConfigCache,在这里面进行构建服务build()

public ConsumerConfig<GenericService> build(final MetaData metaData) {
        ConsumerConfig<GenericService> reference = new ConsumerConfig<>();
        reference.setGeneric(true);
        reference.setApplication(applicationConfig);
        reference.setRegistry(registryConfig); //注册中心地址
        reference.setInterfaceId(metaData.getServiceName());
        reference.setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT);
        reference.setInvokeType(RpcConstants.INVOKER_TYPE_CALLBACK);
        reference.setRepeatedReferLimit(-1);
        String rpcExt = metaData.getRpcExt();//元数据信息
        SofaParamExtInfo sofaParamExtInfo = GsonUtils.getInstance().fromJson(rpcExt, SofaParamExtInfo.class);
        if (Objects.nonNull(sofaParamExtInfo)) {
            if (StringUtils.isNoneBlank(sofaParamExtInfo.getLoadbalance())) {
                final String loadBalance = sofaParamExtInfo.getLoadbalance();
                reference.setLoadBalancer(buildLoadBalanceName(loadBalance));
            }
            Optional.ofNullable(sofaParamExtInfo.getTimeout()).ifPresent(reference::setTimeout);
            Optional.ofNullable(sofaParamExtInfo.getRetries()).ifPresent(reference::setRetries);
        }
  	  //从注册中心获取代理服务
        Object obj = reference.refer();
        if (obj != null) {
            log.info("init sofa reference success there meteData is :{}", metaData.toString());
            cache.put(metaData.getPath(), reference);//将服务信息放到cache中去
        }
        return reference;
    }

在同步过程中,将所有的元数据信息及注册中心的服务放到了cache中,在使用的时候,就会到这个cache中去拿。就是在文章开始的地方提到的泛化调用。

 public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {
     //从缓存的cache中获取服务
        ConsumerConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getPath());
        if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterfaceId())) {
            ApplicationConfigCache.getInstance().invalidate(metaData.getServiceName());
            reference = ApplicationConfigCache.getInstance().initRef(metaData);
        }
     	//获取代理对象
     GenericService genericService = reference.refer();
     
     //省略了其他代码
     
        genericService.$invoke(metaData.getMethodName(), pair.getLeft(), pair.getRight());
        return Mono.fromFuture(future.thenApply(ret -> {
            //省略了其他代码
        })).onErrorMap(SoulException::new);
    }

小结:本文再次梳理了Soul网关中Sofa插件里面的服务是如何被加载的,包括服务的配置信息和 代理对象的生成。