kafkaproducer(kafkaproducer avro)

本篇文章给大家谈谈kafkaproducer,以及kafkaproducer avro对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Kafka Producer

Producer负责向kafka发送消息。Producer客户端如下:

发送消息有三种模式:

生产者需要使用序列化器(Serializer)把对象转换成字节数组才能通过网络发送至kafka,而在对侧,消费者需要使用反序列化器(Deserializer)把kafka中收到的字节数组转换成相应的对象。

分区器的作用是消息分配分区。消息经过序列化之后,需要确定它发往的分区,如果指定了分区partition字段,就不需要分区器的作用了,如果没有指定,就需要根据key字段计算partition值。

kafka提供默认的分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner

实现了接口org.apache.kafka.clients.producer.Partitioner

partition的计算方式:

kafka在消息序列化和计算分区之前调用生产者拦截器onSend()方法对消息进行定制化操作。 主要实现org.apache.kafka.clients.producer.ProducerInterceptor接口。

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和发送线程。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息收集器(RecordAccumulator,也称为消息累加器)中。发送线程负责从消息收集器中获取消息并将其发送到 Kafka 中。

主要用来缓存消息以便发送线程可以批量发送,进而减少网络传输的资源消耗以提升性能。消息收集器缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B,即32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer 的 send() 方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms 的配置,此参数的默认值为60000,即60秒。

主线程中发送过来的消息都会被追加到消息收集器的某个双端队列(Deque)中,在其的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Deque。消息写入缓存时,追加到双端队列的尾部;Sender 读取消息时,从双端队列的头部读取。注意 ProducerBatch 不是 ProducerRecord,ProducerBatch 中可以包含一至多个 ProducerRecord。

通俗地说,ProducerRecord 是生产者中创建的消息,而 ProducerBatch 是指一个消息批次,ProducerRecord 会被包含在薯局让 ProducerBatch 中,这样可以使字节的使用更加紧凑。与此同时,将较小的 ProducerRecord 拼凑成一个较大的 ProducerBatch,也可以减少网络请数局求的次数以提升整体的吞吐量。

如果生产者客户端需要向很多分区发送消息,则可以将 buffer.memory 参数适当调大以增加整体的吞吐量。

ProducerBatch 的大小和 batch.size 参数也有着密切的关系。当一条消息流入消息收集器时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的 ProducerBatch。

在新建 ProducerBatch 时评估这条消息的大小是否超过 batch.size 参数的大小,如果不超过,那么就以 batch.size 参数的大小来创建 ProducerBatch,这样在使用完这段内存区域之腊衫后,可以通过 BufferPool 的管理来进行复用;如果超过,那么就以评估的大小来创建 ProducerBatch,这段内存区域不会被复用。

Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本分区, Deque ProducerBatch 的保存形式转变成 Node, List ProducerBatch 的形式,其中 Node 表示 Kafka 集群的 broker 节点。

对于网络连接来说,生产者客户端是与具体的 broker 节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer 的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。

请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,保存对象的具体形式为 MapNodeId, Deque,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。

与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node 之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.requests.per.connection,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较 Deque 的 size 与这个参数的大小来判断对应的 Node 中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。

Kafka系列之(4)——Kafka Producer流程解析

Kafka 0.9版本正式使用Java版本的producer替换了原Scala版本的producer。

注:ProducerRecord允许用户在创建消息对象的时候就直接指定要发送的分区,这样producer后续发送该消息时可以直接发送到指定分区,而不用先通过Partitioner计算目标分区了。另外,我们还可以直接指定消息的时间戳——但一定要慎重使用这个功能,因为它有可能会令时间戳索引机制失效。

流程描述:

用户首先构建待发送的消息对象ProducerRecord,然后调用KafkaProducer#send方法进行发送。KafkaProducer接收到消息后首先对其进行序列化,然后结合本地缓存的元数据信息一起发送给partitioner去确定目标分区,最后追加写入到内存中的消息缓冲池(accumulator)。此时KafkaProducer#send方法成功返回。同时,KafkaProducer中还有一个专门的耐乎Sender IO线程负责将缓冲池中的消息分批次发送给对应的broker,完成真正的消息发送逻辑。

新版本的producer从设计上来说具有以下几个特点:

总共创建两个线程:执行KafkaPrducer#send逻辑的线程——我们称之为“用户主线程”;执行发送逻辑的IO线程——我们称之为“Sender线程”。

不同于Scala老版本的producer,新版本producer完全异步发送消息,并提供了回调机制(callback)供用户判断消息是否成功发送。

batching机制——“分批发送“机制。每个批次(batch)中包含了若干个PRODUCE请求,因此具有更高的吞吐量。

更加合理的默认分区策略:对于无key消息而言,Scala版本分区策略是一段时间内(默认是10分钟)将消息发往固定的目标分区,这容易造成消息分布的不均匀,而新版本的producer采用轮询的方式均匀地将消息分发到不同的分区。

底层统一使用基于Selector的网络客户端实现,结合Java提供的Future实昌毕悉现完整地提供了更加健壮和优雅的生命周期管理。

关键参数

batch.size 我把它列在了首位,因为该参数对于调优producer至关重要。之前提到过新版producer采用分批发送机制,该参数即控制一个batch的大小。默认是16KB

