kafka零拷贝原理(kafka copyonwrite)

本篇文章给大家谈谈kafka零拷贝原理,以及kafka copyonwrite对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

架构师经典总结:为什么零拷贝可以提升Kafka性能?

计算机系统是由“硬件”和“软件”两大部分组成,计算机硬件包括一个或多个处理器(CPU)、内存、键盘、显示器、磁盘、I/O接口以及其他一些外围设备比如打印机,绘图仪等等。 总之,计算机硬件部分是一个由多种电子和机械设备组成的硬件系统。

为了让人方便正确使用这些设备,就需要编写若干程序来管理这些设备,正是这些程序组成了计算机的软件系统。软件也可以分为两大类:系统软件和应用软件。人们首先直接在硬件上加载一层程序,用它来管理整个计算机硬件设备以及一些软件信息资源,同时还为用户提供开发应用程序的环境,这就是操作系统软件和实用软件。应用软件是在操作系统支持下,为实现用户要求而编制的各种应用程序。

CPU、内存和I/O接口组成的主设备通常称为主机,把没有加载操作系统的主机叫做裸机。裸机与操作系统软件的接口是由CPU的指令系统和厂商提供的系统BIOS组成。

由于操作系统向用户隐藏了系统使用的硬件设备,因此操作系统要为它上面的应用系统软件提供一组命令或系统调用接口供用户程序使用。比如我们需要使用磁盘,可以通过系统命名或系统调用来间接完成,而不需要亲自手动编写一个磁盘设备驱动程序。因此对于用户来说,当计枯键磨算机加载操作系统后,用户不直接与没斗计算机硬件打交道,而是利用操作系统提供的命令和功能区使用计算机。

由于操作系统处于硬件和软件的中央位置,因此很早就有人把操作系统成为计算机系统软件的核心,简称 核心 或 内核 。

内核态和用户态

从系统安全和保护的角度出发,在进行计算机体系结构设计时,处理机的执行模式一般设定为两种:分别称为内核模式(内核态)和用户模式(用户态)。当处理机处于内核模式执行时,意味着系统除了可以执行一般指令外,还可以执行特权指令,即可以执行访问各种控制寄存器的指令、I/O指令以及程序状态字。

当处理机处于用户模式执行时,只能执行一般指令,而不允许执行特权指令。这样做可以保护核心代码不受用户程序有意和无意的攻击。 显然,处理机在运行期间需要在内核模式和用户模式之前进行切换。

零拷贝

Kafka使用零拷贝(Zero-Copy)技术来提供它的性能,所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手,减少了内核和用户模式之间的上下文切换,零拷贝技术通过DMA技术实现。

直接存储器存取方式(Direct Memory Access, DMA) DMA控制方式是以存储器为中心,在主存和I/O设备之间建立一条直接通路,在DMA控制器的控制下进行设备和主存之间的数据交换。这种方式只在传输开始和传输结束时才需要CPU的干预。它非常适用于高速设备与主存之间的成批数据传输。

我们看下下面这样的这样一亮尺个场景:

客户端在游览器中发起请求获取内容,到看到具体内容经历了什么?

首先,该请求经过解析后,通过系统调用由用户态转为核心态执行,在核心态由操作系统中的TCP/IP协议代码和网卡驱动程序控制网卡把请求发送到相应的网络上, 等待Web服务器相应。当服务器返回时,由网卡接受,并通过内核传送给客户程序。

在服务器端,内核通过网卡从网络上接受Web请求,并通过系统调用传递给Web服务器。Web服务器根据此服务请求执行相应的服务进程,并由内核把结果发送到网络上传送给用户。

从上图中可以看到如果服务器从准备数据到发送数据经历了下面4个过程。

从上面过程可以看出,数据是先从内核模式--用户模式--内核模式,浪费了2次复制过程:第一次是从内核模式复制到用户模式;第二次是从用户模式再复制回内核模式,而且在上面的过程中,内核和用户模式的上下文切换也是4次。

如果采用了零拷贝技术,那么应用程序就可以直接请求内核把磁盘中的数据传输给Socket.

零拷贝技术通过DMA技术将文件内容复制到内核模式下的Read Buffer中。不过没有数据被复制到Socket Buffer,只有包含数据的位置和长度的信息的文件描述符被加到Socket Buffer中。DMA引擎直接将数据从内核模式中传递到网卡设备。这里上下文切换变成了2次,也只经历了2次复制过程就从磁盘中传送出去了。

kafka原理分析

作为一款典型的消息中间件产品,kafka系统仍然由producer、broker、consumer三部分组成。kafka涉及的几个常用概念和组件简派薯单介绍如下:

当consumer group的状态发生变化(如有consumer故障、增减consumer成员等)或consumer group消费的topic状态发生变化(如增加了partition,消费的topic发生变化),kafka集群会自动调整和重新分配consumer消费的partition,这个过程就叫做rebalance(再平衡)。

__consumer_offsets是kafka集群自己维护的一个特殊的topic,它里面存储的是每个consumer group已经消费了每个topic partition的offset。__consumer_offsets中offset消息的key由group id,topic name,partition id组成,格式为 {topic name}-${partition id},value值就是consumer提交的已消费的topic partition offset值。__consumer_offsets的分区数和副本数分别由offsets.topic.num.partitions(默认值为50)和offsets.topic.replication.factor(默认值为1)参数配置。我们通过公式 hash(group id) % offsets.topic.num.partitions 就可以计算出指定consumer group的已提交offset存储的partition。由于consumer group提交的offset消息只有最后一条消息有意义,所以__consumer_offsets是一个compact topic,kafka集群会周期性的对__consumer_offsets执行compact操作,只保留最新的一次提交offset。

