给开源项目提 PR 修 bug 的流程,其实就像“搬进别人家的房子帮忙修水管”,既要修好,还得符合人家的规矩。
我给你分成 发现问题 → 修复代码 → 提交 PR → 跟进 四个阶段,顺便补上注意事项。


1️⃣ 发现问题

  • 确认是 bug:先看看是不是自己环境问题、依赖版本问题。
  • 搜索 issue:在项目的 Issues 里查一下有没有人提过同样问题。
    • 有:可以在已有 issue 下留言,表明你也遇到,并且愿意修。
    • 没有:新建一个 issue,简明描述问题、复现步骤、期望结果。

2️⃣ 修复代码

  1. Fork 项目
    • 在 GitHub/GitLab 点击 Fork,把仓库复制到你自己的账号下。
  2. Clone 到本地
1
2
3
4
5
bash


复制编辑
git clone https://github.com/你的账号/项目名.git
  1. 创建分支(不要在 main/master 上直接改)
1
2
3
4
5
bash


复制编辑
git checkout -b fix/xxx-bug
  1. 本地运行并复现 bug(确保能稳定复现,修的时候才能验证成功)。
  2. 修改代码(遵循项目的代码规范,比如缩进、命名、注释格式)。
  3. 写单元测试(能防止 bug 再次出现,维护者会很喜欢)。
  4. 本地跑测试mvn test / npm test / pytest 等,看项目用什么工具)。

3️⃣ 提交 PR

  1. 提交修改
1
2
3
4
5
6
7
bash


