kafka版本(kafka版本选择)
本篇文章给大家谈谈kafka版本,以及kafka版本选择对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、kafka——消费者原理解析
- 2、FlinkKafkaConsumer官网文档翻译
- 3、Kafka压缩
- 4、kafka2.8.0版本(一):搭建部署
- 5、kafka生产者怎么样能够保障数据不丢,不重复且分区内数据有序!
kafka——消费者原理解析
kafka采用发布订阅模式:一对多。发布订阅模式又分两种:
Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group)。 消费者用一个消费者组名标记自己。 一缺昌个发布在Topic上消息被分发给此消费者组中的一个消费者。 假如所有的消费者都在一个组中,那么这就变成了队列模型。 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。 一个消费者组中消费者订阅同一个Topic,每个消费者接受Topic的一部分分区的消息,从而实现对消费者的横向扩展,对消息进行分流。
注意:当单个消费者无法跟上数据生成的速度,就可以增加更多的消费者分担负载,每个消费者只处理部分partition的消息,从而实现单个应用程序的横向伸缩。但是不要让消费者的数量多于partition的数量,此时多余的消费者会空闲。此外,Kafka还允许多个应用程序从同一个Topic读取所有的消息,此时只要保证每个应用程序有自己的消费者组即可。
消费者组的概念就是:当有多个应用程序都需要从Kafka获取消息时,让每个app对应一个消费者组,从而使每个应用程序都能获取一个或多个Topic的全部消息;在每个消费者组中,往消费者组中添加消费者来伸缩读取能力和处理能力,消费者组中的每个消费者只处理每个Topic的一部分的消息,每个消费者对应一个线程。
在同一个群组中,无法让一个线程运行多个消费者,也无法让多线线程安全地共享一个消费者。按照规则,一个消费者使用一个线程,如果要在同一个消费者组中运行多个消费者,需要让每个消费者运行在自己的线程中。最好把消费者的逻辑封装在自己的对象中,然后使用java的ExecutorService启动多个线程,使每个消费者运行在自己的线程上,可参考
一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定哪个 partition 由哪个 consumer 来消费。
关于如何设置partition值需要考虑的因素
Kafka 有两种分配策略,一个是 RoundRobin,一个是 Range,默认为Range,当消费者组内消费者发生变化时,会触发分区分配策略(方法重新分配)。
以上三种现象会使partition的所有权在消费者之间转移,这样的行为叫作再均衡。
再均衡的优点 :
再均尺慎衡的缺点 :
RoundRobin 轮询方式将分区所有作为一个整体进行 Hash 排序,消费者组内分配分区个数最大差别为 1,是按照组来分的,可以解决多个消费者消费数据不均衡的问题。
但是,当消费者组内订阅不同主题时,可能造成消费混乱,如下图所示,Consumer0 订阅主题 A,Consumer1 订阅主题 B。
将 A、B 主题的分区排序后分配给消费者组,TopicB 分区中的数据可能 分配到 Consumer0 中。
Range 方式是按照主题来分的,不会产生轮询方式的消费混乱问题。
但是,如下图所示,Consumer0、Consumer1 同时订阅了主题 A 和伏困扒 B,可能造成消息分配不对等问题,当消费者组内订阅的主题越多,分区分配可能越不均衡。
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
consumer group +topic + partition 唯一确定一个offest
Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,
consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。
你如果特别好奇,实在想看看offset什么的,也可以执行下面操作:
修改配置文件 consumer.properties
再启动一个消费者
当消费者崩溃或者有新的消费者加入,那么就会触发再均衡(rebalance),完成再均衡后,每个消费者可能会分配到新的分区,而不是之前处理那个,为了能够继续之前的工作,消费者需要读取每个partition最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
case1:如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
case2:如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
自动提交的优点是方便,但是可能会重复处理消息
不足:broker在对提交请求作出回应之前,应用程序会一直阻塞,会限制应用程序的吞吐量。
因此,在消费者关闭之前一般会组合使用commitAsync和commitSync提交偏移量。
ConsumerRebalanceListener需要实现的两个方法
下面的例子演示如何在失去partition的所有权之前通过onPartitionRevoked()方法来提交偏移量。
Consumer有个Rebalance的特性,即重新负载均衡,该特性依赖于一个协调器来实现。每当Consumer Group中有Consumer退出或有新的Consumer加入都会触发Rebalance。
之所以要重新负载均衡,是为了将退出的Consumer所负责处理的数据再重新分配到组内的其他Consumer上进行处理。或当有新加入的Consumer时,将组内其他Consumer的负载压力,重新进均匀分配,而不会说新加入一个Consumer就闲在那。
下面就用几张图简单描述一下,各种情况触发Rebalance时,组内成员是如何与协调器进行交互的。
Tips :图中的Coordinator是协调器,而generation则类似于乐观锁中的版本号,每当成员入组成功就会更新,也是起到一个并发控制的作用。
参考:
[img]FlinkKafkaConsumer官网文档翻译
以下内容翻译者段空自 Apache Flink Kafka Connector官网(内容顺序稍作修改)
Flink为Kafka topic读取和写入数据提供了特殊的Kafka连接器。Flink Kafka消费者与Flink的检查点机制集成可以保证下游的exactly-once语义。为了实现这一点,Flink并不完全依赖Kafka自身维护的消费者组offset,而燃渣是在Flink内部管理这些offset。
通用kafka(1.0.0版本及以后):
kafka版本(0.11版本及以前):
构造器有以下参数:
Flink Kafka Consumer如何将Kafka中的二进制数据转换为Java / Scala对象。这就需要指定序列化和反序列化方式。每个Kafka消息都会调用 T deserialize(byte[] message) 方法。
后边的太难了,自己去官网看吧:
说明:
启用Flink的checkpoint机制后,Flink Kafka Consumer在消费kafka消息的同时,会周期的并保持一致性的将offset写入checkpoint中。如果作业失败,Flink会将程序恢复到最新的checkpoint的状态,并从存储在checkpoint中的偏移量开始重新消费Kafka中的消息。
注意,只有在有足够的slots可用来首瞎重新启动时,Flink才能重新启动。
如果没有启用checkpoint机制,Kafka使用者将定期向Zookeeper提交偏移量。
Flink Kafka Consumer可以将偏移量提交到Kafka broker(或0.8中的Zookeeper)。注意,Flink Kafka Consumer并不依赖提交的偏移量来保证容错。提交的偏移量只是用于方便查看Consumer消费的进度。
提交偏移量有多种不同的方式,与是否为Job启用了checkpoint机制有关。
Kafka压缩
首先说明一点kafka的压缩和kafka的compact是不同的,compact就是相同的斗好拦key只保留一条,是数据清理方式的一种和kafka的定期删除策略是并列的;而kafka的压缩是指数据不删除只是采用压缩算法进行压缩。
kafka从0.7版本就开始支持压缩功能:
1)kafka的发送端将消息按照批量(如果批量设置一条或者很小,可能有相反的效果)的方式进行压缩。
2)服务器空胡端直接将压缩消息保存(特别注意,如果kafka的版本不同,那么就存在broker需要先解压缩再压缩的问题,导致消耗资源过多)。
3)消费端自动解压缩,测试了下,发送端无论采用什么压缩模式,消费端无论设置什么解压模式,都可以自动完成解压缩功能。
4)压缩消息可以和非压缩消息混存,也就是说如果你kafka里面先保存的是非压缩消息,后面改成压缩,不用担心,kafka消费端自动支持。
测试的kafka版本:kafka_2.12-1.1.1
测试的kafka客户端版本袜芹:0.10.2.1
测试数据的条数:20000
kafka支持三种压缩算法,lz4、snappy、gzip,
通过上面数据来看,gzip的压缩效果最好,但是生成耗时更长,snappy和lz4的数据差不多,更倾向于lz4,具huxi大神的书上所说kafka里面对snappy做了硬编码,所以性能上最好的是lz4,推荐使用此压缩算法。
压缩率对比:
性能对比图:
很简单:
消费端无论设置什么压缩模式,都可以正确的解压kafka的消息,也就是说消费端可以不设置解压缩,
不过可能性能有所下降。
kafka2.8.0版本(一):搭建部署
在新发布的kafka2.8.0版本中,已经可以不启动zk就可以使用kafka了,在config目录下多了一个kraft目录,在该目录中有一套新的配置文件,可以直接脱离ZooKeeper运行。
到kraft目录中修改server.properties文件,单机的话主要是修改下面的配置
需要把localhost修改卜芹厅为ip地址,否者别的节点连不上kakfa。
测试是否可以连接可以使用如下命令:首运
通过现在三行命令,即可开启一个单机的broker
这样脱离ZooKeeper的kafka就启动了,当前版本还型隐是测试阶段,所以官方不推荐使用。
kafka生产者怎么样能够保障数据不丢,不重复且分区内数据有序!
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,
对可靠性要求比较高的场景。
至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
最多一次(At Most Once)= ACK级别悔让御设置为0
总结:
At Least Once可以保证数据不丢失,但是不能保证数据不重复;
At Most Once可以保证数据不重复,但是不能保证数据不丢失。
精确一次(Exactly Once):滑缺对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复碧岩也不丢失。
Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
经上所述,如果保障数据不丢,不重复需要保证一下条件:
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数=2 + ISR最小副本数量=2)
开启参数 enable.idempotence 默认为 true,false 关闭。
如何保障kafka内数据有序呢?
kafka在1.x及以后版本保证数据单分区有序,条件如下:
(2)开启幂等性
max.in.flight.requests.per.connection需要设置小于等于5。
(1)未开启幂等性
max.in.flight.requests.per.connection需要设置为1。
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,
故无论如何,都可以保证最近5个request的数据都是有序的
关于kafka版本和kafka版本选择的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。