group coordinator运行在kafka某个broker上,负责consumer group内所有的consumer成员管理、所有的消费的topic的partition的消费关系分配、offset管理、触发rebalance等功能。group coordinator管理partition分配时,会指定consumer group内某个consumer作为group leader执行具体的partition分配任务。存储某个consumer group已提交offset的__consumer_offsets partition leader副本所在的broker就是该consumer group的协调器运行的broker。

跟大多数分布式系统一样,集群有一个master角色管理整个集群,协调集群中各个成员的行为。kafka集群中的controller就相当于其它分布式系统的master,用来负责集群topic的分区分配,分区leader选举以及维护集群的所有partition的ISR等集群协调功能。集群中哪个borker是controller也是通过一致性协议选举产生的,2.8版本之前通腔销过zookeeper进行选主,2.8版本后通过kafka raft协议进行选举。如果controller崩溃,集群会重新选举一个broker作为新的controller,并增加controller epoch值(相当于zookeeper ZAB协议的epoch,raft协议的term值)

当kafka集群新建了topic或为一个topic新增了partition,controller需要为这些新增加的partition分配到具体的broker上,并把分配结果记录下来,供producer和consumer查询获取。

因为只有partition的leader副本才会处理producer和consumer的读写请求,而partition的其他follower副本需要从相应的leader副本同步消息,为了尽量保证集群中所有broker的负载是均衡的,controller在进行集群全局partition副本伍羡游分配时需要使partition的分布情况是如下这样的:

在默认情况下,kafka采用轮询(round-robin)的方式分配partition副本。由于partition leader副本承担的流量比follower副本大,kafka会先分配所有topic的partition leader副本,使所有partition leader副本全局尽量平衡,然后再分配各个partition的follower副本。partition第一个follower副本的位置是相应leader副本的下一个可用broker,后面的副本位置依此类推。

举例来说,假设我们有两个topic,每个topic有两个partition,每个partition有两个副本,这些副本分别标记为1-1-1,1-1-2,1-2-1,1-2-2,2-1-1,2-1-2,2-2-1,2-2-2(编码格式为topic-partition-replia,编号均从1开始,第一个replica是leader replica,其他的是follower replica)。共有四个broker,编号是1-4。我们先对broker按broker id进行排序,然后分配leader副本,最后分配foller副本。

1)没有配置broker.rack的情况

现将副本1-1-1分配到broker 1,然后1-2-1分配到broker 2,依此类推,2-2-1会分配到broker 4。partition 1-1的leader副本分配在broker 1上,那么下一个可用节点是broker 2,所以将副本1-1-2分配到broker 2上。同理,partition 1-2的leader副本分配在broker 2上,那么下一个可用节点是broker 3,所以将副本1-1-2分配到broker 3上。依此类推分配其他的副本分片。最后分配的结果如下图所示:

2)配置了broker.rack的情况

假设配置了两个rack,broker 1和broker 2属于Rack 1,broker 3和broker 4属于Rack 2。我们对rack和rack内的broker分别排序。然后先将副本1-1-1分配到Rack 1的broker 1,然后将副本1-2-1分配到下一个Rack的第一个broker,即Rack 2的broker 3。其他的parttition leader副本依此类推。然后分配follower副本,partition 1-1的leader副本1-1-1分配在Rack 1的broker上,下一个可用的broker是Rack 2的broker 3,所以分配到broker 3上,其他依此类推。最后分配的结果如下图所示:

kafka除了按照集群情况自动分配副本,也提供了reassign工具人工分配和迁移副本到指定broker,这样用户可以根据集群实际的状态和各partition的流量情况分配副本

kafka集群controller的一项功能是在partition的副本中选择一个副本作为leader副本。在topic的partition创建时,controller首先分配的副本就是leader副本,这个副本又叫做preference leader副本。

当leader副本所在broker失效时(宕机或网络分区等),controller需要为在该broker上的有leader副本的所有partition重新选择一个leader,选择方法就是在该partition的ISR中选择第一个副本作为新的leader副本。但是,如果ISR成员只有一个,就是失效的leader自身,其余的副本都落后于leader怎么办?kafka提供了一个unclean.leader.election配置参数,它的默认值为true。当unclean.leader.election值为true时,controller还是会在非ISR副本中选择一个作为leader,但是这时候使用者需要承担数据丢失和数据不一致的风险。当unclean.leader.election值为false时,则不会选择新的leader,该partition处于不可用状态,只能恢复失效的leader使partition重新变为可用。

当preference leader失效后,controller重新选择一个新的leader,但是preference leader又恢复了,而且同步上了新的leader,是ISR的成员,这时候preference leader仍然会成为实际的leader,原先的新leader变为follower。因为在partition leader初始分配时,使按照集群副本均衡规则进行分配的,这样做可以让集群尽量保持平衡。

为了保证topic的高可用,topic的partition往往有多个副本,所有的follower副本像普通的consumer一样不断地从相应的leader副本pull消息。每个partition的leader副本会维护一个ISR列表存储到集群信息库里,follower副本成为ISR成员或者说与leader是同步的,需要满足以下条件:

1)follower副本处于活跃状态,与zookeeper(2.8之前版本)或kafka raft master之间的心跳正常

2)follower副本最近replica.lag.time.max.ms(默认是10秒)时间内从leader同步过最新消息。需要注意的是,一定要拉取到最新消息,如果最近replica.lag.time.max.ms时间内拉取过消息,但不是最新的,比如落后follower在追赶leader过程中,也不会成为ISR。

