2025年7月14日

CommunityOverCode Asia 2025 Messaging 专题预告,Apache RocketMQ 邀您共探相关议题
CommunityOverCode 是 Apache 软件基金会(ASF)的官方全球系列大会,其前身为 ApacheCon。每年的 CommunityOverCode Asia 都吸引着来自全球各个层次的参与者、社区共同探索 "明天的技术"。2025年7月25日至27日, CommunityOverCode Asia 2025 将在北京举办,带来 Apache 项目的最新发展和新兴创新。 本次 CommunityOverCode Asia 2025 的 Messaging 专题,将给大家带来 Apache 社区建设和发展的最新资讯和前沿实践,现在就一起来看看吧! 专题介绍 伴随着云原生 Serverless、物联网、实时数据技术的规模化落地,事件驱动架构、事件流技术得以更广泛的应用,使得消息队列成为越来越重要的基础设施。今天在Apache生态中已经涌现了多个优秀的消息项目,包括:Apache Pulsar、Apache Kafka、Apache RocketMQ、Apache ActiveMQ、Apache Inlong 等等。 在这个主题中,你将了解到不同的消息系统如何基于自身的架构特点做出最优的技术演进方向,包括存算分离、Serverless、消息流一体等;也能学习各大厂商如何结合自身的行业特点、业务场景选择合适的消息技术,获得消息技术的最佳实践。 出品人 翟佳 Apache Pulsar & Apache BookKeeper PMC member,谙流科技的联合创始人兼 CEO。曾是 StreamNative 的联合创始人担任 CTO 和中国区负责人职务。硕士毕业于中国科学院计算技术研究所,之后加入 EMC,从事分布式、文件系统、流存储相关的设计和开发。主要从事开源项目 Apache BookKeeper 和 Apache Pulsar 的设计和开发工作,集中在消息和流存储方向。在分布式、存储、消息等领域有深厚的积累和经验,在相关领域先后获得中美专利 10 余项。 林清山 Apache RocketMQ 联合创始人& PMC member,阿里云消息产品线负责人,阿里云架构组中间件组长。十多年分布式中间件、消息队列研发经验,致力于消息、EDA、事件流处理、云原生等方向的探索和研究。 胡宗棠 中国移动云能力中心,中间件和大数据领域技术专家,移动云中间件/大数据团队负责人。8 年以上消息中间件内核开发和架构设计经历,从无到有参与移动云 RocketMQ、MQTT、Kafka 等多款主流消息中间件系列产品的内核架构和研发,作为技术嘉宾,曾多次参与 Apache Conf Aisa 2022/2023/2024 主论坛/分论坛、ApacheRocketMQ Summit/Meetup、云原生服务大会技术分享,开源项目实践经验丰富担任 Apache RocketMQ、Nacos、openEuler messagemiddleware sig 和 openMessaging 等开源社区的 Maintainer/Committer。信通院 2023 年《云计算开源标准突出贡献专家》,《2024 信通院 OSCAR 尖峰开源人物》,多个开源社区的开源先锋等荣誉。 议程亮点 7 月 25 日 14:00 17:15 演讲议题:Apache RocketMQ Eventbridge|您的 GenAI 为何需要 EDA? 分享时间:7 月 25 日 14:0014:30 议题介绍: 如果您身处 AI 领域,EDA 为 LLM 和 AI Agent 提供的重要帮助不容忽视。本演讲将重点探讨 EDA 为 GenAI 带来的改变: 1. 通过实时 RAG,EDA 让您的 LLM 更加智能。 2. EDA 帮助您更好地使用 LLM。 3. 借助 MCP,EDA 赋能您的 Agent。 4. 为了增强多 Agent 能力,您应该关注 EDA。 此外,本演讲还将展示 Apache RocketMQ Eventbridge 在阿里云上在这些领域的实践和探索。 演讲嘉宾: Lin Shen: Apache RocketMQ PMC Apache RocketMQ PMC成员,阿里云 EventBridge 负责人,专注于 EDA 研究。 演讲议题:Apache Pulsar 在腾讯云上的高可用性最佳实践 分享时间:7 月 25 日 14:3015:00 议题介绍:Apache Pulsar 是一个云原生的分布式消息和流式传输平台。Apache Pulsar 采用存储与计算分离的架构,支持大型集群、多租户、百万级主题、跨区域数据复制、持久化存储、分层存储、高可扩展性等企业级及金融级服务。Apache Pulsar 提供统一的消费模型,同时支持消息队列和流式传输场景。它不仅能为队列场景提供企业级的读写服务质量和强一致性保证,还能为流式传输场景提供高吞吐量和低延迟。 Apache Pulsar 已在企业关键业务中落地,拥有丰富的应用场景。目前,腾讯云也已在生产实践中大规模应用 Apache Pulsar 近 5 年。在生产实践中,腾讯云对 Apache Pulsar 进行了一系列性能优化和稳定性增强,以确保用户在不同场景下系统能够稳定高效地运行。 本次演讲我们将重点讲解腾讯云上 Apache Pulsar 在高可用性方面的最佳实践,包括 Broker 集群和 Bookkeeper 集群的高可用性、Zookeeper 集群的高可用性以及跨集群的高可用性实践,希望能够为开发者提供一些参考。 演讲嘉宾: Mingze Han|tencent 毕业于武汉大学计算机专业。腾讯云高级研发工程师。开源社区爱好者。在美团和腾讯拥有超过 8 年的消息队列相关研发和运营经验。目前主要负责腾讯云 TDMQ for Pulsar 产品的核心部分。同时,我是 Apache Pulsar Contributor 和 RoP Maintenanceer。 演讲议题:使用 Apache Pulsar 构建高可靠订阅和推送服务|挑战与最佳实践 分享时间:7 月 25 日 15:0015:30 议题介绍:分析华为云物联网基于 Apache Pulsar 构建订阅和推送服务的最佳实践。订阅和推送服务看似简单,实则涉及诸多架构决策。本演讲将深入探讨我们如何设计统一的推送服务架构,以确保海量数据分发的高可靠性和稳定性,从而帮助客户高效集成数据流,加速数字化转型。 演讲嘉宾: Zhangjian He|Huawei Cloud IoT Senior Engineer | Huawei Cloud Open Source Team Member Apache BookKeeper&Apache ServiceComb PMC member、Apache Pulsar Committer,华为云工业软件平台云服务产品编码&开源负责人、华为云OSDT成员。 演讲议题:解密 Apache Kafka 的架构创新与多场景最佳实践 分享时间:7 月 25 日 15:4516:15 议题介绍:阿里云 Kafka 基于开源 Kafka 全面升级,融合弹性计算、高性能分布式文件存储和容器服务等云原生基础设施,实现高性能存储计算分离架构。云原生 Kafka 的快速恢复机制大幅降低 RTO,读写隔离机制则充分保障服务质量。低延迟、高吞吐量、极致弹性和便捷的运维能力,使云原生 Kafka 架构足以应对众多业务场景。本次演讲在详细讲解云原生 Kafka 高性能实现原理的同时,还将从车联网、人工智能等领域展现云原生 Kafka 的多场景最佳实践。 演讲嘉宾: Fujian Zhao|Alibaba Senior Software Engineer 2020 年 6 月毕业于东南大学,2020 年 7 月加入阿里巴巴。 演讲议题:Apache Pulsar 在大规模分区环境下的指标收集实践 分享时间:7 月 25 日 16:1516:45 议题介绍:Apache Pulsar 是一个高性能消息队列,支持海量 Topic,允许用户在单个集群内创建数十万甚至数百万个分区。指标是我们在生产环境中识别和诊断问题的关键工具。可观察性的有效性直接影响故障排除的速度。 在分区数量众多的场景下,启用 Topic 级指标可能会在短时间内生成大量指标字符串,从而导致严重的内存波动,并可能导致集群不稳定。 本次演讲将分享我们在此类高分区场景下收集指标的实践经验,并探讨如何在不影响在线内存稳定性的情况下最大限度地收集指标数据,从而实现高效的问题识别和系统监控。 演讲嘉宾: Lin Lin|Huawei, SDE expert, Middleware technology director 华为 SDE 专家,中间件技术总监,曾就职于腾讯、蚂蚁金服 专注于中间件和基础设施领域,拥有超过 10 年的相关经验,致力于打造高品质的基础设施 Apache Pulsar Committer & PMC 成员 演讲议题:Apache Pulsar 在小红书公司线上场景的探索与实践 分享时间:7 月 25 日 16:4517:15 议题介绍:本次分享将比较 Pulsar 与 RocketMQ 的特性,以及 Pulsar 在小红书线上场景中的实现方式(包括特性延迟、多活、压缩等),以及企业获得的实际收益。 演讲嘉宾: 卢世吉|Apache BookKeeper PMC 小红书公司在线MQ负责人,Apache BookKeeper PMC,拥有5年MQ开发经验,致力于打造稳定可靠的基础组件。 Linlin Duan|rednote 消息队列高级研发工程师。 Xiangying Meng | Apache Pulsar PMC 小红书高级研发工程师,拥有 4 年 Apache Pulsar 内核研发与实践落地经验。曾就职于 StreamNative。专注于消息队列领域,持续探索其技术前沿与更多可能性。 7 月 26 日 14:00 17:45 演讲议题:腾讯云上的 Kafka:无缝迁移与灾难恢复 分享时间:7 月 26 日 14:0014:30 议题介绍:本次演讲将介绍如何将自建 Kafka 集群无缝迁移至腾讯云 CKafka 解决方案,无需关注迁移过程中生产消费应用的切换顺序。同时还将详细介绍腾讯云 CKafka 的故障容灾方案。 演讲嘉宾: Shilin Lu|Tencent Cloud Expert Engineer 毕业于天津大学软件工程专业,目前负责腾讯云 Kafka 内核开发与优化工作,拥有 7 年消息中间件开发运维经验。曾在腾讯、字节跳动等公司负责消息中间件研发,具备大规模集群运维经验。同时作为 Apache Kafka 和 RocketMQ 开源项目贡献者,积极参与开源社区建设。 演讲议题:构建云 MQTT 解决方案的经验教训 分享时间:7 月 26 日 14:3015:00 议题介绍:MQTT 已成为物联网 (IoT) 的事实标准,为汽车、制造、电信、石油天然气等行业的众多应用提供支持。 在腾讯云,我们开发了基于共享弹性流日志的稳健云 MQTT 解决方案。在本次演讲中,我们将分享开发过程中的关键见解和经验教训。我们将首先概述整体架构,重点介绍性能优化,并分享最佳实践。接下来,我们将深入探讨如何在共享日志之上实现 MQTT 协议——这种设计可扩展以支持其他消息传递协议。我们还将探讨动态扩展系统以支持数百万连接设备并保持高性能和可靠性的策略。最后,我们将讨论如何将存储和计算分离,并结合专用的 MQTT 负载均衡器,帮助我们降低总体拥有成本 (TCO),并灵活高效地应对流量高峰。 演讲嘉宾: Senze Zhang|Apache RocketMQ Committer Senze Zhang 是一位资深的消息队列系统工程师,在高性能消息解决方案的设计、实现和优化方面拥有丰富的经验,参与部署过 RocketMQ、Kafka、MQTT 等多个业界领先的消息队列平台。 演讲议题:使用 Apache RocketMQ 赋能无服务器消息架构 分享时间:7 月 26 日 15:0015:30 议题介绍:我们最新的研究成果已被 ACM FSE 2025 行业专题收录,Apache RocketMQ 为无服务器消息系统奠定了坚实的基础,解决了传统中间件在可扩展性、成本和元数据方面的挑战。通过解耦存储和计算,Apache RocketMQ 实现了独立的资源扩展,这对于不可预测的云工作负载至关重要。其弹性写入分区消除了单队列吞吐量限制,而轻量级消息队列则以最小的冷启动延迟支持数百万个队列。 我们通过基于 RocketMQ 的 RabbitMQ 实现证明了这一点,该实现在克服 RabbitMQ 扩展限制的同时,保留了完整的协议兼容性。借助 RocketMQ 的架构,我们实现了无限的水平扩展能力,并将元数据管理效率提高了 1000% 以上。该无服务器解决方案已在阿里云上商业化运营。我们的经验表明,RocketMQ 能够将传统消息系统转型为云原生服务,为面向未来的无服务器架构提供了蓝图。 演讲嘉宾: SJuntao Ji|Senior Development Engineer (Alibaba Cloud Computing) 阿里云计算高级中间件研发工程师,负责 RocketMQ 和 RabbitMQ 的功能开发。Apache RocketMQ 开源社区贡献者。在 CCFA 会议或期刊(ASE 23'、FM 24'、FSE 25' 等)发表多篇顶级论文。 演讲议题:基于 Commitlog 和 RocksDB 的 RocketMQ 存储引擎 分享时间:7 月 26 日 15:4516:15 议题介绍:基于 Commitlog 和 RocksDB 的 RocketMQ 存储引擎 延迟消息、事务消息、POP 和索引是 RocketMQ 的核心功能。这些功能涉及内存中的多种状态转换,并且基于文件系统实现。现有实现面临以下问题: 1、基于队列的实现会导致更多的合并操作(例如,POP 消费结果合并、事务消息与 OP 消息合并),从而导致工作流复杂、效率低下且可扩展性差。 2、TimerWheel、TimerLog、Revive 和 Index 文件需要额外的存储空间。 3、内存中过多的中间状态会导致性能问题,例如在 POP 消费过程中,CK、ACK 和 CKMock 等临时对象会对堆内存造成巨大的 GC 压力。 4、大量的磁盘文件和中间状态使分层存储和弹性伸缩变得复杂。 RocksDB 是 Facebook 开发的一款高性能嵌入式键值存储引擎,广泛应用于数据库、日志系统和分布式存储。RocksDB 与 Commitlog 和 RocksDB 统一存储引擎的关键在于将消息数据分离成两部分,在文件系统上 Commitlog 的同时迁移 RocksDB 中的所有索引模型。这种方法具有显著的优势: 1、利用 RocksDB 的快速查找功能减少数据处理中的协调/同步,从而简化复杂性。例如,RocksDB 的键值存储 CRUD 操作可以取代 POP 消费中原有的双队列合并过程。 2、重构存储模型,减少存储数据量,提高存储效率。例如,RocksDB 的索引功能可以完全替代现有的索引功能,无需再保存任何索引文件。 3、使用 RocksDB 作为统一的数据迁移接口,简化了分层存储和弹性伸缩的流程,这比目前的实现方式要早得多。 这种存储架构显著优化了这些核心特性。重构不仅简化了代码库、降低了维护成本,还为 RocketMQ 提供了强大而灵活的解决方案,以应对不断增长的业务需求和复杂的存储环境。 演讲嘉宾: Zhou Li|aliyun 阿里云消息专家,负责阿里巴巴核心消息中间件。 演讲议题:RocketMQ 5.0 中的虚拟队列:增强对 Remoting 协议客户端的向后兼容性 分享时间:7 月 26 日 16:1516:45 议题介绍:Apache RocketMQ 是一个以低延迟、高性能和高可靠性著称的分布式消息与流处理平台。最新发布的 5.0 版本带来了两项重大进步: 1、实现了存储与计算的解耦,进一步提升了系统的可扩展性和云原生适配能力。 2、引入了 POP 消费模式,将负载均衡逻辑从客户端迁移到了 Broker 端。 为适应这些新特性,社区推出了全新的基于 gRPC 协议的客户端。然而,现有通过 Remoting 协议客户端接入 RocketMQ 的用户,若不更新代码和替换客户端 SDK,将无法享受到 5.0 的这些创新能力。为增强对 Remoting 协议客户端的向后兼容性,我们在 RocketMQ 5.0 中提出了虚拟队列方案,并已在腾讯云的实际应用中得到了充分验证。 演讲嘉宾: Shengzhong Liu|Tencent 自 2019 年从东南大学毕业以来,一直在腾讯云担任软件开发工程师,近年来专注于消息队列技术的相关工作。 演讲议题:小米 RocketMQMQTT 最佳实践:提升质量和成本效率之旅 分享时间:7 月 26 日 16:4517:15 议题介绍:本次演讲将介绍小米 MQTT 系统架构的演变,重点介绍其在稳定性、性能和成本优化方面的进步。关键举措包括管理海量主题、确保消息顺序、灾难恢复机制、分层存储解决方案、字典压缩技术、数据集成、容器化等等。 演讲嘉宾: Fan Wang|Xiaomi Message Queue Team Leader, Apache RocketMQ Committer 2018 年加入小米,专注于消息和存储系统,负责小米 MQ、HBase、ElasticSearch 等平台。 点击即可参与报名!
#社区动态

2025年7月8日

朗新科技集团如何用Apache RocketMQ“快、准、狠”破解业务难题?
朗新科技集团:让数字化的世界更美好 朗新科技集团股份有限公司是领先的能源科技企业,长期深耕电力能源领域,通过新一代数字化、人工智能、物联网、电力电子技术等新质生产力,服务城市、产业、生活中的能源场景,推动社会绿色发展。 朗新科技集团初创于 1996 年,总部位于江苏无锡,在国内外设有多个研发中心和分支机构,长期为超过 1.2 万多家政企客户和 4.7 亿多大众生活用户提供技术与运营服务,在电力营销数字化、新能源汽车聚合充电、分布式光伏云以及家庭能源缴费等领域处于全国领先地位。 朗新科技集团持续在相关领域探索创新,推动能源绿色低碳转型,惠及千家万户。作为国家鼓励的重点软件企业,朗新荣获了多项行业权威认证和奖项,连续四年荣登中国新经济企业 500 强榜单,并在多个能源科技细分领域保持领先地位,促进整个行业的繁荣发展。 业务扩张背景下,消息队列面临诸多挑战 朗新科技集团的核心业务之一聚焦于聚合充电场景,专注面向企业(ToB)和政府(ToG)提供充电桩业务。在充电桩系统中,关键事件包括“充电开始”、“充电结束”、“故障告警”等。通过分布式消息队列 RocketMQ 可以实现这些事件消息的异步处理,以增强系统的灵活性和可扩展性。此外,RocketMQ 还承担着传递计费请求、支付状态等消息的重要职责,对于确保整个支付流程顺畅进行至关重要。 然而,随着新能源汽车产业的迅猛发展,新能源汽车保有量激增,充电桩规模以及充电服务需求呈现指数级增长趋势。在此背景下,朗新科技集团积极实施战略扩张,但原先基于阿里云 ECS 自建并维护的开源 RocketMQ 却逐渐暴露出诸多问题,包括运维成本高、系统稳定性不足以及难以应对大规模的数据吞吐量等,这些问题对用户体验造成了显著影响。核心业务痛点如下: 1. 稳定性问题:出现消息丢失现象。ToB 和 ToG 业务对于服务的可用性和数据的可靠性要求极高,消息数据丢失是不可接受的。因为一条充电桩状态消息的丢失,就可能导致用户跑空电却无法充电的问题,对用户体验造成很大影响。 2. 系统架构缺少容灾:充电桩业务对跨可用区、跨地域容灾有迫切需求,随着业务规模增长,以及产业中心的分布式转移规划,明确需要建设跨地域容灾系统。然而,技术团队在多可用区容灾方面的技术储备与经验不够丰富。 3. 运维成本过高:每天业务消息量的波峰波谷明显且差值较大,波谷期资源利用率偏低,容易导致资源浪费,造成成本冗余。此外,临时扩容周期长且需大量人力投入。 共建云消息队列 RocketMQ 版:优势显著,业务难题迎刃而解 稳定可靠&弹性降本 针对业务痛点 1 和 3,朗新决定与阿里云共建云消息队列 RocketMQ 版 5.0 Serverless系列。其作为 RocketMQ 的商业版本,在确保消息收、发的可靠性以及实现数据多副本存储方面,都有卓越的表现。Serverless 系列能够有效应对流量波峰波谷显著的问题,不仅有助于降低资源成本,还减少了实例弹性伸缩和运维的人力投入。带来的核心优势如下: + 提高服务可用性:自建开源 RocketMQ 的 SLA 保障不充分,一旦出现故障,需要运维人员自行处理和恢复等。而云消息队列 RocketMQ 版原生支持多可用区部署,服务可用性最高可达 99.99%。 + 提高数据可靠性:自建开源 RocketMQ 需要运维人员自行管理多副本 HA,运维门槛高。而云消息队列 RocketMQ 版默认支持三副本 HA,提供数据的多级存储,数据可靠性最高可达 10个9。 + 提高资源利用率,降低成本:自建开源 RocketMQ 为了确保能够处理业务峰值流量,需要按照最高需求购买实例规格,容易造成资源浪费。而云消息队列 RocketMQ 版 5.0 Serverless 系列采用动态资源调整策略,根据实时业务负载自动弹性伸缩,按量付费,无需预先估算并配置实例规格。 提高可用性和容错力 针对业务痛点 2,朗新当前自建开源 RocketMQ 采用的是单中心系统架构,当单中心异常时,将影响整个业务系统。为此,朗新计划采用云消息队列 RocketMQ 版建设双活中心,以提升系统的可用性和容错能力。云消息队列 RocketMQ 版提供全球消息备份的容灾能力,能够支持多中心灾备、双活系统架构的系统建设。带来的核心优势如下: + 提高数据可靠性:通过在两地数据中心的消息中间件之间实现全量数据同步备份,提高数据可靠性。 + 增强服务连续性:借助消息服务的两地容灾机制,保证服务高可用性,业务可快速恢复,延续性强。 + 降低开发成本:简化配置和管理,轻松实现两地数据的相互备份,提高效率并节省业务的开发成本。 为何选择云消息队列 RocketMQ 版? 朗新之所以和阿里云共建云消息队列 RocketMQ 版,主要归于以下几个关键因素: + 高可靠性和高可用性:RocketMQ 诞生于阿里巴巴集团,历经多年“双十一”万亿级数据洪峰验证。作为国内领先的云服务提供商之一,阿里云运营着国内规模最大的 RocketMQ 集群,支撑了云上数十万客户的生产应用实践。云消息队列 RocketMQ 版提供 SLA,保障服务的高可用性和数据的高可靠性,为企业核心业务链路保驾护航。 + 支持灾备与双活架构:云消息队列 RocketMQ 版通过成熟的产品化能力和解决方案,助力企业快速构建灾备、双活系统架构。面对数据中心或地域级别的故障时,能够实现业务的快速切换与恢复,从而有效避免业务上的巨大损失,显著增强系统的整体稳定性。 + Serverless 弹性降本:云消息队列 RocketMQ 版 5.0 Serverless 系列采用存储计算分离架构,具备自适应弹性能力,能够高效处理突发流量,并且无需运维,按实际使用量计费。朗新在切换到云消息队列 RocketMQ 版 Serverless 实例后,使用成本相较自建降低了 30%。 展望未来,朗新科技集团将进一步深化与阿里云消息队列团队的合作,依托自身丰富的能源领域技术实践,以及阿里云强大的基础设施、产品能力,携手推进行业数字化进程,促进能源科技行业的发展。
#行业实践

2025年7月8日

C5GAME 游戏饰品交易平台借助 Apache RocketMQ Serverless 保障千万级玩家流畅体验
C5GAME:安全便捷,国内领先的游戏饰品交易平台 C5GAME 游戏饰品交易平台( www.c5game.com )是国内领先的 STEAM 游戏饰品交易的服务平台,专注于 CS:GO 以及 DOTA2 等热门游戏装备 C2C 中介交易。自网站上线以来,C5GAME 凭借其安全便捷的交易和流畅友好的体验,迅速在玩家群体中积攒了良好的口碑,积累了千万级注册用户,实现了累计交易额超过 100 亿元,确立了其在国内游戏饰品交易领域的领先地位。目前 C5GAME 正积极拓展国际市场,致力于打造一个全球化的 STEAM 游戏饰品交易平台,海外用户规模正在迅速扩大。 C5GAME 网站基于 STEAM 官方提供的 API,研发了先进的机器人交易系统,确保玩家在进行游戏饰品买卖与存取时的安全性和便捷性。同时,C5GAME 持续优化用户体验,满足用户日益增长的交易需求,在保障安全的基础上,致力于提供更加智能化、人性化的服务体验。例如,根据用户的实际反馈,C5GAME 自主研发了一套智能检索系统,使平台更加本土化,允许玩家通过简称快速准确地查找所需饰品,极大提升了搜索效率和用户体验。 千万级注册玩家、百亿交易额背后面临的业务挑战 在互联网时代高速发展的浪潮中,游戏行业蓬勃发展,各类游戏如雨后春笋般涌现,并推动了游戏饰品交易行业的爆发式增长。在此背景下,C5GAME 游戏饰品交易平台上的玩家数量和交易量显著增加,同时也带来了一系列挑战: 1. 系统耦合复杂:由于交易系统与多个核心子系统紧密相连,高度耦合的复杂架构增加了系统故障的风险。 2. 活动期稳定性挑战:由于平台频繁推出促销活动,且不定期推出平台用户的补贴活动,这些活动时段会吸引大量用户,导致流量激增,对系统稳定性带来严峻考验。 3. 技术选型难题:选择自建开源中间件可能因资源投入不足而无法满足业务需求,甚至可能带来技术风险。 4. 运维效率提升需求:对于交易核心链路,任何订单异常都需要及时排查处理。因此,构建一个强大且全面的工具体系来支持高效运维尤为重要。 5. 成本控制压力:每天业务消息量的波峰波谷相差较大,为应对高峰期的高并发请求而购买高规格实例,会导致成本过高,在非高峰期时段资源利用率较低,造成大量的资源浪费。 面对上述问题,C5GAME 需要采取有效措施优化系统架构、增强服务稳定性、选择合适的技术方案、加强运维能力以及合理规划资源等,保障业务高效、稳定的同时有效控制成本。 云消息队列 RocketMQ 版:异步解耦、可靠高效、弹性降本 异步通信模型 通过云消息队列 RocketMQ 版的异步消息通信模式,各子系统之间无需建立强耦合的直接连接,调用方只需将请求转换为消息发送至 RocketMQ,一旦消息发送成功,即可视为该异步链路调用完成,剩下的工作 RocketMQ 会负责将事件可靠通知到下游的调用系统,确保任务执行完成。 以下是异步通信模式的主要优势: + 简化系统架构:调用方和被调用方通过 RocketMQ 通信,系统是星型拓扑结构,易于维护和管理。 + 上下游弱耦合:上下游系统之间弱耦合,由 RocketMQ 负责消息缓冲和异步恢复。上下游系统能够独立进行升级和变更,不会互相影响。 + 流量削峰填谷:RocketMQ 具备强大的流量缓冲和整形能力,能够在业务流量高峰期间保护下游系统不被击垮。 异步消息通信模式降低了系统间的依赖度和架构的复杂度,同时提升了整体的稳定性、可靠性和可扩展性。 基于定时消息的事件驱动 在游戏饰品交易中,订单流转过程中经常会存在多个超时状态的任务。这些任务需要得到可靠和及时的处理,强烈依赖于底层系统的分布式调度机制。尤其是在月底的大型促销活动中,大量的预售订单需要定时支付尾款等场景,会产生大量的定时任务。 基于云消息队列 RocketMQ 版的定时消息功能,以其事件驱动的方式,确保了在大促高峰期,处理海量堆积任务时的高性能、高可靠。 RocketMQ 5.0 Serverless 对于自建开源 RocketMQ 集群,为保证业务稳定性,往往需要按照业务请求的峰值去配置集群资源,包括 CPU、内存、存储、网络等。在实际生产中,由于业务消息量的波峰波谷明显,集群资源有大部分时间处于低利用率状态,造成闲置浪费。 云消息队列 RocketMQ 版 5.0 系列 Serverless 实例可以很好地解决这个问题,它能够通过资源快速伸缩实现资源使用量与实际业务负载贴近,并支持按照实际使用量计费,有效降低企业的运维压力和使用成本。 C5GAME 借助 RocketMQ Serverless保障千万级玩家流畅体验 C5GAME 通过采用云消息队列 RocketMQ 版 Serverless 系列,有效解决了现有架构中存在的性能瓶颈,极大增强了交易系统的灵活性和稳定性,有效实现了流量的削峰填谷,显著提升了整体运维效率,确保了千万级玩家能够享受到流畅的游戏交易体验。同时,还帮助 C5GAME 节省了资源和运维成本,使开发团队能够更专注于业务创新,为广大游戏玩家提供更丰富的功能和更友好的体验。 1. 订单系统异步化:通过云消息队列 RocketMQ 版实现订单系统异步化,有效实现流量削峰填谷,增强了系统在活动期间的稳定性。 2. 超时订单处理:使用云消息队列 RocketMQ 版的定时消息功能,应对订单支付超时等复杂场景的处理,简化业务逻辑的复杂度。 3. 运维体系构建:基于云消息队列 RocketMQ 版丰富的 Metrics、Trace 等可观测工具,构建了一整套运维体系,极大提升了日常问题排查和巡检的效率。 4. 资源弹性降本:云消息队列 RocketMQ 版 5.0 serverless 系列提供动态资源调整策略,根据实时业务负载自动弹性伸缩,按量付费,无需预先估算并配置实例规格。C5GAME 在切换到云消息队列 RocketMQ 版 5.0 Serverless 实例后,使用成本相较自建降低了 60%。 展望未来,随着 C5GAME 不断推出创新功能和营销活动,云消息队列 RocketMQ 版将继续助力 C5GAME 为广大游戏玩家提供更流畅、更优质的服务体验。
#行业实践

2025年7月3日

Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
近日,由阿里云消息团队发表的 Apache RocketMQ 创新论文被 CCFA 类软件工程顶级会议 FSE 2025 Industry Track 录用。 ACM FSE(The ACM International Conference on the Foundations of Software Engineering)是享有盛誉的国际学术会议,被《中国计算机学会推荐国际学术会议和期刊目录》列为 CCFA 类软件工程顶级会议。该会议汇聚学术界与工业界专家,聚焦软件工程前沿研究与实践应用,其发表论文以卓越的创新性、重要性和影响力著称,对软件工程领域的发展与创新起到关键推动作用。 此次被录用的论文为《Designing for Scalability: Building a Universal Serverless Messaging Architecture with Apache RocketMQ》。该研究基于 Apache RocketMQ 构建 Serverless 消息系统,研发适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka)的代理层,成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题。阿里云消息团队据此实现了云消息队列 ApsaraMQ 全系列产品 Serverless 化,让用户专注于业务开发,进一步提效降本。 创新亮点 消息中间件在数字化与智能化时代发挥着至关重要的作用,它提供异步解耦、集成、高性能高可靠等核心价值,支撑分布式系统间的高效通信,优化整体应用性能和资源利用率。从用户视角看,现代消息中间件应以无服务器(Serverless)架构运行,使用户仅需关注消息的发布与消费行为,同时应具备强大的弹性扩展能力以应对业务负载的动态变化。 阿里云基于 Apache RocketMQ 的架构优势,构建了 Serverless 消息系统范式,并提供包括 RabbitMQ、MQTT 和 Kafka 在内的主流消息中间件解决方案。RocketMQ 通过解耦存储和计算,实现计算与存储资源的独立弹性扩展,适用于云环境的不可预测工作负载。此外,RocketMQ 原生支持百万级元数据管理,兼具冷启动、高可用、无限横向扩展等特性,有效满足智能化时代对消息系统在弹性、性能与可靠性等方面的综合需求。 论文中展示了依托 RocketMQ 存储引擎实现 RabbitMQ 无限水平扩展,在兼容完整协议的基础上突破了 RabbitMQ 原有的扩展限制,元数据管理效率、最大吞吐量、堆积能力等相比开源均可以提升超过1000%。阿里云消息团队基于此 Serverless 消息系统范式,实现了云消息队列 ApsaraMQ 全系列产品 Serverless 化,让用户专注于业务开发,进一步提效降本。实践表明,该架构范式能够将传统消息系统转变为高弹性、低成本、高性能的云原生消息服务,为无服务器架构应用提供强大支撑。 附论文信息 录用论文题目:《Designing for Scalability: Building a Universal Serverless Messaging Architecture with Apache RocketMQ》 作者:季俊涛,金融通,傅玉宝,林清山 论文概述:消息中间件在现代世界中发挥着至关重要的作用,它促进了分布式系统之间的无缝通信,并提升了整体应用性能。从用户的角度来看,消息中间件应以无服务器的方式运行,专注于消息的发布和消费。随着业务吞吐量的增加,任何消息中间件都必须具备强大的横向扩展能力。为此,阿里云基于 Apache RocketMQ 实现了无服务器范式,并在这种无服务器形式下开发了多个主流的消息中间件解决方案,如 RabbitMQ、MQTT 和 Kafka。以 RabbitMQ 为案例进行研究,RabbitMQ 具有难以扩展的架构,而我们基于 RocketMQ 的 RabbitMQ 与所有开源客户端兼容,并为单个队列提供无限的吞吐量限制。此外,消息元数据管理能力、削峰填谷能力比开源 RabbitMQ 高出超过 1000%。基于 Apache RocketMQ 的消息中间件无服务器架构预计将在阿里云中支持稳定的商业应用,并有潜力在未来作为大规模消息系统集群的可靠解决方案。
#社区动态

