如何通过Apache ShenYu网关代理Dubbo服务

本文介绍了如何通过Apache ShenYu网关访问Dubbo服务,主要内容包括从简单示例到核心调用流程分析,并对设计原理进行了总结。

1. 介绍

  • Apache ShenYu

Apache ShenYu(Incubating) 是一个异步的,高性能的,跨语言的,响应式的 API 网关。兼容各种主流框架体系,支持热插拔,用户可以定制化开发,满足用户各种场景的现状和未来需求,经历过大规模场景的锤炼。

2021年5月,ShenYu捐献给 Apache 软件基金会,Apache 基金会全票通过,顺利进入孵化器。

  • Apache Dubbo

Apache Dubbo 是一款微服务开发框架,它提供了 RPC 通信 与 微服务治理 两大关键能力。这意味着,使用 Dubbo 开发的微服务,将具备相互之间的远程发现与通信能力, 同时利用 Dubbo 提供的丰富服务治理能力,可以实现诸如服务发现、负载均衡、流量调度等服务治理诉求。同时 Dubbo 是高度可扩展的,用户几乎可以在任意功能点去定制自己的实现,以改变框架的默认行为来满足自己的业务需求。

2. Dubbo快速开始

本小节介绍如何将Dubbo服务接入到ShenYu网关,您可以直接在工程下找到本小节的示例代码

2.1 启动shenyu-admin

shenyu-adminApache ShenYu后台管理系统, 启动的方式有多种,本文通过 本地部署 的方式启动。启动成功后,需要在基础配置->插件管理中,把dubbo 插件设置为开启,并设置你的注册地址,请确保注册中心已经开启。

2.2 启动shenyu网关

在这里通过 源码 的方式启动,直接运行shenyu-bootstrap中的ShenyuBootstrapApplication

在启动前,请确保网关已经引入相关依赖。如果客户端是apache dubbo,注册中心使用zookeeper,请参考如下配置:

        <!-- apache shenyu  apache dubbo plugin start-->
        <dependency>
            <groupId>org.apache.shenyu</groupId>
            <artifactId>shenyu-spring-boot-starter-plugin-apache-dubbo</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo</artifactId>
            <version>2.7.5</version>
        </dependency>
        <!-- Dubbo zookeeper registry dependency start -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>4.0.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.1</version>
        </dependency>
        <!-- Dubbo zookeeper registry dependency end -->
        <!-- apache dubbo plugin end-->

2.3 启动shenyu-examples-dubbo

以官网提供的例子为例 shenyu-examples-dubbo 。 假如dubbo服务定义如下:

<beans /* ...... * />

    <dubbo:application name="test-dubbo-service"/>
    <dubbo:registry address="${dubbo.registry.address}"/>
    <dubbo:protocol name="dubbo" port="20888"/>

    <dubbo:service timeout="10000" interface="org.apache.shenyu.examples.dubbo.api.service.DubboTestService" ref="dubboTestService"/>

</beans>

声明应用服务名称,注册中心地址,使用dubbo协议,声明服务接口,对应接口实现类:

/**
 * DubboTestServiceImpl.
 */
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {
    
    @Override
    @ShenyuDubboClient(path = "/findById", desc = "Query by Id")
    public DubboTest findById(final String id) {
        return new DubboTest(id, "hello world shenyu Apache, findById");
    }

    //......
}

在接口实现类中,使用注解@ShenyuDubboClientshenyu-admin注册服务。

在配置文件application.yml中的配置信息:

server:
  port: 8011
  address: 0.0.0.0
  servlet:
    context-path: /
spring:
  main:
    allow-bean-definition-overriding: true
dubbo:
  registry:
    address: zookeeper://localhost:2181  # dubbo使用的注册中心
    
shenyu:
  register:
    registerType: http #注册方式
    serverLists: http://localhost:9095 #注册地址
    props:
      username: admin 
      password: 123456
  client:
    dubbo:
      props:
        contextPath: /dubbo  
        appName: dubbo

在配置文件中,声明dubbo使用的注册中心地址,dubbo服务向shenyu-admin注册,使用的方式是http,注册地址是http://localhost:9095

关于注册方式的使用,请参考 应用客户端接入

2.4 调用dubbo服务

shenyu-examples-dubbo项目成功启动之后会自动把加 @ShenyuDubboClient 注解的接口方法注册到网关。

打开 插件列表 -> Proxy -> dubbo 可以看到插件规则配置列表:

注册成功的选择器信息:

注册成功的规则信息:

选择器和规则是 Apache ShenYu 网关中最灵魂的东西。掌握好它,你可以对任何流量进行管理。对应为选择器与规则里面的匹配条件(conditions),根据不同的流量筛选规则,我们可以处理各种复杂的场景。流量筛选可以从Header, URI, Query, Cookie 等等Http请求获取数据。

然后可以采用 Match=RegexGroovyExclude等匹配方式,匹配出你所预想的数据。多组匹配添加可以使用And/Or的匹配策略。

具体的介绍与使用请看: 选择器与规则管理

发起GET请求,通过ShenYu网关调用dubbo服务:

GET http://localhost:9195/dubbo/findById?id=100
Accept: application/json

成功响应之后,结果如下:

{
  "name": "hello world shenyu Apache, findById",
  "id": "100"
}

至此,就成功的通过http请求访问dubbo服务了,ShenYu网关通过shenyu-plugin-dubbo模块将http协议转成了dubbo协议。

3. 深入理解Dubbo插件

在运行上述demo的过程中,是否存在一些疑问:

  • dubbo服务是如何注册到shenyu-admin
  • shenyu-admin是如何将数据同步到ShenYu网关?
  • DubboPlugin是如何将http协议转换到到dubbo协议?

带着这些疑问,来深入理解dubbo插件。

3.1 应用客户端接入

应用客户端接入是指将微服务接入到Apache ShenYu网关,当前支持HttpDubboSpring CloudgRPCMotanSofaTars等协议的接入。

将应用客户端接入到Apache ShenYu网关是通过注册中心来实现的,涉及到客户端注册和服务端同步数据。注册中心支持HttpZookeeperEtcdConsulNacos。默认是通过Http方式注册。

客户端接入的相关配置请参考 客户端接入配置

3.1.1 客户端注册

在你的微服务配置中声明注册中心客户端类型,如HttpZookeeper。 应用程序启动时使用SPI方式加载并初始化对应注册中心客户端,通过实现Spring Bean相关的后置处理器接口,在其中获取需要进行注册的服务接口信息,将获取的信息放入Disruptor中。

注册中心客户端从Disruptor中读取数据,并将接口信息注册到shenyu-adminDisruptor在其中起数据与操作解耦的作用,利于扩展。

3.1.2 服务端注册

shenyu-admin配置中声明注册中心服务端类型,如HttpZookeeper。当shenyu-admin启动时,读取配置类型,加载并初始化对应的注册中心服务端,注册中心服务端收到shenyu-client注册的接口信息后,将其放入Disruptor中,然后会触发注册处理逻辑,将服务接口信息更新并发布同步事件。

Disruptor在其中起到数据与操作解耦,利于扩展。如果注册请求过多,导致注册异常,也有数据缓冲作用。

3.2 数据同步原理

数据同步是指在 shenyu-admin 后台操作数据以后,使用何种策略将数据同步到 Apache ShenYu 网关。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosEtcdConsul 进行数据同步。默认是通过WebSocket进行数据同步。

数据同步的相关配置请参考 数据同步配置

3.2.1 数据同步的意义

网关是流量请求的入口,在微服务架构中承担了非常重要的角色,网关高可用的重要性不言而喻。在使用网关的过程中,为了满足业务诉求,经常需要变更配置,比如流控规则、路由规则等等。因此,网关动态配置是保障网关高可用的重要因素。

当前数据同步特性如下:

  • 所有的配置都缓存在 Apache ShenYu 网关内存中,每次请求都使用本地缓存,速度非常快。
  • 用户可以在 shenyu-admin 后台任意修改数据,并马上同步到网关内存。
  • 支持 Apache ShenYu 的插件、选择器、规则数据、元数据、签名数据等数据同步。
  • 所有插件的选择器,规则都是动态配置,立即生效,不需要重启服务。
  • 数据同步方式支持 ZookeeperHttp 长轮询WebsocketNacosEtcdConsul
3.2.2 数据同步原理分析

下图展示了 Apache ShenYu 数据同步的流程,Apache ShenYu 网关在启动时,会从配置服务同步配置数据,并且支持推拉模式获取配置变更信息,然后更新本地缓存。管理员可以在管理后台(shenyu-admin),变更用户权限、规则、插件、流量配置,通过推拉模式将变更信息同步给 Apache ShenYu 网关,具体是 push 模式,还是 pull 模式取决于使用哪种同步方式。

在最初的版本中,配置服务依赖 Zookeeper 实现,管理后台将变更信息 push 给网关。而现在可以支持 WebSocketHttp长轮询ZookeeperNacosEtcdConsul,通过在配置文件中设置 shenyu.sync.${strategy} 指定对应的同步策略,默认使用 webosocket 同步策略,可以做到秒级数据同步。但是,有一点需要注意的是,Apache ShenYu网关 和 shenyu-admin 必须使用相同的同步策略。

如下图所示,shenyu-admin 在用户发生配置变更之后,会通过 EventPublisher 发出配置变更通知,由 EventDispatcher 处理该变更通知,然后根据配置的同步策略(http、weboscket、zookeeper、naocs、etcd、consul),将配置发送给对应的事件处理器。

  • 如果是 websocket 同步策略,则将变更后的数据主动推送给 shenyu-web,并且在网关层,会有对应的 WebsocketDataHandler 处理器来处理 shenyu-admin 的数据推送。
  • 如果是 zookeeper 同步策略,将变更数据更新到 zookeeper,而 ZookeeperSyncCache 会监听到 zookeeper 的数据变更,并予以处理。
  • 如果是 http 同步策略,由网关主动发起长轮询请求,默认有 90s 超时时间,如果 shenyu-admin 没有数据变更,则会阻塞 http 请求,如果有数据发生变更则响应变更的数据信息,如果超过 60s 仍然没有数据变更则响应空数据,网关层接到响应后,继续发起 http 请求,反复同样的请求。

3.3 流程分析

流程分析是从源码的角度,展示服务注册流程,数据同步流程和服务调用流程。

3.3.1 服务注册流程
  • 读取dubbo服务

使用注解@ShenyuDubboClient标记需要注册到网关的dubbo服务。

注解扫描通过ApacheDubboServiceBeanListener完成,它实现了ApplicationListener<ContextRefreshedEvent>接口,在Spring容器启动过程中,发生上下文刷新事件时,开始执行事件处理方法onApplicationEvent()。在重写的方法逻辑中,读取Dubbo服务ServiceBean,构建元数据对象和URI对象,并向shenyu-admin注册。

具体的注册逻辑由注册中心实现,请参考 客户端接入原理

  • 处理注册信息

客户端通过注册中心注册的元数据和URI数据,在shenyu-admin端进行处理,负责存储到数据库和同步给shenyu网关。Dubbo插件的客户端注册处理逻辑在ShenyuClientRegisterDubboServiceImpl中。继承关系如下:

  • ShenyuClientRegisterService:客户端注册服务,顶层接口;
  • FallbackShenyuClientRegisterService:注册失败,提供重试操作;
  • AbstractShenyuClientRegisterServiceImpl:抽象类,实现部分公共注册逻辑;
  • ShenyuClientRegisterDubboServiceImpl:实现Dubbo插件的注册;

注册信息包括选择器,规则和元数据。

整体的dubbo服务注册流程如下:

3.3.2 数据同步流程
  • admin更新数据 假设在在后台管理系统中,新增一条选择器数据,请求会进入SelectorController类中的createSelector()方法,它负责数据的校验,添加或更新数据,返回结果信息。

