Soul源码阅读系列(四)负载均衡的应用

divide插件中,Soul网关提供了负载均衡算法,对请求网关的IP选择一个真实的服务。在Soul中,负载均衡算法有三种:HashLoadBalanceRandomLoadBalanceRoundRobinLoadBalance。默认使用的是RandomLoadBalance算法,你可以对每个规则要使用何种策略进行设置。

使用时机

在执行divide插件时,会调用负载均衡算法,根据选择的结果,设置真实的请求服务。

//org.dromara.soul.plugin.divide.DividePlugin#doExecute
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
	
    	//省略了其他代码
    
        final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
    	//使用负载均衡    
       DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
      
        // 设置真实的请求服务
        String domain = buildDomain(divideUpstream);
      
   		 //省略了其他代码
        return chain.execute(exchange);
    }
LoadBalance的继承关系如下

类的设计使用的时模板方法设计模式:在抽象类中实现每个组件通用的功能,一些具体的功能留给子类去实现。在这里AbstrctLoadBalance实现了负载均衡的通用方法:获取权重,入参判断等。doSelect()留给了子类去实现,即具体的负载均衡算法逻辑。

public abstract class AbstractLoadBalance implements LoadBalance {
	
    //子类去实现
    protected abstract DivideUpstream doSelect(List<DivideUpstream> upstreamList, String ip);

    @Override
    public DivideUpstream select(final List<DivideUpstream> upstreamList, final String ip) {
        if (CollectionUtils.isEmpty(upstreamList)) {
            return null;
        }
        //如果只有一个服务,就直接返回
        if (upstreamList.size() == 1) {
            return upstreamList.get(0);
        }
        return doSelect(upstreamList, ip);
    }

    protected int getWeight(final DivideUpstream upstream) {
     //省略了代码的具体实现
    }

	//省略其他代码
}

select()方法中,先判断有没有服务;然后判断是否只有一个,是的话,就直接返回。因为一个不需要负载均衡,只能请求它。有多个就会通过负载均衡选择具体的类。

RandomLoadBalance的原理

RandomLoadBalance 是加权随机算法的具体实现。假设我们有一组服务器 servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2],权重总和为10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A[5, 8) 区间属于服务器 B[8, 10) 区间属于服务器 C

接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。

只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。比如,经过一万次选择后,服务器 A 被选中的次数大约为5000次,服务器 B 被选中的次数约为3000次,服务器 C 被选中的次数约为2000次。

在代码实现上,并没有真正的创建各个区间,而是通过每次生成的随机数减去服务器的权重,当出现小于0时,就选择这个服务器。

还是用上面的例子,再说明一下,我们有 servers = [A, B, C]weights = [5, 3, 2],生成一个随机数offset = 7

  • 第一次循环,offset - 5 = 2 > 0,即 offset > 5, 表明其不会落在服务器 A 对应的区间上。
  • 第二次循环,offset - 3 = -1 < 0,即 5 < offset < 8,表明其会落在服务器 B 对应的区间上

    @Override
    public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
        //所有权重
        int totalWeight = calculateTotalWeight(upstreamList);
        //是否相同
        boolean sameWeight = isAllUpStreamSameWeight(upstreamList);
        if (totalWeight > 0 && !sameWeight) {
            return random(totalWeight, upstreamList);
        }
        // 如果权重相同或者为0,则随机选择一个。
        return random(upstreamList);
    }

   //加权随机算法
    private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) {
        // 随机数
        int offset = RANDOM.nextInt(totalWeight);
        // 对每个服务器进行处理
        for (DivideUpstream divideUpstream : upstreamList) {
            //随机数 = 随机数 - 当前服务器的权重
            offset -= getWeight(divideUpstream);
            //小于0,就选中,表示落在这个区间内了
            if (offset < 0) {
                return divideUpstream;
            }
        }
        return upstreamList.get(0);
    }

  //如果权重相同或者为0,则随机选择一个。
    private DivideUpstream random(final List<DivideUpstream> upstreamList) {
        return upstreamList.get(RANDOM.nextInt(upstreamList.size()));
    }

#####

RandomLoadBalance 是一个简单,高效的负载均衡算法实现。在经过多次请求后,能够将调用请求按照权重值进行“均匀”分配。当然 RandomLoadBalance 也存在一定的缺点,当调用次数比较少时,Random 产生的随机数可能会比较集中,此时多数请求会落到同一台服务器上。这个缺点并不是很严重,多数情况下可以忽略。

HashLoadBalance的原理

