Soul源码阅读系列(三)插件是如何被加载和执行的?
15 Mar 2021 | Soul |在上篇文章中,我们通过一个案例演示了http
用户如何接入到Soul
网关中,本文将探索其中的原理:
Soul
如何加载插件?- 业务接口如何注册到
soul admin
中? Divide
插件的原理是什么?
Soul 如何加载插件?
首先,我们来看看Soul
是如何加载各个插件的?在上一篇文章中,我们看到了Divide
插件的调用过程:
SoulWebHandler
:它实现了WebHandler
,重写了handle()
方法,用于处理Soul
网关中所有的请求。DefaultSoulPluginChain
:插件链执行类,以责任链的设计模式处理所有插件。AbstractSoulPlugin
:多个插件的父类,以模板方法设计模式实现各种插件类型。DividePlugin
:divide
插件,用于处理http
请求。
通过查看源码,可以发现在SoulWebHandler
中,是通过构造器的方式将所有插件设置进来(在Soul
网关中使用了Reactor
并行编程,代码中的scheduler
就是由reactor
包实现,此处的作用是创建线程池)。那么又会有两个问题:一是 插件是如何生成的?二是 SoulWebHandler
是何时被创建的?
public final class SoulWebHandler implements WebHandler {
//...
public SoulWebHandler(final List<SoulPlugin> plugins) {
//保存所有插件
this.plugins = plugins;
String schedulerType = System.getProperty("soul.scheduler.type", "fixed");
if (Objects.equals(schedulerType, "fixed")) {
//获取可用线程数
int threads = Integer.parseInt(System.getProperty(
"soul.work.threads", "" + Math.max((Runtime.getRuntime().availableProcessors() << 1) + 1, 16)));
//创建CPU内核数量的多线程池
scheduler = Schedulers.newParallel("soul-work-threads", threads);
} else {
//无限制的弹性线程池,可以一直创建线程
scheduler = Schedulers.elastic();
}
}
//...
}
在IDEA
编辑器中点击SoulWebHandler
,查看被调用的地方,发现是在SoulConfiguration
这个配置文件中,通过注解的方式创建了webHandler
这个bean
。所有的插件plugins
也是作为一个参数传进来的,还得继续向上追踪。(由于版面有限,源码中省略了部分逻辑)
@Configuration
声明一个类是配置类,在Spring Boot
启动时会加载配置类。
@Bean
注解作用于方法上,会从Spring
容器中将同类型的SoulPlugin
自动注入进来。
@Configuration
public class SoulConfiguration {
//...
@Bean("webHandler")
public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
//排序
final List<SoulPlugin> soulPlugins = pluginList.stream()
.sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
soulPlugins.forEach(soulPlugin -> log.info("load plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
return new SoulWebHandler(soulPlugins);
}
//...
}
那到这里,就清楚了:只要实现了SoulPlugin
的类就会被注入进来,所以再看看SoulPlugin
的实现类有哪些?
通过编辑器发现有 34
个类实现了SoulPlugin
接口,其中有AbstractSoulPlugin
,还有实现http
请求接入的DividePlugin
,还有其他的插件,我们后面再陆续探究。
现在还有问题是:各个插件是何时被创建的?我们以DividePlugin
插件为例,通过IDEA
编辑器点击DividePlugin
,发现在DividePluginConfiguration
中被创建了。
DividePluginConfiguration
也是一个配置文件,在里面配置了dividePlugin
这个bean,它的类型是SoulPlugin
,所以在SoulConfiguration
会自动注入到soulWebHandler()
方法中。
@Configuration
public class DividePluginConfiguration {
@Bean
public SoulPlugin dividePlugin() {
return new DividePlugin();
}
//...
}
DividePluginConfiguration
是通过spring boot starter
的方式自动加载的。
spring boot starter
可以自动加载依赖,它在Spring Boot
启动时,自动加载资源文件夹META-INF\spring.factories
中配置的类。
分析到这里,就知道了Soul
是如何加载插件的:在soul
网关启动的时候,spring boot starter
自动加载相关配置类,创建插件bean
,然后在SoulWebHandler
中将插件bean
注入进来,保存到List
中。用一张图来描述一下上述过程:
业务接口如何注册到 soul admin 中?
知道了插件是怎么被加载到soul
网关中的之后,接下来再看看业务接口是如何注册到soul admin
中?如下图所示,展示了divide
插件对应的选择器列表和选择器规则列表,这些选择器和规则我们并没有自己添加上去,为什么就会存在呢?
是因为在业务系统中进行了配置:
adminUrl
:soul-admin
的地址,用于将业务系统的接口注册到soul-admin
后台管理中去;port
:业务系统的端口;contextPath
:业务系统在网关中的上下文名称;appName
:业务系统的名称;full
:是否代理全部,如果是true
,则代理业务系统的所有接口。
soul:
http:
adminUrl: http://localhost:9095 #soul-admin的地址
port: 8188
contextPath: /http
appName: http
full: false
还有一个关键注解是@SoulSpringMvcClient
,业务系统在启动时,会读取这个注解进行处理。处理类是SpringMvcClientBeanPostProcessor
,实现了BeanPostProcessor
,是一个后置处理器,在bean
的创建前后,分别有方法进行处理。
BeanPostProcessor
接口是Spring
中的一个后置处理器接口,它的作用主要是如果我们需要在Spring
容器完成Bean
的实例化、配置和其他的初始化前后添加一些自己的逻辑处理,就可以实现该接口的,然后注册到容器中。
SpringMvcClientBeanPostProcessor
部分源码如下,它做了下面几件事情:
- 在
bean
初始化前,创建线程池,其中的线程用于将接口信息发送到soul-admin
; - 在
bean
初始化后,处理SoulSpringMvcClient
注解; - 如果
SoulSpringMvcClient
注解作用于类上,那么就表示该类的所有接口都被网关代理,通过线程池中的线程将接口类发送到soul-admin
中进行注册; - 如果
SoulSpringMvcClient
注解作用于方法上,那么就将该方法的接口信息通过线程池中的线程发送到soul-admin
中进行注册。
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
//..
//在bean初始化前,创建线程池,其中的线程用于将接口信息发送到soul-admin中去
public SpringMvcClientBeanPostProcessor(final SoulSpringMvcConfig soulSpringMvcConfig) {
//...
url = soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register";
executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
}
@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
//full是否为 true
if (soulSpringMvcConfig.isFull()) {
return bean;
}
//获取 Controller 注解
Controller controller = AnnotationUtils.findAnnotation(bean.getClass(), Controller.class);
//获取 RequestMapping 注解
RequestMapping requestMapping = AnnotationUtils.findAnnotation(bean.getClass(), RequestMapping.class);
if (controller != null || requestMapping != null) {
//获取 SoulSpringMvcClient 注解
SoulSpringMvcClient clazzAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), SoulSpringMvcClient.class);
String prePath = "";
//是否作用于类上
if (Objects.nonNull(clazzAnnotation)) {
if (clazzAnnotation.path().indexOf("*") > 1) {
String finalPrePath = prePath;
//将接口信息发送到soul-amin
executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(clazzAnnotation, finalPrePath), url,
RpcTypeEnum.HTTP));
return bean;
}
prePath = clazzAnnotation.path();
}
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
for (Method method : methods) {
SoulSpringMvcClient soulSpringMvcClient = AnnotationUtils.findAnnotation(method, SoulSpringMvcClient.class);
//SoulSpringMvcClient注解是否作用于方法上
if (Objects.nonNull(soulSpringMvcClient)) {
String finalPrePath = prePath;
//将接口信息发送到soul-amin
executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(soulSpringMvcClient, finalPrePath), url,
RpcTypeEnum.HTTP));
}
}
}
return bean;
}
//...
}
通过debug
可以看到发送的某个方法接口信息,包括业务系统名称,上下文名称,方法请求路径,是否启用等等。
{"appName":"http","context":"/http","path":"/http/test/**","pathDesc":"","rpcType":"http","host":"192.168.236.75","port":8188,"ruleName":"/http/test/**","enabled":true,"registerMetaData":false}
发送的路径是soul-admin
后端的一个接口:
http://localhost:9095/soul-client/springmvc-register
那接着再跟踪一下springmvc-register
这个接口又做了什么?
进入它的实现类(下面的源码只保留的主要逻辑),开始处理选择器信息,处理规则信息,都处理成功了,就返回一个成功的信息。
public String registerSpringMvc(final SpringMvcRegisterDTO dto) {
//...
String selectorId = handlerSpringMvcSelector(dto);
handlerSpringMvcRule(selectorId, dto);
return SoulResultMessage.SUCCESS;
}
选择器和规则的处理逻辑是相似的:先更新soul-admin
数据中的选择器或规则信息;然后将选择器或规则信息通过发布事件的方式发送到soul
网关。
private String handlerSpringMvcSelector(final SpringMvcRegisterDTO dto) {
//...
//更新选择器信息,保存到数据库
selectorMapper.updateSelective(selectorDO);
//保存业务系统信息
upstreamCheckService.submit(contextPath, addDivideUpstream);
//发布事件
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
}
return selectorId;
}
发布事件的操作比较复杂,涉及到数据同步的原理,我们后面再专门进行分析,今天就不再深入了。
分析到这里,终于可以回答 业务接口如何注册到soul admin 中?
这个问题了。在业务系统中,将想要被soul
网关代理的接口,加上@SoulSpringMvcClient
注解,当系统启动时,将接口信息通过多线程的方式(基于http
的POST
请求)发送到soul-admin
后台管理系统中。在soul-admin
中,一方面将接口信息保存到自己的数据库,另一方面发布接口信息事件到soul
网关,这样soul
网关就知道哪些接口可以被代理,哪些接口直接跳过。再用图片描述一下处理过程:
Divide 插件的原理是什么?
在上面的分析中明白了插件的加载过程和接口信息的注册过程,现在来解决最后一个问题:divide
插件的执行原理。
由前面的分析文章可以知道,网关的所有请求最终都会来到 SoulWebHandler
进行处理。handle()
相当于请求入口,参数 ServerWebExchange
携带了请求信息。在这个方法里面,创建了一个对象保存所有插件,并将请求交给了之前插件的线程池中。
public final class SoulWebHandler implements WebHandler {
//...
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler);
}
}
在DefaultSoulPluginChain
这个类中的execute()
方法采用了责任链的设计模式,依次处理所有插件,所以DividePlugin
会在这里被执行。
private static class DefaultSoulPluginChain implements SoulPluginChain {
//...
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
SoulPlugin plugin = plugins.get(this.index++);
Boolean skip = plugin.skip(exchange);
if (skip) {
return this.execute(exchange);
}
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
}
}
DividePlugin
继承了AbstractSoulPlugin
,所以会执行plugin.execute()
方法,这方法是许多插件的共有方法,也就是模板方法。插件类的继承关系使用了模板方法设计模式,共有方法是execute()
,抽象方法是doExecute()
。
共有方法execute()
主要作用是:匹配插件,匹配选择器,匹配规则。有一个没有匹配上,就去处理下一个插件。soul
网关的内存中保存了插件,选择器和规则信息,这些信息的实时更新是从soul-admin
发布事件同步过来的。
- 匹配插件:从soul网关的内存中获取插件信息,判断插件是否存在,是否被启用 ,如果没有找到就执行下一个插件。
- 匹配选择器:从soul网关的内存中获取选择器信息,判断选择器信息能否匹配成功,如果没有匹配上就执行下一个插件。
- 匹配规则:从soul网关的内存中获取规则信息,判断规则信息能否匹配成功,如果没有匹配上就执行下一个插件。
public abstract class AbstractSoulPlugin implements SoulPlugin {
//..
protected abstract Mono<Void> doExecute(ServerWebExchange exchange, SoulPluginChain chain, SelectorData selector, RuleData rule);
//...
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
String pluginName = named();
//从soul网关的内存中获取插件信息
final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
//插件是否存在,是否被启用
if (pluginData != null && pluginData.getEnabled()) {
//从soul网关的内存中获取选择器信息
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
//如果选择器信息不存在,就处理下一个插件
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIsNull(pluginName, exchange, chain);
}
//插件是否可以匹配上
final SelectorData selectorData = matchSelector(exchange, selectors);
if (Objects.isNull(selectorData)) {
return handleSelectorIsNull(pluginName, exchange, chain);
}
//记录选择器日志
selectorLog(selectorData, pluginName);
//从soul网关的内存中获取规则信息
final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
//规则不存在,则执行下一个插件
if (CollectionUtils.isEmpty(rules)) {
return handleRuleIsNull(pluginName, exchange, chain);
}
RuleData rule;
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
//get last
rule = rules.get(rules.size() - 1);
} else {
//是否匹配规则
rule = matchRule(exchange, rules);
}
if (Objects.isNull(rule)) {
return handleRuleIsNull(pluginName, exchange, chain);
}
//记录规则日志
ruleLog(rule, pluginName);
//执行每个插件自己的执行逻辑
return doExecute(exchange, chain, selectorData, rule);
}
//处理下一个插件
return chain.execute(exchange);
}
//..
}
在DividePlugin
中的doExecute()
方法的主要功能是:
- 对业务系统实现负载均衡。
soul
网关自己实现了负载均衡,目前支持:轮询,随机,哈希三种方式。 - 构建
http
请求,用于请求真正的http
业务接口。但是执行http
操作的功能是统一交给了WebClientPlugin
。
@Slf4j
public class DividePlugin extends AbstractSoulPlugin {
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
//...
final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
//实现负载均衡
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
if (Objects.isNull(divideUpstream)) {
log.error("divide has no upstream");
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
//构建http请求
// set the http url
String domain = buildDomain(divideUpstream);
String realURL = buildRealURL(domain, soulContext, exchange);
exchange.getAttributes().put(Constants.HTTP_URL, realURL);
// set the http timeout
exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
return chain.execute(exchange);
}
//...
}
分析到这里就算是弄清楚了DividePlugin
的执行原理了:匹配插件,匹配选择器,匹配规则,负载均衡,构建http
请求。
至此,我们就分析完了开始提到的三个问题。弄清楚了插件加载过程,业务接口注册过程,Divide
插件执行原理。