zhaoyu@home:~$

kafka 设计

Persistence

FileSystem

kafka主要依赖文件系统存储和缓存消息。与过去若干年磁盘的访问延迟指标不同,现在的磁盘关键的性能指标是磁盘的吞吐量。在6个7200RPM 的SATA RAID-5硬盘组成的JBOD(just a bunch of disks)阵列上, 线性写入的速度可以达到600m/s,而随机写入的速度只有100k/s。 现代的操作系统使用read-ahead和write-behind技术,读取多个大的块数据, 并且将小的逻辑写入组成大的物理写入。在一些情况下,顺序的磁盘访问可以比内存的随机访问快。

为了弥补内存和磁盘之间的性能差异,现代操作系统大量使用内存作为磁盘缓存,变得更加普遍。当内存被回收时,OS很喜欢将所有的空闲内存转化为 磁盘缓存,所以所有的磁盘读取和写入都会通过磁盘缓存。这个特性在不直接使用I/O的情况下,不容易关闭。因此,一个进程即使对数据维护了一个 进程内的缓存,这份数据也有可能被OS的pagecache(这里可以理解为磁盘缓存)缓存,被存储两次。

此外, 我们基于jvm上面构建应用, 有花费时间在java内存上的人都知道两件事

  1. 内存中存有大量的对象需要消耗很高, 经常是双倍于存储到磁盘时大小(可能更多)
  2. java的垃圾收集器在内存数据增加是变得很烦琐的, 很慢。

所以,使用文件系统和pagecache 要优于内存或者其他存储结构,通过直接访问空闲内存,我们的可用缓存至少增加了一倍。有可能在将单个对象存储 为紧密的字节时,可用缓存会再次翻倍。这样,在32g的机器中,我们可以使用28-30的缓存,并不需要付出GC的开销,并且服务重启后缓存依然是热 缓存。而使用进程缓存,是一个完全的冷缓存,并且缓存的建立需要花费很长的时间(10GB cache可能需要10分钟)。

更简单的设计,我们不会维护很大的内存,并在内存快消耗完的时候flush到磁盘,所有的数据都是通过persistent log直接写入文件系统,并不需 要马上flush到磁盘,只是将数据转移到内核的pagecache中。 这种基于pagecache-centric的设计可以参考这篇文章

常量时间复杂度

消息系统中通常使用Btree或者其他相关的随机访问数据机构存储消息。Btree的时间复杂度为O(log N),O(log N)可以认为基本相当于常量时间。 但是在磁盘操作中不是这样的,磁盘寻址需要消耗10ms的时间,并且不能并行执行。所以几次磁盘寻址时间消耗很高。树状结构的查找会随着数据的 增长呈超线性增长。数据量增长一倍,查找时间增长会超过一倍。

使用持久化列表可以实现O(1)性能的时间复杂度,而且写入和读取不会相互阻塞。这样性能和数据的大小没有关系,可以做到其他消息系统中不能做 到的,将消息保存到列表中一段时间。

Efficiency

我们为kafka的性能做了大量的工作,一个主要的案例就是处理用户的活动日志。每个页面的访问都会产生好几十次的日志。我们努力使消息消费的成 本尽可能的小。

在一些类似得系统创建和运行的过程中我们发现,高效的多租户操作是效率的关键。应用在使用中的一些小的变化会成为下游基础系统的瓶颈,这些 小的变化也会产生新的问题。

一旦低效率的磁盘访问被消除,在系统中还有两个低效率的操作:大量小的IO操作,和过多的字节赋值。小的IO操作反生在客户端和服务器之间, 以及服务器端持久化数据的操作中。为了避免小的IO操作,我们的协议建立在一个抽象的“消息集合”的基础上,将消息组装到一起。这样操作相对于 一个个消息的发送来说,摊销了网络往返的开支。服务器一次将一大堆消息附加到日志中,消费者每次也会线性取一大堆消息。

