08 Oct 2021 |
ShenYu |
Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API
网关。
在ShenYu
网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu
网关当前支持ZooKeeper
、WebSocket
、Http长轮询
、Nacos
、Etcd
和 Consul
进行数据同步。本文的主要内容是基于Http长轮询
的数据同步源码分析。
本文基于shenyu-2.4.0
版本进行源码分析,官网的介绍请参考 数据同步原理 。
1. Http长轮询
这里直接引用官网的相关描述:
Zookeeper
和WebSocket
数据同步的机制比较简单,而 Http长轮询
则比较复杂。 Apache ShenYu
借鉴了 Apollo
、Nacos
的设计思想,取其精华,自己实现了 Http长轮询
数据同步功能。注意,这里并非传统的 ajax
长轮询!
Http长轮询
机制如上所示,Apache ShenYu
网关主动请求 shenyu-admin
的配置服务,读取超时时间为 90s
,意味着网关层请求配置服务最多会等待 90s
,这样便于 shenyu-admin
配置服务及时响应变更数据,从而实现准实时推送。
Http长轮询
机制是由网关主动请求 shenyu-admin
,所以这次的源码分析,我们从网关这一侧开始。
2. 网关数据同步
2.1 加载配置
Http长轮询
数据同步配置的加载是通过spring boot
的starter
机制,当我们引入相关依赖和在配置文件中有如下配置时,就会加载。
在pom
文件中引入依赖:
<!--shenyu data sync start use http-->
<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-spring-boot-starter-sync-data-http</artifactId>
<version>${project.version}</version>
</dependency>
在application.yml
配置文件中添加配置:
shenyu:
sync:
http:
url : http://localhost:9095
当网关启动时,配置类HttpSyncDataConfiguration
就会执行,加载相应的Bean
。
/**
* Http sync data configuration for spring boot.
*/
@Configuration
@ConditionalOnClass(HttpSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url")
@Slf4j
public class HttpSyncDataConfiguration {
/**
* Http sync data service.
* 创建 HttpSyncDataService
* @param httpConfig http的配置
* @param pluginSubscriber 插件数据订阅
* @param metaSubscribers 元数据订阅
* @param authSubscribers 认证数据订阅
* @return the sync data service
*/
@Bean
public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use http long pull sync shenyu data");
return new HttpSyncDataService(Objects.requireNonNull(httpConfig.getIfAvailable()), Objects.requireNonNull(pluginSubscriber.getIfAvailable()),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
/**
* Http config http config.
* 读取http的配置
* @return the http config
*/
@Bean
@ConfigurationProperties(prefix = "shenyu.sync.http")
public HttpConfig httpConfig() {
return new HttpConfig();
}
}
HttpSyncDataConfiguration
是Http长轮询
数据同步的配置类,负责创建HttpSyncDataService
(负责http
数据同步的具体实现)和HttpConfig
(admin
属性配置)。它的注解如下:
@Configuration
:表示这是一个配置类;
@ConditionalOnClass(HttpSyncDataService.class)
:条件注解,表示要有HttpSyncDataService
这个类;
@ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url")
:条件注解,要有shenyu.sync.http.url
这个属性配置。
2.2 属性初始化
在HttpSyncDataService
的构造函数中,完成属性初始化。
public class HttpSyncDataService implements SyncDataService, AutoCloseable {
// 省略了属性字段......
public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber, final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
// 1.创建数据处理器
this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
// 2.获取admin属性配置
this.httpConfig = httpConfig;
// shenyu-admin的url, 多个用逗号(,)分割
this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
// 3.创建httpClient,用于向admin发起请求
this.httpClient = createRestTemplate();
// 4.开始执行长轮询任务
this.start();
}
//......
}
上面代码中省略了其他函数和相关字段,在构造函数中完成属性的初始化,主要是:
-
创建数据处理器,用于后续缓存各种类型的数据(插件、选择器、规则、元数据和认证数据);
-
获取admin
属性配置,主要是获取admin
的url
,admin
有可能是集群,多个用逗号(,)
分割;
-
创建httpClient
,使用的是RestTemplate
,用于向admin
发起请求;
private RestTemplate createRestTemplate() {
OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
// 建立连接超时时间为 10s
factory.setConnectTimeout((int) this.connectionTimeout.toMillis());
// 网关主动请求 shenyu-admin 的配置服务,读取超时时间为 90s
factory.setReadTimeout((int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT);
return new RestTemplate(factory);
}
-
开始执行长轮询任务。
2.3 开始长轮询
- HttpSyncDataService#start()
在start()
方法中,干了两件事情,一个是获取全量数据,即请求admin
端获取所有需要同步的数据,然后将获取到的数据缓存到网关内存中。另一个是开启多线程执行长轮询任务。
private void start() {
// 只初始化一次,通过原子类实现。
RUNNING = new AtomicBoolean(false);
// It could be initialized multiple times, so you need to control that.
if (RUNNING.compareAndSet(false, true)) {
// fetch all group configs.
// 初次启动,获取全量数据
this.fetchGroupConfig(ConfigGroupEnum.values());
// 一个后台服务,一个线程
int threadSize = serverList.size();
// 自定义线程池
this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
ShenyuThreadFactory.create("http-long-polling", true));
// start long polling, each server creates a thread to listen for changes.
// 开始长轮询,一个admin服务,创建一个线程用于数据同步
this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
} else {
log.info("shenyu http long polling was started, executor=[{}]", executor);
}
}
2.3.1 获取全量数据
- HttpSyncDataService#fetchGroupConfig()
ShenYu
将所有需要同步的数据进行了分组,一共有5种数据类型,分别是插件、选择器、规则、元数据和认证数据。
public enum ConfigGroupEnum {
APP_AUTH, // 认证数据
PLUGIN, //插件
RULE, // 规则
SELECTOR, // 选择器
META_DATA; // 元数据
}
admin
有可能是集群,这里通过循环的方式向每个admin
发起请求,有一个执行成功了,那么向admin
获取全量数据并缓存到网关的操作就执行成功。如果出现了异常,就向下一个admin
发起请求。
private void fetchGroupConfig(final ConfigGroupEnum... groups) throws ShenyuException {
// admin有可能是集群,这里通过循环的方式向每个admin发起请求
for (int index = 0; index < this.serverList.size(); index++) {
String server = serverList.get(index);
try {
// 真正去执行
this.doFetchGroupConfig(server, groups);
// 有一个成功,就成功了,可以退出循环
break;
} catch (ShenyuException e) {
// 出现异常,尝试执行下一个
// 最后一个也执行失败了,抛出异常
// no available server, throw exception.
if (index >= serverList.size() - 1) {
throw e;
}
log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
}
}
}
- HttpSyncDataService#doFetchGroupConfig()
在此方法中,首先拼装请求参数,然后通过httpClient
发起请求,到admin
中获取数据,最后将获取到的数据更新到网关内存中。
// 向admin后台管理系统发起请求,获取所有同步数据
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
// 1. 拼请求参数,所有分组枚举类型
StringBuilder params = new StringBuilder();
for (ConfigGroupEnum groupKey : groups) {
params.append("groupKeys").append("=").append(groupKey.name()).append("&");
}
// admin端提供的接口 /configs/fetch
String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
log.info("request configs: [{}]", url);
String json = null;
try {
// 2. 发起请求,获取变更数据
json = this.httpClient.getForObject(url, String.class);
} catch (RestClientException e) {
String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
log.warn(message);
throw new ShenyuException(message, e);
}
// update local cache
// 3. 更新网关内存中数据
boolean updated = this.updateCacheWithJson(json);
// 更新成功,此方法就执行完成了
if (updated) {
log.info("get latest configs: [{}]", json);
return;
}
// not updated. it is likely that the current config server has not been updated yet. wait a moment.
log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
// 服务端没有数据更新,就等30s
ThreadUtils.sleep(TimeUnit.SECONDS, 30);
}
从代码中,可以看到 admin
端提供的获取全量数据接口是 /configs/fetch
,这里先不进一步深入,放在后文再分析。
获取到admin
返回结果数据,并成功更新,那么此方法就执行结束了。如果没有更新成功,那么有可能是服务端没有数据更新,就等待30s
。
这里需要提前说明一下,网关在判断是否更新成功时,有比对数据的操作,马上就会提到。
- HttpSyncDataService#updateCacheWithJson
更新网关内存中的数据。使用GSON
进行反序列化,从属性data
中拿真正的数据,然后交给DataRefreshFactory
去做更新。
private boolean updateCacheWithJson(final String json) {
// 使用GSON进行反序列化
JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
JsonObject data = jsonObject.getAsJsonObject("data");
// if the config cache will be updated?
return factory.executor(data);
}
- DataRefreshFactory#executor()
根据不同数据类型去更新数据,返回更新结果。这里采用了parallelStream()
进行并行更新,具体更新逻辑交给了dataRefresh.refresh()
方法。在更新结果中,有一种数据类型进行了更新,就表示此次操作发生了更新。
public boolean executor(final JsonObject data) {
//并行更新数据
List<Boolean> result = ENUM_MAP.values().parallelStream()
.map(dataRefresh -> dataRefresh.refresh(data))
.collect(Collectors.toList());
//有一个更新就表示此次发生了更新操作
return result.stream().anyMatch(Boolean.TRUE::equals);
}
- AbstractDataRefresh#refresh()
数据更新逻辑采用的是模板方法设计模式,通用操作在抽象方法中完成,不同的实现逻辑由子类完成。5种数据类型具体的更新逻辑有些差异,但是也存在通用的更新逻辑,类图关系如下:
在通用的refresh()
方法中,负责数据类型转换,判断是否需要更新,和实际的数据刷新操作。
@Override
public Boolean refresh(final JsonObject data) {
boolean updated = false;
// 数据类型转换
JsonObject jsonObject = convert(data);
if (null != jsonObject) {
// 得到数据类型
ConfigData<T> result = fromJson(jsonObject);
// 是否需要更新
if (this.updateCacheIfNeed(result)) {
updated = true;
// 真正的更新逻辑,数据刷新操作
refresh(result.getData());
}
}
return updated;
}
- AbstractDataRefresh#updateCacheIfNeed()
数据转换的过程,就是根据不同的数据类型进行转换,我们就不再进一步追踪了,看看数据是否需要更新的逻辑。方法名是updateCacheIfNeed()
,通过方法重载实现。
// result是数据
protected abstract boolean updateCacheIfNeed(ConfigData<T> result);
// newVal是获取到的最新的值
// groupEnum 是哪种数据类型
protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
// 如果是第一次,那么直接放到cache中,返回 true,表示此次进行了更新
if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {
return true;
}
ResultHolder holder = new ResultHolder(false);
GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {
// md5 值相同,不需要更新
if (StringUtils.equals(oldVal.getMd5(), newVal.getMd5())) {
log.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());
return oldVal;
}
// 当前缓存的数据修改时间大于 新来的数据,不需要更新
// must compare the last update time
if (oldVal.getLastModifyTime() >= newVal.getLastModifyTime()) {
log.info("Last update time earlier than the current configuration, the [{}] config cache will not be updated", groupEnum);
return oldVal;
}
log.info("update {} config: {}", groupEnum, newVal);
holder.result = true;
return newVal;
});
return holder.result;
}
从上面的源码中可以看到,有两种情况不需要更新:
- 两个的数据的
md5
值相同,不需要更新;
- 当前缓存的数据修改时间大于 新来的数据,不需要更新。
其他情况需要更新数据。
分析到这里,就将start()
方法中初次启动,获取全量数据的逻辑分析完了,接下来是长轮询的操作。为了方便,我将start()
方法再粘贴一次:
private void start() {
// It could be initialized multiple times, so you need to control that.
if (RUNNING.compareAndSet(false, true)) {
// fetch all group configs.
// 初次启动,获取全量数据
this.fetchGroupConfig(ConfigGroupEnum.values());
// 一个后台服务,一个线程
int threadSize = serverList.size();
// 自定义线程池
this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
ShenyuThreadFactory.create("http-long-polling", true));
// start long polling, each server creates a thread to listen for changes.
// 开始长轮询,一个admin服务,创建一个线程用于数据同步
this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
} else {
log.info("shenyu http long polling was started, executor=[{}]", executor);
}
}
2.3.2 执行长轮询任务
- HttpLongPollingTask#run()
长轮询任务是HttpLongPollingTask
,它实现了Runnable
接口,任务逻辑在run()
方法中。通过while()
循环实现不断执行任务,即长轮询。在每一次的轮询中有三次重试逻辑,一次轮询任务失败了,等 5s
再继续,3
次都失败了,等 5
分钟再试。
开始长轮询,一个admin
服务,创建一个线程用于数据同步。
class HttpLongPollingTask implements Runnable {
private String server;
// 默认重试 3 次
private final int retryTimes = 3;
HttpLongPollingTask(final String server) {
this.server = server;
}
@Override
public void run() {
// 一直轮询
while (RUNNING.get()) {
for (int time = 1; time <= retryTimes; time++) {
try {
doLongPolling(server);
} catch (Exception e) {
// print warnning log.
if (time < retryTimes) {
log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
time, retryTimes - time, e.getMessage());
// 长轮询失败了,等 5s 再继续
ThreadUtils.sleep(TimeUnit.SECONDS, 5);
continue;
}
// print error, then suspended for a while.
log.error("Long polling failed, try again after 5 minutes!", e);
// 3 次都失败了,等 5 分钟再试
ThreadUtils.sleep(TimeUnit.MINUTES, 5);
}
}
}
log.warn("Stop http long polling.");
}
}
- HttpSyncDataService#doLongPolling()
执行长轮询任务的核心逻辑:
- 根据数据类型组装请求参数:
md5
和 lastModifyTime
;
- 组装请求头和请求体;
- 向
admin
发起请求,判断组数据是否发生变更;
- 根据发生变更的组,再去获取数据。
private void doLongPolling(final String server) {
// 组装请求参数:md5 和 lastModifyTime
MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
ConfigData<?> cacheConfig = factory.cacheConfigData(group);
if (cacheConfig != null) {
String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
params.put(group.name(), Lists.newArrayList(value));
}
}
// 组装请求头和请求体
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
HttpEntity httpEntity = new HttpEntity(params, headers);
String listenerUrl = server + "/configs/listener";
log.debug("request listener configs: [{}]", listenerUrl);
JsonArray groupJson = null;
//向admin发起请求,判断组数据是否发生变更
//这里只是判断了某个组是否发生变更
try {
String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
log.debug("listener result: [{}]", json);
groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
} catch (RestClientException e) {
String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
throw new ShenyuException(message, e);
}
// 根据发生变更的组,再去获取数据
/**
* 官网对此处的解释:
* 网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。
* 这里可能会存在一个疑问:为什么不是直接将变更的数据写出?
* 我们在开发的时候,也深入讨论过该问题,因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时,
* 或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。
*
* 个人理解:
* 如果将变更数据直接写出,当管理员频繁更新配置时,第一次更新了,将client移除阻塞队列,返回响应信息给网关。
* 如果这个时候进行了第二次更新,那么当前的client是不在阻塞队列中,所以这一次的变更就会错过。
* 网关层处理不及时,也是同理。
* 这是一个长轮询,一个网关一个同步线程,可能存在耗时的过程。
* 如果admin有数据变更,当前网关client是没有在阻塞队列中,就不到数据。
*/
if (groupJson != null) {
// fetch group configuration async.
ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
if (ArrayUtils.isNotEmpty(changedGroups)) {
log.info("Group config changed: {}", Arrays.toString(changedGroups));
// 主动向admin获取变更的数据,根据分组不同,全量拿数据
this.doFetchGroupConfig(server, changedGroups);
}
}
}
这里需要特别解释一点的是:在长轮询任务中,为什么不直接拿到变更的数据?而是先判断哪个分组数据发生了变更,然后再次请求admin
,获取变更数据?
官网对此处的解释是:
网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。
这里可能会存在一个疑问:为什么不是直接将变更的数据写出?
我们在开发的时候,也深入讨论过该问题,因为 http
长轮询机制只能保证准实时,如果在网关层处理不及时,
或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。
个人理解是:
如果将变更数据直接写出,管理员频繁更新配置时,第一次更新了,将client
移除阻塞队列,返回响应信息给网关。如果这个时候进行了第二次更新,那么当前的client
是不在阻塞队列中,所以这一次的变更就会错过。网关层处理不及时,也是同理。 这是一个长轮询,一个网关一个同步线程,可能存在耗时的过程。如果admin
有数据变更,当前网关client是没有在阻塞队列中,就不到数据。
我们还没有分析到admin
端的处理逻辑,先大概说一下。在admin
端,会将网关client
放到阻塞队列,有数据变更,网关client
就会出队列,发送变更数据。所以,如果有数据变更时,网关client
不在阻塞队列,那么就无法得到当前变更的数据。
知道哪个分组数据发生变更时,主动再向admin
获取变更的数据,根据分组不同,全量拿数据。调用方法是doFetchGroupConfig()
,这个在前面已经分析过了。
分析到这里,网关端的数据同步操作就完成了。长轮询任务就是不断向admin
发起请求,看看数据是否发生变更,如果有分组数据发生变更,那么就再主动向admin
发起请求,获取变更数据,然后更新网关内存中的数据。
网关端长轮询任务流程:
3. admin数据同步
从前面分析的过程中,可以看到,网关端主要调用admin
的两个接口:
/configs/listener
:判断组数据是否发生变更;
/configs/fetch
:获取变更组数据。
直接从这两个接口分析的话,可能有的地方不好理解,所以我们还是从admin
启动流程开始分析数据同步过程。
3.1 加载配置
如果在配置文件application.yml
中,进行了如下配置,就表示通过http长轮询
的方式进行数据同步。
shenyu:
sync:
http:
enabled: true
程序启动时,通过springboot
条件装配实现数据同步类的配置加载。在这个过程中,会创建HttpLongPollingDataChangedListener
,负责处理长轮询的相关实现逻辑。
/**
* 数据同步配置类
* 通过springboot条件装配实现
* The type Data sync configuration.
*/
@Configuration
public class DataSyncConfiguration {
/**
* http长轮询
* http long polling.
*/
@Configuration
@ConditionalOnProperty(name = "shenyu.sync.http.enabled", havingValue = "true")
@EnableConfigurationProperties(HttpSyncProperties.class)
static class HttpLongPollingListener {
@Bean
@ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)
public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
return new HttpLongPollingDataChangedListener(httpSyncProperties);
}
}
}
3.2 数据变更监听器实例化
- HttpLongPollingDataChangedListener
数据变更监听器通过构造函数的方式完成实例化和初始化操作。在构造函数中会创建阻塞队列,用于存放客户端;创建线程池,用于执行延迟任务,周期任务;保存长轮询相关属性信息。
public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
// 默认客户端(这里是网关)1024个
this.clients = new ArrayBlockingQueue<>(1024);
// 创建线程池
// ScheduledThreadPoolExecutor 可以执行延迟任务,周期任务,普通任务
this.scheduler = new ScheduledThreadPoolExecutor(1,
ShenyuThreadFactory.create("long-polling", true));
// 长轮询的属性信息
this.httpSyncProperties = httpSyncProperties;
}
另外,它的类图关系如下:
实现了InitializingBean
接口,所以在bean
的初始化过程中执行afterInitialize()
方法。通过线程池执行周期任务:更新内存中(CACHE)
的数据每隔5
分钟执行一次,5
分钟后开始执行。刷新本地缓存就是从数据库读取数据到本地缓存(这里就是内存),通过refreshLocalCache()
完成。
/**
* 在 InitializingBean接口中的afterPropertiesSet()方法中被调用,即在bean的初始化过程中执行
*/
@Override
protected void afterInitialize() {
long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
// Periodically check the data for changes and update the cache
// 执行周期任务:更新内存中(CACHE)的数据每隔5分钟执行一次,5分钟后开始执行
// 防止admin先启动一段时间后,产生了数据;然后网关初次连接时,没有拿到全量数据
scheduler.scheduleWithFixedDelay(() -> {
log.info("http sync strategy refresh config start.");
try {
// 从数据库读取数据到本地缓存(这里就是内存)
this.refreshLocalCache();
log.info("http sync strategy refresh config success.");
} catch (Exception e) {
log.error("http sync strategy refresh config error!", e);
}
}, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
log.info("http sync strategy refresh interval: {}ms", syncInterval);
}
分别对5种数据类型进行更新。
// 从数据库读取数据到本地缓存(这里就是内存)
private void refreshLocalCache() {
//更新认证数据
this.updateAppAuthCache();
//更新插件数据
this.updatePluginCache();
//更新规则数据
this.updateRuleCache();
//更新选择器数据
this.updateSelectorCache();
//更新元数据
this.updateMetaDataCache();
}
5个更新方法的逻辑是类似的,调用service
方法获取数据,然后放到内存CACHE
中。以更新规则数据方法updateRuleCache()
为例,传入规则枚举类型,调用ruleService.listAll()
从数据库获取所有规则数据。
/**
* Update rule cache.
*/
protected void updateRuleCache() {
this.updateCache(ConfigGroupEnum.RULE, ruleService.listAll());
}
使用数据库中的数据更新内存中的数据。
// 缓存数据的 Map
protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();
/**
* if md5 is not the same as the original, then update lcoal cache.
* 更新缓存中的数据
* @param group ConfigGroupEnum
* @param <T> the type of class
* @param data the new config data
*/
protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
//数据序列化
String json = GsonUtils.getInstance().toJson(data);
//传入md5值和修改时间
ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
//更新分组数据
ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
}
初始化的过程就是启动周期性任务,定时从数据库获取数据更新内存数据。
接下来开始对两个接口开始分析:
/configs/listener
:判断组数据是否发生变更;
/configs/fetch
:获取变更组数据。
3.3 数据变更轮询接口
/configs/listener
:判断组数据是否发生变更;
接口类是ConfigController
,只有使用http长轮询
进行数据同步时才会生效。接口方法listener()
没有其他逻辑,直接调用doLongPolling()
方法。
/**
* This Controller only when HttpLongPollingDataChangedListener exist, will take effect.
*/
@ConditionalOnBean(HttpLongPollingDataChangedListener.class)
@RestController
@RequestMapping("/configs")
@Slf4j
public class ConfigController {
@Resource
private HttpLongPollingDataChangedListener longPollingListener;
// 省略其他逻辑
/**
* Listener.
* 监听数据变更,执行长轮询
* @param request the request
* @param response the response
*/
@PostMapping(value = "/listener")
public void listener(final HttpServletRequest request, final HttpServletResponse response) {
longPollingListener.doLongPolling(request, response);
}
}
- HttpLongPollingDataChangedListener#doLongPolling()
执行长轮询任务:如果有数据变更,将会立即响应给客户端(这里就是网关端)。否则,客户端会一直被阻塞,直到有数据变更或者超时。
/**
* 执行长轮询:如果有数据变更,会立即响应给客户端(这里就是网关端)。
* 否则,否则客户端会一直被阻塞,直到有数据变更或者超时。
* @param request
* @param response
*/
public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
// compare group md5
// 比较md5,判断网关的数据和admin端的数据是否一致,得到发生变更的数据组
List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
String clientIp = getRemoteIp(request);
// response immediately.
// 有变更的数据,则立即向网关响应
if (CollectionUtils.isNotEmpty(changedGroup)) {
this.generateResponse(response, changedGroup);
log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
return;
}
// 没有变更,则将客户端(这里就是网关)放进阻塞队列
// listen for configuration changed.
final AsyncContext asyncContext = request.startAsync();
// AsyncContext.settimeout() does not timeout properly, so you have to control it yourself
asyncContext.setTimeout(0L);
// block client's thread.
scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}
- HttpLongPollingDataChangedListener#compareChangedGroup()
判断组数据是否发生变更,判断逻辑是比较网关端和admin
端的md5
值和lastModifyTime
。
- 如果
md5
值不一样,那么需要更新;
- 如果
admin
端的lastModifyTime
大于网关端的lastModifyTime
,那么需要更新。
/**
* 判断组数据是否发生变更
* @param request
* @return
*/
private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {
List<ConfigGroupEnum> changedGroup = new ArrayList<>(ConfigGroupEnum.values().length);
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
// 网关端数据的md5值和lastModifyTime
String[] params = StringUtils.split(request.getParameter(group.name()), ',');
if (params == null || params.length != 2) {
throw new ShenyuException("group param invalid:" + request.getParameter(group.name()));
}
String clientMd5 = params[0];
long clientModifyTime = NumberUtils.toLong(params[1]);
ConfigDataCache serverCache = CACHE.get(group.name());
// do check. 判断组数据是否发生变更
if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {
changedGroup.add(group);
}
}
return changedGroup;
}
没有变更数据,则将客户端(这里就是网关)放进阻塞队列。阻塞时间是60秒,即60秒后移除,并响应客户端。
class LongPollingClient implements Runnable {
// 省略了其他逻辑
@Override
public void run() {
try {
// 60秒后移除,并响应客户端
this.asyncTimeoutFuture = scheduler.schedule(() -> {
clients.remove(LongPollingClient.this);
List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
sendResponse(changedGroups);
}, timeoutTime, TimeUnit.MILLISECONDS);
// 添加到阻塞队列
clients.add(this);
} catch (Exception ex) {
log.error("add long polling client error", ex);
}
}
/**
* Send response.
*
* @param changedGroups the changed groups
*/
void sendResponse(final List<ConfigGroupEnum> changedGroups) {
// cancel scheduler
if (null != asyncTimeoutFuture) {
asyncTimeoutFuture.cancel(false);
}
// 响应变更的组
generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
asyncContext.complete();
}
}
3.4 获取变更数据接口
根据网关传入的参数,获取分组数据,返回结果。主要实现方法是longPollingListener.fetchConfig()
。
@ConditionalOnBean(HttpLongPollingDataChangedListener.class)
@RestController
@RequestMapping("/configs")
@Slf4j
public class ConfigController {
@Resource
private HttpLongPollingDataChangedListener longPollingListener;
/**
* Fetch configs shenyu result.
* 全量获取分组数据
* @param groupKeys the group keys
* @return the shenyu result
*/
@GetMapping("/fetch")
public ShenyuAdminResult fetchConfigs(@NotNull final String[] groupKeys) {
Map<String, ConfigData<?>> result = Maps.newHashMap();
for (String groupKey : groupKeys) {
ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));
result.put(groupKey, data);
}
return ShenyuAdminResult.success(ShenyuResultMessage.SUCCESS, result);
}
// 省略了其他接口
}
- AbstractDataChangedListener#fetchConfig()
数据获取直接从CACHE
中拿,然后根据不同分组类型进行匹配,封装。
/**
* fetch configuration from cache.
* 获取分组下的全量数据
* @param groupKey the group key
* @return the configuration data
*/
public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
// 直接从 CACHE 中拿数据
ConfigDataCache config = CACHE.get(groupKey.name());
switch (groupKey) {
case APP_AUTH: // 认证数据
List<AppAuthData> appAuthList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<AppAuthData>>() {
}.getType());
return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), appAuthList);
case PLUGIN: // 插件数据
List<PluginData> pluginList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<PluginData>>() {
}.getType());
return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), pluginList);
case RULE: // 规则数据
List<RuleData> ruleList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<RuleData>>() {
}.getType());
return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), ruleList);
case SELECTOR: // 选择器数据
List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() {
}.getType());
return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList);
case META_DATA: // 元数据
List<MetaData> metaList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<MetaData>>() {
}.getType());
return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), metaList);
default: // 其他类型,抛出异常
throw new IllegalStateException("Unexpected groupKey: " + groupKey);
}
3.5 数据变更
在之前的websocket
数据同步和zookeeper
数据同步源码分析文章中,我们知道admin
端数据同步设计结构如下:
各种数据变更监听器都是DataChangedListener
的子类。
当在admin
端修改数据后,通过Spring
的事件处理机制,发送事件通知。发送逻辑如下:
/**
* Event forwarders, which forward the changed events to each ConfigEventListener.
* 数据变更事件分发器:当admin端有数据发生变更时,将变更数据同步到 ShenYu 网关
* 数据变更依赖于Spring的事件监听机制:ApplicationEventPublisher --> ApplicationEvent --> ApplicationListener
*
*/
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
//省略了其他逻辑
/**
* 有数据变更时,调用此方法
* @param event
*/
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {
case APP_AUTH: // 认证信息
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN: // 插件信息
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE: // 规则信息
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA: // 元数据
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default: // 其他类型,抛出异常
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
}
假设,对插件信息进行了修改,通过http长轮询
的方式进行数据同步,那么listener.onPluginChanged()
的实际调用的是org.apache.shenyu.admin.listener.AbstractDataChangedListener#onPluginChanged
:
/**
* 在admin的操作,有插件发生了更新
* @param changed the changed
* @param eventType the event type
*/
@Override
public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
if (CollectionUtils.isEmpty(changed)) {
return;
}
// 更新内存CACHE
this.updatePluginCache();
// 执行变更任务
this.afterPluginChanged(changed, eventType);
}
有两个处理操作,一是更新内存CACHE
,这个在前面分析过了;另一个是执行变更任务,在线程池中执行。
- HttpLongPollingDataChangedListener#afterPluginChanged()
@Override
protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
// 在线程池中执行
scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));
}
数据变更任务:将阻塞队列中的客户端依次移除,并发送响应,通知网关有组数据发生变更。
class DataChangeTask implements Runnable {
//省略了其他逻辑
@Override
public void run() {
// 阻塞队列中的客户端超过了给定的值100,则分批执行
if (clients.size() > httpSyncProperties.getNotifyBatchSize()) {
List<LongPollingClient> targetClients = new ArrayList<>(clients.size());
clients.drainTo(targetClients);
List<List<LongPollingClient>> partitionClients = Lists.partition(targetClients, httpSyncProperties.getNotifyBatchSize());
// 分批执行
partitionClients.forEach(item -> scheduler.execute(() -> doRun(item)));
} else {
// 执行任务
doRun(clients);
}
}
private void doRun(final Collection<LongPollingClient> clients) {
// 通知所有客户端发生了数据变更
for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
LongPollingClient client = iter.next();
iter.remove();
// 发送响应
client.sendResponse(Collections.singletonList(groupKey));
log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
}
}
}
至此,admin
端数据同步逻辑就分析完了。在基于http长轮询
数据同步是,它主要有三个功能:
- 提供数据变更监听接口;
- 提供获取变更数据接口;
- 有数据变更时,移除阻塞队列中的客户端,并响应结果。
最后,用三张图描述下admin
端长轮询任务流程:
/configs/listener
数据变更监听接口:
4. 总结
本文主要对ShenYu
网关中的http长轮询
数据同步进行了源码分析。涉及到的主要知识点如下:
http长轮询
由网关端主动发起请求,不断请求admin
端;
- 变更数据以组为粒度(认证信息、插件、选择器、规则、元数据);
http长轮询
结果只拿到了变更组,还需要再次发起请求获取组数据;
- 数据是否更新由
md5
值和修改时间lastModifyTime
决定。
16 Sep 2021 |
ShenYu |
Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API
网关。
在ShenYu
网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu
网关当前支持ZooKeeper
、WebSocket
、Http长轮询
、Nacos
、Etcd
和 Consul
进行数据同步。本文的主要内容是基于ZooKeeper
的数据同步源码分析。
本文基于shenyu-2.4.0
版本进行源码分析,官网的介绍请参考 数据同步原理 。
1. 关于ZooKeeper
Apache ZooKeeper
是Apache
软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。ZooKeeper
节点将它们的数据存储于一个分层的名字空间,非常类似于一个文件系统或一个前缀树结构。客户端可以在节点读写,从而以这种方式拥有一个共享的配置服务。
2. Admin数据同步
我们从一个实际案例进行源码追踪,比如在后台管理系统中,对Divide
插件中的一条选择器数据进行更新,将权重更新为90:
2.1 接收数据
- SelectorController.createSelector()
进入SelectorController
类中的updateSelector()
方法,它负责数据的校验,添加或更新数据,返回结果信息。
@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/selector")
public class SelectorController {
@PutMapping("/{id}")
public ShenyuAdminResult updateSelector(@PathVariable("id") final String id, @Valid @RequestBody final SelectorDTO selectorDTO) {
// 设置当前选择器数据id
selectorDTO.setId(id);
// 创建或更新操作
Integer updateCount = selectorService.createOrUpdate(selectorDTO);
// 返回结果信息
return ShenyuAdminResult.success(ShenyuResultMessage.UPDATE_SUCCESS, updateCount);
}
// ......
}
2.2 处理数据
- SelectorServiceImpl.createOrUpdate()
在SelectorServiceImpl
类中通过createOrUpdate()
方法完成数据的转换,保存到数据库,发布事件,更新upstream
。
@RequiredArgsConstructor
@Service
public class SelectorServiceImpl implements SelectorService {
// 负责事件发布的eventPublisher
private final ApplicationEventPublisher eventPublisher;
@Override
@Transactional(rollbackFor = Exception.class)
public int createOrUpdate(final SelectorDTO selectorDTO) {
int selectorCount;
// 构建数据 DTO --> DO
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
// 判断是添加还是更新
if (StringUtils.isEmpty(selectorDTO.getId())) {
// 插入选择器数据
selectorCount = selectorMapper.insertSelective(selectorDO);
// 插入选择器中的条件数据
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
// check selector add
// 权限检查
if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {
DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();
dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());
dataPermissionDTO.setDataId(selectorDO.getId());
dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);
dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));
}
} else {
// 更新数据,先删除再新增
selectorCount = selectorMapper.updateSelective(selectorDO);
//delete rule condition then add
selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
selectorConditionMapper.insertSelective(selectorConditionDO);
});
}
// 发布事件
publishEvent(selectorDO, selectorConditionDTOs);
// 更新upstream
updateDivideUpstream(selectorDO);
return selectorCount;
}
// ......
}
在Serrvice
类完成数据的持久化操作,即保存数据到数据库,这个比较简单,就不深入追踪了。关于更新upstream
操作,放到后面对应的章节中进行分析,重点关注发布事件的操作,它会执行数据同步。
publishEvent()
方法的逻辑是:找到选择器对应的插件,构建条件数据,发布变更数据。
private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
// 找到选择器对应的插件
PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
// 构建条件数据
List<ConditionData> conditionDataList = selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
// 发布变更数据
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}
发布变更数据通过eventPublisher.publishEvent()
完成,这个eventPublisher
对象是一个ApplicationEventPublisher
类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher
。看到这儿,我们知道了发布数据是通过Spring
相关的功能来完成的。
关于ApplicationEventPublisher
:
当有状态发生变化时,发布者调用 ApplicationEventPublisher
的 publishEvent
方法发布一个事件,Spring
容器广播事件给所有观察者,调用观察者的 onApplicationEvent
方法把事件对象传递给观察者。调用 publishEvent
方法有两种途径,一种是实现接口由容器注入 ApplicationEventPublisher
对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。
ApplicationEventPublisher
:发布事件;
ApplicationEvent
:Spring
事件,记录事件源、时间和数据;
ApplicationListener
:事件监听者,观察者;
在Spring
的事件发布机制中,有三个对象,
一个是发布事件的ApplicationEventPublisher
,在ShenYu
中通过构造器注入了一个eventPublisher
。
另一个对象是ApplicationEvent
,在ShenYu
中通过DataChangedEvent
继承了它,表示事件对象。
public class DataChangedEvent extends ApplicationEvent {
//......
}
最后一个是 ApplicationListener
,在ShenYu
中通过DataChangedEventDispatcher
类实现了该接口,作为事件的监听者,负责处理事件对象。
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
//......
}
2.3 分发数据
- DataChangedEventDispatcher.onApplicationEvent()
当事件发布完成后,会自动进入到DataChangedEventDispatcher
类中的onApplicationEvent()
方法,进行事件处理。
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
/**
* 有数据变更时,调用此方法
* @param event
*/
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {
case APP_AUTH: // 认证信息
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN: // 插件信息
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE: // 规则信息
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA: // 元数据
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default: // 其他类型,抛出异常
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
}
当有数据变更时,调用onApplicationEvent
方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。
ShenYu
将所有数据进行了分组,一共是五种:认证信息、插件信息、规则信息、选择器信息和元数据。
这里的数据变更监听器(DataChangedListener
),就是数据同步策略的抽象,它的具体实现有:
这几个实现类就是当前ShenYu
支持的同步策略:
WebsocketDataChangedListener
:基于websocket
的数据同步;
ZookeeperDataChangedListener
:基于zookeeper
的数据同步;
ConsulDataChangedListener
:基于consul
的数据同步;
EtcdDataDataChangedListener
:基于etcd
的数据同步;
HttpLongPollingDataChangedListener
:基于http长轮询
的数据同步;
NacosDataChangedListener
:基于nacos
的数据同步;
既然有这么多种实现策略,那么如何确定使用哪一种呢?
因为本文是基于Zookeeper
的数据同步源码分析,所以这里以ZookeeperDataChangedListener
为例,分析它是如何被加载并实现的。
通过在源码工程中进行全局搜索,可以看到,它的实现是在DataSyncConfiguration
类完成的。
/**
* 数据同步配置类
* 通过springboot条件装配实现
* The type Data sync configuration.
*/
@Configuration
public class DataSyncConfiguration {
/**
* zookeeper数据同步
* The type Zookeeper listener.
*/
@Configuration
@ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url") // 条件属性,满足才会被加载
@Import(ZookeeperConfiguration.class)
static class ZookeeperListener {
/**
* Config event listener data changed listener.
* 创建Zookeeper数据变更监听器
* @param zkClient the zk client
* @return the data changed listener
*/
@Bean
@ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
return new ZookeeperDataChangedListener(zkClient);
}
/**
* Zookeeper data init zookeeper data init.
* 创建 Zookeeper 数据初始化类
* @param zkClient the zk client
* @param syncDataService the sync data service
* @return the zookeeper data init
*/
@Bean
@ConditionalOnMissingBean(ZookeeperDataInit.class)
public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
return new ZookeeperDataInit(zkClient, syncDataService);
}
}
//省略了其他代码......
}
这个配置类是通过SpringBoot
条件装配类实现的。在ZookeeperListener
类上面有几个注解:
-
@Configuration
:配置文件,应用上下文;
-
@ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url")
:属性条件判断,满足条件,该配置类才会生效。也就是说,当我们有如下配置时,就会采用zookeeper
进行数据同步。
shenyu:
sync:
zookeeper:
url: localhost:2181
sessionTimeout: 5000
connectionTimeout: 2000
-
` @Import(ZookeeperConfiguration.class):导入另一个类
ZookeeperConfiguration`;
@EnableConfigurationProperties(ZookeeperProperties.class) // 启用zk属性配置类
public class ZookeeperConfiguration {
/**
* register zkClient in spring ioc.
* 向 Spring IOC 容器注册 zkClient
* @param zookeeperProp the zookeeper configuration
* @return ZkClient {@linkplain ZkClient}
*/
@Bean
@ConditionalOnMissingBean(ZkClient.class)
public ZkClient zkClient(final ZookeeperProperties zookeeperProp) {
return new ZkClient(zookeeperProp.getUrl(), zookeeperProp.getSessionTimeout(), zookeeperProp.getConnectionTimeout()); // 读取zk配置信息,并创建zkClient
}
}
@Data
@ConfigurationProperties(prefix = "shenyu.sync.zookeeper") // zk属性配置
public class ZookeeperProperties {
private String url;
private Integer sessionTimeout;
private Integer connectionTimeout;
private String serializer;
}
当我们主动配置,采用zookeeper
进行数据同步时,zookeeperDataChangedListener
就会生成。所以在事件处理方法onApplicationEvent()
中,就会到相应的listener
中。在我们的案例中,是对一条选择器数据进行更新,数据同步采用的是zookeeper
,所以,代码会进入到ZookeeperDataChangedListener
进行选择器数据变更处理。
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {
// 省略了其他逻辑
case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); // 在我们的案例中,会进入到ZookeeperDataChangedListener进行选择器数据变更处理
break;
}
}
2.4 Zookeeper数据变更监听器
/**
* 使用 zookeeper 发布变更数据
*/
public class ZookeeperDataChangedListener implements DataChangedListener {
// 选择器信息发生改变
@Override
public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
// 刷新操作
if (eventType == DataEventTypeEnum.REFRESH && !changed.isEmpty()) {
String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(changed.get(0).getPluginName());
deleteZkPathRecursive(selectorParentPath);
}
// 发生变更的数据
for (SelectorData data : changed) {
// 构建选择器数据的真实路径
String selectorRealPath = DefaultPathConstants.buildSelectorRealPath(data.getPluginName(), data.getId());
// 如果是删除操作
if (eventType == DataEventTypeEnum.DELETE) {
// 删除当前数据
deleteZkPath(selectorRealPath);
continue;
}
// 父节点路径
String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(data.getPluginName());
// 创建父节点
createZkNode(selectorParentPath);
// 插入或更新数据
insertZkNode(selectorRealPath, data);
}
}
// 创建 zk 节点
private void createZkNode(final String path) {
// 不存在才创建
if (!zkClient.exists(path)) {
zkClient.createPersistent(path, true);
}
}
// 插入zk节点
private void insertZkNode(final String path, final Object data) {
// 创建节点
createZkNode(path);
// 通过 zkClient 写入数据
zkClient.writeData(path, null == data ? "" : GsonUtils.getInstance().toJson(data));
}
}
只要将变动的数据正确写入到zk
的节点上,admin
这边的操作就执行完成了。ShenYu
在使用zk
进行数据同步时,zk
的节点是通过精心设计的。
在我们当前的案例中,对Divide
插件中的一条选择器数据进行更新,将权重更新为90,就会对图中的特定节点更新。
我们用时序图将上面的更新流程串联起来。
3. 网关数据同步
假设ShenYu
网关已经在正常运行,使用的数据同步方式也是zookeeper
。那么当在admin
端更新选择器数据后,并且向zk
发送了变更的数据,那网关是如何接收并处理数据的呢?接下来我们就继续进行源码分析,一探究竟。
3.1 ZkClient接收数据
- ZkClient.subscribeDataChanges()
在网关端有一个ZookeeperSyncDataService
类,它通过ZkClient
订阅了数据节点,当数据发生变更时,可以感知到。
/**
* 使用 zookeeper 缓存数据
*/
public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {
private void subscribeSelectorDataChanges(final String path) {
// zkClient订阅数据节点
zkClient.subscribeDataChanges(path, new IZkDataListener() {
@Override
public void handleDataChange(final String dataPath, final Object data) {
cacheSelectorData(GsonUtils.getInstance().fromJson(data.toString(), SelectorData.class)); // 节点数据被更新
}
@Override
public void handleDataDeleted(final String dataPath) {
unCacheSelectorData(dataPath); // 节点数据被删除
}
});
}
// 省略了其他逻辑
}
ZooKeeper
的Watch
机制,会给订阅的客户端发送节点变更通知。在我们的案例中,更新选择器信息,就会进入到handleDataChange()
方法。通过cacheSelectorData()
去处理数据。
3.2 处理数据
- ZookeeperSyncDataService.cacheSelectorData()
经过判空逻辑之后,缓存选择器数据的操作又交给了PluginDataSubscriber
处理。
private void cacheSelectorData(final SelectorData selectorData) {
Optional.ofNullable(selectorData)
.ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data)));
}
PluginDataSubscriber
是一个接口,它只有一个CommonPluginDataSubscriber
实现类,负责处理插件、选择器和规则数据。
3.3 通用插件数据订阅者
- PluginDataSubscriber.onSelectorSubscribe()
它没有其他逻辑,直接调用subscribeDataHandler()
方法。在方法中,更具数据类型(插件、选择器或规则),操作类型(更新或删除),去执行不同逻辑。
/**
* 通用插件数据订阅者,负责处理所有插件、选择器和规则信息
* The type Common plugin data subscriber.
*/
public class CommonPluginDataSubscriber implements PluginDataSubscriber {
//......
// 处理选择器数据
@Override
public void onSelectorSubscribe(final SelectorData selectorData) {
subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
}
// 订阅数据处理器,处理数据的更新或删除
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
Optional.ofNullable(classData).ifPresent(data -> {
// 插件数据
if (data instanceof PluginData) {
PluginData pluginData = (PluginData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cachePluginData(pluginData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removePluginData(pluginData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
}
} else if (data instanceof SelectorData) { // 选择器数据
SelectorData selectorData = (SelectorData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cacheSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理 Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removeSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
}
} else if (data instanceof RuleData) { // 规则数据
RuleData ruleData = (RuleData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cacheRuleData(ruleData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removeRuleData(ruleData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
}
}
});
}
}
3.4 数据缓存到内存
那么更新一条选择器数据,会进入下面的逻辑:
// 将数据保存到网关内存
BaseDataCache.getInstance().cacheSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理 Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
一是将数据保存到网关的内存中。BaseDataCache
是最终缓存数据的类,通过单例模式实现。选择器数据就存到了SELECTOR_MAP
这个Map
中。在后续使用的时候,也是从这里拿数据。
public final class BaseDataCache {
// 私有变量
private static final BaseDataCache INSTANCE = new BaseDataCache();
// 私有构造器
private BaseDataCache() {
}
/**
* Gets instance.
* 公开方法
* @return the instance
*/
public static BaseDataCache getInstance() {
return INSTANCE;
}
/**
* 缓存选择器数据的Map
* pluginName -> SelectorData.
*/
private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();
public void cacheSelectData(final SelectorData selectorData) {
Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
}
/**
* cache selector data.
* 缓存选择器数据
* @param data the selector data
*/
private void selectorAccept(final SelectorData data) {
String key = data.getPluginName();
if (SELECTOR_MAP.containsKey(key)) { // 更新操作,先删除再插入
List<SelectorData> existList = SELECTOR_MAP.get(key);
final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
resultList.add(data);
final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());
SELECTOR_MAP.put(key, collect);
} else { // 新增操作,直接放到Map中
SELECTOR_MAP.put(key, Lists.newArrayList(data));
}
}
}
二是如果每个插件还有自己的处理逻辑,那么就去处理。 通过idea
编辑器可以看到,当新增一条选择器后,有如下的插件还有处理。这里我们就不再展开了。
经过以上的源码追踪,并通过一个实际的案例,在admin
端新增更新一条选择器数据,就将zookeeper
数据同步的流程分析清楚了。
我们还是通过时序图将网关端的数据同步流程串联一下:
数据同步的流程已经分析完了,为了不让同步流程被打断,在分析过程中就忽略了其他逻辑。我们还需要分析Admin
同步数据初始化和网关同步操作初始化的流程。
4. Admin同步数据初始化
当admin
启动后,会将当前的数据信息全量同步到zk
中,实现逻辑如下:
/**
* Zookeeper 数据初始化
*/
public class ZookeeperDataInit implements CommandLineRunner {
private final ZkClient zkClient;
private final SyncDataService syncDataService;
/**
* Instantiates a new Zookeeper data init.
*
* @param zkClient the zk client
* @param syncDataService the sync data service
*/
public ZookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
this.zkClient = zkClient;
this.syncDataService = syncDataService;
}
@Override
public void run(final String... args) {
String pluginPath = DefaultPathConstants.PLUGIN_PARENT;
String authPath = DefaultPathConstants.APP_AUTH_PARENT;
String metaDataPath = DefaultPathConstants.META_DATA;
// 判断zk中是否存在数据
if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {
syncDataService.syncAll(DataEventTypeEnum.REFRESH);
}
}
}
判断zk
中是否存在数据,如果不存在,则进行同步。
ZookeeperDataInit
实现了CommandLineRunner
接口。它是springboot
提供的接口,会在所有 Spring Beans
初始化之后执行run()
方法,常用于项目中初始化的操作。
- SyncDataService.syncAll()
从数据库查询数据,然后进行全量数据同步,所有的认证信息、插件信息、选择器信息、规则信息和元数据信息。主要是通过eventPublisher
发布同步事件。这里就跟前面提到的同步逻辑就又联系起来了,eventPublisher
通过publishEvent()
发布完事件后,有ApplicationListener
执行事件变更操作,在ShenYu
中就是前面提到的DataChangedEventDispatcher
。
@Service
public class SyncDataServiceImpl implements SyncDataService {
// 事件发布
private final ApplicationEventPublisher eventPublisher;
/***
* 全量数据同步
* @param type the type
* @return
*/
@Override
public boolean syncAll(final DataEventTypeEnum type) {
// 同步认证信息
appAuthService.syncData();
// 同步插件信息
List<PluginData> pluginDataList = pluginService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
// 同步选择器信息
List<SelectorData> selectorDataList = selectorService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
// 同步规则信息
List<RuleData> ruleDataList = ruleService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
// 同步元数据信息
metaDataService.syncData();
return true;
}
}
5. 网关同步操作初始化
网关这边的数据同步初始化操作主要是订阅zk
中的节点,当有数据变更时,收到变更数据。这依赖于ZooKeeper
的Watch
机制。在ShenYu
中,负责zk
数据同步的是ZookeeperSyncDataService
,也在前面提到过。
ZookeeperSyncDataService
的功能逻辑是在实例化的过程中完成的:对zk
中的shenyu
数据同步节点完成订阅。这里的订阅分两类,一类是已经存在的节点上面数据发生更新,这通过zkClient.subscribeDataChanges()
方法实现;另一类是当前节点下有新增或删除节点,即子节点发生变化,这通过 zkClient.subscribeChildChanges()
方法实现。
ZookeeperSyncDataService
的代码有点多,这里我们以插件数据的读取和订阅进行追踪,其他类型的数据操作原理是一样的。
/**
* zookeeper 数据同步服务
*/
public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {
// 在实例化的时候,完成从zk中读取数据的操作,并订阅节点
public ZookeeperSyncDataService( /*省略构造参数参数*/ ) {
this.zkClient = zkClient;
this.pluginDataSubscriber = pluginDataSubscriber;
this.metaDataSubscribers = metaDataSubscribers;
this.authDataSubscribers = authDataSubscribers;
// 订阅插件、选择器和规则数据
watcherData();
// 订阅认证数据
watchAppAuth();
// 订阅元数据
watchMetaData();
}
private void watcherData() {
// 插件节点路径
final String pluginParent = DefaultPathConstants.PLUGIN_PARENT;
// 所有插件节点
List<String> pluginZKs = zkClientGetChildren(pluginParent);
for (String pluginName : pluginZKs) {
// 订阅当前所有插件、选择器和规则数据
watcherAll(pluginName);
}
// 订阅子节点(新增或删除一个插件)
zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {
if (CollectionUtils.isNotEmpty(currentChildren)) {
for (String pluginName : currentChildren) {
// 需要订阅子节点的所有插件、选择器和规则数据
watcherAll(pluginName);
}
}
});
}
private void watcherAll(final String pluginName) {
// 订阅插件数据
watcherPlugin(pluginName);
// 订阅选择器数据
watcherSelector(pluginName);
// 订阅规则数据
watcherRule(pluginName);
}
private void watcherPlugin(final String pluginName) {
// 当前插件路径
String pluginPath = DefaultPathConstants.buildPluginPath(pluginName);
// 是否存在,不存在就创建
if (!zkClient.exists(pluginPath)) {
zkClient.createPersistent(pluginPath, true);
}
// 读取zk上当前节点数据,并反序列化
PluginData pluginData = null == zkClient.readData(pluginPath) ? null
: GsonUtils.getInstance().fromJson((String) zkClient.readData(pluginPath), PluginData.class);
// 缓存到网关内存中
cachePluginData(pluginData);
// 订阅插件节点
subscribePluginDataChanges(pluginPath, pluginName);
}
private void cachePluginData(final PluginData pluginData) {
// 省略实现逻辑,其实就是 CommonPluginDataSubscriber 中的操作,跟前面都能联系起来
}
private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {
// 订阅数据变更:更新或删除
zkClient.subscribeDataChanges(pluginPath, new IZkDataListener() {
@Override
public void handleDataChange(final String dataPath, final Object data) { // 更新操作
// 省略实现逻辑,其实就是 CommonPluginDataSubscriber 中的操作,跟前面都能联系起来
}
@Override
public void handleDataDeleted(final String dataPath) { // 删除操作
// 省略实现逻辑,其实就是 CommonPluginDataSubscriber 中的操作,跟前面都能联系起来
}
});
}
}
上面的源代码中都给出了注释,相信大家可以看明白。订阅插件数据的主要逻辑如下:
- 构造当前插件路径
- 路径是否存在,不存在就创建
- 读取zk上当前节点数据,并反序列化
- 插件数据缓存到网关内存中
- 订阅插件节点
6. 总结
本文通过一个实际案例,对zookeeper
的数据同步原理进行了源码分析。涉及到的主要知识点如下:
- 基于
zookeeper
的数据同步,主要是通过watch
机制实现;
- 通过
Spring
完成事件发布和监听;
- 通过抽象
DataChangedListener
接口,支持多种同步策略,面向接口编程;
- 使用单例设计模式实现缓存数据类
BaseDataCache
;
- 通过
SpringBoot
的条件装配和starter
加载机制实现配置类的加载。
11 Sep 2021 |
ShenYu |
Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API
网关。
在ShenYu
网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu
网关当前支持ZooKeeper
、WebSocket
、Http长轮询
、Nacos
、Etcd
和 Consul
进行数据同步。本文的主要内容是基于WebSocket
的数据同步源码分析。
本文基于shenyu-2.4.0
版本进行源码分析,官网的介绍请参考 数据同步原理 。
1. 关于WebSocket通信
WebSocket
协议诞生于2008
年,在2011
年成为国际标准。它可以双向通信,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息。WebSocket
协议建立在 TCP
协议之上,属于应用层,性能开销小,通信高效,协议标识符是ws
。
2. Admin数据同步
我们从一个实际案例进行源码追踪,比如在后台管理系统中,新增一条选择器数据:
2.1 接收数据
- SelectorController.createSelector()
进入SelectorController
类中的createSelector()
方法,它负责数据的校验,添加或更新数据,返回结果信息。
@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/selector")
public class SelectorController {
@PostMapping("")
public ShenyuAdminResult createSelector(@Valid @RequestBody final SelectorDTO selectorDTO) { // @Valid 数校验
// 添加或更新数据
Integer createCount = selectorService.createOrUpdate(selectorDTO);
// 返回结果信息
return ShenyuAdminResult.success(ShenyuResultMessage.CREATE_SUCCESS, createCount);
}
// ......
}
2.2 处理数据
- SelectorServiceImpl.createOrUpdate()
在SelectorServiceImpl
类中通过createOrUpdate()
方法完成数据的转换,保存到数据库,发布事件,更新upstream
。
@RequiredArgsConstructor
@Service
public class SelectorServiceImpl implements SelectorService {
// 负责事件发布的eventPublisher
private final ApplicationEventPublisher eventPublisher;
@Override
@Transactional(rollbackFor = Exception.class)
public int createOrUpdate(final SelectorDTO selectorDTO) {
int selectorCount;
// 构建数据 DTO --> DO
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
// 判断是添加还是更新
if (StringUtils.isEmpty(selectorDTO.getId())) {
// 插入选择器数据
selectorCount = selectorMapper.insertSelective(selectorDO);
// 插入选择器中的条件数据
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
// check selector add
// 权限检查
if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {
DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();
dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());
dataPermissionDTO.setDataId(selectorDO.getId());
dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);
dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));
}
} else {
// 更新数据,先删除再新增
selectorCount = selectorMapper.updateSelective(selectorDO);
//delete rule condition then add
selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
selectorConditionMapper.insertSelective(selectorConditionDO);
});
}
// 发布事件
publishEvent(selectorDO, selectorConditionDTOs);
// 更新upstream
updateDivideUpstream(selectorDO);
return selectorCount;
}
// ......
}
在Serrvice
类完成数据的持久化操作,即保存数据到数据库,这个大家应该很熟悉了,就不展开。关于更新upstream
操作,放到后面对应的章节中进行分析,重点关注发布事件的操作,它会进行数据同步。
publishEvent()
方法的逻辑是:找到选择器对应的插件,构建条件数据,发布变更数据。
private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
// 找到选择器对应的插件
PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
// 构建条件数据
List<ConditionData> conditionDataList = selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
// 发布变更数据
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}
发布变更数据通过eventPublisher.publishEvent()
完成,这个eventPublisher
对象是一个ApplicationEventPublisher
类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher
。看到这儿,我们知道了发布数据是通过Spring
相关的功能来完成的。
关于ApplicationEventPublisher
:
当有状态发生变化时,发布者调用 ApplicationEventPublisher
的 publishEvent
方法发布一个事件,Spring
容器广播事件给所有观察者,调用观察者的 onApplicationEvent
方法把事件对象传递给观察者。调用 publishEvent
方法有两种途径,一种是实现接口由容器注入 ApplicationEventPublisher
对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。
ApplicationEventPublisher
:发布事件;
ApplicationEvent
:Spring
事件,记录事件源、时间和数据;
ApplicationListener
:事件监听者,观察者;
在Spring
的事件发布机制中,有三个对象,
一个是发布事件的ApplicationEventPublisher
,在ShenYu
中通过构造器注入了一个eventPublisher
。
另一个对象是ApplicationEvent
,在ShenYu
中通过DataChangedEvent
继承了它,表示事件对象。
public class DataChangedEvent extends ApplicationEvent {
//......
}
最后一个是 ApplicationListener
,在ShenYu
中通过DataChangedEventDispatcher
类实现了该接口,作为事件的监听者,负责处理事件对象。
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
//......
}
2.3 分发数据
- DataChangedEventDispatcher.onApplicationEvent()
当事件发布完成后,会自动进入到DataChangedEventDispatcher
类中的onApplicationEvent()
方法,进行事件处理。
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
/**
* 有数据变更时,调用此方法
* @param event
*/
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {
case APP_AUTH: // 认证信息
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN: // 插件信息
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE: // 规则信息
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA: // 元数据
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default: // 其他类型,抛出异常
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
}
当有数据变更时,调用onApplicationEvent
方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。
ShenYu
将所有数据进行了分组,一共是五种:认证信息、插件信息、规则信息、选择器信息和元数据。
这里的数据变更监听器(DataChangedListener
),就是数据同步策略的抽象,它的具体实现有:
这几个实现类就是当前ShenYu
支持的同步策略:
WebsocketDataChangedListener
:基于websocket
的数据同步;
ZookeeperDataChangedListener
:基于zookeeper
的数据同步;
ConsulDataChangedListener
:基于consul
的数据同步;
EtcdDataDataChangedListener
:基于etcd
的数据同步;
HttpLongPollingDataChangedListener
:基于http长轮询
的数据同步;
NacosDataChangedListener
:基于nacos
的数据同步;
既然有这么多种实现策略,那么如何确定使用哪一种呢?
因为本文是基于websocket
的数据同步源码分析,所以这里以WebsocketDataChangedListener
为例,分析它是如何被加载并实现的。
通过在源码工程中进行全局搜索,可以看到,它的实现是在DataSyncConfiguration
类完成的。
/**
* 数据同步配置类
* 通过springboot条件装配实现
* The type Data sync configuration.
*/
@Configuration
public class DataSyncConfiguration {
/**
* websocket数据同步(默认策略)
* The WebsocketListener(default strategy).
*/
@Configuration
@ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {
/**
* Config event listener data changed listener.
* 配置websocket数据变更监听器
* @return the data changed listener
*/
@Bean
@ConditionalOnMissingBean(WebsocketDataChangedListener.class)
public DataChangedListener websocketDataChangedListener() {
return new WebsocketDataChangedListener();
}
/**
* Websocket collector.
* Websocket处理类:建立连接,发送消息,关闭连接等操作
* @return the websocket collector
*/
@Bean
@ConditionalOnMissingBean(WebsocketCollector.class)
public WebsocketCollector websocketCollector() {
return new WebsocketCollector();
}
/**
* Server endpoint exporter
*
* @return the server endpoint exporter
*/
@Bean
@ConditionalOnMissingBean(ServerEndpointExporter.class)
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
//......
}
这个配置类是通过SpringBoot
条件装配类实现的。在WebsocketListener
类上面有几个注解:
-
@Configuration
:配置文件,应用上下文;
-
@ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
:属性条件判断,满足条件,该配置类才会生效。也就是说,当我们有如下配置时,就会采用websocket
进行数据同步。不过,这里需要注意下matchIfMissing = true
这个属性,它表示,如果你没有如下的配置,该配置类也会生效。基于websocket
的数据同步时官方推荐的方式,也是默认采用的方式。
shenyu:
sync:
websocket:
enabled: true
-
@EnableConfigurationProperties
:启用配置属性;
当我们主动配置,采用websocket
进行数据同步时,WebsocketDataChangedListener
就会生成。所以在事件处理方法onApplicationEvent()
中,就会到相应的listener
中。在我们的案例中,是新增加了一条选择器数据,数据同步采用的是websocket
,所以,代码会进入到WebsocketDataChangedListener
进行选择器数据变更处理。
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {
// 省略了其他逻辑
case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); // 在我们的案例中,会进入到WebsocketDataChangedListener进行选择器数据变更处理
break;
}
}
2.4 Websocket数据变更监听器
// 选择器数据有更新
@Override
public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
// 构造 WebsocketData 数据
WebsocketData<SelectorData> websocketData =
new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
// 通过websocket发送数据
WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}
2.5 Websocket发送数据
- WebsocketCollector.send()
在send()
方法中,判断了一下同步的类型,根据不同的类型,进行处理。
@Slf4j
@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)
public class WebsocketCollector {
/**
* Send.
*
* @param message the message
* @param type the type
*/
public static void send(final String message, final DataEventTypeEnum type) {
if (StringUtils.isNotBlank(message)) {
// 如果是MYSELF(第一次的全量同步)
if (DataEventTypeEnum.MYSELF == type) {
// 从threadlocal中获取session
Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);
if (session != null) {
// 向该session发送全量数据
sendMessageBySession(session, message);
}
} else {
// 后续的增量同步
// 向所有的session中同步变更数据
SESSION_SET.forEach(session -> sendMessageBySession(session, message));
}
}
}
private static void sendMessageBySession(final Session session, final String message) {
try {
// 通过websocket的session把消息发送出去
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
}
}
我们给的案例是一个新增操作 ,是一个增量同步,所以会走
SESSION_SET.forEach(session -> sendMessageBySession(session, message));
这个逻辑。
再通过
session.getBasicRemote().sendText(message);
将数据发送了出去。
至此,当admin
端发生数据变更时,就将变更的数据以增量形式通过WebSocket
发给了网关。
分析到这里,不知道大家有没有疑问呢?比如session
是怎么来的?网关如何和admin
建立连接的?
不要着急,我们接下来就进行网关端的同步分析。
不过,在继续源码分析前,我们用一张图将上面的分析过程串联起来。
3. 网关数据同步
假设ShenYu
网关已经在正常运行了,使用的数据同步方式也是websocket
。那么当在admin
端新增一条选择器数据后,并且通过WebSocket
发送到网关,那网关是如何接收并处理数据的呢?接下来我们就继续进行源码分析,一探究竟。
3.1 WebsocketClient接收数据
- ShenyuWebsocketClient.onMessage()
在网关端有一个ShenyuWebsocketClient
类,它继承了WebSocketClient
,可以和WebSocket
建立连接并通信。
public final class ShenyuWebsocketClient extends WebSocketClient {
// ......
}
当在admin
端通过websocket
发送数据后,ShenyuWebsocketClient
就可以通过onMessage()
接收到数据,然后就可以自己进行处理。
public final class ShenyuWebsocketClient extends WebSocketClient {
// 接受到消息后执行
@Override
public void onMessage(final String result) {
// 处理接收到的数据
handleResult(result);
}
private void handleResult(final String result) {
// 数据反序列化
WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
// 哪种数据类型,插件、选择器、规则...
ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
// 哪种操作类型,更新、删除...
String eventType = websocketData.getEventType();
String json = GsonUtils.getInstance().toJson(websocketData.getData());
// 处理数据
websocketDataHandler.executor(groupEnum, json, eventType);
}
}
接收到数据后,首先进行了反序列化操作,读取数据类型和操作类型,紧接着,就交给websocketDataHandler.executor()
进行处理。
3.2 执行Websocket事件处理器
- WebsocketDataHandler.executor()
通过工厂模式创建了Websocket
数据处理器,每种数据类型,都提供了一个处理器:
插件 –> 插件数据处理器;
选择器 –> 选择器数据处理器;
规则 –> 规则数据处理器;
认证信息 –> 认证数据处理器;
元数据 –> 元数据处理器。
/**
* 通过工厂模式创建 Websocket数据处理器
* The type Websocket cache handler.
*/
public class WebsocketDataHandler {
private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);
/**
* Instantiates a new Websocket data handler.
* 每种数据类型,提供一个处理器
* @param pluginDataSubscriber the plugin data subscriber
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
// 插件 --> 插件数据处理器
ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
// 选择器 --> 选择器数据处理器
ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
// 规则 --> 规则数据处理器
ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
// 认证信息 --> 认证数据处理器
ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
// 元数据 --> 元数据处理器
ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
}
/**
* Executor.
*
* @param type the type
* @param json the json
* @param eventType the event type
*/
public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
// 根据数据类型,找到对应的数据处理器
ENUM_MAP.get(type).handle(json, eventType);
}
}
不同的数据类型,有不同的数据处理方式,所以有不同的实现类。但是它们之间也有相同的处理逻辑,所以可以通过模板方法设计模式来实现。相同的逻辑放在抽象类中的handle()
方法中,不同逻辑就交给各自的实现类。
我们的案例是新增了一条选择器数据,所以会交给SelectorDataHandler
( 选择器 –> 选择器数据处理器)进行数据处理。
3.3 判断事件类型
- AbstractDataHandler.handle()
实现数据变更的通用逻辑处理:根据不同的操作类型调用不同方法。
public abstract class AbstractDataHandler<T> implements DataHandler {
/**
* Convert list.
* 不同的逻辑由各自实现类去实现
* @param json the json
* @return the list
*/
protected abstract List<T> convert(String json);
/**
* Do refresh.
* 不同的逻辑由各自实现类去实现
* @param dataList the data list
*/
protected abstract void doRefresh(List<T> dataList);
/**
* Do update.
* 不同的逻辑由各自实现类去实现
* @param dataList the data list
*/
protected abstract void doUpdate(List<T> dataList);
/**
* Do delete.
* 不同的逻辑由各自实现类去实现
* @param dataList the data list
*/
protected abstract void doDelete(List<T> dataList);
// 通用逻辑,抽象类实现
@Override
public void handle(final String json, final String eventType) {
List<T> dataList = convert(json);
if (CollectionUtils.isNotEmpty(dataList)) {
DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
switch (eventTypeEnum) {
case REFRESH:
case MYSELF:
doRefresh(dataList); //刷新数据,全量同步
break;
case UPDATE:
case CREATE:
doUpdate(dataList); // 更新或创建数据,增量同步
break;
case DELETE:
doDelete(dataList); // 删除数据
break;
default:
break;
}
}
}
}
新增一条选择器数据,是新增操作,通过switch-case
进入到 doUpdate()
方法中。
3.4 进入具体的数据处理器
- SelectorDataHandler.doUpdate()
/**
* 选择器数据处理器
* The type Selector data handler.
*/
@RequiredArgsConstructor
public class SelectorDataHandler extends AbstractDataHandler<SelectorData> {
private final PluginDataSubscriber pluginDataSubscriber;
//......
// 更新操作
@Override
protected void doUpdate(final List<SelectorData> dataList) {
dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);
}
}
遍历数据,进入onSelectorSubscribe()
方法。
- PluginDataSubscriber.onSelectorSubscribe()
它没有其他逻辑,直接调用subscribeDataHandler()
方法。在方法中,更具数据类型(插件、选择器或规则),操作类型(更新或删除),去执行不同逻辑。
/**
* 通用插件数据订阅者,负责处理所有插件、选择器和规则信息
* The type Common plugin data subscriber.
*/
public class CommonPluginDataSubscriber implements PluginDataSubscriber {
//......
// 处理选择器数据
@Override
public void onSelectorSubscribe(final SelectorData selectorData) {
subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
}
// 订阅数据处理器,处理数据的更新或删除
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
Optional.ofNullable(classData).ifPresent(data -> {
// 插件数据
if (data instanceof PluginData) {
PluginData pluginData = (PluginData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cachePluginData(pluginData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removePluginData(pluginData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
}
} else if (data instanceof SelectorData) { // 选择器数据
SelectorData selectorData = (SelectorData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cacheSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理 Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removeSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
}
} else if (data instanceof RuleData) { // 规则数据
RuleData ruleData = (RuleData) data;
if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作
// 将数据保存到网关内存
BaseDataCache.getInstance().cacheRuleData(ruleData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作
// 从网关内存移除数据
BaseDataCache.getInstance().removeRuleData(ruleData);
// 如果每个插件还有自己的处理逻辑,那么就去处理
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
}
}
});
}
}
那么新增一条选择器数据,会进入下面的逻辑:
// 将数据保存到网关内存
BaseDataCache.getInstance().cacheSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理 Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
一是将数据保存到网关的内存中。BaseDataCache
是最终缓存数据的类,通过单例模式实现。选择器数据就存到了SELECTOR_MAP
这个Map
中。在后续使用的时候,也是从这里拿数据。
public final class BaseDataCache {
// 私有变量
private static final BaseDataCache INSTANCE = new BaseDataCache();
// 私有构造器
private BaseDataCache() {
}
/**
* Gets instance.
* 公开方法
* @return the instance
*/
public static BaseDataCache getInstance() {
return INSTANCE;
}
/**
* 缓存选择器数据的Map
* pluginName -> SelectorData.
*/
private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();
public void cacheSelectData(final SelectorData selectorData) {
Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
}
/**
* cache selector data.
* 缓存选择器数据
* @param data the selector data
*/
private void selectorAccept(final SelectorData data) {
String key = data.getPluginName();
if (SELECTOR_MAP.containsKey(key)) { // 更新操作,先删除再插入
List<SelectorData> existList = SELECTOR_MAP.get(key);
final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
resultList.add(data);
final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());
SELECTOR_MAP.put(key, collect);
} else { // 新增操作,直接放到Map中
SELECTOR_MAP.put(key, Lists.newArrayList(data));
}
}
}
二是如果每个插件还有自己的处理逻辑,那么就去处理。 通过idea
编辑器可以看到,当新增一条选择器后,有如下的插件还有处理。这里我们就不再展开了。
经过以上的源码追踪,并通过一个实际的案例,在admin
端新增一条选择器数据,就将websocket
数据同步的流程分析清除了。
我们还是用下面的一张图将网关端的数据同步流程串联一下:
数据同步的流程已经分析完了,但是还有一些问题没有分析到,就是网关是如何跟admin
建立连接的?
4. 网关和admin建立websocket连接
在网关的配置文件中有如下配置,并且引入了相关依赖,就会启动websocket
相关服务。
shenyu:
file:
enabled: true
cross:
enabled: true
dubbo :
parameter: multi
sync:
websocket : # 使用websocket进行数据同步
urls: ws://localhost:9095/websocket # admin端的websocket地址
在网关中引入websocket
的依赖。
<!--shenyu data sync start use websocket-->
<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-spring-boot-starter-sync-data-websocket</artifactId>
<version>${project.version}</version>
</dependency>
通过springboot
的条件装配,创建相关的bean
。在网关启动的时候,如果我们配置了shenyu.sync.websocket.urls
,那么Websocket
数据同步配置就会被加载。这里通过spring boot starter
完成依赖的加载。
/**
* Websocket数据同步配置
* 通过springboot实现条件注入
* Websocket sync data configuration for spring boot.
*/
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {
/**
* Websocket sync data service.
* Websocket数据同步服务
* @param websocketConfig the websocket config
* @param pluginSubscriber the plugin subscriber
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
* @return the sync data service
*/
// 创建websocketSyncDataService
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use websocket sync shenyu data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
/**
* Config websocket config.
*
* @return the websocket config
*/
@Bean
@ConfigurationProperties(prefix = "shenyu.sync.websocket")
public WebsocketConfig websocketConfig() {
return new WebsocketConfig(); // 创建WebsocketConfig
}
}
在项目的resources/META-INF
目录先新建spring.factories
文件,在文件中指明配置类。
在WebsocketSyncDataService
中做了如下几件事情:
- 读取配置中的
urls
,这个表示admin
端的同步地址,有多个的话,使用”,”分割;
- 创建调度线程池,一个
admin
分配一个,用于执行定时任务;
- 创建
ShenyuWebsocketClient
,一个admin
分配一个,用于和admin
建立websocket
通信;
- 开始和
admin
端的websocket
建立连接;
- 执行定时任务,每隔10秒执行一次。主要作用是判断
websocket
连接是否已经断开,如果已经断开,则尝试重连。如果没有断开,就进行 ping-pong
检测。
/**
* Websocket数据同步服务
* Websocket sync data service.
*/
@Slf4j
public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
private final List<WebSocketClient> clients = new ArrayList<>();
private final ScheduledThreadPoolExecutor executor;
/**
* Instantiates a new Websocket sync cache.
* 创建Websocket数据同步服务
* @param websocketConfig the websocket config
* @param pluginDataSubscriber the plugin data subscriber
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
// admin端的同步地址,有多个的话,使用","分割
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
// 创建调度线程池,一个admin分配一个
executor = new ScheduledThreadPoolExecutor(urls.length, ShenyuThreadFactory.create("websocket-connect", true));
for (String url : urls) {
try {
//创建WebsocketClient,一个admin分配一个
clients.add(new ShenyuWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
for (WebSocketClient client : clients) {
// 和websocket server建立连接
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}
// 执行定时任务,每隔10秒执行一次
// 主要作用是判断websocket连接是否已经断开,如果已经断开,则尝试重连。
// 如果没有断开,就进行 ping-pong 检测
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect server[{}] is successful.....", client.getURI().toString());
} else {
log.error("websocket reconnection server[{}] is error.....", client.getURI().toString());
}
} else {
client.sendPing();
log.debug("websocket send to [{}] ping message successful", client.getURI().toString());
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 10, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}
}
@Override
public void close() {
// 关闭 websocket client
for (WebSocketClient client : clients) {
if (!client.isClosed()) {
client.close();
}
}
// 关闭线程池
if (Objects.nonNull(executor)) {
executor.shutdown();
}
}
}
在ShenYu
中创建的WebSocket
客户端,用于和admin
端通信。第一次成功建立连接后,同步全量数据,后续进行增量同步。
/**
* 在ShenYu中自定义的WebSocket客户端
* The type shenyu websocket client.
*/
@Slf4j
public final class ShenyuWebsocketClient extends WebSocketClient {
private volatile boolean alreadySync = Boolean.FALSE;
private final WebsocketDataHandler websocketDataHandler;
/**
* Instantiates a new shenyu websocket client.
* 创建ShenyuWebsocketClient
* @param serverUri the server uri 服务端uri
* @param pluginDataSubscriber the plugin data subscriber 插件数据订阅器
* @param metaDataSubscribers the meta data subscribers 元数据订阅器
* @param authDataSubscribers the auth data subscribers 认证数据订阅器
*/
public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
super(serverUri);
this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
}
// 成功建立连接后执行
@Override
public void onOpen(final ServerHandshake serverHandshake) {
// 防止重新建立连接时,再次执行,所以用alreadySync进行判断
if (!alreadySync) {
// 同步所有数据,MYSELF 类型
send(DataEventTypeEnum.MYSELF.name());
alreadySync = true;
}
}
// 接受到消息后执行
@Override
public void onMessage(final String result) {
// 处理接收到的数据
handleResult(result);
}
// 关闭后执行
@Override
public void onClose(final int i, final String s, final boolean b) {
this.close();
}
// 失败后执行
@Override
public void onError(final Exception e) {
this.close();
}
@SuppressWarnings("ALL")
private void handleResult(final String result) {
// 数据反序列化
WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
// 哪种数据类型,插件、选择器、规则...
ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
// 哪种操作类型,更新、删除...
String eventType = websocketData.getEventType();
String json = GsonUtils.getInstance().toJson(websocketData.getData());
// 处理数据
websocketDataHandler.executor(groupEnum, json, eventType);
}
}
5. 总结
本文通过一个实际案例,对websocket
的数据同步原理进行了源码分析。涉及到的主要知识点如下:
websocket
支持双向通信,性能好,推荐使用;
- 通过
Spring
完成事件发布和监听;
- 通过抽象
DataChangedListener
接口,支持多种同步策略,面向接口编程;
- 使用工厂模式创建
WebsocketDataHandler
,实现不同数据类型的处理;
- 使用模板方法设计模式实现
AbstractDataHandler
,处理通用的操作类型;
- 使用单例设计模式实现缓存数据类
BaseDataCache
;
- 通过
SpringBoot
的条件装配和starter
加载机制实现配置类的加载。