通过哈希算法计算所有服务地址,保存到map。然后也对当前请求的IP计算hash值,根据这个值到map中获取服务。

这里其实应用了一致性哈希的思想,用于尽可能地降低节点变动带来的数据迁移开销,可以参看这里,。

    @Override
    public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
        //线程安全,且有序的 map
        final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();
       	//计算每个服务的hash
        for (DivideUpstream address : upstreamList) {
            for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
                //根据服务节点的 url 和 序号 计算hash值
                long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-" + i);
                treeMap.put(addressHash, address);
            }
        }
        //计算当前的hash
        long hash = hash(String.valueOf(ip));
        //返回所有(key >= hash)的映射集合
        SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);
        if (!lastRing.isEmpty()) {
            //取虚拟节点对应的真实服务节点
            return lastRing.get(lastRing.firstKey());
        }
        return treeMap.firstEntry().getValue();
    }
RoundRobinLoadBalance的原理

这里使用的是平滑加权轮询算法,实现比较复杂,完整代码逻辑请看源码。

假设我们有 servers = [A, B, C]weights = [5, 1, 1],这个权重,我们称之为“固定权重数组”,相应的,有一个叫“非固定权重数组”,“非固定权重数组”每次都会根据一定的规则发生变动。规则如下:每次有请求时,从当前权重中选择权重最大的服务器,用于处理请求。然后,更新非固定权重数组,它等于被选中请求的服务器固定权重减去总权重,其余的保留。以后发生的请求都按照这个规则来处理。

上面描述不是很好理解,下面还是举例进行说明。这里仍然使用服务器 [A, B, C] 对应权重 [5, 1, 1] 的例子说明,现在有7个请求依次进入负载均衡逻辑,选择过程如下:

@Override
    public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
        String key = upstreamList.get(0).getUpstreamUrl();
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);

        for (DivideUpstream upstream : upstreamList) {
            String rKey = upstream.getUpstreamUrl();
            WeightedRoundRobin weightedRoundRobin = map.get(rKey);
            int weight = getWeight(upstream);
    		//创建服务权重
            if (weightedRoundRobin == null) {
                weightedRoundRobin = new WeightedRoundRobin();
                weightedRoundRobin.setWeight(weight);
                map.putIfAbsent(rKey, weightedRoundRobin);
            }
            //权重更新
            if (weight != weightedRoundRobin.getWeight()) {
                //weight changed
                weightedRoundRobin.setWeight(weight);
            }
            //增加权重
            long cur = weightedRoundRobin.increaseCurrent();
            weightedRoundRobin.setLastUpdate(now);
            //找出最大的
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedInvoker = upstream;
                selectedWRR = weightedRoundRobin;
            }
            totalWeight += weight;
        }
        //省略了其他代码
        
        if (selectedInvoker != null) {
            selectedWRR.sel(totalWeight);
            return selectedInvoker;
        }
        // should not happen here
        return upstreamList.get(0);
    }

小结,本文主要讲解了Soul网关中所采用的的负载均衡算法及其实现原理。

参考文献

Soul源码阅读系列(三)插件是如何被加载和执行的?

在上篇文章中,我们通过一个案例演示了http用户如何接入到Soul网关中,本文将探索其中的原理:

  • Soul如何加载插件?
  • 业务接口如何注册到soul admin中?
  • Divide插件的原理是什么?

Soul 如何加载插件?

首先,我们来看看Soul是如何加载各个插件的?在上一篇文章中,我们看到了Divide插件的调用过程:

  • SoulWebHandler:它实现了WebHandler,重写了handle()方法,用于处理Soul网关中所有的请求。
  • DefaultSoulPluginChain:插件链执行类,以责任链的设计模式处理所有插件。
  • AbstractSoulPlugin:多个插件的父类,以模板方法设计模式实现各种插件类型。
  • DividePlugindivide插件,用于处理http请求。

通过查看源码,可以发现在SoulWebHandler中,是通过构造器的方式将所有插件设置进来(在Soul网关中使用了Reactor并行编程,代码中的scheduler就是由reactor包实现,此处的作用是创建线程池)。那么又会有两个问题:一是 插件是如何生成的?二是 SoulWebHandler是何时被创建的?

public final class SoulWebHandler implements WebHandler {
    //...   
    