复制编辑
git add .
git commit -m "fix: 修复 xxx 导致的 xxx 问题"
git push origin fix/xxx-bug
- commit 信息一般用英文描述,遵循项目 commit 规范(如 Conventional Commits)。
  1. 在 GitHub 发起 Pull Request
    • 选择目标仓库的主分支作为 base,你的分支作为 compare
    • 在 PR 描述里写清:
      • 问题背景
      • 修复思路
      • 测试结果截图(可选)
      • 关联的 issue(比如 Closes #123 会自动关闭 issue)
  2. 注意 CI 检查:很多项目会自动跑代码检查、测试用例,不通过的话你需要再改。

4️⃣ 跟进与维护

  • 维护者可能会 review:他们可能会提修改建议,你要及时回复并修改代码。
  • PR 合并后:可以在后续 release 中看到自己的贡献。
  • 养成习惯:在你 fork 的项目里定期 git fetch upstream 同步源仓库代码。

💡 小技巧

  • 别急着直接修,先看看项目的 CONTRIBUTING.md(贡献指南)和 CODE_OF_CONDUCT.md(行为规范)。
  • 如果 bug 比较复杂,可以先发 issue 让维护者确认,避免白做。
  • 先修小 bug、文档错别字,熟悉流程后再做大功能,容易建立信任。

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

@SneakyThrows
public static void main(String[] args) {
Cache<String, Integer> cache =
Caffeine.newBuilder()
.maximumSize(1000)
.recordStats()
// .expireAfterWrite(5, TimeUnit.SECONDS)
// .expireAfterAccess(2, TimeUnit.SECONDS)
.expireAfter(new Expiry<String, Integer>() {
@Override
public long expireAfterCreate(@NonNull String key, @NonNull Integer value, long currentTime) {
return currentTime;
}

@Override
public long expireAfterUpdate(@NonNull String key, @NonNull Integer value, long currentTime, @NonNegative long currentDuration) {
return currentDuration;
}

@Override
public long expireAfterRead(@NonNull String key, @NonNull Integer value, long currentTime, @NonNegative long currentDuration) {
return currentDuration;
}
})
.removalListener(new RemovalListener<String, Integer>() {
@Override
public void onRemoval(@Nullable String key, @Nullable Integer value, @NonNull RemovalCause cause) {

System.out.println("移除了key:" + key + " value :" + value + " cause : " + cause);
}
})
.build();

cache.put("cliffcw1", 1);
System.out.println(cache.getIfPresent("cliffcw1"));

//可变过期时间策略有没有提供,如果有,就put,
cache.policy().expireVariably().ifPresent(policy -> {
policy.put("cliffcw2", 2, 13, TimeUnit.SECONDS);
policy.put("cliffcw3", 2, 10, TimeUnit.SECONDS);
});

System.out.println("cliffcw2:" + cache.getIfPresent("cliffcw2"));

Thread.sleep(11000);

System.out.println("cliffcw22:" + cache.getIfPresent("cliffcw2"));
System.out.println("cliffcw3:" + cache.getIfPresent("cliffcw3"));

}
//删除是惰性删除

缺点

用本地需要考虑的点

  1. 功能能满足,get,put,过期
  2. 不能OOM,内存管理
  3. 监控展示(肯定不能是黑盒,无提示性语句)
  4. 统计(热key,大key,命中率 ….)

caffeine的缺点

  1. 功能基本满足,但多个业务场景,多种过期时间,不满足
  2. 只有key个数上限(不设置默认是)无法设置使用内存上限

解决:

  1. 给不同kv设置不同的过期时间

:::info
其实是有的,只是隐藏得比较深

// 可变过期时间策略没有提供,如果有,那就put。如果不可变那什么都没有

cache.policy().expireVariably().ifPresent( policy -> { policy.put( xx,xx,xx,xx ) })

:::

  1. 可以给内存设置上限

源码

build()

BoundedLocalManualCache 和 NoBoundedLocalManualCache

1
2
3
4
5
6
7
8
BoundedLocalManualCache(Caffeine<K, V> builder, @Nullable CacheLoader<? super K, V> loader) {
cache = LocalCacheFactory.newBoundedLocalCache(builder, loader, /* async */ false);
}

@Override
public BoundedLocalCache<K, V> cache() {
return cache;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
build:

BoundedLocalCache -> LocalCache -> ConcurrentHashMap

final class LocalCacheFactory:
//针对每种配置,caffeine会生成每种配置类,但它们都继承于LocalCache,这样的话就不用在代码中进行if判断,因为if判断哪在cpu底层也是需要耗时的,所以说优化到极致

//这是根据配置生成对应配置类的类名,然后根据这个类名去找到对应的配置类
//思想:就是用元数据空间换 if分支判断时间
//想象有什么特殊的应用场景????
static String getClassName(Caffeine<?, ?> builder) {
var className = new StringBuilder(LocalCacheFactory.class.getPackageName()).append('.');
if (builder.isStrongKeys()) {
className.append('S');
} else {
className.append('W');
}
if (builder.isStrongValues()) {
className.append('S');
} else {
className.append('I');
}
if (builder.removalListener != null) {
className.append('L');
}
if (builder.isRecordingStats()) {
className.append('S');
}
if (builder.evicts()) {
className.append('M');
if (builder.isWeighted()) {
className.append('W');
} else {
className.append('S');
}
}
if (builder.expiresAfterAccess() || builder.expiresVariable()) {
className.append('A');
}
if (builder.expiresAfterWrite()) {
className.append('W');
}
if (builder.refreshAfterWrite()) {
className.append('R');
}
return className.toString();
}

put

底层其实就是一个data:ConcurrentHashMap

put -> data -> writeBuffer.offer(全局有界队列)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// 方法作用:将键值对放入缓存,如果键已存在则覆盖旧值(除非onlyIfAbsent为true)
// 参数说明:
// key: 缓存键
// value: 缓存值
// expiry: 过期策略,用于计算条目的过期时间
// onlyIfAbsent: 如果为true,则仅当键不存在时才放入(类似putIfAbsent)
// 返回值:如果键已存在且被覆盖,则返回旧值;否则返回null
@Nullable
V put(K key, V value, Expiry<K, V> expiry, boolean onlyIfAbsent) {
// 参数校验:确保键和值不为空
requireNonNull(key);
requireNonNull(value);

// 初始化节点变量,用于在键不存在时创建新节点
Node<K, V> node = null;
// 获取当前时间(用于过期时间计算)
long now = expirationTicker().read();
// 计算新值的权重(用于基于权重的容量控制)
int newWeight = weigher.weigh(key, value);
// 创建用于查找的键对象(可能是弱引用或强引用,根据配置)
Object lookupKey = nodeFactory.newLookupKey(key);

// 通过自旋循环处理并发情况,attempts为重试次数
for (int attempts = 1; ; attempts++) {
// 从底层ConcurrentHashMap中获取已存在的节点
Node<K, V> prior = data.get(lookupKey);

// ========== 场景1:键不存在(新增缓存条目) ==========
if (prior == null) {
// 如果尚未创建新节点,则创建它
if (node == null) {
// 创建新节点,包含键、值、权重和创建时间
node = nodeFactory.newNode(key, keyReferenceQueue(), value, valueReferenceQueue(), newWeight, now);
// 设置节点的可变过期时间(基于创建时间计算)
setVariableTime(node, expireAfterCreate(key, value, expiry, now));
}

// 尝试将新节点原子性地放入缓存Map
prior = data.putIfAbsent(node.getKeyReference(), node);

// 如果成功放入(prior为null表示没有其他线程抢先放入)
if (prior == null) {
// 执行写后操作:将添加任务提交到写缓冲区,异步处理缓存维护
afterWrite(new AddTask(node, newWeight));
return null; // 新增条目,返回null
} else if (onlyIfAbsent) {
// 如果其他线程抢先放入了该键,且onlyIfAbsent为true,则尝试快速返回现有值
V currentValue = prior.getValue();
if ((currentValue != null) && !hasExpired(prior, now)) {
// 记录访问时间并返回当前值(不覆盖)
if (!isComputingAsync(prior)) {
tryExpireAfterRead(prior, key, currentValue, expiry(), now);
setAccessTime(prior, now);
}
afterRead(prior, now, /* recordHit */ false);
return currentValue;
}
}
}
// ========== 场景2:键存在且onlyIfAbsent为true ==========
else if (onlyIfAbsent) {
// 快速路径:不覆盖现有值,只返回当前值
V currentValue = prior.getValue();
if ((currentValue != null) && !hasExpired(prior, now)) {
if (!isComputingAsync(prior)) {
tryExpireAfterRead(prior, key, currentValue, expiry(), now);
setAccessTime(prior, now);
}
afterRead(prior, now, /* recordHit */ false);
return currentValue;
}
}

// ========== 处理节点状态异常情况 ==========
// 如果之前获取的节点已失效(可能被其他线程移除),则重试
if (!prior.isAlive()) {
// 如果重试次数达到一定阈值(如自旋等待后仍无效),则通过计算操作确保节点状态
if ((attempts & MAX_PUT_SPIN_WAIT_ATTEMPTS) != 0) {
Thread.onSpinWait(); // 提示CPU进行自旋等待优化
continue;
}
// 通过computeIfPresent方法处理,确保在节点存活状态下进行操作
data.computeIfPresent(lookupKey, (k, n) -> {
requireIsAlive(key, n);
return n;
});
continue;
}

// ========== 场景3:键存在且需要更新值 ==========
// 以下变量用于记录更新过程中的状态
V oldValue;
long varTime; // 新的可变过期时间
int oldWeight; // 旧权重值
boolean expired = false; // 标记旧值是否已过期
boolean mayUpdate = true; // 是否允许更新
boolean exceedsTolerance = false; // 是否超过时间容忍度

// 对现有节点加锁,确保更新操作的原子性
synchronized (prior) {
// 再次检查节点是否存活(防止在获取锁期间状态变化)
if (!prior.isAlive()) {
continue; // 如果节点不再存活,重试
}

// 获取旧值和旧权重
oldValue = prior.getValue();
oldWeight = prior.getWeight();

// 根据不同的情况计算新的过期时间
if (oldValue == null) {
// 旧值已被垃圾回收,视为新创建
varTime = expireAfterCreate(key, value, expiry, now);
notifyEviction(key, null, RemovalCause.COLLECTED);
} else if (hasExpired(prior, now)) {
// 节点已过期,按新创建处理
expired = true;
varTime = expireAfterCreate(key, value, expiry, now);
notifyEviction(key, oldValue, RemovalCause.EXPIRED);
} else if (onlyIfAbsent) {
// 如果onlyIfAbsent为true,则不更新值,只更新访问时间
mayUpdate = false;
varTime = expireAfterRead(prior, key, value, expiry, now);
} else {
// 正常更新,计算更新后的过期时间
varTime = expireAfterUpdate(prior, key, value, expiry, now);
}

// 如果允许更新值,则执行更新操作
if (mayUpdate) {
// 检查是否超过写入容忍度(用于控制更新频率,避免频繁维护)
exceedsTolerance = (expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE)
|| (expiresVariable() && Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE);

// 更新节点的值和权重
prior.setValue(value, valueReferenceQueue());
prior.setWeight(newWeight);
setWriteTime(prior, now);
discardRefresh(prior.getKeyReference()); // 丢弃可能的刷新任务
}

// 设置节点的可变时间(用于过期策略)和访问时间
setVariableTime(prior, varTime);
setAccessTime(prior, now);
}

// ========== 通知相关事件 ==========
if (expired) {
notifyRemoval(key, oldValue, RemovalCause.EXPIRED);
} else if (oldValue == null) {
notifyRemoval(key, /* oldValue */ null, RemovalCause.COLLECTED);
} else if (mayUpdate) {
notifyOnReplace(key, oldValue, value); // 通知值被替换
}

// ========== 决定后续维护操作 ==========
// 计算权重变化
int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0;

// 根据条件决定执行写后操作还是读后操作
if ((oldValue == null) || (weightedDifference != 0) || expired) {
// 需要执行写后维护(如大小调整、驱逐检查等)
afterWrite(new UpdateTask(prior, weightedDifference));
} else if (!onlyIfAbsent && exceedsTolerance) {
// 即使值未变,但时间偏差超过容忍度,也需要写后维护
afterWrite(new UpdateTask(prior, weightedDifference));
} else {
// 仅记录访问(轻量级操作)
if (mayUpdate) {
setWriteTime(prior, now);
}
afterRead(prior, now, /* recordHit */ false);
}

// 返回旧值(如果过期则返回null)
return expired ? null : oldValue;
}
}

data.put(keyRef,node)

writeBuffer.offer(task);

schedule

end

//写入writeBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void afterWrite(Runnable task) {
//写入然后满了 就会重试 默认140次
for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
if (writeBuffer.offer(task)) {
scheduleAfterWrite();
return;
}
scheduleDrainBuffers();
Thread.onSpinWait();
}

// In scenarios where the writing threads cannot make progress then they attempt to provide
// assistance by performing the eviction work directly. This can resolve cases where the
// maintenance task is scheduled but not running. That might occur due to all of the executor's
// threads being busy (perhaps writing into this cache), the write rate greatly exceeds the
// consuming rate, priority inversion, or if the executor silently discarded the maintenance
// task. Unfortunately this cannot resolve when the eviction is blocked waiting on a long-
// running computation due to an eviction listener, the victim is being computed on by a writer,
// or the victim residing in the same hash bin as a computing entry. In those cases a warning is
// logged to encourage the application to decouple these computations from the map operations.

//重试后还是写入不了,就会主动同步调起maintenance:消费Buffer 清理key 淘汰key
lock();
try {
maintenance(task);
} catch (RuntimeException e) {
logger.log(Level.ERROR, "Exception thrown when performing the maintenance task", e);
} finally {
evictionLock.unlock();
}
rescheduleCleanUpIfIncomplete();
}

