2022年12月27日

RocketMQ 多语言 SDK 开源贡献召集令
目前 Apache RocketMQ 5.0 SDK [1]正在社区开源,开发与迭代也在火热进行中,欢迎广大社区的朋友们能够参与其中。我们欢迎任何形式的贡献,包括但不限于新 feature、bugfix、代码优化、生态集成、测试工作、文档撰写。更加欢迎能够认领一个完整的特定语言实现的同学,踏出第一步,你就是 contributor!更有惊喜礼品和成为 committer 的机会等着你! 写在前面 Apache RocketMQ 是由阿里巴巴集团捐赠给 Apache 开源基金会的一款低延迟、高并发、高可用、高可靠的分布式消息中间件,并于 2017 年正式从 Apache 社区毕业,成为 Apache 顶级项目(TLP)。也是国内首个非 Hadoop 生态体系的互联网中间件顶级项目。 面向过去,RocketMQ 经过多年淘宝双十一的洗礼和考验,赢得了诸多客户的认可和青睐。面向未来,RocketMQ 历久弥新,为了更好地迎接云原生时代的来临,基于存算分离架构的 RocketMQ 5.0 应运而生。 RocketMQ 5.0 中引入了全新的无状态 Proxy 组件,在水平拓展,故障应急,多协议等方面都进行了诸多支持与改进(关于 RocketMQ 5.0 的详细介绍,欢迎关注 Rocketmq 官网[2])。同时也为接下来多语言客户端的实现打下了良好基础。 新的多语言SDK RocketMQ 5.0 客户端相比较于 4.x 的版本进行了诸多改进,会是未来社区客户端演进的主流方向。RocketMQ 4.x SDK 的多语言支持并不完美,协议的较高复杂度和语言绑定的实现细节使得多语言的支持与维护都变得棘手,而用户对多语言的诉求是强烈的。值此契机,RocketMQ 5.0 基于 gRPC 正式推出了全新的多语言 SDK。 相比较于 RocketMQ 4.x 的 SDK。RocketMQ 5.0 展现出了一副全新的面貌: 采用全新极简的,immutable 的 API 设计,使得 API 上手更简单,跨语言的对齐也变得更加简单; 完善的错误处理体系和错误码设计,开发者和用户对错误的处理可以更加得心应手; 在 PushConsumer/PullConsumer 之外新推出无状态 SimpleConsumer,实现逻辑轻量,用户可以自行管理消费侧消息的接收与应答,同时也对有更多定制化需求的客户提供了便利。 实现轻量化,代码量相比较旧有实现缩减 3/4 以上,开发和维护的成本更低; 标准化的 logging/tracing/metrics 输出,降低实现复杂度的同时,可观测性的提升会使得生产环境下的问题更容易被捕捉; gRPC 多语言特性,为 RocketMQ 5.0 客户端的多语言实现提供了支撑。RocketMQ 全新的客户端的协议层被替换,语言无关的 IDL 使得协议的维护和实现都更为极为简单。同时得益于 gRPC 强大的生态体系,使得 RocketMQ 与周边的集成也变得更为简便。 RocketMQ 5.0 中引入了新的的 pop 消费,创造性地在原生的队列模型之上支持了这种无状态的消费模式。不同于原始的更适用于流场景的队列模型,pop 机制更面向于业务消息的场景,使得开发者和用户可以只关心消息本身,可以通过「SimpleConsumer」提供单条消息级别的接受/重试/修改不可见时间以及删除等 API 能力。 Roadmap 目前 5.0 多语言 SDK 的 Java/C++ 已经有了相对比较完整的实现。 Go/C 已经提供了基础的 Producer/SimpleConsumer 的实现,其余的语言实现(PHP/Python/JavaScript/Rust 等)还在社区进行中,欢迎大家广泛参与。 对于一个从零开始的特定语言实现,一个大概的步骤如下: 部署 rocketmqnamesrv[3]和 rocketmqproxy[4] 方便与客户端进行调试,为降低部署成本,rocketmqproxy 可以采用 LOCAL 模式进行部署。 熟悉 rocketmqapis中的 IDL,适配新的 gRPC/Protobuf 协议:IDL 中描述了 5.0 SDK 中的语言无关的协议描述,通过 gRPC protoc 工具自动生成协议层代码。 应用新的 API 规范和设计:可以参考 Java 的 API 设计[5],总体指导思想是不可变性且行为明确。 实现 Producer/SimpleConsumer:Producer 提供最基本的四种不同类型消息的发送功能(普通/顺序/定时/事务),SimpleConsumer 提供基于 pop 语义的无状态消息接受/重试/修改不可见时间等能力。 统一的错误处理体系:由服务端产生的异常与错误均有完善的异常错误码和异常信息,各个语言实现需要以最适合的方式暴露给客户。 实现 PushConsumer:RocketMQ 4.x 中最为常用的消费者类型,用户侧只需要明确订阅关系和定义消息监听器行为即可,客户端实现中需要自动帮用户从远端获取消息。 客户端全方位可观测性:规范的日志输出,实现基于 OpenTelemetry/OpenCensus 的客户端 metrics 体系。 按照以上流程开发者在开发过程中出现的任何问题,都欢迎以 issue/pull request 的形式反馈到社区。 如何参与贡献 我们欢迎任何形式的贡献,包括且不限于新 feature、bugfix、代码优化、生态集成、测试工作、文档撰写。更加欢迎能够认领一个完整的特定语言实现的同学!不要犹豫,欢迎大家以 issue/pull request 的形式将你的想法反馈到社区,一起来建设更好的 RocketMQ! 相关资料 rocketmqclients: RocketMQ 5.0 多语言客户端实现 rocketmq: RocketMQ 主仓库(内置 5.0 proxy 实现) rocketmqapis: RocketMQ 5.0 协议具体定义 《RIP37: RocketMQ 全新统一 API 设计》 《RIP39: RocketMQ gRPC 协议支持》 相关链接 [1] Apache RocketMQ 5.0 SDK [2] rocketmq 官网 [3] rocketmqnamesrv [4] rocketmqproxy [5] Java 的 API 设计
作者:艾阳坤
#社区动态

2022年12月21日

消息收发弹性——生产集群如何解决大促场景消息收发的弹性&降本诉求
产品介绍—什么是消息收发弹性 大家好,我是来自阿里云云原生消息团队的赖福智,花名宸罡,今天来给大家分享下阿里云 RocketMQ5.0 实例的消息弹性收发功能,并且通过该功能生产集群是如果解决大促场景消息收发的弹性以及降本诉求的。 阿里云弹性策略 本次将会从产品介绍,产品使用及限制,使用方式及演示三个方面来介绍。在介绍 Rocketmq5.0 实例的消息首发弹性之前,先从整体上看下阿里云的弹性策略。我们通常认为业务方往往存在预期外的突发业务热点和毛刺流量,常规扩容无法及时应对,这样一来服务会有不确定性的风险。因此为了应对突发流量,我们设计了一套处理机制,最基本的是要满足规格内的预期流量,然后是应对弹性区间内的突发流量可以随时开启的弹性能力,最后是要有对完全超过弹性上限流量的限流限流能力。针对弹性区间的突发流量,传统自建集群通过常规扩容方式应对,需要分钟级的处理时间,在这段时间内业务会受损,并且为了这部分偶尔的突发流量扩容到一个较大的规格并不划算。云上5.0实例的消息收发弹性能力对弹性区间内的突发流量可以做到秒级响应,针对大促这种预期内的短期突发流量可以按量收费更加实惠,仅当用户真正用到这部分弹性能力才收费。 消息收发弹性简介 接下来我们就看具体看下 5.0 实例的消息收发弹性,消息收发弹性最直观的感受就是在 5.0 实例的详情页面的自适应弹性 TPS 这部分,可以看到在正常消息收发 TPS 的旁边额外有一个自适应弹性 TPS。通过这部分弹性 TPS 的设置,用户可以快速、低成本的应对大促这种短时间突发流量的场景。 这时可能有小伙伴会问为什么我不直接升级规格提高标准收发 TPS,而是使用弹性 TPS 呢?让我们假设一个典型的大促场景,比如在今晚 0 点有大促活动,使用消息弹性功能的用户完全可以提前几天就把弹性功能打开,大促结束等流量恢复后再把弹性功能关闭,实际上不关闭也不会有什么问题,不使用则不收费。 如果通过升级规格来提升标准 TPS 应对大促流量,用户同样是提前几天就把规格升高了,那么在大促前这几天按照高规格收费但实际又跑不到高规格的 TPS,实际上花了更多的钱但是确造成了资源的浪费。如果用户为了避免资源浪费在大促当天 0 点前升级规格,一个是需要用户付出额外的精力来关注 RocketMQ 按时升配,再就是实例的升配是一个重资源操作,扩容耗时长,无法做到即开即用秒级生效,很有可能已经到 0 点了但是升配还没有完成。 使用消息弹性功能的话可以做到秒级生效开箱即用,并且如果没有使用到这部分额外的弹性 TPS 是不会收费的。但是弹性 TPS 也不是个解决问题的万能银弹,也是有上限的,基本上可以在规格标准 TPS 的基础上额外有一半的弹性 TPS,如果标准 TPS+ 弹性 TPS 仍然无法满足用户业务需求,此时意味着仅扩容弹性节点已经无法满足需求,同时需要扩容存储节点,所以需要升配规格,这部分的原理后面会详细解释。 也有用户会问,如果我的日常 TPS 在 2500 左右,可不可以购买一个 2000 标准 TPS 的实例并且一直开着 1000 的弹性 TPS 满足需求呢?这种情况我们建议直接使用标准 TPS 大于 2500 的实例,因为弹性 TPS 这部分的使用会额外计费,如果一天 24 小时都在大量使用弹性 TPS,从计费上来说直接使用更高规格的实例更实惠。 5.0 实例消息收发弹性的实现方式,和传统自建方式的对比 接下来我们看下阿里云 RocketMQ5.0 实例是怎么实现消息收发弹性的,并且在扩容场景和自建 RocketMQ 有什么优势。传统自建 RocketMQ 集群如左图显示,是一个存储计算不分离的架构,这种架构下 Broker 是一个很重的组件,因为它同时需要处理客户端的请求,也要负责数据的读取写入,Broker 同时负责计算和存储。作为一个有状态的节点,Broker 扩容是一个很重的操作,时间会很慢,而且在很多时候我们并不需要扩容存储能力,仅仅需要应对高 TPS 请求的计算能力,此时随着 Broker 扩容的存储扩容实际上被浪费了。 再来看下 RocketMQ5.0 实例消息收发弹性是怎么做的,首先 5.0 实例的架构是存储计算分离的模式,用户的客户端仅会请求计算层的计算节点,计算节点操作存储节点读写消息,客户端并不会直接访问存储节点。开启消息收发弹性功能意味着开启了计算层的弹性能力。得益于这种存储计算分离的架构,可以让我们快速低成本的扩容计算层节点,计算层节点作为无状态应用可以做到秒级扩容,十分便捷。而且在云厂商拥有大量资源池的前提下可以做到资源的弹性扩容。可以说 RocketMQ5.0 实例的消息收发弹性能力依赖于阿里云作为云厂商的弹性能力和存算分离的技术方案得以实现。 在大促这种短时间大流量的场景下,大部分都是不需要扩容存储节点的,此时就可以通过开通消息收发弹性的能力满足需求。 产品使用及限制:消息收发弹性的使用及限制 支持版本 消息收发弹性的功能仅在专业版和铂金版支持,标准版实例不支持,并且专业版的单机规格作为给用户使用的测试版本也不支持。 弹性上限 不同规格实例的弹性 TPS 上限不同,基本上在标准 TPS 的基础上额外有一半的弹性 TPS,下图所示为专业版的弹性 TPS 上限。受篇幅所限,其他规格的弹性上限可以参考官方文档,就不再列出了。 计费方式 弹性 TPS 是额外计费的,计费周期按小时计费,不足 1 小时,按 1 小时计算。计费方式为超过限制的 TPS× 使用时长(小时)× 弹性规格单价(元/TPS/小时)弹性规格单价如下图,不同地域的单价会有略微差异。 SLA 可能有小伙伴会担心使用到这部分额外的弹性 TPS 会不会有问题,毕竟这部分是在标准 TPS 之上额外的能力,有一种自己实例超负荷运转的感觉。这个是完全不用担心的,不同规格的弹性上限已经经过压测验证,和规格标准 TPS 享受一样的稳定性 SLA 保证。 使用方式及演示:结合业务场景的最佳使用方式 开启方式、生效时间、收发比例 最后我们来实际操作下开启弹性收发能力并且验证该功能。RocketMQ5.0实例依然支持使用 RocketMQ4.0 实例的 1.x 客户端访问,所以这里分别提供了 1.x 客户端和 5.x 客户端的测试代码实例。 该程序开启了 200 个线程的线程池通过 ratelimiter 根据输入参数设置每秒最大的发送消息条数,打印失败的原因,并且每秒统计成功发送的消息量 在这里我已经提前购买好了一个专业版的实例,默认是不会开启消息收发弹性能力的。我们可以点击这里的开启弹性按钮进入实例修改页面开启弹性功能。这里要注意的是开启之后的弹性 TPS 依然受实例整体的消息收发占比设置,用户可以根据自己的消息收发场景设置该比例。 再开启之前我们来尝试下每秒发送 2300 个消息会怎么样,可以看到已经被限流了,并且每秒成功发送的量要比 2000 多一些。接着我们将弹性开启,并且将默认的收发比 1:1 改为 4:5,这是修改后的实例状态,现在让我们继续每秒发送 2300 个消息来验证下,可以看到已经都成功发送了。
作者:宸罡
#技术探索

2022年12月14日

