Apache ShenYu源码阅读系列-注册中心实现原理之Http注册

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,注册中心是用于将客户端信息注册到shenyu-adminadmin再通过数据同步将这些信息同步到网关,网关通过这些数据完成流量筛选。客户端信息主要包括接口信息URI信息

本文基于shenyu-2.4.1版本进行源码分析,官网的介绍请参考 客户端接入原理

1. 注册中心原理

当客户端启动时,读取接口信息和uri信息,通过指定的注册类型,将数据发送到shenyu-admin

图中的注册中心需要用户指定使用哪种注册类型,ShenYu当前支持HttpZookeeperEtcdConsulNacos进行注册。具体如何配置请参考 客户端接入配置

ShenYu在注册中心的原理设计上引入了DisruptorDisruptor队列在其中起到数据与操作解耦,利于扩展。如果注册请求过多,导致注册异常,也有数据缓冲作用。

如图所示,注册中心分为两个部分,一是注册中心客户端register-client,负载处理客户端数据读取。另一个是注册中心服务端register-server,负载处理服务端(就是shenyu-admin)数据写入。通过指定注册类型进行数据发送和接收。

  • 客户端:通常来说就是一个微服务,可以是springmvcspring-clouddubbogrpc等。
  • register-client:注册中心客户端,读取客户接口和uri信息。
  • Disruptor:数据与操作解耦,数据缓冲作用。
  • register-server:注册中心服务端,这里就是shenyu-admin,接收数据,写入数据库,发数据同步事件。
  • 注册类型:指定注册类型,完成数据注册,当前支持HttpZookeeperEtcdConsulNacos

本文分析的是使用Http的方式进行注册,所以具体的处理流程如下:

在客户端,数据出队列后,通过http传输数据,在服务端,提供相应的接口,接收数据,然后写入队列。

2. 客户端注册流程

当客户端启动后,根据相关配置,读取属性信息,然后写入队列。以官方提供的 shenyu-examples-http 为例,开始源码分析。官方提供的例子是一个由springboot构建的微服务。注册中心的相关配置可以参考官网 客户端接入配置

2.1 加载配置,读取属性

先用一张图串联下注册中心客户端初始化流程:

我们分析的是通过http的方式进行注册,所以需要进行如下配置:

shenyu:
  register:
    registerType: http
    serverLists: http://localhost:9095
  client:
    http:
        props:
          contextPath: /http
          appName: http
          port: 8189  
          isFull: false

每个属性表示的含义如下:

  • registerType : 服务注册类型,填写 http
  • serverList: 为http注册类型时,填写Shenyu-Admin项目的地址,注意加上http://,多个地址用英文逗号分隔。
  • port: 你本项目的启动端口,目前springmvc/tars/grpc需要进行填写。
  • contextPath: 为你的这个mvc项目在shenyu网关的路由前缀, 比如/order/product 等等,网关会根据你的这个前缀来进行路由。
  • appName:你的应用名称,不配置的话,会默认取 spring.application.name 的值。
  • isFull: 设置 true 代表代理你的整个服务,false表示代理你其中某几个controller;目前适用于springmvc/springcloud

项目启动后,会先加载配置文件,读取属性信息,生成相应的Bean

首先读取到的配置文件是 ShenyuSpringMvcClientConfiguration,它是shenyu 客户端http注册配置类,通过@Configuration表示这是一个配置类,通过@ImportAutoConfiguration引入其他配置类。创建SpringMvcClientBeanPostProcessor,主要处理元数据。创建ContextRegisterListener,主要处理 URI 信息。

/**
 * shenyu 客户端http注册配置类
 */
@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
public class ShenyuSpringMvcClientConfiguration {

    //创建SpringMvcClientBeanPostProcessor,主要处理元数据
    @Bean
    public SpringMvcClientBeanPostProcessor springHttpClientBeanPostProcessor(final ShenyuClientConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        return new SpringMvcClientBeanPostProcessor(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);
    }
    
   // 创建ContextRegisterListener,主要处理 URI信息
    @Bean
    public ContextRegisterListener contextRegisterListener(final ShenyuClientConfig clientConfig) {
        return new ContextRegisterListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()));
    }
}

ShenyuClientCommonBeanConfigurationshenyu客户端通用配置类,会创建注册中心客户端通用的bean

  • 创建ShenyuClientRegisterRepository,通过工厂类创建而成。
  • 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置。
  • 创建ShenyuClientConfig,读取shenyu.client属性配置。

/**
 * shenyu客户端通用配置类
 */
@Configuration
public class ShenyuClientCommonBeanConfiguration {
    
   // 创建ShenyuClientRegisterRepository,通过工厂类创建而成。
    @Bean
    public ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
        return ShenyuClientRegisterRepositoryFactory.newInstance(config);
    }
    
	// 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置
    @Bean
    @ConfigurationProperties(prefix = "shenyu.register")
    public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
        return new ShenyuRegisterCenterConfig();
    }
    
  // 创建ShenyuClientConfig,读取shenyu.client属性配置
    @Bean
    @ConfigurationProperties(prefix = "shenyu")
    public ShenyuClientConfig shenyuClientConfig() {
        return new ShenyuClientConfig();
    }
}