SelectorServiceImpl类中通过createOrUpdate()方法完成数据的转换,保存到数据库,发布事件,更新upstream

Service类完成数据的持久化操作,即保存数据到数据库。发布变更数据通过eventPublisher.publishEvent()完成,这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher,发布数据的功能正是是通过Spring相关的功能来完成的。

当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,根据不同数据类型和数据同步方式进行事件处理。

  • 网关数据同步

网关在启动时,根据指定的数据同步方式加载不同的配置类,初始化数据同步相关类。

在接收到数据后,进行反序列化操作,读取数据类型和操作类型。不同的数据类型,有不同的数据处理方式,所以有不同的实现类。但是它们之间也有相同的处理逻辑,所以可以通过模板方法设计模式来实现。相同的逻辑放在抽象类AbstractDataHandler中的handle()方法中,不同逻辑就交给各自的实现类。

新增一条选择器数据,是新增操作,会进入到SelectorDataHandler.doUpdate()具体的数据处理逻辑中。

在通用插件数据订阅者CommonPluginDataSubscriber,负责处理所有插件、选择器和规则信息

将数据保存到网关的内存中,BaseDataCache是最终缓存数据的类,通过单例模式实现。选择器数据就存到了SELECTOR_MAP这个Map中。在后续使用的时候,也是从这里拿数据。

上述逻辑用流程图表示如下:

3.3.3 服务调用流程

Dubbo插件体系中,类继承关系如下:

ShenyuPlugin:顶层接口,定义接口方法;

AbstractShenyuPlugin:抽象类,实现插件共有逻辑;

AbstractDubboPlugin:dubbo插件抽象类,实现dubbo共有逻辑(ShenYu网关支持ApacheDubbo和AlibabaDubbo);

ApacheDubboPlugin:ApacheDubbo插件。

  • org.apache.shenyu.web.handler.ShenyuWebHandler.DefaultShenyuPluginChain#execute()

通过ShenYu网关代理后,请求入口是ShenyuWebHandler,它实现了org.springframework.web.server.WebHandler接口,通过责任链设计模式将所有插件连接起来。

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

当请求到网关时,判断某个插件是否执行,是通过指定的匹配逻辑来完成。在execute()方法中执行选择器和规则的匹配逻辑。

  • org.apache.shenyu.plugin.global.GlobalPlugin#execute()

最先被执行的是GlobalPlugin ,它是一个全局插件,在execute()方法中构建上下文信息。

  • org.apache.shenyu.plugin.base.RpcParamTransformPlugin#execute()

接着被执行的是RpcParamTransformPlugin , 它负责从http请求中读取参数,保存到exchange中,传递给rpc服务。在execute()方法中,执行该插件的核心逻辑:从exchange中获取请求信息,根据请求传入的内容形式处理参数。

  • org.apache.shenyu.plugin.dubbo.common.AbstractDubboPlugin

然后被执行的是DubboPlugin 。在doExecute()方法中,主要是检查元数据和参数。在doDubboInvoker()方法中设置特殊的上下文信息,然后开始dubbo的泛化调用。

genericInvoker()方法中:

  • 获取ReferenceConfig对象;
  • 获取泛化服务GenericService对象;
  • 构造请求参数pair对象;
  • 发起异步的泛化调用。

通过泛化调用就可以实现在网关调用dubbo服务了。

ReferenceConfig对象是支持泛化调用的关键对象 ,它的初始化操作是在数据同步的时候完成的。

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

最后被执行的是ResponsePlugin ,它统一处理网关的响应结果信息。处理类型由MessageWriter决定,类继承关系如下:

MessageWriter:接口,定义消息处理方法;

NettyClientMessageWriter:处理Netty调用结果;

RPCMessageWriter:处理RPC调用结果;

WebClientMessageWriter:处理WebClient调用结果;

Dubbo服务调用,处理结果是RPCMessageWriter

  • org.apache.shenyu.plugin.response.strategy.RPCMessageWriter#writeWith()

writeWith()方法中处理响应结果,获取结果或处理异常。

分析至此,关于Dubbo插件的源码分析就完成了,分析流程图如下:

4. 小结

本文从实际案例出发,由浅入深分析了ShenYu网关对Dubbo服务的代理过程。涉及到的主要知识点如下:

  • 通过责任链设计模式执行插件;
  • 使用模板方法设计模式实现AbstractShenyuPlugin,处理通用的操作类型;
  • 使用单例设计模式实现缓存数据类BaseDataCache
  • 通过springboot starter即可引入不同的注册中心和数同步方式,扩展性很好;
  • 通过admin支持规则热更新,方便流量管控;
  • Disruptor队列是为了数据与操作解耦,以及数据缓冲。

Apache ShenYu源码阅读系列-Dubbo插件

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu 网关使用 dubbo 插件完成对 dubbo服务的调用。你可以查看官方文档 Dubbo快速开始 了解如何使用该插件。

本文基于shenyu-2.4.3版本进行源码分析,官网的介绍请参考 Dubbo服务接入

1. 服务注册

以官网提供的例子为例 shenyu-examples-dubbo 。 假如你的dubbo服务定义如下(spring-dubbo.xml):

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://code.alibabatech.com/schema/dubbo
       https://code.alibabatech.com/schema/dubbo/dubbo.xsd">

    <dubbo:application name="test-dubbo-service"/>
    <dubbo:registry address="${dubbo.registry.address}"/>
    <dubbo:protocol name="dubbo" port="20888"/>

    <dubbo:service timeout="10000" interface="org.apache.shenyu.examples.dubbo.api.service.DubboTestService" ref="dubboTestService"/>

</beans>

声明应用服务名称,注册中心地址,使用dubbo协议,声明服务接口,对应接口实现类:

/**
 * DubboTestServiceImpl.
 */
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {
    
    @Override
    @ShenyuDubboClient(path = "/findById", desc = "Query by Id")
    public DubboTest findById(final String id) {
        return new DubboTest(id, "hello world shenyu Apache, findById");
    }

    //......
}

在接口实现类中,使用注解@ShenyuDubboClientshenyu-admin注册服务。该注解的作用及原理,稍后再进行分析。

在配置文件application.yml中的配置信息:

server:
  port: 8011
  address: 0.0.0.0
  servlet:
    context-path: /
spring:
  main:
    allow-bean-definition-overriding: true
dubbo:
  registry:
    address: zookeeper://localhost:2181  # dubbo使用的注册中心
    
shenyu:
  register:
    registerType: http #注册方式
    serverLists: http://localhost:9095 #注册地址
    props:
      username: admin 
      password: 123456
  client:
    dubbo:
      props:
        contextPath: /dubbo  
        appName: dubbo

在配置文件中,声明dubbo使用的注册中心地址,dubbo服务向shenyu-admin注册,使用的方式是http,注册地址是http://localhost:9095

关于注册方式的使用,请参考 应用客户端接入

1.1 声明注册接口

使用注解@ShenyuDubboClient将服务注册到网关。简单demo如下:

// dubbo服务
@Service("dubboTestService")
public class DubboTestServiceImpl implements DubboTestService {
    
    @Override
    @ShenyuDubboClient(path = "/findById", desc = "Query by Id") // 需要注册的方法
    public DubboTest findById(final String id) {
        return new DubboTest(id, "hello world shenyu Apache, findById");
    }

    //......
}

注解定义:

/**
 * 作用于类和方法上
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Inherited
public @interface ShenyuDubboClient {
    
	//注册路径
    String path();
    
    //规则名称
    String ruleName() default "";
   
    //描述信息
    String desc() default "";

    //是否启用
    boolean enabled() default true;
}

1.2 扫描注解信息

注解扫描通过ApacheDubboServiceBeanListener完成,它实现了ApplicationListener<ContextRefreshedEvent>接口,在Spring容器启动过程中,发生上下文刷新事件时,开始执行事件处理方法onApplicationEvent()

在构造器实例化的过程中:

  • 读取属性配置
  • 开启线程池
  • 启动注册中心,用于向shenyu-admin注册
public class ApacheDubboServiceBeanListener implements ApplicationListener<ContextRefreshedEvent> {

	// ......

    //构造器
    public ApacheDubboServiceBeanListener(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        //1.读取属性配置
        Properties props = clientConfig.getProps();
        String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
        String appName = props.getProperty(ShenyuClientConstants.APP_NAME);
        if (StringUtils.isBlank(contextPath)) {
            throw new ShenyuClientIllegalArgumentException("apache dubbo client must config the contextPath or appName");
        }
        this.contextPath = contextPath;
        this.appName = appName;
        this.host = props.getProperty(ShenyuClientConstants.HOST);
        this.port = props.getProperty(ShenyuClientConstants.PORT);
        //2.开启线程池
        executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("shenyu-apache-dubbo-client-thread-pool-%d").build());
        //3.启动注册中心
        publisher.start(shenyuClientRegisterRepository);
    }

    /**
     * 上下文刷新事件,执行方法逻辑
     */
    @Override
    public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
        //......
    }
  • ApacheDubboServiceBeanListener#onApplicationEvent()

重写的方法逻辑:读取Dubbo服务ServiceBean,构建元数据对象和URI对象,并向shenyu-admin注册。

    @Override
    public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
        //读取ServiceBean
        Map<String, ServiceBean> serviceBean = contextRefreshedEvent.getApplicationContext().getBeansOfType(ServiceBean.class);
        if (serviceBean.isEmpty()) {
            return;
        }
        //保证该方法只执行一次
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        //处理元数据对象
        for (Map.Entry<String, ServiceBean> entry : serviceBean.entrySet()) {
            handler(entry.getValue());
        }
        //处理URI对象
        serviceBean.values().stream().findFirst().ifPresent(bean -> {
            publisher.publishEvent(buildURIRegisterDTO(bean));
        });
    }
  • handler()

    handler()方法中,从serviceBean中读取所有方法,判断方法上是否有ShenyuDubboClient注解,如果存在就构建元数据对象,并通过注册中心,向shenyu-admin注册该方法。

    private void handler(final ServiceBean<?> serviceBean) {
        //获取代理对象
        Object refProxy = serviceBean.getRef();
        //获取class信息
        Class<?> clazz = refProxy.getClass();
        if (AopUtils.isAopProxy(refProxy)) {
            clazz = AopUtils.getTargetClass(refProxy);
        }
        //获取所有方法
        Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);
        for (Method method : methods) {
            //读取ShenyuDubboClient注解信息
            ShenyuDubboClient shenyuDubboClient = method.getAnnotation(ShenyuDubboClient.class);
            if (Objects.nonNull(shenyuDubboClient)) {
                //构建元数据对象,并注册
                publisher.publishEvent(buildMetaDataDTO(serviceBean, shenyuDubboClient, method));
            }
        }
    }
  • buildMetaDataDTO()

    构建元数据对象,在这里构建方法注册的必要信息,后续用于选择器或规则匹配。

    private MetaDataRegisterDTO buildMetaDataDTO(final ServiceBean<?> serviceBean, final ShenyuDubboClient shenyuDubboClient, final Method method) {
        //应用名称
        String appName = buildAppName(serviceBean);
        //方法路径
        String path = contextPath + shenyuDubboClient.path();
        //描述信息
        String desc = shenyuDubboClient.desc();
        //服务名称
        String serviceName = serviceBean.getInterface();
        //规则名称
        String configRuleName = shenyuDubboClient.ruleName();
        String ruleName = ("".equals(configRuleName)) ? path : configRuleName;
        //方法名称
        String methodName = method.getName();
        //参数类型
        Class<?>[] parameterTypesClazz = method.getParameterTypes();
        String parameterTypes = Arrays.stream(parameterTypesClazz).map(Class::getName).collect(Collectors.joining(","));
        return MetaDataRegisterDTO.builder()
                .appName(appName)
                .serviceName(serviceName)
                .methodName(methodName)
                .contextPath(contextPath)
                .host(buildHost())
                .port(buildPort(serviceBean))
                .path(path)
                .ruleName(ruleName)
                .pathDesc(desc)
                .parameterTypes(parameterTypes)
                .rpcExt(buildRpcExt(serviceBean)) //dubbo服务的扩展信息
                .rpcType(RpcTypeEnum.DUBBO.getName())
                .enabled(shenyuDubboClient.enabled())
                .build();
    }
  • buildRpcExt()

    dubbo服务的扩展信息

    private String buildRpcExt(final ServiceBean serviceBean) {
        DubboRpcExt build = DubboRpcExt.builder()
                .group(StringUtils.isNotEmpty(serviceBean.getGroup()) ? serviceBean.getGroup() : "")//分组
                .version(StringUtils.isNotEmpty(serviceBean.getVersion()) ? serviceBean.getVersion() : "")//版本
                .loadbalance(StringUtils.isNotEmpty(serviceBean.getLoadbalance()) ? serviceBean.getLoadbalance() : Constants.DEFAULT_LOADBALANCE)//负载均衡策略,默认随机
                .retries(Objects.isNull(serviceBean.getRetries()) ? Constants.DEFAULT_RETRIES : serviceBean.getRetries())//重试次数,默认2
                .timeout(Objects.isNull(serviceBean.getTimeout()) ? Constants.DEFAULT_CONNECT_TIMEOUT : serviceBean.getTimeout())//超时,默认3000
                .sent(Objects.isNull(serviceBean.getSent()) ? Constants.DEFAULT_SENT : serviceBean.getSent())//sent,默认false
                .cluster(StringUtils.isNotEmpty(serviceBean.getCluster()) ? serviceBean.getCluster() : Constants.DEFAULT_CLUSTER)//集群策略,默认failover
                .url("")
                .build();
        return GsonUtils.getInstance().toJson(build);
    }
  • buildURIRegisterDTO()

    构建URI对象,注册服务本身的信息,后续可用于服务探活。