maintenance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@GuardedBy("evictionLock")
void maintenance(@Nullable Runnable task) {
setDrainStatusRelease(PROCESSING_TO_IDLE);

try {
drainReadBuffer();

drainWriteBuffer();
if (task != null) {
task.run();
}

drainKeyReferences();
drainValueReferences();

expireEntries();
evictEntries();

climb();
} finally {
if ((drainStatusOpaque() != PROCESSING_TO_IDLE)
|| !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
setDrainStatusOpaque(REQUIRED);
}
}
}

若:writeBuffer满了,offer return false。异步任务处理不过来,循环完成后,会主动同步调其maintance(这时候put就会被阻塞,其实是同步的,也是一种保护措施,因为写太多,会OOM,阻塞也会去减慢写的速度):就会去消费Buffer,节点过期,节点淘汰。如果非常爆炸性的put的画,性能就不是很好了

读多写少用的才是本地缓存

get

get -> data -> readBuffer.offer(每个线程一个队列,减少了竞争)

异步任务

会被包装成PerformCleanUpTask它其实是一个Runnable,然后丢到线程池去执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@GuardedBy("evictionLock")
void maintenance(@Nullable Runnable task) {
setDrainStatusRelease(PROCESSING_TO_IDLE);

try {
drainReadBuffer();

drainWriteBuffer();
if (task != null) {
task.run();
}

drainKeyReferences();
drainValueReferences();

expireEntries();
evictEntries();

climb();
} finally {
if ((drainStatusOpaque() != PROCESSING_TO_IDLE)
|| !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
setDrainStatusOpaque(REQUIRED);
}
}
}