    public SoulWebHandler(final List<SoulPlugin> plugins) {
        //保存所有插件
        this.plugins = plugins;
        String schedulerType = System.getProperty("soul.scheduler.type", "fixed");
        if (Objects.equals(schedulerType, "fixed")) {
            //获取可用线程数
            int threads = Integer.parseInt(System.getProperty(
                    "soul.work.threads", "" + Math.max((Runtime.getRuntime().availableProcessors() << 1) + 1, 16)));
            //创建CPU内核数量的多线程池
            scheduler = Schedulers.newParallel("soul-work-threads", threads);
        } else {
            //无限制的弹性线程池,可以一直创建线程
            scheduler = Schedulers.elastic();
        }
    }
 
    //...    
}

IDEA编辑器中点击SoulWebHandler,查看被调用的地方,发现是在SoulConfiguration这个配置文件中,通过注解的方式创建了webHandler这个bean。所有的插件plugins也是作为一个参数传进来的,还得继续向上追踪。(由于版面有限,源码中省略了部分逻辑)

@Configuration声明一个类是配置类,在Spring Boot 启动时会加载配置类。

@Bean注解作用于方法上,会从Spring容器中将同类型的SoulPlugin自动注入进来。

@Configuration
public class SoulConfiguration {
    //...
    
    @Bean("webHandler")
    public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
        List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
        //排序
        final List<SoulPlugin> soulPlugins = pluginList.stream()
                .sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
        soulPlugins.forEach(soulPlugin -> log.info("load plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
        return new SoulWebHandler(soulPlugins);
    }
 //...   
}

那到这里,就清楚了:只要实现了SoulPlugin的类就会被注入进来,所以再看看SoulPlugin的实现类有哪些?

通过编辑器发现有 34 个类实现了SoulPlugin接口,其中有AbstractSoulPlugin,还有实现http请求接入的DividePlugin,还有其他的插件,我们后面再陆续探究。

现在还有问题是:各个插件是何时被创建的?我们以DividePlugin插件为例,通过IDEA编辑器点击DividePlugin,发现在DividePluginConfiguration中被创建了。

DividePluginConfiguration也是一个配置文件,在里面配置了dividePlugin这个bean,它的类型是SoulPlugin,所以在SoulConfiguration会自动注入到soulWebHandler()方法中。

@Configuration
public class DividePluginConfiguration {

    @Bean
    public SoulPlugin dividePlugin() {
        return new DividePlugin();
    }
    //...
}

DividePluginConfiguration是通过spring boot starter的方式自动加载的。

spring boot starter可以自动加载依赖,它在Spring Boot启动时,自动加载资源文件夹META-INF\spring.factories中配置的类。

分析到这里,就知道了Soul是如何加载插件的:在soul网关启动的时候,spring boot starter自动加载相关配置类,创建插件bean,然后在SoulWebHandler中将插件bean注入进来,保存到List中。用一张图来描述一下上述过程:

业务接口如何注册到 soul admin 中?

知道了插件是怎么被加载到soul网关中的之后,接下来再看看业务接口是如何注册到soul admin中?如下图所示,展示了divide插件对应的选择器列表和选择器规则列表,这些选择器和规则我们并没有自己添加上去,为什么就会存在呢?

是因为在业务系统中进行了配置:

  • adminUrlsoul-admin的地址,用于将业务系统的接口注册到soul-admin后台管理中去;
  • port:业务系统的端口;
  • contextPath:业务系统在网关中的上下文名称;
  • appName:业务系统的名称;
  • full:是否代理全部,如果是true,则代理业务系统的所有接口。
soul:
  http:
    adminUrl: http://localhost:9095 #soul-admin的地址
    port: 8188
    contextPath: /http
    appName: http
    full: false

还有一个关键注解是@SoulSpringMvcClient,业务系统在启动时,会读取这个注解进行处理。处理类是SpringMvcClientBeanPostProcessor,实现了BeanPostProcessor,是一个后置处理器,在bean的创建前后,分别有方法进行处理。

BeanPostProcessor 接口是 Spring 中的一个后置处理器接口,它的作用主要是如果我们需要在 Spring 容器完成 Bean 的实例化、配置和其他的初始化前后添加一些自己的逻辑处理,就可以实现该接口的,然后注册到容器中。

SpringMvcClientBeanPostProcessor部分源码如下,它做了下面几件事情:

  • bean初始化前,创建线程池,其中的线程用于将接口信息发送到soul-admin
  • bean初始化后,处理SoulSpringMvcClient注解;
  • 如果SoulSpringMvcClient注解作用于类上,那么就表示该类的所有接口都被网关代理,通过线程池中的线程将接口类发送到soul-admin中进行注册;
  • 如果SoulSpringMvcClient注解作用于方法上,那么就将该方法的接口信息通过线程池中的线程发送到soul-admin中进行注册。
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
	//..