private URIRegisterDTO buildURIRegisterDTO(final ServiceBean serviceBean) {
        return URIRegisterDTO.builder()
                .contextPath(this.contextPath) //上下文路径
                .appName(buildAppName(serviceBean))//应用名称
                .rpcType(RpcTypeEnum.DUBBO.getName())//rpc类型:dubbo
                .host(buildHost()) //host
                .port(buildPort(serviceBean))//port
                .build();
 }

具体的注册逻辑由注册中心实现,请参考 客户端接入原理

//向注册中心,发布注册事件   
publisher.publishEvent();

1.3 处理注册信息

客户端通过注册中心注册的元数据和URI数据,在shenyu-admin端进行处理,负责存储到数据库和同步给shenyu网关。Dubbo插件的客户端注册处理逻辑在ShenyuClientRegisterDubboServiceImpl中。继承关系如下:

  • ShenyuClientRegisterService:客户端注册服务,顶层接口;
  • FallbackShenyuClientRegisterService:注册失败,提供重试操作;
  • AbstractShenyuClientRegisterServiceImpl:抽象类,实现部分公共注册逻辑;
  • ShenyuClientRegisterDubboServiceImpl:实现Dubbo插件的注册;
1.3.1 注册服务
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

    客户端通过注册中心注册的元数据MetaDataRegisterDTO对象在shenyu-adminregister()方法被接送到。

   @Override
    public String register(final MetaDataRegisterDTO dto) {
        //1. 注册选择器
        String selectorHandler = selectorHandler(dto);
        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
        //2. 注册规则
        String ruleHandler = ruleHandler();
        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
        ruleService.registerDefault(ruleDTO);
        //3. 注册元数据
        registerMetadata(dto);
        //4. 注册ContextPath
        String contextPath = dto.getContextPath();
        if (StringUtils.isNotEmpty(contextPath)) {
            registerContextPath(dto);
        }
        return ShenyuResultMessage.SUCCESS;
    }
1.3.1.1 注册选择器
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

构建contextPath,查找选择器信息是否存在,如果存在就返回id;不存在就创建默认的选择器信息。

    @Override
    public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {
        // 构建contextPath
        String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());
        // 通过名称查找选择器信息是否存在
        SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);
        if (Objects.isNull(selectorDO)) {
            // 不存在就创建默认的选择器信息
            return registerSelector(contextPath, pluginName, selectorHandler);
        }
        return selectorDO.getId();
    }
  • 默认选择器信息

    在这里构建默认选择器信息及其条件属性。

   //注册选择器
   private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {
        //构建选择器
        SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());
        selectorDTO.setHandle(selectorHandler);
        //注册默认选择器
        return registerDefault(selectorDTO);
    }
     //构建选择器
    private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {
        //构建默认选择器
        SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);
        selectorDTO.setPluginId(pluginId);
         //构建默认选择器的条件属性
        selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));
        return selectorDTO;
    }
  • 构建默认选择器
private SelectorDTO buildDefaultSelectorDTO(final String name) {
    return SelectorDTO.builder()
            .name(name) // 名称
            .type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // 默认类型自定义
            .matchMode(MatchModeEnum.AND.getCode()) //默认匹配方式 and
            .enabled(Boolean.TRUE)  //默认启开启
            .loged(Boolean.TRUE)  //默认记录日志
            .continued(Boolean.TRUE) //默认继续后续选择器
            .sort(1) //默认顺序1
            .build();
}
  • 构建默认选择器条件属性
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {
    SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();
    selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // 默认参数类型URI
    selectorConditionDTO.setParamName("/");
    selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // 默认匹配策略 match
    selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // 默认值 /contextPath/**
    return Collections.singletonList(selectorConditionDTO);
}
  • 注册默认选择器
@Override
public String registerDefault(final SelectorDTO selectorDTO) {
    //选择器信息
    SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
    //选择器条件属性
    List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
    if (StringUtils.isEmpty(selectorDTO.getId())) {
        // 向数据库插入选择器信息
        selectorMapper.insertSelective(selectorDO);
          // 向数据库插入选择器条件属性
        selectorConditionDTOs.forEach(selectorConditionDTO -> {
            selectorConditionDTO.setSelectorId(selectorDO.getId());            selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
        });
    }
    // 发布同步事件,向网关同步选择信息及其条件属性
    publishEvent(selectorDO, selectorConditionDTOs);
    return selectorDO.getId();
}
1.3.1.2 注册规则

在注册服务的第二步中,开始构建默认规则,然后注册规则。

@Override
    public String register(final MetaDataRegisterDTO dto) {
        //1. 注册选择器
        //......
        
        //2. 注册规则
        // 默认规则处理属性
        String ruleHandler = ruleHandler();
        // 构建默认规则信息
        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
        // 注册规则
        ruleService.registerDefault(ruleDTO);
        
        //3. 注册元数据
        //......
        
        //4. 注册ContextPath
        //......
        
        return ShenyuResultMessage.SUCCESS;
    }
  • 默认规则处理属性
    @Override
    protected String ruleHandler() {
        // 默认规则处理属性
        return new DubboRuleHandle().toJson();
    }

Dubbo插件默认规则处理属性

public class DubboRuleHandle implements RuleHandle {

    /**
     * dubbo服务版本信息.
     */
    private String version;

    /**
     * 分组.
     */
    private String group;

    /**
     * 重试次数.
     */
    private Integer retries = 0;

    /**
     * 负载均衡策略:默认随机
     */
    private String loadbalance = LoadBalanceEnum.RANDOM.getName();

    /**
     * 超时,默认3000
     */
    private long timeout = Constants.TIME_OUT;
}
  • 构建默认规则信息
  // 构建默认规则信息
    private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {
        return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
    }
   //  构建默认规则信息
    private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {
        RuleDTO ruleDTO = RuleDTO.builder()
                .selectorId(selectorId) //关联的选择器id
                .name(ruleName) //规则名称
                .matchMode(MatchModeEnum.AND.getCode()) // 默认匹配模式 and
                .enabled(Boolean.TRUE) // 默认开启
                .loged(Boolean.TRUE) //默认记录日志
                .sort(1) //默认顺序 1
                .handle(ruleHandler)
                .build();
        RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()
                .paramType(ParamTypeEnum.URI.getName()) // 默认参数类型URI
                .paramName("/")
                .paramValue(path) //参数值path
                .build();
        if (path.indexOf("*") > 1) {
            ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); //如果path中有*,操作类型则默认为 match
        } else {
            ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // 否则,默认操作类型 = 
        }
        ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));
        return ruleDTO;
    }
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

注册规则:向数据库插入记录,并向网关发布事件,进行数据同步。


    @Override
    public String registerDefault(final RuleDTO ruleDTO) {
        RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());
        if (Objects.nonNull(exist)) {
            return "";
        }

        RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);
        List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();
        if (StringUtils.isEmpty(ruleDTO.getId())) {
            // 向数据库插入规则信息
            ruleMapper.insertSelective(ruleDO);
            //向数据库插入规则体条件属性
            ruleConditions.forEach(ruleConditionDTO -> {
                ruleConditionDTO.setRuleId(ruleDO.getId());                ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));
            });
        }
        // 向网关发布事件,进行数据同步
        publishEvent(ruleDO, ruleConditions);
        return ruleDO.getId();
    }

1.3.1.3 注册元数据

元数据主要用于RPC服务的调用。

   @Override
    public String register(final MetaDataRegisterDTO dto) {
        //1. 注册选择器
        //......
        
        //2. 注册规则
        //......
        
        //3. 注册元数据
        registerMetadata(dto);
        
        //4. 注册ContextPath
        //......
        
        return ShenyuResultMessage.SUCCESS;
    }
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDubboServiceImpl#registerMetadata()

    插入或更新元数据,然后发布同步事件到网关。

    @Override
    protected void registerMetadata(final MetaDataRegisterDTO dto) {
            // 获取metaDataService
            MetaDataService metaDataService = getMetaDataService();
            // 元数据是否存在
            MetaDataDO exist = metaDataService.findByPath(dto.getPath());
            // 插入或更新元数据
            metaDataService.saveOrUpdateMetaData(exist, dto);
    }

    @Override
    public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {
        DataEventTypeEnum eventType;
        // 数据类型转换 DTO->DO
        MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);
        // 插入数据
        if (Objects.isNull(exist)) {
            Timestamp currentTime = new Timestamp(System.currentTimeMillis());
            metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());
            metaDataDO.setDateCreated(currentTime);
            metaDataDO.setDateUpdated(currentTime);
            metaDataMapper.insert(metaDataDO);
            eventType = DataEventTypeEnum.CREATE;
        } else {
            // 更新数据
            metaDataDO.setId(exist.getId());
            metaDataMapper.update(metaDataDO);
            eventType = DataEventTypeEnum.UPDATE;
        }
        // 发布同步事件到网关
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,
                Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));
    }
1.3.2 注册URI
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

服务端收到客户端注册的URI信息后,进行处理。

    @Override
    public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
        String result;
        String key = key(selectorName);
        try {
            this.removeFallBack(key);
            // 注册URI
            result = this.doRegisterURI(selectorName, uriList);
            logger.info("Register success: {},{}", selectorName, uriList);
        } catch (Exception ex) {
            logger.warn("Register exception: cause:{}", ex.getMessage());
            result = "";
            // 注册失败后,进行重试
            this.addFallback(key, new FallbackHolder(selectorName, uriList));
        }
        return result;
    }
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

从客户端注册的URI中获取有效的URI,更新对应的选择器handle属性,向网关发送选择器更新事件。

