2024年8月30日

基于 RocketMQ 的云原生 MQTT 消息引擎设计
作者|沁君 概述 随着智能家居、工业互联网和车联网的迅猛发展,面向 IoT(物联网)设备类的消息通讯需求正在经历前所未有的增长。在这样的背景下,高效和可靠的消息传输标准成为了枢纽。MQTT 协议作为新一代物联网场景中得到广泛认可的协议,正逐渐成为行业标准。 本次我们将介绍搭建在 RocketMQ 基础上实现的 MQTT 核心设计,本文重点分析 RocketMQ 如何适应这些变化,通过优化存储和计算架构、推送模型及服务器架构设计,推动 IoT 场景下消息处理的高效性和可扩展性以实现 MQTT 协议。 此外,阿里云 MQTT 以 RocketMQMQTT 为基础,不断进行迭代创新。阿里云是开源 RocketMQMQTT 的主要贡献者和使用者之一。面对设备通信峰谷时段差异性的挑战,本文将介绍阿里云如何将 Serverless 架构应用于消息队列,有效降低运营成本,同时利用云原生环境的特性,为 IoT 设备提供快速响应和灵活伸缩的通讯能力。 进一步地,我们将探讨介绍在云端生态体系中整合 MQTT 的实践,介绍基于统一存储的数据生态集成方案,展示其强大的技术能力和灵活的数据流转能力。 loT 消息场景 (消息场景对比) 物联网技术,作为当代科技领域的璀璨明星,其迅猛发展态势已成共识。据权威预测,至 2025 年,全球物联网设备安装基数有望突破 200 亿大关,这一数字无疑昭示着一个万物互联时代的到来。 更进一步,物联网数据量正以惊人的年增长率约 28% 蓬勃膨胀,预示着未来数据生态中超过 90% 的实时数据将源自物联网。这一趋势深刻改变了数据处理的格局,将实时流数据处理推向以物联网数据为核心的新阶段。 边缘计算的崛起,则是对这一变革的积极响应。预计未来,高达 75% 的数据处理任务将在远离传统数据中心或云端的边缘侧完成。鉴于物联网数据的海量特性,依赖云端进行全部数据处理不仅成本高昂,且难以满足低延迟要求。因此,有效利用边缘计算资源,就地进行数据初步处理,仅将提炼后的关键信息上传云端,成为了提升效率、优化用户体验的关键策略。 在此背景下,消息传递机制在物联网场景中的核心价值愈发凸显: + 桥梁作用:消息系统充当了物联网世界中的“神经网络”,无缝衔接设备与设备、设备与云端应用间的沟通渠道,构筑起云边端一体化的应用框架,确保了信息交流的即时与高效。 + 数据加工引擎:面对物联网持续涌动的数据洪流,基于消息队列(MQ)的事件流存储与流计算技术成为了解锁实时数据分析潜能的钥匙。这一机制不仅能够实时捕捉、存储数据流,还支持在数据产生的瞬间执行计算操作,为物联网应用提供了强大的数据处理基础架构,助力实现数据的即时洞察与决策响应。 总之,消息技术不仅是物联网架构的粘合剂,更是驱动数据流动与智能决策的核心动力,其在物联网领域的应用深度与广度,正随着技术迭代与市场需求的双重驱动而不断拓展。 同时传统消息场景和物联网消息场景有很多的不同,包含以下几个特点: 1)硬件资源差异 传统消息场景依托于高性能、高可靠性的服务集群,运算资源充沛,客户端部署环境多为容器、虚拟机乃至物理服务器,强调集中式计算能力。相比之下,物联网消息场景的客户端直接嵌入至网络边缘的微型设备中,如传感器、智能家电等,这些设备往往受限于极低的计算与存储资源,对能效比有着极高要求。 2)网络环境挑战 在经典的内部数据中心(IDC)环境中,消息处理享有稳定的网络条件和可控的带宽、延迟指标。而物联网环境则拓展至公共网络,面对的是复杂多变的网络状况,尤其在偏远地区或网络覆盖弱的区域,不稳定的连接成为常态,对消息传输的健壮性和效率提出了更高挑战。 3)客户端规模的量级跃迁 传统消息系统处理的日均消息量通常维持在百万级,适用于集中度较高的消息分发。物联网的兴起促使设备数量呈爆炸性增长,动辄涉及亿万级别的终端节点,这对系统的扩展性、消息路由的高效性提出了全新的要求。 4)生产与消费模式的演变 传统场景倾向于采用集中式同步生产模式,强调消息的一致性与有序处理。而物联网消息生成则体现出分散化的特性,每个设备根据其感知环境独立产生少量消息,这对消息收集与整合机制设计提出了新的思考。消费模式上,物联网场景经常涉及大规模广播或组播,单条消息可能触达数百万计的接收者,要求系统具备高效的广播能力和灵活的订阅管理机制。 RocketMQ 的融合架构设计 (融合架构设计图) 我们看到,物联网所需要的消息技术与经典的消息设计有很大的不同。接下来我们来看看基于 RocketMQ 的融合架构 MQTT 设计为了解决物联网的消息场景有哪些特点。 1)融入 MQTT 协议,适应物联网环境特性 RocketMQ 5.0 通过整合 RocketMQMQTT,紧密贴合了物联网领域广泛采用的MQTT协议标准。此协议针对低功耗、网络条件不稳定的情况优化,以其轻量级特性和丰富的功能集脱颖而出,支持不同的消息传递保障级别,满足了从“最多一次”到“仅一次”的多样化需求。协议的领域模型与 RocketMQ 的核心组件高度协调,促进了消息、主题、发布订阅模式的自然融合,为建立一个从设备到云端的无缝消息传递体系打下了稳固的基础。 2)灵活的存储与计算解耦架构 为了应对物联网场景下对高并发连接和大规模数据处理的需求,RocketMQ 5.0 采取了存储与计算分离的架构设计。RocketMQ Broker 作为核心存储组件,确保了数据的持久化与可靠性,而 MQTT 相关的逻辑操作则在专门的代理层实施,这不仅优化了对大量连接、复杂订阅关系的管理,也增强了实时消息推送的能力。这种架构允许根据业务负载动态调整代理层资源,通过增加代理节点来平滑应对设备连接数的增加,体现了系统良好的弹性和扩展潜力。 3)促进端云数据协同的整合架构 RocketMQMQTT 通过其整合的架构设计,促进了物联网设备与云端应用之间的高效数据共享。基于统一的消息存储策略,每条消息在系统内只需存储一次,即可供两端消费,减少了数据冗余,提高了数据流通的效率。此外,RocketMQ 作为数据流的存储中枢,自然而然地与流计算技术结合,为实时分析物联网生成的海量数据提供了便利,加速了数据价值的发掘过程。 存储设计 首先要解决的问题是物联网消息的存储模型。在发布订阅业务模型中,常用的存储模型有两种,写放大和读放大,我们将依次分析两种模型。 (写放大模型) (读放大模型) 写放大模型: 在此模型下,每位消费者拥有专属的消息队列,每条消息需要复制并分布到所有目标消费者的队列中,消费者仅关注并处理自己队列中的消息。以三级主题/Topic/subTopic/test 为例,若该主题吸引了大量客户端订阅,采取“一客一队列”的策略,即每个符合订阅规则的客户端或通配符订阅均维护一份消息副本,将导致消息的存储需求随订阅者数量线性增长。 尽管这种模式在某些传统消息场景,比如遵循 AMQP 协议的应用中表现得游刃有余,因为它确保了消息传递的隔离性和可靠性。但在物联网场景下,特别是当单个消息需被数以百万计的设备消费时,“写放大”策略将引发严重的存储资源消耗问题,迅速成为不可承受之重。 读放大的考量与挑战: 鉴于物联网场景的特殊需求,直接应用传统的“写放大”模型显然是不可行的。为解决这一难题,RocketMQMQTT 采取了更为高效与灵活的存储策略,旨在减少存储冗余,提高系统整体的可扩展性和资源利用率: 在“读放大”模式下,每条消息实际上被存储一次,而为了支持通配符订阅的高效检索,系统在消息存储阶段会创建额外的索引信息——即 consume queue(消费队列)。对于如/Topic/subTopic/这样的通配符订阅,系统会在每个匹配的通配符队列中生成相应的索引,使得订阅了不同通配符主题或具体主题的消费者,都能通过这些共享的存储实体找到并消费到消息。尽管这看似增加了“读”的复杂度,但实际上,每个 consume queue 作为索引,其体积远小于原始消息,显著降低了整体存储成本,同时提高了消息检索与分发的效率。 (原子分发示意图) 为此,我们设计了一种多维度分发的 Topic 队列模型,如上图所示,消息可以来自各个接入场景(如服务端的 MQ/AMQP、客户端的 MQTT),但只会写一份存到 commitlog 里面,然后分发出多个需求场景的队列索引(ConsumerQueue),如服务端场景(MQ/AMQP)可以按照一级 Topic 队列进行传统的服务端消费,客户端 MQTT 场景可以按照 MQTT 多级 Topic 以及通配符订阅进行消费消息。这样的一个队列模型就可以同时支持服务端和终端场景的接入和消息收发,达到一体化的目标。 实现这一模型,RocketMQ依托了两项关键技术特性: + 轻型队列(Light Message Queue) 这一特性允许一条消息被灵活地写入多个 topic queue 中,确保了消息能够高效地适应各种复杂的订阅模式,包括但不限于通配符订阅。它为读放大模型的实现提供了必要的灵活性和效率基础。 + 百万队列能力 RocketMQ 通过集成 RocksDB 这一高性能键值存储引擎,充分利用其在顺序写入方面的优势,实现了百万级别的队列管理能力。特别是通过定制化配置,去除了 RocksDB 内部的日志预写(WriteAhead Log, WAL),进一步优化了存储效率。RocksDB 不仅为 consume queue 提供了稳定高效的存储方案,还确保了即便在极端的队列数量下,系统依然能够保持高性能的索引处理能力。 (轻型队列的实现) 通过采用“读放大”模型,结合 RocketMQ 的轻型队列特性和百万队列的底层技术支持,我们不仅有效解决了物联网环境下消息存储与分发的挑战,还实现了存储成本与系统性能的双重优化。这种设计不仅减少了存储空间的占用,还通过高度优化的索引机制加快了消息检索速度,为大规模物联网设备的消息通信提供了一个既经济又高效的解决方案。 推送模型 (RocketMQMQTT 推送模型) 在介绍完底层队列存储模型之后,我们将重点探讨匹配查找和可靠送达的实现机制。在传统的消息队列 RocketMQ 中,经典的消费模式是消费者通过客户端直接发起长轮询请求,以精准地获取对应主题的队列消息。然而,在 MQTT 场景下,由于客户端数量众多且订阅关系复杂,长轮询模式显得不够有效,因此消费过程变得更加复杂。为此,我们采用了一种推拉结合的模型。 本模型的核心在于终端通过 MQTT 协议连接至代理节点,消息可以来源于多种场景(如MQ、AMQP、MQTT)。当消息存入主题队列后,通知逻辑模块将实时监测到新消息的到达,进而生成消息事件(即消息的主题名称),并推送至网关节点。网关节点根据连接终端的订阅状态进行内部匹配,识别能够接收这一消息的终端,随后触发拉取请求,以从存储层读取消息并推送至终端。 在这个流程中,一个关键问题是通知模块如何确定终端感兴趣的消息,以及哪些网关节点会对此类消息感兴趣。这实际上是一个核心的匹配搜索问题。常见的解决方案主要有两种:第一种是简单的事件广播,第二种是将线上订阅关系集中存储(例如图中的 Lookup 模块),然后进行匹配搜索,再执行精准推送。 虽然事件广播机制在扩展性上存在一定问题,但其性能表现仍然良好,因为我们推送的数据量相对较小,仅为 Topic 名称。此外,同一 Topic 的消息事件可以合并为一个事件,这是我们当前在生产环境中默认采用的方式。另一方面,将线上订阅关系集中存储在 RDS 或 Redis 中也是一种普遍的做法,但这需要保证数据的实性,匹配搜索的过程可能会对整体实时消息链的延迟产生影响。 在该模型中,还设计了一个缓存模块,以便在需要广播大量消息时,避免各个终端对存储层发起重复的读取请求,从而提高整体系统的效率。 阿里云 MQTT 在 Serverless 上的实践 随着云原生技术的不断发展,现代消息中间件逐渐以容器编排为基础,如何实现真正的无服务器架构及秒级弹性管理已成为一项重要的研究课题。 阿里云作为开源 RocketMQMQTT 的主要贡献者和使用者之一,在 MQTT 弹性设计上有很多优化方式和实践经验。我们将介绍阿里云在弹性上的设计思路,展示其如何实现高效、弹性强的 MQTT 消息中间件: 1)抽离网络连接层 阿里云 MQTT 采用类似 Sidecar 的模式,将网络连接层与核心业务逻辑进行分离,使用 Rust 语言来处理网络连接。与 Java 相比,Rust 在内存消耗和启动速度上具有显著优势,尤其在处理大规模 MQTT 连接时,能够有效降低内存占用。 2)秒级扩容 每个 Pod 的资源请求设置较低,同时预留部分 Pod 专门运行 Rust 进程。在扩容需求出现时,系统能够快速启动 MQTT Proxy 进程,省去 Pod 创建和资源挂载的时间,从而显著提升响应速度。 3)弹性预测与监控 利用连接数、TPS、内存、CPU 等白盒指标,以及 RT 等黑盒指标,阿里云 MQTT 依据指标联动规则,制定了合理的扩容策略。这使得系统能够提前预测负载变化并启动 Pod 扩容,确保长期平稳运行。 通过以上设计思路,阿里云能够构建一个高效、弹性强的基于 RocketMQ 的 MQTT 实现方案,充分利用 Rust 带来的性能优势,同时保持系统的稳定性与可扩展性。这种创新设计将在实际应用中显著提升用户体验,助力系统整体性能的优化。 阿里云 MQTT 在车联网中的实践架构 随着汽车出行领域新四化(电气化、智能化、网联化和共享化)的推进,各大汽车制造商正逐步构建以智能驾驶和智能网联为核心的车联网系统。这一新一代车联网系统对底层消息采集、传输和处理的平台架构提出了更高的要求。接下来,我们将介绍阿里云 MQTT 在车联网中的实践架构及其应用价值。 在架构图中,我们可以看到常见的车联网设备,包括车载终端、路测单元和手机端系统。这些设备确保了安全的连接与数据传输。车端的功能涵盖车机数据上报、POI 下发、文件推送、配置下发、消息推送等全新车联业务。这些操作将产生海量的消息 Topic,需要更加安全、稳定的接入与传输,以实现可靠的消息订阅与发布。路端则强调路侧 RSU 的安全接入,支持消息的采集、传输以及地图数据的实时更新。 接入端支持多种协议,包括 TCP、x509、TLS、WSS、WS、OpenAPI 和 AMQP,以满足不同应用场景的灵活需求。这种多协议支持确保了设备之间的无缝互联与高效通信。 在流转生态方面,物联网场景下,各种设备持续产生大量数据,业务方需要对这些数据进行深入分析与处理。采用 RocketMQ 作为存储层,系统能够只保存一份消息,并支持物联网设备和云端应用的共同消费。RocketMQ 的流存储特性使得流计算引擎能够无缝、实时地分析物联网数据,为关键决策提供及时支持。 借助阿里云 EventBridge,MQTT 物联网设备所生成的信息可以顺利流转至 Kafka、AMQP、FC、Flink 等其他中间件或数据处理平台,实现深度的数据分析与处理。事件总线 EventBridge 是阿里云提供的一款 Serverless 总线服务,支持阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助轻松构建松耦合、分布式的事件驱动架构。这种灵活的数据流转能力不仅提升了处理速度,还为未来智能化应用的创新和发展奠定了基础。 通过以上架构,可以清晰地看到阿里云 MQTT 在车联网领域的最佳实践,为实现未来智能出行提供了可靠的技术支撑。 结语 在物联网的蓬勃发展背景下,消息传递技术的不断演进已成为支撑智能家居、工业互联网以及车联网等领域的重要基石。通过对 RocketMQ 和 MQTT 协议的深度融合,我们不仅有效解决了物联网时代对高效、可靠消息传输的需求,也为设备通信带来了灵活的解决方案。 阿里云在这一领域的积极探索,通过引入 Serverless 架构,不断推进 MQTT 的技术迭代与创新。这样的设计能够在面对高并发连接和海量数据时,动态调整资源配置,降低成本并提升响应速度,确保了实时数据处理的高效性。 当前,社区正在推动 MQTT 5.0 协议方面已取得显著进展,新的协议特性如更丰富的错误码、更灵活的连接选项以及 will 消息、retain 消息、共享订阅功能都将进一步提升系统的灵活性和可靠性。与此同时,我们在致力于实现更快的弹性扩展能力,以便在面对突发流量时及时响应,提高系统的可用性和灵活性。 随着 IoT 生态体系的不断完善,面对日益复杂的消息场景,消息技术的价值愈发凸显。我们相信,未来通过不断优化的消息架构,能够推动更深层次的智能化应用,同时为构建万物互联的未来奠定坚实的基础。让我们期待 MQTT 在物联网技术的场景中展现其无限可能,同时也继续探索持续探索在确保安全、稳定、高效的消息中间件。
作者:沁君
#技术探索

2024年8月22日