事件总线 + 函数计算构建云上最佳事件驱动架构应用
距离阿里云事件总线(EventBridge)和 Serverless 函数计算(Function Compute,FC)宣布全面深度集成已经过去一年,站在系统元数据互通,产品深度集成的肩膀上,这一年我们又走过了哪些历程?从事件总线到事件流,从基于 CloudEvents 的事件总线触发到更具个性化的事件流触发,函数计算已成为事件总线生态不可或缺的重要组成部分,承载了 EventBridge 系统架构中越来越多的角色,事件流基础架构的函数 Transform,基于函数计算的多种下游 Sink Connector 投递目标支持,函数作为 EventBridge 端点 API Destination;基于事件总线统一,标准的事件通道能力,和基于函数计算敏捷、轻量、弹性的计算能力,我们将又一次起航探索云上事件驱动架构的最佳实践。 今天的主题围绕事件总线+函数计算,构建云上最佳事件驱动架构应用。希望通过今天的分享,能够帮助大家深入理解 Serverless 函数计算、EventBridge 事件总线对于构建云上事件驱动架构应用的价值和背后的逻辑、 为什么函数计算是云上事件驱动服务最佳实践?为什么我们如此需要事件总线服务?伴随着这些谜题的解开,最后,让我们一起了解应用于实际生产的一些 Serverless 事件驱动客户案例。 事件驱动架构的本质 Back to the Nature of EventDriven 大家可能会疑惑,事件驱动家喻户晓,为什么我们又要重新讨论事件驱动呢?我想这也正是我们需要讨论它的原因,回归本质,重新起航;事件驱动可能是一个比较宽泛的概念,而本文聚焦事件驱动架构的讨论,事件驱动架构作为一种软件设计模式,的确不是一个新的概念,伴随着计算机软件架构的演进,它已经存在了一段很久的时间,大家对它的讨论也从未停止过,当我们需要重新讨论一个已经存在的概念的时候,我想我们有必要重新回到它最开始的定义,一起探索那些本质的东西,重新认识它。 上面的这些内容是我从相关的一些资料上摘录的关于事件驱动的一些描述,“abstract”,“simple”,“asynchronous”,“messagedriven”这些具有代表性的词汇很好的给予事件驱动一个宏观的描述;从事件驱动的抽象概念,到它简洁的架构,以及事件驱动架构要达成的目的,和它在实际的系统架构中所展现的形态。 事件驱动架构基本概念及形态 在了解了关于事件驱动架构的一些基本描述之后,我们需要进一步明确事件驱动架构所涉及的一些基本概念和架构形态。根据维基百科描述,事件驱动架构涉及的核心概念如下所示: 围绕事件的流转,根据事件驱动架构的概念和基本形态,主要涉及以下四个核心部分: Event Producer:负责产生事件,并将产生的事件投递到事件通道; Event Channel:负责接收事件,并将接收的事件持久化存储,投递给订阅该事件的后端处理引擎; Event Processing Engine:负责对于订阅的事件做出响应和处理,根据事件更新系统状态; Downstream eventdriven activity:事件处理完成之后,对于事件处理响应的一种展示; 事件驱动架构要达成的目的 了解了事件驱动架构的基本形态,架构中事件通道的引入,解耦了事件生产和事件处理这两个最基本的系统角色,那么这样的架构模型所要达成的最终目的到底是什么? 系统架构松耦合 事件生产者与事件订阅者在逻辑上是分开的。事件的生成与使用的分离意味着服务具有互操作性,但可以独立扩缩、更新和部署。 只面向事件的松散耦合可以减少系统依赖项,并允许您以不同的语言和框架实现服务。您无需更改任何一个服务的逻辑,即可添加或移除事件生成方和接收方。您无需编写自定义代码来轮询、过滤和路由事件。 系统的可伸缩性 基于事件驱动架构的松耦合特性,意味着可以独立对事件生产者,事件通道服务,以及事件处理引擎进行独立的扩缩容;尤其对于后端事件处理引擎,可以根据消息处理响应 SLA 和后端资源供给进行弹性扩缩容;同时可以基于事件粒度构建不同规格的后端处理服务,实现更细粒度的系统弹性伸缩。 系统的可扩展性 系统的可扩展性,主要表现在当系统需要增加新的功能,如何快速的基于现有系统架构快速构建支持新的业务逻辑,在事件驱动架构应用中,围绕事件粒度的处理模式,能够天然快速支持增加新的基于事件的数据流抽象;当系统中增加新的事件类型的时候,无需调整变更发布整个系统,只需要关注需要订阅的事件进行事件处理逻辑的开发和部署即可,也可以基于原来的系统做很少的代码变更即可实现,也可以在业务初期通过独立的服务定于指定事件完成特定的业务逻辑支持。 为什么函数计算是云上事件驱动服务最佳实践? 在讨论完事件驱动架构基本模型之后,我想关于事件驱动的概念,形态我们有了统一的认识和理解,接下来我们进入议题的第二个部分,为什么函数计算是云上事件驱动服务最佳实践? 函数计算简介 函数计算是一款基于事件驱动的全托管计算服务,相关的产品细节可以见官网介绍。作为一款通用的事件驱动型计算服务,接下来我会从三个方面进行详细的介绍。 编程范式 使用函数计算,用户无需采购与管理服务器等基础设施,只需编写并上传代码。函数计算为你准备好计算资源,弹性地、可靠地运行任务,并提供日志查询、性能监控和报警等开箱即用功能,编程范式带来开发的敏捷性。按照函数粒度进行独立的功能单元开发,快速调试,快速的部署上线,省去了大量资源购买,环境搭建的运维工作;同时函数计算是一个事件驱动的模型,事件驱动,意味着用户不需要关注服务产品数据传递的问题,省去了在编写代码中涉及的大量服务访问连接的逻辑;“事件驱动” + “函数粒度开发” + “免服务器运维”等几个维度特征帮助函数计算支撑“聚焦业务逻辑敏捷开发”的底层逻辑。 计算模型 除了开发模式带来的研发效能提升之外,函数计算提供非常细粒度的计算资源和毫秒级计费模型,支撑按需计算,按量收费;能够支持按用户的请求,根据用户流量的模型为计算付费;当然按用户请求付费存在技术上巨大的挑战,要求函数计算实例的启动小于用户的 RT 要求,冷启动性能尤为重要,这时候极致弹性成为了 Serverless 按需付费,业务降本的底层技术支撑。函数计算通过“极致弹性” + “按需付费”的模型帮助 Serverless 函数计算实现真正的按需计算逻辑。 事件驱动 在基于云的开发环境,云产品承载的服务相对内聚,各自扮演着分布式系统架构中的各个重要角色,云产品之间的事件触发机制能够帮助客户更好的基于多个云产品构建自己的业务系统;否则在云产品之间 Watch 事件是非常复杂,开发代价非常昂贵的一件事;除了产品连接带来的开发效率之外,当用户订阅某个事件,并提供处理逻辑的时候,客户已经潜在的过滤掉了不需要处理的事件请求,事件驱动意味着每一次的事件触发请求都是一次完全有效的计算。 函数计算对于事件驱动架构的价值 为什么函数计算是云上最佳的事件驱动架构服务?函数计算对于事件驱动架构的核心价值到底是什么?事件驱动架构一直存在,在没有函数计算的时候,同样也有事件驱动架构,在微服务的时候也同样有事件驱动架构。如今,当我们重新再来讨论事件驱动架构的时候,到底是什么发生了变化,有哪些本质的区别?在整个事件驱动架构中,函数计算最大的价值在于帮助构建 “Event Processing Engine” 这个角色,我想主要是以下两个方面发生了本质的变化: 系统可扩展性价值 开发模式发生了本质的变化:函数计算提供的框架能力及编程模型,最大化的消除了客户业务逻辑之外的处理内容,极大的加速了客户业务开发,同时基于这样这样的开发模式,用户对于新增事件处理逻辑能够在最短的时间完成处理并上线,细粒度,专注业务的敏捷开发模式能够加速业务快速上线。 系统可伸缩性价值 计算模式发生了本质的变化:基于事件驱动架构事件粒度的处理逻辑和函数计算更细粒度力度计算弹性能力,能够从多个维度实现 “Event Processing Engine” 组件的弹性能力, 这我想这也是函数计算对于事件驱动架构的一个最核心价值。 为什么我们如此需要事件总线服务? 构建云上事件驱动架构挑战 函数计算以其轻量,快捷,能够利用事件驱动的方式与其他云产品进行联动的特点, 成为很多客户利用事件驱动架构构建业务系统的首选,随着业务及客户需求的不断增加,客户对于函数计算和更多云产品及服务的连接需求变得越来越多,同时对于其他云产品的客户而言, 也希望能够利用 Serverless 函数计算的特点帮助处理一些系统任务和事件。 尽管函数计算和云上的众多云产品进行了集成,提供了一些开箱即用的事件触发能力,那么我们为什么还需要事件总线服务来构建事件驱动应用架构呢?围绕函数计算构建事件驱动架构生态的过程中,我们面临主要来自三个方面的挑战。面对这些挑战,基于函数计算和事件总线帮助云上客户构建完备的事件驱动架构生态迫在眉睫。 事件源多样性挑战 事件驱动作为函数计算产品核心竞争力,打通函数计算和其它云产品,以及用户自定义应用,SaaS 服务的连通成为函数计算生态集成的迫切需求,但系统集成,生态建设从来都不是一件容易的事情。函数计算系统在和 EventBridge 集成之前,已经和 OSS,SLS 等用户典型场景的云产品进行了集成,也和阿里云的其它大概十多款产品进行了集成,不同系统具有不同的事件格式,不同系统的注册通知机制也各不相同,以及上游不同系统的失败处理机制也各不相同;部分系统支持同步的调用方式,部分系统支持异步的调用方式,调用方式的差异主要取决于上游系统在接入函数计算的时候当时面临的产品业务场景,对于新的产品能力和业务场景的扩展支持,在当时并未有太多的考虑。随着和更多云产品的集成,集成的投入,集成的困难度和底层数据管理难度越来越大。面对多种事件源集成的客观困难,函数计算急需提高和其他云产品的集成效率。 授权复杂及安全隐患 除此之外, 函数计算希望提升用户体验,保证用户只关心事件的处理;同时希望能够在面对大量的云产品时保证系统授权层面的复杂度。用户在使用事件触发的时候, 需要了解不同产品接入函数计算的权限要求,针对不同的产品需要提供不同的授权策略,对于客户使用函数计算带来了非常大的困难,为了加速产品接入,大量用户经常使用FullAcees权限,造成较大产品安全隐患, 和其它云产品的集成急需统一的授权界面,统一用户体验。 通用能力难以沉淀 面对上游不同的事件源, 如何更好的投递事件、更好的消费事件?如何进行事件的错误处理?函数计算调用方式如何选择?以及函数计算后端错误 Backpressure 能力的反馈、重试策略和上游系统参数设置、触发器数量的限制等问题获成为函数计算事件触发不得不面对的问题。为了更好的服务客户,提供可靠的消费处理能力,函数计算希望能够有一个统一的接入层,基于统一的接入层进行消费能力和流控能力的建设。通过沉淀在这样一个标准的层面,在保证调用灵活性的同时,提供可靠的服务质量。 事件总线简介 阿里云事件总线(EventBridge) 是一种无服务器事件总线,支持将用户的应用程序、第三方软件即服务 (SaaS)数据和阿里云服务的数据通过事件的方式轻松的连接到一起,这里汇聚了来自云产品及 SaaS 服务的丰富事件; 总线模式(EventBus) 从整个架构来看,EventBridge 通过事件总线,事件规则将事件源和事件目标进行连接。首先,让我们快速普及下 EventBridge 架构中涉及的几个核心概念: 事件:状态变化的记录; 事件源:事件的来源,事件的产生者,产生事件的系统和服务, 事件源生产事件并将其发布到事件总线; 事件总线:负责接收来自事件源的事件;EventBridge 支持两种类型的事件总线: 云服务专用事件总线:无需创建且不可修改的内置事件总线,用于接收您的阿里云官方事件源的事件。 自定义事件总线:标准存储态总线,用于接收自定义应用或存量消息数据的事件,一般事件驱动可选该总线。 事件规则:用于过滤,转化事件,帮助更好的投递事件; 事件目标:事件的消费者,负责具体事件的处理。 通过上面的流程,完成了事件的产生,事件的投递,事件的处理整个过程。当然事件并不是一个新的概念,事件驱动架构也不是一个新的概念,事件在我们的系统中无处不在,事件驱动架构同样伴随着整个计算机的架构演进,不断地被讨论。对于 EventBridge,采用云原生事件标准 CloudEvents 来描述事件;带来事件的标准化,这样的标准化和事件标准的开放性带来一个最显著的优势:接入的标准化,无论是对于事件源还是事件目标。 事件流模式(EventStreaming) 消息产品凭借其异步解耦、削峰填谷的特点,成为了互联网分布式架构的必要组成部分,Serverless 函数计算有着与其完全吻合的应用场景,针对消息产品生态集成,函数计算在架构层面做了专门的建设,基于 EventBridge 产品提供的 EventStreaming 通道能力建设了通用的消息消费服务 Poller Service,基于该架构对用户提供了 RocketMQ,Kafka,RabbitMQ,MNS 等多个消息类型触发能力。 将消费的逻辑服务化,从业务逻辑中剥离由平台提供,消费逻辑和处理逻辑的分离,将传统架构的消息拉模型转化成 Serverless 化的事件驱动推模型,能够支撑由函数计算承载消息处理的计算逻辑 ,实现消息处理的 Serverless 化。基于这样的架构,能够帮助客户解决消息客户端的集成连接问题,简化消息处理逻辑的实现,同时对于波峰波谷的业务模型,结合函数计算提供细粒度的计算弹性能力,能够实现资源的动态扩容,降低用户成本。 事件总线对于事件驱动架构的价值 简化统一事件源接入 沉淀通用事件通道能力 提升优化用户集成体验 利用函数计算提供的 HTTP 函数 URL 能力,结合事件总线端点 API 能力,能够快速的帮助客户进行系统扩展和集成。 客户场景案例分享 总线模式 + 函数计算用户案例 利用 ActionTrail 事件触发函数进行多账号审计管理 利用 OSS 文件上传事件触发函数扩容 ACK  集群资源 利用 OSS 文件上传执行 Terraform 文件并访问外部 API 做结果通知 事件流模式 + 函数计算用户案例 利用函数计算细粒度资源弹性特征,结合业务波峰波谷的特点,实现快速的消息清洗和处理。 事件流触发函数计算处理业务消息 事件流触发函数计算进行简单 ETL 数据同步 事件流触发函数进行简单 ETL 数据清洗入库 函数异步+事件流触发函数构建电商运营通知系统 在购物车加购,商品变更通知场景,利用函数计算异步系统(内部自带 Queue 能力),触发大量运营通知,利用函数异步的 Destination 能力将运营通知结果写入 MQ,然后利用事件流能力对 MQ 数据进行再次处理,写入HBase数据库中。
作者:世如
#行业实践 #事件驱动架构 #云原生