@Override
    public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {
        //参数检查
        if (CollectionUtils.isEmpty(uriList)) {
            return "";
        }
        //获取选择器信息
        SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
        if (Objects.isNull(selectorDO)) {
            throw new ShenyuException("doRegister Failed to execute,wait to retry.");
        }
        // 获取有效的URI
        List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());
        // 构建选择器的handle属性
        String handler = buildHandle(validUriList, selectorDO);
        if (handler != null) {
            selectorDO.setHandle(handler);
            SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
            selectorData.setHandle(handler);
            // 向数据库更新选择器的handle属性
            selectorService.updateSelective(selectorDO);
            // 向网关发送选择器更新事件
            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
        }
        return ShenyuResultMessage.SUCCESS;
    }

关于服务注册的源码分析就以及完成了,分析流程图如下:

接下来就分析dubbo插件是如何根据这些信息向http服务发起调用。

2. 服务调用

dubbo插件是ShenYu网关用于将http请求转成 dubbo协议,调用dubbo服务的核心处理插件。

以官网提供的案例 Dubbo快速开始 为例,一个dubbo服务通过注册中心向shenyu-admin注册后,通过ShenYu网关代理,请求如下:

GET http://localhost:9195/dubbo/findById?id=100
Accept: application/json

Dubbo插件中,类继承关系如下:

  • ShenyuPlugin:顶层接口,定义接口方法;
  • AbstractShenyuPlugin:抽象类,实现插件共有逻辑;
  • AbstractDubboPlugin:dubbo插件抽象类,实现dubbo共有逻辑;
  • ApacheDubboPlugin:ApacheDubbo插件。

ShenYu网关支持ApacheDubbo和AlibabaDubbo

2.1 接收请求

通过ShenYu网关代理后,请求入口是ShenyuWebHandler,它实现了org.springframework.web.server.WebHandler接口。

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {
    //......
    
    /**
     * 处理web请求
     */
    @Override
    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
       // 执行默认插件链
        Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);
        if (scheduled) {
            return execute.subscribeOn(scheduler);
        }
        return execute;
    }
    
    private static class DefaultShenyuPluginChain implements ShenyuPluginChain {

        private int index;

        private final List<ShenyuPlugin> plugins;

        /**
         * 实例化默认插件链
         */
        DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {
            this.plugins = plugins;
        }

        /**
         * 执行每个插件.
         */
        @Override
        public Mono<Void> execute(final ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < plugins.size()) {
                    // 获取当前执行插件
                    ShenyuPlugin plugin = plugins.get(this.index++);
                    // 是否跳过当前插件
                    boolean skip = plugin.skip(exchange);
                    if (skip) {
                        // 如果跳过就执行下一个
                        return this.execute(exchange);
                    }
                    // 执行当前插件
                    return plugin.execute(exchange, this);
                }
                return Mono.empty();
            });
        }
    }
}

2.2 匹配规则

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

execute()方法中执行选择器和规则的匹配逻辑。

  • 匹配选择器;
  • 匹配规则;
  • 执行插件。
@Override
    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        // 插件名称
        String pluginName = named();
        // 插件信息
        PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
        if (pluginData != null && pluginData.getEnabled()) {
            // 选择器信息
            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
            if (CollectionUtils.isEmpty(selectors)) {
                return handleSelectorIfNull(pluginName, exchange, chain);
            }
            // 匹配选择器
            SelectorData selectorData = matchSelector(exchange, selectors);
            if (Objects.isNull(selectorData)) {
                return handleSelectorIfNull(pluginName, exchange, chain);
            }
            selectorLog(selectorData, pluginName);
            // 规则信息
            List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
            if (CollectionUtils.isEmpty(rules)) {
                return handleRuleIfNull(pluginName, exchange, chain);
            }
            // 匹配规则
            RuleData rule;
            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
                //get last
                rule = rules.get(rules.size() - 1);
            } else {
                rule = matchRule(exchange, rules);
            }
            if (Objects.isNull(rule)) {
                return handleRuleIfNull(pluginName, exchange, chain);
            }
            ruleLog(rule, pluginName);
            // 执行插件
            return doExecute(exchange, chain, selectorData, rule);
        }
        return chain.execute(exchange);
    }

2.3 执行GlobalPlugin

  • org.apache.shenyu.plugin.global.GlobalPlugin#execute()

GlobalPlugin是一个全局插件,在execute()方法中构建上下文信息。

public class GlobalPlugin implements ShenyuPlugin {
    // 构建上下文信息
    private final ShenyuContextBuilder builder;
    
    //......
    
    @Override
    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
       // 构建上下文信息,传入到 exchange 中
        ShenyuContext shenyuContext = builder.build(exchange);
        exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
        return chain.execute(exchange);
    }
    
    //......
}
  • org.apache.shenyu.plugin.global.DefaultShenyuContextBuilder#build()

构建默认的上下文信息。

public class DefaultShenyuContextBuilder implements ShenyuContextBuilder {
    //......
    
    @Override
    public ShenyuContext build(final ServerWebExchange exchange) {
        //构建参数
        Pair<String, MetaData> buildData = buildData(exchange);
        //包装ShenyuContext
        return decoratorMap.get(buildData.getLeft()).decorator(buildDefaultContext(exchange.getRequest()), buildData.getRight());
    }
    
    private Pair<String, MetaData> buildData(final ServerWebExchange exchange) {
        //......
        //根据请求的uri获取元数据
        MetaData metaData = MetaDataCache.getInstance().obtain(request.getURI().getPath());
        if (Objects.nonNull(metaData) && Boolean.TRUE.equals(metaData.getEnabled())) {
            exchange.getAttributes().put(Constants.META_DATA, metaData);
            return Pair.of(metaData.getRpcType(), metaData);
        } else {
            return Pair.of(RpcTypeEnum.HTTP.getName(), new MetaData());
        }
    }
    //设置默认的上下文信息
    private ShenyuContext buildDefaultContext(final ServerHttpRequest request) {
        String appKey = request.getHeaders().getFirst(Constants.APP_KEY);
        String sign = request.getHeaders().getFirst(Constants.SIGN);
        String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP);
        ShenyuContext shenyuContext = new ShenyuContext();
        String path = request.getURI().getPath();
        shenyuContext.setPath(path); //请求路径
        shenyuContext.setAppKey(appKey);
        shenyuContext.setSign(sign);
        shenyuContext.setTimestamp(timestamp);
        shenyuContext.setStartDateTime(LocalDateTime.now());
        Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> shenyuContext.setHttpMethod(httpMethod.name()));//请求方法
        return shenyuContext;
    }
 }
  • org.apache.shenyu.plugin.dubbo.common.context.DubboShenyuContextDecorator#decorator()

包装ShenyuContext

public class DubboShenyuContextDecorator implements ShenyuContextDecorator {
    
    @Override
    public ShenyuContext decorator(final ShenyuContext shenyuContext, final MetaData metaData) {
        shenyuContext.setModule(metaData.getAppName());//获取AppName
        shenyuContext.setMethod(metaData.getServiceName()); //获取ServiceName
        shenyuContext.setContextPath(metaData.getContextPath()); //获取contextPath
        shenyuContext.setRpcType(RpcTypeEnum.DUBBO.getName()); // dubbo服务
        return shenyuContext;
    }
    
    @Override
    public String rpcType() {
        return RpcTypeEnum.DUBBO.getName();
    }
}

2.4 执行RpcParamTransformPlugin

RpcParamTransformPlugin负责从http请求中读取参数,保存到exchange中,传递给rpc服务。

  • org.apache.shenyu.plugin.base.RpcParamTransformPlugin#execute()

execute()方法中,执行该插件的核心逻辑:从exchange中获取请求信息,根据请求传入的内容形式处理参数。

public class RpcParamTransformPlugin implements ShenyuPlugin {

    @Override
    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        //从exchange中获取请求信息
        ServerHttpRequest request = exchange.getRequest();
        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
        if (Objects.nonNull(shenyuContext)) {
           // 如果请求参数格式是APPLICATION_JSON
            MediaType mediaType = request.getHeaders().getContentType();
            if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
                return body(exchange, request, chain);
            }
            // 如果请求参数格式是APPLICATION_FORM_URLENCODED
            if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {
                return formData(exchange, request, chain);
            }
            //一般查询请求
            return query(exchange, request, chain);
        }
        return chain.execute(exchange);
    }
    
    // 如果请求参数格式是APPLICATION_JSON
    private Mono<Void> body(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {
        return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())
                .flatMap(body -> {
                    exchange.getAttributes().put(Constants.PARAM_TRANSFORM, resolveBodyFromRequest(body));//解析body,保存到exchange中
                    return chain.execute(exchange);
                }));
    }
   // 如果请求参数格式是APPLICATION_FORM_URLENCODED
    private Mono<Void> formData(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {
        return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody())
                .flatMap(map -> {
                    String param = resolveBodyFromRequest(map);
                    LinkedMultiValueMap<String, String> linkedMultiValueMap;
                    try {
                        linkedMultiValueMap = BodyParamUtils.buildBodyParams(URLDecoder.decode(param, StandardCharsets.UTF_8.name())); //格式化数据
                    } catch (UnsupportedEncodingException e) {
                        return Mono.error(e);
                    }
                    exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.toMap(() -> linkedMultiValueMap));// 保存到exchange中
                    return chain.execute(exchange);
                }));
    }
    //一般查询请求
    private Mono<Void> query(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) {
        exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.ofString(() -> serverHttpRequest.getURI().getQuery()));//保存到exchange中
        return chain.execute(exchange);
    }
    //......
 }

2.5 执行DubboPlugin

  • org.apache.shenyu.plugin.dubbo.common.AbstractDubboPlugin#doExecute()

doExecute()方法中,主要是检查元数据和参数。

public abstract class AbstractDubboPlugin extends AbstractShenyuPlugin {
    
    @Override
    public Mono<Void> doExecute(final ServerWebExchange exchange,
                                   final ShenyuPluginChain chain,
                                   final SelectorData selector,
                                   final RuleData rule) {
        //获取参数
        String param = exchange.getAttribute(Constants.PARAM_TRANSFORM);
        //获取上下文信息
        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
        assert shenyuContext != null;
        //获取元数据
        MetaData metaData = exchange.getAttribute(Constants.META_DATA);
        //检查元数据
        if (!checkMetaData(metaData)) {
            LOG.error(" path is : {}, meta data have error : {}", shenyuContext.getPath(), metaData);
            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.META_DATA_ERROR, null);
            return WebFluxResultUtils.result(exchange, error);
        }
        //检查元数据和参数
        if (Objects.nonNull(metaData) && StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(param)) {
            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM, null);
            return WebFluxResultUtils.result(exchange, error);
        }
        //设置rpcContext
        this.rpcContext(exchange);
        //进行dubbo服务调用
        return this.doDubboInvoker(exchange, chain, selector, rule, metaData, param);
    }
}
  • org.apache.shenyu.plugin.apache.dubbo.ApacheDubboPlugin#doDubboInvoker()

doDubboInvoker()方法中设置特殊的上下文信息,然后开始dubbo的泛化调用。

public class ApacheDubboPlugin extends AbstractDubboPlugin {
    
    @Override
    protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange,
                                        final ShenyuPluginChain chain,
                                        final SelectorData selector,
                                        final RuleData rule,
                                        final MetaData metaData,
                                        final String param) {
        //设置当前的选择器和规则信息,以及请求地址,用于支持dubbo的灰度
        RpcContext.getContext().setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId());
        RpcContext.getContext().setAttachment(Constants.DUBBO_RULE_ID, rule.getId());
        RpcContext.getContext().setAttachment(Constants.DUBBO_REMOTE_ADDRESS, Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress());
        //dubbo的泛化调用
        final Mono<Object> result = dubboProxyService.genericInvoker(param, metaData, exchange);
        //执行下一个插件
        return result.then(chain.execute(exchange));
    }
}
  • org.apache.shenyu.plugin.apache.dubbo.proxy.ApacheDubboProxyService#genericInvoker()

