Soul网关中的数据同步之Nacos

在上一篇文章中,跟踪了基于ZooKeeper的数据同步原理,本篇文件将要跟踪基于Nacos的数据同步原理。

同步的核心逻辑是:在soul-admin后台修改数据,先保存到数据库;然后将修改的信息通过同步策略发送到soul网关;由网关处理后,保存在soul网关内存;使用时,从网关内存获取数据。

本文的分析是想通过跟踪源码的方式来理解同步的核心逻辑,数据同步分析步骤如下:

  • 1.修改选择器
  • 2.更新数据
  • 3.接收数据
  • 4.使用更新后的数据
1. 修改选择器

在演示案例之前,将soul-admin的数据同步方式配置为nacosnacos的启动方式:nacos-server-2.0.0-ALPHA.2\nacos\bin>startup.cmd -m standalone):

soul:
  database:
    dialect: mysql
    init_script: "META-INF/schema.sql"
    init_enable: true
  sync:
#    websocket:
#      enabled: true
#      zookeeper:
#          url: localhost:2181
#          sessionTimeout: 5000
#          connectionTimeout: 2000
#      http:
#        enabled: true
      nacos:
        url: localhost:8848
        namespace: 1c10d748-af86-43b9-8265-75f487d20c6c
        acm:
          enabled: false
          endpoint: acm.aliyun.com
          namespace:
          accessKey:
          secretKey:

soul-bootstrap也配置一下数据同步方式为nacos:

soul :
    file:
      enabled: true
    corss:
      enabled: true
    dubbo :
      parameter: multi
    sync:
#        websocket :
#             urls: ws://localhost:9095/websocket

#        zookeeper:
#             url: localhost:2181
#             sessionTimeout: 5000
#             connectionTimeout: 2000
#        http:
#             url : http://localhost:9095
        nacos:
              url: localhost:8848
              namespace: 1c10d748-af86-43b9-8265-75f487d20c6c
              acm:
                enabled: false
                endpoint: acm.aliyun.com
                namespace:
                accessKey:
                secretKey:

现在,我们以一个实际调用过程为例,比如在Soul网关管理系统中,对选择器的配置信息进行修改:查询条件中id=99才能匹配成功。具体信息如下所示:

点击确认后,进入到soul-adminupdateSelector()这个接口。

    @PutMapping("/{id}")
    public SoulAdminResult updateSelector(@PathVariable("id") final String id, @RequestBody final SelectorDTO selectorDTO) {
        Objects.requireNonNull(selectorDTO);
        selectorDTO.setId(id);
        Integer updateCount = selectorService.createOrUpdate(selectorDTO);
        return SoulAdminResult.success(SoulResultMessage.UPDATE_SUCCESS, updateCount);
    }

2.更新数据

进入到后端系统后,会现在数据中更新信息,然后通过publishEvent()方法将更新的信息同步到网关。(下面代码只是展示了主要的逻辑,完整的代码请参考Soul源码。)

 @Transactional(rollbackFor = RuntimeException.class)
    public int createOrUpdate(final SelectorDTO selectorDTO) {
        int selectorCount;
        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
		//将更新的数据保存到soul-admin的数据库
        
        //省略了其他代码
        
        //将更新的数据同步到网关
        publishEvent(selectorDO, selectorConditionDTOs);
        return selectorCount;
    }

publishEvent()方法中调用了eventPublisher.publishEvent(),这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

    private void publishEvent(final RuleDO ruleDO, final List<RuleConditionDTO> ruleConditions) {
		//省略了其他代码......
        // publish change event.
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATE,
                Collections.singletonList(RuleDO.transFrom(ruleDO, pluginDO.getName(), conditionDataList))));
    }

Spring完成事件发布后,肯定有对应的监听器来处理它,这个监听器是ApplicationListener接口。在Soul中是通过DataChangedEventDispatcher这个类来完成具体监听工作的,它实现了ApplicationListener接口。

//处理监听事件
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
	//省略了其他代码......
    
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH: //认证授权
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN: //修改了插件
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:  //修改了规则
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR: //修改了选择器
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA: //修改了元数据
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    //在bean初始化的时候,将实现DataChangedListener接口的bean加载进来。
    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }

}

注意一下,这个DataChangedEventDispatcher还实现了InitializingBean接口,并重写了它的afterPropertiesSet()方法,做的事情是:在bean初始化的时候,将实现DataChangedListener接口的bean加载进来。通过查看源码,可以看到4种数据同步的方式都实现了该接口,其中就有我们这次使用的Nacos数据同步方式。

