使用Java字节码分析四则运算

下面展示了一个简单的 Java类,进行了四则运算。


public class Hello{

	public static void main(String[] args){
		int a = 1;
		int b = 2 + a;
		int c = 3 * b;
		int d = c - a;
		float e = d / 2f;
		
		System.out.println(e);
	}
}

通过命令 javac -g Hello.java 进行编译,然后通过命令 java Hello 运行编译后的代码,得到结果是 4.0

接着通过命令 javap -c -verbose Hello 进行反编译,输出结果如下:

Classfile /D:/Hello.class
  Last modified 2021-6-19; size 604 bytes
  MD5 checksum 2529ef11eea3e7947cfc9553f36bda77
  Compiled from "Hello.java"
public class Hello
  minor version: 0
  major version: 52
  flags: ACC_PUBLIC, ACC_SUPER
Constant pool:
   #1 = Methodref          #5.#26         // java/lang/Object."<init>":()V
   #2 = Fieldref           #27.#28        // java/lang/System.out:Ljava/io/PrintStream;
   #3 = Methodref          #29.#30        // java/io/PrintStream.println:(F)V
   #4 = Class              #31            // Hello
   #5 = Class              #32            // java/lang/Object
   #6 = Utf8               <init>
   #7 = Utf8               ()V
   #8 = Utf8               Code
   #9 = Utf8               LineNumberTable
  #10 = Utf8               LocalVariableTable
  #11 = Utf8               this
  #12 = Utf8               LHello;
  #13 = Utf8               main
  #14 = Utf8               ([Ljava/lang/String;)V
  #15 = Utf8               args
  #16 = Utf8               [Ljava/lang/String;
  #17 = Utf8               a
  #18 = Utf8               I
  #19 = Utf8               b
  #20 = Utf8               c
  #21 = Utf8               d
  #22 = Utf8               e
  #23 = Utf8               F
  #24 = Utf8               SourceFile
  #25 = Utf8               Hello.java
  #26 = NameAndType        #6:#7          // "<init>":()V
  #27 = Class              #33            // java/lang/System
  #28 = NameAndType        #34:#35        // out:Ljava/io/PrintStream;
  #29 = Class              #36            // java/io/PrintStream
  #30 = NameAndType        #37:#38        // println:(F)V
  #31 = Utf8               Hello
  #32 = Utf8               java/lang/Object
  #33 = Utf8               java/lang/System
  #34 = Utf8               out
  #35 = Utf8               Ljava/io/PrintStream;
  #36 = Utf8               java/io/PrintStream
  #37 = Utf8               println
  #38 = Utf8               (F)V
{
  public Hello();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=1, locals=1, args_size=1
         0: aload_0
         1: invokespecial #1                  // Method java/lang/Object."<init>":()V
         4: return
      LineNumberTable:
        line 2: 0
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0       5     0  this   LHello;

  public static void main(java.lang.String[]);
    descriptor: ([Ljava/lang/String;)V
    flags: ACC_PUBLIC, ACC_STATIC
    Code:
      stack=2, locals=6, args_size=1
         0: iconst_1
         1: istore_1
         2: iconst_2
         3: iload_1
         4: iadd
         5: istore_2
         6: iconst_3
         7: iload_2
         8: imul
         9: istore_3
        10: iload_3
        11: iload_1
        12: isub
        13: istore        4
        15: iload         4
        17: i2f
        18: fconst_2
        19: fdiv
        20: fstore        5
        22: getstatic     #2                  // Field java/lang/System.out:Ljava/io/PrintStream;
        25: fload         5
        27: invokevirtual #3                  // Method java/io/PrintStream.println:(F)V
        30: return
      LineNumberTable:
        line 5: 0
        line 6: 2
        line 7: 6
        line 8: 10
        line 9: 15
        line 11: 22
        line 12: 30
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0      31     0  args   [Ljava/lang/String;
            2      29     1     a   I
            6      25     2     b   I
           10      21     3     c   I
           15      16     4     d   I
           22       9     5     e   F

在编译时加上了 -g 参数,是为了生成局部变量表 LocalVariableTable ;在反编译时加上了 -verbose 是为了输出附件信息。 在上面的例子中反编译输出的结果主要包括:类的来源,校验和,版本号,常量池,构造函数,main函数。

尝试对其中的信息进行分析:

Classfile /D:/Hello.class   //描述了文件来源
  Last modified 2021-6-19; size 604 bytes //修改信息,文件大小
  MD5 checksum 2529ef11eea3e7947cfc9553f36bda77    // MD5 校验和
  Compiled from "Hello.java"  // 对哪个类进行反编译
public class Hello
  minor version: 0
  major version: 52   // java 版本号
  flags: ACC_PUBLIC, ACC_SUPER  // 该类是 public
Constant pool:  // 常量池
   #1 = Methodref          #5.#26         // java/lang/Object."<init>":()V
   #2 = Fieldref           #27.#28        // java/lang/System.out:Ljava/io/PrintStream;
   #3 = Methodref          #29.#30        // java/io/PrintStream.println:(F)V
   #4 = Class              #31            // Hello
   #5 = Class              #32            // java/lang/Object
   #6 = Utf8               <init>
   #7 = Utf8               ()V
   #8 = Utf8               Code
   #9 = Utf8               LineNumberTable
  #10 = Utf8               LocalVariableTable
  #11 = Utf8               this
  #12 = Utf8               LHello;
  #13 = Utf8               main
  #14 = Utf8               ([Ljava/lang/String;)V
  #15 = Utf8               args
  #16 = Utf8               [Ljava/lang/String;
  #17 = Utf8               a
  #18 = Utf8               I
  #19 = Utf8               b
  #20 = Utf8               c
  #21 = Utf8               d
  #22 = Utf8               e
  #23 = Utf8               F
  #24 = Utf8               SourceFile
  #25 = Utf8               Hello.java
  #26 = NameAndType        #6:#7          // "<init>":()V
  #27 = Class              #33            // java/lang/System
  #28 = NameAndType        #34:#35        // out:Ljava/io/PrintStream;
  #29 = Class              #36            // java/io/PrintStream
  #30 = NameAndType        #37:#38        // println:(F)V
  #31 = Utf8               Hello
  #32 = Utf8               java/lang/Object
  #33 = Utf8               java/lang/System
  #34 = Utf8               out
  #35 = Utf8               Ljava/io/PrintStream;
  #36 = Utf8               java/io/PrintStream
  #37 = Utf8               println
  #38 = Utf8               (F)V
{
  public Hello(); //构造器
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=1, locals=1, args_size=1 //构造器函数 使用的栈深度是1,用于存放this指针
         0: aload_0
         1: invokespecial #1                  // Method java/lang/Object."<init>":()V 
         4: return
      LineNumberTable: //行号表:java源文件行号与字节码文件偏移量之间的对应关系
        line 2: 0
      LocalVariableTable: // 局部变量表:在一个方法中用到的变量
        Start  Length  Slot  Name   Signature
            0       5     0  this   LHello;

  public static void main(java.lang.String[]); // main 方法
    descriptor: ([Ljava/lang/String;)V  //方法描述:是一个String对象的数组
    flags: ACC_PUBLIC, ACC_STATIC   // 方法是 public static
    Code:
      stack=2, locals=6, args_size=1 // 当前方法栈深度是2,有6个变量,1个入参
         0: iconst_1  // 常量值1放到栈
         1: istore_1  // 将栈顶值放到 局部变量表中的1号槽位
         2: iconst_2  // 常量值2放到栈
         3: iload_1   // 将局部变量表中的1号槽位的值放到栈顶
         4: iadd      //执行一次加法操作
         5: istore_2  //将栈顶值放到 局部变量表中的2号槽位
         6: iconst_3  // 常量值3放到栈
         7: iload_2    // 将局部变量表中的2号槽位的值放到栈顶
         8: imul       //执行一次乘法操作
         9: istore_3   // 将栈顶值放到 局部变量表中的3号槽位
        10: iload_3    // 将局部变量表中的3号槽位的值放到栈顶
        11: iload_1    // 将局部变量表中的1号槽位的值放到栈顶
        12: isub       //执行一次减法操作
        13: istore        4   // 将栈顶一个int类型的值放到 局部变量表中的4号槽位
        15: iload         4   // 将局部变量表中的4号槽位的值放到栈顶
        17: i2f      //int 类型转换成 float类型
        18: fconst_2   // 一个float类型的常量值2放到栈
        19: fdiv       //执行一次除法操作
        20: fstore        5  // 将栈顶一个float类型的值放到 局部变量表中的5号槽位
        22: getstatic     #2                  // Field java/lang/System.out:Ljava/io/PrintStream;
        25: fload         5
        27: invokevirtual #3                  // Method java/io/PrintStream.println:(F)V //方法调用
        30: return      //返回
      LineNumberTable: //行号表:java源文件行号与字节码文件偏移量之间的对应关系
        line 5: 0     // 源文件中表示 int a = 1;
        line 6: 2
        line 7: 6
        line 8: 10
        line 9: 15
        line 11: 22
        line 12: 30
      LocalVariableTable: // 局部变量表:在一个方法中用到的变量
        Start  Length  Slot  Name   Signature
            0      31     0  args   [Ljava/lang/String; // 0号槽位对应变量args, 类型是Stirng[]
            2      29     1     a   I    // 1号槽位对应变量a, 类型是Integer
            6      25     2     b   I    // 2号槽位对应变量b, 类型是Integer
           10      21     3     c   I    // 3号槽位对应变量c, 类型是Integer
           15      16     4     d   I    // 4号槽位对应变量d, 类型是Integer
           22       9     5     e   F    // 5号槽位对应变量f, 类型是Float

在上面对其中的信息进行了注释,相信能够看的明白各个助记符的含义。

文章到这里,都还没有介绍 Java字节码,因为我想先通过实际的用例来说明Java字节码的含义。Java bytecode 由单字节( byte )的指令组成, 理论上最多支持 256 个操作码(opcode)。实际上Java只使用了200左右的操作码, 还有一些操作码则保留给调试操作。

操作码, 或者称为 指令 ,主要由 类型前缀 操作名称 两部分组成。 例如, i 前缀代表 integer ,所以,iadd 表示对整数执行加法运算。i2f 表示 int类型转换成 float类型。 fdiv 表示对浮点数执行除法运算。

根据指令的性质,主要分为四个大类:

  • 栈操作指令,包括与局部变量交互的指令
  • 程序流程控制指令
  • 对象操作指令,包括方法调用指令
  • 算术运算以及类型转换指令

在上面给的例子中,可以看到四则运算的过程有很多的操作,因为JVM是一台基于栈的计算机器。每个线程都有一个独属于自己的线程栈(JVM stack), 用于存储 栈帧 (Frame)。每一次方法调用,JVM都会自动创建一个栈帧。 栈帧 由 操作数栈 , 局部变量表 以及一个 class指针 组成。 class指针 指向当前方法 在运行时常量池中对应的class。用一个图说明他们之间的关系。

操作数栈和局部变量表之间频繁使用的指令是 storeload

刚才提到了JVM是一台基于栈的计算机器,现在用一个简单的示例看一下计算过程。

public class Hello{

	public static void main(String[] args){
		int a = 4;
		int b = 5;
		int c = a + b;

		System.out.println(c);
	}
}

编译 :javac -g Hello.java

运行: java Hello

反编译: javap -c -verbose Hello (下面只是展示了main函数中的计算过程)

  public static void main(java.lang.String[]);
    descriptor: ([Ljava/lang/String;)V
    flags: ACC_PUBLIC, ACC_STATIC
    Code:
      stack=2, locals=4, args_size=1
         0: iconst_4
         1: istore_1
         2: iconst_5
         3: istore_2
         4: iload_1
         5: iload_2
         6: iadd
         7: istore_3
         8: getstatic     #2                  // Field java/lang/System.out:Ljava/io/PrintStream;
        11: iload_3
        12: invokevirtual #3                  // Method java/io/PrintStream.println:(I)V
        15: return
      LineNumberTable:
        line 5: 0
        line 6: 2
        line 7: 4
        line 9: 8
        line 10: 15
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0      16     0  args   [Ljava/lang/String;
            2      14     1     a   I
            4      12     2     b   I
            8       8     3     c   I

对上面的指令步步解析:

首先是 ` 0: iconst_4 ,生成一个整数类型的常量值 4` 并放到栈顶。

然后是 ` 1: istore_1 ,将栈顶的整数值存储到局部变量表中1号槽位,即 a=4`。

接着是 ` 2: iconst_5 ,生成一个整数类型的常量值 5` 并放到栈顶。

然后是 ` 3: istore_2 ,将栈顶的整数值存储到局部变量表中2号槽位,即 b=4`。

接着是 ` 4: iload_1 ,将局部变量表中1`号槽位的值加载到栈顶。

接着是 ` 5: iload_2 ,将局部变量表中2`号槽位的值加载到栈顶。

接着是 ` 6: iadd` ,执行一次加法运算。

接着是 ` 7: istore_3 ,将栈顶的整数值存储到局部变量表中3号槽位,即 c=9`。

接着是 ` 8: getstatic ,获取静态字段,即此时执行了 System.out`。

接着是 ` 11: iload_3 ,将局部变量表中3`号槽位的值加载到栈顶。

接着 ` 12: invokevirtual ,执行方法调用,即此时执行了 out.println(c)`。

最后是 ` 15: return` ,方法返回。

最后,从上面的分析过程,我们还可以看到:一个简单的加法操作是需要三个指令才能完成的:int c = a + b

         4: iload_1
         5: iload_2
         6: iadd

Soul源码阅读系列(十二)基于Http长轮询的数据同步(二)

在上一篇文章中,通过跟踪源码的方式了解了http长轮询的执行流程。但是,自己还有一些疑问,本篇文章是在官网的基础上进行了拓展,加入一些自己的理解。

zookeeperwebsocket 数据同步的机制比较简单,而 http 同步会相对复杂一些。Soul 借鉴了 ApolloNacos 的设计思想,取其精华,自己实现了 http 长轮询数据同步功能。注意,这里并非传统的 ajax 长轮询!

http 长轮询机制如上所示,请求逻辑是Soul网关主动请求 soul-admin 的配置服务。响应逻辑有两种:soul-admin端本身的配置修改和60s的等待时间到了。

http 请求到达 sou-admin 之后,并非立马响应数据,而是利用 Servlet3.0 的异步机制,异步响应数据。首先,将长轮询请求任务 LongPollingClient 扔到阻塞队列 BlocingQueue 中,并且开启调度任务,每60s 执行一次,将队列中的请求拿出,发送对应的响应。如果没有发生配置信息的更改,也需要对请求响应,好让网关知道,不需要一直等待。当然,网关请求配置服务时,也有 90s 的超时时间。

class LongPollingClient implements Runnable {
    LongPollingClient(final AsyncContext ac, final String ip, final long timeoutTime) {
        // 省略......
    }
    @Override
    public void run() {
        // 加入定时任务,如果60s之内没有配置变更,则60s后执行,响应http请求
        this.asyncTimeoutFuture = scheduler.schedule(() -> {
            // clients是阻塞队列,保存了来自soul-web的请求信息
            clients.remove(LongPollingClient.this);
            List<ConfigGroupEnum> changedGroups = HttpLongPollingDataChangedListener.compareMD5((HttpServletRequest) asyncContext.getRequest());
            //发送响应
            sendResponse(changedGroups);
        }, timeoutTime, TimeUnit.MILLISECONDS);
        //放到阻塞队列中
        clients.add(this);
    }
}

如果这段时间内,soul-admin发生了数据信息的更改,此时,会挨个移除队列中的长轮询请求,并响应数据,告知是哪个 Group 的数据发生了变更(我们将插件、规则、流量配置、用户配置数据分成不同的组)。网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。

// soul-admin发生了配置变更,挨个将队列中的请求移除,并予以响应
class DataChangeTask implements Runnable {
    DataChangeTask(final ConfigGroupEnum groupKey) {
        this.groupKey = groupKey;
    }
    @Override
    public void run() {
        try {
            //挨个处理阻塞队列中的请求
            for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext(); ) {
                LongPollingClient client = iter.next();
                //移除
                iter.remove();
                //响应
                client.sendResponse(Collections.singletonList(groupKey));
            }
        } catch (Throwable e) {
            LOGGER.error("data change error.", e);
        }
    }
}

soul-web 网关层接收到 http 响应信息之后,拉取变更信息(如果有变更的话),然后再次请求 soul-admin 的配置服务,如此反复循环。

长轮询体现在请求任务会一直执行。

class HttpLongPollingTask implements Runnable {

    	//省略其他代码

        @Override
        public void run() {
            //一直循环下去
            while (RUNNING.get()) {
                for (int time = 1; time <= retryTimes; time++) {
                    try {
                        doLongPolling(server);
                    } catch (Exception e) {
                        //一直循环下去
                    }
                }
            }
            log.warn("Stop http long polling.");
        }
    }

轮询:客户端每隔几秒钟向服务端发送 http 请求,服务端在收到请求后,不论是否有数据更新,都直接进行响应。在服务端响应完成,就会关闭这个 TCP 连接。这种方式实现非常简单,兼容性也比较好,只要支持 http 协议就可以用这种方式实现。缺点就是非常消耗资源,会占用较多的内存和带宽。

长轮询:客户端发送请求后服务器端不会立即返回数据,服务器端会阻塞,请求连接挂起,直到服务端有数据更新或者是连接超时才返回,客户端才再次发出请求新建连接、如此反复从而获取最新数据。相比轮询,长轮询减少了很多不必要的 http 请求次数,相比之下节约了资源。

Soul源码阅读系列(十一)基于Http长轮询的数据同步(一)

在上一篇文章中,跟踪了基于Nacos的数据同步原理,本篇文章将要跟踪基于Http长轮询的数据同步原理。

如果是 http 同步策略,soul-web 主动发起长轮询请求,默认有 90s 超时时间,如果 soul-admin 没有数据变更,则会阻塞 http 请求,如果有数据发生变更则响应变更的数据信息,如果超过 60s 仍然没有数据变更则响应空数据,网关层接到响应后,继续发起http请求,反复同样的请求。

同步的核心逻辑是:在soul-admin后台修改数据,先保存到数据库,然后保存到soul-admin的内存;在网关有定时任务执行,即发起长轮询,发起http请求到soul-admin去获取变更的数据。

本文的分析是想通过跟踪源码的方式来理解同步的核心逻辑,数据同步分析步骤如下:

  • 1.修改选择器
  • 2.更新数据
  • 3.接收数据
  • 4.使用更新后的数据
1. 修改选择器

在演示案例之前,将soul-admin的数据同步方式配置为http:

soul:
  database:
    dialect: mysql
    init_script: "META-INF/schema.sql"
    init_enable: true
  sync:
#    websocket:
#      enabled: true
#      zookeeper:
#          url: localhost:2181
#          sessionTimeout: 5000
#          connectionTimeout: 2000
      http:
        enabled: true

soul-bootstrap也配置一下数据同步方式为http:

soul :
    file:
      enabled: true
    corss:
      enabled: true
    dubbo :
      parameter: multi
    sync:
#        websocket :
#             urls: ws://localhost:9095/websocket

#        zookeeper:
#             url: localhost:2181
#             sessionTimeout: 5000
#             connectionTimeout: 2000
        http:
             url : http://localhost:9095

现在,我们以一个实际调用过程为例,比如在Soul网关管理系统中,对选择器的配置信息进行修改:查询条件中id=99才能匹配成功。具体信息如下所示:

点击确认后,进入到soul-adminupdateSelector()这个接口。

    @PutMapping("/{id}")
    public SoulAdminResult updateSelector(@PathVariable("id") final String id, @RequestBody final SelectorDTO selectorDTO) {
        Objects.requireNonNull(selectorDTO);
        selectorDTO.setId(id);
        Integer updateCount = selectorService.createOrUpdate(selectorDTO);
        return SoulAdminResult.success(SoulResultMessage.UPDATE_SUCCESS, updateCount);
    }

2.更新数据

进入到后端系统后,会现在数据中更新信息,然后通过publishEvent()方法将更新的信息同步到网关。(下面代码只是展示了主要的逻辑,完整的代码请参考Soul源码。)

 @Transactional(rollbackFor = RuntimeException.class)
    public int createOrUpdate(final SelectorDTO selectorDTO) {
        int selectorCount;
        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
		//将更新的数据保存到soul-admin的数据库
        
        //省略了其他代码
        
        //将更新的数据同步到网关
        publishEvent(selectorDO, selectorConditionDTOs);
        return selectorCount;
    }

publishEvent()方法中调用了eventPublisher.publishEvent(),这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

    private void publishEvent(final RuleDO ruleDO, final List<RuleConditionDTO> ruleConditions) {
		//省略了其他代码......
        // publish change event.
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATE,
                Collections.singletonList(RuleDO.transFrom(ruleDO, pluginDO.getName(), conditionDataList))));
    }

Spring完成事件发布后,肯定有对应的监听器来处理它,这个监听器是ApplicationListener接口。在Soul中是通过DataChangedEventDispatcher这个类来完成具体监听工作的,它实现了ApplicationListener接口。

//处理监听事件
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
	//省略了其他代码......
    
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH: //认证授权
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN: //修改了插件
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:  //修改了规则
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR: //修改了选择器
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA: //修改了元数据
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    //在bean初始化的时候,将实现DataChangedListener接口的bean加载进来。
    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }

}

注意一下,这个DataChangedEventDispatcher还实现了InitializingBean接口,并重写了它的afterPropertiesSet()方法,做的事情是:在bean初始化的时候,将实现DataChangedListener接口的bean加载进来。通过查看源码,可以看到4种数据同步的方式都实现了该接口,其中就有我们这次使用的Nacos数据同步方式。

1

当监听器监听到有事件发布后,会执行onApplicationEvent()方法,这里面的逻辑是循环处理DataChangedListener,通过switch / case表达式匹配修改的是什么类型信息,我们这里修改的是选择器,所以会匹配到listener.onSelectorChanged()这个方法。(这里虽然用了循环的方式处理每一个listener,但在实际中我们只需要一种数据同步方式就好。)

本次使用的是http长轮询进行数据同步,所以listener.onSelectorChanged()的实际执行方法是HttpLongPollingDataChangedListener#onSelectorChanged,它继承了AbstractDataChangedListener。这里面做的事情是:

  • 1.更新选择器信息到缓存;
  • 2.设置响应。

    public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
        if (CollectionUtils.isEmpty(changed)) {
            return;
        }
        //更新选择器信息到缓存
        this.updateSelectorCache();
        //设置响应
        this.afterSelectorChanged(changed, eventType);
    }

真正更新数据的操作是通过updateCache完成,将新的数据放到CACHE中,这个CACHEConcurrentMap类型。网关有定时任务来这个CACHE里获取数据。

 	 //更新选择器信息到缓存
    protected void updateSelectorCache() {
        this.updateCache(ConfigGroupEnum.SELECTOR, selectorService.listAll());
    }

    protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
        String json = GsonUtils.getInstance().toJson(data);
        ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
        //更新新的数据
        ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
        log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
    }

设置响应的过程是在定时任务中完成的。

//scheduler 是个定时任务  
@Override
    protected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
        scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));
    }

