2021-6-25-Soul源码阅读系列(十三)Resilience4j插件

本篇文章分析的是Resilience4j插件,Resilience4JSpring Cloud Gateway推荐的容错方案,它是一个轻量级的容错库。它可以提供熔断和限流的功能。

操作前准备:启动shenyu-adminshenyu网关,shenyu-examples-http测试用例。

Soul 网关最近换名字了,新的名字叫ShenYu,所以文章中可能出现书写不一致的地方。

Resilience4j 功能演示

要在shenyu网关使用Resilience4j插件,需要引入依赖:

        <!-- shenyu resilience4j plugin start-->
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>shenyu-spring-boot-starter-plugin-resilience4j</artifactId>
            <version>${project.version}</version>
        </dependency>
        <!-- shenyu resilience4j plugin end-->

然后需要在shenyu-admin中依次开启插件,添加选择器,添加规则:

规则字段解析:

  • limitRefreshPeriod: 刷新令牌的时间间隔,单位ms,默认值:500。
  • limitForPeriod: 每次刷新令牌的数量,默认值:50。
  • timeoutDurationRate: 等待获取令牌的超时时间,单位ms,默认值:5000。
  • circuitEnable: 是否开启熔断,0:关闭,1:开启,默认值:0。
  • timeoutDuration: 熔断超时时间,单位ms,默认值:30000。
  • fallbackUri: 降级处理的uri
  • slidingWindowSize: 滑动窗口大小,默认值:100。
  • slidingWindowType: 滑动窗口类型,0:基于计数,1:基于时间,默认值:0。
  • minimumNumberOfCalls: 开启熔断的最小请求数,超过这个请求数才开启熔断统计,默认值:100。
  • waitIntervalInOpen: 熔断器开启持续时间,单位ms,默认值:10。
  • bufferSizeInHalfOpen: 半开状态下的环形缓冲区大小,必须达到此数量才会计算失败率,默认值:10。
  • failureRateThreshold:错误率百分比,达到这个阈值,熔断器才会开启,默认值50。

上述是默认参数,在插件中还有参数校验逻辑,如果参数值小于默认值,会直接赋值默认值,因此方便测试效果直接修改源码的配置 : 每次刷新令牌的数量为2 ,刷新令牌的时间间隔为1s,超时时间为1s

public void checkData(final Resilience4JHandle resilience4JHandle) {
        resilience4JHandle.setTimeoutDurationRate(Math.max(resilience4JHandle.getTimeoutDurationRate(), Constants.TIMEOUT_DURATION_RATE));

        // 刷新令牌的时间间隔为1s
        resilience4JHandle.setLimitRefreshPeriod(1000);
        // 每次刷新令牌的数量为2
        resilience4JHandle.setLimitForPeriod(2);

        resilience4JHandle.setCircuitEnable(Math.max(resilience4JHandle.getCircuitEnable(), Constants.CIRCUIT_ENABLE));

        // 超时时间为1s
        resilience4JHandle.setTimeoutDuration(1000);

        resilience4JHandle.setFallbackUri(!"0".equals(resilience4JHandle.getFallbackUri()) ? resilience4JHandle.getFallbackUri() : "");
        resilience4JHandle.setSlidingWindowSize(Math.max(resilience4JHandle.getSlidingWindowSize(), Constants.SLIDING_WINDOW_SIZE));
        resilience4JHandle.setSlidingWindowType(Math.max(resilience4JHandle.getSlidingWindowType(), Constants.SLIDING_WINDOW_TYPE));
        resilience4JHandle.setMinimumNumberOfCalls(Math.max(resilience4JHandle.getMinimumNumberOfCalls(), Constants.MINIMUM_NUMBER_OF_CALLS));
        resilience4JHandle.setWaitIntervalFunctionInOpenState(Math.max(resilience4JHandle.getWaitIntervalFunctionInOpenState(), Constants.WAIT_INTERVAL_FUNCTION_IN_OPEN_STATE));
        resilience4JHandle.setPermittedNumberOfCallsInHalfOpenState(Math.max(resilience4JHandle.getPermittedNumberOfCallsInHalfOpenState(), Constants.PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE));
        resilience4JHandle.setFailureRateThreshold(Math.max(resilience4JHandle.getFailureRateThreshold(), Constants.FAILURE_RATE_THRESHOLD));
    }