1

当监听器监听到有事件发布后,会执行onApplicationEvent()方法,这里面的逻辑是循环处理DataChangedListener,通过switch / case表达式匹配修改的是什么类型信息,我们这里修改的是选择器,所以会匹配到listener.onSelectorChanged()这个方法。(这里虽然用了循环的方式处理每一个listener,但在实际中我们只需要一种数据同步方式就好。)

本次使用的是Nacos进行数据同步,所以listener.onSelectorChanged()的实际执行方法是NacosDataChangedListener#onSelectorChanged。这里面做的事情是:

  • 1.更新选择器信息;
  • 2.发布更新的配置信息。

public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
        updateSelectorMap(getConfig(SELECTOR_DATA_ID));
        switch (eventType) {
            case DELETE:
                //省略了其他代码
            case REFRESH:
            case MYSELF:
			//省略了其他代码
            default:
                changed.forEach(selector -> {
                    //更新选择器信息
                    List<SelectorData> ls = SELECTOR_MAP
                            .getOrDefault(selector.getPluginName(), new ArrayList<>())
                            .stream()
                            .filter(s -> !s.getId().equals(selector.getId()))
                            .sorted(SELECTOR_DATA_COMPARATOR)
                            .collect(Collectors.toList());
                    ls.add(selector);
                    SELECTOR_MAP.put(selector.getPluginName(), ls);
                });
                break;
        }
    	//发布更新的配置信息
        publishConfig(SELECTOR_DATA_ID, SELECTOR_MAP);
    }

真正更新数据的操作是通过configService.publishConfig()完成。configService在程序启动的时候会注册为NacosConfigService


	//真正更新数据的操作是通过 configService.publishConfig()完成
    private void publishConfig(final String dataId, final Object data) {
        configService.publishConfig(dataId, GROUP, GsonUtils.getInstance().toJson(data));
    }
3.接收数据

Soul网关中,接收数据的操作是通过nacos进行监听的。通过 com.alibaba.nacos.api.config.listener.Listener#receiveConfigInfo来接收配置信息,然后去处理。

public class NacosCacheHandler {
    		//省略了其他代码
protected void watcherData(final String dataId, final OnChange oc) {
        Listener listener = new Listener() {
            //接收配置信息
            @Override
            public void receiveConfigInfo(final String configInfo) {
                oc.change(configInfo);
            }

            @Override
            public Executor getExecutor() {
                return null;
            }
        };
        oc.change(getConfigAndSignListener(dataId, listener));
        LISTENERS.getOrDefault(dataId, new ArrayList<>()).add(listener);
    }

protected void updateSelectorMap(final String configInfo) {
        try {
            if(StringUtils.isEmpty(configInfo)){
                return;
            }
            List<SelectorData> selectorDataList = GsonUtils.getInstance().toObjectMapList(configInfo, SelectorData.class).values().stream().flatMap(Collection::stream).collect(Collectors.toList());
            selectorDataList.forEach(selectorData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
                subscriber.unSelectorSubscribe(selectorData); //订阅者删除之前的选择器配置信息
                subscriber.onSelectorSubscribe(selectorData); //订阅者保存当前的选择器配置信息
            }));
        } catch (JsonParseException e) {
            log.error("sync selector data have error:", e);
        }
    }
}

实际处理数据还是由CommonPluginDataSubscriber来处理。CommonPluginDataSubscriber在处理数据时,根据数据类型和操作类型来分别处理。当前我们测试的是更新选择器信息,所以会进入更新的逻辑,就是下面代码中的BaseDataCache.getInstance().cacheRuleData(ruleData);


public class CommonPluginDataSubscriber implements PluginDataSubscriber {
    
   //省略了其他代码
     
   private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
        Optional.ofNullable(classData).ifPresent(data -> {
            if (data instanceof PluginData) {
  				//省略处理插件的逻辑
            } else if (data instanceof SelectorData) { //处理选择器信息
                SelectorData selectorData = (SelectorData) data;
                if (dataType == DataEventTypeEnum.UPDATE) { //更新操作
                    BaseDataCache.getInstance().cacheSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
                } else if (dataType == DataEventTypeEnum.DELETE) { //删除操作
                    BaseDataCache.getInstance().removeSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
                }
            } else if (data instanceof RuleData) {
                //省略处理规则的逻辑
            }
        });
    }
}