//定时任务  
    class DataChangeTask implements Runnable {
		//省略其他代码

        @Override
        public void run() {
            for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
                LongPollingClient client = iter.next();
                iter.remove();
                client.sendResponse(Collections.singletonList(groupKey));
                //省略其他代码
            }
        }
    }

	//发送响应
    void sendResponse(final List<ConfigGroupEnum> changedGroups) {
			//省略其他代码
            generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
            asyncContext.complete();
        }

	//产生响应
    private void generateResponse(final HttpServletResponse response, final List<ConfigGroupEnum> changedGroups) {
        try {
            response.setHeader("Pragma", "no-cache");
            response.setDateHeader("Expires", 0);
            response.setHeader("Cache-Control", "no-cache,no-store");
            response.setContentType(MediaType.APPLICATION_JSON_VALUE);
            response.setStatus(HttpServletResponse.SC_OK);
            response.getWriter().println(GsonUtils.getInstance().toJson(SoulAdminResult.success(SoulResultMessage.SUCCESS, changedGroups)));
        } catch (IOException ex) {
            log.error("Sending response failed.", ex);
        }
    }
3.接收数据

Soul网关中,接收数据的操作是主动通过http长轮询发起http请求到soul-admin获取数据。处理逻辑在org.dromara.soul.sync.data.http.HttpSyncDataService类中,Soul网关启动时就会执行。