genericInvoker()方法:

  • 获取ReferenceConfig对象;
  • 获取泛化服务GenericService对象;
  • 构造请求参数pair对象;
  • 发起异步的泛化调用。
public class ApacheDubboProxyService {
    //...... 

    /**
     * Generic invoker object.
     */
    public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException {
        //1.获取ReferenceConfig对象
        ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(metaData.getPath());
        //如果没有获取到
        if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {
            //失效当前缓存的信息
            ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());
            //使用元数据进行再次初始化
            reference = ApacheDubboConfigCache.getInstance().initRef(metaData);
        }
        //2.获取泛化服务GenericService对象
        GenericService genericService = reference.get();
        //3.构造请求参数pair对象
        Pair<String[], Object[]> pair;
        if (StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.dubboBodyIsEmpty(body)) {
            pair = new ImmutablePair<>(new String[]{}, new Object[]{});
        } else {
            pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
        }
        //4.发起异步的泛化调用
        return Mono.fromFuture(invokeAsync(genericService, metaData.getMethodName(), pair.getLeft(), pair.getRight()).thenApply(ret -> {
            //处理结果
            if (Objects.isNull(ret)) {
                ret = Constants.DUBBO_RPC_RESULT_EMPTY;
            }
            exchange.getAttributes().put(Constants.RPC_RESULT, ret);
            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
            return ret;
        })).onErrorMap(exception -> exception instanceof GenericException ? new ShenyuException(((GenericException) exception).getExceptionMessage()) : new ShenyuException(exception));//处理异常
    }
    
    //泛化调用,异步操作
    private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException {
        genericService.$invoke(method, parameterTypes, args);
        Object resultFromFuture = RpcContext.getContext().getFuture();
        return resultFromFuture instanceof CompletableFuture ? (CompletableFuture<Object>) resultFromFuture : CompletableFuture.completedFuture(resultFromFuture);
    }
}

通过泛化调用就可以实现在网关调用dubbo服务了。

ReferenceConfig对象是支持泛化调用的关键对象 ,它的初始化操作是在数据同步的时候完成的。这里涉及两部分数据,一是同步的插件handler信息,二是同步的插件元数据信息。

  • org.apache.shenyu.plugin.dubbo.common.handler.AbstractDubboPluginDataHandler#handlerPlugin()

当插件数据更新时,数据同步模块会将数据从shenyu-admin同步到网关。在handlerPlugin()中执行初始化操作。

public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler {
    //......
    
    //初始化配置缓存
   protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig);

    @Override
    public void handlerPlugin(final PluginData pluginData) {
        if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) {
            //数据反序列化
            DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class);
            DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class);
            if (Objects.isNull(dubboRegisterConfig)) {
                return;
            }
            if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) {
                // 执行初始化操作
                this.initConfigCache(dubboRegisterConfig);
            }
            Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig);
        }
    }
    //......
}
  • org.apache.shenyu.plugin.apache.dubbo.handler.ApacheDubboPluginDataHandler#initConfigCache()

执行初始化操作。

public class ApacheDubboPluginDataHandler extends AbstractDubboPluginDataHandler {

    @Override
    protected void initConfigCache(final DubboRegisterConfig dubboRegisterConfig) {
        //执行初始化操作
        ApacheDubboConfigCache.getInstance().init(dubboRegisterConfig);
        //失效之前缓存的结果
        ApacheDubboConfigCache.getInstance().invalidateAll();
    }
}

  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#init()

在初始化中,设置registryConfigconsumerConfig

public final class ApacheDubboConfigCache extends DubboConfigCache {
    //......  
   /**
     * 初始化
     */
    public void init(final DubboRegisterConfig dubboRegisterConfig) {
        //创建ApplicationConfig
        if (Objects.isNull(applicationConfig)) {
            applicationConfig = new ApplicationConfig("shenyu_proxy");
        }
        //协议或者地址发生改变时,需要更新registryConfig
        if (needUpdateRegistryConfig(dubboRegisterConfig)) {
            RegistryConfig registryConfigTemp = new RegistryConfig();
            registryConfigTemp.setProtocol(dubboRegisterConfig.getProtocol());
            registryConfigTemp.setId("shenyu_proxy");
            registryConfigTemp.setRegister(false);
            registryConfigTemp.setAddress(dubboRegisterConfig.getRegister());            Optional.ofNullable(dubboRegisterConfig.getGroup()).ifPresent(registryConfigTemp::setGroup);
            registryConfig = registryConfigTemp;
        }
        //创建ConsumerConfig
        if (Objects.isNull(consumerConfig)) {
            consumerConfig = ApplicationModel.getConfigManager().getDefaultConsumer().orElseGet(() -> {
                ConsumerConfig consumerConfig = new ConsumerConfig();
                consumerConfig.refresh();
                return consumerConfig;
            });
           
 //设置ConsumerConfig
            Optional.ofNullable(dubboRegisterConfig.getThreadpool()).ifPresent(consumerConfig::setThreadpool); 
            Optional.ofNullable(dubboRegisterConfig.getCorethreads()).ifPresent(consumerConfig::setCorethreads);
            
 Optional.ofNullable(dubboRegisterConfig.getThreads()).ifPresent(consumerConfig::setThreads);
            
 Optional.ofNullable(dubboRegisterConfig.getQueues()).ifPresent(consumerConfig::setQueues);
        }
    }
    
    //是否需要更新注册配置
    private boolean needUpdateRegistryConfig(final DubboRegisterConfig dubboRegisterConfig) {
        if (Objects.isNull(registryConfig)) {
            return true;
        }
        return !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol())
                || !Objects.equals(dubboRegisterConfig.getRegister(), registryConfig.getAddress())
                || !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol());
    }

    //......
}
  • org.apache.shenyu.plugin.apache.dubbo.subscriber.ApacheDubboMetaDataSubscriber#onSubscribe()

当元数据更新时,数据同步模块会将数据从shenyu-admin同步到网关。在onSubscribe()方法中执行元数据更新操作。

public class ApacheDubboMetaDataSubscriber implements MetaDataSubscriber {
    //本地内存缓存
    private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();

    //元数据发生更新
    public void onSubscribe(final MetaData metaData) {
        // dubbo服务的元数据更新
        if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
            //对应的元数据是否存在
            MetaData exist = META_DATA.get(metaData.getPath());
            if (Objects.isNull(exist) || Objects.isNull(ApacheDubboConfigCache.getInstance().get(metaData.getPath()))) {
                // 首次初始化
                ApacheDubboConfigCache.getInstance().initRef(metaData);
            } else {
                // 对应的元数据发生了更新操作
                if (!Objects.equals(metaData.getServiceName(), exist.getServiceName())
                        || !Objects.equals(metaData.getRpcExt(), exist.getRpcExt())
                        || !Objects.equals(metaData.getParameterTypes(), exist.getParameterTypes())
                        || !Objects.equals(metaData.getMethodName(), exist.getMethodName())) {
                    //根据最新的元数据再次构建ReferenceConfig
                    ApacheDubboConfigCache.getInstance().build(metaData);
                }
            }
            //本地内存缓存
            META_DATA.put(metaData.getPath(), metaData);
        }
    }

    //删除元数据
    public void unSubscribe(final MetaData metaData) {
        if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
            //使ReferenceConfig失效
            ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath());
            META_DATA.remove(metaData.getPath());
        }
    }
}
  • org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#initRef()

通过metaData构建ReferenceConfig对象。

public final class ApacheDubboConfigCache extends DubboConfigCache {
    //......
    
    public ReferenceConfig<GenericService> initRef(final MetaData metaData) {
            try {
                //先尝试从缓存中获取,存在就直接返回
                ReferenceConfig<GenericService> referenceConfig = cache.get(metaData.getPath());
                if (StringUtils.isNoneBlank(referenceConfig.getInterface())) {
                    return referenceConfig;
                }
            } catch (ExecutionException e) {
                LOG.error("init dubbo ref exception", e);
            }
           //不存在,就构建
            return build(metaData);
        }

        /**
         * Build reference config.
         */
        @SuppressWarnings("deprecation")
        public ReferenceConfig<GenericService> build(final MetaData metaData) {
            if (Objects.isNull(applicationConfig) || Objects.isNull(registryConfig)) {
                return new ReferenceConfig<>();
            }
            ReferenceConfig<GenericService> reference = new ReferenceConfig<>(); //新建ReferenceConfig
            reference.setGeneric("true"); //泛化调用
            reference.setAsync(true);//支持异步

            reference.setApplication(applicationConfig);//设置应用配置
            reference.setRegistry(registryConfig);//设置注册中心配置
            reference.setConsumer(consumerConfig);//设置消费者配置
            reference.setInterface(metaData.getServiceName());//设置服务接口
            reference.setProtocol("dubbo");//设置dubbo协议
            reference.setCheck(false); //不检查 service provider
            reference.setLoadbalance("gray");//支持灰度

            Map<String, String> parameters = new HashMap<>(2);
            parameters.put("dispatcher", "direct");
            reference.setParameters(parameters);//自定义参数

            String rpcExt = metaData.getRpcExt();//rpc扩展参数
            DubboParam dubboParam = parserToDubboParam(rpcExt);//反序列化
            if (Objects.nonNull(dubboParam)) {
                if (StringUtils.isNoneBlank(dubboParam.getVersion())) {
                    reference.setVersion(dubboParam.getVersion());//设置版本
                }
                if (StringUtils.isNoneBlank(dubboParam.getGroup())) {
                    reference.setGroup(dubboParam.getGroup());//设置分组
                }
                if (StringUtils.isNoneBlank(dubboParam.getUrl())) {
                    reference.setUrl(dubboParam.getUrl());//设置url
                }
                if (StringUtils.isNoneBlank(dubboParam.getCluster())) {
                    reference.setCluster(dubboParam.getCluster());//设置Cluster type
                }
                Optional.ofNullable(dubboParam.getTimeout()).ifPresent(reference::setTimeout);//timeout
                Optional.ofNullable(dubboParam.getRetries()).ifPresent(reference::setRetries);//retires
                Optional.ofNullable(dubboParam.getSent()).ifPresent(reference::setSent);//Whether to ack async-sent
            }
            try {
                //获取GenericService
                Object obj = reference.get();
                if (Objects.nonNull(obj)) {
                    LOG.info("init apache dubbo reference success there meteData is :{}", metaData);
                    //缓存当前的reference
                    cache.put(metaData.getPath(), reference);
                }
            } catch (Exception e) {
                LOG.error("init apache dubbo reference exception", e);
            }
            return reference;
        }
    //......
    }

2.6 执行ResponsePlugin

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

响应结果由ResponsePlugin插件处理。

    @Override
    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
        assert shenyuContext != null;
        // 根据rpc类型处理结果
        return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);
    }

处理类型由MessageWriter决定,类继承关系如下:

  • MessageWriter:接口,定义消息处理方法;
  • NettyClientMessageWriter:处理Netty调用结果;
  • RPCMessageWriter:处理RPC调用结果;
  • WebClientMessageWriter:处理WebClient调用结果;

Dubbo服务调用,处理结果当然是RPCMessageWriter了。

  • org.apache.shenyu.plugin.response.strategy.RPCMessageWriter#writeWith()

writeWith()方法中处理响应结果。


public class RPCMessageWriter implements MessageWriter {

    @Override
    public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        return chain.execute(exchange).then(Mono.defer(() -> {
            Object result = exchange.getAttribute(Constants.RPC_RESULT); //获取结果
            if (Objects.isNull(result)) { //处理异常
                Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);
                return WebFluxResultUtils.result(exchange, error);
            }
            return WebFluxResultUtils.result(exchange, result);//返回结果
        }));
    }
}