BaseDataCache.getInstance().cacheSelectData(selectorData);代码中,做的事情就是根据传入的变更信息来更新SELECTOR_MAP。这个SELECTOR_MAP缓存了选择器信息,网关在后续使用时,也是从这里获取具体的选择器去匹配请求。

public final class BaseDataCache {
    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();
    
    //省略了其他代码......
    
    //缓存选择器
    public void cacheSelectData(final SelectorData selectorData) {
        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
    }

    //接受选择器
    private void selectorAccept(final SelectorData data) {
        String key = data.getPluginName();
        if (SELECTOR_MAP.containsKey(key)) {
            List<SelectorData> existList = SELECTOR_MAP.get(key);
            //删除之前的选择器
            final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
            resultList.add(data);
            //保存现在的选择器
            final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());
            SELECTOR_MAP.put(key, collect);
        } else {
            SELECTOR_MAP.put(key, Lists.newArrayList(data));
        }
    }
}

分析到这里,基于nacos数据同步的工作就算完成了。核心逻辑就是就更新的信息放到网关的内存中,使用时再去内存中拿,所以Soul网关的效率是很高的。

4. 使用更新后的数据

选择器信息完成更新后,通过http去访问soul网关,这里以divide插件为例。关于divide插件的使用请参考之前的文章。

发起一个GET请求:http://localhost:9195/http/order/findById?id=100,代码会执行到下面这个位置:

# org.dromara.soul.plugin.base.AbstractSoulPlugin#execute
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        String pluginName = named();
        final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
        if (pluginData != null && pluginData.getEnabled()) {
            //获取选择器信息
            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
            if (CollectionUtils.isEmpty(selectors)) {
                return handleSelectorIsNull(pluginName, exchange, chain);
            }
            
            //省略了其他代码
            
            return doExecute(exchange, chain, selectorData, rule);
        }
        return chain.execute(exchange);
    }

代码BaseDataCache.getInstance().obtainSelectorData(pluginName);就是我们在数据同步时操作的数据缓存类,RULE_MAP就是刚才更新的规则信息。

public final class BaseDataCache {
    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();
    
    //省略了其他代码......  
    public List<SelectorData> obtainSelectorData(final String pluginName) {
        return SELECTOR_MAP.get(pluginName);
    }
}

刚才,我们发起的请求:http://localhost:9195/http/order/findById?id=100,是匹配不到选择器的:

{
    "code": -107,
    "message": "Can not find selector, please check your configuration!",
    "data": null
}

因为,在开始的时候,更新了选择器的配置:查询条件中id=99才能匹配成功。

所以,我们另外再发起一个id=99请求:http://localhost:9195/http/order/findById?id=99,就可以成功了。


{
    "id": "99",
    "name": "hello world findById"
}

最后,本文通过源码的方式跟踪了Soul网关是如何通过Nacos完成数据同步的:数据修改后,通过Spring发布修改事件,由NacosConfigService发送数据。在网关层有Listener接收变更的配置数据,然后进行处理数据,最后将数据保存到网关内存。

Soul网关中的数据同步之ZooKeeper

在上一篇文章中,跟踪了基于WebSocket的数据同步原理,本篇文件将要跟踪基于ZoomKeeper的数据同步原理。

  • 基于 zookeeper 的同步原理很简单,主要是依赖 zookeeper 的 watch 机制,soul-web 会监听配置的节点,soul-admin 在启动的时候,会将数据全量写入 zookeeper,后续数据发生变更时,会增量更新 zookeeper 的节点,与此同时,soul-web 会监听配置信息的节点,一旦有信息变更时,会更新本地缓存。

同步的核心逻辑是:在soul-admin后台修改数据,先保存到数据库;然后将修改的信息通过同步策略发送到soul网关;由网关处理后,保存在soul网关内存;使用时,从网关内存获取数据。

本文的分析是想通过跟踪源码的方式来理解同步的核心逻辑,数据同步分析步骤如下:

  • 1.修改选择器
  • 2.更新数据
  • 3.接收数据
  • 4.使用更新后的数据
1. 修改选择器

在演示案例之前,配置一下soul-admin端的数据同步方式为ZooKeeperZooKeeper的启动和安装请参考前面的文章):

