消息乱序的场景
订单数据库的binlog消息乱序
一个案例,就是大数据团队需要获取订单数据库中的全部数据,然后将订单数据保存一份在自己的大数据存储系统中,比如HDFS、Hive、HBase等接着基于大数据技术对这些数据进行计算,如下图所示。
如果让大数据系统自己直接跑复杂的大SQL在订单系统的数据库上来出一些数据报表,是会严重影响订单系统的性能的,所以后来这个方案优化为了,基于Canal这样的中间件去监听订单数据库的binlog,就是一些增删改操作的日志,然后把这些binlog发送到MQ里去。
接着大数据系统自己从MQ里获取binlog,落地到自己的大数据存储中去,然后对自己的存储中的数据进行计算得到数据报表即可,我们看下图。
这个技术方案原本大家都以为会运行的很良好,结果没想到大数据团队在上了这个技术方案一段时间之后,遇到了一些奇怪的问题。他们通过这个方案计算出来的数据报表,被公司的管理层和运营同事发现,很多数据指标都是错误的!
于是他们就展开了排查,在对自己的大数据存储中的订单数据与订单数据库中的订单数据进行了一次比对之后,发现他们那儿的一些订单数据是不对的。
比如在订单数据库中一个订单的字段A的值是100,结果在大数据存储中的一个订单的字段A的值是0。
那如果两边的订单数据的字段值都不一致的话,必然会导致最终计算出来的数据报表的指标是错误的!
因此大数据团队针对这个问题,在系统中打印了很多的日志,然后观察了几天,发现了一个惊人的问题,那就是订单数据库的binlog在通过MQ同步的过程中,出现了奇怪的消息乱序的现象!
简单来说,比如订单系统在更新订单数据库的时候,有两条SQL语句:
insert into order values(xx, 0) update order set xxvalue=100 where id=xxx
就是先插入了一条订单数据,刚开始他一个字段的值是0,接着更新他的一个字段的值是100。
然后这两条SQL语句是对应着两个binlog的,也就是两个更新日志,一个binlog是insert语句的,一个binlog是update语句的,这个binlog会进入到MQ中去。
然后大数据系统从MQ获取出来binlog的时候,居然是先获取出来了update语句的binlog,然后再获取了insert语句的binlog也就是说,这个时候会先执行更新操作,但是此时数据根本不存在,没法进行更新,接着执行插入操作,也就是插入一条字段值为0的订单数据进去,最后大数据存储中的订单记录的字段值就是0。
我们看右图,有一个清晰的过程展示。
正是这个消息乱序的原因,导致了大数据存储中的数据都错乱了。
为什么基于MQ来传输数据会出现消息乱序?
可以给每个Topic指定多个
MessageQueue
,然后你写入消息的时候,其实是会把消息均匀分发给不同的MessageQueue
的。
比如我们这里在写入binlog到MQ的时候,可能会把insert binlog
写入到一个MessageQueue
里去,update binlog
写入到另外一个MessageQueue
里去我们看下面的图示
接着大数据系统在获取binlog的时候,可能会部署多台机器组成一个
Consumer Group
,对于Consumer Group
中的每台机器都会负责消费一部分MessageQueue
的消息,所以可能一台机器从上图的ConsumeQueue01
中获取到了insert binlog,一台机器从上图的ConsumeQueue02
中获取到了update binlog如下图所示
因为我们看到上图中,是两台机器上的大数据系统并行的去获取binlog,所以完全有可能是其中一个大数据系统先获取到了update
binlog去执行了更新操作,此时存储中没有数据,自然是没法更新的。
然后另外一个大数据系统再获取到insert binlog去执行插入操作,最终导致只有一个字段值为0的订单数据,如下图:
消息乱序原因总结
我们完全可以清晰的看到,在使用MQ的时候出现消息乱序是非常正常的一个问题,因为我们原本有顺序的消息,完全可能会分发到不同的MessageQueue中去,然后不同机器上部署的Consumer可能会用混乱的顺序从不同的MessageQueue里获取消息然后处理。
所以在实际使用MQ的时候,我们必须要考虑到这个问题。
更多场景
RocketMQ如何解决消息乱序问题
让属于同一个订单的binlog进入一个MessageQueue
所以要解决这个消息的乱序问题,最根本的方法其实非常简单,就是得想办法让一个订单的binlog进入到一个
MessageQueue
里去。给大家举个例子,比如对一个订单,我们先后执行了insert、update两条SQL语句,也就对应了2个binlog。
那么我们现在就必须要想办法让这个订单的2个binlog都直接进入到Topic下的一个
MessageQueue
里去。那么我们这个时候应该怎么做呢?我们完全可以根据订单id来进行判断,我们可以往MQ里发送binlog的时候,根据订单id来判断一下,如果订单id相同,你必须保证他进入同一个
MessageQueue
。我们这里可以采用取模的方法,比如有一个订单id是1100,那么他可能有2个binlog,对这两个binlog,我们必须要用订单id=1100对
MessageQueue
的数量进行取模,比如MessageQueue
一共有15个,那么此时订单id=1100对15取模,就是5也就是说,凡是订单id=1100的binlog,都应该进入位置为5的MessageQueue
中去!通过这个方法,我们就可以让一个订单的binlog都按照顺序进入到一个
MessageQueue
中去,看下面的图:获取binlog有序
我们的MySQL数据库的binlog一定都是有顺序的。
比如,订单系统对订单数据库执行了两条SQL,先是insert语句,然后是update语句。
那么此时MySQL数据库自己必然是在磁盘文件里按照顺序先写入insert语句的binlog,然后写入update语句的binlog,如下图所示:
当我们从MySQL数据库中获取他的binlog的时候,此时也必须是按照binlog的顺序来获取的,也就是说比如Canal作为一个中间件从MySQL那里监听和获取binlog,那么当binlog传输到Canal的时候,也必然是有先后顺序的,先是insert binlog,然后是update binlog,如下图所示。
接着我们将binlog发送给MQ的时候,必须将一个订单的binlog都发送到一个
MessageQueue
里去,而且发送过去的时候,也必须是严格按照顺序来发送的只有这样,最终才能让一个订单的binlog进入同一个
MessageQueue
,而且还是有序的,如下图所示:所以我们必须要严格做到以上几点,才能保证一个订单的binlog绝对有序的进入一个MessageQueue中。
Consumer有序处理一个订单的binlog
一个Consumer可以处理多个MessageQueue的消息,但是一个MessageQueue只能交给一个Consumer来进行处理,所以一个订单的binlog只会有序的交给一个Consumer来进行处理!
我们看下图,这样的话一个大数据系统就可以获取到一个订单的有序的binlog,然后有序的根据binlog把数据还原到自己的存储中去。
有序消息的方案中消息失败处理
在Consumer处理消息的时候,可能会因为底层存储挂了导致消息处理失败,之前我们说过,此时可以返回
RECONSUME_LATER
状态,然后broker会过一会儿自动给我们重试。但是这个方案用在我们的有序消息中可以吗?
那绝对是不行的,因为如果你的consumer获取到订单的一个insert binlog,结果处理失败了,此时返回了
RECONSUME_LATER
,那么这条消息会进入重试队列,过一会儿才会交给你重试。但是此时broker会直接把下一条消息,也就是这个订单的update binlog交给你来处理,此时万一你执行成功了,就根本没有数据可以更新!又会出现消息乱序的问题,我们看下图
所以对于有序消息的方案中,如果你遇到消息处理失败的场景,就必须返回
SUSPEND_CURRENT_QUEUE_A_MOMENT
这个状态,意思是先等一会儿,一会儿再继续处理这批消息,而不能把这批消息放入重试队列去,然后直接处理下一批消息。有序消息方案与其他消息方案的结合
如果你一定要求消息是有序的,那么必须得用上述的有序消息方案,同时对这个方案,如果你要确保消息不丢失,那么可以和消息零丢失方案结合起来,如果你要避免消息重复处理,还需要在消费者那里处理消息的时候,去看一下,消息如果已经存在就不能重复插入,等等。
同时还需要设计自己的消息处理失败的方案,也就是不能进入重试队列,而是暂停等待一会儿,继续处理这批消息。
顺序消息机制的代码实现
如何让一个订单的binlog进入一个MessageQueue?
我们先来看第一个代码落地的分析,首先要实现消息顺序,必须让一个订单的binlog都进入一个MessageQueue中,此时我们可以写
如下的代码:
在上面的代码片段中,我们可以看到,关键因素就是两个,一个是发送消息的时候传入一个
MessageQueueSelector
,在里面你要根据订单id和MessageQueue
数量去选择这个订单id的数据进入哪个MessageQueue
。同时在发送消息的时候除了带上消息自己以外,还要带上订单id,然后
MessageQueueSelector
就会根据订单id去选择一个MessageQueue
发送过去,这样的话,就可以保证一个订单的多个binlog都会进入一个MessageQueue
中去。消费者如何保证按照顺序来获取一个MessageQueue中的消息?
接着我们来看第二块,就是消费者如何按照顺序,来获取一个MessageQueue中的消息?我们看下面的代码:
在上面的代码中,大家可以注意一下,我们使用的是
MessageListenerOrderly
这个东西,他里面有Orderly这个名称也就是说,Consumer会对每一个ConsumeQueue,都仅仅用一个线程来处理其中的消息。比如对
ConsumeQueue01
中的订单id=1100的多个binlog,会交给一个线程来按照binlog顺序来依次处理。否则如果ConsumeQueue01
中的订单id=1100的多个binlog交给Consumer中的多个线程来处理的话,那还是会有消息乱序的问题。