caffeine

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

@SneakyThrows
public static void main(String[] args) {
Cache<String, Integer> cache =
Caffeine.newBuilder()
.maximumSize(1000)
.recordStats()
// .expireAfterWrite(5, TimeUnit.SECONDS)
// .expireAfterAccess(2, TimeUnit.SECONDS)
.expireAfter(new Expiry<String, Integer>() {
@Override
public long expireAfterCreate(@NonNull String key, @NonNull Integer value, long currentTime) {
return currentTime;
}

@Override
public long expireAfterUpdate(@NonNull String key, @NonNull Integer value, long currentTime, @NonNegative long currentDuration) {
return currentDuration;
}

@Override
public long expireAfterRead(@NonNull String key, @NonNull Integer value, long currentTime, @NonNegative long currentDuration) {
return currentDuration;
}
})
.removalListener(new RemovalListener<String, Integer>() {
@Override
public void onRemoval(@Nullable String key, @Nullable Integer value, @NonNull RemovalCause cause) {

System.out.println("移除了key:" + key + " value :" + value + " cause : " + cause);
}
})
.build();

cache.put("cliffcw1", 1);
System.out.println(cache.getIfPresent("cliffcw1"));

//可变过期时间策略有没有提供,如果有,就put,
cache.policy().expireVariably().ifPresent(policy -> {
policy.put("cliffcw2", 2, 13, TimeUnit.SECONDS);
policy.put("cliffcw3", 2, 10, TimeUnit.SECONDS);
});

System.out.println("cliffcw2:" + cache.getIfPresent("cliffcw2"));

Thread.sleep(11000);

System.out.println("cliffcw22:" + cache.getIfPresent("cliffcw2"));
System.out.println("cliffcw3:" + cache.getIfPresent("cliffcw3"));

}
//删除是惰性删除

缺点

用本地需要考虑的点

  1. 功能能满足,get,put,过期
  2. 不能OOM,内存管理
  3. 监控展示(肯定不能是黑盒,无提示性语句)
  4. 统计(热key,大key,命中率 ….)

caffeine的缺点

  1. 功能基本满足,但多个业务场景,多种过期时间,不满足
  2. 只有key个数上限(不设置默认是)无法设置使用内存上限

解决:

  1. 给不同kv设置不同的过期时间

:::info
其实是有的,只是隐藏得比较深

// 可变过期时间策略没有提供,如果有,那就put。如果不可变那什么都没有

cache.policy().expireVariably().ifPresent( policy -> { policy.put( xx,xx,xx,xx ) })

:::

  1. 可以给内存设置上限

源码

build()

BoundedLocalManualCache 和 NoBoundedLocalManualCache

1
2
3
4
5
6
7
8
BoundedLocalManualCache(Caffeine<K, V> builder, @Nullable CacheLoader<? super K, V> loader) {
cache = LocalCacheFactory.newBoundedLocalCache(builder, loader, /* async */ false);
}