follower在同步leader过程中,follower和leader都会维护几个参数,来表示他们之间的同步情况。leader和follower都会为自己的消息队列维护LEO(Last End Offset)和HW(High Watermark)。leader还会为每一个follower维护一个LEO。LEO表示leader或follower队列写入的最后一条消息的offset。HW表示的offset对应的消息写入了所有的ISR。当leader发现所有follower的LEO的最小值大于HW时,则会增加HW值到这个最小值LEO。follower拉取leader的消息时,同时能获取到leader维护的HW值,如果follower发现自己维护的HW值小于leader发送过来的HW值,也会增加本地的HW值到leader的HW值。这样我们可以得到一个不等式: follower HW = leader HW = follower LEO = leader LEO 。HW对应的log又叫做committed log,consumer消费partititon的消息时,只能消费到offset值小于或等于HW值的消息的,由于这个原因,kafka系统又称为分布式committed log消息系统。

kafka的消息内容存储在log.dirs参数配置的目录下。kafka每个partition的数据存放在本地磁盘log.dirs目录下的一个单独的目录下,目录命名规范为 ${topicName}-${partitionId} ,每个partition由多个LogSegment组成,每个LogSegment由一个数据文件(命名规范为: {baseOffset}.index)和一个时间戳索引文件(命名规范为:${baseOffset}.timeindex)组成,文件名的baseOffset就是相应LogSegment中第一条消息的offset。.index文件存储的是消息的offset到该消息在相应.log文件中的偏移,便于快速在.log文件中快速找到指定offset的消息。.index是一个稀疏索引,每隔一定间隔大小的offset才会建立相应的索引(比如每间隔10条消息建立一个索引)。.timeindex也是一个稀疏索引文件,这样可以根据消息的时间找到对应的消息。

可以考虑将消息日志存放到多个磁盘中,这样多个磁盘可以并发访问,增加消息读写的吞吐量。这种情况下,log.dirs配置的是一个目录列表,kafka会根据每个目录下partition的数量,将新分配的partition放到partition数最少的目录下。如果我们新增了一个磁盘,你会发现新分配的partition都出现在新增的磁盘上。

kafka提供了两个参数log.segment.bytes和log.segment.ms来控制LogSegment文件的大小。log.segment.bytes默认值是1GB,当LogSegment大小达到log.segment.bytes规定的阈值时,kafka会关闭当前LogSegment,生成一个新的LogSegment供消息写入,当前供消息写入的LogSegment称为活跃(Active)LogSegment。log.segment.ms表示最大多长时间会生成一个新的LogSegment,log.segment.ms没有默认值。当这两个参数都配置了值,kafka看哪个阈值先达到,触发生成新的LogSegment。

kafka还提供了log.retention.ms和log.retention.bytes两个参数来控制消息的保留时间。当消息的时间超过了log.retention.ms配置的阈值(默认是168小时,也就是一周),则会被认为是过期的,会被kafka自动删除。或者是partition的总的消息大小超过了log.retention.bytes配置的阈值时,最老的消息也会被kafka自动删除,使相应partition保留的总消息大小维持在log.retention.bytes阈值以下。这个地方需要注意的是,kafka并不是以消息为粒度进行删除的,而是以LogSegment为粒度删除的。也就是说,只有当一个LogSegment的最后一条消息的时间超过log.retention.ms阈值时,该LogSegment才会被删除。这两个参数都配置了值时,也是只要有一个先达到阈值,就会执行相应的删除策略

当我们使用KafkaProducer向kafka发送消息时非常简单,只要构造一个包含消息key、value、接收topic信息的ProducerRecord对象就可以通过KafkaProducer的send()向kafka发送消息了,而且是线程安全的。KafkaProducer支持通过三种消息发送方式

KafkaProducer客户端虽然使用简单,但是一条消息从客户端到topic partition的日志文件,中间需要经历许多的处理过程。KafkaProducer的内部结构如下所示:

从图中可以看出,消息的发送涉及两类线程,一类是调用KafkaProducer.send()方法的应用程序线程,因为KafkaProducer.send()是多线程安全的,所以这样的线程可以有多个;另一类是与kafka集群通信,实际将消息发送给kafka集群的Sender线程,当我们创建一个KafkaProducer实例时,会创建一个Sender线程,通过该KafkaProducer实例发送的所有消息最终通过该Sender线程发送出去。RecordAccumulator则是一个消息队列,是应用程序线程与Sender线程之间消息传递的桥梁。当我们调用KafkaProducer.send()方法时,消息并没有直接发送出去,只是写入了RecordAccumulator中相应的队列中,最终需要Sender线程在适当的时机将消息从RecordAccumulator队列取出来发送给kafka集群。

消息的发送过程如下:

在使用KafkaConsumer实例消费kafka消息时,有一个特性我们要特别注意,就是KafkaConsumer不是多线程安全的,KafkaConsumer方法都在调用KafkaConsumer的应用程序线程中运行(除了consumer向kafka集群发送的心跳,心跳在一个专门的单独线程中发送),所以我们调用KafkaConsumer的所有方法均需要保证在同一个线程中调用,除了KafkaConsumer.wakeup()方法,它设计用来通过其它线程向consumer线程发送信号,从而终止consumer执行。

跟producer一样,consumer要与kafka集群通信,消费kafka消息,首先需要获取消费的topic partition leader replica所在的broker地址等信息,这些信息可以通过向kafka集群任意broker发送Metadata请求消息获取。