2022年11月30日

RocketMQ 5.0 可观测能力升级:Metrics 指标分析
从消息的生命周期看可观测能力 在进入主题之前先来看一下 RocketMQ 生产者、消费者和服务端交互的流程: message produce and consume process RocketMQ 的消息是按照队列的方式分区有序储存的,这种队列模型使得生产者、消费者和读写队列都是多对多的映射关系,彼此之间可以无限水平扩展。对比传统的消息队列如 RabbitMQ 是很大的优势,尤其是在流式处理场景下能够保证同一队列的消息被相同的消费者处理,对于批量处理、聚合处理更友好。 接下来我们来看一下消息的整个生命周期中需要关注的重要节点: message life cycle 首先是消息发送:发送耗时是指一条消息从生产者开始发送到服务端接收到并储存在硬盘上的时间。如果是定时消息,需要到达指定的定时时间才能被消费者可见。 服务端收到消息后需要根据消息类型进行处理,对于定时/事务消息只有到了定时时间/事务提交才对消费者可见。RocketMQ 提供了消息堆积的特性,即消息发送到服务端后并不一定立即被拉取,可以按照客户端的消费能力进行投递。 从消费者的角度上看,有三个需要关注的阶段: 拉取消息:消息从开始拉取到抵达客户端的网络和服务端处理耗时; 消息排队:等待处理资源,即从消息抵达客户端到开始处理消息; 消息消费:从开始处理消息到最后提交位点/返回 ACK。 消息在生命周期的任何一个阶段,都可以清晰地被定义并且被观测到,这就是 RocketMQ 可观测的核心理念。而本文要介绍的 Metrics 就践行了这种理念,提供覆盖消息生命周期各个阶段的监控埋点。借助 Metrics 提供的原子能力我们可以搭建适合业务需要的监控系统: 日常巡检与监控预警; 宏观趋势/集群容量分析; 故障问题诊断。 RocketMQ 4.x Metrics 实现 – Exporter RocketMQ 团队贡献的 RocketMQ exporter 已被 Prometheus 官方的开源 Exporter 生态所收录,提供了 Broker、Producer、Consumer 各个阶段丰富的监控指标。 exporter metrics spec Exporter 原理解析 RocketMQ expoter 获取监控指标的流程如下图所示,Expoter 通过 MQAdminExt 向 RocketMQ 集群请求数据。获取的数据转换成 Prometheus 需要的格式,然后通过 /metics 接口暴露出来。 rocketmq exporter 随着 RocketMQ 的演进,exporter 模式逐渐暴露出一些缺陷: 无法支持 RocketMQ 5.x 中新加入的 Proxy 等模块的可观测需求; 指标定义不符合开源规范,难以和其他开源可观测组件搭配使用; 大量 RPC 调用给 Broker 带来额外的压力; 拓展性差,增加/修改指标需要先修改 Broker 的 admin 接口。 为解决以上问题,RocketMQ 社区决定拥抱社区标准,在 RocketMQ 5.x 中推出了基于 OpenTelemtry 的 Metrics 方案。 RocketMQ 5.x 原生 Metrics 实现 基于 OpenTelemtry 的 Metrics OpenTelemetry 是 CNCF 的一个可观测性项目,旨在提供可观测性领域的标准化方案,解决观测数据的数据模型、采集、处理、导出等的标准化问题,提供与三方 vendor 无关的服务。 在讨论新的 Metrics 方案时 RocketMQ 社区决定遵守 OpenTelemetry 规范,完全重新设计新 metrics 的指标定义:数据类型选用兼容 Promethues 的 Counter、Guage、Histogram,并且遵循 Promethues 推荐的指标命名规范,不兼容旧有的 rocketmqexporter 指标。新指标覆盖 broker、proxy、producer、consumer 等各个 module,对消息生命周期的全阶段提供监控能力。 指标上报方式 我们提供了三种指标上报的方式: Pull 模式:适合自运维 K8s 和 Promethues 集群的用户; Push 模式:适合希望对 metrics 数据做后处理或接入云厂商的可观测服务的用户; Exporter 兼容模式:适合已经在使用 Exporter 和有跨数据中心(或其他网络隔离环境)传输 metrics 数据需求的用户。 Pull Pull 模式旨在与 Prometheus 兼容。在 K8s 部署环境中无需部署额外的组件,prometheus 可以通过社区提供的 K8s 服务发现机制(创建 PodMonitor、ServiceMonitor CDR)自动获取要拉取的 broker/proxy 列表,并从他们提供的 endpoint 中拉取 metrics 数据。 pull mode Push OpenTelemetry 推荐使用 Push 模式,这意味着它需要部署一个 collector 来传输指标数据。 push mode OpenTelemetry 官方提供了 collector 的实现,支持对指标做自定义操作如过滤、富化,可以利用社区提供的插件实现自己的 collector。并且云厂商提供的可观测服务(如 AWS CloudWatch、阿里云 SLS)大多已经拥抱了 OpenTelemetry 社区,可以直接将数据推送到它们提供的 collector 中,无需额外的组件进行桥接。 OpenTelemetry collector 兼容 RocketMQ Exporter 新的 Metrics 也提供对 RocketMQ Exporter 的兼容,现在使用 exporter 的用户无需变更部署架构即可接入新 Metrics。而且控制面应用(如 Promethues)和数据面应用(如 RocketMQ)有可能隔离部署。因此借助 Exporter 作为代理来获取新的 Metrics 数据也不失为一种好的选择。 RocketMQ 社区在 Exporter 中嵌入了一个 OpenTelemetry collector 实现,Broker 将 Metrics 数据导出到 Exporter,Exporter 提供了一个新的 endpoint(下图中的 metricsv2)供 Prometheus 拉取。 exporter mode 构建监控体系最佳实践 丰富的指标覆盖与对社区标准的遵循使得可以轻而易举的借助 RocketMQ 的 Metrics 能力构建出适合业务需求的监控体系,这个章节主要以一个典型的流程介绍构建监控体系的最佳实践: 集群监控/巡检 触发告警 排查分析。 集群状态监控与巡检 我们将指标采集到 Promethues 后就可以基于这些指标配置监控,这里给出一些示例: 接口监控: 监控接口调用情况,可以据此快速抓出异常的请求对症下药 下图给出一些相关示例:所有 RPC 的耗时(avg、pt90、pt99 等)、成功率、失败原因、接口调用与返回值分布情况等。 rpc metrics 客户端监控: 监控客户端的使用情况,发现非预期的客户端使用如超大消息发送、客户端上下线、客户端版本治理等。 下图给出一些相关示例:客户端连接数、客户端语言/版本分布、发送的消息大小/类型分布。 client metrics Broker 监控: 监控 Broker 的水位和服务质量,及时发现集群容量瓶颈。 下图给出一些相关示例:Dispatch 延迟、消息保留时间、线程池排队、消息堆积情况。 broker metrics 以上的示例只是 Metrics 的冰山一角,需要根据业务需要灵活组合不同的指标配置监控与巡检。 告警配置 有了完善的监控就可以对需要关注的指标配置告警,比如可以配置 Broker 监控中 Dispatch 延迟这个指标的告警: broker alert 收到告警后可以联动监控查看具体原因,关联发送接口的失败率可以发现有 1.7% 的消费发送失败,对应的报错是没有创建订阅组: promblem analysis 问题排查分析 最后以消息堆积这个场景为例来看一下如何基于 Metrics 分析线上问题。 从消息生命周期看堆积问题 正如本文开篇所述,排查 RocketMQ 的问题需要结合消息的生命周期综合分析,如果片面的认定是服务端/客户端的故障未免会误入歧途。 对于堆积问题,我们主要关注消息生命周期中的两个阶段: 就绪消息:就绪消息是可供消费但还未被拉取的消息,即在服务端堆积的消息; 处理中消息:处理中的消息是被客户端拉取但是还未被消费的消息。 consume lag 多维度指标分析堆积问题 对于堆积问题,RocketMQ 提供了消费延迟相关指标 rocketmq_consumer_lag_latency 可以基于这个指标配置告警。告警的阈值需要根据当前业务对消费延迟的容忍程度灵活指定。 触发告警后就需要对消息堆积在还是就绪消息和处理中消息进行分析,RocketMQ 提供了 rocketmq_consumer_ready_messages 和 rocketmq_consumer_inflight_messages 这两个指标,结合其他消费相关指标与客户端配置综合分析即可判断出消息堆积的根因: case 1:就绪消息持续上涨,处理中消息达到客户端堆积上限 这是最常见的堆积场景,客户端处理中的消息量 rocketmq_consumer_inflight_messages 达到了客户端配置的阈值,即消费者的消费能力低于消息发送量。如果业务要求尽可能实时的消费消息就需要增加消费者机器数量,如果业务对消息延迟不是很敏感可以等待业务高峰过去后再消化堆积的消息。 case 2:就绪消息几乎为 0,处理中消息持续上涨 这个 case 多出现在使用 RocketMQ 4.x 客户端的场景,此时消费位点是顺序提交的,如果某条消息的消费卡住会导致位点无法提交。看起来的现象是消息在客户端大量堆积,即处理中消息持续上涨。可以结合消费轨迹和 rocketmq_process_time 这个指标抓出消费慢的消息分析上下游链路,找到根因优化消费逻辑。 case 3: 就绪消息持续上涨,处理中消息几乎为 0 此种场景说明客户端没有拉取到消息,一般有如下几种情况: 鉴权问题:检查 ACL 配置,如果使用公有云产品请检查 AK、SK 配置; 消费者 hang 住:尝试打印线程堆栈或 gc 信息判断是否是进程卡死; 服务端响应慢:结合 RPC 相关指标查看拉取消息接口调用量与耗时、硬盘读写延迟。检查是否为服务端问题,如硬盘 IOPS 被打满了等等。
作者:玄珏
#技术探索 #可观测

2022年11月25日

