本地电脑16C16G,起了idea、网关、一个springboot下游服务、nacos,jemter起100个线程发请求

吞吐量在2.0w

同条件下,测试SpringCloud Gateway的吞吐量是1.1w,接近1.2w

列出市面上常见的注册中心与配置中心:

Nacos

默认模式:基于 AP 模型,侧重于服务的高可用性,允许数据在短时间内不一致。

可配置模式:Nacos 也支持 CP 模式,可通过配置切换,满足对一致性要求更高的场景。

Zookeeper

CP 系统:使用 ZAB(Zookeeper Atomic Broadcast)协议,保证数据的强一致性。

应用场景:适合对数据一致性要求高的场景,如分布式锁、配置管理。

Eureka

AP 系统:设计上强调服务的高可用性,即使在部分节点失联的情况下,仍能提供服务注册和发现功能。

一致性处理:允许服务实例的信息在短时间内不一致,依靠心跳和自我保护机制来最终达到一致。

Consul

CP 系统:使用 Raft 共识算法,确保数据的强一致性。

应用场景:适用于对一致性有严格要求的服务注册、配置管理等。

列出市面上常见的弹性框架,Sentinel、Hystrix、Resilience4j,最终选择Resilience4j

Sentinel:

整体较为庞大,配置相对复杂,与我网关的轻量理念不符

需要引入配置台和控制台,非Spring环境下引入成本大

Hystrix:

已停止维护,不适合用于新项目,Netfix官方推荐Resilience4j

Resilience4j:

轻量、模块化、lambda表达式编程、功能全面、使用简单、对环境没有要求,非Spring环境下也能很好继承

项目里的熔断用的是Resilience4j,为什么不选择Hystrix?

:::color4
这里可以吹一波

Hystrix熔断的问题:

默认机制是线程池代理,即有请求就有线程代理

hystrix是接口级别

一个接口一个线程池

用虚拟线程就可以解决这个问题

:::

熔断两种:

1.线程代理熔断

2.信号量,sentinel用的就是信号量,比较简单粗暴。但它只是一个开关,只能进或者不进或者知道进来多少。这时如果有一个请求的线程死循环了,也就只能让别的线程不要进来,但无法结束当前这个死循环的线程。而有了线程代理后就可以干很多事情了,只能说各有各的优缺点。如请求线程执行超过1s,浏览器已经timeout,但服务端的请求线程还没停,这时候最好的情况就是服务端你自个把自己给停了,但问题是你用信号量服务端怎么把自己给停了呢,这就是个问题,而线程代理就可以处理这个问题。还有一点,就是停请求一般停的是查询,写是不敢停的

大厂落地的时候不敢在网关上搞太多东西,因为出bug就gg了,一般不在api网关上熔断,虽然理论上可以

举例:

流量网关:nginx

api网关:vintage,api网关其实没啥神秘的,就是路由

rpc:motan

在api网关上做熔断的问题

1.单点故障

2.1个服务200个接口,就一个核心接口流量大。熔断颗粒度得做到接口

3.集群是变化的,得动态判断,比较费劲

熔断:挡流量。扛不住就减少请求呗

降级:是把一部分不重要的业务停了,活干不过来了,就不干不重要的

熔断起作用得看 熔断策略,一般两种,请求数量以及响应时间,请求数量及响应时间,如果是请求数量DDOS就会断,而响应时间得看接口性能

降级就是得有开关的,人来决定

限流

限流更偏向于控制并发的速率,信号量隔离更偏向于控制并发的数量,而且是同一时间的数量。

限流是说,希望能够把控并发的速率,比如希望是平滑的流量。

信号量隔离的话就是同一时间,对资源的访问数量,比如数据库资源,需要做信号量隔离,避免同一时间数据库访问数量过多

滑动窗口:

和令牌桶其实非常像,实现方式不同,效率可能更差点,因为需要保证剔除请求时的一个线程安全,需要synchronized

令牌桶:

能应对突发大流量,但是超出阈值后,则会把流量变得平滑

漏桶:

恒定非常平滑的流量,适用场景比如限制数据库每秒的写入次数。不允许大突发流量

列出市面上常见的http客户端:AsyncHttpClient、Apache HttpClient、OkHttp、Spring WebClient

为什么选择AsyncHttpClient?

Apache HttpClient:默认同步为主、异步支持较弱、配置较重

