数据结构与算法思路

程序

深刻认识到程序=数据结构+算法,使用程序解决特定的问题,特定的问题使用何种数据结构,为了高效的得到结果,使用什么算法。

数据结构

最基础的数据结构是数组和链表,数组占用连续空间,通过下标实现O(1)时间复杂度访问;链表使用额外的指针连接数据节点,需要O(N)时间复杂度访问目标数据。

基于数组和链表组成多种高级数据结构:

  • 哈希表:数组+链表;
  • 二叉树:链表;
  • 栈 / 队列:数组或者链表;
  • 图:数组+链表。

算法

算法的本质是在特定的数据结构上更高效的穷举所有结果。

  • 回溯算法;
  • 贪心算法;
  • 分治算法;
  • 动态规划。

常用技巧

  • 双指针

在单链表中使用快慢指针寻找中间节点;在有序数组中高效的寻找目标数据;在滑动窗口中,右指针不断向前,满足条件后,左指针开始不断收缩。在回文字符串中,从中心不断向左右扩散,直到不满足条件;在盛水容器中,左右指针从两边开始移动,值小的不断移动。在nSum问题中,不断寻找目标值。

  • 二分搜索

在有序序列中查找数据,分析清楚左边界和右边界,循环结束条件。可以泛化到一般单调性函数上使用。

  • 单调栈/队列

遇到下个更大元素的问题,就直接使用单调栈吧。单调队列用在滑动窗口中求最大值。

  • 递归

想清楚递归函数的定义,假设已经知道了f(n-1)的结果,结合nums(i)如何计算f(n)。千万不要不断递归下去,这样会把自己陷进去。

递归在二叉树中应用很广。二叉树的遍历有前序,中序,后序三种方式。解决二叉树的各种问题,要考虑清楚使用哪种遍历方式,然后重点考虑当前节点需要作什么操作才能满足递归函数的定义。

  • 动态规划

动态规划常用于最值问题,它的数学证明很难,我们也不需要去做。想清楚dp()的定义,如果知道了dp(i-1)dp(i)怎么利用dp(i-1)计算出结果。中间过程有哪些参数会影响到结果?这些参数很有可能就是状态变量。在考虑用动态规划之前,不妨试试递归+备忘录的方式能否解决呢,他们俩的效率是一样的。

  • 回溯

如果要得到所有可能性的结果集,一般就可以考虑使用回溯。回溯就是穷举所有可能的结果,在循环中调用递归。在递归之前做出选择,在递归之后撤销选择。为了提高效率,考虑在每次递归前进行剪枝。

刷题

  • 从二叉树开始
  • 没有思路,直接看题解
  • 一道题目至少刷五遍

Apache ShenYu源码阅读系列-Agent模块源码分析

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,Apache ShenYu 利用 Java Agent字节码增强 技术实现了无痕埋点,使得用户无需引入依赖即可接入第三方可观测性系统,获取 TracesMetricsLogging

本文基于shenyu-2.4.2版本进行源码分析,官网的介绍请参考 可观测性

具体而言,就是shenyu-agent模块,它基于 Java Agent 机制,通过ByteBuddy字节码增强库,在类加载时增强对象,属于静态代理。

  • AOP术语

在分析源码之前,介绍下AOP相关的术语,便于后续的理解:

  • JoinPoint :连接点,程序运行中的时间点,比如方法的执行点;
  • PointCut :切入点,匹配 JoinPoint 的条件;
  • Advice:通知,具体的执行逻辑;
  • Target :目标对象;
  • Proxy :代理对象。

  • 关于Byte Buddy

Byte Buddy是一个代码生成和操作库,在Java应用程序的运行期间创建和修改Java类。可以利用它创建任何类,不像JDK动态代理那样强制实现一个接口。此外,Byte Buddy提供了方便的API,用于手动、使用Java代理或在构建期间改变类。

  • 提供了非常方便的API接口,与强大的类,方法等匹配功能;
  • 开箱即用,零学习成本,屏蔽了底层操作字节码技术;
  • 强大的开放定制性功能,可以为任何实现的方法自定义字节码;
  • 最少运行时生成代码原则,性能高效;

1. premain入口

premain()函数javaagent 的入口函数,在 ShenYuShenyuAgentBootstrap 提供并实现整个agent的逻辑。

/**
 * agent 启动入口类
 */
public class ShenyuAgentBootstrap {
    
    /**
     * 入口函数 premain.
     */
    public static void premain(final String arguments, final Instrumentation instrumentation) throws Exception {
        // 1. 读取配置文件
        ShenyuAgentConfigUtils.setConfig(ShenyuAgentConfigLoader.load());
        // 2. 加载所有插件
        ShenyuAgentPluginLoader.getInstance().loadAllPlugins();
        // 3. 创建 agent
        AgentBuilder agentBuilder = new AgentBuilder.Default().with(new ByteBuddy().with(TypeValidation.ENABLED))
                .ignore(ElementMatchers.isSynthetic())
                .or(ElementMatchers.nameStartsWith("org.apache.shenyu.agent."));
        agentBuilder.type(ShenyuAgentTypeMatcher.getInstance())
                .transform(new ShenyuAgentTransformer())
                .with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION)
                .with(new TransformListener()).installOn(instrumentation);
        // 4. 启动插件
        PluginLifecycleManager lifecycleManager = new PluginLifecycleManager();
        lifecycleManager.startup(ShenyuAgentConfigUtils.getPluginConfigMap());
        Runtime.getRuntime().addShutdownHook(new Thread(lifecycleManager::close));
    }
}

premain函数的核心逻辑,就是上面的四步操作:

    1. 读取配置文件;
    1. 加载所有插件;
    1. 创建 agent;
    1. 启动插件。

接下来的源码分析就依次分析这四个操作。

2. 读取配置文件

  • ShenyuAgentConfigLoader#load()

配置文件的处理由 ShenyuAgentConfigLoader 完成,代码实现如下:

public final class ShenyuAgentConfigLoader {
    // 配置文件路径
    private static final String CONFIG_PATH = "config-path";
    
    /**
     * 加载配置文件.
     */
    public static ShenyuAgentConfig load() throws IOException {
        // 读取配置文件路径
        String configPath = System.getProperty(CONFIG_PATH);
        // 如果没有配置,就读取默认的文件 shenyu-agent.yaml
        File configFile = StringUtils.isEmpty(configPath) ? ShenyuAgentLocator.locatorConf("shenyu-agent.yaml") : new File(configPath);
        // 读取配置文件并解析
        return ShenyuYamlEngine.agentConfig(configFile);
    }
}

可以通过config-path指定配置文件的路径,如果没有指定的话,就读取默认的配置文件 shenyu-agent.yaml,然后通过ShenyuYamlEngine来解析配置文件。

配置文件的格式是yaml格式,如何配置,请参考官网的介绍 可观测性

默认配置文件shenyu-agent.yaml的格式内容如下:

appName: shenyu-agent  # 指定一个名称
supports:  # 当前支持哪些功能
  tracing: # 链路追踪的插件
#    - jaeger   
#    - opentelemetry
     - zipkin
  metrics:  # 统计度量插件
    - 
  logging:  # 日志信息插件
    - 
  
plugins:  # 每个插件的具体配置信息
  tracing:   # 链路追踪的插件
    jaeger:  # jaeger的相关配置
      host: "localhost"
      port: 5775
      props:
        SERVICE_NAME: "shenyu-agent"
        JAEGER_SAMPLER_TYPE: "const"
        JAEGER_SAMPLER_PARAM: "1"
    opentelemetry:  # opentelemetry的相关配置
      props:
        otel.traces.exporter: jaeger #zipkin #otlp
        otel.resource.attributes: "service.name=shenyu-agent"
        otel.exporter.jaeger.endpoint: "http://localhost:14250/api/traces"
    zipkin: # zipkin的相关配置
      host: "localhost"
      port: 9411
      props:
        SERVICE_NAME: "shenyu-agent"
        URL_VERSION: "/api/v2/spans"
        SAMPLER_TYPE: "const"
        SAMPLER_PARAM: "1"
  metrics:   # 统计度量插件
    prometheus: # prometheus的相关配置
      host: "localhost"
      port: 8081
      props:
  logging:   # 日志信息插件
    elasticSearch: # es的相关配置
      host: "localhost"
      port: 8082
      props:
    kafka:   # kafka的相关配置
      host: "localhost"
      port: 8082
      props:

需要开启哪个插件,就在supports中指定,然后再plugins指定插件的配置信息。

到目前为止,Apache ShenYu 发布的最新版本是2.4.2 版本,可以支持tracing的插件有jaegeropentelemetryzipkinmetricslogging将在后续的版本中陆续发布。

  • ShenyuYamlEngine#agentConfig()

ShenyuYamlEngine提供了如何自定义加载yaml格式的文件。

    public static ShenyuAgentConfig agentConfig(final File yamlFile) throws IOException {
        try (
               // 读取文件流
                FileInputStream fileInputStream = new FileInputStream(yamlFile);
                InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream)
        ) {
            //指定对应的class
            Constructor constructor = new Constructor(ShenyuAgentConfig.class);
            //指定属性的class
            TypeDescription customTypeDescription = new TypeDescription(AgentPluginConfig.class);
            customTypeDescription.addPropertyParameters("plugins", Map.class);
            constructor.addTypeDescription(customTypeDescription);
            //通过Yaml工具包读取yaml文件 
            return new Yaml(constructor, new Representer(DUMPER_OPTIONS)).loadAs(inputStreamReader, ShenyuAgentConfig.class);
        }
    }

ShenyuAgentConfig是指定的Class类:

public final class ShenyuAgentConfig {
    // appName 服务名称,默认是 shenyu-agent 
    private String appName = "shenyu-agent";
    // supports 支持哪些插件
    private Map<String, List<String>> supports = new LinkedHashMap<>();
    // plugins 插件的属性信息
    private Map<String, Map<String, AgentPluginConfig>> plugins = new LinkedHashMap<>();
    
}

AgentPluginConfig是指定插件的Class类:

public final class AgentPluginConfig {
    // 指定插件的 host
    private String host;
     // 指定插件的 port
    private int port;
     // 指定插件的 password
    private String password;
     // 指定插件的 其他属性props 
    private Properties props;
}

通过配置文件,用户可以指定启用哪个插件,指定插件的属性信息。

3. 加载插件

  • ShenyuAgentPluginLoader#loadAllPlugins()

读取配置文件后,需要根据用户自定义的配置信息,加载指定的插件。由ShenyuAgentPluginLoader来完成。

ShenyuAgentPluginLoader是一个自定义的类加载器,采用单例设计模式。