一文详解 RocketMQ 如何利用 Raft 进行高可用保障
作者|季俊涛 前言 Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。由于其业务场景愈加丰富,在工业界的使用率日益提高,开发者们也必须更完善地考虑 RocketMQ 的可靠性、可用性。 由于 RocketMQ 底层实际上是一种基于日志的存储系统,而前人为了避免这种存储系统中单个机器可能出现的数据丢失、单点故障等问题,已经有了相对成熟的解决方案——例如同时复制数据到多个机器上。在这个过程中,需要解决的问题便被简化了:如何保证多个机器上的数据是一致的,而且这种一致性强大到可以对抗宕机、脑裂等问题。而这些问题,可以通过分布式一致性算法来彻底解决。 在开源的 Apache RocketMQ 中,我们已经引入了 DLedger [2] 和 SOFAJRaft [3] 来作为 Raft [4] 算法的具体实现,以支撑系统高可用。本文将介绍 RocketMQ 如何利用Raft(一种简单有效的分布式一致性算法)进行高可用的保障。 分布式一致性算法:Raft 共识(Consensus)是分布式系统中实现容错的一个基本问题。它指的是在一个系统中,多个服务器需要就某些值达成一致的意见。一旦它们对一个值做出了决定,这个决定就是不可变更的 [1]。典型的共识算法能够在多数服务器可用的情况下继续运行;比如说,在一个有 5 台服务器的集群中,即使有 2 台服务器宕机,整个集群依然能够正常运作。如果宕机的服务器数量超过半数,集群就无法继续正常运行了(但它也绝不会返回错误的结果)。 业界比较有名的分布式一致性算法是 paxos [12],不过可惜的是它比较晦涩难懂,难懂的代价就是很少有人能掌握它然后基于它做出可靠的实现。不过幸好 Raft 及时出现,它易于理解,并且已经有非常多的业界使用先例,比如 tikv、etcd 等。 这是 Raft 的原始论文,详细描述了 Raft:《In Search of an Understandingable Consensus Algorithm》 [4]。本文的略短版本在 2014 年 USENIX 年度技术会议上获得了最佳论文奖。有意思的是,在论文的 Abstract 中,第一句话便是:Raft is a consensus algorithm for managing a replicated log. 在英文中,log 不仅有“日志”的意思,还有“木头”的意思,一组"replicated log"便组成了木筏,这和 Raft 的英文原意不谋而合。而且,Raft 的主页中,也采用了三根木头组成的筏作为 logo(如图 1)。当然,对 Raft 更正式的解释还是这些单词的首字母缩写:Re{liable | plicated | dundant} And FaultTolerant. 也就是 Raft 被提出时要解决的问题初衷。 (图 1:Raft 主页标题后附的头图) Raft 算法是一种为了管理复制日志的共识算法,它将整个共识过程分解为几个子问题:领导者选举、日志复制和安全性。整个 Raft 算法因此变得易理解、易论证、易实现,从而让分布式一致性协议可以较为简单地实现。Raft 和 Paxos 一样,只要保证 n/2+1 节点即多数派节点正常就可以对外提供服务。 在 Raft 算法中,集群中的每个节点(服务器)可以处于以下三种状态之一: a. Follower(跟随者):这是所有节点的初始状态。它们被动地响应来自领导者和候选者的请求。 b. Candidate(候选者):当跟随者在一段时间内没有收到来自领导者的消息时,它们会成为候选者,并开始选举过程以成为新的领导者。 c. Leader(领导者):集群中的管理节点,负责处理所有客户端请求,并将日志条目复制到其他节点。 Raft 算法通过任期的概念来分隔时间,每个任期开始都会进行一次领导者选举。任期是一个递增的数字,每次选举都会增加。如果跟随者在“选举超时”之内没有收到领导者的心跳,它会将自己的任期号加一,并转变为候选者状态来发起一次领导者选举。候选者首先给自己投票,并向其他节点发送请求投票的 RPC。如果接收节点在当前任期内还没有投票,它会同意投票给请求者。 如果候选者在一次选举中从集群的大多数节点获得了选票,它就会成为领导者。在此过程中生成的任期号用于节点之间的通信,以防止过时的信息导致错误。例如,如果节点收到任期号比自己小的请求,它会拒绝该请求。 极端情况下集群可能会出现脑裂或网络问题,此时集群可能会被分割成几个互不通信的子集。不过由于 Raft 算法要求一个领导者必须拥有集群大多数节点的支持,这保证了即使在脑裂的情况下,最多只有一个子集能够选出一个有效的领导者。 Raft 通过日志复制来保持节点间的一致性。对于一个无限增长的序列 a[1, 2, 3…],如果对于任意整数 i,a[i] 的值满足分布式一致性,这个系统就满足一致性状态机的要求 基本上所有的真实系统都会有源源不断的操作,这时候单独对某个特定的值达成一致显然是不够的。为了让真实系统保证所有的副本的一致性,通常会把操作转化为 writeaheadlog(WAL)。然后让系统中所有副本对 WAL 保持一致,这样每个副本按照顺序执行 WAL 里的操作,就能保证最终的状态是一致的,如图 2 所示。 (图 2:如何通过日志复制来确保节点间数据一致。阶段一为 Client 向 leader 发送写请求,阶段二为 Leader 把‘操作’转化为 WAL 并复制,阶段三为 Leader 收到多数派应答,并将操作应用到状态机) 领导者在收到客户端的请求后,会先将请求作为新的日志条目追加到它的日志中,然后并行地将该条目复制到其他节点。只有当大多数节点都写入了这个日志条目,领导者才会将该操作提交,并应用到它的状态机上,同时通知其他节点也提交这个日志条目。因此,Raft 确保了即使在领导者崩溃或网络分区的情况下,也不会有数据丢失。任何被提交的日志条目都保证在后续的任期中也存在于任意新的领导者的日志中。 简单来说,Raft 算法的特点就是 Strong Leader: a. 系统中必须存在且同一时刻只能有一个 Leader,只有 Leader 可以接受 Clients 发过来的请求; b. Leader 负责主动与所有 Followers 通信,负责将“提案”发送给所有 Followers,同时收集多数派的 Followers 应答; c. Leader 还需向所有 Followers 主动发送心跳维持领导地位(保持存在感)。 一句话总结 Strong Leader: "你们不要 BB! 按我说的做,做完了向我汇报!"。另外,身为 Leader 必须保持一直 BB(heartbeat)的状态,否则就会有别人跳出来想要 BB 。 为了更直观的感受到 Raft 算法的运行原理,笔者强烈推荐观看下面网站中的演示。它以动画的形式,非常直观地展示了 Raft 算法是如何运行的,以及如何应对脑裂等问题的:_https://thesecretlivesofdata.com/raft/_ (图 3:Raft 算法选举过程图示) 相信观看过上面网站中的图解后,读者应该了解了 Raft 的设计思想与具体算法,下面我们直接切入正题,讲解 RocketMQ 与 Raft 的前世今生。 RocketMQ 与 Raft 的前世今生 RocketMQ 尝试融合 Raft 算法已经非常之久,这期间的融合方式也经历过变革。发展至今,Raft 也只是 RocketMQ 高可用机制中的一小部分,RocketMQ 已然发展出了一套适合自身的高可用共识协议。 本章主要阐述 RocketMQ 为了在系统内实现 Raft 算法作出过哪些尝试,以及当前 Raft 在 RocketMQ 中的存在形态与具体作用。 Raft 在 RocketMQ 中的初期形态 RocketMQ 引入 Raft 协议的主要原因是为了增强系统的高可用性和故障自动恢复能力。在 4.5 版本之前,RocketMQ 只有 Master/Slave 一种部署方式,即一组 broker 中仅有一个 Master,有零到多个 Slave,这些 Slave 以同步或者异步的方式去复制 Master 中的数据。然而这种方式存在一些限制: 1. 故障转移不是完全自动的: 当 Master 节点出现故障时,需要人工介入进行手动重启或者切换到 Slave 节点。 2. 对外部依赖较高: 虽然可以通过第三方协调服务(如 ZooKeeper 或 etcd)实现自动选主,但这增加了部署和运维的复杂性,同时第三方服务本身的故障也可能影响到 RocketMQ 的集群。 为了解决上述问题,RocketMQ 引入了基于 Raft 协议的 DLedger 存储库 [5]。DLedger 是一个分布式日志复制技术,它使用 Raft 协议,可以在多个副本之间安全地复制和同步数据。RocketMQ 4.5 版本发布后,可以采用 RocketMQ on DLedger 方式进行部署。DLedger commitlog 代替了原来的 CommitLog,使得 CommitLog 拥有了选举复制能力,然后通过角色透传的方式,raft 角色透传给外部 broker 角色,leader 对应原来的 master,follower 和 candidate 对应原来的 slave: (图 4:RocketMQ on DLedger 部署形态,每个 broker 间的角色由 Raft CommitLog 向外透传) 因此 RocketMQ 的 broker 拥有了自动故障转移的能力。在一组 broker 中, Master 挂了以后,依靠 DLedger 自动选主能力,会重新选出 leader,然后通过角色透传变成新的 Master。DLedger 还可以构建高可用的嵌入式 KV 存储。我们把对一些数据的操作记录到 DLedger 中,然后根据数据量或者实际需求,恢复到hashmap 或者 rocksdb 中,从而构建一致的、高可用的 KV 存储系统,应用到元信息管理等场景。 我们测试了各种故障下 Dledger 表现情况,包括随机对称分区,随机杀死节点,随机暂停一些节点的进程模拟慢节点的状况,以及 bridge、partitionmajoritiesring 等复杂的非对称网络分区。在这些故障下,DLedger 都保证了一致性,验证了 DLedger 有很好可靠性 [6]。 总结来说,引入 Raft 协议后,RocketMQ 的多副本架构得以增强,提高了系统的可靠性和自我恢复能力,同时也简化了整个系统的架构,降低了运维的复杂性。这种架构通过 Master 故障后短时间内重新选出新的 Master 来解决单主问题,但是由于 Raft 选主和复制能力一同在数据链路(CommitLog)上,因此存在以下问题: 1. Broker 组内的副本数必须是 3 副本及以上才有切换能力,因此部署的最低成本是有上升的。 2. Raft 多数派限制导致三副本副本必须两副本响应才能返回,五副本需要三副本才能返回,因此 ACK 是不够灵活的,这也导致“发送延迟低”和“副本冗余小”两种要求很难做到。 3. 由于存储复制链路用的是 OpenMessaging DLedger 库,导致 RocketMQ 原生的一些存储能力没办法利用,包括像 TransientPool、零拷贝的能力,如果要在 Raft 模式下使用的话,就需要移植一遍到 DLedger 库,开发特性以及 bug 修复也需要做两次,这样的维护和开发成本是非常高的。 此外,将选举逻辑嵌入数据链路中可能会引发一连串的问题,这直接与我们追求的高可用和稳定性目标背道而驰。以选举发生在数据链路中的假设情景为起点,我们可以设想一个由多个节点构成的存储集群,其中节点需要定期进行选举来决定谁负责数据流的管理任务。在这种设计下,选举不仅是控制面的一部分,而且直接影响数据链路的稳定性。一旦发生选举失败或不一致的情况,整个数据链路可能会受阻,导致写入能力的丧失或数据丢失。 但是我们将目光看向 PolarStore 时就能发现,它的设计思想包含了“控制面和数据面分离”:数据面操作仅依赖于本地缓存的全量元数据,而对控制面的依赖最小化。这种设计的优势在于即使控制面完全不可用,数据面依然能够依据本地缓存维持正常的读写操作。在这种情况下,控制面的选举机制永远不会影响到数据面的可用性。这种分离架构为存储系统带来了相当强的鲁棒性,即使在遭遇故障时也能够保持业务的连续性。 总而言之,将选举逻辑与数据链路解耦是保障存储系统高可用性和稳定性的关键。通过将控制面的复杂性和潜在故障隔离,可以确保即使在面临控制面故障时,数据面依然能够保持其核心功能,从而为用户提供持续的服务。这种健壮的设计理念在现代分布式存储系统中是至关重要的——在控制面遭遇问题时,数据面能够以最小的影响继续运作。 这个例子告诉我们,数据面的可用性如果和控制面解耦,那么控制面挂掉对数据面的影响很轻微。否则,要么要不断去提高控制面的可用性,要么就要接受故障的级联发生 [7]。这也是我们后文中 RocketMQ 的演进方向。 现在的 Raft Controller:控制面/数据面分离 上文中提到,我们以 DLedger 的形式将 Raft 引入了 RocketMQ,但这种引入实际上是给了 CommitLog 选举的能力。这种设计固然直接有效,能够直接赋予一致性给最重要的组件。但是这样的设计也让选举、复制的过程被耦合到了一起。当二者耦合时,每次的选举、拷贝便都强依赖 DLedger 的 Raft 实现,这对于未来的扩展性是非常不友好的。 那么,有没有更可靠、更灵活的解决方案呢?我们不妨把目光转向学术界,看看他们的灵感。在 OSDI' 20 上,Meta 公司管控平面元数据存储统一平台 Delos [8] 相关论文获得了的 Best Paper Award。这篇论文提供了一个全新的视角与思路来解决选主过程中“控制面与数据面耦合”的问题:虚拟共识(Virtual Consensus)。它在论文中描述了在生产环境中实现在线切换共识协议的工作。这种设计旨在通过虚拟化共享日志 API 来实现虚拟化共识,从而允许服务在不停机的情况下更改共识协议。 论文的出发点是,在生产环境中,系统往往高度集成了共识机制,因此更换共识协议会涉及到非常复杂且深入的系统改动。 以前文提到的 DLedger CommitLog 为例,其分布式共识协议中的数据流(负责容错)和控制流(负责同步共识组配置)是密切相关的。在这种紧密耦合的系统中进行修改是极其困难的,这就使得开发和实施新共识协议的代价变得相当昂贵,甚至单纯的更新微小特性都面临重大挑战。更不用提在这种环境下引入更先进的共识算法(未来如果有的话),这必然花费相当重大的成本。 相比之下,Delos 提出了一种新方法,通过分离控制层和数据层来克服这些挑战。在这个架构中,控制层负责领导选举,而数据层则由 VirtualLog 和下层的 Loglets 组成,用于管理数据。VirtualLog 提供了一个共享日志的抽象层,将不同 Loglets(代表不同共识算法实例)串联起来。这种抽象层在 VirtualLog 和各个 Loglet 之间建立日志条目的映射关系。要切换到新的共识协议时,简单地指示 VirtualLog 将新的日志条目交由新 Loglet 处理以达成共识即可。这种设计实现了共识协议的无缝切换,并显著降低了更换共识协议的复杂性和成本。 (图 5:Delos 设计架构示意图,控制面和数据面被分开,以 VirtualLog 的形式进行协作) 这种设计既然已经在学术界开诚布公,那 RocketMQ 也可以在工业界将其落地并作验证。在 RocketMQ 5.0 中,我们提出了 Raft Controller 的概念:仅在上层协议中使用 Raft 与其它选主算法,而下层数据链路的复制则由 Broker 中的一套数据复制算法负责,用于响应上层的选主结果。 这个设计理念在行业中其实并不罕见,反而已经成为了一种成熟且广泛被验证的实践。以 Apache Kafka 为例,这个高性能的消息传递系统采用了分层的架构策略,在早期版本中使用了 ZooKeeper 来构建其元数据的控制平面,这一控制平面负责管理集群的状态和元数据信息。随着 Kafka 的发展,新版本引入了自研的 KRaft 模式,进一步内部化并提升了元数据管理。此外,Kafka 的 ISR(InSync Replicas) [15] 协议承担了数据传输的重任,提供了高吞吐量和可配置的复制机制,确保了数据平面的灵活性和可靠性。 同样地,Apache BookKeeper,一个低延迟且高吞吐的存储服务,也采用了类似的架构思想。它利用 ZooKeeper 来管理控制平面,包括维护一致的服务状态和元数据信息。在数据平面方面,BookKeeper 利用其自研的 Quorum 协议来处理写操作,并确保读操作的低延迟性能。 类似这种设计,我们的 Broker 不再自己负责自己的选举,而是由上层 Controller 对下层的角色进行指示,下层根据指示结果进行数据的拷贝,从而达到选举与复制分离的目的。 (图 6:来源于 ATA 文章《全新 RocketMQ 5.0 高可用设计解读》) 不过与 Delos 不同的是,我们在这里面其实有三层共识 —— Controller 间的共识协议(Raft),Controller 对 Broker 选主时的共识协议(SyncState Set),Broker 间复制时的共识协议(主备确认复制算法)。我们在这个过程中额外增加了 Controller 间的共识,以保证控制节点也是强一致的。这三种共识算法具体实现在这里不加以赘述,有兴趣的可以看我们 RocketMQ 社区中的 RIP31/32/34/44 几个说明文档。 这个设计也在我们被 ASE 23' 录用的论文 [9] 中得以体现:Controller 组件承担了切换链路中的核心角色,但是又不影响数据链路的正常运行,即便其面临宕机、夯机、网络分区等问题,也不会导致 broker 的数据丢失、不一致。我们在后续测试中甚至模拟了更加严苛的场景,例如 Controller 与 Broker 同时宕机、同时夯机、同时进行随机网络分区等等,我们的设计均有非常好的表现。 下面,我们对 RocketMQ 中的共识协议作展开,深入地剖析 RocketMQ 中的共识是如何实现的。 RocketMQ 中的共识协议 首先我们在这里放一张大图,用于描述 Raft Controller 具体是如何实现控制面、数据面的共识的。 (图 7:Raft Controller 具体设计架构及其运作原理) 上图中,绿色部分的是 Controller,也就是 RocketMQ 划分出来的控制面。它自身包含了两种共识算法,分别是保证 Controller 自身共识的 Raft 算法。以及保证 Broker 共识的选主算法,SyncState Set(后文简称 3S)算法,这个算法是我们参考 PacificA 算法 [10] 提出的一套用于数据面选主的分布式共识协议。红色部分是 Broker,也就是我们最核心的数据面,这里面忽略了 Broker 的其它存储结构,仅保留复制过程中实现共识的核心文件:epoch 文件。我们基于它实现了数据复制过程中的共识协议。 下面我们针对控制面和数据面的共识,分别进行阐述。 控制面共识 控制面共识共有两层: a. Raft Controller 自身的共识——用于保证 Controller 间的数据强一致性 b. 3S 算法——用于保证 Broker 选主结果的强一致。 下面对这两种共识算法分别进行解释。 Raft In Controller Raft 在 RocketMQ 5.0 前,只在 CommitLog 中存在,以 DLedger CommitLog 的方式向外透传角色。但是经过我们前文的分析,可以知道这种方式是有不容忽视的弊端在的:选举、复制过程强耦合,复制过程强依赖 DLedger 存储库,迭代难度高。因此,我们考虑将 Raft 的能力向上移动,让其用于在控制面中实现原数据强一致。这样一来,选举的过程便与日志复制的过程区分开了,而且每次选举的成本相对更低,只需要同步非常有限的数据量。 在 Controller 中,我们将 Raft 算法用于选举 Controller 中的 Active Controller,由它来负责处理数据面的选举、同步等任务,其余的 Controller 则只负责同步 Active Controller 的处理结果。这样的设计能够保证 Controller 本身也是高可用的,且保证了仅有一个 Controller 在处理 Broker 的选举事务。 在最新的 RocketMQ 中,在这一层共提供了两种 Controller 内的分布式共识算法的实现:DLedger 与 JRaft。这两种共识算法可以在 Controller 的配置文件中被非常简单地选择。本质上来说,这两者都是 Raft 算法的具体实现,只不过具体的实现方式有些差异: a. DLedger [2] 是一个基于 raft 协议的 commitlog 存储库,是一个 append only 的日志系统,早期针对 RocketMQ 的诸多场景有过相当多的适配。同时,它是一个轻量级的 java library。它对外提供的 API 非常简单,append 和 get。 b. JRaft 全称为 SOFAJRaft [3],它是一个基于 Raft 一致性算法的生产级高性能 Java 实现,支持 MULTIRAFTGROUP,适用于高负载低延迟的场景。SOFAJRaft 是从百度的 braft [11] 移植而来的,做了一些优化和改进。 这两种 Raft 的具体实现都对外提供了非常简单的 API 接口,所以我们可以把更多的精力放在处理 Active Controller 的事务上。 3S Algorithm 抛开 Controller 本身的共识算法,我们将目光聚焦于 Active Controller 在整个过程中起的作用,这也是我们控制面共识的核心——3S 算法。 3S 算法中的SyncState Set 概念与 Kafka 的 InSync Replica(ISR) [17] 机制类似,都参考了微软的 PacificA 算法。与以分区为维度的 ISR 不同,3S 算法以整个 Broker 的维度发起选举,且针对 RocketMQ 的需要选举场景作了系统的归纳。相比较来说,3S 算法更加简单,选举更加高效,面对大量分区场景能有更加强大的表现。 3S 算法主要作用在 Controller与 Broker 的交互过程中,Active Controller 会处理每个 Broker 的心跳与选举工作。和 Raft 类似的,3S 算法也有心跳机制来实现类似租约的功效——当 Master Broker 一定时长没有上报心跳,就可能触发重新选举。不过与 Raft 不同的是,3S 算法有一个共识处理的核心:Controller。这种中心化的设计能够让数据面的选主更加简单,达成共识更加迅速。 在这种设计下,Broker 的心跳不再向同级别(数据面)发送,而是统一向上(控制面)发送。发送选举请求后,由 Controller 来决定哪个 Broker 可以作为 Master 存在,而其它 Broker 自然退化为 Slave。Controller 的选择原则可以是多样的(同步进度、节点资源等指标),也可以是简单有效的(数据同步进度达到一定阈值),只需要这个节点位于 SyncState Set 中。这也是一种 Strong Leader 的形式,只不过和 Raft 不同的地方在于: Raft 像是小组作业,同学们(Broker)互相投票进行小组长的票选,而 3S 算法则由班主任(Controller)根据举手快慢直接任命。 (图 8:多个 Broker 集群向 Active Controller 汇报集群内的主备角色以及同步情况) 如上图所示,三个 Broker 集群中的 Leader 都会定期向 Active Controller 上报集群的同步状态: a. A 集群的所有节点的同步进度都很良好,因此 Leader 上报的 SyncState Set 是所有节点。 b. B 集群的 Follower2 可能刚刚启动,仍旧在同步历史消息,因此 SyncState Set 并不会包含它——当 Leader 宕机时,Controller 自然也不会选择它。 c. C 集群中,虽然 1 号 Leader 已经宕机,但是 Controller 迅速便能决定 SyncState Set 中的 3 号节点作为替代,提拔为主节点,整个集群便能正常运转,此时,即便 3 号节点又宕机,也能选择 2 号节点为主节点,不影响集群运行状态。 这种设计的好处在哪里呢?Raft 算法的实现原理其实是“投票”,同学间彼此平等,靠投票结果“少数服从多数”。因此,对于一个有 2n+1 节点的集群来说,Raft 最多只能容忍n个节点失效,至少需要保证有 n+1 个节点是持续运行的。但是 3S 算法有一个选举中心,每次选举的 RPC 都向上发送,它不需要得到其它节点的认可便可选举出一个节点。因此对于之前提到的 2n+1 节点的集群来说,最多能容忍 2n 个节点的失效,即副本的数量不需要超过副本总数的一半,不需要满足 “多数派” 原则。通常,副本数大于等于 2 即可, 如此,便在可靠性和吞吐量方面取得平衡。 上面的例子比较简明扼要地介绍了 3S 算法和 Raft 的关系与不同,可以说 3S 算法的设计思想来源于 Raft,但是在特定场景下又优于 Raft。 此外,3S 算法的共识以整个 Broker 为维度,因此我们对选主时机作了优化,有如下几种情形可能触发选举,括号内的红色是更加形象的描述,将选主过程具像化为选小组长的过程,以便理解: a. 控制面,Controller 主动发起 (班主任发起): i.HeartbeatManager 监听到有 broker 心跳失效。(班主任发现有小组同学退学了) ii.Controller 检测到有一组 SyncState Set 不存在 master。(班主任发现有组长虽然在名册里,但是旷课了) b. 数据面,Broker 发起将自己选为 master (同学毛遂自荐): i. Broker 向 controller 查询元数据时,没找到master信息。(同学定期检查小组情况,问班主任为啥没小组长) ii. Broker 向 controller 注册完后,仍未从 controller 获取到 master 信息。(同学报道后发现没小组长,汇报并自荐) c. 运维侧,通过 RocketMQ Admin Tools 发起,是运维能力的一部分 (校长直接任命)。 通过上述两方面优化,3S 算法在 RocketMQ 5.0 中,展露了非常强大的功能性,让 Controller 成为了高可用设计范式中不可或缺的组件。其实 3S 算法在实际使用场景中还有很多细节上的处理优化,能够容忍前文提到的更加严峻的场景:如控制面和数据面同时发生故障,且故障节点超过一半以上的场景。这部分结果会在后文的混沌实验中得以展示。 数据面共识 控制面通过 Raft 算法保障了 Controller 间的角色共识,以及通过 3S 算法保障了 Broker 中的角色共识。那么在 Broker 角色被确定后,其数据面该如何根据选举结果保障数据的强一致呢?这里的算法并不复杂,因此笔者从实现角度介绍一下 RocketMQ 的设计,RocketMQ 的数据面共识主要由下面两个组件构成: HAClient: 每个 Slave 的 HAService 中必备的 client,负责管理同步任务中的读、写操作。 HAConnection: 代表在 Master 中的 HA 连接,每个 connection 理论上对应一个 slave。在该 connection 类中存储了传输过程中的诸多内容,包括 channel、传输状态、当前传输位点等等信息。 为了更形象地描述清楚 RocketMQ 在这方面的设计结构,笔者绘制了下面这幅图,可以看出核心还是数据的传输过程,分别设计了一个 Reader 与一个 Writer: (图 9:数据面复制过程的具体实现,Master 与 Slave 分别设计,但选举完成后可互相切换) 这么简单的设计,是如何确保数据写入时的强一致的呢? 核心的共识其实存在于 HAConnection(也就是图中左下角那个深蓝色框)的建立、维护过程中。每个 Broker 集群的主节点都会维护和所有 Slave 的连接关系,并将其存于 Connection 表中,在每次 Slave 来请求代复制数据后,都会反馈复制的最后位点与结果,因此主节点也可以基于此来确定上报给 Controller 的 SyncState Set。在 HAConnection 的建立过程中,有一个确保数据一致性的 HandShake 阶段。这个阶段能够对 CommitLog 作截断,从而保障复制位点之前的所有数据都是强一致的。这个过程通过 epoch 文件的标记实现:Epoch 文件中包含了每一次选举的状态,每次选举完成后,主节点都会在 epoch 文件上留下自己的痕迹,即当前的选举代数 + 当前的初始位点。 从这里也可以看到,我们数据面的共识算法也有一些 Raft 的影子:Raft 算法在每次选举后也会给任期数自增一,这个任期数的大小决定了后续选主的权威性。而在数据面共识算法中,选主的结果已经认定,任期数被用于多次选主结果的共识表征——当任期数与日志位点一致时,代表这两台 broker 就选主这件事达成过一致,因此可以认为此前的数据是强一致的,只需要保证后续数据的强一致即可。为了方便理解,可以通过下图进行描述: (图 10:来源于文章《全新 RocketMQ 5.0 高可用设计解读》) 类似上面这张图,最上面那个方块长条实际是 RocketMQ 的日志存储形态 MappedFile。下面两条方块组成的长条分别是主和备的 commitlog,备节点会从后向前找到最大的 一致的位点,然后截断到这个位点,开始向后复制。这种复制在 RocketMQ 中有单独的一个 Service 去执行,因此主备节点的复制和选举过程其实是彻底解耦开的,只有当一个备节点尽可能跟上主节点时,这个备才会被纳入 SyncState Set,后续才有资格参加选举。 拥抱故障,把故障当作常态 俗话说的好,“空谈误国,实干兴邦”,设计究竟是先进还是冗余,需要通过各方面的检验。 对于 RocketMQ 这种大规模在生产中被使用的系统,我们必须模拟出足够接近现实情况的故障,才能检验其可用性在现实场景中究竟如何。在这里,我们需要引入一个新的概念——混沌工程。 混沌工程的原始定义 [13] 为:“Chaos engineering is the discipline of experimenting on a system in order to build confidence in the system's capability to withstand turbulent conditions in production. ” 从原始定义看,混沌工程实际上是一种软件工程方法,旨在通过在软件系统的生产环境中故意引入混乱来验证系统的可靠性。混沌工程的基本假设是,生产环境是复杂且不可预测的,而通过模拟各种失败,可以发现并解决潜在的问题。这种方式有助于确保系统能够在面临真实世界中的各种挑战时,持续并有效地运行。 大家都写过代码,也都深知一个精心设计过的系统总是能够巧妙通过各个测试样例,但是上线后总会遇到各种问题。因此对于一个系统来说,能够出色地通过复杂且不可预知的频繁故障的混沌工程的考验,而不是测试样例,才能说明这个系统是高可用的。 下面我们将详细介绍,我们为了验证 RocketMQ 的高可用,对其作过哪些“拷打”。 OpenChaos OpenChaos [14] 作为云原生场景量身定制的混沌“刑具”,位于 OpenMessaging 名下,托管于 Linux 基金会。目前,它支持以下平台的混沌测试:Apache RocketMQ、Apache Kafka、DLedger、Redis、Zookeeper、Etcd、Nacos。 目前 OpenChaos 支持注入多种故障类型,其中最主要的便是: 1. randompartition (fixedpartition):随机(固定)隔离节点与网络的其他部分。 2. randomloss:随机选定的节点丢失网络数据包。 3. randomkill (minorkill, majorkill, fixedkill):终止随机(少数、多数、固定)的进程并重启它们。 4. randomsuspend (minorsuspend, majorsuspend, fixedsuspend):使用 SIGSTOP/SIGCONT 暂停随机(少数、多数、固定)的节点。 在实际场景中,最常见的故障就是这四种:网络分区、丢包、宕机、夯机。此外,OpenChaos 还支持其它更复杂的特定场景,例如 Ring(每个节点能够看到大多数其他节点,但没有节点能看到与任何其他节点相同的多数节点)和 Bridge(网络一分为二,但保留中间的一个节点,该节点与两边的组件保有不间断的双向连通性),形成条件非常严苛,而且它们阻碍共识生效的原理都是“通过阻碍各节点间的可见性,来避免形成全局多数派”,理论上来说,通过足够久的网络分区、丢包,也能模拟出这些情况,甚至更复杂的情况。 因此,我们注入了大量上面罗列的四种混沌故障,观察集群是否有出现消息丢失的情况,并统计了故障恢复时间。 具体测试场景 我们混沌测试的验证实验环境如下: a. namesrv 一台,内含 namesrv 进程,openchaos 的混沌测试进程也在该机器上启动,向 controller/broker 发出控制指令。 b. controller 三台,内含 controller 进程。 c. broker 三台,同属一个集群,分别为主备,内含 broker 进程。 上述 7 台机器的配置为,处理器:8 vCPU,内存:16 GiB,规格族:ecs.c7.2xlarge,公网带宽:5Mbps,内网带宽:5/ 最高 10 Gbps。 在测试中,我们设置了如下的若干种随机测试场景,每种场景都会持续至少 60 秒,且恢复后会保证 60 秒的时间间隔再注入下一次故障: a. 机器宕机,这个混沌故障注入通过 kill 9 命令实现,将会杀死范围内的随机进程。 i. Broker 节点,随机宕机一半以上的节点,至少保留一台 Broker 工作。 ii. Controller 节点,随机宕机一半以上的节点,以及全部宕机。iii. Broker+Controller 节点,随机宕机一半以上的节点。 b. 机器夯机,这个混沌故障注入通过kill SIGSTOP 命令实现,模拟进程暂停的情况。 i. Broker 节点,随机夯机一半以上的节点,至少保留一台 Broker 工作。 ii. Controller 节点,随机夯机一半以上的节点,以及全部夯机。 iii. Broker+Controller 节点,随机夯机一半以上的节点。 c. 机器丢包,这个混沌故障注入通过 iptables 命令实现,可以指定机器间特定比例的丢包事件。 i. Broker 间随机丢包 80%。 ii. Controller 间随机丢包 80%。 iii. Broker 和 Controller 间随机丢包 80%。 d. 网络分区,这个混沌故障注入也是通过 iptables 命令实现,能够将节点间完全分区。 i. Broker 间随机网络分区。 ii. Controller 间随机网络分区。 iii. Broker 合 Controller 间随机网络分区。 实际测试场景、组别远多于上述罗列的所有故障场景,但是存在一些包含关系,例如单台 broker/controller 的启停,便不再单独罗列。此外我们均针对 Broker 的重要参数配置进行了交叉测试。测试的开关有:transientStorePoolEnable(是否使用直接内存),slaveReadEnable(备节点是否提供读消息能力)。我们还针对 Controller 的类型(DLedger/JRaft)也进行了分别的测试,每组场景的测试至少重复 5 次,每次至少持续 60 分钟。 实验结论 针对上述提出的所有场景,混沌测试的总时长至少有: 12(场景数) 2(Broker开关数) 2 (Controller类型) 5(每组测试次数) 60(单组时长) =14400 分钟。 由于设置的注入时长一分钟,恢复时长一分钟,因此至少共计注入故障 14400/2 = 7200 次(实际上注入时长、注入次数远多于上述统计值)。 在这些记录在册的测试结果中,有如下测试结论: a. RocketMQ 无消息丢失,数据在故障注入前后均保持强一致。 b. 恢复时长基本等于客户端的路由间隔时间,在路由及时的情况下,能够保证恢复 RTO 约等于 3 秒。 c. Controller 任意形式下的故障,包括宕机、夯机、网络故障等等,均不影响 Broker 的正常运转。 总结 本文总结了 RocketMQ 与 Raft 的前世今生。从最开始的忠实应用 Raft 发展 DLedger CommitLog,到如今的控制面/数据面分离,并分别基于 Raft 协议作专属于 RocketMQ 的演化。如今 RocketMQ 的高可用已经逐渐趋于成熟:基于三层共识协议,分别实现 Controller 间、Controller&Broker、Broker 间的共识。 在这种设计下,RocketMQ 的角色、数据共识被妥善地划分到了多个层次间,并能够彼此有序地协作。当选主与复制不再耦合,我们便能更好地腾出手脚发展各个层次间的共识协议——例如,当出现比 Raft 更加优秀的共识算法时,我们可以直接将其应用于 Controller 中,且对于我们的数据面无任何影响。 可以说 Raft 的设计给 RocketMQ 的高可用注入了非常多的养分,能够让 RocketMQ 在其基础上吸纳其设计思想,并作出适合自己的改进。RocketMQ 的共识算法与高可用设计 [9] 在 2023 年也得到了学术界的认可,被 CCFA 类学术会议 ASE 23' 录用。期待在未来能够出现更加优秀的共识算法,能够在 RocketMQ 的实际场景中被适配、发扬。 参考链接: _ _ _ [4] Diego Ongaro and John Ousterhout. 2014. In search of an understandable consensus algorithm. In Proceedings of the 2014 USENIX conference on USENIX Annual Technical Conference (USENIX ATC'14). USENIX Association, USA, 305–320. _ _ [8] Mahesh Balakrishnan, Jason Flinn, Chen Shen, Mihir Dharamshi, Ahmed Jafri, Xiao Shi, Santosh Ghosh, Hazem Hassan, Aaryaman Sagar, Rhed Shi, Jingming Liu, Filip Gruszczynski, Xianan Zhang, Huy Hoang, Ahmed Yossef, Francois Richard, and Yee Jiun Song. 2020. Virtual consensus in delos. In Proceedings of the 14th USENIX Conference on Operating Systems Design and Implementation (OSDI'20). USENIX Association, USA, Article 35, 617–632. [9] Juntao Ji, Rongtong Jin, Yubao Fu, Yinyou Gu, TsungHan Tsai, and Qingshan Lin. 2023. RocketHA: A High Availability Design Paradigm for Distributed LogBased Storage System. In Proceedings of the 38th IEEE/ACM International Conference on Automated Software Engineering (ASE 2023), Luxembourg, September 1115, 2023, pp. 18191824. IEEE. _ _ [12] Leslie Lamport. 2001. Paxos Made Simple. ACM SIGACT News (Distributed Computing Column) 32, 4 (Whole Number 121, December 2001), 5158. _ _ _
作者:季俊涛
#技术探索