2.2 用于注册的 HttpClientRegisterRepository

上面的配置文件中生成的ShenyuClientRegisterRepository是客户端注册的具体实现,它是一个接口,它的实现类如下。

  • HttpClientRegisterRepository:通过http进行注册;
  • ConsulClientRegisterRepository:通过Consul进行注册;
  • EtcdClientRegisterRepository:通过Etcd进行注册;
  • NacosClientRegisterRepository:通过nacos进行注册;
  • ZookeeperClientRegisterRepository通过Zookeeper进行注册。

具体是哪一种方式,是通过SPI进行加载实现的,实现逻辑如下:


/**
 * 加载 ShenyuClientRegisterRepository
 */
public final class ShenyuClientRegisterRepositoryFactory {
    
    private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();
    
    /**
     * 创建 ShenyuClientRegisterRepository
     */
    public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
        if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
            // 通过SPI的方式进行加载,类型由registerType决定
            ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
            //执行初始化操作
            result.init(shenyuRegisterCenterConfig);
            ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
            REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
            return result;
        }
        return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
    }
}

加载类型通过registerType指定,也就是我们在配置文件中指定的类型:

shenyu:
  register:
    registerType: http
    serverLists: http://localhost:9095

我们指定的是http,所以会去加载HttpClientRegisterRepository。对象创建成功后,执行的初始化方法init()如下:

@Join
public class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {
    
    @Override
    public void init(final ShenyuRegisterCenterConfig config) {
        this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));
    }
  
  // 暂时省略其他逻辑
}

读取配置文件中的serverLists,即sheenyu-admin的地址,为后续数据发送做准备。类注解@Join用于SPI的加载。

SPI 全称为 Service Provider Interface, 是 JDK 内置的一种服务提供发现功能, 一种动态替换发现的机制。

shenyu-spiApache ShenYu网关自定义的SPI扩展实现,设计和实现原理参考了DubboSPI扩展实现

2.3 构建元数据的 SpringMvcClientBeanPostProcessor

创建SpringMvcClientBeanPostProcessor,负责元数据的构建和注册,它的构造函数逻辑如下:


/**
 *  spring mvc 客户端bean的后置处理器
 */
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {

    /**
     * 通过构造函数进行实例化
     */
    public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig,
                                            final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        // 读取配置属性
        Properties props = clientConfig.getProps();
        // 获取端口信息,并校验
        int port = Integer.parseInt(props.getProperty(ShenyuClientConstants.PORT));
        if (port <= 0) {
            String errorMsg = "http register param must config the port must > 0";
            LOG.error(errorMsg);
            throw new ShenyuClientIllegalArgumentException(errorMsg);
        }
        // 获取appName
        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
        // 获取contextPath
        this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
        // 校验appName和contextPath
        if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
            String errorMsg = "http register param must config the appName or contextPath";
            LOG.error(errorMsg);
            throw new ShenyuClientIllegalArgumentException(errorMsg);
        }
        // 获取 isFull
        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
        // 开始事件发布
        publisher.start(shenyuClientRegisterRepository);
    }

    // 暂时省略了其他逻辑
    @Override
    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
     // 暂时省略了其他逻辑
    }

}

在构造函数中,主要是读取属性信息,然后进行校验。

shenyu:
  client:
    http:
        props:
          contextPath: /http
          appName: http
          port: 8189  
          isFull: false

最后,执行了 publisher.start(),开始事件发布,为注册做准备。

  • ShenyuClientRegisterEventPublisher

ShenyuClientRegisterEventPublisher通过单例模式实现,主要是生成元数据URI订阅器(后续用于数据发布),然后启动Disruptor队列。提供了一个共有方法publishEvent(),发布事件,向Disruptor队列发数据。


public class ShenyuClientRegisterEventPublisher {
    // 私有变量
    private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();
    
    private DisruptorProviderManage providerManage;
    
    private RegisterClientExecutorFactory factory;
    
    /**
     * 公开静态方法
     *
     * @return ShenyuClientRegisterEventPublisher instance
     */
    public static ShenyuClientRegisterEventPublisher getInstance() {
        return INSTANCE;
    }
    
    /**
     * Start方法执行
     *
     * @param shenyuClientRegisterRepository shenyuClientRegisterRepository
     */
    public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        // 创建客户端注册工厂类
        factory = new RegisterClientExecutorFactory();
        // 添加元数据订阅器
        factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));
        //  添加URI订阅器
        factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));
        // 启动Disruptor队列
        providerManage = new DisruptorProviderManage(factory);
        providerManage.startup();
    }
    
    /**
     * 发布事件,向Disruptor队列发数据
     *
     * @param <T> the type parameter
     * @param data the data
     */
    public <T> void publishEvent(final T data) {
        DisruptorProvider<Object> provider = providerManage.getProvider();
        provider.onData(f -> f.setData(data));
    }
}

SpringMvcClientBeanPostProcessor的构造函数逻辑分析完了,主要是读取属性配置,创建元数据和URI订阅器, 启动Disruptor队列。要注意到它实现了BeanPostProcessor,这是Spring提供的一个接口,在Bean的生命周期中,真正开始使用之前,会执行后置处理器的postProcessAfterInitialization()方法。

  • postProcessAfterInitialization() 方法