OkHttp:更多应用于Android

Spring WebClient:过度依赖Spring框架

AsyncHttpClient:异步非阻塞响应、轻量、对环境无要求,甚至非Spring环境下集成更好

函数的真谛

1
2
3
4
5
6
7
8
9
10
11
12
interface Promise{
boolean is(a,b)
}

fun getByCondition(Promise pm){
if(pm.is(a,b))
}

//调用
getByCondition( (a,b) -> (xxx) )


logger中的

interface Supplier{

T get();

}

在软件开发中,”interface supplier” 通常指的是 提供特定类型对象或数据的接口。 它可以被理解为一个 提供者,其主要功能是生成或获取某种类型的实例,而不需要调用者关心具体的创建过程。 在Java 8及以后的版本中,java.util.function.Supplier<T>接口就扮演着这样的角色,它代表了一个不接受参数但返回一个值(类型为T)的函数。

函数对象表现形式

lambda表达式

1
2
3
// Lambda 表达式
Predicate<String> startsWithA = s -> s.startsWith("A");
System.out.println(startsWithA.test("Apple")); // 输出 true

方法引用

1
2
3
// 方法引用
Predicate<String> startsWithA_MethodRef = String::startsWith;
System.out.println(startsWithA_MethodRef.test("Banana", "B")); // 输出 false

函数对象类型

参数个数类型相同,返回值类型相同 -> 函数式接口,只包含一个抽象方法,用@FunctionallInterface

自定义函数

1
2
3
4
5
6
7
8
//编译期检查,是否只有一个抽象方法
@FunctionInterface
interface Type{
boolean op(int a)
}
main{
Type type= a-> xxx
}

jdk函数

闭包

限制:参数以外的数据必须是final or effictive final

柯里化

Optional一条龙

Optional用法与争议点 - 扣钉日记 - 博客园

1
2
3
4
5
6
public String getCity(User user) throws Exception{  
return Optional.ofNullable(user)
.map(u-> u.getAddress())
.map(a->a.getCity())
.orElseThrow(()->new Exception("取指错误"));
}

目录

1、路由转发

2、失败重试

3、熔断降级

4、实现过程

4.1 正常路由转发

4.2 请求重试

4.3 熔断降级


1、路由转发

路由转发**** 是网关处理完毕所有过虑逻辑之后的最后一个要执行的操作,它负责将请求最终转发到某一个指定的后端服务,这里参考 Spring Cloud Gateway 的实现方式来模拟一个路由转发过滤器

在 Spring Cloud Gateway 这一先进的微服务网关解决方案中,路由转发过滤器扮演着至关重要的角色,负责对进站的 HTTP 请求进行精细化处理与精准调度。以下详述其核心功能及在微服务体系中的价值:

  1. 请求适配与重定向**** :路由转发过滤器具备强大的请求修饰能力,能够对请求的各组成部分进行灵活调整,包括但不限于请求头、主体内容、查询参数等。这种机制使得网关能够在请求抵达目标服务前对其进行定制化改造,确保其完全符合服务接口规范。此外,过滤器还支持动态重定向请求至其他目标服务,实现复杂路由场景下的精准投递。
  2. 安全屏障**** :作为微服务架构的入口防线,路由转发过滤器承载了关键的安全防护功能。通过集成身份验证与授权机制,过滤器能有效拦截未经授权的访问请求,确保仅授权用户方可触及特定服务资源。这一特性对于构建坚实的服务边界,防止未授权渗透,保障整个微服务生态系统安全至关重要。
  3. 智能缓存**** :为了提升系统性能、缓解服务压力并降低响应延迟,过滤器可集成缓存策略。针对特定请求或响应,过滤器能够识别其是否适合缓存,并在必要时直接从缓存中返回结果,避免对后端服务产生不必要的调用。这种机制在面对高并发、数据复用性强的场景时尤为高效,显著提升了系统的整体响应速度与吞吐能力。
  4. 日志审计与监控洞察**** :路由转发过滤器充当了微服务交互的透明观察者,实时捕获并记录请求与响应的详细信息。这些数据不仅可用于生成详细的访问日志,便于问题排查与合规审计,还能作为关键性能指标输入到监控系统,助力运维人员实时掌握服务状态,快速定位异常,确保微服务集群稳定运行。
  5. 流量治理与韧性保障**** :借助路由转发过滤器,网关得以实施精细的流量控制策略,如限流、熔断、降级等,以防止服务因瞬时流量激增而过载崩溃。通过智能调节进入服务的请求速率,过滤器在保障服务质量的同时,增强了系统的弹性和稳定性,为微服务架构应对各种突发情况提供了有力支撑。
  6. 负载均衡与服务发现**** :路由转发过滤器的核心职能之一在于实现请求到多个后端服务实例的透明转发,并依据预设的负载均衡算法,确保请求在各实例间均匀分布,最大限度利用服务资源,实现系统的水平扩展。同时,过滤器通常与服务注册与发现机制紧密集成,确保网关始终能准确找到并连接到可用的服务实例,实现服务间的无缝通信。