分析至此,关于Dubbo插件的源码分析就完成了,分析流程图如下:

3. 小结

本文源码分析从Dubbo服务注册开始,到Dubbo插件的服务调用。Dubbo插件主要用来处理将http请求转成dubbo协议,主要逻辑是通过泛化调用实现。

Apache ShenYu源码阅读系列-Divide插件

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu 网关使用 divide 插件来处理 http 请求。你可以查看官方文档 Http快速开始 了解如何使用该插件。

本文基于shenyu-2.4.3版本进行源码分析,官网的介绍请参考 Http服务接入

1. 服务注册

1.1 声明注册接口

使用注解@ShenyuSpringMvcClient将服务注册到网关。简单demo如下:

@RestController
@RequestMapping("/order")
@ShenyuSpringMvcClient(path = "/order")  // API注册
public class OrderController {
    @GetMapping("/findById")
    @ShenyuSpringMvcClient(path = "/findById", desc = "Find by id") // 方法注册
    public OrderDTO findById(@RequestParam("id") final String id) {
        return build(id, "hello world findById");
    }
}

注解定义:


/**
 * 作用于类和方法上
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ShenyuSpringMvcClient {
    
	//注册路径
    String path() default "";
    
    //规则名称
    String ruleName() default "";
   
    //描述信息
    String desc() default "";

    //是否启用
    boolean enabled() default true;
    
    //注册元数据
    boolean registerMetaData() default false;
}

1.2 扫描注解信息

注解扫描通过SpringMvcClientBeanPostProcessor完成,它实现了BeanPostProcessor接口,是Spring提供的后置处理器。

在构造器实例化的过程中:

  • 读取属性配置
  • 添加注解,读取path信息
  • 启动注册中心,向shenyu-admin注册
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
    //...
    /**
     * 构造器实例化
     */
    public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig,
                                            final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        // 1. 读取属性配置
        Properties props = clientConfig.getProps();
        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
        this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH, "");
        if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
            String errorMsg = "http register param must config the appName or contextPath";
            LOG.error(errorMsg);
            throw new ShenyuClientIllegalArgumentException(errorMsg);
        }
        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
        // 2. 添加注解
        mappingAnnotation.add(ShenyuSpringMvcClient.class);
        mappingAnnotation.add(PostMapping.class);
        mappingAnnotation.add(GetMapping.class);
        mappingAnnotation.add(DeleteMapping.class);
        mappingAnnotation.add(PutMapping.class);
        mappingAnnotation.add(RequestMapping.class);
        // 3. 启动注册中心
        publisher.start(shenyuClientRegisterRepository);
    }
    
    @Override
    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
       // 重写后置处理器逻辑
        
        return bean;
    }
    
  • SpringMvcClientBeanPostProcessor#postProcessAfterInitialization()

重写后置处理器逻辑:读取注解信息,构建元数据对象和URI对象,并向shenyu-admin注册。

    @Override
    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
        // 1. 如果是注册整个服务或者不是Controller类,就不处理
        if (Boolean.TRUE.equals(isFull) || !hasAnnotation(bean.getClass(), Controller.class)) {
            return bean;
        }
        // 2. 读取类上的注解 ShenyuSpringMvcClient
        final ShenyuSpringMvcClient beanShenyuClient = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class);
        // 2.1构建superPath
        final String superPath = buildApiSuperPath(bean.getClass());
        // 2.2 是否注册整个类方法
        if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {
            // 构建元数据对象,然后向shenyu-admin注册
            publisher.publishEvent(buildMetaDataDTO(beanShenyuClient, pathJoin(contextPath, superPath)));
            return bean;
        }
        // 3. 读取所有方法
        final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
        for (Method method : methods) {
            // 3.1 读取方法上的注解 ShenyuSpringMvcClient
            ShenyuSpringMvcClient methodShenyuClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
            // 如果方法上面没有注解,就用类上面的注解
            methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;
            if (Objects.nonNull(methodShenyuClient)) {
               // 3.2 构建path信息,构建元数据对象,向shenyu-admin注册
                publisher.publishEvent(buildMetaDataDTO(methodShenyuClient, buildApiPath(method, superPath)));
            }
        }
        
        return bean;
    }
  • 1.如果是注册整个服务或者不是Controller类,就不处理
  • 2.读取类上的注解 ShenyuSpringMvcClient,如果是注册整个类,就在这里构建元数据对象,然后向shenyu-admin注册
  • 3.处理方法上的注解 ShenyuSpringMvcClient,针对特定方法构建path信息,构建元数据对象,然后向shenyu-admin注册

这里有两个取path的方法,需要特别说明一下:

  • buildApiSuperPath()

    构造SuperPath:先从类上的注解ShenyuSpringMvcClientpath属性,如果没有,就从当前类的RequestMapping注解中取path信息。

    private String buildApiSuperPath(@NonNull final Class<?> method) {
        // 先从类上的注解ShenyuSpringMvcClient取path属性
        ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
        if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {
            return shenyuSpringMvcClient.path();
        }
        // 从当前类的RequestMapping注解中取path信息
        RequestMapping requestMapping = AnnotationUtils.findAnnotation(method, RequestMapping.class);
        if (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {
            return requestMapping.path()[0];
        }
        return "";
    }
  • buildApiPath()

    构建path:先读取方法上的注解ShenyuSpringMvcClient,如果存在就构建;否则从方法的其他注解上获取path信息;完整的path = contextPath(上下文信息)+superPath(类信息)+methodPath(方法信息)

    private String buildApiPath(@NonNull final Method method, @NonNull final String superPath) {
        // 1. 读取方法上的注解ShenyuSpringMvcClient
        ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
        // 1.1如果存在path,就构建
        if (Objects.nonNull(shenyuSpringMvcClient) && StringUtils.isNotBlank(shenyuSpringMvcClient.path())) {
            //1.2完整 path = contextPath+superPath+methodPath
            return pathJoin(contextPath, superPath, shenyuSpringMvcClient.path());
        }
        // 2.从方法的其他注解上获取path信息
        final String path = getPathByMethod(method);
        if (StringUtils.isNotBlank(path)) {
             // 2.1 完整的path = contextPath+superPath+methodPath
            return pathJoin(contextPath, superPath, path);
        }
        return pathJoin(contextPath, superPath);
    }
  • getPathByMethod()

    从方法的其他注解上获取path信息,其他注解包括:

    • ShenyuSpringMvcClient
    • PostMapping
    • GetMapping
    • DeleteMapping
    • PutMapping
    • RequestMapping

    private String getPathByMethod(@NonNull final Method method) {
        // 遍历接口注解获取path信息
        for (Class<? extends Annotation> mapping : mappingAnnotation) {
            final String pathByAnnotation = getPathByAnnotation(AnnotationUtils.findAnnotation(method, mapping), pathAttributeNames);
            if (StringUtils.isNotBlank(pathByAnnotation)) {
                return pathByAnnotation;
            }
        }
        return null;
    }

扫描注解完成后,构建元数据对象,然后将该对象发送到shenyu-admin,即可完成注册。

  • 元数据对象

    包括当前注册方法的规则信息:contextPath,appName,注册路径,描述信息,注册类型,是否启用,规则名称和是否注册元数据。

 private MetaDataRegisterDTO buildMetaDataDTO(@NonNull final ShenyuSpringMvcClient shenyuSpringMvcClient, final String path) {
        return MetaDataRegisterDTO.builder()
                .contextPath(contextPath) // contextPath
                .appName(appName) // appName
                .path(path) // 注册路径,在网关规则匹配时使用
                .pathDesc(shenyuSpringMvcClient.desc()) // 描述信息
                .rpcType(RpcTypeEnum.HTTP.getName()) // divide插件,默认时http类型
                .enabled(shenyuSpringMvcClient.enabled()) // 是否启用规则
                .ruleName(StringUtils.defaultIfBlank(shenyuSpringMvcClient.ruleName(), path))//规则名称
                .registerMetaData(shenyuSpringMvcClient.registerMetaData()) //是否注册元数据信息
                .build();
    }

具体的注册逻辑由注册中心实现,在之前的文章中已经分析过了,这里就不再深入分析。

1.3 注册URI信息

ContextRegisterListener负责将客户端的URI信息注册到shenyu-admin,它实现了ApplicationListener接口,发生上下文刷新事件ContextRefreshedEvent时,执行onApplicationEvent()方法,实现注册逻辑。


public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent>, BeanFactoryAware {
	//......
    
    /**
     * 构造器实例化
     */
    public ContextRegisterListener(final PropertiesConfig clientConfig) {
        // 读取属性配置
        final Properties props = clientConfig.getProps();
        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
        this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
        if (Boolean.TRUE.equals(isFull)) {
            if (StringUtils.isBlank(contextPath)) {
                final String errorMsg = "http register param must config the contextPath";
                LOG.error(errorMsg);
                throw new ShenyuClientIllegalArgumentException(errorMsg);
            }
        }
        this.port = Integer.parseInt(Optional.ofNullable(props.getProperty(ShenyuClientConstants.PORT)).orElseGet(() -> "-1"));
        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
        this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);
        this.host = props.getProperty(ShenyuClientConstants.HOST);
    }

    @Override
    public void setBeanFactory(final BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    // 执行应用事件
    @Override
    public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) {
        // 保证该方法执行一次
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        // 1. 如果是注册整个服务
        if (Boolean.TRUE.equals(isFull)) {
            // 构建元数据,并注册
            publisher.publishEvent(buildMetaDataDTO());
        }
        try {
            // 获取端口信息
            final int mergedPort = port <= 0 ? PortUtils.findPort(beanFactory) : port;
            // 2. 构建URI数据,并注册
            publisher.publishEvent(buildURIRegisterDTO(mergedPort));
        } catch (ShenyuException e) {
            throw new ShenyuException(e.getMessage() + "please config ${shenyu.client.http.props.port} in xml/yml !");
        }
    }

    // 构建URI数据
    private URIRegisterDTO buildURIRegisterDTO(final int port) {
        return URIRegisterDTO.builder()
            .contextPath(this.contextPath) // contextPath
            .appName(appName) // appName
            .protocol(protocol) // 服务使用的协议
            .host(IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host)) //主机
            .port(port) // 端口
            .rpcType(RpcTypeEnum.HTTP.getName()) // divide插件,默认注册http类型
            .build();
    }

    // 构建元数据
    private MetaDataRegisterDTO buildMetaDataDTO() {
        return MetaDataRegisterDTO.builder()
            .contextPath(contextPath)
            .appName(appName)
            .path(contextPath)
            .rpcType(RpcTypeEnum.HTTP.getName())
            .enabled(true)
            .ruleName(contextPath)
            .build();
    }
}

1.4 处理注册信息

客户端通过注册中心注册的元数据和URI数据,在shenyu-admin进行处理,负责存储到数据库和同步给shenyu网关。Divide插件的客户端注册处理逻辑在ShenyuClientRegisterDivideServiceImpl中。继承关系如下:

  • ShenyuClientRegisterService:客户端注册服务,顶层接口;
  • FallbackShenyuClientRegisterService:注册失败,提供重试操作;
  • AbstractShenyuClientRegisterServiceImpl:抽象类,实现部分公共注册逻辑;
  • AbstractContextPathRegisterService:抽象类,负责注册ContextPath
  • ShenyuClientRegisterDivideServiceImpl:实现Divide插件的注册;
