kafka消费积压(kafka消费积压shell监控)
本篇文章给大家谈谈kafka消费积压,以及kafka消费积压shell监控对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
如何删除kafka积压数据
Kafka删除数据有两种方式
按照时间,超过一段时间后删除过期消息
按照消息大小,消息数配山量超过一定大小后删除最旧的数据
Kafka删除数据的最小单位:segment
Kafka删除数据主逻辑:kafka源码
def cleanupLogs() { debug("Beginning log cleanup...") var total = 0 val startMs = time.milliseconds for(log - allLogs; if !log.config.compact) { debug("Garbage collecting '" 差没+ log.name + "'") total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log) } debug("Log cleanup completed. " + total + " files deleted in " + (time.milliseconds - startMs) / 1000 + " seconds") }
Kafka一段时间(配置文件设置)调用一次 cleanupLogs,删除所有应该删除的日志数据。
cleanupExpiredSegments 负责清理超时的数据
private def cleanupExpiredSegments(log: Log): Int = { val startMs = time.milliseconds log.deleteOldSegments(startMs - _.lastModified log.config.retentionMs) }
cleanupSegmentsToMaintainSize 负责清理超过大小的数据
brprivate def cleanupSegmentsToMaintainSize(log: Log): Int = { 虚卖纳 if(log.config.retentionSize 0 || log.size log.config.retentionSize) return 0 var diff = log.size - log.config.retentionSize def shouldDelete(segment: LogSegment) = { if(diff - segment.size = 0) { diff -= segment.size true } else { false } } log.deleteOldSegments(shouldDelete) }
[img]kafka出现若干分区不消费的现象
近日,有用户反馈kafka有topic出现某个消费组消费的时候,搭缓启有几个分区一直不消费消息,消息一直积压(图1)。除了一直积压外,还有一个现象就是消费组一直在重均衡,大约每5分钟就会重均衡一次。具体表现为消费分区的owner一直在改变(图2)。
业务侧没有报错,同时kafka服务端日志也一切正常,同事先将消费组的机器滚动重启,仍然还是那几个分区没有消费,之后将这几个不消费的分区迁移至别的broker上,依然没有消费。
还有一个奇怪的地方,就是每次重均衡后,不消费的那几个分区的消费owner所在机器的网络都有流量变化。按理说不消费应该就是拉取不到分区不会有流量的。于是让运维去拉了下不消费的consumer的jstack日志。一知如看果然发现了问题所在。
让业务方去查证业务日志,验证了积压的这几个分区,总是在循环的拉取同一批消息。哪皮
临时解决方法就是跳过有问题的消息,将offset重置到有问题的消息之后。本质上还是要业务侧修改业务逻辑,增加超时或者异常处理机制,最好不要采用自动提交offset的方式,可以手动管理。
查看kafka积压情况
./kafka-consumer-groups.sh --bootstrap-server ip:9092 --group consume_group --describe
kafk消费端连错误主题报错
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {qukan_log_v3-198=2289560518} 报错原因:当消费者消费offset大于或小于当前kafka集群的offset值时,消费会报错(比如场景一:一个consumer group消费某topic,当consumer group间隔几天不消费,Kafka内部数据会自动清除之前的数据,程序再次启动时,会找之前消费到的offset进行消费,此时,若Kafka已经删除此offset值,就会产生此报错。场景二:consumer group消费一直有积压,topic保留时间为1hour,当积压的数据已经被删除,消费到被删除的数据时,会出现找不到offset情况,然后报此错误)。 解决办法:换个groupid进行消费或者解决积压问题 2、报错信息: kafka: error while consuming qukan_client_collect_cmd_8037_v3/23: lz4: invalidheaderchecksum: got 1a; expected 82 原因:sarama包版本太低,不能解压缩lz4 解决办法: config := sarama.NewConfig() config.Version = sarama.V2_1_0_0 (换成对应的Kafka版本) 3、报错信息: kafka server: The client is not authorized to access this topic. 原因:带acl认证的Kafka未授慧宏巧权前键 4、报错信息: the compression code is invalid or its codec has not been imported kafka-go 原因:当用户用kafka-go消费topic时,consumer不能自绝敏动解压缩。因此加上下面代码就能解决; 解决办法: lz4.NewCompressionCodec() // 加上这行 r := kafka.NewReader(kafka.ReaderConfig{
kafka manager管理服务问题——积压假现象
关于kafka-manager管理kafka集群,使用后,出现了一件比较刺激的事情。积压假象,消费队列观察队列消费是持续积压,但服务正常。消费服务也正常处理,多维度查看消费服务后,确悔笑定消费者没蠢含有问题,但是对于管理的kafka队列并没有下降,而是在持续增加积压数据。大胆的对kafka-manager管理服带前笑务进行重启,再次观察,发现数据并为积压。但还是不敢掉以轻心,毕竟刚启动数据只是正在统计,持续观察后,发现数据确实未出现增加。
关于kafka消费积压和kafka消费积压shell监控的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。