我们知道,一个consumer group有多个consumer,一个topic有多个partition,而且topic的partition在同一时刻只能被consumer group内的一个consumer消费,那么consumer在消费partition消息前需要先确定消费topic的哪个partition。partition的分配通过group coordinator来实现。基本过程如下:

我们可以通过实现接口org.apache.kafka.clients.consumer.internals.PartitionAssignor自定义partition分配策略,但是kafka已经提供了三种分配策略可以直接使用。

partition分配完后,每个consumer知道了自己消费的topic partition,通过metadata请求可以获取相应partition的leader副本所在的broker信息,然后就可以向broker poll消息了。但是consumer从哪个offset开始poll消息?所以consumer在第一次向broker发送FetchRequest poll消息之前需要向Group Coordinator发送OffsetFetchRequest获取消费消息的起始位置。Group Coordinator会通过key {topic}-${partition}查询 __consumer_offsets topic中是否有offset的有效记录,如果存在,则将consumer所属consumer group最近已提交的offset返回给consumer。如果没有(可能是该partition是第一次分配给该consumer group消费,也可能是该partition长时间没有被该consumer group消费),则根据consumer配置参数auto.offset.reset值确定consumer消费的其实offset。如果auto.offset.reset值为latest,表示从partition的末尾开始消费,如果值为earliest,则从partition的起始位置开始消费。当然,consumer也可以随时通过KafkaConsumer.seek()方法人工设置消费的起始offset。

kafka broker在收到FetchRequest请求后,会使用请求中topic partition的offset查一个skiplist表(该表的节点key值是该partition每个LogSegment中第一条消息的offset值)确定消息所属的LogSegment,然后继续查LogSegment的稀疏索引表(存储在.index文件中),确定offset对应的消息在LogSegment文件中的位置。为了提升消息消费的效率,consumer通过参数fetch.min.bytes和max.partition.fetch.bytes告诉broker每次拉取的消息总的最小值和每个partition的最大值(consumer一次会拉取多个partition的消息)。当kafka中消息较少时,为了让broker及时将消息返回给consumer,consumer通过参数fetch.max.wait.ms告诉broker即使消息大小没有达到fetch.min.bytes值,在收到请求后最多等待fetch.max.wait.ms时间后,也将当前消息返回给consumer。fetch.min.bytes默认值为1MB,待fetch.max.wait.ms默认值为500ms。

为了提升消息的传输效率,kafka采用零拷贝技术让内核通过DMA把磁盘中的消息读出来直接发送到网络上。因为kafka写入消息时将消息写入内存中就返回了,如果consumer跟上了producer的写入速度,拉取消息时不需要读磁盘,直接从内存获取消息发送出去就可以了。

为了避免发生再平衡后,consumer重复拉取消息,consumer需要将已经消费完的消息的offset提交给group coordinator。这样发生再平衡后,consumer可以从上次已提交offset出继续拉取消息。

kafka提供了多种offset提交方式

partition offset提交和管理对kafka消息系统效率来说非常关键,它直接影响了再平衡后consumer是否会重复拉取消息以及重复拉取消息的数量。如果offset提交的比较频繁,会增加consumer和kafka broker的消息处理负载,降低消息处理效率;如果offset提交的间隔比较大,再平衡后重复拉取的消息就会比较多。还有比较重要的一点是,kafka只是简单的记录每次提交的offset值,把最后一次提交的offset值作为最新的已提交offset值,作为再平衡后消息的起始offset,而什么时候提交offset,每次提交的offset值具体是多少,kafka几乎不关心(这个offset对应的消息应该存储在kafka中,否则是无效的offset),所以应用程序可以先提交3000,然后提交2000,再平衡后从2000处开始消费,决定权完全在consumer这边。

kafka中的topic partition与consumer group中的consumer的消费关系其实是一种配对关系,当配对双方发生了变化时,kafka会进行再平衡,也就是重新确定这种配对关系,以提升系统效率、高可用性和伸缩性。当然,再平衡也会带来一些负面效果,比如在再平衡期间,consumer不能消费kafka消息,相当于这段时间内系统是不可用的。再平衡后,往往会出现消息的重复拉取和消费的现象。

触发再平衡的条件包括:

需要注意的是,kafka集群broker的增减或者topic partition leader重新选主这类集群状态的变化并不会触发在平衡

有两种情况与日常应用开发比较关系比较密切:

consumer在调用subscribe()方法时,支持传入一个ConsumerRebalanceListener监听器,ConsumerRebalanceListener提供了两个方法,onPartitionRevoked()方法在consumer停止消费之后,再平衡开始之前被执行。可以发现,这个地方是提交offset的好时机。onPartitonAssigned()方法则会在重新进行partition分配好了之后,但是新的consumer还未消费之前被执行。

我们在提到kafka时,首先想到的是它的吞吐量非常大,这也是很多人选择kafka作为消息传输组件的重要原因。

以下是保证kafka吞吐量大的一些设计考虑:

但是kafka是不是总是这么快?我们同时需要看到kafka为了追求快舍弃了一些特性:

所以,kafka在消息独立、允许少量消息丢失或重复、不关心消息顺序的场景下可以保证非常高的吞吐量,但是在需要考虑消息事务、严格保证消息顺序等场景下producer和consumer端需要进行复杂的考虑和处理,可能会比较大的降低kafka的吞吐量,例如对可靠性和保序要求比较高的控制类消息需要非常谨慎的权衡是否适合使用kafka。

我们通过producer向kafka集群发送消息,总是期望消息能被consumer成功消费到。最不能忍的是producer收到了kafka集群消息写入的正常响应,但是consumer仍然没有消费到消息。