这种简单的优化产生了数量级的加速效果,批量处理使得网络包增大,顺序磁盘访问增大,连续的内存块等等。所有这些让kafka的随机消息写入的 突发流变成面向用户的线性写入。

再说说字节复制,在消息量少的情况下不会产生问题,但是在高消息负荷下是显著的。为了避免这个问题,我们使用了一个可以被producer,broker, consumer共享的标准二进制消息格式(所以消息可以在他们之间不需要修改,可以直接传递)。

broker维护的消息日志只是一个文件的路径,每个文件由一个顺序的消息集合产生并写入磁盘,维护这种日志格式优化了最重要的操作:持久化日志 的网络传输。unix系统为从pagecache向socket传输数据提供了一种高度优化的代码路径,linux系统通过sendfile系统调用实现。

为了理解sendfile的重要性,我们理解下数据从文件传输到socket的步骤。

  1. 从磁盘将数据读取到内核空间中的pagecache 。
  2. 应用从pagecache读取数据到用户空间buffer。
  3. 应用将数据写入在内核空间中的一个socket buffer。
  4. 操作系统从socket buffer复制数据到NIC buffer,在NIC中发送数据到网络。

这个过程明显效率很低,使用了两次system call和四次复制,使用sendfile,OS会直接将数据从pagecache复制到NIC,然后发送到网络。在这种优 化中,只会进行一次复制。这种优化让消息消费接近网络传输的极限。

pagecache和sendfile的结合,意味着在kafka集群中,大多数consumer使用时,你将看不到磁盘的读取活动,因为他们完全从cache取数据。

关于更多sendfile和零拷贝,查看文章

端到端的批量压缩

在一些情况下,瓶颈不是CPU和磁盘,而是网络带宽。特别是数据中心之间在广域网上通过一个数据管道传输时。首先,用户可以不通过kafka压缩消息, 但是这会导致很低的压缩率,大多数的冗余是由于相同类型的消息的重复。所以多个消息的压缩比单个消息的压缩高效很多。

kafka支持有效的批量处理。消息将以压缩形式写入日志。并且只能被consumer解压。 kafka支持GZIP,Snappy 和 LZ4压缩协议。

Producer

负载均衡

producer直接发送数据给broker,该broker是没有任何中间路由层的分区的leader 。为了做到这一点,所以的kafka的节点可以回答哪个服务器 是活着的,一个topic分区的leaders在任何时候允许producer直接发送请求到服务器上。

客户端控制消息会发布到topic的那个分区上。这个可能是随机的,使用一种负载均衡实现,或者使用一些分区函数。我们为分区函数提供了接口, 该接口允许用户为分区指定一个key,并使用这个key对分区进行hash(如果需要的话,可以重写分区函数)。例如,使用user id作为key,那么 一个用户的数据将会发送到相同的分区中。这允许consumer在处理消费时做一些本地化的处理。这种风格的分区常被用于那些对本地处理比较敏感的 消费者。

异步发送

批量处理是提高效率的有效手段,为了能够批量,produer会在内存中累计一定的数据,并在一次请求中批量发送。批量处理可以通过制定固定的 消息数实现,也可以通过制定特定的延时(64k或者10ms)。通过配置这个缓冲实现了牺牲少量的延迟时间,获得更好的吞吐量的机制。

Consumer

consumer通过向即将要消费的leader brokers发送“fetch”请求来消费消息。consumer在每次请求中指定了日志的offset并获得一堆日志。 也可以回滚offset重新消费消息。

推和拉

kafka和传统的消息系统设计一样,producer推送消息给broker,consumer从broker中拉取消息。但是在一些如Scribe,Apache Flume的日志中 心系统中,数据是推送给下游的的数据流。两种方式各有利弊。但是在推送机制的系统中,当代理控制数据传输速率时,消费者很难处理。如果消费 率低于生产率,那么消费者会被消息淹没。而基于拉取的系统中,消息处理只是落后,消费者尽可能地赶上。

