2021年6月14日

云原生消息队列RocketMQ:为什么我们选择 RocketMQ
说起消息队列,ActiveMQ、RabbitMQ、RocketMQ、Kafka、Pulsar 等纷纷涌入我们的脑海中, 在如此众多的开源消息队列产品中,作为一名合格的架构师如何给出高性价比的方案呢?商业化的产品暂不纳入选项中。 接下来我将从选型要素、RocketMQ 的优势两个方面解释为什么选择 RocketMQ 。 选型要素 首先从公司、消息队列服务提供者(一般是中间件团队)、最终用户三个角度来简单总结分析。 一、从公司层面看, 关注如下几点: 1. 技术成本 技术成本,一般包含服务器成本、二次开发成本、后期维护成本等,言而总之:都是钱。 服务器目前基本都使用云服务器,不同的云厂商的相同配置的服务器性能也有一定差异, 服务器成本一般需要了解:云厂商机器性能、云厂商优惠、所需服务器配置、服务器台数、单台服务器目前的价格、单台服务器优惠后的价格等。 2. 人力成本 人力成本,一般包含现有技术人员成本、新人招聘成本。 新的技术选型对于目前的技术人员接受程度怎么样,学习的难易程度怎样等,都是需要考虑的。如果太难的话,上线周期会变长、业务需求实现速度慢,甚至有人直接离职。 新人招聘成本,一般招聘一个新人有如下几个过程:简历筛选、预约面试、数轮面试、发 offer 、接受 offer 、正式入职、试用期、转正。这中间涉及到猎头成本、人力资源沟通成本、面试成本、新人入职后环境适应成本等等。 3. 其他 目前处于不同阶段的互联网公司对于技术成本、人力成本有着不一样的要求,但是很多有一定规模的公司实际上还是用“买买买”的心态来对待的:只要业务发展快速,买服务器、招人都不是问题,如果成本高了就做技术降成本、裁员。这不仅是员工之痛,也是业务之痛,更是公司之痛。 二、从中间件组层面看, 关注如下几点: 1. 稳定 公司级的服务首要的一点就是稳定。拥有稳定的组件、稳定的服务,业务才能有条不紊的进行。所以说,无论什么时候, 稳定都是王道。 2. 功能支持 不同的业务场景需要的功能也不尽相同,通常我们会考虑重试、死信机制,位点重置,定时延迟消息、事物消息,主从切换,权限控制等方面。 3. 性能 目前包含写入延迟和吞吐。 4. 管理平台 首先需要满足最终用户接入、查看、排障,管理员管控 topic 、消费者方便等。管理平台有现成的最好,方便二次开发 。 5. 监控、报警 监控报警是否完善、是否方便接入公司内部自研体系,或者行业的事实标准 Prometheus 。 6. 运维 & 支持 & 开源社区 如果产品上线后, 大部分时间,我们都是在做运维&支持。运维包含服务部署、迁移、服务升级、解决系统 Bug 、用户使用答疑、管理平台和监控报警平台升级等。 7. 其他 我们除了依赖自身以外,也可以借助社区的力量,同一个问题可能别人遇到过并且提交过 PR ,已经得到解决,我们就可以以此作为借鉴。所以社区的活跃情况也是非常重要的考虑。 三、从最终用户(一般包含业务后端研发以及他们的 Leader )看 1. 稳定性 对于业务的研发和他们的 Leader ,他们的核心任务是实现业务逻辑。如果一个服务三天两头总是有问题, 对于他们来说是比较致命的,所以稳定性是比较核心的一部分。 2. 改造现有项目的难度 旧项目改造其实是业务研发接入新中间件实际操作最多的部分。 3. 新项目接入是否便捷 是否便捷接入跟他们的工作量有着直接的关联。 4. 与目前的 App 微服务框架兼容怎样 新项目的接入和公司微服务框架兼容都比较容易。一般中间件在提供服务时都会考虑业务研发接入的便利性。 RocketMQ 的优势 下面将按照选项要素的要求, 分析 RocketMQ 在这方面的优势。 一、RocketMQ 如何解决和友好面对公司层面的诉求 1. 技术成本 就技术成熟度而言,在经历阿里双十一数万亿洪峰、微众银行、民生银行、蚂蚁金服、平安、字节跳动、快手、美团、京东、网易等各种行业大厂的考验后,就不言而喻了。 RocketMQ 对于服务器的配置要求不高, 普通的云主机都可以。曾经我们验证 8C 16G 500G SSD 的 2 主 2 从的集群,发送 tps 可以到 4~5w ,消费 tps 峰值 20w +,稳定在 8w~9w 。并且,还能根据业务实际的需求无感的横向扩展。 综合而言, 技术成本相对可控且人才多。 2. 人力成本 人力成本主要是现有的技术人员的学习成本、招新人的成本。 RocketMQ 是 java 开发的,代码也非常稳定、有条理,各个版本之间除了功能有差异之外,Api 、传输协议几乎没有太多变化,对于升级而言也更加方便。 java 也是目前中间件采用的比较主流的语言,使用的技术人员非常广泛。RocketMQ 在金融行业比如:微众银行、民生银行、蚂蚁金服、平安; 其他行业公司,比如阿里、字节跳动、快手、美团、京东、网易等与大量中小企业都在使用,候选人范围相对较大。 RocketMQ 社区也比较活跃,钉钉群、微信群、QQ 群众多,社区文档非常丰富和完善,原理剖析视频、文档也非常多,非常易于学习和入门。 下面是钉钉群,欢迎大家加群留言、答疑。 对于 java 方面的消息队列方面的人才相比 C/C++、C、Python、Go 等还是更多的:主流的 Kafka 是 scala + java、pulsar 是 java ,对于招聘也有极大的优势。 综合而言,RocketMQ 技术员对于人力成本比较友好。 二、从中间件组层面看,RocketMQ 是如何提供优秀的能力,为业务保驾护航呢? 1. 稳定性 金融级可靠、阿里双十一稳定支持万亿级消息洪峰,在笔者之前所在公司也有过 2 年+零事故的佳绩。 2. 功能丰富,支持的场景众多 重试、死信机制,友好、无感的业务重试机制。 顺序消息、事物消息 万级 Topic 数量支持 消息过滤 消息轨迹追踪 主从自动切换 原生支持 Prometheus 监控 原生支持易用管理平台:RocketMQ Console 访问权限控制(ACL) 3. 性能 RocketMQ 可以支持 99.9% 的写入延迟在 2 ms ,其他的开源消息队列中间件基本都是大于 5 ms ;目前大部分消息队列中间间都支持横向扩展,吞吐上横向扩展几乎都可以满足。RocketMQ 的在滴滴做的性能测试: _ _, 大家参考。 发送、消费 tps 和 kafka 一个数量级,Topic 数量剧增对于性能影响较小。 4. 管理平台 RocketMQ Console 原生支持: 5. 监控、报警 RocketMQ Exporter 原生支持 Prometheus: 6. 运维 & 支持 & 开源社区 无 zk 等第三方依赖,开箱即用 社区钉钉群、微信群、QQ 群非常活跃,钉钉群、微信群有问必答。 社区最近新来一位小姐姐 Commiter ,团队也在不断壮大。 综合看来,RocketMQ 稳定、可靠、性能好,开箱即用,不依赖 Zookeeper ,系统的稳定性更高,复杂度更小。监控报警等周边设施完善,场景支持全,社区活跃、文档丰富,是中间件团队的不二之选。 三、对于最终用户:业务研发、业务研发 Leader,他们的核心担忧是提供的技术是否稳定可靠、是否快速方便的接入 从中间件组层面看这个问题时,RocketMQ 稳定、可靠,那对于接入是否友好呢? RocketMQ 提供 java 原生客户端、Spring 客户端,C++ 客户端、Python 客户端、Go 客户端等多类型、多语言的客户端,对于各种项目都可以统一接入。 微服务框架中 Spring Cloud 基本已经成为事实标准,RocketMQ 支持 Spring boot Starter 和 Spring Cloud Function 等多种方式融合入微服务框架,对于 Spring 体系支持更加方便快捷。 Kafka vs RocketMQ 实际中,很多人应该面临过 RocketMQ vs Kafka ,Kafka 适合对于延迟不敏感、批量型、Topic 数量可控、对于消息丢失不敏感的场景。比如大数据场景的 MySQL2Hive、MySQL2Flink 的数据流通道,日志数据流通道等。 RocketMQ 适用于金融转账消息、订单状态变更消息、手机消息 Push 等业务场景。这些场景 Topic 数量通常过万,对于消息延迟和丢失极度敏感,数据通常是论条处理。对于海量数据的问题,一般地横向扩容完全可以解决。 合适的场景选择合适的产品,万能的产品是不存在的,都是折中,都是取舍。 作者介绍 李伟,Apache RocketMQ 社区 Commiter ,Python 客户端项目负责人, Apache RocketMQ 北京社区联合发起人,Apache Doris Contributor 。目前就职于腾讯,主要负责 OLAP 数据库开发,对分布式存储系统设计和研发有丰富经验,也热衷于知识分享和社区活动。 RocketMQ 学习资料 阿里云知行实验室提供一系列的 RocketMQ 在线实操环境,包含操作文档、ubuntu 实验环境,大家随时尝试玩玩: Apache RocketMQ 开源入门最佳实践: 《RocketMQ 分布式消息中间件:核心原理与最佳实践》随书实战:_ 在 Spring 生态中玩转 RocketMQ: 实验预览图如下: 其他资源 RocketMQ vs. ActiveMQ vs. Kafka: RocketMQ 源码: RocketMQ Exporter 源码: RocketMQ Spring 源码: RocketMQ C++ 客户端源码: RocketMQ Python 客户端源码: RocketMQ Go 客户端源码: RocketMQ Console 源码: RocketMQ Flink Connector 源码: RocketMQ 如何保证消息可靠: 大揭秘!RocketMQ 如何管理消费进度:
作者:李伟
#行业实践