SpringMvcClientBeanPostProcessor作为一个后置处理器,它的功能是:读取注解中的元数据,并向admin注册。

// 后置处理器
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
   // 省略了其他逻辑
    
    // 后置处理器:读取注解中的元数据,并向admin注册
    @Override
    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
        // 配置属性,如果 isFull=true 的话,表示注册整个微服务
        if (isFull) {
            return bean;
        }
        // 获取当前bean的Controller注解
        Controller controller = AnnotationUtils.findAnnotation(bean.getClass(), Controller.class);
         // 获取当前bean的RequestMapping注解
        RequestMapping requestMapping = AnnotationUtils.findAnnotation(bean.getClass(), RequestMapping.class);
        // 如果这个bean是一个接口
        if (controller != null || requestMapping != null) {
               // 获取当前bean的 ShenyuSpringMvcClient 注解
            ShenyuSpringMvcClient clazzAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class);
            String prePath = "";
            //如果没有 ShenyuSpringMvcClient 注解,就返回,表示这个接口不需要注册
            if (Objects.isNull(clazzAnnotation)) {
                return bean;
            }
             //如果 ShenyuSpringMvcClient 注解中的path属性包括 * ,表示注册整个接口
            if (clazzAnnotation.path().indexOf("*") > 1) {
                // 构建元数据,发送注册事件
                publisher.publishEvent(buildMetaDataDTO(clazzAnnotation, prePath));
                return bean;
            }
            
            prePath = clazzAnnotation.path();
            // 获取当前bean的所有方法
            final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
            // 遍历方法
            for (Method method : methods) {
                // 获取当前方法上的注解 ShenyuSpringMvcClient
                ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
                // 如果方法上有注解ShenyuSpringMvcClient,就表示该方法需要注册
                if (Objects.nonNull(shenyuSpringMvcClient)) {
                    // 构建元数据,发送注册事件
                    publisher.publishEvent(buildMetaDataDTO(shenyuSpringMvcClient, prePath));
                }
            }
        }
        return bean;
    }

    // 构造元数据
    private MetaDataRegisterDTO buildMetaDataDTO(final ShenyuSpringMvcClient shenyuSpringMvcClient, final String prePath) {
        // contextPath上下文名称
        String contextPath = this.contextPath;
        // appName应用名称
        String appName = this.appName;
        // path注册路径
        String path;
        if (StringUtils.isEmpty(contextPath)) {
            path = prePath + shenyuSpringMvcClient.path();
        } else {
            path = contextPath + prePath + shenyuSpringMvcClient.path();
        }
        // desc描述信息
        String desc = shenyuSpringMvcClient.desc();
        // ruleName规则名称,没有填写的话就和path一致
        String configRuleName = shenyuSpringMvcClient.ruleName();
        String ruleName = StringUtils.isBlank(configRuleName) ? path : configRuleName;
        // 构建元数据
        return MetaDataRegisterDTO.builder()
                .contextPath(contextPath)
                .appName(appName)
                .path(path)
                .pathDesc(desc)
                .rpcType(RpcTypeEnum.HTTP.getName())
                .enabled(shenyuSpringMvcClient.enabled())
                .ruleName(ruleName)
                .registerMetaData(shenyuSpringMvcClient.registerMetaData())
                .build();
    }
}

在后置处理器中,需要读取配置属性,如果 isFull=true 的话,表示注册整个微服务。获取当前beanController注解、RequestMapping注解、ShenyuSpringMvcClient 注解,通过读取这些注解信息判断当前bean是否是接口?接口是否需要注册?方法是否需要注册?然后根据ShenyuSpringMvcClient 注解中的属性构建元数据,最后通过publisher.publishEvent()发布事件进行注册。

Controller注解和RequestMapping注解是由Spring提供的,这个大家应该很熟悉,不过多赘述。ShenyuSpringMvcClient 注解是由Apache ShenYu提供的,用于注册SpringMvc客户端,它的定义如下:


/**
 * shenyu 客户端接口,用于方法上或类上
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ShenyuSpringMvcClient {
    // path 注册路径
    String path();
    
    // ruleName 规则名称
    String ruleName() default "";
    
    // desc 描述信息
    String desc() default "";

    // enabled是否启用
    boolean enabled() default true;
    
    // registerMetaData 注册元数据
    boolean  registerMetaData() default false;
}

它的使用如下:

  • 注册整个接口
@RestController
@RequestMapping("/test")
@ShenyuSpringMvcClient(path = "/test/**")  // 表示整个接口注册
public class HttpTestController {
 //......
}
  • 注册当前方法
@RestController
@RequestMapping("/order")
@ShenyuSpringMvcClient(path = "/order")
public class OrderController {

    /**
     * Save order dto.
     *
     * @param orderDTO the order dto
     * @return the order dto
     */
    @PostMapping("/save")
    @ShenyuSpringMvcClient(path = "/save", desc = "Save order") // 注册当前方法
    public OrderDTO save(@RequestBody final OrderDTO orderDTO) {
        orderDTO.setName("hello world save order");
        return orderDTO;
    }
  • publisher.publishEvent() 发布注册事件

该方法会将数据发送到Disruptor队列中,关于Disruptor队列更多细节这里不做更多介绍,这不影响分析注册的流程。

