为什么使用双端队列?

双端队列区别于普通的队列,可以直接往队列头塞元素,这样可以满足分区内数据有序性,再尝试发送。

DQ由RecordAccumulator维护

一个topic里的partition都会对应一个DQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
::方法内容
-> : 方法调用方法

**/

sendProducerRequest():
//回调
RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
->
handleProduceResponse() ->
completeBatch() :
if (canRetry(batch, response, now)) {
//......

//消息从新入队
reenqueueBatch(batch, now);
}
reenqueueBatch: this.accumulator.reenqueue(batch, currentTimeMs);
/**
* Re-enqueue the given record batch in the accumulator. In Sender.completeBatch method, we check
* whether the batch has reached deliveryTimeoutMs or not. Hence we do not do the delivery timeout check here.
*/
public void reenqueue(ProducerBatch batch, long now) {
batch.reenqueued(now);
Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
synchronized (deque) {
if (transactionManager != null)
insertInSequenceOrder(deque, batch);
else
deque.addFirst(batch);
}
}