内存管理,(淘汰策略:key上限),W-TinyLFU

像内存管理,我肯定是有内存的数据才能进行管理,而像这些数据我肯定是不能在put or get 的主线程去做的,有些内存组件其实就是这么去做的,所以性能才差

writeBuffer和readBuffer的作用:就是把统计操作和读写操作分离了(一定情况下),两者不会相互影响

内存模型:

过期策略

  1. 全局统一key一个过期时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void expireAfterWriteEntries(long now) {
if (!expiresAfterWrite()) {
return;
}
long duration = expiresAfterWriteNanos();//全局过期时间 如 10秒
for (;;) {
Node<K, V> node = writeOrderDeque().peekFirst();
if ((node == null) || ((now - node.getWriteTime()) < duration) //判断是否过期
|| !evictEntry(node, RemovalCause.EXPIRED, now)) { //去淘汰
break;
}
}
}
//...
//判断是否过期
if (expiresAfterWrite()) {
expired |= ((now - n.getWriteTime()) >= expiresAfterWriteNanos());
}
//...
//已过期
makeDead(n);

//已过期
removed[0] = true;
return null;
//把date 给 put(key, null);

//..
//移除写顺序队列,好等下一次循环再次peekFirst进行过期检查
writeOrderDeque().remove(node);

//..
//发出移除key通知,可自定义监听器
notifyRemoval(key, value[0], actualCause[0]);
  1. 每个key单独一个过期时间