// 自定义类加载器,继承 ClassLoader
public final class ShenyuAgentPluginLoader extends ClassLoader implements Closeable {
    // 私有变量
        private static final ShenyuAgentPluginLoader AGENT_PLUGIN_LOADER = new ShenyuAgentPluginLoader();
    // 私有构造器
    private ShenyuAgentPluginLoader() {
        super(ShenyuAgentPluginLoader.class.getClassLoader());
    }
    
    // 公开静态方法
    public static ShenyuAgentPluginLoader getInstance() {
        return AGENT_PLUGIN_LOADER;
    }
    
    /**
     * 加载所有的插件.
     */
    public void loadAllPlugins() throws IOException {
        // 1.定位插件路径
        File[] jarFiles = ShenyuAgentLocator.locatorPlugin().listFiles(file -> file.getName().endsWith(".jar"));
        if (Objects.isNull(jarFiles)) {
            return;
        }
        // 2.加载插件定义
        Map<String, ShenyuAgentJoinPoint> pointMap = new HashMap<>();
        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
            for (File each : jarFiles) {
                outputStream.reset();
                JarFile jar = new JarFile(each, true);
                jars.add(new PluginJar(jar, each));
            }
        }
       
        loadAgentPluginDefinition(pointMap);
        Map<String, ShenyuAgentJoinPoint> joinPointMap = ImmutableMap.<String, ShenyuAgentJoinPoint>builder().putAll(pointMap).build();
        // 3.设置拦截点
        ShenyuAgentTypeMatcher.getInstance().setJoinPointMap(joinPointMap);
    }
}

3.1 定位插件路径

  • ShenyuAgentLocator#locatorPlugin()

整个shenyu项目经过maven打包后(执行mvn clean install命令),agent打包目录如下:

插件文件都是jar包形式存在的。

  • conf目录是配置文件的目录位置;
  • plugins目录是各个插件的目录位置。

相应的定位插件路径源码处理逻辑如下:

// 默认插件位于   /plugins 目录下
public static File locatorPlugin() {
        return new File(String.join("", locatorAgent().getPath(), "/plugins"));
}

// 定位shenyu-agent.jar的绝对路径
public static File locatorAgent() {
    // 找 ShenyuAgentLocator 所在的类路径(包名)
        String classResourcePath = String.join("", ShenyuAgentLocator.class.getName().replaceAll("\\.", "/"), ".class");
    // 找到 类 的绝对路径:磁盘路径+类路径
        URL resource = ClassLoader.getSystemClassLoader().getResource(classResourcePath);
        assert resource != null;
        String url = resource.toString();
    // 是否是以jar包形式存在
        int existFileInJarIndex = url.indexOf('!');
        boolean isInJar = existFileInJarIndex > -1;
    // 从jar包找到路径 或 从资源文件中找路径
        return isInJar ? getFileInJar(url, existFileInJarIndex) : getFileInResource(url, classResourcePath);
    }

  // 从jar包找到路径
    private static File getFileInJar(final String url, final int fileInJarIndex) {
        // jar包所在的绝对路径
        String realUrl = url.substring(url.indexOf("file:"), fileInJarIndex);
        try {
            // 以绝对路径创建File对象
            File agentJarFile = new File(new URL(realUrl).toURI());
            // 获取父文件
            return agentJarFile.exists() ? agentJarFile.getParentFile() : null;
        } catch (final MalformedURLException | URISyntaxException ex) {
            return null;
        }
    }

拿到所有的插件文件后,会去加载插件定义,即拦截点。

3.2 加载拦截点

  • ShenyuAgentPluginLoader#loadAgentPluginDefinition()

拦截点的默认配置是在 conf/tracing-point.yaml文件中,配置格式如下:

pointCuts:
  - targetClass: org.apache.shenyu.plugin.global.GlobalPlugin  # 拦截目标类
    points:
      - type: instanceMethod # 拦截点类型
        name: execute  # 拦截目标方法
    handlers:   # 处理器
      jaeger:   # 用于链路追踪的jaeger插件
        - org.apache.shenyu.agent.plugin.tracing.jaeger.handler.JaegerGlobalPluginHandler
      opentelemetry: # 用于链路追踪的opentelemetry插件
        - org.apache.shenyu.agent.plugin.tracing.opentelemetry.handler.OpenTelemetryGlobalPluginHandler
      zipkin:    # 用于链路追踪的zipkin插件
        - org.apache.shenyu.agent.plugin.tracing.zipkin.handler.ZipkinGlobalPluginHandler
        
        // ......

加载拦截点的方式是通过SPI的方式进行加载的,然后再收集这些拦截点。

 private void loadAgentPluginDefinition(final Map<String, ShenyuAgentJoinPoint> pointMap) {
        SPILoader.loadList(AgentPluginDefinition.class)  // SPI 加载拦截点
                .forEach(each -> each.collector().forEach(def -> {  // 收集拦截点
                    String classTarget = def.getClassTarget();
                    if (pointMap.containsKey(classTarget)) {
                        ShenyuAgentJoinPoint pluginInterceptorPoint = pointMap.get(classTarget);
                        pluginInterceptorPoint.getConstructorPoints().addAll(def.getConstructorPoints()); // 构造器类型拦截点
                        pluginInterceptorPoint.getInstanceMethodPoints().addAll(def.getInstanceMethodPoints()); // 实例方法类型拦截点
                        pluginInterceptorPoint.getStaticMethodPoints().addAll(def.getStaticMethodPoints()); // 静态方法类型拦截点
                    } else {
                        pointMap.put(classTarget, def);
                    }
                }));
    }
  • SPILoader.loadList(AgentPluginDefinition.class)

AgentPluginDefinition是拦截点接口,由@SPI标记:

@SPI // 该接口通过SPI进行加载
public interface AgentPluginDefinition {
    
    /**
     * 收集拦截点 
     */
    Collection<ShenyuAgentJoinPoint> collector();
}

TracingAgentPluginDefinition是它的一个实现类,用于定义链路追踪的拦截点:

@Join // SPI的实现类
public final class TracingAgentPluginDefinition extends AbstractAgentPluginDefinition {
       
    // 创建拦截点
    @Override
    protected Collection<JoinPointBuilder> joinPointBuilder() {
       // ......
    }
}

public abstract class AbstractAgentPluginDefinition implements AgentPluginDefinition {
    // 创建拦截点
    protected abstract Collection<JoinPointBuilder> joinPointBuilder();
    
    // 收集拦截点信息
    @Override
    public final Collection<ShenyuAgentJoinPoint> collector() {
        //......
    }
}

类之间的继承关系如下:

  • AgentPluginDefinition#collector()

    AgentPluginDefinition只是一个接口,定义了收集拦截点的操作方法,具体实现交给了子类。

public abstract class AbstractAgentPluginDefinition implements AgentPluginDefinition {
    
    // 子类去实现如何创建拦截点
    protected abstract Collection<JoinPointBuilder> joinPointBuilder();

    @Override
    public final Collection<ShenyuAgentJoinPoint> collector() {
        // 获取拦截点构建器
        Collection<JoinPointBuilder> joinPointBuilders = joinPointBuilder();
        // 创建拦截点对象 ShenyuAgentJoinPoint
        return joinPointBuilders.stream().map(JoinPointBuilder::install).collect(Collectors.toList());
    }
}

 // 创建拦截点对象 ShenyuAgentJoinPoint
public ShenyuAgentJoinPoint install() {
    // 四个构造参数分别是:目标对象,构造器拦截点,实例方法拦截点,静态方法拦截点
    return new ShenyuAgentJoinPoint(classTarget, constructorPoints, instanceMethodPoints, classStaticMethodPoints);
}
  • TracingAgentPluginDefinition#joinPointBuilder()

    创建用于链路追踪的拦截点。

@Join
public final class TracingAgentPluginDefinition extends AbstractAgentPluginDefinition {
    // 创建拦截点    
    @Override
    protected Collection<JoinPointBuilder> joinPointBuilder() {
        PointCutConfig config = null;
        try {
            // 读取默认的拦截点配置文件
            config = ShenyuYamlEngine.unmarshal(ShenyuAgentLocator.locatorConf("tracing-point.yaml"), PointCutConfig.class);
        } catch (IOException e) {
            LOG.error("Exception loader tracing point config is", e);
        }
        // 创建拦截点  
        return JoinPointBuilderFactory.create(config);
    }
}
  • JoinPointBuilderFactory#create()

    根据指定的配置文件创建拦截点 。

public static Collection<JoinPointBuilder> create(final PointCutConfig config) {
        //如果没有配置文件或为空,则返回空集合
        if (Objects.isNull(config) || config.getPointCuts().isEmpty()) {
            return Collections.emptyList();
        }
        return config.getPointCuts().stream() // 获取配置文件中定义的拦截点
                .filter(pointCut -> StringUtils.isNotEmpty(pointCut.getTargetClass())
                        && !pointCut.getPoints().isEmpty() && !pointCut.getHandlers().isEmpty()) // 拦截点必须要指定目标类,切入点,处理器
                .map(pointCut -> {
                    JoinPointBuilder builder = ShenyuAgentJoinPoint.interceptClass(pointCut.getTargetClass()); // 设置需要拦截的目标类
                    Set<String> supports = ShenyuAgentConfigUtils.getSupports(); // 获取当前支持哪些插件
                    List<String> handlers = pointCut.getHandlers().entrySet().stream()
                            .filter(entry -> supports.contains(entry.getKey())) // 指定的处理器必须是当前可支持的插件
                            .flatMap(entry -> entry.getValue().stream())
                            .collect(Collectors.toList());
                    String[] instanceMethods = pointCut
                            .getPoints()
                            .stream()
                            .filter(point -> PointType.INSTANCE_METHOD.getName().equals(point.getType()))
                            .map(Point::getName) // 拦截实例方法
                            .toArray(String[]::new);
                    if (instanceMethods.length > 0) {
                        builder.aroundInstanceMethod(ElementMatchers.namedOneOf(instanceMethods)).handlers(handlers).build(); // 为实例方法添加匹配器用于后续运行时动态匹配,并添加对应的处理器
                    }
                    String[] staticMethods = pointCut
                            .getPoints()
                            .stream()
                            .filter(point -> PointType.STATIC_METHOD.getName().equals(point.getType()))
                            .map(Point::getName)
                            .toArray(String[]::new); // 拦截静态方法
                    if (staticMethods.length > 0) {
                        builder.aroundStaticMethod(ElementMatchers.namedOneOf(staticMethods)).handlers(handlers).build();// 为静态方法添加匹配器用于后续运行时动态匹配,并添加对应的处理器
                    }
                    String[] constructorPoints = pointCut
                            .getPoints()
                            .stream()
                            .filter(point -> PointType.CONSTRUCTOR.getName().equals(point.getType()))
                            .map(Point::getName)
                            .toArray(String[]::new); // 拦截构造器
                    if (constructorPoints.length > 0) {
                        builder.onConstructor(ElementMatchers.namedOneOf(constructorPoints)).handlers(handlers).build();// 为构造器添加匹配器用于后续运行时动态匹配,并添加对应的处理器
                    }
                    return builder;
                }).collect(Collectors.toList()); // 返回匹配结果
    }

