kafkaconsumer(kafkaconsumerpoll)

本篇文章给大家谈谈kafkaconsumer,以及kafkaconsumerpoll对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Kafka Consumer Offset解析

Kafka __consumer_offsets是一个特殊的存储元数据的Topic

数据格式可以想象成一个 KV 格式的消息,key 就是一个三元组:group.id+topic+分区号,而 value 就是 offset 的值。

查看方式:使用kafka自带的读取类

./bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 01 --bootstrap-server xxx:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --from-beginning --max-messages 30

一般情况下, 使用 OffsetsMessageFormatter 打印的格式可以概括为:衫燃蠢

"[%s,%s,%d]::[OffsetMetadata[%d,%s],CommitTime %d,ExpirationTime %d]".format(group, topic, partition, offset, metadata, commitTimestamp, expireTimestamp)

数据内容:

[flink-payment-alert_query_time_1576066085229,payment-result-count,4]::NULL

[flink-payment-alert_query_time_1576066085229,payment-result-count,3]::NULL

[flink-payment-alert_query_time_1576066085229,payment-result-count,9]::NULL

另段局外一种是

[work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.core.sub,work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.topic,0]::OffsetAndMetadata(offset=19, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1636939024066, expireTimestamp=None)

[work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.core.sub,work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.topic,0]::OffsetAndMetadata(offset=19, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1636939028621, expireTimestamp=None)

[work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.core.sub,work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.topic,0]::OffsetAndMetadata(offset=19, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1636939033680, expireTimestamp=None)

还有一种是或陪

[ProcessEngineBusinessProcess,CasBusinessTopic,1]::[OffsetMetadata[99649027,NO_METADATA],CommitTime 1636930671854,ExpirationTime 1637017071854]

[ProcessEngineBusinessProcess,CasBusinessTopic,0]::[OffsetMetadata[99650360,NO_METADATA],CommitTime 1636930671854,ExpirationTime 1637017071854]

[ProcessEngineBusinessProcess,CasBusinessTopic,3]::[OffsetMetadata[99640798,NO_METADATA],CommitTime 1636930672471,ExpirationTime 1637017072471]

分别解释一下:

在 Kafka 中有一个名为“delete-expired-group-metadata”的定时任务来负责清理过期的消费位移,这个定时任务的执行周期由参数 offsets.retention.check.interval.ms 控制,默认值为600000,即10分钟。这和普通的topic的不太一样

还有 metadata,一般情况下它的值要么为 null 要么为空字符串,OffsetsMessageFormatter 会把它展示为 NO_METADATA,否则就按实际值进行展示。

看一下源码里这些类的结构