当数据发送后,Disruptor队列的消费者会处理数据,进行消费。

  • QueueConsumer 消费数据

QueueConsumer是一个消费者,它实现了WorkHandler接口,它的创建过程在providerManage.startup()逻辑中。WorkHandler接口是disruptor的数据消费接口,只有一个方法是onEvent()

package com.lmax.disruptor;

public interface WorkHandler<T> {
    void onEvent(T var1) throws Exception;
}

QueueConsumer重写了onEvent()方法,主要逻辑是生成消费任务,然后在线程池中去执行。


/**
 * 
 * 队列消费者
 */
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
    
	// 省略了其他逻辑

    @Override
    public void onEvent(final DataEvent<T> t) {
        if (t != null) {
            // 通过工厂创建队列消费任务
            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
            // 保存数据
            queueConsumerExecutor.setData(t.getData());
            // help gc
            t.setData(null);
            // 放在线程池中执行 消费任务
            executor.execute(queueConsumerExecutor);
        }
    }
}

QueueConsumerExecutor是在线程池中被执行的任务,它实现了Runnable接口,具体的实现类有两个:

  • RegisterClientConsumerExecutor:客户端消费者执行器;
  • RegisterServerConsumerExecutor:服务端消费者执行器。

顾名思义,一个负责处理客户端任务,一个负责处理服务端任务(服务端就是admin,在下文进行分析)。

  • RegisterClientConsumerExecutor 消费者执行器

重写的run()逻辑如下:


public final class RegisterClientConsumerExecutor extends QueueConsumerExecutor<DataTypeParent> {
    
	//...... 

    @Override
    public void run() {
        // 获取数据
        DataTypeParent dataTypeParent = getData();
        // 根据数据类型调用相应的处理器进行处理
        subscribers.get(dataTypeParent.getType()).executor(Lists.newArrayList(dataTypeParent));
    }
    
}

根据不同的数据类型调用不同的处理器去执行相应的任务。数据类型有两种,一个是元数据,记录客户端注册信息。一个是URI数据,记录客户端服务信息。

//数据类型
public enum DataType {
   // 元数据
    META_DATA,
    
   // URI数据
    URI,
}
  • ExecutorSubscriber#executor() 执行器订阅者

执行器订阅者也分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。

  • ShenyuClientMetadataExecutorSubscriber#executor()

客户端这边对元数据处理逻辑是:遍历元数据信息,调用接口方法persistInterface()完成数据的发布。

public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {
   
    //......
    
    @Override
    public DataType getType() {
        return DataType.META_DATA; // 元数据
    }
    
    @Override
    public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
        for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {
            // 调用接口方法persistInterface()完成数据的发布
            shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);
        }
    }
}
  • ShenyuClientRegisterRepository#persistInterface()

ShenyuClientRegisterRepository是一个接口,用于表示客户端数据注册,它的实现类目前有五种,每一种就表示一种注册方法。

  • ConsulClientRegisterRepository:通过Consul实现客户端注册;
  • EtcdClientRegisterRepository:通过Etcd实现客户端注册;
  • HttpClientRegisterRepository:通过Http实现客户端注册;
  • NacosClientRegisterRepository:通过Nacos实现客户端注册;
  • ZookeeperClientRegisterRepository:通过Zookeeper实现客户端注册;

从图中可以看出,注册中心的加载是通过SPI的方式完成的。这个在前面提到过了,在客户端通用配置文件中,通过指定配置文件中的属性完成具体的类加载。


/**
 * 加载 ShenyuClientRegisterRepository
 */
public final class ShenyuClientRegisterRepositoryFactory {
    
    private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();
    
    /**
     * 创建 ShenyuClientRegisterRepository
     */
    public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
        if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
            // 通过SPI的方式进行加载,类型由registerType决定
            ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
            //执行初始化操作
            result.init(shenyuRegisterCenterConfig);
            ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
            REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
            return result;
        }
        return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
    }
}

本文的源码分析是基于Http的方式进行注册,所以我们先分析HttpClientRegisterRepository,其他的注册方式后续再分析。

通过http的方式注册很简单,就是调用工具类发送http请求。注册元数据和URI都是调用的同一个方法doRegister(),指定接口和类型就好。

  • /shenyu-client/register-metadata:服务端提供的接口用于注册元数据。
  • /shenyu-client/register-uri: 服务端提供的接口用于注册URI。
@Join
public class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {
    // 服务端提供的接口用于注册元数据    
    private static final String META_PATH = "/shenyu-client/register-metadata";

    // 服务端提供的接口用于注册URI
    private static final String URI_PATH = "/shenyu-client/register-uri";

    //注册URI
    @Override
    public void persistURI(final URIRegisterDTO registerDTO) {
        doRegister(registerDTO, URI_PATH, Constants.URI);
    }
    
    //注册接口(就是元数据信息)
    @Override
    public void persistInterface(final MetaDataRegisterDTO metadata) {
        doRegister(metadata, META_PATH, META_TYPE);
    }
    