2024年8月13日

谈谈 RocketMQ 5.0 分级存储背后一些有挑战的技术优化
作者|斜阳 RocketMQ 5.0 提出了分级存储的新方案,经过数个版本的深度打磨,RocketMQ 的分级存储日渐成熟,并成为降低存储成本的重要特性之一。事实上,几乎所有涉及到存储的产品都会尝试转冷降本,如何针对消息队列的业务场景去做一些有挑战的技术优化, 是非常有意思的事。 这篇文章就跟大家探讨下,在消息系统这样一个数据密集型应用的模型下,技术架构选型的分析与权衡,以及分级存储实现与未来演进,让云计算的资源红利真正传达给用户。 1. 背景与需求 RocketMQ 诞生于 2012 年,存储节点采用 sharednothing 的架构读写自己的本地磁盘,单节点上不同 topic 的消息数据会顺序追加写 CommitLog 再异步构建多种索引,这种架构的高水平扩展能力和易维护性带来了非常强的竞争力。 随着存储技术的发展和各种百G网络的普及,RocketMQ 存储层的瓶颈逐渐显现,一方面是数据量的膨胀远快于单体硬件,另一方面存储介质速度和单位容量价格始终存在矛盾。在云原生和 Serverless 的技术趋势下,只有通过技术架构的演进才能彻底解决单机磁盘存储空间上限的问题,同时带来更灵活的弹性与成本的下降,做到 “鱼与熊掌兼得”。 在设计分级存储时,希望能在以下方面做出一些技术优势: 实时: RocketMQ 在消息场景下往往是一写多读的,热数据会被缓存在内存中,如果能做到 “准实时” 而非选用基于时间或容量的淘汰算法将数据转储,可以减小数据复制的开销,利于缩短故障恢复的 RTO。读取时产生冷读请求被重定向,数据取回不需要“解冻时间”,且流量会被严格限制以防止对热数据写入的影响。 弹性: sharednothing 架构虽然简单,缩容或替换节点的场景下待下线节点的数据无法被其他节点读取,节点需要保持相当长时间只读时间,待消费者消费完全部数据,或者执行复杂的迁移流程才能缩容,这种 “扩容很快,缩容很慢” 的形态一点都不云原生,更长久的消息保存能力也会放大这个问题。分级存储设计如果能通过 shareddisk (共享存储) 的方式让在线节点实现代理读取下线节点的数据,既能节约成本也能简化运维。 差异化: 廉价介质随机读写能力较差,类 LSM 的结构都需要大量的 compation 来压缩回收空间。在满足针对不同 topic 设置不同的生命周期(消息保留时间,TTL)等业务需求的前提下,结合消息系统数据不可变和有序的特点,RocketMQ 自身需要尽量少的做格式 “规整” 来避免反复合并的写放大,节约计算资源。 竞争力: 分级存储还应考虑归档压缩,数据导出,列式存储和交互式查询分析能力等高阶技术演进。 2. 技术架构选型 2.1. 不同视角 不妨让我们站在一个新的视角看问题,消息系统对用户暴露的是收发消息,位点管理等一系列的 API,为用户提供了一种能够优雅处理动态数据流的方式,从这个角度说:消息系统拓宽了存储系统的边界。 其实服务端应用大多数是更底层 SQL,POSIX API 的封装,而封装的目的在于简化复杂度的同时,又实现了信息隐藏。 消息系统本身关注的是高可用,高吞吐和低成本,想尽量少的关心存储介质的选择和存储自身的系统升级,分片策略,迁移备份,进一步冷热分层压缩等问题,减少存储层的长期维护成本。 一个高效的、实现良好的存储层应该对不同存储后端有广泛的支持能力,消息系统的存储后端可以是本地磁盘,可以是各类数据库,也可以是分布式文件系统,对象存储,他们是可以轻松扩展的。 2.2. 存储后端调研 幸运的是,几乎所有的“分布式文件系统”或者“对象存储”都提供了“对象一旦上传或复制成功,即可立即读取”的强一致语义,就像 CAP 理论中的描述 “Every read receives the most recent write or an error” 保证了“分布式存储系统之内多副本之间的一致性”。对于应用来说,没有 “拜占庭错误” 是非常幸福的(本来有的数据变没了,破坏了存储节点的数据持久性),更容易做到“应用和分布式存储系统之间是一致的”,并显著减少应用的开发和维护成本。 常见的分布式文件系统有 Ali Pangu,HDFS,GlusterFS,Ceph,Lustre 等。对象存储有 Amazon S3,Aliyun OSS,Azure Blob Storage,Google Cloud Storage,OpenStack Swift 等。他们的简单对比如下: API 支持: 选用对象存储作为后端,通常无法像 HDFS 一样提供充分的 POSIX 能力支持,对于非 KV 型的操作往往存在一定性能问题,例如列出大量对象时需要数十秒,而在分布式文件系统中这类操作只需要毫秒甚至微秒。如果选用对象存储作为后端,弱化的 API 语义要求消息系统本身能够有序管理好这些对象的元数据。 容量与水平扩展: 对于云产品或者大规模企业的存储底座来说,以 HDFS 为例,当集群节点超过数百台,文件达到数亿量级以上时,NameNode 会产生性能瓶颈。一旦底层存储由于容量可用区等因素出现多套存储集群,这种 “本质复杂度” 在一定程度上削弱了 shareddisk 的架构简单性,并将这种复杂度向上传递给应用,影响消息产品本身的多租,迁移,容灾设计。典型的情况就是大型企业为了减少爆炸半径,往往会部署多套 K8s 并定制上层的 Cluster Federation(联邦)。 成本: 以国内云厂商官网公开的典型目录价为例: 本地磁盘,无副本 0.060.08 元/GB/月 云盘,SSD 1元/GB/月,高效云盘 0.35 元/GB/月 对象存储单 AZ 版 0.12 元/GB/月,多 AZ 版本 0.15 元/GB/月,低频 0.08 元/GB/月 分布式文件系统,如盘古 HDFS 接口,支持进一步转冷和 EC。 生态链: 对象存储和类 HDFS 都有足够多的经过生产验证的工具,监控报警层面对象存储的支持更产品化。 2.3. 直写还是转写 方案里,备受瞩目的点在于选择直写还是转写,我认为他们不冲突,两个方案 “可以分开有,都可以做强”。 多年来 RocketMQ 运行在基于本地存储的系统中,本地磁盘通常 IOPS 较高,成本较低但可靠性较差,大规模的生产实践中遇到的问题包括但不限于垂直扩容较难,坏盘,宿主机故障等。 直写: 指使用高可用的存储替换本地块存储,例如使用云盘多点挂载(分布式块存储形态,透明 rdma)或者直写分布式文件系统(下文简称 DFS)作为存储后端,此时主备节点可以共享存储,broker 的高可用中的数据流同步简化为只同步位点,在很大程度上减化了 RocketMQ 高可用的实现。 转写: 对于大部分数据密集型应用,出于故障恢复的考虑必须实时写日志,意味着无法对数据很好的进行攒批压缩,如果仅使用廉价介质,会带来更高的延迟以及更多的内存使用,无法满足生产需要。一个典型思路就是热数据使用容量小的高速介质先顺序写,compation 后转储到更廉价的存储系统中。 直写的目的是池化存储,转写的目的是降低数据的长期保存成本, 所以我认为一个理想的终态可以是两者的结合。RocketMQ 自己来做数据转冷,那有同学就会提出反问了,如果让 DFS 自身支持透明转冷,岂不是更好? 我的理解是 RocketMQ 希望在转冷这个动作时,能够做一些消息系统内部的格式变化来加速冷数据的读取,减少 IO 次数,配置不同 TTL 等。 相对于通用算法,消息系统自身对如何更好的压缩数据和加速读取的细节更加了解。 而且主动转冷的方案在审计和入湖的一些场景下,也可以被用于服务端批量转储数据到不同的平台,到 NoSQL 系统,到 ES,Click House,到对象存储,这一切是如此的自然~ 2.4. 技术架构演进 那么分级存储是一个尽善尽美的最终解决方案吗? 理想很美好,让我们来看一组典型生产场景的数据。 RocketMQ 在使用块存储时,存储节点存储成本大约会占到 30%50%。开启分级存储时,由于数据转储会产生一定的计算开销,主要包括数据复制,数据编解码,crc 校验等,不同场景下计算成本会上升 10%40%,通过换算,我们发现存储节点的总体拥有成本节约了 30% 左右。 考虑到商业和开源技术架构的一致性,选择了先实现转写模式,热数据的存储成本中随着存储空间显著减小,这能够更直接的降低存储成本,在我们充分建设好当前的转写逻辑时再将热数据的 WAL 机制和索引构建移植过来,实现基于分布式系统的直写技术,这种分阶段迭代会更加简明高效,这个阶段我们更加关注通用性和可用性。 可移植性: 直写分布式系统通常需要依赖特定 sdk,配合 rdma 等技术来降低延迟,对应用不完全透明,运维,人力,技术复杂度都有一定上升。保留成熟的本地存储,只需要实现存储插件就可以轻松的切换多种存储后端,不针对 IaaS 做深度绑定在可移植性上会有一定优势。 延迟与性能: 直写模式下存储紧密结合,应用层 ha 的简化也能降低延迟(写多数派成功才被消费者可见),但无论写云盘或者本地磁盘(同区域)延迟都会小于跨可用区的延迟,存储延迟在热数据收发链路不是瓶颈。 可用性: 存储后端往往都有复杂的容错和故障转移策略,直写与转写模式在公有云下可用性都满足诉求。考虑到转写模式下系统是弱依赖二级存储的,更适合开源与非公共云场景。 我们为什么不进一步压缩块存储的磁盘容量,做到几乎极致的成本呢? 事实上,在分级存储的场景下,一味的追求过小的本地磁盘容量价值不大。 主要有以下原因: 故障冗余,消息队列作为基础设施中重要的一环,稳定性高于一切。对象存储本身可用性较高,如果遇到网络波动等问题时,使用对象存储作为主存储,非常容易产生反压导致热数据无法写入, 而热数据属于在线生产业务,这对于可用性的影响是致命的。 过小的本地磁盘,在价格上没有明显的优势。 众所周知,云计算是注重普惠和公平的, 如果选用 50G 左右的块存储,又需要等价 150G 的 ESSD 级别的块存储能提供的 IOPS,则其单位成本几乎是普通块存储的数倍。 本地磁盘容量充足的情况下,上传时能够更好的通过 “攒批” 减少对象存储的请求费用。读取时能够对“温热” 数据提供更低的延迟和节约读取成本。 仅使用对象存储,难以对齐 RocketMQ 当前已经存在的丰富特性, 例如用于问题排查的随机消息索引,定时消息特性等,如果为了节约少量成本,极大的削弱基础设施的能力,反向要求业务方自建复杂的中间件体系是得不偿失的。 3. 分级存储的数据模型与实现 3.1. 模型与抽象 RocketMQ 本地存储数据模型如下: MappedFile:单个真实文件的句柄,也可以理解为 handle 或者说 fd,通过 mmap 实现内存映射文件。是一个 AppendOnly 的定长字节流语义的 Stream,支持字节粒度的追加写、随机读。每个 MappedFile 拥有自己的类型,写位点,创建更新时间等元数据。 MappedFileQueue:可以看做是零个或多个定长 MappedFile 组成的链表,提供了流的无边界语义。Queue 中最多只有最后一个文件可以是 Unseal 的状态(可写)。前面的文件都必须都是 Sealed 状态(只读),Seal 操作完成后 MappedFile 是 immutable(不可变)的。 CommitLog:MappedFileQueue 的封装,每个 “格子” 存储一条序列化的消息到无界的流中。 ConsumeQueue:顺序索引,指向 CommitLog 中消息在 FileQueue 中的偏移量(offset)。 RocketMQ 分级存储提供的数据模型和本地模型类似,改变了 CommitLog 和 ConsumeQueue 的概念: TieredFileSegment:和 MappedFile 类似,描述一个分级存储系统中文件的句柄。 TieredFlatFile:和 MappedFileQueue 类似。 TieredCommitLog 和本地 CommitLog 混合写不同,按照单个 Topic 单个队列的粒度拆分多条 CommitLog。 TieredConsumeQueue 指向 TieredCommitLog 偏移量的一个索引,是严格连续递增的。实际索引的位置会从指向的 CommitLog 的位置改为 TieredCommitLog 的偏移量。 CompositeFlatFile:组合 TieredCommitLog 和 TieredConsumeQueue 对象,并提供概念的封装。 3.2. 消息上传流程 RocketMQ 的存储实现了一个 Pipeline,类似于拦截器链,Netty 的 handler 链,读写请求会经过这个 Pipeline 的多个处理器。 Dispatcher 的概念是指为写入的数据构建索引,在分级存储模块初始化时,会创建 TieredDispatcher 注册为 CommitLog 的 dispatcher 链的一个处理器。每当有消息发送到 Broker 会调用 TieredDispatcher 进行消息分发。下面我们来追踪单条消息进入存储层的流程: 1. 消息被顺序追加到本地 commitlog 并更新本地 max offset(图中黄色部分),为了防止宕机时多副本产生“读摆动”,多副本中多数派的最小位点会作为“低水位”被确认,这个位点被称为 commit offset(图中 2500)。换句话说,commit offset 与 max offset 之间的数据是正在等待多副本同步的。 2. 当 commit offset = message offset 之后,消息会被上传到二级存储的 commitlog 的缓存中(绿色部分)并更新这个队列的 max offset。 3. 消息的索引会被追加到这个队列的 consume queue 中并更新 consume queue 的 max offset。 4. 一旦 commitlog 中缓存大小超过阈值或者等待达到一定时间,消息的缓存将被上传至 commitlog,之后才会将索引信息提交,这里有一个隐含的数据依赖,使索引被晚于原始数据更新。这个机制保证了所有 cq 索引中的数据都能在 commitlog 中找到。宕机场景下,分级存储中的 commitlog 可能会重复构建,此时没有 cq 指向这段数据。由于文件本身还是被使用 Queue 的模型管理的,使得整段数据在达到 TTL 时能被回收,此时并不会产生数据流的“泄漏”。 5. 当索引也上传完成的时候,更新分级存储中的 commit offset(绿色部分被提交)。 6. 系统重启或者宕机时,会选择多个 dispatcher 的最小位点向 max offset 重新分发,确保数据不丢失。 在实际执行中,上传部分由三组线程协同工作。 1. store dispatch 线程,由于该线程负责本地 cq 的分发,我们不能长时间阻塞该线程,否则会影响消息进入本地存储的“可见性延迟”。因此 store dispatch 每次只会尝试对拆分后的文件短暂加锁,如果加锁成功,将消息数据放入拆分后的 commitlog 文件的缓冲区则立即退出,该操作不会阻塞。若获取锁失败则立即返回。 2. store compensate 线程组,负责对本地 cq 进行定时扫描,当写入压力较高时,步骤 1 可能获取锁失败,这个环节会批量的将落后的数据放入 commitlog 中。原始数据被放入后会将 dispatch request 放入 write map。 3. build cq index 线程。write map 和 read map 是一个双缓冲队列的设计,该线程负责将 read map 中的数据构建 cq 并上传。如果 read map 为空,则交换缓冲区,这个双缓冲队列在多个线程共享访问时减少了互斥和竞争操作。 各类存储系统的缓冲攒批策略大同小异,而线上的 topic 写入流量往往是存在热点的,根据经典的二八原则,RocketMQ 分级存储模块目前采用了 “达到一定数据量”,“达到一定时间”两者取其小的合并方式。 这种方式简单可靠,对于大流量的 topic 很容易就可以达到批的最小数据量,对于流量较低的 topic 也不会占用过多的内存。从而减少了对象存储的请求数,其开销主要包括 restful 协议请求头,签名和传输等。诚然,攒批的逻辑仍然存在较大的优化空间,例如 IOT,数据分片同步等各个 topic 流量较为平均的场景使用类似 “滑动窗口” 的加权平均算法,或者基于信任值的流量控制策略可以更好的权衡延迟和吞吐。 3.3. NonStopWrite 特性 NonStopWrite 模型实际上是一致性模型的一部分。实际生产中,后端分布式存储系统的断连和网络问题偶尔会不可避免,而 Append 模型实际上一种强顺序的模型,参考 HDFS 的 23 异步写,我们提出了一种基于 Append 和 Put 的混合模型。 例如:对于如下图片中的 stream,commit / confirm offset = 150,max offset = 200。此时写出缓冲区中的数据包括 150200 的 uncommitted 部分,还有 200 以后源源不断的写入的新数据。 假设后端存储系统支持原子性写入,单个上传请求的数据内容是 150200 这个区间,当单次上传失败时,我们需要向服务端查询上一次写入的位点并进行错误处理。 如果返回的长度是 150,说明上传失败,应用需要重传 buffer。 如果返回的长度是 200,说明前一次上传成功但没有收到成功的 response,提升 commit offset 至 200。 而另一种解决方案是,使用 NonStopWrite 机制立刻新切换一个文件,以 150 作为文件名,立刻重传 150 至 200 的数据,如果有新的数据也可以立刻与这些数据一起上传,我们发现混合模型存在显著优势: 对于绝大部分没有收到成功的响应时,上传是失败的而不是超时,立刻切换文件可以不去 check in 文件长度,减少 rpc 数量。 立刻重传不会阻塞后续新的数据上传,不容易由于后端数据无法写出造成反压,导致前端写失败。 无论 150200 这段数据在第一个文件是到底是写成功还是失败都无关紧要,因为不会去读取这段数据。尤其是对于不支持请求粒度原子写入的模型来说,如果上一次请求的结果是 180,那么错误处理将会非常复杂。 3.4. 随机索引重排 21 年的时候,我第一次听到用“读扩散”或者“写扩散”来描述一个设计方案, 这两个词简洁的概括了应用性能设计的本质。各种业务场景下,我们总是选择通过读写扩散, 选择通过格式的变化,将数据额外转储到一份性能更好或者更廉价的存储, 或者通过读扩散减少数据冗余(减少索引提高了平均查询代价)。 RocketMQ 会在先内存构建基于 hash 的持久化索引文件 IndexFile(非 AppendOnly),再通过 mmap 异步的将数据持久化到磁盘。这个文件是为了支持用户通过 key,消息 ID 等信息来追踪一条消息。 对于单条消息会先计算 hash(topickey) % slot_num 选择 hash slot (黄色部分) 作为随机索引的指针,对象索引本身会附加到 index item 中,hash slot 使用“哈希拉链”的方式解决冲突,这样便形成了一条当前 slot 按照时间存入的倒序的链表。不难发现,查询时需要多次随机读取链表节点。 由于冷存储的 IOPS 代价是非常昂贵的,在设计上我们希望可以面向查询进行优化。新的文件结构类似于维护没有 GC 和只有一次 compation 的 LSM 树,数据结构的调整如下: 1. 等待本地一个 IndexFile 完全写满,规避修改操作,在高 IOPS 的存储介质上异步 compation,完成后删除原来的文件。 2. 从冷存储查询延迟高,而单次返回的数据量大小(不太大的场景)并不会明显改变延迟。compation 时优化数据结构,做到用一次查询连续的一段数据替换多次随机点查。 3. hash slot 的指向的 List 是连续的,查询时可以根据 hash slot 中的 item offset 和 item size 一次取出所有 hashcode 相同的记录并在内存中过滤。 3.5. 消息读取流程 3.5.1 读取策略 读取是写入的逆过程,优先从哪里取回想要的数据必然存在很多的工程考虑与权衡。如图所示,近期的数据被缓存在内存中,稍久远的数据存在与内存和二级存储上,更久远的数据仅存在于二级存储。当被访问的数据存在于内存中,由于内存的速度快速存储介质,直接将这部分数据通过网络写会给客户端即可。如果被访问的数据如图中 request 的指向,存在于本地磁盘又存在于二级存储,此时应该根据一二级存储的特性综合权衡请求落到哪一层。 有两种典型的想法: 1. 数据存储被视为多级缓存,越上层的介质随机读写速度快,请求优先向上层存储进行查询,当内存中不存在了就查询本地磁盘,如果还不存在才向二级存储查询。 2. 由于在转冷时主动对数据做了 compation,从二级存储读取的数据是连续的,此时可以把更宝贵一级存储的 IOPS 留给在线业务。 RocketMQ 的分级存储将这个选择抽象为了读取策略,通过请求中的逻辑位点(queue offset)判断数据处于哪个区间,再根据具体的策略进行选择: DISABLE:禁止从多级存储中读取消息,可能是数据源不支持。 NOT_IN_DISK:不在一级存储的的消息都会从二级存储中读取。 NOT_IN_MEM:不在内存中的消息即冷数据从多级存储读取。 FORCE:强制所有消息从多级存储中读取,目前仅供测试使用。 3.5.2 预读设计 TieredMessageFetcher 是 RocketMQ 分级存储取回数据的具体实现。 为了加速从二级存储读取的速度和减少整体上对二级存储请求数,采用了预读缓存的设计: 即 TieredMessageFetcher 读取消息时会预读更多的消息数据,预读缓存的设计参考了 TCP Tahoe 拥塞控制算法,每次预读的消息量类似拥塞窗口采用加法增、乘法减的流量控制机制。 加法增:从最小窗口开始,每次增加等同于客户端 batchSize 的消息量。 乘法减:当缓存的消息超过了缓存过期时间仍未被全部拉取,此时一般是客户端缓存满,消息数据反压到服务端,在清理缓存的同时会将下次预读消息量减半。 此外,在客户端消费速度较快时,向二级存储读取的消息量较大,此时会使用分段策略并发取回数据。 3.6. 定时消息的分级存储 除了普通消息,RocketMQ 支持设置未来几十天的长定时消息,而这部分数据严重挤占了热数据的存储空间。 RocketMQ 实现了基于本地文件系统的时间轮,整体设计如左侧所示。单节点上所有的定时消息会先写入 rmq_sys_wheel_timer 的系统 topic,进入时间轮,出队后这些消息的 topic 会被还原为真实的业务 topic。 “从磁盘读取数据”和“将消息索引放入时间轮”这两个动作涉及到 IO 与计算,为了减少这两个阶段的锁竞争引入了 Enqueue 作为中转的等待队列,EnqueuGet 和 EnqueuePut 分别负责写入和读取数据,这个设计简单可靠。 不难发现,所有的消息都会进入时间轮,这也是挤占存储空间的根本原因。 写入时,RocketMQ 的分级存储定时消息针对 EnqueuePut 做了一个分流,对于大于当前时间数小时的消息会被写入到基于分级存储的 TimerFlatFile 文件中,我们维护了一个 ConcurrentSkipListMap timerFlatFileTable; 每间隔 1 小时,设置一个 TimerFlatFile,对于 T+n 至 T+n+1 的定时消息,会先被混合追加到 T+n 所对应的文件中。 读取时,当前时间 + 1 小时的消息将被提前出队,这些消息又会重新进入本地 TimerStore 的系统 topic 中/此时,由于定时时间都是将来一小段时间的,他们不再会进入时间轮的结构中。 在这个设计上有一些工程性的考虑: timerFlatFileTable 中的 Key 很多,会不会让分级存储上的数据碎片化?分布式文件系统底层一般使用类 LSM 结构,RocketMQ 只关心 LBA 结构,可以通过优化 Enqueue 的 buffer 让写分级存储时数据达到攒批的效果。 可靠的位点,Enqueue 到“时间轮”和 timerFlatFileTable 可以共用一个 commit offset。对于单条消息来说,只要它进入时间轮或者被上传成功,我们就认为一条消息已经持久化了。由于更新到二级存储本身需要一些攒批缓冲的过程,会延迟 commit offset 的更新,但是这个缓冲时间是可控的。 我们发现偶尔本地存储转储到二级存储会较慢,使用双缓冲队列实现读写分离(如图片中绿色部分)此时消息被放入写缓存,随后转入读缓存队列,最后进入上传流程。 4. 分级存储企业级竞争力 4.1. 冷数据的压缩与归档 压缩是一种经典的时间与空间交换的权衡,其目的在于通过较小的 CPU 开销,实现更少的磁盘占用或网络 I/O 传输。目前,RocketMQ 的热存储在考虑延迟的情况下,仅对单条大于 4K 的消息进行单条压缩存储。对于冷存储服务其实可以做两个层面的压缩与归档处理。 消息队列业务层面,对于大多数业务 Topic,其 Body 通常存在相似性,可将其压缩至原大小的几分之一至几十分之一。 底层存储层面,使用 EC 纠删码,数据被分成若干个数据块,然后再根据一定的算法,生成一些冗余块。当数据丢失时,可以使用其余的数据块和冗余块来恢复丢失的数据块,从而保证数据的完整性和可靠性。典型的 EC 算法后存储空间的使用可以降低到 1.375 副本。 业界也有一些基于 FPGA 实现存储压缩加速的案例,我们将持续探索这方面的尝试。 4.2. 原生的只读挂载能力实现 Serverless 业界对 Serverless 有不同的理解,过去 RocketMQ 多节点之间不共享存储,导致“扩容快,缩容慢”,例如 A 机器需要下线,则必须等普通消息消费完,定时消息全部出队才能进行运维操作。分级存储设计通过 shareddisk的方式实现跨节点代理读取下线节点的数据,如右图所示:A 的数据此时可以被 B 节点读取,彻底释放了 A 的计算资源和一级存储资源。 这种缩容的主要流程如下: 1. RocketMQ 实现了一个简单的选举算法,正常情况下集群内每一个节点都持有对自己数据独占的写锁。 2. 待下线的节点做优雅下线,确保近期定时消息,事务消息,pop retry 消息都已被完整处理。上传自己的元数据信息到共享的二级存储,并释放自己的写锁。 3. 集群使用一定的负载均衡算法,新的节点获取写锁,将该 Broker 的数据以只读的形式挂载。 4. 将原来节点的元数据注册到 NameServer 对客户端暴露。 5. 对于原节点的写请求,例如位点更新,将在内存中处理并周期性快照到共享存储中。 5. 总结 RocketMQ 的存储在云原生时代的演进中遇到了更多有趣的场景和挑战,这是一个需要全链路调优的复杂工程。出于可移植性和通用性的考虑,我们还没有非常有效的使用 DPDK + SPDK + RDMA 这些新颖的技术,但我们解决了许多工程实践中会遇到的问题并构建了整个分级存储的框架。在后续的发展中,我们会推出更多的存储后端实现,针对延迟和吞吐量等细节做深度优化。 参考文档: [1] Chang, F., Dean, J., Ghemawat, S., et al. Bigtable: A distributed storage system for structured data. ACM Transactions on Computer Systems, 2008, 26(2): 4.[2] Liu, Y., Zhang, K., & Spear, M. DynamicSized Nonblocking Hash Tables. In Proceedings of the ACM Symposium on Principles of Distributed Computing, 2014.[3] Ongaro, D., & Ousterhout, J. In Search of an Understandable Consensus Algorithm. Proceedings of the USENIX Conference on Operating Systems Design and Implementation, 2014, 305320.[4] Apache RocketMQ. GitHub, _https://github.com/apache/rocketmq_[5] Verbitski, A., Gupta, A., Saha, D., et al. Amazon aurora: On avoiding distributed consensus for i/os, commits, and membership changes. In Proceedings of the 2018 International Conference on Management of Data, 2018, 789796.[6] Antonopoulos, P., Budovski, A., Diaconu, C., et al. Socrates: The new sql server in the cloud. In Proceedings of the 2019 International Conference on Management of Data, 2019, 17431756.[7] Li, Q. More Than Capacity: Performanceoriented Evolution of Pangu in Alibaba. Fast 2023_https://www.usenix.org/conference/fast23/presentation/liqiangdeployed_[8] Lu, S. Perseus: A FailSlow Detection Framework for Cloud Storage Systems. Fast 2023
作者:斜阳
#技术探索

