消息队列MQ常见问题和解决方案

发布时间:2022-04-03 15:32:35 作者:yexindonglai@163.com 阅读(2348)

一、 前言

本文章主要讲解消息队列的场景、产生的问题、解决方案这几个方面着手,提供一套完整的解决方案,主要解决以下几个问题

  • 消息队列是什么?
  • 为什么需要消息队列?
  • 消息队列会产生哪些问题?
  • 如何解决这些问题?

以下问题和解决方案都由博主长期开发得来的经验,相对来说比较通用,也就是大部分人都认可的方案,;当然,具体情况还得具体分析,市面上的解决方案也都五花八门,但是万变不离其宗,一项技术的出现都是为了解决某个问题而存在的;另外,本文主要讲解的是方案的思路和想法,不涉及任何代码;

1、消息队列是什么

消息队列是应用之间的通讯方式主要是由三部分组成,分别是:

  1. 生产者(producer):负责生产消息,并发送给消息服务器。它是整个消息的发起方
  2. 消息服务(broker,也被称为中转站):消息中间件的服务端MQ,接收并处理消息,负责消息的存储和投递,是整个消息系统中最核心的部分;
  3. 消费者(comsumer):负责消息的消费,根据消息的内容去处理各种业务逻辑;

2、为什么需要消息队列

说到消息队列,大部分人都能联想到那六个字:解耦异步削峰;旨在高并发情况下出现的阻塞问题;那这6个字都是什么意思呢?我们一个个地解答:

  1. 解耦:可以将一些相关的但是偶尔度又不太高的系统关联起来,比如订单系统和优惠券系统,有关联度,但是又不那么紧密;还可以将不同语言之间的 项目做通讯,比如订单系统是java开发的,优惠券系统是Go语言开发的,增加了系统的灵活性;
  2. 异步:主要应用在实时性要求不严格的场景,比如用注册发送验证码、下单通知、发放优惠券等。生产者只需要将消息发送给MQ,剩下的都MQ和消费者处理;生产者只需要发送给broker后就可以直接告诉客户端已经处理完成;
  3. 削峰:指的是短时间内有大量请求进来,短时间内无法处理这么多任务,可以将这些任务先发给MQ,根据MQ的存储和分发功能,平稳地处理这些任务;

但其实,除了这些之外,消息队列还可以实现一个非常重要的功能,就是分布式事务,而且用消息队列实现分布式事务能够达到更高的效率;

2.1、传统模式

在传统模式下服务之间的调用的是串行的, 必须一个个地去处理,如果说设计到的服务比较多,或者操作的数据比较多,这肯定会大大增加响应的时间;高并发的项目下,响应时间会更长,这种响应时长是不被用户接收的。

2.2、消息模式

为了解决传统模式的通病,就发明了消息模式,服务器A将消息写入消息队列MQ,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。

2.3、市面上常见的消息队列有哪些
  1. RabbitMQ :目前主流的消息服务,大多数公司用的,用erlang语言开发,健壮、稳定、易用、跨平台、支持多种语言、文档齐全;可做到消息0丢失,缺点是吞吐量较低,学习成本较高;
  2. ActiveMQ :并发量较低,维护越来越少,适用于老项目和并发量不高的项目;
  3. Kafka :吞吐量高,单机每秒可处理的TPS达到了百万条,为大数据而生,缺点是消息丢失不支持重试;有可能会丢失消息
  4. ZeroMQ :无broker的消息队列,在生产者和消费者各自维护一个队列
  5. RocketMQ :阿里人根据kafka的特性用java开发的消息队列,单机吞吐量可达到10万级,可用性高、消息可靠,可做到0丢失,现已捐赠给apache;缺点是社区活跃度不高,只支持java和c++

二、消息队列会出现哪些问题?

使用了消息队列后,虽然解决了一些问题,但是新的问题也随之而来,主要有以下3个问题:

  1. 消息丢失
  2. 重复消费(幂等性问题)
  3. 消费速率
  4. 消息积压

1、消息丢失

消息丢失是一个很严重的问题,特别的那些需要保证一致性的数据,比如有以下一个场景:

  1. 小红和小明各有100元,
  2. 小红给小明转账2元,就需要在小红的账上减去2元,在小明的账上加2元,
  3. 如果小明添加请求成功了,但是小红的扣减请求失败了,本来总额是200元,因为扣减失败,总额变成了202元,导致了数据不一致的问题;

那要如何解决这种消息丢失的问题呢,首先要整个消息系统分为三个部分,分别是:

  1. 生产者发送给broker防止消息丢失
  2. broker防止消息丢失
  3. 消费者防止消息丢失

1.1、生产者发送给broker防止消息丢失