soul:
  database:
    dialect: mysql
    init_script: "META-INF/schema.sql"
    init_enable: true
  sync:
#    websocket:
#      enabled: true
      zookeeper:
          url: localhost:2181
          sessionTimeout: 5000
          connectionTimeout: 2000

soul-bootstrap也配置一下数据同步方式为ZooKeeper:

soul :
    file:
      enabled: true
    corss:
      enabled: true
    dubbo :
      parameter: multi
    sync:
#        websocket :
#             urls: ws://localhost:9095/websocket

        zookeeper:
             url: localhost:2181
             sessionTimeout: 5000
             connectionTimeout: 2000

现在,我们以一个实际调用过程为例,比如在Soul网关管理系统中,对选择器的配置信息进行修改:查询条件中id=100才能匹配成功。具体信息如下所示:

1

点击确认后,进入到soul-adminupdateSelector()这个接口。

    @PutMapping("/{id}")
    public SoulAdminResult updateSelector(@PathVariable("id") final String id, @RequestBody final SelectorDTO selectorDTO) {
        Objects.requireNonNull(selectorDTO);
        selectorDTO.setId(id);
        Integer updateCount = selectorService.createOrUpdate(selectorDTO);
        return SoulAdminResult.success(SoulResultMessage.UPDATE_SUCCESS, updateCount);
    }

2.更新数据

进入到后端系统后,会现在数据中更新信息,然后通过publishEvent()方法将更新的信息同步到网关。(下面代码只是展示了主要的逻辑,完整的代码请参考Soul源码。)

 @Transactional(rollbackFor = RuntimeException.class)
    public int createOrUpdate(final SelectorDTO selectorDTO) {
        int selectorCount;
        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
		//将更新的数据保存到soul-admin的数据库
        
        //省略了其他代码
        
        //将更新的数据同步到网关
        publishEvent(selectorDO, selectorConditionDTOs);
        return selectorCount;
    }

publishEvent()方法中调用了eventPublisher.publishEvent(),这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

    private void publishEvent(final RuleDO ruleDO, final List<RuleConditionDTO> ruleConditions) {
		//省略了其他代码......
        // publish change event.
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATE,
                Collections.singletonList(RuleDO.transFrom(ruleDO, pluginDO.getName(), conditionDataList))));
    }

Spring完成事件发布后,肯定有对应的监听器来处理它,这个监听器是ApplicationListener接口。在Soul中是通过DataChangedEventDispatcher这个类来完成具体监听工作的,它实现了ApplicationListener接口。

//处理监听事件
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
	//省略了其他代码......
    
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH: //认证授权
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN: //修改了插件
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:  //修改了规则
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR: //修改了选择器
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA: //修改了元数据
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    //在bean初始化的时候,将实现DataChangedListener接口的bean加载进来。
    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }

}

注意一下,这个DataChangedEventDispatcher还实现了InitializingBean接口,并重写了它的afterPropertiesSet()方法,做的事情是:在bean初始化的时候,将实现DataChangedListener接口的bean加载进来。通过查看源码,可以看到4种数据同步的方式都实现了该接口,其中就有我们这次使用的ZooKeeper数据同步方式。

1

当监听器监听到有事件发布后,会执行onApplicationEvent()方法,这里面的逻辑是循环处理DataChangedListener,通过switch / case表达式匹配修改的是什么类型信息,我们这里修改的是选择器,所以会匹配到listener.onSelectorChanged()这个方法。(这里虽然用了循环的方式处理每一个listener,但在实际中我们只需要一种数据同步方式就好。)

本次使用的是ZooKeeper进行数据同步,所以listener.onSelectorChanged()的实际执行方法是ZookeeperDataChangedListener中的onSelectorChanged。这里面做的事情是:

  • 1.构建路径用于存放变更数据;
  • 2.更新数据到ZooKeeper

public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
        
        for (SelectorData data : changed) {
            //构建路径
            final String selectorRealPath = ZkPathConstants.buildSelectorRealPath(data.getPluginName(), data.getId());

            //省略了其他代码......
            
            final String selectorParentPath = ZkPathConstants.buildSelectorParentPath(data.getPluginName());
			
            //更新数据到ZooKeeper
            upsertZkNode(selectorRealPath, data);
        }
    }