在要被测试的方法中加上日志信息:

    @GetMapping("/findById")
    @ShenyuSpringMvcClient(path = "/findById", desc = "Find by id")
    public OrderDTO findById(@RequestParam("id") final String id) throws InterruptedException {
        OrderDTO orderDTO = new OrderDTO();
        orderDTO.setId(id);
        orderDTO.setName("hello world findById");

        log.info("限流测试");

        return orderDTO;
    }

限流测试

上述准备工作完成后,就能进行测试了。根据配置:每次刷新令牌的数量为2 ,刷新令牌的时间间隔为1s,超时时间为1s。在1s内只有两个线程能够拿到令牌,可以通过,这样就达到了限流的作用。

使用SuperBenchmarker工具,4个线程,执行10s

C:\Users>sb -u http://localhost:9195/http/order/findById?id=2 -c 4 -N 10
Starting at 2021/6/25 12:21:16
[Press C to stop the test]
24      (RPS: 1.5)
---------------Finished!----------------
Finished at 2021/6/25 12:21:33 (took 00:00:16.4476099)
26      (RPS: 1.6)                      Status 200:    26

RPS: 2.2 (requests/second)
Max: 2005ms
Min: 382ms
Avg: 1716.3ms

  50%   below 1997ms
  60%   below 1998ms
  70%   below 1998ms
  80%   below 1999ms
  90%   below 2001ms
  95%   below 2003ms
  98%   below 2005ms
  99%   below 2005ms
99.9%   below 2005ms

可以看到结果信息中RPS: 2.2 (requests/second),表示每秒请求数量是2.2

然后,日志中也能看到,在同一秒内,只有两个请求:

2021-06-25 12:21:30.020  INFO 35060 --- [ctor-http-nio-5] o.a.s.e.http.controller.OrderController  : 限流测试
2021-06-25 12:21:30.020  INFO 35060 --- [ctor-http-nio-4] o.a.s.e.http.controller.OrderController  : 限流测试
2021-06-25 12:21:31.022  INFO 35060 --- [ctor-http-nio-5] o.a.s.e.http.controller.OrderController  : 限流测试
2021-06-25 12:21:31.023  INFO 35060 --- [ctor-http-nio-4] o.a.s.e.http.controller.OrderController  : 限流测试
2021-06-25 12:21:32.019  INFO 35060 --- [ctor-http-nio-5] o.a.s.e.http.controller.OrderController  : 限流测试
2021-06-25 12:21:32.019  INFO 35060 --- [ctor-http-nio-4] o.a.s.e.http.controller.OrderController  : 限流测试
2021-06-25 12:21:33.019  INFO 35060 --- [ctor-http-nio-4] o.a.s.e.http.controller.OrderController  : 限流测试
2021-06-25 12:21:33.019  INFO 35060 --- [ctor-http-nio-5] o.a.s.e.http.controller.OrderController  : 限流测试

熔断测试

从配置信息我们知道熔断器默认是关闭,我们需要手动打开,即将参数circuitEnable的值设置为1

Resilience4JHandle#checkData()手动设置超时时间为1s:

        // 超时时间为1s
        resilience4JHandle.setTimeoutDuration(1000);

在被测试的方法中加上线程休眠时间:

    @GetMapping("/findById")
    @ShenyuSpringMvcClient(path = "/findById", desc = "Find by id")
    public OrderDTO findById(@RequestParam("id") final String id) throws InterruptedException {
        OrderDTO orderDTO = new OrderDTO();
        orderDTO.setId(id);
        orderDTO.setName("hello world findById");

        log.info("限流测试");

        int i = RandomUtils.nextInt(1,3);
        if(i %2 == 0){
            Thread.sleep(2000);
        }

        return orderDTO;
    } 