1.4.1 注册服务
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()

    客户端通过注册中心注册的元数据MetaDataRegisterDTO对象在shenyu-adminregister()方法被接送到。

   @Override
    public String register(final MetaDataRegisterDTO dto) {
        //1. 注册选择器
        String selectorHandler = selectorHandler(dto);
        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
        //2. 注册规则
        String ruleHandler = ruleHandler();
        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
        ruleService.registerDefault(ruleDTO);
        //3. 注册元数据
        registerMetadata(dto);
        //4. 注册ContextPath
        String contextPath = dto.getContextPath();
        if (StringUtils.isNotEmpty(contextPath)) {
            registerContextPath(dto);
        }
        return ShenyuResultMessage.SUCCESS;
    }
1.4.1.1 注册选择器
  • org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()

构建contextPath,查找选择器信息是否存在,如果存在就返回id;不存在就创建默认的选择器信息。

    @Override
    public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {
        // 构建contextPath
        String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());
        // 通过名称查找选择器信息是否存在
        SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);
        if (Objects.isNull(selectorDO)) {
            // 不存在就创建默认的选择器信息
            return registerSelector(contextPath, pluginName, selectorHandler);
        }
        return selectorDO.getId();
    }
  • 默认选择器信息

    在这里构建默认选择器信息及其条件属性。

   //注册选择器
   private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) {
        //构建选择器
        SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());
        selectorDTO.setHandle(selectorHandler);
        //注册默认选择器
        return registerDefault(selectorDTO);
    }
     //构建选择器
    private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) {
        //构建默认选择器
        SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath);
        selectorDTO.setPluginId(pluginId);
         //构建默认选择器的条件属性
        selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath));
        return selectorDTO;
    }
  • 构建默认选择器
private SelectorDTO buildDefaultSelectorDTO(final String name) {
    return SelectorDTO.builder()
            .name(name) // 名称
            .type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // 默认类型自定义
            .matchMode(MatchModeEnum.AND.getCode()) //默认匹配方式 and
            .enabled(Boolean.TRUE)  //默认启开启
            .loged(Boolean.TRUE)  //默认记录日志
            .continued(Boolean.TRUE) //默认继续后续选择器
            .sort(1) //默认顺序1
            .build();
}


  • 构建默认选择器条件属性
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) {
    SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO();
    selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // 默认参数类型URI
    selectorConditionDTO.setParamName("/");
    selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // 默认匹配策略 match
    selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); // 默认值 /contextPath/**
    return Collections.singletonList(selectorConditionDTO);
}
  • 注册默认选择器
@Override
public String registerDefault(final SelectorDTO selectorDTO) {
    //选择器信息
    SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
    //选择器条件属性
    List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
    if (StringUtils.isEmpty(selectorDTO.getId())) {
        // 向数据库插入选择器信息
        selectorMapper.insertSelective(selectorDO);
          // 向数据库插入选择器条件属性
        selectorConditionDTOs.forEach(selectorConditionDTO -> {
            selectorConditionDTO.setSelectorId(selectorDO.getId());            selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
        });
    }
    // 发布同步事件,向网关同步选择信息及其条件属性
    publishEvent(selectorDO, selectorConditionDTOs);
    return selectorDO.getId();
}
1.4.1.2 注册规则

在注册服务的第二步中,开始构建默认规则,然后注册规则。

@Override
    public String register(final MetaDataRegisterDTO dto) {
        //1. 注册选择器
        //......
        
        //2. 注册规则
        // 默认规则处理属性
        String ruleHandler = ruleHandler();
        // 构建默认规则信息
        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
        // 注册规则
        ruleService.registerDefault(ruleDTO);
        
        //3. 注册元数据
        //......
        
        //4. 注册ContextPath
        //......
        
        return ShenyuResultMessage.SUCCESS;
    }
  • 默认规则处理属性
    @Override
    protected String ruleHandler() {
        // 默认规则处理属性
        return new DivideRuleHandle().toJson();
    }

Divide插件默认规则处理属性


public class DivideRuleHandle implements RuleHandle {

    /**
     * 负载均衡:默认随机
     */
    private String loadBalance = LoadBalanceEnum.RANDOM.getName();

    /**
     * 重试策略:默认重试当前服务
     */
    private String retryStrategy = RetryEnum.CURRENT.getName();

    /**
     * 重试次数:默认3次
     */
    private int retry = 3;

    /**
     * 调用超时:默认 3000
     */
    private long timeout = Constants.TIME_OUT;

    /**
     * header最大值:10240
     */
    private long headerMaxSize = Constants.HEADER_MAX_SIZE;

    /**
     * request最大值:102400
     */
    private long requestMaxSize = Constants.REQUEST_MAX_SIZE;
}
  • 构建默认规则信息
  // 构建默认规则信息
    private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {
        return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
    }
   //  构建默认规则信息
    private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {
        RuleDTO ruleDTO = RuleDTO.builder()
                .selectorId(selectorId) //关联的选择器id
                .name(ruleName) //规则名称
                .matchMode(MatchModeEnum.AND.getCode()) // 默认匹配模式 and
                .enabled(Boolean.TRUE) // 默认开启
                .loged(Boolean.TRUE) //默认记录日志
                .sort(1) //默认顺序 1
                .handle(ruleHandler)
                .build();
        RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()
                .paramType(ParamTypeEnum.URI.getName()) // 默认参数类型URI
                .paramName("/")
                .paramValue(path) //参数值path
                .build();
        if (path.indexOf("*") > 1) {
            ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); //如果path中有*,操作类型则默认为 match
        } else {
            ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); // 否则,默认操作类型 = 
        }
        ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));
        return ruleDTO;
    }
  • org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()

注册规则:向数据库插入记录,并向网关发布事件,进行数据同步。


    @Override
    public String registerDefault(final RuleDTO ruleDTO) {
        RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName());
        if (Objects.nonNull(exist)) {
            return "";
        }

        RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);
        List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();
        if (StringUtils.isEmpty(ruleDTO.getId())) {
            // 向数据库插入规则信息
            ruleMapper.insertSelective(ruleDO);
            //向数据库插入规则体条件属性
            ruleConditions.forEach(ruleConditionDTO -> {
                ruleConditionDTO.setRuleId(ruleDO.getId());                ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO));
            });
        }
        // 向网关发布事件,进行数据同步
        publishEvent(ruleDO, ruleConditions);
        return ruleDO.getId();
    }

1.4.1.3 注册元数据
   @Override
    public String register(final MetaDataRegisterDTO dto) {
        //1. 注册选择器
        //......
        
        //2. 注册规则
        //......
        
        //3. 注册元数据
        registerMetadata(dto);
        
        //4. 注册ContextPath
        //......
        
        return ShenyuResultMessage.SUCCESS;
    }
  • org.apache.shenyu.admin.service.register.ShenyuClientRegisterDivideServiceImpl#registerMetadata()

    插入或更新元数据,然后发布同步事件到网关。


    @Override
    protected void registerMetadata(final MetaDataRegisterDTO dto) {
        if (dto.isRegisterMetaData()) { // 如果注册元数据
            // 获取metaDataService
            MetaDataService metaDataService = getMetaDataService();
            // 元数据是否存在
            MetaDataDO exist = metaDataService.findByPath(dto.getPath());
            // 插入或更新元数据
            metaDataService.saveOrUpdateMetaData(exist, dto);
        }
    }

    @Override
    public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) {
        DataEventTypeEnum eventType;
        // 数据类型转换 DTO->DO
        MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO);
        // 插入数据
        if (Objects.isNull(exist)) {
            Timestamp currentTime = new Timestamp(System.currentTimeMillis());
            metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid());
            metaDataDO.setDateCreated(currentTime);
            metaDataDO.setDateUpdated(currentTime);
            metaDataMapper.insert(metaDataDO);
            eventType = DataEventTypeEnum.CREATE;
        } else {
            // 更新数据
            metaDataDO.setId(exist.getId());
            metaDataMapper.update(metaDataDO);
            eventType = DataEventTypeEnum.UPDATE;
        }
        // 发布同步事件到网关
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType,
                Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));
    }
1.4.1.4 注册ContextPath
   @Override
    public String register(final MetaDataRegisterDTO dto) {
        //1. 注册选择器
        //......
        
        //2. 注册规则
        //......
        
        //3. 注册元数据
        //......
        
        //4. 注册ContextPath
        String contextPath = dto.getContextPath();
        if (StringUtils.isNotEmpty(contextPath)) {
            registerContextPath(dto);
        }
        return ShenyuResultMessage.SUCCESS;
    }
  • org.apache.shenyu.admin.service.register.AbstractContextPathRegisterService#registerContextPath()
    @Override
    public void registerContextPath(final MetaDataRegisterDTO dto) {
        // 设置选择器的contextPath
        String contextPathSelectorId = getSelectorService().registerDefault(dto, PluginEnum.CONTEXT_PATH.getName(), "");
        ContextMappingRuleHandle handle = new ContextMappingRuleHandle();
        handle.setContextPath(PathUtils.decoratorContextPath(dto.getContextPath()));
        // 设置规则的contextPath
        getRuleService().registerDefault(buildContextPathDefaultRuleDTO(contextPathSelectorId, dto, handle.toJson()));
    }
1.4.2 注册URI
  • org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()

服务端收到客户端注册的URI信息后,进行处理。

    @Override
    public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
        String result;
        String key = key(selectorName);
        try {
            this.removeFallBack(key);
            // 注册URI
            result = this.doRegisterURI(selectorName, uriList);
            logger.info("Register success: {},{}", selectorName, uriList);
        } catch (Exception ex) {
            logger.warn("Register exception: cause:{}", ex.getMessage());
            result = "";
            // 注册失败后,进行重试
            this.addFallback(key, new FallbackHolder(selectorName, uriList));
        }
        return result;
    }
  • org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()

从客户端注册的URI中获取有效的URI,更新对应的选择器handle属性,向网关发送选择器更新事件。

@Override
    public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {
        //参数检查
        if (CollectionUtils.isEmpty(uriList)) {
            return "";
        }
        //获取选择器信息
        SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
        if (Objects.isNull(selectorDO)) {
            throw new ShenyuException("doRegister Failed to execute,wait to retry.");
        }
        // 获取有效的URI
        List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());
        // 构建选择器的handle属性
        String handler = buildHandle(validUriList, selectorDO);
        if (handler != null) {
            selectorDO.setHandle(handler);
            SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
            selectorData.setHandle(handler);
            // 向数据库更新选择器的handle属性
            selectorService.updateSelective(selectorDO);
            // 向网关发送选择器更新事件
            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
        }
        return ShenyuResultMessage.SUCCESS;
    }

关于服务注册的源码分析就以及完成了,分析流程图如下:

接下来就分析divide插件是如何根据这些信息向http服务发起调用。

2. 服务调用

divide插件是网关用于处理 http协议请求的核心处理插件。

以官网提供的案例 Http快速开始 为例,一个直连请求如下:

GET http://localhost:8189/order/findById?id=100
Accept: application/json

通过ShenYu网关代理后,请求如下:

GET http://localhost:9195/http/order/findById?id=100
Accept: application/json

通过ShenYu网关代理后的服务仍然能够请求到之前的服务,在这里起作用的就是divide插件。类继承关系如下:

  • ShenyuPlugin:顶层接口,定义接口方法;
  • AbstractShenyuPlugin:抽象类,实现插件共有逻辑;
  • DividePlugin:Divide插件。

2.1 接收请求

通过ShenYu网关代理后,请求入口是ShenyuWebHandler,它实现了org.springframework.web.server.WebHandler接口。