2021年4月3日

阿里的 RocketMQ 如何让双十一峰值之下 0 故障?
2020 年双十一交易峰值达到 58.3 W 笔/秒,消息中间件 RocketMQ 继续数年 0 故障丝般顺滑地完美支持了整个集团大促的各类业务平稳。2020 年双十一大促中,消息中间件 RocketMQ 发生了以下几个方面的变化: 云原生化实践:完成运维层面的云原生化改造,实现 Kubernetes 化。 性能优化:消息过滤优化交易集群性能提升 30%。 全新的消费模型:对于延迟敏感业务提供新的消费模式,降低因发布、重启等场景下导致的消费延迟。 云原生化实践 1. 背景 Kubernetes 作为目前云原生化技术栈实践中重要的一环,其生态已经逐步建立并日益丰富。目前,服务于集团内部的 RocketMQ 集群拥有巨大的规模以及各种历史因素,因此在运维方面存在相当一部分痛点,我们希望能够通过云原生技术栈来尝试找到对应解决方案,并同时实现降本提效,达到无人值守的自动化运维。 消息中间件早在 2016 年,通过内部团队提供的中间件部署平台实现了容器化和自动化发布,整体的运维比 2016 年前已经有了很大的提高,但是作为一个有状态的服务,在运维层面仍然存在较多的问题。 中间件部署平台帮我们完成了资源的申请,容器的创建、初始化、镜像安装等一系列的基础工作,但是因为中间件各个产品都有自己不同的部署逻辑,所以在应用的发布上,就是各应用自己的定制化了。中间件部署平台的开发也不完全了解集团内 RocketMQ 的部署过程是怎样的。 因此在 2016 年的时候,部署平台需要我们去亲自实现消息中间件的应用发布代码。虽然部署平台大大提升了我们的运维效率,甚至还能实现一键发布,但是这样的方案也有不少的问题。比较明显的就是,当我们的发布逻辑有变化的时候,还需要去修改部署平台对应的代码,需要部署平台升级来支持我们,用最近比较流行的一个说法,就是相当不云原生。 同样在故障机替换、集群缩容等操作中,存在部分人工参与的工作,如切流,堆积数据的确认等。我们尝试过在部署平台中集成更多消息中间件自己的运维逻辑,不过在其他团队的工程里写自己的业务代码,确实也是一个不太友好的实现方案,因此我们希望通过 Kubernetes 来实现消息中间件自己的 operator 。我们同样希望利用云化后云盘的多副本能力来降低我们的机器成本并降低主备运维的复杂程度。 经过一段时间的跟进与探讨,最终再次由内部团队承担了建设云原生应用运维平台的任务,并依托于中间件部署平台的经验,借助云原生技术栈,实现对有状态应用自动化运维的突破。 2. 实现 整体的实现方案如上图所示,通过自定义的 CRD 对消息中间件的业务模型进行抽象,将原有的在中间件部署平台的业务发布部署逻辑下沉到消息中间件自己的 operator 中,托管在内部 Kubernetes 平台上。该平台负责所有的容器生产、初始化以及集团内一切线上环境的基线部署,屏蔽掉 IaaS 层的所有细节。 Operator 承担了所有的新建集群、扩容、缩容、迁移的全部逻辑,包括每个 pod 对应的 brokerName 自动生成、配置文件,根据集群不同功能而配置的各种开关,元数据的同步复制等等。同时之前一些人工的相关操作,比如切流时候的流量观察,下线前的堆积数据观察等也全部集成到了 operator 中。当我们有需求重新修改各种运维逻辑的时候,也再也不用去依赖通用的具体实现,修改自己的 operator 即可。 最后线上的实际部署情况去掉了图中的所有的 replica 备机。在 Kubernetes 的理念中,一个集群中每个实例的状态是一致的,没有依赖关系,而如果按照消息中间件原有的主备成对部署的方案,主备之间是有严格的对应关系,并且在上下线发布过程中有严格的顺序要求,这种部署模式在 Kubernetes 的体系下是并不提倡的。若依然采用以上老的架构方式,会导致实例控制的复杂性和不可控性,同时我们也希望能更多的遵循 Kubernetes 的运维理念。 云化后的 ECS 使用的是高速云盘,底层将对数据做了多备份,因此数据的可用性得到了保障。并且高速云盘在性能上完全满足 MQ 同步刷盘,因此,此时就可以把之前的异步刷盘改为同步,保证消息写入时的不丢失问题。云原生模式下,所有的实例环境均是一致性的,依托容器技术和 Kubernetes 的技术,可实现任何实例挂掉(包含宕机引起的挂掉),都能自动自愈,快速恢复。 解决了数据的可靠性和服务的可用性后,整个云原生化后的架构可以变得更加简单,只有 broker 的概念,再无主备之分。 3. 大促验证 上图是 Kubernetes 上线后双十一大促当天的发送 RT 统计,可见大促期间的发送 RT 较为平稳,整体符合预期,云原生化实践完成了关键性的里程碑。 性能优化 1. 背景 RocketMQ 至今已经连续七年 0 故障支持集团的双十一大促。自从 RocketMQ 诞生以来,为了能够完全承载包括集团业务中台交易消息等核心链路在内的各类关键业务,复用了原有的上层协议逻辑,使得各类业务方完全无感知的切换到 RocketMQ 上,并同时充分享受了更为稳定和强大的 RocketMQ 消息中间件的各类特性。 当前,申请订阅业务中台的核心交易消息的业务方一直都在不断持续增加,并且随着各类业务复杂度提升,业务方的消息订阅配置也变得更加复杂繁琐,从而使得交易集群的进行过滤的计算逻辑也变得更为复杂。这些业务方部分沿用旧的协议逻辑(Header 过滤),部分使用 RocketMQ 特有的 SQL 过滤。 2. 主要成本 目前集团内部 RocketMQ 的大促机器成本绝大部分都是交易消息相关的集群,在双十一零点峰值期间,交易集群的峰值和交易峰值成正比,叠加每年新增的复杂订阅带来了额外 CPU 过滤计算逻辑,交易集群都是大促中机器成本增长最大的地方。 3. 优化过程 由于历史原因,大部分的业务方主要还是使用 Header 过滤,内部实现其实是aviator 表达式。仔细观察交易消息集群的业务方过滤表达式,可以发现绝大部分都指定类似MessageType == xxxx这样的条件。翻看 aviator 的源码可以发现这样的条件最终会调用 Java 的字符串比较String.compareTo()。 由于交易消息包括大量不同业务的 MessageType,光是有记录的起码有几千个,随着交易业务流程复杂化,MessageType 的增长更是繁多。随着交易峰值的提高,交易消息峰值正比增长,叠加这部分更加复杂的过滤,持续增长的将来,交易集群的成本极可能和交易峰值指数增长,因此决心对这部分进行优化。 原有的过滤流程如下,每个交易消息需要逐个匹配不同 group 的订阅关系表达式,如果符合表达式,则选取对应的 group 的机器进行投递。如下图所示: 对此流程进行优化的思路需要一定的灵感,在这里借助数据库索引的思路:原有流程可以把所有订阅方的过滤表达式看作数据库的记录,每次消息过滤就相当于一个带有特定条件的数据库查询,把所有匹配查询(消息)的记录(过滤表达式)选取出来作为结果。为了加快查询结果,可以选择 MessageType 作为一个索引字段进行索引化,每次查询变为先匹配 MessageType 主索引,然后把匹配上主索引的记录再进行其它条件(如下图的 sellerId 和 testA )匹配,优化流程如下图所示: 以上优化流程确定后,要关注的技术点有两个: 如何抽取每个表达式中的 MessageType 字段? 如何对 MessageType 字段进行索引化? 对于技术点 1 ,需要针对 aviator 的编译流程进行 hook ,深入 aviator 源码后,可以发现 aviator 的编译是典型的Recursive descent,同时需要考虑到提取后父表达式的短路问题。 在编译过程中针对 messageType==XXX 这种类型进行提取后,把原有的 message==XXX 转变为 true/false 两种情况,然后针对 true、false 进行表达式的短路即可得出表达式优化提取后的情况。例如: 表达式: messageType=='200tradepaiddone' && buyerId==123456 提取为两个子表达式: 子表达式1(messageType==200tradepaiddone):buyerId==123456 子表达式2(messageType!=200tradepaiddone):false 具体到 aviator 的实现里,表达式编译会把每个 token 构建一个 List ,类似如下图所示(为方便理解,绿色方框的是 token ,其它框表示表达式的具体条件组合): 提取了 messageType ,有两种情况: 情况一:messageType == '200tradepaiddone',则把之前 token 的位置合并成true,然后进行表达式短路计算,最后优化成 buyerId==123456 ,具体如下: 情况二:messageType != '200tradepaiddone',则把之前 token 的位置合并成 false ,表达式短路计算后,最后优化成 false ,具体如下: 这样就完成 messageType 的提取。这里可能有人就有一个疑问,为什么要考虑到上面的情况二,messageType != '200tradepaiddone',这是因为必须要考虑到多个条件的时候,比如: (messageType=='200tradepaiddone' && buyerId==123456) || (messageType=='200tradesuccess' && buyerId==3333) 就必须考虑到不等于的情况了。同理,如果考虑到多个表达式嵌套,需要逐步进行短路计算。但整体逻辑是类似的,这里就不再赘述。 说完技术点 1,我们继续关注技术点 2,考虑到高效过滤,直接使用 HashMap 结构进行索引化即可,即把 messageType 的值作为 HashMap 的 key ,把提取后的子表达式作为 HashMap 的 value ,这样每次过滤直接通过一次 hash 计算即可过滤掉绝大部分不适合的表达式,大大提高了过滤效率。 3. 优化效果 该优化最主要降低了 CPU 计算逻辑,根据优化前后的性能情况对比,我们发现不同的交易集群中的订阅方订阅表达式复杂度越高,优化效果越好,这个是符合我们的预期的,其中最大的 CPU 优化有32%的提升,大大降低了本年度 RocketMQ 的部署机器成本。 全新的消费模型 —— POP 消费 1. 背景 RocketMQ 的 PULL 消费对于机器异常 hang 时并不十分友好。如果遇到客户端机器 hang 住,但处于半死不活的状态,与 broker 的心跳没有断掉的时候,客户端 rebalance 依然会分配消费队列到 hang 机器上,并且 hang 机器消费速度很慢甚至无法消费的时候,这样会导致消费堆积。另外类似还有服务端 Broker 发布时,也会由于客户端多次 rebalance 导致消费延迟影响等无法避免的问题。如下图所示: 当 Pull Client 2 发生 hang 机器的时候,它所分配到的三个 Broker 上的 Q2 都出现严重的红色堆积。对于此,我们增加了一种新的消费模型 —— POP 消费,能够解决此类稳定性问题。如下图所示: POP 消费中,三个客户端并不需要 rebalance 去分配消费队列,取而代之的是,它们都会使用 POP 请求所有的 broker 获取消息进行消费。broker 内部会把自身的三个队列的消息根据一定的算法分配给请求的 POP Client。即使 Pop Client 2 出现 hang,但内部队列的消息也会让 Pop Client1 和 Pop Client2 进行消费。这样就 hang 机器造成的避免了消费堆积。 2. 实现 POP 消费和原来 PULL 消费对比,最大的一点就是弱化了队列这个概念,PULL 消费需要客户端通过 rebalance 把 broker 的队列分配好,从而去消费分配到自己专属的队列,新的 POP 消费中,客户端的机器会直接到每个 broker 的队列进行请求消费, broker 会把消息分配返回给等待的机器。随后客户端消费结束后返回对应的 Ack 结果通知 broker,broker 再标记消息消费结果,如果超时没响应或者消费失败,再会进行重试。 POP 消费的架构图如上图所示。Broker 对于每次 POP 的请求,都会有以下三个操作: 对应的队列进行加锁,然后从 store 层获取该队列的消息; 然后写入 CK 消息,表明获取的消息要被 POP 消费; 最后提交当前位点,并释放锁。 CK 消息实际上是记录了 POP 消息具体位点的定时消息,当客户端超时没响应的时候,CK 消息就会重新被 broker 消费,然后把 CK 消息的位点的消息写入重试队列。如果 broker 收到客户端的消费结果的 Ack ,删除对应的 CK 消息,然后根据具体结果判断是否需要重试。 从整体流程可见,POP 消费并不需要 reblance ,可以避免 rebalance 带来的消费延时,同时客户端可以消费 broker 的所有队列,这样就可以避免机器 hang 而导致堆积的问题。
作者:愈安
#行业实践 #高可用