    // 进行注册
    private <T> void doRegister(final T t, final String path, final String type) {
        // 遍历admin服务列表(admin可能是集群)
        for (String server : serverList) {
            try {
                // 调用工具类发送 http 请求
                RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), server + path, type);
                return;
            } catch (Exception e) {
                LOGGER.error("register admin url :{} is fail, will retry", server);
            }
        }
    }
}

将数据序列化后,通过OkHttp发送数据。


public final class RegisterUtils {
   
   //...... 

    // 通过OkHttp发送数据
    public static void doRegister(final String json, final String url, final String type) throws IOException {
        String result = OkHttpTools.getInstance().post(url, json);
        if (Objects.equals(SUCCESS, result)) {
            LOGGER.info("{} client register success: {} ", type, json);
        } else {
            LOGGER.error("{} client register error: {} ", type, json);
        }
    }
}

至此,客户端通过http的方式注册元数据的逻辑就分析完了。小结一下:通过读取自定义的注解信息构造元数据,将数据发到Disruptor队列,然后从队列中消费数据,将消费者放到线程池中去执行,最终通过发送http请求到admin

客户端元数据注册流程的源码分析过程完成了,用流程图描述如下:

2.4 构建 URI 的 ContextRegisterListener

创建 ContextRegisterListener,负责客户端URI数据的构建和注册,它的创建是在配置文件中完成。

@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
public class ShenyuSpringMvcClientConfiguration {
     // ......
    
    //  创建 ContextRegisterListener
    @Bean
    public ContextRegisterListener contextRegisterListener(final ShenyuClientConfig clientConfig) {
        return new ContextRegisterListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()));
    }
}

ContextRegisterListener实现了ApplicationListener接口,并重写了onApplicationEvent()方法,当有Spring事件发生后,该方法会执行。

// 实现了ApplicationListener接口
public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent> {

     //......

    //通过构造函数完成实例化
    public ContextRegisterListener(final PropertiesConfig clientConfig) {
        // 读取 shenyu.client.http 配置信息
        Properties props = clientConfig.getProps();
        // isFull是否注册整个服务
        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
        // contextPath上下文路径
        String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
        this.contextPath = contextPath;
        if (isFull) {
            if (StringUtils.isBlank(contextPath)) {
                String errorMsg = "http register param must config the contextPath";
                LOG.error(errorMsg);
                throw new ShenyuClientIllegalArgumentException(errorMsg);
            }
            this.contextPath = contextPath + "/**";
        }
        // port 客户端端口信息
        int port = Integer.parseInt(props.getProperty(ShenyuClientConstants.PORT));
        // appName 应用名称
        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
        // host信息
        this.host = props.getProperty(ShenyuClientConstants.HOST);
        this.port = port;
    }

    // 当有上下文刷新事件ContextRefreshedEvent发生时,该方法会执行
    @Override
    public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) {
        //保证该方法的内容只执行一次
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        // 如果是 isFull=true 代表注册整个服务,构建元数据并注册
        if (isFull) {
            publisher.publishEvent(buildMetaDataDTO());
        }
        
        // 构建URI数据并注册
        publisher.publishEvent(buildURIRegisterDTO());
    }

    // 构建URI数据
    private URIRegisterDTO buildURIRegisterDTO() {
        String host = IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host);
        return URIRegisterDTO.builder()
                .contextPath(this.contextPath)
                .appName(appName)
                .host(host)
                .port(port)
                .rpcType(RpcTypeEnum.HTTP.getName())
                .build();
    }

    // 构建元数据
    private MetaDataRegisterDTO buildMetaDataDTO() {
        String contextPath = this.contextPath;
        String appName = this.appName;
        return MetaDataRegisterDTO.builder()
                .contextPath(contextPath)
                .appName(appName)
                .path(contextPath)
                .rpcType(RpcTypeEnum.HTTP.getName())
                .enabled(true)
                .ruleName(contextPath)
                .build();
    }
}

在构造函数中主要是读取属性配置。

onApplicationEvent()方法是有Spring事件发生时会执行,这里的参数是ContextRefreshedEvent,表示上下文刷新事件。当Spring容器就绪后执行此处逻辑:如果是 isFull=true 代表注册整个服务,构建元数据并注册,在前面分析的后置处理器SpringMvcClientBeanPostProcessor中没有处理 isFull=true 的情况,所以在此处进行了处理。然后再构建URI数据并注册。

ContextRefreshedEventSpring内置事件。ApplicationContext被初始化或刷新时,该事件被触发。这也可以在 ConfigurableApplicationContext接口中使用 refresh() 方法来发生。此处的初始化是指:所有的Bean被成功装载,后处理Bean被检测并激活,所有Singleton Bean 被预实例化,ApplicationContext容器已就绪可用。

注册逻辑都是通过 publisher.publishEvent()完成。在前面都已经分析过了:向Disruptor队列写入数据,再从中消费数据,最后通过ExecutorSubscriber去处理。

  • ExecutorSubscriber#executor()

执行器订阅者分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。

这里是注册URI信息,所以执行类是ShenyuClientURIExecutorSubscriber

  • ShenyuClientURIExecutorSubscriber#executor()

主要逻辑是遍历URI数据集合,通过persistURI()方法实现数据注册。


