26 Jan 2021 |
Soul |
本篇文章分析的是@SoulSpringMvcClient
注解,它的作用是:用于标记SpringMvc
服务中的接口,被标记的接口在系统启动的是时候,将当前接口注册到soul-admin
后台中。使用方式如下:
@RestController
@RequestMapping("/order")
@SoulSpringMvcClient(path = "/order")
public class OrderController {
//省略了其他代码
@GetMapping("/findById")
@SoulSpringMvcClient(path = "/findById", desc = "Find by id")
public OrderDTO findById(@RequestParam("id") final String id) {
OrderDTO orderDTO = new OrderDTO();
orderDTO.setId(id);
orderDTO.setName("hello world findById");
return orderDTO;
}
}
它的定义如下:
- 注解可以使用在类上,也可以使用在方法上;
path
:表示接口的路径;
ruleName
:表示规则名称;
desc
:接口描述信息;
rpcType
:传输类型,默认是http
,在soul
网关中,还有SpringCloud
,Dubbo
,Sofa
等类型;
enabled
:是否开启(是否被网关代理),默认是true
:
registerMetaData
:是否注册元数据信息,默认是false
。
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface SoulSpringMvcClient {
/**
* Path string.
*
* @return the string
*/
String path();
/**
* Rule name string.
*
* @return the string
*/
String ruleName() default "";
/**
* Desc string.
*
* @return String string
*/
String desc() default "";
/**
* Rpc type string.
*
* @return the string
*/
String rpcType() default "http";
/**
* Enabled boolean.
*
* @return the boolean
*/
boolean enabled() default true;
/**
* Register meta data boolean.
*
* @return the boolean
*/
boolean registerMetaData() default false;
}
注册过程在启动的时候完成,以soul
官网中的soul-examples-http
为例,它的案例演示请参考前面的文章。在配置文件中配置soul-admin
后台信息:
soul:
http:
adminUrl: http://localhost:9095 #soul-admin后台地址
port: 8188 # soul-admin后台端口
contextPath: /http # 当前服务的上下文路径
appName: http # 当前服务的名称
full: false # 是否全部被代理,如果为true,那么这个服务的所有接口都会被代理,就不用加注解了。soul-admin端也就不需要规则,只需要一个选择器。
业务系统启动时,会通过start
加载配置文件,SpringMvcClientBeanPostProcessor
这个后置处理器也会被加载。
对@SoulSpringMvcClient
注解的处理就在这个后置处理器中:
- 配置如果是
full=true
,就返回,表示处理所有;
- 获取当前
Bean
的Controller
,RestController
,RequestMapping
,注解,判断是否包含"*"
。如果是,就代理所有方法,注册当前Bean
;
- 获取所有方法,注册每个含有
@SoulSpringMvcClient
注解的方法。
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
//省略了其他代码
@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
//配置是否为true
if (soulSpringMvcConfig.isFull()) {
return bean;
}
Controller controller = AnnotationUtils.findAnnotation(bean.getClass(), Controller.class);
RestController restController = AnnotationUtils.findAnnotation(bean.getClass(), RestController.class);
RequestMapping requestMapping = AnnotationUtils.findAnnotation(bean.getClass(), RequestMapping.class);
//处理类
if (controller != null || restController != null || requestMapping != null) {
SoulSpringMvcClient clazzAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), SoulSpringMvcClient.class);
String prePath = "";
if (Objects.nonNull(clazzAnnotation)) {
if (clazzAnnotation.path().indexOf("*") > 1) {
String finalPrePath = prePath;
//注册
executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(clazzAnnotation, finalPrePath), url,
RpcTypeEnum.HTTP));
return bean;
}
prePath = clazzAnnotation.path();
}
//遍历方法
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
for (Method method : methods) {
SoulSpringMvcClient soulSpringMvcClient = AnnotationUtils.findAnnotation(method, SoulSpringMvcClient.class);
if (Objects.nonNull(soulSpringMvcClient)) {
String finalPrePath = prePath;
//注册
executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(soulSpringMvcClient, finalPrePath), url,
RpcTypeEnum.HTTP));
}
}
}
return bean;
}
//省略了其他代码
}
注册的逻辑就是发请求信息到soul-admin
中,这个交给了线程池。什么时候发起请求,是线程池在调度处理,发请求的方式是通过OkHttp
发起post
请求。
//org.dromara.soul.client.common.utils.RegisterUtils#doRegister
public static void doRegister(final String json, final String url, final RpcTypeEnum rpcTypeEnum) {
try {
//通过OkHttp发起post请求
String result = OkHttpTools.getInstance().post(url, json);
if (AdminConstants.SUCCESS.equals(result)) {
log.info("{} client register success: {} ", rpcTypeEnum.getName(), json);
} else {
log.error("{} client register error: {} ", rpcTypeEnum.getName(), json);
}
} catch (IOException e) {
log.error("cannot register soul admin param, url: {}, request body: {}", url, json, e);
}
}
在soul-admin
端有个接口来接受当前的注解配置信息,这里面的逻辑就是创建或者更新选择器或插件信息。
@RestController
@RequestMapping("/soul-client")
public class SoulClientController {
//省略了其他代码
/**
* Register spring mvc string.
*
* @param springMvcRegisterDTO the spring mvc register dto
* @return the string
*/
@PostMapping("/springmvc-register")
public String registerSpringMvc(@RequestBody final SpringMvcRegisterDTO springMvcRegisterDTO) {
return soulClientRegisterService.registerSpringMvc(springMvcRegisterDTO);
}
}
通过上面的分析,就明白了@SoulSpringMvcClient
注解的执行过程及原理。
25 Jan 2021 |
Soul |
在上一篇文章中,通过跟踪源码的方式了解了http长轮询
的执行流程。但是,自己还有一些疑问,本篇文章是在官网的基础上进行了拓展,加入一些自己的理解。
zookeeper
、websocket
数据同步的机制比较简单,而 http
同步会相对复杂一些。Soul
借鉴了 Apollo
、Nacos
的设计思想,取其精华,自己实现了 http
长轮询数据同步功能。注意,这里并非传统的 ajax
长轮询!
http 长轮询
机制如上所示,请求逻辑是Soul网关
主动请求 soul-admin
的配置服务。响应逻辑有两种:soul-admin
端本身的配置修改和60s
的等待时间到了。
http
请求到达 sou-admin
之后,并非立马响应数据,而是利用 Servlet3.0
的异步机制,异步响应数据。首先,将长轮询请求任务 LongPollingClient
扔到阻塞队列 BlocingQueue
中,并且开启调度任务,每60s
执行一次,将队列中的请求拿出,发送对应的响应。如果没有发生配置信息的更改,也需要对请求响应,好让网关知道,不需要一直等待。当然,网关请求配置服务时,也有 90s
的超时时间。
class LongPollingClient implements Runnable {
LongPollingClient(final AsyncContext ac, final String ip, final long timeoutTime) {
// 省略......
}
@Override
public void run() {
// 加入定时任务,如果60s之内没有配置变更,则60s后执行,响应http请求
this.asyncTimeoutFuture = scheduler.schedule(() -> {
// clients是阻塞队列,保存了来自soul-web的请求信息
clients.remove(LongPollingClient.this);
List<ConfigGroupEnum> changedGroups = HttpLongPollingDataChangedListener.compareMD5((HttpServletRequest) asyncContext.getRequest());
//发送响应
sendResponse(changedGroups);
}, timeoutTime, TimeUnit.MILLISECONDS);
//放到阻塞队列中
clients.add(this);
}
}
如果这段时间内,soul-admin
发生了数据信息的更改,此时,会挨个移除队列中的长轮询请求,并响应数据,告知是哪个 Group
的数据发生了变更(我们将插件、规则、流量配置、用户配置数据分成不同的组)。网关收到响应信息之后,只知道是哪个 Group
发生了配置变更,还需要再次请求该 Group
的配置数据。
// soul-admin发生了配置变更,挨个将队列中的请求移除,并予以响应
class DataChangeTask implements Runnable {
DataChangeTask(final ConfigGroupEnum groupKey) {
this.groupKey = groupKey;
}
@Override
public void run() {
try {
//挨个处理阻塞队列中的请求
for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext(); ) {
LongPollingClient client = iter.next();
//移除
iter.remove();
//响应
client.sendResponse(Collections.singletonList(groupKey));
}
} catch (Throwable e) {
LOGGER.error("data change error.", e);
}
}
}
当 soul-web
网关层接收到 http
响应信息之后,拉取变更信息(如果有变更的话),然后再次请求 soul-admin
的配置服务,如此反复循环。
长轮询
体现在请求任务会一直执行。
class HttpLongPollingTask implements Runnable {
//省略其他代码
@Override
public void run() {
//一直循环下去
while (RUNNING.get()) {
for (int time = 1; time <= retryTimes; time++) {
try {
doLongPolling(server);
} catch (Exception e) {
//一直循环下去
}
}
}
log.warn("Stop http long polling.");
}
}
轮询:客户端每隔几秒钟向服务端发送 http
请求,服务端在收到请求后,不论是否有数据更新,都直接进行响应。在服务端响应完成,就会关闭这个 TCP
连接。这种方式实现非常简单,兼容性也比较好,只要支持 http
协议就可以用这种方式实现。缺点就是非常消耗资源,会占用较多的内存和带宽。
长轮询:客户端发送请求后服务器端不会立即返回数据,服务器端会阻塞,请求连接挂起,直到服务端有数据更新或者是连接超时才返回,客户端才再次发出请求新建连接、如此反复从而获取最新数据。相比轮询,长轮询减少了很多不必要的 http
请求次数,相比之下节约了资源。
参考文献:
23 Jan 2021 |
Soul |
在上一篇文章中,跟踪了基于Nacos
的数据同步原理,本篇文章将要跟踪基于Http长轮询
的数据同步原理。
如果是 http
同步策略,soul-web
主动发起长轮询请求,默认有 90s
超时时间,如果 soul-admin
没有数据变更,则会阻塞 http
请求,如果有数据发生变更则响应变更的数据信息,如果超过 60s 仍然没有数据变更则响应空数据,网关层接到响应后,继续发起http
请求,反复同样的请求。
同步的核心逻辑是:在soul-admin
后台修改数据,先保存到数据库,然后保存到soul-admin
的内存;在网关有定时任务执行,即发起长轮询,发起http
请求到soul-admin
去获取变更的数据。
本文的分析是想通过跟踪源码的方式来理解同步的核心逻辑,数据同步分析步骤如下:
- 1.修改选择器
- 2.更新数据
- 3.接收数据
- 4.使用更新后的数据
1. 修改选择器
在演示案例之前,将soul-admin
的数据同步方式配置为http
:
soul:
database:
dialect: mysql
init_script: "META-INF/schema.sql"
init_enable: true
sync:
# websocket:
# enabled: true
# zookeeper:
# url: localhost:2181
# sessionTimeout: 5000
# connectionTimeout: 2000
http:
enabled: true
在soul-bootstrap
也配置一下数据同步方式为http
:
soul :
file:
enabled: true
corss:
enabled: true
dubbo :
parameter: multi
sync:
# websocket :
# urls: ws://localhost:9095/websocket
# zookeeper:
# url: localhost:2181
# sessionTimeout: 5000
# connectionTimeout: 2000
http:
url : http://localhost:9095
现在,我们以一个实际调用过程为例,比如在Soul
网关管理系统中,对选择器的配置信息进行修改:查询条件中id=99
才能匹配成功。具体信息如下所示:
点击确认后,进入到soul-admin
的updateSelector()
这个接口。
@PutMapping("/{id}")
public SoulAdminResult updateSelector(@PathVariable("id") final String id, @RequestBody final SelectorDTO selectorDTO) {
Objects.requireNonNull(selectorDTO);
selectorDTO.setId(id);
Integer updateCount = selectorService.createOrUpdate(selectorDTO);
return SoulAdminResult.success(SoulResultMessage.UPDATE_SUCCESS, updateCount);
}
2.更新数据
进入到后端系统后,会现在数据中更新信息,然后通过publishEvent()
方法将更新的信息同步到网关。(下面代码只是展示了主要的逻辑,完整的代码请参考Soul
源码。)
@Transactional(rollbackFor = RuntimeException.class)
public int createOrUpdate(final SelectorDTO selectorDTO) {
int selectorCount;
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
//将更新的数据保存到soul-admin的数据库
//省略了其他代码
//将更新的数据同步到网关
publishEvent(selectorDO, selectorConditionDTOs);
return selectorCount;
}
在publishEvent()
方法中调用了eventPublisher.publishEvent()
,这个eventPublisher
对象是一个ApplicationEventPublisher
类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher
。看到这儿,我们知道了发布数据是通过Spring
相关的功能来完成的。
private void publishEvent(final RuleDO ruleDO, final List<RuleConditionDTO> ruleConditions) {
//省略了其他代码......
// publish change event.
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATE,
Collections.singletonList(RuleDO.transFrom(ruleDO, pluginDO.getName(), conditionDataList))));
}
Spring
完成事件发布后,肯定有对应的监听器来处理它,这个监听器是ApplicationListener
接口。在Soul
中是通过DataChangedEventDispatcher
这个类来完成具体监听工作的,它实现了ApplicationListener
接口。
//处理监听事件
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
//省略了其他代码......
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
for (DataChangedListener listener : listeners) {
switch (event.getGroupKey()) {
case APP_AUTH: //认证授权
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN: //修改了插件
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE: //修改了规则
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR: //修改了选择器
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA: //修改了元数据
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
//在bean初始化的时候,将实现DataChangedListener接口的bean加载进来。
@Override
public void afterPropertiesSet() {
Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
}
}
注意一下,这个DataChangedEventDispatcher
还实现了InitializingBean
接口,并重写了它的afterPropertiesSet()
方法,做的事情是:在bean
初始化的时候,将实现DataChangedListener
接口的bean
加载进来。通过查看源码,可以看到4种数据同步的方式都实现了该接口,其中就有我们这次使用的Nacos
数据同步方式。
当监听器监听到有事件发布后,会执行onApplicationEvent()
方法,这里面的逻辑是循环处理DataChangedListener
,通过switch / case
表达式匹配修改的是什么类型信息,我们这里修改的是选择器,所以会匹配到listener.onSelectorChanged()
这个方法。(这里虽然用了循环的方式处理每一个listener
,但在实际中我们只需要一种数据同步方式就好。)
本次使用的是http长轮询
进行数据同步,所以listener.onSelectorChanged()
的实际执行方法是HttpLongPollingDataChangedListener#onSelectorChanged
,它继承了AbstractDataChangedListener
。这里面做的事情是:
public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
if (CollectionUtils.isEmpty(changed)) {
return;
}
//更新选择器信息到缓存
this.updateSelectorCache();
//设置响应
this.afterSelectorChanged(changed, eventType);
}
真正更新数据的操作是通过updateCache
完成,将新的数据放到CACHE
中,这个CACHE
是ConcurrentMap
类型。网关有定时任务来这个CACHE
里获取数据。
//更新选择器信息到缓存
protected void updateSelectorCache() {
this.updateCache(ConfigGroupEnum.SELECTOR, selectorService.listAll());
}
protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
String json = GsonUtils.getInstance().toJson(data);
ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
//更新新的数据
ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
}
设置响应的过程是在定时任务中完成的。
//scheduler 是个定时任务
@Override
protected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));
}
//定时任务
class DataChangeTask implements Runnable {
//省略其他代码
@Override
public void run() {
for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
LongPollingClient client = iter.next();
iter.remove();
client.sendResponse(Collections.singletonList(groupKey));
//省略其他代码
}
}
}
//发送响应
void sendResponse(final List<ConfigGroupEnum> changedGroups) {
//省略其他代码
generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
asyncContext.complete();
}
//产生响应
private void generateResponse(final HttpServletResponse response, final List<ConfigGroupEnum> changedGroups) {
try {
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println(GsonUtils.getInstance().toJson(SoulAdminResult.success(SoulResultMessage.SUCCESS, changedGroups)));
} catch (IOException ex) {
log.error("Sending response failed.", ex);
}
}
3.接收数据
在Soul
网关中,接收数据的操作是主动通过http长轮询
发起http
请求到soul-admin
获取数据。处理逻辑在org.dromara.soul.sync.data.http.HttpSyncDataService
类中,Soul
网关启动时就会执行。
private void start() {
// It could be initialized multiple times, so you need to control that.
if (RUNNING.compareAndSet(false, true)) {
// fetch all group configs.
this.fetchGroupConfig(ConfigGroupEnum.values());
int threadSize = serverList.size();
this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
SoulThreadFactory.create("http-long-polling", true));
// 发起长轮询
this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
} else {
log.info("soul http long polling was started, executor=[{}]", executor);
}
}
http长轮询
的任是:不停的进行轮询,先向soul-admin
发起请求查看是否有配置信息(包括插件,选择器,规则和元数据)变更;如果有配置信息变更,再发起请求获取变更的数据;最后更新网关的缓存数据。
如果有配置信息变更class HttpLongPollingTask implements Runnable {
//省略其他代码
@Override
public void run() {
while (RUNNING.get()) {
for (int time = 1; time <= retryTimes; time++) {
try {
doLongPolling(server);
} catch (Exception e) {
//省略其他代码
}
}
}
log.warn("Stop http long polling.");
}
}
private void doLongPolling(final String server) {
//省略其他代码
MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
//先发起请求,查看是否有配置信息变更
String listenerUrl = server + "/configs/listener";
JsonArray groupJson = null;
try {
String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
} catch (RestClientException e) {
//省略其他代码
}
//如果有配置信息变更
if (groupJson != null) {
if (ArrayUtils.isNotEmpty(changedGroups)) {
log.info("Group config changed: {}", Arrays.toString(changedGroups));
//获取变更的配置信息
this.doFetchGroupConfig(server, changedGroups);
}
}
}
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
//省略其他代码
//再发起请求,获取变更的配置信息
String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
try {
json = this.httpClient.getForObject(url, String.class);
} catch (RestClientException e) {
//省略其他代码
}
// 使用获取的配置信息更新缓存
boolean updated = this.updateCacheWithJson(json);
if (updated) {
log.info("get latest configs: [{}]", json);
return;
}
}
在代码中this.updateCacheWithJson(json)
,使用获取的配置信息更新缓存的处理操作,实际还是由CommonPluginDataSubscriber
来处理。CommonPluginDataSubscriber
在处理数据时,根据数据类型和操作类型来分别处理。当前我们测试的是更新选择器信息,所以会进入更新的逻辑,就是下面代码中的BaseDataCache.getInstance().cacheRuleData(ruleData);
。
public class CommonPluginDataSubscriber implements PluginDataSubscriber {
//省略了其他代码
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
Optional.ofNullable(classData).ifPresent(data -> {
if (data instanceof PluginData) {
//省略处理插件的逻辑
} else if (data instanceof SelectorData) { //处理选择器信息
SelectorData selectorData = (SelectorData) data;
if (dataType == DataEventTypeEnum.UPDATE) { //更新操作
BaseDataCache.getInstance().cacheSelectData(selectorData);
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
} else if (dataType == DataEventTypeEnum.DELETE) { //删除操作
BaseDataCache.getInstance().removeSelectData(selectorData);
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
}
} else if (data instanceof RuleData) {
//省略处理规则的逻辑
}
});
}
}
在BaseDataCache.getInstance().cacheSelectData(selectorData);
代码中,做的事情就是根据传入的变更信息来更新SELECTOR_MAP
。这个SELECTOR_MAP
缓存了选择器信息,网关在后续使用时,也是从这里获取具体的选择器去匹配请求。
public final class BaseDataCache {
private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();
//省略了其他代码......
//缓存选择器
public void cacheSelectData(final SelectorData selectorData) {
Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
}
//接受选择器
private void selectorAccept(final SelectorData data) {
String key = data.getPluginName();
if (SELECTOR_MAP.containsKey(key)) {
List<SelectorData> existList = SELECTOR_MAP.get(key);
//删除之前的选择器
final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
resultList.add(data);
//保存现在的选择器
final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());
SELECTOR_MAP.put(key, collect);
} else {
SELECTOR_MAP.put(key, Lists.newArrayList(data));
}
}
}
分析到这里,基于http长轮询
数据同步的工作就算完成了。核心逻辑是:网关主动请求soul-admin
获取变更的配置信息,将变更的信息放到网关的内存中,使用时再去内存中拿,所以Soul
网关的效率是很高的。
4. 使用更新后的数据
选择器信息完成更新后,通过http
去访问soul
网关,这里以divide
插件为例。关于divide
插件的使用请参考之前的文章。
发起一个GET
请求:http://localhost:9195/http/order/findById?id=100
,代码会执行到下面这个位置:
# org.dromara.soul.plugin.base.AbstractSoulPlugin#execute
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
String pluginName = named();
final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if (pluginData != null && pluginData.getEnabled()) {
//获取选择器信息
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIsNull(pluginName, exchange, chain);
}
//省略了其他代码
return doExecute(exchange, chain, selectorData, rule);
}
return chain.execute(exchange);
}
代码BaseDataCache.getInstance().obtainSelectorData(pluginName);
就是我们在数据同步时操作的数据缓存类,RULE_MAP
就是刚才更新的规则信息。
public final class BaseDataCache {
private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();
//省略了其他代码......
public List<SelectorData> obtainSelectorData(final String pluginName) {
return SELECTOR_MAP.get(pluginName);
}
}
刚才,我们发起的请求:http://localhost:9195/http/order/findById?id=100
,是匹配不到选择器的:
{
"code": -107,
"message": "Can not find selector, please check your configuration!",
"data": null
}
因为,在开始的时候,更新了选择器的配置:查询条件中id=99
才能匹配成功。
所以,我们另外再发起一个id=99
请求:http://localhost:9195/http/order/findById?id=99
,就可以成功了。
{
"id": "99",
"name": "hello world findById"
}
最后,本文通过源码的方式跟踪了Soul
网关是如何通过http长轮询
完成数据同步的:数据修改后,先保存到 soul-admin
的内存,然后通过Soul
网关主动向soul-admin
发起http
请求获取配置信息,然后进行处理数据,最后将数据保存到网关内存。