真正更新数据的操作是通过 zkClient.writeData()完成。


	//真正更新数据的操作是通过 zkClient.writeData()完成
    private void upsertZkNode(final String path, final Object data) {
        if (!zkClient.exists(path)) {
            zkClient.createPersistent(path, true);
        }
        zkClient.writeData(path, data);
    }
3.接收数据

Soul网关中,接受数据的操作也是通过zkClient进行订阅变更。通过 zkClient.subscribeDataChanges()ZooKeeper订阅变更的数据,然后去处理。

public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {
    	//省略了其他代码......
       
    private void subscribeSelectorDataChanges(final String path) {
        //订阅数据变更
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            @Override
            public void handleDataChange(final String dataPath, final Object data) {
                //处理数据
                cacheSelectorData((SelectorData) data);
            }

            @Override
            public void handleDataDeleted(final String dataPath) {
                unCacheSelectorData(dataPath);
            }
        });
    }
      
    //处理数据
    private void cacheSelectorData(final SelectorData selectorData) {
        Optional.ofNullable(selectorData)
                .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data)));
    }
}

实际处理数据还是由CommonPluginDataSubscriber来处理。CommonPluginDataSubscriber在处理数据时,根据数据类型和操作类型来分别处理。当前我们测试的是更新选择器信息,所以会进入更新的逻辑,就是下面代码中的BaseDataCache.getInstance().cacheRuleData(ruleData);


public class CommonPluginDataSubscriber implements PluginDataSubscriber {
    
   //省略了其他代码
     
   private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
        Optional.ofNullable(classData).ifPresent(data -> {
            if (data instanceof PluginData) {
                PluginData pluginData = (PluginData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                     //省略了其他代码
                }
            } else if (data instanceof SelectorData) {
                SelectorData selectorData = (SelectorData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    //更新网关内存中缓存的信息
                    BaseDataCache.getInstance().cacheSelectData(selectorData);
                   //更新部分插件信息 Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
                } 
                }
            } else if (data instanceof RuleData) {
               //省略了其他代码
                }
            }
        });
    }
}

BaseDataCache.getInstance().cacheSelectData(selectorData);代码中,做的事情就是根据传入的变更信息来更新SELECTOR_MAP。这个SELECTOR_MAP缓存了选择器信息,网关在后续使用时,也是从这里获取具体的选择器去匹配请求。

public final class BaseDataCache {
    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();
    
    //省略了其他代码......
    
    //缓存选择器
    public void cacheSelectData(final SelectorData selectorData) {
        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
    }

    //接受选择器
    private void selectorAccept(final SelectorData data) {
        String key = data.getPluginName();
        if (SELECTOR_MAP.containsKey(key)) {
            List<SelectorData> existList = SELECTOR_MAP.get(key);
            //删除之前的选择器
            final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
            resultList.add(data);
            //保存现在的选择器
            final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());
            SELECTOR_MAP.put(key, collect);
        } else {
            SELECTOR_MAP.put(key, Lists.newArrayList(data));
        }
    }
}

分析到这里,基于ZooKeeper数据同步的工作就算完成了。核心逻辑就是就更新的信息放到网关的内存中,使用时再去内存中拿,所以Soul网关的效率是很高的。

4. 使用更新后的数据

选择器信息完成更新后,通过http去访问soul网关,这里以divide插件为例。关于divide插件的使用请参考之前的文章。

发起一个GET请求:http://localhost:9195/http/order/findById?id=1,代码会执行到下面这个位置:

# org.dromara.soul.plugin.base.AbstractSoulPlugin#execute
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        String pluginName = named();
        final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
        if (pluginData != null && pluginData.getEnabled()) {
            //获取选择器信息
            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
            if (CollectionUtils.isEmpty(selectors)) {
                return handleSelectorIsNull(pluginName, exchange, chain);
            }
            
            //省略了其他代码
            
            return doExecute(exchange, chain, selectorData, rule);
        }
        return chain.execute(exchange);
    }

代码BaseDataCache.getInstance().obtainSelectorData(pluginName);就是我们在数据同步时操作的数据缓存类,RULE_MAP就是刚才更新的规则信息。

public final class BaseDataCache {
    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();
    
    //省略了其他代码......  
    public List<SelectorData> obtainSelectorData(final String pluginName) {
        return SELECTOR_MAP.get(pluginName);
    }
}

刚才,我们发起的请求:http://localhost:9195/http/order/findById?id=1,是匹配不到选择器的:

{
    "code": -107,
    "message": "Can not find selector, please check your configuration!",
    "data": null
}