综上所述,Spring Cloud Gateway中的路由转发过滤器凭借其丰富的功能集与高度的可配置性,为微服务架构提供了全方位的请求处理、安全防护、性能优化、监控洞察与流量管理能力,是构建健壮、高效、易运维的微服务生态体系不可或缺的关键组件。

2、失败重试

请求重试是指在请求失败之后再次尝试请求,一般情况下重试可以减少请求因为服务GC卡顿、网络丢包、网络阻塞等短暂问题而导致的失败;然而重试会增加请求总数量,不合理的重试策略甚至可能在服务端不稳定时,导致重试流量风暴,从而压垮服务端导致故障。

请求失败一般可以按照层级划分为连接失败和请求失败;而请求一般可分为幂等和非幂等请求。

连接失败:由于TCP握手失败,实际业务请求并未发送至服务端,所以对此类错误是可以安全的重试的,配合超时配置将链接超时设置在毫秒级别,可以有效的避免偶发网络拥塞、网络丢包等网络故障导致的报错,提升整体稳定性。

请求失败:由于网络连接已经完成,实际业务请求可能已经发送至服务端,服务端的业务逻辑可能已经执行过了;比如服务端超时,而实际服务端业务逻辑会继续执行完成。因此对于幂等请求相对安全,但是对于非幂等的请求,重试可能会有较大风险。

在短视频APP例子中,比如获取账户信息的请求就是幂等请求,不会有服务端数据的修改,重试操作是比较安全的;但是对于添加评论的请求,如果请求超时进行重试,就可能导致评论服务最终收到多个添加评论的请求,最终添加多个重复的评论,显然这是不正确的,会最终导致数据异常。

因此重试配置一般可归于以下几类

  • 连接重试**** : 因为连接重试风险低,收益高,一般情况下默认开启。
  • 超时重试**** : 需要判断业务接口是否幂等,如非幂等风险是否可控,来决定是否启用;提供重试退避策略:重试等待固定时长或逐次提升等待时长。
  • Backup Request**** : 为减少服务的延迟波动。在设置时间内未返回,再次发送请求;例如使用P99作为阈值,来降低长尾问题。

同时为了防止大规模重试导致请求量总量成倍上升,最终压垮服务,重试一般需提供熔断错误率阈值,当请求错误率超过阈值时停止重试。

3、熔断降级

服务降级是在服务所发出「实际请求需求」大于下游「稳定/可提供QPS」阈值时所使用的一种「服务维稳手段」,保障在部分极端情况下整体系统可以通过牺牲部分能力方式换来一定程度的可用性,而不是超出阈值后导致系统雪崩。

服务降级常见有两种方式,业务根据自身需求进行对应选择:

  • 弃车保帅:按一定丢弃规则,仅丢弃部分请求,保障部分高优/核心 请求可以获得稳定服务。
  • 贫富相均:对于所有请求一视同仁,按照相同比例丢弃。

对于基础系统/核心业务/关键服务,其组件可用性有着极高要求,如果因其承载资源需求过高而整体完全不可用,会导致大面积服务调用链直接断裂,并使故障进一步扩散,引起「系统雪崩」,其造成的巨大损失是我们不能接受的。

在短视频APP的例子中,如下图,假如因为突发事件流量,导致账户服务的流量增加触发了限流;但是对于短视频的场景下,视频播放功能价值显然高于评论功能的价值,在有限的资源情况下,账户服务如果主动将评论服务的流量进行降级,将资源腾挪给视频信息服务,舍弃评论功能,保护视频播放能力显然能获得更高性价比。

