Soul源码阅读系列(三)插件是如何被加载和执行的?

在上篇文章中,我们通过一个案例演示了http用户如何接入到Soul网关中,本文将探索其中的原理:

  • Soul如何加载插件?
  • 业务接口如何注册到soul admin中?
  • Divide插件的原理是什么?

Soul 如何加载插件?

首先,我们来看看Soul是如何加载各个插件的?在上一篇文章中,我们看到了Divide插件的调用过程:

  • SoulWebHandler:它实现了WebHandler,重写了handle()方法,用于处理Soul网关中所有的请求。
  • DefaultSoulPluginChain:插件链执行类,以责任链的设计模式处理所有插件。
  • AbstractSoulPlugin:多个插件的父类,以模板方法设计模式实现各种插件类型。
  • DividePlugindivide插件,用于处理http请求。

通过查看源码,可以发现在SoulWebHandler中,是通过构造器的方式将所有插件设置进来(在Soul网关中使用了Reactor并行编程,代码中的scheduler就是由reactor包实现,此处的作用是创建线程池)。那么又会有两个问题:一是 插件是如何生成的?二是 SoulWebHandler是何时被创建的?

public final class SoulWebHandler implements WebHandler {
    //...   
    
    public SoulWebHandler(final List<SoulPlugin> plugins) {
        //保存所有插件
        this.plugins = plugins;
        String schedulerType = System.getProperty("soul.scheduler.type", "fixed");
        if (Objects.equals(schedulerType, "fixed")) {
            //获取可用线程数
            int threads = Integer.parseInt(System.getProperty(
                    "soul.work.threads", "" + Math.max((Runtime.getRuntime().availableProcessors() << 1) + 1, 16)));
            //创建CPU内核数量的多线程池
            scheduler = Schedulers.newParallel("soul-work-threads", threads);
        } else {
            //无限制的弹性线程池,可以一直创建线程
            scheduler = Schedulers.elastic();
        }
    }
 
    //...    
}

IDEA编辑器中点击SoulWebHandler,查看被调用的地方,发现是在SoulConfiguration这个配置文件中,通过注解的方式创建了webHandler这个bean。所有的插件plugins也是作为一个参数传进来的,还得继续向上追踪。(由于版面有限,源码中省略了部分逻辑)

@Configuration声明一个类是配置类,在Spring Boot 启动时会加载配置类。

@Bean注解作用于方法上,会从Spring容器中将同类型的SoulPlugin自动注入进来。

@Configuration
public class SoulConfiguration {
    //...
    
    @Bean("webHandler")
    public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
        List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
        //排序
        final List<SoulPlugin> soulPlugins = pluginList.stream()
                .sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
        soulPlugins.forEach(soulPlugin -> log.info("load plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
        return new SoulWebHandler(soulPlugins);
    }
 //...   
}

那到这里,就清楚了:只要实现了SoulPlugin的类就会被注入进来,所以再看看SoulPlugin的实现类有哪些?

通过编辑器发现有 34 个类实现了SoulPlugin接口,其中有AbstractSoulPlugin,还有实现http请求接入的DividePlugin,还有其他的插件,我们后面再陆续探究。

现在还有问题是:各个插件是何时被创建的?我们以DividePlugin插件为例,通过IDEA编辑器点击DividePlugin,发现在DividePluginConfiguration中被创建了。

DividePluginConfiguration也是一个配置文件,在里面配置了dividePlugin这个bean,它的类型是SoulPlugin,所以在SoulConfiguration会自动注入到soulWebHandler()方法中。

@Configuration
public class DividePluginConfiguration {

    @Bean
    public SoulPlugin dividePlugin() {
        return new DividePlugin();
    }
    //...
}

DividePluginConfiguration是通过spring boot starter的方式自动加载的。

spring boot starter可以自动加载依赖,它在Spring Boot启动时,自动加载资源文件夹META-INF\spring.factories中配置的类。

分析到这里,就知道了Soul是如何加载插件的:在soul网关启动的时候,spring boot starter自动加载相关配置类,创建插件bean,然后在SoulWebHandler中将插件bean注入进来,保存到List中。用一张图来描述一下上述过程:

业务接口如何注册到 soul admin 中?

知道了插件是怎么被加载到soul网关中的之后,接下来再看看业务接口是如何注册到soul admin中?如下图所示,展示了divide插件对应的选择器列表和选择器规则列表,这些选择器和规则我们并没有自己添加上去,为什么就会存在呢?

是因为在业务系统中进行了配置:

  • adminUrlsoul-admin的地址,用于将业务系统的接口注册到soul-admin后台管理中去;
  • port:业务系统的端口;
  • contextPath:业务系统在网关中的上下文名称;
  • appName:业务系统的名称;
  • full:是否代理全部,如果是true,则代理业务系统的所有接口。
soul:
  http:
    adminUrl: http://localhost:9095 #soul-admin的地址
    port: 8188
    contextPath: /http
    appName: http
    full: false

还有一个关键注解是@SoulSpringMvcClient,业务系统在启动时,会读取这个注解进行处理。处理类是SpringMvcClientBeanPostProcessor,实现了BeanPostProcessor,是一个后置处理器,在bean的创建前后,分别有方法进行处理。

BeanPostProcessor 接口是 Spring 中的一个后置处理器接口,它的作用主要是如果我们需要在 Spring 容器完成 Bean 的实例化、配置和其他的初始化前后添加一些自己的逻辑处理,就可以实现该接口的,然后注册到容器中。

SpringMvcClientBeanPostProcessor部分源码如下,它做了下面几件事情:

  • bean初始化前,创建线程池,其中的线程用于将接口信息发送到soul-admin
  • bean初始化后,处理SoulSpringMvcClient注解;
  • 如果SoulSpringMvcClient注解作用于类上,那么就表示该类的所有接口都被网关代理,通过线程池中的线程将接口类发送到soul-admin中进行注册;
  • 如果SoulSpringMvcClient注解作用于方法上,那么就将该方法的接口信息通过线程池中的线程发送到soul-admin中进行注册。
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
	//..

	//在bean初始化前,创建线程池,其中的线程用于将接口信息发送到soul-admin中去
    public SpringMvcClientBeanPostProcessor(final SoulSpringMvcConfig soulSpringMvcConfig) {
		//...
        url = soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register";
        executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
        //full是否为 true
        if (soulSpringMvcConfig.isFull()) {
            return bean;
        }
        //获取 Controller 注解
        Controller controller = AnnotationUtils.findAnnotation(bean.getClass(), Controller.class);
        //获取 RequestMapping 注解
        RequestMapping requestMapping = AnnotationUtils.findAnnotation(bean.getClass(), RequestMapping.class);
        if (controller != null || requestMapping != null) {
            //获取 SoulSpringMvcClient 注解
            SoulSpringMvcClient clazzAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), SoulSpringMvcClient.class);
            String prePath = "";
            //是否作用于类上
            if (Objects.nonNull(clazzAnnotation)) {
                if (clazzAnnotation.path().indexOf("*") > 1) {
                    String finalPrePath = prePath;
                    //将接口信息发送到soul-amin
                    executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(clazzAnnotation, finalPrePath), url,
                            RpcTypeEnum.HTTP));
                    return bean;
                }
                prePath = clazzAnnotation.path();
            }
            
            final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
            for (Method method : methods) {
                SoulSpringMvcClient soulSpringMvcClient = AnnotationUtils.findAnnotation(method, SoulSpringMvcClient.class);
                //SoulSpringMvcClient注解是否作用于方法上
                if (Objects.nonNull(soulSpringMvcClient)) {
                    String finalPrePath = prePath;
                     //将接口信息发送到soul-amin
                    executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(soulSpringMvcClient, finalPrePath), url,
                            RpcTypeEnum.HTTP));
                }
            }
        }
        return bean;
    }
    
    //...
}

通过debug可以看到发送的某个方法接口信息,包括业务系统名称,上下文名称,方法请求路径,是否启用等等。

{"appName":"http","context":"/http","path":"/http/test/**","pathDesc":"","rpcType":"http","host":"192.168.236.75","port":8188,"ruleName":"/http/test/**","enabled":true,"registerMetaData":false}

发送的路径是soul-admin后端的一个接口:

http://localhost:9095/soul-client/springmvc-register

那接着再跟踪一下springmvc-register这个接口又做了什么?

进入它的实现类(下面的源码只保留的主要逻辑),开始处理选择器信息,处理规则信息,都处理成功了,就返回一个成功的信息。

    public String registerSpringMvc(final SpringMvcRegisterDTO dto) {
		//...
        String selectorId = handlerSpringMvcSelector(dto);
        handlerSpringMvcRule(selectorId, dto);
        return SoulResultMessage.SUCCESS;
    }

选择器和规则的处理逻辑是相似的:先更新soul-admin数据中的选择器或规则信息;然后将选择器或规则信息通过发布事件的方式发送到soul网关。

private String handlerSpringMvcSelector(final SpringMvcRegisterDTO dto) {
        //...
        //更新选择器信息,保存到数据库
        selectorMapper.updateSelective(selectorDO);
        //保存业务系统信息
        upstreamCheckService.submit(contextPath, addDivideUpstream);
        //发布事件
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
        }
        return selectorId;
    }

发布事件的操作比较复杂,涉及到数据同步的原理,我们后面再专门进行分析,今天就不再深入了。

分析到这里,终于可以回答 业务接口如何注册到soul admin 中? 这个问题了。在业务系统中,将想要被soul网关代理的接口,加上@SoulSpringMvcClient注解,当系统启动时,将接口信息通过多线程的方式(基于httpPOST请求)发送到soul-admin后台管理系统中。在soul-admin中,一方面将接口信息保存到自己的数据库,另一方面发布接口信息事件到soul网关,这样soul网关就知道哪些接口可以被代理,哪些接口直接跳过。再用图片描述一下处理过程:

Divide 插件的原理是什么?

在上面的分析中明白了插件的加载过程和接口信息的注册过程,现在来解决最后一个问题:divide插件的执行原理。

由前面的分析文章可以知道,网关的所有请求最终都会来到 SoulWebHandler 进行处理。handle()相当于请求入口,参数 ServerWebExchange携带了请求信息。在这个方法里面,创建了一个对象保存所有插件,并将请求交给了之前插件的线程池中。

public final class SoulWebHandler implements WebHandler {   
	//...
    @Override
    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
        return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler);
    }
}

DefaultSoulPluginChain这个类中的execute()方法采用了责任链的设计模式,依次处理所有插件,所以DividePlugin会在这里被执行。


    private static class DefaultSoulPluginChain implements SoulPluginChain {
        //...
        @Override
        public Mono<Void> execute(final ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < plugins.size()) {
                    SoulPlugin plugin = plugins.get(this.index++);
                    Boolean skip = plugin.skip(exchange);
                    if (skip) {
                        return this.execute(exchange);
                    }
                    return plugin.execute(exchange, this);
                }
                return Mono.empty();
            });
        }
    }
}

DividePlugin继承了AbstractSoulPlugin,所以会执行plugin.execute()方法,这方法是许多插件的共有方法,也就是模板方法。插件类的继承关系使用了模板方法设计模式,共有方法是execute(),抽象方法是doExecute()

共有方法execute()主要作用是:匹配插件,匹配选择器,匹配规则。有一个没有匹配上,就去处理下一个插件。soul网关的内存中保存了插件,选择器和规则信息,这些信息的实时更新是从soul-admin发布事件同步过来的。

  • 匹配插件:从soul网关的内存中获取插件信息,判断插件是否存在,是否被启用 ,如果没有找到就执行下一个插件。
  • 匹配选择器:从soul网关的内存中获取选择器信息,判断选择器信息能否匹配成功,如果没有匹配上就执行下一个插件。
  • 匹配规则:从soul网关的内存中获取规则信息,判断规则信息能否匹配成功,如果没有匹配上就执行下一个插件。
public abstract class AbstractSoulPlugin implements SoulPlugin {
    //..
        protected abstract Mono<Void> doExecute(ServerWebExchange exchange, SoulPluginChain chain, SelectorData selector, RuleData rule);

   //...
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        String pluginName = named();
       //从soul网关的内存中获取插件信息
        final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
      //插件是否存在,是否被启用    
    if (pluginData != null && pluginData.getEnabled()) {
          //从soul网关的内存中获取选择器信息
            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
           //如果选择器信息不存在,就处理下一个插件
            if (CollectionUtils.isEmpty(selectors)) {
                return handleSelectorIsNull(pluginName, exchange, chain);
            }
        //插件是否可以匹配上
            final SelectorData selectorData = matchSelector(exchange, selectors);
            if (Objects.isNull(selectorData)) {
                return handleSelectorIsNull(pluginName, exchange, chain);
            }
        //记录选择器日志
        selectorLog(selectorData, pluginName);
        
        //从soul网关的内存中获取规则信息
            final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
        //规则不存在,则执行下一个插件    
        if (CollectionUtils.isEmpty(rules)) {
                return handleRuleIsNull(pluginName, exchange, chain);
            }
            RuleData rule;
            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
                //get last
                rule = rules.get(rules.size() - 1);
            } else {
                //是否匹配规则
                rule = matchRule(exchange, rules);
            }
            if (Objects.isNull(rule)) {
                return handleRuleIsNull(pluginName, exchange, chain);
            }
        //记录规则日志
            ruleLog(rule, pluginName);
        //执行每个插件自己的执行逻辑
            return doExecute(exchange, chain, selectorData, rule);
        }
    //处理下一个插件
        return chain.execute(exchange);
    }
    //..
}

DividePlugin中的doExecute()方法的主要功能是:

  • 对业务系统实现负载均衡。soul网关自己实现了负载均衡,目前支持:轮询,随机,哈希三种方式。
  • 构建http请求,用于请求真正的http业务接口。但是执行http操作的功能是统一交给了WebClientPlugin
@Slf4j
public class DividePlugin extends AbstractSoulPlugin {

    @Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
       //...
        final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
        //实现负载均衡
        DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
        if (Objects.isNull(divideUpstream)) {
            log.error("divide has no upstream");
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        //构建http请求
        // set the http url
        String domain = buildDomain(divideUpstream);
        String realURL = buildRealURL(domain, soulContext, exchange);
        exchange.getAttributes().put(Constants.HTTP_URL, realURL);
        // set the http timeout
        exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
        exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
        return chain.execute(exchange);
    }
    //...
}

分析到这里就算是弄清楚了DividePlugin的执行原理了:匹配插件,匹配选择器,匹配规则,负载均衡,构建http请求。

至此,我们就分析完了开始提到的三个问题。弄清楚了插件加载过程,业务接口注册过程,Divide插件执行原理。