因为,在开始的时候,更新了选择器的配置:查询条件中id=100才能匹配成功。

所以,我们另外再发起一个id=100请求:http://localhost:9195/http/order/findById?id=100,就可以成功了。


{
    "id": "100",
    "name": "hello world findById"
}

最后,本文通过源码的方式跟踪了Soul网关是如何通过ZooKeeper完成数据同步的:数据修改后,通过Spring发布修改事件,由zkClient发送数据。在网关层也有zkClient订阅变更的数据,然后进行处理数据,最后将数据保存到网关内存。

Soul网关中的数据同步之WebSocket

在前面几篇文章中我们体验了如何将自己的服务接入到Soul网关中,接下来几篇我们将要了解的是Soul是如何完成数据同步的,在官网中介绍了4种同步方式:基于WebSocket的数据同步,基于ZoomKeeper的数据同步,基于Http长轮询的数据同步和基于Nacos的数据同步。我们将依次进行分析,本篇文章分析的是基于WebSocket的数据同步。

数据同步的原理在官网已经有讲述了数据同步原理

Soul 数据同步的流程,Soul 网关在启动时,会从从配置服务同步配置数据,并且支持推拉模式获取配置变更信息,并且更新本地缓存。而管理员在管理后台,变更用户、规则、插件、流量配置,通过推拉模式将变更信息同步给 Soul 网关,具体是 push 模式,还是 pull 模式取决于配置。

1

  • 如果是 websocket 同步策略,则将变更后的数据主动推送给 soul-web,并且在网关层,会有对应的 WebsocketDataHandler 处理器处理来处 admin 的数据推送。

同步的核心逻辑是:在soul-admin后台修改数据,先保存到数据库;然后将修改的信息通过同步策略发送到soul网关;由网关处理后,保存在soul网关内存;使用时,从网关内存获取数据。

本文的分析是想通过跟踪源码的方式来理解同步的核心逻辑,数据同步分析步骤如下:

  • 1.修改规则
  • 2.更新数据
  • 3.接受数据
  • 4.使用更新后的数据
1. 修改规则

我们以一个实际调用过程为例,比如在Soul网关管理系统中,对一项规则进行修改:将divide插件中的/http/order/findById规则的重试次数修改为2。具体信息如下所示:

1

点击确认后,进入到soul-adminupdateRule()这个接口。

    @PutMapping("/{id}")
    public SoulAdminResult updateRule(@PathVariable("id") final String id, @RequestBody final RuleDTO ruleDTO) {
        Objects.requireNonNull(ruleDTO);
        ruleDTO.setId(id);
        Integer updateCount = ruleService.createOrUpdate(ruleDTO);
        return SoulAdminResult.success(SoulResultMessage.UPDATE_SUCCESS, updateCount);
    }

2.更新数据

进入到后端系统后,会现在数据中更新信息,然后通过publishEvent()方法将更新的信息同步到网关。(下面代码只是展示了主要的逻辑,完整的代码请参考Soul源码。)

@Transactional(rollbackFor = Exception.class)
    public int createOrUpdate(final RuleDTO ruleDTO) {
        int ruleCount;
        RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);
        List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions();
        if (StringUtils.isEmpty(ruleDTO.getId())) {
            //在数据库更新数据
            ruleCount = ruleMapper.insertSelective(ruleDO);
      		//省略了其他代码......
        } else {
            //在数据库更新数据
            ruleCount = ruleMapper.updateSelective(ruleDO);
           //省略了其他代码......
        }
        
        //将更新的数据同步到Soul网关
        publishEvent(ruleDO, ruleConditions);
        return ruleCount;
    }

publishEvent()方法中调用了eventPublisher.publishEvent(),这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

    private void publishEvent(final RuleDO ruleDO, final List<RuleConditionDTO> ruleConditions) {
		//省略了其他代码......
        // publish change event.
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATE,
                Collections.singletonList(RuleDO.transFrom(ruleDO, pluginDO.getName(), conditionDataList))));
    }

Spring完成事件发布后,肯定有对应的监听器来处理它,这个监听器是ApplicationListener接口。在Soul中是通过DataChangedEventDispatcher这个类来完成具体监听工作的,它实现了ApplicationListener接口。

//处理监听事件
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
	//省略了其他代码......
    
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH: //认证授权
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN: //修改了插件
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:  //修改了规则
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR: //修改了选择器
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA: //修改了元数据
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    //在bean初始化的时候,将实现DataChangedListener接口的bean加载进来。
    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }

}