public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
    
    //......
    
    @Override
    public DataType getType() {
        return DataType.URI; //数据类型是URI
    }
    
    // 注册URI数据
    @Override
    public void executor(final Collection<URIRegisterDTO> dataList) {
        for (URIRegisterDTO uriRegisterDTO : dataList) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            while (true) {
                try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {
                    break;
                } catch (IOException e) {
                    long sleepTime = 1000;
                    // maybe the port is delay exposed
                    if (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {
                        LOG.error("host:{}, port:{} connection failed, will retry",
                                uriRegisterDTO.getHost(), uriRegisterDTO.getPort());
                        // If the connection fails for a long time, Increase sleep time
                        if (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {
                            sleepTime = 10000;
                        }
                    }
                    try {
                        TimeUnit.MILLISECONDS.sleep(sleepTime);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }
            //添加hook,优雅停止客户端 
            ShenyuClientShutdownHook.delayOtherHooks();
            
            // 注册URI
            shenyuClientRegisterRepository.persistURI(uriRegisterDTO);
        }
    }
}

代码中的while(true)循环是为了保证客户端已经成功启动了,通过hostport可以连接上。

后面的逻辑是:添加hook函数,用于优雅停止客户端 。

通过persistURI()方法实现数据注册。整个逻辑也在前面分析过了,最终就是通过OkHttp客户端向shenyu-admin发起http,通过http的方式注册URI

分析到这里就将客户端的注册逻辑分析完了,将构建的元数据和URI数据发送到Disruptor队列,再从中消费,读取数据,通过httpadmin发送数据。

客户端URI注册流程的源码分析完成了,流程图如下:

3. 服务端注册流程

3.1 注册接口ShenyuHttpRegistryController

从前面的分析可以知道,服务端提供了注册的两个接口:

  • /shenyu-client/register-metadata:服务端提供的接口用于注册元数据。
  • /shenyu-client/register-uri: 服务端提供的接口用于注册URI。

这两个接口位于ShenyuHttpRegistryController中,它实现了ShenyuServerRegisterRepository接口,是服务端注册的实现类。它用@Join标记,表示通过SPI进行加载。

// shenuyu客户端接口
@RequestMapping("/shenyu-client")
@Join
public class ShenyuHttpRegistryController implements ShenyuServerRegisterRepository {

    private ShenyuServerRegisterPublisher publisher;

    @Override
    public void init(final ShenyuServerRegisterPublisher publisher, final ShenyuRegisterCenterConfig config) {
        this.publisher = publisher;
    }
    
    // 注册元数据
    @PostMapping("/register-metadata")
    @ResponseBody
    public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {
        publish(metaDataRegisterDTO);
        return ShenyuResultMessage.SUCCESS;
    }
        
   // 注册URI
    @PostMapping("/register-uri")
    @ResponseBody
    public String registerURI(@RequestBody final URIRegisterDTO uriRegisterDTO) {
        publish(uriRegisterDTO);
        return ShenyuResultMessage.SUCCESS;
    }

    // 发布注册事件
    private <T> void publish(final T t) {
        publisher.publish(Collections.singletonList(t));
    }
}

两个注册接口获取到数据好,就调用了publish()方法,把数据发布到Disruptor队列中。

  • ShenyuServerRegisterRepository接口

ShenyuServerRegisterRepository接口是服务注册接口,它有五个实现类,表示有五种注册方式:

  • ConsulServerRegisterRepository:通过Consul实现注册;
  • EtcdServerRegisterRepository:通过Etcd实现注册;
  • NacosServerRegisterRepository:通过Nacos实现注册;
  • ShenyuHttpRegistryController:通过Http实现注册;
  • ZookeeperServerRegisterRepository:通过Zookeeper实现注册。

具体用哪一种方式,是通过配置文件指定的,然后通过SPI进行加载。

shenyu-admin中的application.yml文件中配置注册方式,registerType指定注册类型,当用http进行注册时,serverLists不需要填写,更多配置说明可以参考官网 客户端接入配置

shenyu:
  register:
    registerType: http 
    serverLists: 
  • RegisterCenterConfiguration 加载配置

在引入相关依赖和属性配置后,启动shenyu-admin时,会先加载配置文件,和注册中心相关的配置文件类是RegisterCenterConfiguration

// 注册中心配置类
@Configuration
public class RegisterCenterConfiguration {
    // 读取配置属性
    @Bean
    @ConfigurationProperties(prefix = "shenyu.register")
    public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
        return new ShenyuRegisterCenterConfig();
    }
    
    //创建ShenyuServerRegisterRepository,用于服务端注册
    @Bean
    public ShenyuServerRegisterRepository shenyuServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig, final List<ShenyuClientRegisterService> shenyuClientRegisterService) {
        // 1.从配置属性中获取注册类型
        String registerType = shenyuRegisterCenterConfig.getRegisterType();
        // 2.通过注册类型,以SPI的方法加载实现类
        ShenyuServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuServerRegisterRepository.class).getJoin(registerType);
        // 3.获取publisher,向Disruptor队列中写数据
        RegisterServerDisruptorPublisher publisher = RegisterServerDisruptorPublisher.getInstance();
        // 4.注册Service, rpcType -> registerService
        Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, e -> e));
        // 5.事件发布的准备工作
        publisher.start(registerServiceMap);
        // 6.注册的初始化操作
        registerRepository.init(publisher, shenyuRegisterCenterConfig);
        return registerRepository;
    }
}