RocketMQ 的消费者类型详解与最佳实践
在 RocketMQ 5.0 中,更加强调了客户端类型的概念,尤其是消费者类型。为了满足多样的 RocketMQ 中一共有三种不同的消费者类型,分别是 PushConsumer、SimpleConsumer 和 PullConsumer。不同的消费者类型对应着不同的业务场景。 消费者类型概览 本篇文章也会根据不同的消费者类型来进行讲述。在介绍不同的消息类型之前,先明确一下不同 RocketMQ 消费者中的一个通用工作流程:在消费者中,到达客户端的消息都是由客户端主动向服务端请求并挂起长轮询获得的。为了保证消息到达的及时性,客户端需要不断地向服务端发起请求(请求是否需要由客户端主动发起则与具体的客户端类型有关),而新的符合条件的消息一旦到达服务端,就会客户端请求走。最终根据客户端处理的结果不同,服务端对消息的处理结果进行记录。 另外 PushConsumer 和 SimpleConsumer 中还会有一个 ConsumerGroup 的概念,ConsumerGroup 相当于是一组相同订阅关系的消费者的共同身份标识。而服务端也会根据 ConsumerGroup 来记录对应的消费进度。同一个 ConsumerGroup 下的消息消费者将共同消费符合当前订阅组要求的所有消息,而不是独立进行消费。相比较于 PullConsumer,PushConsumer 和 SimpleConsumer 更加适用于业务集成的场景,由服务端来托管消费状态和进度,相对来说更加的轻量与简单。 简单来说: PushConsumer :全托管的消费者类型,用户只需要注册消息监听器即可,符合对应订阅关系的消息就会调用对应的消费方法,是与业务集成最为普遍的消费者类型。 SimpleConsumer:解耦消息消费与进度同步的消费者类型,用户自主接受来自服务端的消息,并对单条消息进行消息确认。和 PushConsumer 一样也由服务端托管消费进度,适用于用户需要自主控制消费速率的业务场景。 PullConsumer:使用流处理框架进行管理的消费者类型,用户按照队列(Topic 的最小逻辑组成单位)来进行消息的接收并可以选择自动或者手动提交消费位点。 PushConsumer PushConsumer 是 RocketMQ 目前使用最为广泛的消费者。用户只需要确认好订阅关系之后,注册相对应的 Listener 即可。符合对应订阅关系的消息在由 Producer 发出后,消费者的 Listener 接口也会被即时调用,那么此时用户需要在 Listener 中去实现对应的业务逻辑。 使用简介 以下是 Push 消费者的使用示例: PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // set the consumer group name. .setConsumerGroup(consumerGroup) // set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView { // handle the received message and return consume result. LOGGER.info("consume message={}", messageView); return ConsumeResult.SUCCESS; }) .build(); // block the main thread, no need for production environment. Thread.sleep(Long.MAX_VALUE); // close the push consumer when you don't need it anymore. pushConsumer.close(); 用户需要根据自己业务处理结果的不同来返回 ConsumeResult.SUCCESS或者 ConsumeResult.FAILURE。当用户返回 ConsumeResult.SUCCESS时,消息则被视为消费成功;当用户返回 ConsumeResult.FAILURE时,则服务端视为消费失败,会进行该条消息的退避重试,消息的退避重试是指,在消息被消费成功之前,当前消息会被多次投递到用户注册的 MessageListener 中直到消费成功,而两次消费之间的时间间隔则是符合退避规律的。 特别的,每个 ConsumerGroup 都会有一个最大消费次数的设置,如果当前消息的消费次数超过了这个设置,则消息不会再被投递,转而被投递进入死信队列。这个消费次数在消息每次被投递到 MessageListener 时都会进行自增。譬如:如果消息的最大消费次数为 1,那么无论对于这条消息,当前是被返回消费成功还是消费失败,都只会被消费这一次。 应用场景与最佳实践 PushConsumer 是一种近乎全托管的消费者,这里的托管的含义在于用户本身并不需要关心消息的接收,而只需要关注消息的消费过程,除此之外的所有逻辑都在 Push 消费者的实现中封装掉了,用户只需要根据每条收到的消息返回不同的消费结果即可,因此也是最为普适的消费者类型。 MessageListener 是针对单条消息设计的监听器接口: / MessageListener is used only for the push consumer to process message consumption synchronously. Refer to {@link PushConsumer}, push consumer will get message from server and dispatch the message to the backend thread pool to consumer message concurrently. / public interface MessageListener { / The callback interface to consume the message. You should process the {@link MessageView} and return the corresponding {@link ConsumeResult}. The consumption is successful only when {@link ConsumeResultSUCCESS } is returned, null pointer is returned or exception is thrown would cause message consumption failure too. / ConsumeResult consume(MessageView messageView); } 绝大多数场景下,使用方应该快速处理消费逻辑并返回消费成功,不宜长时间阻塞消费逻辑。对于消费逻辑比较重的情形,建议可以先行提交消费状态,然后对消息进行异步处理。 实际在 Push 消费者的实现中,为了保证消息消费的及时性,消息是会被预先拉取客户端再进行后续的消费的,因此在客户端中存在对已拉取消息大小的缓存。为了防止缓存的消息过多导致客户端内存泄漏,也提前预留了客户端参数供使用者自行进行设置。 // 设置本地最大缓存消息数目为 16 条 pushConsumer.setMaxCachedMessageCount(16); // 设置本地最大缓存消息占用内存大小为 128 MB pushConsumer.setMaxCachedMessageSizeInBytes(128 1024 1024); SimpleConsumer 相比较 PushConsumer,SimpleConsumer 则暴露了更多的细节给使用者。在 SimpleConsumer 中,用户将自行控制消息的接收与处理。 使用简介 以下是 SimpleConsumer 的使用示例: SimpleConsumer consumer = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) // Set the consumer group name. .setConsumerGroup(consumerGroup) // set await duration for longpolling. .setAwaitDuration(awaitDuration) // Set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); // Max message num for each long polling. int maxMessageNum = 16; // Set message invisible duration after it is received. Duration invisibleDuration = Duration.ofSeconds(15); final List messages = consumer.receive(maxMessageNum, invisibleDuration); LOGGER.info("Received {} message(s)", messages.size()); for (MessageView message : messages) { final MessageId messageId = message.getMessageId(); try { consumer.ack(message); LOGGER.info("Message is acknowledged successfully, messageId={}", messageId); } catch (Throwable t) { LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t); } } // Close the simple consumer when you don't need it anymore. consumer.close(); 在 SimpleConsumer 中用户需要自行进行消息的拉取,这一动作通过 SimpleConsumerreceive 这个接口进行,然后再根据自己业务逻辑处理结果的不同再对拉取到的消息进行不同的处理。SimpleConsumerreceive 也是通过长轮询来接受来自服务端的消息,具体的长轮询时间可以使用 SimpleConsumerBuildersetAwaitDuration 来进行设置。 在 SimpleConsumer 中,用户需要通过 SimpleConsumerreceive 设置一个消息不重复的时间窗口(或者说关于通过这个接口收到的消息的一个不可见时间窗口),这个时间窗口从用户接受到这条消息开始计时,在这段时间之内消息是不会重复投递到消费者的,而超出这个时间窗口之后,则会对这条消息进行再一次的投递。在这个过程中,消息的消费次数也会进行递增。与 PushConsumer 类似的是,一旦消费次数超出 ConsumerGroup 的最大次数,也就不会进行重投了。 相比较于 PushConsumer 而言,SimpleConsumer 用户可以自主控制接受消息的节奏。SimpleConsumerreceive 会针对于当前的订阅关系去服务端拉取符合条件的消息。SimpleConsumer 实际上的每次消息接收请求是按照具体 Topic 的分区来 one by one 发起请求的,实际的 Topic 分区可能会比较多,因此为了保证消息接收的及时性,建议综合自己的业务处理能力一定程度上提高 SimpleConsumerreceive 的并发度。 用户在接受到消息之后,可以选择对消息使用 ack 或者 changeInvisibleDuration,前者即对服务端表示对这条消息的确认,与 PushConsumer 中的消费成功类似,而 changeInvisibleDuration 则表示延迟当前消息的可见时间,即需要服务端在当前一段时间之后再向客户端进行投递。值得注意的是,这里消息的再次投递也是需要遵循 ConsumerGroup 的最大消费次数的限制,即一旦消息的最大消费次数超出了最大消费次数(每次消息到达可见时间都会进行消费次数的自增),则不再进行投递,转而进入死信队列。举例来说: 进行 ack,即表示消息消费成功被确认,消费进度被服务端同步。 进行 changeInvisibleDuration, 1)如果消息已经超过当前 ConsumerGroup 的最大消费次数,那么消息后续会被投递进入死信队列 2)如果消息未超过当前 ConsumerGroup 的最大消费次数,若请求在上一次消息可见时间到来之前发起,则修改成功,否则则修改失败。 应用场景与最佳实践 在 PushConsumer 中,消息是单条地被投递进入 MessageListener来处理的,而在 SimpleConsumer 中用户可以同时拿到一批消息,每批消息的最大条数也由 SimpleConsumerreceive 来决定。在一些 IO 密集型的应用中,会是一个更加方便的选择。此时用户可以每次拿到一批消息并集中进行处理从而提高消费速度。 PullConsumer PullConsumer 也是 RocketMQ 一直以来都支持的消费者类型,RocketMQ 5.0 中全新的 PullConsumer API 还在演进中,敬请期待。下文中的 PullConsumer 会使用 4.0 中现存的 LitePullConsumer 进行论述,也是当前推荐的方式。 使用简介 现存的 LitePullConsumer 中的主要接口 // PullConsumer 中的主要接口 public interface LitePullConsumer { // 注册路由变化监听器 void registerTopicMessageQueueChangeListener(String topic, TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException; // 将队列 assign 给当前消费者 void assign(Collection messageQueues); // 针对当前 assigned 的队列获取消息 List poll(long timeout); // 查找当前队列在服务端提交的位点 Long committed(MessageQueue messageQueue) throws MQClientException; // 设置是否自动提交队列位点 void setAutoCommit(boolean autoCommit); // 同步提交队列位点 void commitSync(); } 在 RocketMQ 中,无论是消息的发送还是接收,都是通过队列来进行的,一个 Topic 由若干个队列组成,消息本身也是按照队列的形式来一个个进行存储的,同一个队列中的消息拥有不同的位点,且位点的大小是随随消息达到服务端的时间逐次递增的,本质上不同 ConsumerGroup 在服务端的消费进度就是一个个队列中的位点信息,客户端将自己的消费进度同步给服务端本质上其实就是在同步一个个消息的位点。 在 PullConsumer 中将队列这个概念完整地暴露给了用户。用户可以针对自己关心的 topic 设置路由监听器从而感知队列的变化,并将队列 assign 给当前消费者,当用户使用 LitePullConsumerpoll 时会尝试获取已经 assign 好了的队列中的消息。如果设置了 LitePullConsumersetAutoCommit 的话,一旦消息达到了客户端就会自动进行位点的提交,否则则需要使用 LitePullConsumercommitSync 接口来进行手动提交。 应用场景与最佳实践 PullConsumer 中用户拥有对消息位点管理的绝对自主权,可以自行管理消费进度,这是与 PushConsumer 和 SimpleConsumer 最为本质的不同,这也使得 PullConsumer 在流计算这种需要同时自主控制消费速率和消费进度的场景能得到非常广泛的应用。更多情况下,PullConsumer 是与具体的流计算框架进行集成的。
作者:凌楚
#行业实践 #功能特性

2022年11月18日

RocketMQ 客户端负载均衡机制详解及最佳实践
前言 本文介绍 RocketMQ 负载均衡机制,主要涉及负载均衡发生的时机、客户端负载均衡对消费的影响(消息堆积/消费毛刺等)并且给出一些最佳实践的推荐。 负载均衡意义 上图是 RocketMQ 的消息储存模型:消息是按照队列的方式分区有序储存的。RocketMQ 的队列模型使得生产者、消费者和读写队列都是多对多的映射关系,彼此之间都可以无限水平扩展。对比传统的消息队列如 RabbitMQ 是很大的优势。尤其是在流式处理场景下有天然优势,能够保证同一队列的消息被相同的消费者处理,对于批量处理、聚合处理更友好。 消费者消费某个 topic 的消息等同于消费这个 topic 上所有队列的消息(上图中 Consumer A1 消费队列 1,Consumer A2 消费队列 2、3)。 所以,要保证每个消费者的负载尽量均衡,也就是要给这些消费者分配相同数量的队列,并保证在异常情况下(如客户端宕机)队列可以在不同消费者之间迁移。 负载均衡机制解析 负载均衡时机 负载均衡是客户端与服务端互相配合的过程,我们先综合服务端和客户端的职责回答第一个问题:何时会发生负载均衡。 客户端主动负载均衡   上图是 RocketMQ 客户端相关类的结构,其中 MQClientInstance 负责和服务端的交互以及底层服务的协调,这其中就包括负载均衡。 MQClientInstance 中有两个相关的方法 rebalanceImmediately 和 doRebalance,我们分析负载均衡的时机只要找到何时调用这两个方法即可: 1. 启动时立即进行负载均衡; 2. 定时(默认 20s)负载均衡一次。   服务端通知负载均衡   服务端通知客户端进行负载均衡也是通过 MQClientInstancerebalanceImmediately 方法实现的,我们同样在服务端代码中寻找相关调用。 分析以上几个方法可以得出结论,在如下场景服务端会主动通知客户端触发负载均衡: 1. 客户端上下线 上线 1. 新客户端发送心跳到服务端 下线 2. 客户端发送下线请求到服务端 3. 底层连接异常:响应 netty channel 的 IDLE/CLOSE/EXCEPTION 事件 2. 订阅关系变化:订阅新 topic 或有旧的 topic 不再订阅 负载均衡策略 前文已经介绍了负载均衡实际是变更消费者负责处理的队列数量,这里每次需要变更的队列数量和受到影响的客户端数量是由负载均衡策略决定的。 我们来分析一下比较常见的负载均衡策略: 平均分配 平均分配(AllocateMessageQueueAveragely)是默认的负载均衡策略: 如果我们有 4 个客户端,24 个队列,当第二个客户端下线时: 以默认的负载均衡策略(AllocateMessageQueueAveragely)为例,重新分配队列数量为 8。 默认的负载均衡策略能将队列尽量均衡的分配到每个客户端,但是每次负载均衡重新分配队列数量较多,尤其是在客户端数量很多的场景。 | 客户端 | 队列分配变化 | 队列数变化 | | | | | | Client1 | 1~6 1~8 | 6 8 | | Client2 | 7~12 | 6 0 | | Client3 | 13~18 9~16 | 6 8 | | Client4 | 19~24 17~24 | 6 8 | 一致性哈希 基于一致性哈希算法的负载均衡策略(AllocateMessageQueueConsistentHash)每次负载均衡会重新分配尽可能少的队列数量,但是可能会出现负载不均的情况。 | 客户端 | 队列分配变化 | 队列数变化 | | | | | | Client1 | 1~6 1~9 | 6 9 | | Client2 | 7~12 | 6 0 | | Client3 | 13~18 10~18 | 6 9 | | Client4 | 19~24 19~24 | 6 8 | 负载均衡对消费的影响 我们以一个真实的线上场景来举例: 下图中绿色的线代表发送 tps,黄色的线代表消费 tps,我们很容易发现在 21:00 和 21:50 分左右存在消费毛刺。 这两个时间点在进行应用发布,根据我们上文的分析某个消费者下线后同组的其他消费者感知这一变化需要一定时间,导致有秒级的消费延迟产生。在发布结束后消费者快速处理堆积的消息,可以发现消费速度有一个明显的上涨。 这个例子展示了下线时由于负载均衡带来了短暂的消息处理延迟,新的消费者会从服务端获取消费位点继续之前的消费进度。如果消费者异常宕机或者没有调用 shutdown 优雅下线,没有上传自己的最新消费位点,会使得新分配的消费者重复消费。 这里我们总结下负载均衡对消费的影响,当某个客户端触发负载均衡时: 1. 对于新分配的队列可能会重复消费,这也是官方要求消费要做好幂等的原因; 2. 对于不再负责的队列会短时间消费停止,如果原本的消费 TPS 很高或者正好出现生产高峰就会造成消费毛刺。  最佳实践 避免频繁上下线 为了避免负载均衡的影响应该尽量减少客户端的上下线,同时做好消费幂等。 同时在有应用重启或下线前要调用 shutdown 方法,这样服务端在收到客户端的下线请求后会通知客户端及时触发负载均衡,减少消费延迟。 选择合适的负载均衡策略 需要根据业务需要灵活选择负载均衡策略: 需要保证客户端的负载尽可能的均衡:选择默认的平均分配策略; 需要降低应用重启带来的消费延迟:选择一致性哈希的分配策略。  当然还有其他负载均衡策略由于时间关系不一一介绍了,留给读者自行探索。 保证客户端订阅一致 RocketMQ 的负载均衡是每个客户端独立进行计算,所以务必要保证每个客户端的负载均衡算法和订阅语句一致。 负载均衡策略不一致会导致多个客户端分配到相同队列或有客户端分不到队列; 订阅语句不一致会导致有消息未能消费。  RocketMQ 5.0 消息级别负载均衡 为了彻底解决客户端负载均衡导致的重复消费和消费延迟问题,RocketMQ 5.0 提出了消息级别的负载均衡机制。 同一个队列的消息可以由多个消费者消费,服务端会确保消息不重不漏的被客户端消费到: 消息粒度的负载均衡机制,是基于内部的单条消息确认语义实现的。消费者获取某条消息后,服务端会将该消息加锁,保证这条消息对其他消费者不可见,直到该消息消费成功或消费超时。因此,即使多个消费者同时消费同一队列的消息,服务端也可保证消息不会被多个消费者重复消费。 在 4.x 的客户端中,顺序消费的实现强依赖于队列的分配。RocketMQ 5.0 在消息维度的负载均衡的基础上也实现了顺序消费的语意:不同消费者处理同一个消息组内的消息时,会严格按照先后顺序锁定消息状态,确保同一消息组的消息串行消费。 如上图所述,队列 Queue1 中有 4 条顺序消息,这 4 条消息属于同一消息组 G1,存储顺序由 M1 到 M4。在消费过程中,前面的消息 M1、M2 被 消费者Consumer A1 处理时,只要消费状态没有提交,消费者 A2 是无法并行消费后续的 M3、M4 消息的,必须等前面的消息提交消费状态后才能消费后面的消息。
作者:玄珏
#行业实践