2024年8月13日

RocketMQ 批处理模型演进之路
作者|谷乂 RocketMQ 的目标,是致力于打造一个消息、事件、流一体的超融合处理平台。这意味着它需要满足各个场景下各式各样的要求,而批量处理则是流计算领域对于极致吞吐量要求的经典解法,这当然也意味着 RocketMQ 也有一套属于自己风格的批处理模型。 至于什么样的批量模型才叫“属于自己风格”呢,且听我娓娓道来。 什么是批处理 首先,既然谈 RocketMQ 的批处理模型,那就得聊聊什么是“批处理”,以及为什么批处理是极致吞吐量要求下的经典解法。在我看来,批处理是一种泛化的方法论,它处在各个系统的方方面面,无论是传统工业还是互联网,甚至在日常生活中,都能看到它的身影。 批处理的核心思想是将多个任务或数据集合在一起,进行统一处理。这种方法的优势在于可以充分利用系统资源,减少任务切换带来的开销,从而提高整体效率。比如在工业制造中,工厂通常会将相同类型的零部件批量生产,以降低生产成本和提高生产速度。在互联网领域,批处理则表现为批量数据的存储、传输和处理,以优化性能和提升系统吞吐量。 批处理在极致吞吐量需求下的应用,更加显著。例如,在大数据分析中,海量的数据需要集中处理才能得出有意义的结果。如果逐条处理数据,不仅效率低下,还可能造成系统瓶颈。通过批处理,可以将数据划分为若干批次,在预定的时间窗口内统一处理,从而提高系统的并行处理能力,提升整体吞吐量。 此外,批处理其实并不意味着牺牲延时,就比如在 CPU Cache 中,对单个字节的操作无论如何时间上都是会优于多个字节,但是这样的比较并没有意义,因为延时的感知并不是无穷小的,用户常常并不关心 CPU 执行一条指令需要花多长时间,而是执行完单个“任务/作业”需要多久,在宏观的概念上,反而批处理具有更低的延时。 RocketMQ 批处理模型演进 接下来我们看看,RocketMQ 与批处理的“如胶似漆、形影相随”吧,其实在 RocketMQ 的诞生之初,就已经埋下了批处理的种子,这颗种子,我们暂且叫它——早期的批处理模型。 早期批处理模型 下图,是作为用户视角上感知比较强的老三样,分别是 Producer、Consumer、Broker: 而早期批处理模型,实际上只和 Producer、Broker 有关,在这条链路上会有批量消息的概念,当消息到达 Broker 后这个概念就会消失。基于这点我们来看具体是怎么回事。首先批量消息的源头实际上就是 Producer 端的 Send 接口,在大部分场景下,我们发送一条消息都会使用以下的形式去操作: ```java SendResult send(Message msg); ``` 非常地简明扼要,将一条消息发送到 Broker,如果我们要使用上早期的批处理模型,也只需要稍作修改: ```java SendResult send(Collection msgs) ``` 可以看到,将多条消息串成一个集合,然后依旧是调用 send 接口,就可以完成早期批处理模型的使用了(从用户侧视角看就已经 ok 了),就像下图一样,两军交战,谁火力更猛高下立判~ 那么真就到此为止了吗?当然不是,首先这里的集合是有讲究的,并不是随意将多条消息放在一起,就可以 send 出去的,它需要满足一些约束条件: 相同 Topic。 不能是 RetryTopic。 不能是定时消息。 相同 isWaitStoreMsgOK 标记。 这些约束条件暂时先不展开,因为就如同它字面意思一样浅显易懂,但是这也意味着它的使用并不是随心所欲的,有一定的学习成本,也有一定的开发要求,使用前需要根据这些约束条件自行分类,然后再装进“大炮”中点火发射。这里可能有人会问,这不是为难我胖虎吗?为什么要加这么多约束?是不是故意的?实际上并非如此,我们可以想象一下,假如我们是商家: 客户 A 买了两件物品,在发货阶段我们很自然的就可以将其打包在一起(将多个 Message 串成一个 ArrayList),然后一次性交给快递小哥给它 Send 出去,甚至还能省一笔邮费呢~ 客户 B 和客户 C 各买了一件物品,此时我效仿之前的行为打包到一起,然后告诉快递小哥这里面一个发到黑龙江,一个发到海南,然后掏出一笔邮费,然后。。。就没有然后了。 很显然,第二个场景很可能会收到快递小哥一个大大的白眼,这种事情理所应当的做不了,这也是为什么属于同一个 Collection 的消息必须要满足各种各样的约束条件了,在 Broker 实际收到一个“批量消息”时,会做以下处理: 首先它会根据这一批消息的某些属性,挑选出对应的队列,也就是上图中最底下的「p1、p2......」,在选定好队列之后,就可以进行后续的写入等操作了,这也是为什么必须要求相同 Topic,因为不同的 Topic 是没法选定同一个队列的。 接下来就到了上图所示流程,可以看到这里分别来了三个消息,分别是 《四条消息》《一条消息》《三条消息》,接下来他们会依次进入 unPack 流程,这个流程有点像序列化过程,因为从客户端发送上来的消息都是内存结构的,距离实际存储在文件系统中的结构还有一些不同。在 unPack 过程中,会分别解包成:四条消息、一条消息、三条消息;此时和连续 Send 八条消息是没有任何区别的,也就是在这一刻,批量消息的生命周期就走到了尽头,此刻往后,“众生平等、不分你我”。 也正是这个机制,Consumer 其实并不知道 Producer 发送的时候“到底是发射弓箭,还是点燃大炮”。这么做有个非常好的优点,那就是有着最高的兼容性,一切的一切好像和单条消息 Send 的经典用法没有任何区别,在这种情况下,每条消息都有最高的自由度,例如各自独立的 tag、独立的 keys、唯一的 msgId 等等,而基于这些所衍生出来的生态(例如消息轨迹)都是无缝衔接的。也就是说:只需要更换发送者使用的 Send 接口,就可以获得极大的发送性能提升,而消费者端无需任何改动。 索引构建流水线改造 我一向用词都非常的严谨,可以看到上一段的结尾:“获得极大的发送性能提升”,至于为什么这么讲,是因为距离整体系统的提升还有一些距离,也就是这一段的标题“索引构建流水线改造”。 首先我们要有一个共识,那就是对于消息队列这种系统,整体性能上限比值“消费/生产”应该要满足至少大于等于一,因为大部分情况下,我们的生产出来的消息至少应该被消费一次(否则直接都不用 Send 了岂不美哉)。 其实在以往,发送性能没有被拔高之前,它就是整个生产到消费链路上的短板,也就是说消费速率可以轻松超过生产速率,整个过程也就非常协调。but!在使用早期批处理模型后,生产速率的大幅度提升就暴露了另外一个问题,也就是会出现消费速率跟不上生产的情况,这种情况下,去谈整个系统的性能都是“无稽之谈”。 而出现消费速率短板的原因,还要从索引构建讲起。由于消费是要找到具体的消息位置,那就必须依赖于索引,也就是说,一条消息的索引构建完成之前,是无法被消费到的。 下图就是索引构建流程的简易图: 这是整个直接决定消费速率上限的流程。通过一个叫 ReputMessageService 的线程,顺序扫描 CommitLog 文件,将其分割为一条一条的消息,再对这些消息进行校验等行为,将其转换成一条条的索引信息,并写入对应分区的 ConsumeQueue 文件。 整个过程是完全串行的,从分割消息,到转换索引,到写入文件,每一条消息都要经过这么一次流转。因为一开始是串行实现,所以改造起来也非常的自然,那就是通过流水线改造,提高它的并发度,这里面有几个需要解决的问题: CommitLog 的扫描过程并行难度高,因为每条消息的长度是不一致的,无法简单地分割出消息边界来分配任务。 单条消息的索引构建任务并不重,因此不能简单忽略掉任务流转过程中的开销(队列入队出队)。 写入 ConsumeQueue 文件的时候要求写入时机队列维度有序,否则会带来额外的检查开销等。 针对这几个难点,在设计中也引入了“批量处理”的思路,其实大到架构设计、小到实现细节,处处都体现了这一理念,下图就是改造后的流程: 由于 CommitLog 扫描过程很难并行化处理,那就干脆不做并行化改造了,就使用单线程去顺序扫描,但是扫描的时候会进行一个简单的批处理,扫描出来的消息并不是单条的,而是尽可能凑齐一个较大的 buffer 块,默认是 4MB,这个由多条消息构成的 buffer 块我们不妨将其称为一个 batch msg。 然后就是对这些 batch msg 进行并行解析,将 batch msg 以单条消息的粒度扫描出来,并构建对应的 DispatchRequest 结构,最终依次落盘到 ConsumeQueue 文件中。其中的关键点在于 batch msg 的顺序如何保证,以及 DispatchRequest 在流转时怎么保证顺序和效率。为此我专门实现了一个轻量级的队列 DispatchRequestOrderlyQueue,这个 Queue 采用环状结构,可以随着顺序标号不断递进,并且能做到 “无序入队,有序出队”,详细设计和实现均在开源 RocketMQ 仓库中,这里就不多赘述。 在经过改造后,索引构建流程不再成为扯后腿的一员,从原本眼中钉的角色美美隐身了~ BatchCQ 模型 经过上述索引构建流水线改造后,整个系统也就实现了最基本的批处理模型,可以在最小修改、最高兼容性的情况下让性能获得质的飞跃。 但是这并不够!因为早期的模型出于兼容性等考虑,所以依旧束手束脚的,于是 BatchCQ 模型诞生了,主要原因分为两个维度: 性能上: 早期模型中,Broker 端在准备写入阶段需要进行解包,会有一定的额外开销。 CommitLog 文件中不具备批量信息,索引需要分多次构建。 能力上: 无法实现端到端的批量行为,如加密、压缩。 那 BatchCQ 又是如何改进上述的问题的呢?其实也非常地直观,那就是“见字如面”,将 ConsumeQueue 也批量化。这个模型去掉 Broker 端写入前的解包行为,索引也只进行一次构建: 就像上图所示,如果把索引比做信封,原先每个信封只能包含一份索引信息,在批量化后则可以塞下任意数量的索引信息,具体的存储结构也发生了较大变化: 比如说如果来了两批消息,分别是(3+2)条,在普通的 CQ 模型里会分别插入 5 个 slot,分别索引到 5 条消息。但是在 BatchCQ 模型中,(3+2)条消息会只插入 2 个 slot,分别索引到 3 条以及 2 条。 也是因为这个特点,所以 CQ 原有的格式也发生了变化,为了记录更多信息不得不加入 Base Offset、Batch Num 等元素,而这些更改也让原来定位索引位置的逻辑发生了变化。 普通 CQ:每个 Slot 定长,【Slot 长度 QueueOffset】位点可以直接找到索引,复杂度 O(1)。 BatchCQ:通过二分法查找,复杂度 O(log n)。 虽然这部分只涉及到了 ConsumeQueue 的修改,但是它作为核心链路的一环,影响是很大的,首先一批的消息会被当作同一条消息来处理,不需要重新 unPack ,而且这些消息都会具有相同的 TAG、Keys 甚至 MessageId,想唯一区分同一批的消息,只能根据它们的 QueueOffset 了,这一点会让消息轨迹等依靠 MessageId 的能力无法直接兼容使用,但是消息的处理粒度依然可以保持不变(依赖的是 QueueOffset)。 AutoBatch 模型 通过 BatchCQ 改造之后,我们其实已经获得极致的吞吐量了。那个 AutoBatch 又是个啥呢? 这里又要从头说起,在早期批处理模型的总结里,提到了一个比较大的缺陷,那就是“使用起来不够顺手”,用户是需要关心各种约束条件的,就像前面提到的 Topic、消息类型、特殊 Flag 等,在 BatchCQ 里面其实是新增了 Keys、Tag 等维度的限制,错误使用会出现一些非预期的情况。 不难看出,无论是早期批处理模型、还是 BatchCQ 模型,使用起来都有一定的学习成本,除了需要关注各种使用方式外,想要用好,还有一些隐藏在暗处的问题需要主动去解决: 无论是早期的批处理模型,还是 batchCQ 模型,都需要发送端自行将消息分类打包。 消息分类和打包成本高,分类需要关心分类依据,打包需要关心触发时机。 分类依据复杂,早期批处理模型需要关注多个属性,batchCQ 在这基础上新增了多个限制。 打包时机不易掌握,使用不当容易出现性能下降、时延不稳定、分区不均衡等问题。 为了解决以上问题,AutoBatch 应运而生,它就是一台能自动分拣的无情打包机器,全天候运转,精密又高效,将以往需要用户关注的细节统统屏蔽,它具有以下几个优点: AutoBatch 托管分类和打包能力,只需要简单配置即可使用。 用户侧不感知托管的过程,使用原有发送接口即可享受批处理带来的性能提升,同时兼容同步发送和异步发送。 AutoBatch 同时兼容早期的批处理模型和 batchCQ 模型。 实现轻量,性能优秀,设计上优化延时抖动、小分区等问题。 首先到底有多简单呢?让我们来看一下: ```java // 发送端开启 AutoBatch 能力 rmqProducer.setAutoBatch(true); ``` 也就是说,只需要加入这么一行,就可以开启 RocketMQ 的性能模式,获得早期的批处理模型或者 BatchCQ 模型带来的极致吞吐量提升。在开启 AutoBatch 的开关后,用户所有已有的行为都不需要作出改变,使用原来经典的 Send(Message msg)即可;当然也可以进行更精细的内存控制和延时控制: ```java // 设置单个 MessageBatch 大小(kb) rmqProducer.batchMaxBytes(32 1024); // 设置最大聚合等待时间(ms) rmqProducer.batchMaxDelayMs(10); // 设置所有聚合器最大内存使用(kb) rmqProducer.totalBatchMaxBytes(32 1024 1024); ``` 那么它具体轻量在哪?又高效在哪?下面这个简易的流程图应该能给大家一个答案: 首先它只引入了一个单线程的背景线程——background thread,这个背景线程以 1/2 的 maxDelayMs 周期运行,将扫描到超过等待时机缓冲区的消息提交到异步发送的线程池中,此时就完成了时间维度的聚合。空间维度的聚合则是由发送线程在传递时进行检查,如果满足 maxBytes,则原地发送。 整个设计非常地精简,只额外引入了一个周期运行的线程,这样做可以避免因为 AutoBatch 模型本身出现性能短板,而且 batchMessage 的序列化过程也做了精简,去掉了发送时候所有的检测(在聚合过程中已提前分类)。 才艺展示 上面分享了 RocketMQ 在批处理模型上的演进,那么它们具体效果也就必须拉出来给大家做一个才艺展示了,以下所有的压测结果均来自于 OpenmessagingBenchmark 框架,压测中使用的各项配置如下所示: | | 压测机器 | x86芯片机器 | | | | | | 规格 | 32核(vCPU)64 GiB20 Mbpsecs.c7.8xlarge | 8核(vCPU)64 GiB20 Mbpsecs.r7.2xlarge | | 云盘 | 无 | ESSD云盘 PL1 965GiB (50000 IOPS) | | 操作系统 | Alibaba Cloud Linux 3.2104 LTS 64位 | Alibaba Cloud Linux 3.2104 LTS 64位 | | JDK版本 | openjdk version "11.0.19" 20230418 LTSOpenJDK Runtime Environment (Red_Hat11.0.19.0.71.0.1.al8) (build 11.0.19+7LTS) | openjdk version "11.0.19" 20230418 LTSOpenJDK Runtime Environment (Red_Hat11.0.19.0.71.0.1.al8) (build 11.0.19+7LTS) | 准备工作 为 OpenmessagingBenchmark 进行压测环境,首先部署一套开源社区上最新的 RocketMQ,然后配置好 Namesrv 接入点等信息,然后打开 RocketMQ 的性能模式——AutoBatch,将 autoBatch 字段设置为 true: 早期批处理模型 ```java bin/benchmark drivers driverrocketmq/rocketmq.yaml workloads/1topic100partitions1kb4p4c1000k.yaml ``` 开启 autobatch 能力后,就会使用早期批处理模型进行性能提升,可以看到提升幅度非常大,由原来的 8w 提升至 27w 附近,为原来的 300%。 索引构建流水线优化 流水线优化是需要在服务端开启的,下面是一个简单的配置例子: ```java // 开启索引构建流水线优化 enableBuildConsumeQueueConcurrently=true // 调整内存中消息最大消费阈值 maxTransferBytesOnMessageInMemory=256M maxTransferCountOnMessageInMemory=32K // 调整磁盘中消息最大消费阈值 maxTransferBytesOnMessageInDisk=64M maxTransferCountOnMessageInDisk=32K ``` 可以看到,只有开启索引构建优化,才能做到 稳稳地达到 27w 的吞吐,在没有开启的时候,消费速率不足会触发冷读直至影响到整个系统的稳定性,同时也不具备生产意义,所以在使用批量模型的时候也务必需要开启索引构建优化。 BatchCQ模型 BatchCQ 模型的使用与前面提到的两者不同,它不是通过开关开启的,BatchCQ 其实是一种 Topic 类型,当创建 topic 的时候指定其为 BatchCQ 类型,既可拥有最极致的吞吐量优势。 ```java // Topic 的各种属性在 TopicAttributes 中设置 public static final EnumAttribute QUEUE_TYPE_ATTRIBUTE = new EnumAttribute("queue.type", false, newHashSet("BatchCQ", "SimpleCQ"), "SimpleCQ"); topicConfig.getAttributes().put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ"); ``` 当使用 BatchCQ 模型的时候,与早期批处理模型已经有了天壤之别,因此我们寻求了和开源 Kafka 的对比,部署架构如下: RocketMQ 3 主 3 备架构,使用轻量级 Container 部署。 节点 1: MasterA,SlaveC 节点 2: MasterC,SlaveB 节点 3: MasterB,SlaveA Kafka 3 个节点,设置分区副本数为 2。 压测结果 | | MQ | Kafka | | | | | | 16partions | TPS: 251439.34P99: 264.0 | TPS: 267296.34P99: 1384.01 | | 10000partiotions | TPS: 249981.94P99: 1341.01 | 报错无数据 | 可以看到,在使用 BatchCQ 类型的 Topic 时,RocketMQ 与 Kafka 的性能基本持平: 16partitions,二者吞吐量相差 5% 以内,且 RocketMQ 则具有明显更低的延时表现。 10000partitions,得益于 RocketMQ 的存储结构更为集中,在大量分区场景下吞吐量几乎保持不变。而Kafka在默认配置的情况下出现报错无法使用。 因此在极致吞吐量的需求下,BatchCQ 模型能够很好地承接极致需求的流量,而且如果更换性能更好的本地磁盘,同样的机器配置能达到更高的上限。
作者:谷乂
#技术探索

2024年8月9日