public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> {
    //......
    
    /**
     * 处理web请求
     */
    @Override
    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
       // 执行默认插件链
        Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange);
        if (scheduled) {
            return execute.subscribeOn(scheduler);
        }
        return execute;
    }
    
    private static class DefaultShenyuPluginChain implements ShenyuPluginChain {

        private int index;

        private final List<ShenyuPlugin> plugins;

        /**
         * 实例化默认插件链
         */
        DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) {
            this.plugins = plugins;
        }

        /**
         * 执行每个插件.
         */
        @Override
        public Mono<Void> execute(final ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < plugins.size()) {
                    // 获取当前执行插件
                    ShenyuPlugin plugin = plugins.get(this.index++);
                    // 是否跳过当前插件
                    boolean skip = plugin.skip(exchange);
                    if (skip) {
                        // 如果跳过就执行下一个
                        return this.execute(exchange);
                    }
                    // 执行当前插件
                    return plugin.execute(exchange, this);
                }
                return Mono.empty();
            });
        }
    }
}

2.2 匹配规则

  • org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()

execute()方法中执行选择器和规则的匹配逻辑。

  • 匹配选择器;
  • 匹配规则;
  • 执行插件。
@Override
    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        // 插件名称
        String pluginName = named();
        // 插件信息
        PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
        if (pluginData != null && pluginData.getEnabled()) {
            // 选择器信息
            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
            if (CollectionUtils.isEmpty(selectors)) {
                return handleSelectorIfNull(pluginName, exchange, chain);
            }
            // 匹配选择器
            SelectorData selectorData = matchSelector(exchange, selectors);
            if (Objects.isNull(selectorData)) {
                return handleSelectorIfNull(pluginName, exchange, chain);
            }
            selectorLog(selectorData, pluginName);
            // 规则信息
            List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
            if (CollectionUtils.isEmpty(rules)) {
                return handleRuleIfNull(pluginName, exchange, chain);
            }
            // 匹配规则
            RuleData rule;
            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
                //get last
                rule = rules.get(rules.size() - 1);
            } else {
                rule = matchRule(exchange, rules);
            }
            if (Objects.isNull(rule)) {
                return handleRuleIfNull(pluginName, exchange, chain);
            }
            ruleLog(rule, pluginName);
            // 执行插件
            return doExecute(exchange, chain, selectorData, rule);
        }
        return chain.execute(exchange);
    }

2.3 执行divide插件

  • org.apache.shenyu.plugin.divide.DividePlugin#doExecute()

doExecute()方法中执行divide插件的具体逻辑:

  • 校验header大小;
  • 校验request大小;
  • 获取服务列表;
  • 实现负载均衡;
  • 设置请求url,超时时间,重试策略。
@Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
        // 获取上下文信息
        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
        assert shenyuContext != null;
        // 获取规则的handle属性
        DivideRuleHandle ruleHandle = DividePluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule));
        long headerSize = 0;
        // 校验header大小
        for (List<String> multiHeader : exchange.getRequest().getHeaders().values()) {
            for (String value : multiHeader) {
                headerSize += value.getBytes(StandardCharsets.UTF_8).length;
            }
        }
        if (headerSize > ruleHandle.getHeaderMaxSize()) {
            LOG.error("request header is too large");
            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_HEADER_TOO_LARGE, null);
            return WebFluxResultUtils.result(exchange, error);
        }
        
        // 校验request大小
        if (exchange.getRequest().getHeaders().getContentLength() > ruleHandle.getRequestMaxSize()) {
            LOG.error("request entity is too large");
            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_ENTITY_TOO_LARGE, null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // 获取服务列表upstreamList
        List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
        if (CollectionUtils.isEmpty(upstreamList)) {
            LOG.error("divide upstream configuration error: {}", rule);
            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // 请求ip
        String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
        // 实现负载均衡
        Upstream upstream = LoadBalancerFactory.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
        if (Objects.isNull(upstream)) {
            LOG.error("divide has no upstream");
            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL, null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // 设置url
        String domain = upstream.buildDomain();
        exchange.getAttributes().put(Constants.HTTP_DOMAIN, domain);
        // 设置超时时间
        exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
        exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
        // 设置重试策略
        exchange.getAttributes().put(Constants.RETRY_STRATEGY, ruleHandle.getRetryStrategy());
        exchange.getAttributes().put(Constants.LOAD_BALANCE, ruleHandle.getLoadBalance());
        exchange.getAttributes().put(Constants.DIVIDE_SELECTOR_ID, selector.getId());
        return chain.execute(exchange);
    }

2.4 发起请求

默认由WebClientPluginhttp服务发起调用请求,类继承关系如下:

  • ShenyuPlugin:顶层插件,定义插件方法;
  • AbstractHttpClientPlugin:抽象类,实现请求调用的公共逻辑;
  • WebClientPlugin:通过WebClient发起请求;
  • NettyHttpClientPlugin:通过Netty发起请求。

发起请求调用:

  • org.apache.shenyu.plugin.httpclient.AbstractHttpClientPlugin#execute()

execute()方法中发起请求调用:

  • 获取指定的超时时间,重试次数
  • 发起请求
  • 根据指定的重试策略进行失败后重试操作

public abstract class AbstractHttpClientPlugin<R> implements ShenyuPlugin {

    protected static final Logger LOG = LoggerFactory.getLogger(AbstractHttpClientPlugin.class);

    @Override
    public final Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        // 获取上下文信息
        final ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
        assert shenyuContext != null;
        // 获取uri
        final URI uri = exchange.getAttribute(Constants.HTTP_URI);
        if (Objects.isNull(uri)) {
            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_URL, null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // 获取指定的超时时间
        final long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
        final Duration duration = Duration.ofMillis(timeout);
        // 获取指定重试次数
        final int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
        // 获取指定的重试策略
        final String retryStrategy = (String) Optional.ofNullable(exchange.getAttribute(Constants.RETRY_STRATEGY)).orElseGet(RetryEnum.CURRENT::getName);
        LOG.info("The request urlPath is {}, retryTimes is {}, retryStrategy is {}", uri.toASCIIString(), retryTimes, retryStrategy);
        // 构建header
        final HttpHeaders httpHeaders = buildHttpHeaders(exchange);
        // 发起请求
        final Mono<R> response = doRequest(exchange, exchange.getRequest().getMethodValue(), uri, httpHeaders, exchange.getRequest().getBody())
                .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
                .doOnError(e -> LOG.error(e.getMessage(), e));
        
        // 重试策略CURRENT,对当前服务进行重试
        if (RetryEnum.CURRENT.getName().equals(retryStrategy)) {
            //old version of DividePlugin and SpringCloudPlugin will run on this
            return response.retryWhen(Retry.anyOf(TimeoutException.class, ConnectTimeoutException.class, ReadTimeoutException.class, IllegalStateException.class)
                    .retryMax(retryTimes)
                    .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
                    .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))
                    .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));
        }
        
        // 对其他服务进行重试
        // 排除已经调用过的服务
        final Set<URI> exclude = Sets.newHashSet(uri);
        // 请求重试
        return resend(response, exchange, duration, httpHeaders, exclude, retryTimes)
                .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))
                .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange));
    }

    private Mono<R> resend(final Mono<R> clientResponse,
                           final ServerWebExchange exchange,
                           final Duration duration,
                           final HttpHeaders httpHeaders,
                           final Set<URI> exclude,
                           final int retryTimes) {
        Mono<R> result = clientResponse;
        // 根据指定的重试次数进行重试
        for (int i = 0; i < retryTimes; i++) {
            result = resend(result, exchange, duration, httpHeaders, exclude);
        }
        return result;
    }

    private Mono<R> resend(final Mono<R> response,
                           final ServerWebExchange exchange,
                           final Duration duration,
                           final HttpHeaders httpHeaders,
                           final Set<URI> exclude) {
        return response.onErrorResume(th -> {
            final String selectorId = exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID);
            final String loadBalance = exchange.getAttribute(Constants.LOAD_BALANCE);
            //查询可用服务
            final List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId)
                    .stream().filter(data -> {
                        final String trimUri = data.getUrl().trim();
                        for (URI needToExclude : exclude) {
                            // exclude already called
                            if ((needToExclude.getHost() + ":" + needToExclude.getPort()).equals(trimUri)) {
                                return false;
                            }
                        }
                        return true;
                    }).collect(Collectors.toList());
            if (CollectionUtils.isEmpty(upstreamList)) {
                // no need to retry anymore
                return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
            }
            // 请求ip
            final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
            // 实现负载均衡
            final Upstream upstream = LoadBalancerFactory.selector(upstreamList, loadBalance, ip);
            if (Objects.isNull(upstream)) {
                // no need to retry anymore
                return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
            }
            final URI newUri = RequestUrlUtils.buildRequestUri(exchange, upstream.buildDomain());
            // 排除已经调用的uri
            exclude.add(newUri);
             // 进行再次调用
            return doRequest(exchange, exchange.getRequest().getMethodValue(), newUri, httpHeaders, exchange.getRequest().getBody())
                    .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
                    .doOnError(e -> LOG.error(e.getMessage(), e));
        });
    }

    //......
}

  • org.apache.shenyu.plugin.httpclient.WebClientPlugin#doRequest()

doRequest()方法中通过webClient发起真正的请求调用。


@Override
    protected Mono<ClientResponse> doRequest(final ServerWebExchange exchange, final String httpMethod, final URI uri,
                                             final HttpHeaders httpHeaders, final Flux<DataBuffer> body) {
        return webClient.method(HttpMethod.valueOf(httpMethod)).uri(uri) //请求uri
                .headers(headers -> headers.addAll(httpHeaders)) // 请求header
                .body(BodyInserters.fromDataBuffers(body))
                .exchange() // 发起请求
                .doOnSuccess(res -> {
                    if (res.statusCode().is2xxSuccessful()) { // 成功
                        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
                    } else { // 失败
                        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
                    }
                    exchange.getResponse().setStatusCode(res.statusCode());
                    exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
                });
    }

2.5 处理响应结果

  • org.apache.shenyu.plugin.response.ResponsePlugin#execute()

响应结果由ResponsePlugin插件处理。

    @Override
    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
        assert shenyuContext != null;
        // 根据rpc类型处理结果
        return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);
    }

处理类型由MessageWriter决定,类继承关系如下:

  • MessageWriter:接口,定义消息处理方法;
  • NettyClientMessageWriter:处理Netty调用结果;
  • RPCMessageWriter:处理RPC调用结果;
  • WebClientMessageWriter:处理WebClient调用结果;

默认是通过WebCient发起http请求。

  • org.apache.shenyu.plugin.response.strategy.WebClientMessageWriter#writeWith()

writeWith()方法中处理响应结果。


    @Override
    public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        return chain.execute(exchange).then(Mono.defer(() -> {
            // 获取响应
            ServerHttpResponse response = exchange.getResponse();
            ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
            if (Objects.isNull(clientResponse)) {
                Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null);
                return WebFluxResultUtils.result(exchange, error);
            }
            //获取cookies和headers
            response.getCookies().putAll(clientResponse.cookies());
            response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
            // image, pdf or stream does not do format processing.
            // 处理特殊响应类型
            if (clientResponse.headers().contentType().isPresent()) {
                final String media = clientResponse.headers().contentType().get().toString().toLowerCase();
                if (media.matches(COMMON_BIN_MEDIA_TYPE_REGEX)) {
                    return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()))
                            .doOnCancel(() -> clean(exchange));
                }
            }
            // 处理一般响应类型
            clientResponse = ResponseUtils.buildClientResponse(response, clientResponse.body(BodyExtractors.toDataBuffers()));
            return clientResponse.bodyToMono(byte[].class)
                    .flatMap(originData -> WebFluxResultUtils.result(exchange, originData))
                    .doOnCancel(() -> clean(exchange));
        }));
    }

分析至此,关于Divide插件的源码分析就完成了,分析流程图如下:

3. 小结

本文源码分析从http服务注册开始,到divide插件的服务调用。divide插件主要用来处理http请求。有些源码没有进入深入分析,比如负载均衡的实现,服务探活,将在后续继续分析。