2022年11月9日

消息队列Apache RocketMQ 5.0:从消息服务到云原生事件流平台
前言 回顾 RocketMQ 的发展历程,至今已十年有余。2022 年 RocketMQ 5.0 正式发布,全面迈进云原生时代。 11 月 5 日,2022 杭州 · 云栖大会上,阿里云智能高级产品专家杨秋弟在云原生峰会上发表主题演讲,发布消息队列 RocketMQ 5.0:从消息服务到云原生事件流处理平台。 阿里云智能高级产品专家&Apache RocketMQ 联合创始人 杨秋弟 Apache RocketMQ 发展史 回顾 Apache RocketMQ 过去十年的发展历程,可分为“诞生于互联网”与“成长于云计算”两大阶段。 第一个阶段是 RocketMQ 的从 0 到 1,在阿里内部规模化落地。2012 年,为了支撑超大规模电商互联网架构,阿里中间件研发了 RocketMQ,并在产品诞生初期开源,2017 年 RocketMQ 统一了阿里消息技术体系。 第二个阶段是云计算。2015 年 RocketMQ 上云,这也是业界首个提供公共云 SaaS 形态的开源消息队列;2016 年,阿里把 RocketMQ 捐赠给 Apache,2017 年孵化毕业,成为国内首个 TLP 的互联网中间件。 十年磨一剑,出鞘必锋芒。在这十年的过程中,通过集团打磨稳定性,依托云计算孵化创新,开源共建加速标准化建立与生态连接,RocketMQ 始终坚持开源、集团、商业三位一体的发展思路,内核演进和产品迭代齐头并进。2022 年 RocketMQ 5.0 正式发布宣告着全面迈进云原生时代。 RocketMQ 5.0:从消息服务到云原生事件流平台 回顾过去这十年,RocketMQ 服务于集团几乎所有的业务,在阿里云上更是累计服务了 10 万余家企业客户,覆盖互联网、零售、金融、汽车等 20 多个行业,大规模的生产实践持续累积产品的核心优势。 多样性。企业级应用复杂的业务诉求,催生 RocketMQ 提供丰富的消息类型,比如定时消息、事务消息、顺序消息等等。此外,也提供了像消息轨迹、消息回溯等一系列的消息治理能力。 一致性。无论是淘宝交易还是蚂蚁支付都天然对数据一致性有着极高的要求,RocketMQ 提供的分布式事务消息是业内第一个提供该特性的消息产品,将异步解耦与数据一致性完美融合,是金融客户中不可或缺的产品能力。 稳定性。稳定性是产品的根基,更是一个系统工程,RocketMQ 长期在电商和金融领域中打磨,不仅提供最高达 99.99% SLA,更是帮助客户提供全方位的健康巡检与故障排查能力,如消息轨迹、消息回溯、死信机制等等,提供多样化的稳定性兜底手段。 高性能。在双十一的极限流量下,RocketMQ 具备无限扩展能力,支持千万级并发与万亿级消息洪峰,P9999 写延迟在 1ms 内,100%在 100ms 内。 可以说,在消息领域,尤其在业务消息领域,RocketMQ 在国内已经做到顶尖,成为企业客户的首选。 而随着云原生以及数字化时代的到来,RocketMQ 也在快速的发生着变化,那么变化主要体现在哪些方面呢? 首先,全面拥抱云原生。向下,消息系统自身实现云原生架构的演进,充分释放云基础设施的池化能力,全方位提高消息的核心技术指标。向上,消息产品形态持续演进,成为云原生应用架构的核心引擎。比如微服务、事件驱动、Serverless 等现代化应用架构。 其次,全面拥抱实时数据。企业的数字化转型从原来的业务数字化迈向了数字业务化。对业务数据的实时洞察、实时决策成为指导业务成功的关键要素。消息队列也将从在线业务架构的基础设施,延伸到实时数据架构的基础设施,从而实现事务分析一体化。 随着 5.0 的发布,RocketMQ也正式从消息服务升级到云原生事件流处理平台。 RocketMQ 5.0:云原生架构升级 首先来看 RocketMQ 自身的云原生架构演进。从下面的全景图可以看出,RocketMQ 从客户端到服务端都进行了全方位的改造,具体体现在以下几个方面: 轻量化。RocketMQ 4.0 诞生于 2012 年的淘宝电商,当时大部分的业务还跑在物理机上,单节点计算能力强,客户端节点数少且较为稳定,因此,富客户端的接入方式不仅更加高效,更可以提供诸如客户端侧负载均衡、消息缓存、故障转移等一系列企业级特性。但这种模式在云原生时代发生了改变,轻量客户端更容易被云原生技术栈所集成。因此,RocketMQ 5.0 客户端采用轻量 SDK 设计理念,将原来富客户端的逻辑下沉到服务端,满足现代化应用轻量化、Serverless 化以及 Mesh 化的趋势,更容易被集成;同时也正是因为轻量化,使得 SDK 多语言开发成本低了很多,快速覆盖当下主流的多语言版本。 弹性。存算分离架构让无状态计算节点可以快速伸缩,而分级存储以及冷热分离架构更是让消息存储具备更强的弹性能力。 高可用。基于全新的 Leaderless 架构,去 ZK 依赖的同时,可以做到副本数灵活选择,同步异步自动升降级,实现秒级故障转移;面向云的多可用区、多地域组建全局高可用能力。 基础设施云原生化。RocketMQ 整体架构走向 Kubernetes 化,拥抱 OpenTelemetry,依托于阿里云提供的 ARMS、Prometheus 以及Grafana 实现可观测能力的云原生化。 而 RocketMQ 5.0 本次的升级,除了在技术架构云原生化之外,在产品能力以及成本优化方面都有着重大的提升,我们来逐一分解。 轻量无状态消费模型 RocketMQ 4.0 采用按队列消费模型,消费者完全按照队列负载均衡,非常适合批量拉取快速消费,对单一消息状态不敏感的场景,比如流计算。然而在业务消息领域,尤其是金融场景以及事件驱动架构之下,每一条消息状态都是极为重要的。再加上不同业务类型的消息处理耗时也是各不相同,从毫秒级、到秒级甚至到分钟级,队列的负载不均衡或者短暂的 Block 都可能会引发消息的局部堆积,从而影响到最终用户的体验。因此,RocketMQ 5.0 全新推出按消息负载的轻量无状态消费模型,通过 PoP 机制巧妙地在队列模型之上构建了消息模型,业务只需要关心消息而无需关心队列,所有 API 都能够支持单条消息级别控制,如消息的消费、重试、删除等。而基于消息消费模型,客户端、连接和消费都是无状态的,可在任意 Proxy 节点上飘移,真正做到轻量化。 RocketMQ 5.0 提供按队列消费模型以及按消息消费模型,更好的满足事件与流的业务场景,正可谓鱼与熊掌兼得。 海量消息分级存储 RocketMQ 5.0 的另一个重大升级则是海量消息的分级存储。对消息队列了解的同学都知道,消息通常都是流动的短时间的存储,业内大部分消息产品对消息的保留时间都比较优先,3 天,7 天,最长 15 天不等。有限的存储空间使不仅限制了消息的保留时长,更在某些场景下可能会导致业务资损,比如在消息还没有被消费的时候,因为磁盘空间不足或者消息过期而被清除,这在金融等领域都是不可接受的。所以,RocketMQ 一直想要解决这样的问题,让存储变得更有弹性。 RocketMQ 5.0 基于ESSD、对象存储打造冷热分离以及分级存储架构,提供低成本的无限存储能力,确保消息不会因为本地磁盘空间不足而提前被清除,造成业务资损。我们提供消息存储的 Serverless,客户只需按实际存储使用量付费,而无需预购存储空间。 此外,流量削峰是消息队列极为常见的场景,而由此带来的海量消息堆积能力以及在堆积情况下的性能稳定性成为衡量产品性能的核心指标。RocketMQ 5.0 基于冷热数据分离架构进一步做到读写隔离,避免在堆积的场景下影响热数据的写入性能。分级存储的冷数据碎片规整能力更是提高了冷数据的读取性能,为用户提供一致的冷读 SLA。 售卖系列全线升级,最高降本 50% 从前面的介绍我们已经了解到,RocketMQ 5.0 在技术架构以及产品能力上都有着明显提升。 而 5.0 推出全新的售卖形态与计费策略相比 4.0 更简单、更灵活也更为普惠。实例的综合成本最高可降低 50%。接入门槛最低可降至 390 元/月,远低于自建成本。消息存储支持 Serverless 弹性能力,按需付费可大幅降低闲置成本。结合冷热分离的多级存储能力,相比开源自建可降低 67%,大幅降低消息队列的使用成本。 EventBridge:云上事件枢纽 事件驱动是一个起源很早的概念,早在几十年前,无论是操作系统内核的设计、还是客户端编程框架都大量采用了事件驱动技术。RocketMQ PushConsumer 提供的 API 其实就是一种事件驱动的编程范式,但在微服务应用架构当中,集成与通信才是刚需,事件驱动的价值并没有那么明显的体现。 而随着云原生时代的到来,计算力的构成越来越多样化。作为云原生的代表技术,Serverless 架构范式也是事件驱动的。无论是阿里云的函数计算、还是 AWS 的 Lambda,它们的主要触发源都是各种形态的事件,云产品事件,如 OSS 文件上传触发用户基于函数进行文件加工处理;用户业务事件,如 RocketMQ 触发函数运行消费逻辑处理等等。 以事件驱动为核心理念,阿里云推出了 EventBridge 产品,其使命就是打造云上的事件枢纽。通过EventBridge 可以兑现四大业务价值: 统一事件枢纽。阿里云从 IaaS、PaaS到第三方的 SaaS,每天都有数以亿计的事件产生,但却没有一种简单和统一的方式来触达这些事件;这些事件散落在各个地方形成『事件孤岛』,很难挖掘出有用的业务价值。只有充分发挥数据的规模效应,建立起数据之间的血缘关系,我们才能更好的发掘出数据的价值;所以 EventBridge 首要任务便是统一阿里云上的事件规范,拥抱CloudEvents 事件标准,打造阿里云统一的事件枢纽。 事件驱动引擎。当 EventBridge 连接了海量的事件源后,基于 RocketMQ 毫秒级的事件触发能力,必将加速企业 EDA/Serverless 的架构升级。 开放与集成。EventBridge 提供丰富的跨云、跨平台、跨产品、跨地域以及跨账号的连接能力,能够促进云产品、应用程序、SaaS 服务的相互集成。 低代码。EventBridge 借助Serverless 的应用中心,通过简单的规则匹配以及丰富的模板,即可实现事件的分发、过滤、转换等处理,进一步提升事件驱动的效率。 让消息无处不在,让事件无所不及 依托于 EventBridge、RocketMQ 以及函数计算 FC 的强强联合,目前 EventBridge 的事件生态已初具规模。 在云产品事件集成方面,目前已经集成 200+云产品事件源,3000 多种事件类型。 在数据集成与处理方面,EventBridge 与微服务应用、大数据、IoT 等场景深度集成。比如与消息生态的融合,阿里云 6 款消息队列产品通过 EventBridge 实现消息数据的互联互通,并且通过 EventBridge 的全球事件网络赋予消息全球消息路由的能力,同时也可以通过EventBridge 提供的丰富的模板,实现 ETL 数据处理能力。 在 SaaS 应用集成方面,包括钉钉、聚石塔以及云上 50 多个 SaaS 服务都可以通过 EventBridgehook 方式连接到 EventBridge。 在事件触发方面,EventBridge 目前已经触达 10 多个事件目标,海量的事件源都可以通过 EventBridge 触发包括 FC/SAE 等在内的 10 多款事件目标云产品。除此之外,目前 EventBridge 已经对接了阿里云全量的云产品 API,任何一个事件都可以通过云产品 API 的方式进行触达。 未来还有会更多的事件源以及事件目标接入到 EventBridge 上来。 RocketMQ Streams:轻量级计算的新选择 正如开篇所提到的,基于云原生架构的全面升级,RocketMQ 5.0 也将从在线业务架构的基础设施,延伸到实时数据架构的基础设施,实现事务分析一体化。将 RocketMQ Streams 打造成为轻量级计算的新选择。 业内最常见如 Flink、Spark 等大数据框架大多采用中心化的 MasterSlave 架构,依赖和部署比较重,每个任务的执行都需要很大的开销,有较高的使用成本。而与之不同的是,RocketMQ Streams 着重打造轻资源,高性能的轻量计算引擎,无额外依赖,最低1core,1g 即可完成部署,适用于大数据量、高过滤、轻窗口计算的场景,在资源敏感型场景,如消息队列流计算、安全风控,边缘计算等,RocketMQ Streams 具有有很大优势。阿里云消息团队与安全团队合作,通过对过滤场景做大量优化,性能提升 35 倍,资源节省 50%80%。 目前,RocketMQ Streams 已经在开源社区发布,未来计划在 2023 年 4 月在阿里云完成商业化。 RocketMQ 这十年,我们一同向前 RocketMQ历经十余年的打磨,已经取得了众多成果。全球拥有 700+的贡献者,1.8w Star 数,超过 80%的主流云厂商提供了 RocketMQ 的商业托管服务,Apache RocketMQ 社区始终保持着极高的活跃度,因此,也荣获了科创中国“开源创新榜”,中日韩开源软件优秀技术奖等十多个国内外开源奖项。 而阿里云作为 RocketMQ 的起源和核心贡献者,不仅 100%覆盖全集团业务,承载千万级并发万亿级消息洪峰。十多年以来更是累计服务 10w+万企业客户,覆盖互联网、零售、汽车等 20 多个行业,超过 75%的头部企业选择使用 RocketMQ。期望阿里云的消息队列 RocketMQ 可以成为广大企业客户的心之所选。也诚邀更广大的开发者来积极参与RocketMQ 的开源社区建设,一起将 RocketMQ 打造为消息领域的引领者。
#社区动态

