1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public @Nullable V getIfPresent(Object key, boolean recordStats) {
Node<K, V> node = data.get(nodeFactory.newLookupKey(key));
if (node == null) {
if (recordStats) {
statsCounter().recordMisses(1);
}
if (drainStatusOpaque() == REQUIRED) {
scheduleDrainBuffers();
}
return null;
}

V value = node.getValue();
long now = expirationTicker().read();
if (hasExpired(node, now) || (collectValues() && (value == null))) {
if (recordStats) {
statsCounter().recordMisses(1);
}
scheduleDrainBuffers();
return null;
}

if ((value != null) && !isComputingAsync(value)) {
@SuppressWarnings("unchecked")
var castedKey = (K) key;
setAccessTime(node, now);
tryExpireAfterRead(node, castedKey, value, expiry(), now);
}
V refreshed = afterRead(node, now, recordStats);
return (refreshed == null) ? value : refreshed;

这是一个get方法,先看get方法干了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@SuppressWarnings({"MultiVariableDeclaration",
"OvershadowingSubclassFields", "PMD.OneDeclarationPerLine"})
abstract class MpscChunkedArrayQueue<E> extends MpscChunkedArrayQueueColdProducerFields<E> {
byte p000, p001, p002, p003, p004, p005, p006, p007;
byte p008, p009, p010, p011, p012, p013, p014, p015;
byte p016, p017, p018, p019, p020, p021, p022, p023;
byte p024, p025, p026, p027, p028, p029, p030, p031;
byte p032, p033, p034, p035, p036, p037, p038, p039;
byte p040, p041, p042, p043, p044, p045, p046, p047;
byte p048, p049, p050, p051, p052, p053, p054, p055;
byte p056, p057, p058, p059, p060, p061, p062, p063;
byte p064, p065, p066, p067, p068, p069, p070, p071;
byte p072, p073, p074, p075, p076, p077, p078, p079;
byte p080, p081, p082, p083, p084, p085, p086, p087;
byte p088, p089, p090, p091, p092, p093, p094, p095;
byte p096, p097, p098, p099, p100, p101, p102, p103;
byte p104, p105, p106, p107, p108, p109, p110, p111;
byte p112, p113, p114, p115, p116, p117, p118, p119;

MpscChunkedArrayQueue(int initialCapacity, int maxCapacity) {
super(initialCapacity, maxCapacity);
}
前面这堆无意义的字符,是用来隔离冷热数据的垫片,为什么是120Byte 是因为一般来说对象头有12B,数据刚好落在第三行,如果想要刚好凑到第二行,因为情况可能会有变化,甚至cpu的缓存是120Bye,那这时候垫片就垫不到了。

MPSC 模型

1. 并发原语层——索引与标记

  • producerIndex / consumerIndex
    volatile 长整形,偶数存序号、奇数当扩容锁,一条 CAS 完成“抢槽位 + 加锁”

  • producerLimit
    由消费者提前算好并发布的水位线,生产者只需 volatile 读即可无锁判断“快满”


2. 数据存储层——数组 + 链表

  • chunk 数组(2 的幂)
    真正存元素;末尾留 1 个槽放JUMP对象,表示“后面还有新数组”

  • 单向链表
    旧数组 buffer[last] 指向新数组,实现无界扩展,消费者顺着链表继续消费


3. 伪共享隔离层——冷热字段垫片

  • 120 byte 填充段(15×8 byte)
    producerIndex / consumerIndex热变量推到独占缓存行,避免多核互相失效

  • 继承链分段
    冷字段 → 垫 → 热字段 → 垫,Java 字段排列即可跨 64 B 边界,零注解、零 Unsafe


4. 无锁算法层——CAS + Ordered Store

  • offer
    CAS 抢 producerIndex → Ordered store 写元素 → 槽满则自旋扩容

  • poll
    plain 读本地快照 → Ordered load 拿元素 → 消费完一批更新水位producerLimit


5. 扩容协调层——单向链表 + JUMP 标记

  • 扩容线程原子置奇数当锁,生产者自旋;

  • 新数组链接到旧数组尾,JUMP 标记提醒消费者跳船到新块;

  • 旧数组完全消费后标注 BUFFER_CONSUMED,后续可被 GC


一眼看全景

Table

Copy

层级 关键实体 作用
并发原语 *Index / *Limit 无锁抢槽、水位预告
存储结构 chunk[] + 链表 可无限扩展的环形缓冲
隔离优化 120 byte 垫片 跨缓存行,防伪共享
算法流程 CAS + Ordered 读写均无锁、顺序一致
扩容协调 JUMP + 单向链表 在线扩容,消费者透明

把这五层拼在一起,就是 JCTools MpscUnboundedArrayQueue 能在 4~6 ns 内完成一次 offer/poll 的底层原因