通过postman多次发送请求,就会发生因超时出现的熔断。

Resilience4J原理分析

Resilience4JPlugin 继承于模板抽象类 AbstractSoulPlugin,所以 doExecutor 是其执行真正功能逻辑的代码:

    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
		//省略了其他代码
        //是否开启熔断
        if (resilience4JHandle.getCircuitEnable() == 1) {
            return combined(exchange, chain, rule);
        }
        //限流处理
        return rateLimiter(exchange, chain, rule);
    }

主要功能是判断是否开启熔断,如果是,就进入组合模式(即处理熔断又处理限流);否则,只做限流处理。

  • 组合模式
    private Mono<Void> combined(final ServerWebExchange exchange, final ShenyuPluginChain chain, final RuleData rule) {
        //配置信息
        Resilience4JConf conf = Resilience4JBuilder.build(rule);
        return combinedExecutor.run(
                chain.execute(exchange).doOnSuccess(v -> { //执行成功后的处理逻辑
                    HttpStatus status = exchange.getResponse().getStatusCode();
                    if (status == null || !status.is2xxSuccessful()) {
                        exchange.getResponse().setStatusCode(null);
                        throw new CircuitBreakerStatusCodeException(status == null ? HttpStatus.INTERNAL_SERVER_ERROR : status);
                    }
                }), fallback(combinedExecutor, exchange, conf.getFallBackUri()),  // 降级处理
            conf); //配置信息
    }

combinedExecutor.run()方法负责执行真实的逻辑:先执行熔断的功能,后执行限流的功能。再处理超时,错误信息,最后进行降级处理。

    public <T> Mono<T> run(final Mono<T> run, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf resilience4JConf) {

        //链式调用
        Mono<T> to = run.transformDeferred(CircuitBreakerOperator.of(circuitBreaker))//先处理熔断
                .transformDeferred(RateLimiterOperator.of(rateLimiter))//后限流
                .timeout(resilience4JConf.getTimeLimiterConfig().getTimeoutDuration())//超时处理
                .doOnError(/*...*/);//错误处理
        if (fallback != null) {
            to = to.onErrorResume(fallback);//降级处理
        }
        return to;
    }
  • 限流模式
    private Mono<Void> rateLimiter(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
        return ratelimiterExecutor.run(
                chain.execute(exchange),  //责任链继续执行
            fallback(ratelimiterExecutor, exchange, null), //降级处理
            Resilience4JBuilder.build(rule))//规则配置
                .onErrorResume(throwable -> ratelimiterExecutor.withoutFallback(exchange, throwable));
    }

限流的功能主要在ratelimiterExecutor.run()中,负责生成限流器并执行。

    @Override
    public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) {
        //限流器
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig());
        Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter));//执行限流功能
        if (fallback != null) { //降级
            return to.onErrorResume(fallback);
        }
        return to;
    }

小结,本文结合实际案例演示了Resilience4j插件,并分析了组合模式和限流模式的执行原理。

JVM堆内存分配

一张图展示 JVM 内存分配

记录下几个主要的参数设置:

  • -Xmx :堆内存最大值,默认为物理内存的 1/4-Xmx-XX:MaxHeapSize是等价的。
  • -Xms:堆内存初始值,默认为物理内存的 1/64,这里包括初始化和最小值的设置,如果想分别设置,可以通过参数 -XX:InitialHeapSize设置堆内存的初始值;可以通过参数 -XX:MinHeapSize 设置堆内存的最小值。
  • -Xmn:设置年轻代(或者叫Young区)的大小,这里包括初始化和最大值的设置,如果想分别设置,可以通过参数 -XX:NewSize设置Young区的初始值;可以通过参数 -XX:MaxNewSize 设置Young区的最大值。

一般建议将-Xmx-Xms设置成相同的值,以避免因内存分配造成的耗时。

运行一个简单的Spring Boot程序,不设置任何参数,通过命令 jps查看Java进程,再通过命令jmap -heap pid,查看当前应用堆内存分配状态信息。

> jmap -heap 16488
Attaching to process ID 16488, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.251-b08