注意一下,这个DataChangedEventDispatcher还实现了InitializingBean接口,并重写了它的afterPropertiesSet()方法,做的事情是:在bean初始化的时候,将实现DataChangedListener接口的bean加载进来。通过查看源码,可以看到4种数据同步的方式都实现了该接口,其中就有我们这次使用的WebSocket数据同步方式。

1

当监听器监听到有事件发布后,会执行onApplicationEvent()方法,这里面的逻辑是循环处理DataChangedListener,通过switch / case表达式匹配修改的是什么类型信息,我们这里修改的是规则,所以会匹配到listener.onRuleChanged()这个方法。(这里虽然用了循环的方式处理每一个listener,但在实际中我们只需要一种数据同步方式就好。)

本次使用的是WebSocket进行数据同步,所以listener.onRuleChanged()的实际执行方法是WebsocketDataChangedListener中的onRuleChanged()。这里面做的事情是:

  • 1.将更新数据转成WebsocketData的形式;
  • 2.通过WebsocketCollector发布数据(数据又转成了json)。

public class WebsocketDataChangedListener implements DataChangedListener {
	//省略了其他代码......

    @Override
    public void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) {
        WebsocketData<RuleData> configData =
                new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
    }

  //省略了其他代码......

}

WebsocketCollector中的send()方法如下,核心逻辑就是session.getBasicRemote().sendText(message);。到这儿,soul-admin就通过websocket就更新规则的数据发布出去了,等待WebSocketClient去接收了。

public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isNotBlank(message)) {
		//省略了其他代码......
            for (Session session : SESSION_SET) {
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    log.error("websocket send result is exception: ", e);
                }
            }
        }
    }
3.接收数据

Soul网关中,SoulWebsocketClient继承了WebSocketClient,所以它会处理soul-admin通过WebSocket发出的数据。处理的入口是在onMessage()方法中,它又调用了handleResult()方法,核心方法是websocketDataHandler.executor()

public final class SoulWebsocketClient extends WebSocketClient {
    	//省略了其他代码......
   
    @Override
    public void onMessage(final String result) {
        handleResult(result);
    }
    	//省略了其他代码......
    
    @SuppressWarnings("ALL")
    private void handleResult(final String result) {
        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
        String eventType = websocketData.getEventType();
        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        websocketDataHandler.executor(groupEnum, json, eventType);
    }
}

WebsocketDataHandler可以看成是一个工厂类,里面定义好了处理信息的类型:插件,选择器,规则,认证,元数据。


public class WebsocketDataHandler {

    private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);

    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers,
                                final List<AuthDataSubscriber> authDataSubscribers) {
        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
    }

    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
        ENUM_MAP.get(type).handle(json, eventType);
    }
}

根据传入的数据类型,使用对应的Handler去处理,比如,我们修改的是规则信息,所以这里会调用RuleDataHandler来处理。跟踪进去后,发现RuleDataHandler继承了AbstractDataHandler类,其他几种数据类型也继承了该类。

1

通过源码发现,这里运用了模板方法的设计模式。定义好了通用的方法handle(),其他方法都是抽象方法,由子类去实现。在handle()方法中通过switch / case表达式去匹配操作类型,然后执行实际的方法。

public abstract class AbstractDataHandler<T> implements DataHandler {
   
    //抽象方法
    protected abstract void doUpdate(List<T> dataList);

    //这里省略了其他抽象方法

    //通用方法
    @Override
    public void handle(final String json, final String eventType) {
        List<T> dataList = convert(json);
        if (CollectionUtils.isNotEmpty(dataList)) {
            DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
            switch (eventTypeEnum) {
                case REFRESH:
                case MYSELF:
                    doRefresh(dataList);
                    break;
                case UPDATE:
                case CREATE:
                    doUpdate(dataList); //根据操作类型匹配实际的执行方法。
                    break;
                case DELETE:
                    doDelete(dataList);
                    break;
                default:
                    break;
            }
        }
    }
}

RuleDataHandler中,更新方法的逻辑交给了pluginDataSubscriberonRuleSubscribe()方法。而这是一个接口,它的实现类是CommonPluginDataSubscriber

public class RuleDataHandler extends AbstractDataHandler<RuleData> {

    private final PluginDataSubscriber pluginDataSubscriber;