另一个拉取设计的好处就是可以批量发送数据给消费者。基于推送的的系统,要么马上发送请求,要么累积一定的数据后再发送,而不管消费者是否 能处理得过来。如果调整为低延迟,这将导致每次只发送一条消息,导致缓存还没完全利用就发送,这很浪费。 基于拉取的设计可以避免这个问题, 可以从当前日志位置拉取可用的消息,不必引入延迟而得到高效的批处理性能。

基于拉取的系统的不足之处就是消费者需要频繁地循环检测是否有数据到来。为了避免这一点,我们可以设置拉取请求的参数,允许消费者在轮询时 阻塞,直到数据到达。

消费位置

追踪哪些消息被消费了,是一个消息系统的关键性能点之一。 大多数的消息系统在broker上保存了消息被消费的元数据。当一个消息被消费者处理时,broker立即在本地记录或者它会等待消费者的通知。这是 一个相当直接的选择。事实上对于一个服务器来说,这个状态的变化不够明确。当这个消息被消费后,服务器可以立即删除,保持较小的数据集。

但是让broker和consumer关于哪些消息被消费达成一致,不是一件轻松的事。如果broker将一个消息交费网络处理,如果consumer处理失败了, 那么消息就会丢失。为了解决这个问题,大部分消息系统添加了一个确认状态,当消息被发送给网络时,消息被标记为发送但是没有被消费。 broker等待消费者确认并记录消息消费成功。这种策略避免了消息丢失,但是引入了新的问题。首先,如果消费者已经消费了记录,但是反馈时失败, 那么消息可能会被消费两次。其次,是性能消耗。broker必须为每个消息保存多个状态。还有一些麻烦的问题,比如,消息被发送,但是一直没有 接受到反馈。

kafka使用不同的处理方式,topic被划分到一个有序的分区集合当中,在一个时刻,每个分区仅被一个订阅消费组中的一个消费者消费。这就意味着, 每个分区上消费者的位置是单个integer,称为offset,记录了下个需要消费的消息。这使得被消费掉的消息状态非常小。每个分区只有一个数字。 这个状态可以定期的检查。这使得消息确认状态的成本非常低。

这种方式的另一个好处是,消费者可以自由地回滚offset重新消费数据。

离线数据加载

可扩展的持久化存储能力,允许消费者定期把数据导入到如hadoop或者数据库中的离线系统中。

消息分发语义

接下来我们讨论kafka为producer和consumer消息分发提供的保障策略。下面有三种消息分发保障策略:

  1. at most once-消息可能会丢失,但是不会被重复分发。
  2. at least once-消息不会被丢失,但是可能多次分发。
  3. Exactly once-有且仅有一次,消息只会被分发一次。

许多消息系统提供了有且仅有一次的分发策略,但是并不是精确意义上的(并没有考虑消息失败,多个消费者,数据写入磁盘等情况)。而kafka定义 的保障策略十分直接。一旦消息被提交,就不会丢失,只要kafka把这个消息复制到活着的服务器的分区上。

每个语义会被拆为两个问题:producer发布一个消息的保障和consumer消费消息的保障。 如果producer在发布消息时发生网络错误,那么它将无法确认这个消息是否发布成功,这个类似于使用自增主键插入数据到数据库。

在0.11.0.0之前,如果producer接收提交确认状态失败,他会重新发送这条消息。这就提供了at least once的分发策略。自0.11.0.0开始, kafka producer支持幂等分发选项,保证重新发送不会导致日志中的条目重复,为了实现这个,broker会给每个producer分配一个ID,producer 为每个消息设置一个有序的编号,用来去重。也是自0.11.0.0开始,producer支持像事务一样向多个topic分区同时发送消息,所有的消息要么都 成功,要么都失败。主要使用该特性实现topic之间的exactly-once 分发策略。