服务降级究其根本,即是“断臂求生”。对于实际业务场景而言,降级方式的评估即是对「付出成本」和「实际收获」的评估,可以从以下三个维度去抽象细化:

  • 尽可能少丢弃—— “每一个请求都有价值”
  • 尽可能丢弃价值较低的请求—— “请求与请求的价值是不同的”
  • 尽可能丢弃性价比低的请求—— “吃了两份资源却只能产出一份价值的请求”

4、实现过程

4.1 正常路由转发

在这里创建两个服务,方便测试负载均衡和路由转发效果

这段代码和之前请求服务模块一样,将服务弄到注册中心

【自研网关系列】请求服务模块和客户端模块实现-CSDN博客

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RestController
@ApiService(serviceId = "backend-http-server", protocol = ApiProtocol.HTTP, patternPath = "/http-server/**")
@Slf4j
public class HttpController {

@Autowired
private ApiProperties apiProperties;

@ApiInvoker(path = "/http-server/ping")
@GetMapping("/http-server/ping")
public String ping() {
log.info("{}", apiProperties);
return "pong1";
}
}

配置文件就是端口号和 Nacos 地址

1
2
3
4
5
6
7
server:
port: 8201

api:
registerAddress: 127.0.0.1:8848
env: dev
gray: false

启动三个类

继续 debug 来讲解流程

进入过滤器后,会根据配置中是否有熔断降级的逻辑,不是就正常路由

这里使用 Http 异步操作来提高性能和响应速度:

在传统的同步HTTP请求中,每个请求都需要等待服务器的响应,这会导致线程阻塞,从而降低程序的效率。而在异步HTTP请求中,请求发送后不需要等待服务器的响应,可以立即进行其他操作,当服务器响应到来时,会通过回调函数进行处理。这样可以大大提高程序的并发处理能力,从而提高程序的响应速度。

  1. whenComplete 方法**** :
    • whenComplete**** 是一个非异步的完成方法。
    • CompletableFuture**** 的执行完成或者发生异常时,它提供了一个回调。
    • 这个回调将在 CompletableFuture**** 执行的相同线程中执行。这意味着,如果 CompletableFuture**** 的操作是阻塞的,那么回调也会在同一个阻塞的线程中执行。
    • 在这段代码中,如果 whenComplete**** true**** ,则在 future**** 完成时使用 whenComplete**** 方法。这意味着 complete**** 方法将在 future**** 所在的线程中被调用。
  2. whenCompleteAsync 方法**** :
    • whenCompleteAsync**** 是异步的完成方法。
    • 它也提供了一个在 CompletableFuture**** 执行完成或者发生异常时执行的回调。
    • whenComplete**** 不同,这个回调将在不同的线程中异步执行。通常情况下,它将在默认的 ForkJoinPool**** 中的某个线程上执行,除非提供了自定义的 Executor****
    • 在代码中,如果 whenComplete**** false**** ,则使用 whenCompleteAsync**** 。这意味着 complete**** 方法将在不同的线程中异步执行。
    • 由于 ForkJoinPool中的线程是共用的**** ,ParallelStream中的线程也是用的ForkJoinPool,因此我推荐手动设定这个线程池的大小,否则会出现一些异常哦。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 默认路由逻辑:
* 根据 whenComplete 判断执行回调的线程是否阻塞执行;
* whenComplete 当异步操作完成时(无论成功还是失败),会立即执行回调函数;
* whenCompleteAsync 当异步操作完成时,会创建一个新的异步任务来执行回调函数。
*/
private CompletableFuture<Response> route(GatewayContext gatewayContext, Optional<Rule.HystrixConfig> hystrixConfig) {
// 异步请求发送
Request request = gatewayContext.getRequest().build();
CompletableFuture<Response> future = AsyncHttpHelper.getInstance().executeRequest(request);
boolean whenComplete = ConfigLoader.getConfig().isWhenComplete();

// 异步/非异步模型
if (whenComplete) {
future.whenComplete(((response, throwable) -> {
complete(request, response, throwable, gatewayContext);
}));
} else {
future.whenCompleteAsync(((response, throwable) -> {
complete(request, response, throwable, gatewayContext);
}));
}
return future;
}