@Override
public BoundedLocalCache<K, V> cache() {
return cache;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
build:

BoundedLocalCache -> LocalCache -> ConcurrentHashMap

final class LocalCacheFactory:
//针对每种配置,caffeine会生成每种配置类,但它们都继承于LocalCache,这样的话就不用在代码中进行if判断,因为if判断哪在cpu底层也是需要耗时的,所以说优化到极致

//这是根据配置生成对应配置类的类名,然后根据这个类名去找到对应的配置类
//思想:就是用元数据空间换 if分支判断时间
//想象有什么特殊的应用场景????
static String getClassName(Caffeine<?, ?> builder) {
var className = new StringBuilder(LocalCacheFactory.class.getPackageName()).append('.');
if (builder.isStrongKeys()) {
className.append('S');
} else {
className.append('W');
}
if (builder.isStrongValues()) {
className.append('S');
} else {
className.append('I');
}
if (builder.removalListener != null) {
className.append('L');
}
if (builder.isRecordingStats()) {
className.append('S');
}
if (builder.evicts()) {
className.append('M');
if (builder.isWeighted()) {
className.append('W');
} else {
className.append('S');
}
}
if (builder.expiresAfterAccess() || builder.expiresVariable()) {
className.append('A');
}
if (builder.expiresAfterWrite()) {
className.append('W');
}
if (builder.refreshAfterWrite()) {
className.append('R');
}
return className.toString();
}

put

底层其实就是一个data:ConcurrentHashMap

put -> data -> writeBuffer.offer(全局有界队列)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// 方法作用:将键值对放入缓存,如果键已存在则覆盖旧值(除非onlyIfAbsent为true)
// 参数说明:
// key: 缓存键
// value: 缓存值
// expiry: 过期策略,用于计算条目的过期时间
// onlyIfAbsent: 如果为true,则仅当键不存在时才放入(类似putIfAbsent)
// 返回值:如果键已存在且被覆盖,则返回旧值;否则返回null
@Nullable
V put(K key, V value, Expiry<K, V> expiry, boolean onlyIfAbsent) {
// 参数校验:确保键和值不为空
requireNonNull(key);
requireNonNull(value);

// 初始化节点变量,用于在键不存在时创建新节点
Node<K, V> node = null;
// 获取当前时间(用于过期时间计算)
long now = expirationTicker().read();
// 计算新值的权重(用于基于权重的容量控制)
int newWeight = weigher.weigh(key, value);
// 创建用于查找的键对象(可能是弱引用或强引用,根据配置)
Object lookupKey = nodeFactory.newLookupKey(key);

// 通过自旋循环处理并发情况,attempts为重试次数
for (int attempts = 1; ; attempts++) {
// 从底层ConcurrentHashMap中获取已存在的节点
Node<K, V> prior = data.get(lookupKey);

// ========== 场景1:键不存在(新增缓存条目) ==========
if (prior == null) {
// 如果尚未创建新节点,则创建它
if (node == null) {
// 创建新节点,包含键、值、权重和创建时间
node = nodeFactory.newNode(key, keyReferenceQueue(), value, valueReferenceQueue(), newWeight, now);
// 设置节点的可变过期时间(基于创建时间计算)
setVariableTime(node, expireAfterCreate(key, value, expiry, now));
}

// 尝试将新节点原子性地放入缓存Map
prior = data.putIfAbsent(node.getKeyReference(), node);

// 如果成功放入(prior为null表示没有其他线程抢先放入)
if (prior == null) {
// 执行写后操作:将添加任务提交到写缓冲区,异步处理缓存维护
afterWrite(new AddTask(node, newWeight));
return null; // 新增条目,返回null
} else if (onlyIfAbsent) {
// 如果其他线程抢先放入了该键,且onlyIfAbsent为true,则尝试快速返回现有值
V currentValue = prior.getValue();
if ((currentValue != null) && !hasExpired(prior, now)) {
// 记录访问时间并返回当前值(不覆盖)
if (!isComputingAsync(prior)) {
tryExpireAfterRead(prior, key, currentValue, expiry(), now);
setAccessTime(prior, now);
}
afterRead(prior, now, /* recordHit */ false);
return currentValue;
}
}
}
// ========== 场景2:键存在且onlyIfAbsent为true ==========
else if (onlyIfAbsent) {
// 快速路径:不覆盖现有值,只返回当前值
V currentValue = prior.getValue();
if ((currentValue != null) && !hasExpired(prior, now)) {
if (!isComputingAsync(prior)) {
tryExpireAfterRead(prior, key, currentValue, expiry(), now);
setAccessTime(prior, now);
}
afterRead(prior, now, /* recordHit */ false);
return currentValue;
}
}

// ========== 处理节点状态异常情况 ==========
// 如果之前获取的节点已失效(可能被其他线程移除),则重试
if (!prior.isAlive()) {
// 如果重试次数达到一定阈值(如自旋等待后仍无效),则通过计算操作确保节点状态
if ((attempts & MAX_PUT_SPIN_WAIT_ATTEMPTS) != 0) {
Thread.onSpinWait(); // 提示CPU进行自旋等待优化
continue;
}
// 通过computeIfPresent方法处理,确保在节点存活状态下进行操作
data.computeIfPresent(lookupKey, (k, n) -> {
requireIsAlive(key, n);
return n;
});
continue;
}

// ========== 场景3:键存在且需要更新值 ==========
// 以下变量用于记录更新过程中的状态
V oldValue;
long varTime; // 新的可变过期时间
int oldWeight; // 旧权重值
boolean expired = false; // 标记旧值是否已过期
boolean mayUpdate = true; // 是否允许更新
boolean exceedsTolerance = false; // 是否超过时间容忍度

// 对现有节点加锁,确保更新操作的原子性
synchronized (prior) {
// 再次检查节点是否存活(防止在获取锁期间状态变化)
if (!prior.isAlive()) {
continue; // 如果节点不再存活,重试
}

// 获取旧值和旧权重
oldValue = prior.getValue();
oldWeight = prior.getWeight();

// 根据不同的情况计算新的过期时间
if (oldValue == null) {
// 旧值已被垃圾回收,视为新创建
varTime = expireAfterCreate(key, value, expiry, now);
notifyEviction(key, null, RemovalCause.COLLECTED);
} else if (hasExpired(prior, now)) {
// 节点已过期,按新创建处理
expired = true;
varTime = expireAfterCreate(key, value, expiry, now);
notifyEviction(key, oldValue, RemovalCause.EXPIRED);
} else if (onlyIfAbsent) {
// 如果onlyIfAbsent为true,则不更新值,只更新访问时间
mayUpdate = false;
varTime = expireAfterRead(prior, key, value, expiry, now);
} else {
// 正常更新,计算更新后的过期时间
varTime = expireAfterUpdate(prior, key, value, expiry, now);
}

// 如果允许更新值,则执行更新操作
if (mayUpdate) {
// 检查是否超过写入容忍度(用于控制更新频率,避免频繁维护)
exceedsTolerance = (expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE)
|| (expiresVariable() && Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE);

// 更新节点的值和权重
prior.setValue(value, valueReferenceQueue());
prior.setWeight(newWeight);
setWriteTime(prior, now);
discardRefresh(prior.getKeyReference()); // 丢弃可能的刷新任务
}

// 设置节点的可变时间(用于过期策略)和访问时间
setVariableTime(prior, varTime);
setAccessTime(prior, now);
}

// ========== 通知相关事件 ==========
if (expired) {
notifyRemoval(key, oldValue, RemovalCause.EXPIRED);
} else if (oldValue == null) {
notifyRemoval(key, /* oldValue */ null, RemovalCause.COLLECTED);
} else if (mayUpdate) {
notifyOnReplace(key, oldValue, value); // 通知值被替换
}

// ========== 决定后续维护操作 ==========
// 计算权重变化
int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0;

// 根据条件决定执行写后操作还是读后操作
if ((oldValue == null) || (weightedDifference != 0) || expired) {
// 需要执行写后维护(如大小调整、驱逐检查等)
afterWrite(new UpdateTask(prior, weightedDifference));
} else if (!onlyIfAbsent && exceedsTolerance) {
// 即使值未变,但时间偏差超过容忍度,也需要写后维护
afterWrite(new UpdateTask(prior, weightedDifference));
} else {
// 仅记录访问(轻量级操作)
if (mayUpdate) {
setWriteTime(prior, now);
}
afterRead(prior, now, /* recordHit */ false);
}

// 返回旧值(如果过期则返回null)
return expired ? null : oldValue;
}
}

data.put(keyRef,node)

writeBuffer.offer(task);

schedule

end

//写入writeBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void afterWrite(Runnable task) {
//写入然后满了 就会重试 默认140次
for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
if (writeBuffer.offer(task)) {
scheduleAfterWrite();
return;
}
scheduleDrainBuffers();
Thread.onSpinWait();
}

