2024年8月9日

深度解读Apache RocketMQ 存储机制
来源 | 阿里开发者公众号 简介:RocketMQ 实现了灵活的多分区和多副本机制,有效的避免了集群内单点故障对于整体服务可用性的影响。存储机制和高可用策略是 RocketMQ 稳定性的核心,社区上关于 RocketMQ 目前存储实现的分析与讨论一直是一个热议的话题。本文想从一个不一样的视角,着重于作者眼中的这种存储实现是在解决哪些复杂的问题,因此我从本文最初的版本中删去了冗杂的代码细节分析,由浅入深的分析存储机制的缺陷与优化方向。 RocketMQ 实现了灵活的多分区和多副本机制,有效的避免了集群内单点故障对于整体服务可用性的影响。存储机制和高可用策略是 RocketMQ 稳定性的核心,社区上关于 RocketMQ 目前存储实现的分析与讨论一直是一个热议的话题。本文想从一个不一样的视角,着重于谈谈我眼中的这种存储实现是在解决哪些复杂的问题,因此我从本文最初的版本中删去了冗杂的代码细节分析,由浅入深的分析存储机制的缺陷与优化方向。 RocketMQ 的架构模型与存储分类 先来简单介绍下 RocketMQ 的架构模型。RocketMQ 是一个典型的发布订阅系统,通过 Broker 节点中转和持久化数据,解耦上下游。Broker 是真实存储数据的节点,由多个水平部署但不一定完全对等的副本组构成,单个副本组的不同节点的数据会达到最终一致。对于单个副本组来说同一时间最多只会有一个可读写的 Master 和若干个只读的 Slave,主故障时需要选举来进行单点故障的容错,此时这个副本组是可读不可写的。NameServer 是独立的一个无状态组件,接受 Broker 的元数据注册并动态维护着一些映射关系,同时为客户端提供服务发现的能力。在这个模型中,我们使用不同主题 (Topic) 来区分不同类别信息流,为消费者设置订阅组 (Group) 进行更好的管理与负载均衡。 如下图中间部分所示: 1. 服务端 Broker Master1 和 Slave1 构成其中的一个副本组。 2. 服务端 Broker 1 和 Broker 2 两个副本组以负载均衡的形式共同为客户端提供读写。 RocketMQ 目前的存储实现可以分为几个部分: 1. 元数据管理 具体指当前存储节点的主题 Topic,订阅组 Group,消费进度 ConsumerOffset。 多个配置文件 Config,以及为了故障恢复的存储 Checkpoint 和 FileLock。 用来记录副本主备身份的 Epoch / SN (sequence number) 文件等(5.0beta 引入,也可以看作 term) 2. 消息数据管理,包括消息存储的文件 CommitLog,文件版定时消息的 TimerLog。 3. 索引数据管理,包括按队列的顺序索引 ConsumeQueue 和随机索引 IndexFile。 元数据管理与优化 为了提升整体的吞吐量与提供跨副本组的高可用能力,RocketMQ 服务端一般会为单个 Topic 创建多个逻辑分区,即在多个副本组上各自维护部分分区 (Partition),我们把它称为队列 (MessageQueue)。同一个副本组上同一个 Topic 的队列数相同并从 0 开始连续编号,不同副本组上的 MessageQueue 数量可以不同。 例如 topica 可以在 broker1 主副本上有 4 个队列,编号 (queueId) 是 03,在 broker1 备副本上完全相同,但是 broker2 上可能就只有 2 个队列,编号 01。在 Broker 上元数据的组织管理方式是与上述模型匹配的,每一个 Topic 的 TopicConfig,包含了几个核心的属性,名称,读写队列数,权限与许多元数据标识,这个模型类似于 K8s 的 StatefulSet,队列从 0 开始编号,扩缩队列都在尾部操作(例如 24 个队列缩分区到 16,是留下了编号为 015 的分区)。这使得我们无需像 Kafka 一样对每个分区单独维护状态机,同时大幅度的简化了关于分区的实现。 我们会在存储节点的内存中简单的维护 Map 的结构来将 TopicName 直接映射到它的具体参数。这个设计足够的简单,也隐含了一些缺陷,例如它没有实现一个原生 Namespace 机制来实现存储层面上多租户环境下的元数据的隔离,这也是 RocketMQ 5.0 向云原生时代迈进过程中一个重要的演进方向。 当 Broker 接收到外部管控命令,例如创建或删除一些 Topic,这个内存 Map 中就会对应的更新或者删除一个 KV 对,需要立刻序列化一次并向磁盘覆盖,否则就会造成丢失更新。对于单租户的场景下,Topic (Key) 的数量不会超过几千个,文件大小也只有数百 KB,速度是非常快。但是在云上大多租的场景下,一个存储节点的 Topic 可以达到十几 MB。每次变更一个 KV 就全量向磁盘覆盖写这个大文件,这个操作的开销非常高,尤其是在数据需要跨集群,跨节点迁移,或者应急情况下扩容逃生场景下,同步写文件严重延长了外围管控命令的响应时间,也成为云上大共享模式下严峻的挑战之一。在这个背景下,两个解决方案很自然的就产生了,即批量更新接口和增量更新机制。 1. 批量更新指每次服务端可以接受一批 TopicConfig 的更新,这样 Broker 刷写文件的频率就显著的降低。 2. 增量更新指将这个 Map 的持久化换成逻辑替换成 KV 型的数据库或实现元数据的 Append 写,以 Compaction 的形式维护一致性。 除了最重要的 Topic 信息,Broker 还管理着 Group 信息,消费组的消费进度 ConsumerOffset 和多个配置文件。Group 的变更和 Topic 类似,都是只有新建或者删除时才需要持久化。而 ConsumeOffset 是用来维护每个订阅组的消费进度的,结构如 Map。这里我们从文件本身的作用和数据结构的角度进行分析下,Topic Group 虽然数量多,但是变化的频率还是比较低的,而提交与持久化位点时时刻刻都在进行,进而导致这个 Map 几乎在实时更新,但是上一次更新后的数据 (last commit offset) 对当前来说又没有什么用,并且允许丢少量更新。所以这里 RocketMQ 没有像 Topic Group 那样采取数据变化时刷写文件,而是使用一个定时任务对这个 Map 做 CheckPoint。这个周期默认是 5 秒,所以当服务端主备切换或者正常发布时,都会有秒级的消息重复。 那么这里还有没有优化的空间呢?事实上大部分的订阅组都是不在线的,每次我们也只需要更新位点有变化的这部分订阅组。所以这里我们可以采取一个差分优化的策略(参加过 ACM 的选手应该更熟悉,搜索差分数据传输),在主备同步 Offset 或者持久化的时候只更新变化的内容。假如此时我们除了知道当前的 Offset,还需要一个历史 Offset 的提交记录怎么办,这种情况下,也使用一个内置的系统 Topic 来保存每次提交(某种意义上的自举实现,Kafka 就是使用一个内部 Topic 来保存位点),通过回放或查找消息来追溯消费进度。由于 RocketMQ 支持海量 Topic,元数据的规模会更加大,采用目前的实现开销更小。所以选用哪种实现完全是由我们所面对的需求决定的,实现也可以是灵活多变的。当然,在 RocketMQ 元数据管理上,如何在上层保证分布式环境下多个副本组上的数据一致又是另外一个令人头疼的难题,后续文章会更加详细的讨论这点。 消息数据管理 很多文章都提到 RocketMQ 存储的核心是一个极致优化的顺序写盘,以 append only 的形式不断的将新的消息追加到文件末尾。 RocketMQ 使用了一种称为 MappedByteBuffer 的内存映射文件的办法,将一个文件映射到进程的地址空间,实现文件的磁盘地址和进程的一段虚拟地址关联,实际上是利用了NIO 中的 FileChannel 模型。在进行这种绑定后,用户进程就可以用指针(偏移量)的形式写入磁盘而不用进行 read / write 的系统调用,减少了数据在缓冲区之间来回拷贝的开销。当然这种内核实现的机制有一些限制,单个 mmap 的文件不能太大 (RocketMQ 选择了 1G),此时再把多个 mmap 的文件用一个链表串起来构成一个逻辑队列 (称为 MappedFileQueue),就可以在逻辑上实现一个无需考虑长度的存储空间来保存全部的消息。 这里不同 Topic 的消息直接进行混合的 append only 写,相比于随机写来说性能的提升非常显著的。还有一个重要的细节,这里的混合写的写放大非常低。当我们回头去看 Google 实现的 BigTable 的理论模型,各种 LSM 树及其变种,都是将原来的直接维护树转为增量写的方式来保证写性能,再叠加周期性的异步合并来减少文件的个数,这个动作也称为 Compaction。RocksDB 和 LevelDB 在写放大,读放大,空间放大都有几倍到几十倍的开销。得益于消息本身的不可变性,和非堆积的场景下,数据一旦写入中间代理 Broker 很快就会被下游消费掉的特性,此时我们不需要在写入时就维护 memTable,避免了数据的分发与重建。相比于各种数据库的存储引擎,消息这样近似 FIFO 的实现可以节省大量的资源,同时减少了 CheckPoint 的复杂度。对于同一个副本组上的多个副本之间的数据复制都是全部由存储层自行管理,这个设计类似于 bigtable 和 GFS,azure 的 Partation layer,也被称为 Layered Replication 分层架构。 单条消息的存储格式 RocketMQ 有一套相对复杂的消息存储编码用来将消息对象序列化,随后再将一个非定长的数据落到上述的真实的写入到文件中,值得注意的存储格式中包括了索引队列的编号和位置。 存储时单条消息本身元数据占用的存储空间为固定的 91B + 部分属性,而消息的 payload 通常大于 2K,也就是说元数据带来的额外存储开销只增加了 5%10% 左右。很明显,单条消息越大,存储本身额外的开销(比例)就相对的越少。但如果有大消息的诉求,例如想在 body 中保存一张序列化后的图片(二进制大对象),从目前的实现上说,在消息中保存引用,将真实数据保存到到其他组件,消费时读取引用(比如文件名或者 uk)其实是一个更合适的设计。 多条消息的连续写 上文提到,不同 Topic 的消息数据是直接混合追加数据到 CommitLog 中 (也就是上文提到的 MappedFileQueue),再交由其他后端线程做分发。其实我觉得 RocketMQ 这种 CommitLog 与元数据的分开管理的机制也有一些 PacificaA (微软提出的复制框架) 的影子,从而以一种更简单的方式实现强一致。这里的强一致指的是在 Master Broker (对应于 PacificA 的 Primary) 对所有消息的持久化进行定序,再通过全序广播 (total order broadcast) 实现线性一致 (Linearizability)。这几种实现都会需要解决两个类似的问题,一是如何实现单机下的顺序写,二是如何加快写入的速度。 如果是副本组是异步多写的(高性能中可靠性),将日志非最新(水位最高)的备选为主,主备的数据日志可能会产生分叉。在 RocketMQ 5.0 中,主备会通过基于版本的协商机制,使用落后补齐,截断未提交数据等方式来保证数据的一致性。顺便一提,RocketMQ 5.0 中实现了 logic queue 方案解决全局分区数变化的问题,这和 PacificaA 中通过 newseal 新增副本组和分片 merge 给计算层读的一些优化策略有一些异曲同工之妙,具体可以参考这个设计方案[10]。 独占锁实现顺序写 如何保证单机存储写 CommitLog 的顺序性,直观的想法就是对写入动作加独占锁保护,即同一时刻只允许一个线程加锁成功,那么该选什么样的锁实现才合适呢?RocketMQ 目前实现了两种方式。1. 基于 AQS 的 ReentrantLock 2. 基于 CAS 的 SpinLock 那么什么时候选取 spinlock,什么时候选取 reentranlock?回忆下两种锁的实现,对于 ReentrantLock,底层 AQS 抢不到锁的话会休眠,但是 SpinLock 会一直抢锁,造成明显的 CPU 占用。SpinLock 在 trylock 失败时,可以预期持有锁的线程会很快退出临界区,死循环的忙等待很可能要比进程挂起等待更高效。这也是为什么在高并发下为了保持 CPU 平稳占用而采用方式一,单次请求响应时间短的场景下采用方式二能够减少 CPU 开销。两种实现适用锁内操作时间不同的场景,那线程拿到锁之后需要进行哪些动作呢? 1. 预计算索引的位置,即 ConsumeQueueOffset,这个值也需要保证严格递增。 2. 计算在 CommitLog 存储的位置,physicalOffset 物理偏移量,也就是全局文件的位置。 3. 记录存储时间戳 storeTimestamp,主要是为了保证消息投递的时间严格保序。 因此不少文章也会建议在同步持久化的时候采用 ReentrantLock,异步持久化的时候采用 SpinLock。那么这个地方还有没有优化的空间?目前可以考虑使用较新的 futex 取代 spinlock 机制。futex 维护了一个内核层的等待队列和许多个 SpinLock 链表。当获得锁时,尝试 cas 修改,如果成功则获得锁,否则就将当前线程 uaddr hash 放入到等待队列 (wait queue),分散对等待队列的竞争,减小单个队列的长度。这听起来是不是也有一点点 concurrentHashMap 和 LongAddr 的味道,其实核心思想都是类似的,即分散竞争。 成组提交与可见性 受限于磁盘IO,块存储的响应通常非常慢。要求所有请求立即持久化是不可能的,为了提升性能,大部分的系统总是将操作日志缓存到内存中,比如在满足"日志缓冲区中数据量超过一定大小 / 距离上次刷入磁盘超过一定时间" 的任一条件时,通过后台线程定期持久化操作日志。但这种成组提交的做法有一个很大的问题,存储系统意外故障时,会丢失最后一部分更新操作。例如数据库引擎总是要求先将操作日志刷入磁盘 (优先写入 redo log) 才能更新内存中的数据,这样断电重启则可以通过 undo log 进行事务回滚与丢弃。 在消息系统的实现上有一些微妙的不同,不同场景下对消息的可靠性要求不同,在金融云场景下可能要求主备都同步持久化完成消息才对下游可见,但日志场景希望尽可能低的延迟,同时允许故障场景少量丢失。此时可以将 RocketMQ 配置为单主异步持久化来提高性能,降低成本。此时宕机,存储层会损失最后一小段没保存的消息,而下游的消费者实际上已经收到了。当下游的消费者重置位点到一个更早的时间,回放至同样位点的时候,只能读取到了新写入的消息,但读取不到之前消费过的消息(相同位点的消息不是同一条),这是一种 read uncommitted。 这样会有什么问题呢?对于普通消息来说,由于这条消息已经被下游处理,最坏的影响是重置位点时无法消费到。但是对于 Flink 这样的流计算框架,以 RocketMQ 作为 Source 的时候,通过回放最近一次 CheckPoint 到当前的数据的 offset 来实现高可用,不可重复读会造成计算系统没法做到精确的 excatly once 消费,计算的结果也就不正确了。相应的解决的方案之一是在副本组多数派确认的时候才构建被消费者可见的索引,这么做宏观上的影响就是写入的延迟增加了,这也可以从另一个角度解读为隔离级别的提升带来的代价。 对于权衡延迟和吞吐量这个问题,可以通过加快主备复制速度,改变复制的协议等手段来优化,这里大家可以看下 SIGMOD 2022 关于 Kafka 运行在 RDMA 网络上显著降低延迟的论文《KafkaDirect: Zerocopy Data Access for Apache Kafka over RDMA Networks》链接[9]。 持久化机制 关于这一块的讨论在社区里讨论是最多的,不少文章都把持久化机制称为刷盘。我不喜欢这个词,因为它不准确。在 RocketMQ 中提供了三种方式来持久化,对应了三个不同的线程实现,实际使用中只会选择一个。 同步持久化,使用 GroupCommitService。 异步持久化且未开启 TransientStorePool 缓存,使用 FlushRealTimeService。 异步持久化且开启 TransientStorePool 缓存,使用 CommitRealService。 持久化 同步刷盘的落盘线程统一都是 GroupCommitService。写入线程仅仅负责唤醒落盘线程,将消息转交给存储线程,而不会等待消息存储完成之后就立刻返回了。我个人对这个设计的理解是,消息写入线程相对与存储线程来说也可以看作 IO 线程,而真实存储的线程需要攒批持久化会陷入中断,所以才要大费周章的做转交。 从同步刷盘的实现看,落盘线程每隔 10 ms 会检查一次,如果有数据未持久化,便将 page cache 中的数据刷入磁盘。此时操作系统 crash 或者断电,那未落盘的数据丢失会不会对生产者有影响呢?此时生产者只要使用了可靠发送 (指非 oneway 的 rpc 调用),这时对于发送者来说还没有收到成功的响应,此时客户端会进行重试,将消息写入其他可用的节点。 异步持久化对应的线程是 FlushRealTimeService,实现上又分为固定频率和非固定频率,核心区别是线程是否响应中断。所谓的固定频率是指每次有新的消息到来的时候不管,不响应中断,每隔 500ms(可配置)flush 一次,如果发现未落盘数据不足(默认 16K),直接进入下一个循环,如果数据写入量很少,一直没有填充满16K,就不会落盘了吗?这里还有一个基于时间的兜底方案,即线程发现距离上次写入已经很久了(默认 10 秒),也会执行一次 flush。但事实上 FileChannel 还是 MappedByteBuffer 的 force() 方法都不能精确控制写入的数据量,这里的写行为也只是对内核的一种建议。对于非固定频率实现,即每次有新的消息到来的时候,都会发送唤醒信号,当唤醒动作在数据量较大时,存在性能损耗,但消息量较少且情况下实时性好,更省资源。在生产中,具体选择哪种持久化实现由具体的场景决定。是同步写还是多副本异步写来保证数据存储的可靠性,本质上是读写延迟和和成本之间的权衡。 读写分离 广义上来说,读写分离这个名词有两个不同的含义: 像数据库一样主写从读,分摊读压力,牺牲延迟可靠性更高,适用于消息读写比非常高的场景。 存储写入将消息暂存至 DirectByteBuffer,当数据成功写入后,再归还给缓冲池,将写入 page cache 的动作异步化。 这里主要来讨论第二点,当 Broker 配置异步持久化且开启缓冲池,启用的异步刷盘线程是 CommitRealTimeService。我们知道操作系统本身一般是当 page cache 上积累了大量脏页后才会触发一次 flush 动作(由一些 vm 参数控制,比如 dirty_background_ratio 和 dirty_ratio)。这里有一个很有意思的说法是 CPU 的 cache 是由硬件维护一致性,而 page cache 需要由软件来维护,也被称为 syncable。高并发下写入 page cache 可能会造成刷脏页时磁盘压力较高,导致写入时出现毛刺现象。为了解决这个问题,出现了读写分离的实现,使用堆外内存将消息 hold 住,然后进行异步批量写入。 RocketMQ 启动时会默认初始化 5 块(参数 transientStorePoolSize 决定)堆外内存(DirectByteBuffer)循环利用,由于复用堆外内存,这个小方案也被成为池化,池化的好处及弊端如下: 好处:数据写堆外后便很快返回,减少了用户态与内核态的切换开销。 弊端:数据可靠性降为最低级别,进程重启就会丢数据(当然这里一般配合多副本机制进行保障),也会增加一些端到端的延迟。 宕机与故障恢复 宕机一般是由于底层的硬件问题导致,RocketMQ 宕机后如果磁盘没有永久故障,一般只需要原地重启,Broker 首先会进行存储状态的恢复,加载 CommitLog,ConsumeQueue 到内存,完成 HA 协商,最后初始化 Netty Server 提供服务。目前的实现是最后初始化对用户可见的网络层服务,实际上这里也可以先初始化网络库,分批将 Topic 注册到 NameServer,这样正常升级时可以对用户的影响更小。 在 recover 的过程中还有很多软件工程实现上的细节,比如从块设备加载的时候需要校验消息的 crc 看是否产生错误,对最后一小段未确认的消息进行 dispatch 等操作。默认从倒数第三个文件 recover CommitLog 加载消息到 page cache (假设未持久化的数据 文件的生命周期 聊完了消息的生产保存,再来讨论下消息的生命周期,只要磁盘没有满,消息可以长期保存。前面提到 RocketMQ 将消息混合保存在 CommitLog,对于消息和流这样近似 FIFO 的系统来说,越近期的消息价值越高,所以默认以滚动的形式从前向后删除最久远的消息,而不会关注文件上的消息是否全部被消费。触发文件清除操作的是一个定时任务,默认每 10s 执行一次。在一次定时任务触发时,可能会有多个物理文件超过过期时间可被删除,因此删除一个文件不但要判断这个文件是否还被使用,还需要间隔一定时间(参数 deletePhysicFilesInterval)再删除另外一个文件,由于删除文件是一个非常耗费 IO 的操作,可能会引起存储抖动,导致新消息写入和消费的延迟。所以又新增了一个定时删除的能力,使用 deleteWhen 配置操作时间(默认是凌晨4点)。我们把由于磁盘空间不足导致的删除称为被动行为,由于高速介质通常比较贵(傲腾 ESSD等),出于成本考虑,我们还会异步的主动的将热数据转移到二级介质上。在一些特殊的场景下,删除的同时可能还需要对磁盘做安全擦除来防止数据恢复。 避免存储抖动 快速失败 消息被服务端 Netty 的 IO 线程读取后就会进入到阻塞队列中排队,而单个 Broker 节点有时会因为 GC,IO 抖动等因素造成短时存储写失败。如果请求来不及处理,排队的请求就会越积越多导致 OOM,客户端视角看从发送到收到服务端响应的时间大大延长,最终发送超时。RocketMQ 为了缓解这种抖动问题,引入了快速失败机制,即开启一个扫描线程,不断的去检查队列中的第一个排队节点,如果该节点的排队时间已经超过了 200ms,就会拿出这个请求,立即向客户端返回失败,客户端会重试到其他副本组(客户端还有一些熔断与隔离机制),实现整体服务的高可用。 存储系统不止是被动的感知一些下层原因导致的失败,RocketMQ 还设计了很多简单有效的算法来进行主动估算。例如消息写入时 RocketMQ 想要判断操作系统的 page cache 是否繁忙,但是 JVM 本身没有提供这样的 Monitor 工具来评估 page cache 繁忙程度,于是利用系统的处理时间来判断写入是否超过1秒,如果超时的话,让新请求会快速失败。再比如客户端消费时会判断当前主的内存使用率比较高,大于物理内存的 40%时,就会建议客户端从备机拉取消息。 预分配与文件预热 为了在 CommitLog 写满之后快速的切换物理文件,后台使用一个后台线程异步创建新的文件并进行对进行内存锁定,还大费周章的设计了一个额外文件预热开关(配置 warmMapedFileEnable),这么做主要有两个原因:请求分配内存并进行 mlock 系统调用后并不一定会为进程完全锁定这些物理内存,此时的内存分页可能是写时复制的。此时需要向每个内存页中写入一些假的值,有些固态的主控可能会对数据压缩,所以这里不会写入 0。 调用 mmap 进行映射后,OS 只是建立虚拟内存地址至物理地址的映射表,而实际并没有加载任何文件至内存中。这里可能会有大量缺页中断。RocketMQ 在做 mmap 内存映射的同时进行 madvise 调用,同时向 OS 表明 WILLNEED 的意愿。使 OS 做一次内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的效果。 当然,这么做也是有弊端的。预热后,写文件的耗时缩短了很多,但预热本身就会带来一些写放大。整体来看,这么做能在一定程度上提高响应时间的稳定性,减少毛刺现象,但在 IO 本身压力很高的情况下则不建议开启。RocketMQ 是适用于 Topic 数量较多的业务消息场景。所以 RocketMQ 采用了和 Kafka 不一样的零拷贝方案,Kafka 采用的是阻塞式 IO 进行 sendfile,适用于系统日志消息这种高吞吐量的大块文件。而 RocketMQ 选择了 mmap + write 非阻塞式 IO (基于多路复用) 作为零拷贝方式,这是因为 RocketMQ 定位于业务级消息这种小数据块/高频率的 IO 传输,当想要更低的延迟的时候选择 mmap 更合适。 当 kernal 把可用的内存分配后 free 的内存就不够了,如果进程一下产生大量的新分配需求或者缺页中断,还需要将通过淘汰算法进行内存回收,此时可能会产生抖动,写入会有短时的毛刺现象。 冷数据读取 对于 RocketMQ 来说,读取冷数据可能有两种情况。 请求来自于这个副本组的其他节点,进行副本组内的数据复制,也可能是离线转储到其他系统。 请求来自于客户端,是消费者来消费几个小时以前的数据,属于正常的业务诉求。 对于第一种情况,在 RocketMQ 低版本源码中,对于需要大量复制 CommitLog 的情况(例如备磁盘故障,或新上线一个备机),主默认使用 DMA 拷贝的形式将数据直接通过网络复制给备机,此时由于大量的缺页中断阻塞了 io 线程,此时会影响 Netty 处理新的请求,在实现上让一些组件之间的内部通信使用 fastRemoting 提供的第二个端口,解决这个问题的临时方案还包括先用业务线程将数据 load 回内存而不使用零拷贝,但这个做法没有从本质上解决阻塞的问题。对于冷拷贝的情况,可以使用 madvice 建议 os 读取避免影响主的消息写入,也可以从其他备复制数据。 对于第二种情况,对各个存储产品来说都是一个挑战,客户端消费一条消息时,热数据全部存储在 page cache,对于冷数据会退化为随机读(系统会有一个对 page cache 连续读的预测机制)。需要消费超过几个小时之前的数据的场景下,消费者一般都是做数据分析或者离线任务,此时下游的目标都是吞吐量优先而非延迟。对于 RocketMQ 来说有两个比较好的解决方案,第一是同 redirect 的方式将读取请求转发给备进行分摊读压力,或者是从转储后的二级介质读取。在数据转储后,RocketMQ 本身的数据存储格式会发生变化,详见后文。 索引数据管理 在数据写入 CommitLog 后,在服务端当 MessageStore 向 CommitLog 写入一些消息后,有一个后端的 ReputMessageService 服务 (dispatch 线程) 会异步的构建多种索引,满足不同形式的读取诉求。 队列维度的有序索引 ConsumeQueue 在 RocketMQ 的模型下,消息本身存在的逻辑队列称为 MessageQueue,而对应的物理索引文件称为 ConsumeQueue。从某种意义上说 MessageQueue = 多个连续 ConsumeQueue 索引 + CommitLog 文件。 ConsumeQueue 相对与 CommitLog 来说是一个更加轻量。dispatch 线程会源源不断的将消息从 CommitLog 取出,再拿出消息在 CommitLog 中的物理偏移量 (相对于文件存储的 Index),消息长度以及Tag Hash 作为单条消息的索引,分发到对应的消费队列。偏移 + 长度构成了对 CommitLog 的引用 (Ref)。这种 Ref 机制对于单挑消息只有 20B,显著降低了索引存储开销。ConsumeQueue 实际写入的实现与 CommitLog 不同,CommitLog 有很多存储策略可以选择且混合存储,一个 ConsumeQueue 只会保存一个 Topic 的一个分区的索引,持久化默认使用 FileChannel,实际上这里使用 mmap 的话对小数据量的请求更加友好,不用陷入中断。 客户端的 pull 请求到服务端执行了如下流程来查询消息: 1. 根据 Tag 的 Hash 值查询 ConsumeQueue 文件(由 physicOffset + size + Tag HashCode 组成) 2. 根据 ConsumeQueue 拿到 physicOffset + size 3. 根据 physicOffset 查询 CommitLog 文件(上文的MappedFileQueue)获得消息 RocketMQ 中默认指定每个消费队列的文件存储 30 万条索引,而一个索引占用 20 个字节,这样每个文件的大小是 300 _1000 _20 / 1024 / 1024 ≈ 5.72M。为什么消费队列文件存储消息的个数要设置成 30 万呢?这个经验值适合消息量比较大的场景,事实上这个值对于大部分场景来说是偏大的,有效数据的真实占用率很低,导致ConsumeQueue 空载率高。 先来看看如果过大或者过小会带来什么问题。因为消息总是有失效期的,例如 3 天失效,如果消费队列的文件设置过大的话,有可能一个文件中包含了过去一个月的消息索引,但这个时候原始的数据已经滚动没了,白白浪费了很多空间。但也不宜太小,导致 ConsumeQueue 也有大量小文件,降低读写性能。 下面给出一个非严谨的空载率推导过程: 假设此时单机的 Topic = 5000,单节点单个 Topic 的队列数一般是 8,分区数量 = 4万。以 1T 消息数据为例,每条消息大小是 4KB,索引数量 = 消息数量 = 1024 1024 1024 / 4 = 2.68 亿。最少需要的 ConsumeQueue = 索引数量 / 30万 = 895 个,实际使用率 (有效数据量) 约等于 2.4%。随着 ConsumeQueue Offset 的原子自增滚动,cq 头部是无效数据导致占用的磁盘空间会变大。根据公有云线上的情况来看,非 0 数据约占 5%,实际有效数据只占 1%。对于 ConsumeQueue 这样的索引文件,我们可以使用 RocksDB 或者傲腾这样的持久化内存来存储,或者对 ConsumeQueue 单独实现一个用户态文件系统,几个方案都可以减少整体索引文件大小,提高访问性能。这一点在后文关于存储机制的优化中,我们再详聊。 由于 CommitLog ConsumerQueue Offset 的关系从消息写入的那一刻开始就确定了,在 Topic 跨副本组迁移,副本组要下线等需要切流的场景下,如果需要消息可读,需要采用复制数据的方案来实现 Topic 跨副本组迁移,只能采用消息级别的拷贝,而不能简单的把一个分区从副本组 A 移动到副本组 B。有一些消息产品在面对这个场景时,采用了数据按分区复制的方案,这种方案可能会立刻产生大量的数据传输(分区 rebalance),而 RocketMQ 的切流一般可以做到秒级生效。 消息维度的随机索引 IndexFile RocketMQ 作为业务消息的首选,上文中 ReputMessageService 线程除了构建消费队列的索引外,还同时为每条消息根据 id, key 构建了索引到 IndexFile。这是方便快速快速定位目标消息而产生的,当然这个构建随机索引的能力是可以降级的,IndexFile文件结构如下: IndexFile 也是定长的,从单个文件的数据结构来说,这是实现了一种简单原生的哈希拉链机制。当一条新的消息索引进来时,首先使用 hash 算法命中黄色部分 500w 个 slot 中的一个,如果存在冲突就使用拉链解决,将最新索引数据的 next 指向上一条索引位置。同时将消息的索引数据 append 至文件尾部(绿色部分),这样便形成了一条当前 slot 按照时间存入的倒序的链表。这里其实也是一种 LSM compaction 在消息模型下的改进,降低了写放大。 存储机制的演进方向 RocketMQ 的存储设计是以简单可靠队列模型作为核心来抽象的,也因此产生了一些缺陷和对应的优化方案。 KV 模型与 Queue 模型结合 RocketMQ 实现了单条业务消息的退避重试,在生产实践中,我们发现部分用户在客户端消费限流时直接将消息返回失败,在重试消息量比较大的时候,由于原有实现下重试队列数有限,导致重试消息无法很好的负载均衡到所有客户端。同时,消息来回的在服务端和客户端之间传输,使得两侧的开销都增加了,用户侧正确的做法应该是消费限流时,让消费的线程等待一会儿。从存储服务的角度上来说,这其实是一种队列模型的不足,让一条队列只能被一个消费者持有。RocketMQ 提出了 pop 消费这种全新的概念,让单条队列的消息能够被多个客户端消费到,这涉及到服务端对单条消息的加解锁,KV 模型就非常契合这个场景。从长远来看,像定时消息事务消息可以有一些基于 KV 的更原生的实现,这也是 RocketMQ 未来努力的方向之一。 消息的压缩与归档存储 压缩就是用时间去换空间的经典 tradeoff,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。目前 RocketMQ 客户端从延迟考虑仅单条大于 4K 的消息进行单条压缩存储的。服务端对于收到的消息没有立刻进行压缩存储有多个原因,例如为了保证数据能够及时的写入磁盘,消息稀疏的时候攒批效果比较差等,所以 Body 没有压缩存储。而对于大部分的业务 Topic 来说,其实 Body 一般都有很大程度上是相似的,可以压缩到原来的几分之一到几十分之一。 存储一般有高速(高频)介质与低速介质,热数据存放在高频介质上(如傲腾,ESSD,SSD),冷数据存放在低频介质上(NAS,OSS),以此来满足低成本保存更久的数据。从高频介质转到更低频的 NAS 或者 OSS 时,不可避免的产生了一次数据拷贝。我们可以在这个过程中异步的对数据进行规整(闲时资源富余)。那么我们为什么要做规整呢,直接零拷贝复制不香吗?答案就是低频介质虽然便宜大碗,但通常 iops 和吞吐量更低。对于 RocketMQ 来说需要规整的数据就是索引和 CommitLog 中的消息,也就是说在高频介质与低频介质上消息的存储格式可以是完全不同的。当热消息降级到二级存储的时候,数据密集且异步,这里就是一个非常合适的机会进行压缩和规整。业界也有一些基于 FPGA 来加速存储压缩的案例,将来我们也会持续的做这方面的尝试。 存储层资源共享与争抢 磁盘 IO 的抢占 没错,这里想谈谈的其实是硬盘的调度算法。在一个考虑性价比的场景下,由于 RocketMQ 的存储机制,我们可以把索引文件存储在 SSD,消息本身放在 HDD 里,因为热消息总是在 PageCache 中的,所以在 IO 调度上优先满足写而饿死读。对于没有堆积的消费者来说,消费到的数据是从 page cache 拷贝到 socket 再传输给用户,实时性已经很高了。而对于消费冷数据(几个小时,几天以前的数据)用户的诉求一般是尽快获取到消息即可,此时服务端可以选择尽快满足用户的 Pull 请求,由于大量的随机 IO,这样磁盘会产生严重的 rt 抖动。仔细考虑,这里其实用户想要的是尽可能大的吞吐量,假设访问冷数据需要 200 毫秒,假设在服务端把冷读的行为滞后,再加上延迟 500 毫秒再返回给用户数据,并没有显著的区别。而这里的 500 毫秒,服务端内部就可以合并大量的 IO 操作,我们也可以使用 madvice 系统调用去建议内核读取。这里的合并带来的收益很高,可以显著的减少对热数据的写入的影响,大幅度提升性能。 用户态文件系统 还是为了解决随机读效率低的问题,我们可以设计一个用户态文件系统,让 IO 调用全部 kernelbypass。 主要有几个方向: 1. 多点挂载。常用的 Ext4 等文件系统不支持多点挂载,让存储能够支持多个实例的对同一份数据的共享访问。 2. 调整对于 IO 的合并策略,IO优先级,polling 模式,队列深度等。 3. 使用文件系统类似 O_DIRECT 的非缓存方式读写数据。 RocketMQ 的未来 RocketMQ 存储系统经过多年的发展,基本功能特性已经比较完善,通过一系列的创新技术解决了分布式存储系统中的难题,稳定的服务于阿里集团和海量的云上用户。RocketMQ 在云原生时代的演进中遇到了更多的有趣的场景和挑战,这是一个需要全链路调优的复杂工程。我们会持续在规模,稳定性,多活容灾等企业级特性,成本与弹性等方面发力,将 RocketMQ 打造为“消息,事件,流”一体化的融合平台。同时,我们也会将开源行动更加可持续的发展下去,为社会创造价值。 参考文献 [3]. J. DeBrabant, A. Pavlo, S. Tu, M. Stonebraker, and S. B. Zdonik. Anticaching: A new approach to database management system architecture. PVLDB, 6(14):1942–1953, 2013. [4]. 《RocketMQ 技术内幕》 [5]. 一致性协议中的“幽灵复现”. [https://zhuanlan.zhihu.com/p/47025699.](https://zhuanlan.zhihu.com/p/47025699. [6]. Calder B, Wang J, Ogus A, et al. Windows Azure Storage: a highly available cloud storage service with strong consistency[C]//Proceedings of the TwentyThird ACM Symposium on Operating Systems Principles. ACM, 2011: 143157. [7]. Chen Z, Cong G, Aref W G. STAR: A distributed stream warehouse system for spatial data[C] 2020: 27612764. [8]. design dataintensive application 《构建数据密集型应用》
作者:斜阳
#技术探索

2024年8月9日

深度剖析 RocketMQ 5.0 之架构解析:云原生架构如何支撑多元化场景?
简介: 了解 RocketMQ 5.0 的核心概念和架构概览;然后我们会从集群角度出发,从宏观视角学习 RocketMQ 的管控链路、数据链路、客户端和服务端如何交互;学习 RocketMQ 如何实现数据的存储,数据的高可用,如何利用云原生存储进一步提升竞争力。 1.前言 从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。 目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往往同时涉及交叉场景,比如同时进行物联网消息、微服务消息的处理,同时进行应用集成、数据集成、实时分析等,企业需要为此维护多套消息系统,付出更多的资源成本和学习成本。 在这样的背景下,2022 年,RocketMQ 5.0 正式发布,相对于 RocketMQ 4.0,架构走向云原生化,并且覆盖了更多的业务场景。想要掌握最新版本 RocketMQ 的应用,就需要进行更加体系化的深入了解。 2.背景 本节课的内容是 RocketMQ 5.0 的架构解析。前面的课程中,我们了解到 RocketMQ 5.0 可以支撑多样化的业务场景,不仅仅是业务消息,它还会支持流处理、物联网、事件驱动等场景。在进入具体的业务领域场景之前,我们先从技术的角度来了解 RocketMQ 的云原生架构,看它是如何基于这一套统一的架构支撑多元化场景的。 首先,我们会了解 RocketMQ 5.0 的核心概念和架构概览;然后我们会从集群角度出发,从宏观视角学习 RocketMQ 的管控链路、数据链路、客户端和服务端如何交互;最后,我们将回到消息队列最重要的模块存储系统,学习 RocketMQ 如何实现数据的存储,数据的高可用,如何利用云原生存储进一步提升竞争力。 3. 概览 3.1. RocketMQ 领域模型 在学习 RocketMQ 的架构之前,我们先从用户视角来来看 RocketMQ 的关键概念以及领域模型。如下图,我们按照消息的流转顺序来介绍。 最左边是消息生产者,一般对应业务系统的上游应用,在某个业务动作触发后发送消息到 Broker。Broker 是消息系统数据链路的核心,负责接收消息、存储消息、维护消息状态、消费者状态。多个 Broker 组成一个消息服务集群,共同服务一个或者多个 Topic 。刚才提到生产者生产消息并发送到 Broker,消息是业务通信的载体,每个消息包含消息 ID、消息 Topic、消息体内容、消息属性、消息业务 key 等。每条消息都属于某个 Topic,表示同一个业务语义,在阿里内部,我们交易消息的 Topic 叫做 Trade,购物车消息叫 Cart,生产者应用会把消息发送到对应的 Topic 上。Topic 里面还有 MessageQueue,这个用于消息服务的负载均衡和数据存储分片,每个 Topic 会包含一个或者多个 Message Queue 分布在不同的消息 Broker。生产者发送消息,Broker 存储消息,下一步就是消费者消费消息。消费者一般对应业务系统的下游应用,同一个消费者应用集群会共用一个 Consumer Group。消费者会和某个 Topic 产生订阅关系,订阅关系是 Consumer Group + Topic + 过滤表达式的三元组,符合订阅关系的消息就会被对应的消费者集群消费。接下来我们从技术实现角度进一步深入了解 RocketMQ 。 3.2. RocketMQ 5.0 架构概览 这是 RocketMQ 5.0 的架构概览图,从上往下看,可分为 SDK、NameServer、Proxy 和 Store 层。 我们首先来看 SDK 层,包括了 RocketMQ 的 SDK ,用户基于 RocketMQ 自身的领域模型来使用这个 SDK 。除了 RocketMQ 自身的 SDK 之外,还包括了细分领域场景的业界标准 SDK 。面向事件驱动的场景,RocketMQ 5.0 支持 CloudEvents 的 SDK;面向 IoT 的场景,RocketMQ 支持物联网 MQTT 协议的 SDK;为了方便更多的传统应用迁移到 RocketMQ,我们还支持了 AMQP 协议,未来也会开源到社区版本里。另外一个组件是是 NameServer,它承担服务发现和负载均衡的职责。通过 NameServer,客户端能获取 Topic 的数据分片和服务地址,链接消息服务器进行消息收发。 消息服务包含计算层 Proxy 和存储层 RocketMQ Store。RocketMQ 5.0 是存算分离的架构,这里的存算分离强调的是模块的分离,职责的分离。Proxy 和 RocketMQ Store 面向不同的业务场景可以合并部署,也可以分开部署。计算层 Proxy 主要承载的消息的上层业务逻辑,尤其是面向多场景、多协议的支持,比如承载 CloudEvents、MQTT、AMQP 的领域模型的实现逻辑和协议转换。面向不同的业务负载,还可以把 Proxy 分离部署,独立弹性,比如在物联网场景,Proxy 层独立部署可以面向海量物联网设备连接数进行弹性伸缩,和存储流量扩缩容解耦。RocketMQ Store 层则是负责核心的消息存储,这里包括基于 Commitlog 的存储引擎、多元索引、多副本技术和云存储集成扩展。消息系统的状态都下沉到 RocketMQ Store,其他组件全部实现无状态化。 4. 服务发现 4.1. 服务发现 第二部分我们来详细看一下 RocketMQ 的服务发现。RocketMQ 的服务发现是通过 NameServer(简称NS) 来实现的。 我们通过下方这个图来了解服务发现的机制,这个是 Proxy 和 Broker 合并部署的模式,也是 RocketMQ 最常见的模式。前面提到每个 Broker 集群会负责某些 Topic 的服务,每个 Broker 都会把自身服务哪些 Topic 注册到 NameServer 集群,和每个 NameServer 进行通信,并定时和 NS 通过心跳机制来维持租约。服务注册的数据结构包含 Topic 和 Topic 分片 MessageQueue。 在示例中 Broker1 和 Broker2 分别承载 TopicA 的一个分片。在 NS 机器上会维护全局视图,TopicA 有两个分片分别在 Broker1 和 Broker2 。RocketMQ SDK 在对 TopicA 进行正式的消息收发之前,它会随机访问一个 NameServer 机器,从而知道这个 TopicA 有哪些分片,每个数据的分片在哪个 Broker 上面,它会跟这些 Broker 建立好长连接,然后再进行消息的收发。大部分的项目的服务发现机制会通过 zookeeper 或者 etcd 等强一致的分布式协调组件来担任注册中心的角色,而 RocketMQ 有自己的特点,如果从 CAP 的角度来看,它的注册中心采用的是 AP 的模式,NameServer 节点无状态,是 ShareNothing 的架构,有更高的可用性。 再看下方这个图,我们说 RocketMQ 的存算分离是可分可合,这里采用的就是分离的部署模式,RocketMQ SDK 直接访问无状态的 Proxy 集群。这个模式可以应对更加复杂的网络环境,支持多网络类型的访问,如公网访问,实现更好的安全控制。 在整个服务发现机制中,NameServer、Proxy 都是无状态的,可以随时进行节点增减。有状态节点 Broker 的增减基于 NS 的注册机制,客户端可以实时感知、动态发现。在缩容过程中,RocketMQ Broker 还可以进行服务发现的读写权限控制,对缩容的节点禁写开读,待未读消息全消费,实现无损平滑下线。 4.2. 负载均衡 刚才我们已经知道 SDK 如何通过 NameServer 来发现 Topic 的分片信息 MessageQueue,以及 Broker 地址。基于这些服务发现的元数据,我们再来详细看看消息流量是如何在生产者、RocketMQ Broker 和消费者集群进行负载均衡的。 先来看生产链路的负载均衡,生产者通过服务发现机制,知道了 Topic 的数据分片以及对应的 Broker 地址。它的服务发现机制是比较简单的,在默认情况下采用 Round Robin 的方式轮询发送到各个 Topic 队列,保证了 Broker 集群的流量均衡。在顺序消息的场景下会略有特殊,会基于消息的业务主键 Hash 到某个队列发送,这样一来,如果有热点业务主键,那 Broker 集群也可能出现热点。除此之外,我们基于这些元数据还能根据业务需要扩展更多的负载均衡算法,比如同机房优先算法,可以降低多机房部署场景下的延迟,提升性能。 再看消费者的负载均衡,相对来说会比生产者更复杂,它有两种类型的负载均衡方式。最经典的模式是队列级负载均衡,消费者知道 Topic 的队列总数,也知道同一个 Consumer Group 下的实例数,就可以按照统一的分配算法,类似一致性 hash 的方式,让每个消费者实例绑定对应的队列,只消费绑定队列的消息,每个队列的消息也只会被一个消费者实例消费。 这种模式最大的缺点就是负载不均衡,消费者实例要绑定队列、有临时状态。如果我们有三个队列,有两个消费者实例,那就必然有一个消费者需要消费三分之二的数据,如果我们有四个消费者,那么第四个消费者就要空跑。所以在 RocketMQ 5.0 里面,我们引入了消息粒度的负载均衡机制,无需绑定队列,消息在消费者集群随机分发,这样就可以保障消费者集群的负载均衡。更重要的是这种模式更加符合未来 Serverless 化的趋势,Broker 的机器数、Topic 的队列数和消费者实例数完全解耦,可以独立扩缩容。 5. 存储系统 前面通过架构概览和服务发现机制,我们已经对 RocketMQ 有比较全局性的了解。接下来我们将深入 RocketMQ 的存储系统,这个模块对 RocketMQ 的性能、成本、可用性有决定性作用。 5.1. 存储核心 先来看一下 RocketMQ 的存储核心。存储核心由 Commitlog、Consumequeue 和 Index 文件组成。消息存储首先写到 Commitlog,刷盘并复制到 slave 节点来完成持久化,Commitlog 是 RocketMQ 存储的 source of true,通过它可以构建完整的消息索引。相比于 Kafka 而言,RocketMQ 把所有 Topic 的数据都写到 Commitlog 文件,最大化顺序 io,使得 RocketMQ 单机可以支撑万级的 Topic。 在写完 Commitlog 之后,RocketMQ 会异步分发出多个索引,首先是 ConsumeQueue 索引,这个和 MessageQueue 是对应的,基于这个索引可以实现消息的精准定位,可以按照 Topic、队列 id 和位点定位到消息,消息回溯功能也是基于这个实现的。另外一个很重要的索引是哈希索引,它是消息可观测的基础。通过持久化的 hash 表来实现消息业务主键的查询能力,消息轨迹主要是基于这个来实现的。 除了消息本身的存储之外,Broker 还承载了消息元数据的存储。包括 topics 的文件,表示该 Broker 会对哪些 Topic 提供服务,还维护了每个 Topic 队列数、读写权限、顺序性等属性。还有一个 Subscription、ConsumerOffset 文件,这两个维护了 Topic 的订阅关系以及每个消费者的消费进度。还有 Abort、Checkpoint 文件则是用于完成重启后的文件恢复,保障数据完整性。 5.2. Topic 高可用 上面的内容中,我们站在单机的视角,从功能的层面学习 RocketMQ 的存储引擎,包括 Commitlog 和索引。现在我们重新跳出来,再从集群视角看 RocketMQ 的高可用。我们先定义一下 RocketMQ 的高可用,指当 RocketMQ 集群出现 NameServer、Broker 局部不可用的时候,指定的 Topic 依然是可读可写的。 RocketMQ 可以应对三类故障场景。 第一种 case,某对主备单机不可用。如下方这个图,当 Broker2 主宕机,备可用。TopicA 依然可读可写,其中分片1可读可写,分片 2 可读不可写,Topic A 在分片 2 的未读消息依然可以消费。总结起来就是 Broker 集群里,只要任意一组 Broker 存活一个节点,Topic 的读写可用性不受影响。如果某组 Broker 主备全部宕机,那么 Topic 新数据的读写也不受影响,未读消息会延迟,待任意主备启动才能继续消费。 接下来,再看 NameServer 集群的故障情况,由于 NameServer 是 ShareNothing 的架构,每个节点都是无状态的,并且是 AP 模式,不需要依赖多数派算法,所以只要有一台 NameServer 存活,整个服务发现机制都是正常的,Topic 的读写可用性不受影响。 甚至在更极端的情况下,整个 NS 都不可用,由于 RocketMQ 的 SDK 对服务发现元数据有缓存,只要 SDK 不重启,它依然可以按照当下的 topic 元数据,继续进行消息收发。 5.3. MessageQueue 高可用 从 Topic 高可用的实现中我们发现,虽然 Topic 持续可读可写,但是 Topic 的读写队列数会发生变化。队列数变化,会对某些数据集成的业务有影响,比如说异构数据库 Binlog 同步,同一个记录的变更 Binlog 会写入不同的队列,重放 Binlog 可能会出现乱序,导致脏数据。所以我们还需要对现有的高可用进一步增强,要保障局部节点不可用时,不仅 Topic 可读可写,并且 Topic 的可读写队列数量不变,指定的队列也是可读可写的。 如下图,NameServer 或 Broker 任意出现单点不可用,Topic A 依然保持 2 个队列,每个队列都具备读写能力。 为了解决 MessageQueue 高可用的场景,RocketMQ 5.0 引入全新的高可用机制。我们先来了解其中的核心概念: Dledger Controller,这是一个基于 raft 协议的强一致元数据组件,来执行选主命令、维护状态机信息。 SynStateSet,如图,它维护了处于同步状态的副本组集合,这个集合里的节点都有完整的数据,当主节点宕机后,就从这个集合中选择新的主节点。 Replication,用于不同副本之间的数据复制、数据校验、截断对齐等事项。 下图是 RocketMQ 5.0 HA 的架构全景图,这个高可用架构具有多个优势。 一是在消息存储引入了朝代和开始位点,基于这两个数据,完成数据校验、截断对齐,在构建副本组的过程中简化数据一致性逻辑。 二是基于 Dledger Controller,我们不需要引入 zk、etcd 等外部分布式一致性系统,并且 Dledger Controller 还可以和 NameServer 合并部署,简化运维、节约机器资源。 三是 RocketMQ 对 Dledger Controller 是弱依赖,即便 Dledger 整体不可用了,也只会影响选主,不影响正常的消息收发流程。 四是可定制,用户可以根据业务对数据可靠性、性能、成本综合选择,比如副本数可以是2、3、4、5,副本直接可以是同步复制、异步复制。如 22 模式表示,2 副本、并且数据同步复制;23 模式表示3副本,2副本多数派完成复制,才算成功。用户还可以将其中的一个副本部署在异地机房,异步复制实现容灾。 5.4. 云原生存储 前面我们讲的存储系统都是 RocketMQ 面向本地文件系统的实现。但是在云原生时代,当我们把 RocketMQ 部署到云环境,可以进一步利用云原生基础设施,如云存储来进一步增强 RocketMQ 的存储能力。在 RocketMQ 5.0 里面我们提供了多级存储的特性,它是内核级的存储扩展,我们面向对象存储扩展了对应的 Commitlog、ConsumeQueue 和 IndexFile;我们采用了插件化的设计,多级存储可以有多种实现,在阿里云上,我们基于 OSS 对象服务实现,在 AWS 上我们则可以面向 S3 的接口来实现。 通过引入了这个云原生的存储,RocketMQ 释放了很多红利: 无限存储能力,消息存储空间不受本地磁盘空间的限制,原来是保存几天,现在可以几个月、甚至存一年。另外对象存储也是业界成本最低的存储系统,特别适合冷数据存储。 Topic 的 TTL,原来多个 Topic 的生命周期是和 Commitlog 绑定,统一的保留时间。现在每个 Topic 都会使用独立的对象存储 Commitlog 文件,可以有独立的 TTL。 存储系统进一步的存算分离,能把存储吞吐量的弹性和存储空间的弹性分离。 冷热数据隔离,分离了冷热数据的读链路,能大幅度提升冷读性能,不会影响在线业务。
作者:隆基
#技术探索

2024年8月9日

深度剖析 RocketMQ 5.0 之消息进阶:如何支撑复杂业务消息场景?
简介: 本文主要学习 RocketMQ 的一致性特性,一致性对于交易、金融都是刚需。从大规模复杂业务出发,学习 RocketMQ 的 SQL 订阅、定时消息等特性。再从高可用的角度来看,这里更多的是大型公司对于高阶可用性的要求,如同城容灾、异地多活等。 1. 前言 从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。 目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往往同时涉及交叉场景,比如同时进行物联网消息、微服务消息的处理,同时进行应用集成、数据集成、实时分析等,企业需要为此维护多套消息系统,付出更多的资源成本和学习成本。 在这样的背景下,2022 年,RocketMQ 5.0 正式发布,相对于 RocketMQ 4.0,架构走向云原生化,并且覆盖了更多的业务场景。想要掌握最新版本 RocketMQ 的应用,就需要进行更加体系化的深入了解。 2. 背景 今天的课程是 RocketMQ 5.0 消息进阶。这节课依然聚焦在业务消息场景,我们在 RocketMQ 5.0 概述里面就提到 RocketMQ 可以应对复杂的业务消息场景。这节课我们就从功能特性的角度出发,来看 RocketMQ 是如何去解决复杂业务场景的。 第一部分会先学习 RocketMQ 的一致性特性,一致性对于交易、金融都是刚需。第二部分,我们从大规模复杂业务出发,学习 RocketMQ 的 SQL 订阅、定时消息等特性。第三部分,我们再从高可用的角度来看,这里更多的是大型公司对于高阶可用性的要求,如同城容灾、异地多活等。 3. 一致性 3.1. 事务消息 3.1.1. 场景 我们先来看 RocketMQ 的第一个特性——事务消息,这是和一致性相关的特性,这也是 RocketMQ 有别于其他消息队列的一个最具区分度的特性。我们还是继续沿用大规模电商系统的案例,如图,我们仔细梳理一下流程,付款成功会在交易系统中订单数据库将订单状态更新为已付款,然后交易系统再发一条消息给 RocketMQ,RocketMQ 把订单已付款的事件通知给所有下游的应用,保障后续的履约环节。 但是这个流程有个问题,就是交易系统写数据库和发消息是分开的,它不是一个事务。会出现多种异常情况,比如数据库写成功了,但消息发失败了,这个订单的状态下游应用接收不到,对于电商业务可能就造成大量用户付款了,但是卖家不发货。如果先发消息成功,再写数据库失败,会造成下游应用认为订单已付款,推进卖家发货,但是实际用户未付款成功。这些异常都会对电商业务造成大量脏数据,产生灾难性业务后果。 这就需要使用 RocketMQ 高阶特性——事务消息。事务消息的能力是要保障生产者的本地事务(如写数据库)、发消息事务的一致性,最后通过 Broker at least once 的消费语义,保证消费者的本地事务也能执行成功。最终实现生产者、消费者对同一业务的事务状态达到最终一致。 3.1.2. 原理 如下图,事务消息的实现是两个阶段:提交+事务补偿机制结合实现的。 首先,生产者会发送 half 消息,也就是 prepare 消息,broker 会把 half 队列中。接下来生产者执行本地事务,一般是写数据库,本地事务完成后,会往 RocketMQ 发送 commit 操作,RocketMQ 会把 commit 操作写入 OP 队列,并进行 compact,把已提交的消息写到 ConsumeQueue 对消费者可见。反过来如果是 rollback 操作,则会跳过对应的 half 消息。面对异常的情况,比如生产者在发送 commit 或者 rollback 之前宕机了,RocketMQ broker 还会有补偿检查机制,定期回查 Producer 的事务状态,继续推进事务。 无论是 Prepare 消息、还是 Commit/Rollback 消息、或者是 compact 环节,在存储层面都是遵守 RocketMQ 以顺序读写为主的设计理念,达到最优吞吐量。 3.1.3. demo 接下来,我们来看一个事务消息的简单示例。 使用事务消息需要实现一个事务状态的查询器,这也是和普通消息一个最大的区别。如果我们是一个交易系统,这个事务回查器的实现可能就是根据订单 ID 去查询数据库来确定这个订单的状态到底是否是提交,比如说创建成功、已付款、已退款之类的。主体的消息生产流程也有很多不同,需要开启分布式事务,进行两阶段提交,先发一个 prepare 的消息,然后再去执行本地事务。这里的本地事务一般就是执行数据库操作。然后如果本地事务执行成功的话,就整体 commit,把之前的 prepare 的消息提交掉。这样一来,消费者就可以消费这条消息。如果本地事务出现异常的话,那么就把整个事务 rollback 掉,之前的那条 prepare 的消息也会被取消掉,整个过程就回滚了。事务消息的用法变化主要体现在生产者代码,消费者使用方式和普通消息一致,demo 里面就不展示了。 3.2. 顺序消息 3.2.1. 场景 + 原理 第二个高级特性是顺序消息,这个也是 RocketMQ 的特色能力之一。它解决的是顺序一致性的问题,要保障同一个业务的消息,生产和消费的顺序保持一致。在阿里曾有个场景是买卖家数据库复制,由于阿里订单数据库采用分库分表技术,面向买卖家不同的业务场景,分别按照买家主键和卖家主键拆分成买卖家数据库。两个数据库的同步就是采用了 Binlog 顺序分发的机制,通过使用顺序消息,把买家库的 Binlog 变更按照严格顺序在卖家库回放,以此达到订单数据库的一致性。如果没有顺序保障,那么就可能出现数据库级别的脏数据,将会带来严重的业务错误。 顺序消息的实现原理如下图,充分利用 Log 天然顺序读写的特点高效实现。在 Broker 存储模型中,每个 Topic 都会有固定的 ConsumeQueue,可以理解为 Topic 的分区生产者为发送消息加上业务 Key,在这个 case 里面可以用订单 ID,同一订单 ID 的消息会顺序发送到同一个 Topic 分区,每个分区在某个时刻只会被一个消费者锁定,消费者顺序读取同一个分区的消息串行消费,以此来达到顺序一致性。 3.2.2. demo 接下来,我们来看顺序消息的一个简单demo。 对于顺序消息来说,生产者跟消费者都有需要注意的地方。 在生产阶段,首先要定义一个消息的 group。每条消息都可以选择一个业务 ID 作为消息 Group,这个业务 ID 尽量离散、随机。因为同一个业务 ID 会分配到同一个数据存储分片,生产和消费都在这个数据分片上串行,如果业务 ID 有热点,会造成严重的数据倾斜和局部消息堆积。比如说在电商交易的场景,一般会选择订单 ID 进行业务消息分组,因为订单 ID 会比较离散。但如果我们选择的是卖家 ID,就有可能会出现热点,热点卖家的流量会远大于普通卖家。 在消费阶段的话,消费阶段有跟常规的消息收发一样有两种模式,一种是全托管的 push consumer 模式,一种是半托管的simple consumer 的模式,RocketMQ SDK 会保障同一个分组的消息串行进入业务消费逻辑。需要注意的点是,我们自身的业务消费代码也要串行进行,然后同步返回消费成功确认。不要把同分组的消息又放到另外的线程池并发消费,这样会破坏顺序语义。 4. 大规模业务 4.1. SQL 过滤 4.1.1. 场景 第三个高级特性是 SQL 消费模式,这个也是复杂业务场景的刚需。我们回到阿里的电商场景,阿里的整个电商业务都是围绕着交易展开,有数百个不同的业务在订阅交易的消息。这些业务基本上都面向某个细分领域,都只需要交易 Topic 下的部分消息。按照传统的模式,一般就是全量订阅交易 Topic,在消费者本地过滤即可,但是这样会消耗大量的计算、网络资源,特别是在双十一的峰值,这个方案的成本是无法接受的。 4.1.2. 原理 为了解决这个问题,RocketMQ 提供了 SQL 消费模式。在交易场景下,每笔订单消息都会带有不同维度的业务属性,包括卖家 ID、买家 ID、类目、省市、价格、订单状态等属性,而 SQL 过滤就是能让消费者通过 SQL 语句过滤消费目标消息。如下图,某个消费者只想关注某个价格区间内的订单创建消息,于是创建这个订阅关系 【Topic=Trade ,SQL: status= order create and (Price between 50 and 100)】,Broker 会在服务端运行 SQL 计算,只返回有效数据给消费者。为了提高性能,Broker 还引入了布隆过滤器模块,在消息写入分发时刻,提前计算结果,写入位图过滤器,减少无效 IO。总的来说就是把过滤链路不断前置,从消费端本地过滤,到服务端写时过滤,达到最优性能。 4.1.3. demo 接下来,我们再来看一个 SQL 订阅的示例。目前 RocketMQ SQL 过滤支持如下的语法,包括属性非空判断、属性大小比较、属性区间过滤、集合判断和逻辑计算,能满足绝大部分的过滤需求。 在消息生产阶段,我们除了设置 Topic、Tag 之外,还能添加多个自定义属性。比如在这个案例里面,我们设置了一个 region 的属性,表示这条消息是从杭州 region 发出来的。在消费的时候,我们就可以对根据自定义属性来进行 SQL 过滤订阅了。第一个 case 是我们用了一个 filter expression,判断 region 这个字段不为空且等于杭州才消费。第二个 case 添加更多的条件,如果这是一笔订单消息,我们还可以同时判断 region 条件和价格区间来决定是否消费。第三个 case 是全接收模式,表达式直接为 True,这个订阅方式会接收某一个主题下面的全量消息,不进行任何过滤。 4.2. 定时消息 4.2.1. 场景+原理 第四个高级特性是定时消息,生产者可以指定某条消息在发送后经过一定时间后才对消费者可见。有不少业务场景需要大规模的定时事件触发,比如典型的电商场景,基本上都有订单创建30分钟未付款就自动关闭订单的逻辑,定时消息能为这个场景带来极大的便利性。 RocketMQ 的定时消息是基于时间轮来实现的。TimerWheel,相信大家并不陌生,模拟表盘转动,来达到对时间进行排序的目的。TimerWheel 中的每一格代表着最小的时间刻度,称之为Tick,RocketMQ 里面是每一个 Tick 为一秒,同一个时刻的消息会写到同一个格子里。由于每个时刻可能会同时触发多条消息,并且每条消息的写入时刻都不一样,所以 RocketMQ 也同时引入了 Timerlog 的数据结构,Timerlog 按照顺序 append 方式写入数据,每个元素都包含消息的物理索引、以及指向同一个时刻的前一条消息,组成一个逻辑链表。TimeWheel 的每个格子都维护这个时刻的消息链表的头尾指针。类似表盘,TimerWheel 会有一个指针,代表当前时刻,绕着 TimerWheel 循环转动,指针所指之处,代表这一个 Tick 到期,所有内容一起弹出,会写到 ConsumeQueue,对消费者可见。 目前 RocketMQ 的定时消息性能已经远超 RabbitMQ 和 ActiveMQ。 5. 全局高可用 接下来,我们再讲一下 RocketMQ 的全局高可用技术解决方案。 在消息基础原理的文章里,我们提到 RocketMQ 的高可用架构,主要是指 RocketMQ 集群内的数据多副本和服务高可用。今天我们这里讲的高可用是全局的,就是业界经常说的同城容灾、两地三中心、异地多活等架构。现在蚂蚁支付和阿里交易采用的是异地多活的架构。异地多活相对于冷备、同城容灾、两地三中心模式具备更多的优点,可以应对城市级别的灾难,如地震、断电等事件。除此之外,一些因为人为的操作,比如说某个基础系统变更,引入新的 bug,导致的整个机房级别的不可用,异地多活的架构可以直接把流量切到可用机房,优先保障业务连续性,再去定位具体的问题。另一方面,异地多活还能实现机房级别的扩容,单一机房的计算存储资源是有限的,异地多活架构可以把业务流量按照比例分散在全国各地机房。同时多活架构实现了所有机房都在提供业务服务,而不是冷备状态,资源利用率大幅度提升。由于是多活状态,面对极端场景的切流,可用性更有保障,信心更足。 在异地多活的架构中,RocketMQ 承担的是基础架构的多活能力。多活的架构分为几个模块: 首先是接入层,通过统一接入层按照业务 ID 把用户请求分散到多个机房,业务ID一般可采用用户ID。 其次是应用层,应用层一般无状态,当请求进入某个机房后,需要尽量保障该请求的整个链路都在单元内封闭,包括 RPC、数据库访问、消息读写。这样才可以降低访问延迟,保障系统性能不会因为多活架构衰退。 再往下是数据层,包括数据库,消息队列等有状态系统。这里侧重讲解 RocketMQ 的异地多活,RocketMQ 通过 connector 组件,实现按 topic 粒度实时同步消息的数据;按照 Consumer 和 Topic 的组合粒度实时同步消费状态。 最后还需要全局的管控层。管控层要维护全局的单元化规则,哪些流量走到哪些机房;管理多活元数据配置,哪些应用需要多活、哪些 Topic 需要多活;另外在切流时刻,要协调所有系统的切流过程,控制好切流顺序。
作者:隆基
#技术探索 #功能特性

2024年8月9日

深度剖析 RocketMQ 5.0 之 IoT 消息:物联网需要什么样的消息技术?
简介: 本文来学习一个典型的物联网技术架构,以及在这个技术架构里面,消息队列所发挥的作用。在物联网的场景里面,对消息技术的要求和面向服务端应用的消息技术有什么区别?学习 RocketMQ 5.0 的子产品 MQTT,是如何解决这些物联网技术难题的。 1.前言 从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。 目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往往同时涉及交叉场景,比如同时进行物联网消息、微服务消息的处理,同时进行应用集成、数据集成、实时分析等,企业需要为此维护多套消息系统,付出更多的资源成本和学习成本。 在这样的背景下,2022 年,RocketMQ 5.0 正式发布,相对于 RocketMQ 4.0,架构走向云原生化,并且覆盖了更多的业务场景。想要掌握最新版本 RocketMQ 的应用,就需要进行更加体系化的深入了解。 2.背景 本节课分为三个部分,第一部分,我们来学习一个典型的物联网技术架构,以及在这个技术架构里面,消息队列所发挥的作用。第二部分我们会讲在物联网的场景里面,对消息技术的要求和面向服务端应用的消息技术有什么区别?第三部分,我们会学习 RocketMQ 5.0 的子产品 MQTT,是如何解决这些物联网技术难题的。 3. 物联网消息场景 我们先来了解一下物联网的场景是什么,以及消息在物联网里面有什么作用。 物联网肯定是最近几年最火的技术趋势之一,有大量的研究机构、行业报告都提出了物联网快速发展的态势。首先是物联网设备规模爆发式增长,预测会在 2025 年达到 200 多亿台。 其次是物联网的数据规模,来自物联网的数据增速接近 28%,并且未来有 90% 以上的实时数据来自物联网场景。这也就意味着未来的实时流数据处理数据类型会有大量物联网数据。 最后一个重要的趋势是边缘计算,未来会有 75% 的数据在传统数据中心或者云环境之外来处理,这里的边缘指的是商店、工厂、火车等等这些离数据源更近的地方。由于物联网产生的数据规模很大,如果全部数据传输到云端处理,会面临难以承受的成本,应该充分利用边缘的资源直接计算,再把高价值的计算结果传输云端;另一方面,在离用户近的地方计算直接响应,可以降低延迟,提升用户体验。 物联网的发展速度这么快,数据规模那么大,跟消息有什么关系呢?我们通过这个图来看一下消息在物联网场景发挥的作用:第一个作用是连接,承担通信的职责,支持设备和设备的通信,设备和云端应用的通信,比如传感器数据上报、云端指令下发等等这些功能,支撑 IoT 的应用架构,连接云边端。第二个作用是数据处理,物联网设备源源不断的产生数据流,有大量需要实时流处理的场景,比如设备维护,高温预警等等。基于 MQ 的事件流存储和流计算能力,可以构建物联网场景的数据架构。 4. 物联网消息技术 下面我们来看看在物联网场景里,对消息技术有什么诉求?我们先从这个表格来分析物联网消息技术跟之前我们讲的经典消息技术有什么区别? 经典的消息主要是为服务端系统提供发布订阅的能力,而物联网的消息技术是为物联网设备之间、设备和服务端之间提供发布订阅的能力。我们来分别看一下各自场景的特点。 在经典消息场景里,消息 broker、消息客户端都是服务端系统,这些系统都是部署在 IDC 或者公共云环境。无论是消息客户端、消息服务端,都会部署在配置比较不错的服务器机型,有容器、虚拟机、物理机等等这些形式。同时,客户端和消息服务端一般都是部署在同一个机房,属于内网环境,网络带宽特别高,而且网络质量稳定。客户端的数量一般对应到应用服务器的数量,规模较小,一般都是数百、数千台服务器,只有超大规模的互联网公司才能达到百万级。从生产消费的角度来看,每个客户端的消息生产发送量一般对应到其业务的 TPS,能达数百数千的 TPS。在消息消费方面,一般是采用集群消费,一个应用集群共享一个消费者 ID,共同分担该消费组的消息。每条消息的订阅比一般也不高,正常情况下不会超过 10 个。 而在 IoT 消息场景,很多条件都不一样,甚至是相反的。IoT的消息客户端是微型设备,计算存储资源都很有限,消息服务端可能要部署在边缘环境,使用的服务器配置也会比较差。另一方面物联网设备,一般是通过公网的环境来连接的,它的环境特别复杂,而且经常会不断移动,有些时候会断网或处于弱网环境,网络质量差。物联网场景中,消息的客户端实例数对应到物联网设备数,可以到亿级别,比大型互联网公司的服务器数量要大很多。每个设备的消息 tps 不高,但是一条消息有可能同时被百万级的设备接受,订阅比特别高。 5. RocketMQ MQTT 从这里可以看出,物联网需要的消息技术和经典的消息设计很不一样。接下来我们再来看,为了应对物联网的消息场景,RocketMQ 5.0 做了哪些事情?RocketMQ 5.0 里面,我们发布了一个子产品,叫做 RocketMQ MQTT。它有三个技术特点: 首先,它采用的标准的物联网协议 MQTT,该协议面向物联网弱网环境、低算力的特点设计,协议十分精简。同时有很丰富的特性,支持多种订阅模式,多种消息的 QoS,比如有最多一次,最少一次,当且仅当一次。它的领域模型设计也是 消息、 主题、发布订阅等等这些概念,和 RocketMQ 特别匹配,这为打造一个云端一体的 RocketMQ 产品形态奠定了基础。 第二,它采用的是纯算分离的架构。RocketMQ Broker 作为存储层,MQTT 相关的领域逻辑都在 MQTT Proxy 层实现,并面向海量的连接、订阅关系、实时推送深度优化,Proxy 层可以根据物联网业务的负载独立弹性,如连接数增加,只需要新增 proxy 节点。 第三,它采用的是端云一体化的架构,因为领域模型接近、并且以 RocketMQ 作为存储层,每条消息只存一份,这份消息既能被物联网设备消费,也能被云端应用消费。另外 RocketMQ 本身是天然的流存储,流计算引擎可以无缝对 IoT 数据进行实时分析。 5.1. IoT 消息存储模型 接下来我们再从几个关键的技术点,来深入了解 RocketMQ 的物联网技术实现。 5.1.1. 读放大为主,写放大为辅 首先要解决的问题是物联网消息的存储模型,在发布订阅的业务模型里,一般会采用两种存储模型,一种是读放大,每条消息只写到一个公共队列,所有的消费者读取这个共享队列,维护自己的消费位点。另外一种模型是写放大模型,每个消费者有自己的队列,每条消息都要分发到目标消费者的队列中,消费者只读自己的队列。 因为在物联网场景里,一条消息可能会有百万级的设备消费,所以,很显然,选择读放大的模型能显著降低存储成本、提高性能。 但是,只选择读放大的模式没法完全满足要求,MQTT 协议有其特殊性,它的 Topic 是多级 Topic,而且订阅方式既有精准订阅,也有通配符匹配订阅。比如在家居场景,我们定义一个多级主题,比如家/浴室/温度,有直接订阅完整多级主题的 家/浴室/温度,也有采用通配符订阅只关注温度的,还有只关注一级主题为 家的所有消息。 对于直接订阅完整的多级主题消费者可以采用读放大的方式直接读取对应多级主题的公共队列;而采用通配符订阅的消费者无法反推消息的 Topic,所以需要在消息存储时根据通配符的订阅关系多写一个通配符队列,这样消费者就可以根据其订阅的通配符队列读取消息。 这就是 RocketMQ 采用的读放大为主,写放大为辅的存储模型。 5.1.2. 端云一体化存储 基于上节课的分析,我们设计了 RocketMQ 端云一体化的存储模型,看下这张图。 消息可以来自各个接入场景(如服务端的 RMQ/AMQP,设备端的 MQTT),但只会写一份存到 commitlog 里面,然后分发出多个需求场景的队列索引,比如服务端场景(MQ/AMQP)可以按照一级 Topic 队列进行传统的服务端消费,设备端场景可以按照 MQTT 多级 Topic 以及通配符订阅进行消费消息。 这样我们就可以基于同一套存储引擎,同时支持服务端应用集成和 IoT 场景的消息收发,达到端云一体化。 5.2. 队列规模问题 我们都知道像 Kafka 这样的消息队列每个 Topic 是独立文件,但是随着 Topic 增多消息文件数量也增多,顺序写就退化成了随机写,性能明显下降。RocketMQ 在 Kafka 的基础上进行了改进,使用了一个 Commitlog 文件来保存所有的消息内容,再使用 CQ 索引文件来表示每个 Topic 里面的消息队列,因为 CQ 索引数据比较小,文件增多对 IO 影响要小很多,所以在队列数量上可以达到十万级。但是这个终端设备队列的场景下,十万级的队列数量还是太小了,我们希望进一步提升一个数量级,达到百万级队列数量,所以,我们引入了 Rocksdb 引擎来进行 CQ 索引分发。 面向 IoT 的百万级队列设计 Rocksdb 是一个广泛使用的单机 KV 存储引擎,有高性能的顺序写能力。因为我们有了 commitlog 已具备了消息顺序流存储,所以可以去掉 Rocksdb 引擎里面的 WAL,基于 Rocksdb 来保存 CQ 索引。在分发的时候我们使用了 Rocksdb 的 WriteBatch 原子特性,分发的时候把当前的 MaxPhyOffset 注入进去,因为 Rocksdb 能够保证原子存储,后续可以根据这个 MaxPhyOffset 来做 Recover 的 checkpoint。最后,我们也提供了一个 Compaction 的自定义实现,来进行 PhyOffset 的确认,以清理已删除的脏数据。 5.3. IoT 消息推送模型 介绍了底层的队列存储模型后,我们再详细描述一下匹配查找和可靠触达是怎么做的。在 RocketMQ 的经典消费模式里,消费者是直接采用长轮询的方式,从客户端直接发起请求,精确读取对应的 topic 队列。而在 MQTT 场景里,因为客户端数量、订阅关系数量规模巨大,无法采用原来的长轮询模式,消费链路的实现更加复杂。这里使用的是推拉结合的模型。 这里展示的是一个推拉模型,终端设备通过 MQTT 协议连到 Proxy 节点。消息可以来自多种场景(MQ/AMQP/MQTT)发送过来,存到 Topic 队列后会有一个 notify 逻辑模块来实时感知这个新消息到达,然后会生成消息事件(就是消息的 Topic 名称),把这个事件推送至网关节点,网关节点根据它连上的终端设备订阅情况进行内部匹配,找到哪些终端设备能匹配上,然后会触发 pull 请求去存储层读取消息,再推送终端设备。 一个重要问题,就是 notify 模块怎么知道一条消息在哪些网关节点上面的终端设备感兴趣,这个其实就是关键的匹配查找问题。一般有两种方式:第一种,简单的广播事件;第二种,集中存储在线订阅关系(比如图里的 lookup 模块),然后进行匹配查找,再精准推送。事件广播机制看起来有扩展性问题,但是其实性能并不差,因为我们推送的数据很小,就是 Topic 名称,而且相同 Topic 的消息事件可以合并成一个事件,我们线上就是默认采用的这个方式。集中存储在线订阅关系,这个也是常见的一种做法,如保存到 RDS、Redis 等等,但要保证数据的实时一致性也是有难度的,而且要进行匹配查找对整个消息的实时链路RT开销也会有一定的影响。这幅图里还有一个 Cache 模块,用来做消息队列 cache,避免在大广播比场景下每个终端设备都向存储层发起读数据情况。
作者:隆基
#技术探索 #物联网

2024年8月9日

深度剖析 RocketMQ 5.0 之事件驱动:云时代的事件驱动有啥不同?
简介: 本文技术理念的层面了解一下事件驱动的概念。RocketMQ 5.0 在面向云时代的事件驱动架构新推出的子产品 EventBridge,最后再结合几个具体的案例帮助大家了解云时代的事件驱动方案。 1.前言 从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。 目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往往同时涉及交叉场景,比如同时进行物联网消息、微服务消息的处理,同时进行应用集成、数据集成、实时分析等,企业需要为此维护多套消息系统,付出更多的资源成本和学习成本。 在这样的背景下,2022 年,RocketMQ 5.0 正式发布,相对于 RocketMQ 4.0,架构走向云原生化,并且覆盖了更多的业务场景。想要掌握最新版本 RocketMQ 的应用,就需要进行更加体系化的深入了解。 2.背景 今天我们要学习的课程是 RocketMQ 5.0 的事件驱动。事件驱动是一个经典的概念,通过今天这节课,我们会掌握云时代的事件驱动和之前有哪些不同 这是今天我们要学习的内容,第一部分先从技术理念的层面了解一下事件驱动的概念。第二部分会讲,RocketMQ 5.0 在面向云时代的事件驱动架构新推出的子产品 EventBridge,最后再结合几个具体的案例帮助大家了解云时代的事件驱动方案。 3. 事件驱动架构 3.1. 事件驱动架构定义 首先我们来学习一下什么是事件驱动。先从事件驱动的定义来看,事件驱动本质上是一种软件设计模式。它能够最大化降低不同模块以及不同系统之间的耦合度。 这里有一个典型的事件驱动架构图,首先是事件生产者发送事件到 EventBroker,然后 EventBroker 会把事件路由到对应的消费者进行事件处理。事件处理能够灵活扩展,随时增减事件消费者,事件生产者对此透明。 为什么说事件驱动是个很经典的设计模式呢,因为早在几十年前,就出现过多种事件驱动的技术,比如桌面客户端编程框架,点击按钮就可以触发 onclick 事件,开发者编写业务逻辑响应事件。在编程语言上,也经常会采用事件驱动的代码模式,比如 callback、handler 这类的函数。进入分布式系统的时代,系统之间的通信协同也会采用事件驱动的方式。 你有没有发现,这里的图和之前 RocketMQ 的消息应用解耦图很像。没错,无论是消息的发布订阅,还是事件的生产消费都是为了进行代码解耦、系统解耦。消息队列更偏技术实现,大部分的 EventBroker 都是基于消息队列实现的,而事件驱动更偏向于架构理念。 3.2. 事件的特征 从技术角度来看,消息队列是和 RPC 对应的,一个是同步通信,一个是异步通信。消息队列并不会规定消息的内容,只负责传输二进制内容。如果从技术实现来看,的确,EDA 需要的核心技术就是消息队列的技术。事件驱动跟消息驱动最大的区别就是,事件是一种特殊的消息,只有消息满足了某些特征,才能把它叫做事件。 我打个比方,来看左边这个图。消息就像是一个抽象类,有多种子类,最主要的就是 Command 和 Event 两种。以信号灯为例,向信号灯发送打开的消息,这就是一种 Command,信号灯接受这个 Command 并开灯。开灯后,信号灯对外发出信号灯变成绿色的消息,这个就是一种 Event。 对于 Event 来说,有四个主要的特征: 第一,它是一个不可变的,事件就是表示已经发生了的事情,已经成为事实。 第二,事件有时间概念,并且对同一个实体来说事件的发送是有序的。如信号灯按顺序发送了绿、黄、红等事件。 第三,事件是无预期的,这个就是EDA架构之所以能够实现最大化解耦的特点,事件的产生者对于谁是事件消费者,怎么消费这个事件是不关心的。 第四,由于事件驱动是彻底解耦的,并且对于下游怎么去消费事件没有预期,所以事件是具象化的,应该包括尽可能详尽的信息,让下游消费者各取所需。比如像交通交通信号灯事件,包含多个字段,包括它的来源是谁、它的类型是什么?它的主题是什么?是具体的哪一个信号灯,另外它会包含唯一的ID,便于跟踪?它会有事件发生时间,事件的内容。 3.3. 云时代的事件驱动 在全行业数字化转型的时代,事件驱动架构应用范围扩大,成为 Gartner 年度十大技术趋势。在新型的数字化商业解决方案里,会有 60% 采纳 EDA 架构。 事件驱动作为一个经典的架构模式,为什么会在云时代再度成为焦点呢?主要有两个原因: 首先是云原生技术带来的,其中之一是微服务。微服务是云原生应用架构的核心,引入微服务架构,数字化企业能够按照小型化的业务单元和团队划分,以“高内聚、低耦合”的方式高效协作。但是微服务架构也会带来新的问题,比如大量同步微服务会面临延迟增大、可用性降低等风险,采用事件驱动的微服务体系,可提高微服务的韧性,降低延迟,实现更彻底的解耦。 另外一个云原生代表技术 Serverless 架构范式本身也是事件驱动的。现在主要的 Serverless 产品形态,无论是阿里云的函数计算、还是 AWS 的 Lambda,它们的主要触发源都是各种形态的事件,比如云产品事件,OSS 文件上传,触发用户基于函数进行文件加工处理计算;用户业务事件,EventBroker 触发函数运行消费逻辑;云产品运维事件,用户通过响应事件,在云平台的基础上扩展自己的自动化运维体系。事件驱动架构的大规模使用,能够帮助数字化企业释放云计算 Serverless 的技术红利。 IoT 也是事件驱动架构的重要推动力,有大量的 IoT 应用构建都是基于事件驱动的,比如传感器上报设备事件,温度变化事件、地址位置变化事件等等,云端应用订阅这些事件触发对应的业务流程。 在全行业大规模数字化转型后,跨业务、跨组织的业务合作会从线下搬到线上,在数字经济时代,数字化商业生态规模会持续扩大,跨组织业务协同更需要彻底解耦。而 EDA 天然具备的异步、解耦的特性就可以解决这一系列的问题。比如阿里聚石塔业务就是事件驱动的模式,聚石塔实时发布交易事件,合作伙伴包括ISV、软件服务商、品牌商家订阅消费交易事件,建设个性化的 CRM、商家运营、后台管理系统等等,形成一个庞大的电子商务数字化生态。 4. EventBridge 4.1. 云时代的事件驱动能力抽象 接下来进入第二个部分的内容,一起学习一下 RocketMQ 5.0 的 EventBridge。在了解这个系统的技术实现之前,我们先来了解一下 EventBridge 对事件驱动的通用能力抽象,从这里也可以了解到 EventBridge 的领域模型。 我们从左往右看这张图。最左边是事件源,因为这个事件是希望被跨平台消费的,所以我们希望采用业界标准来作为事件的格式。同时,事件是有可能被跨组织消费的,所以我们需要一个统一的事件中心,让这些不同的事件源都注册到这个事件中心。对消费者来说就好比是一个事件商店,能够选择自己感兴趣的事件订阅。在事件消费者开始编写消费逻辑的时候,他还需要对这个事件的格式有更清楚的了解,需要知道这个事件有哪些内容,有哪些字段,分别是什么含义,才能编写正确的消费业务逻辑。所以,EventBridge 还提供了 schema 中心,有这个 schema 中心后,消费者对于事件的格式也就一目了然,不用跟事件源的发起者进行沟通了,整个效率也得到了大幅度的提升。再往后面看,就到了事件消费的环节,因为事件的消费者种类很多,不同消费者关注不同的事件类型,EventBridge 需要提供丰富的过滤规则。即便多个消费者对同一个事件感兴趣,但是可能只需要事件的部分内容,EventBridge 还提供了事件转换的能力。这就是 RocketMQ 5.0 对事件驱动的能力抽象。 4.2. 统一事件标准 在云计算的时代、大规模数字化转型时代,我们强调事件驱动架构往往跨越了不同的组织,不同的平台。所以事件驱动架构需要一个统一的事件标准。在 EventBridge 这个产品里,我们采纳了 CNCF 基金会中的 CloudEvents 标准,这个是业界事件的事实标准,这个标准就是为了简化事件声明,提升事件在跨服务、跨平台的互操作性。 CloudEvents 带来了很多价值: 第一,它提供了一种规范,使得跨组织、跨平台的事件集成,有了共同语言,加速更多的事件集成。然后也因为有的规范,所以它可以加速跨服务,跨平台的事件的集成。 第二,随着 Serverless 的普及,各大云厂商都提供函数计算的服务,有了 CloudEvents 规范,用户在函数计算的使用上就可以实现无厂商绑定。 第三,webhook 是一种通用的集成模式,有了 CloudEvents 规范作为统一格式,不同系统的 webhook 能实现更好的互操作性。 最后,基于这样统一的规范,其实是更有利于沉淀事件驱动的基础软件设施的,比如跨服务的事件 Tracing 链路追踪。 4.3. RocketMQ EventBridge 如下图是 RocketMQ 面向 EDA 场景全新推出的产品形态 EventBridge。 它的核心技术都是基于 RocketMQ,但是在产品界面上面向事件驱动的业务进行一层抽象,核心领域对象从消息变成 CloudEvents。基于统一事件标准来构建事件驱动的数字生态。它的事件源也很多样,可以是云产品事件,可以是 SaaS 平台事件,应用自定义事件、通用的 WebHook。当然,它的事件目标更是多样化的,通过事件规则引擎把事件路由到不同的消费者,典型的消费者,比如函数计算,也可以是存储系统,消息通知如钉钉短信,还有通用的的 webhook。通过事件驱动这种彻底解耦的架构,更适合建设混合云、多云的数字化系统。 为了提升事件驱动的研发效率,EventBridge 也支持 Schema 的特性,支持事件信息的解释、预览,甚至还可以自动化的生成代码,让开发者以低代码、0 代码的方式完成事件集成。 EventBridge 的另一个比较重要的特性是事件规则引擎。因为不同的事件消费者,他们对于事件的兴趣是不一样的。所以我们提供了七种事件过滤模式,包括前缀匹配、后缀匹配、除外匹配、数值匹配等等,可以进行各种复杂的组合逻辑过滤,只推送消费者感兴趣的事件。 当然,就算都关心同一个事件,不同消费者对事件内部的信息关注点也会有所不同。为了提升事件消费效率,我们也提供了四种事件转化器,可以只推送给消费者它关心的事件字段。还可以对事件进行自定义的模板转化,满足更灵活的业务诉求。 作为 RocketMQ 的子项目,在 EventBridge 里也同样提供了完整的可观测的能力。能够根据事件的时间、类型查询事件列表。每个事件都会生成唯一 ID。用户可以根据唯一 ID 去精确的定位事件的内容、发生时间、对应的事件规则,下游的消费状况,精准排查问题。 5. 典型案例 接下来结合几个典型案例来看 EventBridge 的使用场景。 第一个案例适用于使用大量云产品的公司。C 客户是一家以智能消费终端为核心的科技公司,希望收集账号里全部的云上事件,方便后续做分析或故障处理。公共云的 EventBridge 汇聚了所有的云产品事件,通过 EventBridge,客户能收集全量的事件对齐进行自定义的业务处理。还能够配置事件规则,过滤异常事件推送给监控系统或者钉钉,及时关注处理。 第二个案例是 SaaS 事件的集成。现在随着整个云计算生态的繁荣,有不少企业不仅使用了公共云的 IaaS、PaaS 产品,也会同时使用三方的 SaaS 产品,比如各种 ERP、CRM 等系统。基于 EventBridge 标准的 HTTP、webhook 的集成能力,能够无缝连接三方 SaaS 系统作为事件源,企业能够收集到他所关心的所有 SaaS 事件,方便后续管理,比如申请单,入职单,报销单,订单等等这些场景。 第三个案例是 SaaS 平台集成,以钉钉为例,钉钉是典型的 SaaS 平台,他有繁荣的生态,拥有 4000+ 家的生态伙伴,包括 ISV 生态伙伴、硬件生态伙伴、服务商、咨询生态和交付生态伙伴等等。通过 EventBridge 把公共云的 Paas 层生态和钉钉的 SaaS 层生态连接起来,而且依赖 EventBridge 完成整体事件生命周期的管理,以 WebHook 的形式推送给下游 ISV 接收端。比如钉钉的官方事件源包括视频会议、日程、通讯录、审批流、钉盘、宜搭等,企业和 SaaS 厂商可以充分利用这些官方应用的事件构建企业级的应用系统,也可以把钉钉的官方数据流和其他系统做深度集成。
作者:隆基
#技术探索 #事件驱动架构

2023年7月27日

RocketMQ 在业务消息场景的优势详解
一、消息场景 RocketMQ5.0是消息事件流一体的实时数据处理平台,是业务消息领域的事实标准,很多互联网公司在业务消息场景会使用RocketMQ。 我们反复提到的“消息、业务消息”,指的是分布式应用解耦,是RocketMQ的业务基本盘。通过本文,我们将深入了解RocketMQ5.0在业务消息场景的优势能力,了解为什么RocketMQ能够成为业务消息领域的事实标准。 RocketMQ在业务消息领域的经典场景是应用解耦,这也是RocketMQ诞生初期解决阿里电商分布式互联网架构的核心场景,主要承担分布式应用(微服务)的异步集成,达到应用解耦的效果。解耦是所有的软件架构最重要的追求。 分布式应用(微服务)采用同步RPC与异步消息的对比。比如在业务系统中,有三个上游应用与4个下游应用,采用同步RPC的方式,会有34的依赖复杂度;而采用异步消息的方式则可以化繁为简,简化为3+4的依赖复杂度,从乘法简化为加法。 通过引入消息队列实现应用的异步集成可以获得四大解耦优势。 代码解耦:极大提升业务敏捷度。如果用同步调用的方式,每次扩展业务逻辑都需要上游应用显式调用下游应用接口,代码直接耦合,上游应用要做变更发布,业务迭代互相掣肘。而通过使用消息队列扩展新的业务逻辑,只需要增加下游应用订阅某个Topic,上下游应用互相透明,业务可以保持灵活独立快速迭代。 延迟解耦:如果使用同步调用的方式,随着业务逻辑的增加,用户操作的远程调用次数会越来越多,业务响应越来越慢,性能衰减,业务发展不可持续。而使用消息队列,无论增加多少业务,上游应用只需调用一次消息队列的发送接口即可响应线上用户,延迟为常量,基本在5ms以内。 可用性解耦:如果使用同步调用的方式,任何下游业务不可用都会导致整个链路失败。该种结构下类似于串联电路,甚至在部分调用失败的情况下,还会出现状态不一致。而采用RocketMQ进行异步集成,只要RocketMQ服务可用,用户的业务操作便可用。RocketMQ服务通过多对主备组成的broker集群提供,只要有一对主备可用,则整体服务可用,作为基础软件,可用性远大于普通的业务应用,下游应用的业务推进都可以通过MQ的可靠消息投递来达成。 流量解耦:即削峰填谷。如果采用同步调用的方式,上下游的容量必须对齐,否则会出现级联不可用。容量完全对齐需要投入大量精力进行全链路压测与更多机器成本。而通过引入RocketMQ,基于RocketMQ亿级消息的堆积能力,对于实时性要求不高的下游业务,可以尽最大努力消费,既保证了系统稳定性,又降低了机器成本与研发运维成本。 二、基础特性 阿里的交易应用流程为:用户在淘宝上下单时会调用交易应用创建订单,交易应用将订单落到数据库,然后生产一条订单创建的消息到RocketMQ,返回给终端用户订单创建成功的接口。完成的交易流程推进则是依赖RocketMQ将订单创建消息投递给下游应用,会员应用收到订单消息,需要给买家赠送积分、淘金币,触发用户激励相关的业务。购物车应用则是负责删除在购物车里面的商品,避免用户重复购买。同时,支付系统与物流系统也都会基于订单状态的变更,推进支付环节与履约环节。 过去十年多年,阿里电商业务持续蓬勃发展,交易的下游应用已达数百个,并且还在不断增加。基于RocketMQ的电商架构极大提高了阿里电商业务的敏捷度,上游核心的交易系统完全无需关心哪些应用在订阅交易消息,交易应用的延迟与可用性也一直保持在很高水准,只依赖少量的核心系统与RocketMQ,不会受数百个下游应用的影响。 交易的下游业务类型不一,有大量的业务场景不需要实时消费交易数据,比如物流场景能容忍一定的延迟。通过RocketMQ的亿级堆积能力,极大降低了机器成本。RocketMQ的sharednothing架构具备无限横向扩展的能力,已经连续10年支撑了高速增长的双十一消息峰值,在几年前达到亿级TPS。 三、增强能力 经典场景下,RocketMQ相对于其他消息队列,拥有诸多差异化优势与增强。 首先,稳定性方面,稳定性交易是金融场景最重要的需求。RocketMQ的稳定性不仅限于高可用架构,而是通过全方位的产品能力来构建稳定性竞争力。比如重试队列,当下游消费者因为业务数据不ready或其他原因导致某条消息消费失败,RocketMQ不会因此阻塞消费,而是能将此消息加入到重试队列,然后按时间衰减重试。如果某条消息因为某些因素经过十几次重试始终无法消费成功,则RocketMQ会将它转到死信队列,用户可以通过其他手段来处理失败的消息,是金融行业的刚需。 同时,消费成功后如果因为代码bug导致业务不符合预期,应用可以对业务bug进行修复并重新发布,然后应用消息回溯的功能将消息拉回到之前的时间点,让业务按照正确逻辑重新处理。 RocketMQ的消费实现机制采用自适应拉模式的消费,在极端的场景下能够避免消费者被大流量打垮。同时,在消费者的SDK里,做了缓存本地的消息数量与消息内存占用的阈值保护,防止消费应用的内存风险。 其次,RocketMQ还具备优秀的可观测能力,是稳定性的重要辅助手段。RocketMQ是业界第一个提供消息消息级别可观测能力的消息队列,每条消息都可以带上业务主键,比如在交易场景,用户可以将订单ID作为消息的业务主键。当某个订单的业务需要排查,用户可以基于订单ID查询该条消息的生成时间以及消息内容。消息的可观测数据还能继续下钻,通过消息轨迹查看消息由哪台生产者机器发送、由哪些消费者机器在什么时间消费、消费状态是成功或失败等。 除此之外,它支持了几十种核心的度量数据,包括集群生产者流量分布、慢消费者排行、消费的平均延迟、消费堆积数量、消费成功率等。基于丰富的指标,用户可以搭建更加完善的监控报警体系来进一步加固稳定性。 为了支撑更灵活的应用架构,RocketMQ在生产与消费等关键接口提供了多种模式。 生产者接口:RocketMQ同时提供了同步发送接口与异步发送接口。同步发送是最常用的模式,业务流程的编排是串行的,在应用发完消息、Broker完成存储后返回成功后,应用再执行下一步逻辑。然而在某些场景下,完成业务涉及多个远程调用,应用为了进一步降低延迟、提高性能,会采用全异步化的方式,并发发出远程调用(可以是多次发消息或RPC的组合),异步收集结果推,进业务逻辑。 在消费者的接口方面也提供了两种方式: 监听器模式被动消费:这是目前使用最广泛的方式,用户无需关心客户端何时去Broker拉取消息,何时向Broker发出消费成功的确认,也无需维护消费线程池、本地消息缓存等细节。只需要写一段消息监听器的业务逻辑,根据业务执行结果返回Success或Failure。它属于全托管的模式,用户可以专注于业务逻辑的编写,而将实现细节完全委托给RocketMQ客户端。 主动消费模式:将更多的自主权交给用户,也称为Simple Consumer。在该种模式下,用户可以自己决定何时去Broker读取消息、何时发起消费确认消息。对业务逻辑的执行线程也有自主可控性,读取完消息后,可以将消费逻辑放在自定义的线程池执行。在某些场景下,不同消息的处理时长与优先级会有所不同,采用Simple Consumer的模式,用户可根据消息的属性、大小做二次分发,隔离到不同的业务线程池执行处理。该模式还提供了消息粒度消费超时时间的设定能力,针对某些消费耗时长的消息,用户能够调用change Invisible Duration接口,延长消费时间,避免超时重试。 四、总结 消息经典场景:应用解耦; RocketMQ基础特性:发布订阅、可靠消息、亿级堆积、无限扩展; 业务消息场景的增强能力:稳定性、可观测、多样化接口。 【活动】一键体验 RocketMQ 六大生产环境 免费试用+30秒一键体验,低门槛、快速、高效、易操作,带你了解“历经万亿级数据洪峰考验”的云消息队列RocketMQ! 点击阅读原文,立即参与活动!
作者:隆基
#技术探索

2023年7月20日

从互联网到云时代,Apache RocketMQ 是如何演进的?
2022年,RocketMQ5.0的正式版发布。相对于4.0版本而言,架构走向云原生化,并且覆盖了更多业务场景。 一、消息队列演进史 操作系统、数据库、中间件是基础软件的三驾马车,而消息队列属于最经典的中间件之一,已经有30多年的历史。消息队列的发展主要经历了以下几个阶段: 第一阶段(19802000年):80年代诞生了第一款消息队列The Information Bus,第一次提出发布订阅模式来解决软件之间的通信问题;90年代是国际商业软件巨头的时代,IBM、Oracle、Microsoft纷纷推出自己的MQ,其中最具代表性的为IBM MQ,价格昂贵,面向高端企业,主要是大型金融、电信等企业。该类商业MQ一般采用高端硬件,软硬件一体机交付,MQ本身的软件架构为单机架构。 第二阶段(2000~2007年):进入00年代后,初代开源消息队列崛起,诞生了JMS、AMQP两大标准,与之对应的两个实现分别为ActiveMQ、RabbitMQ,他们引领了初期的开源消息队列技术。开源极大促进了消息队列的流行,降低了使用门槛,技术普惠化,逐渐成为企业级架构的标配。相比于今天而言,这类MQ主要面向传统企业级应用和小流量场景,横向扩展能力较弱。 第三阶段(2007~2017年):PC互联网、移动互联网爆发式发展。由于传统的消息队列无法承受亿级用户的访问流量与海量数据传输,诞生了互联网消息中间件,核心能力是全面采用分布式架构,具备很强的横向扩展能力,开源典型代表有Kafka、RocketMQ,闭源的有淘宝Notify。Kafka的诞生还将消息中间件从消息领域延伸到了流领域,从分布式应用的异步解耦场景延伸到大数据领域的流存储与流计算场景。 第四阶段(2014~至今):云计算、IoT、大数据引领了新的浪潮。 二、互联网时代的RocketMQ 阿里的电商系统最初是个庞大的单体巨石应用,在研发效率、稳定性方面都无法满足淘宝和天猫飞速的发展。为了解决问题,2008年,淘宝与天猫发起了一次最大规模的架构升级,启动了“五彩石”项目,将单体应用拆分为分布式应用,同时抽象淘宝、天猫的共同底座——业务中台,包括交易中心、商品中心、买家中心等。在业务中台之下,同时诞生了阿里中间件(初期三大件包括消息、RPC、分布式数据层),RocketMQ是其中之一。 虽然在当时业界已经存在不少商业或开源的消息队列,比如IBMMQ、ActiveMQ、RabbitMQ,但无一例外,它们都诞生于传统企业级应用的场景,无法承受互联网对于高并发、无限扩展的苛刻要求。以RabbitMQ为例,RabbitMQ的队列流量与存储负载都为单机,无法满足业务横向扩展的需求。当时另一款具备无限横向扩展能力的消息队列是Kafka,但其主要用于日志类场景,未经过大规模核心业务稳定性验证,而且偏向于简单的log型消息队列,无法满足电商对于复杂消息功能特性的诉求,比如消息过滤、延迟消息等。 另一方面,传统的消息队列无法解决电商业务对于分布式一致性的要求。通过消息队列实现应用异步解耦后,电商业务还需要保障不同上下游应用对于订单状态要达成最终一致,否则会产生大量脏数据,造成业务错误。 大规模的电商系统,既要高性能又要一致性,传统的分布式事务技术束手无策。比如IBM MQ虽然可以使用XA事务来满足分布式一致性的功能诉求,但是XA带来的延迟与成本,对于海量的互联网流量难以承受。 为了解决电商业务对于消息队列的高性能、一致性、无限扩展等需求,自研消息队列成为了当时阿里唯一的出路,最终互联网消息队列RocketMQ应运而生。 为了支持超大规模的复杂电商业务,RocketMQ面向四个方面进行了重点建设,形成了四大优势能力。 ① 支撑超大规模复杂业务的能力,具备丰富的消息特性。 每一个大型互联网公司都会有主营业务(比如阿里是交易、蚂蚁是支付、饿了么是外卖),以主营业务为中心扩展业务能力,阿里电商是围绕交易事件建设的电商操作系统,每笔交易事件都会触发不同的业务,不同细分业务会关注不同类型的交易事件,比如垂直市场只关注某个类目的交易事件、天猫超市只关注某个卖家的交易事件、购物车只关注下单成功的交易事件等。 RocketMQ的SQL订阅提供灵活的消息过滤能力,能够满足下游消费者按照不同的业务维度进行消息过滤的诉求。 在大型互联网业务中,还会有各种定时事件触发场景,最典型的是交易超时关闭机制,阿里交易或者12306订票都有类似的机制。RocketMQ的定时消息能够很方便的满足这类诉求。 ② 一致性。 无论是阿里交易还是蚂蚁支付,都天然对数据一致性有着极高要求,RocketMQ在一致性方面也打造了多个关键特性。最具代表性的是分布式事务消息,RocketMQ是第一个实现该种特性的消息队列,能够保障交易的上下游对于订单状态达到最终一致。该方案也成为异步消息一致性方案的事实标准,被多个互联网公司所采纳,甚至也有公司将移植到定制版的Kafka种。除了分布式一致性之外,RocketMQ还提供了顺序消息的特性,满足顺序一致性的需求。 ③ 稳定性。 稳定性是交易与金融场景的基石特性,也是RocketMQ的根本。RocketMQ除了具备核心服务的HA之外,还具备了全局高可用能力,在阿里内部支持同城多活、异地多活、中心容灾等高阶HA能力。同时,稳定性也不局限于数据与服务的高可用,RocketMQ从产品层面对稳定性进行了全方位的建设,如消息轨迹、消息回溯、消息死信机制。 ④ 高性能。 在双十一的极限流量下,RocketMQ写消息延迟4个9在1ms内,100%在100ms内。RocketMQ采用sharednothing分布式架构,在吞吐量方面也具备无限扩展的能力,已经连续10年支持了双十一万亿级消息洪峰,为百万级的应用实例提供低延迟消息服务。互联网的故事还在进行,云计算规模化落地的时代悄然而来。 三、云计算时代的RocketMQ5.0 2015年,RocketMQ的首个云消息服务在阿里云上线,开启了大规模的云计算实践的序幕。同时RocketMQ也是业界第一个提供公有云服务的开源消息队列。 在大规模的云计算业务场景下,RocketMQ面临着全新的挑战与机遇。 多样性:它不再仅服务于某一家公司的内部业务,不再局限于互联网或金融企业,需要实现全行业、全场景的覆盖。 标准化:对于服务企业内部的自研消息队列而言,无需考虑协议或API的标准化。但是对于云消息服务而言,因为服务对象是外部企业客户,据信通院统计,80%以上的企业客户已经采纳开源技术和标准技术。因此,作为一款云消息服务,需要提供对业界的事实标准协议、接口、SDK的兼容,才能保证客户平滑上云,同时打消客户技术绑定的担忧。 云原生:云原生理念深入人心,消息队列要更好地帮助客户实现云原生应用架构,为业务降本提效。 新趋势:各种新技术的兴起,包括IoT、5G、边缘计算、事件驱动,还有事件流技术。面向技术的新趋势与多样化的业务需求,RocketMQ进行了自我进化,演进到5.0版本。 为了充分释放云的技术红利,RocketMQ5.0在技术架构上进行了云原生的演进。从客户端到服务端都进行了全方位的改造,更高弹性、可用性、更低成本。 客户端采用轻量SDK设计理念,将原来富客户端的逻辑下沉到Broker,满足现代化应用轻量化、Serverless的趋势。 Broker彻底进行弹性架构改造,分离RocketMQ Proxy与Store层,其中Proxy是完全无状态的计算节点,专注多协议、多领域场景覆盖,可以面向不同工作负载独立弹性,如物联网、微服务、大数据不同场景有不同的资源诉求。Store层则专注消息的高可用存储,包括副本复制、主备切换与云存储集成。同时对RocketMQ的Topic资源进行三层解耦,面向消息的Topic、面向流的Topic逻辑分片、面向底层存储的Topic物理分片,每一层都可以独立弹性。 在存储层引入了Leaderless的高可用架构,Store节点身份对等,Leaderless化,0外部依赖。多副本策略可定制,可用性+可靠性+成本灵活组合,面向多可用区、多region组建Geo高可用能力。 为了满足云时代多样化的用户需求,RocketMQ5.0从原来的互联网业务消息中间件扩展到"消息、事件、流"超融合处理平台,解锁更全面的能力。 在消息领域,全面拥抱云原生技术,更好的弹性架构与高可用能力。 在事件领域,支持CloudEvent规范,以事件为中心的产品新界面,助力客户建设跨业务、跨组织的数字化商业生态。 在流领域,流存储增强批量特性,大幅度提高数据吞吐量;新增逻辑队列能力,解耦逻辑资源与物理资源,在流场景也具备无缝伸缩能力;新增流数据库RSQLDB,提供实时事件流处理、流分析能力。 RocketMQ基于端云一体化架构实现了完整的物联网消息队列的能力,从原来的连接应用扩展到连接物联网设备。同时RocketMQ5.0也继续保持极简架构的原则,能够以最低的资源消耗、运维成本搭建服务,适合边缘计算。 除了的产品核心能力之外,RocketMQ5.0积极建设开源生态。 一方面是应用架构生态的建设,既有经典的开源项目、规范的集成,比如JMS、AMQP等,也有云原生技术生态的集成,比如CloudEvents、Dapr、Envoy。同时RocketMQ也会进一步发力数据架构生态,全链路集成大数据的摄入、数据存储、数据处理、数据分析组件,从离线大数据到实时大数据。 【活动】一键体验 RocketMQ 六大生产环境 免费试用+30秒一键体验,低门槛、快速、高效、易操作,带你了解“历经万亿级数据洪峰考验”的云消息队列RocketMQ! 点击阅读原文,立即参与活动!
作者:隆基
#技术探索 #云原生

2023年7月13日

RocketMQ 5.0 无状态实时性消费详解
背景 RocketMQ 5.0版本引入了Proxy模块、无状态pop消费机制和gRPC协议等创新功能,同时还推出了一种全新的客户端类型:SimpleConsumer。SimpleConsumer客户端采用了无状态的pop机制,彻底解决了在客户端发布消息、上下线时可能出现的负载均衡问题。然而,这种新机制也带来了一个新的挑战:当客户端数量较少且消息数量较少时,可能会出现消息消费延时的情况。。 在当前的消息产品中,消费普通使用了长轮询机制,即客户端向服务端发送一个超时时间相对较长的请求,该请求会一直挂起,除非队列中存在消息或该请求到达设定的长轮询时间。 然而,在引入Proxy之后,目前的长轮询机制出现了一个问题。客户端层面的长轮询和Proxy与Broker内部的长轮询之间互相耦合,也就是说,一次客户端对Proxy的长轮询只对应一次Proxy对Broker的长轮询。因此,在以下情况下会出现问题:当客户端数量较少且后端存在多个可用的Broker时,如果请求到达了没有消息的Broker,就会触发长轮询挂起逻辑。此时,即使另一台Broker存在消息,由于请求挂在了另一个Broker上,也无法拉取到消息。这导致客户端无法实时接收到消息,即false empty response。 这种情况可能导致以下现象:用户发送一条消息后,再次发起消费请求,但该请求却无法实时拉取到消息。这种情况对于消息传递的实时性和可靠性产生了不利影响。 AWS的文档里也有描述此等现象,他们的解决方案是通过查询是所有的后端服务,减少false empty response。 其他产品 在设计方案时,首先是需要目前存在的消息商业化产品是如何处理该问题的。 MNS采取了以下策略,主要是将长轮询时间切割为多个短轮询时间片,以尽可能覆盖所有的Broker。 首先,在长轮询时间内,会对后端的Broker进行多次请求。其次,当未超过短轮询配额时,优先使用短轮询消费请求来与Broker进行通信,否则将使用长轮询,其时间等于客户端的长轮询时间。此外,考虑到过多的短轮询可能会导致CPU和网络资源消耗过多的问题,因此在短轮询超过一定数量且剩余时间充足时,最后一次请求将转为长轮询。 然而,上述策略虽以尽可能轮询完所有的Broker为目标,但并不能解决所有问题。当轮询时间较短或Broker数量较多时,无法轮询完所有的Broker。即使时间足够充足的情况下,也有可能出现时间错位的情况,即在短轮询请求结束后,才有消息在该Broker上就绪,导致无法及时取回该消息。 解法 技术方案 首先,需要明确该问题的范围和条件。该问题只会在客户端数量较少且请求较少的情况下出现。当客户端数量较多且具备充足的请求能力时,该问题不会出现。因此,理想情况是设计一个自适应的方案,能够在客户端数量较多时不引入额外成本来解决该问题。 为了解决该问题,关键在于将前端的客户端长轮询和后端的Broker长轮询解耦,并赋予Proxy感知后端消息个数的能力,使其能够优先选择有消息的Broker,避免false empty response。 考虑到Pop消费本身的无状态属性,期望设计方案的逻辑与Pop一致,而不在代理中引入额外的状态来处理该问题。 另外,简洁性是非常重要的,因此期望该方案能够保持简单可靠,不引入过多的复杂性。 1. 为了解决该问题,本质上是要将前端的客户端长轮询和后端的Broker长轮询解耦开来,并赋予Proxy感知后端消息个数的能力,能够优先选择有消息的Broker,避免false empty response。 2. 由于Pop消费本身的无状态属性,因此期望该方案的设计逻辑和Pop一致,而不在Proxy引入额外的状态来处理这个事情。 3. Simplicity is ALL,因此期望这个方案简单可靠。 我们使用了NOTIFICATION,可以获取到后端是否有尚未消费的消息。拥有了上述后端消息情况的信息,就能够更加智能地指导Proxy侧的消息拉取。 通过重构NOTIFICATION,我们对其进行了一些改进,以更好地适应这个方案的要求。 pop with notify 一个客户端的请求可以被抽象为一个长轮询任务,该轮询任务由通知任务和请求任务组成。 通知任务的目的是获取Broker是否存在可消费的消息,对应的是Notification请求;而请求任务的目的是消费Broker上的消息,对应的是Pop请求。 首先,长轮询任务会执行一次Pop请求,以确保在消息积压的情况下能够高效处理。如果成功获取到消息,则会正常返回结果并结束任务。如果没有获取到消息,并且还有剩余的轮询时间,则会向每个Broker提交一个异步通知任务。 在任务通知返回时,如果不存在任何消息,长轮询任务将被标记为已完成状态。然而,如果相关的Broker存在消息,该结果将被添加到队列中,并且消费任务将被启动。该队列的目的在于缓存多个返回结果,以备将来的重试之需。对于单机代理而言,只要存在一个通知结果返回消息,Proxy即可进行消息拉取操作。然而,在实际的分布式环境中,可能会存在多个代理,因此即使通知结果返回消息存在,也不能保证客户端能够成功拉取消息。因此,该队列的设计旨在避免发生这种情况。 消费任务会从上述队列中获取结果,若无结果,则直接返回。这是因为只有在通知任务返回该Broker存在消息时,消费任务才会被触发。因此,若消费任务无法获取结果,可推断其他并发的消费任务已经处理了该消息。 消费任务从队列获取到结果后,会进行加锁,以确保一个长轮询任务只有一个正在进行的消费任务,以避免额外的未被处理的消息。 如果获取到消息或长轮询时间结束,该任务会被标记完成并返回结果。但如果没有获取到消息(可能是其他客户端的并发操作),则会继续发起该路由所对应的异步通知任务,并尝试进行消费。 自适应切换 考虑到当请求较多时,无需采用pop with notify机制,可使用原先的pop长轮询broker方案,但是需要考虑的是,如何在两者之间进行自适应切换。目前是基于当前Proxy统计的pop请求数做判断,当请求数少于某一值时,则认为当前请求较少,使用pop with notify;反之则使用pop长轮询。 由于上述方案基于的均为单机视角,因此当消费请求在proxy侧不均衡时,可能会导致判断条件结果有所偏差。 Metric 为了之后进一步调优长轮询和观察长轮询的效果,我们设计了一组metric指标,来记录并观测实时长轮询的表现和损耗。 1. 客户端发起的长轮询次数 (is_long_polling) 2. pop with notify次数 (通过现有rpc metric统计) 3. 首次pop请求命中消息次数 (未触发notify) (is_short_polling_hit) 总结 通过如上方案,我们成功设计了一套基于无状态消费方式的实时消费方案,在做到客户端无状态消费的同时,还能够避免false empty response,保证消费的实时性,同时,相较于原先PushConsumer的长轮询方案,能够大量减少用户侧无效请求数量,降低网络开销, 产品侧 需明确长轮询和短轮询的区分,可以参考AWS的定义,当轮询时间大于0时,长轮询生效。 且需明确一个长轮询最小时间,因为长轮询时间过小时无意义,AWS的最小值采取了1s,我们是否需要follow,还是采取一个更大的值。
#技术探索 #功能特性 #云原生

2023年4月11日

Apache RocketMQ 多级存储设计与实现
设计总览 RocketMQ 多级存储旨在不影响热数据读写的前提下将数据卸载到其他存储介质中,适用于两种场景: 1. 冷热数据分离:RocketMQ 新近产生的消息会缓存在 page cache 中,我们称之为热数据;当缓存超过了内存的容量就会有热数据被换出成为冷数据。如果有少许消费者尝试消费冷数据就会从硬盘中重新加载冷数据到 page cache,这会导致读写 IO 竞争并挤压 page cache 的空间。而将冷数据的读取链路切换为多级存储就可以避免这个问题; 2. 延长消息保留时间:将消息卸载到更大更便宜的存储介质中,可以用较低的成本实现更长的消息保存时间。同时多级存储支持为 topic 指定不同的消息保留时间,可以根据业务需要灵活配置消息 TTL。 RocketMQ 多级存储对比 Kafka 和 Pulsar 的实现最大的不同是我们使用准实时的方式上传消息,而不是等一个 CommitLog 写满后再上传,主要基于以下几点考虑: 1. 均摊成本:RocketMQ 多级存储需要将全局 CommitLog 转换为 topic 维度并重新构建消息索引,一次性处理整个 CommitLog 文件会带来性能毛刺; 2. 对小规格实例更友好:小规格实例往往配置较小的内存,这意味着热数据会更快换出成为冷数据,等待 CommitLog 写满再上传本身就有冷读风险。采取准实时上传的方式既能规避消息上传时的冷读风险,又能尽快使得冷数据可以从多级存储读取。 Quick Start 多级存储在设计上希望降低用户心智负担:用户无需变更客户端就能实现无感切换冷热数据读写链路,通过简单的修改服务端配置即可具备多级存储的能力,只需以下两步: 1. 修改 Broker 配置,指定使用 org.apache.rocketmq.tieredstore.TieredMessageStore 作为 messageStorePlugIn 2. 配置你想使用的储存介质,以卸载消息到其他硬盘为例:配置 tieredBackendServiceProvider 为 org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment,同时指定新储存的文件路径:tieredStoreFilepath 可选项:支持修改 tieredMetadataServiceProvider 切换元数据存储的实现,默认是基于 json 的文件存储 更多使用说明和配置项可以在 GitHub 上查看多级存储的 技术架构 architecture 接入层:TieredMessageStore/TieredDispatcher/TieredMessageFetcher 接入层实现 MessageStore 中的部分读写接口,并为他们增加了异步语意。TieredDispatcher 和 TieredMessageFetcher 分别实现了多级存储的上传/下载逻辑,相比于底层接口这里做了较多的性能优化:包括使用独立的线程池,避免慢 IO 阻塞访问热数据;使用预读缓存优化性能等。 容器层:TieredCommitLog/TieredConsumeQueue/TieredIndexFile/TieredFileQueue 容器层实现了和 DefaultMessageStore 类似的逻辑文件抽象,同样将文件划分为 CommitLog、ConsumeQueue、IndexFile,并且每种逻辑文件类型都通过 FileQueue 持有底层物理文件的引用。有所不同的是多级存储的 CommitLog 改为 queue 维度。 驱动层:TieredFileSegment 驱动层负责维护逻辑文件到物理文件的映射,通过实现 TieredStoreProvider 对接底层文件系统读写接口(Posix、S3、OSS、MinIO 等)。目前提供了 PosixFileSegment 的实现,可以将数据转移到其他硬盘或通过 fuse 挂载的对象存储上。 消息上传 RocketMQ 多级存储的消息上传是由 dispatch 机制触发的:初始化多级存储时会将 TieredDispatcher 注册为 CommitLog 的 dispacher。这样每当有消息发送到 Broker 会调用 TieredDispatcher 进行消息分发,TieredDispatcher 将该消息写入到 upload buffer 后立即返回成功。整个 dispatch 流程中不会有任何阻塞逻辑,确保不会影响本地 ConsumeQueue 的构建。 TieredDispatcher TieredDispatcher 写入 upload buffer 的内容仅为消息的引用,不会将消息的 body 读入内存。因为多级储存以 queue 维度构建 CommitLog,此时需要重新生成 commitLog offset 字段 upload buffer 触发 upload buffer 上传时读取到每条消息的 commitLog offset 字段时采用拼接的方式将新的 offset 嵌入到原消息中 上传进度控制 每个队列都会有两个关键位点控制上传进度: 1. dispatch offset:已经写入缓存但是未上传的消息位点 2. commit offset:已上传的消息位点 upload progress 类比消费者,dispatch offset 相当于拉取消息的位点,commit offset 相当于确认消费的位点。commit offset 到 dispatch offset 之间的部分相当于已拉取未消费的消息 消息读取 TieredMessageStore 实现了 MessageStore 中的消息读取相关接口,通过请求中的逻辑位点(queue offset)判断是否从多级存储中读取消息,根据配置(tieredStorageLevel)有四种策略: DISABLE:禁止从多级存储中读取消息; NOT_IN_DISK:不在 DefaultMessageStore 中的消息从多级存储中读取; NOT_IN_MEM:不在 page cache 中的消息即冷数据从多级存储读取; FORCE:强制所有消息从多级存储中读取,目前仅供测试使用。 ${a} 需要从多级存储中读取的消息会交由 TieredMessageFetcher 处理:首先校验参数是否合法,然后按照逻辑位点(queue offset)发起拉取请求。TieredConsumeQueue/TieredCommitLog 将逻辑位点换算为对应文件的物理位点从 TieredFileSegment 读取消息。 ${b} TieredFileSegment 维护每个储存在文件系统中的物理文件位点,并通过为不同存储介质实现的接口从中读取所需的数据。 ${c} 预读缓存 TieredMessageFetcher 读取消息时会预读一部分消息供下次使用,这些消息暂存在预读缓存中 ${d} 预读缓存的设计参考了 TCP Tahoe 拥塞控制算法,每次预读的消息量类似拥塞窗口采用加法增、乘法减的机制控制: 加法增:从最小窗口开始,每次增加等同于客户端 batchSize 的消息量。 乘法减:当缓存的消息超过了缓存过期时间仍未被全部拉取,在清理缓存的同时会将下次预读消息量减半。 预读缓存支持在读取消息量较大时分片并发请求,以取得更大带宽和更小的延迟。 某个 topic 消息的预读缓存由消费这个 topic 的所有 group 共享,缓存失效策略为: 1. 所有订阅这个 topic 的 group 都访问了缓存 2. 到达缓存过期时间 故障恢复 上文中我们介绍上传进度由 commit offset 和 dispatch offset 控制。多级存储会为每个 topic、queue、fileSegment 创建元数据并持久化这两种位点。当 Broker 重启后会从元数据中恢复,继续从 commit offset 开始上传消息,之前缓存的消息会重新上传并不会丢失。 开发计划 面向云原生的存储系统要最大化利用云上存储的价值,而对象存储正是云计算红利的体现。 RocketMQ 多级存储希望一方面利用对象存储低成本的优势延长消息存储时间、拓展数据的价值;另一方面利用其共享存储的特性在多副本架构中兼得成本和数据可靠性,以及未来向 Serverless 架构演进。 tag 过滤 多级存储拉取消息时没有计算消息的 tag 是否匹配,tag 过滤交给客户端处理。这样会带来额外的网络开销,计划后续在服务端增加 tag 过滤能力。 广播消费以及多个消费进度不同的消费者 预读缓存失效需要所有订阅这个 topic 的 group 都访问了缓存,这在多个 group 消费进度不一致的情况下很难触发,导致无用的消息在缓存中堆积。 需要计算出每个 group 的消费 qps 来估算某个 group 能否在缓存失效前用上缓存的消息。如果缓存的消息预期在失效前都不会被再次访问,那么它应该被立即过期。相应的对于广播消费,消息的过期策略应被优化为所有 Client 都读取这条消息后才失效。 和高可用架构的融合 目前主要面临以下三个问题: 1. 元数据同步:如何可靠的在多个节点间同步元数据,slave 晋升时如何校准和补全缺失的元数据; 2. 禁止上传超过 confirm offset 的消息:为了避免消息回退,上传的最大 offset 不能超过 confirm offset; 3. slave 晋升时快速启动多级存储:只有 master 节点具有写权限,在 slave 节点晋升后需要快速拉起多级存储断点续传。
作者:张森泽
#技术探索 #云原生

2023年1月13日

RocketMQ 集成生态再升级:轻松构建云上数据管道
阿里云消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台,面向互联网分布式应用场景提供微服务异步解耦、流式数据处理、事件驱动处理等核心能力。其自诞生以来一直为阿里集团提供稳定可靠的消息服务,历经多年双十一万亿级流量洪峰的验证。 随着业务需求场景日渐丰富,在多年经验积累后,阿里云 RocketMQ 也迎来了革命性的更新,正式发布了阿里云消息队列 RocketMQ 版 5.0,在架构、网络、负载均衡、存储等诸多方面进行了显著优化。其定位不再局限于消息解耦场景,将全新布局事件驱动和消息流式处理场景。 阿里云 EventBridge 作为云上事件枢纽一直以来都保持着对云上事件、数据的友好生态支持。随着 RocketMQ 5.0版本的用户日渐增多,EventBridge 在近期对 RocketMQ Connector 进行了全面升级。升级之后的 RocketMQ Connector 不仅可以支持RocketMQ 5.0 版本,同时也能支持云上自建 RocketMQ 实例。除此之外,基于成熟的事件流能力,用户使用 EventBridge 也能轻松构建消息路由能力,实现对灾备、数据同步的需求。 本文将从业务架构和 API 使用等方面讲解如何使用 EventBridge 创建阿里云 RocketMQ 4.0、5.0 版本,开源自建版本以及消息路由的相关任务。 EventBridgeRocketMQ 4.0 业务架构 RocketMQ 4.0 版本使用较为经典的 clientnameserverbroker 架构,整个应用主要由生产者、消费者、NameServer 和 Broker 组成。 Name Server:是一个几乎无状态节点,可集群部署,在消息队列 RocketMQ 版中提供命名服务,更新和发现 Broker 服务。 Broker:消息中转角色,负责存储消息,转发消息。分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。Broker 启动后需要完成一次将自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。 生产者:与 Name Server 集群中的其中一个节点(随机)建立长连接(Keepalive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长连接,且定时向 Master Broker 发送心跳。 消费者:与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从  Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。 EventBridge在获取用户授权之后,利用生成的 sts 临时授权对客户的  RocketMQ 实例进行消息读取或写入。 API 使用 在 API 介绍方面,我们以创建「自定义总线自定义事件源」为例,事件目标以及事件流中的API基本一致。 基于 EventBridge 创建 RocketMQ 4.0 任务的 API 和之前基本保持了一致。具体参数如下 版本:代表阿里云消息队列 RocketMQ 版本,可选择 4.x 或 5.x; RocketMQ 实例:RocketMQ 对应的实例 ID。用户在阿里云 RocketMQ控制台每创建一个实例都会有一个对应的实例 ID,如MQ_INST_123456789_BX6zY7ah; Topic:RocketMQ Topic。选择此 topic 作为事件源的读取对象或者事件目标的写入对象; Tag:RocketMQ 消费 Tag,用于消费者过滤消息使用; Group ID:RocketMQ 消费组,标识一组特定的消费者,仅事件源有此参数; 消费位点:初始消费位点。可选择最新位点、最早位点、或者指定时间戳。 EventBridgeRocketMQ 5.0 业务架构 RocketMQ 5.0 版将通用的存储逻辑下沉,集中解决消息存储的多副本、低延迟、海量队列分区等技术问题,将上层的消息处理剥离出完全的无状态计算层,主要完成协议适配、权限管理、消费状态、可观测运维体系支持,Broker 则继续专注于存储能力的持续优化。存算分离的架构设计,使得从 SDK 接入到线上运维全链路带来全面提升: 1. 轻量版 SDK 的开放和全链路可观测系统的提升:同时支持 4.x 通信协议和全新的 gRPC 通信协议,并内置 OpenTelemetry 埋点支持,新版本 SDK 新增了 10 余个指标埋点。 2. 消息级负载均衡:新版本 SDK 不再参与实际存储队列的负载均衡,消息负载均衡将更加轻量,以单条消息为调度最小单元。 3. 多网络访问支持:新版本支持单一实例同时暴露公网、内网等访问形式,方便客户多网络接入访问。 4. 海量分级存储:新版本开放分级存储历史消息保存能力,消息低成本无大小限制,最长保存 30 天。冷热数据进行分离设计,极大降低消费历史消息对实例的性能影响。 RocketMQ 5.0 版本 可以支持 VPC 内部安全识别,用户上云无需修改代码。在用户授予 EventBridge 网络和 RocketMQ 相关权限之后,用户在 EventBridge 创建 MQ 5.0 Source&Sink 任务的时,EventBridge 会根据 RocketMQ 5.0 实例的 VPC 信息,调用网络组件获取相应代理信息。MQ sdk 侧通过配置代理实现消息的收发。 API 使用 相比于 4.0 实例,5.0 实例多了 VPC、交换机和安全组 3 个参数。 5.0 实例新增了 VPC 属性,用户需要在对应 vpc 内去访问 MQ 5.0 实例。EventBridge 在获得用户授权之后,也是经由 5.0 实例对应的 VPC 内进行消息的收发。创建任务时前端会自动填充好实例的 vpc 和交换机信息。 安全组参数限制了 EventBridge 在 vpc 内的访问策略,用户可以选择使用已有安全组也可以选择快速创建,让 EventBridge 快速创建一个安全组供任务使用。安全组策略推荐使用默认的安全组策略。使用上推荐第一次在此vpc内创建任务时,使用 EventBridge 自动创建一个安全组,后续在此 VPC 内再创建其他任务时,在使用已有中选择 EventBridge 创建的安全组。 EventBridge自建 Apache RocketMQ 针对用户在阿里云自建 Apache RocketMQ 集群的场景,EventBridge 也支持了消息导出能力。用户通过配置接入点、topic、groupID、VPC 等信息,即可将自建集群中的消息导入 EventBridge,进而对接 EventBridge 目前支持的大量下游生态。 业务架构 抽象来看,EventBridge 访问自建 MQ 实例的链路和阿里云 5.0 版本基本一致,都是从用户 vpc 发起对 MQ 实例的访问。区别在于接入点的不同,前者是用户自建 MQ 集群的nameserver,而后者为阿里云 RocketMQ 提供的接入点,不需要感知真实的 MQ 集群是部署在用户 vpc 还是阿里云 RocketMQ 自身的生产环境。 API 使用 在 API 使用方面,自建集群的大部分参数需要用户手动填入。 接入点:nameserver 地址。后续会支持 proxy 地址; Topic:RocketMQ Topic。选择此 topic 作为事件源的读取对象或者事件目标的写入对象; Tag:RocketMQ 消费 Tag,用于消费者过滤消息使用; Group ID:RocketMQ 消费组,标识一组特定的消费者,仅事件源有此参数; FilterType:过滤模式,目前支持 Tag 过滤; 认证模式:如果开启 ACL 鉴权,可在此配置鉴权信息; 消费位点:初始消费位点; VPC:自建 MQ 集群对应的 VPC 参数信息; 交换机:自建 MQ 集群对应的交换机信息; 安全组:EventBridge使用此安全组访问用户自建 MQ 集群,安全组规定了 EventBridge 在此 vpc 内的访问策略。 RocketMQ 消息路由 当用户有灾备或者消息同步的需求时,可能就会需要消息路由能力,即将 A region 下某实例 topic 的消息同步到 B region 的某 topic 中。 对于 EventBridge 而言,消息路由并非单独的一个产品能力,用户通过使用事件流即可实现消息路由。 针对非跨境场景的消息路由,如从北京同步消息到上海,跨 region 网络打通能力由 EventBridge 来实现,用户无需关注过多实现细节。 针对跨境场景,如北京同步消息到新加坡,EventBridge 使用的是公网链路完成对目标实例的写入,使用的是目标 MQ 实例的公网接入点。消息出公网的能力需要用户提供,即需要用户提供 VPC、交换机和安全组配置,此VPC须带有NAT等访问公网能力, EventBridge 使用此 VPC 实现写入目标端公网接入点。 在 API 使用方面,创建消息路由任务本质上是创建事件流,API 参数和上面各类型 RocketMQ 实例任务一致,这里以创建一个青岛到呼和浩特的 RocketMQ 消息路由为例。 1.进入 EventBridge 控制台,regionBar 选择到呼和浩特,点击左侧“事件流”,然后选择“创建事件流”。 2.在事件源页面,事件提供方选择“消息队列 RocketMQ 版”,地域选择青岛,剩余 RocketMQ 相关参数按需求选择。 3.规则页面按需填写,这里选择默认内容。 4.在“目标”页面,服务类型选择“消息队列 RocketMQ 版”,剩余参数按需填写。 5.点击“创建”,等待事件流任务启动即可。 总结 本文介绍了 EventBridge 对接各类型 RocketMQ 实例的基本原理与对应的 API 使用说明,便于已经使用了 RocketMQ 5.0 版本和自建 MQ 实例的用户可以借助 EventBridge 的能力实现事件驱动业务架构的搭建。同时针对灾备和业务消息同步的场景,本文也基于事件流讲解了如何基于 EventBridge 创建 RocketMQ 消息路由任务。
作者:昶风
#技术探索 #生态集成