Kafka 学习笔记
入门概念
术语
Kafka 的三层消息架构:
- 第一层是主题层,每个主题可以配置 M 个分区,而每个分区又可以配置 N 个副本。
- 第二层是分区层,每个分区的 N 个副本中只能有一个充当领导者角色,对外提供服务;其他 N-1 个副本是追随者副本,只是提供数据冗余之用。
- 第三层是消息层,分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。
- 最后,客户端程序只能与分区的领导者副本进行交互。
Kafka Broker 是如何持久化数据的:
总的来说,Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机 I/O 操作,改为性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。不过如果你不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此 Kafka 必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment)机制。在 Kafka 底层,一个日志又进一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。
消费者组(Consumer Group):
所谓的消费者组,指的是多个消费者实例共同组成一个组来消费一组主题。这组主题中的每个分区都只会被组内的一个消费者实例消费,其他消费者实例不能消费它。为什么要引入消费者组呢?主要是为了提升消费者端的吞吐量。多个消费者实例同时消费,加速整个消费端的吞吐量(TPS)。我会在专栏的后面详细介绍消费者组机制,所以现在你只需要了解消费者组是做什么的即可。另外这里的消费者实例可以是运行消费者应用的进程,也可以是一个线程,它们都称为一个消费者实例(Consumer Instance)。
重平衡(Rebalance):
消费者组里面的所有消费者实例不仅“瓜分”订阅主题的数据,而且更酷的是它们还能彼此协助。假设组内某个实例挂掉了,Kafka 能够自动检测到,然后把这个 Failed 实例之前负责的分区转移给其他活着的消费者。这个过程就是 Kafka 中大名鼎鼎的“重平衡”(Rebalance)。
所有名词术语:
- 消息:Record。Kafka 是消息引擎嘛,这里的消息就是指 Kafka 处理的主要对象。
- 主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
- 分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。
- 消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
- 副本:Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
- 生产者:Producer。向主题发布新消息的应用程序。
- 消费者:Consumer。从主题订阅新消息的应用程序。
- 消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。
- 消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
- 重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
kafka 只是消息引擎么
Kafka 在设计之初就旨在提供三个方面的特性:
- 提供一套 API 实现生产者和消费者;
- 降低网络传输和磁盘存储开销;
- 实现高伸缩性架构。
基本使用
Broker 端参数
存储信息
首先 Broker 是需要配置存储信息的,即 Broker 使用哪些磁盘。那么针对存储信息的重要参数有以下这么几个:
- log.dirs:这是非常重要的参数,指定了 Broker 需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定。
- log.dir:注意这是 dir,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。
这两个参数应该怎么设置呢?很简单,你只要设置log.dirs,即第一个参数就好了,不要设置log.dir。而且更重要的是,在线上生产环境中一定要为log.dirs配置多个路径,具体格式是一个 CSV 格式,也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3这样。
ZooKeeper 相关的设置
它是一个分布式协调框架,负责协调管理并保存 Kafka 集群的所有元数据信息,比如集群都有哪些 Broker 在运行、创建了哪些 Topic,每个 Topic 都有多少分区以及这些分区的 Leader 副本都在哪些机器上等信息。
Kafka 与 ZooKeeper 相关的最重要的参数当属zookeeper.connect。这也是一个 CSV 格式的参数,比如我可以指定它的值为zk1:2181,zk2:2181,zk3:2181。2181 是 ZooKeeper 的默认端口。
如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2。切记 chroot 只需要写一次,而且是加到最后的。我经常碰到有人这样指定:zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3,这样的格式是不对的。
Topic 管理
- auto.create.topics.enable:是否允许自动创建 Topic。
- unclean.leader.election.enable:是否允许 Unclean Leader 选举。
- auto.leader.rebalance.enable:是否允许定期进行 Leader 选举。
数据留存
- log.retention.{hours|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hours 最低。
- log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。
- message.max.bytes:控制 Broker 能够接收的最大消息大小。
Topic 级别参数
答案就是 Topic 级别参数会覆盖全局 Broker 参数的值,而每个 Topic 都能设置自己的参数值,这就是所谓的 Topic 级别参数。
保存消息
从保存消息方面来考量的话,下面这组参数是非常重要的:
- retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。
- retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。
设置方式
Topic 级别参数的设置就是这种情况,我们有两种方式可以设置:
- 创建 Topic 时进行设置
- 修改 Topic 时设置
现在让我们用以下命令来创建 Topic:
1 | bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880 |
下面看看使用另一个自带的命令kafka-configs来修改 Topic 级别参数。假设我们现在要发送最大值是 10MB 的消息,该如何修改呢?命令如下:
1 | bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760 |
JVM 参数
1 | export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g |
客户端实践及原理剖析
生产者消息分区机制原理剖析
其实分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。
分区策略
所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略。
如果要自定义分区策略,你需要显式地配置生产者端的参数partitioner.class。这个参数该怎么设定呢?方法很简单,在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。这个接口也很简单,只定义了两个方法:partition()和close(),通常你只需要实现最重要的 partition 方法。我们来看看这个方法的方法签名:
1 | int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); |
这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。
轮询策略
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。
随机策略
1 | List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); |
按消息键保序策略
1 | List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); |
生产者压缩算法面面观
Kafka 的消息层次都分为两层:消息集合(message set)以及消息(message)。一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方。Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作
何时压缩?
在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。
比如下面这段程序代码展示了如何构建一个开启 GZIP 的 Producer 对象:
1 | Properties props = new Properties(); |
何时解压缩?
通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到 Broker 后,Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时,Broker 依然原样发送出去,当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息。
如果用一句话总结一下压缩和解压缩,那么我希望你记住这句话:Producer 端压缩、Broker 端保持、Consumer 端解压缩。
无消息丢失配置怎么实现?
一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。
案例 1:生产者程序丢失数据
目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。
这种发送方式有个有趣的名字,叫“fire and forget”,翻译一下就是“发射后不管”。这个术语原本属于导弹制导领域,后来被借鉴到计算机领域中,它的意思是,执行完一个操作后不去管它的结果是否成功。调用 producer.send(msg) 就属于典型的“fire and forget”,因此如果出现消息丢失,我们是无法知晓的。这个发送方式挺不靠谱吧,不过有些公司真的就是在使用这个 API 发送消息。
实际上,解决此问题的方法非常简单:Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。不要小瞧这里的 callback(回调),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。
案例 2:消费者程序丢失数据
Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。Consumer 程序有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。下面这张图来自于官网,它清晰地展示了 Consumer 端的位移数据。
正确使用书签有两个步骤:第一步是读书,第二步是更新书签页。如果这两步的顺序颠倒了,就可能出现这样的场景:当前的书签页是第 90 页,我先将书签放到第 100 页上,之后开始读书。当阅读到第 95 页时,我临时有事中止了阅读。那么问题来了,当我下次直接跳到书签页阅读时,我就丢失了第 96~99 页的内容,即这些消息就丢失了。同理,Kafka 中 Consumer 端的消息丢失就是这么一回事。
要对抗这种消息丢失,办法很简单:维持先消费消息(阅读),再更新位移(书签)的顺序即可。这样就能最大限度地保证消息不丢失。
当然,这种处理方式可能带来的问题是消息的重复处理,类似于同一页书被读了很多遍,但这不属于消息丢失的情形。
为了加快阅读速度,你把书中的 10 个章节分别委托给你的 10 个朋友,请他们帮你阅读,并拜托他们告诉你主旨大意。当电子书临近过期时,这 10 个人告诉你说他们读完了自己所负责的那个章节的内容,于是你放心地把该书还了回去。不料,在这 10 个人向你描述主旨大意时,你突然发现有一个人对你撒了谎,他并没有看完他负责的那个章节。那么很显然,你无法知道那一章的内容了。
这个问题的解决方案也很简单:如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移。
最佳实践
看完这两个案例之后,我来分享一下 Kafka 无消息丢失的配置,每一个其实都能对应上面提到的问题。
- 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
- 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
- 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
- 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
- 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
- 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
- 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
- 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Cons
客户端都有哪些不常见但是很高级的功能?
拦截器
如果你用过 Spring Interceptor 或是 Apache Flume,那么应该不会对拦截器这个概念感到陌生,其基本思想就是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。
Kafka 拦截器
Kafka 拦截器分为生产者拦截器和消费者拦截器
1 | Properties props = new Properties(); |
org.apache.kafka.clients.producer.ProducerInterceptor 接口。该接口是 Kafka 提供的,里面有两个核心的方法。
- onSend:该方法会在消息发送之前被调用。如果你想在发送之前对消息“美美容”,这个方法是你唯一的机会。
- onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。还记得我在上一期中提到的发送回调通知 callback 吗?onAcknowledgement 的调用要早于 callback 的调用。值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦。还有一点很重要,这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降。
同理,指定消费者拦截器也是同样的方法,只是具体的实现类要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,这里面也有两个核心方法。
- onConsume:该方法在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。
- onCommit:Consumer 在提交位移之后调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
典型使用场景
其实,跟很多拦截器的用法相同,Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。
Java生产者是如何管理TCP连接的?
Apache Kafka的所有通信都是基于TCP的,而不是于HTTP或其他协议的
1 为什采用TCP?
(1)TCP拥有一些高级功能,如多路复用请求和同时轮询多个连接的能力。
(2)很多编程语言的HTTP库功能相对的比较简陋。
名词解释:
多路复用请求:multiplexing request,是将两个或多个数据合并到底层—物理连接中的过程。TCP的多路复用请求会在一条物理连接上创建若干个虚拟连接,每个虚拟连接负责流转各自对应的数据流。严格讲:TCP并不能多路复用,只是提供可靠的消息交付语义保证,如自动重传丢失的报文。
2 何时创建TCP连接?
(1)在创建KafkaProducer实例时,
A:生产者应用会在后台创建并启动一个名为Sender的线程,该Sender线程开始运行时,首先会创建与Broker的连接。
B:此时不知道要连接哪个Broker,kafka会通过METADATA请求获取集群的元数据,连接所有的Broker。
(2)还可能在更新元数据后,或在消息发送时
3 何时关闭TCP连接
(1)Producer端关闭TCP连接的方式有两种:用户主动关闭,或kafka自动关闭。
A:用户主动关闭,通过调用producer.close()方关闭,也包括kill -9暴力关闭。
B:Kafka自动关闭,这与Producer端参数connection.max.idles.ms的值有关,默认为9分钟,9分钟内没有任何请求流过,就会被自动关闭。这个参数可以调整。
C:第二种方式中,TCP连接是在Broker端被关闭的,但这个连接请求是客户端发起的,对TCP而言这是被动的关闭,被动关闭会产生大量的CLOSE_WAIT连接。
幂等生产者和事务生产者
所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。
常见的承诺有以下三种:
- 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
- 至少一次(at least once):消息不会丢失,但有可能被重复发送。
- 精确一次(exactly once):消息不会丢失,也不会被重复发送。
目前,Kafka 默认提供的交付可靠性保障是第二种,即至少一次。
Kafka 是怎么做到精确一次的呢?简单来说,这是通过两种机制:幂等性(Idempotence)和事务(Transaction)。
幂等性(Idempotence)
- 在计算机领域中,幂等性的含义稍微有一些不同:
在命令式编程语言(比如 C)中,若一个子程序是幂等的,那它必然不能修改系统状态。这样不管运行这个子程序多少次,与该子程序关联的那部分系统状态保持不变。 - 在函数式编程语言(比如 Scala 或 Haskell)中,很多纯函数(pure function)天然就是幂等的,它们不执行任何的 side effect。
幂等性有很多好处,其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。如果是非幂等性操作,我们还需要担心某些操作执行多次对状态的影响,但对于幂等性操作而言,我们根本无需担心此事。
幂等性 Producer
enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。当然,实际的实现原理并没有这么简单,但你大致可以这么理解。
看上去,幂等性 Producer 的功能很酷,使用起来也很简单,仅仅设置一个参数就能保证消息不重复了,但实际上,我们必须要了解幂等性 Producer 的作用范围。
首先,它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
那么你可能会问,如果我想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!
实现逻辑很简单:
- 区分producer会话
producer每次启动后,首先向broker申请一个全局唯一的pid,用来标识本次会话。
- 消息检测
message_v2 增加了sequence number字段,producer每发一批消息,seq就加1。
broker在内存维护(pid,seq)映射,收到消息后检查seq,如果,
1 | new_seq=old_seq+1: 正常消息; |
- producer重试
producer在收到明确的的消息丢失ack,或者超时后未收到ack,要进行重试。
事务型 Producer
事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
设置事务型 Producer 的方法也很简单,满足两个要求即可:
- 和幂等性 Producer 一样,开启 enable.idempotence = true。
- 设置 Producer 端参数 transactional. id。最好为其设置一个有意义的名字。
1 | producer.initTransactions(); |
在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。修改起来也很简单,设置 isolation.level 参数的值即可。当前这个参数有两个取值:
- read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
- read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
消费者组到底是什么?
Consumer Group :Kafka提供的可扩展且具有容错性的消息者机制。
1,重要特征:
A:组内可以有多个消费者实例(Consumer Instance)。
B:消费者组的唯一标识被称为Group ID,组内的消费者共享这个公共的ID。
C:消费者组订阅主题,主题的每个分区只能被组内的一个消费者消费
D:消费者组机制,同时实现了消息队列模型和发布/订阅模型。
2,重要问题:
A:消费组中的实例与分区的关系:
消费者组中的实例个数,最好与订阅主题的分区数相同,否则多出的实例只会被闲置。一个分区只能被一个消费者实例订阅。
B:消费者组的位移管理方式:
(1)对于Consumer Group而言,位移是一组KV对,Key是分区,V对应Consumer消费该分区的最新位移。
(2)Kafka的老版本消费者组的位移保存在Zookeeper中,好处是Kafka减少了Kafka Broker端状态保存开销。但ZK是一个分布式的协调框架,不适合进行频繁的写更新,这种大吞吐量的写操作极大的拖慢了Zookeeper集群的性能。
(3)Kafka的新版本采用了将位移保存在Kafka内部主题的方法。
C:消费者组的重平衡:
(1)重平衡:本质上是一种协议,规定了消费者组下的每个消费者如何达成一致,来分配订阅topic下的每个分区。
(2)触发条件:
a,组成员数发生变更
b,订阅主题数发生变更
c,定阅主题分区数发生变更
(3)影响:
Rebalance 的设计是要求所有consumer实例共同参与,全部重新分配所有用分区。并且Rebalance的过程比较缓慢,这个过程消息消费会中止。
揭开神秘的“位移主题”面纱
1,诞生背景
A :老版本的Kafka会把位移信息保存在Zk中,当Consumer重启后,自动从Zk中读取位移信息。这种设计使Kafka Broker不需要保存位移数据,可减少Broker端需要持有的状态空间,有利于实现高伸缩性。
B :但zk不适用于高频的写操作,这令zk集群性能严重下降,在新版本中将消费者的位移数据作为一条条普通的Kafka消息,提交至内部主题(_consumer_offsets)中保存。实现高持久性和高频写操作。
2,特点:
A :位移主题是一个普通主题,同样可以被手动创建,修改,删除。。
B :位移主题的消息格式是kafka定义的,不可以被手动修改,若修改格式不正确,kafka将会崩溃。
C :位移主题保存了三部分内容:Group ID,主题名,分区号。
3,创建:
A :当Kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题。也可以手动创建
B :分区数依赖于Broker端的offsets.topic.num.partitions的取值,默认为50
C :副本数依赖于Broker端的offsets.topic.replication.factor的取值,默认为3
4,使用:
A :当Kafka提交位移消息时会使用这个主题
B :位移提交得分方式有两种:手动和自动提交位移。
C :推荐使用手动提交位移,自动提交位移会存在问题:只要consumer一直启动设置,他就会无限期地向主题写入消息。
5,清理:
A :Kafka使用Compact策略来删除位移主题中的过期消息,避免位移主题无限膨胀。
B :kafka提供专门的后台线程定期巡检待compcat的主题,查看是否存在满足条件的可删除数据。
对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。我在这里贴一张来自官网的图片,来说明 Compact 过程。
Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。
6,注意事项:
A :建议不要修改默认分区数,在kafka中有些许功能写死的是50个分区
B :建议不要使用自动提交模式,采用手动提交,避免消费者无限制的写入消息。
C :后台定期巡检线程叫Log Cleaner,若线上遇到位移主题无限膨胀占用过多磁盘,应该检查此线程的工作状态。
消费者组重平衡能避免吗?
Rebalance 概念
Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程。在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配。但是,在整个过程中,所有实例都不能消费任何消息,因此它对 Consumer 的 TPS 影响很大。
所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。
具体来讲,Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。
所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。那么,Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上呢?答案就在我们之前说过的 Kafka 内部位移主题 __consumer_offsets 身上。
目前,Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤。
- 第 1 步:确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
- 第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。
简单解释一下上面的算法。首先,Kafka 会计算该 Group 的 group.id 参数的哈希值。比如你有个 Group 的 group.id 设置成了“test-group”,那么它的 hashCode 值就应该是 627841412。其次,Kafka 会计算 __consumer_offsets 的分区数,通常是 50 个分区,之后将刚才那个哈希值对分区数进行取模加求绝对值计算,即 abs(627841412 % 50) = 12。此时,我们就知道了位移主题的分区 12 负责保存这个 Group 的数据。有了分区号,算法的第 2 步就变得很简单了,我们只需要找出位移主题分区 12 的 Leader 副本在哪个 Broker 上就可以了。这个 Broker,就是我们要找的 Coordinator。
Rebalance 的弊端是
- Rebalance 影响 Consumer 端 TPS。这个之前也反复提到了,这里就不再具体讲了。总之就是,在 Rebalance 期间,Consumer 会停下手头的事情,什么也干不了。
- Rebalance 很慢。如果你的 Group 下成员很多,就一定会有这样的痛点。还记得我曾经举过的那个国外用户的例子吧?他的 Group 下有几百个 Consumer 实例,Rebalance 一次要几个小时。在那种场景下,Consumer Group 的 Rebalance 已经完全失控了。
- Rebalance 效率不高。当前 Kafka 的设计机制决定了每次 Rebalance 时,Group 下的所有成员都要参与进来,而且通常不会考虑局部性原理,但局部性原理对提升系统性能是特别重要的。
Rebalance 发生的时机
Rebalance 发生的时机有三个:
- 组成员数量发生变化
- 订阅主题数量发生变化
- 订阅主题的分区数发生变化
相关的主要参数
- session.timeout.ms:即如果 Coordinator 在 10 秒之内没有收到 Group 下某 Consumer 实例的心跳,它就会认为这个 Consumer 实例已经挂了。
- heartbeat.interval.ms:控制发送心跳请求频率的参数
- max.poll.interval.ms:限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔
- GC 参数
你可以“无脑”地应用在你的生产环境中。
- 设置 session.timeout.ms = 6s。
- 设置 heartbeat.interval.ms = 2s。
- 要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
Kafka中位移提交那些事儿
Consumer 端有个位移的概念,它和消息在分区中的位移不是一回事儿,虽然它们的英文都是 Offset。今天我们要聊的位移是 Consumer 的消费位移,它记录了 Consumer 要消费的下一条消息的位移。
Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。
从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。
自动提交
1 | Properties props = new Properties(); |
手动提交/同步提交
开启手动提交位移的方法就是设置 enable.auto.commit 为 false。
1 | while (true) { |
可见,调用 consumer.commitSync() 方法的时机,是在你处理完了 poll() 方法返回的所有消息之后。
它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。
异步提交
1 | while (true) { |
commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。
结合同步异步
1 | try { |
对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。
细化提交信息粒度
你的 poll 方法返回的不是 500 条消息,而是 5000 条。那么,你肯定不想把这 5000 条消息都处理完之后再提交位移,因为一旦中间出现差错,之前处理的全部都要重来一遍。
1 | private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); |
CommitFailedException异常怎么处理?
所谓 CommitFailedException,顾名思义就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。如果异常是可恢复的瞬时错误,提交位移的 API 自己就能规避它们了,因为很多提交位移的 API 方法是支持自动错误重试的,比如我们在上一期中提到的 commitSync 方法。
看看社区对这个异常的最新解释:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
这段话前半部分的意思是,本次提交位移失败了,原因是消费者组已经开启了 Rebalance 过程,并且将要提交位移的分区分配给了另一个消费者实例。出现这个情况的原因是,你的消费者实例连续两次调用 poll 方法的时间间隔超过了期望的 max.poll.interval.ms 参数值。这通常表明,你的消费者实例花费了太长的时间进行消息处理,耽误了调用 poll 方法。
在后半部分,社区给出了两个相应的解决办法:
- 增加期望的时间间隔 max.poll.interval.ms 参数值。
- 减少 poll 方法一次性返回的消息数量,即减少 max.poll.records 参数值。
场景一
我们先说说最常见的场景。当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常。这是该异常最“正宗”的登场方式。你只需要写一个 Consumer 程序,使用 KafkaConsumer.subscribe 方法随意订阅一个主题,之后设置 Consumer 端参数 max.poll.interval.ms=5 秒,最后在循环调用 KafkaConsumer.poll 方法之间,插入 Thread.sleep(6000) 和手动提交位移,就可以成功复现这个异常了。在这里,我展示一下主要的代码逻辑。
1 | … |
具体来说有 4 种方法:
- 缩短单条消息处理的时间。比如,之前下游系统消费一条消息的时间是 100 毫秒,优化之后成功地下降到 50 毫秒,那么此时 Consumer 端的 TPS 就提升了一倍。
- 增加 Consumer 端允许下游系统消费一批消息的最大时长。这取决于 Consumer 端参数 max.poll.interval.ms 的值。在最新版的 Kafka 中,该参数的默认值是 5 分钟。如果你的消费逻辑不能简化,那么提高该参数值是一个不错的办法。
- 减少下游系统一次性消费的消息总数。这取决于 Consumer 端参数 max.poll.records 的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。
- 下游系统使用多线程来加速消费。这应该算是“最高级”同时也是最难实现的解决办法了。具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。之前你使用 Kafka Consumer 消费数据更多是单线程的,所以当消费速度无法匹及 Kafka Consumer 消息返回的速度时,它就会抛出 CommitFailedException 异常。
场景二
如果你的应用中同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常,因为 Kafka 无法识别这个具有相同 group.id 的消费者实例,于是就向它返回一个错误,表明它不是消费者组内合法的成员。
多线程开发消费者实例
Kafka Java Consumer 设计原理
在开始探究之前,我先简单阐述下 Kafka Java Consumer 为什么采用单线程的设计。了解了这一点,对我们后面制定多线程方案大有裨益。
从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程。
所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。
不过,虽然有心跳线程,但实际的消息获取逻辑依然是在用户主线程中完成的。因此,在消费消息的这个层面上,我们依然可以安全地认为 KafkaConsumer 是单线程的设计。
多线程方案
首先,我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全。简单来说,就是你不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。
当然了,这也不是绝对的。KafkaConsumer 中有个方法是例外的,它就是 wakeup(),你可以在其他线程中安全地调用 KafkaConsumer.wakeup() 来唤醒 Consumer。
鉴于 KafkaConsumer 不是线程安全的事实,我们能够制定两套多线程方案。
方案一
1.消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。如下图所示:
1 | public class KafkaConsumerRunner implements Runnable { |
这段代码创建了一个 Runnable 类,表示执行消费获取和消费处理的逻辑。每个 KafkaConsumerRunner 类都会创建一个专属的 KafkaConsumer 实例。在实际应用中,你可以创建多个 KafkaConsumerRunner 实例,并依次执行启动它们,以实现方案 1 的多线程架构。
方案二
2.消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:
因为该方案将消息获取和消息处理分开了,也就是说获取某条消息的线程不是处理该消息的线程,因此无法保证分区内的消费顺序。举个例子,比如在某个分区中,消息 1 在消息 2 之前被保存,那么 Consumer 获取消息的顺序必然是消息 1 在前,消息 2 在后,但是,后面的 Worker 线程却有可能先处理消息 2,再处理消息 1,这就破坏了消息在分区中的顺序。还是那句话,如果你在意 Kafka 中消息的先后顺序,方案 2 的这个劣势是致命的。
1 | private final KafkaConsumer<String, String> consumer; |
这段代码最重要的地方是最后一行:当 Consumer 的 poll 方法返回消息后,由专门的线程池来负责处理具体的消息。调用 poll 方法的主线程不负责消息处理逻辑,这样就实现了方案 2 的多线程架构。
ThreadPoolExecutor的构造函数
1 | public ThreadPoolExecutor(int corePoolSize, // 指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去; |
比较
我们来打个比方。比如一个完整的消费者应用程序要做的事情是 1、2、3、4、5,那么方案 1 的思路是粗粒度化的工作划分,也就是说方案 1 会创建多个线程,每个线程完整地执行 1、2、3、4、5,以实现并行处理的目标,它不会进一步分割具体的子任务;而方案 2 则更细粒度化,它会将 1、2 分割出来,用单线程(也可以是多线程)来做,对于 3、4、5,则用另外的多个线程来做。
这两种方案孰优孰劣呢?应该说是各有千秋。我总结了一下这两种方案的优缺点,我们先来看看下面这张表格。
Java 消费者是如何管理TCP连接的?
我们先从消费者创建 TCP 连接开始讨论。消费者端主要的程序入口是 KafkaConsumer 类。和生产者不同的是,构建 KafkaConsumer 实例时是不会创建任何 TCP 连接的,也就是说,当你执行完 new KafkaConsumer(properties) 语句后,你会发现,没有 Socket 连接被创建出来。这一点和 Java 生产者是有区别的,主要原因就是生产者入口类 KafkaProducer 在构建实例的时候,会在后台默默地启动一个 Sender 线程,这个 Sender 线程负责 Socket 连接的创建。
创建 TCP 连接
如果 Socket 不是在构造函数中创建的,那么是在 KafkaConsumer.subscribe 或 KafkaConsumer.assign 方法中创建的吗?严格来说也不是。我还是直接给出答案吧:TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的。再细粒度地说,在 poll 方法内部有 3 个时机可以创建 TCP 连接。
- 1.发起 FindCoordinator 请求时
- 2.连接协调者时
- 3.消费数据时:假设消费者要消费 5 个分区的数据,这 5 个分区各自的领导者副本分布在 4 台 Broker 上,那么该消费者在消费时会创建与这 4 台 Broker 的 Socket 连接。
我们来看看这段日志。
[2019-05-27 10:00:54,142] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)
…
[2019-05-27 10:00:54,188] DEBUG [Consumer clientId=consumer-1, groupId=test] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name=‘t4’)], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node localhost:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:1097)
…
[2019-05-27 10:00:54,188] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FIND_COORDINATOR {key=test,key_type=0} with correlation id 0 to node -1 (org.apache.kafka.clients.NetworkClient:496)
[2019-05-27 10:00:54,203] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node -1 for FIND_COORDINATOR with correlation id 0, received {throttle_time_ms=0,error_code=0,error_message=null, node_id=2,host=localhost,port=9094} (org.apache.kafka.clients.NetworkClient:837)
…
[2019-05-27 10:00:54,204] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 (id: 2147483645 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)
…
[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 (id: 2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)
[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: 0 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)
[2019-05-27 10:00:54,238] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9093 (id: 1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)
这里我稍微解释一下,日志的第一行是消费者程序创建的第一个 TCP 连接,就像我们前面说的,这个 Socket 用于发送 FindCoordinator 请求。由于这是消费者程序创建的第一个连接,此时消费者对于要连接的 Kafka 集群一无所知,因此它连接的 Broker 节点的 ID 是 -1,表示消费者根本不知道要连接的 Kafka Broker 的任何信息。
值得注意的是日志的第二行,消费者复用了刚才创建的那个 Socket 连接,向 Kafka 集群发送元数据请求以获取整个集群的信息。
日志的第三行表明,消费者程序开始发送 FindCoordinator 请求给第一步中连接的 Broker,即 localhost:9092,也就是 nodeId 等于 -1 的那个。在十几毫秒之后,消费者程序成功地获悉协调者所在的 Broker 信息,也就是第四行标为橙色的“node_id = 2”。
完成这些之后,消费者就已经知道协调者 Broker 的连接信息了,因此在日志的第五行发起了第二个 Socket 连接,创建了连向 localhost:9094 的 TCP。只有连接了协调者,消费者进程才能正常地开启消费者组的各种功能以及后续的消息消费。
在日志的最后三行中,消费者又分别创建了新的 TCP 连接,主要用于实际的消息获取。还记得我刚才说的吗?要消费的分区的领导者副本在哪台 Broker 上,消费者就要创建连向哪台 Broker 的 TCP。在我举的这个例子中,localhost:9092,localhost:9093 和 localhost:9094 这 3 台 Broker 上都有要消费的分区,因此消费者创建了 3 个 TCP 连接。
看完这段日志,你应该会发现日志中的这些 Broker 节点的 ID 在不断变化。有时候是 -1,有时候是 2147483645,只有在最后的时候才回归正常值 0、1 和 2。这又是怎么回事呢?
前面我们说过了 -1 的来由,即消费者程序(其实也不光是消费者,生产者也是这样的机制)首次启动时,对 Kafka 集群一无所知,因此用 -1 来表示尚未获取到 Broker 数据。
那么 2147483645 是怎么来的呢?它是由 Integer.MAX_VALUE 减去协调者所在 Broker 的真实 ID 计算得来的。看第四行标为橙色的内容,我们可以知道协调者 ID 是 2,因此这个 Socket 连接的节点 ID 就是 Integer.MAX_VALUE 减去 2,即 2147483647 减去 2,也就是 2147483645。这种节点 ID 的标记方式是 Kafka 社区特意为之的结果,目的就是要让组协调请求和真正的数据获取请求使用不同的 Socket 连接。
至于后面的 0、1、2,那就很好解释了。它们表征了真实的 Broker ID,也就是我们在 server.properties 中配置的 broker.id 值。
消费者程序会创建 3 类 TCP 连接:
- 确定协调者和获取集群元数据。
- 连接协调者,令其执行组成员管理操作。
- 执行实际的消息获取。
何时关闭 TCP 连接?
和生产者类似,消费者关闭 Socket 也分为主动关闭和 Kafka 自动关闭。主动关闭是指你显式地调用消费者 API 的方法去关闭消费者,具体方式就是手动调用 KafkaConsumer.close() 方法,或者是执行 Kill 命令,不论是 Kill -2 还是 Kill -9;而 Kafka 自动关闭是由消费者端参数 connection.max.idle.ms 控制的,该参数现在的默认值是 9 分钟,即如果某个 Socket 连接上连续 9 分钟都没有任何请求“过境”的话,那么消费者会强行“杀掉”这个 Socket 连接。
不过,和生产者有些不同的是,如果在编写消费者程序时,你使用了循环的方式来调用 poll 方法消费消息,那么上面提到的所有请求都会被定期发送到 Broker,因此这些 Socket 连接上总是能保证有请求在发送,从而也就实现了“长连接”的效果。
针对上面提到的三类 TCP 连接,你需要注意的是,当第三类 TCP 连接成功创建后,消费者程序就会废弃第一类 TCP 连接,之后在定期请求元数据时,它会改为使用第三类 TCP 连接。也就是说,最终你会发现,第一类 TCP 连接会在后台被默默地关闭掉。对一个运行了一段时间的消费者程序来说,只会有后面两类 TCP 连接存在。
消费者组消费进度监控都怎么实现?
1 为什么要监控
A :对于Kafka消费者,最重要的事情就是监控它们的消费进度(消费的滞后程度)常称为:Consumer Lag
B :Lag的单位是消息数,他直接反映了一个消费者的运行情况。一个正常的消费者的Lag应当很小,设置为0。这表明消费者能够及时地消费生产者生产出来的消息。反之,一个消费者Lag值很大的话表明它无法跟上生产者的速度。
C :如果消费者速度无法匹及生产者的数据,极有可能导致它消费的数据已经不在操作系统的页缓存中了,那些数据就失去了享有Zero Copy技术的条件,不得不从磁盘中读取,进一步拉大了与生产者的差距。并且会越来大。
所以:在实际业务场景中必须时刻关注消费者的消费进度。一旦出现Lag逐步增加的趋势,就要立即定位问题,及时处理,避免问题扩散。
2 如何监控
A :使用Kafka自带的命令行工具kafka-consumer-groups脚本
B :使用Kafka Java Conssumer API编程
C :使用Kafka自带的JMX监控指标
3 方法具体分析
A :Kafka自带命令
(1) kafka-consumer-groups脚本是kafka为我们提供的最直接的监控消费者消费进度工具。
(2) 使用:
$ bin/kafka-consumer-groups.sh –bootstrap-server <Kafka broker连接信息 > –describe –group <group 名称 >
<Kafka broker 连接信息 >:主机:端口
<group 名称 > :要监控的消费组的 group.id值
(3)展示的信息:主题,分区,该消费者组最新消费消息的位移值(CURRENT-OFFSET值),每个分区当前最新生产的消息的位移值(LOG-END-OFFSET),LAG(前两者的差值),消费者实例ID,消费者连接Broker的主机名以及消费者的CLENT-ID信息。
B :Kafka Java Consumer API
(1)首先获取给定的消费者组的最新消费消息的位移
(2)在获取订阅分区的最新消息位移
(3)最后执行相应的减法操作,获取Lag值并封装进一个Map对象。
1 | public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException { |
C :Kafka JMX监控指标
使用Kafka默认提供 的JMX监控指标来监控消费者的Lag值。
(1)Kafka消费者提供了一个名为Kafka.consumer:type=consumer-fetch-manager-metrics,client-id=”{client-id}”的JMX指标。
(2)有两个重要的属性:records-lag-max 和 records-lead-min 分别表示消费者在测试窗口时间内曾经达到的最大的Lag值和最小的Lead值。
(3)Lead值是指消费者最新消费消息的位移和分区当前第一条消息的位移的差值。即:Lag越大,Lead就越小。
深入Kafka内核
Kafka副本机制详解
所谓的副本机制(Replication),也可以称之为备份机制,通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝。
副本机制有什么好处呢?
- 提供数据冗余。即使系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性。
- 提供高伸缩性。支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量。
- 改善数据局部性。允许将数据放入与用户地理位置相近的地方,从而降低系统延时。
这些优点都是在分布式系统教科书中最常被提及的,但是有些遗憾的是,对于 Apache Kafka 而言,目前只能享受到副本机制带来的第 1 个好处,也就是提供数据冗余实现高可用性和高持久性。
副本定义
所谓副本(Replica),本质就是一个只能追加写消息的提交日志。根据 Kafka 副本机制的定义,同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用。
副本角色
既然分区下能够配置多个副本,而且这些副本的内容还要一致,那么很自然的一个问题就是:我们该如何确保副本中所有的数据都是一致的呢?特别是对 Kafka 而言,当生产者发送消息到某个主题后,消息是如何同步到对应的所有副本中的呢?针对这个问题,最常见的解决方案就是采用基于领导者(Leader-based)的副本机制。Apache Kafka 就是这样的设计。
第一,在 Kafka 中,副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
第二,Kafka 的副本机制比其他分布式系统要更严格一些。在 Kafka 中,追随者副本是不对外提供服务的。这就是说,任何一个追随者副本都不能响应消费者和生产者的读写请求。所有的请求都必须由领导者副本来处理,或者说,所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
第三,当领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
1.方便实现“Read-your-writes”
所谓 Read-your-writes,顾名思义就是,当你使用生产者 API 向 Kafka 成功写入消息后,马上使用消费者 API 去读取刚才生产的消息。
举个例子,比如你平时发微博时,你发完一条微博,肯定是希望能立即看到的,这就是典型的 Read-your-writes 场景。如果允许追随者副本对外提供服务,由于副本同步是异步的,因此有可能出现追随者副本还没有从领导者副本那里拉取到最新的消息,从而使得客户端看不到最新写入的消息。
2.方便实现单调读(Monotonic Reads)
就是对于一个消费者用户而言,在多次消费消息时,它不会看到某条消息一会儿存在一会儿不存在。
如果允许追随者副本提供读服务,那么假设当前有 2 个追随者副本 F1 和 F2,它们异步地拉取领导者副本数据。倘若 F1 拉取了 Leader 的最新消息而 F2 还未及时拉取,那么,此时如果有一个消费者先从 F1 读取消息之后又从 F2 拉取消息,它可能会看到这样的现象:第一次消费时看到的最新消息在第二次消费时不见了,这就不是单调读一致性。但是,如果所有的读请求都是由 Leader 来处理,那么 Kafka 就很容易实现单调读一致性。
In-sync Replicas(ISR)
ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的。
我们首先要明确的是,Leader 副本天然就在 ISR 中。也就是说,ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本。
这个标准就是 Broker 端参数 replica.lag.time.max.ms 参数值。这个参数的含义是 Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
Unclean 领导者选举(Unclean Leader Election)
Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。
如果你听说过 CAP 理论的话,你一定知道,一个分布式系统通常只能同时满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)中的两个。显然,在这个问题上,Kafka 赋予你选择 C 或 A 的权利。
请求是怎么被处理的?
无论是 Kafka 客户端还是 Broker 端,它们之间的交互都是通过“请求 / 响应”的方式完成的。比如,客户端会通过网络发送消息生产请求给 Broker,而 Broker 处理完成后,会发送对应的响应给到客户端。
Apache Kafka 自己定义了一组请求协议,用于实现各种各样的交互操作。比如常见的 PRODUCE 请求是用于生产消息的,FETCH 请求是用于消费消息的,METADATA 请求是用于请求 Kafka 集群元数据信息的。
处理请求的 2 种常见方案
1.顺序处理请求
1 | while (true) { |
这个方法实现简单,但是有个致命的缺陷,那就是吞吐量太差。由于只能顺序处理每个请求,因此,每个请求都必须等待前一个请求处理完毕才能得到处理。这种方式只适用于请求发送非常不频繁的系统。
2.每个请求使用单独线程处理
1 | while (true) { |
这个方法反其道而行之,完全采用异步的方式。系统会为每个入站请求都创建单独的线程来处理。这个方法的好处是,它是完全异步的,每个请求的处理都不会阻塞下一个请求。但缺陷也同样明显。为每个请求都创建线程的做法开销极大,在某些场景下甚至会压垮整个服务。还是那句话,这个方法只适用于请求发送频率很低的业务场景。
既然这两种方案都不好,那么,Kafka 是如何处理请求的呢?用一句话概括就是,Kafka 使用的是 Reactor 模式。
Kafka 是如何处理请求的?
简单来说,Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景。我借用 Doug Lea 的一页 PPT 来说明一下 Reactor 的架构,并借此引出 Kafka 的请求处理模型。
从这张图中,我们可以发现,多个客户端会发送请求给到 Reactor。Reactor 有个请求分发线程 Dispatcher,也就是图中的 Acceptor,它会将不同的请求下发到多个工作线程中处理。
在这个架构中,Acceptor 线程只是用于请求分发,不涉及具体的逻辑处理,非常得轻量级,因此有很高的吞吐量表现。而这些工作线程可以根据实际业务处理需要任意增减,从而动态调节系统负载能力。
如果我们来为 Kafka 画一张类似的图的话,那它应该是这个样子的:
显然,这两张图长得差不多。Kafka 的 Broker 端有个 SocketServer 组件,类似于 Reactor 模式中的 Dispatcher,它也有对应的 Acceptor 线程和一个工作线程池,只不过在 Kafka 中,这个工作线程池有个专属的名字,叫网络线程池。Kafka 提供了 Broker 端参数 num.network.threads,用于调整该网络线程池的线程数。其默认值是 3,表示每台 Broker 启动时会创建 3 个网络线程,专门处理客户端发送的请求。
Acceptor 线程采用轮询的方式将入站请求公平地发到所有网络线程中,因此,在实际使用过程中,这些线程通常都有相同的几率被分配到待处理请求。这种轮询策略编写简单,同时也避免了请求处理的倾斜,有利于实现较为公平的请求处理调度。
当网络线程拿到请求后,它不是自己处理,而是将请求放入到一个共享请求队列中。Broker 端还有个 IO 线程池,负责从该队列中取出请求,执行真正的处理。如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。
IO 线程池处中的线程才是执行请求逻辑的线程。Broker 端参数 num.io.threads 控制了这个线程池中的线程数。目前该参数默认值是 8,表示每台 Broker 启动后自动创建 8 个 IO 线程处理请求。你可以根据实际硬件条件设置此线程池的个数。
当 IO 线程处理完请求后,会将生成的响应发送到网络线程池的响应队列中,然后由对应的网络线程负责将 Response 返还给客户端。
请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。这么设计的原因就在于,Dispatcher 只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送 Response 给客户端,所以这些 Response 也就没必要放在一个公共的地方。
我们再来看看刚刚的那张图,图中有一个叫 Purgatory 的组件,这是 Kafka 中著名的“炼狱”组件。它是用来缓存延时请求(Delayed Request)的。所谓延时请求,就是那些一时未满足条件不能立刻处理的请求。比如设置了 acks=all 的 PRODUCE 请求,一旦设置了 acks=all,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的 IO 线程就必须等待其他 Broker 的写入结果。当请求不能立刻处理时,它就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO 线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中。
控制类请求和数据类请求分离
Kafka 社区把 PRODUCE 和 FETCH 这类请求称为数据类请求,把 LeaderAndIsr、StopReplica 这类请求称为控制类请求。细究起来,当前这种一视同仁的处理方式对控制类请求是不合理的。为什么呢?因为控制类请求有这样一种能力:它可以直接令数据类请求失效!
我来举个例子说明一下。假设我们有个主题只有 1 个分区,该分区配置了两个副本,其中 Leader 副本保存在 Broker 0 上,Follower 副本保存在 Broker 1 上。假设 Broker 0 这台机器积压了很多的 PRODUCE 请求,此时你如果使用 Kafka 命令强制将该主题分区的 Leader、Follower 角色互换,那么 Kafka 内部的控制器组件(Controller)会发送 LeaderAndIsr 请求给 Broker 0,显式地告诉它,当前它不再是 Leader,而是 Follower 了,而 Broker 1 上的 Follower 副本因为被选为新的 Leader,因此停止向 Broker 0 拉取消息。
这时,一个尴尬的场面就出现了:如果刚才积压的 PRODUCE 请求都设置了 acks=all,那么这些在 LeaderAndIsr 发送之前的请求就都无法正常完成了。就像前面说的,它们会被暂存在 Purgatory 中不断重试,直到最终请求超时返回给客户端。
设想一下,如果 Kafka 能够优先处理 LeaderAndIsr 请求,Broker 0 就会立刻抛出 NOT_LEADER_FOR_PARTITION 异常,快速地标识这些积压 PRODUCE 请求已失败,这样客户端不用等到 Purgatory 中的请求超时就能立刻感知,从而降低了请求的处理时间。
基于这些问题,社区于 2.3 版本正式实现了数据类请求和控制类请求的分离。其实,在社区推出方案之前,我自己尝试过修改这个设计。
那么,社区是如何解决的呢?很简单,你可以再看一遍今天的第三张图,社区完全拷贝了这张图中的一套组件,实现了两类请求的分离。也就是说,Kafka Broker 启动后,会在后台分别创建两套网络线程池和 IO 线程池的组合,它们分别处理数据类请求和控制类请求。至于所用的 Socket 端口,自然是使用不同的端口了,你需要提供不同的 listeners 配置,显式地指定哪套端口用于处理哪类请求。
消费者组重平衡全流程解析
它的作用是让组内所有的消费者实例就消费哪些主题分区达成一致。重平衡需要借助 Kafka Broker 端的 Coordinator 组件,在 Coordinator 的帮助下完成整个消费者组的分区重分配。
触发与通知
重平衡的 3 个触发条件:
- 组成员数量发生变化。
- 订阅主题数量发生变化。
- 订阅主题的分区数发生变化。
重平衡过程是如何通知到其他消费者实例的?答案就是,靠消费者端的心跳线程(Heartbeat Thread)。
重平衡的通知机制正是通过心跳线程来完成的。当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制。
费者端参数 heartbeat.interval.ms 的真实用途,我来解释一下。从字面上看,它就是设置了心跳的间隔时间,但这个参数的真正作用是控制重平衡通知的频率。如果你想要消费者实例更迅速地得到通知,那么就可以给这个参数设置一个非常小的值,这样消费者就能更快地感知到重平衡已经开启了。
消费者组状态机
重平衡一旦开启,Broker 端的协调者组件就要开始忙了,主要涉及到控制消费者组的状态流转。当前,Kafka 设计了一套消费者组状态机(State Machine),来帮助协调者完成整个重平衡流程。严格来说,这套状态机属于非常底层的设计,Kafka 官网上压根就没有提到过,但你最好还是了解一下,因为它能够帮助你搞懂消费者组的设计原理,比如消费者组的过期位移(Expired Offsets)删除等。
目前,Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。
了解了这些状态的含义之后,我们来看一张图片,它展示了状态机的各个状态流转。
我来解释一下消费者组启动时的状态流转过程。一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance 状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。
当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。
Kafka 在尝试定期删除过期位移。现在你知道了,只有 Empty 状态下的组,才会执行过期位移删除的操作。
消费者端重平衡流程
在消费者端,重平衡分为两个步骤:分别是加入组和等待领导者消费者(Leader Consumer)分配方案。这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求。
当组内成员加入组时,它会向协调者发送 JoinGroup 请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。
通常情况下,第一个发送 JoinGroup 请求的成员自动成为领导者。你一定要注意区分这里的领导者和之前我们介绍的领导者副本,它们不是一个概念。这里的领导者是具体的消费者实例,它既不是副本,也不是协调者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。
选出领导者之后,协调者会把消费者组订阅信息封装进 JoinGroup 请求的响应体中,然后发给领导者,由领导者统一做出分配方案后,进入到下一步:发送 SyncGroup 请求。
在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步的主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。
JoinGroup 请求的处理过程。
JoinGroup 请求的主要作用是将组成员订阅信息发送给领导者消费者,待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段。
SyncGroup 请求的处理流程。
SyncGroup 请求的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。
消费者端的重平衡流程我已经介绍完了。接下来,我们从协调者端来看一下重平衡是怎么执行的。
Broker 端重平衡场景剖析
要剖析协调者端处理重平衡的全流程,我们必须要分几个场景来讨论。这几个场景分别是新成员加入组、组成员主动离组、组成员崩溃离组、组成员提交位移。
场景一:新成员入组。
新成员入组是指组处于 Stable 状态后,有新成员加入。如果是全新启动一个消费者组,Kafka 是有一些自己的小优化的,流程上会有些许的不同。我们这里讨论的是,组稳定了之后有新成员加入的情形。
当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。具体的过程和之前的客户端重平衡流程是一样的。
场景二:组成员主动离组。
就是指消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。这个场景就涉及到了第三类请求:LeaveGroup 请求。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员。
场景三:组成员崩溃离组。
崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制的。也就是说,Kafka 一般不会超过 session.timeout.ms 就能感知到这个崩溃。
场景四:重平衡时协调者对组内成员提交位移的处理。
正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup 请求发送。
Kafka控制器
1作用:
控制器组件(Controller),是Apache Kafka的核心组件。它的主要作用是Apache Zookeeper的帮助下管理和协调整个Kafka集群。
集群中任意一台Broker都能充当控制器的角色,但在运行过程中,只能有一个Broker成为控制器。
2 特点:控制器是重度依赖Zookeeper。
3 产生:
控制器是被选出来的,Broker在启动时,会尝试去Zookeeper中创建/controller节点。Kafka当前选举控制器的规则是:第一个成功创建/controller节点的Broker会被指定为控制器。
4 功能:
A :主题管理(创建,删除,增加分区)
当执行kafka-topics脚本时,大部分的后台工作都是控制器来完成的。
B :分区重分配
Kafka-reassign-partitions脚本提供的对已有主题分区进行细粒度的分配功能。
C :Preferred领导者选举
Preferred领导者选举主要是Kafka为了避免部分Broker负载过重而提供的一种换Leade的方案。
D :集群成员管理(新增Broker,Broker主动关闭,Broker宕机)
控制器组件会利用watch机制检查Zookeeper的/brokers/ids节点下的子节点数量变更。当有新Broker启动后,它会在/brokers下创建专属的znode节点。一旦创建完毕,Zookeeper会通过Watch机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化。进而开启后续新增Broker作业。
侦测Broker存活性则是依赖于刚刚提到的另一个机制:临时节点。每个Broker启动后,会在/brokers/ids下创建一个临时的znode。当Broker宕机或主机关闭后,该Broker与Zookeeper的会话结束,这个znode会被自动删除。同理,Zookeeper的Watch机制将这一变更推送给控制器,这样控制器就能知道有Broker关闭或宕机了,从而进行善后。
E :数据服务
控制器上保存了最全的集群元数据信息,其他所有Broker会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
5 控制器保存的数据
控制器中保存的这些数据在Zookeeper中也保存了一份。每当控制器初始化时,它都会从Zookeeper上读取对应的元数据并填充到自己的缓存中。
6 控制器故障转移(Failover)
故障转移是指:当运行中的控制器突然宕机或意外终止时,Kafka能够快速地感知到,并立即启用备用控制器来替代之前失败的控制器。
7 内部设计原理
A :控制器的内部设计相当复杂
控制器是多线程的设计,会在内部创建很多线程。如:
(1)为每个Broker创建一个对应的Socket连接,然后在创建一个专属的线程,用于向这些Broker发送特定的请求。
(2)控制连接zookeeper,也会创建单独的线程来处理Watch机制通知回调。
(3)控制器还会为主题删除创建额外的I/O线程。
这些线程还会访问共享的控制器缓存数据,为了维护数据安全性,控制在代码中大量使用ReetrantLock同步机制,进一步拖慢了整个控制器的处理速度。
B :在0.11版对控制器的低沉设计进了重构。
(1)最大的改进是:把多线程的方案改成了单线程加事件对列的方案。
a. 单线程+队列的实现方式:社区引入了一个事件处理线程,统一处理各种控制器事件,然后控制器将原来执行的操作全部建模成一个个独立的事件,发送到专属的事件队列中,供此线程消费。
b. 单线程不代表之前提到的所有线程都被干掉了,控制器只是把缓存状态变更方面的工作委托给了这个线程而已。
(2)第二个改进:将之前同步操作Zookeeper全部改为异步操作。
a. Zookeeper本身的API提供了同步写和异步写两种方式。同步操作zk,在有大量主题分区发生变更时,Zookeeper容易成为系统的瓶颈。
关于高水位和Leader Epoch的讨论
你可能听说过高水位(High Watermark),但不一定耳闻过 Leader Epoch。前者是 Kafka 中非常重要的概念,而后者是社区在 0.11 版本中新推出的,主要是为了弥补高水位机制的一些缺陷。
什么是高水位?
教科书中关于水位的经典定义通常是这样的:
在时刻 T,任意创建时间(Event Time)为 T’,且 T’≤T 的所有事件都已经到达或被观测到,那么 T 就被定义为水位。
在 Kafka 的世界中,水位的概念有一点不同。Kafka 的水位不是时间戳,更与时间无关。它是和位置信息绑定的,具体来说,它是用消息位移来表征的。另外,Kafka 源码使用的表述是高水位,因此,今天我也会统一使用“高水位”或它的缩写 HW 来进行讨论。值得注意的是,Kafka 中也有低水位(Low Watermark),它是与 Kafka 删除消息相关联的概念。
高水位的作用
- 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。
- 帮助 Kafka 完成副本同步。
在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息。消费者只能消费已提交消息,即图中位移小于 8 的所有消息。注意,这里我们不讨论 Kafka 事务,因为事务机制会影响消费者所能看到的消息的范围,它不只是简单依赖高水位来判断。它依靠一个名为 LSO(Log Stable Offset)的位移值来判断事务型消费者的可见性。
这也从侧面告诉了我们一个重要的事实,那就是:同一个副本对象,其高水位值不会大于 LEO 值。
高水位和 LEO 是副本对象的两个重要属性。Kafka 所有副本都有对应的高水位和 LEO 值,而不仅仅是 Leader 副本。只不过 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位。
高水位更新机制
每个副本对象都保存了一组高水位值和 LEO 值,但实际上,在 Leader 副本所在的 Broker 上,还保存了其他 Follower 副本的 LEO 值。我们一起来看看下面这张图。
在这张图中,我们可以看到,Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。Kafka 把 Broker 0 上保存的这些 Follower 副本又称为远程副本(Remote Replica)。Kafka 副本机制在运行过程中,会更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同时也会更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有远程副本的 LEO,但它不会更新远程副本的高水位值,也就是我在图中标记为灰色的部分。
为什么要在 Broker 0 上保存这些远程副本呢?其实,它们的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位。
更新机制如下。
什么叫与 Leader 副本保持同步。判断的条件有两个。
- 该远程 Follower 副本在 ISR 中。
- 该远程 Follower 副本 LEO 值落后于 Leader 副本 LEO 值的时间,不超过 Broker 端参数 replica.lag.time.max.ms 的值。如果使用默认值的话,就是不超过 10 秒。
我们分别从 Leader 副本和 Follower 副本两个维度,来总结一下高水位和 LEO 的更新机制。
Leader 副本
处理生产者请求的逻辑如下:
1.写入消息到本地磁盘。
2.更新分区高水位值。
- i. 获取 Leader 副本所在 Broker 端保存的所有远程副本 LEO 值(LEO-1,LEO-2,……,LEO-n)。
- ii. 获取 Leader 副本高水位值:currentHW。
- iii. 更新 currentHW = max{currentHW, min(LEO-1, LEO-2, ……,LEO-n)}。
处理 Follower 副本拉取消息的逻辑如下:
1.读取磁盘(或页缓存)中的消息数据。
2.使用 Follower 副本发送请求中的位移值更新远程副本 LEO 值。
3.更新分区高水位值(具体步骤与处理生产者请求的步骤相同)。
Follower 副本
从 Leader 拉取消息的处理逻辑如下:
1.写入消息到本地磁盘。
2.更新 LEO 值。
3.更新高水位值。
- i. 获取 Leader 发送的高水位值:currentHW。
- ii. 获取步骤 2 中更新过的 LEO 值:currentLEO。
- iii. 更新高水位为 min(currentHW, currentLEO)。
副本同步机制解析
Leader Epoch 登场
从刚才的分析中,我们知道,Follower 副本的高水位更新需要一轮额外的拉取请求才能实现。如果把上面那个例子扩展到多个 Follower 副本,情况可能更糟,也许需要多轮拉取请求。也就是说,Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。
所谓 Leader Epoch,我们大致可以认为是 Leader 版本。它由两部分数据组成。
- Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
- 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。
Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新。这样,每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况。
我稍微解释一下,单纯依赖高水位是怎么造成数据丢失的。开始时,副本 A 和副本 B 都处于正常状态,A 是 Leader 副本。某个使用了默认 acks 设置的生产者程序向 A 发送了两条消息,A 全部写入成功,此时 Kafka 会通知生产者说两条消息全部发送成功。
现在我们假设 Leader 和 Follower 都写入了这两条消息,而且 Leader 副本的高水位也已经更新了,但 Follower 副本高水位还未更新——这是可能出现的。还记得吧,Follower 端高水位的更新与 Leader 端有时间错配。倘若此时副本 B 所在的 Broker 宕机,当它重启回来后,副本 B 会执行日志截断操作,将 LEO 值调整为之前的高水位值,也就是 1。这就是说,位移值为 1 的那条消息被副本 B 从磁盘中删除,此时副本 B 的底层磁盘文件中只保存有 1 条消息,即位移值为 0 的那条消息。
当执行完截断操作后,副本 B 开始从 A 拉取消息,执行正常的消息同步。如果就在这个节骨眼上,副本 A 所在的 Broker 宕机了,那么 Kafka 就别无选择,只能让副本 B 成为新的 Leader,此时,当 A 回来后,需要执行相同的日志截断操作,即将高水位调整为与 B 相同的值,也就是 1。这样操作之后,位移值为 1 的那条消息就从这两个副本中被永远地抹掉了。这就是这张图要展示的数据丢失场景。
严格来说,这个场景发生的前提是 Broker 端参数 min.insync.replicas 设置为 1。此时一旦消息被写入到 Leader 副本的磁盘,就会被认为是“已提交状态”,但现有的时间错配问题导致 Follower 端的高水位更新是有滞后的。如果在这个短暂的滞后时间窗口内,接连发生 Broker 宕机,那么这类数据的丢失就是不可避免的。
场景和之前大致是类似的,只不过引用 Leader Epoch 机制后,Follower 副本 B 重启回来后,需要向 A 发送一个特殊的请求去获取 Leader 的 LEO 值。在这个例子中,该值为 2。当获知到 Leader LEO=2 后,B 发现该 LEO 值不比它自己的 LEO 值小,而且缓存中也没有保存任何起始位移值 > 2 的 Epoch 条目,因此 B 无需执行任何日志截断操作。这是对高水位机制的一个明显改进,即副本是否执行日志截断不再依赖于高水位进行判断。
现在,副本 A 宕机了,B 成为 Leader。同样地,当 A 重启回来后,执行与 B 相同的逻辑判断,发现也不用执行日志截断,至此位移值为 1 的那条消息在两个副本中均得到保留。后面当生产者程序向 B 写入新消息时,副本 B 所在的 Broker 缓存中,会生成新的 Leader Epoch 条目:[Epoch=1, Offset=2]。之后,副本 B 会使用这个条目帮助判断后续是否执行日志截断操作。这样,通过 Leader Epoch 机制,Kafka 完美地规避了这种数据丢失场景。
本文链接: http://woaixiaoyuyu.github.io/2021/08/15/kafka%E5%AD%A6%E4%B9%A0%E8%AE%B0%E5%BD%95/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!