kafkatools(kafkatools怎么看那个top数据较大)

本篇文章给大家谈谈kafkatools,以及kafkatools怎么看那个top数据较大对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

kafka如何查看日志量

我们在使用kafka的过程中有时候可以需要查看我们生产的消息的各种信息,这些都是被记录在卡夫卡的log文李圆件中的。由于log文件的特殊格式,我们是无法直接查看log文件中的信息的。本文提供一下方式查看kafka的log文件中所记录的信息源扰脊。

执行命令

bin/kafka-run-class.sh kafka.tools.DumpLogSegments

我们可以看到都需要哪些参数,根据雹渗不同的需求我们可以选择不同的参数。

执行如下命令查看某个log文件

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test3-0/00000000000000000000.log  --print-data-log

这里 --print-data-log 是表示查看消息内容的,不加此项是查看不到详细的消息内容。如果要查看多个log文件可以用逗号分隔。

Kafka简介+Kafka Tool使用简介+使用实例

详细安装访问:

macOS 可以用homebrew快速安装,访问地址:

原文链接:

查看topic列表:

创建topic:

--create :创建命令;

--topic :后面指定topic名称;

--replication-factor :后面指定副本数;

--partitions :指定分区数,根据broker的数量决定;

--zookeeper :后面指定 zookeeper.connect 的zk链接

查看某个topic:

Kafka 作为消息雹岩烂系统的一种, 当然可 以像其他消 息中 间件一样作为消息数据中转的平台。 下面以 Java 语言为例,看一下如何使用 Kafka 来发送和接收消息。

1、引入依赖

2、消息生产者

示例 中用 KafkaProducer 类来创建一个消息生产者,该类的构造函数入参是一系列属性值。下面看一下这些属性具体都是什么含义。

bootstrap.servers 表示 Kafka 集群 。 如果集群中有多台物理服务器,则服务器地址之间用逗号分隔, 比如” 192.168.1.1 :9092,192.168.1.2:9092” 。 localhost 是笔者电脑的地址,9092 是 Kafka 服务器默认监听的端口号。

key.serializer 和 value.serializer 表示消息的序列化类型 。 Kafka 的消息是以键值对的形式发送到 Kafka 服务器的,在消息被发送到服务器之前,消息生产者需要把不同类型的 消息序列化为 二 进制类型,示例中是枣明发送文本消息到服务器 , 所以使用的是StringSerializer。

key.deserializer 和 value.deserializer 表示消息的反序列化类型。把来自 Kafka 集群的二进制消 息反序列 化 为指定 的 类型,因为序列化用的是String类型,所以用StringDeserializer 来反序列化。

zk.connect 用于指定 Kafka 连接 ZooKeeper 的 URL ,源漏提供了基于 ZooKeeper 的集群服务器自动感知功能, 可以动态从 ZooKeeper 中读取 Kafka 集群配置信息。

有 了 消息生产者之后 , 就可以调用 send 方法发送消息了。该方法的入参是 ProducerRecord类型对象 , ProducerRecord 类提供了多种构造函数形参,常见的有如下三种 :

ProducerRecord(topic,partition,key,value);

ProducerRecord(topic,key,value);

ProducerRecord(topic, value) ;

其中 topic 和 value 是必填的, partition 和 key 是可选的 。如果指定了 pa时tion,那么消息会被发送至指定的 partition ;如果没指定 partition 但指定了 Key,那么消息会按照 hash(key)发送至对应的 partition: 如果既没指定 partition 也没指定 key,那么 消息会按照 round-robin 模式发送(即以轮询的方式依次发送〉到每一个 partition。示例中将向 test-topic 主题发送三条消息。

3、消息消费者

和消息生产者类似,这里用 KafkaConsumer 类来创建一个消息消费者,该类的构造函数入参也是一系列属性值。

bootstrap. servers 和生产者一样,表示 Kafka 集群。

group.id 表示消费者的分组 ID。

enable.auto.commit 表示 Consumer 的 offset 是否自 动提交 。

auto.commit.interval .ms 用于设置自动提交 offset 到 ZooKeeper 的时间间隔,时间单位是毫秒。

key. deserializer 和 value.deserializer 表示用字符串来反序列化消息数据。

消息消费者使用 subscribe 方法 订阅了 Topic 为 test-topic 的消息。 Consumer 调用poll 方法来轮询 Kafka 集群的消息, 一直等到 Kafka 集群中没有消息或达到超时时间(示例中设置超时时间为 100 毫秒)为止 。 如果读取到消息,则打印出消息记录的 pa此ition, offset、key 等。

[img]

Kafka1.0.0如何查看队列消费进度

按照网上的方法尝试了./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper **:2181 --group ** --topic **

发现并不管用,查看了kafka 的jar包,貌似ConsumerOffsetChecker 类已经被移除,上github上看了芦让判下kafka的源码,告知我:

不管是kafka-consumer-offset-checker.sh脚本还是run-class.sh kafka.tools.ConsumerOffsetChecker都已经失效,可以使用kafka-consumer-groups.sh。

使用如下配置:

当然,这种方法是针对高级级kafka读取api,因为高级滑丛级api需要按照group中的consumer按照offset来读取数据,并且所有维护offset信息陪改都保存在zookeeper中。但是低级api不需要group,offset由consumer自行维护,所以针对这类topic用户是没法使用这个命令读取结果的。

kafka查询和修改topic的offset

输出

输出

注意如果你的kafka设置了zookeeper root,比如为/kafka,那么命令应该改为:

重启相关的应用程序,就可以从设置的offset开始读数据了。

手动更新Kafka存在Zookeeper中的偏移量。我们有时候需要手动将某个主题的偏移量设置成某个埋碰碧值,弯举这时候我们就需要更新Zookeeper中的数吵腊据了。Kafka内置为我们提供了修改偏移量的类:kafka.tools.UpdateOffsetsInZK,我们可以通过它修改Zookeeper中某个主题的偏移量,具体操作如下:

在不输入参数的情况下,我们可以得知kafka.tools.UpdateOffsetsInZK类需要输入的参数。我们的consumer.properties文件配置内容如下:

这个工具只能把Zookeeper中偏移量设置成earliest或者latest,如下:

关于kafkatools和kafkatools怎么看那个top数据较大的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表