并不是所有的情况都需要使用这种强分发策略保障。对于延迟敏感的使用,我们可以指定响应的延迟,如可以指定producer等待消息被提交要 使用10ms的时间。producer也可以指定完全的异步发送或者等到leader拥有该消息。

接下来,我们从consumer的角度探讨分发策略。拥有相同日志的所有复制分区,他们的offset都相同。consumer控制日志中的offset。如果 consumer不发生任何问题,那么它只需要在内存中保存该offset。但是如果consumer发生错误或者想将该分区转交给另一个进程处理。那么它有 几种选择去处理消息并更新position。

  1. consumer可以读取消息,保存在日志中的offset,最后处理消息。但是有可能在保存offset后,没有处理消息之前发生崩溃,这种情况下, 新的consumer接管分区,并从保存的offset开始处理,即使有一些offset前面的消息可能没有被处理。这种处理和at-most-once分发策略对应, 消费者处理失败的消息有可能没有被处理。
  2. consumer可以读取消息,然后处理消息,最后保存日志中的offset。可能会处理消息成功,但是没有来得及保存offset,这种情况下,当新 的consumer接管分区时,offset开始的一些消息可能被已经被处理过了。这种处理和at-least-once分发策略对应。在多数情况下,message 有一个主键,所以更新是等效的(收到同一个消息两次,重复处理相同的记录)。

关于Exactly once ,当从一个topic消费的同时生产一个topic时,我们可以借助0.11.0.0以后的事务生产能力。在默认的“read_uncommitted” 隔离级别中,所有的消息对于消费者是可见的,但是在“read_committed”隔离级别中,consummer只能读取不在事务中的消息。(和数据库的隔离级 别类似)

当写入外部系统时,需要协同consumer的offset和实际的存储的输出。实现这一点需要引入分布式事务的两阶段提交,但是很多consumer想要写入 的输出系统不支持两阶段提交。我们可以更简单的处理这个问题,通过将让consumer记录offset和实际的输出。例如,我们将kafka消息输出到HDFS 中,我们同时记录offset和每个message数据,如果offset和message都是65,那么说明输出到HDFS成功,如果offset为65,而message只记录 到64,那么说明65没有输出成功,需要保持offset和实际存储的数据一致。

kafka默认确保at-least-once分发策略,通过在producer上禁用重新发送,并且在consumer上处理一批消息前修改offset,用户就可以实现 at-most-once分发策略。

Replication

kafka通过配置replication factor参数,为每个topic的分区创建了若干副本。这使得集群中的一个服务器挂掉时,副本可以在失败的情况下 恢复过来。

其他的消息系统也实现了复制特征,但是在我们挑剔的视角里。它其实并没有被大量使用,且有很大的缺点:slave并不活跃,吞吐量严重受到限制, 并且需要手动配置等等。但是kafka默认使用了一个replication。

副本的单元是topic分区,每个分区有一个leader和多个follower。所有的读写都要通过leader。一般情况下,有超过broker数量的很多分区, 并且leader会均匀地分配到各个broker中。follower中的日志和leader中的日志完全相同(当然,会有很小的时间段,leader领先follower 一些还没有来得及复制的消息)。

follower就像和其他消费者一样从leader消费消息,并记录到follower日志中。

和很多分布式系统一样,kafka定义一个节点是否存活有两个条件:

  1. 一个节点必须和zookeeper保持会话联系(通过心跳机制)。
  2. 如果是slave节点,它必须能够复制在leader中的写操作,并不能落下“太多”。

为避免“alived”和“failed”的模糊不清,我们将满足上述条件的节点称为“in-sync”。leader会保持跟踪一组“in-sync”节点,如果一个follower 死亡、卡住、或者落后了。leader会把它从同步复制列表中删除。决定一个节点卡住或者落后是通过“replica.lag.time.max.ms”参数来配置的。