kafka查看topic列表(查看kafka的topic列表)

本篇文章给大家谈谈kafka查看topic列表,以及查看kafka的topic列表对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Kafka简介+Kafka Tool使用简介+使用实例

详细安装访问:

macOS 可以用homebrew快速安装,访问地址:

原文链接:

查看topic列表:

创建topic:

--create :创建命令;

--topic :后面指定topic名称;

--replication-factor :后面指定副本数;

--partitions :指定分区数,根据broker的数量决定;

--zookeeper :后面指定 zookeeper.connect 的zk链接

查看某个topic:

Kafka 作为消息雹岩烂系统的一种, 当然可 以像其他消 息中 间件一样作为消息数据中转的平台。 下面以 Java 语言为例,看一下如何使用 Kafka 来发送和接收消息。

1、引入依赖

2、消息生产者

示例 中用 KafkaProducer 类来创建一个消息生产者,该类的构造函数入参是一系列属性值。下面看一下这些属性具体都是什么含义。

bootstrap.servers 表示 Kafka 集群 。 如果集群中有多台物理服务器,则服务器地址之间用逗号分隔, 比如” 192.168.1.1 :9092,192.168.1.2:9092” 。 localhost 是笔者电脑的地址,9092 是 Kafka 服务器默认监听的端口号。

key.serializer 和 value.serializer 表示消息的序列化类型 。 Kafka 的消息是以键值对的形式发送到 Kafka 服务器的,在消息被发送到服务器之前,消息生产者需要把不同类型的 消息序列化为 二 进制类型,示例中是枣明发送文本消息到服务器 , 所以使用的是StringSerializer。

key.deserializer 和 value.deserializer 表示消息的反序列化类型。把来自 Kafka 集群的二进制消 息反序列 化 为指定 的 类型,因为序列化用的是String类型,所以用StringDeserializer 来反序列化。

zk.connect 用于指定 Kafka 连接 ZooKeeper 的 URL ,源漏提供了基于 ZooKeeper 的集群服务器自动感知功能, 可以动态从 ZooKeeper 中读取 Kafka 集群配置信息。

有 了 消息生产者之后 , 就可以调用 send 方法发送消息了。该方法的入参是 ProducerRecord类型对象 , ProducerRecord 类提供了多种构造函数形参,常见的有如下三种 :

ProducerRecord(topic,partition,key,value);

ProducerRecord(topic,key,value);

ProducerRecord(topic, value) ;

其中 topic 和 value 是必填的, partition 和 key 是可选的 。如果指定了 pa时tion,那么消息会被发送至指定的 partition ;如果没指定 partition 但指定了 Key,那么消息会按照 hash(key)发送至对应的 partition: 如果既没指定 partition 也没指定 key,那么 消息会按照 round-robin 模式发送(即以轮询的方式依次发送〉到每一个 partition。示例中将向 test-topic 主题发送三条消息。

3、消息消费者

和消息生产者类似,这里用 KafkaConsumer 类来创建一个消息消费者,该类的构造函数入参也是一系列属性值。

bootstrap. servers 和生产者一样,表示 Kafka 集群。

group.id 表示消费者的分组 ID。

enable.auto.commit 表示 Consumer 的 offset 是否自 动提交 。

auto.commit.interval .ms 用于设置自动提交 offset 到 ZooKeeper 的时间间隔,时间单位是毫秒。

key. deserializer 和 value.deserializer 表示用字符串来反序列化消息数据。

消息消费者使用 subscribe 方法 订阅了 Topic 为 test-topic 的消息。 Consumer 调用poll 方法来轮询 Kafka 集群的消息, 一直等到 Kafka 集群中没有消息或达到超时时间(示例中设置超时时间为 100 毫秒)为止 。 如果读取到消息,则打印出消息记录的 pa此ition, offset、key 等。

Kafka(四)集群之kafka

在章节二( )中,我们部署了单机的kafka,现在我们部署一套集群模式的kafka。

这里我准备了三台虚拟机:

192.168.184.134

192.168.184.135

192.168.184.136

每台机器部署一个zk和kafka。

上一章节中zk集群已经神中部署完毕。

在章节二中,134这台机器已经有kafka存在了,我们在另外两台机器上安装kafka:

在上面的文件中有几个关键点,我们一一进行配置,我会对配置中的说明翻译:

以下这两个listeners,advertised_listeners 是对外暴露的服务端口,真正建立连接用的是 listeners。

在内网中我们使用listenners就可以了,在docker等容器或云中使用advertised。游判山

下面这个是日志路径的配置

下面这个是个重点的东西,topic在磁盘上会分为多个partitions存储,相比单一文件存储,增加了并行性,在后续文章中会详细去讲解:

日志的保存时间:

以下是zookeeper的配置:

这里我们直接设置后台启动,三个节点都是如此:

这里面有个小坑,还记得之前我们搭建的单机环境吗?那时候默认的日志文件夹在/tmp/kafka-logs下面,生成了很多内容,导致我们134这个节点无法启动成功,报错如下:

解决这个问题只需要把/tmp/kafka-logs文件删除就好了。

看到日志出现这一句表明启动成功了:

下面我们验证下是否搭建成功了,首先使用kafkatool工机具连接看下:

我们在134节点创建一个topic:

查看topic列表:

在kafkatool中查看:

创建生产者:

创建消费者:

生成者发送冲游消息:

消费者接收消息:

到此为止,kafka的集群搭建已经完成了。在后面的文章我们会去学习如何在springboot中集成kafka。

Kafka查看topic、consumer group状态命令

以下命令中使用的zookeeper配置地址为127.0.0.1:2181,bootstrap--server(即broker)地址为: 127.0.0.1:9292

1,查看kafka topic列表,使用--list参数

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

__consumer_offsets

lx_test_topic

test

2,查看kafka特定topic的详情,源租使用--topic与--describe参数

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic lx_test_topic --describe

Topic:lx_test_topic PartitionCount:1 ReplicationFactor:1 Configs:

Topic: lx_test_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0

列出了lx_test_topic的parition数量、replica因子团知以及每个partition的leader、replica信息

3,查看consumer group列表,使用--list参数

查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保雹或兆存在zookeeper中)consumer列表,因而需要区分指定bootstrap--server和zookeeper参数:

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --list