// In scenarios where the writing threads cannot make progress then they attempt to provide
// assistance by performing the eviction work directly. This can resolve cases where the
// maintenance task is scheduled but not running. That might occur due to all of the executor's
// threads being busy (perhaps writing into this cache), the write rate greatly exceeds the
// consuming rate, priority inversion, or if the executor silently discarded the maintenance
// task. Unfortunately this cannot resolve when the eviction is blocked waiting on a long-
// running computation due to an eviction listener, the victim is being computed on by a writer,
// or the victim residing in the same hash bin as a computing entry. In those cases a warning is
// logged to encourage the application to decouple these computations from the map operations.

//重试后还是写入不了,就会主动同步调起maintenance:消费Buffer 清理key 淘汰key
lock();
try {
maintenance(task);
} catch (RuntimeException e) {
logger.log(Level.ERROR, "Exception thrown when performing the maintenance task", e);
} finally {
evictionLock.unlock();
}
rescheduleCleanUpIfIncomplete();
}

maintenance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@GuardedBy("evictionLock")
void maintenance(@Nullable Runnable task) {
setDrainStatusRelease(PROCESSING_TO_IDLE);

try {
drainReadBuffer();

drainWriteBuffer();
if (task != null) {
task.run();
}

drainKeyReferences();
drainValueReferences();

expireEntries();
evictEntries();

climb();
} finally {
if ((drainStatusOpaque() != PROCESSING_TO_IDLE)
|| !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
setDrainStatusOpaque(REQUIRED);
}
}
}