using thread-local object allocation.
Parallel GC with 8 thread(s)  // 并行GC

Heap Configuration:
   MinHeapFreeRatio         = 0
   MaxHeapFreeRatio         = 100
   MaxHeapSize              = 4250927104 (4054.0MB)  // 最大堆内存
   NewSize                  = 88604672 (84.5MB)      // Young区初始值
   MaxNewSize               = 1416626176 (1351.0MB)  // Young区最大值
   OldSize                  = 177733632 (169.5MB)    // Old区
   NewRatio                 = 2
   SurvivorRatio            = 8
   MetaspaceSize            = 21807104 (20.796875MB)
   CompressedClassSpaceSize = 1073741824 (1024.0MB)
   MaxMetaspaceSize         = 17592186044415 MB
   G1HeapRegionSize         = 0 (0.0MB)

Heap Usage:
PS Young Generation // Young区使用情况
Eden Space:
   capacity = 159907840 (152.5MB)
   used     = 39639584 (37.803253173828125MB)
   free     = 120268256 (114.69674682617188MB)
   24.789018474641395% used
From Space:
   capacity = 12582912 (12.0MB)
   used     = 0 (0.0MB)
   free     = 12582912 (12.0MB)
   0.0% used
To Space:
   capacity = 12058624 (11.5MB)
   used     = 0 (0.0MB)
   free     = 12058624 (11.5MB)
   0.0% used
PS Old Generation  // Old区使用情况
   capacity = 155189248 (148.0MB)
   used     = 12011296 (11.454864501953125MB)
   free     = 143177952 (136.54513549804688MB)
   7.73977331213049% used

13830 interned Strings occupying 1314552 bytes.

从上图,可以看出 OldSize / NewSize = 169.5 / 84.5 = 2OldSize + NewSize = 169.5 + 84.5 = 256,这个约为物理内存的1/64,也就是-Xms的大小。

自定义类加载器

假设,现在有一个Hello.xlass 文件,里面有一个hello()方法,但是此文件内容是一个所有字节(x=255-x)被处理后的文件,那么你应该如何正确读取这个文件呢?

这里就需要自定义类加载器,来加载这个文件了。

首先,我们还是看一下Java的类加载过程。

类的生命周期 1.加载:找Class文件 2.验证:验证格式和依赖 3.准备:为类变量(static修饰的变量)分配内存并设置初始零值。注意,此时实例变量还没有分配内存。 4.解析:符号解析为引用 5.初始化:构造器,实例变量分配内存并赋值,静态变量赋值,静态代码块 6.使用 7.卸载

三类加载器 启动类加载器 扩展类加载器 应用类加载器

加载器的特点:双亲委派;负责依赖;缓存加载。

对于双亲委派模型:一个类加载器在进行加载时,不会自己去尝试加载这个类。先看看有没有加载过这个类,然后将这个请求委派给父类加载器去完成,只有当父加载器反馈自己无法完成加载请求时,子加载器才会自己去尝试加载这个类。

这样做的好处是,防止同名的类出现混乱,也能提高安全性。举个例子,比如java.lang.Object这个类,无论哪个类加载器加载时,最终都会委派给Bootstrap加载器去加载,这就保证了整个系统运行过程中的Object都是同一个类。否则,如果用户自己编写了一个java.lang.Object类,并放在程序的classpath中,最终系统将会出现多个不同的Object类,整个Java体系就变得一团混乱了。

在实现自己的ClassLoader之前,先看一下JDK中的ClassLoader是怎么实现的:

protected Class<?> loadClass(String name, boolean resolve)
        throws ClassNotFoundException
    {
        synchronized (getClassLoadingLock(name)) {
            // 1. 首先,检查是否被加载过了
            // First, check if the class has already been loaded
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                long t0 = System.nanoTime();
                try {
                    if (parent != null) {
                        // 2. 没有加载过,就交给父类去加载
                        c = parent.loadClass(name, false);
                    } else {
                        c = findBootstrapClassOrNull(name);
                    }
                } catch (ClassNotFoundException e) {
                    // ClassNotFoundException thrown if class not found
                    // from the non-null parent class loader
                }

                if (c == null) {
                    // If still not found, then invoke findClass in order
                    // to find the class.
                    long t1 = System.nanoTime();
                    // 3. 还是没有加载到,才交给自己去加载
                    c = findClass(name);

                    // this is the defining class loader; record the stats
                    sun.misc.PerfCounter.getParentDelegationTime().addTime(t1 - t0);
                    sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(t1);
                    sun.misc.PerfCounter.getFindClasses().increment();
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

上面代码主要逻辑是:

1、首先查找.class是否被加载过。

2、如果.class文件没有被加载过,那么会去找加载器的父加载器。如果父加载器不是null,那么就执行父加载器的loadClass方法,把类加载请求一直向上抛,直到父加载器为null(这个时候就到了Bootstrap ClassLoader)为止。

3、如果父加载器没有成功,就交给子类加载器去加载。

看一下findClass这个方法:

    protected Class<?> findClass(String name) throws ClassNotFoundException {
        throw new ClassNotFoundException(name);
    }

没有任何实现,直接抛出了一个异常,而且是protected的,所以:这个方法就是用于继承后,需要重写

从上面的分析可以知道:

1、如果不想打破双亲委派模型,那么只需要重写findClass方法即可。

2、如果想打破双亲委派模型,那么就重写整个loadClass方法。

自定义的类加载器如下:

  • 重写findClass方法;
  • 获取字节数组;
  • 根据要求处理字节数组;
  • 使用新的字节数组定义类。

import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * 自定义类加载器
 * <p>
 * 参考资料:
 * https://www.cnblogs.com/xrq730/p/4847337.html
 * https://segmentfault.com/a/1190000012925715
 * https://github.com/sodawy/JAVA-000/tree/main/Week_01
 */
public class MyClassLoader extends ClassLoader {
    public static final byte DIGITAL_255 = (byte) 255;
    private String filePath;

    public MyClassLoader(String filePath) {
        this.filePath = filePath;
    }

    //重写该方法
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        try {
            byte[] bytes = getClassBytes(filePath);

            // 根据要求处理字节码
            byte[] deBytes = handleByte(bytes);

            // 使用新的字节数组定义类
            return defineClass(name, deBytes, 0, bytes.length);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return super.findClass(name);
    }

    /**
     * 根据自定义的要求处理字节码
     *
     * @param oldBytes 原来的字节数组
     * @return 放回处理后的数组
     */
    private byte[] handleByte(byte[] oldBytes) {
        byte[] newBytes = new byte[oldBytes.length];

        for (int i = 0; i < oldBytes.length; i++) {
            newBytes[i] = (byte) (DIGITAL_255 - oldBytes[i]);
        }
        return newBytes;
    }

    /**
     * 获取字节数组
     * @param filePath class文件路径
     * @return 字节数组
     * @throws Exception
     */
    private byte[] getClassBytes(String filePath) throws Exception {
        return Files.readAllBytes(Paths.get(filePath));
    }
}

自定义类加载器写完后,进行测试:


import org.junit.Test;

import java.lang.reflect.Method;

public class TestMyClassLoader {

    @Test
    public void test() throws Exception {
        String filePath = "D:\\Hello\\Hello.xlass";
        //使用自定义类加载器加载类
        MyClassLoader myClassLoader = new MyClassLoader(filePath);
        Class<?> clazz = myClassLoader.loadClass("Hello");

        //实例化对象
        Object obj = clazz.newInstance();
        //获取声明的方法
        Method method = clazz.getDeclaredMethod("hello");
        //方法调用
        method.invoke(obj);

        System.out.println(obj);
        System.out.println(obj.getClass().getClassLoader());
    }
}

至此,我们就完成了如何自定义类加载器。

自定义类加载器的应用
  • Tomcat上可以部署多个不同的应用,但是它们可以使用同一份类库的不同版本。
  • 对于非.class的文件,需要转为Java类,就需要自定义类加载器。
  • 加密解密。