创建拦截点的主要实现逻辑是:根据配置文件读取配置信息,为指定的目标对象的目标方法添加相应的处理器。处理器有三种:实例方法处理器,静态方法处理器,构造函数处理器。

这里用到了ElementMatchers.namedOneOf() 方法,它表示方法名称在指定的参数中,就可以匹配上这个方法。ElementMatchersbytebuddy中的一个类,在ShenYu中,agent的创建也通过bytebuddy完成的。

后续将收集到的拦截点创建为拦截点对象ShenyuAgentJoinPoint

    public final Collection<ShenyuAgentJoinPoint> collector() {
        // 获取拦截点
        Collection<JoinPointBuilder> joinPointBuilders = joinPointBuilder();
        // 创建拦截点对象
        return joinPointBuilders.stream().map(JoinPointBuilder::install).collect(Collectors.toList());
    }

收集完拦截点之后,用Map保存了这些拦截点信息。

// pointMap: Key和Value分别表示目标类,拦截点
private void loadAgentPluginDefinition(final Map<String, ShenyuAgentJoinPoint> pointMap) {
        SPILoader.loadList(AgentPluginDefinition.class)  // SPI 加载拦截点
                .forEach(each -> each.collector().forEach(def -> {  // 收集拦截点
                    String classTarget = def.getClassTarget();
                    if (pointMap.containsKey(classTarget)) {
                        ShenyuAgentJoinPoint pluginInterceptorPoint = pointMap.get(classTarget);
                        pluginInterceptorPoint.getConstructorPoints().addAll(def.getConstructorPoints()); // 构造器类型拦截点
                        pluginInterceptorPoint.getInstanceMethodPoints().addAll(def.getInstanceMethodPoints()); // 实例方法类型拦截点
                        pluginInterceptorPoint.getStaticMethodPoints().addAll(def.getStaticMethodPoints()); // 静态方法类型拦截点
                    } else {
                        pointMap.put(classTarget, def); // 将拦截点信息保存到Map中
                    }
                }));
    }
   

3.3 设置拦截点

在加载所有插件的过程中最后一步是设置拦截点。

     public void loadAllPlugins() throws IOException {
        // 1.定位插件路径
        // ......
        // 2.加载插件定义
        // ......
        // 3.设置拦截点
        ShenyuAgentTypeMatcher.getInstance().setJoinPointMap(joinPointMap);
    }

设置拦截点就是将拦截点集合保存到ShenyuAgentTypeMatcher类中。它实现了ElementMatcher接口,用于自定义匹配逻辑。ElementMatcher也是bytebuddy中的接口。

// 使用单例设计模式
public final class ShenyuAgentTypeMatcher extends ElementMatcher.Junction.AbstractBase<TypeDefinition> {
    // 创建实例
    private static final ShenyuAgentTypeMatcher SHENYU_AGENT_TYPE_MATCHER = new ShenyuAgentTypeMatcher();
    // 拦截点集合
    private Map<String, ShenyuAgentJoinPoint> joinPointMap;
    
    private ShenyuAgentTypeMatcher() {
    }
    
    /**
     * 获取单例
     */
    public static ShenyuAgentTypeMatcher getInstance() {
        return SHENYU_AGENT_TYPE_MATCHER;
    }
 
    //自定义匹配逻辑,目标类在拦截点集合中就匹配成功
    @Override
    public boolean matches(final TypeDefinition target) {
        return joinPointMap.containsKey(target.getTypeName());
    }
    
    /**
     * 设置拦截点集合
     */
    public void setJoinPointMap(final Map<String, ShenyuAgentJoinPoint> joinPointMap) {
        this.joinPointMap = joinPointMap;
    }
}

4. 创建 agent

通过创建的agent,用于改变目标类的行为。

 public static void premain(final String arguments, final Instrumentation instrumentation) throws Exception {
        // 1. 读取配置文件
        // ......
        // 2. 加载所有插件
        // ......
        // 3. 创建 agent
        AgentBuilder agentBuilder = new AgentBuilder.Default().with(new ByteBuddy().with(TypeValidation.ENABLED)) // 通过ByteBuddy创建Agent,开启类型校验
                .ignore(ElementMatchers.isSynthetic()) // 忽略合成类
                .or(ElementMatchers.nameStartsWith("org.apache.shenyu.agent.")); // 忽略org.apache.shenyu.agent 的类
        agentBuilder.type(ShenyuAgentTypeMatcher.getInstance())//匹配加载类型,匹配器是ShenyuAgentTypeMatcher
                .transform(new ShenyuAgentTransformer()) // 匹配成功的,通过ShenyuAgentTransformer改变其行为
                .with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION) //指定重定义策略
                .with(new TransformListener()) //指定一个监听器,监听运行过程中的事件
                .installOn(instrumentation); // 将修改应用到instrumentation中
        // 4. 启动插件
        // ......
    }

在创建agent的过程中,需要注意两个点:

  • 是否匹配成功,由ShenyuAgentTypeMatcher决定;
  • 匹配成功的类,通过ShenyuAgentTransformer改变其行为;

接下来我们就来着重分析这两个类。

4.1 定义匹配逻辑

  • ShenyuAgentTypeMatcher#matches()

ShenyuAgentTypeMatcher使用单例的设计模式,实现了ElementMatcher接口,重写了matches()方法。

是否能够匹配成功的逻辑是:如果目标类在拦截点joinPointMap集合中,就匹配成功。


public final class ShenyuAgentTypeMatcher extends ElementMatcher.Junction.AbstractBase<TypeDefinition> {
    // ......
    
    private Map<String, ShenyuAgentJoinPoint> joinPointMap;

	// 自定义匹配逻辑:如果目标类在拦截点joinPointMap集合中,就匹配成功
    @Override
    public boolean matches(final TypeDefinition target) {
        return joinPointMap.containsKey(target.getTypeName());
    }
    
}

4.2 改变匹配类的行为

在加载目标类时,如果匹配成功,会通过ShenyuAgentTransformer改变其行为,它实现了Transformer接口,重写了transform()方法,Transformer也是bytebuddy的一个接口。

public final class ShenyuAgentTransformer implements Transformer {
    //在匹配类中额外定义一个字段,传递上下文信息
    private static final String EXTRA_DATA = "_$EXTRA_DATA$_";
    //类型匹配器    
    private static final ShenyuAgentTypeMatcher MATCHER = ShenyuAgentTypeMatcher.getInstance();
    
    //重写transform方法,重新定义匹配类的行为
    //加载类期间,执行一次
    @Override
    public Builder<?> transform(final Builder<?> builder, final TypeDescription typeDescription, final ClassLoader classLoader, final JavaModule module) {
        //不是匹配的类就跳过
        if (!MATCHER.containsType(typeDescription)) {
            return builder;
        }
        //为该类新增加一个字段
        Builder<?> result = builder.defineField(EXTRA_DATA, Object.class, Opcodes.ACC_PRIVATE | Opcodes.ACC_VOLATILE).implement(TargetObject.class).intercept(FieldAccessor.ofField(EXTRA_DATA));
        // 获取拦截点
        ShenyuAgentJoinPoint joinPoint = MATCHER.loadShenyuAgentJoinPoint(typeDescription);
        //拦截构造器
        result = interceptorConstructorPoint(typeDescription, joinPoint.getConstructorPoints(), result);
        //拦截静态方法
        result = interceptorStaticMethodPoint(typeDescription, joinPoint.getStaticMethodPoints(), result);
        //拦截实例方法
        result = interceptorInstanceMethodPoint(typeDescription, joinPoint.getInstanceMethodPoints(), result);
        return result;
    }

    // ......
}

transform()方法中,重新定义了匹配类的行为:

  • 新增了字段,是TargetObject的子类,用于传递上下文;
  • 根据指定配置,是否拦截构造器;
  • 根据指定配置,是否拦截静态方法;
  • 根据指定配置,拦截实例方法;
4.2.3 拦截实例方法
  • ShenyuAgentTransformer#interceptorInstanceMethodPoint()

根据切面构建实例方法拦截点,获取Builder对象。

    private Builder<?> interceptorInstanceMethodPoint(final TypeDescription description, final Collection<InstanceMethodPointCut> pointCuts, final Builder<?> builder) {
        Collection<ShenyuAgentTransformerPoint<?>> points = description.getDeclaredMethods().stream()
                .filter(each -> !(each.isAbstract() || each.isSynthetic())) //过滤抽象方法,合成方法
                .map(each -> buildInstanceMethodTransformationPoint(pointCuts, each)) //构建实例方法拦截点
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        return getBuilder(description, builder, points); //获取Builder对象
    }
  • ShenyuAgentTransformer#buildInstanceMethodTransformationPoint()

过滤匹配上的方法,为方法获取对应的handler处理器,最后创建实例方法拦截点对象。

    private ShenyuAgentTransformerPoint<?> buildInstanceMethodTransformationPoint(final Collection<InstanceMethodPointCut> pointCuts, final InDefinedShape methodDescription) {
        List<InstanceMethodPointCut> points = pointCuts.stream().filter(point -> point.getMatcher().matches(methodDescription)).collect(Collectors.toList()); //过滤能够匹配上的方法
        if (points.isEmpty()) {
            return null;
        }
        List<InstanceMethodHandler> handlers = points.stream()
                .flatMap(pointCut -> pointCut.getHandlers().stream())
                .map(handler -> (InstanceMethodHandler) MATCHER.getOrCreateInstance(handler)) //获取对应的handler处理器
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        return new ShenyuAgentTransformerPoint<>(methodDescription, new InstanceMethodInterceptor(handlers));//创建实例方法拦截点对象,创建实例方法拦截器
    }

方法能否匹配成功:当前方法名称是否是在tracing-point.yaml文件中配置的方法名称;

handler的获取:是根据tracing-point.yaml文件中配置的全限定名去加载对应的类。

  • InstanceMethodInterceptor#intercept()

实例方法拦截器InstanceMethodInterceptor会在运行期间动态处理拦截方法。