	//在bean初始化前,创建线程池,其中的线程用于将接口信息发送到soul-admin中去
    public SpringMvcClientBeanPostProcessor(final SoulSpringMvcConfig soulSpringMvcConfig) {
		//...
        url = soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register";
        executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
        //full是否为 true
        if (soulSpringMvcConfig.isFull()) {
            return bean;
        }
        //获取 Controller 注解
        Controller controller = AnnotationUtils.findAnnotation(bean.getClass(), Controller.class);
        //获取 RequestMapping 注解
        RequestMapping requestMapping = AnnotationUtils.findAnnotation(bean.getClass(), RequestMapping.class);
        if (controller != null || requestMapping != null) {
            //获取 SoulSpringMvcClient 注解
            SoulSpringMvcClient clazzAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), SoulSpringMvcClient.class);
            String prePath = "";
            //是否作用于类上
            if (Objects.nonNull(clazzAnnotation)) {
                if (clazzAnnotation.path().indexOf("*") > 1) {
                    String finalPrePath = prePath;
                    //将接口信息发送到soul-amin
                    executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(clazzAnnotation, finalPrePath), url,
                            RpcTypeEnum.HTTP));
                    return bean;
                }
                prePath = clazzAnnotation.path();
            }
            
            final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
            for (Method method : methods) {
                SoulSpringMvcClient soulSpringMvcClient = AnnotationUtils.findAnnotation(method, SoulSpringMvcClient.class);
                //SoulSpringMvcClient注解是否作用于方法上
                if (Objects.nonNull(soulSpringMvcClient)) {
                    String finalPrePath = prePath;
                     //将接口信息发送到soul-amin
                    executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(soulSpringMvcClient, finalPrePath), url,
                            RpcTypeEnum.HTTP));
                }
            }
        }
        return bean;
    }
    
    //...
}

通过debug可以看到发送的某个方法接口信息,包括业务系统名称,上下文名称,方法请求路径,是否启用等等。

{"appName":"http","context":"/http","path":"/http/test/**","pathDesc":"","rpcType":"http","host":"192.168.236.75","port":8188,"ruleName":"/http/test/**","enabled":true,"registerMetaData":false}

发送的路径是soul-admin后端的一个接口:

http://localhost:9095/soul-client/springmvc-register

那接着再跟踪一下springmvc-register这个接口又做了什么?

进入它的实现类(下面的源码只保留的主要逻辑),开始处理选择器信息,处理规则信息,都处理成功了,就返回一个成功的信息。

    public String registerSpringMvc(final SpringMvcRegisterDTO dto) {
		//...
        String selectorId = handlerSpringMvcSelector(dto);
        handlerSpringMvcRule(selectorId, dto);
        return SoulResultMessage.SUCCESS;
    }

选择器和规则的处理逻辑是相似的:先更新soul-admin数据中的选择器或规则信息;然后将选择器或规则信息通过发布事件的方式发送到soul网关。

private String handlerSpringMvcSelector(final SpringMvcRegisterDTO dto) {
        //...
        //更新选择器信息,保存到数据库
        selectorMapper.updateSelective(selectorDO);
        //保存业务系统信息
        upstreamCheckService.submit(contextPath, addDivideUpstream);
        //发布事件
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
        }
        return selectorId;
    }

发布事件的操作比较复杂,涉及到数据同步的原理,我们后面再专门进行分析,今天就不再深入了。

分析到这里,终于可以回答 业务接口如何注册到soul admin 中? 这个问题了。在业务系统中,将想要被soul网关代理的接口,加上@SoulSpringMvcClient注解,当系统启动时,将接口信息通过多线程的方式(基于httpPOST请求)发送到soul-admin后台管理系统中。在soul-admin中,一方面将接口信息保存到自己的数据库,另一方面发布接口信息事件到soul网关,这样soul网关就知道哪些接口可以被代理,哪些接口直接跳过。再用图片描述一下处理过程:

Divide 插件的原理是什么?

在上面的分析中明白了插件的加载过程和接口信息的注册过程,现在来解决最后一个问题:divide插件的执行原理。

由前面的分析文章可以知道,网关的所有请求最终都会来到 SoulWebHandler 进行处理。handle()相当于请求入口,参数 ServerWebExchange携带了请求信息。在这个方法里面,创建了一个对象保存所有插件,并将请求交给了之前插件的线程池中。