kafka提供了一些机制来保证消息的可靠传递,但是有一些因素需要仔细权衡考虑,这些因素往往会影响kafka的吞吐量,需要在可靠性与吞吐量之间求得平衡:

kafka只保证partition消息顺序,不保证topic级别的顺序,而且保证的是partition写入顺序与读取顺序一致,不是业务端到端的保序。

如果对保序要求比较高,topic需要只设置一个partition。这时可以把参数max.in.flight.requests.per.connection设置为1,而retries设置为大于1的数。这样即使发生了可恢复型错误,仍然能保证消息顺序,但是如果发生不可恢复错误,应用层进行重试的话,就无法保序了。也可以采用同步发送的方式,但是这样也极大的降低了吞吐量。如果消息携带了表示顺序的字段,可以在接收端对消息进行重新排序以保证最终的有序。

Linux中的零拷贝技术

参考文章: 浅析Linux中的零拷贝技术

内核和用户空间,共享内存。数据copy到内核区后,只需要把地址共享给应用程序即可,无需再copy一次数据到用户空间。

优点:

缺点:

应用:

kafka生产者发送消息到broker的时候,broker的网络接收到数据后,copy到broker的内核空间。然后通过mmap技术,broker会修改消息头,添加一些元数据。所以,写入数据很快。当然顺序IO也是关键技术

内核直接发送数据到socket,无需用户空间参与。

优点:

缺点:

为了节省内核里面的一次copy,我们可以使用优化过的sendfile。该系统方法需要由特定的硬件来支持,并不是所有系统都支持。如下:

sendfile的时候,直接把内核空间的地址传递给socket缓存,DMA直接从指定地址读取数据到流里面。

sendfile只适用于将数据从文件拷贝到套接字上,限定了它的使用范围。Linux在2.6.17版本引入splice系统调用,用于在两个文件描述慎袭符中移动数据。

splice调用在两个文件描述符之间移动数据,而不需要数据在内核空间和用户空间来回拷贝。他从fd_in拷贝len长度的数据到fd_out,但是有一方必须是管道设备,这也是目前splice的一些局限性。flags参数有以下几种取值:

splice调用利用了Linux提出的管道缓冲区机制, 所以稿正至少一个描述符要为管道。

以上几种零拷贝技术都是减少数据在用户空间和内核空间拷贝技术实现的,但是有些时候,数据必须在用户空间和内核空间之间拷贝。这时候,我们只能针对数据在用户空间和内核空间拷贝的时机上下功夫了。Linux通常利用写时复制(copy on write)来减少系统开销,这个技术又时常称作COW。

摘录网上:

传统的fork()系统调用直接把所有的资源复制给新创建的进程。这种实现过于简单并且效率低下,因为它拷贝的数据也许并不共享,更糟的情况是,如果新进程打算立即执行一个新的映像,那么所有的拷贝都将前功尽弃。Linux的fork()使用写时拷贝(copy-on-write)页实现。写时拷贝是一种可以推迟甚至免除拷贝数据的技术。内核此时并不复制整个进程地址空间,而是让父进程和子进程共享同一个拷贝。只有在需要写入的时候,数据才会被复制,从而使各个进程拥有各自的拷贝。也就是说,资源的复制只有在需要写入的时候才进行,在此之前,只是以只读方式共享。这种技术使地址空间上的键孝悔页的拷贝被推迟到实际发生写入的时候。在页根本不会被写入的情况下—举例来说,fork()后立即调用exec()—它们就无需复制了。fork()的实际开销就是复制父进程的页表以及给子进程创建惟一的进程描述符。在一般情况下,进程创建后都会马上运行一个可执行的文件,这种优化可以避免拷贝大量根本就不会被使用的数据(地址空间里常常包含数十兆的数据)。由于Unix强调进程快速执行的能力,所以这个优化是很重要的。这里补充一点:Linux COW与exec没有必然联系。

我总结下: copy-on-write技术其实是一种延迟复制的技术,只有需要用(写)的时候,才去复制数据。

Kafka相关面试题

title: Kafka常见问题

date: 2020-04-01 16:25:49

update: 2020-04-01 20:31:30

excerpt: Kafka 面试中常见问题

toc_min_depth: 3

tags:

Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

位置

内容

kafka中的消费者在读取服务端的数据时,需要将服务端的磁盘文件通过网络发送到消费者进程,网络发送需要经过几种网络节点。如下图所示:

传统的读取文件数据并发送到网络的步骤如下:

(1)操作系统将数据从磁盘文件中读取到内核空间的页面缓存;

(2)应用程序将数据从内核空间读入用户空间缓冲区;

(3)应用程序将读到数据写回内核空间并放入socket缓冲区;

(4)操作系统将数据从socket缓冲区复制到网卡接口,此时数据才能通过网络发送。

通常情况下,Kafka的消息会有多个订阅者,生产者发布的消息会被不同的消费者多次消费,为了优化这个流程,Kafka使用了“零拷贝技术”,如下图所示:

“零拷贝技术”只用将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面渗掘哪缓存直接发送到网络中(发送给不同的订阅者时,都可以使用同一个页面缓存),避免了重复复制操作。

如果有10个消费者,传统方式下,数据复制次数为4*10=40次,而使用“零拷贝技术”只需要1+10=11次,一次为从磁盘复制到页面缓存,10次表示10个消费者各自读取一次页面缓存。

传统的文件拷贝通常需要从用户态去转到核心态,经过read buffer,然丛码后再返回到用户态的应用层buffer,然后再从用户态把数据拷贝到核心态的socket buffer,然后发送到网卡。