public class InstanceMethodInterceptor {
    // ......
    /**
     * 拦截目标对象.
     *
     * @param target 当前被拦截的目标对象
     * @param method 目标对象的目标方法
     * @param args 目标方法的参数
     * @param callable 目标方法调用
     * @return 目标方法调用结果
     * @throws 异常信息
     */
    @RuntimeType //定义运行时的目标方法
    public Object intercept(@This final Object target, @Origin final Method method, @AllArguments final Object[] args, @SuperCall final Callable<?> callable) throws Exception {
        //目标方法执行结果
        Object result = null;
        //目标对象
        TargetObject instance = (TargetObject) target;
        //依次调用handler
        for (InstanceMethodHandler handler : handlerList) {
            MethodResult methodResult = new MethodResult();
            // 前置处理逻辑
            try {
                handler.before(instance, method, args, methodResult);
            } catch (final Throwable ex) {
                LOG.error("Failed to execute the before method of method {} in class {}", method.getName(), target.getClass(), ex);
            }
            //调用目标方法
            try {
                result = callable.call();
            } catch (final Throwable ex) {
			   //处理异常
                try {
                    handler.onThrowing(instance, method, args, ex);
                } catch (final Throwable ignored) {
                    LOG.error("Failed to execute the error handler of method {} in class {}", method.getName(), target.getClass(), ex);
                    throw ex;
                }
            } finally {
                //后置处理逻辑
                try {
                    result = handler.after(instance, method, args, methodResult, result);
                } catch (final Throwable ex) {
                    LOG.error("Failed to execute the after method of method {} in class {}", method.getName(), target.getClass(), ex);
                }
            }
        }
        //返回目标方法调用结果
        return result;
    }
}

实例方法拦截器在目标方法调用前,增加了前置处理逻辑,后置处理逻辑,以及异常处理逻辑。

这里用到了Byte Buddy的几个注解:

  • @RuntimeType: 定义运行时的目标方法,提示ByteBuddy禁用严格的类型检查;
  • @This:当前被拦截的、动态生成的实例对象;
  • @Origin:原有方法;
  • @AllArguments:获取所有入参;
  • @SuperCall:用于调用父类版本的方法。

实例方法处理器InstanceMethodHandler只是定义了三个接口,具体实现逻辑由具体插件去处理。

public interface InstanceMethodHandler {
     // 前置处理逻辑
    default void before(final TargetObject target, final Method method, final Object[] args, final MethodResult result) {
    }
    
    // 后置处理逻辑
    default Object after(final TargetObject target, final Method method, final Object[] args, final MethodResult methodResult, final Object result) {
        return result;
    }
   
    // 异常处理逻辑
    default void onThrowing(final TargetObject target, final Method method, final Object[] args, final Throwable throwable) {
    }
}

  • ShenyuAgentTransformer#getBuilder()

    AgentBuilder获取时,为指定的目标方法指定对应的拦截器,在原有方法上添加新的逻辑,改变原有方法行为。

    
    private static Builder<?> getBuilder(final TypeDescription description, final Builder<?> builder, final Collection<ShenyuAgentTransformerPoint<?>> points) {
        final Builder<?>[] result = {builder};
        points.forEach(point -> {
            try {
                result[0] = builder.method(ElementMatchers.is(point.getDescription()))//指定目标方法
                        .intercept(MethodDelegation.withDefaultConfiguration().to(point.getInterceptor()));//指定拦截器
                // CHECKSTYLE:OFF
            } catch (final Throwable ex) {
                // CHECKSTYLE:ON
                LOG.error("Failed to load handler class: {}", description.getTypeName(), ex);
            }
        });
        return result[0];
    }

通过以上的处理逻辑,就可以实现无侵入拦截实例方法了。

拦截方法intercept()的处理逻辑是:

  • 依次处理每个handler
  • 调用handler的前置方法;
  • 调用目标方法;
  • 如果目标方法有异常,则调用handler的异常处理方法;
  • 调用handler的后置方法。

接下来看看拦截静态方法。

4.2.3 拦截静态方法
  • ShenyuAgentTransformer#interceptorStaticMethodPoint()

过滤出静态方法,然后为静态方法构建静态方法拦截点。

    private Builder<?> interceptorStaticMethodPoint(final TypeDescription description, final Collection<StaticMethodPointCut> pointCuts, final Builder<?> builder) {
        Collection<ShenyuAgentTransformerPoint<?>> points = description.getDeclaredMethods().stream()
                .filter(each -> each.isStatic() && !(each.isAbstract() || each.isSynthetic())) // 当前方法是静态方法,不是抽象方法,不是合成方法
                .map(methodDescription -> buildStaticMethodTransformationPoint(pointCuts, methodDescription)) // 构建静态方法拦截点
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        return getBuilder(description, builder, points);
    }
  • ShenyuAgentTransformer#buildStaticMethodTransformationPoint()

根据配置文件进行过滤,判断当前的静态方法是否需要拦截。然后获取对应的处理器,最后构建静态方法拦截器对象。

    private ShenyuAgentTransformerPoint<?> buildStaticMethodTransformationPoint(final Collection<StaticMethodPointCut> pointCuts, final InDefinedShape methodDescription) {
        List<StaticMethodPointCut> staticMethodPoints = pointCuts.stream().filter(point -> point.getMatcher().matches(methodDescription)).collect(Collectors.toList()); //根据配置文件进行过滤,判断当前的静态方法是否需要拦截
        if (staticMethodPoints.isEmpty()) { // 如果没有配置,就直接返回了
            return null;
        }
        List<StaticMethodHandler> handlers = staticMethodPoints.stream()
                .flatMap(pointCut -> pointCut.getHandlers().stream())
                .map(handler -> (StaticMethodHandler) MATCHER.getOrCreateInstance(handler)) //获取对应的处理器
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        return new ShenyuAgentTransformerPoint<>(methodDescription, new StaticMethodInterceptor(handlers)); //构建静态方法拦截器对象
    }
  • StaticMethodInterceptor#intercept()

在运行时,会拦截目标方法,执行拦截器的处理逻辑 。


public class StaticMethodInterceptor {
   //......  
    
    /**
     * 拦截目标方法.
     */
    @RuntimeType
    public Object intercept(@Origin final Class<?> klass, @Origin final Method method, @AllArguments final Object[] args, @SuperCall final Callable<?> callable) throws Exception {
        Object result = null;
        // handler循环处理
        for (StaticMethodHandler handler : handlerList) {
            MethodResult methodResult = new MethodResult();
            try {
                //前置方法
                handler.before(klass, method, args, new MethodResult());
            } catch (final Throwable ex) {
                LOG.error("Failed to execute the before method of method {} in class {}", method.getName(), klass, ex);
            }
            try {
                // 调用当前方法
                // 目标方法是不是应该只会被调用一次?
                result = callable.call();
            } catch (final Throwable ex) {
                try {
                    //异常逻辑处理
                    handler.onThrowing(klass, method, args, ex);
                } catch (final Throwable ignored) {
                    LOG.error("Failed to execute the error handler of method {} in class {}", method.getName(), klass, ex);
                    throw ex;
                }
            } finally {
                try {
                    // 后置方法
                    handler.after(klass, method, args, methodResult);
                } catch (final Throwable ex) {
                    LOG.error("Failed to execute the after method of method {} in class {}", method.getName(), klass, ex);
                }
            }
        }
        //返回方法调用结果
        return result;
    }
}

拦截方法intercept()的处理逻辑是:

  • 依次处理每个handler
  • 调用handler的前置方法;
  • 调用目标方法;
  • 如果目标方法有异常,则调用handler的异常处理方法;
  • 调用handler的后置方法。

最后再看看如何拦截构造器。

4.2.3 拦截构造器
  • ShenyuAgentTransformer#interceptorConstructorPoint()

过滤出构造器,然后构建构造器的拦截点,最后创建builder对象,为构造方法添加拦截器。

 private Builder<?> interceptorConstructorPoint(final TypeDescription description, final Collection<ConstructorPointCut> constructorPoints, final Builder<?> builder) {
        Collection<ShenyuAgentTransformerPoint<? extends ConstructorInterceptor>> constructorAdviceComposePoints = description.getDeclaredMethods().stream()
                .filter(MethodDescription::isConstructor) //过滤出构造器
                .map(each -> buildConstructorTransformerPoint(constructorPoints, each))//构建构造器的拦截点
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        final Builder<?>[] result = {builder};
     // 创建builder对象,为构造方法添加拦截器
        constructorAdviceComposePoints.forEach(point -> {
            try {
                result[0] = builder.constructor(ElementMatchers.is(point.getDescription()))
                        .intercept(SuperMethodCall.INSTANCE.andThen(MethodDelegation.withDefaultConfiguration()
                                .to(point.getInterceptor())));//先调用父类构造器,然后添加拦截器
                // CHECKSTYLE:OFF
            } catch (final Throwable ex) {
                // CHECKSTYLE:ON
                LOG.error("Failed to load handler class: {}", description.getTypeName(), ex);
            }
        });
        return result[0];
    }
  • ShenyuAgentTransformer#buildConstructorTransformerPoint()

先获取到构造器拦截点,然后为拦截点创建handler实例对象,最后创建构造器拦截器对象。

private ShenyuAgentTransformerPoint<? extends ConstructorInterceptor> buildConstructorTransformerPoint(
            final Collection<ConstructorPointCut> constructorPoints, final InDefinedShape methodDescription) {
    //获取构造器拦截点
        List<ConstructorPointCut> constructorPointCutList = constructorPoints.stream().filter(each -> each.getMatcher().matches(methodDescription)).collect(Collectors.toList());
        if (constructorPointCutList.isEmpty()) {
            return null;
        }
        List<ConstructorHandler> handlers = constructorPointCutList.stream()
                .flatMap(pointCut -> pointCut.getHandlers().stream())
                .map(handler -> (ConstructorHandler) MATCHER.getOrCreateInstance(handler)) //创建拦截点的handler实例对象
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        return new ShenyuAgentTransformerPoint<>(methodDescription, new ConstructorInterceptor(handlers));//创建构造器拦截器
    }
  • ConstructorInterceptor#intercept()

构造器拦截器:在调用目标方法的构造器之前,执行每个handler的处理逻辑。


public class ConstructorInterceptor {
    //......
    
    /**
     * 拦截方法.
     */
    @RuntimeType
    public void intercept(@This final TargetObject target, @AllArguments final Object[] args) {
        for (ConstructorHandler handler : handlerList) {
            try {
                // handler处理逻辑
                handler.onConstructor(target, args);
            } catch (final Throwable throwable) {
                LOG.error("Constructor advice execution error. class: {}", target.getClass().getTypeName(), throwable);
            }
        }
    }
}

分析到此,我们分析完了创建agent的整个过程:

  • 根据配置文件,定义匹配逻辑ShenyuAgentTypeMatcher
  • 定义ShenyuAgentTransformer对象,改变匹配类的行为;
  • 通过InstanceMethodInterceptor拦截实例对象方法;
  • 通过StaticMethodInterceptor拦截静态方法;
  • 通过ConstructorInterceptor拦截构造器。

这里没有提到每个handler的处理逻辑,是因为handler的实现逻辑由每个插件自定义。比如,当前实例方法拦截器InstanceMethodHandler的实现类就有jaegeropentelemetryzipkin

5. 启动插件