使用的是时间轮算法,时间轮算法过期

1
2
3
4
5
void expireVariableEntries(long now) {
if (expiresVariable()) {
timerWheel().advance(now);
}
}

时间轮:本质其实就是 数组 + 链表

定时器tick从时间轮里取任务时,整体时间复杂度可以为O(1);

包括put也是

而像Linux的双层时间轮,在次基础上还优化了。采用了双层时间轮,支持高延迟,其设计思想类似于分针和秒针

【层1:秒轮(快速轮)】
slot0 slot1 slot2 … slot59

【层2:分轮(慢速轮)】
slot0 slot1 … slot59

Tick 推进时:

  • 秒轮每走一格
  • 秒轮走满一圈 → 分轮前进一步,并将该分轮槽内的任务下沉到秒轮对应槽

淘汰策略:W-TinyLFU

思想大体就是我觉得就很像jvm的那种分代思想。像这个W-TinyLFU算法,它的话就是把内存总的分成了两端,一个main cache,一个是windows Cache,而main cache里又分为probation(LRU)(试用期) 和 protected(LRU)(受保护的)。(W-TinyLRU 通常包含 两个主要组件:windowCache和mainCache)

工作流程:

  1. 当前一个新的item进来时,会先进入我们的windowCache。这里的windowCache比较小,且内存管理就直接使用传统LRU了。
  2. 若有item从我们的windowCache淘汰出来的话,会尝试进入mainCache
    1. 能否进入看TinyCache对该item的计数是否 大于 主缓存中频率最低的候选淘汰对象 的 频率计数。可以就进入并,不可以就out
  3. mainCache整体采用的也是LRU(但是)
  4. 整体流程呈现一种 先接纳后淘汰的流程

MainCache里的状态流转

W-TinyLFU解决了LRU和LFU什么问题?

  1. LRU:突发流量污染问题
  2. LFU:老资历不腾空给 新数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//淘汰节点
evictEntries();
//..
//先window 再main
int candidates = evictFromWindow();
evictFromMain(candidates);

//evictFromWindow
Node<K, V> node = accessOrderWindowDeque().peek();
//设100个size
//1个给window 99个给main区
while (windowWeightedSize() > windowMaximum()) {
//..
accessOrderWindowDeque().remove(node);
accessOrderProbationDeque().add(node);

//..
}

//evictFromMain
//probation区的第一个
Node<K, V> victim = accessOrderProbationDeque().peekFirst();
//probation区的末尾个,也就是刚才从window移过来的
Node<K, V> candidate = accessOrderProbationDeque().peekLast();
//key个数有没有超过上限
while (weightedSize() > maximum()) {
//........
//对比频率

//W - TinyLFU的精髓
//计数
admit(candidateKey, victimKey);//frequencySketch() 很好的思想,特殊的位计数法,

//..
evictEntry(evict, RemovalCause.SIZE, 0L);

}

//自动调节各区域大小
climb();

W-TinyLFU 的精髓是 位计数法

如何一个高性能的本地缓存组件,caffeine在拼多多高并发业务场景下性能还是不行

改进缺点

  1. 给不同key设置不同的过期时间,API不够好