2022年10月31日

RocketMQ 重试机制详解及最佳实践
引言 本文主要介绍在使用 RocketMQ 时为什么需要重试与兜底机制,生产者与消费者触发重试的条件和具体行为,如何在 RocketMQ 中合理使用重试机制,帮助构建弹性,高可用系统的最佳实践。 RocketMQ 的重试机制包括三部分,分别是生产者重试,服务端内部数据复制遇到非预期问题时重试,消费者消费重试。本文中仅讨论生产者重试和消费者消费重试两种面向用户侧的实现。 生产者发送重试 RocketMQ 的生产者在发送消息到服务端时,可能会因为网络问题,服务异常等原因导致调用失败,这时候应该怎么办?如何尽可能的保证消息不丢失呢? 1. 生产者重试次数 RocketMQ 在客户端中内置了请求重试逻辑,支持在初始化时配置消息发送最大重试次数(默认为 2 次),失败时会按照设置的重试次数重新发送。直到消息发送成功,或者达到最大重试次数时结束,并在最后一次失败后返回调用错误的响应。对于同步发送和异步发送,均支持消息发送重试。 同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败(返回错误码或抛出异常)。 异步发送:调用线程不会阻塞,但调用结果会通过回调的形式,以异常事件或者成功事件返回。  2. 生产者重试间隔 在介绍生产者重试前,我们先来了解下流控的概念,流控一般是指服务端压力过大,容量不足时服务端会限制客户端收发消息的行为,是服务端自我保护的一种设计。RocketMQ 会根据当前是否触发了流控而采用不同的重试策略: 非流控错误场景:其他触发条件触发重试后,均会立即进行重试,无等待间隔。 流控错误场景:系统会按照预设的指数退避策略进行延迟重试。 为什么要引入退避和随机抖动?  如果故障是由过载流控引起的,重试会增加服务端负载,导致情况进一步恶化,因此客户端在遇到流控时会在两次尝试之间等待一段时间。每次尝试后的等待时间都呈指数级延长。指数回退可能导致很长的回退时间,因为指数函数增长很快。指数退避算法通过以下参数控制重试行为,更多信息,请参见 connectionbackoff.md。 INITIAL_BACKOFF:第一次失败重试前后需等待多久,默认值:1 秒; MULTIPLIER :指数退避因子,即退避倍率,默认值:1.6; JITTER :随机抖动因子,默认值:0.2; MAX_BACKOFF :等待间隔时间上限,默认值:120 秒; MIN_CONNECT_TIMEOUT :最短重试间隔,默认值:20 秒。 ConnectWithBackoff() current_backoff = INITIAL_BACKOFF current_deadline = now() + INITIAL_BACKOFF while (TryConnect(Max(current_deadline, now() + MIN_CONNECT_TIMEOUT))!= SUCCESS) SleepUntil(current_deadline) current_backoff = Min(current_backoff MULTIPLIER, MAX_BACKOFF) current_deadline = now() + current_backoff + UniformRandom(JITTER current_backoff, JITTER current_backoff) 特别说明:对于事务消息,只会进行透明重试(transparent retries),网络超时或异常等场景不会进行重试。 3. 重试带来的副作用 不停的重试看起来很美好,但也是有副作用的,主要包括两方面:消息重复,服务端压力增大 远程调用的不确定性,因请求超时触发消息发送重试流程,此时客户端无法感知服务端的处理结果;客户端进行的消息发送重试可能会导致消费方重复消费,应该按照用户ID、业务主键等信息幂等处理消息。  较多的重试次数也会增大服务端的处理压力。  4. 用户的最佳实践是什么 1)合理设置发送超时时间,发送的最大次数 发送的最大次数在初始化客户端时配置在 ClientConfiguration;对于某些实时调用类场景,可能会导致消息发送请求链路被阻塞导致业务请求整体耗时高或耗时;需要合理评估每次调用请求的超时时间以及最大重试次数,避免影响全链路的耗时。 2)如何保证发送消息不丢失 由于分布式环境的复杂性,例如网络不可达时 RocketMQ 客户端发送请求重试机制并不能保证消息发送一定成功。业务方需要捕获异常,并做好冗余保护处理,常见的解决方案有两种: 1. 向调用方返回业务处理失败; 2. 尝试将失败的消息存储到数据库,然后由后台线程定时重试,保证业务逻辑的最终一致性。  3)关注流控异常导致无法重试 触发流控的根本原因是系统容量不足,如果因为突发原因触发消息流控,且客户端内置的重试流程执行失败,则建议执行服务端扩容,将请求调用临时替换到其他系统进行应急处理。 4)早期版本客户端如何使用故障延迟机制进行发送重试? 对于 RocketMQ 4.x 和 3.x 以下客户端开启故障延迟机制可以用: producer.setSendLatencyFaultEnable(true) 配置重试次数使用: producer.setRetryTimesWhenSendFailed() producer.setRetryTimesWhenSendAsyncFailed() 消费者消费重试 消息中间件做异步解耦时的一个典型问题是如果下游服务处理消息事件失败,那应该怎么做呢? RocketMQ 的消息确认机制以及消费重试策略可以帮助分析如下问题: 如何保证业务完整处理消息? 消费重试策略可以在设计实现消费者逻辑时保证每条消息处理的完整性,避免部分消息消费异常导致业务状态不一致。 业务应用异常时处理中的消息状态如何恢复? 当系统出现异常(宕机故障)等场景时,处理中的消息状态如何恢复,消费重试具体行为是什么。 1. 什么是消费重试? 什么时候认为消费失败? 消费者在接收到消息后将调用用户的消费函数执行业务逻辑。如果客户端返回消费失败 ReconsumeLater,抛出非预期异常,或消息处理超时(包括在 PushConsumer 中排队超时),只要服务端服务端一定时间内没收到响应,将认为消费失败。  消费重试是什么? 消费者在消费某条消息失败后,服务端会根据重试策略重新向客户端投递该消息。超过一次定数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中;  重试过程状态机:消息在重试流程中的状态和变化逻辑;  重试间隔:上一次消费失败或超时后,下次重新尝试消费的间隔时间;  最大重试次数:消息可被重试消费的最大次数。   2. 消息重试的场景 需要注意重试是应对异常情况,给予程序再次消费失败消息的机会,不应该被用作常态化的链路。 推荐使用场景: 业务处理失败,失败原因跟当前的消息内容相关,预期一段时间后可执行成功; 是一个小概率事件,对于大批的消息只有很少量的失败,后面的消息大概率会消费成功,是非常态化的。   正例:消费逻辑是扣减库存,极少量商品因为乐观锁版本冲突导致扣减失败,重试一般立刻成功。 错误使用场景: 消费处理逻辑中使用消费失败来做条件判断的结果分流,是不合理的。  反例:订单在数据库中状态已经是已取消,此时如果收到发货的消息,处理时不应返回消费失败,而应该返回成功并标记不用发货。 消费处理中使用消费失败来做处理速率限流,是不合理的。 限流的目的是将超出流量的消息暂时堆积在队列中达到削峰的作用,而不是让消息进入重试链路。 这种做法会让消息反复在服务端和客户端之间传递,增大了系统的开销,主要包括以下方面: RocketMQ 内部重试涉及写放大,每一次重试将生成新的重试消息,大量重试将带来严重的 IO 压力; 重试有复杂的退避逻辑,内部实现为梯度定时器,该定时器本身不具备高吞吐的特性,大量重试将导致重试消息无法及时出队。重试的间隔将不稳定,将导致大量重试消息延后消费,即削峰的周期被大幅度延长。  3. 不要以重试替代限流 上述误用的场景实际上是组合了限流和重试能力来进行削峰,RocketMQ 推荐的削峰最佳手段为组合限流和堆积,业务以保护自身为前提,需要对消费流量进行限流,并利用 RocketMQ 提供的堆积能力将超出业务当前处理的消息滞后消费,以达到削峰的目的。下图中超过处理能力的消息都应该被堆积在服务端,而不是通过消费失败进行重试。 如果不想依赖额外的产品/组件来完成该功能,也可以利用一些本地工具类,比如 Guava 的 RateLimiter 来完成单机限流。如下所示,声明一个 50 QPS 的 RateLimiter,在消费前以阻塞的方式 acquire 一个令牌,获取到即处理消息,未获取到阻塞。 RateLimiter rateLimiter = RateLimiter.create(50); PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // 设置订阅组名称 .setConsumerGroup(consumerGroup) // 设置订阅的过滤器 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView { // 阻塞直到获得一个令牌,也可以配置一个超时时间 rateLimiter.acquire(); LOGGER.info("Consume message={}", messageView); return ConsumeResult.SUCCESS; }) .build(); 4. PushConsumer 消费重试策略 PushConsumer 消费消息时,消息的几个主要状态如下: Ready:已就绪状态。消息在消息队列RocketMQ版服务端已就绪,可以被消费者消费; Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态; Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机; DLQ:死信状态 消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试。 该消息会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。 最大重试次数   PushConsumer 的最大重试次数由创建时决定。 例如,最大重试次数为 3 次,则该消息最多可被投递 4 次,1 次为原始消息,3 次为重试投递次数。 重试间隔时间 无序消息(非顺序消息):重试间隔为阶梯时间,具体时间如下: 说明:若重试次数超过 16 次,后面每次重试间隔都为 2 小时。 顺序消息:重试间隔为固定时间,默认为 3 秒。  5. SimpleConsumer 消费重试策略 和 PushConsumer 消费重试策略不同,SimpleConsumer 消费者的重试间隔是预分配的,每次获取消息消费者会在调用 API 时设置一个不可见时间参数 InvisibleDuration,即消息的最大处理时长。若消息消费失败触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值。 由于不可见时间为预分配的,可能和实际业务中的消息处理时间差别较大,可以通过 API 接口修改不可见时间。 例如,预设消息处理耗时最多 20 ms,但实际业务中 20 ms内消息处理不完,可以修改消息不可见时间,延长消息处理时间,避免消息触发重试机制。 修改消息不可见时间需要满足以下条件: 消息处理未超时 消息处理未提交消费状态  如下图所示,消息不可见时间修改后立即生效,即从调用 API 时刻开始,重新计算消息不可见时间。 最大重试次数 与 PushConsumer 相同。 消息重试间隔   消息重试间隔 = 不可见时间 - 消息实际处理时长 例如:消息不可见时间为 30 ms,实际消息处理用了 10 ms 就返回失败响应,则距下次消息重试还需要 20 ms,此时的消息重试间隔即为 20 ms;若直到 30 ms 消息还未处理完成且未返回结果,则消息超时,立即重试,此时重试间隔即为 0 ms。 SimpleConsumer 的消费重试间隔通过消息的不可见时间控制。 //消费示例:使用SimpleConsumer消费普通消息,主动获取消息处理并提交。 ClientServiceProvider provider1 = ClientServiceProvider.loadService(); String topic1 = "Your Topic"; FilterExpression filterExpression1 = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG); SimpleConsumer simpleConsumer = provider1.newSimpleConsumerBuilder() //设置消费者分组。 .setConsumerGroup("Your ConsumerGroup") //设置接入点。 .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build()) //设置预绑定的订阅关系。 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); List messageViewList = null; try { //SimpleConsumer需要主动获取消息,并处理。 messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30)); messageViewList.forEach(messageView { System.out.println(messageView); //消费处理完成后,需要主动调用ACK提交消费结果。 //没有ack会被认为消费失败 try { simpleConsumer.ack(messageView); } catch (ClientException e) { e.printStackTrace(); } }); } catch (ClientException e) { //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。 e.printStackTrace(); } 修改消息的不可见时间   案例:某产品使用消息队列来发送解耦“视频渲染”的业务逻辑,发送方发送任务编号,消费方收到编号后处理任务。由于消费方的业务逻辑耗时较长,消费者重新消费到同一个任务时,该任务未完成,只能返回消费失败。在这种全新的 API 下,用户可以调用可以通过修改不可见时间给消息续期,实现对单条消息状态的精确控制。 simpleConsumer.changeInvisibleDuration(); simpleConsumer.changeInvisibleDurationAsync(); 6. 功能约束与最佳实践 设置消费的最大超时时间和次数   尽快明确的向服务端返回成功或失败,不要以超时(有时是异常抛出)代替消费失败。 不要用重试机制来进行业务限流  错误示例:如果当前消费速度过高触发限流,则返回消费失败,等待下次重新消费。 正确示例:如果当前消费速度过高触发限流,则延迟获取消息,稍后再消费。 发送重试和消费重试会导致相同的消息重复消费,消费方应该有一个良好的幂等设计  正确示例:某系统中消费的逻辑是为某个用户发送短信,该短信已经发送成功了,当消费者应用重复收到该消息,此时应该返回消费成功。 总结 本文主要介绍重试的基本概念,生产者消费者收发消息时触发重试的条件和具体行为,以及 RocketMQ 收发容错的最佳实践。 重试策略帮助我们从随机的、短暂的瞬态故障中恢复,是在容忍错误时,提高可用性的一种强大机制。但请谨记 “重试是对于分布式系统来说自私的”,因为客户端认为其请求很重要,并要求服务端花费更多资源来处理,盲目的重试设计不可取,合理的使用重试可以帮助我们构建更加弹性且可靠的系统。
作者: 斜阳
#行业实践 #功能特性