若:writeBuffer满了,offer return false。异步任务处理不过来,循环完成后,会主动同步调其maintance(这时候put就会被阻塞,其实是同步的,也是一种保护措施,因为写太多,会OOM,阻塞也会去减慢写的速度):就会去消费Buffer,节点过期,节点淘汰。如果非常爆炸性的put的画,性能就不是很好了

读多写少用的才是本地缓存

get

get -> data -> readBuffer.offer(每个线程一个队列,减少了竞争)

异步任务

会被包装成PerformCleanUpTask它其实是一个Runnable,然后丢到线程池去执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@GuardedBy("evictionLock")
void maintenance(@Nullable Runnable task) {
setDrainStatusRelease(PROCESSING_TO_IDLE);

try {
drainReadBuffer();

drainWriteBuffer();
if (task != null) {
task.run();
}

drainKeyReferences();
drainValueReferences();

expireEntries();
evictEntries();

climb();
} finally {
if ((drainStatusOpaque() != PROCESSING_TO_IDLE)
|| !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
setDrainStatusOpaque(REQUIRED);
}
}
}

内存管理,(淘汰策略:key上限),W-TinyLFU

像内存管理,我肯定是有内存的数据才能进行管理,而像这些数据我肯定是不能在put or get 的主线程去做的,有些内存组件其实就是这么去做的,所以性能才差

writeBuffer和readBuffer的作用:就是把统计操作和读写操作分离了(一定情况下),两者不会相互影响

内存模型:

过期策略

  1. 全局统一key一个过期时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void expireAfterWriteEntries(long now) {
if (!expiresAfterWrite()) {
return;
}
long duration = expiresAfterWriteNanos();//全局过期时间 如 10秒
for (;;) {
Node<K, V> node = writeOrderDeque().peekFirst();
if ((node == null) || ((now - node.getWriteTime()) < duration) //判断是否过期
|| !evictEntry(node, RemovalCause.EXPIRED, now)) { //去淘汰
break;
}
}
}
//...
//判断是否过期
if (expiresAfterWrite()) {
expired |= ((now - n.getWriteTime()) >= expiresAfterWriteNanos());
}
//...
//已过期
makeDead(n);

