sendProducerRequest(): //回调 RequestCompletionHandlercallback= response -> handleProduceResponse(response, recordsByPartition, time.milliseconds()); -> handleProduceResponse() -> completeBatch() : if (canRetry(batch, response, now)) { //...... //消息从新入队 reenqueueBatch(batch, now); } reenqueueBatch: this.accumulator.reenqueue(batch, currentTimeMs); /** * Re-enqueue the given record batch in the accumulator. In Sender.completeBatch method, we check * whether the batch has reached deliveryTimeoutMs or not. Hence we do not do the delivery timeout check here. */ publicvoidreenqueue(ProducerBatch batch, long now) { batch.reenqueued(now); Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition); synchronized (deque) { if (transactionManager != null) insertInSequenceOrder(deque, batch); else deque.addFirst(batch); } }