然后就是处理HTTP响应,并根据处理过程中是否存在异常来设置不同的响应内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void handleResponse(Request request, Response response, Throwable throwable, GatewayContext gatewayContext) {
String url = request.getUrl();
try {
if (Objects.nonNull(throwable)) {
if (throwable instanceof TimeoutException) {
log.warn("complete timeout {}", url);
gatewayContext.setThrowable(throwable);
gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(ResponseCode.REQUEST_TIMEOUT));
} else if (throwable instanceof IOException) {
gatewayContext.setThrowable(new ConnectException(throwable, gatewayContext.getUniqueId(), url, ResponseCode.HTTP_RESPONSE_ERROR));
gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(ResponseCode.HTTP_RESPONSE_ERROR));
}
} else {
gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(response));
}
} catch (Exception e) {
gatewayContext.setThrowable(new ResponseException(ResponseCode.INTERNAL_ERROR));
gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(ResponseCode.INTERNAL_ERROR));
log.error("complete process failed", e);
} finally {
gatewayContext.setContextStatus(ContextStatus.Written);
ResponseHelper.writeResponse(gatewayContext);
}
}

最终就是将HTTP响应写回客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 写回响应
*/
public static void writeResponse(IContext context) {
context.releaseRequest();

if (context.judgeContextStatus(ContextStatus.Written)) {
FullHttpResponse response = getHttpResponse(context, (GatewayResponse) context.getResponse());
// 如果不是保持连接的情况,响应后关闭通道
if (!context.isKeepAlive()) {
context.getNettyContext().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
context.getNettyContext().writeAndFlush(response);
}
context.setContextStatus(ContextStatus.Completed);
} else if (context.judgeContextStatus(ContextStatus.Completed)) {
context.invokeCompletedCallBacks();
}
}

这里顺便展示前面的负载均衡

这里请求的网关核心的端口,但可以访问8201或8202的端口,这就是路由转发的效果

4.2 请求重试