private void start() {
        // It could be initialized multiple times, so you need to control that.
        if (RUNNING.compareAndSet(false, true)) {
            // fetch all group configs.
            this.fetchGroupConfig(ConfigGroupEnum.values());
            int threadSize = serverList.size();
            this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(),
                    SoulThreadFactory.create("http-long-polling", true));
            // 发起长轮询
            this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
        } else {
            log.info("soul http long polling was started, executor=[{}]", executor);
        }
    }

http长轮询的任务是:不停的进行轮询,先向soul-admin发起请求查看是否有配置信息(包括插件,选择器,规则和元数据)变更;如果有配置信息变更,再发起请求获取变更的数据;最后更新网关的缓存数据。

如果有配置信息变更class HttpLongPollingTask implements Runnable {
		//省略其他代码
        @Override
        public void run() {
            while (RUNNING.get()) {
                for (int time = 1; time <= retryTimes; time++) {
                    try {
                        doLongPolling(server);
                    } catch (Exception e) {
                   //省略其他代码
                    }
                }
            }
            log.warn("Stop http long polling.");
        }
    }

private void doLongPolling(final String server) {
    //省略其他代码
        MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
		//先发起请求,查看是否有配置信息变更
        String listenerUrl = server + "/configs/listener";
        JsonArray groupJson = null;
        try {
            String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
        } catch (RestClientException e) {
//省略其他代码
        }
         //如果有配置信息变更
        if (groupJson != null) {
            if (ArrayUtils.isNotEmpty(changedGroups)) {
                log.info("Group config changed: {}", Arrays.toString(changedGroups));
                //获取变更的配置信息
                this.doFetchGroupConfig(server, changedGroups);
            }
        }
    }
    
    private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
        //省略其他代码
        
        //再发起请求,获取变更的配置信息
        String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");

        try {
            json = this.httpClient.getForObject(url, String.class);
        } catch (RestClientException e) {
//省略其他代码
        }
        // 使用获取的配置信息更新缓存
        boolean updated = this.updateCacheWithJson(json);
        if (updated) {
            log.info("get latest configs: [{}]", json);
            return;
        }
    }