在配置类中生成了两个bean

  • shenyuRegisterCenterConfig:读取属性配置;

  • shenyuServerRegisterRepository:用于服务端注册。

在创建shenyuServerRegisterRepository的过程中,也进行了一系列的准备工作:

  • 1.从配置属性中获取注册类型。
  • 2.通过注册类型,以SPI的方法加载实现类:比如指定的类型是http,就会加载ShenyuHttpRegistryController
  • 3.获取publisher,向Disruptor队列中写数据。
  • 4.注册ServicerpcType -> registerService:获取注册的Service,每种rpc都有对应的Service。本文的客户端构建是通过springboot,属于http类型,还有其他客户端类型:dubboSpring CloudgRPC等。
  • 5.事件发布的准备工作:添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。
  • 6.注册的初始化操作:http类型的注册初始化操作就是保存publisher

  • RegisterServerDisruptorPublisher#publish()

服务端向Disruptor队列写入数据的发布者 ,通过单例模式构建。


public class RegisterServerDisruptorPublisher implements ShenyuServerRegisterPublisher {
    //私有属性
    private static final RegisterServerDisruptorPublisher INSTANCE = new RegisterServerDisruptorPublisher();

    //公开静态方法获取实例
    public static RegisterServerDisruptorPublisher getInstance() {
        return INSTANCE;
    }
    
   //事件发布的准备工作,添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。
    public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {
        //服务端注册工厂
        factory = new RegisterServerExecutorFactory();
        //添加URI数据订阅器
        factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));
        //添加元数据订阅器
        factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));
        //启动Disruptor队列
        providerManage = new DisruptorProviderManage(factory);
        providerManage.startup();
    }
    
    // 向队列中写入数据
    @Override
    public <T> void publish(final T data) {
        DisruptorProvider<Object> provider = providerManage.getProvider();
        provider.onData(f -> f.setData(data));
    }
    
    @Override
    public void close() {
        providerManage.getProvider().shutdown();
    }
}

配置文件的加载,可看作是注册中心服务端初始化流程,用图描述如下:

3.2 消费数据QueueConsumer

在前面分析了客户端disruptor队列消费数据的过。服务端也是一样的逻辑,只是其中执行任务的执行者变了。

QueueConsumer是一个消费者,它实现了WorkHandler接口,它的创建过程在providerManage.startup()逻辑中。WorkHandler接口是disruptor的数据消费接口,只有一个方法是onEvent()

package com.lmax.disruptor;

public interface WorkHandler<T> {
    void onEvent(T var1) throws Exception;
}

QueueConsumer重写了onEvent()方法,主要逻辑是生成消费任务,然后在线程池中去执行。

/**
 * 
 * 队列消费者
 */
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
    
	// 省略了其他逻辑

    @Override
    public void onEvent(final DataEvent<T> t) {
        if (t != null) {
            // 通过工厂创建队列消费任务
            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
            // 保存数据
            queueConsumerExecutor.setData(t.getData());
            // help gc
            t.setData(null);
            // 放在线程池中执行 消费任务
            executor.execute(queueConsumerExecutor);
        }
    }
}

QueueConsumerExecutor是在线程池中被执行的任务,它实现了Runnable接口,具体的实现类有两个:

  • RegisterClientConsumerExecutor:客户端消费者执行器;
  • RegisterServerConsumerExecutor:服务端消费者执行器。

顾名思义,一个负责处理客户端任务,一个负责处理服务端任务。

  • RegisterServerConsumerExecutor#run()

RegisterServerConsumerExecutor是服务端消费者执行器,它通过QueueConsumerExecutor间接实现了Runnable接口,并重写了run()方法。


public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<List<DataTypeParent>> {
   // ...

    @Override
    public void run() {
        //获取从disruptor队列中拿到的数据
        List<DataTypeParent> results = getData();
        // 数据校验
        results = results.stream().filter(data -> isValidData(data)).collect(Collectors.toList());
        if (CollectionUtils.isEmpty(results)) {
            return;
        }
        //根据类型执行操作
        getType(results).executor(results);
    }
    
    // 根据类型获取订阅者
    private ExecutorSubscriber getType(final List<DataTypeParent> list) {
        DataTypeParent result = list.get(0);
        return subscribers.get(result.getType());
    }
}

  • ExecutorSubscriber#executor()

执行器订阅者分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。

  • MetadataExecutorSubscriber#executor()

如果是注册元数据,则通过MetadataExecutorSubscriber#executor()实现:根据类型获取注册Service,调用register()

public class MetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {
 
    //......

    @Override
    public DataType getType() {
        return DataType.META_DATA;  // 元数据类型
    }

    @Override
    public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
        // 遍历元数据列表
        for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {
            // 根据类型获取注册Service
            ShenyuClientRegisterService shenyuClientRegisterService = this.shenyuClientRegisterService.get(metaDataRegisterDTO.getRpcType());
            Objects.requireNonNull(shenyuClientRegisterService);
            // 对元数据进行注册,加锁确保顺序执行,防止并发错误
            synchronized (ShenyuClientRegisterService.class) {
                shenyuClientRegisterService.register(metaDataRegisterDTO);
            }
        }
    }
}
  • URIRegisterExecutorSubscriber#executor()