lx_test

bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list

console-consumer-86845

console-consumer-11967

4,查看特定consumer group 详情,使用--group与--describe参数

同样根据新/旧版本的consumer,分别指定bootstrap-server与zookeeper参数:

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --group lx_test --describe

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER

lx_test lx_test_topic 0 465 465 0 kafka-python-1.3.1_/127.0.0.1

bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group console-consumer-11967 --describe

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER

Could not fetch offset from zookeeper for group console-consumer-11967 partition [lx_test_topic,0] due to missing offset data in zookeeper.

console-consumer-11967 lx_test_topic 0 unknown 465 unknown console-consumer-11967_aws-lx-1513787888172-d3a91f05-0

其中依次展示group名称、消费的topic名称、partition id、consumer group最后一次提交的offset、最后提交的生产消息offset、消费offset与生产offset之间的差值、当前消费topic-partition的group成员id(不一定包含hostname)

上面示例中console-consumer-11967是为了测试临时起的一个console consumer,缺少在zookeeper中保存的current_offset信息。

查看Kafka某Topic的Partition详细信息时,使用如下哪个命令()

查看Kafka某Topic的Partition详细信神燃息时,使用如下哪个命令(渗瞎含)

A.bin/kafka-topics.sh--create

B.bin/丛笑kafka-topics.sh--delete

C.bin/kafka-topics.sh--describe(正确答案)

D.bin/kafka-topics.sh--1ist

linux 怎样查看kafka的某topic数据?

基于0.8.0版本。

##查看topic分布情况kafka-list-topic.sh

bin/kafka-list-topic.sh

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

(列出所有topic的分区情况)

bin/kafka-list-topic.sh

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

--topic

test

(查看test的分区情况)

其实kafka-list-topic.sh里面就一句

exec

$(dirname

$0)/kafka-run-class.sh

kafka.admin.listtopiccommand

$@

实际是通过

kafka-run-class.sh脚本执行的包kafka.admin下面乎仔茄的类

##创建topic

kafka-create-topic.sh

bin/kafka-create-topic.sh

--replica

2

--partition

8

--topic

test

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

创建名为test的topic,

8个分区分别存放数据,数据备份总共2份

bin/kafka-create-topic.sh

--replica

1

--partition

1

--topic

test2

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

结果

topic:

test2

partition:

leader:

170

replicas:

170

isr:

170

##重新分配分区kafka-reassign-partitions.sh

这个命令可以分区指定到想要的--broker-list上

bin/kafka-reassign-partitions.sh

--topics-to-move-json-file

topics-to-move.json

--broker-list

"171"

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

--execute

cat

topic-to-move.json

{"topics":

[{"topic":

"test2"}],

"version":1

}

##为topic增加

partition数目kafka-add-partitions.sh

bin/kafka-add-partitions.sh

--topic

test

--partition

2

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

(戚启为topic

test增加2个分区)

##控制台接收消息

bin/kafka-console-consumer.sh

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

--from-beginning

--topic

test

##控制台发送消息

bin/kafka-console-producer.sh

--broker-list

192.168.197.170:9092,192.168.197.171:

9092

--topic

test

##手动均衡topic,

kafka-preferred-replica-election.sh

bin/kafka-preferred-replica-election.sh

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

--path-to-json-file

preferred-click.json

cat

preferred-click.json

{

"partitions":

[

{"topic":

"click",

"partition":

0},

{"topic":

"click",

"partition":

1},

{"topic":

"click",

"partition":

2},

{"topic":

"click",

"partition":

3},

{"topic":

"click",

"partition":

4},

{"topic":

"click",

"partition":

5},

{"topic":

"click",

"partition":

6},

{"topic":

"click",

"partition":

7},

{"topic":

"play",

"partition":

0},

{"岁察topic":

"play",

"partition":

1},

{"topic":

"play",

"partition":

2},

{"topic":

"play",

"partition":

3},

{"topic":

"play",

"partition":

4},

{"topic":

"play",

"partition":

5},

{"topic":

"play",

"partition":

6},

{"topic":

"play",

"partition":

7}

]

}

##删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除

bin/kafka-run-class.sh

kafka.admin.deletetopiccommand

--topic

test666

--zookeeper

192.168.197.170:2181

,192.168.197.171:2181

[img]

关于kafka查看topic列表和查看kafka的topic列表的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表