public final class SoulWebHandler implements WebHandler {   
	//...
    @Override
    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
        return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler);
    }
}

DefaultSoulPluginChain这个类中的execute()方法采用了责任链的设计模式,依次处理所有插件,所以DividePlugin会在这里被执行。


    private static class DefaultSoulPluginChain implements SoulPluginChain {
        //...
        @Override
        public Mono<Void> execute(final ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < plugins.size()) {
                    SoulPlugin plugin = plugins.get(this.index++);
                    Boolean skip = plugin.skip(exchange);
                    if (skip) {
                        return this.execute(exchange);
                    }
                    return plugin.execute(exchange, this);
                }
                return Mono.empty();
            });
        }
    }
}

DividePlugin继承了AbstractSoulPlugin,所以会执行plugin.execute()方法,这方法是许多插件的共有方法,也就是模板方法。插件类的继承关系使用了模板方法设计模式,共有方法是execute(),抽象方法是doExecute()

共有方法execute()主要作用是:匹配插件,匹配选择器,匹配规则。有一个没有匹配上,就去处理下一个插件。soul网关的内存中保存了插件,选择器和规则信息,这些信息的实时更新是从soul-admin发布事件同步过来的。

  • 匹配插件:从soul网关的内存中获取插件信息,判断插件是否存在,是否被启用 ,如果没有找到就执行下一个插件。
  • 匹配选择器:从soul网关的内存中获取选择器信息,判断选择器信息能否匹配成功,如果没有匹配上就执行下一个插件。
  • 匹配规则:从soul网关的内存中获取规则信息,判断规则信息能否匹配成功,如果没有匹配上就执行下一个插件。
public abstract class AbstractSoulPlugin implements SoulPlugin {
    //..
        protected abstract Mono<Void> doExecute(ServerWebExchange exchange, SoulPluginChain chain, SelectorData selector, RuleData rule);

   //...
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        String pluginName = named();
       //从soul网关的内存中获取插件信息
        final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
      //插件是否存在,是否被启用    
    if (pluginData != null && pluginData.getEnabled()) {
          //从soul网关的内存中获取选择器信息
            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
           //如果选择器信息不存在,就处理下一个插件
            if (CollectionUtils.isEmpty(selectors)) {
                return handleSelectorIsNull(pluginName, exchange, chain);
            }
        //插件是否可以匹配上
            final SelectorData selectorData = matchSelector(exchange, selectors);
            if (Objects.isNull(selectorData)) {
                return handleSelectorIsNull(pluginName, exchange, chain);
            }
        //记录选择器日志
        selectorLog(selectorData, pluginName);
        
        //从soul网关的内存中获取规则信息
            final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
        //规则不存在,则执行下一个插件    
        if (CollectionUtils.isEmpty(rules)) {
                return handleRuleIsNull(pluginName, exchange, chain);
            }
            RuleData rule;
            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
                //get last
                rule = rules.get(rules.size() - 1);
            } else {
                //是否匹配规则
                rule = matchRule(exchange, rules);
            }
            if (Objects.isNull(rule)) {
                return handleRuleIsNull(pluginName, exchange, chain);
            }
        //记录规则日志
            ruleLog(rule, pluginName);
        //执行每个插件自己的执行逻辑
            return doExecute(exchange, chain, selectorData, rule);
        }
    //处理下一个插件
        return chain.execute(exchange);
    }
    //..
}

DividePlugin中的doExecute()方法的主要功能是:

  • 对业务系统实现负载均衡。soul网关自己实现了负载均衡,目前支持:轮询,随机,哈希三种方式。
  • 构建http请求,用于请求真正的http业务接口。但是执行http操作的功能是统一交给了WebClientPlugin
@Slf4j
public class DividePlugin extends AbstractSoulPlugin {

    @Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
       //...
        final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
        //实现负载均衡
        DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
        if (Objects.isNull(divideUpstream)) {
            log.error("divide has no upstream");
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        //构建http请求
        // set the http url
        String domain = buildDomain(divideUpstream);
        String realURL = buildRealURL(domain, soulContext, exchange);
        exchange.getAttributes().put(Constants.HTTP_URL, realURL);
        // set the http timeout
        exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
        exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
        return chain.execute(exchange);
    }
    //...
}

分析到这里就算是弄清楚了DividePlugin的执行原理了:匹配插件,匹配选择器,匹配规则,负载均衡,构建http请求。

至此,我们就分析完了开始提到的三个问题。弄清楚了插件加载过程,业务接口注册过程,Divide插件执行原理。