如果是注册元数据,则通过URIRegisterExecutorSubscriber#executor()实现:构建URI数据,根据注册类型查找Service,通过registerURI方法实现注册。


public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
    //......
    
    @Override
    public DataType getType() {
        return DataType.URI; // URI数据类型
    }
    
    @Override
    public void executor(final Collection<URIRegisterDTO> dataList) {
        if (CollectionUtils.isEmpty(dataList)) {
            return;
        }
        // 构建URI数据类型,通过registerURI方法实现注册
        findService(dataList).ifPresent(service -> {
            Map<String, List<URIRegisterDTO>> listMap = buildData(dataList);
            listMap.forEach(service::registerURI);
        });
    }
    
    // 根据类型查找Service
    private Optional<ShenyuClientRegisterService> findService(final Collection<URIRegisterDTO> dataList) {
        return dataList.stream().map(dto -> shenyuClientRegisterService.get(dto.getRpcType())).findFirst();
    }
}

  • ShenyuClientRegisterService#register()

ShenyuClientRegisterService是注册方法接口,它有多个实现类:

  • AbstractContextPathRegisterService:抽象类,处理部分公共逻辑;
  • AbstractShenyuClientRegisterServiceImpl::抽象类,处理部分公共逻辑;
  • ShenyuClientRegisterDivideServiceImpldivide类,处理http注册类型;
  • ShenyuClientRegisterDubboServiceImpldubbo类,处理dubbo注册类型;
  • ShenyuClientRegisterGrpcServiceImplgRPC类,处理gRPC注册类型;
  • ShenyuClientRegisterMotanServiceImplMotan类,处理Motan注册类型;
  • ShenyuClientRegisterSofaServiceImplSofa类,处理Sofa注册类型;
  • ShenyuClientRegisterSpringCloudServiceImplSpringCloud类,处理SpringCloud注册类型;
  • ShenyuClientRegisterTarsServiceImplTars类,处理Tars注册类型;

从上面可以看出每种微服务都有对应的注册实现类,本文的源码分析是 以官方提供的 shenyu-examples-http 为例,是属http注册类型,所以元数据和URI数据的注册实现类是 ShenyuClientRegisterDivideServiceImpl

  • register(): 注册元数据
public String register(final MetaDataRegisterDTO dto) {
        // 1.注册选择器信息
        String selectorHandler = selectorHandler(dto);
        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
        // 2.注册规则信息
        String ruleHandler = ruleHandler();
        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
        ruleService.registerDefault(ruleDTO);
        // 3.注册元数据信息
        registerMetadata(dto);
        // 4.注册contextPath
        String contextPath = dto.getContextPath();
        if (StringUtils.isNotEmpty(contextPath)) {
            registerContextPath(dto);
        }
        return ShenyuResultMessage.SUCCESS;
    }

整个注册逻辑可以分为4个步骤:

  • 1.注册选择器信息
  • 2.注册规则信息
  • 3.注册元数据信息
  • 4.注册contextPath

admin这一侧通过客户端的元数据信息需要构建选择器、规则、元数据和ContextPath。具体的注册过程和细节处理跟rpc类型有关。我们就不再继续向下追踪了,对于注册中心的逻辑分析,跟踪到这里就够了。

服务端元数据注册流程的源码分析完了,流程图描述如下:

  • registerURI(): 注册URI数据
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
        if (CollectionUtils.isEmpty(uriList)) {
            return "";
        }
        // 对应的选择器是否存在
        SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
        if (Objects.isNull(selectorDO)) {
            return "";
        }
        // 处理选择器中的handler信息
        String handler = buildHandle(uriList, selectorDO);
        selectorDO.setHandle(handler);
        SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
        selectorData.setHandle(handler);
       
        // 更新数据库中的记录
        selectorService.updateSelective(selectorDO);
        // 发布事件
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
        return ShenyuResultMessage.SUCCESS;
    }

admin拿到URI数据后,主要是更新选择器中的handler信息,然后写入到数据库,最后发布事件通知网关。通知网关的逻辑是由数据同步操作完成,这在之前的文章中已经分析过了,就不再赘述。

服务端URI注册流程的源码分析完成了,用图描述如下:

至此,服务端注册流程也就分析完了,主要通过对外提供的接口,接受客户端的注册信息,然后写入到Disruptor队列,再从中消费数据,根据接收到的元数据和URI数据更新admin的选择器、规则、元数据和选择器的handler

4. 总结

本文主要对Apache ShenYu网关中的http注册模块进行了源码分析。涉及到的主要知识点,归纳如下:

  • 注册中心是为了将客户端信息注册到admin,方便流量筛选;
  • http注册是将客户端元数据信息和URI信息注册到admin
  • http服务的接入通过注解@ShenyuSpringMvcClient标识;
  • 注册信息的构建主要通过Spring的后置处理器BeanPostProcessor和应用监听器ApplicationListener
  • 注册类型的加载通过SPI完成;
  • 引入Disruptor队列是为了数据与操作解耦,以及数据缓冲。
  • 注册中心的实现采用了面向接口编程,使用模板方法、单例、观察者等设计模式。