kafka配置(kafka的消费者组)
本篇文章给大家谈谈kafka配置,以及kafka的消费者组对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、阿里大牛的Kafka动态配置了解下?
- 2、Kafka生产者开发,原理分析,以及参数配置
- 3、Kafka身份认证与权限控制配置
- 4、kafka配置参数详解
- 5、kafkak配置仅允许受信任的JNDI连接如何配置?
- 6、Kafka的Topic配置详解
阿里大牛的Kafka动态配置了解下?
在开始分享之前,我们先来复习一下设置 Kafka 参数,特别是 Broker 端参数的方法。
在 Kafka 安装目录的 config 路径下,有个 server.properties 文件。通常情况下,我们会指定这个文件的路径来启动 Broker。如果要设置 Broker 端的任何参数,我们必须在这个文件中显式地增加一行对应的配置,之后启动 Broker 进程,令参数生效。我们常见的做法是,一次性设置好所有参数之后,再启动 Broker。当后面需要变更任何参数时,我们必须重启 Broker。但生产环境中的服务器,怎么能随意重启呢?所以,目前修改 Broker 端参数是非常痛苦的过程。
基于这个痛点,社区于 1.1.0 版本中正式引入了动态 Broker 参数(Dynamic BrokerConfigs)。所谓动态,就是指修改参数值后,无需重启 Broker 就能立即生效,而之前在server.properties 中配置的参数则称为静态参数(Static Configs)。显然,动态调整参数值而无需重启服务,是非常实用的功能。如果你想体验动态 Broker 参数的话,那就赶快升级到 1.1 版本吧。
当然了,当前最新的 2.3 版本中的 Broker 端参数有 200 多个,社区并没有将每个参数都升级成动态参数,它仅仅是把一部分参数变成了可动态调整。那么,我们应该如何分辨哪些参数是动态参数呢?
如果你打开 1.1 版本之后(含 1.1)的 Kafka 官网,你会发现Broker Configs表中增加了Dynamic Update Mode 列。该列有 3 类值,分别是 read-only、per-broker 和 cluster-wide。我来解释一下它们的含义。
我来举个例子说明一下 per-broker 和大首 cluster-wide 的区别。Broker 端参数 listeners 想必你应该不陌生吧。它是一个 per-broker 参数,这表示你只能为单个 Broker 动态调整listeners,而不能直接调整一批 Broker 的 listeners。log.retention.ms 参数是 cluster-wide 级别的,Kafka 允许为集群内所有 Broker 统一设置一个日志留存时间值。当然了,你也可以为单个 Broker 修改此值。
你可能会问,动态 Broker 参数的使用场景都有哪些呢?实际上,因为不必重启 Broker,动态 Broker 参数的使用场景非常广泛,通常包括但不限于以下几种:
在这些使用场景中,动态调整线程池大小应该算是最实用的功能了。很多时候,当 KafkaBroker 入站流量(inbound data)激增时,会造成 Broker 端请求积压(Backlog)。有了动态参数,我们就能够动态增加网络线程数和 I/O 线程数,快速消耗一些积压。当突发流量过去后,我们也能将线程数调整回来,减少对资源的浪费。整个过程都不需要重启Broker。你甚至可以将这套调整线程数的动作,封装进定时任务中,以实现自动扩缩容。
由于动态滚链数配置的特殊性,它必然有和普通只读参数不同的保存机制。下面我来介绍一下Kafka 是如何保存动态配置的。
首先,Kafka 将动态 Broker 参数保存在 ZooKeeper 中,具体的 znode 路径如下图所示。
我来解释一下图中的内容。changes 是用来实时监测动态参数变更的,不会保存参数值;topics 是用来保存 Kafka 主题级别参数的。虽然它们不属于动态 Broker 端参数,但其实它们也是能够动态变更的。
users 和 clients 则是用于动态调整客户端配额(Quota)的 znode 节点。所谓配额,是指Kafka 运维人员限制连入集群的客户端的吞吐量或者是限定它们使用的 CPU 资源。
分析到这里,我们就会发现,/config/brokers znode 才是真正保存动态 Broker 参数的地方。该 znode 下有唤脊两大类子节点。第一类子节点就只有一个,它有个固定的名字叫 default ,保存的是前面说过的 cluster-wide 范围的动态参数;另一类则以 broker.id 为名,保存的是特定 Broker 的 per-broker 范围参数。由于是 per-broker 范围,因此这类子节点可能存在多个。
我们一起来看一张图片,它展示的是我的一个 Kafka 集群环境上的动态 Broker 端参数。
在这张图中,我首先查看了 /config/brokers 下的子节点,我们可以看到,这里面有 default 节点和名为 0、1 的子节点。 default 节点中保存了我设置的 cluster-wide范围参数;0 和 1 节点中分别保存了我为 Broker 0 和 Broker1 设置的 per-broker 参数。
接下来,我分别展示了 cluster-wide 范围和 per-broker 范围的参数设置。拿num.io.threads 参数为例,其 cluster-wide 值被动态调整为 12,而在 Broker 0 上被设置成 16,在 Broker 1 上被设置成 8。我为 Broker 0 和 Broker 1 单独设置的值,会覆盖掉cluster-wide 值,但在其他 Broker 上,该参数默认值还是按 12 计算。
如果我们再把静态参数加进来一起讨论的话,cluster-wide、per-broker 和 static 参数的优先级是这样的:per-broker 参数 cluster-wide 参数 static 参数 Kafka 默认值。
另外,如果你仔细查看上图中的 ephemeralOwner 字段 ,你会发现它们的值都是 0x0。这表示这些 znode 都是持久化节点,它们将一直存在。即使 ZooKeeper 集群重启,这些数据也不会丢失,这样就能保证这些动态参数的值会一直生效。
讲完了保存原理,我们来说说如何配置动态 Broker 参数。目前,设置动态参数的工具行命令只有一个,那就是 Kafka 自带的 kafka-configs 脚本。接下来,我来以unclean.leader.election.enable 参数为例,演示一下如何动态调整。
下面这条命令展示了如何在集群层面设置全局值,即设置 cluster-wide 范围值。
总体来说命令很简单,但有一点需要注意。 如果要设置 cluster-wide 范围的动态参数,需要显式指定 entity-default 。现在,我们使用下面的命令来查看一下刚才的配置是否成功。
从输出来看,我们成功地在全局层面上设置该参数值为 true。注意 sensitive=false 的字眼,它表明我们要调整的参数不是敏感数据。如果我们调整的是类似于密码这样的参数时,该字段就会为 true,表示这属于敏感数据。
好了,调整完 cluster-wide 范围的参数,我来演示下如何设置 per-broker 范围参数。我们还是以 unclean.leader.election.enable 参数为例,我现在为 ID 为 1 的 Broker 设置一个不同的值。命令如下:
同样,我们使用下列命令,来查看一下刚刚的设置是否生效了。
这条命令的输出信息很多。我们关注两点即可。
如果我们要删除 cluster-wide 范围参数或 per-broker 范围参数,也非常简单,分别执行下面的命令就可以了。
删除动态参数要指定 delete-config 。当我们删除完动态参数配置后,再次运行查看命令,结果如下:
此时,刚才配置的所有动态参数都已经被成功移除了。
刚刚我只是举了一个参数的例子,如果你想要知道动态 Broker 参数都有哪些,一种方式是在 Kafka 官网中查看 Broker 端参数列表,另一种方式是直接运行无参数的 kafka-configs脚本,该脚本的说明文档会告诉你当前动态 Broker 参数都有哪些。我们可以先来看看下面这两张图。
看到有这么多动态 Broker 参数,你可能会问:这些我都需要调整吗?你能告诉我最常用的几个吗?根据我的实际使用经验,我来跟你分享一些有较大几率被动态调整值的参数。
1.log.retention.ms 。
修改日志留存时间应该算是一个比较高频的操作,毕竟,我们不可能完美地预估所有业务的消息留存时长。虽然该参数有对应的主题级别参数可以设置,但拥有在全局层面上动态变更的能力,依然是一个很好的功能亮点。
2.num.io.threads 和 num.network.threads 。
这是我们在前面提到的两组线程池。就我个人而言,我觉得这是动态 Broker 参数最实用的场景了。毕竟,在实际生产环境中,Broker 端请求处理能力经常要按需扩容。如果没有动态 Broker 参数,我们是无法做到这一点的。
3. 与 SSL 相关的参数 。
主要是 4 个参数(ssl.keystore.type、ssl.keystore.location、ssl.keystore.password 和ssl.key.password)。允许动态实时调整它们之后,我们就能创建那些过期时间很短的 SSL证书。每当我们调整时,Kafka 底层会重新配置 Socket 连接通道并更新 Keystore。新的连接会使用新的 Keystore,阶段性地调整这组参数,有利于增加安全性。
4.num.replica.fetchers 。
这也是我认为的最实用的动态 Broker 参数之一。Follower 副本拉取速度慢,在线上Kafka 环境中一直是一个老大难的问题。针对这个问题,常见的做法是增加该参数值,确保有充足的线程可以执行 Follower 副本向 Leader 副本的拉取。现在有了动态参数,你不需要再重启 Broker,就能立即在 Follower 端生效,因此我说这是很实用的应用场景。
本文重点讨论了 Kafka 1.1.0 版本引入的动态 Broker 参数。这类参数最大的好处在于,无需重启 Broker,就可以令变更生效,因此能够极大地降低运维成本。除此之外,还给出了动态参数的保存机制和设置方法。
[img]Kafka生产者开发,原理分析,以及参数配置
生产者开发(基于java),生产者发送消息主要有以下三步
那么我们进行抽象,大致可以得到这两个类。
另外Kafka为了表现以下封装的特性,把准备生产者的参数配成了一个Properties类,
以这个类为KafkaProducer构造函数入参。
那么KafkaProducer的参数具体可以配置什么呢?
由123步可知,可以配置拦截器,序列化器,分区器。
这些都可以自己实现特定接口(ProducerInterceptor,Serializer,Partioner),
然后放到Properties里面,最后给KafkaProducer
拦截器就是对ProducerRecord做一些处理,然后返回处理过的新的ProducerRecord(自定义拦截策略)
序列化器是要讲java对象转成byte[]数组然后进行网络传输(自己定义序列化策略)
分区器就是为消息选择分区(这里自己可以设计分区策略)
再次回到这张图
可以看到,有两个线程在完成消息的发送,一个是主线程,一核衡个是Sender线程。
主线程经过123步后,会将同一个partition的多个Record封装(压缩)到一个ProducerBatch对象中,
这样的目的是方便传输,提高效改毁做率,RecordAccumulator里面维持着一个双端ProducerBatch队列数组,
然后Sender线程从队头取ProducerBatch封装成Request,这里设计到一个逻辑到物理的转换。
分区是逻辑的,而broker才是物理的,一个区对应一个broker,所以要转换。
另外Sender线程里面维持了一个以Nodeid(就是对应broker)为Key,DequeRequest为值的Map,
这里面的Request指的是那种没有Response的Request。一旦有了Response就会清理掉余颂的。
这个是由通过leastLoadedNode节点实现的,不多说了。
其实除了123步中的参数,还有其它参数,这里就说一个
ack
acks=1。默认值即为1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。
acks=0。生产者发送消息之后不需要等待任何服务端的响应。
acks=-1或acks=all。生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。
Kafka身份认证与权限控制配置
编辑原有配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/server.properties
listeners=SASL_PLAINTEXT://192.168.43.209:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:root
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
创建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf
KafkaServer{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafkapswd"
user_ kafkaa(用户名)="kafkaapswd"(密码)
user_ kafkab(用户名冲租)=" kafkabpswd"(密码)
user_ kafkac(用户名)=" kafkacpswd"(密码)
user_ kafkad(用户名)=" kafkadpswd"(密码);
};
修改执行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-server-start.sh
if ["x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/空判拦home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf"
fi
修改执行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-run-class.sh
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/斗胡home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf'
if ["x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS$KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" "$CONSOLE_OUTPUT_FILE" 21 /dev/null
else
exec $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH$KAFKA_OPTS "$@"
fi
创建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf
KafkaClient{
org.apache.kafka.common.security.plain.PlainLoginModule required
username=" kafkaa"
password=" kafkaapswd";
};
修改执行文件
vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh
vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-producer.sh
if ["x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf"
fi
运行jar包的服务器的指定路径下必须有kafka_ client_ jaas.conf文件
在程序中添加如下配置
System.setProperty("java.security.auth.login.config","xx/kafka_client_jaas.conf");
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
问题描述:发布消息、订阅消息时,出现如下错误,WARN [Consumer clientId=consumer-1, groupId=console-consumer-20752]Error while fetching metadata with correlation id 2 :{test2=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
解决方法:各客户端的用户名设置为相同,多个客户端同时管理会产生冲突。
kafka配置参数详解
kafka的配置分为 broker、帆态producter、consumer三个不同的配置
一 BROKER 的全局配置
最为核心的三个配置 broker.id、陵吵log.dir、zookeeper.connect 。
------------------------------------------- 系统 相关 -------------------------------------------
broker.id =1
log.dirs = /tmp/kafka-logs
port =6667
message.max.bytes =1000000
num.network.threads =3
num.io.threads =8
background.threads =4
queued.max.requests =500
host.name
advertised.host.name
advertised.port
socket.send.buffer.bytes =100*1024
socket.receive.buffer.bytes =100*1024
socket.request.max.bytes =100 1024 1024
------------------------------------------- LOG 相关 -------------------------------------------
log.segment.bytes =1024 1024 1024
log.roll.hours =24*7
log.cleanup.policy = delete
log.retention.minutes=7days
指定日志每隔多久检查看是否可以被删除,默认1分钟
log.cleanup.interval.mins=1
log.retention.bytes=-1
log.retention.check.interval.ms=5minutes
log.cleaner.enable=false
log.cleaner.threads =1
log.cleaner.io.max.bytes.per.second=None
log.cleaner.dedupe.buffer.size=500 1024 1024
log.cleaner.io.buffer.size=512*1024
log.cleaner.io.buffer.load.factor =0.9
log.cleaner.backoff.ms =15000
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.delete.retention.ms =1day
log.index.size.max.bytes =10 1024 1024
log.index.interval.bytes =4096
log.flush.interval.messages=None
log.flush.scheduler.interval.ms =3000
log.flush.interval.ms = None
log.delete.delay.ms =60000
log.flush.offset.checkpoint.interval.ms =60000
------------------------------------------- TOPIC 相关 -------------------------------------------
auto.create.topics.enable =true
default.replication.factor =1
num.partitions =1
实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分尺轿侍区,分区被复制到三个broker上。
----------------------------------复制(Leader、replicas) 相关 ----------------------------------
controller.socket.timeout.ms =30000
controller.message.queue.size=10
replica.lag.time.max.ms =10000
replica.lag.max.messages =4000
replica.socket.timeout.ms=30*1000
replica.socket.receive.buffer.bytes=64*1024
replica.fetch.max.bytes =1024*1024
replica.fetch.wait.max.ms =500
replica.fetch.min.bytes =1
num.replica.fetchers=1
replica.high.watermark.checkpoint.interval.ms =5000
controlled.shutdown.enable =false
controlled.shutdown.max.retries =3
controlled.shutdown.retry.backoff.ms =5000
auto.leader.rebalance.enable =false
leader.imbalance.per.broker.percentage =10
leader.imbalance.check.interval.seconds =300
offset.metadata.max.bytes
----------------------------------ZooKeeper 相关----------------------------------
zookeeper.connect = localhost:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms =6000
zookeeper.sync.time.ms =2000
配置的修改
其中一部分配置是可以被每个topic自身的配置所代替,例如
新增配置
bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1
修改配置
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000
删除配置 :
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes
二 CONSUMER 配置
最为核心的配置是group.id、zookeeper.connect
group.id
consumer.id
client.id = group id value
zookeeper.connect=localhost:2182
zookeeper.session.timeout.ms =6000
zookeeper.connection.timeout.ms =6000
zookeeper.sync.time.ms =2000
auto.offset.reset = largest
socket.timeout.ms=30*1000
socket.receive.buffer.bytes=64*1024
fetch.message.max.bytes =1024*1024
auto.commit.enable =true
auto.commit.interval.ms =60*1000
queued.max.message.chunks =10
rebalance.max.retries =4
rebalance.backoff.ms =2000
refresh.leader.backoff.ms
fetch.min.bytes =1
fetch.wait.max.ms =100
consumer.timeout.ms = -1
三 PRODUCER 的配置
比较核心的配置:metadata.broker.list、request.required.acks、producer.type、serializer.class
metadata.broker.list
request.required.acks =0
request.timeout.ms =10000
send.buffer.bytes=100*1024
key.serializer.class
partitioner.class=kafka.producer.DefaultPartitioner
compression.codec = none
compressed.topics=null
message.send.max.retries =3
retry.backoff.ms =100
topic.metadata.refresh.interval.ms =600*1000
client.id=""
------------------------------------------- 消息模式 相关 -------------------------------------------
producer.type=sync
queue.buffering.max.ms =5000
queue.buffering.max.messages =10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
serializer.class= kafka.serializer.DefaultEncoder
kafkak配置仅允许受信任的JNDI连接如何配置?
kafka的配置分为 broker、producter、consumer三个不同的配置
一 BROKER 的全局配置
最为核心的三个配置 broker.id、log.dir、zookeeper.connect 。
------------------------------------------- 系统 相关 -------------------------------------------
##每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id =1
##kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs = /tmp/kafka-logs
##提供给客户端响应的端口
port =6667
##消息体的最大大小,单位是字节
message.max.bytes =
## broker 处理消息的最大线程数,一般情况下不需要去修改
num.network.threads =3
## broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
num.io.threads =8
## 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
background.threads =4
## 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,算是一种自我保护机制
queued.max.requests =500
##broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置
host.name
## 打广告的地址,若是设置的话,会提供给producers, consumers,其他broker连接,具体如何使用还未深究
advertised.host.name
## 广告地址端口,必须不同于port中的设置
advertised.port
## socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes =100*1024
## socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes =100*1024
## socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆芹纤盖
socket.request.max.bytes =100*1024*1024
------------------------------------------- LOG 相关 -------------------------------------------
## topic的分区是以一堆segment文件存储贺首昌的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes =1024*1024*1024
## 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment 会被 topic创建时禅扒的指定参数覆盖
log.roll.hours =24*7
## 日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleanup.policy = delete
## 数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据
## log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.minutes=7days
指定日志每隔多久检查看是否可以被删除,默认1分钟
log.cleanup.interval.mins=1
## topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes 。-1没有大小限制
## log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.bytes=-1
## 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=5minutes
## 是否开启日志压缩
log.cleaner.enable=false
## 日志压缩运行的线程数
log.cleaner.threads =1
## 日志压缩时候处理的最大大小
log.cleaner.io.max.bytes.per.second=None
## 日志压缩去重时候的缓存空间 ,在空间允许的情况下,越大越好
log.cleaner.dedupe.buffer.size=500*1024*1024
## 日志清理时候用到的IO块大小 一般不需要修改
log.cleaner.io.buffer.size=512*1024
## 日志清理中hash表的扩大因子 一般不需要修改
log.cleaner.io.buffer.load.factor =0.9
## 检查是否处罚日志清理的间隔
log.cleaner.backoff.ms =15000
## 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖
log.cleaner.min.cleanable.ratio=0.5
## 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖
log.cleaner.delete.retention.ms =1day
## 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
log.index.size.max.bytes =10*1024*1024
## 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
log.index.interval.bytes =4096
## log文件"sync"到磁盘之前累积的消息条数
## 因为磁盘IO操作是一个慢操作,但又是一个"数据可靠性"的必要手段
## 所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.
## 如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞)
## 如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.
## 物理server故障,将会导致没有fsync的消息丢失.
log.flush.interval.messages=None
## 检查是否需要固化到硬盘的时间间隔
log.flush.scheduler.interval.ms =3000
## 仅仅通过interval来控制消息的磁盘写入时机,是不足的.
## 此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔
## 达到阀值,也将触发.
log.flush.interval.ms = None
## 文件在索引中清除后保留的时间 一般不需要去修改
log.delete.delay.ms =60000
## 控制上次固化硬盘的时间点,以便于数据恢复 一般不需要去修改
log.flush.offset.checkpoint.interval.ms =60000
------------------------------------------- TOPIC 相关 -------------------------------------------
## 是否允许自动创建topic ,若是false,就需要通过命令创建topic
auto.create.topics.enable =true
## 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数
default.replication.factor =1
## 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖
num.partitions =1
实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分区,分区被复制到三个broker上。
----------------------------------复制(Leader、replicas) 相关 ----------------------------------
## partition leader与replicas之间通讯时,socket的超时时间
controller.socket.timeout.ms =30000
## partition leader与replicas数据同步时,消息的队列尺寸
controller.message.queue.size=10
## replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中
replica.lag.time.max.ms =10000
## 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效
## 通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后
## 如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移
## 到其他follower中.
## 在broker数量较少,或者网络不足的环境中,建议提高此值.
replica.lag.max.messages =4000
##follower与leader之间的socket超时时间
replica.socket.timeout.ms=30*1000
## leader复制时候的socket缓存大小
replica.socket.receive.buffer.bytes=64*1024
## replicas每次获取数据的最大大小
replica.fetch.max.bytes =1024*1024
## replicas同leader之间通信的最大等待时间,失败了会重试
replica.fetch.wait.max.ms =500
## fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
replica.fetch.min.bytes =1
## leader 进行复制的线程数,增大这个数值会增加follower的IO
num.replica.fetchers=1
## 每个replica检查是否将最高水位进行固化的频率
replica.high.watermark.checkpoint.interval.ms =5000
## 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker
controlled.shutdown.enable =false
## 控制器关闭的尝试次数
controlled.shutdown.max.retries =3
## 每次关闭尝试的时间间隔
controlled.shutdown.retry.backoff.ms =5000
## 是否自动平衡broker之间的分配策略
auto.leader.rebalance.enable =false
## leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
leader.imbalance.per.broker.percentage =10
## 检查leader是否不平衡的时间间隔
leader.imbalance.check.interval.seconds =300
## 客户端保留offset信息的最大空间大小
offset.metadata.max.bytes
----------------------------------ZooKeeper 相关----------------------------------
##zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect = localhost:2181
## ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.session.timeout.ms=6000
## ZooKeeper的连接超时时间
zookeeper.connection.timeout.ms =6000
## ZooKeeper集群中leader和follower之间的同步实际那
zookeeper.sync.time.ms =2000
配置的修改
其中一部分配置是可以被每个topic自身的配置所代替,例如
新增配置
bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1
修改配置
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000
删除配置 :
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes
二 CONSUMER 配置
最为核心的配置是group.id、zookeeper.connect
## Consumer归属的组ID,broker是根据group.id来判断是队列模式还是发布订阅模式,非常重要
group.id
## 消费者的ID,若是没有设置的话,会自增
consumer.id
## 一个用于跟踪调查的ID ,最好同group.id相同
client.id = group id value
## 对于zookeeper集群的指定,可以是多个 hostname1:port1,hostname2:port2,hostname3:port3 必须和broker使用同样的zk配置
zookeeper.connect=localhost:2182
## zookeeper的心跳超时时间,查过这个时间就认为是dead消费者
zookeeper.session.timeout.ms =6000
## zookeeper的等待连接时间
zookeeper.connection.timeout.ms =6000
## zookeeper的follower同leader的同步时间
zookeeper.sync.time.ms =2000
## 当zookeeper中没有初始的offset时候的处理方式 。smallest :重置为最小值 largest:重置为最大值 anythingelse:抛出异常
auto.offset.reset = largest
## socket的超时时间,实际的超时时间是:max.fetch.wait + socket.timeout.ms.
socket.timeout.ms=30*1000
## socket的接受缓存空间大小
socket.receive.buffer.bytes=64*1024
##从每个分区获取的消息大小限制
fetch.message.max.bytes =1024*1024
## 是否在消费消息后将offset同步到zookeeper,当Consumer失败后就能从zookeeper获取最新的offset
auto.commit.enable =true
## 自动提交的时间间隔
auto.commit.interval.ms =60*1000
## 用来处理消费消息的块,每个块可以等同于fetch.message.max.bytes中数值
queued.max.message.chunks =10
## 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新
## 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册
##"Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点,
## 此值用于控制,注册节点的重试次数.
rebalance.max.retries =4
## 每次再平衡的时间间隔
rebalance.backoff.ms =2000
## 每次重新选举leader的时间
refresh.leader.backoff.ms
## server发送到消费端的最小数据,若是不满足这个数值则会等待,知道满足数值要求
fetch.min.bytes =1
## 若是不满足最小大小(fetch.min.bytes)的话,等待消费端请求的最长等待时间
fetch.wait.max.ms =100
## 指定时间内没有消息到达就抛出异常,一般不需要改
consumer.timeout.ms = -1
三 PRODUCER 的配置
比较核心的配置:metadata.broker.list、request.required.acks、producer.type、serializer.class
## 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip
metadata.broker.list
##消息的确认模式
##0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
##1:发送消息,并会等待leader 收到确认后,一定的可靠性
## -1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性
request.required.acks =0
## 消息发送的最长等待时间
request.timeout.ms =10000
## socket的缓存大小
send.buffer.bytes=100*1024
## key的序列化方式,若是没有设置,同serializer.class
key.serializer.class
## 分区的策略,默认是取模
partitioner.class=kafka.producer.DefaultPartitioner
## 消息的压缩模式,默认是none,可以有gzip和snappy
compression.codec = none
## 可以针对默写特定的topic进行压缩
compressed.topics=null
## 消息发送失败后的重试次数
message.send.max.retries =3
## 每次失败后的间隔时间
retry.backoff.ms =100
## 生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据
topic.metadata.refresh.interval.ms =600*1000
## 用户随意指定,但是不能重复,主要用于跟踪记录消息
client.id=""
------------------------------------------- 消息模式 相关 -------------------------------------------
## 生产者的类型 async:异步执行消息的发送 sync:同步执行消息的发送
producer.type=sync
## 异步模式下,那么就会在设置的时间缓存消息,并一次性发送
queue.buffering.max.ms =5000
## 异步的模式下 最长等待的消息数
queue.buffering.max.messages =10000
## 异步模式下,进入队列的等待时间 若是设置为0,那么要么进入队列,要么直接抛弃
queue.enqueue.timeout.ms = -1
## 异步模式下,每次发送的最大消息数,前提是触发了queue.buffering.max.messages或是queue.buffering.max.ms的限制
batch.num.messages=200
## 消息体的系列化处理类 ,转化为字节流进行传输
serializer.class= kafka.serializer.DefaultEncoder
Kafka的Topic配置详解
配置topic级别参数时,相同(参数)属性topic级别会覆盖全局的,否则默认为全局配置属性值。
创建topic参数可以设置一个或多个--config "Property(属性)",下面是创建一个topic名称为"my-topic"例子,它设置了2个参数max message size 和 flush rate.
(A)创建topic时配置参数
(B)修改topic时配置参数
覆盖已经有topic参数,下面例子修改"my-topic"的max message属性
(C)删除猛培游topic级别配置参数
注:配置中散的kafka集群枝销的根目录为/config/mobile/mq/mafka02,因此所有节点信息都在此目录下。
cleanup.policy
delete.retention.ms
delete.retention.ms
flush.messages
flush.ms
index.interval.bytes
message.max.bytes
min.cleanable.dirty.ratio
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
log.roll.hours
参考资料:
关于kafka配置和kafka的消费者组的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。