case class OffsetAndMetadata(offsetMetadata: OffsetMetadata,

commitTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,

expireTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) {

case class OffsetMetadata(offset: Long, metadata: String = OffsetMetadata.NoMetadata) {

override def toString = "OffsetMetadata[%d,%s]"

.format(offset,

if (metadata != null metadata.length 0) metadata else "NO_METADATA")

}

@Deprecated

public static final long DEFAULT_TIMESTAMP = -1L; // for V0, V1

另外0.11.0之后对应的数据格式版本是V2,这个版本的消息相比于v0和v1的版本而言改动很大,同时还参考了Protocol Buffer而引入了变长整型(Varints)和ZigZag编码。

另外:

offset为什么会有墓碑消息?

因为offset本身也会过期清理.受offsets.retention.minutes 这个配置的影响

看下官网介绍

After a consumer group loses all its consumers (i.e. becomes empty) its offsets will be kept for this retention period before getting discarded. For standalone consumers (using manual assignment), offsets will be expired after the time of last commit plus this retention period.

当group里的consumer全部下线后过offsets.retention.minutes 时间后offset就会被删除

val OffsetsRetentionMinutes: Int = 7 * 24 * 60 // 默认7天

默认2.0之前是1天,2.0及以后是7天 这个官方真是..要么就改为2天,结果直接改为7天,改动不可谓不大,而且active的group不会过期

附:

另外active的group无法修改consumer offset?

Usually we do not allow committed offset changes while a group is active because we do not have a mechanism to notify the group of the change.

原因是无法通知到组成员consumer offset的变更

kafka consumer offset机制

kafka消息在分区中是按序一条一条存储的,假如分区中有10条消息,位移就是0-9,

consumer消费了5条消息,那么offset就是5,指向了下一条要消费桐宏或的记录,consumer

需要向kafka汇报自己的位移数据,因为consumer是能够消费多个分区的,所以offset

的粒度是分区,consumer需要为分配给他的各分区分别提交offset信息。

从用户的角度来说,位移提交分为自动提交和手动提交,在consumer的角度来说,位移

分为同步提交和异步提交。

kafka内部有个topic叫 ‘__consumer_offsets’,offset提交就是往这个topic发送一条消息,

消息格式是key value形式,key是由 groupId、主题名、分区号组成,消息体是位移值

及用户自定义数据和时间戳等。还有2种特殊的格式,一种是用于保存 Consumer Group 

信息的消息,用于注册group,另一种是 用于删除 Group 过期位移和删除 Group 的消息。

当kafka集群种第一台consumer启动时,便会创建__consumer_offsets主题,默认50个

分区和3个副本。

当提交方式是自动提交时,就算是当前consumer的offset已经不更新,kafka还是会自动

定期的往__consumer_offsets发送位移消息,所以得对位移主题的消息做定期删除,

假如对于同一个key有2条A和B,A早于B发送,那么A就是属于过期消息。

compact有点类似jvm gc的标记-整理,把过期消息删掉,把剩下的消息排列在一起

Kafka 提供了专门的后台线程定期地巡检待Compact 的主题,看看是否存在满足条件的

可删除数据,这个线程叫Log Cleaner,当我们发现位移主题日志过多的时候,可以

检查一下是否是这个线程挂了导致的

enable.auto.commit 默认即是true,

auto.commit.interval.ms 默认是5秒,表示kafka每5秒自动提交一次位移信息。

自动提交会有消息重复消费的问题,因为他是间隔时间提交一次,假如在间隔期间,

发生了Rebalance ,在Rebalance 之后所有的消费者必须从当前最新的offset开始

继续消费,那么上一次自动提交到Rebalance 的这段时间消费的数据的位移并没有

提交,所以会重复消费,即时我们通过减少 auto.commit.interval.ms 的值来提高提交频率,

那也仅仅是缩小了重复消费的时间窗口,所以我们看看能不能通过手动提交来避免重复消费。

commitSync()是consumer的同步api,手动提交的好处自然是我们可以控制提交的时机

和频率,由于是同步api,是会阻塞至broker返回结果才会结束这个阻塞状态,对于系统

而言,自然不想发生这种不是由于资源的限制导致的阻塞。

commitAsync()是consumer的异步api,commitAsync()不会阻塞,因此不会影响consumer的

tps,但是他的问题在于他无法重试,因为是异步提交,当因为网络或者系统资源阻塞

导致提交失败,那么他重试的时候,在这期间,consumer可能已经消费好多条消息

并且提交了,所以此时的重试提交的offset已然不是最新值绝戚了并没有意义,我们可以通过

异步和同局伍步提交相结合,我们使用同步提交来规避因为网络问题或者broker端的gc导致的

这种瞬时的提交失败,进而通过重试机制而提交offset,使用异步提交来规避提交时的阻塞

前面的commitSync()和commitAsync(),都是consumer poll消息,把这些消息消费完,

再提交最新的offset,如果poll的消息很多呢?消费时间较长,假如中间系统宕机,岂不是

得从头再来一遍,所以kafka提供分段提交的api

commitSync(MapTopicPartition, OffsetAndMetadata) 

 commitAsync(MapTopicPartition, OffsetAndMetadata)

假设我们poll了一秒钟的数据,有5000条,我们可以通过计数器,累计到100条,

便通过分段提交api向kafka提交一次offset。

Kafka Consumer自动提交机制

Consumer消费消息之后不需要手动提交,consumer客户端会自动提交已经消费的消息的offset。

调用栈为:

KafkaConsumer#assign

ConsumerCoordinator#maybeAutoCommitOffsetsNow

调用栈为:

KafkaConsumer#poll

KafkaConsumer#pollOnce

ConsumerCoordinator#poll

ConsumerCoordinator#maybeAutoCommitOffsetsAsync

调用栈为:

KafkaConsumer#poll

KafkaConsumer#pollOnce

ConsumerCoordinator#poll

AbstractCoordinator#ensureActiveGroup

AbstractCoordinator#joinGroupIfNeeded

AbstractCoordinator#onJoinPrepare

ConsumerCoordinator#maybeAutoCommitOffsetsSync

只要开启了自动提交,此处是一定会向协调者同步提交位移,因为需要重新rebalance,消费者组中各个消费者的分区既有可能会发生改变,重新消费之前一定要获取最新的唯一仿肆,备和轿尽最大努力避免重复消费。

调用栈为:

KafkaConsumer#close

ConsumerCoordinator#close

关闭的时棚肆候肯定是要同步提交消费位移的。

consumer(KafkaConsumer)

(一)消费者和消费者组

1、消费者:订阅并消费kafka消息,从属于消费者组

2、消费者组:一个群组里的消费者订阅的是同一个主题,每个消费者接受主题一部分分区的消息。

注:同一个消费者可以消费不同的partition,但是同一个partition不能被不同消费者消清毁笑费。

(二)消费者群组和分区再均衡

1、再均衡:分区的消费所有权从一个消费者转移到另一个消费者称为再均衡,为消费者组带来了答含高可用性和可伸缩性。

注:分区何时重新分配:加入消费者或者消费者崩溃等

2、如何判断消费者崩溃:消费者通过向群组协调器(某broker,不同群组可以有不同的群组协调器)发送心跳(一般在拉取消息或者提交偏移量的时候)表示自己仍旧存活,如果长时间不发送心跳则协调器认为期死亡并进行再均衡。

注:在0.10.1版本中,心跳行为不再和获取消息和提交偏移余薯量绑定在一起,有一个单独的心跳线程。

3、分配分区:消费者加入消费者组是,会像群组协调器发送请求,第一个加入的成为“群主”。群主从协调器那里获取成员列表,并负责给每一个消费者分配分区。完毕之后,将分配结果发送给协调器,协调器再将消息发送给所有的消费者,每个消费者只能看到自己的分配信息。只有群主知道所有的消费信息。

(三)参数配置

1、bootstrap.server:host:port

2、key.serializer:键序列化器

3、value.serializer:值序列化器

注:以上为必须设置的

4、group.id:从属的消费者组

5、fetch.min.bytes:消费者从服务器获取记录的最小字节数。

6、fetch.max.wait.ms:消费者等待消费消息的最大时间

7、max.partition.fetch.bytes:服务器从每个分区返回给消费者的最大字节数(需要比broker的设置max.message.size属性配置大,否则有些消息无法消费)

8、session.timeout.ms:指定该消费者在被认为死亡之前可以与服务器断开连接的时间,默认3秒

9、heartbeat.interval.ms:制定了poll方法向协调器发送心跳的频率。

注:一般9是8的三分之一

10、auto.offset.reset:消费者在读取一个没有偏移量分区或者无效偏移量分区的情况下如何处理(latest:从最新记录开始读取,earliest:从最早的记录开始读取)

11.、enable.auth.commit:消费者是否自动提交偏移量,默认为true

12、auto.commit.interval.ms:自动提交偏移量的时间间隔

13、partition.assignment.strategy:分区分配给消费者的策略:

(1)range:会把主题若干个连续分区分配给消费者

(2)roundRobin:会把主题的所有分区逐个分配给消费者

14、client.id:任意字符串,broker用来区分客户端发来的消息

15:max.poll.records:控制poll方法返回的最大记录数

16:receive.buffer.bytes/send.buffer.bytes:tcp缓冲池读写大小

(四)订阅主题

consumer.subscribe(list)

(五)轮训(消费者API的核心)

1、轮训作用: 只要消费者订阅了主题,轮训就会处理所有的细节(群组协调、分区再均衡、发送心跳、获取数据)

(1)获取数据

(2)第一次执行poll时,负责查找协调器,然后加入群组,接受分配的分区

(3)心跳的发送

(4)再均衡也是在轮训期间进行的

2、方法:poll(),消费者缓冲区没有数据时会发生阻塞,可以传一个阻塞时间,避免无限等待。0表示立即返回。

3、关闭:close(),网络连接随之关闭,立即触发再均衡。

4、线程安全:无法让一个线程运行多个消费者,也无法让多个线程公用一个消费者。

(六)提交和偏移量

1、提交:更新分区当前位置的操作

2、如何提交:消费者往一个特殊主题(_consumer_offset)发送消息,消息中包含每个分区中的偏移量。

3、偏移量:分区数据被消费的位置。

4、偏移量作用:当发生再均衡时,消费者可能会分配到不一样的分区,为了继续工作,消费者需要读取到每个分区最后一次提交的偏移量,然后从偏移量的地方继续处理。

5、提交偏移量的方式

(1)自动提交:经过一个时间间隔,提交上一次poll方法返回的偏移量。每次轮训都会检测是否应该提交偏移量。缺陷:可能导致重复消费

(2)手动提交:commitSysn()提交迁移量,最简单也最可靠,提交由poll方法返回的最新偏移量。缺点:忘了提交可能会丢数据,再均衡可能会重复消费

(3)异步提交:同步提交在提交过程中必须阻塞

(4)同步异步提交组合

(5)提交特定的偏移量

(七)再均衡监听器

(八)从特定偏移量读取数据(seek)

1、从分区开始:seekToBegining

2、从分区结束:seekToEnd

3、ConsumerRebalanceListener和seek结合使用

(九)如何退出

1、前言:wakeup方法是唯一安全退出轮训的方法,从poll方法中退出并抛出wakeupException异常。如果没有碰上轮训,则在下一次poll调用时抛出。

2、退出轮训

(1)另一个线程调用consumer.wakeup方法

(2)如果循环在主线程里可以在ShutdownHook里面调用该方法

3、退出之前调用close方法:告知协调器自己要离开,出发再均衡,不必等到超时。

(十)独立消费者(assign为自己分配分区)

[img]

Kafka 源码解析之 Consumer 两种 commit 机制和 partition 分配机制

先看下两种不同的 commit 机制,一种是同步 commit,一种是异步 commit,既然其作用都是 offset commit,应该不难猜到它们底层使用接口都是一样的

同步 commit

同步 commit 的实现方式,client.poll() 方法会阻塞直到这个request 完成或超时才会返回。

异步 commit

而对于异步的 commit,最后调用的都是 doCommitOffsetsAsync 方法,其具体实现如下:

在异步 commit 中,可以添加相应的回调函数,如果 request 处理成功或处理失败,ConsumerCoordinator 会通过 invokeCompletedOffsetCommitCallbacks() 方法唤醒相应的回调函数。

关键区别在于future是否会get,同步提交就是future会get.

consumer 提供的两种不同 partition 分配策略,可以通过 partition.assignment.strategy 参数进行配置,默认情况下使用的是 org.apache.kafka.clients.consumer.RangeAssignor,Kafka 中提供另一种 partition 的分配策略 org.apache.kafka.clients.consumer.RoundRobinAssignor

用户可以自定义相应的 partition 分配机制,只需要继承这个 AbstractPartitionAssignor 抽象类即可。

AbstractPartitionAssignor

AbstractPartitionAssignor 有一个抽象方法,如下所示:

assign() 这个方法,有两个参数:

RangeAssignor 和 RoundRobinAssignor 通过这个方法 assign() 的实现,来进行相应的 partition 分配。

直接看一下这个方法的实现:

假设 topic 的 partition 数为 numPartitionsForTopic,group 中订阅这个 topic 的 member 数为 consumersForTopic.size(),首先需要算出两个值:

分配的陪巧规则是:对于剩下的那些 partition 分配到前 consumersWithExtraPartition 个 consumer 上,也就是前 consumersWithExtraPartition 个 consumer 获得 topic-partition 列表会比后面多一个。

在上述的程序中,举了一个例子,假设有一个 topic 有 7 个 partition,group 有5个 consumer,这个5个 consumer 都订阅这个 topic,那么 range 的分配方式如下:

而如果 group 中有 consumer 没有订阅这个 topic,那么这个 consumer 将不会参与分配。下面再举个例子,将有两个 topic,一个 partition 有5个,一个孝银 partition 有7个,group 有5个 consumer,但是只有前3个订阅第一个 topic,而另一个 topic 是所有 consumer 都订阅了,那么其分配结果如下:

这个是 roundrobin 的实现,其实现方法如下:

roundrobin 的实现原则,简单来说就是:列出所有 topic-partition 和列出所有的 consumer member,然后开始分配,一轮之后继续下一轮,假设有有一个 topic,它有7个 partition,group 有3个 consumer 都订阅了这个 topic,那么其分配方式为:

对于多个 topic 的订阅,将有两个 topic,一个 partition 有5个,一个 partition 有7个,group 有5个 consumer,但是芦慎键只有前3个订阅第一个 topic,而另一个 topic 是所有 consumer 都订阅了,那么其分配结果如下:

roundrobin 分配方式与 range 的分配方式还是略有不同。

关于kafkaconsumer和kafkaconsumerpoll的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表