//已过期
removed[0] = true;
return null;
//把date 给 put(key, null);

//..
//移除写顺序队列,好等下一次循环再次peekFirst进行过期检查
writeOrderDeque().remove(node);

//..
//发出移除key通知,可自定义监听器
notifyRemoval(key, value[0], actualCause[0]);
  1. 每个key单独一个过期时间

使用的是时间轮算法,时间轮算法过期

1
2
3
4
5
void expireVariableEntries(long now) {
if (expiresVariable()) {
timerWheel().advance(now);
}
}

时间轮:本质其实就是 数组 + 链表

定时器tick从时间轮里取任务时,整体时间复杂度可以为O(1);

包括put也是

而像Linux的双层时间轮,在次基础上还优化了。采用了双层时间轮,支持高延迟,其设计思想类似于分针和秒针

【层1:秒轮(快速轮)】
slot0 slot1 slot2 … slot59

【层2:分轮(慢速轮)】
slot0 slot1 … slot59

Tick 推进时:

  • 秒轮每走一格
  • 秒轮走满一圈 → 分轮前进一步,并将该分轮槽内的任务下沉到秒轮对应槽

淘汰策略:W-TinyLFU

思想大体就是我觉得就很像jvm的那种分代思想。像这个W-TinyLFU算法,它的话就是把内存总的分成了两端,一个main cache,一个是windows Cache,而main cache里又分为probation(LRU)(试用期) 和 protected(LRU)(受保护的)。(W-TinyLRU 通常包含 两个主要组件:windowCache和mainCache)

工作流程:

  1. 当前一个新的item进来时,会先进入我们的windowCache。这里的windowCache比较小,且内存管理就直接使用传统LRU了。
  2. 若有item从我们的windowCache淘汰出来的话,会尝试进入mainCache
    1. 能否进入看TinyCache对该item的计数是否 大于 主缓存中频率最低的候选淘汰对象 的 频率计数。可以就进入并,不可以就out
  3. mainCache整体采用的也是LRU(但是)
  4. 整体流程呈现一种 先接纳后淘汰的流程

MainCache里的状态流转

W-TinyLFU解决了LRU和LFU什么问题?

  1. LRU:突发流量污染问题
  2. LFU:老资历不腾空给 新数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//淘汰节点
evictEntries();
//..
//先window 再main
int candidates = evictFromWindow();
evictFromMain(candidates);

//evictFromWindow
Node<K, V> node = accessOrderWindowDeque().peek();
//设100个size
//1个给window 99个给main区
while (windowWeightedSize() > windowMaximum()) {
//..
accessOrderWindowDeque().remove(node);
accessOrderProbationDeque().add(node);

//..
}

//evictFromMain
//probation区的第一个
Node<K, V> victim = accessOrderProbationDeque().peekFirst();
//probation区的末尾个,也就是刚才从window移过来的
Node<K, V> candidate = accessOrderProbationDeque().peekLast();
//key个数有没有超过上限
while (weightedSize() > maximum()) {
//........
//对比频率

//W - TinyLFU的精髓
//计数
admit(candidateKey, victimKey);//frequencySketch() 很好的思想,特殊的位计数法,

//..
evictEntry(evict, RemovalCause.SIZE, 0L);

}

//自动调节各区域大小
climb();

W-TinyLFU 的精髓是 位计数法

如何一个高性能的本地缓存组件,caffeine在拼多多高并发业务场景下性能还是不行

改进缺点

  1. 给不同key设置不同的过期时间,API不够好
1
2
3
4
5
6
7
public void put(K key, V value, Duration timeout) {
if(timeout == null) cache.put(k, v);
else
cache.policy()
.expireVariably()
.ifPresent(policy -> policy.put(key, value, timeout));
}

2.

对比guava 架构