本文主要总结一些 Kafka 作为消息队列使用时,当线上出现消息大量积压后的处理方法以及平时使用时应该注意的点。一般从三个方向排查分析问题,消费者的问题、分区设计的问题和网络问题。
消费者导致
大部分消息积压基本都是消费者出现了状况,消费者处理的速度慢、或者消费者出现了故障。确认消费者出现问题后,我们首先要做的就是恢复或者提高消费速度。
提高消费速度
一般来说,我们会对消息队列里面消息数量做监控,当消息到达一定阈值的时候,自动化地去对消费者的数量扩容。但是因为 Kafka 的 Topic 的 Partition 数量是有限制的,当消费者的数量扩容到 Partition 数量之后,再对消费者数量进行增加就没有作用了。如果这时并没有解决消费者速度的问题,我们需要能够快速把消费的能力再放大,相应的步骤如下:
- 问题修复: 先解决消费者的问题,以恢复消费速度,随后停止所有消费者。
- 新建 Topic: 创建一个新 Topic,其 Partition 数量是原先的 10 倍,并且创建相应数量的队列。一般不直接对原 Topic 的分区进行调整,因为分区增加之后没有办法去缩小。
- 数据分发: 编写并部署一个临时的消费者程序,此程序将积压的数据消费后,不进行耗时处理,而是均匀分发至新创建的队列中。
- 并发消费: 临时增加 10 倍的消费者机器,每一批消费者消费一个临时队列的数据,相当于以原本的10倍速度消费数据。
- 架构恢复: 在积压数据消费完后,恢复原始架构,重新用原先的消费者机器来消费消息。
上面的方法能够快速地解决线上堆积了大量的消息。除此之外,我们对消费者的设计也会影响到消费的速率。
其他的注意点
控制消费者获取消息的数量
Kafka 支持批量消费数据。在消费者端,可以通过修改配置参数来改变每次从 Kafka 服务器获取的消息数量。
这主要通过以下两个配置参数控制:
-
fetch.min.bytes
: 消费者从服务器获取的最小数据,如果不满足这个值,消费者会等待直到满足此值。 -
fetch.max.wait.ms
: 消费者等待 Kafka 服务端积累数据的最长时间。
还有一个与之相关的参数: max.poll.records
:它能够控制消费者每次调用 poll() 时返回的最大记录数。
以上三个参数都是在实时性和吞吐量之间进行权衡的,你可以根据业务需求,对这些参数进行适当的调整。在调整这些参数之前,最好先在测试环境中进行性能测试,以确定最佳的参数设置。
- 如果你的业务场景对实时性要求较高,即希望消息一生成就能尽快被消费,那么
fetch.min.bytes
可以设置得较小,fetch.max.wait.ms
也可以设置得较小,这样可以确保消费者能够更快地获取到新的消息。 - 如果你的业务场景对吞吐量要求较高,即希望消费者能够以较高的速度处理大量的消息,那么
fetch.min.bytes
可以设置得较大,fetch.max.wait.ms
也可以设置得较大,这样可以通过增加每次获取的消息数量,提高消费者的处理效率。 -
max.poll.records
参数用于控制每次调用poll()时返回的最大记录数。如果你希望消费者每次能处理更多的消息,可以增大这个值。但是,这个值的增大可能会增加消费者处理消息的延迟。
优化消费者的处理逻辑
对于消费者的处理代码,优化其代码逻辑。有以下的点可以考虑:
- 是否能够使用多线程来加快业务逻辑的处理速度;
- 是否能够减少数据库的调用次数;
- 是否能够减少下游 PRC 调用的次数。
分区设计不合理导致
观察 Kafka 的消费在不同的 Partition 上的消息堆积情况,如果消息主要分布在部分 Parition 上,那么我们也许要确认是否是分区设计不合理导致。如果是的话,可能需要重新设计分区策略。比如,考虑使用更细粒度的key进行分区,或者增加总的分区数。
网络带宽瓶颈导致
如果网络带宽不足,可能导致消费者无法及时拉取数据。如果是硬件或者网络瓶颈,可以考虑扩容硬件或者升级网络。另外开启消息压缩,可以减少网络传输和磁盘存储的压力,通过 compression.type
可以设置压缩算法。