可以使用回调接口的方式,也叫确认模式,步骤如下:

  1. 生产者生产消息后,先将消息内容保存到本地缓存中,
  2. 然后将消息发送给broker,并且在发送消息时,给每个消息生成唯一的id编号(ACK),
  3. broker收到消息并持久化到本地后,调用生产者的回调接口,发送唯一的id(ACK)给生产者,告诉生产者这条数据已经接收成功;
  4. 生产者收到broker发回的ack后,就知道这条消息已经发送成功了, 在本地缓存删除这条消息;

1.1.1、生产者发送失败了怎么办?

可能是网络原因,生产者发送过去了,但是broker没收到;这种情况下可以启用重发机制,启动一个后台的定时任务,每隔一段时间去判断已发送的消息是否超时未收到ack,因为生产需要等待broker返回ack,在一定时长内若未收到broker的ack,那么生产者的定时任务就可以从缓存中取出那个消息再发一次,直到发送成功为止;

1.2、broker防止消息丢失

broker收到消息之后,如果只是将数据保存到内存中,万一发生断电的情况,数据也就没了,所以为了防止这种情况发生,必须要将数据持久化到本地,以文件的方式保存起来;这样一来,就算断电,主要持久化的文件还在,重新启动后数据即可立即恢复;并且搭建mq高可用,同时在多台节点保存消息内容,万一其中一台节点发生硬盘损坏,其他机器上的数据依然不会丢失;

1.3 、消费者防止消息丢失

消费者在消费时有2种模式:

  1. broker发送消息给消费者进行消费(发送模式)
  2. 由消费者主动去broker拉取消息进行消费(拉取模式)
1.3.1、 broker发送给消费者进行消费(发送模式)

采用消息确认机制,和生产者一样,当broker发生消息给消费者后,消费者在成功消费后会返回一个 ack 给broker,broker接收到消费者返回的ack后才会从磁盘(或者内存)删除已消费的消息数据;

1.3.1.1.、如果broker未收到消费者返回的ack怎么办?

这个问题很好解决,broker启用重发机制即可;一段时间后后未收到ack就再发一次,可以参考rabbitMQ的重试机制(2s,4s,8s,10s,16s),若未收到ack,2秒后第一次重试,第二次为4秒,第三次为8秒,以此类推。。。

1.3.2 消费者主动去broker拉取消息进行消费(拉取模式)

由消费者主动拉取消息进行消费,可以解决消费速率问题,消费成功后不返回任何信息,消费失败将消息放回消息队列中;下次拉取时继续消费;

2、幂等性问题:重复消费

既然用到了重试机制,同一个消息就会发送多次,有可能broker发送第一个消息后,消费者处理的时间比较长,还没处理完broker触发了超时机制,同一个消息发送了第二次,但是这时候消费者已经消费完了第一条消息,broker发来的第二条消息又消费了一次,造成了重复消费的问题;也叫幂等性问题;

要解决这个问题,就得在生产者上做文章,生产者 在发送消息的时候,给每个消息生成一个全局唯一的uuid,并且在消费的时候加上分布式锁,即能解决问题;

3、消费速率问题:如果broker发送太快了,消费者无法承受这么大的流量怎么办?

在发送模式中的消费速率问题,比如消费者每秒的吞吐量是10000,但是broker每秒想消费者发送了12000个请求,就会速率问题,消费者处理不了那么多请求会进入阻塞,如果这时候broker还继续发送超量的请求,就可能会导致消费者服务雪崩效应;为了解决这个问题,需要将broker的发送速率降低,每秒发送的消息不可超过10000个,并且消费者做高可用,增加多个消费者服务集群,保证服务的可靠性;

4、消息积压

4.1、消息积压的原因

消息积压的原因一般情况下都是消息的速度赶不上生产的速度,比如一秒钟的时间里面生产者生产了1000条消息,而消费者在一秒钟里面只消费了500条消息,这种情况下没消费完的消息就会积压在队列里面;

4.2、消息积压出现的问题

  1. 随着消息数据堆积越来越多,消费者寻址的性能越来越慢;最后导致整个kafka对外提供服务的性能很差,从而造成其他服务的访问速度也变慢,最后造成雪崩效应;
  2. 消息积压还可能会造成的一个问题就是磁盘被堆满,导致生产者的消息数据无法写入磁盘;一直报错,从而引发的连锁反应,也会造成雪崩效应;

4.3、消息积压解决方案

出现线上消息积压的问题时,如果MQ已经因为消息积压变得不可用了,首先做的第一步肯定是要先将MQ恢复过来,但是要恢复又不想让消息丢失的话,就肯定不能删除已经存储的消息,所以最好的办法就是先扩容,让MQ先用起来;

然后马上在代码层面做以下优化

  1. 在消费中使用多线程或线程池一起消费;
  2. 在业务层面优化代码,提升消费速度
    3.使用多个消费者集群一起消费;

关键字消息队列

上一篇: redis实现分布式锁

下一篇: Hystrix底层原理