创建完 agent之后,启动各个插件。

 public static void premain(final String arguments, final Instrumentation instrumentation) throws Exception {
        // 1. 读取配置文件
        // ......
        // 2. 加载所有插件
        // ......
        // 3. 创建 agent
       
        // 4. 启动插件
        PluginLifecycleManager lifecycleManager = new PluginLifecycleManager();
        lifecycleManager.startup(ShenyuAgentConfigUtils.getPluginConfigMap());
        //添加hook函数用于关闭插件
        Runtime.getRuntime().addShutdownHook(new Thread(lifecycleManager::close));
    }
  • PluginLifecycleManager

    PluginLifecycleManager负责插件的生命周期管理,用于启动插件和关闭插件。

    插件的启动,必须要在配置文件中指定。


public class PluginLifecycleManager {
   //......      
    /**
     * 启动插件.
     */
    public void startup(final Map<String, AgentPluginConfig> configMap) {
        //从配置文件中获取支持的插件名称
        Set<String> support = ShenyuAgentConfigUtils.getSupports();
        configMap.entrySet().stream()
                .filter(entry -> support.contains(entry.getKey())) //包含在配置文件中
                .forEach(entry -> Optional.ofNullable(SPILoader.load(AgentPluginBootService.class, entry.getKey())) //通过SPI加载插件启动类
                        .ifPresent(bootService -> {
                            try {
                                LOG.info("start shenyu plugin: {}", entry.getKey());
                                bootService.start(entry.getValue());  // 启动插件:执行插件的具体启动逻辑
                            } catch (final Throwable ex) {
                                LOG.error("Failed to start shenyu plugin", ex);
                            }
                        }));
    }
    
    /**
     * 关闭插件
     */
    public void close() {
        //通过SPI加载插件启动类
        SPILoader.loadList(AgentPluginBootService.class).forEach(each -> {
            try {
                each.close(); // 关闭插件:执行插件的具体关闭逻辑
            } catch (final Throwable ex) {
                LOG.error("Failed to close shenyu agent plugin", ex);
            }
        });
    }
}

插件的启动和关闭也是有每个插件具体去实现的,然后通过SPI去加载。

6. 总结

  • shenyu-agent模块的实现主要是通过Byte Buddy工具包;
  • 在配置文件shenyu-agent.yaml中,指定插件信息;
  • 插件加载过程通过SPI完成;
  • 拦截点通过配置文件指定,设计灵活;
  • 插件接口定义和实现分开,支持多种插件类型。

Apache ShenYu源码阅读系列-注册中心实现原理之Http注册

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,注册中心是用于将客户端信息注册到shenyu-adminadmin再通过数据同步将这些信息同步到网关,网关通过这些数据完成流量筛选。客户端信息主要包括接口信息URI信息

本文基于shenyu-2.4.1版本进行源码分析,官网的介绍请参考 客户端接入原理

1. 注册中心原理

当客户端启动时,读取接口信息和uri信息,通过指定的注册类型,将数据发送到shenyu-admin

图中的注册中心需要用户指定使用哪种注册类型,ShenYu当前支持HttpZookeeperEtcdConsulNacos进行注册。具体如何配置请参考 客户端接入配置

ShenYu在注册中心的原理设计上引入了DisruptorDisruptor队列在其中起到数据与操作解耦,利于扩展。如果注册请求过多,导致注册异常,也有数据缓冲作用。

如图所示,注册中心分为两个部分,一是注册中心客户端register-client,负载处理客户端数据读取。另一个是注册中心服务端register-server,负载处理服务端(就是shenyu-admin)数据写入。通过指定注册类型进行数据发送和接收。

  • 客户端:通常来说就是一个微服务,可以是springmvcspring-clouddubbogrpc等。
  • register-client:注册中心客户端,读取客户接口和uri信息。
  • Disruptor:数据与操作解耦,数据缓冲作用。
  • register-server:注册中心服务端,这里就是shenyu-admin,接收数据,写入数据库,发数据同步事件。
  • 注册类型:指定注册类型,完成数据注册,当前支持HttpZookeeperEtcdConsulNacos

本文分析的是使用Http的方式进行注册,所以具体的处理流程如下:

在客户端,数据出队列后,通过http传输数据,在服务端,提供相应的接口,接收数据,然后写入队列。

2. 客户端注册流程

当客户端启动后,根据相关配置,读取属性信息,然后写入队列。以官方提供的 shenyu-examples-http 为例,开始源码分析。官方提供的例子是一个由springboot构建的微服务。注册中心的相关配置可以参考官网 客户端接入配置

2.1 加载配置,读取属性

先用一张图串联下注册中心客户端初始化流程:

我们分析的是通过http的方式进行注册,所以需要进行如下配置:

shenyu:
  register:
    registerType: http
    serverLists: http://localhost:9095
  client:
    http:
        props:
          contextPath: /http
          appName: http
          port: 8189  
          isFull: false

每个属性表示的含义如下:

  • registerType : 服务注册类型,填写 http
  • serverList: 为http注册类型时,填写Shenyu-Admin项目的地址,注意加上http://,多个地址用英文逗号分隔。
  • port: 你本项目的启动端口,目前springmvc/tars/grpc需要进行填写。
  • contextPath: 为你的这个mvc项目在shenyu网关的路由前缀, 比如/order/product 等等,网关会根据你的这个前缀来进行路由。
  • appName:你的应用名称,不配置的话,会默认取 spring.application.name 的值。
  • isFull: 设置 true 代表代理你的整个服务,false表示代理你其中某几个controller;目前适用于springmvc/springcloud

项目启动后,会先加载配置文件,读取属性信息,生成相应的Bean

首先读取到的配置文件是 ShenyuSpringMvcClientConfiguration,它是shenyu 客户端http注册配置类,通过@Configuration表示这是一个配置类,通过@ImportAutoConfiguration引入其他配置类。创建SpringMvcClientBeanPostProcessor,主要处理元数据。创建ContextRegisterListener,主要处理 URI 信息。

/**
 * shenyu 客户端http注册配置类
 */
@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
public class ShenyuSpringMvcClientConfiguration {

    //创建SpringMvcClientBeanPostProcessor,主要处理元数据
    @Bean
    public SpringMvcClientBeanPostProcessor springHttpClientBeanPostProcessor(final ShenyuClientConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        return new SpringMvcClientBeanPostProcessor(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);
    }
    
   // 创建ContextRegisterListener,主要处理 URI信息
    @Bean
    public ContextRegisterListener contextRegisterListener(final ShenyuClientConfig clientConfig) {
        return new ContextRegisterListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()));
    }
}

ShenyuClientCommonBeanConfigurationshenyu客户端通用配置类,会创建注册中心客户端通用的bean

  • 创建ShenyuClientRegisterRepository,通过工厂类创建而成。
  • 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置。
  • 创建ShenyuClientConfig,读取shenyu.client属性配置。

/**
 * shenyu客户端通用配置类
 */
@Configuration
public class ShenyuClientCommonBeanConfiguration {
    
   // 创建ShenyuClientRegisterRepository,通过工厂类创建而成。
    @Bean
    public ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
        return ShenyuClientRegisterRepositoryFactory.newInstance(config);
    }
    
	// 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置
    @Bean
    @ConfigurationProperties(prefix = "shenyu.register")
    public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
        return new ShenyuRegisterCenterConfig();
    }
    
  // 创建ShenyuClientConfig,读取shenyu.client属性配置
    @Bean
    @ConfigurationProperties(prefix = "shenyu")
    public ShenyuClientConfig shenyuClientConfig() {
        return new ShenyuClientConfig();
    }
}

2.2 用于注册的 HttpClientRegisterRepository

上面的配置文件中生成的ShenyuClientRegisterRepository是客户端注册的具体实现,它是一个接口,它的实现类如下。

  • HttpClientRegisterRepository:通过http进行注册;
  • ConsulClientRegisterRepository:通过Consul进行注册;
  • EtcdClientRegisterRepository:通过Etcd进行注册;
  • NacosClientRegisterRepository:通过nacos进行注册;
  • ZookeeperClientRegisterRepository通过Zookeeper进行注册。

具体是哪一种方式,是通过SPI进行加载实现的,实现逻辑如下:


/**
 * 加载 ShenyuClientRegisterRepository
 */
public final class ShenyuClientRegisterRepositoryFactory {
    
    private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();
    
    /**
     * 创建 ShenyuClientRegisterRepository
     */
    public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
        if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
            // 通过SPI的方式进行加载,类型由registerType决定
            ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
            //执行初始化操作
            result.init(shenyuRegisterCenterConfig);
            ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
            REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
            return result;
        }
        return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
    }
}

加载类型通过registerType指定,也就是我们在配置文件中指定的类型:

shenyu:
  register:
    registerType: http
    serverLists: http://localhost:9095

我们指定的是http,所以会去加载HttpClientRegisterRepository。对象创建成功后,执行的初始化方法init()如下:

@Join
public class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {
    
    @Override
    public void init(final ShenyuRegisterCenterConfig config) {
        this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));
    }
  
  // 暂时省略其他逻辑
}

读取配置文件中的serverLists,即sheenyu-admin的地址,为后续数据发送做准备。类注解@Join用于SPI的加载。

SPI 全称为 Service Provider Interface, 是 JDK 内置的一种服务提供发现功能, 一种动态替换发现的机制。

shenyu-spiApache ShenYu网关自定义的SPI扩展实现,设计和实现原理参考了DubboSPI扩展实现

2.3 构建元数据的 SpringMvcClientBeanPostProcessor

创建SpringMvcClientBeanPostProcessor,负责元数据的构建和注册,它的构造函数逻辑如下:


/**
 *  spring mvc 客户端bean的后置处理器
 */
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {

    /**
     * 通过构造函数进行实例化
     */
    public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig,
                                            final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        // 读取配置属性
        Properties props = clientConfig.getProps();
        // 获取端口信息,并校验
        int port = Integer.parseInt(props.getProperty(ShenyuClientConstants.PORT));
        if (port <= 0) {
            String errorMsg = "http register param must config the port must > 0";
            LOG.error(errorMsg);
            throw new ShenyuClientIllegalArgumentException(errorMsg);
        }
        // 获取appName
        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
        // 获取contextPath
        this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
        // 校验appName和contextPath
        if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
            String errorMsg = "http register param must config the appName or contextPath";
            LOG.error(errorMsg);
            throw new ShenyuClientIllegalArgumentException(errorMsg);
        }
        // 获取 isFull
        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
        // 开始事件发布
        publisher.start(shenyuClientRegisterRepository);
    }

    // 暂时省略了其他逻辑
    @Override
    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
     // 暂时省略了其他逻辑
    }

}

在构造函数中,主要是读取属性信息,然后进行校验。

shenyu:
  client:
    http:
        props:
          contextPath: /http
          appName: http
          port: 8189  
          isFull: false

最后,执行了 publisher.start(),开始事件发布,为注册做准备。

  • ShenyuClientRegisterEventPublisher