2025年6月18日

乐刻运动:基于 Apache RocketMQ + MQTT 实现健身产业数字化升级
乐刻运动:助推数字经济与健身产业深度融合发展 乐刻运动,2015 年创立于杭州的健身产业互联网平台,以让每个人平等享有运动健康的资源和权利为使命,以每天响应 1 亿人次的运动健康需求为愿景。乐刻以用户运营为核心,构建数智中台,打通场景、用户、教练、服务,对健身产业进行数字化升级改造,提高运营效率和供应链管理能力,搭建健身服务新零售生态,助推数字经济与健身产业深度融合发展。截至 2024 年 12月,乐刻运动已在全国 30 多个城市开设超过 1700 家门店。 乐刻运动在数字化升级中的挑战 在数字化升级过程中,乐刻运动计划在各门店部署物联网设备,覆盖用户进门、签到以及运动器材等场景,以便实时、持续地收集相关信息并对其进行维护管理,从而进一步提升门店运营效率和服务质量。然而,随着业务规模的持续扩大,终端设备数量也在不断增加,带来了以下挑战: 1. 高并发连接与实时监控:鉴于健身房门店众多,且设备种类和数量繁多,需要一个能够支持大规模并发连接的可靠通信架构,确保所有终端设备的状态被及时监测,并迅速响应任何异常情况。 2. 轻量低带宽的消息传输:由于网络资源的限制,在客户端设备与服务端之间的连接上,需要采用一种轻量级、低带宽的消息传输协议,以优化数据传输效率并减少对现有网络基础设施的压力。 3. 高效的消息处理机制:在业务高峰期时,客户端上报数据量大且频率高,需要一套高效的消息处理机制,来避免因服务器应用有限,无法及时消费,而造成消息堆积的问题。 结合 RocketMQ 与 MQTT 的高效解决方案 为应对上述挑战,乐刻运动采用 RocketMQ 与 MQTT 协议相结合的解决方案,显著提升了整体架构的稳定性和可扩展性,提高了消息处理效率,确保了高并发场景下的业务连续性,最终优化了用户体验。 1. MQTT 海量终端数据实时收集:通过 MQTT 协议,系统能够实时收集健身房内各类物联网设备上报的数据,满足高并发需求,确保数据传输的高效与可靠。 2. RocketMQ 消息缓存与负载均衡:将 MQTT 的消息流出挂载到 RocketMQ,通过 RocketMQ 对客户端采集到的大量消息进行消息缓存和负载均衡,从而有效缓解服务端的压力,确保系统的稳定运行。 云消息队列助力乐刻运动数字化升级 在实施上述方案的过程中,乐刻运动选择了阿里云的云消息队列 RocketMQ 版和云消息队列 MQTT 版作为核心消息中间件,这两个产品在实际生产环境中展现出显著的优势和价值。 1. 产品简介 + 云消息队列 RocketMQ 版:云消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台。 + 云消息队列 MQTT 版:云消息队列 MQTT 版是专为移动互联网(MI)、物联网(IoT)领域设计的消息产品,覆盖直播互动、金融支付、智能餐饮、即时聊天、移动 Apps、智能设备、车联网等多种应用场景;通过对 MQTT、WebSocket 等协议的全面支持,连接端云之间的双向通信,实现 C2C、C2B、B2C 等业务场景之间的消息通信,可支撑千万级设备与消息并发。 2. 实际生产环境中的优势和价值 + 实时数据处理:云消息队列 MQTT 版通过 MQTT 协议实现终端设备与服务器之间的实时通信,利用发布/订阅模式,确保设备状态和用户行为的及时监控和响应,从而实现高效的实时数据处理能力。 + 高度可扩展性:云消息队列 MQTT 版具备强大的横向扩展能力,能够轻松支持海量终端设备和传感器的接入,能够轻松应对健身房规模不断扩大带来的设备接入需求。 + 可靠性和稳定性:云消息队列 MQTT 版的 MQTT 协议支持多种服务质量(QoS)级别,可以根据业务需求选择合适的级别,确保消息可靠传递,防止数据丢失。 + 性能表现卓越:服务端应用之间隐含着对等和任务分摊的关系,云消息队列 RocketMQ 版的集群消费模式提供原生的负载均衡机制,能够提升系统的整体性能,确保高并发场景下的高效稳定运行。 乐刻运动通过采用阿里云的云消息队列 RocketMQ 版和云消息队列 MQTT 版,不仅提升了系统的实时数据处理能力,还增强了系统的可扩展性、可靠性和性能,为业务的持续发展和流畅的用户体验,提供了坚实的技术支持,进一步推动了数字经济与健身产业的深度融合。
#行业实践