1
2
3
4
5
6
7
public void put(K key, V value, Duration timeout) {
if(timeout == null) cache.put(k, v);
else
cache.policy()
.expireVariably()
.ifPresent(policy -> policy.put(key, value, timeout));
}

2.

对比guava 架构

整个链路

高速队列

基于提问者的这个问题,歪师傅也想起了两个类似的场景。

一个是我参与开发过的一个对客发送短信的消息系统,简化一下整个流程大概是这样的:

上面这个图片会出现什么问题呢?

就是消息堆积。

当某个业务系统调用短信发送接口,批量发送消息的时候,比如发送营销活动时,大量的消息就在队列里面堆着,慢慢消费。

其实堆积也没有关系,毕竟营销活动的实时性要求不是那么高,不要求立马发送到客户手机上去。

但是,如果在消息堆积起来之后,突然有用户申请了验证码短信呢?

需要把前面堆积的消费完成后,才会发送验证码短信,这个已经来不及了,甚至验证码已经过期很久了你才发过去。

客户肯定会骂娘,因为获取不到验证码,他就不能进行后续业务。

如果大量客户因为收不到验证码不能进行后续业务,引起群体性的客诉,甚至用户恐慌,这个对于企业来说是一个非常严重的事件。

怎么办呢?

解决方案非常简单,再搞一个“高速”队列出来:

验证码消息直接扔到“高速”队列中去,这个队列专门用来处理验证码、动账通知这一类时效性要求极高的消息,从业务场景上分析,也不会出现消息堆积。

不是特别复杂的方案,大道至简,问题得到了解决。

类比到前面说的“快慢”线程池,其实是一样的思想,都是从资源上进行隔离。

只不过我说的这个场景更加简单,不需要去收集信息进行动态判断。业务流程上天然的就能区分出来,哪些消息实时性比较高,应该走“高速”队列;哪些消息慢慢 发没关系,可以应该走“常规”队列。

而这个所谓的“高速”和“常规”,只是开发人员给一个普通队列赋予的一个属性而已,站在 MQ 的角度,这两个队列没有任何区别。

最短响应时间策略:

  1. 遍历服务提供者,计算每个服务的预计等待时间:(成功总耗时/成功请求数) * 活跃数
  2. 选择时间最短的服务,若多个则按权重选择,权重相同则随机。

加权轮询算法

Dubbo的加权轮询算法经历了三个主要阶段,其演进目标是在保证按权重分配请求的前提下,兼顾性能和请求分布的平滑性。

  1. Dubbo 2.6.4 版本及之前:存在严重性能问题的朴素实现。
  2. 第一次优化:修复了性能问题,时间复杂度降至O(1),但请求分布不平滑。
  3. 最终方案(平滑加权轮询):在保证O(1)时间复杂度的基础上,实现了平滑的请求分布,避免了某个节点短时间内压力激增。

该算法的灵感来源于Nginx的平滑加权轮询算法。其核心思想是,通过动态调整一个“当前权重”值,让请求能够更均匀地分散到各个节点上,而不是连续地分配给高权重节点。

对于每一次请求,选择过程如下:

  1. 遍历并更新:遍历所有Invoker,将每个Invoker的 <font style="color:rgb(0, 0, 0);">current</font>值加上其固定的 <font style="color:rgb(0, 0, 0);">weight</font>
  2. 选择最大者:从步骤1更新后的 <font style="color:rgb(0, 0, 0);">current</font>值中,选出最大的一个。该值对应的Invoker即为本次选中的节点。
  3. 调整选中者权重:将选中节点 <font style="color:rgb(0, 0, 0);">current</font>值减去总权重 <font style="color:rgb(0, 0, 0);">total</font>
  4. 返回结果:返回选中的Invoker。

关键点:第3步“减去总权重”是保证算法能够循环往复、实现平滑的核心。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 非完整代码,仅为说明算法逻辑
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 1. 获取第一个Invoker和它的固定权重
Invoker<T> initInvoker = invokers.get(0);
int weight = getWeight(initInvoker, invocation);

// 2. 初始化所有Invoker的当前权重和总权重
// `weightArray` 是一个数组,保存了每个Invoker的【固定权重】和【当前权重】
// 这里是为了找到最大的固定权重和总权重
int totalWeight = weight;
int maxWeight = weight;
for (int i = 1; i < length; i++) {
weight = getWeight(invokers.get(i), invocation);
maxWeight = Math.max(maxWeight, weight);
totalWeight += weight;
}