在代码中this.updateCacheWithJson(json),使用获取的配置信息更新缓存的处理操作,实际还是由CommonPluginDataSubscriber来处理。CommonPluginDataSubscriber在处理数据时,根据数据类型和操作类型来分别处理。当前我们测试的是更新选择器信息,所以会进入更新的逻辑,就是下面代码中的BaseDataCache.getInstance().cacheRuleData(ruleData);


public class CommonPluginDataSubscriber implements PluginDataSubscriber {
    
   //省略了其他代码
     
   private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
        Optional.ofNullable(classData).ifPresent(data -> {
            if (data instanceof PluginData) {
  				//省略处理插件的逻辑
            } else if (data instanceof SelectorData) { //处理选择器信息
                SelectorData selectorData = (SelectorData) data;
                if (dataType == DataEventTypeEnum.UPDATE) { //更新操作
                    BaseDataCache.getInstance().cacheSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
                } else if (dataType == DataEventTypeEnum.DELETE) { //删除操作
                    BaseDataCache.getInstance().removeSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
                }
            } else if (data instanceof RuleData) {
                //省略处理规则的逻辑
            }
        });
    }
}

BaseDataCache.getInstance().cacheSelectData(selectorData);代码中,做的事情就是根据传入的变更信息来更新SELECTOR_MAP。这个SELECTOR_MAP缓存了选择器信息,网关在后续使用时,也是从这里获取具体的选择器去匹配请求。