就是在处理响应请求之前就执行,然后如果有异常,且小于最大重试次数,就重新执行一遍过滤器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void complete(Request request, Response response, Throwable throwable, GatewayContext gatewayContext) {
// 请求已经处理完毕 释放请求资源
gatewayContext.releaseRequest();
// 获取上下文请求配置规则
Rule rule = gatewayContext.getRules();
// 获取重试次数
int currentRetryTimes = gatewayContext.getCurrentRetryTimes();
int confRetryTimes = rule.getRetryConfig().getTimes();
// 异常重试
if ((throwable instanceof TimeoutException
|| throwable instanceof IOException)
&& currentRetryTimes <= confRetryTimes) {
doRetry(gatewayContext, currentRetryTimes);
}
// 处理响应
handleResponse(request, response, throwable, gatewayContext);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 重试策略
*/
private void doRetry(GatewayContext gatewayContext, int retryTimes) {
gatewayContext.setCurrentRetryTimes(retryTimes + 1);
log.info("当前请求重试次数为{}", gatewayContext.getCurrentRetryTimes());
try {
// 重新执行过滤器逻辑
doFilter(gatewayContext);
} catch (Exception e) {
log.warn("重试请求失败, requestId={}", gatewayContext.getUniqueId(), e);
throw new RuntimeException(e);
}
}

4.3 熔断降级

首先在 nacos 配置中心中添加 hystrix 的配置

首先需要获得获取 hystrix 的配置

  1. 会判断对比请求路径和注册中心注册的路径参数
  2. 判断当前请求是否需要走熔断策略分支
1
2
3
4
5
6
7
private static Optional<Rule.HystrixConfig> getHystrixConfig(GatewayContext gatewayContext) {
Rule rule = gatewayContext.getRules();
Optional<Rule.HystrixConfig> hystrixConfig = rule.getHystrixConfigs().stream()
.filter(c -> StringUtils.equals(c.getPath(), gatewayContext.getRequest().getPath()))
.findFirst();
return hystrixConfig;
}

熔断降级请求策略

  1. 命令执行超过配置超时时间
  2. 命令执行出现异常或错误
  3. 连续失败率达到配置的阈值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void routeWithHystrix(GatewayContext gatewayContext, Optional<Rule.HystrixConfig> hystrixConfig) {
String key = gatewayContext.getUniqueId() + "." + gatewayContext.getRequest().getPath();
RouterHystrixCommand proxyCommand = null;

if (commandMap.containsKey(key)) {
proxyCommand = commandMap.get(key);
if (!hystrixConfig.get().equals(commandMap.get(key))) {
log.info("previous HystrixCommand instance hashCode: {}", proxyCommand.hashCode());
proxyCommand.updateHystrixCommandProperties(proxyCommand.getCommandKey().name());
proxyCommand = new RouterHystrixCommand(gatewayContext, hystrixConfig);

log.info("after HystrixCommand instance hashCode: {}", proxyCommand.hashCode());
commandMap.put(key, proxyCommand);
}
} else {
proxyCommand = new RouterHystrixCommand(gatewayContext, hystrixConfig);
commandMap.put(key, proxyCommand);
}
proxyCommand.execute();
}

其中 RouterHystrixCommand 是路由转发的内部类,RouterHystrixCommand 类的主要功能是执行实际的路由操作和熔断降级操作,它使用了 Hystrix 来实现这些功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* Hystrix命令集合
*/
private class RouterHystrixCommand extends HystrixCommand<Object> {
private GatewayContext context;
private Optional<Rule.HystrixConfig> config;
public RouterHystrixCommand(GatewayContext context, Optional<Rule.HystrixConfig> config) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(context.getUniqueId()))
.andCommandKey(HystrixCommandKey.Factory.asKey(context.getRequest().getPath()))
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
// 核心线程数
.withCoreSize(config.get().getCoreThreadSize()))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
// 线程隔离类型
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
// 命令执行超时
.withExecutionTimeoutInMilliseconds(config.get().getTimeoutInMilliseconds())
// 超时中断
.withExecutionIsolationThreadInterruptOnTimeout(true)
.withExecutionTimeoutEnabled(true)));
this.config = config;
this.context = context;
}
@Override
protected Object run() throws Exception {
// 实际路由操作
route(context, config).get();
return null;
}
/**
* 熔断降级操作
*/
@Override
protected Object getFallback() {
// 是否是超时引发的熔断
if (isFailedExecution() || getExecutionException() instanceof HystrixTimeoutException) {
// 针对超时的异常处理
context.setResponse(GatewayResponse.buildGatewayResponse(ResponseCode.GATEWAY_FALLBACK_TIMEOUT));
} else {
// 其它类型异常熔断处理
context.setResponse(GatewayResponse.buildGatewayResponse(ResponseCode.GATEWAY_FALLBACK_ERROR, config.get().getFallbackResponse()));
}
context.setContextStatus(ContextStatus.Written);
return null;
}
/**
* 动态更新 CommandProperties 配置
* 1.因为 Hystrix 内部使用了缓存,如果仅仅修改 HystrixCommand.Setter 是没有用的;
* 2.利用反射获取 HystrixPropertiesFactory 的 commandProperties 字段,并更新
*/
protected void updateHystrixCommandProperties(String commandKey) {
try {
Field field = HystrixPropertiesFactory.class.getDeclaredField("commandProperties");
field.setAccessible(true);
ConcurrentHashMap<String, HystrixCommandProperties> commandProperties = (ConcurrentHashMap<String, HystrixCommandProperties>) field.get(null);
log.info("before update HystrixCommandProperties: {}", commandProperties.get(commandKey));
commandProperties.remove(commandKey);
} catch (NoSuchFieldException | IllegalAccessException e) {
log.error("Remove cache in HystrixCommandFactory failed, commandKey: {}", commandKey, e);
}
}
}

执行结果

目录

1、什么是过滤器

2、实现过滤器

3、实现流程

1、什么是过滤器

在我们的微服务架构中,构建了一个关键组件——网关服务,它作为系统的入口,负责对所有进、出流量进行统一管理和控制。为了实现这一功能,前面文章已成功将其注册至注册中心,并从配置中心获取了相关配置。接下来,我们将深入探讨如何构建网关服务的核心部分—— 过滤器链****

过滤器链,顾名思义,是由一系列有序排列的过滤器构成的执行链条。每个过滤器承载特定的业务逻辑,对经过的请求和响应进行特定处理。当一个过滤器完成其预设的过滤流程后,会遵循链条顺序,将请求传递给下一个过滤器继续执行。通过这种方式,过滤器链实现了对请求与响应的深度定制化处理。