// 3. 核心算法循环
// 如果最大权重大于0,且所有Invoker的当前权重不全都相等,进入状态机逻辑
if (maxWeight > 0 && !sameWeight) {
// 状态机模式,循环直到选出一个Invoker
for (int i = 0; i < length; i++) {
// 当前索引的Invoker,将其【当前权重】增加【固定权重】
int current = weightArray[i].current += weightArray[i].weight;
// 如果当前值大于已知最大值,更新最大值和选中的Invoker
if (current > maxCurrent) {
maxCurrent = current;
selectedInvoker = invokers.get(i);
}
}
// 选中后,将【选中Invoker】的【当前权重】减去【总权重】
// 这是实现平滑的关键步骤!
weightArray[selectedIndex].current -= totalWeight;
return selectedInvoker;
}
// 4. 如果所有权重相同,则退化为普通轮询
return invokers.get(sequence++ % length);
}

Dubbo的加权轮询负载均衡算法通过三次演进,最终采用了平滑加权轮询算法。该算法通过引入动态的“当前权重”(<font style="color:rgb(0, 0, 0);">current</font>),并在每次选中后减去“总权重”(<font style="color:rgb(0, 0, 0);">total</font>),巧妙地实现了:

  1. 严格按权重分配:在多次请求后,各节点被选中的次数比严格等于其权重比。
  2. 请求分布平滑:高权重节点的请求被分散开,避免了瞬时压力过大。
  3. 高性能:时间复杂度为O(n),其中n是服务节点数,通常很小,可视为常量级。

总结

  • 原理
    • 最短响应时间策略:选择平均响应时间最短的服务,结合权重和活跃数计算预计等待时间。
    • 最小活跃数策略:选择活跃请求数最少的服务,需配合<font style="color:rgb(0, 0, 0);">ActiveLimitFilter</font>使用,否则退化为加权随机。
    • 一致性哈希策略:通过哈希环和虚拟节点分布请求,减少节点变动时的数据迁移。
    • 加权轮询策略:请求按权重比例轮流分配,平滑加权算法避免请求集中。
    • 加权随机策略:根据权重设置随机概率,权重越大的服务被选中的概率越高。
  • 作用:优化资源利用、提升响应速度、保证高可用性。
  • 应用场景
    • 最短响应时间:适用于对延迟敏感的场景,如实时计算。
    • 最小活跃数:适合处理时间差异大的服务,如慢查询优化。
    • 一致性哈希:需要会话保持或顺序处理的场景,如缓存分片。
    • 加权轮询/随机:服务器性能不均时,按权重分配负载。

分片分段的意义

MSS和MTU

IP分片了,TCP为什么还要分段?

tcp分段后是为了在传输层保证协议的可靠性,因此给每个段打上标识符,可以实现失败重传和流量控制

的操作

不分段的话,如果这时候某个IP分片丢失了,那么整个IP报文的分片将重传。因为接受方收不到完整数据

就不会回复ack,tcp这边收不到ack,就会触发重传机制,同时因为不知道是哪个分片丢了,所以会将整

个IP报文的分片重传

:::color4
如果TCP把这份数据,分段为N个小于等于MSS长度的数据包,到了IP层后加上IP头和TCP头,还是小于MTU,那么IP层也不会再进行分包。此时在传输路上发生了丢包,那么TCP重传的时候也只是重传那一小部分的MSS段。效率会比TCP不分段时更高。

:::

即分段的意义就是为了提高重传的效率

同为传输层协议的UDP,就不会进行一个分段,重传时就直接重传一整段

TCP分段了,IP为啥还要分片呢?

一般来说,在发送端,tcp分段后,IP就不会继续分片了。但在传输链路过程中,mtu是会变化的,IP分

片就起到一个兜底的作用

主机号和网络号是什么?

网络号用来标识处于哪个网络,而主机号则用于标识处于哪台设备

eg:

:::color1
192.168.10.25 = 11000000.10101000.00001010.00011001

根据子网掩码的 1 位数:

- **前 24 位**:网络号
- **后 8 位**:主机号

网络号:11000000.10101000.00001010 = 192.168.10

主机号:00011001 = 25

:::

所以为什么?

如果不区分的话,那么IP地址将不够用,因为那就意味着每台设备就是一个全新的网络。而且划分后可以区分一个个子网,方便子网内的计算机通信