传统的数据传输需要多次的用户态和核心态之间的切换,而且还要把数据复制多次,最终才打到网卡。

如果减少了用户态与核心态之间的切换,是不是就会更快了呢?

此时我们会发现用户态“空空如也”。数据没有来到用户态,而是直接在核心态就进行了传输,但这样依然还是有多次复制。首先数据被读取到read buffer中,然后发到socket buffer,最后才发到网卡。虽然减少了用户态和核心态的切换,但依然存在多次数据复制。

如果可以进一步减少数据复制的次数,甚至没有数据复制是不是就会做到最快呢?

DMA

别急,这里我们先介绍一个新的武器:DMA。

DMA,全称叫Direct Memory Access,一种可让某些散清硬件子系统去直接访问系统主内存,而不用依赖CPU的计算机系统的功能。听着是不是很厉害,跳过CPU,直接访问主内存。传统的内存访问都需要通过CPU的调度来完成。如下图:

而DMA,则可以绕过CPU,硬件自己去直接访问系统主内存。如下图:

很多硬件都支持DMA,这其中就包括网卡。

零拷贝

回到本文中的文件传输,有了DMA后,就可以实现绝对的零拷贝了,因为网卡是直接去访问系统主内存的。如下图:

Java的零拷贝实现

在Java中的零拷贝实现是在FileChannel中,其中有个方法transferTo(position,fsize,src)。

传统的文件传输是通过java.io.DataOutputStream,java.io.FileInputStream来实现的,然后通过while循环来读取input,然后写入到output中。

零拷贝则是通过java.nio.channels.FileChannel中的transferTo方法来实现的。transferTo方法底层是基于操作系统的sendfile这个system call来实现的(不再需要拷贝到用户态了),sendfile负责把数据从某个fd(file descriptor)传输到另一个fd。

sendfile:

Java的transferTo:

传统方式与零拷贝性能对比

可以看出速度快出至少三倍多。Kafka在文件传输的过程中正是使用了零拷贝技术对文件进行拷贝。建议以后多用FileChannel的transferTo吧。

总结

需要用到的类:

KafkaProducer :需要创建一个生产者对象,用来发送数据

ProducerConfig :获取所需的一系列配置参数

ProducerRecord :每条数据都要封装成一个ProducerRecord对象

几个比较重要的配置项

//kafka集群,broker-list

props.put("bootstrap.servers", "hadoop102:9092");

Kafka架构及基本原理简析

  Kafka是一个由Scala和Java编写的企业级的消息发布和订阅系统,最早是由Linkedin公司开发,最终开源到Apache软件基金会的项目。Kafka是一个分布式的,支持分区的,多副本的和多订阅者的高吞吐量的消息系统,被广泛应用在应用解耦、异步处理、限流削峰和消息驱动等场景。本文将针对Kafka的架构和相关组件进行简单的介绍。在介绍Kafka的架构之前,我们先了解一下Kafk的核心概念。

  在详细介绍Kafka的架构和基本组件之前,需要先了解一下Kafka的一些核心概念。

Producer: 消息的生产者,负责往Kafka集群中发送消息;

Consumer: 消息的消费者,主动从Kafka集群中拉取消息。

Consumer Group: 每个Consumer属于一个特定的Consumer Group,新建Consumer的时候需要指定对应的Consumer Group ID。

Broker: Kafka集群中的服务实例,也称之为节点,每个Kafka集群包含一个或者多个Broker(一个Broker就是一个服务器或节点)。

Message: 通过Kafka集群进行传递的对象实体,存储需要传送的信息。

Topic: 消息的类别,主要用于对消息进行逻辑上的区分,每条发送到Kafka集群的消息都需要有一个指定的Topic,消费者根据Topic对指定的消息进行消费。

Partition: 消息的分区,Partition是一个物理上的概念,相当于一个文件夹,Kafka会为每个topic的每个分区创建一个文件夹,一个Topic的消息会存储在一个或者多个Partition中。

Segment: 一个partition当中存在多个segment文件段(分段存储),每个Segment分为两部分,.log文件和 .index 文件,其中 .index 文件是索引文件,主要用于快速查询.log 文件当中数据的偏移量位置;

.log文件: 存放Message的数据文件,在Kafka中把数据文件就叫做日志文件。一个分区下面默认有n多个.log文件(分段存储)。一个.log文件大掘悄尘默认1G,消息会不断追加在.log文件中,当.log文件的大小超过1G的时候,会自动新建一个新的.log文件。

.index文件: 存放.log文件的索引数据,每个.index文件有一个对应同名的.log文件。

  后面我们会对上面的一些核心概念进行更深入的介绍。在介绍完Kafka的核心概念之后,我们来看一下Kafka的对外提供的基本功能,组件及架构设计。

  如上图所示,Kafka主要包含四个主要的API组件:

1. Producer API

  应用程序通过Producer API向Kafka集群发送一个或多个Topic的消息。

2. Consumer API

  应用程序通过Consumer API,向Kafka集群订阅一个或多个Topic的消运蚂息,并处理这些Topic下接收到的消息。

3. Streams API

  应用程序通过使用Streams API充当流处理器(Stream Processor),从一个或者多个判禅Topic获取输入流,并生产一个输出流到一个或者多个Topic,能够有效地将输入流进行转变后变成输出流输出到Kafka集群。

