【深入理解Kafka】生产者

从编程的角度而言,生产者就是负责向Kafka发送消息的应用程序

客户端开发

一个正常的生产逻辑需要具备以下几个步骤:

  1. 配置生产者客户端参数及创建相应的生产者实例。
  2. 构建待发送的消息。
  3. 发送消息。
  4. 关闭生产者实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 生产者客户端示例代码
public class KafkaProducerAnalysis {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";

public static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerilizer");
props.pus("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("client.id", "producer.client.id.demo");
return props;
}

public static void main(String[] args) {
Properties props = initConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello, Kafka!");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
}
}

必要的参数配置

在创建真正的生产者实例前需要配置相应的参数,比如需要连接的Kafka集群地址。在Kafka 生产者客户端KafkaProducer中有3个参数是必填的。

  • bootstrap.servers:该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。
    注意这里并非需要所有的 broker 地址,因为生产者会从给定的broker里查找到其他broker的信息。不过建议至少要设置两个以上的 broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka集群上
  • key.serializer 和 value.serializer:broker 端接收的消息必须以字节数组(byte[])的形式存在。生产者使用的KafkaProducer <String , String> 和 ProducerRecord<String , String> 中的泛型 <String,String> 对应的就是消息中 key 和 value 的类型,生产者客户端使用这种方式可以让代码具有良好的可读性,不过在发往 broker 之前需要将消息中对应的 key 和 value 做相应的序列化操作来转换成字节数组。key.serializer 和 value.serializer 这两个参数分别用来指定 key 和 value 序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名,如 org.apache.kafka.common.serialization.StringSerializer , 单单指定StringSerializer是错误的

在配置完参数后,我们就可以使用它来创建一个生产者实例

1
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例,也可以将 KafkaProducer 实例进行池化来供其他线程调用。

KafkaProducer 中有多个构造方法,比如在创建 KafkaProducer 实例时并没有设定 key.serializer 和 value.serializer 这两个配置参数,那么就需要在构造方法中添加对应的序列化器

1
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

其内部原理和无序列化器的构造方法一样,不过就实际应用而言,一般都选用public KafkaProducer(Properties properties)这个构造方法来创建 KafkaProducer 实例。

消息的发送

在创建完生产者实例之后,接下来的工作就是构建消息,即创建 ProducerRecord 对象。通过上面的生产者客户端示例代码中我们已经了解了 ProducerRecord 的属性结构,其中 topic 属性和 value 属性是必填项,其余属性是选填项,对应的 ProducerRecord 的构造方法也有多种。

创建生产者实例和构建消息之后,就可以开始发送消息了。发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)。

发后即忘

示例代码的发送方式就是发后即忘,它只管往Kafka中发送消息而并不关心消息是否正确到达。在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。

KafkaProducer 的 send() 方法并非是 void 类型,而是 Future<RecordMetadata>类型,实际上 send() 方法本身就是异步的,send() 方法返回的 Future 对象可以使调用方稍后获得发送的结果。示例中在执行 send() 方法之后直接链式调用了 get() 方法来阻塞等待 Kafka 的响应,直到消息发送成功,或者发生异常。

也可以在执行完 send() 方法之后不直接调用 get() 方法,比如下面的一种同步发送方式的实现:

1
2
3
4
5
6
7
try {
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}

这样可以获取一个 RecordMetadata 对象,在 RecordMetadata 对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。如果在应用代码中需要这些信息,则可以使用这个方式。如果不需要,则直接采用producer.send(record).get()的方式更省事。

Future 表示一个任务的生命周期,并提供了相应的方法来判断任务是否已经完成或取消,以及获取任务的结果和取消任务等。既然KafkaProducer.send()方法的返回值是一个 Future 类型的对象,那么完全可以用Java语言层面的技巧来丰富应用的实现,比如使用Future 中的get(long timeout, TimeUnit unit)方法实现可超时的阻塞。

同步

同步发送的方式可靠性高,要么消息被发送成功,要么发生异常。如果发生异常,则可以捕获并进行相应的处理,而不会像“发后即忘”的方式直接造成消息的丢失。不过同步发送的方式的性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条。

异步

一般是在 send() 方法里指定一个 Callback 的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。有读者或许会有疑问,send() 方法的返回值类型就是 Future,而 Future 本身就可以用作异步的逻辑处理。这样做不是不行,只不过 Future 里的 get() 方法在何时调用,以及怎么调用都是需要面对的问题,消息不停地发送,那么诸多消息对应的 Future 对象的处理难免会引起代码处理逻辑的混乱。使用 Callback 的方式非常简洁明了,Kafka 有响应时就会回调,要么发送成功,要么抛出异常。

1
2
3
4
5
6
7
8
9
10
11
// 异步发送方式示例
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
}
}
});

通常,一个 KafkaProducer 不会只负责发送单条消息,更多的是发送多条消息,在发送完这些消息之后,需要调用 KafkaProducer 的 close() 方法来回收资源

序列化

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给 Kafka 。而在对侧,消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象。在上面的示例代码中,为了方便,消息的 key 和 value 都使用了字符串,对应程序中的序列化器也使用了客户端自带的org.apache.kafka.common.serialization.StringSerializer,除了用于String类型的序列化器,还有 ByteArray、ByteBuffer、Bytes、Double、Integer、Long 这几种类型, 它们都实现了org.apache.kafka.common.serialization.Serializer 接口。

