04 May 2022 |
ShenYu |
本文介绍了如何通过Apache ShenYu
网关访问Dubbo
服务,主要内容包括从简单示例到核心调用流程分析,并对设计原理进行了总结。
1. 介绍
Apache ShenYu(Incubating) 是一个异步的,高性能的,跨语言的,响应式的 API
网关。兼容各种主流框架体系,支持热插拔,用户可以定制化开发,满足用户各种场景的现状和未来需求,经历过大规模场景的锤炼。
2021年5月,ShenYu
捐献给 Apache
软件基金会,Apache 基金会全票通过,顺利进入孵化器。
Apache Dubbo
是一款微服务开发框架,它提供了 RPC
通信 与 微服务治理 两大关键能力。这意味着,使用 Dubbo
开发的微服务,将具备相互之间的远程发现与通信能力, 同时利用 Dubbo 提供的丰富服务治理能力,可以实现诸如服务发现、负载均衡、流量调度等服务治理诉求。同时 Dubbo
是高度可扩展的,用户几乎可以在任意功能点去定制自己的实现,以改变框架的默认行为来满足自己的业务需求。
2. Dubbo快速开始
本小节介绍如何将Dubbo
服务接入到ShenYu
网关,您可以直接在工程下找到本小节的示例代码 。
2.1 启动shenyu-admin
shenyu-admin
是Apache ShenYu
后台管理系统, 启动的方式有多种,本文通过 本地部署 的方式启动。启动成功后,需要在基础配置->
插件管理中,把dubbo
插件设置为开启,并设置你的注册地址,请确保注册中心已经开启。
2.2 启动shenyu网关
在这里通过 源码 的方式启动,直接运行shenyu-bootstrap
中的ShenyuBootstrapApplication
。
在启动前,请确保网关已经引入相关依赖。如果客户端是apache dubbo
,注册中心使用zookeeper
,请参考如下配置:
<!-- apache shenyu apache dubbo plugin start-->
<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-spring-boot-starter-plugin-apache-dubbo</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>2.7.5</version>
</dependency>
<!-- Dubbo zookeeper registry dependency start -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.0.1</version>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
<!-- Dubbo zookeeper registry dependency end -->
<!-- apache dubbo plugin end-->
2.3 启动shenyu-examples-dubbo
以官网提供的例子为例 shenyu-examples-dubbo 。 假如dubbo
服务定义如下:
<beans /* ...... * />
<dubbo:application name="test-dubbo-service"/>
<dubbo:registry address="${dubbo.registry.address}"/>
<dubbo:protocol name="dubbo" port="20888"/>
<dubbo:service timeout="10000" interface="org.apache.shenyu.examples.dubbo.api.service.DubboTestService" ref="dubboTestService"/>
</beans>
声明应用服务名称,注册中心地址,使用dubbo
协议,声明服务接口,对应接口实现类:
/**
* DubboTestServiceImpl.
*/
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {
@Override
@ShenyuDubboClient(path = "/findById", desc = "Query by Id")
public DubboTest findById(final String id) {
return new DubboTest(id, "hello world shenyu Apache, findById");
}
//......
}
在接口实现类中,使用注解@ShenyuDubboClient
向shenyu-admin
注册服务。
在配置文件application.yml
中的配置信息:
server:
port: 8011
address: 0.0.0.0
servlet:
context-path: /
spring:
main:
allow-bean-definition-overriding: true
dubbo:
registry:
address: zookeeper://localhost:2181 # dubbo使用的注册中心
shenyu:
register:
registerType: http #注册方式
serverLists: http://localhost:9095 #注册地址
props:
username: admin
password: 123456
client:
dubbo:
props:
contextPath: /dubbo
appName: dubbo
在配置文件中,声明dubbo
使用的注册中心地址,dubbo
服务向shenyu-admin
注册,使用的方式是http
,注册地址是http://localhost:9095
。
关于注册方式的使用,请参考 应用客户端接入 。
2.4 调用dubbo服务
shenyu-examples-dubbo
项目成功启动之后会自动把加 @ShenyuDubboClient
注解的接口方法注册到网关。
打开 插件列表 -> Proxy -> dubbo
可以看到插件规则配置列表:
注册成功的选择器信息:
注册成功的规则信息:
选择器和规则是 Apache ShenYu
网关中最灵魂的东西。掌握好它,你可以对任何流量进行管理。对应为选择器与规则里面的匹配条件(conditions),根据不同的流量筛选规则,我们可以处理各种复杂的场景。流量筛选可以从Header
, URI
, Query
, Cookie
等等Http请求获取数据。
然后可以采用 Match
,=
,Regex
,Groovy
,Exclude
等匹配方式,匹配出你所预想的数据。多组匹配添加可以使用And/Or
的匹配策略。
具体的介绍与使用请看: 选择器与规则管理 。
发起GET
请求,通过ShenYu
网关调用dubbo
服务:
GET http://localhost:9195/dubbo/findById?id=100
Accept: application/json
成功响应之后,结果如下:
{
"name": "hello world shenyu Apache, findById",
"id": "100"
}
至此,就成功的通过http
请求访问dubbo
服务了,ShenYu
网关通过shenyu-plugin-dubbo
模块将http
协议转成了dubbo
协议。
3. 深入理解Dubbo插件
在运行上述demo
的过程中,是否存在一些疑问:
dubbo
服务是如何注册到shenyu-admin
?
shenyu-admin
是如何将数据同步到ShenYu
网关?
DubboPlugin
是如何将http
协议转换到到dubbo协议?
带着这些疑问,来深入理解dubbo
插件。
3.1 应用客户端接入
应用客户端接入是指将微服务接入到Apache ShenYu
网关,当前支持Http
、 Dubbo
、 Spring Cloud
、 gRPC
、 Motan
、 Sofa
、 Tars
等协议的接入。
将应用客户端接入到Apache ShenYu
网关是通过注册中心来实现的,涉及到客户端注册和服务端同步数据。注册中心支持Http
、Zookeeper
、Etcd
、Consul
和Nacos
。默认是通过Http
方式注册。
客户端接入的相关配置请参考 客户端接入配置 。
3.1.1 客户端注册
在你的微服务配置中声明注册中心客户端类型,如Http
或Zookeeper
。
应用程序启动时使用SPI
方式加载并初始化对应注册中心客户端,通过实现Spring Bean
相关的后置处理器接口,在其中获取需要进行注册的服务接口信息,将获取的信息放入Disruptor
中。
注册中心客户端从Disruptor
中读取数据,并将接口信息注册到shenyu-admin
,Disruptor
在其中起数据与操作解耦的作用,利于扩展。
3.1.2 服务端注册
在shenyu-admin
配置中声明注册中心服务端类型,如Http
或Zookeeper
。当shenyu-admin
启动时,读取配置类型,加载并初始化对应的注册中心服务端,注册中心服务端收到shenyu-client
注册的接口信息后,将其放入Disruptor
中,然后会触发注册处理逻辑,将服务接口信息更新并发布同步事件。
Disruptor
在其中起到数据与操作解耦,利于扩展。如果注册请求过多,导致注册异常,也有数据缓冲作用。
3.2 数据同步原理
数据同步是指在 shenyu-admin
后台操作数据以后,使用何种策略将数据同步到 Apache ShenYu
网关。Apache ShenYu
网关当前支持ZooKeeper
、WebSocket
、Http长轮询
、Nacos
、Etcd
和 Consul
进行数据同步。默认是通过WebSocket
进行数据同步。
数据同步的相关配置请参考 数据同步配置 。
3.2.1 数据同步的意义
网关是流量请求的入口,在微服务架构中承担了非常重要的角色,网关高可用的重要性不言而喻。在使用网关的过程中,为了满足业务诉求,经常需要变更配置,比如流控规则、路由规则等等。因此,网关动态配置是保障网关高可用的重要因素。
当前数据同步特性如下:
- 所有的配置都缓存在
Apache ShenYu
网关内存中,每次请求都使用本地缓存,速度非常快。
- 用户可以在
shenyu-admin
后台任意修改数据,并马上同步到网关内存。
- 支持
Apache ShenYu
的插件、选择器、规则数据、元数据、签名数据等数据同步。
- 所有插件的选择器,规则都是动态配置,立即生效,不需要重启服务。
- 数据同步方式支持
Zookeeper
、Http 长轮询
、Websocket
、Nacos
、Etcd
和 Consul
。
3.2.2 数据同步原理分析
下图展示了 Apache ShenYu
数据同步的流程,Apache ShenYu
网关在启动时,会从配置服务同步配置数据,并且支持推拉模式获取配置变更信息,然后更新本地缓存。管理员可以在管理后台(shenyu-admin
),变更用户权限、规则、插件、流量配置,通过推拉模式将变更信息同步给 Apache ShenYu
网关,具体是 push
模式,还是 pull
模式取决于使用哪种同步方式。
在最初的版本中,配置服务依赖 Zookeeper
实现,管理后台将变更信息 push
给网关。而现在可以支持 WebSocket
、Http长轮询
、Zookeeper
、Nacos
、Etcd
和 Consul
,通过在配置文件中设置 shenyu.sync.${strategy}
指定对应的同步策略,默认使用 webosocket
同步策略,可以做到秒级数据同步。但是,有一点需要注意的是,Apache ShenYu
网关 和 shenyu-admin
必须使用相同的同步策略。
如下图所示,shenyu-admin
在用户发生配置变更之后,会通过 EventPublisher
发出配置变更通知,由 EventDispatcher
处理该变更通知,然后根据配置的同步策略(http、weboscket、zookeeper、naocs、etcd、consul
),将配置发送给对应的事件处理器。
- 如果是
websocket
同步策略,则将变更后的数据主动推送给 shenyu-web
,并且在网关层,会有对应的 WebsocketDataHandler
处理器来处理 shenyu-admin
的数据推送。
- 如果是
zookeeper
同步策略,将变更数据更新到 zookeeper
,而 ZookeeperSyncCache
会监听到 zookeeper
的数据变更,并予以处理。
- 如果是
http
同步策略,由网关主动发起长轮询请求,默认有 90s
超时时间,如果 shenyu-admin
没有数据变更,则会阻塞 http
请求,如果有数据发生变更则响应变更的数据信息,如果超过 60s
仍然没有数据变更则响应空数据,网关层接到响应后,继续发起 http
请求,反复同样的请求。
3.3 流程分析
流程分析是从源码的角度,展示服务注册流程,数据同步流程和服务调用流程。
3.3.1 服务注册流程
使用注解@ShenyuDubboClient
标记需要注册到网关的dubbo
服务。
注解扫描通过ApacheDubboServiceBeanListener
完成,它实现了ApplicationListener<ContextRefreshedEvent>
接口,在Spring
容器启动过程中,发生上下文刷新事件时,开始执行事件处理方法onApplicationEvent()
。在重写的方法逻辑中,读取Dubbo
服务ServiceBean
,构建元数据对象和URI
对象,并向shenyu-admin
注册。
具体的注册逻辑由注册中心实现,请参考 客户端接入原理 。
客户端通过注册中心注册的元数据和URI
数据,在shenyu-admin
端进行处理,负责存储到数据库和同步给shenyu
网关。Dubbo
插件的客户端注册处理逻辑在ShenyuClientRegisterDubboServiceImpl
中。继承关系如下:
- ShenyuClientRegisterService:客户端注册服务,顶层接口;
- FallbackShenyuClientRegisterService:注册失败,提供重试操作;
- AbstractShenyuClientRegisterServiceImpl:抽象类,实现部分公共注册逻辑;
- ShenyuClientRegisterDubboServiceImpl:实现
Dubbo
插件的注册;
注册信息包括选择器,规则和元数据。
整体的dubbo
服务注册流程如下:
3.3.2 数据同步流程
- admin更新数据
假设在在后台管理系统中,新增一条选择器数据,请求会进入
SelectorController
类中的createSelector()
方法,它负责数据的校验,添加或更新数据,返回结果信息。
在SelectorServiceImpl
类中通过createOrUpdate()
方法完成数据的转换,保存到数据库,发布事件,更新upstream
。
在Service
类完成数据的持久化操作,即保存数据到数据库。发布变更数据通过eventPublisher.publishEvent()
完成,这个eventPublisher
对象是一个ApplicationEventPublisher
类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher
,发布数据的功能正是是通过Spring
相关的功能来完成的。
当事件发布完成后,会自动进入到DataChangedEventDispatcher
类中的onApplicationEvent()
方法,根据不同数据类型和数据同步方式进行事件处理。
网关在启动时,根据指定的数据同步方式加载不同的配置类,初始化数据同步相关类。
在接收到数据后,进行反序列化操作,读取数据类型和操作类型。不同的数据类型,有不同的数据处理方式,所以有不同的实现类。但是它们之间也有相同的处理逻辑,所以可以通过模板方法设计模式来实现。相同的逻辑放在抽象类AbstractDataHandler
中的handle()
方法中,不同逻辑就交给各自的实现类。
新增一条选择器数据,是新增操作,会进入到SelectorDataHandler.doUpdate()
具体的数据处理逻辑中。
在通用插件数据订阅者CommonPluginDataSubscriber
,负责处理所有插件、选择器和规则信息
将数据保存到网关的内存中,BaseDataCache
是最终缓存数据的类,通过单例模式实现。选择器数据就存到了SELECTOR_MAP
这个Map
中。在后续使用的时候,也是从这里拿数据。
上述逻辑用流程图表示如下:
3.3.3 服务调用流程
在Dubbo
插件体系中,类继承关系如下:
ShenyuPlugin:顶层接口,定义接口方法;
AbstractShenyuPlugin:抽象类,实现插件共有逻辑;
AbstractDubboPlugin:dubbo插件抽象类,实现dubbo
共有逻辑(ShenYu网关支持ApacheDubbo和AlibabaDubbo);
ApacheDubboPlugin:ApacheDubbo插件。
- org.apache.shenyu.web.handler.ShenyuWebHandler.DefaultShenyuPluginChain#execute()
通过ShenYu
网关代理后,请求入口是ShenyuWebHandler
,它实现了org.springframework.web.server.WebHandler
接口,通过责任链设计模式将所有插件连接起来。
- org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()
当请求到网关时,判断某个插件是否执行,是通过指定的匹配逻辑来完成。在execute()
方法中执行选择器和规则的匹配逻辑。
- org.apache.shenyu.plugin.global.GlobalPlugin#execute()
最先被执行的是GlobalPlugin
,它是一个全局插件,在execute()
方法中构建上下文信息。
- org.apache.shenyu.plugin.base.RpcParamTransformPlugin#execute()
接着被执行的是RpcParamTransformPlugin
, 它负责从http
请求中读取参数,保存到exchange
中,传递给rpc
服务。在execute()
方法中,执行该插件的核心逻辑:从exchange
中获取请求信息,根据请求传入的内容形式处理参数。
- org.apache.shenyu.plugin.dubbo.common.AbstractDubboPlugin
然后被执行的是DubboPlugin
。在doExecute()
方法中,主要是检查元数据和参数。在doDubboInvoker()
方法中设置特殊的上下文信息,然后开始dubbo
的泛化调用。
在genericInvoker()
方法中:
- 获取
ReferenceConfig
对象;
- 获取泛化服务
GenericService
对象;
- 构造请求参数
pair
对象;
- 发起异步的泛化调用。
通过泛化调用就可以实现在网关调用dubbo
服务了。
ReferenceConfig
对象是支持泛化调用的关键对象 ,它的初始化操作是在数据同步的时候完成的。
- org.apache.shenyu.plugin.response.ResponsePlugin#execute()
最后被执行的是ResponsePlugin
,它统一处理网关的响应结果信息。处理类型由MessageWriter
决定,类继承关系如下:
MessageWriter:接口,定义消息处理方法;
NettyClientMessageWriter:处理Netty
调用结果;
RPCMessageWriter:处理RPC
调用结果;
WebClientMessageWriter:处理WebClient
调用结果;
Dubbo
服务调用,处理结果是RPCMessageWriter
。
- org.apache.shenyu.plugin.response.strategy.RPCMessageWriter#writeWith()
在writeWith()
方法中处理响应结果,获取结果或处理异常。
分析至此,关于Dubbo
插件的源码分析就完成了,分析流程图如下:
4. 小结
本文从实际案例出发,由浅入深分析了ShenYu
网关对Dubbo服务的代理过程。涉及到的主要知识点如下:
- 通过责任链设计模式执行插件;
- 使用模板方法设计模式实现
AbstractShenyuPlugin
,处理通用的操作类型;
- 使用单例设计模式实现缓存数据类
BaseDataCache
;
- 通过
springboot starter
即可引入不同的注册中心和数同步方式,扩展性很好;
- 通过
admin
支持规则热更新,方便流量管控;
Disruptor
队列是为了数据与操作解耦,以及数据缓冲。
25 Apr 2022 |
ShenYu |
Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API
网关。
ShenYu
网关使用 dubbo
插件完成对 dubbo
服务的调用。你可以查看官方文档 Dubbo快速开始 了解如何使用该插件。
本文基于shenyu-2.4.3
版本进行源码分析,官网的介绍请参考 Dubbo服务接入 。
1. 服务注册
以官网提供的例子为例 shenyu-examples-dubbo 。 假如你的dubbo
服务定义如下(spring-dubbo.xml
):
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://code.alibabatech.com/schema/dubbo
https://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<dubbo:application name="test-dubbo-service"/>
<dubbo:registry address="${dubbo.registry.address}"/>
<dubbo:protocol name="dubbo" port="20888"/>
<dubbo:service timeout="10000" interface="org.apache.shenyu.examples.dubbo.api.service.DubboTestService" ref="dubboTestService"/>
</beans>
声明应用服务名称,注册中心地址,使用dubbo
协议,声明服务接口,对应接口实现类:
/**
* DubboTestServiceImpl.
*/
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {
@Override
@ShenyuDubboClient(path = "/findById", desc = "Query by Id")
public DubboTest findById(final String id) {
return new DubboTest(id, "hello world shenyu Apache, findById");
}
//......
}
在接口实现类中,使用注解@ShenyuDubboClient
向shenyu-admin
注册服务。该注解的作用及原理,稍后再进行分析。
在配置文件application.yml
中的配置信息:
server:
port: 8011
address: 0.0.0.0
servlet:
context-path: /
spring:
main:
allow-bean-definition-overriding: true
dubbo:
registry:
address: zookeeper://localhost:2181 # dubbo使用的注册中心
shenyu:
register:
registerType: http #注册方式
serverLists: http://localhost:9095 #注册地址
props:
username: admin
password: 123456
client:
dubbo:
props:
contextPath: /dubbo
appName: dubbo
在配置文件中,声明dubbo
使用的注册中心地址,dubbo
服务向shenyu-admin
注册,使用的方式是http
,注册地址是http://localhost:9095
。
关于注册方式的使用,请参考 应用客户端接入 。
1.1 声明注册接口
使用注解@ShenyuDubboClient
将服务注册到网关。简单demo
如下:
// dubbo服务
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {
@Override
@ShenyuDubboClient(path = "/findById", desc = "Query by Id") // 需要注册的方法
public DubboTest findById(final String id) {
return new DubboTest(id, "hello world shenyu Apache, findById");
}
//......
}
注解定义:
/**
* 作用于类和方法上
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Inherited
public @interface ShenyuDubboClient {
//注册路径
String path();
//规则名称
String ruleName() default "";
//描述信息
String desc() default "";
//是否启用
boolean enabled() default true;
}
1.2 扫描注解信息
注解扫描通过ApacheDubboServiceBeanListener
完成,它实现了ApplicationListener<ContextRefreshedEvent>
接口,在Spring
容器启动过程中,发生上下文刷新事件时,开始执行事件处理方法onApplicationEvent()
。
在构造器实例化的过程中:
- 读取属性配置
- 开启线程池
- 启动注册中心,用于向
shenyu-admin
注册
public class ApacheDubboServiceBeanListener implements ApplicationListener<ContextRefreshedEvent> {
// ......
//构造器
public ApacheDubboServiceBeanListener(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
//1.读取属性配置
Properties props = clientConfig.getProps();
String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
String appName = props.getProperty(ShenyuClientConstants.APP_NAME);
if (StringUtils.isBlank(contextPath)) {
throw new ShenyuClientIllegalArgumentException("apache dubbo client must config the contextPath or appName");
}
this.contextPath = contextPath;
this.appName = appName;
this.host = props.getProperty(ShenyuClientConstants.HOST);
this.port = props.getProperty(ShenyuClientConstants.PORT);
//2.开启线程池
executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("shenyu-apache-dubbo-client-thread-pool-%d").build());
//3.启动注册中心
publisher.start(shenyuClientRegisterRepository);
}
/**
* 上下文刷新事件,执行方法逻辑
*/
@Override
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
//......
}
- ApacheDubboServiceBeanListener#onApplicationEvent()
重写的方法逻辑:读取Dubbo
服务ServiceBean
,构建元数据对象和URI
对象,并向shenyu-admin
注册。
@Override
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
//读取ServiceBean
Map<String, ServiceBean> serviceBean = contextRefreshedEvent.getApplicationContext().getBeansOfType(ServiceBean.class);
if (serviceBean.isEmpty()) {
return;
}
//保证该方法只执行一次
if (!registered.compareAndSet(false, true)) {
return;
}
//处理元数据对象
for (Map.Entry<String, ServiceBean> entry : serviceBean.entrySet()) {
handler(entry.getValue());
}
//处理URI对象
serviceBean.values().stream().findFirst().ifPresent(bean -> {
publisher.publishEvent(buildURIRegisterDTO(bean));
});
}
private void handler(final ServiceBean<?> serviceBean) {
//获取代理对象
Object refProxy = serviceBean.getRef();
//获取class信息
Class<?> clazz = refProxy.getClass();
if (AopUtils.isAopProxy(refProxy)) {
clazz = AopUtils.getTargetClass(refProxy);
}
//获取所有方法
Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);
for (Method method : methods) {
//读取ShenyuDubboClient注解信息
ShenyuDubboClient shenyuDubboClient = method.getAnnotation(ShenyuDubboClient.class);
if (Objects.nonNull(shenyuDubboClient)) {
//构建元数据对象,并注册
publisher.publishEvent(buildMetaDataDTO(serviceBean, shenyuDubboClient, method));
}
}
}
private MetaDataRegisterDTO buildMetaDataDTO(final ServiceBean<?> serviceBean, final ShenyuDubboClient shenyuDubboClient, final Method method) {
//应用名称
String appName = buildAppName(serviceBean);
//方法路径
String path = contextPath + shenyuDubboClient.path();
//描述信息
String desc = shenyuDubboClient.desc();
//服务名称
String serviceName = serviceBean.getInterface();
//规则名称
String configRuleName = shenyuDubboClient.ruleName();
String ruleName = ("".equals(configRuleName)) ? path : configRuleName;
//方法名称
String methodName = method.getName();
//参数类型
Class<?>[] parameterTypesClazz = method.getParameterTypes();
String parameterTypes = Arrays.stream(parameterTypesClazz).map(Class::getName).collect(Collectors.joining(","));
return MetaDataRegisterDTO.builder()
.appName(appName)
.serviceName(serviceName)
.methodName(methodName)
.contextPath(contextPath)
.host(buildHost())
.port(buildPort(serviceBean))
.path(path)
.ruleName(ruleName)
.pathDesc(desc)
.parameterTypes(parameterTypes)
.rpcExt(buildRpcExt(serviceBean)) //dubbo服务的扩展信息
.rpcType(RpcTypeEnum.DUBBO.getName())
.enabled(shenyuDubboClient.enabled())
.build();
}
-
buildRpcExt()
dubbo
服务的扩展信息
private String buildRpcExt(final ServiceBean serviceBean) {
DubboRpcExt build = DubboRpcExt.builder()
.group(StringUtils.isNotEmpty(serviceBean.getGroup()) ? serviceBean.getGroup() : "")//分组
.version(StringUtils.isNotEmpty(serviceBean.getVersion()) ? serviceBean.getVersion() : "")//版本
.loadbalance(StringUtils.isNotEmpty(serviceBean.getLoadbalance()) ? serviceBean.getLoadbalance() : Constants.DEFAULT_LOADBALANCE)//负载均衡策略,默认随机
.retries(Objects.isNull(serviceBean.getRetries()) ? Constants.DEFAULT_RETRIES : serviceBean.getRetries())//重试次数,默认2
.timeout(Objects.isNull(serviceBean.getTimeout()) ? Constants.DEFAULT_CONNECT_TIMEOUT : serviceBean.getTimeout())//超时,默认3000
.sent(Objects.isNull(serviceBean.getSent()) ? Constants.DEFAULT_SENT : serviceBean.getSent())//sent,默认false
.cluster(StringUtils.isNotEmpty(serviceBean.getCluster()) ? serviceBean.getCluster() : Constants.DEFAULT_CLUSTER)//集群策略,默认failover
.url("")
.build();
return GsonUtils.getInstance().toJson(build);
}
private URIRegisterDTO buildURIRegisterDTO(final ServiceBean serviceBean) {
return URIRegisterDTO.builder()
.contextPath(this.contextPath) //上下文路径
.appName(buildAppName(serviceBean))//应用名称
.rpcType(RpcTypeEnum.DUBBO.getName())//rpc类型:dubbo
.host(buildHost()) //host
.port(buildPort(serviceBean))//port
.build();
}
具体的注册逻辑由注册中心实现,请参考 客户端接入原理 。
//向注册中心,发布注册事件
publisher.publishEvent();
1.3 处理注册信息
客户端通过注册中心注册的元数据和URI
数据,在shenyu-admin
端进行处理,负责存储到数据库和同步给shenyu
网关。Dubbo
插件的客户端注册处理逻辑在ShenyuClientRegisterDubboServiceImpl
中。继承关系如下:
- ShenyuClientRegisterService:客户端注册服务,顶层接口;
- FallbackShenyuClientRegisterService:注册失败,提供重试操作;
- AbstractShenyuClientRegisterServiceImpl:抽象类,实现部分公共注册逻辑;
- ShenyuClientRegisterDubboServiceImpl:实现
Dubbo
插件的注册;
1.3.1 注册服务
@Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
String selectorHandler = selectorHandler(dto);
String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
//2. 注册规则
String ruleHandler = ruleHandler();
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
ruleService.registerDefault(ruleDTO);
//3. 注册元数据
registerMetadata(dto);
//4. 注册ContextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
1.3.1.1 注册选择器
- org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()
构建contextPath
,查找选择器信息是否存在,如果存在就返回id
;不存在就创建默认的选择器信息。
@Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {
// 构建contextPath
String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());
// 通过名称查找选择器信息是否存在
SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);
if (Objects.isNull(selectorDO)) {
// 不存在就创建默认的选择器信息
return registerSelector(contextPath, pluginName, selectorHandler);
}
return selectorDO.getId();
}
-
默认选择器信息
在这里构建默认选择器信息及其条件属性。
//注册选择器
private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {
//构建选择器
SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());
selectorDTO.setHandle(selectorHandler);
//注册默认选择器
return registerDefault(selectorDTO);
}
//构建选择器
private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {
//构建默认选择器
SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);
selectorDTO.setPluginId(pluginId);
//构建默认选择器的条件属性
selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));
return selectorDTO;
}
private SelectorDTO buildDefaultSelectorDTO(final String name) {
return SelectorDTO.builder()
.name(name) // 名称
.type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // 默认类型自定义
.matchMode(MatchModeEnum.AND.getCode()) //默认匹配方式 and
.enabled(Boolean.TRUE) //默认启开启
.loged(Boolean.TRUE) //默认记录日志
.continued(Boolean.TRUE) //默认继续后续选择器
.sort(1) //默认顺序1
.build();
}
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {
SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();
selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // 默认参数类型URI
selectorConditionDTO.setParamName("/");
selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // 默认匹配策略 match
selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // 默认值 /contextPath/**
return Collections.singletonList(selectorConditionDTO);
}
@Override
public String registerDefault(final SelectorDTO selectorDTO) {
//选择器信息
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
//选择器条件属性
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
if (StringUtils.isEmpty(selectorDTO.getId())) {
// 向数据库插入选择器信息
selectorMapper.insertSelective(selectorDO);
// 向数据库插入选择器条件属性
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId()); selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
}
// 发布同步事件,向网关同步选择信息及其条件属性
publishEvent(selectorDO, selectorConditionDTOs);
return selectorDO.getId();
}
1.3.1.2 注册规则
在注册服务的第二步中,开始构建默认规则,然后注册规则。
@Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
//......
//2. 注册规则
// 默认规则处理属性
String ruleHandler = ruleHandler();
// 构建默认规则信息
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
// 注册规则
ruleService.registerDefault(ruleDTO);
//3. 注册元数据
//......
//4. 注册ContextPath
//......
return ShenyuResultMessage.SUCCESS;
}
@Override
protected String ruleHandler() {
// 默认规则处理属性
return new DubboRuleHandle().toJson();
}
Dubbo
插件默认规则处理属性
public class DubboRuleHandle implements RuleHandle {
/**
* dubbo服务版本信息.
*/
private String version;
/**
* 分组.
*/
private String group;
/**
* 重试次数.
*/
private Integer retries = 0;
/**
* 负载均衡策略:默认随机
*/
private String loadbalance = LoadBalanceEnum.RANDOM.getName();
/**
* 超时,默认3000
*/
private long timeout = Constants.TIME_OUT;
}
// 构建默认规则信息
private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {
return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
}
// 构建默认规则信息
private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {
RuleDTO ruleDTO = RuleDTO.builder()
.selectorId(selectorId) //关联的选择器id
.name(ruleName) //规则名称
.matchMode(MatchModeEnum.AND.getCode()) // 默认匹配模式 and
.enabled(Boolean.TRUE) // 默认开启
.loged(Boolean.TRUE) //默认记录日志
.sort(1) //默认顺序 1
.handle(ruleHandler)
.build();
RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()
.paramType(ParamTypeEnum.URI.getName()) // 默认参数类型URI
.paramName("/")
.paramValue(path) //参数值path
.build();
if (path.indexOf("*") > 1) {
ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); //如果path中有*,操作类型则默认为 match
} else {
ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // 否则,默认操作类型 =
}
ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));
return ruleDTO;
}
- org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()
注册规则:向数据库插入记录,并向网关发布事件,进行数据同步。
@Override
public String registerDefault(final RuleDTO ruleDTO) {
RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());
if (Objects.nonNull(exist)) {
return "";
}
RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);
List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();
if (StringUtils.isEmpty(ruleDTO.getId())) {
// 向数据库插入规则信息
ruleMapper.insertSelective(ruleDO);
//向数据库插入规则体条件属性
ruleConditions.forEach(ruleConditionDTO -> {
ruleConditionDTO.setRuleId(ruleDO.getId()); ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));
});
}
// 向网关发布事件,进行数据同步
publishEvent(ruleDO, ruleConditions);
return ruleDO.getId();
}
1.3.1.3 注册元数据
元数据主要用于RPC
服务的调用。
@Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
//......
//2. 注册规则
//......
//3. 注册元数据
registerMetadata(dto);
//4. 注册ContextPath
//......
return ShenyuResultMessage.SUCCESS;
}
@Override
protected void registerMetadata(final MetaDataRegisterDTO dto) {
// 获取metaDataService
MetaDataService metaDataService = getMetaDataService();
// 元数据是否存在
MetaDataDO exist = metaDataService.findByPath(dto.getPath());
// 插入或更新元数据
metaDataService.saveOrUpdateMetaData(exist, dto);
}
@Override
public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {
DataEventTypeEnum eventType;
// 数据类型转换 DTO->DO
MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);
// 插入数据
if (Objects.isNull(exist)) {
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());
metaDataDO.setDateCreated(currentTime);
metaDataDO.setDateUpdated(currentTime);
metaDataMapper.insert(metaDataDO);
eventType = DataEventTypeEnum.CREATE;
} else {
// 更新数据
metaDataDO.setId(exist.getId());
metaDataMapper.update(metaDataDO);
eventType = DataEventTypeEnum.UPDATE;
}
// 发布同步事件到网关
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,
Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));
}
1.3.2 注册URI
- org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()
服务端收到客户端注册的URI
信息后,进行处理。
@Override
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
String result;
String key = key(selectorName);
try {
this.removeFallBack(key);
// 注册URI
result = this.doRegisterURI(selectorName, uriList);
logger.info("Register success: {},{}", selectorName, uriList);
} catch (Exception ex) {
logger.warn("Register exception: cause:{}", ex.getMessage());
result = "";
// 注册失败后,进行重试
this.addFallback(key, new FallbackHolder(selectorName, uriList));
}
return result;
}
- org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()
从客户端注册的URI
中获取有效的URI
,更新对应的选择器handle
属性,向网关发送选择器更新事件。
@Override
public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {
//参数检查
if (CollectionUtils.isEmpty(uriList)) {
return "";
}
//获取选择器信息
SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
if (Objects.isNull(selectorDO)) {
throw new ShenyuException("doRegister Failed to execute,wait to retry.");
}
// 获取有效的URI
List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());
// 构建选择器的handle属性
String handler = buildHandle(validUriList, selectorDO);
if (handler != null) {
selectorDO.setHandle(handler);
SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
selectorData.setHandle(handler);
// 向数据库更新选择器的handle属性
selectorService.updateSelective(selectorDO);
// 向网关发送选择器更新事件
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
}
return ShenyuResultMessage.SUCCESS;
}
关于服务注册的源码分析就以及完成了,分析流程图如下:
接下来就分析dubbo
插件是如何根据这些信息向http
服务发起调用。
2. 服务调用
dubbo
插件是ShenYu
网关用于将http
请求转成 dubbo协议
,调用dubbo
服务的核心处理插件。
以官网提供的案例 Dubbo快速开始 为例,一个dubbo
服务通过注册中心向shenyu-admin
注册后,通过ShenYu
网关代理,请求如下:
GET http://localhost:9195/dubbo/findById?id=100
Accept: application/json
Dubbo
插件中,类继承关系如下:
- ShenyuPlugin:顶层接口,定义接口方法;
- AbstractShenyuPlugin:抽象类,实现插件共有逻辑;
- AbstractDubboPlugin:dubbo插件抽象类,实现
dubbo
共有逻辑;
- ApacheDubboPlugin:ApacheDubbo插件。
ShenYu网关支持ApacheDubbo和AlibabaDubbo
2.1 接收请求
通过ShenYu
网关代理后,请求入口是ShenyuWebHandler
,它实现了org.springframework.web.server.WebHandler
接口。
public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {
//......
/**
* 处理web请求
*/
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
// 执行默认插件链
Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);
if (scheduled) {
return execute.subscribeOn(scheduler);
}
return execute;
}
private static class DefaultShenyuPluginChain implements ShenyuPluginChain {
private int index;
private final List<ShenyuPlugin> plugins;
/**
* 实例化默认插件链
*/
DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {
this.plugins = plugins;
}
/**
* 执行每个插件.
*/
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
// 获取当前执行插件
ShenyuPlugin plugin = plugins.get(this.index++);
// 是否跳过当前插件
boolean skip = plugin.skip(exchange);
if (skip) {
// 如果跳过就执行下一个
return this.execute(exchange);
}
// 执行当前插件
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
}
}
2.2 匹配规则
- org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()
在execute()
方法中执行选择器和规则的匹配逻辑。
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
// 插件名称
String pluginName = named();
// 插件信息
PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if (pluginData != null && pluginData.getEnabled()) {
// 选择器信息
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
// 匹配选择器
SelectorData selectorData = matchSelector(exchange, selectors);
if (Objects.isNull(selectorData)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
selectorLog(selectorData, pluginName);
// 规则信息
List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
if (CollectionUtils.isEmpty(rules)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
// 匹配规则
RuleData rule;
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
//get last
rule = rules.get(rules.size() - 1);
} else {
rule = matchRule(exchange, rules);
}
if (Objects.isNull(rule)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
ruleLog(rule, pluginName);
// 执行插件
return doExecute(exchange, chain, selectorData, rule);
}
return chain.execute(exchange);
}
2.3 执行GlobalPlugin
- org.apache.shenyu.plugin.global.GlobalPlugin#execute()
GlobalPlugin
是一个全局插件,在execute()
方法中构建上下文信息。
public class GlobalPlugin implements ShenyuPlugin {
// 构建上下文信息
private final ShenyuContextBuilder builder;
//......
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
// 构建上下文信息,传入到 exchange 中
ShenyuContext shenyuContext = builder.build(exchange);
exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
return chain.execute(exchange);
}
//......
}
- org.apache.shenyu.plugin.global.DefaultShenyuContextBuilder#build()
构建默认的上下文信息。
public class DefaultShenyuContextBuilder implements ShenyuContextBuilder {
//......
@Override
public ShenyuContext build(final ServerWebExchange exchange) {
//构建参数
Pair<String, MetaData> buildData = buildData(exchange);
//包装ShenyuContext
return decoratorMap.get(buildData.getLeft()).decorator(buildDefaultContext(exchange.getRequest()), buildData.getRight());
}
private Pair<String, MetaData> buildData(final ServerWebExchange exchange) {
//......
//根据请求的uri获取元数据
MetaData metaData = MetaDataCache.getInstance().obtain(request.getURI().getPath());
if (Objects.nonNull(metaData) && Boolean.TRUE.equals(metaData.getEnabled())) {
exchange.getAttributes().put(Constants.META_DATA, metaData);
return Pair.of(metaData.getRpcType(), metaData);
} else {
return Pair.of(RpcTypeEnum.HTTP.getName(), new MetaData());
}
}
//设置默认的上下文信息
private ShenyuContext buildDefaultContext(final ServerHttpRequest request) {
String appKey = request.getHeaders().getFirst(Constants.APP_KEY);
String sign = request.getHeaders().getFirst(Constants.SIGN);
String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP);
ShenyuContext shenyuContext = new ShenyuContext();
String path = request.getURI().getPath();
shenyuContext.setPath(path); //请求路径
shenyuContext.setAppKey(appKey);
shenyuContext.setSign(sign);
shenyuContext.setTimestamp(timestamp);
shenyuContext.setStartDateTime(LocalDateTime.now());
Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> shenyuContext.setHttpMethod(httpMethod.name()));//请求方法
return shenyuContext;
}
}
- org.apache.shenyu.plugin.dubbo.common.context.DubboShenyuContextDecorator#decorator()
包装ShenyuContext
:
public class DubboShenyuContextDecorator implements ShenyuContextDecorator {
@Override
public ShenyuContext decorator(final ShenyuContext shenyuContext, final MetaData metaData) {
shenyuContext.setModule(metaData.getAppName());//获取AppName
shenyuContext.setMethod(metaData.getServiceName()); //获取ServiceName
shenyuContext.setContextPath(metaData.getContextPath()); //获取contextPath
shenyuContext.setRpcType(RpcTypeEnum.DUBBO.getName()); // dubbo服务
return shenyuContext;
}
@Override
public String rpcType() {
return RpcTypeEnum.DUBBO.getName();
}
}
RpcParamTransformPlugin
负责从http
请求中读取参数,保存到exchange
中,传递给rpc
服务。
- org.apache.shenyu.plugin.base.RpcParamTransformPlugin#execute()
在execute()
方法中,执行该插件的核心逻辑:从exchange
中获取请求信息,根据请求传入的内容形式处理参数。
public class RpcParamTransformPlugin implements ShenyuPlugin {
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
//从exchange中获取请求信息
ServerHttpRequest request = exchange.getRequest();
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
if (Objects.nonNull(shenyuContext)) {
// 如果请求参数格式是APPLICATION_JSON
MediaType mediaType = request.getHeaders().getContentType();
if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
return body(exchange, request, chain);
}
// 如果请求参数格式是APPLICATION_FORM_URLENCODED
if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {
return formData(exchange, request, chain);
}
//一般查询请求
return query(exchange, request, chain);
}
return chain.execute(exchange);
}
// 如果请求参数格式是APPLICATION_JSON
private Mono<Void> body(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {
return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())
.flatMap(body -> {
exchange.getAttributes().put(Constants.PARAM_TRANSFORM, resolveBodyFromRequest(body));//解析body,保存到exchange中
return chain.execute(exchange);
}));
}
// 如果请求参数格式是APPLICATION_FORM_URLENCODED
private Mono<Void> formData(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {
return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())
.flatMap(map -> {
String param = resolveBodyFromRequest(map);
LinkedMultiValueMap<String, String> linkedMultiValueMap;
try {
linkedMultiValueMap = BodyParamUtils.buildBodyParams(URLDecoder.decode(param, StandardCharsets.UTF_8.name())); //格式化数据
} catch (UnsupportedEncodingException e) {
return Mono.error(e);
}
exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.toMap(() -> linkedMultiValueMap));// 保存到exchange中
return chain.execute(exchange);
}));
}
//一般查询请求
private Mono<Void> query(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {
exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.ofString(() -> serverHttpRequest.getURI().getQuery()));//保存到exchange中
return chain.execute(exchange);
}
//......
}
2.5 执行DubboPlugin
- org.apache.shenyu.plugin.dubbo.common.AbstractDubboPlugin#doExecute()
在doExecute()
方法中,主要是检查元数据和参数。
public abstract class AbstractDubboPlugin extends AbstractShenyuPlugin {
@Override
public Mono<Void> doExecute(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final SelectorData selector,
final RuleData rule) {
//获取参数
String param = exchange.getAttribute(Constants.PARAM_TRANSFORM);
//获取上下文信息
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
//获取元数据
MetaData metaData = exchange.getAttribute(Constants.META_DATA);
//检查元数据
if (!checkMetaData(metaData)) {
LOG.error(" path is : {}, meta data have error : {}", shenyuContext.getPath(), metaData);
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.META_DATA_ERROR, null);
return WebFluxResultUtils.result(exchange, error);
}
//检查元数据和参数
if (Objects.nonNull(metaData) && StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(param)) {
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM, null);
return WebFluxResultUtils.result(exchange, error);
}
//设置rpcContext
this.rpcContext(exchange);
//进行dubbo服务调用
return this.doDubboInvoker(exchange, chain, selector, rule, metaData, param);
}
}
- org.apache.shenyu.plugin.apache.dubbo.ApacheDubboPlugin#doDubboInvoker()
在doDubboInvoker()
方法中设置特殊的上下文信息,然后开始dubbo的泛化调用。
public class ApacheDubboPlugin extends AbstractDubboPlugin {
@Override
protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange,
final ShenyuPluginChain chain,
final SelectorData selector,
final RuleData rule,
final MetaData metaData,
final String param) {
//设置当前的选择器和规则信息,以及请求地址,用于支持dubbo的灰度
RpcContext.getContext().setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId());
RpcContext.getContext().setAttachment(Constants.DUBBO_RULE_ID, rule.getId());
RpcContext.getContext().setAttachment(Constants.DUBBO_REMOTE_ADDRESS, Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress());
//dubbo的泛化调用
final Mono<Object> result = dubboProxyService.genericInvoker(param, metaData, exchange);
//执行下一个插件
return result.then(chain.execute(exchange));
}
}
- org.apache.shenyu.plugin.apache.dubbo.proxy.ApacheDubboProxyService#genericInvoker()
genericInvoker()
方法:
- 获取
ReferenceConfig
对象;
- 获取泛化服务
GenericService
对象;
- 构造请求参数
pair
对象;
- 发起异步的泛化调用。
public class ApacheDubboProxyService {
//......
/**
* Generic invoker object.
*/
public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException {
//1.获取ReferenceConfig对象
ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(metaData.getPath());
//如果没有获取到
if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {
//失效当前缓存的信息
ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());
//使用元数据进行再次初始化
reference = ApacheDubboConfigCache.getInstance().initRef(metaData);
}
//2.获取泛化服务GenericService对象
GenericService genericService = reference.get();
//3.构造请求参数pair对象
Pair<String[], Object[]> pair;
if (StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.dubboBodyIsEmpty(body)) {
pair = new ImmutablePair<>(new String[]{}, new Object[]{});
} else {
pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
}
//4.发起异步的泛化调用
return Mono.fromFuture(invokeAsync(genericService, metaData.getMethodName(), pair.getLeft(), pair.getRight()).thenApply(ret -> {
//处理结果
if (Objects.isNull(ret)) {
ret = Constants.DUBBO_RPC_RESULT_EMPTY;
}
exchange.getAttributes().put(Constants.RPC_RESULT, ret);
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
return ret;
})).onErrorMap(exception -> exception instanceof GenericException ? new ShenyuException(((GenericException) exception).getExceptionMessage()) : new ShenyuException(exception));//处理异常
}
//泛化调用,异步操作
private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException {
genericService.$invoke(method, parameterTypes, args);
Object resultFromFuture = RpcContext.getContext().getFuture();
return resultFromFuture instanceof CompletableFuture ? (CompletableFuture<Object>) resultFromFuture : CompletableFuture.completedFuture(resultFromFuture);
}
}
通过泛化调用就可以实现在网关调用dubbo
服务了。
ReferenceConfig
对象是支持泛化调用的关键对象 ,它的初始化操作是在数据同步的时候完成的。这里涉及两部分数据,一是同步的插件handler
信息,二是同步的插件元数据信息。
- org.apache.shenyu.plugin.dubbo.common.handler.AbstractDubboPluginDataHandler#handlerPlugin()
当插件数据更新时,数据同步模块会将数据从shenyu-admin
同步到网关。在handlerPlugin()
中执行初始化操作。
public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler {
//......
//初始化配置缓存
protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig);
@Override
public void handlerPlugin(final PluginData pluginData) {
if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) {
//数据反序列化
DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class);
DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class);
if (Objects.isNull(dubboRegisterConfig)) {
return;
}
if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) {
// 执行初始化操作
this.initConfigCache(dubboRegisterConfig);
}
Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig);
}
}
//......
}
- org.apache.shenyu.plugin.apache.dubbo.handler.ApacheDubboPluginDataHandler#initConfigCache()
执行初始化操作。
public class ApacheDubboPluginDataHandler extends AbstractDubboPluginDataHandler {
@Override
protected void initConfigCache(final DubboRegisterConfig dubboRegisterConfig) {
//执行初始化操作
ApacheDubboConfigCache.getInstance().init(dubboRegisterConfig);
//失效之前缓存的结果
ApacheDubboConfigCache.getInstance().invalidateAll();
}
}
- org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#init()
在初始化中,设置registryConfig
和consumerConfig
。
public final class ApacheDubboConfigCache extends DubboConfigCache {
//......
/**
* 初始化
*/
public void init(final DubboRegisterConfig dubboRegisterConfig) {
//创建ApplicationConfig
if (Objects.isNull(applicationConfig)) {
applicationConfig = new ApplicationConfig("shenyu_proxy");
}
//协议或者地址发生改变时,需要更新registryConfig
if (needUpdateRegistryConfig(dubboRegisterConfig)) {
RegistryConfig registryConfigTemp = new RegistryConfig();
registryConfigTemp.setProtocol(dubboRegisterConfig.getProtocol());
registryConfigTemp.setId("shenyu_proxy");
registryConfigTemp.setRegister(false);
registryConfigTemp.setAddress(dubboRegisterConfig.getRegister()); Optional.ofNullable(dubboRegisterConfig.getGroup()).ifPresent(registryConfigTemp::setGroup);
registryConfig = registryConfigTemp;
}
//创建ConsumerConfig
if (Objects.isNull(consumerConfig)) {
consumerConfig = ApplicationModel.getConfigManager().getDefaultConsumer().orElseGet(() -> {
ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.refresh();
return consumerConfig;
});
//设置ConsumerConfig
Optional.ofNullable(dubboRegisterConfig.getThreadpool()).ifPresent(consumerConfig::setThreadpool);
Optional.ofNullable(dubboRegisterConfig.getCorethreads()).ifPresent(consumerConfig::setCorethreads);
Optional.ofNullable(dubboRegisterConfig.getThreads()).ifPresent(consumerConfig::setThreads);
Optional.ofNullable(dubboRegisterConfig.getQueues()).ifPresent(consumerConfig::setQueues);
}
}
//是否需要更新注册配置
private boolean needUpdateRegistryConfig(final DubboRegisterConfig dubboRegisterConfig) {
if (Objects.isNull(registryConfig)) {
return true;
}
return !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol())
|| !Objects.equals(dubboRegisterConfig.getRegister(), registryConfig.getAddress())
|| !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol());
}
//......
}
- org.apache.shenyu.plugin.apache.dubbo.subscriber.ApacheDubboMetaDataSubscriber#onSubscribe()
当元数据更新时,数据同步模块会将数据从shenyu-admin
同步到网关。在onSubscribe()
方法中执行元数据更新操作。
public class ApacheDubboMetaDataSubscriber implements MetaDataSubscriber {
//本地内存缓存
private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();
//元数据发生更新
public void onSubscribe(final MetaData metaData) {
// dubbo服务的元数据更新
if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
//对应的元数据是否存在
MetaData exist = META_DATA.get(metaData.getPath());
if (Objects.isNull(exist) || Objects.isNull(ApacheDubboConfigCache.getInstance().get(metaData.getPath()))) {
// 首次初始化
ApacheDubboConfigCache.getInstance().initRef(metaData);
} else {
// 对应的元数据发生了更新操作
if (!Objects.equals(metaData.getServiceName(), exist.getServiceName())
|| !Objects.equals(metaData.getRpcExt(), exist.getRpcExt())
|| !Objects.equals(metaData.getParameterTypes(), exist.getParameterTypes())
|| !Objects.equals(metaData.getMethodName(), exist.getMethodName())) {
//根据最新的元数据再次构建ReferenceConfig
ApacheDubboConfigCache.getInstance().build(metaData);
}
}
//本地内存缓存
META_DATA.put(metaData.getPath(), metaData);
}
}
//删除元数据
public void unSubscribe(final MetaData metaData) {
if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
//使ReferenceConfig失效
ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());
META_DATA.remove(metaData.getPath());
}
}
}
- org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#initRef()
通过metaData
构建ReferenceConfig
对象。
public final class ApacheDubboConfigCache extends DubboConfigCache {
//......
public ReferenceConfig<GenericService> initRef(final MetaData metaData) {
try {
//先尝试从缓存中获取,存在就直接返回
ReferenceConfig<GenericService> referenceConfig = cache.get(metaData.getPath());
if (StringUtils.isNoneBlank(referenceConfig.getInterface())) {
return referenceConfig;
}
} catch (ExecutionException e) {
LOG.error("init dubbo ref exception", e);
}
//不存在,就构建
return build(metaData);
}
/**
* Build reference config.
*/
@SuppressWarnings("deprecation")
public ReferenceConfig<GenericService> build(final MetaData metaData) {
if (Objects.isNull(applicationConfig) || Objects.isNull(registryConfig)) {
return new ReferenceConfig<>();
}
ReferenceConfig<GenericService> reference = new ReferenceConfig<>(); //新建ReferenceConfig
reference.setGeneric("true"); //泛化调用
reference.setAsync(true);//支持异步
reference.setApplication(applicationConfig);//设置应用配置
reference.setRegistry(registryConfig);//设置注册中心配置
reference.setConsumer(consumerConfig);//设置消费者配置
reference.setInterface(metaData.getServiceName());//设置服务接口
reference.setProtocol("dubbo");//设置dubbo协议
reference.setCheck(false); //不检查 service provider
reference.setLoadbalance("gray");//支持灰度
Map<String, String> parameters = new HashMap<>(2);
parameters.put("dispatcher", "direct");
reference.setParameters(parameters);//自定义参数
String rpcExt = metaData.getRpcExt();//rpc扩展参数
DubboParam dubboParam = parserToDubboParam(rpcExt);//反序列化
if (Objects.nonNull(dubboParam)) {
if (StringUtils.isNoneBlank(dubboParam.getVersion())) {
reference.setVersion(dubboParam.getVersion());//设置版本
}
if (StringUtils.isNoneBlank(dubboParam.getGroup())) {
reference.setGroup(dubboParam.getGroup());//设置分组
}
if (StringUtils.isNoneBlank(dubboParam.getUrl())) {
reference.setUrl(dubboParam.getUrl());//设置url
}
if (StringUtils.isNoneBlank(dubboParam.getCluster())) {
reference.setCluster(dubboParam.getCluster());//设置Cluster type
}
Optional.ofNullable(dubboParam.getTimeout()).ifPresent(reference::setTimeout);//timeout
Optional.ofNullable(dubboParam.getRetries()).ifPresent(reference::setRetries);//retires
Optional.ofNullable(dubboParam.getSent()).ifPresent(reference::setSent);//Whether to ack async-sent
}
try {
//获取GenericService
Object obj = reference.get();
if (Objects.nonNull(obj)) {
LOG.info("init apache dubbo reference success there meteData is :{}", metaData);
//缓存当前的reference
cache.put(metaData.getPath(), reference);
}
} catch (Exception e) {
LOG.error("init apache dubbo reference exception", e);
}
return reference;
}
//......
}
2.6 执行ResponsePlugin
- org.apache.shenyu.plugin.response.ResponsePlugin#execute()
响应结果由ResponsePlugin
插件处理。
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// 根据rpc类型处理结果
return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);
}
处理类型由MessageWriter
决定,类继承关系如下:
- MessageWriter:接口,定义消息处理方法;
- NettyClientMessageWriter:处理
Netty
调用结果;
- RPCMessageWriter:处理
RPC
调用结果;
- WebClientMessageWriter:处理
WebClient
调用结果;
Dubbo
服务调用,处理结果当然是RPCMessageWriter
了。
- org.apache.shenyu.plugin.response.strategy.RPCMessageWriter#writeWith()
在writeWith()
方法中处理响应结果。
public class RPCMessageWriter implements MessageWriter {
@Override
public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
return chain.execute(exchange).then(Mono.defer(() -> {
Object result = exchange.getAttribute(Constants.RPC_RESULT); //获取结果
if (Objects.isNull(result)) { //处理异常
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);
return WebFluxResultUtils.result(exchange, error);
}
return WebFluxResultUtils.result(exchange, result);//返回结果
}));
}
}
分析至此,关于Dubbo
插件的源码分析就完成了,分析流程图如下:
3. 小结
本文源码分析从Dubbo
服务注册开始,到Dubbo
插件的服务调用。Dubbo
插件主要用来处理将http
请求转成dubbo
协议,主要逻辑是通过泛化调用实现。
10 Apr 2022 |
ShenYu |
Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API
网关。
ShenYu
网关使用 divide
插件来处理 http
请求。你可以查看官方文档 Http快速开始 了解如何使用该插件。
本文基于shenyu-2.4.3
版本进行源码分析,官网的介绍请参考 Http服务接入 。
1. 服务注册
1.1 声明注册接口
使用注解@ShenyuSpringMvcClient
将服务注册到网关。简单demo
如下:
@RestController
@RequestMapping("/order")
@ShenyuSpringMvcClient(path = "/order") // API注册
public class OrderController {
@GetMapping("/findById")
@ShenyuSpringMvcClient(path = "/findById", desc = "Find by id") // 方法注册
public OrderDTO findById(@RequestParam("id") final String id) {
return build(id, "hello world findById");
}
}
注解定义:
/**
* 作用于类和方法上
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ShenyuSpringMvcClient {
//注册路径
String path() default "";
//规则名称
String ruleName() default "";
//描述信息
String desc() default "";
//是否启用
boolean enabled() default true;
//注册元数据
boolean registerMetaData() default false;
}
1.2 扫描注解信息
注解扫描通过SpringMvcClientBeanPostProcessor
完成,它实现了BeanPostProcessor
接口,是Spring
提供的后置处理器。
在构造器实例化的过程中:
- 读取属性配置
- 添加注解,读取
path
信息
- 启动注册中心,向
shenyu-admin
注册
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
//...
/**
* 构造器实例化
*/
public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// 1. 读取属性配置
Properties props = clientConfig.getProps();
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH, "");
if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
String errorMsg = "http register param must config the appName or contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
// 2. 添加注解
mappingAnnotation.add(ShenyuSpringMvcClient.class);
mappingAnnotation.add(PostMapping.class);
mappingAnnotation.add(GetMapping.class);
mappingAnnotation.add(DeleteMapping.class);
mappingAnnotation.add(PutMapping.class);
mappingAnnotation.add(RequestMapping.class);
// 3. 启动注册中心
publisher.start(shenyuClientRegisterRepository);
}
@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
// 重写后置处理器逻辑
return bean;
}
- SpringMvcClientBeanPostProcessor#postProcessAfterInitialization()
重写后置处理器逻辑:读取注解信息,构建元数据对象和URI
对象,并向shenyu-admin
注册。
@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
// 1. 如果是注册整个服务或者不是Controller类,就不处理
if (Boolean.TRUE.equals(isFull) || !hasAnnotation(bean.getClass(), Controller.class)) {
return bean;
}
// 2. 读取类上的注解 ShenyuSpringMvcClient
final ShenyuSpringMvcClient beanShenyuClient = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class);
// 2.1构建superPath
final String superPath = buildApiSuperPath(bean.getClass());
// 2.2 是否注册整个类方法
if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {
// 构建元数据对象,然后向shenyu-admin注册
publisher.publishEvent(buildMetaDataDTO(beanShenyuClient, pathJoin(contextPath, superPath)));
return bean;
}
// 3. 读取所有方法
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
for (Method method : methods) {
// 3.1 读取方法上的注解 ShenyuSpringMvcClient
ShenyuSpringMvcClient methodShenyuClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
// 如果方法上面没有注解,就用类上面的注解
methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;
if (Objects.nonNull(methodShenyuClient)) {
// 3.2 构建path信息,构建元数据对象,向shenyu-admin注册
publisher.publishEvent(buildMetaDataDTO(methodShenyuClient, buildApiPath(method, superPath)));
}
}
return bean;
}
- 1.如果是注册整个服务或者不是
Controller
类,就不处理
- 2.读取类上的注解
ShenyuSpringMvcClient
,如果是注册整个类,就在这里构建元数据对象,然后向shenyu-admin
注册
- 3.处理方法上的注解
ShenyuSpringMvcClient
,针对特定方法构建path
信息,构建元数据对象,然后向shenyu-admin
注册
这里有两个取path
的方法,需要特别说明一下:
private String buildApiSuperPath(@NonNull final Class<?> method) {
// 先从类上的注解ShenyuSpringMvcClient取path属性
ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {
return shenyuSpringMvcClient.path();
}
// 从当前类的RequestMapping注解中取path信息
RequestMapping requestMapping = AnnotationUtils.findAnnotation(method, RequestMapping.class);
if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {
return requestMapping.path()[0];
}
return "";
}
private String buildApiPath(@NonNull final Method method, @NonNull final String superPath) {
// 1. 读取方法上的注解ShenyuSpringMvcClient
ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
// 1.1如果存在path,就构建
if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {
//1.2完整 path = contextPath+superPath+methodPath
return pathJoin(contextPath, superPath, shenyuSpringMvcClient.path());
}
// 2.从方法的其他注解上获取path信息
final String path = getPathByMethod(method);
if (StringUtils.isNotBlank(path)) {
// 2.1 完整的path = contextPath+superPath+methodPath
return pathJoin(contextPath, superPath, path);
}
return pathJoin(contextPath, superPath);
}
private String getPathByMethod(@NonNull final Method method) {
// 遍历接口注解获取path信息
for (Class<? extends Annotation> mapping : mappingAnnotation) {
final String pathByAnnotation = getPathByAnnotation(AnnotationUtils.findAnnotation(method, mapping), pathAttributeNames);
if (StringUtils.isNotBlank(pathByAnnotation)) {
return pathByAnnotation;
}
}
return null;
}
扫描注解完成后,构建元数据对象,然后将该对象发送到shenyu-admin
,即可完成注册。
private MetaDataRegisterDTO buildMetaDataDTO(@NonNull final ShenyuSpringMvcClient shenyuSpringMvcClient, final String path) {
return MetaDataRegisterDTO.builder()
.contextPath(contextPath) // contextPath
.appName(appName) // appName
.path(path) // 注册路径,在网关规则匹配时使用
.pathDesc(shenyuSpringMvcClient.desc()) // 描述信息
.rpcType(RpcTypeEnum.HTTP.getName()) // divide插件,默认时http类型
.enabled(shenyuSpringMvcClient.enabled()) // 是否启用规则
.ruleName(StringUtils.defaultIfBlank(shenyuSpringMvcClient.ruleName(), path))//规则名称
.registerMetaData(shenyuSpringMvcClient.registerMetaData()) //是否注册元数据信息
.build();
}
具体的注册逻辑由注册中心实现,在之前的文章中已经分析过了,这里就不再深入分析。
1.3 注册URI信息
ContextRegisterListener
负责将客户端的URI
信息注册到shenyu-admin
,它实现了ApplicationListener
接口,发生上下文刷新事件ContextRefreshedEvent
时,执行onApplicationEvent()
方法,实现注册逻辑。
public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent>, BeanFactoryAware {
//......
/**
* 构造器实例化
*/
public ContextRegisterListener(final PropertiesConfig clientConfig) {
// 读取属性配置
final Properties props = clientConfig.getProps();
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
if (Boolean.TRUE.equals(isFull)) {
if (StringUtils.isBlank(contextPath)) {
final String errorMsg = "http register param must config the contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
}
this.port = Integer.parseInt(Optional.ofNullable(props.getProperty(ShenyuClientConstants.PORT)).orElseGet(() -> "-1"));
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);
this.host = props.getProperty(ShenyuClientConstants.HOST);
}
@Override
public void setBeanFactory(final BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
// 执行应用事件
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) {
// 保证该方法执行一次
if (!registered.compareAndSet(false, true)) {
return;
}
// 1. 如果是注册整个服务
if (Boolean.TRUE.equals(isFull)) {
// 构建元数据,并注册
publisher.publishEvent(buildMetaDataDTO());
}
try {
// 获取端口信息
final int mergedPort = port <= 0 ? PortUtils.findPort(beanFactory) : port;
// 2. 构建URI数据,并注册
publisher.publishEvent(buildURIRegisterDTO(mergedPort));
} catch (ShenyuException e) {
throw new ShenyuException(e.getMessage() + "please config ${shenyu.client.http.props.port} in xml/yml !");
}
}
// 构建URI数据
private URIRegisterDTO buildURIRegisterDTO(final int port) {
return URIRegisterDTO.builder()
.contextPath(this.contextPath) // contextPath
.appName(appName) // appName
.protocol(protocol) // 服务使用的协议
.host(IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host)) //主机
.port(port) // 端口
.rpcType(RpcTypeEnum.HTTP.getName()) // divide插件,默认注册http类型
.build();
}
// 构建元数据
private MetaDataRegisterDTO buildMetaDataDTO() {
return MetaDataRegisterDTO.builder()
.contextPath(contextPath)
.appName(appName)
.path(contextPath)
.rpcType(RpcTypeEnum.HTTP.getName())
.enabled(true)
.ruleName(contextPath)
.build();
}
}
1.4 处理注册信息
客户端通过注册中心注册的元数据和URI
数据,在shenyu-admin
进行处理,负责存储到数据库和同步给shenyu
网关。Divide
插件的客户端注册处理逻辑在ShenyuClientRegisterDivideServiceImpl
中。继承关系如下:
- ShenyuClientRegisterService:客户端注册服务,顶层接口;
- FallbackShenyuClientRegisterService:注册失败,提供重试操作;
- AbstractShenyuClientRegisterServiceImpl:抽象类,实现部分公共注册逻辑;
- AbstractContextPathRegisterService:抽象类,负责注册
ContextPath
;
- ShenyuClientRegisterDivideServiceImpl:实现
Divide
插件的注册;
1.4.1 注册服务
@Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
String selectorHandler = selectorHandler(dto);
String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
//2. 注册规则
String ruleHandler = ruleHandler();
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
ruleService.registerDefault(ruleDTO);
//3. 注册元数据
registerMetadata(dto);
//4. 注册ContextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
1.4.1.1 注册选择器
- org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()
构建contextPath
,查找选择器信息是否存在,如果存在就返回id
;不存在就创建默认的选择器信息。
@Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {
// 构建contextPath
String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());
// 通过名称查找选择器信息是否存在
SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);
if (Objects.isNull(selectorDO)) {
// 不存在就创建默认的选择器信息
return registerSelector(contextPath, pluginName, selectorHandler);
}
return selectorDO.getId();
}
-
默认选择器信息
在这里构建默认选择器信息及其条件属性。
//注册选择器
private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {
//构建选择器
SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());
selectorDTO.setHandle(selectorHandler);
//注册默认选择器
return registerDefault(selectorDTO);
}
//构建选择器
private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {
//构建默认选择器
SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);
selectorDTO.setPluginId(pluginId);
//构建默认选择器的条件属性
selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));
return selectorDTO;
}
private SelectorDTO buildDefaultSelectorDTO(final String name) {
return SelectorDTO.builder()
.name(name) // 名称
.type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // 默认类型自定义
.matchMode(MatchModeEnum.AND.getCode()) //默认匹配方式 and
.enabled(Boolean.TRUE) //默认启开启
.loged(Boolean.TRUE) //默认记录日志
.continued(Boolean.TRUE) //默认继续后续选择器
.sort(1) //默认顺序1
.build();
}
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {
SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();
selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // 默认参数类型URI
selectorConditionDTO.setParamName("/");
selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // 默认匹配策略 match
selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // 默认值 /contextPath/**
return Collections.singletonList(selectorConditionDTO);
}
@Override
public String registerDefault(final SelectorDTO selectorDTO) {
//选择器信息
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
//选择器条件属性
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
if (StringUtils.isEmpty(selectorDTO.getId())) {
// 向数据库插入选择器信息
selectorMapper.insertSelective(selectorDO);
// 向数据库插入选择器条件属性
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId()); selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
}
// 发布同步事件,向网关同步选择信息及其条件属性
publishEvent(selectorDO, selectorConditionDTOs);
return selectorDO.getId();
}
1.4.1.2 注册规则
在注册服务的第二步中,开始构建默认规则,然后注册规则。
@Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
//......
//2. 注册规则
// 默认规则处理属性
String ruleHandler = ruleHandler();
// 构建默认规则信息
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
// 注册规则
ruleService.registerDefault(ruleDTO);
//3. 注册元数据
//......
//4. 注册ContextPath
//......
return ShenyuResultMessage.SUCCESS;
}
@Override
protected String ruleHandler() {
// 默认规则处理属性
return new DivideRuleHandle().toJson();
}
Divide
插件默认规则处理属性
public class DivideRuleHandle implements RuleHandle {
/**
* 负载均衡:默认随机
*/
private String loadBalance = LoadBalanceEnum.RANDOM.getName();
/**
* 重试策略:默认重试当前服务
*/
private String retryStrategy = RetryEnum.CURRENT.getName();
/**
* 重试次数:默认3次
*/
private int retry = 3;
/**
* 调用超时:默认 3000
*/
private long timeout = Constants.TIME_OUT;
/**
* header最大值:10240
*/
private long headerMaxSize = Constants.HEADER_MAX_SIZE;
/**
* request最大值:102400
*/
private long requestMaxSize = Constants.REQUEST_MAX_SIZE;
}
// 构建默认规则信息
private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {
return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
}
// 构建默认规则信息
private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {
RuleDTO ruleDTO = RuleDTO.builder()
.selectorId(selectorId) //关联的选择器id
.name(ruleName) //规则名称
.matchMode(MatchModeEnum.AND.getCode()) // 默认匹配模式 and
.enabled(Boolean.TRUE) // 默认开启
.loged(Boolean.TRUE) //默认记录日志
.sort(1) //默认顺序 1
.handle(ruleHandler)
.build();
RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()
.paramType(ParamTypeEnum.URI.getName()) // 默认参数类型URI
.paramName("/")
.paramValue(path) //参数值path
.build();
if (path.indexOf("*") > 1) {
ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); //如果path中有*,操作类型则默认为 match
} else {
ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // 否则,默认操作类型 =
}
ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));
return ruleDTO;
}
- org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()
注册规则:向数据库插入记录,并向网关发布事件,进行数据同步。
@Override
public String registerDefault(final RuleDTO ruleDTO) {
RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());
if (Objects.nonNull(exist)) {
return "";
}
RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);
List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();
if (StringUtils.isEmpty(ruleDTO.getId())) {
// 向数据库插入规则信息
ruleMapper.insertSelective(ruleDO);
//向数据库插入规则体条件属性
ruleConditions.forEach(ruleConditionDTO -> {
ruleConditionDTO.setRuleId(ruleDO.getId()); ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));
});
}
// 向网关发布事件,进行数据同步
publishEvent(ruleDO, ruleConditions);
return ruleDO.getId();
}
1.4.1.3 注册元数据
@Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
//......
//2. 注册规则
//......
//3. 注册元数据
registerMetadata(dto);
//4. 注册ContextPath
//......
return ShenyuResultMessage.SUCCESS;
}
@Override
protected void registerMetadata(final MetaDataRegisterDTO dto) {
if (dto.isRegisterMetaData()) { // 如果注册元数据
// 获取metaDataService
MetaDataService metaDataService = getMetaDataService();
// 元数据是否存在
MetaDataDO exist = metaDataService.findByPath(dto.getPath());
// 插入或更新元数据
metaDataService.saveOrUpdateMetaData(exist, dto);
}
}
@Override
public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {
DataEventTypeEnum eventType;
// 数据类型转换 DTO->DO
MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);
// 插入数据
if (Objects.isNull(exist)) {
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());
metaDataDO.setDateCreated(currentTime);
metaDataDO.setDateUpdated(currentTime);
metaDataMapper.insert(metaDataDO);
eventType = DataEventTypeEnum.CREATE;
} else {
// 更新数据
metaDataDO.setId(exist.getId());
metaDataMapper.update(metaDataDO);
eventType = DataEventTypeEnum.UPDATE;
}
// 发布同步事件到网关
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,
Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));
}
1.4.1.4 注册ContextPath
@Override
public String register(final MetaDataRegisterDTO dto) {
//1. 注册选择器
//......
//2. 注册规则
//......
//3. 注册元数据
//......
//4. 注册ContextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
- org.apache.shenyu.admin.service.register.AbstractContextPathRegisterService#registerContextPath()
@Override
public void registerContextPath(final MetaDataRegisterDTO dto) {
// 设置选择器的contextPath
String contextPathSelectorId = getSelectorService().registerDefault(dto, PluginEnum.CONTEXT_PATH.getName(), "");
ContextMappingRuleHandle handle = new ContextMappingRuleHandle();
handle.setContextPath(PathUtils.decoratorContextPath(dto.getContextPath()));
// 设置规则的contextPath
getRuleService().registerDefault(buildContextPathDefaultRuleDTO(contextPathSelectorId, dto, handle.toJson()));
}
1.4.2 注册URI
- org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()
服务端收到客户端注册的URI
信息后,进行处理。
@Override
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
String result;
String key = key(selectorName);
try {
this.removeFallBack(key);
// 注册URI
result = this.doRegisterURI(selectorName, uriList);
logger.info("Register success: {},{}", selectorName, uriList);
} catch (Exception ex) {
logger.warn("Register exception: cause:{}", ex.getMessage());
result = "";
// 注册失败后,进行重试
this.addFallback(key, new FallbackHolder(selectorName, uriList));
}
return result;
}
- org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()
从客户端注册的URI
中获取有效的URI
,更新对应的选择器handle
属性,向网关发送选择器更新事件。
@Override
public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {
//参数检查
if (CollectionUtils.isEmpty(uriList)) {
return "";
}
//获取选择器信息
SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
if (Objects.isNull(selectorDO)) {
throw new ShenyuException("doRegister Failed to execute,wait to retry.");
}
// 获取有效的URI
List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());
// 构建选择器的handle属性
String handler = buildHandle(validUriList, selectorDO);
if (handler != null) {
selectorDO.setHandle(handler);
SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
selectorData.setHandle(handler);
// 向数据库更新选择器的handle属性
selectorService.updateSelective(selectorDO);
// 向网关发送选择器更新事件
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
}
return ShenyuResultMessage.SUCCESS;
}
关于服务注册的源码分析就以及完成了,分析流程图如下:
接下来就分析divide
插件是如何根据这些信息向http
服务发起调用。
2. 服务调用
divide
插件是网关用于处理 http协议
请求的核心处理插件。
以官网提供的案例 Http快速开始 为例,一个直连请求如下:
GET http://localhost:8189/order/findById?id=100
Accept: application/json
通过ShenYu
网关代理后,请求如下:
GET http://localhost:9195/http/order/findById?id=100
Accept: application/json
通过ShenYu
网关代理后的服务仍然能够请求到之前的服务,在这里起作用的就是divide
插件。类继承关系如下:
- ShenyuPlugin:顶层接口,定义接口方法;
- AbstractShenyuPlugin:抽象类,实现插件共有逻辑;
- DividePlugin:Divide插件。
2.1 接收请求
通过ShenYu
网关代理后,请求入口是ShenyuWebHandler
,它实现了org.springframework.web.server.WebHandler
接口。
public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {
//......
/**
* 处理web请求
*/
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
// 执行默认插件链
Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);
if (scheduled) {
return execute.subscribeOn(scheduler);
}
return execute;
}
private static class DefaultShenyuPluginChain implements ShenyuPluginChain {
private int index;
private final List<ShenyuPlugin> plugins;
/**
* 实例化默认插件链
*/
DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {
this.plugins = plugins;
}
/**
* 执行每个插件.
*/
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
// 获取当前执行插件
ShenyuPlugin plugin = plugins.get(this.index++);
// 是否跳过当前插件
boolean skip = plugin.skip(exchange);
if (skip) {
// 如果跳过就执行下一个
return this.execute(exchange);
}
// 执行当前插件
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
}
}
2.2 匹配规则
- org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()
在execute()
方法中执行选择器和规则的匹配逻辑。
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
// 插件名称
String pluginName = named();
// 插件信息
PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if (pluginData != null && pluginData.getEnabled()) {
// 选择器信息
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
// 匹配选择器
SelectorData selectorData = matchSelector(exchange, selectors);
if (Objects.isNull(selectorData)) {
return handleSelectorIfNull(pluginName, exchange, chain);
}
selectorLog(selectorData, pluginName);
// 规则信息
List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
if (CollectionUtils.isEmpty(rules)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
// 匹配规则
RuleData rule;
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
//get last
rule = rules.get(rules.size() - 1);
} else {
rule = matchRule(exchange, rules);
}
if (Objects.isNull(rule)) {
return handleRuleIfNull(pluginName, exchange, chain);
}
ruleLog(rule, pluginName);
// 执行插件
return doExecute(exchange, chain, selectorData, rule);
}
return chain.execute(exchange);
}
2.3 执行divide插件
- org.apache.shenyu.plugin.divide.DividePlugin#doExecute()
在doExecute()
方法中执行divide
插件的具体逻辑:
- 校验
header
大小;
- 校验
request
大小;
- 获取服务列表;
- 实现负载均衡;
- 设置请求
url
,超时时间,重试策略。
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
// 获取上下文信息
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// 获取规则的handle属性
DivideRuleHandle ruleHandle = DividePluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule));
long headerSize = 0;
// 校验header大小
for (List<String> multiHeader : exchange.getRequest().getHeaders().values()) {
for (String value : multiHeader) {
headerSize += value.getBytes(StandardCharsets.UTF_8).length;
}
}
if (headerSize > ruleHandle.getHeaderMaxSize()) {
LOG.error("request header is too large");
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_HEADER_TOO_LARGE, null);
return WebFluxResultUtils.result(exchange, error);
}
// 校验request大小
if (exchange.getRequest().getHeaders().getContentLength() > ruleHandle.getRequestMaxSize()) {
LOG.error("request entity is too large");
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_ENTITY_TOO_LARGE, null);
return WebFluxResultUtils.result(exchange, error);
}
// 获取服务列表upstreamList
List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
if (CollectionUtils.isEmpty(upstreamList)) {
LOG.error("divide upstream configuration error: {}", rule);
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);
return WebFluxResultUtils.result(exchange, error);
}
// 请求ip
String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
// 实现负载均衡
Upstream upstream = LoadBalancerFactory.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
if (Objects.isNull(upstream)) {
LOG.error("divide has no upstream");
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);
return WebFluxResultUtils.result(exchange, error);
}
// 设置url
String domain = upstream.buildDomain();
exchange.getAttributes().put(Constants.HTTP_DOMAIN, domain);
// 设置超时时间
exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
// 设置重试策略
exchange.getAttributes().put(Constants.RETRY_STRATEGY, ruleHandle.getRetryStrategy());
exchange.getAttributes().put(Constants.LOAD_BALANCE, ruleHandle.getLoadBalance());
exchange.getAttributes().put(Constants.DIVIDE_SELECTOR_ID, selector.getId());
return chain.execute(exchange);
}
2.4 发起请求
默认由WebClientPlugin
向http
服务发起调用请求,类继承关系如下:
- ShenyuPlugin:顶层插件,定义插件方法;
- AbstractHttpClientPlugin:抽象类,实现请求调用的公共逻辑;
- WebClientPlugin:通过
WebClient
发起请求;
- NettyHttpClientPlugin:通过
Netty
发起请求。
发起请求调用:
- org.apache.shenyu.plugin.httpclient.AbstractHttpClientPlugin#execute()
在execute()
方法中发起请求调用:
- 获取指定的超时时间,重试次数
- 发起请求
- 根据指定的重试策略进行失败后重试操作
public abstract class AbstractHttpClientPlugin<R> implements ShenyuPlugin {
protected static final Logger LOG = LoggerFactory.getLogger(AbstractHttpClientPlugin.class);
@Override
public final Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
// 获取上下文信息
final ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// 获取uri
final URI uri = exchange.getAttribute(Constants.HTTP_URI);
if (Objects.isNull(uri)) {
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_URL, null);
return WebFluxResultUtils.result(exchange, error);
}
// 获取指定的超时时间
final long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
final Duration duration = Duration.ofMillis(timeout);
// 获取指定重试次数
final int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
// 获取指定的重试策略
final String retryStrategy = (String) Optional.ofNullable(exchange.getAttribute(Constants.RETRY_STRATEGY)).orElseGet(RetryEnum.CURRENT::getName);
LOG.info("The request urlPath is {}, retryTimes is {}, retryStrategy is {}", uri.toASCIIString(), retryTimes, retryStrategy);
// 构建header
final HttpHeaders httpHeaders = buildHttpHeaders(exchange);
// 发起请求
final Mono<R> response = doRequest(exchange, exchange.getRequest().getMethodValue(), uri, httpHeaders, exchange.getRequest().getBody())
.timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
.doOnError(e -> LOG.error(e.getMessage(), e));
// 重试策略CURRENT,对当前服务进行重试
if (RetryEnum.CURRENT.getName().equals(retryStrategy)) {
//old version of DividePlugin and SpringCloudPlugin will run on this
return response.retryWhen(Retry.anyOf(TimeoutException.class, ConnectTimeoutException.class, ReadTimeoutException.class, IllegalStateException.class)
.retryMax(retryTimes)
.backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
.onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))
.flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));
}
// 对其他服务进行重试
// 排除已经调用过的服务
final Set<URI> exclude = Sets.newHashSet(uri);
// 请求重试
return resend(response, exchange, duration, httpHeaders, exclude, retryTimes)
.onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))
.flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));
}
private Mono<R> resend(final Mono<R> clientResponse,
final ServerWebExchange exchange,
final Duration duration,
final HttpHeaders httpHeaders,
final Set<URI> exclude,
final int retryTimes) {
Mono<R> result = clientResponse;
// 根据指定的重试次数进行重试
for (int i = 0; i < retryTimes; i++) {
result = resend(result, exchange, duration, httpHeaders, exclude);
}
return result;
}
private Mono<R> resend(final Mono<R> response,
final ServerWebExchange exchange,
final Duration duration,
final HttpHeaders httpHeaders,
final Set<URI> exclude) {
return response.onErrorResume(th -> {
final String selectorId = exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID);
final String loadBalance = exchange.getAttribute(Constants.LOAD_BALANCE);
//查询可用服务
final List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId)
.stream().filter(data -> {
final String trimUri = data.getUrl().trim();
for (URI needToExclude : exclude) {
// exclude already called
if ((needToExclude.getHost() + ":" + needToExclude.getPort()).equals(trimUri)) {
return false;
}
}
return true;
}).collect(Collectors.toList());
if (CollectionUtils.isEmpty(upstreamList)) {
// no need to retry anymore
return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
}
// 请求ip
final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
// 实现负载均衡
final Upstream upstream = LoadBalancerFactory.selector(upstreamList, loadBalance, ip);
if (Objects.isNull(upstream)) {
// no need to retry anymore
return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
}
final URI newUri = RequestUrlUtils.buildRequestUri(exchange, upstream.buildDomain());
// 排除已经调用的uri
exclude.add(newUri);
// 进行再次调用
return doRequest(exchange, exchange.getRequest().getMethodValue(), newUri, httpHeaders, exchange.getRequest().getBody())
.timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
.doOnError(e -> LOG.error(e.getMessage(), e));
});
}
//......
}
- org.apache.shenyu.plugin.httpclient.WebClientPlugin#doRequest()
在doRequest()
方法中通过webClient
发起真正的请求调用。
@Override
protected Mono<ClientResponse> doRequest(final ServerWebExchange exchange, final String httpMethod, final URI uri,
final HttpHeaders httpHeaders, final Flux<DataBuffer> body) {
return webClient.method(HttpMethod.valueOf(httpMethod)).uri(uri) //请求uri
.headers(headers -> headers.addAll(httpHeaders)) // 请求header
.body(BodyInserters.fromDataBuffers(body))
.exchange() // 发起请求
.doOnSuccess(res -> {
if (res.statusCode().is2xxSuccessful()) { // 成功
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
} else { // 失败
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
}
exchange.getResponse().setStatusCode(res.statusCode());
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
});
}
2.5 处理响应结果
- org.apache.shenyu.plugin.response.ResponsePlugin#execute()
响应结果由ResponsePlugin
插件处理。
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
assert shenyuContext != null;
// 根据rpc类型处理结果
return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);
}
处理类型由MessageWriter
决定,类继承关系如下:
- MessageWriter:接口,定义消息处理方法;
- NettyClientMessageWriter:处理
Netty
调用结果;
- RPCMessageWriter:处理
RPC
调用结果;
- WebClientMessageWriter:处理
WebClient
调用结果;
默认是通过WebCient
发起http
请求。
- org.apache.shenyu.plugin.response.strategy.WebClientMessageWriter#writeWith()
在writeWith()
方法中处理响应结果。
@Override
public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
return chain.execute(exchange).then(Mono.defer(() -> {
// 获取响应
ServerHttpResponse response = exchange.getResponse();
ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
if (Objects.isNull(clientResponse)) {
Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);
return WebFluxResultUtils.result(exchange, error);
}
//获取cookies和headers
response.getCookies().putAll(clientResponse.cookies());
response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
// image, pdf or stream does not do format processing.
// 处理特殊响应类型
if (clientResponse.headers().contentType().isPresent()) {
final String media = clientResponse.headers().contentType().get().toString().toLowerCase();
if (media.matches(COMMON_BIN_MEDIA_TYPE_REGEX)) {
return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()))
.doOnCancel(() -> clean(exchange));
}
}
// 处理一般响应类型
clientResponse = ResponseUtils.buildClientResponse(response, clientResponse.body(BodyExtractors.toDataBuffers()));
return clientResponse.bodyToMono(byte[].class)
.flatMap(originData -> WebFluxResultUtils.result(exchange, originData))
.doOnCancel(() -> clean(exchange));
}));
}
分析至此,关于Divide
插件的源码分析就完成了,分析流程图如下:
3. 小结
本文源码分析从http
服务注册开始,到divide
插件的服务调用。divide
插件主要用来处理http
请求。有些源码没有进入深入分析,比如负载均衡的实现,服务探活,将在后续继续分析。