2025年6月18日

乐言科技:云原生加速电商行业赋能,基于 Apache RocketMQ 云服务降本 37%
深耕 AI SaaS+,助力数万电商客户数智化转型 上海乐言科技股份有限公司( 以下简称“乐言科技”,官网:https://www.leyantech.com/ )自 2016 年成立以来,专注于利用自然语言处理和深度学习等核心 AI 技术,为电商、金融、医疗、科学等多个垂直领域提供整体解决方案。公司在杭州、广州等地设有分支机构,已成为国内领先的人工智能企业。 深耕行业八年,乐言科技形成了完整的能力栈,发布了“乐言 GPT 大模型”,推进大模型解决方案赋能行业,并与头部品牌合作探索创新应用。公司已申报多个重大项目,获得多项荣誉和认证,并积极参与标准编制,以“引领人工智能技术,为客户创造价值”为使命,持续推动 AI 技术与行业的深度融合。 自研智能客服机器人“乐语助人”日均服务超千万人次 乐言科技致力于提升行业服务效率,核心业务之一是智能客服机器人,面向电商企业提供 AI SaaS+ 服务。其自主研发的电商智能客服机器人“乐语助人”( 官网介绍视频:https://www.leyantech.com/themes/leyan/public/assets/video.mp4 )适用于天猫、淘宝、京东等国内主流电商平台,基于自然语言处理、知识图谱、深度学习等领先的人工智能技术,具备充分的语言理解能力,可以模拟金牌客服的回复逻辑,进行买家咨询接待、业务问题处理、智能推荐、客情维系等工作。在降低人工客服团队营运开支的同时,大幅提升了客服人均接待效率与营销转化率,为电商商家创造了更多利润。 目前,“乐语助人”每天服务超过 2000 万人次,与六万余家电商客户合作,提供 AI SaaS+ 全链路数智化解决方案,助力企业完成数智化转型。上海乐言科技股份有限公司累计 SaaS 软件年收入约十亿元,并积极探索海外市场,推出跨境电商 AIGC 解决方案,服务 400 多万海外店铺。 智能客服机器人业务量激增,自建消息队列面临诸多痛点 在智能客服机器人系统中,“对话消息分发”是核心功能之一,对提高回复效率和处理高并发请求等起到关键作用。 在系统建设初期,由于业务规模较小,开发与运维团队的规模及技术能力有限,乐言科技统一采用自建 Apahce Kafka 作为消息中间件,以实现业务解耦与流量削峰,增强系统的灵活性和可扩展性。同时,Apache Kafka 还作为各数据系统(如 AI、大数据等)之间的数据通道。因此,确保其消息服务流程的顺畅至关重要。 然而,随着业务规模增长和系统复杂度增加,消息处理的精细化需求日益凸显,单一消息中间件架构需额外投入更多技术资源以维持效能,其扩展性与灵活性也逐渐成为系统演进的约束条件。同时,自建 Apache Kafka 集群的运维成本持续攀升,还逐渐暴露出系统稳定性不足、精准投递功能笨重等问题,导致运维压力倍增。 核心痛点如下: + 稳定性和弹性问题:公司核心业务系统共用 Apache Kafka 大集群。不同业务系统对集群的 IO 压力重叠,会造成彼此影响,例如:侧重高吞吐量系统可能会对延迟敏感的系统造成影响。而集群的扩容和缩容需要对分区进行重新均衡,也会对延迟敏感的对话消息造成稳定性影响。 + 运维成本过高:为了应对共用集群带来的影响,公司对 Apache Kafka 集群进行了拆分。然而不同集群每天业务消息量的波峰波谷明显且差值较大,波谷期资源利用率偏低,容易导致资源浪费,造成成本冗余。此外,临时扩容周期长且需大量人力投入。 + 无法精细化消息处理:Apache Kafka 仅充当消息管道,无法根据消息 Tag 进行精准消费和 SQL 过滤。业务系统为满足精准消费的需求,需要增加研发成本,基于 Apache Kafka Topic 进行额外开发,容易出错且灵活性很差,制约了我们新业务模式的展开速度。这在对接大客户的定制化需求时,尤为迫切。 + 消息级别可观测性差:Apache Kafka 无法直接查看每条消息的详情和消费状态,无法满足问题排查和运营支持的需求,需要开发额外工具或系统进行支持。 精准破局:从自建开源消息队列到阿里云消息队列 因此,乐言科技基于消息类型特征与业务逻辑复杂度拆分业务,并精准匹配消息队列选型策略: + 业务解耦与强一致性场景:针对侧重于业务解耦、涉及较多后置逻辑处理的场景(如强一致性、顺序消息等),采用阿里云消息队列 RocketMQ 版 Serverless 系列,以满足高可靠性与确定性需求。 + 实时流处理场景:大数据及日志类实时流处理业务沿用 Apache Kafka 架构,并计划迁移至阿里云消息队列 Kafka 版,以提升资源弹性与成本效益,持续优化技术架构。 对于业务解耦场景,采用云消息队列 RocketMQ 版 Serverless 系列替换自建开源 Apache Kafka,可以实现更高效的精细化消息处理,具体优势如下: 1. 高效实现分布式顺序消息:仅需按照顺序消息的投递 API 和定义顺序消费 Group 组,即可实现分布式顺序消息,相比 Kafka 指定 Partition 投递和消费扩展性强,业务仅需按照所需设置 MessageGroup,实现更灵活,与服务端绑定低。 2. 支持服务端消息过滤:在实际业务场景中,同一个主题下的消息往往会被多个不同的下游业务处理,各下游业务的处理逻辑不同,且只关注自身逻辑需要的消息子集。云消息队列 RocketMQ 版支持 Tag 标签过滤和 SQL 属性过滤,使用云消息队列 RocketMQ 版的消息过滤功能,可以帮助消费者更高效地过滤自己需要的消息集合,避免大量无效消息投递给消费者,降低下游系统处理压力。实现降低客户端的开发工作量和处理流量。 3. Serverless 系列弹性降本:云消息队列 RocketMQ 版 Serverless 系列能够通过资源动态伸缩,实现资源使用量与实际业务负载贴近,并支持按照实际使用量计费,无需按照最高峰值预留资源,有效降低运维的压力和使用成本。 采用云消息队列 RocketMQ 版 Serverless 系列,整体降本 37% 1. 保障业务稳定 通过使用云消息队列 RocketMQ 版 Serverless 系列替换自建开源 Apache Kafka,成功实现业务拆分解耦与流量隔离,有效避免了业务流量冲突导致的中间件并发问题。云消息队列 RocketMQ 版提供 99.99% 服务可用性和容灾保障,显著提升了整体业务的稳定性和连续性。 2. 降低开发成本 借助云消息队列 RocketMQ 版 Serverless 系列的顺序消息与消息过滤能力,将复杂的分布式顺序消息场景简化,有效减少了业务逻辑的复杂性,降低了开发成本。 3. 提升运维效率 基于云消息队列 RocketMQ 版提供的丰富的 Metrics 和 Trace 可观测工具,构建了完整的运维体系,极大提升了日常问题排查和巡检效率。 4. 资源弹性降本 云消息队列 RocketMQ 版 Serverless 系列采用动态资源调整策略,根据实时业务负载自动弹性伸缩,按量付费,无需预先估算并配置实例规格。通过将对话引擎、基础数据服务等业务迁移至云消息队列 RocketMQ 版 Serverless 系列,整体成本相较于之前降低了 37%。 云原生生态深度赋能乐言科技架构升级与创新突破 乐言科技依托云原生架构及阿里云云原生产品体系,实现基础设施与业务解耦以及弹性调度,在提升业务稳定性的同时,显著增加研发效能并降低运维成本,加速电商客户定制化需求交付,推动云计算与 AI 技术在电商领域的深度融合。 + 在大促等流量突增场景中,云原生架构通过秒级自适应弹性扩容,保障业务连续性,结合微服务引擎 MSE Nacos 的自动扩缩容和节点自愈能力,系统抗风险能力显著提升。MSE Nacos 团队基于双版本(社区与商业)维护经验持续优化商业产品的核心能力,比如性能提升、配置标签灰度、推空保护、配置中心的传输和存储加密,进一步提升微服务可用性与安全性。 + 在智能客服场景中,乐言科技采用日志服务 SLS 替代原有的自建日志系统,统一采集与存储多平台的客服沟通记录,以便用于数据分析驱动产品演进。相比自建日志系统,SLS 凭借高可用性与高吞吐量优势,有效解决了业务增长带来的存储成本激增、稳定性不足及人力投入过高等问题,显著降低综合运维成本。同时,为了进一步观测云上资源使用情况,使用企业云监控导出云上监控数据,与实际业务需求相结合,为构建智能化运维体系提供强有力的支撑。 面对 AI 技术发展与海外市场拓展等机遇,乐言科技将深化与阿里云的合作,基于业务需求迭代云原生架构,深度应用云原生产品,助力电商客户实现数智化转型,持续推动 AI 技术在行业应用中的创新突破。 :
#行业实践

2025年6月16日

EventBridge 构建智能化时代的企业级云上事件枢纽
产品演进历程:在技术浪潮中的成长之路 早在 2018 年,Gartner 评估报告便将事件驱动模型(EventDriven Model)列为十大战略技术趋势之一,指出事件驱动架构(EDA,Eventdriven Architectures)将成为微服务架构未来的演进方向。 随着云原生与 Serverless 技术的迅猛发展,2020 年,阿里云重磅发布事件总线 EventBridge,构建了云原生环境下的统一事件枢纽。事件总线 EventBridge 支持阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助企业轻松构建松耦合、分布式的事件集成驱动架构。 自 6 月 3 日起,阿里云事件总线 EventBridge 正式商业化。历经五年迭代升级,事件总线 EventBridge 在产品功能和用户体验方面不断优化,积累了丰富的规模化生产实践经验。在数据智能化时代,事件总线 EventBridge 持续深耕企业云上事件集成,适用于各种规模和行业的事件集成场景。通过 Event 桥接各个系统,满足 AI 和企业集成等领域的各种数据集成需求,为企业提供便捷且创新的数据集成解决方案。 产品核心特性:构建企业级事件中枢的关键 从诞生到商业化,事件总线 EventBridge 不断优化产品核心特性,致力于定义企业级事件集成标准,为企业构建云上事件枢纽提供坚实可靠的支撑和保障。 + 稳定与安全:依托海量数据传输及运维经验,提供高稳定性且安全合规的企业集成服务; + 性能与成本:提供高性能且性价比高的企业集成方案,显著降低用户数据集成成本; + 开放与集成:提供丰富的跨产品、跨平台连接能力,促进云产品、应用程序、SaaS 服务相互集成; + 统一事件枢纽:统一事件界面,定义事件标准,打破云产品事件孤岛; + 事件驱动引擎:海量事件源,毫秒级触发能力,加速 EDA/Serverless 架构升级; + 流式事件通道:事件流提供轻量、实时、端到端的流式数据处理,对源端产生的事件进行实时抽取、转换和分析并加载至目标端。 多元应用场景:覆盖各类企业事件集成场景 EDA 事件驱动场景 事件总线 EventBridge 通过事件连接应用程序、云服务和 Serverless 服务,构建事件驱动架构(EDA,Eventdriven Architectures),实现应用与应用、应用与云服务之间的高效连接。 流式 ETL 场景 在企业集成场景中,事件总线 EventBridge 可以作为流式数据管道,提供基础的过滤与转换功能,支持不同数据仓库、数据处理程序、数据分析与处理系统之间的数据同步和跨地域备份,连接不同系统与服务。 AI 数据集成场景 事件总线 EventBridge 提供非结构化数据到结构化数据的链路集成,可处理多种数据源,如关系型数据库、API 数据、文件、ODPS 等,并支持将数据向量化后存储至向量数据库或其他数仓,同时支持数据清洗、转换和规范化。为 RAG 和模型数据准备等场景,提供一站式数据集成服务。 统一事件通知服务 事件总线 EventBridge 提供丰富的云产品事件源与事件的全生命周期管理工具,用户可以直接监听云产品产生的数据,并上报至监控和通知等下游服务,实现高效的事件管理和响应。 商用计费体系:计费模式灵活匹配企业需求 事件总线 EventBridge 已于 6 月 3 日起正式商业化,分为事件总线、事件流两类资源进行计费,计费模式如下: 计费组成说明 事件总线 EventBridge 各资源的计费组成信息请参见下图。 事件总线计费说明 事件总线【1】分为云服务专用事件总线和自定义事件总线。 + 云服务专用事件总线:是系统自动创建用于接收阿里云官方事件源的事件且不可修改的内置事件总线。 + 自定义事件总线:需要你自行创建并管理的事件总线。 事件源单价 | 计费项 | 计费单价 | | | | | 云服务专用总线事件发布 | 免费 | | 自定义总线事件发布 | 5.64元/百万次事件发布 | 每个 64KB 大小的事件计为 1 个事件。 事件目标单价 | 计费项 | 计费单价 | | | | | 阿里云服务目标事件推送/通知 | 免费 | | 自定义目标事件推送 | 1.29元/百万次事件推送 | 每个 64KB 大小的事件计为 1 个事件。 事件流计费说明 事件流【2】是端到端的流式事件通道,适用于端到端的流式数据处理场景。事件流属于收费服务,并支持按事件量计费和按 CU 额度计费,这两种计费【3】方式你只需二选一即可。 按事件量计费 | 计费项 | 计费单价 | | | | | 事件量 | 2.8元/百万条 | | 空置资源占用费 | 1元/天 | + 事件量:事件流拉取上游数据源的事件总量。每个 64KB 大小的事件计为 1 个事件。计算公式:事件量(向上取整)=单个事件大小/64KB。例如:批量投递了 300KB 的单个事件,则按照 5 个事件进行计费。若事件发生重试,则按照配置的重试规则每次重试计为 1 个事件。 + 空置资源占用量:若单个事件流在一个月内没有任何数据流入将会按照计费标准收取空置资源占用费,空置资源占用费将在有数据流入或者删除该事件流后取消计费。 按 CU 配额计费 名词介绍 CU 配额: CU(Capacity Unit)是事件总线任务的容量单位。若采用 CU 规格计费,则每个任务必须至少分配 1CU 作为最小配额。亦可在任务创建时指定允许弹性的最大 CU 规格和最小 CU 规格。 计费项和计费单价 + 计费项:CU 配额 + 计费单价:0.24元/小时/CU 说明 单个 CU 可支持的条件(条件为或): 1. 最大支持每秒事件量(EPS)5000Event/s(实际情况受限于链路上下游性能)。 2. 最大支持的峰值吞吐量(BPS)50MB/s 的容量。 为了方便你快速评估费用,事件总线(EventBridge)提供了价格计算器:事件总线(EventBridge)价格计算器【4】。 如果你在使用事件总线 EventBridge 的过程中有任何反馈或疑问,欢迎加入钉钉用户群(钉钉群号:31481771)与阿里云研发团队即时沟通。 【1】事件总线 https://help.aliyun.com/zh/eventbridge/userguide/eventbusoverview 【2】事件流 https://help.aliyun.com/zh/eventbridge/userguide/eventstreamoverview 【3】计费 https://help.aliyun.com/zh/eventbridge/productoverview/billingofeventstreams 【4】事件总线(EventBridge)价格计算器 https://eventbridge.console.aliyun.com/calculator
#社区动态

2025年6月11日

Apache RocketMQ + “太乙” = 开源贡献新体验
Apache RocketMQ 是 Apache 基金会托管的顶级项目,自 2012 年诞生于阿里巴巴,服务于淘宝等核心交易系统,历经多次双十一万亿级数据洪峰稳定性验证,至今已有十余年发展历程。RocketMQ 致力于构建低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台,覆盖云边端⼀体化数据处理场景,帮助企业和开发者在智能化时代,轻松构建事件驱动架构的云原生应用。 Apache RocketMQ 的茁壮成长离不开全球 800 多位开发者的积极参与和贡献。如今,Apache RocketMQ 开源社区将携手"太乙"平台,共同开启一场开源贡献竞赛,为广大开发者提供一个全新的体验平台和参与机会!新一轮开源竞赛于 6 月 1 日正式启动。 关于“太乙”平台 "太乙"是特色化示范性软件学院年度质量检测指标开源数据的官方唯一指定获取平台,服务于示范性软件学院联盟( https://www.pses.com.cn/home/publicresource )204 家高校成员单位。 “太乙”平台( https://www.taiyi.top/ )是浙江大学软件学院自主研发的开源能力评价与服务系统,提供开发者开源贡献价值评价与开源竞赛等服务,旨在精准衡量开发者的贡献价值、影响力和技能水平。 平台通过系统化分析开发者在开源社区中的各类可量化贡献,构建起定性与定量相结合的全维度评价体系,对开发者进行全面刻画。 + 在定性评价方面,平台从影响力、贡献度、语言能力、项目经验和活跃度五个维度对开发者进行宏观审视; + 在定量评价方面,则依据项目重要性、贡献类型、内容关键性、贡献体量及复杂度等指标,并结合程序语言分析与自然语言处理等技术,提供精准、客观、自动化的价值评估,充分认可每一份贡献。 “太乙”平台联合各大公司与知名社区,全年不间断、滚动式发布开源竞赛。基于科学的价值自动化评价系统,参赛者可根据贡献价值大小等比例获得奖金,甚至获得头部企业的实习与就业机会,太乙系统能够支持低成本、高效率的长周期开源竞赛组织,激发开发者的积极性,推动中国开源生态的发展。 Apache RocketMQ x “太乙” 开源竞赛 __ 本轮竞赛于 2025 年 6 月 1 日 启动,竞赛链接: https://www.taiyi.top/competitiondetails?id=6836d00651c0e4a2bd63770c 我们联合太乙平台,为 Apache RocketMQ 的开发者和学习者提供了一站式引导服务。我们不仅提供了详尽的原理介绍、文档说明以及部署教程,还特别设计了一键式自动搭建体验环境,帮助开发者轻松体验 Apache RocketMQ 的部署流程和消息收发过程。 为了进一步促进开发者融入社区生态,我们还精心准备了一份贡献指南:涵盖了社区生态、入门指引、issue 推荐等等。相信通过参与本次开源竞赛,开发者们能够快速掌握如何在 Apache RocketMQ 开源社区作出贡献,顺利成为一名 Contributor!欢迎开发者们踊跃参与~ 你可以获得的 参与 Apache RocketMQ x “太乙”开源竞赛,你将获得宝贵的开源社区贡献经历、可量化的贡献奖金以及成为 Apache 顶级社区 Contributor/Committer/PMC 的成长机会,这些都将为你未来的职业发展提供强大助力! 为确保开源竞赛奖金分配机制的公平性与合理性,兼顾开发者与项目发起方的共同利益,“太乙”平台采用了一套科学的公式来计算可分配的奖金额度。随着开发者提交贡献的增加,可分配的奖金总额将随之上涨,最高达 5000 元。 此次竞赛活动将进一步促进 Apache RocketMQ 技术生态的繁荣与发展,为参与者创造更多学习、交流与成长的机会。我们诚邀每一位热衷于分布式消息处理技术探索与实践的开发者加入,在实践中不断提升自我,共同推动 Apache RocketMQ 的技术进步与社区发展。 让我们在开源的道路上,携手共进,创造不凡! 点击:了解太乙开源服务系统更多详情
#社区动态

2025年5月29日

Apache RocketMQ 源码解析 —— 秒级定时消息介绍
背景 如今rocketmq的应用场景日益拓宽,延时消息的需求也在增加。原本的特定级别延时消息已经不足以支撑rocketmq灵活的使用场景。因此,我们需要一个支持任意时间的延迟消息feature。 支持任意时间延迟的feature能够让使用者在消息发出时指定其消费时间,在生活与生产中具有非常重要的意义。 目标 1. 支持任意时延的延迟消息。 2. 提供延迟消息可靠的存储方式。 3. 保证延迟消息具有可靠的收发性能。 4. 提供延迟消息的可观测性排查能力。 架构 存储数据结构 本方案主要通过时间轮实现任意时延的定时消息。在此过程中,涉及两个核心的数据结构:TimerLog(存储消息索引)和TimerWheel(时间轮,用于定时消息到时)。 TimerLog,为本RIP中所设计的定时消息的记录文件,Append Only。每条记录包含一个prev_pos,指向前一条定时到同样时刻的记录。每条记录的内容可以包含定时消息本身,也可以只包含定时消息的位置信息。每一条记录包含如下信息: | 名称 | 大小 | 备注 | | | | | | size | 4B | 保存记录的大小 | | prev_pos | 8B | 前一条记录的位置 | | next_Pos | 8B | 后一条记录的位置,暂时为1,作为保留字段 | | magic | 4B | magic value | | delayed_time | 4B | 该条记录的定时时间 | | offset_real | 8B | 该条消息在commitLog中的位置 | | size_real | 4B | 该条消息在commitLog中的大小 | | hash_topic | 4B | 该条消息topic的hash code | | varbody | | 存储可变的body,暂时没有 | TimerWheel是对时刻表的一种抽象,通常使用数组实现。时刻表上的每一秒,顺序对应到数组中的位置,然后数组循环使用。时间轮的每一格,指向了TimerLog中的对应位置,如果这一格的时间到了,则按TimerLog中的对应位置以及prev_pos位置依次读出每条消息。 时间轮一格一格向前推进,配合TimerLog,依次读出到期的消息,从而达到定时消息的目的。时间轮的每一格设计如下: | delayed_time(8B) 延迟时间 | first_pos(8B) 首条位置 | last_pos(8B) 最后位置 | num(4B)消息条数 | | | | | | 上述设计的TimerLog与TimerWheel的协作如下图所示。 pipeline 在存储方面,采用本地文件系统作为可靠的延时消息存储介质。延时消息另存TimerLog文件中。通过时间轮对定时消息进行定位以及存取。针对长时间定时消息,通过消息滚动的方式避免过大的消息存储量。其具体架构如下所示: 从图中可以看出,共有五个Service分别处理定时消息的放置和存储。工作流如下: 1. 针对放置定时消息的service,每50ms从commitLog读取指定主题(TIMER_TOPIC)的定时消息。 1. TimerEnqueueGetService从commitLog读取得到定时主题的消息,并先将其放入enqueuePutQueue。 2. 另一个线程TimerEnqueuePutService将其放入timerLog,更新时间轮的存储内容。将该任务放进时间轮的指定位置。 2. 针对取出定时消息的service,每50ms读取下一秒的slot。有三个线程将读取到的消息重新放回commitLog。 1. 首先,TimerDequeueGetService每50ms读一次下一秒的slot,从timerLog中得到指定的msgs,并放进dequeueGetQueue。 2. 而后TimerDequeueGetMessageService从dequeueGetQueue中取出msg,并将其放入队列中。该队列为待写入commitLog的队列,dequeuePutQueue。 3. 最后TimerDequeuePutMessageService将这个queue中的消息取出,若已到期则修改topic,放回commitlog,否则继续按原topic写回CommitLog滚动。 代码实现 TimerLog与TimerWheel的协作实现 定时消息的核心存储由TimerLog和TimerWheel协同完成。TimerLog作为顺序写入的日志文件,每条记录包含消息在CommitLog中的物理偏移量(offsetPy)和延迟时间(delayed_time)。当消息到达时,TimerEnqueuePutService会将其索引信息追加到TimerLog,并通过prev_pos字段构建链表结构,确保同一时刻的多个消息可被快速遍历。 ```java // TimerLog.java 核心写入逻辑 public long append(byte[] data, int pos, int len) { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // 处理文件切换:当当前文件剩余空间不足时,填充空白段并创建新文件 if (len + MIN_BLANK_LEN mappedFile.getFileSize() mappedFile.getWrotePosition()) { ByteBuffer blankBuffer = ByteBuffer.allocate(MIN_BLANK_LEN); blankBuffer.putInt(mappedFile.getFileSize() mappedFile.getWrotePosition()); blankBuffer.putLong(0); // prev_pos置空 blankBuffer.putInt(BLANK_MAGIC_CODE); // 标记空白段 mappedFile.appendMessage(blankBuffer.array()); mappedFile = this.mappedFileQueue.getLastMappedFile(0); // 切换到新文件 } // 写入实际数据并返回物理偏移量 long currPosition = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(); mappedFile.appendMessage(data, pos, len); return currPosition; } ``` 此代码展示了消息追加的核心流程: 1. 检查当前文件的剩余空间,不足时填充空白段并创建新文件 2. 将消息索引数据写入内存映射文件 3. 返回写入位置的全局偏移量,供时间轮记录 时间轮槽位管理 TimerWheel通过数组结构管理时间槽位,每个槽位记录该时刻的首尾指针和消息数量。当消息入队时,putSlot方法会更新对应槽位的链表结构: ```java // TimerWheel.java 槽位更新逻辑 public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic) { localBuffer.get().position(getSlotIndex(timeMs) Slot.SIZE); localBuffer.get().putLong(timeMs / precisionMs); // 标准化时间戳 localBuffer.get().putLong(firstPos); // 链表头指针 localBuffer.get().putLong(lastPos); // 链表尾指针 localBuffer.get().putInt(num); // 当前槽位消息总数 localBuffer.get().putInt(magic); // 特殊标记(如滚动/删除) } ``` 该方法的实现细节: + getSlotIndex(timeMs)将时间戳映射到环形数组的索引 + 同时记录首尾指针以实现O(1)复杂度插入 消息入队流程 TimerEnqueueGetService:消息扫描服务 该服务作为定时消息入口,持续扫描CommitLog中的TIMER_TOPIC消息。其核心逻辑通过enqueue方法实现: ```java // TimerMessageStore.java public boolean enqueue(int queueId) { ConsumeQueueInterface cq = this.messageStore.getConsumeQueue(TIMER_TOPIC, queueId); ReferredIterator iterator = cq.iterateFrom(currQueueOffset); while (iterator.hasNext()) { CqUnit cqUnit = iterator.next(); MessageExt msgExt = getMessageByCommitOffset(cqUnit.getPos(), cqUnit.getSize()); // 构造定时请求对象 TimerRequest timerRequest = new TimerRequest( cqUnit.getPos(), cqUnit.getSize(), Long.parseLong(msgExt.getProperty(TIMER_OUT_MS)), System.currentTimeMillis(), MAGIC_DEFAULT, msgExt ); // 放入入队队列(阻塞式) while (!enqueuePutQueue.offer(timerRequest, 3, TimeUnit.SECONDS)) { if (!isRunningEnqueue()) return false; } currQueueOffset++; // 更新消费进度 } } ``` 关键设计点: 1. 增量扫描:通过currQueueOffset记录消费位移,避免重复处理 2. 消息转换:将ConsumeQueue中的索引转换为包含完整元数据的TimerRequest` TimerEnqueuePutService:时间轮写入服务 从队列获取请求后,该服务执行核心的定时逻辑: ```java // TimerMessageStore.java public boolean doEnqueue(long offsetPy, int sizePy, long delayedTime, MessageExt messageExt) { // 计算目标时间槽位 Slot slot = timerWheel.getSlot(delayedTime); // 构造TimerLog记录 ByteBuffer buffer = ByteBuffer.allocate(TimerLog.UNIT_SIZE); buffer.putLong(slot.lastPos); // 前驱指针指向原槽位尾 buffer.putLong(offsetPy); // CommitLog物理偏移 buffer.putInt(sizePy); // 消息大小 buffer.putLong(delayedTime); // 精确到毫秒的延迟时间 // 写入TimerLog并更新时间轮 long pos = timerLog.append(buffer.array()); timerWheel.putSlot(delayedTime, slot.firstPos, pos, slot.num + 1); } ``` 写入优化策略: + 空间预分配:当检测到当前MappedFile剩余空间不足时,自动填充空白段并切换文件 + 链表式存储:通过prev_pos字段构建时间槽位的倒序链表,确保新消息快速插入 + 批量提交:积累多个请求后批量写入,减少文件I/O次数 TimerEnqueuePutService从队列获取消息请求,处理消息滚动逻辑。当检测到延迟超过时间轮窗口时,将消息重新写入并标记为滚动状态: ```java // TimerMessageStore.java 消息滚动处理 boolean needRoll = delayedTime tmpWriteTimeMs = (long) timerRollWindowSlots precisionMs; if (needRoll) { magic |= MAGIC_ROLL; // 调整延迟时间为时间轮窗口中间点,确保滚动后仍有处理时间 delayedTime = tmpWriteTimeMs + (long) (timerRollWindowSlots / 2) precisionMs; } ``` 此逻辑的关键设计: 1. 当延迟时间超过当前时间轮容量时触发滚动 2. 将消息的delayed_time调整为窗口中间点,避免频繁滚动 3. 设置MAGIC_ROLL标记,出队时识别滚动消息 消息出队处理 TimerDequeueGetService:到期扫描服务 该服务以固定频率推进时间指针,触发到期消息处理: ```java // TimerMessageStore.java public int dequeue() throws Exception { // 获取当前时间槽位 Slot slot = timerWheel.getSlot(currReadTimeMs); // 遍历TimerLog链表 long currPos = slot.lastPos; while (currPos != 1) { SelectMappedBufferResult sbr = timerLog.getTimerMessage(currPos); ByteBuffer buf = sbr.getByteBuffer(); // 解析记录元数据 long prevPos = buf.getLong(); long offsetPy = buf.getLong(); int sizePy = buf.getInt(); long delayedTime = buf.getLong(); // 分类处理(普通消息/删除标记) if (isDeleteMarker(buf)) { deleteMsgStack.add(new TimerRequest(offsetPy, sizePy, delayedTime)); } else { normalMsgStack.addFirst(new TimerRequest(offsetPy, sizePy, delayedTime)); } currPos = prevPos; // 前向遍历链表 } // 分发到处理队列 splitAndDispatch(normalMsgStack); splitAndDispatch(deleteMsgStack); moveReadTime(); // 推进时间指针 } ``` TimerDequeuePutMessageService:消息投递服务 TimerDequeuePutMessageService将到期消息重新写入CommitLog。对于滚动消息,会修改主题属性并增加重试计数: ```java // TimerMessageStore.java 消息转换逻辑 MessageExtBrokerInner convert(MessageExt messageExt, long enqueueTime, boolean needRoll) { if (needRoll) { // 增加滚动次数标记 MessageAccessor.putProperty(messageExt, TIMER_ROLL_TIMES, Integer.parseInt(messageExt.getProperty(TIMER_ROLL_TIMES)) + 1 + ""); } // 恢复原始主题和队列ID messageInner.setTopic(messageInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); messageInner.setQueueId(Integer.parseInt( messageInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); return messageInner; } ``` 此转换过程确保: + 滚动消息保留原始主题信息 + 每次滚动增加TIMER_ROLL_TIMES属性 该服务最终将到期消息重新注入CommitLog: ```java // TimerMessageStore.java public void run() { while (!stopped) { TimerRequest req = dequeuePutQueue.poll(10, TimeUnit.MILLISECONDS); MessageExtBrokerInner innerMsg = convert(req.getMsg(), req.needRoll()); // 消息重投递逻辑 int result = doPut(innerMsg, req.needRoll()); switch (result) { case PUT_OK: // 成功:更新监控指标 timerMetrics.recordDelivery(req.getTopic()); break; case PUT_NEED_RETRY: // 重试:重新放回队列头部 dequeuePutQueue.putFirst(req); break; case PUT_NO_RETRY: // 丢弃:记录错误日志 log.warn("Discard undeliverable message:{}", req); } } } ``` 故障恢复机制 系统重启时通过recover方法重建时间轮状态。关键步骤包括遍历TimerLog文件并修正槽位指针: ```java // TimerMessageStore.java 恢复流程 private long recoverAndRevise(long beginOffset, boolean checkTimerLog) { List mappedFiles = timerLog.getMappedFileQueue().getMappedFiles(); for (MappedFile mappedFile : mappedFiles) { SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0); ByteBuffer bf = sbr.getByteBuffer(); while (position Google Doc: + Shimo:
#技术探索

2025年5月29日

Apache RocketMQ 源码解析 —— Controller 高可用切换架构
一、原理及核心概念浅述 1.1 核心架构 1.2 核心概念 1. controller:负责管理broker间的主备关系,可以挂在namesrv中,不影响namesrv能力,支持独立部署。 2. master/slave:主备身份。 3. syncStateSet:字面意思为“同步状态集合”。当备节点能够及时跟上主节点,则会纳入syncStateSet。 4. epoch:用于记录每一次主备切换时的状态,避免切换后产生数据丢失或者不一致的情况。 为方便理解,在某些过程中可以把controller当作班主任,master作为小组长,slave作为小组成员。同步过程是各位同学向小组长抄作业的过程,位于syncStateSet中的是优秀作业。 二、相关代码文件及说明 核心是“controller+broker+复制过程”,因此分三块进行叙述。 2.1 Controller 该部分代码主要集中在rocketmqcontroller模块下,主要有如下代码文件: + ControllerManager: 负责管理controller,其中存储了许多controller相关配置,并负责了心跳管理等核心功能。(班主任管理条例) + DLederController: Controller的DLedger实现。包含了controller的基本功能。在其中实现了副本信息管理、broker存活情况探测、选举Master等核心功能。(某种班主任) + DefaultBrokerHeartbeatManager: 负责管理broker心跳,其中包含了broker存活情况表,以及在broker下线时的listeners,当副本掉线时,触发重新选举。(点名册) + ReplicasInfoManager: 负责controller中事件的处理。即各种选举事件、更换SyncStateSet事件等等。(小组登记册) + ControllerRequestProcessor: 处理向controller发送的requests,例如让controller选举、向controller注册broker、心跳、更换SyncStateSet等等。(班主任信箱) + DefaultElectPolicy: 选举Master的策略。可以选择从sync状态的副本中选,也可以支持从所有副本中(无论是否同步)的unclean选举。(班规) + ...... 2.2 Broker 该部分代码主要集中在rocketmqbroker模块中,可进入org/apache/rocketmq/broker/controller进行查看: + ReplicasManager: 完成自己作为一个replica的使命——找controller,角色管理,Master更新(Expand/Shrink)SyncStateSet等等。 2.3 复制模块 该部分代码主要集中在rocketmqstore模块中的ha文件夹下: + HAService: 每个Replica必备的的service,负责管理作为主、备的同步任务。 + HAClient: 每个Slave 的HAService中必备的client,负责管理同步任务中的读、写操作。 + HAConnection: 代表在Master中的HA连接,每个connection理论上对应一个slave。在该connection类中存储了传输过程中的诸多内容,包括channel、传输状态、当前传输位点等等信息。 三、核心流程 3.1 心跳 核心CODE:BROKER_HEARTBEAT Broker端: 该部分较简单,带上code向controller发request,不再赘述: BrokerController.sendHeartbeat() brokerOuterAPI.sendHeartbeat() Controller端: 1. 首先由ControllerRequestProcessor接收到code,进入处理逻辑: ```java private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class); if (requestHeader.getBrokerId() == null) { return RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST, "Heart beat with empty brokerId"); } this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(), requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority()); return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success"); } ``` 2. 之后在onBrokerHeartbeat()中,主要更新controller brokerHeartbeatManager中的brokerLiveTable: ```java public void onBrokerHeartbeat(String clusterName, String brokerName, String brokerAddr, Long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, Long confirmOffset, Integer electionPriority) { BrokerIdentityInfo brokerIdentityInfo = new BrokerIdentityInfo(clusterName, brokerName, brokerId); BrokerLiveInfo prev = this.brokerLiveTable.get(brokerIdentityInfo); ...... if (null == prev) { this.brokerLiveTable.put(...); log.info("new broker registered, {}, brokerId:{}", brokerIdentityInfo, realBrokerId); } else { prev.setXXX(......) } } ``` 3.2 选举 相关CODE: CONTROLLER_ELECT_MASTER 有如下几种情形可能触发选举: 1. controller主动发起,通过triggerElectMaster(): 1. HeartbeatManager监听到有broker心跳失效。 (班主任发现有小组同学退学了) 2. Controller检测到有一组Replica Set不存在master。(班主任发现有组长虽然在名册里,但是挂了) 2. broker发起将自己选为master,通过ReplicaManager.brokerElect(): 1. Broker向controller查metadata时,没找到master信息。(同学定期检查小组情况,问班主任为啥没小组长) 2. Broker向controller注册完后,仍未从controller获取到master信息。(同学报道后发现没小组长,汇报) 3. 通过tools发起: 1. 通过选举命令ReElectMasterSubCommand发起。(校长直接任命) 上述所有过程,最终均触发: controller.electMaster() replicasInfoManager.electMaster() // 即,所有小组长必须通过班主任任命 ```java public ControllerResult electMaster(final ElectMasterRequestHeader request, final ElectPolicy electPolicy) { ... // 从request中取信息 ... if (syncStateInfo.isFirstTimeForElect()) { // 从未注册,直接任命 newMaster = brokerId; } // 按选举政策选主 if (newMaster == null) { // we should assign this assignedBrokerId when the brokerAddress need to be elected by force Long assignedBrokerId = request.getDesignateElect() ? brokerId : null; newMaster = electPolicy.elect(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), syncStateSet, allReplicaBrokers, oldMaster, assignedBrokerId); } if (newMaster != null && newMaster.equals(oldMaster)) { // 老主 == 新主 // old master still valid, change nothing String err = String.format("The old master %s is still alive, not need to elect new master for broker %s", oldMaster, brokerReplicaInfo.getBrokerName()); LOGGER.warn("{}", err); // the master still exist response.setXXX() result.setBody(new ElectMasterResponseBody(syncStateSet).encode()); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err); return result; } // a new master is elected if (newMaster != null) { // 出现不一样的新主 final int masterEpoch = syncStateInfo.getMasterEpoch(); final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch(); final HashSet newSyncStateSet = new HashSet<(); //设置新的syncStateSet newSyncStateSet.add(newMaster); response.setXXX()... ElectMasterResponseBody responseBody = new ElectMasterResponseBody(newSyncStateSet); } result.setBody(responseBody.encode()); final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster); result.addEvent(event); return result; } // 走到这里,说明没有主,选举失败 // If elect failed and the electMaster is triggered by controller (we can figure it out by brokerAddress), // we still need to apply an ElectMasterEvent to tell the statemachine // that the master was shutdown and no new master was elected. if (request.getBrokerId() == null) { final ElectMasterEvent event = new ElectMasterEvent(false, brokerName); result.addEvent(event); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Old master has down and failed to elect a new broker master"); } else { result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, "Failed to elect a new master"); } return result; } ``` 3.3 更新SyncStateSet 核心CODE: CONTROLLER_ALTER_SYNC_STATE_SET 1. 由master发起,主动向controller更换syncStateSet(等价于小组长汇报优秀作业) 2. controllerRequestProcessor接收更换syncStateSet的请求,进入handleAlterSyncStateSet()方法: ```java private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class); final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class); final CompletableFuture future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet); if (future != null) { return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS); } return RemotingCommand.createResponseCommand(null); } ``` 3. 之后进入Controller.alterSyncStateSet() replicasInfoManager.alterSyncStateSet()方法: ```java public ControllerResult alterSyncStateSet( final AlterSyncStateSetRequestHeader request, final SyncStateSet syncStateSet, final BrokerValidPredicate brokerAlivePredicate) { final String brokerName = request.getBrokerName(); ... final Set newSyncStateSet = syncStateSet.getSyncStateSet(); final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); // 检查syncStateSet是否有变化 final Set oldSyncStateSet = syncStateInfo.getSyncStateSet(); if (oldSyncStateSet.size() == newSyncStateSet.size() && oldSyncStateSet.containsAll(newSyncStateSet)) { String err = "The newSyncStateSet is equal with oldSyncStateSet, no needed to update syncStateSet"; ... } // 检查是否是master发起的 if (!syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) { String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}", syncStateInfo.getMasterBrokerId(), request.getMasterBrokerId()); ... } // 检查master的任期epoch是否一致 if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) { String err = String.format("Rejecting alter syncStateSet request because the current master epoch is:{%d}, not {%d}", syncStateInfo.getMasterEpoch(), request.getMasterEpoch()); ... } // 检查syncStateSet的epoch if (syncStateSet.getSyncStateSetEpoch() != syncStateInfo.getSyncStateSetEpoch()) { String err = String.format("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{%d}, not {%d}", syncStateInfo.getSyncStateSetEpoch(), syncStateSet.getSyncStateSetEpoch()); ... } // 检查新的syncStateSet的合理性 for (Long replica : newSyncStateSet) { // 检查replica是否存在 if (!brokerReplicaInfo.isBrokerExist(replica)) { String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't exist", replica); ... } // 检查broker是否存活 if (!brokerAlivePredicate.check(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), replica)) { String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't alive", replica); ... } } // 检查是否包含master if (!newSyncStateSet.contains(syncStateInfo.getMasterBrokerId())) { String err = String.format("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {%s}", syncStateInfo.getMasterBrokerId()); ... } // 更新epoch int epoch = syncStateInfo.getSyncStateSetEpoch() + 1; ... // 生成事件,替换syncStateSet final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet); ... } ``` 4. 最后通过syncStateInfo.updateSyncStateSetInfo(),更新syncStateSetInfoTable.get(brokerName)得到的syncStateInfo信息(该过程可以理解为班主任在班级分组册上找到了组长的名字,拿出组员名单,更新)。 3.4 复制 该部分较复杂,其中HAService/HAClient/HAConnection以及其中的各种Service/Reader/Writer容易产生混淆,对阅读造成阻碍。因此绘制本图帮助理解(可在粗读源码后回头理解): 下面对HA复制过程作拆解,分别讲解: 1. 在各个replica的DefaultMessageStore中均注册了HAService,负责管理HA的复制。 2. 在Master的 HAService中有一个AcceptSocketService, 负责自动接收各个slave的连接: ```java protected abstract class AcceptSocketService extends ServiceThread { ... / Starts listening to slave connections. @throws Exception If fails. / public void beginAccept() throws Exception { ... } @Override public void shutdown(final boolean interrupt) { ... } @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); Set selected = this.selector.selectedKeys(); if (selected != null) { for (SelectionKey k : selected) { if (k.isAcceptable()) { SocketChannel sc = ((ServerSocketChannel) k.channel()).accept(); if (sc != null) { DefaultHAService.log.info("HAService receive new connection, " + sc.socket().getRemoteSocketAddress()); try { HAConnection conn = createConnection(sc); conn.start(); DefaultHAService.this.addConnection(conn); } catch (Exception e) { log.error("new HAConnection exception", e); sc.close(); } } } ... } } ``` 3. 在各个Slave 的HAService中存在一个HAClient,负责向master发起连接、传输请求。 ```java public class AutoSwitchHAClient extends ServiceThread implements HAClient { ... } public interface HAClient { void start(); void shutdown(); void wakeup(); void updateMasterAddress(String newAddress); void updateHaMasterAddress(String newAddress); String getMasterAddress(); String getHaMasterAddress(); long getLastReadTimestamp(); long getLastWriteTimestamp(); HAConnectionState getCurrentState(); void changeCurrentState(HAConnectionState haConnectionState); void closeMaster(); long getTransferredByteInSecond(); } ``` 4. 当master收到slave的连接请求后,将会创建一个HAConnection,负责收发内容。 ```java public interface HAConnection { void start(); void shutdown(); void close(); SocketChannel getSocketChannel(); HAConnectionState getCurrentState(); String getClientAddress(); long getTransferredByteInSecond(); long getTransferFromWhere(); long getSlaveAckOffset(); } ``` 5. Master的HAConnection会与Slave的HAClient建立连接,二者均通过HAWriter(较简单,不解读,位于HAWriter类)往socket中写内容,再通过HAReader读取socket中的内容。只不过一个是HAServerReader,一个是HAClientReader: ```java public abstract class AbstractHAReader { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); protected final List readHookList = new ArrayList<(); public boolean read(SocketChannel socketChannel, ByteBuffer byteBufferRead) { int readSizeZeroTimes = 0; while (byteBufferRead.hasRemaining()) { ... boolean result = processReadResult(byteBufferRead); ... } } ... protected abstract boolean processReadResult(ByteBuffer byteBufferRead); } ``` 6. 两种HAReader均实现了processReadResult()方法,负责处理从socket中得到的数据。client需要详细阐述该方法,因为涉及到如何将读进来的数据写入commitlog,client的processReadResult(): ```java @Override protected boolean processReadResult(ByteBuffer byteBufferRead) { int readSocketPos = byteBufferRead.position(); try { while (true) { ... switch (AutoSwitchHAClient.this.currentState) { case HANDSHAKE: { ... // 握手阶段,先检查commitlog完整性,截断 } break; case TRANSFER: { // 传输阶段,将body写入commitlog ... byte[] bodyData = new byte[bodySize]; ... if (bodySize 0) { // 传输阶段,将body写入commitlog AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length); } haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset())); ... break; } default: break; } if (isComplete) { continue; } } // 检查buffer中是否还有数据, 如果有, compact() ... break; } } ... } ``` 7. server的processReadResult()主要用于接收client的握手等请求,较简单。更需要解释其WriteSocketService如何向socket中调用HAwriter去写数据: ```java abstract class AbstractWriteSocketService extends ServiceThread { ... private void transferToSlave() throws Exception { ... int size = this.getNextTransferDataSize(); if (size 0) { ... buildTransferHeaderBuffer(this.transferOffset, size); this.lastWriteOver = this.transferData(size); } else { // 无需传输,直接更新caught up的时间 AutoSwitchHAConnection.this.haService.updateConnectionLastCaughtUpTime(AutoSwitchHAConnection.this.slaveId, System.currentTimeMillis()); haService.getWaitNotifyObject().allWaitForRunning(100); } } @Override public void run() { AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); switch (currentState) { case HANDSHAKE: // Wait until the slave send it handshake msg to master. // 等待slave的握手请求,并进行回复 break; case TRANSFER: ... transferToSlave(); break; default: ... } } catch (Exception e) { ... } } ... // 在service结束后的一些事情 } ... } ``` 此处同样附上server实现processReadResult(),读socket中数据的代码: ```java @Override protected boolean processReadResult(ByteBuffer byteBufferRead) { while (true) { ... HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)]; switch (slaveState) { case HANDSHAKE: // 收到了client的握手 ... LOGGER.info("Receive slave handshake, slaveBrokerId:{}, isSyncFromLastFile:{}, isAsyncLearner:{}", AutoSwitchHAConnection.this.slaveId, AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.isAsyncLearner); break; case TRANSFER: // 收到了client的transfer状态 ... // 更新client状态信息 break; default: ... } ... } ``` 3.5 Active Controller的选举 该选举主要通过DLedger实现,在DLedgerController中通过RoleChangeHandler.handle()更新自身身份: ```java class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler { private final String selfId; private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_")); private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER; public RoleChangeHandler(final String selfId) { this.selfId = selfId; } @Override public void handle(long term, MemberState.Role role) { Runnable runnable = () { switch (role) { case CANDIDATE: this.currentRole = MemberState.Role.CANDIDATE; // 停止扫描inactive broker任务 ... case FOLLOWER: this.currentRole = MemberState.Role.FOLLOWER; // 停止扫描inactive broker任务 ... case LEADER: { log.info("Controller {} change role to leader, try process a initial proposal", this.selfId); int tryTimes = 0; while (true) { // 将会开始扫描inactive brokers ... break; } } }; this.executorService.submit(runnable); } ... } ```
#技术探索