Soul源码阅读系列(二)Divide插件

今天体验的是Souldivide插件,它的主要作用是用于http的代理。在文章后面一节简单分析了divide插件的执行原理。

Divide插件使用案例

Soul官方在soul-examples模块提供了测试样例,其中的soul-examples-http模块演示的通http发起请求到soul网关,然后再到真实的服务。模块目录及配置信息如下:

soul.http是有关Soul的配置,adminUrlSoul的后台管理地址,port是业务系统的端口,contextPath是业务系统的请求路径。

在项目的pom文件中引入soul相关依赖,当前版本是2.2.1

<dependency>
    <groupId>org.dromara</groupId>
    <artifactId>soul-spring-boot-starter-client-springmvc</artifactId>
    <version>${soul.version}</version>
</dependency>

在需要被代理的接口上使用注解@SoulSpringMvcClient@SoulSpringMvcClient注解会把当前接口注册到soul网关中。使用方式如下:

如果其他接口也想被网关代理,使用方式是一样的,在@SoulSpringMvcClient注解中,指定path即可。

参考之前的文章,启动Soul AdminSoul BootstrapSoul的后台管理地址,是一个SpringBoot项目,只需要修改一下数据库的地址就可以运行了。项目会自动创建对应的库和表。项目启动后的登录地址是http://localhost:9095/,用户名是admin,密码是123456。后台界面如下:

最后运行SoulTestHttpApplication,启动soul-examples-http项目。

当三个系统(本身的业务系统,Soul后台管理系统Soul Admin,Soul核心网关Soul Bootstrap)都启动成功后,就能够使用divide插件了。

发起一个Get请求: http://localhost:8188/order/findById?id=99 
得到的响应结果:
{
  "id": "99",
  "name": "hello world findById"
}

上面就是一个普通的http请求,直接请求业务系统的后端服务,现在通过Soul网关来访问该服务。

同样发起一个Get请求:http://localhost:9195/http/order/findById?id=99
得到的响应结果:
{
  "id": "99",
  "name": "hello world findById"
}

这个localhost:9195地址就是网关的地址,/http是业务系统在网关中的名称。那么,现在的请求就是先通过Soul网关,再由网关转发到实际的请求接口。

通过后台管理系统可以发现:主要模块有插件列表和系统管理,在插件列表中可以对各个插件进行管理,每个插件都可以添加多个选择器,每个选择器都可以添加多条规则。实际这就是Soul拦截URL后的匹配规则:插件->选择器->规则,这个后面再细说。

以上就是Soul作为一个网关起到转发的作用,这个功能模块对应的插件是divide插件。接下来,我们跟踪一下divide插件的源码,看看它的执行原理。

Divide插件执行原理

当我们第一次接触时,可能不知道它的执行逻辑在源代码的哪个位置,那怎么办呢?

答案是 ,如何猜测呢?我们想要查看的是divide插件,那就去插件模块soul-plugin看看。然后再找找有没有跟divide有关的,发现有一个soul-plugin-divide。进入这个模块里面,有一个DividePlugin类,它有doExecute()方法,那我们也能猜测它可能就是divide插件的执行逻辑。

有了上面的猜想,我们还需要进行验证,看看对不对,在doExecute()方法加上断点进行debug调试。将soul-bootstrap项目以debug模式进行重启,然后发起请求:

http://localhost:9195/http/order/findById?id=99

成功发起请求后,执行逻辑会在我们打断点的地方停住,那么证明我们的猜想是正确的(在这里可以再次体会到命名的重要性)。这个时候要注意观察IDEA编辑器提供的方法调用栈信息:

方法调用栈里面有很多方法,但是前面四个是soul中的方法调用,后面的与reactor编程模型有关,先暂时忽略它。在前面四个方法中,调用关系如下:

  • SoulWebHandler:它实现了WebHandler,重写了handle()方法,用于处理Soul网关中所有的请求。
  • DefaultSoulPluginChain:插件链执行类,以责任链的设计模式处理所有插件。
  • AbstractSoulPlugin:多个插件的父类,以模板方法设计模式实现各种插件类型。
  • DividePlugindivide插件,用于处理http请求。

分析到这里,就能够清楚的看到Soul网关处理一个http请求的过程,具体实现就在以上四个类及对应的方法中。实际的代码解析将在下一篇文章中进行分析,因为比较多,做好准备~

到这里,本篇文章就结束了,小结一下:本篇文章通过一个案例演示了http请求怎么接入到Soul网关中,以及Divide插件的执行原理。