05 Feb 2021 |
Soul |
在上一篇文章中,体验到了soul
中hystrix
插件熔断的功能,今天来分析其中执行过程和相关原理。
在HystrixPlugin
插件中就是主要的核心处理逻辑:
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
//省略了其他代码
//构建command对象
Command command = fetchCommand(hystrixHandle, exchange, chain);
return Mono.create(s -> {
//执行具体请求
Subscription sub = command.fetchObservable().subscribe(s::success,
s::error, s::success);
s.onCancel(sub::unsubscribe);
//熔断器打开了
if (command.isCircuitBreakerOpen()) {
log.error("hystrix execute have circuitBreaker is Open! groupKey:{},commandKey:{}", hystrixHandle.getGroupKey(), hystrixHandle.getCommandKey());
}
}).doOnError(throwable -> { //错误信息处理
log.error("hystrix execute exception:", throwable);
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
//执行下一个插件
chain.execute(exchange);
}).then();
}
通过上面的执行逻辑,可以得到两个个关键点:
command
对象的构建;
command.fetchObservable().subscribe()
方法的执行;
command
对象构建过程:
private Command fetchCommand(final HystrixHandle hystrixHandle, final ServerWebExchange exchange, final SoulPluginChain chain) {
//信号量模式
if (hystrixHandle.getExecutionIsolationStrategy() == HystrixIsolationModeEnum.SEMAPHORE.getCode()) {
return new HystrixCommand(HystrixBuilder.build(hystrixHandle),
exchange, chain, hystrixHandle.getCallBackUri());
}
//线程池模式
return new HystrixCommandOnThread(HystrixBuilder.buildForHystrixCommand(hystrixHandle),
exchange, chain, hystrixHandle.getCallBackUri());
}
Hystrix
提供了两种隔离模式:一种是信号量,一种是线程池。本篇文章先分析基于信号量的隔离模式。
HystrixCommand
继承了HystrixObservableCommand
,这个类来自于com.netflix
,就是Hystrix
熔断器的类。
public class HystrixCommand extends HystrixObservableCommand<Void> implements Command
在HystrixCommand
中,有重要的实现方法:
@Override
protected Observable<Void> construct() {
return RxReactiveStreams.toObservable(chain.execute(exchange));
}
//任务执行失败或者熔断打开的时候被执行
@Override
protected Observable<Void> resumeWithFallback() {
return RxReactiveStreams.toObservable(doFallback());
}
private Mono<Void> doFallback() {
if (isFailedExecution()) {
log.error("hystrix execute have error: ", getExecutionException());
}
final Throwable exception = getExecutionException();
return doFallback(exchange, exception);
}
//调用 subscribe 方法后 执行注事件
@Override
public Observable<Void> fetchObservable() {
return this.toObservable();
}
@Override
public URI getCallBackUri() {
return callBackUri;
}
construct + fetchObservable
负责注册请求任务事件,其中 fetchObservable
是 soul
自定义接口 Command
的方法,为了提供统一的 API
给 doExecutor
调用。
- 事件由
HystrixPlugin.doExecutor()
方法中的 subscribe
调用真正发起执行
resumeWithFallback
触发 Hrstrix fallbac
k 机制,接口 Command
中 default
方法 doFallback
负责真正执行 fallback
逻辑。
最后,本文主要分析了 HystrixPlugin doExecutor
的执行流程,关于hystrix
熔断本身的机制并没有数量清楚,后续还需要再去看一看。
参考链接:
04 Feb 2021 |
Soul |
本篇文章要体验Soul
网关中对Hystrix
熔断插件的支持。当系统承接的流量太大时,为防止这些大流量将系统压垮,常常考虑使用熔断机制,将请求断开,以保护系统。
参考之前的流程,启动soul-admin
和soul
网关,以及业务系统(本次的测试演示使用的是soul-examples-http
)。注意在soul
网关中,加入hystrix
插件。
<!-- soul hystrix plugin start-->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-plugin-hystrix</artifactId>
<version>${project.version}</version>
</dependency>
<!-- soul hystrix plugin end-->
启动soul-admin
后,在管理控制台也需要手动打开hystrix
插件。
开启后,在hystrix
插件中新建一个选择器:
然后创建相应的选择器规则:
规则中的字段含义:
- 跳闸最小请求数量 :最小的请求量,至少要达到这个量才会触发熔断
- 错误百分比阀值 : 这段时间内,发生异常的百分比。
- 最大并发量 : 最大的并发量
- 跳闸休眠时间
(ms)
:熔断以后恢复的时间。
- 分组
Key
: 一般设置为:contextPath
,如果不设置,默认值就是业务系统的contextPath
。
- 命令
Key
: 一般设置为具体的路径接口。
以上准备工作就做完了,现在开始压测,压测工具使用的是SuperBenchmarker
。压测命令如下,并发请求15,压10s
。
sb -u http://localhost:9195/http/order/findById?id=123 -c 15 -N 10
得到的结果信息如下:
---------------Finished!----------------
Finished at 2021/2/4 20:33:48 (took 00:00:13.6597429)
Status 500: 52239
Status 200: 18
RPS: 4673.3 (requests/second)
Max: 72ms
Min: 0ms
Avg: 0.1ms
50% below 0ms
60% below 0ms
70% below 0ms
80% below 0ms
90% below 0ms
95% below 0ms
98% below 2ms
99% below 3ms
99.9% below 8ms
可以看到其中有很多请求都是返回的500
,这是后端网关主动断开的请求。查看后端网关的日志信息:
2021-02-04 20:33:47.629 INFO 18504 --- [work-threads-17] o.d.soul.plugin.base.AbstractSoulPlugin : hystrix rule success match , rule name :hystrix-rule-test
2021-02-04 20:33:47.629 ERROR 18504 --- [work-threads-17] o.d.soul.plugin.hystrix.HystrixPlugin : hystrix execute have circuitBreaker is Open! groupKey:/http,commandKey:/order/findById
2021-02-04 20:33:47.629 INFO 18504 --- [work-threads-18] o.d.soul.plugin.base.AbstractSoulPlugin : hystrix selector success match , selector name :hystrix
2021-02-04 20:33:47.629 INFO 18504 --- [work-threads-18] o.d.soul.plugin.base.AbstractSoulPlugin : hystrix rule success match , rule name :hystrix-rule-test
2021-02-04 20:33:47.629 ERROR 18504 --- [work-threads-18] o.d.soul.plugin.hystrix.HystrixPlugin : hystrix execute have circuitBreaker is Open! groupKey:/http,commandKey:/order/findById
2021-02-04 20:33:47.629 INFO 18504 --- [-work-threads-1] o.d.soul.plugin.base.AbstractSoulPlugin : hystrix selector success match , selector name :hystrix
2021-02-04 20:33:47.629 INFO 18504 --- [-work-threads-1] o.d.soul.plugin.base.AbstractSoulPlugin : hystrix rule success match , rule name :hystrix-rule-test
2021-02-04 20:33:47.629 INFO 18504 --- [-work-threads-2] o.d.soul.plugin.base.AbstractSoulPlugin : hystrix selector success match , selector name :hystrix
2021-02-04 20:33:47.629 INFO 18504 --- [-work-threads-2] o.d.soul.plugin.base.AbstractSoulPlugin : hystrix rule success match , rule name :hystrix-rule-test
2021-02-04 20:33:47.629 ERROR 18504 --- [-work-threads-2] o.d.soul.plugin.hystrix.HystrixPlugin : hystrix execute have circuitBreaker is Open! groupKey:/http,commandKey:/order/findById
2021-02-04 20:33:47.630 ERROR 18504 --- [-work-threads-1] o.d.soul.plugin.hystrix.HystrixPlugin : hystrix execute have circuitBreaker is Open! groupKey:/http,commandKey:/order/findById
2021-02-04 20:33:47.630 INFO 18504 --- [-work-threads-3] o.d.soul.plugin.base.AbstractSoulPlugin : hystrix selector success match , selector name :hystrix
日志信息中明确记录了Error
信息:hystrix execute have circuitBreaker is Open!
。在业务系统无法接受请求时便主动断开(我们配置的规则中最大并发量是1)。这里就测试出了Soul
网关中Hystrix
熔断器的作用。
我们再试试把hystrix
插件关掉再测一下:
$ sb -u http://localhost:9195/http/order/findById?id=123 -c 15 -N 10
Starting at 2021/2/4 20:44:11
[Press C to stop the test]
29871 (RPS: 2229.8)
---------------Finished!----------------
Finished at 2021/2/4 20:44:25 (took 00:00:13.5005204)
Status 200: 29871
RPS: 2691.4 (requests/second)
Max: 62ms
Min: 0ms
Avg: 1.2ms
50% below 1ms
60% below 1ms
70% below 1ms
80% below 1ms
90% below 2ms
95% below 3ms
98% below 4ms
99% below 4ms
99.9% below 9ms
这一次的请求就都成功了。
最后,本文通过实际的案例体验了soul
网关中的熔断插件hystrix
。明天开始分析原理~
03 Feb 2021 |
Soul |
在上一篇文章中,我们通过跟踪源码的方式理解了Sofa
插件的执行原理,将发起的http
请求转化为sofa
的泛化调用,但是有个关键的地方没有展开讲:就是服务的配置信息是怎么来的?以及代理对象是怎么得到的?
本文的分析思路和之前的Apache Dubbo
是一样的。
public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {
//获取服务
ConsumerConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getPath());
if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterfaceId())) {
ApplicationConfigCache.getInstance().invalidate(metaData.getServiceName());
reference = ApplicationConfigCache.getInstance().initRef(metaData);
}
//获取代理对象
GenericService genericService = reference.refer();
//省略了其他代码
genericService.$invoke(metaData.getMethodName(), pair.getLeft(), pair.getRight());
return Mono.fromFuture(future.thenApply(ret -> {
//省略了其他代码
})).onErrorMap(SoulException::new);
}
这篇文章,我们就来解决这个问题。先说结论:服务配置信息的来源是soul-admin
同步到网关。
整个流程涉及到的类和方法如下:
soul-admin
端:
WebsocketCollector
:onMessage()
;
SyncDataServiceImpl
:syncAll()
;
MetaDataService
:syncData()
;
soul-bootstrap
网关端:
SoulWebsocketClient
: onMessage()
,handleResult()
;
WebsocketDataHandler
:executor()
;
AbstractDataHandler
:handle()
;
MetaDataHandler
:doRefresh()
;
SofaMetaDataSubscriber
:onSubscribe()
;
ApplicationConfigCache
:initRef()
,build()
;
在本文的测试中,soul-admin
端和soul-bootstrap
网关端之间的数据同步是通过websocket
进行。所以在网关启动的时候会进行数据同步操作。
在soul-bootstrap
网关端前面几个类主要处理同步的数据类型,最终流转到ApplicationConfigCache
,在这里面进行构建服务build()
。
public ConsumerConfig<GenericService> build(final MetaData metaData) {
ConsumerConfig<GenericService> reference = new ConsumerConfig<>();
reference.setGeneric(true);
reference.setApplication(applicationConfig);
reference.setRegistry(registryConfig); //注册中心地址
reference.setInterfaceId(metaData.getServiceName());
reference.setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT);
reference.setInvokeType(RpcConstants.INVOKER_TYPE_CALLBACK);
reference.setRepeatedReferLimit(-1);
String rpcExt = metaData.getRpcExt();//元数据信息
SofaParamExtInfo sofaParamExtInfo = GsonUtils.getInstance().fromJson(rpcExt, SofaParamExtInfo.class);
if (Objects.nonNull(sofaParamExtInfo)) {
if (StringUtils.isNoneBlank(sofaParamExtInfo.getLoadbalance())) {
final String loadBalance = sofaParamExtInfo.getLoadbalance();
reference.setLoadBalancer(buildLoadBalanceName(loadBalance));
}
Optional.ofNullable(sofaParamExtInfo.getTimeout()).ifPresent(reference::setTimeout);
Optional.ofNullable(sofaParamExtInfo.getRetries()).ifPresent(reference::setRetries);
}
//从注册中心获取代理服务
Object obj = reference.refer();
if (obj != null) {
log.info("init sofa reference success there meteData is :{}", metaData.toString());
cache.put(metaData.getPath(), reference);//将服务信息放到cache中去
}
return reference;
}
在同步过程中,将所有的元数据信息及注册中心的服务放到了cache
中,在使用的时候,就会到这个cache
中去拿。就是在文章开始的地方提到的泛化调用。
public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {
//从缓存的cache中获取服务
ConsumerConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getPath());
if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterfaceId())) {
ApplicationConfigCache.getInstance().invalidate(metaData.getServiceName());
reference = ApplicationConfigCache.getInstance().initRef(metaData);
}
//获取代理对象
GenericService genericService = reference.refer();
//省略了其他代码
genericService.$invoke(metaData.getMethodName(), pair.getLeft(), pair.getRight());
return Mono.fromFuture(future.thenApply(ret -> {
//省略了其他代码
})).onErrorMap(SoulException::new);
}
小结:本文再次梳理了Soul
网关中Sofa
插件里面的服务是如何被加载的,包括服务的配置信息和 代理对象的生成。