简介
本文介绍如何处理RabbitMQ消息堆积(积压)。
对于消息队列(MQ)来说,消息丢失/消息重复/消费顺序/消息堆积(积压)是比较常见的问题,都属于消息异常,这几个问题比较重要,面试中也会经常问到。
消息堆积原因
- 消息堆积即消息没及时被消费,是生产者生产消息速度快于消费者消费的速度导致的。
- 消费者消费慢可能是因为:本身逻辑耗费时间较长、阻塞了。
预防措施
生产者
- 减少发布频率
- 考虑使用队列最大长度限制
消费者
- 增加消费者的处理能力 //优化代码;使用JDK的队列缓存数据,多线程去处理(若考虑顺序问题,就采用单例线程)
- 默认情况下,rabbitmq消费者为单线程串行消费(org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer类的concurrentConsumers与txSize(对应prefetchCount)都是1),设置并发消费两个关键属性concurrentConsumers和prefetchCount。concurrentConsumers:设置的是对每个listener在初始化的时候设置的并发消费者的个数;prefetchCount:每次从broker里面取的待消费的消息的个数。
配置方法:修改application.properties:spring.rabbitmq.listener.concurrency=m spring.rabbitmq.listener.prefetch=n
Spring Amqp的解释:prefetchCount(prefetch) The number of messages to accept from the broker in one socket frame. The higher this is the faster the messages can be delivered, but the higher the risk of non-sequential processing. Ignored if the acknowledgeMode is NONE. This will be increased, if necessary, to match the txSize concurrentConsumers(concurrency) The number of concurrent consumers to initially start for each listener.
综合(使用缓存)
- 生产者端缓存数据,在mq被消费完后再发送到mq,打破发送循环条件。设置合适的qos值(channel.BasicQos()方法:每次从队列拉取的消息数量),当qos值被用光,而新的ack没有被mq接收时,就可以跳出发送循环,去接收新的消息。
- 消费者主动block接收进程,消费者感受到接收消息过快时主动block,利用block和unblock方法调节接收速率,当接收线程被block时,跳出发送循环。
已出事故的解决措施
情况1:堆积的消息还需要使用
方案1:简单修复
修复consumer的问题,让他恢复消费速度,然后等待几个小时消费完毕
方案2:复杂修复
临时紧急扩容了,具体操作步骤和思路如下:
1)先修复consumer的问题,确保其恢复消费速度
2)新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量
3)然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue
4)接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据
5)这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据
6)等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息
情况2:堆积的消息不需要使用
删除消息即可。(可以在RabbitMQ控制台删除,或者使用命令)。
请先
!