过滤器链中的成员可根据其作用范围分为全局过滤器和局部过滤器两种类型:

  1. 全局过滤器**** :这类过滤器具有广泛的适用性,对所有进入网关的请求均进行处理。它们通常负责执行诸如身份验证、权限校验、日志记录、跨域支持等通用性操作,确保所有请求在到达具体业务服务前符合系统的基本规范和要求。
  2. 局部过滤器**** :Spring Cloud框架已为我们预置了一套局部过滤器,用于应对特定场景下的请求处理。尽管如此,我们依然可以根据实际需求,通过继承并实现相关接口来自定义局部过滤器,以满足特定业务逻辑或优化性能。

过滤器链的运作机制遵循严格的流程:

  • 请求首先被送入链首的过滤器进行处理。
  • 每个过滤器依据自身职责对请求进行检查、修改或增强,然后将处理后的请求传递给链中的下一个过滤器。
  • 这一过程持续进行,直到链尾的路由过滤器接收到请求。路由过滤器的核心职责是根据请求信息和预设的路由规则,准确地将请求转发至相应的后台服务进行实际业务处理。
  • 后台服务完成任务后,将响应返回给路由过滤器。
  • 路由过滤器再将响应沿着过滤器链逆序传递,让每个过滤器有机会对响应进行必要的后期处理。
  • 最终,经过完整过滤器链洗礼的响应被写回客户端。

在过滤器链执行过程中,若遇到任何异常情况,可通过设置专门的异常处理过滤器来捕获并妥善处理这些异常,如返回友好的错误信息、记录异常日志等,确保系统的稳定性和用户体验。

当请求在整个生命周期中均正常流转且后台服务处理完毕后,使用 context.writeAndFlush() 方法将处理结果(即响应数据)高效地写回客户端,标志着一次完整的请求响应流程在过滤器链的保驾护航下圆满结束。

综上所述,过滤器链作为网关服务的核心组件,通过串联各个具有特定功能的过滤器,对进出系统的请求与响应进行全方位、多层次的精细化管理,实现了微服务架构下流量的有效管控与优化。

大概流程图:

2、实现过滤器

项目结构图

具体代码在 github 上,不一一展示

  1. Filter:这是一个接口,定义了过滤器需要实现的方法。所有的过滤器都需要实现这个接口,并实现doFilter方法来执行具体的过滤操作。getOrder方法用于获取过滤器的执行顺序。
  2. FilterAspect:这是一个注解,用于标记过滤器的一些属性,如ID、名称和执行顺序。这个注解被应用在实现了Filter接口的类上。
  3. FilterChainFactory:这是一个接口,定义了过滤器链工厂需要实现的方法。过滤器链工厂的主要职责是根据给定的上下文构建过滤器链。
  4. GatewayFilterChain:这是一个类,代表了过滤器链。它包含了一个过滤器列表,并提供了添加过滤器和执行过滤器链的方法。
  5. GatewayFilterChainFactory:这个类的具体实现可能会根据你的应用有所不同,但一般来说,它应该是FilterChainFactory接口的一个实现,负责创建GatewayFilterChain实例。这个类可能会使用单例模式,以确保整个应用只有一个GatewayFilterChainFactory实例。

总的来说,这些类和接口共同工作,以创建和管理过滤器链。过滤器链是由一系列过滤器组成的,这些过滤器按照特定的顺序执行,以对通过网关的请求进行处理。

3、实现流程

和之前一样,通过 debug 的方式来讲解过滤器链的实现

首先过滤器工厂具体实现类的无参构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* SPI加载本地过滤器实现类对象
* 过滤器存储映射 过滤器id - 过滤器对象
*/
public GatewayFilterChainFactory() {
//加载所有过滤器
ServiceLoader<Filter> serviceLoader = ServiceLoader.load(Filter.class);
serviceLoader.stream().forEach(filterProvider -> {
Filter filter = filterProvider.get();
FilterAspect annotation = filter.getClass().getAnnotation(FilterAspect.class);
log.info("load filter success:{},{},{},{}", filter.getClass(), annotation.id(), annotation.name(), annotation.order());
//添加到过滤集合
String filterId = annotation.id();
if (StringUtils.isEmpty(filterId)) {
filterId = filter.getClass().getName();
}
processorFilterIdMap.put(filterId, filter);
processFilterIdName.put(filterId, annotation.name());
});
}

这里还是和注册和配置中心一样的思路,用 SPI 来加载类,加载实时可用的过滤器,组装为网关过滤器链