ShenyuClientRegisterEventPublisher通过单例模式实现,主要是生成元数据URI订阅器(后续用于数据发布),然后启动Disruptor队列。提供了一个共有方法publishEvent(),发布事件,向Disruptor队列发数据。


public class ShenyuClientRegisterEventPublisher {
    // 私有变量
    private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();
    
    private DisruptorProviderManage providerManage;
    
    private RegisterClientExecutorFactory factory;
    
    /**
     * 公开静态方法
     *
     * @return ShenyuClientRegisterEventPublisher instance
     */
    public static ShenyuClientRegisterEventPublisher getInstance() {
        return INSTANCE;
    }
    
    /**
     * Start方法执行
     *
     * @param shenyuClientRegisterRepository shenyuClientRegisterRepository
     */
    public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        // 创建客户端注册工厂类
        factory = new RegisterClientExecutorFactory();
        // 添加元数据订阅器
        factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));
        //  添加URI订阅器
        factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));
        // 启动Disruptor队列
        providerManage = new DisruptorProviderManage(factory);
        providerManage.startup();
    }
    
    /**
     * 发布事件,向Disruptor队列发数据
     *
     * @param <T> the type parameter
     * @param data the data
     */
    public <T> void publishEvent(final T data) {
        DisruptorProvider<Object> provider = providerManage.getProvider();
        provider.onData(f -> f.setData(data));
    }
}

SpringMvcClientBeanPostProcessor的构造函数逻辑分析完了,主要是读取属性配置,创建元数据和URI订阅器, 启动Disruptor队列。要注意到它实现了BeanPostProcessor,这是Spring提供的一个接口,在Bean的生命周期中,真正开始使用之前,会执行后置处理器的postProcessAfterInitialization()方法。

  • postProcessAfterInitialization() 方法

SpringMvcClientBeanPostProcessor作为一个后置处理器,它的功能是:读取注解中的元数据,并向admin注册。

// 后置处理器
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
   // 省略了其他逻辑
    
    // 后置处理器:读取注解中的元数据,并向admin注册
    @Override
    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
        // 配置属性,如果 isFull=true 的话,表示注册整个微服务
        if (isFull) {
            return bean;
        }
        // 获取当前bean的Controller注解
        Controller controller = AnnotationUtils.findAnnotation(bean.getClass(), Controller.class);
         // 获取当前bean的RequestMapping注解
        RequestMapping requestMapping = AnnotationUtils.findAnnotation(bean.getClass(), RequestMapping.class);
        // 如果这个bean是一个接口
        if (controller != null || requestMapping != null) {
               // 获取当前bean的 ShenyuSpringMvcClient 注解
            ShenyuSpringMvcClient clazzAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class);
            String prePath = "";
            //如果没有 ShenyuSpringMvcClient 注解,就返回,表示这个接口不需要注册
            if (Objects.isNull(clazzAnnotation)) {
                return bean;
            }
             //如果 ShenyuSpringMvcClient 注解中的path属性包括 * ,表示注册整个接口
            if (clazzAnnotation.path().indexOf("*") > 1) {
                // 构建元数据,发送注册事件
                publisher.publishEvent(buildMetaDataDTO(clazzAnnotation, prePath));
                return bean;
            }
            
            prePath = clazzAnnotation.path();
            // 获取当前bean的所有方法
            final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
            // 遍历方法
            for (Method method : methods) {
                // 获取当前方法上的注解 ShenyuSpringMvcClient
                ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
                // 如果方法上有注解ShenyuSpringMvcClient,就表示该方法需要注册
                if (Objects.nonNull(shenyuSpringMvcClient)) {
                    // 构建元数据,发送注册事件
                    publisher.publishEvent(buildMetaDataDTO(shenyuSpringMvcClient, prePath));
                }
            }
        }
        return bean;
    }

    // 构造元数据
    private MetaDataRegisterDTO buildMetaDataDTO(final ShenyuSpringMvcClient shenyuSpringMvcClient, final String prePath) {
        // contextPath上下文名称
        String contextPath = this.contextPath;
        // appName应用名称
        String appName = this.appName;
        // path注册路径
        String path;
        if (StringUtils.isEmpty(contextPath)) {
            path = prePath + shenyuSpringMvcClient.path();
        } else {
            path = contextPath + prePath + shenyuSpringMvcClient.path();
        }
        // desc描述信息
        String desc = shenyuSpringMvcClient.desc();
        // ruleName规则名称,没有填写的话就和path一致
        String configRuleName = shenyuSpringMvcClient.ruleName();
        String ruleName = StringUtils.isBlank(configRuleName) ? path : configRuleName;
        // 构建元数据
        return MetaDataRegisterDTO.builder()
                .contextPath(contextPath)
                .appName(appName)
                .path(path)
                .pathDesc(desc)
                .rpcType(RpcTypeEnum.HTTP.getName())
                .enabled(shenyuSpringMvcClient.enabled())
                .ruleName(ruleName)
                .registerMetaData(shenyuSpringMvcClient.registerMetaData())
                .build();
    }
}

在后置处理器中,需要读取配置属性,如果 isFull=true 的话,表示注册整个微服务。获取当前beanController注解、RequestMapping注解、ShenyuSpringMvcClient 注解,通过读取这些注解信息判断当前bean是否是接口?接口是否需要注册?方法是否需要注册?然后根据ShenyuSpringMvcClient 注解中的属性构建元数据,最后通过publisher.publishEvent()发布事件进行注册。

Controller注解和RequestMapping注解是由Spring提供的,这个大家应该很熟悉,不过多赘述。ShenyuSpringMvcClient 注解是由Apache ShenYu提供的,用于注册SpringMvc客户端,它的定义如下:


/**
 * shenyu 客户端接口,用于方法上或类上
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ShenyuSpringMvcClient {
    // path 注册路径
    String path();
    
    // ruleName 规则名称
    String ruleName() default "";
    
    // desc 描述信息
    String desc() default "";

    // enabled是否启用
    boolean enabled() default true;
    
    // registerMetaData 注册元数据
    boolean  registerMetaData() default false;
}

它的使用如下:

  • 注册整个接口
@RestController
@RequestMapping("/test")
@ShenyuSpringMvcClient(path = "/test/**")  // 表示整个接口注册
public class HttpTestController {
 //......
}
  • 注册当前方法
@RestController
@RequestMapping("/order")
@ShenyuSpringMvcClient(path = "/order")
public class OrderController {

    /**
     * Save order dto.
     *
     * @param orderDTO the order dto
     * @return the order dto
     */
    @PostMapping("/save")
    @ShenyuSpringMvcClient(path = "/save", desc = "Save order") // 注册当前方法
    public OrderDTO save(@RequestBody final OrderDTO orderDTO) {
        orderDTO.setName("hello world save order");
        return orderDTO;
    }
  • publisher.publishEvent() 发布注册事件

该方法会将数据发送到Disruptor队列中,关于Disruptor队列更多细节这里不做更多介绍,这不影响分析注册的流程。

当数据发送后,Disruptor队列的消费者会处理数据,进行消费。

  • QueueConsumer 消费数据

QueueConsumer是一个消费者,它实现了WorkHandler接口,它的创建过程在providerManage.startup()逻辑中。WorkHandler接口是disruptor的数据消费接口,只有一个方法是onEvent()

package com.lmax.disruptor;

public interface WorkHandler<T> {
    void onEvent(T var1) throws Exception;
}

QueueConsumer重写了onEvent()方法,主要逻辑是生成消费任务,然后在线程池中去执行。


/**
 * 
 * 队列消费者
 */
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
    
	// 省略了其他逻辑

    @Override
    public void onEvent(final DataEvent<T> t) {
        if (t != null) {
            // 通过工厂创建队列消费任务
            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
            // 保存数据
            queueConsumerExecutor.setData(t.getData());
            // help gc
            t.setData(null);
            // 放在线程池中执行 消费任务
            executor.execute(queueConsumerExecutor);
        }
    }
}

QueueConsumerExecutor是在线程池中被执行的任务,它实现了Runnable接口,具体的实现类有两个:

  • RegisterClientConsumerExecutor:客户端消费者执行器;
  • RegisterServerConsumerExecutor:服务端消费者执行器。

顾名思义,一个负责处理客户端任务,一个负责处理服务端任务(服务端就是admin,在下文进行分析)。

  • RegisterClientConsumerExecutor 消费者执行器

重写的run()逻辑如下:


public final class RegisterClientConsumerExecutor extends QueueConsumerExecutor<DataTypeParent> {
    
	//...... 

    @Override
    public void run() {
        // 获取数据
        DataTypeParent dataTypeParent = getData();
        // 根据数据类型调用相应的处理器进行处理
        subscribers.get(dataTypeParent.getType()).executor(Lists.newArrayList(dataTypeParent));
    }
    
}

根据不同的数据类型调用不同的处理器去执行相应的任务。数据类型有两种,一个是元数据,记录客户端注册信息。一个是URI数据,记录客户端服务信息。

//数据类型
public enum DataType {
   // 元数据
    META_DATA,
    
   // URI数据
    URI,
}
  • ExecutorSubscriber#executor() 执行器订阅者

执行器订阅者也分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。

  • ShenyuClientMetadataExecutorSubscriber#executor()

客户端这边对元数据处理逻辑是:遍历元数据信息,调用接口方法persistInterface()完成数据的发布。

public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {
   
    //......
    
    @Override
    public DataType getType() {
        return DataType.META_DATA; // 元数据
    }
    
    @Override
    public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
        for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {
            // 调用接口方法persistInterface()完成数据的发布
            shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);
        }
    }
}
  • ShenyuClientRegisterRepository#persistInterface()

ShenyuClientRegisterRepository是一个接口,用于表示客户端数据注册,它的实现类目前有五种,每一种就表示一种注册方法。

  • ConsulClientRegisterRepository:通过Consul实现客户端注册;
  • EtcdClientRegisterRepository:通过Etcd实现客户端注册;
  • HttpClientRegisterRepository:通过Http实现客户端注册;
  • NacosClientRegisterRepository:通过Nacos实现客户端注册;
  • ZookeeperClientRegisterRepository:通过Zookeeper实现客户端注册;

从图中可以看出,注册中心的加载是通过SPI的方式完成的。这个在前面提到过了,在客户端通用配置文件中,通过指定配置文件中的属性完成具体的类加载。


/**
 * 加载 ShenyuClientRegisterRepository
 */
public final class ShenyuClientRegisterRepositoryFactory {
    
    private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();
    
    /**
     * 创建 ShenyuClientRegisterRepository
     */
    public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
        if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
            // 通过SPI的方式进行加载,类型由registerType决定
            ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
            //执行初始化操作
            result.init(shenyuRegisterCenterConfig);
            ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
            REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
            return result;
        }
        return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
    }
}

本文的源码分析是基于Http的方式进行注册,所以我们先分析HttpClientRegisterRepository,其他的注册方式后续再分析。