public final class BaseDataCache {
    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();
    
    //省略了其他代码......
    
    //缓存选择器
    public void cacheSelectData(final SelectorData selectorData) {
        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
    }

    //接受选择器
    private void selectorAccept(final SelectorData data) {
        String key = data.getPluginName();
        if (SELECTOR_MAP.containsKey(key)) {
            List<SelectorData> existList = SELECTOR_MAP.get(key);
            //删除之前的选择器
            final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
            resultList.add(data);
            //保存现在的选择器
            final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());
            SELECTOR_MAP.put(key, collect);
        } else {
            SELECTOR_MAP.put(key, Lists.newArrayList(data));
        }
    }
}

分析到这里,基于http长轮询数据同步的工作就算完成了。核心逻辑是:网关主动请求soul-admin获取变更的配置信息,将变更的信息放到网关的内存中,使用时再去内存中拿,所以Soul网关的效率是很高的。

4. 使用更新后的数据

选择器信息完成更新后,通过http去访问soul网关,这里以divide插件为例。关于divide插件的使用请参考之前的文章。

发起一个GET请求:http://localhost:9195/http/order/findById?id=100,代码会执行到下面这个位置:

# org.dromara.soul.plugin.base.AbstractSoulPlugin#execute
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        String pluginName = named();
        final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
        if (pluginData != null && pluginData.getEnabled()) {
            //获取选择器信息
            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
            if (CollectionUtils.isEmpty(selectors)) {
                return handleSelectorIsNull(pluginName, exchange, chain);
            }
            
            //省略了其他代码
            
            return doExecute(exchange, chain, selectorData, rule);
        }
        return chain.execute(exchange);
    }

代码BaseDataCache.getInstance().obtainSelectorData(pluginName);就是我们在数据同步时操作的数据缓存类,RULE_MAP就是刚才更新的规则信息。

public final class BaseDataCache {
    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();
    
    //省略了其他代码......  
    public List<SelectorData> obtainSelectorData(final String pluginName) {
        return SELECTOR_MAP.get(pluginName);
    }
}

刚才,我们发起的请求:http://localhost:9195/http/order/findById?id=100,是匹配不到选择器的:

{
    "code": -107,
    "message": "Can not find selector, please check your configuration!",
    "data": null
}

因为,在开始的时候,更新了选择器的配置:查询条件中id=99才能匹配成功。

所以,我们另外再发起一个id=99请求:http://localhost:9195/http/order/findById?id=99,就可以成功了。


{
    "id": "99",
    "name": "hello world findById"
}

最后,本文通过源码的方式跟踪了Soul网关是如何通过http长轮询完成数据同步的:数据修改后,先保存到 soul-admin的内存,然后通过Soul网关主动向soul-admin发起http请求获取配置信息,然后进行处理数据,最后将数据保存到网关内存。