    //省略了其他方法
       
    @Override
    protected void doUpdate(final List<RuleData> dataList) {
        dataList.forEach(pluginDataSubscriber::onRuleSubscribe);
    }

    
}

CommonPluginDataSubscriber在处理更新的信息,当前我们测试的是更新规则信息,所以会进入更新的逻辑,就是下面代码中的BaseDataCache.getInstance().cacheRuleData(ruleData);


public class CommonPluginDataSubscriber implements PluginDataSubscriber {
    
   //省略其他代码
    
    //处理订阅信息
    @Override
    public void onRuleSubscribe(final RuleData ruleData) {
        subscribeDataHandler(ruleData, DataEventTypeEnum.UPDATE);
    }
     
    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
        Optional.ofNullable(classData).ifPresent(data -> {
            if (data instanceof PluginData) {
                PluginData pluginData = (PluginData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
					//省略其他代码
                }
            } else if (data instanceof SelectorData) {
            //省略其他代码
            } else if (data instanceof RuleData) {
                RuleData ruleData = (RuleData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    //更新通用缓存信息
                    BaseDataCache.getInstance().cacheRuleData(ruleData);
                    //更新插件中的缓存信息
                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
                } 
                //省略其他代码
                }
            }
        });
    }
}

BaseDataCache.getInstance().cacheRuleData(ruleData);代码中,做的事情就是根据传入的变更信息来更新RULE_MAP。这个RULE_MAP缓存了规则信息,网关在后续使用时,也是从这里获取具体规则去匹配请求。

public final class BaseDataCache {
    private static final ConcurrentMap<String, List<RuleData>> RULE_MAP = Maps.newConcurrentMap();
    
    //省略了其他代码......
    
    //缓存规则
    public void cacheRuleData(final RuleData ruleData) {
        Optional.ofNullable(ruleData).ifPresent(this::ruleAccept);
    }

    //接受规则
    private void ruleAccept(final RuleData data) {
                String selectorId = data.getSelectorId();
                if (RULE_MAP.containsKey(selectorId)) {
                    List<RuleData> existList = RULE_MAP.get(selectorId);
                    //删除原来的规则
                    final List<RuleData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
                    resultList.add(data);
                    //保存新的规则
                    final List<RuleData> collect = resultList.stream().sorted(Comparator.comparing(RuleData::getSort)).collect(Collectors.toList());
                    RULE_MAP.put(selectorId, collect);
                } else {
                    RULE_MAP.put(selectorId, Lists.newArrayList(data));
                }
        }
}

分析到这里,数据同步的工作就算完成了。核心逻辑就是就更新的信息放到网关的内存中,使用时再去内存中拿,所以Soul网关的效率是很高的。

4. 使用更新后的数据

规则信息完成更新后,通过http去访问soul网关,这里以divide插件为例。关于divide插件的使用请参考之前的文章。

发起一个GET请求:http://localhost:9195/http/order/findById?id=1,代码会执行到下面这个位置:

# org.dromara.soul.plugin.base.AbstractSoulPlugin#execute
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        String pluginName = named();
        final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
        if (pluginData != null && pluginData.getEnabled()) {
            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);

			//省略了其他代码
            
            //从缓存中获取规则信息
            final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
            if (CollectionUtils.isEmpty(rules)) {
                return handleRuleIsNull(pluginName, exchange, chain);
            }
            RuleData rule;
            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
                //get last
                rule = rules.get(rules.size() - 1);
            } else {
                rule = matchRule(exchange, rules);
            }
            //省略了其他代码
            return doExecute(exchange, chain, selectorData, rule);
        }
        return chain.execute(exchange);
    }

代码BaseDataCache.getInstance().obtainRuleData(selectorData.getId());就是我们在数据同步时操作的数据缓存类,RULE_MAP就是刚才更新的规则信息。

public final class BaseDataCache {
    private static final ConcurrentMap<String, List<RuleData>> RULE_MAP = Maps.newConcurrentMap();
    
    //省略了其他代码......  
	public List<RuleData> obtainRuleData(final String selectorId) {
        return RULE_MAP.get(selectorId);
    }
}

最后,本文通过源码的方式跟踪了Soul网关是如何通过WebSocket完成数据同步的:数据修改后,通过Spring发布修改事件,由WebSocket发送数据,SoulWebSocket会处理数据,最后将数据保存到内存。