然后就将各个过滤器的id,名称,排序弄到 ConcurrentHashMap 中

其中 GatewayFilterChainFactory 类采用的是单例模式

在网关之中,GatewayFilterChainFactory 负责创建和管理过滤器链。由于过滤器链在整个应用中是 共享**** 的,因此没有必要为每个请求创建一个新的 GatewayFilterChainFactory 实例。

使用单例模式可以确保所有的请求都使用同一个GatewayFilterChainFactory实例,这样可以避免重复创建实例,节省内存,并 保证所有请求使用的过滤器链的一致性****

此外,GatewayFilterChainFactory 在初始化时会加载所有的过滤器,这可能是一个 耗时**** 的操作。如果每个请求都创建一个新的 GatewayFilterChainFactory 实例,那么这个耗时的初始化操作就会被重复执行,这会影响应用的性能。

使用单例模式,初始化操作只会执行一次,可以提高应用的性能。 总的来说,这里使用单例模式是为了保证过滤器链的一致性,节省内存,提高性能。

在 NettyCoreProcessor 会获取 GatewayFilterChainFactory 这个单例的

1
2
3
4
/**
* 过滤器链工厂
*/
private FilterChainFactory chainFactory = GatewayFilterChainFactory.getInstance();

在 process 方法中调用构建过滤器链条这个功能

1
2
3
4
5
// 创建并填充 GatewayContext 以保存有关传入请求的信息。
GatewayContext gatewayContext = RequestHelper.doContext(request, ctx);

// 组装过滤器并执行过滤操作
chainFactory.buildFilterChain(gatewayContext).doFilter(gatewayContext);

构建过滤器链条利用本地缓存,主要确保对于同一规则ID的请求,可以复用已经构建的过滤器链,而不需要每次都重新构建,从而提高了性能

1
2
3
4
5
/**
* 过滤器链缓存(服务ID ——> 过滤器链)
* ruleId —— GatewayFilterChain
*/
private Cache<String, GatewayFilterChain> chainCache = Caffeine.newBuilder().recordStats().expireAfterWrite(10, TimeUnit.SECONDS).build();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 构建过滤器链条
*/
@Override
public GatewayFilterChain buildFilterChain(GatewayContext ctx) throws Exception {
// 获取规则ID
String ruleId = ctx.getRules().getId();

// 从缓存中获取过滤器链
GatewayFilterChain chain = chainCache.getIfPresent(ruleId);

// 如果缓存中没有过滤器链,那么构建一个新的过滤器链
if (chain == null) {
chain = doBuildFilterChain(ctx.getRules());
// 将新构建的过滤器链添加到缓存中
chainCache.put(ruleId, chain);
}

// 返回过滤器链
return chain;
}

构建一个新的过滤器链

根据给定的规则构建一个过滤器链,这个过滤器链用于处理 HTTP 请求

为什么每个服务请求最终最后需要添加路由过滤器

在网关中,路由过滤器的作用是将 请求路由(转发)到适当的后端服务****

在过滤器链中,路由过滤器通常是最后一个执行的过滤器,因为它需要在所有其他过滤器(如权限、限流、负载均衡等)成功执行后才进行路由。

这个方法中,路由过滤器被添加到过滤器链的末尾,这是因为在执行所有其他过滤器并对请求进行各种检查和处理后,最后的步骤是将请求路由到适当的后端服务。

如果没有路由过滤器,那么即使请求通过了所有其他过滤器,也无法到达任何后端服务,因此每个服务请求最终都需要添加路由过滤器。

在 NettyCoreProcessor 中还会执行 doFilter() 方法,就是遍历过滤器链,逐个执行过滤

1
2
// 组装过滤器并执行过滤操作
chainFactory.buildFilterChain(gatewayContext).doFilter(gatewayContext);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 执行过滤器链
*/
public GatewayContext doFilter(GatewayContext ctx) {
if (filters.isEmpty()) {
return ctx;
}
try {
for (Filter filter : filters) {
filter.doFilter(ctx);
}
} catch (Exception e) {
log.error("执行过滤器发生异常: {}", e.getMessage());
throw new RuntimeException(e);
}
return ctx;
}

大概的过滤器设计就这样,还是需要自己根据代码 debug 一下才能根据清晰,点个 !!!