分区器

消息在通过 send() 方法发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。

如果消息 ProducerRecord 中没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。

生产者拦截器

拦截器(Interceptor)是早在Kafka 0.10.0.0中就已经引入的一个功能,Kafka 一共有两种拦截器:生产者拦截器和消费者拦截器。本节主要讲述生产者拦截器的相关内容。

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。

KafkaProducer 在将消息序列化和计算分区之前会调用生产者拦截器的 onSend() 方法来对消息进行相应的定制化操作。一般来说最好不要修改消息 ProducerRecord 的 topic、key 和 partition 等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改key不仅会影响分区的计算,同样会影响broker端日志压缩(Log Compaction)的功能。

KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement() 方法,优先于用户设定的 Callback 之前执行。这个方法运行在 Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。

close() 方法主要用于在关闭拦截器时执行一些资源的清理工作。

原理分析

在前面的章节中,我们已经了解了 KafkaProducer 的具体使用方法,而本节的内容主要是对 Kafka 生产者客户端的内部原理进行分析,通过了解生产者客户端的整体脉络可以让我们更好地使用它,避免因为一些理解上的偏差而造成使用上的错误。

整体架构

生产者客户端的整体架构

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程Sender 线程(发送线程)

在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能

Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。

主线程中发送过来的消息都会被追加到 RecordAccumulator 的某个双端队列(Deque)中,在 RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是 ProducerBatch,即 Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;Sender 读取消息时,从双端队列的头部读取。注意 ProducerBatch 不是 ProducerRecord,ProducerBatch 中可以包含一至多个 ProducerRecord。

通俗地说,ProducerRecord 是生产者中创建的消息,而 ProducerBatch 是指一个消息批次,ProducerRecord 会被包含在 ProducerBatch 中,这样可以使字节的使用更加紧凑。与此同时,将较小的 ProducerRecord 拼凑成一个较大的 ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。

元数据的更新

我们使用如下的方式创建了一条消息 ProducerRecord

1
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello, kafka!");

我们只知道主题的名称,对于其他一些必要的信息却一无所知。KafkaProducer 要将此消息追加到指定主题的某个分区所对应的 leader 副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者直接指定)目标分区,之后 KafkaProducer 需要知道目标分区的 leader 副本所在的 broker 节点的地址、端口等信息才能建立连接,最终才能将消息发送到 Kafka ,在这一过程中所需要的信息都属于元数据信息

元数据是指 Kafka 集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的 leader 副本分配在哪个节点上,follower 副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。

重要的生产者参数

在 KafkaProducer 中,除了之前提及的3个默认的客户端参数,大部分的参数都有合理的默认值,一般不需要修改它们。不过了解这些参数可以让我们更合理地使用生产者客户端,其中还有一些重要的参数涉及程序的可用性和性能,如果能够熟练掌握它们,也可以让我们在编写相关的程序时能够更好地进行性能调优与故障排查。

  • acks

    这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks 是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。

  • max.request.size

    这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B,即 1MB。一般情况下,这个默认值就可以满足大多数的应用场景了。

  • retries 和 retry.backoff.ms

    retries 参数用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、leader副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries 大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。

    重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。

  • compression.type

    这个参数用来指定消息的压缩方式,默认值为“none”,即默认情况下,消息不会被压缩。

  • connections.max.idle.ms

    这个参数用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟。

  • linger.ms

    这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息 (ProducerRecord) 加入 ProducerBatch 的时间,默认值为 0。生产者客户端会在 ProducerBatch 被填满或等待时间超过 linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。这个 linger.ms 参数与 TCP 协议中的 Nagle 算法有异曲同工之妙。

  • receive.buffer.bytes

    这个参数用来设置 Socket 接收消息缓冲区 (SO_RECBUF) 的大小,默认值为32768(B),即32KB。如果设置为-1,则使用操作系统的默认值。如果 Producer 与 Kafka 处于不同的机房,则可以适地调大这个参数值。

  • send.buffer.bytes

    这个参数用来设置 Socket 发送消息缓冲区 (SO_SNDBUF) 的大小,默认值为 131072(B),即 128KB。与 receive.buffer.bytes 参数一样,如果设置为-1,则使用操作系统的默认值。

  • request.timeout.ms

    这个参数用来配置 Producer 等待请求响应的最长时间,默认值为 30000(ms)。请求超时之后可以选择进行重试。注意这个参数需要比 broker 端参数 replica.lag.time.max.ms 的值要大,这样可以减少因客户端重试而引起的消息重复的概率。

总结

本文主要讲述了生产者客户端的具体用法及其整体架构,主要内容包括配置参数的详解、消息的发送方式、序列化器、分区器、拦截器等。在实际应用中,一套封装良好的且灵活易用的客户端可以避免开发人员重复劳动,也提高了开发效率,还可以提高程序的健壮性和可靠性,而Kafka的客户端正好包含了这些特质。对于 KafkaProducer 而言,它是线程安全的,我们可以在多线程的环境中复用它,而对于消费者客户端 KafkaConsumer 而言,它是非线程安全的,因为它具备了状态,具体怎么使用请看下一篇的内容。

参考

《深入理解Kafka:核心设计与实践原理》-朱忠华

评论