Kafka Producer Internals

1. Introduction

Apache Kafka® is an event streaming platform

Before a record reaches Kafka, the producer runs it through the interceptors (ProducerInterceptors), the serializers (keySerializer, valueSerializer), and the partitioner (Partitioner), then stages it in the record accumulator (RecordAccumulator), from which the Sender thread ships it to the Kafka cluster.

2. Interceptors

The interceptor chain is an implementation of the Chain of Responsibility pattern:

/**
 * This is called when client sends the record to KafkaProducer, before key and value gets serialized.
 * The method calls {@link ProducerInterceptor#onSend(ProducerRecord)} method. ProducerRecord
 * returned from the first interceptor's onSend() is passed to the second interceptor onSend(), and so on in the
 * interceptor chain. The record returned from the last interceptor is returned from this method.
 *
 * This method does not throw exceptions. Exceptions thrown by any of interceptor methods are caught and ignored.
 * If an interceptor in the middle of the chain, that normally modifies the record, throws an exception,
 * the next interceptor in the chain will be called with a record returned by the previous interceptor that did not
 * throw an exception.
 *
 * @param record the record from client
 * @return producer record to send to topic/partition
 */
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> interceptRecord = record;
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
        try {
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception e) {
            // do not propagate interceptor exception, log and continue calling other interceptors
            // be careful not to throw exception from here
            if (record != null)
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
            else
                log.warn("Error executing interceptor onSend callback", e);
        }
    }
    return interceptRecord;
}
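
As a usage sketch (the class name and the prefix logic are illustrative, not from the original post), a custom interceptor implements the public ProducerInterceptor interface:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Illustrative interceptor that prefixes every value before serialization.
public class PrefixInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Runs before key/value serialization; the returned record is what the
        // next interceptor in the chain (or the serializer) sees.
        return new ProducerRecord<>(record.topic(), record.partition(),
                record.timestamp(), record.key(), "prefix-" + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called when the broker acknowledges the record or the send fails.
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

Interceptors are registered through the interceptor.classes producer property (ProducerConfig.INTERCEPTOR_CLASSES_CONFIG) and run in list order, forming the chain shown above.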

3. Serializers

  • KeySerializer
  • ValueSerializer

Note: the producer and consumer must be configured with matching serialization formats. In one earlier incident, the cacs service produced with a JSON serializer while the consumer used the String deserializer, which left extra double quotes around the content.
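
A minimal sketch of matching configurations (broker address and class name are placeholders): the producer's serializers and the consumer's deserializers must agree.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SerializerConfigExample {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // The producer serializes keys and values as plain strings...
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // ...so the consumer must deserialize them the same way. Mixing a JSON
        // serializer here with a String deserializer there leaves stray quotes,
        // as described above.
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    }
}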

4. Partitioner

There are three cases:

  1. A partition explicitly set on the record takes priority.

    /**
     * computes partition for given record.
     * if the record has partition returns the value otherwise
     * calls configured partitioner class to compute the partition.
     */
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

    Note: when assigning partitions manually, check whether the hand-rolled strategy actually distributes messages evenly across the partitions.

  2. Otherwise, if the key is non-null, the murmur2 hash of the serialized key is taken modulo the number of partitions.

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param numPartitions The number of partitions of the given {@code topic}
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    Note: can this lead to uneven partition load? Since the target partition is determined solely by the key's hash, a skewed key distribution (hot keys) concentrates messages on a few partitions.

  3. If the key is null, the sticky partitioner (StickyPartitionCache) is used; a simplified sketch follows.
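
A simplified sketch of the sticky idea (an illustration, not the actual StickyPartitionCache source): one partition is "stuck to" per topic so that keyless records fill one batch at a time, and a new partition is chosen when the current batch completes.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

// Simplified sticky cache: keyless records for a topic keep going to the
// same partition until nextPartition() is called (e.g. when a batch fills),
// which keeps batches large instead of spraying records round-robin.
public class SimpleStickyCache {
    private final ConcurrentMap<String, Integer> current = new ConcurrentHashMap<>();

    public int partition(String topic, int numPartitions) {
        return current.computeIfAbsent(topic,
                t -> ThreadLocalRandom.current().nextInt(numPartitions));
    }

    public int nextPartition(String topic, int numPartitions) {
        // Pick a new random partition; the real implementation also avoids
        // re-picking the previous one and only considers available partitions.
        int next = ThreadLocalRandom.current().nextInt(numPartitions);
        current.put(topic, next);
        return next;
    }
}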

5. RecordAccumulator

The RecordAccumulator stores messages in double-ended queues (Deque), one per partition; the Sender thread drains them asynchronously and sends the batches to the Kafka cluster.

A Deque (double-ended queue) is a data structure with the properties of both a queue and a stack: elements can be inserted at and removed from both ends.

Steps:

  1. Look up the Deque for the target partition, creating one if it does not exist.
  2. Lock the deque (synchronized).
  3. tryAppend() the record to the last batch.
  4. Release the lock.

public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch,
                                 long nowMs) throws InterruptedException {

    ...
    // get the deque for this partition, creating it if absent
    Deque<ProducerBatch> dq = getOrCreateDeque(tp);
    // lock the deque
    synchronized (dq) {
        if (closed)
            throw new KafkaException("Producer closed while send in progress");
        // try to append to the last batch in the deque
        RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
        if (appendResult != null)
            return appendResult;
    }
    ...
}

6. Architecture

  1. The user assembles the message as a ProducerRecord.
  2. ProducerInterceptors intercept (and may modify) the message.
  3. The Serializer serializes the message's key and value.
  4. The Partitioner chooses a suitable partition for the message.
  5. The RecordAccumulator collects messages into double-ended queues (Deque) so they can be sent in batches (MemoryRecords).
  6. The Sender takes messages from the RecordAccumulator and groups those bound for different brokers into batched requests to reduce network I/O.
  7. It constructs a ClientRequest.
  8. The ClientRequest is handed to the NetworkClient, ready to be sent.
  9. The NetworkClient places the request into the KafkaChannel's send buffer.
  10. Network I/O is performed and the request goes out.
  11. When the response arrives, the ClientRequest's callback is invoked.
  12. That calls the RecordBatch's callback, which finally invokes the callback registered with each message.

Two threads cooperate to send a message. The main thread wraps the business data in a ProducerRecord and calls send(), which stages the message in the RecordAccumulator. The Sender thread turns the accumulated messages into requests and performs the actual network I/O: it pulls batches out of the RecordAccumulator and sends them out in bulk. A minimal usage sketch follows.
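
In this sketch (topic name and broker address are placeholders), send() returns as soon as the record is staged in the RecordAccumulator, and the callback fires later once the Sender thread's network I/O completes:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Main thread: interceptors -> serializers -> partitioner -> accumulator.
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
                    (metadata, exception) -> {
                        // Runs after the Sender thread receives the broker's response.
                        if (exception != null)
                            exception.printStackTrace();
                        else
                            System.out.printf("sent to %s-%d@%d%n",
                                    metadata.topic(), metadata.partition(), metadata.offset());
                    });
        } // close() flushes the accumulator before shutting down the Sender.
    }
}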

7. Request Data Structure

kafka-clients-2.6.0.jar!/common/message/ProduceRequest.json

// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
  "apiKey": 0,
  "type": "request",
  "name": "ProduceRequest",
  // Version 1 and 2 are the same as version 0.
  //
  // Version 3 adds the transactional ID, which is used for authorization when attempting to write
  // transactional data. Version 3 also adds support for Kafka Message Format v2.
  //
  // Version 4 is the same as version 3, but the requestor must be prepared to handle a
  // KAFKA_STORAGE_ERROR.
  //
  // Version 5 and 6 are the same as version 3.
  //
  // Starting in version 7, records can be produced using ZStandard compression. See KIP-110.
  //
  // Starting in Version 8, response has RecordErrors and ErrorMEssage. See KIP-467.
  "validVersions": "0-8",
  "flexibleVersions": "none",
  "fields": [
    { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId",
      "about": "The transactional ID, or null if the producer is not transactional." },
    { "name": "Acks", "type": "int16", "versions": "0+",
      "about": "The number of acknowledgments the producer requires the leader to have received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader and -1 for the full ISR." },
    { "name": "TimeoutMs", "type": "int32", "versions": "0+",
      "about": "The timeout to await a response in miliseconds." },
    { "name": "Topics", "type": "[]TopicProduceData", "versions": "0+",
      "about": "Each topic to produce to.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionProduceData", "versions": "0+",
        "about": "Each partition to produce to.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
          "about": "The record data to be produced." }
      ]}
    ]}
  ]
}
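
These fields are populated from the producer's configuration; a sketch, assuming the usual mapping of acks to the Acks field and request.timeout.ms to TimeoutMs (values are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class AcksConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Sent as the Acks field: "0" = no acknowledgment, "1" = leader only,
        // "all" / "-1" = the full in-sync replica set.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Used for the TimeoutMs field: how long the broker may wait for the
        // required acknowledgments before responding.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
    }
}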

8. Questions

8.1. Why a double-ended queue?

In the Sender thread, a batch that fails with a retriable error is put back into the queue:

if (error != Errors.NONE) {
    if (canRetry(batch, response, now)) {
        log.warn(
            "Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
            correlationId,
            batch.topicPartition,
            this.retries - batch.attempts() - 1,
            error);
        reenqueueBatch(batch, now);
        ...
    }
}
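
The re-enqueue path is what motivates the Deque: the retried batch goes back to the head of the queue, ahead of newer batches, which a plain FIFO queue cannot do. Abridged (to the best of my reading) from RecordAccumulator.reenqueue in the 2.6.0 source:

public void reenqueue(ProducerBatch batch, long now) {
    batch.reenqueued(now);
    Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
    synchronized (deque) {
        if (transactionManager != null)
            insertInSequenceOrder(deque, batch);
        else
            // addFirst is the deque-only operation: the retried batch jumps
            // the line so partition ordering is preserved across retries.
            deque.addFirst(batch);
    }
}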

8.2. How does Kafka guarantee message ordering?

Kafka only guarantees ordering within a partition.

  1. On the producer side, each partition has its own Deque, and the lock on the deque preserves queue order (but note the retry caveat sketched below).
  2. On the broker side, messages are stored sequentially in an append-only log.
  3. On the consumer side, within a consumer group a partition is consumed by at most one consumer.
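
One producer-side caveat worth adding: with retries enabled, a failed batch can be retried after a later batch has already succeeded, reordering messages within a partition, unless in-flight requests are capped or idempotence is enabled. A configuration sketch (values illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class OrderingConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // With retries > 0, reordering across in-flight requests is possible...
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        // Option A: allow at most one in-flight request per connection.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        // Option B (alternative): the idempotent producer preserves ordering
        // with up to 5 in-flight requests.
        // props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    }
}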

9. Further Thoughts

ConcurrentHashMap's segmented (striped) locking vs. the RecordAccumulator: both avoid a single global lock by locking at a finer granularity, one segment/bin in the map versus one deque per partition in the accumulator. A toy sketch follows.
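
This sketch is purely illustrative (not from either codebase) and shows the shared idea: threads touching different keys/partitions never contend with each other.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Striped locking in miniature: like RecordAccumulator's per-partition
// deques (and ConcurrentHashMap's per-bin locking), each partition gets
// its own lock, so appends to different partitions proceed in parallel.
public class StripedQueues<T> {
    private final ConcurrentMap<Integer, Deque<T>> queues = new ConcurrentHashMap<>();

    public void append(int partition, T item) {
        Deque<T> dq = queues.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) { // lock only this partition's queue
            dq.addLast(item);
        }
    }
}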

