RocketMQ重复消息解决方案

重复消费案例场景分析

订单系统已经跟各个系统进行了解耦,也就是说当订单支付成功之后,会发送一条消息到MQ里去,然后红包系统从里面获取消息派发红包,优惠券系统从里面获取消息派发优惠券,其他系统也是同理。
对线上系统的日志进行分析的时候,发现了一个奇怪的问题,那就是优惠券系统似乎对同一条消息重复的处理了两次,导致他给一个用户重复的派发了两个优惠券
notion image

生产者发消息重复

第三方支付接口超时重试

我们的订单系统收到一个支付成功的通知之后,他在发送消息到MQ的时候,会重发把一个消息发送两次吗?
可能有的朋友乍一看觉得应该不可能,但是其实在生产环境中运行的系统,显然是有可能把一个消息重复发两次的!首先,假设用户在支付成功之后,我们的订单系统收到了一个支付成功的通知,接着他就向MQ发送了一条订单支付成功的消息,这个大家都知道没有什么问题。
但是偏偏可能因为不知道什么原因,你的订单系统处理的速度有点慢,我们看下图。
notion image
然后可能就因为你的订单系统处理的速度有点慢了,这就导致支付系统跟你订单系统之间的请求出现了超时,此时有可能支付系统再次重试调用了你订单系统的接口去通知你,这个订单支付成功了,然后你的订单系统这个时候可能又一次推送了一条消息到MQ里去,相当于是一个订单支付成功的消息,你重复推送了两次到MQ!
此时相当于是MQ里就会对一个订单的支付成功消息,总共有两条,我们看下图的示意。
notion image
那如果你订单系统对一个订单重复推送了两次支付成功消息到MQ,MQ里对一个订单有两条重复的支付成功消息,优惠券系统必然会消费到一个订单的两条重复的支付成功消息,也必然会针对这个订单给用户重复的派发两个优惠券,我们看下图。
notion image
所以大家看到这里,通过一步一图的方式,可以很清晰的看到我们用于发送消息到MQ的订单系统,如果出现了接口超时等问题,可能会导致上游的支付系统重试调用订单系统的接口,进而导致订单系统对一个消息重复发送两条到MQ里去!

订单系统自己重复发送消息

接着我们来考虑第二种情况,假设支付系统没有对一个订单重复调用你的订单系统的接口,而是你订单系统自己可能就重复发送消息到MQ里去
那这是一个什么情况呢?我们来分析一下。假设我们的订单系统为了保证消息一定能投递到MQ里去,因此采用了重试的代码,
我们看下面的代码片段,如果发现MQ发送有异常,则会进行几次重试。
notion image
但是这种重试的方式,其实是一把双刃剑,因为正是这个重试就可能导致消息重复发送我们来考虑一个情况,假设你发送了一条消息到MQ了,其实MQ是已经接收到这条消息了,结果MQ返回响应给你的时候,网络有问题超时了,就是你没能及时收到MQ返回给你的响应。
但是大家一定要明确一点,此时MQ里其实是已经有你发送过去的消息了,只不过他返回给你的响应没能给到你而已!我们看下面的图示
notion image
这个时候,你的代码里可能会发现一个网络超时的异常,然后你就会进行重试再次发送这个消息到MQ去,然后MQ必然会收到一条一模一样的消息,进而导致你的消息重复发送了!
看下图的示意
notion image
所以这种重试代码在使用的时候一定要小心!因为他还是有一定的概率会导致你重发消息的!

优惠券系统重复消费一条消息

即使你没有重复发送消息到MQ,哪怕MQ里就一条消息,优惠券系统也有可能会重复进行消费这是为什么呢?我们一步一步来分析一下。
假设你的优惠券系统拿到了一条订单成功支付的消息,然后都已经进行处理了,也就是说都已经对这个订单给你发了一张优惠券了,本来我们之前讲过,这个时候他应该返回一个CONSUME_SUCCESS的状态,然后提交消费进度offset到broker的。
但是不巧的是,你刚刚发完优惠券,还没来得及提交消息offset到broker呢!优惠券系统就进行了一次重启!比如可能优惠券系统的代码更新了,需要重启进行重新部署。
我们看下面的图示
notion image
这时因为你没提交这条消息的offset给broker,broker并不知道你已经处理完了这条消息,然后优惠券系统重启之后,broker就会再次把这条消息交给你,让你再一次进行处理,然后你会再一次发送一张优惠券,导致重复发送了两次优惠券!这就是对同一条消息,优惠券系统重复处理两次的原因,我们看下面的图示。
notion image
对类似优惠券系统这样的业务系统,我们肯定是会频繁的更新代码的,可能每隔几天就需要重启一次系统进行代码的更新
所以其实你重启优惠券系统的时候,可能有一批消息刚处理完,还没来得及提交offset给broker呢,然后你重启之后就会再一次重复处理这批消息,这种情况可能是家常便饭!
另外就是对于系统之间的调用,有的时候出现超时和重试的情况也是很常见的,所以你负责发消息到MQ的系统,很可能时不时的出现一次超时,然后被别人重试调用你的接口,你可能会重复发送一条消息到MQ里去,这可能也是家常便饭!

对系统核心流程引入幂等性机制

到底应该如何去避免MQ中的消息进行重复处理。
要解决这个问题,我们就要先给大家讲一个概念,叫做幂等性机制。
这个幂等性机制,其实就是用来避免对同一个请求或者同一条消息进行重复处理的机制,所谓的幂等,他的意思就是,比如你有一个接口,然后如果别人对一次请求重试了多次,来调用你的接口,你必须保证自己系统的数据是正常的,不能多出来一些重复的数据,这就是幂等性的意思。
那么对于我们的MQ而言,就是你从MQ里获取消息的时候,要保证对同一个消息只能处理一次,不能重复处理多次,导致出现重复的数据。
因此要解决MQ的消息重复问题,关键就是要引入幂等性机制。