4. Connect API

  允许应用程序通过Connect API构建和运行可重用的生产者或者消费者,能够把kafka主题连接到现有的应用程序或数据系统。Connect实际上就做了两件事情:使用Source Connector从数据源(如:DB)中读取数据写入到Topic中,然后再通过Sink Connector读取Topic中的数据输出到另一端(如:DB),以实现消息数据在外部存储和Kafka集群之间的传输。

  接下来我们将从Kafka的架构出发,重点介绍Kafka的主要组件及实现原理。Kafka支持消息持久化,消费端是通过主动拉取消息进行消息消费的,订阅状态和订阅关系由客户端负责维护,消息消费完后不会立刻删除,会保留历史消息,一般默认保留7天,因此可以通过在支持多订阅者时,消息无需复制多分,只需要存储一份就可以。下面将详细介绍每个组件的实现原理。

1. Producer

  Producer是Kafka中的消息生产者,主要用于生产带有特定Topic的消息,生产者生产的消息通过Topic进行归类,保存在Kafka 集群的Broker上,具体的是保存在指定的partition 的目录下,以Segment的方式(.log文件和.index文件)进行存储。

2. Consumer

  Consumer是Kafka中的消费者,主要用于消费指定Topic的消息,Consumer是通过主动拉取的方式从Kafka集群中消费消息,消费者一定属于某一个特定的消费组。

3. Topic

  Kafka中的消息是根据Topic进行分类的,Topic是支持多订阅的,一个Topic可以有多个不同的订阅消息的消费者。Kafka集群Topic的数量没有限制,同一个Topic的数据会被划分在同一个目录下,一个Topic可以包含1至多个分区,所有分区的消息加在一起就是一个Topic的所有消息。

4. Partition

  在Kafka中,为了提升消息的消费速度,可以为每个Topic分配多个Partition,这也是就之前我们说到的,Kafka是支持多分区的。默认情况下,一个Topic的消息只存放在一个分区中。Topic的所有分区的消息合并起来,就是一个Topic下的所有消息。每个分区都有一个从0开始的编号,每个分区内的数据都是有序的,但是不同分区直接的数据是不能保证有序的,因为不同的分区需要不同的Consumer去消费,每个Partition只能分配一个Consumer,但是一个Consumer可以同时一个Topic的多个Partition。

5. Consumer Group

  Kafka中的每一个Consumer都归属于一个特定的Consumer Group,如果不指定,那么所有的Consumer都属于同一个默认的Consumer Group。Consumer Group由一个或多个Consumer组成,同一个Consumer Group中的Consumer对同一条消息只消费一次。每个Consumer Group都有一个唯一的ID,即Group ID,也称之为Group Name。Consumer Group内的所有Consumer协调在一起订阅一个Topic的所有Partition,且每个Partition只能由一个Consuemr Group中的一个Consumer进行消费,但是可以由不同的Consumer Group中的一个Consumer进行消费。如下图所示:

在层级关系上来说Consumer Group好比是跟Topic对应的,而Consumer就对应于Topic下的Partition。Consumer Group中的Consumer数量和Topic下的Partition数量共同决定了消息消费的并发量,且Partition数量决定了最终并发量,因为一个Partition只能由一个Consumer进行消费。当一个Consumer Group中Consumer数量超过订阅的Topic下的Partition数量时,Kafka会为每个Partition分配一个Consumer,多出来的Consumer会处于空闲状态。当Consumer Group中Consumer数量少于当前定于的Topic中的Partition数量是,单个Consumer将承担多个Partition的消费工作。如上图所示,Consumer Group B中的每个Consumer需要消费两个Partition中的数据,而Consumer Group C中会多出来一个空闲的Consumer4。总结下来就是:同一个Topic下的Partition数量越多,同一时间可以有越多的Consumer进行消费,消费的速度就会越快,吞吐量就越高。同时,Consumer Group中的Consumer数量需要控制为小于等于Partition数量,且最好是整数倍:如1,2,4等。

6. Segment

  考虑到消息消费的性能,Kafka中的消息在每个Partition中是以分段的形式进行存储的,即每1G消息新建一个Segment,每个Segment包含两个文件:.log文件和.index文件。之前我们已经说过,.log文件就是Kafka实际存储Producer生产的消息,而.index文件采用稀疏索引的方式存储.log文件中对应消息的逻辑编号和物理偏移地址(offset),以便于加快数据的查询速度。.log文件和.index文件是一一对应,成对出现的。下图展示了.log文件和.index文件在Partition中的存在方式。

  Kafka里面每一条消息都有自己的逻辑offset(相对偏移量)以及存在物理磁盘上面实际的物理地址便宜量Position,也就是说在Kafka中一条消息有两个位置:offset(相对偏移量)和position(磁盘物理偏移地址)。在kafka的设计中,将消息的offset作为了Segment文件名的一部分。Segment文件命名规则为:Partition全局的第一个Segment从0开始,后续每个segment文件名为上一个Partition的最大offset(Message的offset,非实际物理地偏移地址,实际物理地址需映射到.log中,后面会详细介绍在.log文件中查询消息的原理)。数值最大为64位long大小,由20位数字表示,前置用0填充。

  上图展示了.index文件和.log文件直接的映射关系,通过上图,我们可以简单介绍一下Kafka在Segment中查找Message的过程:

   1. 根据需要消费的下一个消息的offset,这里假设是7,使用二分查找在Partition中查找到文件名小于(一定要小于,因为文件名编号等于当前offset的文件里存的都是大于当前offset的消息)当前offset的最大编号的.index文件,这里自然是查找到了00000000000000000000.index。

   2. 在.index文件中,使用二分查找,找到offset小于或者等于指定offset(这里假设是7)的最大的offset,这里查到的是6,然后获取到index文件中offset为6指向的Position(物理偏移地址)为258。

   3. 在.log文件中,从磁盘位置258开始顺序扫描,直到找到offset为7的Message。