深度解读 RocketMQ 存储机制
作者 | 斜阳来源 | 阿里开发者公众号 简介:RocketMQ 实现了灵活的多分区和多副本机制,有效的避免了集群内单点故障对于整体服务可用性的影响。存储机制和高可用策略是 RocketMQ 稳定性的核心,社区上关于 RocketMQ 目前存储实现的分析与讨论一直是一个热议的话题。本文想从一个不一样的视角,着重于作者眼中的这种存储实现是在解决哪些复杂的问题,因此我从本文最初的版本中删去了冗杂的代码细节分析,由浅入深的分析存储机制的缺陷与优化方向。 RocketMQ 实现了灵活的多分区和多副本机制,有效的避免了集群内单点故障对于整体服务可用性的影响。存储机制和高可用策略是 RocketMQ 稳定性的核心,社区上关于 RocketMQ 目前存储实现的分析与讨论一直是一个热议的话题。本文想从一个不一样的视角,着重于谈谈我眼中的这种存储实现是在解决哪些复杂的问题,因此我从本文最初的版本中删去了冗杂的代码细节分析,由浅入深的分析存储机制的缺陷与优化方向。 RocketMQ 的架构模型与存储分类 先来简单介绍下 RocketMQ 的架构模型。RocketMQ 是一个典型的发布订阅系统,通过 Broker 节点中转和持久化数据,解耦上下游。Broker 是真实存储数据的节点,由多个水平部署但不一定完全对等的副本组构成,单个副本组的不同节点的数据会达到最终一致。对于单个副本组来说同一时间最多只会有一个可读写的 Master 和若干个只读的 Slave,主故障时需要选举来进行单点故障的容错,此时这个副本组是可读不可写的。NameServer 是独立的一个无状态组件,接受 Broker 的元数据注册并动态维护着一些映射关系,同时为客户端提供服务发现的能力。在这个模型中,我们使用不同主题 (Topic) 来区分不同类别信息流,为消费者设置订阅组 (Group) 进行更好的管理与负载均衡。 如下图中间部分所示: 1. 服务端 Broker Master1 和 Slave1 构成其中的一个副本组。 2. 服务端 Broker 1 和 Broker 2 两个副本组以负载均衡的形式共同为客户端提供读写。 RocketMQ 目前的存储实现可以分为几个部分: 1. 元数据管理 具体指当前存储节点的主题 Topic,订阅组 Group,消费进度 ConsumerOffset。 多个配置文件 Config,以及为了故障恢复的存储 Checkpoint 和 FileLock。 用来记录副本主备身份的 Epoch / SN (sequence number) 文件等(5.0beta 引入,也可以看作 term) 2. 消息数据管理,包括消息存储的文件 CommitLog,文件版定时消息的 TimerLog。 3. 索引数据管理,包括按队列的顺序索引 ConsumeQueue 和随机索引 IndexFile。 元数据管理与优化 为了提升整体的吞吐量与提供跨副本组的高可用能力,RocketMQ 服务端一般会为单个 Topic 创建多个逻辑分区,即在多个副本组上各自维护部分分区 (Partition),我们把它称为队列 (MessageQueue)。同一个副本组上同一个 Topic 的队列数相同并从 0 开始连续编号,不同副本组上的 MessageQueue 数量可以不同。 例如 topica 可以在 broker1 主副本上有 4 个队列,编号 (queueId) 是 03,在 broker1 备副本上完全相同,但是 broker2 上可能就只有 2 个队列,编号 01。在 Broker 上元数据的组织管理方式是与上述模型匹配的,每一个 Topic 的 TopicConfig,包含了几个核心的属性,名称,读写队列数,权限与许多元数据标识,这个模型类似于 K8s 的 StatefulSet,队列从 0 开始编号,扩缩队列都在尾部操作(例如 24 个队列缩分区到 16,是留下了编号为 015 的分区)。这使得我们无需像 Kafka 一样对每个分区单独维护状态机,同时大幅度的简化了关于分区的实现。 我们会在存储节点的内存中简单的维护 Map 的结构来将 TopicName 直接映射到它的具体参数。这个设计足够的简单,也隐含了一些缺陷,例如它没有实现一个原生 Namespace 机制来实现存储层面上多租户环境下的元数据的隔离,这也是 RocketMQ 5.0 向云原生时代迈进过程中一个重要的演进方向。 当 Broker 接收到外部管控命令,例如创建或删除一些 Topic,这个内存 Map 中就会对应的更新或者删除一个 KV 对,需要立刻序列化一次并向磁盘覆盖,否则就会造成丢失更新。对于单租户的场景下,Topic (Key) 的数量不会超过几千个,文件大小也只有数百 KB,速度是非常快。但是在云上大多租的场景下,一个存储节点的 Topic 可以达到十几 MB。每次变更一个 KV 就全量向磁盘覆盖写这个大文件,这个操作的开销非常高,尤其是在数据需要跨集群,跨节点迁移,或者应急情况下扩容逃生场景下,同步写文件严重延长了外围管控命令的响应时间,也成为云上大共享模式下严峻的挑战之一。在这个背景下,两个解决方案很自然的就产生了,即批量更新接口和增量更新机制。 1. 批量更新指每次服务端可以接受一批 TopicConfig 的更新,这样 Broker 刷写文件的频率就显著的降低。 2. 增量更新指将这个 Map 的持久化换成逻辑替换成 KV 型的数据库或实现元数据的 Append 写,以 Compaction 的形式维护一致性。 除了最重要的 Topic 信息,Broker 还管理着 Group 信息,消费组的消费进度 ConsumerOffset 和多个配置文件。Group 的变更和 Topic 类似,都是只有新建或者删除时才需要持久化。而 ConsumeOffset 是用来维护每个订阅组的消费进度的,结构如 Map。这里我们从文件本身的作用和数据结构的角度进行分析下,Topic Group 虽然数量多,但是变化的频率还是比较低的,而提交与持久化位点时时刻刻都在进行,进而导致这个 Map 几乎在实时更新,但是上一次更新后的数据 (last commit offset) 对当前来说又没有什么用,并且允许丢少量更新。所以这里 RocketMQ 没有像 Topic Group 那样采取数据变化时刷写文件,而是使用一个定时任务对这个 Map 做 CheckPoint。这个周期默认是 5 秒,所以当服务端主备切换或者正常发布时,都会有秒级的消息重复。 那么这里还有没有优化的空间呢?事实上大部分的订阅组都是不在线的,每次我们也只需要更新位点有变化的这部分订阅组。所以这里我们可以采取一个差分优化的策略(参加过 ACM 的选手应该更熟悉,搜索差分数据传输),在主备同步 Offset 或者持久化的时候只更新变化的内容。假如此时我们除了知道当前的 Offset,还需要一个历史 Offset 的提交记录怎么办,这种情况下,也使用一个内置的系统 Topic 来保存每次提交(某种意义上的自举实现,Kafka 就是使用一个内部 Topic 来保存位点),通过回放或查找消息来追溯消费进度。由于 RocketMQ 支持海量 Topic,元数据的规模会更加大,采用目前的实现开销更小。所以选用哪种实现完全是由我们所面对的需求决定的,实现也可以是灵活多变的。当然,在 RocketMQ 元数据管理上,如何在上层保证分布式环境下多个副本组上的数据一致又是另外一个令人头疼的难题,后续文章会更加详细的讨论这点。 消息数据管理 很多文章都提到 RocketMQ 存储的核心是一个极致优化的顺序写盘,以 append only 的形式不断的将新的消息追加到文件末尾。 RocketMQ 使用了一种称为 MappedByteBuffer 的内存映射文件的办法,将一个文件映射到进程的地址空间,实现文件的磁盘地址和进程的一段虚拟地址关联,实际上是利用了NIO 中的 FileChannel 模型。在进行这种绑定后,用户进程就可以用指针(偏移量)的形式写入磁盘而不用进行 read / write 的系统调用,减少了数据在缓冲区之间来回拷贝的开销。当然这种内核实现的机制有一些限制,单个 mmap 的文件不能太大 (RocketMQ 选择了 1G),此时再把多个 mmap 的文件用一个链表串起来构成一个逻辑队列 (称为 MappedFileQueue),就可以在逻辑上实现一个无需考虑长度的存储空间来保存全部的消息。 这里不同 Topic 的消息直接进行混合的 append only 写,相比于随机写来说性能的提升非常显著的。还有一个重要的细节,这里的混合写的写放大非常低。当我们回头去看 Google 实现的 BigTable 的理论模型,各种 LSM 树及其变种,都是将原来的直接维护树转为增量写的方式来保证写性能,再叠加周期性的异步合并来减少文件的个数,这个动作也称为 Compaction。RocksDB 和 LevelDB 在写放大,读放大,空间放大都有几倍到几十倍的开销。得益于消息本身的不可变性,和非堆积的场景下,数据一旦写入中间代理 Broker 很快就会被下游消费掉的特性,此时我们不需要在写入时就维护 memTable,避免了数据的分发与重建。相比于各种数据库的存储引擎,消息这样近似 FIFO 的实现可以节省大量的资源,同时减少了 CheckPoint 的复杂度。对于同一个副本组上的多个副本之间的数据复制都是全部由存储层自行管理,这个设计类似于 bigtable 和 GFS,azure 的 Partation layer,也被称为 Layered Replication 分层架构。 单条消息的存储格式 RocketMQ 有一套相对复杂的消息存储编码用来将消息对象序列化,随后再将一个非定长的数据落到上述的真实的写入到文件中,值得注意的存储格式中包括了索引队列的编号和位置。 存储时单条消息本身元数据占用的存储空间为固定的 91B + 部分属性,而消息的 payload 通常大于 2K,也就是说元数据带来的额外存储开销只增加了 5%10% 左右。很明显,单条消息越大,存储本身额外的开销(比例)就相对的越少。但如果有大消息的诉求,例如想在 body 中保存一张序列化后的图片(二进制大对象),从目前的实现上说,在消息中保存引用,将真实数据保存到到其他组件,消费时读取引用(比如文件名或者 uk)其实是一个更合适的设计。 多条消息的连续写 上文提到,不同 Topic 的消息数据是直接混合追加数据到 CommitLog 中 (也就是上文提到的 MappedFileQueue),再交由其他后端线程做分发。其实我觉得 RocketMQ 这种 CommitLog 与元数据的分开管理的机制也有一些 PacificaA (微软提出的复制框架) 的影子,从而以一种更简单的方式实现强一致。这里的强一致指的是在 Master Broker (对应于 PacificA 的 Primary) 对所有消息的持久化进行定序,再通过全序广播 (total order broadcast) 实现线性一致 (Linearizability)。这几种实现都会需要解决两个类似的问题,一是如何实现单机下的顺序写,二是如何加快写入的速度。 如果是副本组是异步多写的(高性能中可靠性),将日志非最新(水位最高)的备选为主,主备的数据日志可能会产生分叉。在 RocketMQ 5.0 中,主备会通过基于版本的协商机制,使用落后补齐,截断未提交数据等方式来保证数据的一致性。顺便一提,RocketMQ 5.0 中实现了 logic queue 方案解决全局分区数变化的问题,这和 PacificaA 中通过 newseal 新增副本组和分片 merge 给计算层读的一些优化策略有一些异曲同工之妙,具体可以参考这个设计方案[10]。 独占锁实现顺序写 如何保证单机存储写 CommitLog 的顺序性,直观的想法就是对写入动作加独占锁保护,即同一时刻只允许一个线程加锁成功,那么该选什么样的锁实现才合适呢?RocketMQ 目前实现了两种方式。1. 基于 AQS 的 ReentrantLock 2. 基于 CAS 的 SpinLock 那么什么时候选取 spinlock,什么时候选取 reentranlock?回忆下两种锁的实现,对于 ReentrantLock,底层 AQS 抢不到锁的话会休眠,但是 SpinLock 会一直抢锁,造成明显的 CPU 占用。SpinLock 在 trylock 失败时,可以预期持有锁的线程会很快退出临界区,死循环的忙等待很可能要比进程挂起等待更高效。这也是为什么在高并发下为了保持 CPU 平稳占用而采用方式一,单次请求响应时间短的场景下采用方式二能够减少 CPU 开销。两种实现适用锁内操作时间不同的场景,那线程拿到锁之后需要进行哪些动作呢? 1. 预计算索引的位置,即 ConsumeQueueOffset,这个值也需要保证严格递增。 2. 计算在 CommitLog 存储的位置,physicalOffset 物理偏移量,也就是全局文件的位置。 3. 记录存储时间戳 storeTimestamp,主要是为了保证消息投递的时间严格保序。 因此不少文章也会建议在同步持久化的时候采用 ReentrantLock,异步持久化的时候采用 SpinLock。那么这个地方还有没有优化的空间?目前可以考虑使用较新的 futex 取代 spinlock 机制。futex 维护了一个内核层的等待队列和许多个 SpinLock 链表。当获得锁时,尝试 cas 修改,如果成功则获得锁,否则就将当前线程 uaddr hash 放入到等待队列 (wait queue),分散对等待队列的竞争,减小单个队列的长度。这听起来是不是也有一点点 concurrentHashMap 和 LongAddr 的味道,其实核心思想都是类似的,即分散竞争。 成组提交与可见性 受限于磁盘IO,块存储的响应通常非常慢。要求所有请求立即持久化是不可能的,为了提升性能,大部分的系统总是将操作日志缓存到内存中,比如在满足"日志缓冲区中数据量超过一定大小 / 距离上次刷入磁盘超过一定时间" 的任一条件时,通过后台线程定期持久化操作日志。但这种成组提交的做法有一个很大的问题,存储系统意外故障时,会丢失最后一部分更新操作。例如数据库引擎总是要求先将操作日志刷入磁盘 (优先写入 redo log) 才能更新内存中的数据,这样断电重启则可以通过 undo log 进行事务回滚与丢弃。 在消息系统的实现上有一些微妙的不同,不同场景下对消息的可靠性要求不同,在金融云场景下可能要求主备都同步持久化完成消息才对下游可见,但日志场景希望尽可能低的延迟,同时允许故障场景少量丢失。此时可以将 RocketMQ 配置为单主异步持久化来提高性能,降低成本。此时宕机,存储层会损失最后一小段没保存的消息,而下游的消费者实际上已经收到了。当下游的消费者重置位点到一个更早的时间,回放至同样位点的时候,只能读取到了新写入的消息,但读取不到之前消费过的消息(相同位点的消息不是同一条),这是一种 read uncommitted。 这样会有什么问题呢?对于普通消息来说,由于这条消息已经被下游处理,最坏的影响是重置位点时无法消费到。但是对于 Flink 这样的流计算框架,以 RocketMQ 作为 Source 的时候,通过回放最近一次 CheckPoint 到当前的数据的 offset 来实现高可用,不可重复读会造成计算系统没法做到精确的 excatly once 消费,计算的结果也就不正确了。相应的解决的方案之一是在副本组多数派确认的时候才构建被消费者可见的索引,这么做宏观上的影响就是写入的延迟增加了,这也可以从另一个角度解读为隔离级别的提升带来的代价。 对于权衡延迟和吞吐量这个问题,可以通过加快主备复制速度,改变复制的协议等手段来优化,这里大家可以看下 SIGMOD 2022 关于 Kafka 运行在 RDMA 网络上显著降低延迟的论文《KafkaDirect: Zerocopy Data Access for Apache Kafka over RDMA Networks》链接[9]。 持久化机制 关于这一块的讨论在社区里讨论是最多的,不少文章都把持久化机制称为刷盘。我不喜欢这个词,因为它不准确。在 RocketMQ 中提供了三种方式来持久化,对应了三个不同的线程实现,实际使用中只会选择一个。 同步持久化,使用 GroupCommitService。 异步持久化且未开启 TransientStorePool 缓存,使用 FlushRealTimeService。 异步持久化且开启 TransientStorePool 缓存,使用 CommitRealService。 持久化 同步刷盘的落盘线程统一都是 GroupCommitService。写入线程仅仅负责唤醒落盘线程,将消息转交给存储线程,而不会等待消息存储完成之后就立刻返回了。我个人对这个设计的理解是,消息写入线程相对与存储线程来说也可以看作 IO 线程,而真实存储的线程需要攒批持久化会陷入中断,所以才要大费周章的做转交。 从同步刷盘的实现看,落盘线程每隔 10 ms 会检查一次,如果有数据未持久化,便将 page cache 中的数据刷入磁盘。此时操作系统 crash 或者断电,那未落盘的数据丢失会不会对生产者有影响呢?此时生产者只要使用了可靠发送 (指非 oneway 的 rpc 调用),这时对于发送者来说还没有收到成功的响应,此时客户端会进行重试,将消息写入其他可用的节点。 异步持久化对应的线程是 FlushRealTimeService,实现上又分为固定频率和非固定频率,核心区别是线程是否响应中断。所谓的固定频率是指每次有新的消息到来的时候不管,不响应中断,每隔 500ms(可配置)flush 一次,如果发现未落盘数据不足(默认 16K),直接进入下一个循环,如果数据写入量很少,一直没有填充满16K,就不会落盘了吗?这里还有一个基于时间的兜底方案,即线程发现距离上次写入已经很久了(默认 10 秒),也会执行一次 flush。但事实上 FileChannel 还是 MappedByteBuffer 的 force() 方法都不能精确控制写入的数据量,这里的写行为也只是对内核的一种建议。对于非固定频率实现,即每次有新的消息到来的时候,都会发送唤醒信号,当唤醒动作在数据量较大时,存在性能损耗,但消息量较少且情况下实时性好,更省资源。在生产中,具体选择哪种持久化实现由具体的场景决定。是同步写还是多副本异步写来保证数据存储的可靠性,本质上是读写延迟和和成本之间的权衡。 读写分离 广义上来说,读写分离这个名词有两个不同的含义: 像数据库一样主写从读,分摊读压力,牺牲延迟可靠性更高,适用于消息读写比非常高的场景。 存储写入将消息暂存至 DirectByteBuffer,当数据成功写入后,再归还给缓冲池,将写入 page cache 的动作异步化。 这里主要来讨论第二点,当 Broker 配置异步持久化且开启缓冲池,启用的异步刷盘线程是 CommitRealTimeService。我们知道操作系统本身一般是当 page cache 上积累了大量脏页后才会触发一次 flush 动作(由一些 vm 参数控制,比如 dirty_background_ratio 和 dirty_ratio)。这里有一个很有意思的说法是 CPU 的 cache 是由硬件维护一致性,而 page cache 需要由软件来维护,也被称为 syncable。高并发下写入 page cache 可能会造成刷脏页时磁盘压力较高,导致写入时出现毛刺现象。为了解决这个问题,出现了读写分离的实现,使用堆外内存将消息 hold 住,然后进行异步批量写入。 RocketMQ 启动时会默认初始化 5 块(参数 transientStorePoolSize 决定)堆外内存(DirectByteBuffer)循环利用,由于复用堆外内存,这个小方案也被成为池化,池化的好处及弊端如下: 好处:数据写堆外后便很快返回,减少了用户态与内核态的切换开销。 弊端:数据可靠性降为最低级别,进程重启就会丢数据(当然这里一般配合多副本机制进行保障),也会增加一些端到端的延迟。 宕机与故障恢复 宕机一般是由于底层的硬件问题导致,RocketMQ 宕机后如果磁盘没有永久故障,一般只需要原地重启,Broker 首先会进行存储状态的恢复,加载 CommitLog,ConsumeQueue 到内存,完成 HA 协商,最后初始化 Netty Server 提供服务。目前的实现是最后初始化对用户可见的网络层服务,实际上这里也可以先初始化网络库,分批将 Topic 注册到 NameServer,这样正常升级时可以对用户的影响更小。 在 recover 的过程中还有很多软件工程实现上的细节,比如从块设备加载的时候需要校验消息的 crc 看是否产生错误,对最后一小段未确认的消息进行 dispatch 等操作。默认从倒数第三个文件 recover CommitLog 加载消息到 page cache (假设未持久化的数据 文件的生命周期 聊完了消息的生产保存,再来讨论下消息的生命周期,只要磁盘没有满,消息可以长期保存。前面提到 RocketMQ 将消息混合保存在 CommitLog,对于消息和流这样近似 FIFO 的系统来说,越近期的消息价值越高,所以默认以滚动的形式从前向后删除最久远的消息,而不会关注文件上的消息是否全部被消费。触发文件清除操作的是一个定时任务,默认每 10s 执行一次。在一次定时任务触发时,可能会有多个物理文件超过过期时间可被删除,因此删除一个文件不但要判断这个文件是否还被使用,还需要间隔一定时间(参数 deletePhysicFilesInterval)再删除另外一个文件,由于删除文件是一个非常耗费 IO 的操作,可能会引起存储抖动,导致新消息写入和消费的延迟。所以又新增了一个定时删除的能力,使用 deleteWhen 配置操作时间(默认是凌晨4点)。我们把由于磁盘空间不足导致的删除称为被动行为,由于高速介质通常比较贵(傲腾 ESSD等),出于成本考虑,我们还会异步的主动的将热数据转移到二级介质上。在一些特殊的场景下,删除的同时可能还需要对磁盘做安全擦除来防止数据恢复。 避免存储抖动 快速失败 消息被服务端 Netty 的 IO 线程读取后就会进入到阻塞队列中排队,而单个 Broker 节点有时会因为 GC,IO 抖动等因素造成短时存储写失败。如果请求来不及处理,排队的请求就会越积越多导致 OOM,客户端视角看从发送到收到服务端响应的时间大大延长,最终发送超时。RocketMQ 为了缓解这种抖动问题,引入了快速失败机制,即开启一个扫描线程,不断的去检查队列中的第一个排队节点,如果该节点的排队时间已经超过了 200ms,就会拿出这个请求,立即向客户端返回失败,客户端会重试到其他副本组(客户端还有一些熔断与隔离机制),实现整体服务的高可用。 存储系统不止是被动的感知一些下层原因导致的失败,RocketMQ 还设计了很多简单有效的算法来进行主动估算。例如消息写入时 RocketMQ 想要判断操作系统的 page cache 是否繁忙,但是 JVM 本身没有提供这样的 Monitor 工具来评估 page cache 繁忙程度,于是利用系统的处理时间来判断写入是否超过1秒,如果超时的话,让新请求会快速失败。再比如客户端消费时会判断当前主的内存使用率比较高,大于物理内存的 40%时,就会建议客户端从备机拉取消息。 预分配与文件预热 为了在 CommitLog 写满之后快速的切换物理文件,后台使用一个后台线程异步创建新的文件并进行对进行内存锁定,还大费周章的设计了一个额外文件预热开关(配置 warmMapedFileEnable),这么做主要有两个原因:请求分配内存并进行 mlock 系统调用后并不一定会为进程完全锁定这些物理内存,此时的内存分页可能是写时复制的。此时需要向每个内存页中写入一些假的值,有些固态的主控可能会对数据压缩,所以这里不会写入 0。 调用 mmap 进行映射后,OS 只是建立虚拟内存地址至物理地址的映射表,而实际并没有加载任何文件至内存中。这里可能会有大量缺页中断。RocketMQ 在做 mmap 内存映射的同时进行 madvise 调用,同时向 OS 表明 WILLNEED 的意愿。使 OS 做一次内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的效果。 当然,这么做也是有弊端的。预热后,写文件的耗时缩短了很多,但预热本身就会带来一些写放大。整体来看,这么做能在一定程度上提高响应时间的稳定性,减少毛刺现象,但在 IO 本身压力很高的情况下则不建议开启。RocketMQ 是适用于 Topic 数量较多的业务消息场景。所以 RocketMQ 采用了和 Kafka 不一样的零拷贝方案,Kafka 采用的是阻塞式 IO 进行 sendfile,适用于系统日志消息这种高吞吐量的大块文件。而 RocketMQ 选择了 mmap + write 非阻塞式 IO (基于多路复用) 作为零拷贝方式,这是因为 RocketMQ 定位于业务级消息这种小数据块/高频率的 IO 传输,当想要更低的延迟的时候选择 mmap 更合适。 当 kernal 把可用的内存分配后 free 的内存就不够了,如果进程一下产生大量的新分配需求或者缺页中断,还需要将通过淘汰算法进行内存回收,此时可能会产生抖动,写入会有短时的毛刺现象。 冷数据读取 对于 RocketMQ 来说,读取冷数据可能有两种情况。 请求来自于这个副本组的其他节点,进行副本组内的数据复制,也可能是离线转储到其他系统。 请求来自于客户端,是消费者来消费几个小时以前的数据,属于正常的业务诉求。 对于第一种情况,在 RocketMQ 低版本源码中,对于需要大量复制 CommitLog 的情况(例如备磁盘故障,或新上线一个备机),主默认使用 DMA 拷贝的形式将数据直接通过网络复制给备机,此时由于大量的缺页中断阻塞了 io 线程,此时会影响 Netty 处理新的请求,在实现上让一些组件之间的内部通信使用 fastRemoting 提供的第二个端口,解决这个问题的临时方案还包括先用业务线程将数据 load 回内存而不使用零拷贝,但这个做法没有从本质上解决阻塞的问题。对于冷拷贝的情况,可以使用 madvice 建议 os 读取避免影响主的消息写入,也可以从其他备复制数据。 对于第二种情况,对各个存储产品来说都是一个挑战,客户端消费一条消息时,热数据全部存储在 page cache,对于冷数据会退化为随机读(系统会有一个对 page cache 连续读的预测机制)。需要消费超过几个小时之前的数据的场景下,消费者一般都是做数据分析或者离线任务,此时下游的目标都是吞吐量优先而非延迟。对于 RocketMQ 来说有两个比较好的解决方案,第一是同 redirect 的方式将读取请求转发给备进行分摊读压力,或者是从转储后的二级介质读取。在数据转储后,RocketMQ 本身的数据存储格式会发生变化,详见后文。 索引数据管理 在数据写入 CommitLog 后,在服务端当 MessageStore 向 CommitLog 写入一些消息后,有一个后端的 ReputMessageService 服务 (dispatch 线程) 会异步的构建多种索引,满足不同形式的读取诉求。 队列维度的有序索引 ConsumeQueue 在 RocketMQ 的模型下,消息本身存在的逻辑队列称为 MessageQueue,而对应的物理索引文件称为 ConsumeQueue。从某种意义上说 MessageQueue = 多个连续 ConsumeQueue 索引 + CommitLog 文件。 ConsumeQueue 相对与 CommitLog 来说是一个更加轻量。dispatch 线程会源源不断的将消息从 CommitLog 取出,再拿出消息在 CommitLog 中的物理偏移量 (相对于文件存储的 Index),消息长度以及Tag Hash 作为单条消息的索引,分发到对应的消费队列。偏移 + 长度构成了对 CommitLog 的引用 (Ref)。这种 Ref 机制对于单挑消息只有 20B,显著降低了索引存储开销。ConsumeQueue 实际写入的实现与 CommitLog 不同,CommitLog 有很多存储策略可以选择且混合存储,一个 ConsumeQueue 只会保存一个 Topic 的一个分区的索引,持久化默认使用 FileChannel,实际上这里使用 mmap 的话对小数据量的请求更加友好,不用陷入中断。 客户端的 pull 请求到服务端执行了如下流程来查询消息: 1. 根据 Tag 的 Hash 值查询 ConsumeQueue 文件(由 physicOffset + size + Tag HashCode 组成) 2. 根据 ConsumeQueue 拿到 physicOffset + size 3. 根据 physicOffset 查询 CommitLog 文件(上文的MappedFileQueue)获得消息 RocketMQ 中默认指定每个消费队列的文件存储 30 万条索引,而一个索引占用 20 个字节,这样每个文件的大小是 300 _1000 _20 / 1024 / 1024 ≈ 5.72M。为什么消费队列文件存储消息的个数要设置成 30 万呢?这个经验值适合消息量比较大的场景,事实上这个值对于大部分场景来说是偏大的,有效数据的真实占用率很低,导致ConsumeQueue 空载率高。 先来看看如果过大或者过小会带来什么问题。因为消息总是有失效期的,例如 3 天失效,如果消费队列的文件设置过大的话,有可能一个文件中包含了过去一个月的消息索引,但这个时候原始的数据已经滚动没了,白白浪费了很多空间。但也不宜太小,导致 ConsumeQueue 也有大量小文件,降低读写性能。 下面给出一个非严谨的空载率推导过程: 假设此时单机的 Topic = 5000,单节点单个 Topic 的队列数一般是 8,分区数量 = 4万。以 1T 消息数据为例,每条消息大小是 4KB,索引数量 = 消息数量 = 1024 1024 1024 / 4 = 2.68 亿。最少需要的 ConsumeQueue = 索引数量 / 30万 = 895 个,实际使用率 (有效数据量) 约等于 2.4%。随着 ConsumeQueue Offset 的原子自增滚动,cq 头部是无效数据导致占用的磁盘空间会变大。根据公有云线上的情况来看,非 0 数据约占 5%,实际有效数据只占 1%。对于 ConsumeQueue 这样的索引文件,我们可以使用 RocksDB 或者傲腾这样的持久化内存来存储,或者对 ConsumeQueue 单独实现一个用户态文件系统,几个方案都可以减少整体索引文件大小,提高访问性能。这一点在后文关于存储机制的优化中,我们再详聊。 由于 CommitLog ConsumerQueue Offset 的关系从消息写入的那一刻开始就确定了,在 Topic 跨副本组迁移,副本组要下线等需要切流的场景下,如果需要消息可读,需要采用复制数据的方案来实现 Topic 跨副本组迁移,只能采用消息级别的拷贝,而不能简单的把一个分区从副本组 A 移动到副本组 B。有一些消息产品在面对这个场景时,采用了数据按分区复制的方案,这种方案可能会立刻产生大量的数据传输(分区 rebalance),而 RocketMQ 的切流一般可以做到秒级生效。 消息维度的随机索引 IndexFile RocketMQ 作为业务消息的首选,上文中 ReputMessageService 线程除了构建消费队列的索引外,还同时为每条消息根据 id, key 构建了索引到 IndexFile。这是方便快速快速定位目标消息而产生的,当然这个构建随机索引的能力是可以降级的,IndexFile文件结构如下: IndexFile 也是定长的,从单个文件的数据结构来说,这是实现了一种简单原生的哈希拉链机制。当一条新的消息索引进来时,首先使用 hash 算法命中黄色部分 500w 个 slot 中的一个,如果存在冲突就使用拉链解决,将最新索引数据的 next 指向上一条索引位置。同时将消息的索引数据 append 至文件尾部(绿色部分),这样便形成了一条当前 slot 按照时间存入的倒序的链表。这里其实也是一种 LSM compaction 在消息模型下的改进,降低了写放大。 存储机制的演进方向 RocketMQ 的存储设计是以简单可靠队列模型作为核心来抽象的,也因此产生了一些缺陷和对应的优化方案。 KV 模型与 Queue 模型结合 RocketMQ 实现了单条业务消息的退避重试,在生产实践中,我们发现部分用户在客户端消费限流时直接将消息返回失败,在重试消息量比较大的时候,由于原有实现下重试队列数有限,导致重试消息无法很好的负载均衡到所有客户端。同时,消息来回的在服务端和客户端之间传输,使得两侧的开销都增加了,用户侧正确的做法应该是消费限流时,让消费的线程等待一会儿。从存储服务的角度上来说,这其实是一种队列模型的不足,让一条队列只能被一个消费者持有。RocketMQ 提出了 pop 消费这种全新的概念,让单条队列的消息能够被多个客户端消费到,这涉及到服务端对单条消息的加解锁,KV 模型就非常契合这个场景。从长远来看,像定时消息事务消息可以有一些基于 KV 的更原生的实现,这也是 RocketMQ 未来努力的方向之一。 消息的压缩与归档存储 压缩就是用时间去换空间的经典 tradeoff,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。目前 RocketMQ 客户端从延迟考虑仅单条大于 4K 的消息进行单条压缩存储的。服务端对于收到的消息没有立刻进行压缩存储有多个原因,例如为了保证数据能够及时的写入磁盘,消息稀疏的时候攒批效果比较差等,所以 Body 没有压缩存储。而对于大部分的业务 Topic 来说,其实 Body 一般都有很大程度上是相似的,可以压缩到原来的几分之一到几十分之一。 存储一般有高速(高频)介质与低速介质,热数据存放在高频介质上(如傲腾,ESSD,SSD),冷数据存放在低频介质上(NAS,OSS),以此来满足低成本保存更久的数据。从高频介质转到更低频的 NAS 或者 OSS 时,不可避免的产生了一次数据拷贝。我们可以在这个过程中异步的对数据进行规整(闲时资源富余)。那么我们为什么要做规整呢,直接零拷贝复制不香吗?答案就是低频介质虽然便宜大碗,但通常 iops 和吞吐量更低。对于 RocketMQ 来说需要规整的数据就是索引和 CommitLog 中的消息,也就是说在高频介质与低频介质上消息的存储格式可以是完全不同的。当热消息降级到二级存储的时候,数据密集且异步,这里就是一个非常合适的机会进行压缩和规整。业界也有一些基于 FPGA 来加速存储压缩的案例,将来我们也会持续的做这方面的尝试。 存储层资源共享与争抢 磁盘 IO 的抢占 没错,这里想谈谈的其实是硬盘的调度算法。在一个考虑性价比的场景下,由于 RocketMQ 的存储机制,我们可以把索引文件存储在 SSD,消息本身放在 HDD 里,因为热消息总是在 PageCache 中的,所以在 IO 调度上优先满足写而饿死读。对于没有堆积的消费者来说,消费到的数据是从 page cache 拷贝到 socket 再传输给用户,实时性已经很高了。而对于消费冷数据(几个小时,几天以前的数据)用户的诉求一般是尽快获取到消息即可,此时服务端可以选择尽快满足用户的 Pull 请求,由于大量的随机 IO,这样磁盘会产生严重的 rt 抖动。仔细考虑,这里其实用户想要的是尽可能大的吞吐量,假设访问冷数据需要 200 毫秒,假设在服务端把冷读的行为滞后,再加上延迟 500 毫秒再返回给用户数据,并没有显著的区别。而这里的 500 毫秒,服务端内部就可以合并大量的 IO 操作,我们也可以使用 madvice 系统调用去建议内核读取。这里的合并带来的收益很高,可以显著的减少对热数据的写入的影响,大幅度提升性能。 用户态文件系统 还是为了解决随机读效率低的问题,我们可以设计一个用户态文件系统,让 IO 调用全部 kernelbypass。 主要有几个方向: 1. 多点挂载。常用的 Ext4 等文件系统不支持多点挂载,让存储能够支持多个实例的对同一份数据的共享访问。 2. 调整对于 IO 的合并策略,IO优先级,polling 模式,队列深度等。 3. 使用文件系统类似 O_DIRECT 的非缓存方式读写数据。 RocketMQ 的未来 RocketMQ 存储系统经过多年的发展,基本功能特性已经比较完善,通过一系列的创新技术解决了分布式存储系统中的难题,稳定的服务于阿里集团和海量的云上用户。RocketMQ 在云原生时代的演进中遇到了更多的有趣的场景和挑战,这是一个需要全链路调优的复杂工程。我们会持续在规模,稳定性,多活容灾等企业级特性,成本与弹性等方面发力,将 RocketMQ 打造为“消息,事件,流”一体化的融合平台。同时,我们也会将开源行动更加可持续的发展下去,为社会创造价值。 参考文献 [3]. J. DeBrabant, A. Pavlo, S. Tu, M. Stonebraker, and S. B. Zdonik. Anticaching: A new approach to database management system architecture. PVLDB, 6(14):1942–1953, 2013. [4]. 《RocketMQ 技术内幕》 [5]. 一致性协议中的“幽灵复现”. [https://zhuanlan.zhihu.com/p/47025699.](https://zhuanlan.zhihu.com/p/47025699. [6]. Calder B, Wang J, Ogus A, et al. Windows Azure Storage: a highly available cloud storage service with strong consistency[C]//Proceedings of the TwentyThird ACM Symposium on Operating Systems Principles. ACM, 2011: 143157. [7]. Chen Z, Cong G, Aref W G. STAR: A distributed stream warehouse system for spatial data[C] 2020: 27612764. [8]. design dataintensive application 《构建数据密集型应用》
作者:斜阳
#技术探索

2024年8月9日

深度剖析 RocketMQ 5.0 之架构解析:云原生架构如何支撑多元化场景?
作者 | 隆基 简介: 了解 RocketMQ 5.0 的核心概念和架构概览;然后我们会从集群角度出发,从宏观视角学习 RocketMQ 的管控链路、数据链路、客户端和服务端如何交互;学习 RocketMQ 如何实现数据的存储,数据的高可用,如何利用云原生存储进一步提升竞争力。 1.前言 从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。 目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往往同时涉及交叉场景,比如同时进行物联网消息、微服务消息的处理,同时进行应用集成、数据集成、实时分析等,企业需要为此维护多套消息系统,付出更多的资源成本和学习成本。 在这样的背景下,2022 年,RocketMQ 5.0 正式发布,相对于 RocketMQ 4.0,架构走向云原生化,并且覆盖了更多的业务场景。想要掌握最新版本 RocketMQ 的应用,就需要进行更加体系化的深入了解。 2.背景 本节课的内容是 RocketMQ 5.0 的架构解析。前面的课程中,我们了解到 RocketMQ 5.0 可以支撑多样化的业务场景,不仅仅是业务消息,它还会支持流处理、物联网、事件驱动等场景。在进入具体的业务领域场景之前,我们先从技术的角度来了解 RocketMQ 的云原生架构,看它是如何基于这一套统一的架构支撑多元化场景的。 首先,我们会了解 RocketMQ 5.0 的核心概念和架构概览;然后我们会从集群角度出发,从宏观视角学习 RocketMQ 的管控链路、数据链路、客户端和服务端如何交互;最后,我们将回到消息队列最重要的模块存储系统,学习 RocketMQ 如何实现数据的存储,数据的高可用,如何利用云原生存储进一步提升竞争力。 3. 概览 3.1. RocketMQ 领域模型 在学习 RocketMQ 的架构之前,我们先从用户视角来来看 RocketMQ 的关键概念以及领域模型。如下图,我们按照消息的流转顺序来介绍。 最左边是消息生产者,一般对应业务系统的上游应用,在某个业务动作触发后发送消息到 Broker。Broker 是消息系统数据链路的核心,负责接收消息、存储消息、维护消息状态、消费者状态。多个 Broker 组成一个消息服务集群,共同服务一个或者多个 Topic 。刚才提到生产者生产消息并发送到 Broker,消息是业务通信的载体,每个消息包含消息 ID、消息 Topic、消息体内容、消息属性、消息业务 key 等。每条消息都属于某个 Topic,表示同一个业务语义,在阿里内部,我们交易消息的 Topic 叫做 Trade,购物车消息叫 Cart,生产者应用会把消息发送到对应的 Topic 上。Topic 里面还有 MessageQueue,这个用于消息服务的负载均衡和数据存储分片,每个 Topic 会包含一个或者多个 Message Queue 分布在不同的消息 Broker。生产者发送消息,Broker 存储消息,下一步就是消费者消费消息。消费者一般对应业务系统的下游应用,同一个消费者应用集群会共用一个 Consumer Group。消费者会和某个 Topic 产生订阅关系,订阅关系是 Consumer Group + Topic + 过滤表达式的三元组,符合订阅关系的消息就会被对应的消费者集群消费。接下来我们从技术实现角度进一步深入了解 RocketMQ 。 3.2. RocketMQ 5.0 架构概览 这是 RocketMQ 5.0 的架构概览图,从上往下看,可分为 SDK、NameServer、Proxy 和 Store 层。 我们首先来看 SDK 层,包括了 RocketMQ 的 SDK ,用户基于 RocketMQ 自身的领域模型来使用这个 SDK 。除了 RocketMQ 自身的 SDK 之外,还包括了细分领域场景的业界标准 SDK 。面向事件驱动的场景,RocketMQ 5.0 支持 CloudEvents 的 SDK;面向 IoT 的场景,RocketMQ 支持物联网 MQTT 协议的 SDK;为了方便更多的传统应用迁移到 RocketMQ,我们还支持了 AMQP 协议,未来也会开源到社区版本里。另外一个组件是是 NameServer,它承担服务发现和负载均衡的职责。通过 NameServer,客户端能获取 Topic 的数据分片和服务地址,链接消息服务器进行消息收发。 消息服务包含计算层 Proxy 和存储层 RocketMQ Store。RocketMQ 5.0 是存算分离的架构,这里的存算分离强调的是模块的分离,职责的分离。Proxy 和 RocketMQ Store 面向不同的业务场景可以合并部署,也可以分开部署。计算层 Proxy 主要承载的消息的上层业务逻辑,尤其是面向多场景、多协议的支持,比如承载 CloudEvents、MQTT、AMQP 的领域模型的实现逻辑和协议转换。面向不同的业务负载,还可以把 Proxy 分离部署,独立弹性,比如在物联网场景,Proxy 层独立部署可以面向海量物联网设备连接数进行弹性伸缩,和存储流量扩缩容解耦。RocketMQ Store 层则是负责核心的消息存储,这里包括基于 Commitlog 的存储引擎、多元索引、多副本技术和云存储集成扩展。消息系统的状态都下沉到 RocketMQ Store,其他组件全部实现无状态化。 4. 服务发现 4.1. 服务发现 第二部分我们来详细看一下 RocketMQ 的服务发现。RocketMQ 的服务发现是通过 NameServer(简称NS) 来实现的。 我们通过下方这个图来了解服务发现的机制,这个是 Proxy 和 Broker 合并部署的模式,也是 RocketMQ 最常见的模式。前面提到每个 Broker 集群会负责某些 Topic 的服务,每个 Broker 都会把自身服务哪些 Topic 注册到 NameServer 集群,和每个 NameServer 进行通信,并定时和 NS 通过心跳机制来维持租约。服务注册的数据结构包含 Topic 和 Topic 分片 MessageQueue。 在示例中 Broker1 和 Broker2 分别承载 TopicA 的一个分片。在 NS 机器上会维护全局视图,TopicA 有两个分片分别在 Broker1 和 Broker2 。RocketMQ SDK 在对 TopicA 进行正式的消息收发之前,它会随机访问一个 NameServer 机器,从而知道这个 TopicA 有哪些分片,每个数据的分片在哪个 Broker 上面,它会跟这些 Broker 建立好长连接,然后再进行消息的收发。大部分的项目的服务发现机制会通过 zookeeper 或者 etcd 等强一致的分布式协调组件来担任注册中心的角色,而 RocketMQ 有自己的特点,如果从 CAP 的角度来看,它的注册中心采用的是 AP 的模式,NameServer 节点无状态,是 ShareNothing 的架构,有更高的可用性。 再看下方这个图,我们说 RocketMQ 的存算分离是可分可合,这里采用的就是分离的部署模式,RocketMQ SDK 直接访问无状态的 Proxy 集群。这个模式可以应对更加复杂的网络环境,支持多网络类型的访问,如公网访问,实现更好的安全控制。 在整个服务发现机制中,NameServer、Proxy 都是无状态的,可以随时进行节点增减。有状态节点 Broker 的增减基于 NS 的注册机制,客户端可以实时感知、动态发现。在缩容过程中,RocketMQ Broker 还可以进行服务发现的读写权限控制,对缩容的节点禁写开读,待未读消息全消费,实现无损平滑下线。 4.2. 负载均衡 刚才我们已经知道 SDK 如何通过 NameServer 来发现 Topic 的分片信息 MessageQueue,以及 Broker 地址。基于这些服务发现的元数据,我们再来详细看看消息流量是如何在生产者、RocketMQ Broker 和消费者集群进行负载均衡的。 先来看生产链路的负载均衡,生产者通过服务发现机制,知道了 Topic 的数据分片以及对应的 Broker 地址。它的服务发现机制是比较简单的,在默认情况下采用 Round Robin 的方式轮询发送到各个 Topic 队列,保证了 Broker 集群的流量均衡。在顺序消息的场景下会略有特殊,会基于消息的业务主键 Hash 到某个队列发送,这样一来,如果有热点业务主键,那 Broker 集群也可能出现热点。除此之外,我们基于这些元数据还能根据业务需要扩展更多的负载均衡算法,比如同机房优先算法,可以降低多机房部署场景下的延迟,提升性能。 再看消费者的负载均衡,相对来说会比生产者更复杂,它有两种类型的负载均衡方式。最经典的模式是队列级负载均衡,消费者知道 Topic 的队列总数,也知道同一个 Consumer Group 下的实例数,就可以按照统一的分配算法,类似一致性 hash 的方式,让每个消费者实例绑定对应的队列,只消费绑定队列的消息,每个队列的消息也只会被一个消费者实例消费。 这种模式最大的缺点就是负载不均衡,消费者实例要绑定队列、有临时状态。如果我们有三个队列,有两个消费者实例,那就必然有一个消费者需要消费三分之二的数据,如果我们有四个消费者,那么第四个消费者就要空跑。所以在 RocketMQ 5.0 里面,我们引入了消息粒度的负载均衡机制,无需绑定队列,消息在消费者集群随机分发,这样就可以保障消费者集群的负载均衡。更重要的是这种模式更加符合未来 Serverless 化的趋势,Broker 的机器数、Topic 的队列数和消费者实例数完全解耦,可以独立扩缩容。 5. 存储系统 前面通过架构概览和服务发现机制,我们已经对 RocketMQ 有比较全局性的了解。接下来我们将深入 RocketMQ 的存储系统,这个模块对 RocketMQ 的性能、成本、可用性有决定性作用。 5.1. 存储核心 先来看一下 RocketMQ 的存储核心。存储核心由 Commitlog、Consumequeue 和 Index 文件组成。消息存储首先写到 Commitlog,刷盘并复制到 slave 节点来完成持久化,Commitlog 是 RocketMQ 存储的 source of true,通过它可以构建完整的消息索引。相比于 Kafka 而言,RocketMQ 把所有 Topic 的数据都写到 Commitlog 文件,最大化顺序 io,使得 RocketMQ 单机可以支撑万级的 Topic。 在写完 Commitlog 之后,RocketMQ 会异步分发出多个索引,首先是 ConsumeQueue 索引,这个和 MessageQueue 是对应的,基于这个索引可以实现消息的精准定位,可以按照 Topic、队列 id 和位点定位到消息,消息回溯功能也是基于这个实现的。另外一个很重要的索引是哈希索引,它是消息可观测的基础。通过持久化的 hash 表来实现消息业务主键的查询能力,消息轨迹主要是基于这个来实现的。 除了消息本身的存储之外,Broker 还承载了消息元数据的存储。包括 topics 的文件,表示该 Broker 会对哪些 Topic 提供服务,还维护了每个 Topic 队列数、读写权限、顺序性等属性。还有一个 Subscription、ConsumerOffset 文件,这两个维护了 Topic 的订阅关系以及每个消费者的消费进度。还有 Abort、Checkpoint 文件则是用于完成重启后的文件恢复,保障数据完整性。 5.2. Topic 高可用 上面的内容中,我们站在单机的视角,从功能的层面学习 RocketMQ 的存储引擎,包括 Commitlog 和索引。现在我们重新跳出来,再从集群视角看 RocketMQ 的高可用。我们先定义一下 RocketMQ 的高可用,指当 RocketMQ 集群出现 NameServer、Broker 局部不可用的时候,指定的 Topic 依然是可读可写的。 RocketMQ 可以应对三类故障场景。 第一种 case,某对主备单机不可用。如下方这个图,当 Broker2 主宕机,备可用。TopicA 依然可读可写,其中分片1可读可写,分片 2 可读不可写,Topic A 在分片 2 的未读消息依然可以消费。总结起来就是 Broker 集群里,只要任意一组 Broker 存活一个节点,Topic 的读写可用性不受影响。如果某组 Broker 主备全部宕机,那么 Topic 新数据的读写也不受影响,未读消息会延迟,待任意主备启动才能继续消费。 接下来,再看 NameServer 集群的故障情况,由于 NameServer 是 ShareNothing 的架构,每个节点都是无状态的,并且是 AP 模式,不需要依赖多数派算法,所以只要有一台 NameServer 存活,整个服务发现机制都是正常的,Topic 的读写可用性不受影响。 甚至在更极端的情况下,整个 NS 都不可用,由于 RocketMQ 的 SDK 对服务发现元数据有缓存,只要 SDK 不重启,它依然可以按照当下的 topic 元数据,继续进行消息收发。 5.3. MessageQueue 高可用 从 Topic 高可用的实现中我们发现,虽然 Topic 持续可读可写,但是 Topic 的读写队列数会发生变化。队列数变化,会对某些数据集成的业务有影响,比如说异构数据库 Binlog 同步,同一个记录的变更 Binlog 会写入不同的队列,重放 Binlog 可能会出现乱序,导致脏数据。所以我们还需要对现有的高可用进一步增强,要保障局部节点不可用时,不仅 Topic 可读可写,并且 Topic 的可读写队列数量不变,指定的队列也是可读可写的。 如下图,NameServer 或 Broker 任意出现单点不可用,Topic A 依然保持 2 个队列,每个队列都具备读写能力。 为了解决 MessageQueue 高可用的场景,RocketMQ 5.0 引入全新的高可用机制。我们先来了解其中的核心概念: Dledger Controller,这是一个基于 raft 协议的强一致元数据组件,来执行选主命令、维护状态机信息。 SynStateSet,如图,它维护了处于同步状态的副本组集合,这个集合里的节点都有完整的数据,当主节点宕机后,就从这个集合中选择新的主节点。 Replication,用于不同副本之间的数据复制、数据校验、截断对齐等事项。 下图是 RocketMQ 5.0 HA 的架构全景图,这个高可用架构具有多个优势。 一是在消息存储引入了朝代和开始位点,基于这两个数据,完成数据校验、截断对齐,在构建副本组的过程中简化数据一致性逻辑。 二是基于 Dledger Controller,我们不需要引入 zk、etcd 等外部分布式一致性系统,并且 Dledger Controller 还可以和 NameServer 合并部署,简化运维、节约机器资源。 三是 RocketMQ 对 Dledger Controller 是弱依赖,即便 Dledger 整体不可用了,也只会影响选主,不影响正常的消息收发流程。 四是可定制,用户可以根据业务对数据可靠性、性能、成本综合选择,比如副本数可以是2、3、4、5,副本直接可以是同步复制、异步复制。如 22 模式表示,2 副本、并且数据同步复制;23 模式表示3副本,2副本多数派完成复制,才算成功。用户还可以将其中的一个副本部署在异地机房,异步复制实现容灾。 5.4. 云原生存储 前面我们讲的存储系统都是 RocketMQ 面向本地文件系统的实现。但是在云原生时代,当我们把 RocketMQ 部署到云环境,可以进一步利用云原生基础设施,如云存储来进一步增强 RocketMQ 的存储能力。在 RocketMQ 5.0 里面我们提供了多级存储的特性,它是内核级的存储扩展,我们面向对象存储扩展了对应的 Commitlog、ConsumeQueue 和 IndexFile;我们采用了插件化的设计,多级存储可以有多种实现,在阿里云上,我们基于 OSS 对象服务实现,在 AWS 上我们则可以面向 S3 的接口来实现。 通过引入了这个云原生的存储,RocketMQ 释放了很多红利: 无限存储能力,消息存储空间不受本地磁盘空间的限制,原来是保存几天,现在可以几个月、甚至存一年。另外对象存储也是业界成本最低的存储系统,特别适合冷数据存储。 Topic 的 TTL,原来多个 Topic 的生命周期是和 Commitlog 绑定,统一的保留时间。现在每个 Topic 都会使用独立的对象存储 Commitlog 文件,可以有独立的 TTL。 存储系统进一步的存算分离,能把存储吞吐量的弹性和存储空间的弹性分离。 冷热数据隔离,分离了冷热数据的读链路,能大幅度提升冷读性能,不会影响在线业务。
作者:隆基
#技术探索

2024年8月9日

深度剖析 RocketMQ 5.0 之消息进阶:如何支撑复杂业务消息场景?
作者 | 隆基 简介: 本文主要学习 RocketMQ 的一致性特性,一致性对于交易、金融都是刚需。从大规模复杂业务出发,学习 RocketMQ 的 SQL 订阅、定时消息等特性。再从高可用的角度来看,这里更多的是大型公司对于高阶可用性的要求,如同城容灾、异地多活等。 1. 前言 从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。 目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往往同时涉及交叉场景,比如同时进行物联网消息、微服务消息的处理,同时进行应用集成、数据集成、实时分析等,企业需要为此维护多套消息系统,付出更多的资源成本和学习成本。 在这样的背景下,2022 年,RocketMQ 5.0 正式发布,相对于 RocketMQ 4.0,架构走向云原生化,并且覆盖了更多的业务场景。想要掌握最新版本 RocketMQ 的应用,就需要进行更加体系化的深入了解。 2. 背景 今天的课程是 RocketMQ 5.0 消息进阶。这节课依然聚焦在业务消息场景,我们在 RocketMQ 5.0 概述里面就提到 RocketMQ 可以应对复杂的业务消息场景。这节课我们就从功能特性的角度出发,来看 RocketMQ 是如何去解决复杂业务场景的。 第一部分会先学习 RocketMQ 的一致性特性,一致性对于交易、金融都是刚需。第二部分,我们从大规模复杂业务出发,学习 RocketMQ 的 SQL 订阅、定时消息等特性。第三部分,我们再从高可用的角度来看,这里更多的是大型公司对于高阶可用性的要求,如同城容灾、异地多活等。 3. 一致性 3.1. 事务消息 3.1.1. 场景 我们先来看 RocketMQ 的第一个特性——事务消息,这是和一致性相关的特性,这也是 RocketMQ 有别于其他消息队列的一个最具区分度的特性。我们还是继续沿用大规模电商系统的案例,如图,我们仔细梳理一下流程,付款成功会在交易系统中订单数据库将订单状态更新为已付款,然后交易系统再发一条消息给 RocketMQ,RocketMQ 把订单已付款的事件通知给所有下游的应用,保障后续的履约环节。 但是这个流程有个问题,就是交易系统写数据库和发消息是分开的,它不是一个事务。会出现多种异常情况,比如数据库写成功了,但消息发失败了,这个订单的状态下游应用接收不到,对于电商业务可能就造成大量用户付款了,但是卖家不发货。如果先发消息成功,再写数据库失败,会造成下游应用认为订单已付款,推进卖家发货,但是实际用户未付款成功。这些异常都会对电商业务造成大量脏数据,产生灾难性业务后果。 这就需要使用 RocketMQ 高阶特性——事务消息。事务消息的能力是要保障生产者的本地事务(如写数据库)、发消息事务的一致性,最后通过 Broker at least once 的消费语义,保证消费者的本地事务也能执行成功。最终实现生产者、消费者对同一业务的事务状态达到最终一致。 3.1.2. 原理 如下图,事务消息的实现是两个阶段:提交+事务补偿机制结合实现的。 首先,生产者会发送 half 消息,也就是 prepare 消息,broker 会把 half 队列中。接下来生产者执行本地事务,一般是写数据库,本地事务完成后,会往 RocketMQ 发送 commit 操作,RocketMQ 会把 commit 操作写入 OP 队列,并进行 compact,把已提交的消息写到 ConsumeQueue 对消费者可见。反过来如果是 rollback 操作,则会跳过对应的 half 消息。面对异常的情况,比如生产者在发送 commit 或者 rollback 之前宕机了,RocketMQ broker 还会有补偿检查机制,定期回查 Producer 的事务状态,继续推进事务。 无论是 Prepare 消息、还是 Commit/Rollback 消息、或者是 compact 环节,在存储层面都是遵守 RocketMQ 以顺序读写为主的设计理念,达到最优吞吐量。 3.1.3. demo 接下来,我们来看一个事务消息的简单示例。 使用事务消息需要实现一个事务状态的查询器,这也是和普通消息一个最大的区别。如果我们是一个交易系统,这个事务回查器的实现可能就是根据订单 ID 去查询数据库来确定这个订单的状态到底是否是提交,比如说创建成功、已付款、已退款之类的。主体的消息生产流程也有很多不同,需要开启分布式事务,进行两阶段提交,先发一个 prepare 的消息,然后再去执行本地事务。这里的本地事务一般就是执行数据库操作。然后如果本地事务执行成功的话,就整体 commit,把之前的 prepare 的消息提交掉。这样一来,消费者就可以消费这条消息。如果本地事务出现异常的话,那么就把整个事务 rollback 掉,之前的那条 prepare 的消息也会被取消掉,整个过程就回滚了。事务消息的用法变化主要体现在生产者代码,消费者使用方式和普通消息一致,demo 里面就不展示了。 3.2. 顺序消息 3.2.1. 场景 + 原理 第二个高级特性是顺序消息,这个也是 RocketMQ 的特色能力之一。它解决的是顺序一致性的问题,要保障同一个业务的消息,生产和消费的顺序保持一致。在阿里曾有个场景是买卖家数据库复制,由于阿里订单数据库采用分库分表技术,面向买卖家不同的业务场景,分别按照买家主键和卖家主键拆分成买卖家数据库。两个数据库的同步就是采用了 Binlog 顺序分发的机制,通过使用顺序消息,把买家库的 Binlog 变更按照严格顺序在卖家库回放,以此达到订单数据库的一致性。如果没有顺序保障,那么就可能出现数据库级别的脏数据,将会带来严重的业务错误。 顺序消息的实现原理如下图,充分利用 Log 天然顺序读写的特点高效实现。在 Broker 存储模型中,每个 Topic 都会有固定的 ConsumeQueue,可以理解为 Topic 的分区生产者为发送消息加上业务 Key,在这个 case 里面可以用订单 ID,同一订单 ID 的消息会顺序发送到同一个 Topic 分区,每个分区在某个时刻只会被一个消费者锁定,消费者顺序读取同一个分区的消息串行消费,以此来达到顺序一致性。 3.2.2. demo 接下来,我们来看顺序消息的一个简单demo。 对于顺序消息来说,生产者跟消费者都有需要注意的地方。 在生产阶段,首先要定义一个消息的 group。每条消息都可以选择一个业务 ID 作为消息 Group,这个业务 ID 尽量离散、随机。因为同一个业务 ID 会分配到同一个数据存储分片,生产和消费都在这个数据分片上串行,如果业务 ID 有热点,会造成严重的数据倾斜和局部消息堆积。比如说在电商交易的场景,一般会选择订单 ID 进行业务消息分组,因为订单 ID 会比较离散。但如果我们选择的是卖家 ID,就有可能会出现热点,热点卖家的流量会远大于普通卖家。 在消费阶段的话,消费阶段有跟常规的消息收发一样有两种模式,一种是全托管的 push consumer 模式,一种是半托管的simple consumer 的模式,RocketMQ SDK 会保障同一个分组的消息串行进入业务消费逻辑。需要注意的点是,我们自身的业务消费代码也要串行进行,然后同步返回消费成功确认。不要把同分组的消息又放到另外的线程池并发消费,这样会破坏顺序语义。 4. 大规模业务 4.1. SQL 过滤 4.1.1. 场景 第三个高级特性是 SQL 消费模式,这个也是复杂业务场景的刚需。我们回到阿里的电商场景,阿里的整个电商业务都是围绕着交易展开,有数百个不同的业务在订阅交易的消息。这些业务基本上都面向某个细分领域,都只需要交易 Topic 下的部分消息。按照传统的模式,一般就是全量订阅交易 Topic,在消费者本地过滤即可,但是这样会消耗大量的计算、网络资源,特别是在双十一的峰值,这个方案的成本是无法接受的。 4.1.2. 原理 为了解决这个问题,RocketMQ 提供了 SQL 消费模式。在交易场景下,每笔订单消息都会带有不同维度的业务属性,包括卖家 ID、买家 ID、类目、省市、价格、订单状态等属性,而 SQL 过滤就是能让消费者通过 SQL 语句过滤消费目标消息。如下图,某个消费者只想关注某个价格区间内的订单创建消息,于是创建这个订阅关系 【Topic=Trade ,SQL: status= order create and (Price between 50 and 100)】,Broker 会在服务端运行 SQL 计算,只返回有效数据给消费者。为了提高性能,Broker 还引入了布隆过滤器模块,在消息写入分发时刻,提前计算结果,写入位图过滤器,减少无效 IO。总的来说就是把过滤链路不断前置,从消费端本地过滤,到服务端写时过滤,达到最优性能。 4.1.3. demo 接下来,我们再来看一个 SQL 订阅的示例。目前 RocketMQ SQL 过滤支持如下的语法,包括属性非空判断、属性大小比较、属性区间过滤、集合判断和逻辑计算,能满足绝大部分的过滤需求。 在消息生产阶段,我们除了设置 Topic、Tag 之外,还能添加多个自定义属性。比如在这个案例里面,我们设置了一个 region 的属性,表示这条消息是从杭州 region 发出来的。在消费的时候,我们就可以对根据自定义属性来进行 SQL 过滤订阅了。第一个 case 是我们用了一个 filter expression,判断 region 这个字段不为空且等于杭州才消费。第二个 case 添加更多的条件,如果这是一笔订单消息,我们还可以同时判断 region 条件和价格区间来决定是否消费。第三个 case 是全接收模式,表达式直接为 True,这个订阅方式会接收某一个主题下面的全量消息,不进行任何过滤。 4.2. 定时消息 4.2.1. 场景+原理 第四个高级特性是定时消息,生产者可以指定某条消息在发送后经过一定时间后才对消费者可见。有不少业务场景需要大规模的定时事件触发,比如典型的电商场景,基本上都有订单创建30分钟未付款就自动关闭订单的逻辑,定时消息能为这个场景带来极大的便利性。 RocketMQ 的定时消息是基于时间轮来实现的。TimerWheel,相信大家并不陌生,模拟表盘转动,来达到对时间进行排序的目的。TimerWheel 中的每一格代表着最小的时间刻度,称之为Tick,RocketMQ 里面是每一个 Tick 为一秒,同一个时刻的消息会写到同一个格子里。由于每个时刻可能会同时触发多条消息,并且每条消息的写入时刻都不一样,所以 RocketMQ 也同时引入了 Timerlog 的数据结构,Timerlog 按照顺序 append 方式写入数据,每个元素都包含消息的物理索引、以及指向同一个时刻的前一条消息,组成一个逻辑链表。TimeWheel 的每个格子都维护这个时刻的消息链表的头尾指针。类似表盘,TimerWheel 会有一个指针,代表当前时刻,绕着 TimerWheel 循环转动,指针所指之处,代表这一个 Tick 到期,所有内容一起弹出,会写到 ConsumeQueue,对消费者可见。 目前 RocketMQ 的定时消息性能已经远超 RabbitMQ 和 ActiveMQ。 5. 全局高可用 接下来,我们再讲一下 RocketMQ 的全局高可用技术解决方案。 在消息基础原理的文章里,我们提到 RocketMQ 的高可用架构,主要是指 RocketMQ 集群内的数据多副本和服务高可用。今天我们这里讲的高可用是全局的,就是业界经常说的同城容灾、两地三中心、异地多活等架构。现在蚂蚁支付和阿里交易采用的是异地多活的架构。异地多活相对于冷备、同城容灾、两地三中心模式具备更多的优点,可以应对城市级别的灾难,如地震、断电等事件。除此之外,一些因为人为的操作,比如说某个基础系统变更,引入新的 bug,导致的整个机房级别的不可用,异地多活的架构可以直接把流量切到可用机房,优先保障业务连续性,再去定位具体的问题。另一方面,异地多活还能实现机房级别的扩容,单一机房的计算存储资源是有限的,异地多活架构可以把业务流量按照比例分散在全国各地机房。同时多活架构实现了所有机房都在提供业务服务,而不是冷备状态,资源利用率大幅度提升。由于是多活状态,面对极端场景的切流,可用性更有保障,信心更足。 在异地多活的架构中,RocketMQ 承担的是基础架构的多活能力。多活的架构分为几个模块: 首先是接入层,通过统一接入层按照业务 ID 把用户请求分散到多个机房,业务ID一般可采用用户ID。 其次是应用层,应用层一般无状态,当请求进入某个机房后,需要尽量保障该请求的整个链路都在单元内封闭,包括 RPC、数据库访问、消息读写。这样才可以降低访问延迟,保障系统性能不会因为多活架构衰退。 再往下是数据层,包括数据库,消息队列等有状态系统。这里侧重讲解 RocketMQ 的异地多活,RocketMQ 通过 connector 组件,实现按 topic 粒度实时同步消息的数据;按照 Consumer 和 Topic 的组合粒度实时同步消费状态。 最后还需要全局的管控层。管控层要维护全局的单元化规则,哪些流量走到哪些机房;管理多活元数据配置,哪些应用需要多活、哪些 Topic 需要多活;另外在切流时刻,要协调所有系统的切流过程,控制好切流顺序。
作者:隆基
#技术探索

2024年8月9日

深度剖析 RocketMQ 5.0 之 IoT 消息:物联网需要什么样的消息技术?
作者|隆基 简介: 本文来学习一个典型的物联网技术架构,以及在这个技术架构里面,消息队列所发挥的作用。在物联网的场景里面,对消息技术的要求和面向服务端应用的消息技术有什么区别?学习 RocketMQ 5.0 的子产品 MQTT,是如何解决这些物联网技术难题的。 1.前言 从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。 目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往往同时涉及交叉场景,比如同时进行物联网消息、微服务消息的处理,同时进行应用集成、数据集成、实时分析等,企业需要为此维护多套消息系统,付出更多的资源成本和学习成本。 在这样的背景下,2022 年,RocketMQ 5.0 正式发布,相对于 RocketMQ 4.0,架构走向云原生化,并且覆盖了更多的业务场景。想要掌握最新版本 RocketMQ 的应用,就需要进行更加体系化的深入了解。 2.背景 本节课分为三个部分,第一部分,我们来学习一个典型的物联网技术架构,以及在这个技术架构里面,消息队列所发挥的作用。第二部分我们会讲在物联网的场景里面,对消息技术的要求和面向服务端应用的消息技术有什么区别?第三部分,我们会学习 RocketMQ 5.0 的子产品 MQTT,是如何解决这些物联网技术难题的。 3. 物联网消息场景 我们先来了解一下物联网的场景是什么,以及消息在物联网里面有什么作用。 物联网肯定是最近几年最火的技术趋势之一,有大量的研究机构、行业报告都提出了物联网快速发展的态势。首先是物联网设备规模爆发式增长,预测会在 2025 年达到 200 多亿台。 其次是物联网的数据规模,来自物联网的数据增速接近 28%,并且未来有 90% 以上的实时数据来自物联网场景。这也就意味着未来的实时流数据处理数据类型会有大量物联网数据。 最后一个重要的趋势是边缘计算,未来会有 75% 的数据在传统数据中心或者云环境之外来处理,这里的边缘指的是商店、工厂、火车等等这些离数据源更近的地方。由于物联网产生的数据规模很大,如果全部数据传输到云端处理,会面临难以承受的成本,应该充分利用边缘的资源直接计算,再把高价值的计算结果传输云端;另一方面,在离用户近的地方计算直接响应,可以降低延迟,提升用户体验。 物联网的发展速度这么快,数据规模那么大,跟消息有什么关系呢?我们通过这个图来看一下消息在物联网场景发挥的作用:第一个作用是连接,承担通信的职责,支持设备和设备的通信,设备和云端应用的通信,比如传感器数据上报、云端指令下发等等这些功能,支撑 IoT 的应用架构,连接云边端。第二个作用是数据处理,物联网设备源源不断的产生数据流,有大量需要实时流处理的场景,比如设备维护,高温预警等等。基于 MQ 的事件流存储和流计算能力,可以构建物联网场景的数据架构。 4. 物联网消息技术 下面我们来看看在物联网场景里,对消息技术有什么诉求?我们先从这个表格来分析物联网消息技术跟之前我们讲的经典消息技术有什么区别? 经典的消息主要是为服务端系统提供发布订阅的能力,而物联网的消息技术是为物联网设备之间、设备和服务端之间提供发布订阅的能力。我们来分别看一下各自场景的特点。 在经典消息场景里,消息 broker、消息客户端都是服务端系统,这些系统都是部署在 IDC 或者公共云环境。无论是消息客户端、消息服务端,都会部署在配置比较不错的服务器机型,有容器、虚拟机、物理机等等这些形式。同时,客户端和消息服务端一般都是部署在同一个机房,属于内网环境,网络带宽特别高,而且网络质量稳定。客户端的数量一般对应到应用服务器的数量,规模较小,一般都是数百、数千台服务器,只有超大规模的互联网公司才能达到百万级。从生产消费的角度来看,每个客户端的消息生产发送量一般对应到其业务的 TPS,能达数百数千的 TPS。在消息消费方面,一般是采用集群消费,一个应用集群共享一个消费者 ID,共同分担该消费组的消息。每条消息的订阅比一般也不高,正常情况下不会超过 10 个。 而在 IoT 消息场景,很多条件都不一样,甚至是相反的。IoT的消息客户端是微型设备,计算存储资源都很有限,消息服务端可能要部署在边缘环境,使用的服务器配置也会比较差。另一方面物联网设备,一般是通过公网的环境来连接的,它的环境特别复杂,而且经常会不断移动,有些时候会断网或处于弱网环境,网络质量差。物联网场景中,消息的客户端实例数对应到物联网设备数,可以到亿级别,比大型互联网公司的服务器数量要大很多。每个设备的消息 tps 不高,但是一条消息有可能同时被百万级的设备接受,订阅比特别高。 5. RocketMQ MQTT 从这里可以看出,物联网需要的消息技术和经典的消息设计很不一样。接下来我们再来看,为了应对物联网的消息场景,RocketMQ 5.0 做了哪些事情?RocketMQ 5.0 里面,我们发布了一个子产品,叫做 RocketMQ MQTT。它有三个技术特点: 首先,它采用的标准的物联网协议 MQTT,该协议面向物联网弱网环境、低算力的特点设计,协议十分精简。同时有很丰富的特性,支持多种订阅模式,多种消息的 QoS,比如有最多一次,最少一次,当且仅当一次。它的领域模型设计也是 消息、 主题、发布订阅等等这些概念,和 RocketMQ 特别匹配,这为打造一个云端一体的 RocketMQ 产品形态奠定了基础。 第二,它采用的是纯算分离的架构。RocketMQ Broker 作为存储层,MQTT 相关的领域逻辑都在 MQTT Proxy 层实现,并面向海量的连接、订阅关系、实时推送深度优化,Proxy 层可以根据物联网业务的负载独立弹性,如连接数增加,只需要新增 proxy 节点。 第三,它采用的是端云一体化的架构,因为领域模型接近、并且以 RocketMQ 作为存储层,每条消息只存一份,这份消息既能被物联网设备消费,也能被云端应用消费。另外 RocketMQ 本身是天然的流存储,流计算引擎可以无缝对 IoT 数据进行实时分析。 5.1. IoT 消息存储模型 接下来我们再从几个关键的技术点,来深入了解 RocketMQ 的物联网技术实现。 5.1.1. 读放大为主,写放大为辅 首先要解决的问题是物联网消息的存储模型,在发布订阅的业务模型里,一般会采用两种存储模型,一种是读放大,每条消息只写到一个公共队列,所有的消费者读取这个共享队列,维护自己的消费位点。另外一种模型是写放大模型,每个消费者有自己的队列,每条消息都要分发到目标消费者的队列中,消费者只读自己的队列。 因为在物联网场景里,一条消息可能会有百万级的设备消费,所以,很显然,选择读放大的模型能显著降低存储成本、提高性能。 但是,只选择读放大的模式没法完全满足要求,MQTT 协议有其特殊性,它的 Topic 是多级 Topic,而且订阅方式既有精准订阅,也有通配符匹配订阅。比如在家居场景,我们定义一个多级主题,比如家/浴室/温度,有直接订阅完整多级主题的 家/浴室/温度,也有采用通配符订阅只关注温度的,还有只关注一级主题为 家的所有消息。 对于直接订阅完整的多级主题消费者可以采用读放大的方式直接读取对应多级主题的公共队列;而采用通配符订阅的消费者无法反推消息的 Topic,所以需要在消息存储时根据通配符的订阅关系多写一个通配符队列,这样消费者就可以根据其订阅的通配符队列读取消息。 这就是 RocketMQ 采用的读放大为主,写放大为辅的存储模型。 5.1.2. 端云一体化存储 基于上节课的分析,我们设计了 RocketMQ 端云一体化的存储模型,看下这张图。 消息可以来自各个接入场景(如服务端的 RMQ/AMQP,设备端的 MQTT),但只会写一份存到 commitlog 里面,然后分发出多个需求场景的队列索引,比如服务端场景(MQ/AMQP)可以按照一级 Topic 队列进行传统的服务端消费,设备端场景可以按照 MQTT 多级 Topic 以及通配符订阅进行消费消息。 这样我们就可以基于同一套存储引擎,同时支持服务端应用集成和 IoT 场景的消息收发,达到端云一体化。 5.2. 队列规模问题 我们都知道像 Kafka 这样的消息队列每个 Topic 是独立文件,但是随着 Topic 增多消息文件数量也增多,顺序写就退化成了随机写,性能明显下降。RocketMQ 在 Kafka 的基础上进行了改进,使用了一个 Commitlog 文件来保存所有的消息内容,再使用 CQ 索引文件来表示每个 Topic 里面的消息队列,因为 CQ 索引数据比较小,文件增多对 IO 影响要小很多,所以在队列数量上可以达到十万级。但是这个终端设备队列的场景下,十万级的队列数量还是太小了,我们希望进一步提升一个数量级,达到百万级队列数量,所以,我们引入了 Rocksdb 引擎来进行 CQ 索引分发。 面向 IoT 的百万级队列设计 Rocksdb 是一个广泛使用的单机 KV 存储引擎,有高性能的顺序写能力。因为我们有了 commitlog 已具备了消息顺序流存储,所以可以去掉 Rocksdb 引擎里面的 WAL,基于 Rocksdb 来保存 CQ 索引。在分发的时候我们使用了 Rocksdb 的 WriteBatch 原子特性,分发的时候把当前的 MaxPhyOffset 注入进去,因为 Rocksdb 能够保证原子存储,后续可以根据这个 MaxPhyOffset 来做 Recover 的 checkpoint。最后,我们也提供了一个 Compaction 的自定义实现,来进行 PhyOffset 的确认,以清理已删除的脏数据。 5.3. IoT 消息推送模型 介绍了底层的队列存储模型后,我们再详细描述一下匹配查找和可靠触达是怎么做的。在 RocketMQ 的经典消费模式里,消费者是直接采用长轮询的方式,从客户端直接发起请求,精确读取对应的 topic 队列。而在 MQTT 场景里,因为客户端数量、订阅关系数量规模巨大,无法采用原来的长轮询模式,消费链路的实现更加复杂。这里使用的是推拉结合的模型。 这里展示的是一个推拉模型,终端设备通过 MQTT 协议连到 Proxy 节点。消息可以来自多种场景(MQ/AMQP/MQTT)发送过来,存到 Topic 队列后会有一个 notify 逻辑模块来实时感知这个新消息到达,然后会生成消息事件(就是消息的 Topic 名称),把这个事件推送至网关节点,网关节点根据它连上的终端设备订阅情况进行内部匹配,找到哪些终端设备能匹配上,然后会触发 pull 请求去存储层读取消息,再推送终端设备。 一个重要问题,就是 notify 模块怎么知道一条消息在哪些网关节点上面的终端设备感兴趣,这个其实就是关键的匹配查找问题。一般有两种方式:第一种,简单的广播事件;第二种,集中存储在线订阅关系(比如图里的 lookup 模块),然后进行匹配查找,再精准推送。事件广播机制看起来有扩展性问题,但是其实性能并不差,因为我们推送的数据很小,就是 Topic 名称,而且相同 Topic 的消息事件可以合并成一个事件,我们线上就是默认采用的这个方式。集中存储在线订阅关系,这个也是常见的一种做法,如保存到 RDS、Redis 等等,但要保证数据的实时一致性也是有难度的,而且要进行匹配查找对整个消息的实时链路RT开销也会有一定的影响。这幅图里还有一个 Cache 模块,用来做消息队列 cache,避免在大广播比场景下每个终端设备都向存储层发起读数据情况。
作者:隆基
#技术探索

2024年8月9日

深度剖析 RocketMQ 5.0 之事件驱动:云时代的事件驱动有啥不同?
作者|隆基 简介: 本文技术理念的层面了解一下事件驱动的概念。RocketMQ 5.0 在面向云时代的事件驱动架构新推出的子产品 EventBridge,最后再结合几个具体的案例帮助大家了解云时代的事件驱动方案。 1.前言 从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。 目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往往同时涉及交叉场景,比如同时进行物联网消息、微服务消息的处理,同时进行应用集成、数据集成、实时分析等,企业需要为此维护多套消息系统,付出更多的资源成本和学习成本。 在这样的背景下,2022 年,RocketMQ 5.0 正式发布,相对于 RocketMQ 4.0,架构走向云原生化,并且覆盖了更多的业务场景。想要掌握最新版本 RocketMQ 的应用,就需要进行更加体系化的深入了解。 2.背景 今天我们要学习的课程是 RocketMQ 5.0 的事件驱动。事件驱动是一个经典的概念,通过今天这节课,我们会掌握云时代的事件驱动和之前有哪些不同 这是今天我们要学习的内容,第一部分先从技术理念的层面了解一下事件驱动的概念。第二部分会讲,RocketMQ 5.0 在面向云时代的事件驱动架构新推出的子产品 EventBridge,最后再结合几个具体的案例帮助大家了解云时代的事件驱动方案。 3. 事件驱动架构 3.1. 事件驱动架构定义 首先我们来学习一下什么是事件驱动。先从事件驱动的定义来看,事件驱动本质上是一种软件设计模式。它能够最大化降低不同模块以及不同系统之间的耦合度。 这里有一个典型的事件驱动架构图,首先是事件生产者发送事件到 EventBroker,然后 EventBroker 会把事件路由到对应的消费者进行事件处理。事件处理能够灵活扩展,随时增减事件消费者,事件生产者对此透明。 为什么说事件驱动是个很经典的设计模式呢,因为早在几十年前,就出现过多种事件驱动的技术,比如桌面客户端编程框架,点击按钮就可以触发 onclick 事件,开发者编写业务逻辑响应事件。在编程语言上,也经常会采用事件驱动的代码模式,比如 callback、handler 这类的函数。进入分布式系统的时代,系统之间的通信协同也会采用事件驱动的方式。 你有没有发现,这里的图和之前 RocketMQ 的消息应用解耦图很像。没错,无论是消息的发布订阅,还是事件的生产消费都是为了进行代码解耦、系统解耦。消息队列更偏技术实现,大部分的 EventBroker 都是基于消息队列实现的,而事件驱动更偏向于架构理念。 3.2. 事件的特征 从技术角度来看,消息队列是和 RPC 对应的,一个是同步通信,一个是异步通信。消息队列并不会规定消息的内容,只负责传输二进制内容。如果从技术实现来看,的确,EDA 需要的核心技术就是消息队列的技术。事件驱动跟消息驱动最大的区别就是,事件是一种特殊的消息,只有消息满足了某些特征,才能把它叫做事件。 我打个比方,来看左边这个图。消息就像是一个抽象类,有多种子类,最主要的就是 Command 和 Event 两种。以信号灯为例,向信号灯发送打开的消息,这就是一种 Command,信号灯接受这个 Command 并开灯。开灯后,信号灯对外发出信号灯变成绿色的消息,这个就是一种 Event。 对于 Event 来说,有四个主要的特征: 第一,它是一个不可变的,事件就是表示已经发生了的事情,已经成为事实。 第二,事件有时间概念,并且对同一个实体来说事件的发送是有序的。如信号灯按顺序发送了绿、黄、红等事件。 第三,事件是无预期的,这个就是EDA架构之所以能够实现最大化解耦的特点,事件的产生者对于谁是事件消费者,怎么消费这个事件是不关心的。 第四,由于事件驱动是彻底解耦的,并且对于下游怎么去消费事件没有预期,所以事件是具象化的,应该包括尽可能详尽的信息,让下游消费者各取所需。比如像交通交通信号灯事件,包含多个字段,包括它的来源是谁、它的类型是什么?它的主题是什么?是具体的哪一个信号灯,另外它会包含唯一的ID,便于跟踪?它会有事件发生时间,事件的内容。 3.3. 云时代的事件驱动 在全行业数字化转型的时代,事件驱动架构应用范围扩大,成为 Gartner 年度十大技术趋势。在新型的数字化商业解决方案里,会有 60% 采纳 EDA 架构。 事件驱动作为一个经典的架构模式,为什么会在云时代再度成为焦点呢?主要有两个原因: 首先是云原生技术带来的,其中之一是微服务。微服务是云原生应用架构的核心,引入微服务架构,数字化企业能够按照小型化的业务单元和团队划分,以“高内聚、低耦合”的方式高效协作。但是微服务架构也会带来新的问题,比如大量同步微服务会面临延迟增大、可用性降低等风险,采用事件驱动的微服务体系,可提高微服务的韧性,降低延迟,实现更彻底的解耦。 另外一个云原生代表技术 Serverless 架构范式本身也是事件驱动的。现在主要的 Serverless 产品形态,无论是阿里云的函数计算、还是 AWS 的 Lambda,它们的主要触发源都是各种形态的事件,比如云产品事件,OSS 文件上传,触发用户基于函数进行文件加工处理计算;用户业务事件,EventBroker 触发函数运行消费逻辑;云产品运维事件,用户通过响应事件,在云平台的基础上扩展自己的自动化运维体系。事件驱动架构的大规模使用,能够帮助数字化企业释放云计算 Serverless 的技术红利。 IoT 也是事件驱动架构的重要推动力,有大量的 IoT 应用构建都是基于事件驱动的,比如传感器上报设备事件,温度变化事件、地址位置变化事件等等,云端应用订阅这些事件触发对应的业务流程。 在全行业大规模数字化转型后,跨业务、跨组织的业务合作会从线下搬到线上,在数字经济时代,数字化商业生态规模会持续扩大,跨组织业务协同更需要彻底解耦。而 EDA 天然具备的异步、解耦的特性就可以解决这一系列的问题。比如阿里聚石塔业务就是事件驱动的模式,聚石塔实时发布交易事件,合作伙伴包括ISV、软件服务商、品牌商家订阅消费交易事件,建设个性化的 CRM、商家运营、后台管理系统等等,形成一个庞大的电子商务数字化生态。 4. EventBridge 4.1. 云时代的事件驱动能力抽象 接下来进入第二个部分的内容,一起学习一下 RocketMQ 5.0 的 EventBridge。在了解这个系统的技术实现之前,我们先来了解一下 EventBridge 对事件驱动的通用能力抽象,从这里也可以了解到 EventBridge 的领域模型。 我们从左往右看这张图。最左边是事件源,因为这个事件是希望被跨平台消费的,所以我们希望采用业界标准来作为事件的格式。同时,事件是有可能被跨组织消费的,所以我们需要一个统一的事件中心,让这些不同的事件源都注册到这个事件中心。对消费者来说就好比是一个事件商店,能够选择自己感兴趣的事件订阅。在事件消费者开始编写消费逻辑的时候,他还需要对这个事件的格式有更清楚的了解,需要知道这个事件有哪些内容,有哪些字段,分别是什么含义,才能编写正确的消费业务逻辑。所以,EventBridge 还提供了 schema 中心,有这个 schema 中心后,消费者对于事件的格式也就一目了然,不用跟事件源的发起者进行沟通了,整个效率也得到了大幅度的提升。再往后面看,就到了事件消费的环节,因为事件的消费者种类很多,不同消费者关注不同的事件类型,EventBridge 需要提供丰富的过滤规则。即便多个消费者对同一个事件感兴趣,但是可能只需要事件的部分内容,EventBridge 还提供了事件转换的能力。这就是 RocketMQ 5.0 对事件驱动的能力抽象。 4.2. 统一事件标准 在云计算的时代、大规模数字化转型时代,我们强调事件驱动架构往往跨越了不同的组织,不同的平台。所以事件驱动架构需要一个统一的事件标准。在 EventBridge 这个产品里,我们采纳了 CNCF 基金会中的 CloudEvents 标准,这个是业界事件的事实标准,这个标准就是为了简化事件声明,提升事件在跨服务、跨平台的互操作性。 CloudEvents 带来了很多价值: 第一,它提供了一种规范,使得跨组织、跨平台的事件集成,有了共同语言,加速更多的事件集成。然后也因为有的规范,所以它可以加速跨服务,跨平台的事件的集成。 第二,随着 Serverless 的普及,各大云厂商都提供函数计算的服务,有了 CloudEvents 规范,用户在函数计算的使用上就可以实现无厂商绑定。 第三,webhook 是一种通用的集成模式,有了 CloudEvents 规范作为统一格式,不同系统的 webhook 能实现更好的互操作性。 最后,基于这样统一的规范,其实是更有利于沉淀事件驱动的基础软件设施的,比如跨服务的事件 Tracing 链路追踪。 4.3. RocketMQ EventBridge 如下图是 RocketMQ 面向 EDA 场景全新推出的产品形态 EventBridge。 它的核心技术都是基于 RocketMQ,但是在产品界面上面向事件驱动的业务进行一层抽象,核心领域对象从消息变成 CloudEvents。基于统一事件标准来构建事件驱动的数字生态。它的事件源也很多样,可以是云产品事件,可以是 SaaS 平台事件,应用自定义事件、通用的 WebHook。当然,它的事件目标更是多样化的,通过事件规则引擎把事件路由到不同的消费者,典型的消费者,比如函数计算,也可以是存储系统,消息通知如钉钉短信,还有通用的的 webhook。通过事件驱动这种彻底解耦的架构,更适合建设混合云、多云的数字化系统。 为了提升事件驱动的研发效率,EventBridge 也支持 Schema 的特性,支持事件信息的解释、预览,甚至还可以自动化的生成代码,让开发者以低代码、0 代码的方式完成事件集成。 EventBridge 的另一个比较重要的特性是事件规则引擎。因为不同的事件消费者,他们对于事件的兴趣是不一样的。所以我们提供了七种事件过滤模式,包括前缀匹配、后缀匹配、除外匹配、数值匹配等等,可以进行各种复杂的组合逻辑过滤,只推送消费者感兴趣的事件。 当然,就算都关心同一个事件,不同消费者对事件内部的信息关注点也会有所不同。为了提升事件消费效率,我们也提供了四种事件转化器,可以只推送给消费者它关心的事件字段。还可以对事件进行自定义的模板转化,满足更灵活的业务诉求。 作为 RocketMQ 的子项目,在 EventBridge 里也同样提供了完整的可观测的能力。能够根据事件的时间、类型查询事件列表。每个事件都会生成唯一 ID。用户可以根据唯一 ID 去精确的定位事件的内容、发生时间、对应的事件规则,下游的消费状况,精准排查问题。 5. 典型案例 接下来结合几个典型案例来看 EventBridge 的使用场景。 第一个案例适用于使用大量云产品的公司。C 客户是一家以智能消费终端为核心的科技公司,希望收集账号里全部的云上事件,方便后续做分析或故障处理。公共云的 EventBridge 汇聚了所有的云产品事件,通过 EventBridge,客户能收集全量的事件对齐进行自定义的业务处理。还能够配置事件规则,过滤异常事件推送给监控系统或者钉钉,及时关注处理。 第二个案例是 SaaS 事件的集成。现在随着整个云计算生态的繁荣,有不少企业不仅使用了公共云的 IaaS、PaaS 产品,也会同时使用三方的 SaaS 产品,比如各种 ERP、CRM 等系统。基于 EventBridge 标准的 HTTP、webhook 的集成能力,能够无缝连接三方 SaaS 系统作为事件源,企业能够收集到他所关心的所有 SaaS 事件,方便后续管理,比如申请单,入职单,报销单,订单等等这些场景。 第三个案例是 SaaS 平台集成,以钉钉为例,钉钉是典型的 SaaS 平台,他有繁荣的生态,拥有 4000+ 家的生态伙伴,包括 ISV 生态伙伴、硬件生态伙伴、服务商、咨询生态和交付生态伙伴等等。通过 EventBridge 把公共云的 Paas 层生态和钉钉的 SaaS 层生态连接起来,而且依赖 EventBridge 完成整体事件生命周期的管理,以 WebHook 的形式推送给下游 ISV 接收端。比如钉钉的官方事件源包括视频会议、日程、通讯录、审批流、钉盘、宜搭等,企业和 SaaS 厂商可以充分利用这些官方应用的事件构建企业级的应用系统,也可以把钉钉的官方数据流和其他系统做深度集成。
作者:隆基
#技术探索

2024年7月24日

Apache RocketMQ ACL 2.0 全新升级
引言 RocketMQ 作为一款流行的分布式消息中间件,被广泛应用于各种大型分布式系统和微服务中,承担着异步通信、系统解耦、削峰填谷和消息通知等重要的角色。随着技术的演进和业务规模的扩大,安全相关的挑战日益突出,消息系统的访问控制也变得尤为重要。然而,RocketMQ 现有的 ACL 1.0 版本已经无法满足未来的发展。因此,我们推出了 RocketMQ ACL 2.0 升级版,进一步提升 RocketMQ 数据的安全性。本文将介绍 RocketMQ ACL 2.0 的新特性、工作原理,以及相关的配置和实践。 升级的背景 ACL 1.0 痛点问题 RocketMQ ACL 1.0 的认证和授权流程如上图所示,在使用过程中,存在着以下痛点问题: 绕过访问控制的 IP 白名单:在标准安全实践中,IP 白名单通常用于限制客户端只能从特定 IP 或 IP 段访问资源。然而,ACL 1.0 中,IP 白名单被异常用于绕过鉴权验证的手段, 偏离了标准实践中的安全意图。这种设计上的偏差可能造成潜在的安全隐患,特别是在公网场景中,多个客户端共享同一个 IP 的情况下,会导致未授权的 IP 地址绕过正常的访问控制检查对集群中的数据进行访问。 缺乏管控 API 精细化控制:RocketMQ 提供了 130 多个管控 API,支持了集群配置,Topic、Group 的元数据管理,以及消息查询、位点重置等操作。这些操作涉及到敏感数据的处理,以及影响系统的稳定性。因此,根据用户不同角色或职责,精确定义可访问的 API 和数据范围变得至关重要。然而,ACL 1.0 仅对其中 9 个 API 进行了支持,包括 Topic、Group 元数据,以及Broker配置,剩下的 API 有可能被攻击者利用,对系统进行攻击,窃取敏感的数据。此外,要实施对这么多的管控 API 进行访问控制,现有的设计会导致大量的编码工作,并且在新增 API 时也增加了遗漏的风险。 缺少集群组件间访问控制:在 RocketMQ 架构中,涵盖了 NameServer、Broker 主从节点、Proxy 等多个关键组件。目前,这些组件之间的互相访问缺失了关键的的权限验证机制。因此,一但旦在集群外自行搭建 Broker 从节点或 Proxy 组件,便可以绕过现有的安全机制,访问并获取集群内的敏感数据,这无疑给系统的数据安全和集群的稳定性造成巨大的威胁。 特性与原理 ACL 2.0 新特性 RocketMQ ACL 2.0 针对 ACL 1.0 中的问题进行了解决,同时还带来了六个主要的新特性,具体如下: 精细的API资源权限定义:ACL 2.0 对 RocketMQ 系统中所有的资源都进行了定义,包括集群、命名空间、主题、消费者组,以实现对所有类型的资源进行独立的访问控制。此外,它将所有的 API 都纳入权限控制范畴,覆盖了包括消息收发、集群管理、元数据等各项操作,确保所有资源的任何操作都施加了严格的权限控制。 授权资源的多种匹配模式:在资源众多的集群环境中,为每个资源进行逐一授权会带来繁复的配置过程和管理负担。因此,ACL 2.0 引入了三种灵活的匹配模式:完全匹配、前缀匹配,以及通配符匹配。这些模式可以让用户根据资源的命名规范和结构特点,快速地进行统一的设定,简化权限的管理操作,提升配置的效率。 支持集群组件间访问控制:由于将所有资源类型和API操作都纳入了访问控制体系,集群内部组件之间的连接和访问也受到了权限控制,包括 Broker 主从之间的 Leader 选举、数据复制的过程,以及 Proxy 到 Broker 的数据访问等环节,这可以有效地避免潜在的数据泄露问题和对系统稳定性的风险,加强了整个集群的安全性和可靠性。 用户认证和权限校验分离:通过对认证和授权这两个关键模块进行解耦,系统可以提供类似“只认证不鉴权”等方式的灵活选择,以适应各种不同场景的需求。此外,两个组件可以分别演进、独立发展,从而诞生出多样的认证方式和先进的鉴权方法。 安全性和性能之间的平衡:当启用 ACL 后,客户端的每次请求都必须会经过完整的认证和授权流程。这确保了系统的安全性,但同时也引入了性能上的开销。在 ACL 2.0 中,提供了无状态认证授权策略和有状态认证授权策略,来分别满足对安全有极致要求,以及安全可控但性能优先这两种不同的安全和性能需求。 灵活可扩展的插件化机制:当前市场上,认证方式存在多种实现,授权方式也有不同场景的定制需求。因此,ACL 2.0 设计了一套插件化的框架,在不同层面上进行接口的定义和抽象,以支持未来对认证和授权进行扩展,满足用户根据自身业务需求定制和实现相应的解决方案。 访问控制模型 基于角色的访问控制(RBAC)和基于属性的访问控制(ABAC)是访问控制体系中两种主要的方法。RocketMQ ACL 2.0 将这两种方法进行了融合,打造出了一套更加灵活和强大的访问控制系统。RBAC 是基于角色的访问控制模型,通过角色进行权限的分配。RocketMQ ACL 2.0 将用户角色划分为超级用户(Super)和普通用户(Normal),超级用户具有最高级别的权限,能够无需授权即可访问资源,这简化了集群初始化及日常运维过程中的权限依赖问题。而普通用户在访问资源之前,需要被赋予相应的权限,适用于业务场景中,对资源进行按需访问。ABAC 是基于属性的访问控制模型,通过用户、资源、环境、操作等多维属性来表达访问控制策略。RocketMQ ACL 2.0 为普通用户提供了这种灵活的访问控制机制。帮助管理员根据业务需求、用户职责等因素,对资源进行更加精细的访问控制。在安全体系中,认证和授权分别扮演着不同的角色,RocetMQ ACL 2.0 将认证和授权进行了模块分离。这可以确保两个组件各司其职,降低系统的复杂度。认证服务致力于验证用户身份的合法性,而授权服务则专注于管理用户权限和访问控制。这样的划分不仅可以让代码更易于管理、维护和扩展,也为用户提供了使用上的灵活性。根据需求,用户可以选择单独启用认证或授权服务,也可以选择同时启用两者。这使得 RocketMQ ACL 既可以满足简单场景的快速部署,也能够适应复杂环境下对安全性的严格要求。 认证(Authentication) 认证作为一种安全机制,旨在验证发起访问请求者的身份真实性。它用于确保只有那些经过身份验证的合法用户或实体才能访问受保护的资源或执行特定的操作。简而言之,认证就是在资源或服务被访问之前回答“你是谁?”这个问题。RocketMQ ACL 2.0 版本维持了与 ACL 1.0 相同的认证机制,即基于 AK/SK 的认证方式。这种方式主要通过对称加密技术来核验客户端的身份,保证敏感的认证信息(如密码)不会在网络上明文传输,从而提升了整体的认证安全性。 主体模型 为了提升 RocketMQ 系统的访问控制和权限管理,ACL 2.0 针对主体模型做了以下改进和扩展: 1. 统一主体模型的抽象:为了实现不同实体的访问控制和权限管理,设计了统一的主体接口,允许系统中多个实例作为资源访问的主体。用户作为访问资源的主体之一,按照该模型实现了主体的接口。这为未来新实体类型的权限适配提供了扩展能力。 2. 角色分级与权限赋予: 超级用户:为了简化管理流程,超级用户被自动授予了全部权限,无需单独配置,从而简化了系统的初始化和日常的运维管理工作。 普通用户:普通用户的权限则需要明确授权。ACL 2.0 提供了相关的权限管理工具,可以根据组织的政策和安全需求,为普通用户赋予合适的权限。 3. 支持用户状态管理:为了应对可能出现的安全风险,比如用户密码泄露,ACL 2.0 提供了用户的启用与禁用功能。当发生安全事件,可以通过禁用用户状态,快速进行止血,从而达到阻止非法访问的目的。 认证流程 客户端流程: 1. 客户端在构建 RPC 请求时,检查是否设置了用户名和密码,若未配置,则直接发送请求; 2. 若已配置,则使用预设的加密算法对请求参数进行加密处理,并生成对应的数字签名(Signature)。 3. 在请求中附加用户名和 Signature,并将其发送至服务端以进行身份验证。 服务端流程: 1. 服务端接收到请求后,首先检查是否开启认证,若未开启,则不校验直接通过;若已开启了,则进入下一步。 2. 服务端对请求进行认证相关的参数进行解析和组装,获取包括用户名和 Signature 等信息。 3. 通过用户名在本地库中查询用户相关信息,用户不存在,则返回处理无;用户存在,则进入下一步。 4. 获取用户密码,采用相同的加密算法对请求进行加密生成 Signature,并和客户端传递的 Signature 进行比对,若两者一致,则认证成功,不一致,则认证失败。 授权(Authorization) 核心概念 授权作为一种安全机制,旨在确定访问请求者是否拥有对特定资源进行操作的权限。简而言之,授权就是在资源被访问之前回答“谁在何种环境下对哪些资源执行何种操作”这个问题。基于“属性的访问控制(ABAC)”模型,RocketMQ ACL 2.0 涵盖了以下一系列的核心概念。在系统实现中,都会以以下概念作为指导,完成整个权限管理和授权机制的设计和实现。 权限模型 基于属性的访问控制(ABAC)模型的核心概念,ACL 2.0 对权限模型做了精心的设计,要点如下: 向后兼容的权限策略:默认情况下,ACL 2.0 只匹配和检验用户自定义的权限,若未找到匹配项,则视为无权限访问资源。但考虑到 ACL 1.0 中,存在默认权限的设置,允许对未匹配资源进行“无权限访问”和“有权限访问”的默认判定。因此,我们针对默认权限策略进行了兼容,确保 ACL 1.0 到 ACL 2.0 的无缝迁移。 灵活的资源匹配模式:在资源类型方面,ACL 2.0 支持了集群(Cluster)、命名空间(Namespace)、主题(Topic)、消费者组(Group)等类型,用于对不同类型的资源进行访问控制。在资源名称方面,引入了完全匹配(LITERAL)、前缀匹配(PREFIXED),以及通配符匹配(ANY)三种模式,方便用户根据资源的命名规范和结构,快速设定统一的访问规则,简化权限的管理。 精细的资源操作类型:在消息的发送和消费的接口方面,分别定义为 PUB 和 SUB 这两种操作。在集群和资源的管理的接口方面,分别定义为 CREATE、UPDATE、DELETE、LIST、GET 五种操作。通过这种操作类型的细化,可以帮助用户在资源的操作层面,无需关心具体的接口定义,简化对操作的理解和配置。 坚实的访问环境校验:在请求访问的环境方面,ACL 2.0 加入了客户端请求 IP 来源的校验,这个校验控制在每个资源的级别,可以精确到对每个资源进行控制。IP 来源可以是特定的 IP 地址或者是一个 IP 段,来满足不同粒度的 IP 访问控制,为系统的安全性增添一道坚实的防线。 授权流程 客户端流程: 1. 客户端在构建 RPC 请求时,构建本次调用的接口入参,接口对应权限背后的操作定义。 2. 客户端在接口入参中设置本次访问的资源信息,然后将用户和资源等参数传递到服务端。 服务端流程: 1. 服务端在收到请求后,首先检查是否开启授权,若未开启,则不校验直接通过;若已开启了,则进入下一步。 2. 服务端对请求中和授权相关的参数进行解析和组装,这些数据包括用户信息、访问的资源、执行的操作,以及请求的环境等。 3. 通过用户名在本地数据存储中查询用户相关信息,若用户不存在,则返回错误;若用户存在,则进入下一步。 4. 判断当前用户是否是超级用户,若超级用户,则直接通过请求,无需做授权检查,若普通用户,则进入下一步进行详细的授权检查。 5. 根据用户名获取相关的授权策略列表,并对本次请求的资源、操作,以及环境进行匹配,同时按照优先级进行排序。 6. 根据优先级最高的授权策略做出决策,若授权策略允许该操作,则返回授权成功,若拒绝该操作,则返回无权限错误。 授权参数的解析 在 ACL 2.0 中,更具操作类型和请求频率,对授权相关参数(包括资源、操作等)的解析进行了优化。1. 硬编码方式解析对于消息发送和消费这类接口,参数相对较为复杂,且请求频次也相对较高。考虑到解析的便捷性和性能上的要求,采用硬编码的方式进行解析。2. 注解方式解析对于大量的管控接口,采用硬编码的方式工作量巨大,且这些接口调用频次较低,对性能要求不高,所以采用注解的方式进行解析,提高编码效率。 权限策略优先级 在权限策略匹配方面,由于支持了模糊的资源匹配模式,可能出现同一个资源对应多个权限策略。因此,需要一套优先级的机制来确定最终使用哪一套权限策略。假设配置了以下授权策略,按照以上优先级资源的匹配情况如下: 认证授权策略 出于安全和性能的权衡和考虑,RocketMQ ACL 2.0 为认证和授权提供了两种策略:无状态认证授权策略(Stateless)和有状态认证授权策略(Stateful)。 无状态认证授权策略(Stateless): 在这种策略下,每个请求都会经过独立的认证和授权过程,不依赖于任何先前的会话和状态信息。这种严格的策略可以保证更高级别的安全保证。对权限进行变更,可以更加实时的反应在随后的请求中,无需任何等待。然而,这种策略在高吞吐的场景中可能会导致显著的性能负担,如增加系统 CPU 的使用率以及请求的耗时。 有状态认证授权策略(Stateful): 在这种策略下,同一个客户端连接,相同资源以及相同的操作下,第一次请求会经过完整的认证和授权,后续请求则不再进行重复认证和授权。这种方法可以有效地降低性能小号,减少请求的耗时,特别适合吞吐量较高的场景。但是,这种策略可能引入了安全上的妥协,对权限的变更也无法做到实时的生效。 在这两者策略的选择上,需要权衡系统的安全性要求和性能需求。如果系统对安全性的要求很高,并且可以容忍一定的性能损耗,那么无状态认证授权策略可能是更好的选择。相反,如果系统需要处理大量的并发请求,且可以在一定程度上放宽安全要求,那么有状态认证授权策略可能更合适。在实际部署时,还应该结合具体的业务场景和安全要求来做出决策。 插件化机制 为了适应未来持续发展的认证鉴权方式,以及满足用户针对特定场景的定制需求,RocketMQ ACL 2.0 在多个环节上提供了灵活性和可扩展性。 认证和授权策略的扩展:默认情况下,RocketMQ ACL 2.0 提供了无状态认证授权策略(Stateless)和有状态认证授权策略(Stateful),以满足绝大多数用户对安全和性能的要求。但是,后续仍然可以探索出更优的策略,来兼顾安全和性能之间的平衡。 认证和授权方式的扩展:当前,在认证方面,市场上已经沉淀了多种成熟的实现,RocketMQ 目前只实现了其中一种,通过插件化的能力进行预留,未来可以轻松的引入更多的认证机制。在授权方面,RocketMQ 基于 ABAC 模型实现了一套主流的授权方式,以适应广泛的用户需求。但也提供了插件化的能力,方便未来能适配出更多贴合未来发展的解决方案。 认证和授权流程的编排:基于责任链设计模式,RocketMQ ACL 2.0 对其默认的认证和授权流程进行了灵活的编排。用户可以扩展或重写这些责任链节点,从而能够定制针对其具体业务场景的认证和授权逻辑。 用户和权限存储的扩展:RocketMQ 默认采用 RocksDB 在 Broker 节点上本地存储用户和权限数据。然而,通过实现预定义的接口,用户可以轻松地将这些数据迁移至任何第三方服务或存储系统中,从而优化其架构设计和操作效率。 审计日志 审计日志,用于记录和监控所有关于认证和授权的访问控制操作。通过升级日志,我们可以追踪到每一个访问的请求,确保系统的可靠性和安全性,同时,它也有助于问题的排查,进行安全的升级和满足合规的要求。RocketMQ ACL 2.0 对认证和授权相关的审计日志都进行了支持,格式如下: 认证日志 ``` 认证成功日志 [AUTHENTICATION] User:rocketmq is authenticated success with Signature = eMX/+tH/7Bc0TObtDYMcK9Ls+gg=. 认证失败日志 [AUTHENTICATION] User:rocketmq is authenticated failed with Signature = eMX/+tH/7Bc0TObtDYMcK9Ls+xx=. ``` 授权日志 ``` 授权成功日志 [AUTHORIZATION] Subject = User:rocketmq is Allow Action = Pub from sourceIp = 192.168.0.2 on resource = Topic:TPTEST for request = 10. 授权失败日志 [AUTHORIZATION] Subject = User:rocketmq is Deny Action = Sub from sourceIp = 192.168.0.2 on resource = Topic:GIDTEST for request = 10. ``` 配置与使用 部署架构 在部署架构方面,RocketMQ 提供了两种部署形态,分别是存算一体架构和存算分离架构。 存算一体架构 在 RocketMQ 存算一体架构中,Broker 组件同时承担了计算和存储的职责,并对外提供服务,接收所有客户端的访问请求。因此,由 Broker 组件承担认证和授权的重要角色。此外,Broker 组件还负责认证和授权相关的元数据的维护和存储。 存算分离架构 在 RocketMQ 存算分离架构中,存储由 Broker 组件负责,计算由 Proxy 组件负责,所有的对外请求都是由 Proxy 对外进行服务。因此,请求的认证和授权都由 Proxy 组件承担。Broker 承担元数据存储,为 Proxy 组件提供所需的认证和授权元数据的查询和管理服务。 集群配置 认证配置 参数列表 想要在服务端开启认证功能,相关的参数和使用案例主要包含如下: Broker 配置 ``` authenticationEnabled = true authenticationProvider = org.apache.rocketmq.auth.authentication.provider.DefaultAuthenticationProvider initAuthenticationUser = {"username":"rocketmq","password":"12345678"} innerClientAuthenticationCredentials = {"accessKey":"rocketmq","secretKey":"12345678"} authenticationMetadataProvider = org.apache.rocketmq.auth.authentication.provider.LocalAuthenticationMetadataProvider ``` Proxy 配置 ``` { "authenticationEnabled": true, "authenticationProvider": "org.apache.rocketmq.auth.authentication.provider.DefaultAuthenticationProvider", "authenticationMetadataProvider": "org.apache.rocketmq.proxy.auth.ProxyAuthenticationMetadataProvider", "innerClientAuthenticationCredentials": "{\"accessKey\":\"rocketmq\", \"secretKey\":\"12345678\"}" } ``` 授权配置 参数列表 想要在服务端开启授权功能,相关的参数和使用案例主要包含如下: Broker 配置 ``` authorizationEnabled = true authorizationProvider = org.apache.rocketmq.auth.authorization.provider.DefaultAuthorizationProvider authorizationMetadataProvider = org.apache.rocketmq.auth.authorization.provider.LocalAuthorizationMetadataProvider ``` Proxy 配置 ``` { "authorizationEnabled": true, "authorizationProvider": "org.apache.rocketmq.auth.authorization.provider.DefaultAuthorizationProvider", "authorizationMetadataProvider": "org.apache.rocketmq.proxy.auth.ProxyAuthorizationMetadataProvider" } ``` 如何使用 命令行使用 用户管理关于 ACL 用户的管理,相关的接口定义和使用案例如下。 接口定义 使用案例 ``` 创建用户 sh mqadmin createUser n 127.0.0.1:9876 c DefaultCluster u rocketmq p rocketmq 创建用户,指定用户类型 sh mqadmin createUser n 127.0.0.1:9876 c DefaultCluster u rocketmq p rocketmq t Super 更新用户 sh mqadmin updateUser n 127.0.0.1:9876 c DefaultCluster u rocketmq p 12345678 删除用户 sh mqadmin deleteUser n 127.0.0.1:9876 c DefaultCluster u rocketmq 查询用户详情 sh mqadmin getUser n 127.0.0.1:9876 c DefaultCluster u rocketmq 查询用户列表 sh mqadmin listUser n 127.0.0.1:9876 c DefaultCluster 查询用户列表,带过滤条件 sh mqadmin listUser n 127.0.0.1:9876 c DefaultCluster f mq ``` ACL 管理关于 ACL 授权的管理,相关的接口定义和使用案例如下。 接口定义 使用案例 ``` 创建授权 sh mqadmin createAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq r Topic:,Group: a Pub,Sub i 192.168.1.0/24 d Allow 更新授权 sh mqadmin updateAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq r Topic:,Group: a Pub,Sub i 192.168.1.0/24 d Deny 删除授权 sh mqadmin deleteAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq 删除授权,指定资源 sh mqadmin deleteAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq r Topic: 查询授权列表 sh mqadmin listAcl n 127.0.0.1:9876 c DefaultCluster 查询授权列表,带过滤条件 sh mqadmin listAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq r Topic: 查询授权详情 sh mqadmin getAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq ``` 客户端使用 关于 ACL 的使用,ACL 2.0 和 ACL 1.0 的使用方式一样,没有任何区别,具体参考官方案例。 消息发送 ``` ClientServiceProvider provider = ClientServiceProvider.loadService(); StaticSessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(ENDPOINTS) .setCredentialProvider(sessionCredentialsProvider) .build(); Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(TOPICS) .build(); ``` 消息消费 ``` ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(ENDPOINTS) .setCredentialProvider(sessionCredentialsProvider) .build(); FilterExpression filterExpression = new FilterExpression(TAG, FilterExpressionType.TAG); PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(CONSUMER_GROUP) .setSubscriptionExpressions(Collections.singletonMap(TOPIC, filterExpression)) .setMessageListener(messageView { return ConsumeResult.SUCCESS; }) .build(); ``` 扩容与迁移 扩容 如果想要在运行过程中的集群扩容一台 Broker,就需要将所有的元数据都同步到这台新的 Broker 上,ACL 2.0 提供了相应的拷贝用户和拷贝授权的接口来支持这项操作。 接口定义 使用案例 ``` 拷贝用户 sh mqadmin copyUser n 127.0.0.1:9876 f 192.168.0.1:10911 t 192.168.0.2:10911 拷贝授权 sh mqadmin copyAcl n 127.0.0.1:9876 f 192.168.0.1:10911 t 192.168.0.2:10911 ``` 迁移 如果已经使用上了 ACL 1.0,想要无缝地迁移至 ACL 2.0,也提供了相应的解决方案,只需要做以下配置即可。 配置定义 在 Broker 的配置文件中开启以下配置: ``` migrateAuthFromV1Enabled = true ``` 特别说明 启用以上配置后,将在 Broker 启动过程中自动触发执行。该迁移功能会把 ACL 1.0 中的用户权限信息写入 ACL 2.0 的相应存储结构中。对于在 ACL 2.0 中尚未存在的用户和权限,系统将自动添加。对于已存在的用户和权限,迁移功能不会进行覆盖,以避免重写 ACL 2.0 中已经进行的任何修改。ACL 1.0 中关于 IP 白名单,由于是用于绕过访问控制的检查,和 ACL 2.0 的行为不匹配,所以不会迁移到 ACL 2.0 中。如果已经使用相关的能力,请完成改造后再做迁移。 规划与总结 规划 关于 RocketMQ ACL 的未来规划,可能会体现在以下两个方面: 丰富的认证和授权扩展:市场上存在丰富的认证和授权解决方案,其他的存储或计算产品也都采用了各种各样的实现方式。为了紧跟行业的发展趋势,RocketMQ ACL 未来也将努力创新,以满足更为广泛和多变的客户需求。同时,也将持续深化研究和发展更加出色的认证和授权策略,以达到安全性和性能之间的理想平衡。 可视化的用户权限操作:当前,在 ACL 中进行用户和权限的配置仅能通过命令行工具,不够友好。未来我们希望能在 RocketMQ Dashboard 上提供一个清晰、易用的可视化管理界面,从而简化配置流程并降低管理的技术门槛。另一方面,现有的 Dashboard 尚未集成 ACL 访问控制体系,后续也要将它纳入进来,以实现用户在 Dashboard 上对各项资源进行操作的访问权限。 总结 RocketMQ ACL 2.0 不管是在模型设计、可扩展性方面,还是安全性和性能方面都进行了全新的升级。旨在能够为用户提供精细化的访问控制,同时,简化权限的配置流程。欢迎大家尝试体验新版本,并应用在生产环境中。非常期待大家的在社区中反馈、讨论,和参与贡献,共同推进 RocketMQ 社区的成长和技术进步。
作者:徒钟
#行业实践 #最佳实践