通过http的方式注册很简单,就是调用工具类发送http请求。注册元数据和URI都是调用的同一个方法doRegister(),指定接口和类型就好。

  • /shenyu-client/register-metadata:服务端提供的接口用于注册元数据。
  • /shenyu-client/register-uri: 服务端提供的接口用于注册URI。
@Join
public class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {
    // 服务端提供的接口用于注册元数据    
    private static final String META_PATH = "/shenyu-client/register-metadata";

    // 服务端提供的接口用于注册URI
    private static final String URI_PATH = "/shenyu-client/register-uri";

    //注册URI
    @Override
    public void persistURI(final URIRegisterDTO registerDTO) {
        doRegister(registerDTO, URI_PATH, Constants.URI);
    }
    
    //注册接口(就是元数据信息)
    @Override
    public void persistInterface(final MetaDataRegisterDTO metadata) {
        doRegister(metadata, META_PATH, META_TYPE);
    }
    
    // 进行注册
    private <T> void doRegister(final T t, final String path, final String type) {
        // 遍历admin服务列表(admin可能是集群)
        for (String server : serverList) {
            try {
                // 调用工具类发送 http 请求
                RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), server + path, type);
                return;
            } catch (Exception e) {
                LOGGER.error("register admin url :{} is fail, will retry", server);
            }
        }
    }
}

将数据序列化后,通过OkHttp发送数据。


public final class RegisterUtils {
   
   //...... 

    // 通过OkHttp发送数据
    public static void doRegister(final String json, final String url, final String type) throws IOException {
        String result = OkHttpTools.getInstance().post(url, json);
        if (Objects.equals(SUCCESS, result)) {
            LOGGER.info("{} client register success: {} ", type, json);
        } else {
            LOGGER.error("{} client register error: {} ", type, json);
        }
    }
}

至此,客户端通过http的方式注册元数据的逻辑就分析完了。小结一下:通过读取自定义的注解信息构造元数据,将数据发到Disruptor队列,然后从队列中消费数据,将消费者放到线程池中去执行,最终通过发送http请求到admin

客户端元数据注册流程的源码分析过程完成了,用流程图描述如下:

2.4 构建 URI 的 ContextRegisterListener

创建 ContextRegisterListener,负责客户端URI数据的构建和注册,它的创建是在配置文件中完成。

@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
public class ShenyuSpringMvcClientConfiguration {
     // ......
    
    //  创建 ContextRegisterListener
    @Bean
    public ContextRegisterListener contextRegisterListener(final ShenyuClientConfig clientConfig) {
        return new ContextRegisterListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()));
    }
}

ContextRegisterListener实现了ApplicationListener接口,并重写了onApplicationEvent()方法,当有Spring事件发生后,该方法会执行。

// 实现了ApplicationListener接口
public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent> {

     //......

    //通过构造函数完成实例化
    public ContextRegisterListener(final PropertiesConfig clientConfig) {
        // 读取 shenyu.client.http 配置信息
        Properties props = clientConfig.getProps();
        // isFull是否注册整个服务
        this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
        // contextPath上下文路径
        String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
        this.contextPath = contextPath;
        if (isFull) {
            if (StringUtils.isBlank(contextPath)) {
                String errorMsg = "http register param must config the contextPath";
                LOG.error(errorMsg);
                throw new ShenyuClientIllegalArgumentException(errorMsg);
            }
            this.contextPath = contextPath + "/**";
        }
        // port 客户端端口信息
        int port = Integer.parseInt(props.getProperty(ShenyuClientConstants.PORT));
        // appName 应用名称
        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
        // host信息
        this.host = props.getProperty(ShenyuClientConstants.HOST);
        this.port = port;
    }

    // 当有上下文刷新事件ContextRefreshedEvent发生时,该方法会执行
    @Override
    public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) {
        //保证该方法的内容只执行一次
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        // 如果是 isFull=true 代表注册整个服务,构建元数据并注册
        if (isFull) {
            publisher.publishEvent(buildMetaDataDTO());
        }
        
        // 构建URI数据并注册
        publisher.publishEvent(buildURIRegisterDTO());
    }

    // 构建URI数据
    private URIRegisterDTO buildURIRegisterDTO() {
        String host = IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host);
        return URIRegisterDTO.builder()
                .contextPath(this.contextPath)
                .appName(appName)
                .host(host)
                .port(port)
                .rpcType(RpcTypeEnum.HTTP.getName())
                .build();
    }

    // 构建元数据
    private MetaDataRegisterDTO buildMetaDataDTO() {
        String contextPath = this.contextPath;
        String appName = this.appName;
        return MetaDataRegisterDTO.builder()
                .contextPath(contextPath)
                .appName(appName)
                .path(contextPath)
                .rpcType(RpcTypeEnum.HTTP.getName())
                .enabled(true)
                .ruleName(contextPath)
                .build();
    }
}

在构造函数中主要是读取属性配置。

onApplicationEvent()方法是有Spring事件发生时会执行,这里的参数是ContextRefreshedEvent,表示上下文刷新事件。当Spring容器就绪后执行此处逻辑:如果是 isFull=true 代表注册整个服务,构建元数据并注册,在前面分析的后置处理器SpringMvcClientBeanPostProcessor中没有处理 isFull=true 的情况,所以在此处进行了处理。然后再构建URI数据并注册。

ContextRefreshedEventSpring内置事件。ApplicationContext被初始化或刷新时,该事件被触发。这也可以在 ConfigurableApplicationContext接口中使用 refresh() 方法来发生。此处的初始化是指:所有的Bean被成功装载,后处理Bean被检测并激活,所有Singleton Bean 被预实例化,ApplicationContext容器已就绪可用。

注册逻辑都是通过 publisher.publishEvent()完成。在前面都已经分析过了:向Disruptor队列写入数据,再从中消费数据,最后通过ExecutorSubscriber去处理。

  • ExecutorSubscriber#executor()

执行器订阅者分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。

这里是注册URI信息,所以执行类是ShenyuClientURIExecutorSubscriber

  • ShenyuClientURIExecutorSubscriber#executor()

主要逻辑是遍历URI数据集合,通过persistURI()方法实现数据注册。


public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
    
    //......
    
    @Override
    public DataType getType() {
        return DataType.URI; //数据类型是URI
    }
    
    // 注册URI数据
    @Override
    public void executor(final Collection<URIRegisterDTO> dataList) {
        for (URIRegisterDTO uriRegisterDTO : dataList) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            while (true) {
                try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {
                    break;
                } catch (IOException e) {
                    long sleepTime = 1000;
                    // maybe the port is delay exposed
                    if (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {
                        LOG.error("host:{}, port:{} connection failed, will retry",
                                uriRegisterDTO.getHost(), uriRegisterDTO.getPort());
                        // If the connection fails for a long time, Increase sleep time
                        if (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {
                            sleepTime = 10000;
                        }
                    }
                    try {
                        TimeUnit.MILLISECONDS.sleep(sleepTime);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }
            //添加hook,优雅停止客户端 
            ShenyuClientShutdownHook.delayOtherHooks();
            
            // 注册URI
            shenyuClientRegisterRepository.persistURI(uriRegisterDTO);
        }
    }
}

代码中的while(true)循环是为了保证客户端已经成功启动了,通过hostport可以连接上。

后面的逻辑是:添加hook函数,用于优雅停止客户端 。

通过persistURI()方法实现数据注册。整个逻辑也在前面分析过了,最终就是通过OkHttp客户端向shenyu-admin发起http,通过http的方式注册URI

分析到这里就将客户端的注册逻辑分析完了,将构建的元数据和URI数据发送到Disruptor队列,再从中消费,读取数据,通过httpadmin发送数据。

客户端URI注册流程的源码分析完成了,流程图如下:

3. 服务端注册流程

3.1 注册接口ShenyuHttpRegistryController

从前面的分析可以知道,服务端提供了注册的两个接口:

  • /shenyu-client/register-metadata:服务端提供的接口用于注册元数据。
  • /shenyu-client/register-uri: 服务端提供的接口用于注册URI。

这两个接口位于ShenyuHttpRegistryController中,它实现了ShenyuServerRegisterRepository接口,是服务端注册的实现类。它用@Join标记,表示通过SPI进行加载。

// shenuyu客户端接口
@RequestMapping("/shenyu-client")
@Join
public class ShenyuHttpRegistryController implements ShenyuServerRegisterRepository {

    private ShenyuServerRegisterPublisher publisher;

    @Override
    public void init(final ShenyuServerRegisterPublisher publisher, final ShenyuRegisterCenterConfig config) {
        this.publisher = publisher;
    }
    
    // 注册元数据
    @PostMapping("/register-metadata")
    @ResponseBody
    public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {
        publish(metaDataRegisterDTO);
        return ShenyuResultMessage.SUCCESS;
    }
        
   // 注册URI
    @PostMapping("/register-uri")
    @ResponseBody
    public String registerURI(@RequestBody final URIRegisterDTO uriRegisterDTO) {
        publish(uriRegisterDTO);
        return ShenyuResultMessage.SUCCESS;
    }

    // 发布注册事件
    private <T> void publish(final T t) {
        publisher.publish(Collections.singletonList(t));
    }
}

两个注册接口获取到数据好,就调用了publish()方法,把数据发布到Disruptor队列中。

  • ShenyuServerRegisterRepository接口

ShenyuServerRegisterRepository接口是服务注册接口,它有五个实现类,表示有五种注册方式:

  • ConsulServerRegisterRepository:通过Consul实现注册;
  • EtcdServerRegisterRepository:通过Etcd实现注册;
  • NacosServerRegisterRepository:通过Nacos实现注册;
  • ShenyuHttpRegistryController:通过Http实现注册;
  • ZookeeperServerRegisterRepository:通过Zookeeper实现注册。

具体用哪一种方式,是通过配置文件指定的,然后通过SPI进行加载。

shenyu-admin中的application.yml文件中配置注册方式,registerType指定注册类型,当用http进行注册时,serverLists不需要填写,更多配置说明可以参考官网 客户端接入配置

shenyu:
  register:
    registerType: http 
    serverLists: 
  • RegisterCenterConfiguration 加载配置

在引入相关依赖和属性配置后,启动shenyu-admin时,会先加载配置文件,和注册中心相关的配置文件类是RegisterCenterConfiguration

// 注册中心配置类
@Configuration
public class RegisterCenterConfiguration {
    // 读取配置属性
    @Bean
    @ConfigurationProperties(prefix = "shenyu.register")
    public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
        return new ShenyuRegisterCenterConfig();
    }
    
    //创建ShenyuServerRegisterRepository,用于服务端注册
    @Bean
    public ShenyuServerRegisterRepository shenyuServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig, final List<ShenyuClientRegisterService> shenyuClientRegisterService) {
        // 1.从配置属性中获取注册类型
        String registerType = shenyuRegisterCenterConfig.getRegisterType();
        // 2.通过注册类型,以SPI的方法加载实现类
        ShenyuServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuServerRegisterRepository.class).getJoin(registerType);
        // 3.获取publisher,向Disruptor队列中写数据
        RegisterServerDisruptorPublisher publisher = RegisterServerDisruptorPublisher.getInstance();
        // 4.注册Service, rpcType -> registerService
        Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, e -> e));
        // 5.事件发布的准备工作
        publisher.start(registerServiceMap);
        // 6.注册的初始化操作
        registerRepository.init(publisher, shenyuRegisterCenterConfig);
        return registerRepository;
    }
}