至此,我们就简单介绍完了Segment的基本组件.index文件和.log文件的存储和查询原理。但是我们会发现一个问题:.index文件中的offset并不是按顺序连续存储的,为什么Kafka要将索引文件设计成这种不连续的样子?这种不连续的索引设计方式称之为稀疏索引,Kafka中采用了稀疏索引的方式读取索引,kafka每当.log中写入了4k大小的数据,就往.index里以追加的写入一条索引记录。使用稀疏索引主要有以下原因:

   (1) 索引稀疏存储,可以大幅降低.index文件占用存储空间大小。

   (2) 稀疏索引文件较小,可以全部读取到内存中,可以避免读取索引的时候进行频繁的IO磁盘操作,以便通过索引快速地定位到.log文件中的Message。

7. Message

  Message是实际发送和订阅的信息是实际载体,Producer发送到Kafka集群中的每条消息,都被Kafka包装成了一个Message对象,之后再存储在磁盘中,而不是直接存储的。Message在磁盘中的物理结构如下所示。

  其中 key 和 value 存储的是实际的Message内容,长度不固定,而其他都是对Message内容的统计和描述,长度固定。因此在查找实际Message过程中,磁盘指针会根据Message的 offset 和 message length 计算移动位数,以加速Message的查找过程。之所以可以这样加速,因为Kafka的.log文件都是顺序写的,往磁盘上写数据时,就是追加数据,没有随机写的操作。

8.Partition Replicas

  最后我们简单聊一下Kafka中的Partition Replicas(分区副本)机制,0.8版本以前的Kafka是没有副本机制的。创建Topic时,可以为Topic指定分区,也可以指定副本个数。kafka 中的分区副本如下图所示:

  Kafka通过副本因子(replication-factor)控制消息副本保存在几个Broker(服务器)上,一般情况下副本数等于Broker的个数,且同一个副本因子不能放在同一个Broker中。副本因子是以分区为单位且区分角色;主副本称之为Leader(任何时刻只有一个),从副本称之为 Follower(可以有多个),处于同步状态的副本叫做in-sync-replicas(ISR)。Leader负责读写数据,Follower不负责对外提供数据读写,只从Leader同步数据,消费者和生产者都是从leader读写数据,不与follower交互,因此Kafka并不是读写分离的。同时使用Leader进行读写的好处是,降低了数据同步带来的数据读取延迟,因为Follower只能从Leader同步完数据之后才能对外提供读取服务。

  如果一个分区有三个副本因子,就算其中一个挂掉,那么只会剩下的两个中,选择一个leader,如下图所示。但不会在其他的broker中,另启动一个副本(因为在另一台启动的话,必然存在数据拷贝和传输,会长时间占用网络IO,Kafka是一个高吞吐量的消息系统,这个情况不允许发生)。如果指定分区的所有副本都挂了,Consumer如果发送数据到指定分区的话,将写入不成功。Consumer发送到指定Partition的消息,会首先写入到Leader Partition中,写完后还需要把消息写入到ISR列表里面的其它分区副本中,写完之后这个消息才能提交offset。

  到这里,差不多把Kafka的架构和基本原理简单介绍完了。Kafka为了实现高吞吐量和容错,还引入了很多优秀的设计思路,如零拷贝,高并发网络设计,顺序存储,以后有时间再说。

[img]

零拷贝技术

知识补充:

1、DMA是直接内存访问( Direct Memory Access ) 技术,早期 DMA 只存在在主板上,如今由于 I/O 设备越来越多,数据传输的需求也不尽相同,所以每个 I/O 设备里面都有自己的 DMA 控制器。

2、每次系统调用都得先从用户态切换到内激闹核态,等内核完成任务后,再从内核态切换回用户态。

我们都听过 kafka 很快,其中一个原因是 kafka 使用零拷贝。看看从文件中读取数据并通过网络将数据传输到另一个程序的场景,在内部的复制操作,需要在用户模式和内核明旁罩模式之间进行四次上下文切换,并且进行数据复制四次。

完成所有四个操作后,它将再次切换到用户模式。

回顾以上动作,其实发现实际上第二个和第三个数据拷贝是可以避免的。Java 类库通过 java.nio.channels 中的 transferTo() 方法在 UNIX 系统上执行零副本,使用零拷贝的应用程序请求内核直接将数据从磁盘文件复制到套接字,而不通过应用程序。零拷贝极大提高了应用程序性能,并减启樱少了内核和用户模式之间的上下文切换次数。

我们看看 transferTo() 是如何复制数据的?

这里我们还需要 3 个副本和 2 个上下文切换。

但当前还没有达到零拷贝,如果底层网卡支持收集操作,可以进一步减少内核重复拷贝数据的操作。在 Linux 内核 2.4 及更高版本中,套接字缓冲区描述符支持该场景。

Kafka 和 Nginx 都有实现零拷贝技术,这将大大提高文件传输的性能。拷贝技术,本质上讲就是通过减少非必要的内存拷贝以及上下文切换,来提高文件在通道间复制速度的一种技术。以本文中的transferTo()方法为例,通过该技术,可以将原来 四次内存间拷贝减少成两次,将四次上下文切换减少成两次 ,大大提高复制的速度。但零拷贝技术并非万能的,它有自己的使用场景,对于将大量数据从一个 I/O 通道复制到另一个通道的情况(例如 Web 服务器),都是合适的。

关于kafka零拷贝原理和kafka copyonwrite的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表