16 Mar 2022 |
Algorithm |
程序
深刻认识到程序=数据结构+算法
,使用程序解决特定的问题,特定的问题使用何种数据结构,为了高效的得到结果,使用什么算法。
数据结构
最基础的数据结构是数组和链表,数组占用连续空间,通过下标实现O(1)
时间复杂度访问;链表使用额外的指针连接数据节点,需要O(N)
时间复杂度访问目标数据。
基于数组和链表组成多种高级数据结构:
- 哈希表:数组+链表;
- 二叉树:链表;
- 栈 / 队列:数组或者链表;
- 图:数组+链表。
算法
算法的本质是在特定的数据结构上更高效的穷举所有结果。
常用技巧
在单链表中使用快慢指针寻找中间节点;在有序数组中高效的寻找目标数据;在滑动窗口中,右指针不断向前,满足条件后,左指针开始不断收缩。在回文字符串中,从中心不断向左右扩散,直到不满足条件;在盛水容器中,左右指针从两边开始移动,值小的不断移动。在nSum
问题中,不断寻找目标值。
在有序序列中查找数据,分析清楚左边界和右边界,循环结束条件。可以泛化到一般单调性函数上使用。
遇到下个更大元素的问题,就直接使用单调栈吧。单调队列用在滑动窗口中求最大值。
想清楚递归函数的定义,假设已经知道了f(n-1)
的结果,结合nums(i)
如何计算f(n)
。千万不要不断递归下去,这样会把自己陷进去。
递归在二叉树中应用很广。二叉树的遍历有前序,中序,后序三种方式。解决二叉树的各种问题,要考虑清楚使用哪种遍历方式,然后重点考虑当前节点需要作什么操作才能满足递归函数的定义。
动态规划常用于最值问题,它的数学证明很难,我们也不需要去做。想清楚dp()
的定义,如果知道了dp(i-1)
,dp(i)
怎么利用dp(i-1)
计算出结果。中间过程有哪些参数会影响到结果?这些参数很有可能就是状态变量。在考虑用动态规划之前,不妨试试递归+备忘录的方式能否解决呢,他们俩的效率是一样的。
如果要得到所有可能性的结果集,一般就可以考虑使用回溯。回溯就是穷举所有可能的结果,在循环中调用递归。在递归之前做出选择,在递归之后撤销选择。为了提高效率,考虑在每次递归前进行剪枝。
刷题
- 从二叉树开始
- 没有思路,直接看题解
- 一道题目至少刷五遍
16 Feb 2022 |
ShenYu |
Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API
网关。
在ShenYu
网关中,Apache ShenYu
利用 Java Agent
和 字节码增强
技术实现了无痕埋点,使得用户无需引入依赖即可接入第三方可观测性系统,获取 Traces
、Metrics
和 Logging
。
本文基于shenyu-2.4.2
版本进行源码分析,官网的介绍请参考 可观测性 。
具体而言,就是shenyu-agent
模块,它基于 Java Agent
机制,通过ByteBuddy
字节码增强库,在类加载时增强对象,属于静态代理。
在分析源码之前,介绍下AOP
相关的术语,便于后续的理解:
JoinPoint
:连接点,程序运行中的时间点,比如方法的执行点;
PointCut
:切入点,匹配 JoinPoint
的条件;
Advice
:通知,具体的执行逻辑;
Target
:目标对象;
-
Proxy
:代理对象。
- 关于Byte Buddy
Byte Buddy
是一个代码生成和操作库,在Java
应用程序的运行期间创建和修改Java
类。可以利用它创建任何类,不像JDK
动态代理那样强制实现一个接口。此外,Byte Buddy
提供了方便的API,用于手动、使用Java
代理或在构建期间改变类。
- 提供了非常方便的API接口,与强大的类,方法等匹配功能;
- 开箱即用,零学习成本,屏蔽了底层操作字节码技术;
- 强大的开放定制性功能,可以为任何实现的方法自定义字节码;
- 最少运行时生成代码原则,性能高效;
1. premain入口
premain()函数
是javaagent
的入口函数,在 ShenYu
由 ShenyuAgentBootstrap
提供并实现整个agent
的逻辑。
/**
* agent 启动入口类
*/
public class ShenyuAgentBootstrap {
/**
* 入口函数 premain.
*/
public static void premain(final String arguments, final Instrumentation instrumentation) throws Exception {
// 1. 读取配置文件
ShenyuAgentConfigUtils.setConfig(ShenyuAgentConfigLoader.load());
// 2. 加载所有插件
ShenyuAgentPluginLoader.getInstance().loadAllPlugins();
// 3. 创建 agent
AgentBuilder agentBuilder = new AgentBuilder.Default().with(new ByteBuddy().with(TypeValidation.ENABLED))
.ignore(ElementMatchers.isSynthetic())
.or(ElementMatchers.nameStartsWith("org.apache.shenyu.agent."));
agentBuilder.type(ShenyuAgentTypeMatcher.getInstance())
.transform(new ShenyuAgentTransformer())
.with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION)
.with(new TransformListener()).installOn(instrumentation);
// 4. 启动插件
PluginLifecycleManager lifecycleManager = new PluginLifecycleManager();
lifecycleManager.startup(ShenyuAgentConfigUtils.getPluginConfigMap());
Runtime.getRuntime().addShutdownHook(new Thread(lifecycleManager::close));
}
}
premain函数
的核心逻辑,就是上面的四步操作:
-
- 读取配置文件;
-
- 加载所有插件;
-
- 创建 agent;
-
- 启动插件。
接下来的源码分析就依次分析这四个操作。
2. 读取配置文件
- ShenyuAgentConfigLoader#load()
配置文件的处理由 ShenyuAgentConfigLoader
完成,代码实现如下:
public final class ShenyuAgentConfigLoader {
// 配置文件路径
private static final String CONFIG_PATH = "config-path";
/**
* 加载配置文件.
*/
public static ShenyuAgentConfig load() throws IOException {
// 读取配置文件路径
String configPath = System.getProperty(CONFIG_PATH);
// 如果没有配置,就读取默认的文件 shenyu-agent.yaml
File configFile = StringUtils.isEmpty(configPath) ? ShenyuAgentLocator.locatorConf("shenyu-agent.yaml") : new File(configPath);
// 读取配置文件并解析
return ShenyuYamlEngine.agentConfig(configFile);
}
}
可以通过config-path
指定配置文件的路径,如果没有指定的话,就读取默认的配置文件 shenyu-agent.yaml
,然后通过ShenyuYamlEngine
来解析配置文件。
配置文件的格式是yaml
格式,如何配置,请参考官网的介绍 可观测性 。
默认配置文件shenyu-agent.yaml
的格式内容如下:
appName: shenyu-agent # 指定一个名称
supports: # 当前支持哪些功能
tracing: # 链路追踪的插件
# - jaeger
# - opentelemetry
- zipkin
metrics: # 统计度量插件
-
logging: # 日志信息插件
-
plugins: # 每个插件的具体配置信息
tracing: # 链路追踪的插件
jaeger: # jaeger的相关配置
host: "localhost"
port: 5775
props:
SERVICE_NAME: "shenyu-agent"
JAEGER_SAMPLER_TYPE: "const"
JAEGER_SAMPLER_PARAM: "1"
opentelemetry: # opentelemetry的相关配置
props:
otel.traces.exporter: jaeger #zipkin #otlp
otel.resource.attributes: "service.name=shenyu-agent"
otel.exporter.jaeger.endpoint: "http://localhost:14250/api/traces"
zipkin: # zipkin的相关配置
host: "localhost"
port: 9411
props:
SERVICE_NAME: "shenyu-agent"
URL_VERSION: "/api/v2/spans"
SAMPLER_TYPE: "const"
SAMPLER_PARAM: "1"
metrics: # 统计度量插件
prometheus: # prometheus的相关配置
host: "localhost"
port: 8081
props:
logging: # 日志信息插件
elasticSearch: # es的相关配置
host: "localhost"
port: 8082
props:
kafka: # kafka的相关配置
host: "localhost"
port: 8082
props:
需要开启哪个插件,就在supports
中指定,然后再plugins
指定插件的配置信息。
到目前为止,Apache ShenYu
发布的最新版本是2.4.2
版本,可以支持tracing
的插件有jaeger
、opentelemetry
和zipkin
,metrics
和logging
将在后续的版本中陆续发布。
- ShenyuYamlEngine#agentConfig()
ShenyuYamlEngine
提供了如何自定义加载yaml
格式的文件。
public static ShenyuAgentConfig agentConfig(final File yamlFile) throws IOException {
try (
// 读取文件流
FileInputStream fileInputStream = new FileInputStream(yamlFile);
InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream)
) {
//指定对应的class
Constructor constructor = new Constructor(ShenyuAgentConfig.class);
//指定属性的class
TypeDescription customTypeDescription = new TypeDescription(AgentPluginConfig.class);
customTypeDescription.addPropertyParameters("plugins", Map.class);
constructor.addTypeDescription(customTypeDescription);
//通过Yaml工具包读取yaml文件
return new Yaml(constructor, new Representer(DUMPER_OPTIONS)).loadAs(inputStreamReader, ShenyuAgentConfig.class);
}
}
ShenyuAgentConfig
是指定的Class类:
public final class ShenyuAgentConfig {
// appName 服务名称,默认是 shenyu-agent
private String appName = "shenyu-agent";
// supports 支持哪些插件
private Map<String, List<String>> supports = new LinkedHashMap<>();
// plugins 插件的属性信息
private Map<String, Map<String, AgentPluginConfig>> plugins = new LinkedHashMap<>();
}
AgentPluginConfig
是指定插件的Class类:
public final class AgentPluginConfig {
// 指定插件的 host
private String host;
// 指定插件的 port
private int port;
// 指定插件的 password
private String password;
// 指定插件的 其他属性props
private Properties props;
}
通过配置文件,用户可以指定启用哪个插件,指定插件的属性信息。
3. 加载插件
- ShenyuAgentPluginLoader#loadAllPlugins()
读取配置文件后,需要根据用户自定义的配置信息,加载指定的插件。由ShenyuAgentPluginLoader
来完成。
ShenyuAgentPluginLoader
是一个自定义的类加载器,采用单例设计模式。
// 自定义类加载器,继承 ClassLoader
public final class ShenyuAgentPluginLoader extends ClassLoader implements Closeable {
// 私有变量
private static final ShenyuAgentPluginLoader AGENT_PLUGIN_LOADER = new ShenyuAgentPluginLoader();
// 私有构造器
private ShenyuAgentPluginLoader() {
super(ShenyuAgentPluginLoader.class.getClassLoader());
}
// 公开静态方法
public static ShenyuAgentPluginLoader getInstance() {
return AGENT_PLUGIN_LOADER;
}
/**
* 加载所有的插件.
*/
public void loadAllPlugins() throws IOException {
// 1.定位插件路径
File[] jarFiles = ShenyuAgentLocator.locatorPlugin().listFiles(file -> file.getName().endsWith(".jar"));
if (Objects.isNull(jarFiles)) {
return;
}
// 2.加载插件定义
Map<String, ShenyuAgentJoinPoint> pointMap = new HashMap<>();
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
for (File each : jarFiles) {
outputStream.reset();
JarFile jar = new JarFile(each, true);
jars.add(new PluginJar(jar, each));
}
}
loadAgentPluginDefinition(pointMap);
Map<String, ShenyuAgentJoinPoint> joinPointMap = ImmutableMap.<String, ShenyuAgentJoinPoint>builder().putAll(pointMap).build();
// 3.设置拦截点
ShenyuAgentTypeMatcher.getInstance().setJoinPointMap(joinPointMap);
}
}
3.1 定位插件路径
- ShenyuAgentLocator#locatorPlugin()
整个shenyu
项目经过maven
打包后(执行mvn clean install
命令),agent
打包目录如下:
插件文件都是jar
包形式存在的。
conf
目录是配置文件的目录位置;
plugins
目录是各个插件的目录位置。
相应的定位插件路径源码处理逻辑如下:
// 默认插件位于 /plugins 目录下
public static File locatorPlugin() {
return new File(String.join("", locatorAgent().getPath(), "/plugins"));
}
// 定位shenyu-agent.jar的绝对路径
public static File locatorAgent() {
// 找 ShenyuAgentLocator 所在的类路径(包名)
String classResourcePath = String.join("", ShenyuAgentLocator.class.getName().replaceAll("\\.", "/"), ".class");
// 找到 类 的绝对路径:磁盘路径+类路径
URL resource = ClassLoader.getSystemClassLoader().getResource(classResourcePath);
assert resource != null;
String url = resource.toString();
// 是否是以jar包形式存在
int existFileInJarIndex = url.indexOf('!');
boolean isInJar = existFileInJarIndex > -1;
// 从jar包找到路径 或 从资源文件中找路径
return isInJar ? getFileInJar(url, existFileInJarIndex) : getFileInResource(url, classResourcePath);
}
// 从jar包找到路径
private static File getFileInJar(final String url, final int fileInJarIndex) {
// jar包所在的绝对路径
String realUrl = url.substring(url.indexOf("file:"), fileInJarIndex);
try {
// 以绝对路径创建File对象
File agentJarFile = new File(new URL(realUrl).toURI());
// 获取父文件
return agentJarFile.exists() ? agentJarFile.getParentFile() : null;
} catch (final MalformedURLException | URISyntaxException ex) {
return null;
}
}
拿到所有的插件文件后,会去加载插件定义,即拦截点。
3.2 加载拦截点
- ShenyuAgentPluginLoader#loadAgentPluginDefinition()
拦截点的默认配置是在 conf/tracing-point.yaml
文件中,配置格式如下:
pointCuts:
- targetClass: org.apache.shenyu.plugin.global.GlobalPlugin # 拦截目标类
points:
- type: instanceMethod # 拦截点类型
name: execute # 拦截目标方法
handlers: # 处理器
jaeger: # 用于链路追踪的jaeger插件
- org.apache.shenyu.agent.plugin.tracing.jaeger.handler.JaegerGlobalPluginHandler
opentelemetry: # 用于链路追踪的opentelemetry插件
- org.apache.shenyu.agent.plugin.tracing.opentelemetry.handler.OpenTelemetryGlobalPluginHandler
zipkin: # 用于链路追踪的zipkin插件
- org.apache.shenyu.agent.plugin.tracing.zipkin.handler.ZipkinGlobalPluginHandler
// ......
加载拦截点的方式是通过SPI
的方式进行加载的,然后再收集这些拦截点。
private void loadAgentPluginDefinition(final Map<String, ShenyuAgentJoinPoint> pointMap) {
SPILoader.loadList(AgentPluginDefinition.class) // SPI 加载拦截点
.forEach(each -> each.collector().forEach(def -> { // 收集拦截点
String classTarget = def.getClassTarget();
if (pointMap.containsKey(classTarget)) {
ShenyuAgentJoinPoint pluginInterceptorPoint = pointMap.get(classTarget);
pluginInterceptorPoint.getConstructorPoints().addAll(def.getConstructorPoints()); // 构造器类型拦截点
pluginInterceptorPoint.getInstanceMethodPoints().addAll(def.getInstanceMethodPoints()); // 实例方法类型拦截点
pluginInterceptorPoint.getStaticMethodPoints().addAll(def.getStaticMethodPoints()); // 静态方法类型拦截点
} else {
pointMap.put(classTarget, def);
}
}));
}
- SPILoader.loadList(AgentPluginDefinition.class)
AgentPluginDefinition
是拦截点接口,由@SPI
标记:
@SPI // 该接口通过SPI进行加载
public interface AgentPluginDefinition {
/**
* 收集拦截点
*/
Collection<ShenyuAgentJoinPoint> collector();
}
TracingAgentPluginDefinition
是它的一个实现类,用于定义链路追踪的拦截点:
@Join // SPI的实现类
public final class TracingAgentPluginDefinition extends AbstractAgentPluginDefinition {
// 创建拦截点
@Override
protected Collection<JoinPointBuilder> joinPointBuilder() {
// ......
}
}
public abstract class AbstractAgentPluginDefinition implements AgentPluginDefinition {
// 创建拦截点
protected abstract Collection<JoinPointBuilder> joinPointBuilder();
// 收集拦截点信息
@Override
public final Collection<ShenyuAgentJoinPoint> collector() {
//......
}
}
类之间的继承关系如下:
public abstract class AbstractAgentPluginDefinition implements AgentPluginDefinition {
// 子类去实现如何创建拦截点
protected abstract Collection<JoinPointBuilder> joinPointBuilder();
@Override
public final Collection<ShenyuAgentJoinPoint> collector() {
// 获取拦截点构建器
Collection<JoinPointBuilder> joinPointBuilders = joinPointBuilder();
// 创建拦截点对象 ShenyuAgentJoinPoint
return joinPointBuilders.stream().map(JoinPointBuilder::install).collect(Collectors.toList());
}
}
// 创建拦截点对象 ShenyuAgentJoinPoint
public ShenyuAgentJoinPoint install() {
// 四个构造参数分别是:目标对象,构造器拦截点,实例方法拦截点,静态方法拦截点
return new ShenyuAgentJoinPoint(classTarget, constructorPoints, instanceMethodPoints, classStaticMethodPoints);
}
@Join
public final class TracingAgentPluginDefinition extends AbstractAgentPluginDefinition {
// 创建拦截点
@Override
protected Collection<JoinPointBuilder> joinPointBuilder() {
PointCutConfig config = null;
try {
// 读取默认的拦截点配置文件
config = ShenyuYamlEngine.unmarshal(ShenyuAgentLocator.locatorConf("tracing-point.yaml"), PointCutConfig.class);
} catch (IOException e) {
LOG.error("Exception loader tracing point config is", e);
}
// 创建拦截点
return JoinPointBuilderFactory.create(config);
}
}
public static Collection<JoinPointBuilder> create(final PointCutConfig config) {
//如果没有配置文件或为空,则返回空集合
if (Objects.isNull(config) || config.getPointCuts().isEmpty()) {
return Collections.emptyList();
}
return config.getPointCuts().stream() // 获取配置文件中定义的拦截点
.filter(pointCut -> StringUtils.isNotEmpty(pointCut.getTargetClass())
&& !pointCut.getPoints().isEmpty() && !pointCut.getHandlers().isEmpty()) // 拦截点必须要指定目标类,切入点,处理器
.map(pointCut -> {
JoinPointBuilder builder = ShenyuAgentJoinPoint.interceptClass(pointCut.getTargetClass()); // 设置需要拦截的目标类
Set<String> supports = ShenyuAgentConfigUtils.getSupports(); // 获取当前支持哪些插件
List<String> handlers = pointCut.getHandlers().entrySet().stream()
.filter(entry -> supports.contains(entry.getKey())) // 指定的处理器必须是当前可支持的插件
.flatMap(entry -> entry.getValue().stream())
.collect(Collectors.toList());
String[] instanceMethods = pointCut
.getPoints()
.stream()
.filter(point -> PointType.INSTANCE_METHOD.getName().equals(point.getType()))
.map(Point::getName) // 拦截实例方法
.toArray(String[]::new);
if (instanceMethods.length > 0) {
builder.aroundInstanceMethod(ElementMatchers.namedOneOf(instanceMethods)).handlers(handlers).build(); // 为实例方法添加匹配器用于后续运行时动态匹配,并添加对应的处理器
}
String[] staticMethods = pointCut
.getPoints()
.stream()
.filter(point -> PointType.STATIC_METHOD.getName().equals(point.getType()))
.map(Point::getName)
.toArray(String[]::new); // 拦截静态方法
if (staticMethods.length > 0) {
builder.aroundStaticMethod(ElementMatchers.namedOneOf(staticMethods)).handlers(handlers).build();// 为静态方法添加匹配器用于后续运行时动态匹配,并添加对应的处理器
}
String[] constructorPoints = pointCut
.getPoints()
.stream()
.filter(point -> PointType.CONSTRUCTOR.getName().equals(point.getType()))
.map(Point::getName)
.toArray(String[]::new); // 拦截构造器
if (constructorPoints.length > 0) {
builder.onConstructor(ElementMatchers.namedOneOf(constructorPoints)).handlers(handlers).build();// 为构造器添加匹配器用于后续运行时动态匹配,并添加对应的处理器
}
return builder;
}).collect(Collectors.toList()); // 返回匹配结果
}
创建拦截点的主要实现逻辑是:根据配置文件读取配置信息,为指定的目标对象的目标方法添加相应的处理器。处理器有三种:实例方法处理器,静态方法处理器,构造函数处理器。
这里用到了ElementMatchers.namedOneOf()
方法,它表示方法名称在指定的参数中,就可以匹配上这个方法。ElementMatchers
是bytebuddy
中的一个类,在ShenYu
中,agent
的创建也通过bytebuddy
完成的。
后续将收集到的拦截点创建为拦截点对象ShenyuAgentJoinPoint
。
public final Collection<ShenyuAgentJoinPoint> collector() {
// 获取拦截点
Collection<JoinPointBuilder> joinPointBuilders = joinPointBuilder();
// 创建拦截点对象
return joinPointBuilders.stream().map(JoinPointBuilder::install).collect(Collectors.toList());
}
收集完拦截点之后,用Map
保存了这些拦截点信息。
// pointMap: Key和Value分别表示目标类,拦截点
private void loadAgentPluginDefinition(final Map<String, ShenyuAgentJoinPoint> pointMap) {
SPILoader.loadList(AgentPluginDefinition.class) // SPI 加载拦截点
.forEach(each -> each.collector().forEach(def -> { // 收集拦截点
String classTarget = def.getClassTarget();
if (pointMap.containsKey(classTarget)) {
ShenyuAgentJoinPoint pluginInterceptorPoint = pointMap.get(classTarget);
pluginInterceptorPoint.getConstructorPoints().addAll(def.getConstructorPoints()); // 构造器类型拦截点
pluginInterceptorPoint.getInstanceMethodPoints().addAll(def.getInstanceMethodPoints()); // 实例方法类型拦截点
pluginInterceptorPoint.getStaticMethodPoints().addAll(def.getStaticMethodPoints()); // 静态方法类型拦截点
} else {
pointMap.put(classTarget, def); // 将拦截点信息保存到Map中
}
}));
}
3.3 设置拦截点
在加载所有插件的过程中最后一步是设置拦截点。
public void loadAllPlugins() throws IOException {
// 1.定位插件路径
// ......
// 2.加载插件定义
// ......
// 3.设置拦截点
ShenyuAgentTypeMatcher.getInstance().setJoinPointMap(joinPointMap);
}
设置拦截点就是将拦截点集合保存到ShenyuAgentTypeMatcher
类中。它实现了ElementMatcher
接口,用于自定义匹配逻辑。ElementMatcher
也是bytebuddy
中的接口。
// 使用单例设计模式
public final class ShenyuAgentTypeMatcher extends ElementMatcher.Junction.AbstractBase<TypeDefinition> {
// 创建实例
private static final ShenyuAgentTypeMatcher SHENYU_AGENT_TYPE_MATCHER = new ShenyuAgentTypeMatcher();
// 拦截点集合
private Map<String, ShenyuAgentJoinPoint> joinPointMap;
private ShenyuAgentTypeMatcher() {
}
/**
* 获取单例
*/
public static ShenyuAgentTypeMatcher getInstance() {
return SHENYU_AGENT_TYPE_MATCHER;
}
//自定义匹配逻辑,目标类在拦截点集合中就匹配成功
@Override
public boolean matches(final TypeDefinition target) {
return joinPointMap.containsKey(target.getTypeName());
}
/**
* 设置拦截点集合
*/
public void setJoinPointMap(final Map<String, ShenyuAgentJoinPoint> joinPointMap) {
this.joinPointMap = joinPointMap;
}
}
4. 创建 agent
通过创建的agent,用于改变目标类的行为。
public static void premain(final String arguments, final Instrumentation instrumentation) throws Exception {
// 1. 读取配置文件
// ......
// 2. 加载所有插件
// ......
// 3. 创建 agent
AgentBuilder agentBuilder = new AgentBuilder.Default().with(new ByteBuddy().with(TypeValidation.ENABLED)) // 通过ByteBuddy创建Agent,开启类型校验
.ignore(ElementMatchers.isSynthetic()) // 忽略合成类
.or(ElementMatchers.nameStartsWith("org.apache.shenyu.agent.")); // 忽略org.apache.shenyu.agent 的类
agentBuilder.type(ShenyuAgentTypeMatcher.getInstance())//匹配加载类型,匹配器是ShenyuAgentTypeMatcher
.transform(new ShenyuAgentTransformer()) // 匹配成功的,通过ShenyuAgentTransformer改变其行为
.with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION) //指定重定义策略
.with(new TransformListener()) //指定一个监听器,监听运行过程中的事件
.installOn(instrumentation); // 将修改应用到instrumentation中
// 4. 启动插件
// ......
}
在创建agent的过程中,需要注意两个点:
- 是否匹配成功,由
ShenyuAgentTypeMatcher
决定;
- 匹配成功的类,通过
ShenyuAgentTransformer
改变其行为;
接下来我们就来着重分析这两个类。
4.1 定义匹配逻辑
- ShenyuAgentTypeMatcher#matches()
ShenyuAgentTypeMatcher
使用单例的设计模式,实现了ElementMatcher
接口,重写了matches()
方法。
是否能够匹配成功的逻辑是:如果目标类在拦截点joinPointMap
集合中,就匹配成功。
public final class ShenyuAgentTypeMatcher extends ElementMatcher.Junction.AbstractBase<TypeDefinition> {
// ......
private Map<String, ShenyuAgentJoinPoint> joinPointMap;
// 自定义匹配逻辑:如果目标类在拦截点joinPointMap集合中,就匹配成功
@Override
public boolean matches(final TypeDefinition target) {
return joinPointMap.containsKey(target.getTypeName());
}
}
4.2 改变匹配类的行为
在加载目标类时,如果匹配成功,会通过ShenyuAgentTransformer
改变其行为,它实现了Transformer
接口,重写了transform()
方法,Transformer
也是bytebuddy
的一个接口。
public final class ShenyuAgentTransformer implements Transformer {
//在匹配类中额外定义一个字段,传递上下文信息
private static final String EXTRA_DATA = "_$EXTRA_DATA$_";
//类型匹配器
private static final ShenyuAgentTypeMatcher MATCHER = ShenyuAgentTypeMatcher.getInstance();
//重写transform方法,重新定义匹配类的行为
//加载类期间,执行一次
@Override
public Builder<?> transform(final Builder<?> builder, final TypeDescription typeDescription, final ClassLoader classLoader, final JavaModule module) {
//不是匹配的类就跳过
if (!MATCHER.containsType(typeDescription)) {
return builder;
}
//为该类新增加一个字段
Builder<?> result = builder.defineField(EXTRA_DATA, Object.class, Opcodes.ACC_PRIVATE | Opcodes.ACC_VOLATILE).implement(TargetObject.class).intercept(FieldAccessor.ofField(EXTRA_DATA));
// 获取拦截点
ShenyuAgentJoinPoint joinPoint = MATCHER.loadShenyuAgentJoinPoint(typeDescription);
//拦截构造器
result = interceptorConstructorPoint(typeDescription, joinPoint.getConstructorPoints(), result);
//拦截静态方法
result = interceptorStaticMethodPoint(typeDescription, joinPoint.getStaticMethodPoints(), result);
//拦截实例方法
result = interceptorInstanceMethodPoint(typeDescription, joinPoint.getInstanceMethodPoints(), result);
return result;
}
// ......
}
在transform()
方法中,重新定义了匹配类的行为:
- 新增了字段,是
TargetObject
的子类,用于传递上下文;
- 根据指定配置,是否拦截构造器;
- 根据指定配置,是否拦截静态方法;
- 根据指定配置,拦截实例方法;
4.2.3 拦截实例方法
- ShenyuAgentTransformer#interceptorInstanceMethodPoint()
根据切面构建实例方法拦截点,获取Builder
对象。
private Builder<?> interceptorInstanceMethodPoint(final TypeDescription description, final Collection<InstanceMethodPointCut> pointCuts, final Builder<?> builder) {
Collection<ShenyuAgentTransformerPoint<?>> points = description.getDeclaredMethods().stream()
.filter(each -> !(each.isAbstract() || each.isSynthetic())) //过滤抽象方法,合成方法
.map(each -> buildInstanceMethodTransformationPoint(pointCuts, each)) //构建实例方法拦截点
.filter(Objects::nonNull)
.collect(Collectors.toList());
return getBuilder(description, builder, points); //获取Builder对象
}
- ShenyuAgentTransformer#buildInstanceMethodTransformationPoint()
过滤匹配上的方法,为方法获取对应的handler
处理器,最后创建实例方法拦截点对象。
private ShenyuAgentTransformerPoint<?> buildInstanceMethodTransformationPoint(final Collection<InstanceMethodPointCut> pointCuts, final InDefinedShape methodDescription) {
List<InstanceMethodPointCut> points = pointCuts.stream().filter(point -> point.getMatcher().matches(methodDescription)).collect(Collectors.toList()); //过滤能够匹配上的方法
if (points.isEmpty()) {
return null;
}
List<InstanceMethodHandler> handlers = points.stream()
.flatMap(pointCut -> pointCut.getHandlers().stream())
.map(handler -> (InstanceMethodHandler) MATCHER.getOrCreateInstance(handler)) //获取对应的handler处理器
.filter(Objects::nonNull)
.collect(Collectors.toList());
return new ShenyuAgentTransformerPoint<>(methodDescription, new InstanceMethodInterceptor(handlers));//创建实例方法拦截点对象,创建实例方法拦截器
}
方法能否匹配成功:当前方法名称是否是在tracing-point.yaml
文件中配置的方法名称;
handler
的获取:是根据tracing-point.yaml
文件中配置的全限定名去加载对应的类。
- InstanceMethodInterceptor#intercept()
实例方法拦截器InstanceMethodInterceptor
会在运行期间动态处理拦截方法。
public class InstanceMethodInterceptor {
// ......
/**
* 拦截目标对象.
*
* @param target 当前被拦截的目标对象
* @param method 目标对象的目标方法
* @param args 目标方法的参数
* @param callable 目标方法调用
* @return 目标方法调用结果
* @throws 异常信息
*/
@RuntimeType //定义运行时的目标方法
public Object intercept(@This final Object target, @Origin final Method method, @AllArguments final Object[] args, @SuperCall final Callable<?> callable) throws Exception {
//目标方法执行结果
Object result = null;
//目标对象
TargetObject instance = (TargetObject) target;
//依次调用handler
for (InstanceMethodHandler handler : handlerList) {
MethodResult methodResult = new MethodResult();
// 前置处理逻辑
try {
handler.before(instance, method, args, methodResult);
} catch (final Throwable ex) {
LOG.error("Failed to execute the before method of method {} in class {}", method.getName(), target.getClass(), ex);
}
//调用目标方法
try {
result = callable.call();
} catch (final Throwable ex) {
//处理异常
try {
handler.onThrowing(instance, method, args, ex);
} catch (final Throwable ignored) {
LOG.error("Failed to execute the error handler of method {} in class {}", method.getName(), target.getClass(), ex);
throw ex;
}
} finally {
//后置处理逻辑
try {
result = handler.after(instance, method, args, methodResult, result);
} catch (final Throwable ex) {
LOG.error("Failed to execute the after method of method {} in class {}", method.getName(), target.getClass(), ex);
}
}
}
//返回目标方法调用结果
return result;
}
}
实例方法拦截器在目标方法调用前,增加了前置处理逻辑,后置处理逻辑,以及异常处理逻辑。
这里用到了Byte Buddy
的几个注解:
@RuntimeType
: 定义运行时的目标方法,提示ByteBuddy
禁用严格的类型检查;
@This
:当前被拦截的、动态生成的实例对象;
@Origin
:原有方法;
@AllArguments
:获取所有入参;
@SuperCall
:用于调用父类版本的方法。
实例方法处理器InstanceMethodHandler
只是定义了三个接口,具体实现逻辑由具体插件去处理。
public interface InstanceMethodHandler {
// 前置处理逻辑
default void before(final TargetObject target, final Method method, final Object[] args, final MethodResult result) {
}
// 后置处理逻辑
default Object after(final TargetObject target, final Method method, final Object[] args, final MethodResult methodResult, final Object result) {
return result;
}
// 异常处理逻辑
default void onThrowing(final TargetObject target, final Method method, final Object[] args, final Throwable throwable) {
}
}
private static Builder<?> getBuilder(final TypeDescription description, final Builder<?> builder, final Collection<ShenyuAgentTransformerPoint<?>> points) {
final Builder<?>[] result = {builder};
points.forEach(point -> {
try {
result[0] = builder.method(ElementMatchers.is(point.getDescription()))//指定目标方法
.intercept(MethodDelegation.withDefaultConfiguration().to(point.getInterceptor()));//指定拦截器
// CHECKSTYLE:OFF
} catch (final Throwable ex) {
// CHECKSTYLE:ON
LOG.error("Failed to load handler class: {}", description.getTypeName(), ex);
}
});
return result[0];
}
通过以上的处理逻辑,就可以实现无侵入拦截实例方法了。
拦截方法intercept()
的处理逻辑是:
- 依次处理每个
handler
;
- 调用
handler
的前置方法;
- 调用目标方法;
- 如果目标方法有异常,则调用
handler
的异常处理方法;
- 调用
handler
的后置方法。
接下来看看拦截静态方法。
4.2.3 拦截静态方法
- ShenyuAgentTransformer#interceptorStaticMethodPoint()
过滤出静态方法,然后为静态方法构建静态方法拦截点。
private Builder<?> interceptorStaticMethodPoint(final TypeDescription description, final Collection<StaticMethodPointCut> pointCuts, final Builder<?> builder) {
Collection<ShenyuAgentTransformerPoint<?>> points = description.getDeclaredMethods().stream()
.filter(each -> each.isStatic() && !(each.isAbstract() || each.isSynthetic())) // 当前方法是静态方法,不是抽象方法,不是合成方法
.map(methodDescription -> buildStaticMethodTransformationPoint(pointCuts, methodDescription)) // 构建静态方法拦截点
.filter(Objects::nonNull)
.collect(Collectors.toList());
return getBuilder(description, builder, points);
}
- ShenyuAgentTransformer#buildStaticMethodTransformationPoint()
根据配置文件进行过滤,判断当前的静态方法是否需要拦截。然后获取对应的处理器,最后构建静态方法拦截器对象。
private ShenyuAgentTransformerPoint<?> buildStaticMethodTransformationPoint(final Collection<StaticMethodPointCut> pointCuts, final InDefinedShape methodDescription) {
List<StaticMethodPointCut> staticMethodPoints = pointCuts.stream().filter(point -> point.getMatcher().matches(methodDescription)).collect(Collectors.toList()); //根据配置文件进行过滤,判断当前的静态方法是否需要拦截
if (staticMethodPoints.isEmpty()) { // 如果没有配置,就直接返回了
return null;
}
List<StaticMethodHandler> handlers = staticMethodPoints.stream()
.flatMap(pointCut -> pointCut.getHandlers().stream())
.map(handler -> (StaticMethodHandler) MATCHER.getOrCreateInstance(handler)) //获取对应的处理器
.filter(Objects::nonNull)
.collect(Collectors.toList());
return new ShenyuAgentTransformerPoint<>(methodDescription, new StaticMethodInterceptor(handlers)); //构建静态方法拦截器对象
}
- StaticMethodInterceptor#intercept()
在运行时,会拦截目标方法,执行拦截器的处理逻辑 。
public class StaticMethodInterceptor {
//......
/**
* 拦截目标方法.
*/
@RuntimeType
public Object intercept(@Origin final Class<?> klass, @Origin final Method method, @AllArguments final Object[] args, @SuperCall final Callable<?> callable) throws Exception {
Object result = null;
// handler循环处理
for (StaticMethodHandler handler : handlerList) {
MethodResult methodResult = new MethodResult();
try {
//前置方法
handler.before(klass, method, args, new MethodResult());
} catch (final Throwable ex) {
LOG.error("Failed to execute the before method of method {} in class {}", method.getName(), klass, ex);
}
try {
// 调用当前方法
// 目标方法是不是应该只会被调用一次?
result = callable.call();
} catch (final Throwable ex) {
try {
//异常逻辑处理
handler.onThrowing(klass, method, args, ex);
} catch (final Throwable ignored) {
LOG.error("Failed to execute the error handler of method {} in class {}", method.getName(), klass, ex);
throw ex;
}
} finally {
try {
// 后置方法
handler.after(klass, method, args, methodResult);
} catch (final Throwable ex) {
LOG.error("Failed to execute the after method of method {} in class {}", method.getName(), klass, ex);
}
}
}
//返回方法调用结果
return result;
}
}
拦截方法intercept()
的处理逻辑是:
- 依次处理每个
handler
;
- 调用
handler
的前置方法;
- 调用目标方法;
- 如果目标方法有异常,则调用
handler
的异常处理方法;
- 调用
handler
的后置方法。
最后再看看如何拦截构造器。
4.2.3 拦截构造器
- ShenyuAgentTransformer#interceptorConstructorPoint()
过滤出构造器,然后构建构造器的拦截点,最后创建builder
对象,为构造方法添加拦截器。
private Builder<?> interceptorConstructorPoint(final TypeDescription description, final Collection<ConstructorPointCut> constructorPoints, final Builder<?> builder) {
Collection<ShenyuAgentTransformerPoint<? extends ConstructorInterceptor>> constructorAdviceComposePoints = description.getDeclaredMethods().stream()
.filter(MethodDescription::isConstructor) //过滤出构造器
.map(each -> buildConstructorTransformerPoint(constructorPoints, each))//构建构造器的拦截点
.filter(Objects::nonNull)
.collect(Collectors.toList());
final Builder<?>[] result = {builder};
// 创建builder对象,为构造方法添加拦截器
constructorAdviceComposePoints.forEach(point -> {
try {
result[0] = builder.constructor(ElementMatchers.is(point.getDescription()))
.intercept(SuperMethodCall.INSTANCE.andThen(MethodDelegation.withDefaultConfiguration()
.to(point.getInterceptor())));//先调用父类构造器,然后添加拦截器
// CHECKSTYLE:OFF
} catch (final Throwable ex) {
// CHECKSTYLE:ON
LOG.error("Failed to load handler class: {}", description.getTypeName(), ex);
}
});
return result[0];
}
- ShenyuAgentTransformer#buildConstructorTransformerPoint()
先获取到构造器拦截点,然后为拦截点创建handler
实例对象,最后创建构造器拦截器对象。
private ShenyuAgentTransformerPoint<? extends ConstructorInterceptor> buildConstructorTransformerPoint(
final Collection<ConstructorPointCut> constructorPoints, final InDefinedShape methodDescription) {
//获取构造器拦截点
List<ConstructorPointCut> constructorPointCutList = constructorPoints.stream().filter(each -> each.getMatcher().matches(methodDescription)).collect(Collectors.toList());
if (constructorPointCutList.isEmpty()) {
return null;
}
List<ConstructorHandler> handlers = constructorPointCutList.stream()
.flatMap(pointCut -> pointCut.getHandlers().stream())
.map(handler -> (ConstructorHandler) MATCHER.getOrCreateInstance(handler)) //创建拦截点的handler实例对象
.filter(Objects::nonNull)
.collect(Collectors.toList());
return new ShenyuAgentTransformerPoint<>(methodDescription, new ConstructorInterceptor(handlers));//创建构造器拦截器
}
- ConstructorInterceptor#intercept()
构造器拦截器:在调用目标方法的构造器之前,执行每个handler
的处理逻辑。
public class ConstructorInterceptor {
//......
/**
* 拦截方法.
*/
@RuntimeType
public void intercept(@This final TargetObject target, @AllArguments final Object[] args) {
for (ConstructorHandler handler : handlerList) {
try {
// handler处理逻辑
handler.onConstructor(target, args);
} catch (final Throwable throwable) {
LOG.error("Constructor advice execution error. class: {}", target.getClass().getTypeName(), throwable);
}
}
}
}
分析到此,我们分析完了创建agent
的整个过程:
- 根据配置文件,定义匹配逻辑
ShenyuAgentTypeMatcher
;
- 定义
ShenyuAgentTransformer
对象,改变匹配类的行为;
- 通过
InstanceMethodInterceptor
拦截实例对象方法;
- 通过
StaticMethodInterceptor
拦截静态方法;
- 通过
ConstructorInterceptor
拦截构造器。
这里没有提到每个handler
的处理逻辑,是因为handler
的实现逻辑由每个插件自定义。比如,当前实例方法拦截器InstanceMethodHandler
的实现类就有jaeger
,opentelemetry
和zipkin
。
5. 启动插件
创建完 agent
之后,启动各个插件。
public static void premain(final String arguments, final Instrumentation instrumentation) throws Exception {
// 1. 读取配置文件
// ......
// 2. 加载所有插件
// ......
// 3. 创建 agent
// 4. 启动插件
PluginLifecycleManager lifecycleManager = new PluginLifecycleManager();
lifecycleManager.startup(ShenyuAgentConfigUtils.getPluginConfigMap());
//添加hook函数用于关闭插件
Runtime.getRuntime().addShutdownHook(new Thread(lifecycleManager::close));
}
public class PluginLifecycleManager {
//......
/**
* 启动插件.
*/
public void startup(final Map<String, AgentPluginConfig> configMap) {
//从配置文件中获取支持的插件名称
Set<String> support = ShenyuAgentConfigUtils.getSupports();
configMap.entrySet().stream()
.filter(entry -> support.contains(entry.getKey())) //包含在配置文件中
.forEach(entry -> Optional.ofNullable(SPILoader.load(AgentPluginBootService.class, entry.getKey())) //通过SPI加载插件启动类
.ifPresent(bootService -> {
try {
LOG.info("start shenyu plugin: {}", entry.getKey());
bootService.start(entry.getValue()); // 启动插件:执行插件的具体启动逻辑
} catch (final Throwable ex) {
LOG.error("Failed to start shenyu plugin", ex);
}
}));
}
/**
* 关闭插件
*/
public void close() {
//通过SPI加载插件启动类
SPILoader.loadList(AgentPluginBootService.class).forEach(each -> {
try {
each.close(); // 关闭插件:执行插件的具体关闭逻辑
} catch (final Throwable ex) {
LOG.error("Failed to close shenyu agent plugin", ex);
}
});
}
}
插件的启动和关闭也是有每个插件具体去实现的,然后通过SPI
去加载。
6. 总结
shenyu-agent
模块的实现主要是通过Byte Buddy
工具包;
- 在配置文件
shenyu-agent.yaml
中,指定插件信息;
- 插件加载过程通过
SPI
完成;
- 拦截点通过配置文件指定,设计灵活;
- 插件接口定义和实现分开,支持多种插件类型。
22 Nov 2021 |
ShenYu |
Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API
网关。
在ShenYu
网关中,注册中心是用于将客户端信息注册到shenyu-admin
,admin
再通过数据同步将这些信息同步到网关,网关通过这些数据完成流量筛选。客户端信息主要包括接口信息
和URI信息
。
本文基于shenyu-2.4.1
版本进行源码分析,官网的介绍请参考 客户端接入原理 。
1. 注册中心原理
当客户端启动时,读取接口信息和uri信息
,通过指定的注册类型,将数据发送到shenyu-admin
。
图中的注册中心需要用户指定使用哪种注册类型,ShenYu
当前支持Http
、Zookeeper
、Etcd
、Consul
和Nacos
进行注册。具体如何配置请参考 客户端接入配置 。
ShenYu
在注册中心的原理设计上引入了Disruptor
,Disruptor
队列在其中起到数据与操作解耦,利于扩展。如果注册请求过多,导致注册异常,也有数据缓冲作用。
如图所示,注册中心分为两个部分,一是注册中心客户端register-client
,负载处理客户端数据读取。另一个是注册中心服务端register-server
,负载处理服务端(就是shenyu-admin
)数据写入。通过指定注册类型进行数据发送和接收。
- 客户端:通常来说就是一个微服务,可以是
springmvc
,spring-cloud
,dubbo
,grpc
等。
register-client
:注册中心客户端,读取客户接口和uri
信息。
Disruptor
:数据与操作解耦,数据缓冲作用。
register-server
:注册中心服务端,这里就是shenyu-admin
,接收数据,写入数据库,发数据同步事件。
- 注册类型:指定注册类型,完成数据注册,当前支持
Http
、Zookeeper
、Etcd
、Consul
和Nacos
。
本文分析的是使用Http
的方式进行注册,所以具体的处理流程如下:
在客户端,数据出队列后,通过http
传输数据,在服务端,提供相应的接口,接收数据,然后写入队列。
2. 客户端注册流程
当客户端启动后,根据相关配置,读取属性信息,然后写入队列。以官方提供的 shenyu-examples-http 为例,开始源码分析。官方提供的例子是一个由springboot
构建的微服务。注册中心的相关配置可以参考官网 客户端接入配置 。
2.1 加载配置,读取属性
先用一张图串联下注册中心客户端初始化流程:
我们分析的是通过http
的方式进行注册,所以需要进行如下配置:
shenyu:
register:
registerType: http
serverLists: http://localhost:9095
client:
http:
props:
contextPath: /http
appName: http
port: 8189
isFull: false
每个属性表示的含义如下:
registerType
: 服务注册类型,填写 http
。
serverList
: 为http
注册类型时,填写Shenyu-Admin
项目的地址,注意加上http://
,多个地址用英文逗号分隔。
port
: 你本项目的启动端口,目前springmvc/tars/grpc
需要进行填写。
contextPath
: 为你的这个mvc
项目在shenyu
网关的路由前缀, 比如/order
,/product
等等,网关会根据你的这个前缀来进行路由。
appName
:你的应用名称,不配置的话,会默认取 spring.application.name
的值。
isFull
: 设置 true
代表代理你的整个服务,false
表示代理你其中某几个controller
;目前适用于springmvc/springcloud
。
项目启动后,会先加载配置文件,读取属性信息,生成相应的Bean
。
首先读取到的配置文件是 ShenyuSpringMvcClientConfiguration
,它是shenyu
客户端http
注册配置类,通过@Configuration
表示这是一个配置类,通过@ImportAutoConfiguration
引入其他配置类。创建SpringMvcClientBeanPostProcessor
,主要处理元数据。创建ContextRegisterListener
,主要处理 URI
信息。
/**
* shenyu 客户端http注册配置类
*/
@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
public class ShenyuSpringMvcClientConfiguration {
//创建SpringMvcClientBeanPostProcessor,主要处理元数据
@Bean
public SpringMvcClientBeanPostProcessor springHttpClientBeanPostProcessor(final ShenyuClientConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
return new SpringMvcClientBeanPostProcessor(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);
}
// 创建ContextRegisterListener,主要处理 URI信息
@Bean
public ContextRegisterListener contextRegisterListener(final ShenyuClientConfig clientConfig) {
return new ContextRegisterListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()));
}
}
ShenyuClientCommonBeanConfiguration
是shenyu
客户端通用配置类,会创建注册中心客户端通用的bean
。
- 创建
ShenyuClientRegisterRepository
,通过工厂类创建而成。
- 创建
ShenyuRegisterCenterConfig
,读取shenyu.register
属性配置。
- 创建
ShenyuClientConfig
,读取shenyu.client
属性配置。
/**
* shenyu客户端通用配置类
*/
@Configuration
public class ShenyuClientCommonBeanConfiguration {
// 创建ShenyuClientRegisterRepository,通过工厂类创建而成。
@Bean
public ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
return ShenyuClientRegisterRepositoryFactory.newInstance(config);
}
// 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}
// 创建ShenyuClientConfig,读取shenyu.client属性配置
@Bean
@ConfigurationProperties(prefix = "shenyu")
public ShenyuClientConfig shenyuClientConfig() {
return new ShenyuClientConfig();
}
}
2.2 用于注册的 HttpClientRegisterRepository
上面的配置文件中生成的ShenyuClientRegisterRepository
是客户端注册的具体实现,它是一个接口,它的实现类如下。
HttpClientRegisterRepository
:通过http
进行注册;
ConsulClientRegisterRepository
:通过Consul
进行注册;
EtcdClientRegisterRepository
:通过Etcd
进行注册;
NacosClientRegisterRepository
:通过nacos
进行注册;
ZookeeperClientRegisterRepository
通过Zookeeper
进行注册。
具体是哪一种方式,是通过SPI
进行加载实现的,实现逻辑如下:
/**
* 加载 ShenyuClientRegisterRepository
*/
public final class ShenyuClientRegisterRepositoryFactory {
private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();
/**
* 创建 ShenyuClientRegisterRepository
*/
public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
// 通过SPI的方式进行加载,类型由registerType决定
ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
//执行初始化操作
result.init(shenyuRegisterCenterConfig);
ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
return result;
}
return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
}
}
加载类型通过registerType
指定,也就是我们在配置文件中指定的类型:
shenyu:
register:
registerType: http
serverLists: http://localhost:9095
我们指定的是http
,所以会去加载HttpClientRegisterRepository
。对象创建成功后,执行的初始化方法init()
如下:
@Join
public class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {
@Override
public void init(final ShenyuRegisterCenterConfig config) {
this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));
}
// 暂时省略其他逻辑
}
读取配置文件中的serverLists
,即sheenyu-admin
的地址,为后续数据发送做准备。类注解@Join
用于SPI
的加载。
SPI
全称为 Service Provider Interface
, 是 JDK
内置的一种服务提供发现功能, 一种动态替换发现的机制。
shenyu-spi 是Apache ShenYu
网关自定义的SPI
扩展实现,设计和实现原理参考了Dubbo
的 SPI扩展实现 。
2.3 构建元数据的 SpringMvcClientBeanPostProcessor
创建SpringMvcClientBeanPostProcessor
,负责元数据的构建和注册,它的构造函数逻辑如下:
/**
* spring mvc 客户端bean的后置处理器
*/
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
/**
* 通过构造函数进行实例化
*/
public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// 读取配置属性
Properties props = clientConfig.getProps();
// 获取端口信息,并校验
int port = Integer.parseInt(props.getProperty(ShenyuClientConstants.PORT));
if (port <= 0) {
String errorMsg = "http register param must config the port must > 0";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
// 获取appName
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
// 获取contextPath
this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
// 校验appName和contextPath
if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
String errorMsg = "http register param must config the appName or contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
// 获取 isFull
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
// 开始事件发布
publisher.start(shenyuClientRegisterRepository);
}
// 暂时省略了其他逻辑
@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
// 暂时省略了其他逻辑
}
}
在构造函数中,主要是读取属性信息,然后进行校验。
shenyu:
client:
http:
props:
contextPath: /http
appName: http
port: 8189
isFull: false
最后,执行了 publisher.start()
,开始事件发布,为注册做准备。
- ShenyuClientRegisterEventPublisher
ShenyuClientRegisterEventPublisher
通过单例模式实现,主要是生成元数据
和URI
订阅器(后续用于数据发布),然后启动Disruptor
队列。提供了一个共有方法publishEvent()
,发布事件,向Disruptor队列发数据。
public class ShenyuClientRegisterEventPublisher {
// 私有变量
private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();
private DisruptorProviderManage providerManage;
private RegisterClientExecutorFactory factory;
/**
* 公开静态方法
*
* @return ShenyuClientRegisterEventPublisher instance
*/
public static ShenyuClientRegisterEventPublisher getInstance() {
return INSTANCE;
}
/**
* Start方法执行
*
* @param shenyuClientRegisterRepository shenyuClientRegisterRepository
*/
public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// 创建客户端注册工厂类
factory = new RegisterClientExecutorFactory();
// 添加元数据订阅器
factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));
// 添加URI订阅器
factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));
// 启动Disruptor队列
providerManage = new DisruptorProviderManage(factory);
providerManage.startup();
}
/**
* 发布事件,向Disruptor队列发数据
*
* @param <T> the type parameter
* @param data the data
*/
public <T> void publishEvent(final T data) {
DisruptorProvider<Object> provider = providerManage.getProvider();
provider.onData(f -> f.setData(data));
}
}
SpringMvcClientBeanPostProcessor
的构造函数逻辑分析完了,主要是读取属性配置,创建元数据和URI
订阅器, 启动Disruptor
队列。要注意到它实现了BeanPostProcessor
,这是Spring
提供的一个接口,在Bean
的生命周期中,真正开始使用之前,会执行后置处理器的postProcessAfterInitialization()
方法。
- postProcessAfterInitialization() 方法
SpringMvcClientBeanPostProcessor
作为一个后置处理器,它的功能是:读取注解中的元数据,并向admin
注册。
// 后置处理器
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
// 省略了其他逻辑
// 后置处理器:读取注解中的元数据,并向admin注册
@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
// 配置属性,如果 isFull=true 的话,表示注册整个微服务
if (isFull) {
return bean;
}
// 获取当前bean的Controller注解
Controller controller = AnnotationUtils.findAnnotation(bean.getClass(), Controller.class);
// 获取当前bean的RequestMapping注解
RequestMapping requestMapping = AnnotationUtils.findAnnotation(bean.getClass(), RequestMapping.class);
// 如果这个bean是一个接口
if (controller != null || requestMapping != null) {
// 获取当前bean的 ShenyuSpringMvcClient 注解
ShenyuSpringMvcClient clazzAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class);
String prePath = "";
//如果没有 ShenyuSpringMvcClient 注解,就返回,表示这个接口不需要注册
if (Objects.isNull(clazzAnnotation)) {
return bean;
}
//如果 ShenyuSpringMvcClient 注解中的path属性包括 * ,表示注册整个接口
if (clazzAnnotation.path().indexOf("*") > 1) {
// 构建元数据,发送注册事件
publisher.publishEvent(buildMetaDataDTO(clazzAnnotation, prePath));
return bean;
}
prePath = clazzAnnotation.path();
// 获取当前bean的所有方法
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
// 遍历方法
for (Method method : methods) {
// 获取当前方法上的注解 ShenyuSpringMvcClient
ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
// 如果方法上有注解ShenyuSpringMvcClient,就表示该方法需要注册
if (Objects.nonNull(shenyuSpringMvcClient)) {
// 构建元数据,发送注册事件
publisher.publishEvent(buildMetaDataDTO(shenyuSpringMvcClient, prePath));
}
}
}
return bean;
}
// 构造元数据
private MetaDataRegisterDTO buildMetaDataDTO(final ShenyuSpringMvcClient shenyuSpringMvcClient, final String prePath) {
// contextPath上下文名称
String contextPath = this.contextPath;
// appName应用名称
String appName = this.appName;
// path注册路径
String path;
if (StringUtils.isEmpty(contextPath)) {
path = prePath + shenyuSpringMvcClient.path();
} else {
path = contextPath + prePath + shenyuSpringMvcClient.path();
}
// desc描述信息
String desc = shenyuSpringMvcClient.desc();
// ruleName规则名称,没有填写的话就和path一致
String configRuleName = shenyuSpringMvcClient.ruleName();
String ruleName = StringUtils.isBlank(configRuleName) ? path : configRuleName;
// 构建元数据
return MetaDataRegisterDTO.builder()
.contextPath(contextPath)
.appName(appName)
.path(path)
.pathDesc(desc)
.rpcType(RpcTypeEnum.HTTP.getName())
.enabled(shenyuSpringMvcClient.enabled())
.ruleName(ruleName)
.registerMetaData(shenyuSpringMvcClient.registerMetaData())
.build();
}
}
在后置处理器中,需要读取配置属性,如果 isFull=true
的话,表示注册整个微服务。获取当前bean
的Controller
注解、RequestMapping
注解、ShenyuSpringMvcClient
注解,通过读取这些注解信息判断当前bean
是否是接口?接口是否需要注册?方法是否需要注册?然后根据ShenyuSpringMvcClient
注解中的属性构建元数据,最后通过publisher.publishEvent()
发布事件进行注册。
Controller
注解和RequestMapping
注解是由Spring
提供的,这个大家应该很熟悉,不过多赘述。ShenyuSpringMvcClient
注解是由Apache ShenYu
提供的,用于注册SpringMvc
客户端,它的定义如下:
/**
* shenyu 客户端接口,用于方法上或类上
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ShenyuSpringMvcClient {
// path 注册路径
String path();
// ruleName 规则名称
String ruleName() default "";
// desc 描述信息
String desc() default "";
// enabled是否启用
boolean enabled() default true;
// registerMetaData 注册元数据
boolean registerMetaData() default false;
}
它的使用如下:
@RestController
@RequestMapping("/test")
@ShenyuSpringMvcClient(path = "/test/**") // 表示整个接口注册
public class HttpTestController {
//......
}
@RestController
@RequestMapping("/order")
@ShenyuSpringMvcClient(path = "/order")
public class OrderController {
/**
* Save order dto.
*
* @param orderDTO the order dto
* @return the order dto
*/
@PostMapping("/save")
@ShenyuSpringMvcClient(path = "/save", desc = "Save order") // 注册当前方法
public OrderDTO save(@RequestBody final OrderDTO orderDTO) {
orderDTO.setName("hello world save order");
return orderDTO;
}
- publisher.publishEvent() 发布注册事件
该方法会将数据发送到Disruptor
队列中,关于Disruptor
队列更多细节这里不做更多介绍,这不影响分析注册的流程。
当数据发送后,Disruptor
队列的消费者会处理数据,进行消费。
QueueConsumer
是一个消费者,它实现了WorkHandler
接口,它的创建过程在providerManage.startup()
逻辑中。WorkHandler
接口是disruptor
的数据消费接口,只有一个方法是onEvent()
。
package com.lmax.disruptor;
public interface WorkHandler<T> {
void onEvent(T var1) throws Exception;
}
QueueConsumer
重写了onEvent()
方法,主要逻辑是生成消费任务,然后在线程池中去执行。
/**
*
* 队列消费者
*/
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
// 省略了其他逻辑
@Override
public void onEvent(final DataEvent<T> t) {
if (t != null) {
// 通过工厂创建队列消费任务
QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
// 保存数据
queueConsumerExecutor.setData(t.getData());
// help gc
t.setData(null);
// 放在线程池中执行 消费任务
executor.execute(queueConsumerExecutor);
}
}
}
QueueConsumerExecutor
是在线程池中被执行的任务,它实现了Runnable
接口,具体的实现类有两个:
RegisterClientConsumerExecutor
:客户端消费者执行器;
RegisterServerConsumerExecutor
:服务端消费者执行器。
顾名思义,一个负责处理客户端任务,一个负责处理服务端任务(服务端就是admin
,在下文进行分析)。
- RegisterClientConsumerExecutor 消费者执行器
重写的run()
逻辑如下:
public final class RegisterClientConsumerExecutor extends QueueConsumerExecutor<DataTypeParent> {
//......
@Override
public void run() {
// 获取数据
DataTypeParent dataTypeParent = getData();
// 根据数据类型调用相应的处理器进行处理
subscribers.get(dataTypeParent.getType()).executor(Lists.newArrayList(dataTypeParent));
}
}
根据不同的数据类型调用不同的处理器去执行相应的任务。数据类型有两种,一个是元数据,记录客户端注册信息。一个是URI
数据,记录客户端服务信息。
//数据类型
public enum DataType {
// 元数据
META_DATA,
// URI数据
URI,
}
- ExecutorSubscriber#executor() 执行器订阅者
执行器订阅者也分为两类,一个是处理元数据,一个是处理URI
。在客户端和服务端分别有两个,所以一共是四个。
- ShenyuClientMetadataExecutorSubscriber#executor()
客户端这边对元数据处理逻辑是:遍历元数据信息,调用接口方法persistInterface()
完成数据的发布。
public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {
//......
@Override
public DataType getType() {
return DataType.META_DATA; // 元数据
}
@Override
public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {
// 调用接口方法persistInterface()完成数据的发布
shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);
}
}
}
- ShenyuClientRegisterRepository#persistInterface()
ShenyuClientRegisterRepository
是一个接口,用于表示客户端数据注册,它的实现类目前有五种,每一种就表示一种注册方法。
ConsulClientRegisterRepository
:通过Consul
实现客户端注册;
EtcdClientRegisterRepository
:通过Etcd
实现客户端注册;
HttpClientRegisterRepository
:通过Http
实现客户端注册;
NacosClientRegisterRepository
:通过Nacos
实现客户端注册;
ZookeeperClientRegisterRepository
:通过Zookeeper
实现客户端注册;
从图中可以看出,注册中心的加载是通过SPI
的方式完成的。这个在前面提到过了,在客户端通用配置文件中,通过指定配置文件中的属性完成具体的类加载。
/**
* 加载 ShenyuClientRegisterRepository
*/
public final class ShenyuClientRegisterRepositoryFactory {
private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();
/**
* 创建 ShenyuClientRegisterRepository
*/
public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
// 通过SPI的方式进行加载,类型由registerType决定
ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
//执行初始化操作
result.init(shenyuRegisterCenterConfig);
ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
return result;
}
return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
}
}
本文的源码分析是基于Http
的方式进行注册,所以我们先分析HttpClientRegisterRepository
,其他的注册方式后续再分析。
通过http
的方式注册很简单,就是调用工具类发送http
请求。注册元数据和URI都是调用的同一个方法doRegister()
,指定接口和类型就好。
/shenyu-client/register-metadata
:服务端提供的接口用于注册元数据。
/shenyu-client/register-uri
: 服务端提供的接口用于注册URI。
@Join
public class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {
// 服务端提供的接口用于注册元数据
private static final String META_PATH = "/shenyu-client/register-metadata";
// 服务端提供的接口用于注册URI
private static final String URI_PATH = "/shenyu-client/register-uri";
//注册URI
@Override
public void persistURI(final URIRegisterDTO registerDTO) {
doRegister(registerDTO, URI_PATH, Constants.URI);
}
//注册接口(就是元数据信息)
@Override
public void persistInterface(final MetaDataRegisterDTO metadata) {
doRegister(metadata, META_PATH, META_TYPE);
}
// 进行注册
private <T> void doRegister(final T t, final String path, final String type) {
// 遍历admin服务列表(admin可能是集群)
for (String server : serverList) {
try {
// 调用工具类发送 http 请求
RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), server + path, type);
return;
} catch (Exception e) {
LOGGER.error("register admin url :{} is fail, will retry", server);
}
}
}
}
将数据序列化后,通过OkHttp
发送数据。
public final class RegisterUtils {
//......
// 通过OkHttp发送数据
public static void doRegister(final String json, final String url, final String type) throws IOException {
String result = OkHttpTools.getInstance().post(url, json);
if (Objects.equals(SUCCESS, result)) {
LOGGER.info("{} client register success: {} ", type, json);
} else {
LOGGER.error("{} client register error: {} ", type, json);
}
}
}
至此,客户端通过http
的方式注册元数据的逻辑就分析完了。小结一下:通过读取自定义的注解信息构造元数据,将数据发到Disruptor
队列,然后从队列中消费数据,将消费者放到线程池中去执行,最终通过发送http
请求到admin
。
客户端元数据注册流程的源码分析过程完成了,用流程图描述如下:
2.4 构建 URI 的 ContextRegisterListener
创建 ContextRegisterListener
,负责客户端URI
数据的构建和注册,它的创建是在配置文件中完成。
@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
public class ShenyuSpringMvcClientConfiguration {
// ......
// 创建 ContextRegisterListener
@Bean
public ContextRegisterListener contextRegisterListener(final ShenyuClientConfig clientConfig) {
return new ContextRegisterListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()));
}
}
ContextRegisterListener
实现了ApplicationListener
接口,并重写了onApplicationEvent()
方法,当有Spring事件发生后,该方法会执行。
// 实现了ApplicationListener接口
public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent> {
//......
//通过构造函数完成实例化
public ContextRegisterListener(final PropertiesConfig clientConfig) {
// 读取 shenyu.client.http 配置信息
Properties props = clientConfig.getProps();
// isFull是否注册整个服务
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
// contextPath上下文路径
String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
this.contextPath = contextPath;
if (isFull) {
if (StringUtils.isBlank(contextPath)) {
String errorMsg = "http register param must config the contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
this.contextPath = contextPath + "/**";
}
// port 客户端端口信息
int port = Integer.parseInt(props.getProperty(ShenyuClientConstants.PORT));
// appName 应用名称
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
// host信息
this.host = props.getProperty(ShenyuClientConstants.HOST);
this.port = port;
}
// 当有上下文刷新事件ContextRefreshedEvent发生时,该方法会执行
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) {
//保证该方法的内容只执行一次
if (!registered.compareAndSet(false, true)) {
return;
}
// 如果是 isFull=true 代表注册整个服务,构建元数据并注册
if (isFull) {
publisher.publishEvent(buildMetaDataDTO());
}
// 构建URI数据并注册
publisher.publishEvent(buildURIRegisterDTO());
}
// 构建URI数据
private URIRegisterDTO buildURIRegisterDTO() {
String host = IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host);
return URIRegisterDTO.builder()
.contextPath(this.contextPath)
.appName(appName)
.host(host)
.port(port)
.rpcType(RpcTypeEnum.HTTP.getName())
.build();
}
// 构建元数据
private MetaDataRegisterDTO buildMetaDataDTO() {
String contextPath = this.contextPath;
String appName = this.appName;
return MetaDataRegisterDTO.builder()
.contextPath(contextPath)
.appName(appName)
.path(contextPath)
.rpcType(RpcTypeEnum.HTTP.getName())
.enabled(true)
.ruleName(contextPath)
.build();
}
}
在构造函数中主要是读取属性配置。
onApplicationEvent()
方法是有Spring
事件发生时会执行,这里的参数是ContextRefreshedEvent
,表示上下文刷新事件。当Spring
容器就绪后执行此处逻辑:如果是 isFull=true
代表注册整个服务,构建元数据并注册,在前面分析的后置处理器SpringMvcClientBeanPostProcessor
中没有处理 isFull=true
的情况,所以在此处进行了处理。然后再构建URI
数据并注册。
ContextRefreshedEvent
是Spring
内置事件。ApplicationContext
被初始化或刷新时,该事件被触发。这也可以在 ConfigurableApplicationContext
接口中使用 refresh()
方法来发生。此处的初始化是指:所有的Bean
被成功装载,后处理Bean
被检测并激活,所有Singleton Bean
被预实例化,ApplicationContext
容器已就绪可用。
注册逻辑都是通过 publisher.publishEvent()
完成。在前面都已经分析过了:向Disruptor
队列写入数据,再从中消费数据,最后通过ExecutorSubscriber
去处理。
- ExecutorSubscriber#executor()
执行器订阅者分为两类,一个是处理元数据,一个是处理URI
。在客户端和服务端分别有两个,所以一共是四个。
这里是注册URI
信息,所以执行类是ShenyuClientURIExecutorSubscriber
。
- ShenyuClientURIExecutorSubscriber#executor()
主要逻辑是遍历URI数据集合,通过persistURI()
方法实现数据注册。
public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
//......
@Override
public DataType getType() {
return DataType.URI; //数据类型是URI
}
// 注册URI数据
@Override
public void executor(final Collection<URIRegisterDTO> dataList) {
for (URIRegisterDTO uriRegisterDTO : dataList) {
Stopwatch stopwatch = Stopwatch.createStarted();
while (true) {
try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {
break;
} catch (IOException e) {
long sleepTime = 1000;
// maybe the port is delay exposed
if (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {
LOG.error("host:{}, port:{} connection failed, will retry",
uriRegisterDTO.getHost(), uriRegisterDTO.getPort());
// If the connection fails for a long time, Increase sleep time
if (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {
sleepTime = 10000;
}
}
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
//添加hook,优雅停止客户端
ShenyuClientShutdownHook.delayOtherHooks();
// 注册URI
shenyuClientRegisterRepository.persistURI(uriRegisterDTO);
}
}
}
代码中的while(true)
循环是为了保证客户端已经成功启动了,通过host
和port
可以连接上。
后面的逻辑是:添加hook
函数,用于优雅停止客户端 。
通过persistURI()
方法实现数据注册。整个逻辑也在前面分析过了,最终就是通过OkHttp
客户端向shenyu-admin
发起http
,通过http
的方式注册URI
。
分析到这里就将客户端的注册逻辑分析完了,将构建的元数据和URI数据发送到Disruptor
队列,再从中消费,读取数据,通过http
向admin
发送数据。
客户端URI
注册流程的源码分析完成了,流程图如下:
3. 服务端注册流程
3.1 注册接口ShenyuHttpRegistryController
从前面的分析可以知道,服务端提供了注册的两个接口:
/shenyu-client/register-metadata
:服务端提供的接口用于注册元数据。
/shenyu-client/register-uri
: 服务端提供的接口用于注册URI。
这两个接口位于ShenyuHttpRegistryController
中,它实现了ShenyuServerRegisterRepository
接口,是服务端注册的实现类。它用@Join
标记,表示通过SPI
进行加载。
// shenuyu客户端接口
@RequestMapping("/shenyu-client")
@Join
public class ShenyuHttpRegistryController implements ShenyuServerRegisterRepository {
private ShenyuServerRegisterPublisher publisher;
@Override
public void init(final ShenyuServerRegisterPublisher publisher, final ShenyuRegisterCenterConfig config) {
this.publisher = publisher;
}
// 注册元数据
@PostMapping("/register-metadata")
@ResponseBody
public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {
publish(metaDataRegisterDTO);
return ShenyuResultMessage.SUCCESS;
}
// 注册URI
@PostMapping("/register-uri")
@ResponseBody
public String registerURI(@RequestBody final URIRegisterDTO uriRegisterDTO) {
publish(uriRegisterDTO);
return ShenyuResultMessage.SUCCESS;
}
// 发布注册事件
private <T> void publish(final T t) {
publisher.publish(Collections.singletonList(t));
}
}
两个注册接口获取到数据好,就调用了publish()
方法,把数据发布到Disruptor
队列中。
ShenyuServerRegisterRepository
接口
ShenyuServerRegisterRepository
接口是服务注册接口,它有五个实现类,表示有五种注册方式:
ConsulServerRegisterRepository
:通过Consul
实现注册;
EtcdServerRegisterRepository
:通过Etcd
实现注册;
NacosServerRegisterRepository
:通过Nacos
实现注册;
ShenyuHttpRegistryController
:通过Http
实现注册;
ZookeeperServerRegisterRepository
:通过Zookeeper
实现注册。
具体用哪一种方式,是通过配置文件指定的,然后通过SPI
进行加载。
在shenyu-admin
中的application.yml
文件中配置注册方式,registerType
指定注册类型,当用http
进行注册时,serverLists
不需要填写,更多配置说明可以参考官网 客户端接入配置 。
shenyu:
register:
registerType: http
serverLists:
- RegisterCenterConfiguration 加载配置
在引入相关依赖和属性配置后,启动shenyu-admin
时,会先加载配置文件,和注册中心相关的配置文件类是RegisterCenterConfiguration
。
// 注册中心配置类
@Configuration
public class RegisterCenterConfiguration {
// 读取配置属性
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}
//创建ShenyuServerRegisterRepository,用于服务端注册
@Bean
public ShenyuServerRegisterRepository shenyuServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig, final List<ShenyuClientRegisterService> shenyuClientRegisterService) {
// 1.从配置属性中获取注册类型
String registerType = shenyuRegisterCenterConfig.getRegisterType();
// 2.通过注册类型,以SPI的方法加载实现类
ShenyuServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuServerRegisterRepository.class).getJoin(registerType);
// 3.获取publisher,向Disruptor队列中写数据
RegisterServerDisruptorPublisher publisher = RegisterServerDisruptorPublisher.getInstance();
// 4.注册Service, rpcType -> registerService
Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, e -> e));
// 5.事件发布的准备工作
publisher.start(registerServiceMap);
// 6.注册的初始化操作
registerRepository.init(publisher, shenyuRegisterCenterConfig);
return registerRepository;
}
}
在配置类中生成了两个bean
:
在创建shenyuServerRegisterRepository
的过程中,也进行了一系列的准备工作:
服务端向Disruptor
队列写入数据的发布者 ,通过单例模式构建。
public class RegisterServerDisruptorPublisher implements ShenyuServerRegisterPublisher {
//私有属性
private static final RegisterServerDisruptorPublisher INSTANCE = new RegisterServerDisruptorPublisher();
//公开静态方法获取实例
public static RegisterServerDisruptorPublisher getInstance() {
return INSTANCE;
}
//事件发布的准备工作,添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。
public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {
//服务端注册工厂
factory = new RegisterServerExecutorFactory();
//添加URI数据订阅器
factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));
//添加元数据订阅器
factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));
//启动Disruptor队列
providerManage = new DisruptorProviderManage(factory);
providerManage.startup();
}
// 向队列中写入数据
@Override
public <T> void publish(final T data) {
DisruptorProvider<Object> provider = providerManage.getProvider();
provider.onData(f -> f.setData(data));
}
@Override
public void close() {
providerManage.getProvider().shutdown();
}
}
配置文件的加载,可看作是注册中心服务端初始化流程,用图描述如下:
3.2 消费数据QueueConsumer
在前面分析了客户端disruptor
队列消费数据的过。服务端也是一样的逻辑,只是其中执行任务的执行者变了。
QueueConsumer
是一个消费者,它实现了WorkHandler
接口,它的创建过程在providerManage.startup()
逻辑中。WorkHandler
接口是disruptor
的数据消费接口,只有一个方法是onEvent()
。
package com.lmax.disruptor;
public interface WorkHandler<T> {
void onEvent(T var1) throws Exception;
}
QueueConsumer
重写了onEvent()
方法,主要逻辑是生成消费任务,然后在线程池中去执行。
/**
*
* 队列消费者
*/
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
// 省略了其他逻辑
@Override
public void onEvent(final DataEvent<T> t) {
if (t != null) {
// 通过工厂创建队列消费任务
QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
// 保存数据
queueConsumerExecutor.setData(t.getData());
// help gc
t.setData(null);
// 放在线程池中执行 消费任务
executor.execute(queueConsumerExecutor);
}
}
}
QueueConsumerExecutor
是在线程池中被执行的任务,它实现了Runnable
接口,具体的实现类有两个:
RegisterClientConsumerExecutor
:客户端消费者执行器;
RegisterServerConsumerExecutor
:服务端消费者执行器。
顾名思义,一个负责处理客户端任务,一个负责处理服务端任务。
RegisterServerConsumerExecutor#run()
RegisterServerConsumerExecutor
是服务端消费者执行器,它通过QueueConsumerExecutor
间接实现了Runnable
接口,并重写了run()
方法。
public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<List<DataTypeParent>> {
// ...
@Override
public void run() {
//获取从disruptor队列中拿到的数据
List<DataTypeParent> results = getData();
// 数据校验
results = results.stream().filter(data -> isValidData(data)).collect(Collectors.toList());
if (CollectionUtils.isEmpty(results)) {
return;
}
//根据类型执行操作
getType(results).executor(results);
}
// 根据类型获取订阅者
private ExecutorSubscriber getType(final List<DataTypeParent> list) {
DataTypeParent result = list.get(0);
return subscribers.get(result.getType());
}
}
- ExecutorSubscriber#executor()
执行器订阅者分为两类,一个是处理元数据,一个是处理URI
。在客户端和服务端分别有两个,所以一共是四个。
- MetadataExecutorSubscriber#executor()
如果是注册元数据,则通过MetadataExecutorSubscriber#executor()
实现:根据类型获取注册Service
,调用register()
。
public class MetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {
//......
@Override
public DataType getType() {
return DataType.META_DATA; // 元数据类型
}
@Override
public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
// 遍历元数据列表
for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {
// 根据类型获取注册Service
ShenyuClientRegisterService shenyuClientRegisterService = this.shenyuClientRegisterService.get(metaDataRegisterDTO.getRpcType());
Objects.requireNonNull(shenyuClientRegisterService);
// 对元数据进行注册,加锁确保顺序执行,防止并发错误
synchronized (ShenyuClientRegisterService.class) {
shenyuClientRegisterService.register(metaDataRegisterDTO);
}
}
}
}
- URIRegisterExecutorSubscriber#executor()
如果是注册元数据,则通过URIRegisterExecutorSubscriber#executor()
实现:构建URI
数据,根据注册类型查找Service,
通过registerURI
方法实现注册。
public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
//......
@Override
public DataType getType() {
return DataType.URI; // URI数据类型
}
@Override
public void executor(final Collection<URIRegisterDTO> dataList) {
if (CollectionUtils.isEmpty(dataList)) {
return;
}
// 构建URI数据类型,通过registerURI方法实现注册
findService(dataList).ifPresent(service -> {
Map<String, List<URIRegisterDTO>> listMap = buildData(dataList);
listMap.forEach(service::registerURI);
});
}
// 根据类型查找Service
private Optional<ShenyuClientRegisterService> findService(final Collection<URIRegisterDTO> dataList) {
return dataList.stream().map(dto -> shenyuClientRegisterService.get(dto.getRpcType())).findFirst();
}
}
- ShenyuClientRegisterService#register()
ShenyuClientRegisterService
是注册方法接口,它有多个实现类:
AbstractContextPathRegisterService
:抽象类,处理部分公共逻辑;
AbstractShenyuClientRegisterServiceImpl
::抽象类,处理部分公共逻辑;
ShenyuClientRegisterDivideServiceImpl
:divide
类,处理http
注册类型;
ShenyuClientRegisterDubboServiceImpl
:dubbo
类,处理dubbo
注册类型;
ShenyuClientRegisterGrpcServiceImpl
:gRPC
类,处理gRPC
注册类型;
ShenyuClientRegisterMotanServiceImpl
:Motan
类,处理Motan
注册类型;
ShenyuClientRegisterSofaServiceImpl
:Sofa
类,处理Sofa
注册类型;
ShenyuClientRegisterSpringCloudServiceImpl
:SpringCloud
类,处理SpringCloud
注册类型;
ShenyuClientRegisterTarsServiceImpl
:Tars
类,处理Tars
注册类型;
从上面可以看出每种微服务都有对应的注册实现类,本文的源码分析是 以官方提供的 shenyu-examples-http 为例,是属http
注册类型,所以元数据和URI数据的注册实现类是 ShenyuClientRegisterDivideServiceImpl
:
public String register(final MetaDataRegisterDTO dto) {
// 1.注册选择器信息
String selectorHandler = selectorHandler(dto);
String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
// 2.注册规则信息
String ruleHandler = ruleHandler();
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
ruleService.registerDefault(ruleDTO);
// 3.注册元数据信息
registerMetadata(dto);
// 4.注册contextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
整个注册逻辑可以分为4个步骤:
- 1.注册选择器信息
- 2.注册规则信息
- 3.注册元数据信息
- 4.注册
contextPath
在admin
这一侧通过客户端的元数据信息需要构建选择器、规则、元数据和ContextPath
。具体的注册过程和细节处理跟rpc
类型有关。我们就不再继续向下追踪了,对于注册中心的逻辑分析,跟踪到这里就够了。
服务端元数据注册流程的源码分析完了,流程图描述如下:
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
if (CollectionUtils.isEmpty(uriList)) {
return "";
}
// 对应的选择器是否存在
SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
if (Objects.isNull(selectorDO)) {
return "";
}
// 处理选择器中的handler信息
String handler = buildHandle(uriList, selectorDO);
selectorDO.setHandle(handler);
SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
selectorData.setHandle(handler);
// 更新数据库中的记录
selectorService.updateSelective(selectorDO);
// 发布事件
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
return ShenyuResultMessage.SUCCESS;
}
admin
拿到URI
数据后,主要是更新选择器中的handler
信息,然后写入到数据库,最后发布事件通知网关。通知网关的逻辑是由数据同步操作完成,这在之前的文章中已经分析过了,就不再赘述。
服务端URI
注册流程的源码分析完成了,用图描述如下:
至此,服务端注册流程也就分析完了,主要通过对外提供的接口,接受客户端的注册信息,然后写入到Disruptor
队列,再从中消费数据,根据接收到的元数据和URI
数据更新admin
的选择器、规则、元数据和选择器的handler
。
4. 总结
本文主要对Apache ShenYu
网关中的http注册
模块进行了源码分析。涉及到的主要知识点,归纳如下:
- 注册中心是为了将客户端信息注册到
admin
,方便流量筛选;
http
注册是将客户端元数据信息和URI
信息注册到admin
;
http
服务的接入通过注解@ShenyuSpringMvcClient
标识;
- 注册信息的构建主要通过
Spring
的后置处理器BeanPostProcessor
和应用监听器ApplicationListener
;
- 注册类型的加载通过
SPI
完成;
- 引入
Disruptor
队列是为了数据与操作解耦,以及数据缓冲。
- 注册中心的实现采用了面向接口编程,使用模板方法、单例、观察者等设计模式。