2022年10月24日

EventBridge 生态实践:融合 SLS 构建一体化日志服务
引言 阿里云日志服务 SLS 是一款优秀的日志服务产品,提供一站式地数据采集、加工、查询与分析、可视化、告警、消费与投递等服务。对于使用 SLS 的用户业务而言,SLS 上存储的日志信息反映着业务的运行状态,通过适当地流转加工即可创建一定价值。 另一方面,阿里云 EventBridge 作为云上事件枢纽,每天承载着大量事件的流转。云上资源的操作事件、消息队列中的数据、用户业务中的自定义事件等,是否有一站式的配置工具来将这些数据统一收敛到 SLS,进而使用 SLS 强大的加工、分析能力也是一个具有价值的问题。 为了支持上述日志、数据流入流出 SLS 的场景,阿里云 EventBridge 在近期支持了 SLS 能力。用户在 EventBridge 上通过简单地配置,即可实现数据写入 SLS 和将 SLS 中日志路由到不同的 EventBridge 目标端。EventBridge 对 SLS 的支持是全面的,用户既可以在事件总线中使用 SLS,也可以在事件流中使用。本文将从 SLS 在 EventBridge上 的使用以及若干最佳实践场景等方面,为大家介绍如何基于 EventBridge 构建 SLS 相关应用。 基于 EventBridge 使用 SLS 阿里云 SLS 日志服务 SLS[1] 是一款云原生观测与分析平台,为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务,提供数据采集、加工、查询与分析、可视化、告警、消费与投递等功能。 SLS 在 EventBridge 上的应用 阿里云 EventBridge 提供了事件总线[2]与事件流[3]两款不同应用场景的事件路由服务。 事件总线底层拥有事件的持久化能力,可以按照需要将事件经事件规则路由到多个目标。而事件流则更轻量化,对源端产生的事件实时抽取、转换和分析并加载至目标端,无需创建事件总线,端到端转储效率更高,使用更轻便,适用于端到端的流式数据处理场景。SLS 目前对事件总线与事件流均已支持。 针对 SLS 事件源,EventBridge 会构造一个 SLS source connector,其会实时地从 SLS 服务端拉取日志。数据拉取到 EventBridge 后,会进行一定的结构封装,保留用户日志、SLS 系统参数等数据,同时增加 event 所需要的一些系统属性。 SLS Event 样例可参考如下示例。 data 部分代表用户日志内容,其中以“__”开头和结尾的字段表示日志项的 SLS 系统属性。 { "datacontenttype": "application/json;charset=utf8", "aliyunaccountid": "1756789", "data": { "key1": "value1", "key2": "value2", "__topic__": "TopicCategory", "__source__": "SourceCategory", "__client_ip__": "122.231..", "__receive_time__": "1663487595", "__pack_id__": "59b662b2257796280" }, "subject": "acs:log:cnqingdao:1756789:project/demoproject/logstore/logstore1", "aliyunoriginalaccountid": "1756789", "source": "testSLS", "type": "sls:connector", "aliyunpublishtime": "20220918T07:53:15.387Z", "specversion": "1.0", "aliyuneventbusname": "demoBus", "id": "demoprojectlogstore11MTY2MzExODM5ODY4NjAxOTQyMw==0", "time": "20220918T07:53:12Z", "aliyunregionid": "cnqingdao", "aliyunpublishaddr": "10.50.132.112" } 针对 SLS 事件目标,EventBridge 使用 logProducer 将 event 整体作为一个字段投递到 SLS,字段 key 名称为“content”。 使用介绍 SLS 事件源   在使用 SLS 作为事件源时(这里包含了事件总线中的事件源和事件流中的事件源),需要提供以下参数: 日志项目(SLS Project) 日志库(SLS LogStore) 起始消费位点 调用角色  在创建 SLS 事件源时,EventBridge 会自动在对应 LogStore 下创建一个以“eventbridge”开头的消费组,事件源或事件流被删除时,对应消费组资源也会被清理。 日志项目与日志库参数,用户根据已创建的 Project 和 LogStore 去填写即可。 起始消费位点参数指定了新任务启动时的初始消费位点。这里可以选择“最早位点”、“最新位点”与“指定时间”。“最早位点”即从当前 LogStore 中最早的日志开始消费,会导致大量历史日志被读取,建议结合业务谨慎选择;“最新位点”则表示消费对应 EventBridge 任务启动后的日志;“指定时间”需要用户填写时间戳(以秒为单位),消费从此时刻开始的日志。 针对调用角色,其实是允许 EventBridge 以这个角色的身份去调用读取用户 SLS 日志。用户需要创建一个自定义角色,并将其授信给事件总线 EventBridge。角色的权限方面则可以按照需要去进行设置,在权限最小的原则基础上,权限策略提供的角色应保证事件总线 EventBridge 可以读取对应 LogStore 日志与消费组的增删操作,至少赋予角色 LogStore 消费权限与消费组的增删操作。参考示例: { "Version": "1", "Statement": [ { "Action": [ "log:ListShards", "log:GetCursorOrData", "log:GetConsumerGroupCheckPoint", "log:UpdateConsumerGroup", "log:ConsumerGroupHeartBeat", "log:ConsumerGroupUpdateCheckPoint", "log:ListConsumerGroup", "log:CreateConsumerGroup", "log:DeleteConsumerGroup" ], "Resource": [ "acs:log:::project//logstore/", "acs:log:::project//logstore//" ], "Effect": "Allow" } ] } SLS 事件目标   在使用 SLS 作为事件目标时(这里包含了事件总线中的事件目标和事件流中的事件目标),需要提供以下参数: 日志项目(SLS Project) 日志库(SLS LogStore) Topic 调用角色  日志项目、日志库参数含义同 SLS 事件源。Topic 即 SLS 日志主题,用户可以根据需要进行设置,非必填内容。 在创建 SLS 事件目标时,确保使用的调用角色有写入给定日志库权限即可。参考示例: { "Version":"1", "Statement":[ { "Effect":"Allow", "Action":[ "log:PostLogStoreLogs" ], "Resource":[ "acs:log:::project//logstore/" ] } ] } 使用示例 SLS 事件源和事件目标,其事件总线与事件流的参数配置相同,这里示例了如何创建  SLS 事件源和事件目标的 EventBridge 事件流。 前期准备   1. 开通 EventBridge 服务; 2. 开通 SLS 服务并创建 Project 与 Store。 创建 SLS 事件源   1. 登陆 EventBridge 控制台,点击左侧导航栏,选择“事件流”,在事件流列表页点击“创建事件流”; 2. “基本信息”中“事件流名称”与“描述”按照需要填写即可; 3. 在创建事件流,选择事件提供方时,下拉框选择“日志服务 SLS”; 4. 在“日志服务 SLS”一栏中选配置 SLS Project、LogStore、起始消费位点与角色配置。 创建 SLS 事件目标   1. 在创建事件流的事件目标时,服务类型选择“日志服务”; 2. 配置 SLS Project、LogStore、日志主题、日志内容、角色配置等参数。 3. 保存启动即可创建事件流。 最佳实践示例 异步架构完备性校验 在使用消息队列搭建异步应用架构时,会偶发遇到消息丢失的情况,这种情况下的问题排查通常较为麻烦,需要确定问题到底是出在发送端、消费端还是消息队列上,这种场景可以使用 SLS + EventBridge 来进行相关预警和现场保留。 1. 业务 1 发送消息到消息队列,业务 2 异步消费 MQ 中的消息,实现架构解耦; 2. 消息发送端和消费端,在完成消费发送、消费的相关操作后,均将操作日志打印出来,并采集到 SLS 上,日志中可以包含消息 ID 等字段以确保可溯源; 3. 配置 EventBridge 事件流,事件提供方为 SLS,事件接收方为函数计算 FC; 4. FC 中的服务读取 SLS 中日志内容,若发现针对某条消息,若仅有发送日志无消费日志,则说明可能存在漏消息的可能性,需要相关人员及时介入排查。 异常业务异步处理 部分消息队列如 RocketMQ 有死信队列能力,当用户消费失败达到一定次数时,消息会被投递到死信队列。用户也可以使用 SLS + EventBridge 构建业务死信队列,以完成对异常情况的处理。 例如下图是一个电商平台的订单处理系统,当订单处理成功时,相关信息会被写入 DB 或者进行后续操作。但如果订单处理异常用户又不想要阻塞现有订单处理流程,则可以将处理异常订单的流程异步处理。 1. 用户下单/付款,订单系统进行业务处理,处理成功则将数据变更写入 DB; 2. 订单处理异常,记录相关信息日志; 3. 搭建 EventBridge 事件规则。事件源为 SLS,事件目标为函数计算 FC; 4. 当有异常业务日志产生时,日志内容被 SLS 事件源拉取,随后投递到 FC,由专门的服务来处理异常订单。当然,在架构设计时也可以将异常订单信息直接投递到函数计算,但对于大部分业务系统而言,当有异常出现时通常都会进行相关日志的打印,即异常日志大概率是存在的,这个时候使用 SLS + EventBridge 则无需再使用函数计算的发送客户端,仅按需打印日志即可,对业务的侵入性更小。 消息备份 目前阿里云上的消息队列产品种类丰富,用户在使用消息队列实现业务解耦的同时,也会产生对消息内容进行加工分析的需求。SLS 拥有强大的数据加工能力,使用 EventBridge 将消息路由到 SLS,在实现消息备份的同时也可以利用 SLS 的分析加工能力来提升业务的可观测性。 1. 搭建 EventBridge 事件流。事件提供方为各种云上消息队列,事件目标方为日志服务 SLS; 2. 使用 SLS 的能力完成消息的加工、查询、分析与可视化。 自建 SQL 审计 目前 EventBridge 已经支持了 DTS 作为事件源的能力,使用 EventBridge 可以轻松实现构建自定义 SQL 审计的需求。 1. 用户新建 DTS 数据订阅任务,捕获数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS,事件接收方为日志服务 SLS; 3. 用户需要对 SQL 进行审计时,通过查询 SLS 进行。 _相关链接_ _[1] 日志服务SLS_ _[2] 事件总线_ _[3] 事件流_
作者:昶风
#行业实践 #生态集成

2022年10月20日