发送消息到MQ的时候如何保证幂等性

当我们的订单系统发送消息到MQ的时候需要保证幂等性吗?
我们都知道,订单系统的接口可能会被重复调用导致发送重复的消息到MQ去,也可能自己有重试机制导致发送重复的消息到MQ,如下图所示
notion image
那么我们如果想要让订单系统别发送重复的消息到MQ去,应该怎么做呢?
大体上来说,常见的方案有两种。

业务判断法

第一个方案就是业务判断法,也就是说你的订单系统必须要知道自己到底是否发送过消息到MQ去,消息到底是否已经在MQ里了。
我们举个例子,当支付系统重试调用你的订单系统的接口时,你需要发送一个请求到MQ去,查询一下当前MQ里是否存在针对这个订单的支付消息?
如果MQ告诉你,针对id=1100这个订单的支付成功消息,在我这里已经有了,你之前已经写入进来了,那么订单系统就可以不要再次发送这条消息到MQ去了,我们看下图的示意。
notion image
这个业务判断法的核心就在于,你的消息肯定是存在于MQ。如果没发送过这个消息,MQ里肯定没有这个消息,如果发送过这个消息,MQ里肯定给有这个消息。
所以当你的订单系统的接口被重试调用的时候,你这个接口上来就应该发送请求到MQ里去查询一下,比如对订单id=1100这个订单的支付成功消息,在你MQ那里有没有?
如果有的话,我就不再重复发送消息了!

状态判断法

这个方法的核心在于,你需要引入一个Redis缓存来存储你是否发送过消息的状态,如果你成功发送了一个消息到MQ里去,你得在Redis缓存里写一条数据,标记这个消息已经发送过,我们看下图。
notion image
那么当你的订单接口被重复调用的时候,你只要根据订单id去Redis缓存里查询一下,这个订单的支付消息是否已经发送给MQ了,如果发送过了,你就别再次发送了!
我们看下图的示意。
notion image
其实两种幂等性机制都是很常用的,但是大家这里一定要知道一个事情,那就8是对于基于Redis的状态判断法,有可能没办法完全做到幂等性。
举个例子,你的支付系统发送请求给订单系统,然后已经发送消息到MQ去了,但是此时订单系统突然崩溃了,没来得及把消息发送的状态写入Redis
我们看下图
notion image
这个时候如果你的订单系统在其他机器上部署了,或者他重启了,那么这个时候订单系统被重试调用的时候,他去找Redis查询消息发送状态,会以为消息没发送过,然后会再次发送重复消息到MQ去
notion image
所以这种方案一般情况下是可以做到幂等性的,但是如果有时候你刚发送了消息到MQ,还没来得及写Redis,系统就挂了,之后你的接口被重试调用的时候,你查Redis还以为消息没发过,就会发送重复的消息到MQ去。

有没有必要在订单系统环节保证消息不重复发送?

所以在我们这个场景中,如果在订单系统环节要保证消息不重复发送,要不然就是直接通过查询MQ来判断消息是否发过,要不然就是通过引入Redis来保存消息发送状态。
其实这两种方案都不是太好。
因为RocketMQ虽然是支持你查询某个消息是否存在的,这个功能我们后面的案例会讲他的功能使用和底层原理,但是在这个环节你直接从MQ查询消息是没这个必要的,他的性能也不是太好,会影响你的接口的性能。
另外基于Redis的消息发送状态的方案,在极端情况下还是没法100%保证幂等性,所以也不是特别好的一个方案。
所以对于我们而言,在这里建议是不用在这个环节保证幂等性,也就是我们可以默许他可能会发送重复的消息到MQ里去。

消费者如何保证消息处理的幂等

接着我们来看优惠券系统假设会拿到重复的消息,那么如何保证消息处理的幂等性?
其实这里就比较简单了,直接基于业务判断法就可以了,因为优惠券系统每次拿到一条消息后给用户发一张优惠券,实际上核心就是在数据库里给用户插入一条优惠券记录
那么如果优惠券系统从MQ那里拿到一个订单的两条重复的支付成功消息,这个时候其实很简单,他只要先去优惠券数据库中查询一下,比如对订单id=1100的订单,是否已经发放过优惠券了,是否有优惠券记录,如果有的话,就不要重复发券了!通过这个业务判断的方法,就可以简单高效的避免消息的重复处理了,我们看下图。
通过这个业务判断的方法,就可以简单高效的避免消息的重复处理了,我们看下图。
notion image

MQ消息幂等性的方案总结

一般来说,对于MQ的重复消息问题而言,我们往MQ里重复发送一样的消息其实是还可以接收的,因为MQ里有多条重复消息,他不会对系统的核心数据直接造成影响,但是我们关键要保证的,是你从MQ里获取消息进行处理的时候,必须要保证消息不能重复处理。
这里的话,要保证消息的幂等性,我们优先推荐的其实还是业务判断法,直接根据你的数据存储中的记录来判断这个消息是否处理过,如果处理过了,那就别再次处理了。因为我们要知道,基于Redis的消息发送状态的方案,在一些极端情况下还是没法完全保证幂等性的。
一般是两种方案结合,redis缓存(分布式锁set ex nx和lua脚本释放锁)+数据库唯一键(兜底方案)