acks 关乎到消息持久数铅性(durability)的一个参数。高吞吐量和高持久性很多时候是相矛盾的,需要先明确我们的目标是什么? 高吞吐量?高持久性?亦或是中等?因此该参数也有对应的三个取值:0, -1和1

linger.ms 减少网络IO,节省带宽之用。原理就是把原本需要多次发送的小batch,通过引入延时的方式合并成大batch发送,减少了网络传输的压力,从而提升吞吐量。当然,也会引入延时

compression.type producer 所使用的压缩器,目前支持gzip, snappy和lz4。压缩是在用户主线程完成的,通常都需要花费大量的CPU时间,但对于减少网络IO来说确实利器。生产环境中可以结合压力测试进行适当配置

max.in.flight.requests.per.connection 关乎消息乱序的一个配置参数。它指定了Sender线程在单个Socket连接上能够发送未应答PRODUCE请求的最大请求数。适当增加此值通常会增大吞吐量,从而整体上提升producer的性能。不过笔者始终觉得其效果不如调节batch.size来得明显,所以请谨慎使用。另外如果开启了重试机制,配置该参数大于1可能造成消息发送的乱序(先发送A,然后发送B,但B却先行被broker接收)

retries 重试机制,对于瞬时失败的消息发送,开启重试后KafkaProducer会尝试再次发送消息。对于有强烈无消息丢失需求的用户来说,开启重试机制是必选项。

当用户调用KafkaProducer.send(ProducerRecord, Callback)时Kafka内部流程分析:

这是KafkaProducer#send逻辑的第一步,即为待发送消息进行序列化并计算目标分区,如下图所示:

如上图所示,一条所属topic是"test",消息体是"message"的消息被序列化之后结合KafkaProducer缓存的元数据(比如该topic分区数信息等)共同传给后面的Partitioner实现类进行目标分区的计算。

producer创建时会创建一个默认32MB(由buffer.memory参数指定)的accumulator缓冲区,专门保存待发送的消息。除了之前在“关键参数”段落中提到的linger.ms和batch.size等参数之外,该数据结构中还包含了一个特别重要的集合信息:消息批次信息(batches)。该集合本质上是一个HashMap,里面分别保存了每个topic分区下的batch队列,即前面说的批次是按照topic分区进行分组的。这样发往不同分区的消息保存在对应分区下的batch队列中。举个简单的例子,假设消息M1, M2被发送到test的0分区但属于不同的batch,M3分送到test的1分区,那么batches中包含的信息就是:{"test-0" - [batch1, batch2], "test-1" - [batch3]}。

单个topic分区下的batch队列中保存的是若干个消息批次。每个batch中最重要的3个组件包括:

compressor: 负责执行追加写入操作

batch缓冲区:由batch.size参数控制,消息被真正追加写入到的地方

thunks:保存消息回调逻辑的集合

这一步的目的就是将待发送的消息写入消息缓冲池中,具体流程如下图所示:

这一步执行完毕之后理论上讲KafkaProducer.send方法就执行完毕了,用户主线程所做的事情就是等待Sender线程发送消息并执行返回结果了。

此时,该Sender线程登场了。严格来说,Sender线程自KafkaProducer创建后就一直都在运行着 。它的工作流程基本上是这样的:

不断轮询缓冲区寻找 已做好发送准备的分区 ;

将轮询获得的各个batch按照目标分区所在的leader broker进行分组;

将分组后的batch通过底层创建的 Socket连接 发送给各个broker;

等待服务器端发送response回来。

为了说明上的方便,我还是基于图的方式来解释Sender线程的工作原理:

上图中Sender线程会发送PRODUCE请求给对应的broker,broker处理完毕之后发送对应的PRODUCE response。一旦Sender线程接收到response将依次(按照消息发送顺序)调用batch中的回调方法,如下图所示:

refer:

Kafka: Producer不调用close方法

producer,不论是简单版,改变分区策略版,自定义Interceptor版,自定义Partitioner版,返回值 Future 对象的同步发送版,都需要最后调用 close 方法发送消息。不写 producer.close() : 没有数据:

源码中注释说 close 方法会一直阻塞 main 线程,直到之前的所有发送请求全部完成。

close() 时: 关闭 producer 对象,主要操作是设置 close 标志,等待 RecordAccumulator 中的消息清空,关闭 sender 线程

没写 close 方法 consumer/broker 那边没有收到消息, 是因斗山链为消息压根就没有发出去所以在topic中始终没有读到消息。程序执行很快, close 方法会一直阻塞,直到之前的所有发送请求全部完成, 如果没有 producer.close() , JVM直接退出,不阻塞,所以没有消息。

不写 close 方法但是空孙 tsleep(100) : 给了足够时间让 sender 线程发送完成所有消息,所以有数据。测试了一下, tsleep(15) 15ms都还可以发送完成所有消息,但是 tsleep(10) 就不行了.

另外, //producer.close() 注释与否其实可以看出, producer.close() 调用了各个 interceptor 中的 close 方法。

实际生产中,据我19年夏天实习的唯宏经历,根本就不会 producer.close , 因为producer生产消息也是跟consumer类似源源不断的,如果非要关就放在 try...catch...finally 的 finally 中

[img]

关于kafkaproducer和kafkaproducer avro的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表