解析 RocketMQ 多样消费功能-消息过滤
什么是消息过滤 在消息中间件的使用过程中,一个主题对应的消费者想要通过规则只消费这个主题下具备某些特征的消息,过滤掉自己不关心的消息,这个功能就叫消息过滤。 就如上图所描述的,生产者会向主题中写入形形色色的消息,有橙色的、黄色的、还有灰色的,而这个主题有两个消费者,第一个消费者只想要消费橙色的消息,第二个消费者只想要消费黄色的和灰色的消息,那么这个效果就需要通过消息过滤来实现。 消息过滤的应用场景 我们以常见的电商场景为例,来看看消息过滤在实际应用过程中起到的作用。 电商平台在设计时,往往存在系统拆分细、功能模块多、调用链路长、系统依赖复杂等特点,消息中间件在其中就起到了异步解耦、异步通信的作用,特别是在双十一这样的流量高峰期,消息中间件还起到了削峰填谷的作用。 而在消息中间件使用方面,电商平台因为覆盖的领域众多会产生很多的消息主题,消息收发量也随着交易量和订阅系统的增加而增大。随着业务系统的水平拆解和垂直增加,相关的消息呈现出高订阅比和低投递比的状态,比如一个主题订阅比是 300:1,即 1 个主题的订阅者有 300 个,但是投递比却只有 15:300,即一条消息只有 15 个订阅者需要投递,其他 285 个订阅者全部过滤了这条消息。那解决这些场景,就需要使用到消息过滤。 举例来说,在交易链路中,一个订单的处理流程分为下单、扣减库存、支付等流程,这个流程会涉及订单操作和状态机的变化。下游的系统,如积分、物流、通知、实时计算等,他们会通过消息中间件监听订单的变更消息。但是它们对订单不同操作和状态的消息有着不同的需求,如积分系统只关心下单消息,只要下单就扣减积分。物流系统只关系支付和收货消息,支付就发起物流订单,收货就完成物流订单。实时计算系统会统计订单不同状态的数据,所有消息都要接收。 试想一下如果没有消息过滤这个功能,我们会怎么支持以上消息过滤的功能呢?能想到的一般有以下两个方案: 1. 通过将主题进行拆分,将不同的消息发送到不同主题上。 对于生产者来说,这意味着消费者有多少消费场景,就需要新建多少个 Topic,这无疑会给生产者带来巨大的维护成本。对消费者来说,消费者有可能需要同时订阅多个 Topic,这同样带来了很大的维护成本。另外,消息被主题拆分后,他们之间的消费顺序就无法保证了,比如对于一个订单,它的下单、支付等操作显然是要被顺序处理的。 2. 消费者收到消息后,根据消息体对消息按照规则硬编码自行过滤。 这意味着所有的消息都会推送到消费者端进行计算,这无疑增加了网络带宽,也增加了消费者在内存和 CPU 上的消耗。 有了消息过滤这个功能,生产者只需向一个主题进行投递消息,服务端根据订阅规则进行计算,并按需投递给每个消费者。这样对生产者和消费者的代码维护就非常友好,同时也能很大程度上降低网络带宽,同时减少消费者的内存占用和 CPU 的消耗。 RocketMQ 消息过滤的模式 RocketMQ 是众多消息中间件中为数不多支持消息过滤的系统。这也是其作为业务集成消息首选方案的重要基础之一。 在功能层面,RocketMQ 支持两种过滤方式,Tag 标签过滤和 SQL 属性过滤,下面我来这两个过滤方式使用方式和技术原理进行介绍 Tag 标签过滤 功能介绍 Tag 标签过滤方式是 RocketMQ 提供的基础消息过滤能力,基于生产者为消息设置的 Tag 标签进行匹配。生产者在发送消息时,设置消息的 Tag 标签,消费者按需指定已有的 Tag 标签来进行匹配订阅。 过滤语法 1. 单 Tag 匹配:过滤表达式为目标 Tag,表示只有消息标签为指定目标 Tag 的消息符合匹配条件,会被发送给消费者; 2. 多 Tag 匹配:多个 Tag 之间为或的关系,不同 Tag 间使用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,表示标签为 Tag1 或 Tag2 或 Tag3 的消息都满足匹配条件,都会被发送给消费者进行消费; 3. 全 Tag 匹配:使用星号()作为全匹配表达式。表示主题下的所有消息都将被发送给消费者进行消费。 使用方式 1. 发送消息,设置 Tag 标签 Message message = provider.newMessageBuilder() .setTopic("TopicA") .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息 .setTag("TagA") .setBody("messageBody".getBytes()) .build(); 2. 订阅消息,匹配单个 Tag 标签 //只订阅消息标签为“TagA”的消息 FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG); pushConsumer.subscribe("TopicA", filterExpression); 3. 订阅消息,匹配多个 Tag 标签 //只订阅消息标签为“TagA”、“TagB”或“TagC”的消息 FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG); pushConsumer.subscribe("TopicA", filterExpression); 4. 订阅消息,匹配所有 Tag 标签,即不过滤 //使用Tag标签过滤消息,订阅所有消息 FilterExpression filterExpression = new FilterExpression("", FilterExpressionType.TAG); pushConsumer.subscribe("TopicA", filterExpression); 技术原理 RocketMQ 在存储消息的时候,是通过 AppendOnly 的方式将所有主题的消息都写在同一个 CommitLog 文件中,这可以有效的提升了消息的写入速率。为了消费时能够快速检索消息,它会在后台启动异步方式将消息所在位点、消息的大小,以及消息的标签哈希值存储到 ConsumeQueue 索引文件中。将标签存储到这个索引文件中,就是为了在通过标签进行消息过滤的时候,可以在索引层面就可以获取到消息的标签,不需要从 CommitLog 文件中读取,这样就减少消息读取产生的系统 IO 和内存开销。标签存储哈希值,主要是为了保证 ConsumeQueue 索引文件能够定长处理,这样可以有效较少存储空间,提升这个索引文件的读取效率。 整个 Tag 标签过滤的流程如下: 1. 生产者对消息打上自己的业务标签,发送给我们的服务端 Broker; 2. Broker 将消息写入 CommitLog 中,然后通过异步线程将消息分发到 ConsumeQueue 索引文件中; 3. 消费者启动后,定时向 Broker 发送心跳请求,将订阅关系上传到 Broker 端,Broker 将订阅关系及标签的哈希值保存在内存中; 4. 消费者向 Broker 拉取消息,Broker 会通过订阅关系和队列去 ConsumeQueue 中检索消息,将订阅关系中的标签哈希值和消息中的标签哈希值做比较,如果匹配就返回给消费者; 5. 消费者收到消息后,会将消息中的标签值和本地订阅关系中标签值做精确匹配,匹配成功才会交给消费线程进行消费。 SQL 属性过滤 功能介绍 SQL 属性过滤是 RocketMQ 提供的高级消息过滤方式,通过生产者为消息设置的属性(Key)及属性值(Value)进行匹配。生产者在发送消息时可设置多个属性,消费者订阅时可设置S QL 语法的过滤表达式过滤多个属性。 过滤语法 1. 数值比较:, =, , IN 3. 判空运算:IS NULL or IS NOT NULL 4. 逻辑运算:AND, OR, NOT 使用方式 1. 发送消息,设置属性 Message message = provider.newMessageBuilder() .setTopic("TopicA") .setKeys("messageKey") //设置消息属性,用于消费端根据指定属性过滤消息。 .addProperty("Channel", "TaoBao") .addProperty("Price", "5999") .setBody("messageBody".getBytes()) .build(); 2. 订阅消息,匹配单个属性 FilterExpression filterExpression = new FilterExpression("Channel='TaoBao'", FilterExpressionType.SQL92); pushConsumer.subscribe("TopicA", filterExpression); 3. 订阅消息,匹配多个属性 FilterExpression filterExpression = new FilterExpression("Channel='TaoBao' AND Price5000", FilterExpressionType.SQL92); pushConsumer.subscribe("TopicA", filterExpression); 4. 订阅消息,匹配所有属性 FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92); pushConsumer.subscribe("TopicA", filterExpression); 技术原理   由于 SQL 过滤需要将消息的属性和 SQL 表达式进行匹配,这会对服务端的内存和 CPU 增加很大的开销。为了降低这个开销,RocketMQ 采用了布隆过滤器进行优化。当 Broker 在收到消息后,会预先对所有的订阅者进行 SQL 匹配,并将匹配结果生成布隆过滤器的位图存储在 ConsumeQueueExt 索引扩展文件中。在消费时,Broker 就会使用使用这个过滤位图,通过布隆过滤器对消费者的 SQL 进行过滤,这可以避免消息在一定不匹配的时候,不需要去 CommitLog 中将消息的属性拉取到内存进行计算,可以有效地降低属性和 SQL 进行匹配的消息量,减少服务端的内存和 CPU 开销。 整个 SQL 过滤的处理流程如下: 1. 消费者通过心跳上传订阅关系,Broker 判断如果是 SQL 过滤,就会通过布隆过滤器的算法,生成这个 SQL 对应的布隆过滤匹配参数;  2. 生产者对消息设置上自己的业务属性,发送给我们的服务端 Broker;  3. Broker 收到后将消息写入 CommitLog 中,然后通过异步线程将消息分发到 ConsumeQueue 索引文件中。在写入之前,会将这条消息的属性和当前所有订阅关系中 SQL 进行匹配,如果通过,则将 SQL 对应的布隆过滤匹配参数合并成一个完整的布隆过滤位图;  4. 消费者消费消息的时候,Broker 会先获取预先生成的布隆过滤匹配参数,然后通过布隆过滤器对 ConsumeQueueExt 的布隆过滤位图和消费者的布隆过滤匹配参数进行匹配;  5. 布隆过滤器返回匹配成功只能说明消息属性和 SQL 可能匹配,Broker 还需要从 CommitLog 中将消息属性取出来,再做一次和 SQL 的精确匹配,这个时候匹配成功才会将消息投递给消费者  差异及对比 最佳实践 主题划分及消息定义 主题和消息背后的本质其实就是业务实体的属性、行为或状态发生了变化。只有发生了变化,生产者才会往主题里面发送消息,消费者才需要监听这些的消息,去完成自身的业务逻辑。 那么如何做好主题划分和消息定义呢,我们以订单实体为例,来看看主题划分和消息定义的原则。 主题划分的原则   1. 业务领域是否一致 不同的业务领域背后有不同的业务实体,其属性、行为及状态的定义天差地别。比如商品和订单,他们属于两个完全独立且不同的领域,就不能定义成同一个主题。 2. 业务场景是否一致 同一个业务领域不同的业务场景或者技术场景,不能定义一个主题。如订单流程和订单缓存刷新都和订单有关系,但是订单缓存刷新可能需要被不同的流程触发,放在一起就会导致部分场景订单缓存不刷新的情况。 3. 消息类型是否一致 同一个业务领域和业务场景,对消息类型有不同需求,比如订单处理过程中,我们需要发送一个事务消息,同时也需要发送一个定时消息,那么这两个消息就不能共用一个主题。 消息定义的原则   1. 无标签无属性 对于业务实体极其简单的消息,是可以不需要定义标签和属性,比如 MySQLBinlog 的同步。所有的消费者都没有消息过滤需求的,也无需定义标签和属性。 2. 如何定义标签 标签过滤是 RocketMQ 中使用最简单,且过滤性能最好的一种过滤方式。为了发挥其巨大的优势,可以考虑优先使用。在使用时,我们需要确认这个字段在业务实体和业务流程中是否是唯一定义的,并且它是被绝大多数消费者作为过滤条件的,那么可以将它作为标签来定义。比如订单中有下单渠道和订单操作这两个字段,并且在单次消息发送过程中都是唯一定义,但是订单操作被绝大多数消费者应用为过滤条件,那么它最合适作为标签。 3. 如何定义属性 属性过滤的开销相对比较大,所以只有在标签过滤无法满足时,才推荐使用。比如标签已经被其他字段占用,或者过滤条件不可枚举,需要支持多属性复杂逻辑的过滤,就只能使用属性过滤了。 保持订阅关系一致 订阅关系一致是指同一个消费者组下面的所有的消费者所订阅的 Topic 和过滤表达式都必须完全一致。 正如上图所示,一个消费者组包含两个消费者,他们同时订阅了 TopicA 这个主题,但是消费者一订阅的是 TagA 这个标签的消息,消费者二订阅的是 TagB 这个标签的消息,那么他们两者的订阅关系就存在不一致。 导致的问题: 那么订阅关系不一致会导致什么问题呢? 1. 频繁复杂均衡 在 RocketMQ 实现中,消费者客户端默认每 30 秒向 Broker 发送一次心跳,这个过程会上传订阅关系,Broker 发现变化了就进行订阅关系覆盖,同时会触发客户端进行负载均衡。那么订阅关系不一致的两个客户端会交叉上传自己的订阅关系,从而导致客户端频繁进行负载均衡。 2. 消费速率下降 客户端触发了负载均衡,会导致消费者所持有的消费队列发生变化,出现间断性暂停消息拉取,导致整体消费速率下降,甚至出现消息积压。 3. 消息重复消费 客户端触发了负载均衡,会导致已经消费成功的消息因为消费队列发生变化而放弃向 Broker 提交消费位点。Broker 会认为这条消息没有消费成功而重新向消费者发起投递,从而导致消息重复消费。 4. 消息未消费 订阅关系的不一致,会有两种场景会导致消息未消费。第一种是消费者的订阅关系和 Broker 当前订阅关系不一致,导致消息在 Broker 服务端就被过滤了。第二种是消费者的订阅关系和 Broker 当前的虽然一致,但是 Broker 投递给了其他的消费者,被其他消费者本地过滤了。 使用的建议 在消息过滤使用中,有以下建议: 1. 不要共用消费者组 不同业务系统千万不要使用同一个消费者组订阅同一个主题的消息。一般不同业务系统由不同团队维护,很容易发生一个团队修改了订阅关系而没有通知到其他团队,从而导致订阅关系不一致的情况。 2. 不频繁变更订阅关系 频繁变更订阅关系这种情况比较少,但也存在部分用户实现在线规则或者动态参数来设置订阅关系。这有可能导致订阅关系发生变化,触发客户端负载均衡的情况。 3. 变更做好风险评估 由于业务的发展,需求的变更,订阅关系不可能一直不变,但是变更订阅关系过程中,需要考虑整体发布完成需要的总体时间,以及发布过程中订阅关系不一致而对业务可能带来的风险。 4. 消费做好幂等处理 不管是订阅关系不一致,还是客户端上下线,都会导致消息的重复投递,所以消息幂等处理永远是消息消费的黄金法则。在业务逻辑中,消费者需要保证对已经处理过的消息直接返回成功,避免二次消费对业务造成的损害,如果返回失败就会导致消息一直重复投递直到进死信。 到此,本文关于消息过滤的分享就到此结束了,非常感谢大家能够花费宝贵的时间阅读,有不对的地方麻烦指正,感谢大家对 RocketMQ 的关注。
作者:徒钟
#技术探索 #功能特性
收藏
收藏暂无数据,请从小助手对话框添加
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
无疑 AI答疑专家

loading...

当前服务输出的内容均由人工智能模型生成,其生成内容的准确性和完整性无法保证,不代表我们的态度或观点。
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
专家答疑