在配置类中生成了两个bean

  • shenyuRegisterCenterConfig:读取属性配置;

  • shenyuServerRegisterRepository:用于服务端注册。

在创建shenyuServerRegisterRepository的过程中,也进行了一系列的准备工作:

  • 1.从配置属性中获取注册类型。
  • 2.通过注册类型,以SPI的方法加载实现类:比如指定的类型是http,就会加载ShenyuHttpRegistryController
  • 3.获取publisher,向Disruptor队列中写数据。
  • 4.注册ServicerpcType -> registerService:获取注册的Service,每种rpc都有对应的Service。本文的客户端构建是通过springboot,属于http类型,还有其他客户端类型:dubboSpring CloudgRPC等。
  • 5.事件发布的准备工作:添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。
  • 6.注册的初始化操作:http类型的注册初始化操作就是保存publisher

  • RegisterServerDisruptorPublisher#publish()

服务端向Disruptor队列写入数据的发布者 ,通过单例模式构建。


public class RegisterServerDisruptorPublisher implements ShenyuServerRegisterPublisher {
    //私有属性
    private static final RegisterServerDisruptorPublisher INSTANCE = new RegisterServerDisruptorPublisher();

    //公开静态方法获取实例
    public static RegisterServerDisruptorPublisher getInstance() {
        return INSTANCE;
    }
    
   //事件发布的准备工作,添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。
    public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {
        //服务端注册工厂
        factory = new RegisterServerExecutorFactory();
        //添加URI数据订阅器
        factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));
        //添加元数据订阅器
        factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));
        //启动Disruptor队列
        providerManage = new DisruptorProviderManage(factory);
        providerManage.startup();
    }
    
    // 向队列中写入数据
    @Override
    public <T> void publish(final T data) {
        DisruptorProvider<Object> provider = providerManage.getProvider();
        provider.onData(f -> f.setData(data));
    }
    
    @Override
    public void close() {
        providerManage.getProvider().shutdown();
    }
}

配置文件的加载,可看作是注册中心服务端初始化流程,用图描述如下:

3.2 消费数据QueueConsumer

在前面分析了客户端disruptor队列消费数据的过。服务端也是一样的逻辑,只是其中执行任务的执行者变了。

QueueConsumer是一个消费者,它实现了WorkHandler接口,它的创建过程在providerManage.startup()逻辑中。WorkHandler接口是disruptor的数据消费接口,只有一个方法是onEvent()

package com.lmax.disruptor;

public interface WorkHandler<T> {
    void onEvent(T var1) throws Exception;
}

QueueConsumer重写了onEvent()方法,主要逻辑是生成消费任务,然后在线程池中去执行。

/**
 * 
 * 队列消费者
 */
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
    
	// 省略了其他逻辑

    @Override
    public void onEvent(final DataEvent<T> t) {
        if (t != null) {
            // 通过工厂创建队列消费任务
            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
            // 保存数据
            queueConsumerExecutor.setData(t.getData());
            // help gc
            t.setData(null);
            // 放在线程池中执行 消费任务
            executor.execute(queueConsumerExecutor);
        }
    }
}

QueueConsumerExecutor是在线程池中被执行的任务,它实现了Runnable接口,具体的实现类有两个:

  • RegisterClientConsumerExecutor:客户端消费者执行器;
  • RegisterServerConsumerExecutor:服务端消费者执行器。

顾名思义,一个负责处理客户端任务,一个负责处理服务端任务。

  • RegisterServerConsumerExecutor#run()

RegisterServerConsumerExecutor是服务端消费者执行器,它通过QueueConsumerExecutor间接实现了Runnable接口,并重写了run()方法。


public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<List<DataTypeParent>> {
   // ...

    @Override
    public void run() {
        //获取从disruptor队列中拿到的数据
        List<DataTypeParent> results = getData();
        // 数据校验
        results = results.stream().filter(data -> isValidData(data)).collect(Collectors.toList());
        if (CollectionUtils.isEmpty(results)) {
            return;
        }
        //根据类型执行操作
        getType(results).executor(results);
    }
    
    // 根据类型获取订阅者
    private ExecutorSubscriber getType(final List<DataTypeParent> list) {
        DataTypeParent result = list.get(0);
        return subscribers.get(result.getType());
    }
}

  • ExecutorSubscriber#executor()

执行器订阅者分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。

  • MetadataExecutorSubscriber#executor()

如果是注册元数据,则通过MetadataExecutorSubscriber#executor()实现:根据类型获取注册Service,调用register()

public class MetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {
 
    //......

    @Override
    public DataType getType() {
        return DataType.META_DATA;  // 元数据类型
    }

    @Override
    public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
        // 遍历元数据列表
        for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {
            // 根据类型获取注册Service
            ShenyuClientRegisterService shenyuClientRegisterService = this.shenyuClientRegisterService.get(metaDataRegisterDTO.getRpcType());
            Objects.requireNonNull(shenyuClientRegisterService);
            // 对元数据进行注册,加锁确保顺序执行,防止并发错误
            synchronized (ShenyuClientRegisterService.class) {
                shenyuClientRegisterService.register(metaDataRegisterDTO);
            }
        }
    }
}
  • URIRegisterExecutorSubscriber#executor()

如果是注册元数据,则通过URIRegisterExecutorSubscriber#executor()实现:构建URI数据,根据注册类型查找Service,通过registerURI方法实现注册。


public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
    //......
    
    @Override
    public DataType getType() {
        return DataType.URI; // URI数据类型
    }
    
    @Override
    public void executor(final Collection<URIRegisterDTO> dataList) {
        if (CollectionUtils.isEmpty(dataList)) {
            return;
        }
        // 构建URI数据类型,通过registerURI方法实现注册
        findService(dataList).ifPresent(service -> {
            Map<String, List<URIRegisterDTO>> listMap = buildData(dataList);
            listMap.forEach(service::registerURI);
        });
    }
    
    // 根据类型查找Service
    private Optional<ShenyuClientRegisterService> findService(final Collection<URIRegisterDTO> dataList) {
        return dataList.stream().map(dto -> shenyuClientRegisterService.get(dto.getRpcType())).findFirst();
    }
}

  • ShenyuClientRegisterService#register()

ShenyuClientRegisterService是注册方法接口,它有多个实现类:

  • AbstractContextPathRegisterService:抽象类,处理部分公共逻辑;
  • AbstractShenyuClientRegisterServiceImpl::抽象类,处理部分公共逻辑;
  • ShenyuClientRegisterDivideServiceImpldivide类,处理http注册类型;
  • ShenyuClientRegisterDubboServiceImpldubbo类,处理dubbo注册类型;
  • ShenyuClientRegisterGrpcServiceImplgRPC类,处理gRPC注册类型;
  • ShenyuClientRegisterMotanServiceImplMotan类,处理Motan注册类型;
  • ShenyuClientRegisterSofaServiceImplSofa类,处理Sofa注册类型;
  • ShenyuClientRegisterSpringCloudServiceImplSpringCloud类,处理SpringCloud注册类型;
  • ShenyuClientRegisterTarsServiceImplTars类,处理Tars注册类型;

从上面可以看出每种微服务都有对应的注册实现类,本文的源码分析是 以官方提供的 shenyu-examples-http 为例,是属http注册类型,所以元数据和URI数据的注册实现类是 ShenyuClientRegisterDivideServiceImpl

  • register(): 注册元数据
public String register(final MetaDataRegisterDTO dto) {
        // 1.注册选择器信息
        String selectorHandler = selectorHandler(dto);
        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
        // 2.注册规则信息
        String ruleHandler = ruleHandler();
        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
        ruleService.registerDefault(ruleDTO);
        // 3.注册元数据信息
        registerMetadata(dto);
        // 4.注册contextPath
        String contextPath = dto.getContextPath();
        if (StringUtils.isNotEmpty(contextPath)) {
            registerContextPath(dto);
        }
        return ShenyuResultMessage.SUCCESS;
    }

整个注册逻辑可以分为4个步骤:

  • 1.注册选择器信息
  • 2.注册规则信息
  • 3.注册元数据信息
  • 4.注册contextPath

admin这一侧通过客户端的元数据信息需要构建选择器、规则、元数据和ContextPath。具体的注册过程和细节处理跟rpc类型有关。我们就不再继续向下追踪了,对于注册中心的逻辑分析,跟踪到这里就够了。

服务端元数据注册流程的源码分析完了,流程图描述如下:

  • registerURI(): 注册URI数据
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
        if (CollectionUtils.isEmpty(uriList)) {
            return "";
        }
        // 对应的选择器是否存在
        SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
        if (Objects.isNull(selectorDO)) {
            return "";
        }
        // 处理选择器中的handler信息
        String handler = buildHandle(uriList, selectorDO);
        selectorDO.setHandle(handler);
        SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
        selectorData.setHandle(handler);
       
        // 更新数据库中的记录
        selectorService.updateSelective(selectorDO);
        // 发布事件
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
        return ShenyuResultMessage.SUCCESS;
    }

admin拿到URI数据后,主要是更新选择器中的handler信息,然后写入到数据库,最后发布事件通知网关。通知网关的逻辑是由数据同步操作完成,这在之前的文章中已经分析过了,就不再赘述。

服务端URI注册流程的源码分析完成了,用图描述如下:

至此,服务端注册流程也就分析完了,主要通过对外提供的接口,接受客户端的注册信息,然后写入到Disruptor队列,再从中消费数据,根据接收到的元数据和URI数据更新admin的选择器、规则、元数据和选择器的handler

4. 总结

本文主要对Apache ShenYu网关中的http注册模块进行了源码分析。涉及到的主要知识点,归纳如下:

  • 注册中心是为了将客户端信息注册到admin,方便流量筛选;
  • http注册是将客户端元数据信息和URI信息注册到admin
  • http服务的接入通过注解@ShenyuSpringMvcClient标识;
  • 注册信息的构建主要通过Spring的后置处理器BeanPostProcessor和应用监听器ApplicationListener
  • 注册类型的加载通过SPI完成;
  • 引入Disruptor队列是为了数据与操作解耦,以及数据缓冲。
  • 注册中心的实现采用了面向接口编程,使用模板方法、单例、观察者等设计模式。