2025年6月16日

EventBridge 构建智能化时代的企业级云上事件枢纽
产品演进历程:在技术浪潮中的成长之路 早在 2018 年,Gartner 评估报告便将事件驱动模型(EventDriven Model)列为十大战略技术趋势之一,指出事件驱动架构(EDA,Eventdriven Architectures)将成为微服务架构未来的演进方向。 随着云原生与 Serverless 技术的迅猛发展,2020 年,阿里云重磅发布事件总线 EventBridge,构建了云原生环境下的统一事件枢纽。事件总线 EventBridge 支持阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助企业轻松构建松耦合、分布式的事件集成驱动架构。 自 6 月 3 日起,阿里云事件总线 EventBridge 正式商业化。历经五年迭代升级,事件总线 EventBridge 在产品功能和用户体验方面不断优化,积累了丰富的规模化生产实践经验。在数据智能化时代,事件总线 EventBridge 持续深耕企业云上事件集成,适用于各种规模和行业的事件集成场景。通过 Event 桥接各个系统,满足 AI 和企业集成等领域的各种数据集成需求,为企业提供便捷且创新的数据集成解决方案。 产品核心特性:构建企业级事件中枢的关键 从诞生到商业化,事件总线 EventBridge 不断优化产品核心特性,致力于定义企业级事件集成标准,为企业构建云上事件枢纽提供坚实可靠的支撑和保障。 + 稳定与安全:依托海量数据传输及运维经验,提供高稳定性且安全合规的企业集成服务; + 性能与成本:提供高性能且性价比高的企业集成方案,显著降低用户数据集成成本; + 开放与集成:提供丰富的跨产品、跨平台连接能力,促进云产品、应用程序、SaaS 服务相互集成; + 统一事件枢纽:统一事件界面,定义事件标准,打破云产品事件孤岛; + 事件驱动引擎:海量事件源,毫秒级触发能力,加速 EDA/Serverless 架构升级; + 流式事件通道:事件流提供轻量、实时、端到端的流式数据处理,对源端产生的事件进行实时抽取、转换和分析并加载至目标端。 多元应用场景:覆盖各类企业事件集成场景 EDA 事件驱动场景 事件总线 EventBridge 通过事件连接应用程序、云服务和 Serverless 服务,构建事件驱动架构(EDA,Eventdriven Architectures),实现应用与应用、应用与云服务之间的高效连接。 流式 ETL 场景 在企业集成场景中,事件总线 EventBridge 可以作为流式数据管道,提供基础的过滤与转换功能,支持不同数据仓库、数据处理程序、数据分析与处理系统之间的数据同步和跨地域备份,连接不同系统与服务。 AI 数据集成场景 事件总线 EventBridge 提供非结构化数据到结构化数据的链路集成,可处理多种数据源,如关系型数据库、API 数据、文件、ODPS 等,并支持将数据向量化后存储至向量数据库或其他数仓,同时支持数据清洗、转换和规范化。为 RAG 和模型数据准备等场景,提供一站式数据集成服务。 统一事件通知服务 事件总线 EventBridge 提供丰富的云产品事件源与事件的全生命周期管理工具,用户可以直接监听云产品产生的数据,并上报至监控和通知等下游服务,实现高效的事件管理和响应。 商用计费体系:计费模式灵活匹配企业需求 事件总线 EventBridge 已于 6 月 3 日起正式商业化,分为事件总线、事件流两类资源进行计费,计费模式如下: 计费组成说明 事件总线 EventBridge 各资源的计费组成信息请参见下图。 事件总线计费说明 事件总线【1】分为云服务专用事件总线和自定义事件总线。 + 云服务专用事件总线:是系统自动创建用于接收阿里云官方事件源的事件且不可修改的内置事件总线。 + 自定义事件总线:需要你自行创建并管理的事件总线。 事件源单价 | 计费项 | 计费单价 | | | | | 云服务专用总线事件发布 | 免费 | | 自定义总线事件发布 | 5.64元/百万次事件发布 | 每个 64KB 大小的事件计为 1 个事件。 事件目标单价 | 计费项 | 计费单价 | | | | | 阿里云服务目标事件推送/通知 | 免费 | | 自定义目标事件推送 | 1.29元/百万次事件推送 | 每个 64KB 大小的事件计为 1 个事件。 事件流计费说明 事件流【2】是端到端的流式事件通道,适用于端到端的流式数据处理场景。事件流属于收费服务,并支持按事件量计费和按 CU 额度计费,这两种计费【3】方式你只需二选一即可。 按事件量计费 | 计费项 | 计费单价 | | | | | 事件量 | 2.8元/百万条 | | 空置资源占用费 | 1元/天 | + 事件量:事件流拉取上游数据源的事件总量。每个 64KB 大小的事件计为 1 个事件。计算公式:事件量(向上取整)=单个事件大小/64KB。例如:批量投递了 300KB 的单个事件,则按照 5 个事件进行计费。若事件发生重试,则按照配置的重试规则每次重试计为 1 个事件。 + 空置资源占用量:若单个事件流在一个月内没有任何数据流入将会按照计费标准收取空置资源占用费,空置资源占用费将在有数据流入或者删除该事件流后取消计费。 按 CU 配额计费 名词介绍 CU 配额: CU(Capacity Unit)是事件总线任务的容量单位。若采用 CU 规格计费,则每个任务必须至少分配 1CU 作为最小配额。亦可在任务创建时指定允许弹性的最大 CU 规格和最小 CU 规格。 计费项和计费单价 + 计费项:CU 配额 + 计费单价:0.24元/小时/CU 说明 单个 CU 可支持的条件(条件为或): 1. 最大支持每秒事件量(EPS)5000Event/s(实际情况受限于链路上下游性能)。 2. 最大支持的峰值吞吐量(BPS)50MB/s 的容量。 为了方便你快速评估费用,事件总线(EventBridge)提供了价格计算器:事件总线(EventBridge)价格计算器【4】。 如果你在使用事件总线 EventBridge 的过程中有任何反馈或疑问,欢迎加入钉钉用户群(钉钉群号:31481771)与阿里云研发团队即时沟通。 【1】事件总线 https://help.aliyun.com/zh/eventbridge/userguide/eventbusoverview 【2】事件流 https://help.aliyun.com/zh/eventbridge/userguide/eventstreamoverview 【3】计费 https://help.aliyun.com/zh/eventbridge/productoverview/billingofeventstreams 【4】事件总线(EventBridge)价格计算器 https://eventbridge.console.aliyun.com/calculator
#社区动态

2025年6月11日

Apache RocketMQ + “太乙” = 开源贡献新体验
Apache RocketMQ 是 Apache 基金会托管的顶级项目,自 2012 年诞生于阿里巴巴,服务于淘宝等核心交易系统,历经多次双十一万亿级数据洪峰稳定性验证,至今已有十余年发展历程。RocketMQ 致力于构建低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台,覆盖云边端⼀体化数据处理场景,帮助企业和开发者在智能化时代,轻松构建事件驱动架构的云原生应用。 Apache RocketMQ 的茁壮成长离不开全球 800 多位开发者的积极参与和贡献。如今,Apache RocketMQ 开源社区将携手"太乙"平台,共同开启一场开源贡献竞赛,为广大开发者提供一个全新的体验平台和参与机会!新一轮开源竞赛于 6 月 1 日正式启动。 关于“太乙”平台 "太乙"是特色化示范性软件学院年度质量检测指标开源数据的官方唯一指定获取平台,服务于示范性软件学院联盟( https://www.pses.com.cn/home/publicresource )204 家高校成员单位。 “太乙”平台( https://www.taiyi.top/ )是浙江大学软件学院自主研发的开源能力评价与服务系统,提供开发者开源贡献价值评价与开源竞赛等服务,旨在精准衡量开发者的贡献价值、影响力和技能水平。 平台通过系统化分析开发者在开源社区中的各类可量化贡献,构建起定性与定量相结合的全维度评价体系,对开发者进行全面刻画。 + 在定性评价方面,平台从影响力、贡献度、语言能力、项目经验和活跃度五个维度对开发者进行宏观审视; + 在定量评价方面,则依据项目重要性、贡献类型、内容关键性、贡献体量及复杂度等指标,并结合程序语言分析与自然语言处理等技术,提供精准、客观、自动化的价值评估,充分认可每一份贡献。 “太乙”平台联合各大公司与知名社区,全年不间断、滚动式发布开源竞赛。基于科学的价值自动化评价系统,参赛者可根据贡献价值大小等比例获得奖金,甚至获得头部企业的实习与就业机会,太乙系统能够支持低成本、高效率的长周期开源竞赛组织,激发开发者的积极性,推动中国开源生态的发展。 Apache RocketMQ x “太乙” 开源竞赛 __ 本轮竞赛于 2025 年 6 月 1 日 启动,竞赛链接: https://www.taiyi.top/competitiondetails?id=6836d00651c0e4a2bd63770c 我们联合太乙平台,为 Apache RocketMQ 的开发者和学习者提供了一站式引导服务。我们不仅提供了详尽的原理介绍、文档说明以及部署教程,还特别设计了一键式自动搭建体验环境,帮助开发者轻松体验 Apache RocketMQ 的部署流程和消息收发过程。 为了进一步促进开发者融入社区生态,我们还精心准备了一份贡献指南:涵盖了社区生态、入门指引、issue 推荐等等。相信通过参与本次开源竞赛,开发者们能够快速掌握如何在 Apache RocketMQ 开源社区作出贡献,顺利成为一名 Contributor!欢迎开发者们踊跃参与~ 你可以获得的 参与 Apache RocketMQ x “太乙”开源竞赛,你将获得宝贵的开源社区贡献经历、可量化的贡献奖金以及成为 Apache 顶级社区 Contributor/Committer/PMC 的成长机会,这些都将为你未来的职业发展提供强大助力! 为确保开源竞赛奖金分配机制的公平性与合理性,兼顾开发者与项目发起方的共同利益,“太乙”平台采用了一套科学的公式来计算可分配的奖金额度。随着开发者提交贡献的增加,可分配的奖金总额将随之上涨,最高达 5000 元。 此次竞赛活动将进一步促进 Apache RocketMQ 技术生态的繁荣与发展,为参与者创造更多学习、交流与成长的机会。我们诚邀每一位热衷于分布式消息处理技术探索与实践的开发者加入,在实践中不断提升自我,共同推动 Apache RocketMQ 的技术进步与社区发展。 让我们在开源的道路上,携手共进,创造不凡! 点击:了解太乙开源服务系统更多详情
#社区动态

2025年5月12日

开源之夏 2025|Apache RocketMQ 社区项目期待你的参与!
开源之夏 2025 开源之夏是由中国科学院软件研究所“开源软件供应链点亮计划”发起并长期支持的一项暑期开源活动,旨在鼓励在校学生积极参与开源软件的开发维护,培养和发掘更多优秀的开发者,促进优秀开源软件社区的蓬勃发展,助力开源软件供应链建设。开源之夏于 2020 年正式发起,开源之夏 2025 是第六届活动。 活动联合各大开源社区,针对重要开源软件的开发与维护提供项目开发任务,并向全球高校学生开放报名。学生可自主选择感兴趣的项目进行申请,中选后在项目开发者(社区导师)的指导下进行开发。 通过参与活动,不仅可以结识开源界小伙伴和技术大牛,获得社区导师的专业指导,与开源项目开发者深度交流,还能获得丰富的项目实践经验,提升项目开发技能,为学习深造提供助力,为职业发展积攒履历。 此外,根据项目的难易程度和完成情况,结项者将获取开源之夏活动劳务报酬和结项证书。项目难度分为基础和进阶两档,对应结项劳务报酬分别为:税前 8000 元人民币和税前 12000 元人民币。 Apache RocketMQ 社区项目 Apache RocketMQ 于 2012 年诞生于阿里巴巴核心电商系统,于 2016 年捐赠给 Apache 基金会,于 2017 年成为 Apache 顶级项目。现 Apache RocketMQ 致力于构建低延迟、低成本、高可用的分布式“消息、事件、流”统一处理平台,覆盖“云、边、端”⼀体化数据处理场景,帮助企业和开发者在智能化时代,轻松构建事件驱动架构的云原生 / AI 原生应用。 本次开源之夏共提供 2 个课题项目: 1. RocketMQ 主备副本利用系统内置 Topic 完成元数据增量同步,项目社区导师:金融通 2. RocketMQ 路由反向更新机制,项目社区导师:ShannonDing 如何参与项目? 欢迎扫描上方海报二维码,查看 Apache RocketMQ 社区项目详情,其中有项目导师的姓名与联系邮箱,可通过邮件与导师进行沟通,并准备项目申请材料、提交项目申请,每位同学可以申请一个项目。 以下是开源之夏的活动流程,更多参与指南请查看
#社区动态

2024年12月20日

Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
近日,以“新纪天工、开物焕彩——致敬开源的力量”为活动主题的“重大科技成就发布会(首场)”在国家科技传播中心成功举办,并隆重揭晓了 2024 开源创新榜单,旨在致敬中国开源力量,传播推广开源科技成就,营造中国开源创新生态。 2024 年开源创新榜单由中国科协科学技术传播中心、中国计算机学会、中国通信学会、中国科学院软件研究所共同主办,中国开发者社区承办,以王怀民院士为首组建评审委员会,进行研讨评审,面向中国开源行业领域,遴选具有创新性、贡献度和影响力的开源项目、社区、应用场景与开源事件。 在评审出的 10 个年度开源项目中,Apache RocketMQ 成功入选。 Apache RocketMQ 社区近况 Apache RocketMQ 创新论文连续被软件工程顶级会议录用 (1)2024 年 9 月,由阿里云消息队列团队发表的关于 RocketMQ 锁性能优化论文《Beyond the Bottleneck: Enhancing HighConcurrency Systems with Lock Tuning》被 CCFA 类软件工程顶级会议 FM 2024 录用。 高并发系统常常面临性能瓶颈,主要是由于线程间激烈竞争锁导致的等待和上下文切换。作为一家云计算公司,我们非常重视性能的最大化。为此,我们对轻量级自旋锁进行了改进,并提出了一种简洁的参数微调策略,能够在最低风险条件下突破系统性能瓶颈。该策略在高吞吐量消息队列系统 Apache RocketMQ 中得到了验证,实现了 X86 CPU 性能提升 37.58% 和 ARM CPU 性能提升 32.82%。此外,我们还确认了这种方法在不同代码版本和 IO 刷新策略下的一致有效性,显示出其在实际应用中的广泛适用性。这项工作不仅为解决高并发系统的性能问题提供了实用工具,还突显了形式化技术在工程问题解决中的实际价值。 (2)2023 年 9 月,由阿里云消息队列团队发表的关于 RocketMQ 高可用范式设计论文《RocketHA: A Logbased Storage High Availability Paradigm for Messaging and Streaming Scenarios》被软件工程 CCFA 类顶级会议 ASE 2023 录用。 该论文详细探讨了 RocketMQ 在其发展历程中所蕴含的高可用性设计理念,凝聚了团队在行业应用中积累的宝贵经验。为了应对分布式系统中常见的故障,如崩溃和网络分区,RocketHA 提出了一种基于日志存储的高可用性设计框架。该框架由六个基本组件构成,旨在实现系统在面对各种故障时的自动集群恢复。具体而言,RocketHA 通过模块化设计,实现了消息、事件及流场景的高可用性,确保系统能够在发生意外故障时迅速且有效地恢复。此外,该设计还优先考虑了高吞吐量与数据丢失防护,以保障系统在进行大规模数据处理时的稳定性和可靠性。评估结果表明,RocketMQ 在多种负载和故障场景下都表现出卓越的高可用性和快速恢复能力。本文提出的 RocketHA 的设计理念可为其他基于日志存储的系统提供参考和借鉴,推动相关领域的研究与开发。 GSoC(Google Summer of Code) 2024 在谷歌主办的 GSoC 2024 中,Apache RocketMQ 开源社区共提报通过两个选题: 1. RocketMQ Dashboard Supports RocketMQ 5.0 Architecture and Enhances Usability:该题目旨在强化 RocketMQ 的开源控制台能力。 2. Optimizing Lock Mechanisms in Apache RocketMQ:该题目旨在优化锁行为,优化 RocketMQ 的性能以及资源占用。 两个题目均成功结项,第一个题目为 Apache RocketMQ 发布了 rocketmqdashboard 2.0.0,自此RocketMQ Dashboard 支持 Apache RocketMQ 5.0 。第二个题目创新性地提出了 ABS 锁,为轻量化的自旋锁提供了一套退避策略,从而实现低成本、有限制的锁自旋行为,同时适应不同强度的资源争抢情况 Apache RocketMQ 社区 5.3.0、5.3.1 版本发布 Apache RocketMQ 社区近期发布了 5.3.0 和 5.3.1 两个版本,两个版本主要修复现有的 bug 并提升系统的整体稳定性和性能。值得一提的是,Apache RocketMQ 5.3.0 引入了 Apache RocketMQ ACL 2.0 支持,为用户带来了更加灵活和安全的访问控制机制。这些改进和新增功能将显著提升 Apache RocketMQ 在生产环境中的稳定性和安全性,进一步满足用户的业务需求。 Apache RocketMQ 中文社区全新升级 2024 年 7 月,Apache RocketMQ 中文社区(https://rocketmq.io)全新升级,致力于为每一位热衷于 RocketMQ 技术探索与实践的开发者,打造一个集时效性、全面性、深度于一体的一站式学习平台。 最全最新资讯: Apache RocketMQ 中文社区提供从基础到深入的全面学习资料,涵盖原理介绍、架构解读、源码分析等基础知识,高级性能使用、技术前沿探索、场景最佳实践等博客文章,用户反馈的真实答疑样例等,并及时更新版本发布、架构演进和功能迭代等社区动态,以及社区相关活动和会议信息,为您提供更多学习和交流的机会。 智能专家答疑: Apache RocketMQ 中文社区基于 Apache RocketMQ 领域专业知识库,并结合先进的大模型技术进行优化,为您提供 AI 问答助手,作为您的智能学习伴侣。通过自然语言问答,让您的疑问得到迅速解答,使您的学习之旅更加轻松有趣。 关于 Apache RocketMQ RocketMQ 致力于构建低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台,覆盖云边端⼀体化数据处理场景,帮助企业和开发者在智能化时代,轻松构建事件驱动架构的云原生应用。 RocketMQ 自 2012 年诞生于阿里巴巴集团的核心交易链路,至今已经历十余年“双十一”的万亿级数据洪峰验证。2015 年,阿里云面向企业提供商业化的消息队列服务,其中包括云消息队列 RocketMQ 版。2016 年,阿里巴巴向 Apache 软件基金会捐赠了 RocketMQ 项目,RocketMQ 进入 Apache 孵化器。2017 年,Apache RocketMQ 成为 Apache 顶级项目,在开源消息中间件领域占据领导地位。2022 年,Apache RocketMQ 5.0 正式发布,全面拥抱云原生架构、超融合架构,进一步拓展事件驱动、物联网等场景。
#社区动态

2023年1月5日

Apache RocketMQ 斩获 InfoQ 2022 年度十大开源新锐项目
以“深入数字经济·洞见技术价值”为主题的【InfoQ 2022 中国技术力量年终榜单】正式公布获奖名单。其中,Apache RocketMQ以其卓越的易用性、社区活跃性、成熟度、产品优越性、代码健康度等荣获【2022 年度十大开源新锐项目】。 作为主流的分布式消息中间件,RocketMQ于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目,持续迸发出旺盛的生命力。 伴随着云原生时代的到来以及实时计算的兴起, 生于云、长于云的 RocketMQ 5.0 应运而生,全新升级为云原生消息、事件、流融合处理平台,帮助用户更容易地构建下一代事件驱动和流处理应用。RocketMQ 5.0 专注于消息基础架构的云原生化演进,聚焦在消息领域的后处理场景,支持消息的流式处理和轻计算,帮助用户实现消息的就近计算和分析,并全面拥抱 Serverless 和 EDA。 在技术迎来重要革新的同时,回顾 Apache RocketMQ 社区这些年的成长历程。目前,全球 Apache RocketMQ Contributors  700+,促进整个社区长期和健康发展。同时,为了帮助社区开发者更好地找到感兴趣的技术方向,快速参与到社区并推动相关特性优化的快速演进,RocketMQ 还成立内核、批处理、Connect、Streaming、多语言客户端、RocketMQFlink、Operator、Exporter 等不同兴趣小组。 为更好聚集本地开发者,我们在北京、深圳、苏州等城市相继成立当地社区,定期举行线下活动,共同讨论 RocketMQ 相关的落地实践与新特性需求,大量创新从社区的各类活动中产生并且落地。除此之外,RocketMQ 还非常重视社区间的合作,先后与 Apache DolphinScheduler,Apache Hudi 等社区组织了多次联合 Meetup,在打造 RocketMQ 上下游生态的同时,也为不同社区开发者近距离讨论提供了平台。 在社区成员以及众多的开发者共同推动下,全球超过数万家企业在使用 Apache RocketMQ,这其中不仅有字节跳动、快手、小米、滴滴、同城艺龙等互联网头部企业,还有众多银行、券商、保险,基金公司等金融公司。经过多年发展,RocketMQ 已成为微服务领域业务消息首选。 本次获奖离不开全体社区成员的共同努力,是全体社区成员的共同荣誉!社区将再接再厉,不忘初心,持续促进  Apache RocketMQ 项目和社区的持续发展。
#社区动态

2022年12月27日

RocketMQ 多语言 SDK 开源贡献召集令
目前 Apache RocketMQ 5.0 SDK [1]正在社区开源,开发与迭代也在火热进行中,欢迎广大社区的朋友们能够参与其中。我们欢迎任何形式的贡献,包括但不限于新 feature、bugfix、代码优化、生态集成、测试工作、文档撰写。更加欢迎能够认领一个完整的特定语言实现的同学,踏出第一步,你就是 contributor!更有惊喜礼品和成为 committer 的机会等着你! 写在前面 Apache RocketMQ 是由阿里巴巴集团捐赠给 Apache 开源基金会的一款低延迟、高并发、高可用、高可靠的分布式消息中间件,并于 2017 年正式从 Apache 社区毕业,成为 Apache 顶级项目(TLP)。也是国内首个非 Hadoop 生态体系的互联网中间件顶级项目。 面向过去,RocketMQ 经过多年淘宝双十一的洗礼和考验,赢得了诸多客户的认可和青睐。面向未来,RocketMQ 历久弥新,为了更好地迎接云原生时代的来临,基于存算分离架构的 RocketMQ 5.0 应运而生。 RocketMQ 5.0 中引入了全新的无状态 Proxy 组件,在水平拓展,故障应急,多协议等方面都进行了诸多支持与改进(关于 RocketMQ 5.0 的详细介绍,欢迎关注 Rocketmq 官网[2])。同时也为接下来多语言客户端的实现打下了良好基础。 新的多语言SDK RocketMQ 5.0 客户端相比较于 4.x 的版本进行了诸多改进,会是未来社区客户端演进的主流方向。RocketMQ 4.x SDK 的多语言支持并不完美,协议的较高复杂度和语言绑定的实现细节使得多语言的支持与维护都变得棘手,而用户对多语言的诉求是强烈的。值此契机,RocketMQ 5.0 基于 gRPC 正式推出了全新的多语言 SDK。 相比较于 RocketMQ 4.x 的 SDK。RocketMQ 5.0 展现出了一副全新的面貌: 采用全新极简的,immutable 的 API 设计,使得 API 上手更简单,跨语言的对齐也变得更加简单; 完善的错误处理体系和错误码设计,开发者和用户对错误的处理可以更加得心应手; 在 PushConsumer/PullConsumer 之外新推出无状态 SimpleConsumer,实现逻辑轻量,用户可以自行管理消费侧消息的接收与应答,同时也对有更多定制化需求的客户提供了便利。 实现轻量化,代码量相比较旧有实现缩减 3/4 以上,开发和维护的成本更低; 标准化的 logging/tracing/metrics 输出,降低实现复杂度的同时,可观测性的提升会使得生产环境下的问题更容易被捕捉; gRPC 多语言特性,为 RocketMQ 5.0 客户端的多语言实现提供了支撑。RocketMQ 全新的客户端的协议层被替换,语言无关的 IDL 使得协议的维护和实现都更为极为简单。同时得益于 gRPC 强大的生态体系,使得 RocketMQ 与周边的集成也变得更为简便。 RocketMQ 5.0 中引入了新的的 pop 消费,创造性地在原生的队列模型之上支持了这种无状态的消费模式。不同于原始的更适用于流场景的队列模型,pop 机制更面向于业务消息的场景,使得开发者和用户可以只关心消息本身,可以通过「SimpleConsumer」提供单条消息级别的接受/重试/修改不可见时间以及删除等 API 能力。 Roadmap 目前 5.0 多语言 SDK 的 Java/C++ 已经有了相对比较完整的实现。 Go/C 已经提供了基础的 Producer/SimpleConsumer 的实现,其余的语言实现(PHP/Python/JavaScript/Rust 等)还在社区进行中,欢迎大家广泛参与。 对于一个从零开始的特定语言实现,一个大概的步骤如下: 部署 rocketmqnamesrv[3]和 rocketmqproxy[4] 方便与客户端进行调试,为降低部署成本,rocketmqproxy 可以采用 LOCAL 模式进行部署。 熟悉 rocketmqapis中的 IDL,适配新的 gRPC/Protobuf 协议:IDL 中描述了 5.0 SDK 中的语言无关的协议描述,通过 gRPC protoc 工具自动生成协议层代码。 应用新的 API 规范和设计:可以参考 Java 的 API 设计[5],总体指导思想是不可变性且行为明确。 实现 Producer/SimpleConsumer:Producer 提供最基本的四种不同类型消息的发送功能(普通/顺序/定时/事务),SimpleConsumer 提供基于 pop 语义的无状态消息接受/重试/修改不可见时间等能力。 统一的错误处理体系:由服务端产生的异常与错误均有完善的异常错误码和异常信息,各个语言实现需要以最适合的方式暴露给客户。 实现 PushConsumer:RocketMQ 4.x 中最为常用的消费者类型,用户侧只需要明确订阅关系和定义消息监听器行为即可,客户端实现中需要自动帮用户从远端获取消息。 客户端全方位可观测性:规范的日志输出,实现基于 OpenTelemetry/OpenCensus 的客户端 metrics 体系。 按照以上流程开发者在开发过程中出现的任何问题,都欢迎以 issue/pull request 的形式反馈到社区。 如何参与贡献 我们欢迎任何形式的贡献,包括且不限于新 feature、bugfix、代码优化、生态集成、测试工作、文档撰写。更加欢迎能够认领一个完整的特定语言实现的同学!不要犹豫,欢迎大家以 issue/pull request 的形式将你的想法反馈到社区,一起来建设更好的 RocketMQ! 相关资料 rocketmqclients: RocketMQ 5.0 多语言客户端实现 rocketmq: RocketMQ 主仓库(内置 5.0 proxy 实现) rocketmqapis: RocketMQ 5.0 协议具体定义 《RIP37: RocketMQ 全新统一 API 设计》 《RIP39: RocketMQ gRPC 协议支持》 相关链接 [1] Apache RocketMQ 5.0 SDK [2] rocketmq 官网 [3] rocketmqnamesrv [4] rocketmqproxy [5] Java 的 API 设计
作者:艾阳坤
#社区动态

2022年11月9日

消息队列Apache RocketMQ 5.0:从消息服务到云原生事件流平台
前言 回顾 RocketMQ 的发展历程,至今已十年有余。2022 年 RocketMQ 5.0 正式发布,全面迈进云原生时代。 11 月 5 日,2022 杭州 · 云栖大会上,阿里云智能高级产品专家杨秋弟在云原生峰会上发表主题演讲,发布消息队列 RocketMQ 5.0:从消息服务到云原生事件流处理平台。 阿里云智能高级产品专家&Apache RocketMQ 联合创始人 杨秋弟 Apache RocketMQ 发展史 回顾 Apache RocketMQ 过去十年的发展历程,可分为“诞生于互联网”与“成长于云计算”两大阶段。 第一个阶段是 RocketMQ 的从 0 到 1,在阿里内部规模化落地。2012 年,为了支撑超大规模电商互联网架构,阿里中间件研发了 RocketMQ,并在产品诞生初期开源,2017 年 RocketMQ 统一了阿里消息技术体系。 第二个阶段是云计算。2015 年 RocketMQ 上云,这也是业界首个提供公共云 SaaS 形态的开源消息队列;2016 年,阿里把 RocketMQ 捐赠给 Apache,2017 年孵化毕业,成为国内首个 TLP 的互联网中间件。 十年磨一剑,出鞘必锋芒。在这十年的过程中,通过集团打磨稳定性,依托云计算孵化创新,开源共建加速标准化建立与生态连接,RocketMQ 始终坚持开源、集团、商业三位一体的发展思路,内核演进和产品迭代齐头并进。2022 年 RocketMQ 5.0 正式发布宣告着全面迈进云原生时代。 RocketMQ 5.0:从消息服务到云原生事件流平台 回顾过去这十年,RocketMQ 服务于集团几乎所有的业务,在阿里云上更是累计服务了 10 万余家企业客户,覆盖互联网、零售、金融、汽车等 20 多个行业,大规模的生产实践持续累积产品的核心优势。 多样性。企业级应用复杂的业务诉求,催生 RocketMQ 提供丰富的消息类型,比如定时消息、事务消息、顺序消息等等。此外,也提供了像消息轨迹、消息回溯等一系列的消息治理能力。 一致性。无论是淘宝交易还是蚂蚁支付都天然对数据一致性有着极高的要求,RocketMQ 提供的分布式事务消息是业内第一个提供该特性的消息产品,将异步解耦与数据一致性完美融合,是金融客户中不可或缺的产品能力。 稳定性。稳定性是产品的根基,更是一个系统工程,RocketMQ 长期在电商和金融领域中打磨,不仅提供最高达 99.99% SLA,更是帮助客户提供全方位的健康巡检与故障排查能力,如消息轨迹、消息回溯、死信机制等等,提供多样化的稳定性兜底手段。 高性能。在双十一的极限流量下,RocketMQ 具备无限扩展能力,支持千万级并发与万亿级消息洪峰,P9999 写延迟在 1ms 内,100%在 100ms 内。 可以说,在消息领域,尤其在业务消息领域,RocketMQ 在国内已经做到顶尖,成为企业客户的首选。 而随着云原生以及数字化时代的到来,RocketMQ 也在快速的发生着变化,那么变化主要体现在哪些方面呢? 首先,全面拥抱云原生。向下,消息系统自身实现云原生架构的演进,充分释放云基础设施的池化能力,全方位提高消息的核心技术指标。向上,消息产品形态持续演进,成为云原生应用架构的核心引擎。比如微服务、事件驱动、Serverless 等现代化应用架构。 其次,全面拥抱实时数据。企业的数字化转型从原来的业务数字化迈向了数字业务化。对业务数据的实时洞察、实时决策成为指导业务成功的关键要素。消息队列也将从在线业务架构的基础设施,延伸到实时数据架构的基础设施,从而实现事务分析一体化。 随着 5.0 的发布,RocketMQ也正式从消息服务升级到云原生事件流处理平台。 RocketMQ 5.0:云原生架构升级 首先来看 RocketMQ 自身的云原生架构演进。从下面的全景图可以看出,RocketMQ 从客户端到服务端都进行了全方位的改造,具体体现在以下几个方面: 轻量化。RocketMQ 4.0 诞生于 2012 年的淘宝电商,当时大部分的业务还跑在物理机上,单节点计算能力强,客户端节点数少且较为稳定,因此,富客户端的接入方式不仅更加高效,更可以提供诸如客户端侧负载均衡、消息缓存、故障转移等一系列企业级特性。但这种模式在云原生时代发生了改变,轻量客户端更容易被云原生技术栈所集成。因此,RocketMQ 5.0 客户端采用轻量 SDK 设计理念,将原来富客户端的逻辑下沉到服务端,满足现代化应用轻量化、Serverless 化以及 Mesh 化的趋势,更容易被集成;同时也正是因为轻量化,使得 SDK 多语言开发成本低了很多,快速覆盖当下主流的多语言版本。 弹性。存算分离架构让无状态计算节点可以快速伸缩,而分级存储以及冷热分离架构更是让消息存储具备更强的弹性能力。 高可用。基于全新的 Leaderless 架构,去 ZK 依赖的同时,可以做到副本数灵活选择,同步异步自动升降级,实现秒级故障转移;面向云的多可用区、多地域组建全局高可用能力。 基础设施云原生化。RocketMQ 整体架构走向 Kubernetes 化,拥抱 OpenTelemetry,依托于阿里云提供的 ARMS、Prometheus 以及Grafana 实现可观测能力的云原生化。 而 RocketMQ 5.0 本次的升级,除了在技术架构云原生化之外,在产品能力以及成本优化方面都有着重大的提升,我们来逐一分解。 轻量无状态消费模型 RocketMQ 4.0 采用按队列消费模型,消费者完全按照队列负载均衡,非常适合批量拉取快速消费,对单一消息状态不敏感的场景,比如流计算。然而在业务消息领域,尤其是金融场景以及事件驱动架构之下,每一条消息状态都是极为重要的。再加上不同业务类型的消息处理耗时也是各不相同,从毫秒级、到秒级甚至到分钟级,队列的负载不均衡或者短暂的 Block 都可能会引发消息的局部堆积,从而影响到最终用户的体验。因此,RocketMQ 5.0 全新推出按消息负载的轻量无状态消费模型,通过 PoP 机制巧妙地在队列模型之上构建了消息模型,业务只需要关心消息而无需关心队列,所有 API 都能够支持单条消息级别控制,如消息的消费、重试、删除等。而基于消息消费模型,客户端、连接和消费都是无状态的,可在任意 Proxy 节点上飘移,真正做到轻量化。 RocketMQ 5.0 提供按队列消费模型以及按消息消费模型,更好的满足事件与流的业务场景,正可谓鱼与熊掌兼得。 海量消息分级存储 RocketMQ 5.0 的另一个重大升级则是海量消息的分级存储。对消息队列了解的同学都知道,消息通常都是流动的短时间的存储,业内大部分消息产品对消息的保留时间都比较优先,3 天,7 天,最长 15 天不等。有限的存储空间使不仅限制了消息的保留时长,更在某些场景下可能会导致业务资损,比如在消息还没有被消费的时候,因为磁盘空间不足或者消息过期而被清除,这在金融等领域都是不可接受的。所以,RocketMQ 一直想要解决这样的问题,让存储变得更有弹性。 RocketMQ 5.0 基于ESSD、对象存储打造冷热分离以及分级存储架构,提供低成本的无限存储能力,确保消息不会因为本地磁盘空间不足而提前被清除,造成业务资损。我们提供消息存储的 Serverless,客户只需按实际存储使用量付费,而无需预购存储空间。 此外,流量削峰是消息队列极为常见的场景,而由此带来的海量消息堆积能力以及在堆积情况下的性能稳定性成为衡量产品性能的核心指标。RocketMQ 5.0 基于冷热数据分离架构进一步做到读写隔离,避免在堆积的场景下影响热数据的写入性能。分级存储的冷数据碎片规整能力更是提高了冷数据的读取性能,为用户提供一致的冷读 SLA。 售卖系列全线升级,最高降本 50% 从前面的介绍我们已经了解到,RocketMQ 5.0 在技术架构以及产品能力上都有着明显提升。 而 5.0 推出全新的售卖形态与计费策略相比 4.0 更简单、更灵活也更为普惠。实例的综合成本最高可降低 50%。接入门槛最低可降至 390 元/月,远低于自建成本。消息存储支持 Serverless 弹性能力,按需付费可大幅降低闲置成本。结合冷热分离的多级存储能力,相比开源自建可降低 67%,大幅降低消息队列的使用成本。 EventBridge:云上事件枢纽 事件驱动是一个起源很早的概念,早在几十年前,无论是操作系统内核的设计、还是客户端编程框架都大量采用了事件驱动技术。RocketMQ PushConsumer 提供的 API 其实就是一种事件驱动的编程范式,但在微服务应用架构当中,集成与通信才是刚需,事件驱动的价值并没有那么明显的体现。 而随着云原生时代的到来,计算力的构成越来越多样化。作为云原生的代表技术,Serverless 架构范式也是事件驱动的。无论是阿里云的函数计算、还是 AWS 的 Lambda,它们的主要触发源都是各种形态的事件,云产品事件,如 OSS 文件上传触发用户基于函数进行文件加工处理;用户业务事件,如 RocketMQ 触发函数运行消费逻辑处理等等。 以事件驱动为核心理念,阿里云推出了 EventBridge 产品,其使命就是打造云上的事件枢纽。通过EventBridge 可以兑现四大业务价值: 统一事件枢纽。阿里云从 IaaS、PaaS到第三方的 SaaS,每天都有数以亿计的事件产生,但却没有一种简单和统一的方式来触达这些事件;这些事件散落在各个地方形成『事件孤岛』,很难挖掘出有用的业务价值。只有充分发挥数据的规模效应,建立起数据之间的血缘关系,我们才能更好的发掘出数据的价值;所以 EventBridge 首要任务便是统一阿里云上的事件规范,拥抱CloudEvents 事件标准,打造阿里云统一的事件枢纽。 事件驱动引擎。当 EventBridge 连接了海量的事件源后,基于 RocketMQ 毫秒级的事件触发能力,必将加速企业 EDA/Serverless 的架构升级。 开放与集成。EventBridge 提供丰富的跨云、跨平台、跨产品、跨地域以及跨账号的连接能力,能够促进云产品、应用程序、SaaS 服务的相互集成。 低代码。EventBridge 借助Serverless 的应用中心,通过简单的规则匹配以及丰富的模板,即可实现事件的分发、过滤、转换等处理,进一步提升事件驱动的效率。 让消息无处不在,让事件无所不及 依托于 EventBridge、RocketMQ 以及函数计算 FC 的强强联合,目前 EventBridge 的事件生态已初具规模。 在云产品事件集成方面,目前已经集成 200+云产品事件源,3000 多种事件类型。 在数据集成与处理方面,EventBridge 与微服务应用、大数据、IoT 等场景深度集成。比如与消息生态的融合,阿里云 6 款消息队列产品通过 EventBridge 实现消息数据的互联互通,并且通过 EventBridge 的全球事件网络赋予消息全球消息路由的能力,同时也可以通过EventBridge 提供的丰富的模板,实现 ETL 数据处理能力。 在 SaaS 应用集成方面,包括钉钉、聚石塔以及云上 50 多个 SaaS 服务都可以通过 EventBridgehook 方式连接到 EventBridge。 在事件触发方面,EventBridge 目前已经触达 10 多个事件目标,海量的事件源都可以通过 EventBridge 触发包括 FC/SAE 等在内的 10 多款事件目标云产品。除此之外,目前 EventBridge 已经对接了阿里云全量的云产品 API,任何一个事件都可以通过云产品 API 的方式进行触达。 未来还有会更多的事件源以及事件目标接入到 EventBridge 上来。 RocketMQ Streams:轻量级计算的新选择 正如开篇所提到的,基于云原生架构的全面升级,RocketMQ 5.0 也将从在线业务架构的基础设施,延伸到实时数据架构的基础设施,实现事务分析一体化。将 RocketMQ Streams 打造成为轻量级计算的新选择。 业内最常见如 Flink、Spark 等大数据框架大多采用中心化的 MasterSlave 架构,依赖和部署比较重,每个任务的执行都需要很大的开销,有较高的使用成本。而与之不同的是,RocketMQ Streams 着重打造轻资源,高性能的轻量计算引擎,无额外依赖,最低1core,1g 即可完成部署,适用于大数据量、高过滤、轻窗口计算的场景,在资源敏感型场景,如消息队列流计算、安全风控,边缘计算等,RocketMQ Streams 具有有很大优势。阿里云消息团队与安全团队合作,通过对过滤场景做大量优化,性能提升 35 倍,资源节省 50%80%。 目前,RocketMQ Streams 已经在开源社区发布,未来计划在 2023 年 4 月在阿里云完成商业化。 RocketMQ 这十年,我们一同向前 RocketMQ历经十余年的打磨,已经取得了众多成果。全球拥有 700+的贡献者,1.8w Star 数,超过 80%的主流云厂商提供了 RocketMQ 的商业托管服务,Apache RocketMQ 社区始终保持着极高的活跃度,因此,也荣获了科创中国“开源创新榜”,中日韩开源软件优秀技术奖等十多个国内外开源奖项。 而阿里云作为 RocketMQ 的起源和核心贡献者,不仅 100%覆盖全集团业务,承载千万级并发万亿级消息洪峰。十多年以来更是累计服务 10w+万企业客户,覆盖互联网、零售、汽车等 20 多个行业,超过 75%的头部企业选择使用 RocketMQ。期望阿里云的消息队列 RocketMQ 可以成为广大企业客户的心之所选。也诚邀更广大的开发者来积极参与RocketMQ 的开源社区建设,一起将 RocketMQ 打造为消息领域的引领者。
#社区动态

2022年7月22日

生于云、长于云,RocketMQ 5.0再出发
7 月 21 日7 月 22 日,由 Apache RocketMQ 社区主办,阿里云天池平台、云原生应用平台承办的首届 RocketMQ Summit 全球开发者峰会拉开帷幕。Apache RocketMQ 联合创始人林清山发布 RocketMQ 能力全景图,为众多开发者阐述 RocketMQ 5.0 的技术定位与发展方向,来自快手、小米、字节跳动等互联网头部企业的 40 位演讲嘉宾与众多开发者分享各自行业的最佳实践与技术探索经验。 阿里云云原生应用平台负责人丁宇表示,开源让云计算更加的标准化、云计算让开源产品化和规模化,未来的数字世界,将构建在云计算和开源之上。阿里巴巴将以开源的方式,践行开放共享好科技理念,把开源作为技术战略的重要组成部分。 今天,阿里巴巴的开源项目总数超过 3000 个,涵盖云计算、大数据、AI、中间件、数据库、容器、Serverless、高可用等领域,拥有超过 30000 名 Contributor,超过百万 Star,位列中国企业社区贡献榜首,连续十年蝉联中国厂商开源活跃度第一、影响力第一。未来,阿里云也将会持续投入 RocketMQ 的开源建设,构建更加繁荣的社区生态。希望与更多的开发者、贡献者一起,追求极致、开放共享,实现开源技术的普惠。 基于自身实践,RocketMQ 社区对于消息队列演进趋势的洞察 消息队列作为最经典的中间件之一,已经有三十多年历史。伴随着技术发展,消息队列领域不断扩展,迸发新生命力,作为国内大规模实践先行者,RocketMQ 社区认为消息领域将迎来以下趋势变化: 1. 全面拥抱云原生 消息队列将向上演进消息型的产品形态,更好去支撑微服务、事件驱动、Serverless 化等云原生应用架构;向下演进消息系统自身云原生架构,通过系统重构充分释放基础设施的弹性计算、存储、网络等能力,全方位提升消息技术指标,降低消息成本,提高消息队列弹性能力。 2. 全面拥抱物联网 物联网技术将更广泛的落地到各行各业,万物互联、边缘计算进一步拓展消息的边界。面向物联网的消息队列要海量异构设备接入,海量消息队列存储,能够随处运行,具备云边端一体的无边界部署能力。 3. 全面拥抱实时数据 企业的数字化转型的步伐不断加速,从业务数字化迈向数字业务化。数字化企业持续产生业务数据,对业务数据实时洞察与决策,才能帮助企业快速响应商机、把握商机,使得业务获得更大成功。同时,消息队列也将从在线业务架构的基础设施延伸到实时数据架构的基础设施,达到事务分析一体化。 四大方向,全面解读 RocketMQ 5.0 架构演进 1. 面向微服务 引入微服务架构,数字化企业以“高内聚、低耦合”的方式高效协作。微服务架构也带来新问题,比如大量同步微服务会面临延迟增大、可用性降低等风险。为了解决这个问题,越来越多的企业引入消息队列建设异步微服务体系,进一步提高微服务的韧性,降低响应延迟。 业界的微服务技术趋势,一方面是已经形成了事实标准,比如说像 Spring Cloud 体系,Dubbo 体系,通信协议有 HTTP、AMQP 等,另一方面下一代的微服务体系也在快速发展中,主要体现在基础设施下沉,比如 Servicemesh,Serverless 等技术。 在这个趋势下,RocketMQ 5.0, 在 SDK 层面将原来的重型客户端往轻量客户端演进,基于标准 gRPC 作为 remoting 层实现 SDK,同时也将更多客户端逻辑下沉到服务端,比如消息重试、负载均衡等,大幅度降低多语言 SDK 的实现成本。轻量客户端更好的匹配了 ServiceMesh 的需求,RocketMQ 的 Mesh 能力已正式合入 CNCF Envoy 官方社区。 在负载均衡方面,RocketMQ 从原来的队列粒度负载均衡演进到了消息粒度负载均衡模式,消息粒度负载均衡更加匹配 Serverless 应用的场景,无状态 Serverless 应用弹性伸缩过程不会触发频繁的队列重平衡,降低消息重复率和端到端延迟。 RocketMQ 5.0 提供无状态 proxy,通过 proxy 可以很方便的扩展更多标准消息协议以及流量治理功能。无状态 proxy 也具备良好的的网络穿透能力,可以灵活应对企业在上云过程中面临复杂跨网络访问场景。 今天我们以 RocketMQ 5.0 核心能力为基础,支撑了阿里云 RocketMQ、MNS、RabbitMQ 等多款云消息产品。其中阿里云 RabbitMQ 是一款兼容 AMQP 协议、RabbitMQ SDK 的消息服务,可以帮助开源存量用户无缝上云。同时它也充分释放了底层 RocketMQ 云原生架构的技术红利,具备和 RocketMQ 一致的高性能、无限扩展、高可用等特点,是云原生的 RabbitMQ。 2. 事件驱动(EDA) 事件驱动在 18 年被 Gartner 评为年度十大技术趋势。在未来新型的数字化商业解决方案中,会有 60% 以上的商业数字化解决方案采纳 EDA 架构。EDA 为软件架构带来彻底解耦,实现更灵活的业务扩展和业务敏捷能力,不仅可以用于单一业务领域的微服务解耦,还可以用于跨部门、跨组织、跨业务领域的事件集成。消息队列是 EDA 架构中最核心的组件,承担 eventbroker 的职责。随着 EDA 架构被大规模跨组织的落地,要进一步提高行业级生产力,标准化也迫在眉睫。为此 CNCF 推出了 CloudEvent 规范,基于统一的规范,跨系统、跨组织的数字化协同有了共同的“语言”,能够实现更高效的系统集成,有了规范也方便沉淀面向事件的统一基础软件设施,提高研发效率。 面向 EDA 趋势,RocketMQ 5.0 发布全新产品形态——Eventbridge。整个领域模型以事件为中心,并拥抱 CloudEvent 规范,CloudEvent 社区开源 SDK 可无缝接入 Eventbridge。同时,还提供各种低代码事件编排、过滤、路由能力,灵活实现各种事件集成。 今天我们以 RocketMQ 5.0 核心能力为基础,支撑了阿里云 EventBridge 产品,助力云客户实现事件驱动、事件集成的商业生态。 3. 物联网 全球的 IoT 设备爆发式增长,预计到了 2025 年将达到 200 多亿台,。并且物联网也带来了边缘计算的兴起,未来将有 75% 的数据将在传统数据中心或云环境之外进行处理。目前物联网行业已经形成了多个标准协议,其中最流行莫过于 MQTT,这是"发布订阅"模式的消息协议,除此之外还有各种车联网协议、工业协议等等,物联网消息队列要具备多样化异构海量设备接入能力。RocketMQ 可作为物联网应用的基础通信设施,用于 IoT、移动设备的数据上报,还有指令下行,为 IoT 业务连接云边端。 面向 IoT 的趋势,RocketMQ 5.0 发布轻量级百万队列引擎,轻量元数据服务。在新存储内核之上,建设物联网形态消息队列 MQTT,支持标准物联网协议,支持海量物联网设备接入和海量队列存储。 RocketMQ 5.0 遵循零外部依赖的精简架构原则,新 HA 架构为低资源消耗场景提供更多选择,用户可以权衡可靠性、成本、可用性,选择最优副本策略。比如边缘场景由于资源受限,RocketMQ 不一定需要提供三副本存储,可以是 2 副本,甚至是单副本就能满足业务需求。 今天以 RocketMQ 5.0 核心能力为基础,支撑了阿里云微消息队列 MQTT,为客户提供云端一体化消息解决方案,实现万物互联、云端互联。 4. 实时大数据 未来大数据将走向实时化,预测在 2025 年实时大数据的比例将达到 30%。数字化企业通过实时感知、实时分析、实时决策,能够抓住商机、快速响应用户,实时大数据的重要性愈发突出。消息队列是实时大数据的关键技术之一,作为事件流的核心存储,它承担数据的分发,数据的缓冲,还有轻量的流处理的作用。 事件流技术越来越多的在 IoT 场景进行使用,IDC 预测未来 95% 的实时事件流将来自IoT场景;另外有越来越多的交易事件需要进行实时分析,挖掘更多业务价值,事件流技术也开始呈现事务分析一体化的趋势。 面向事件流的趋势,RocketMQ 5.0 在流存储和流分析能力进行重点打造: 流存储方面,支持批量索引,大幅度提高 RocketMQ 吞吐量。支持 compacttopic,用于实现流处理过程中的状态存储,零外部依赖。除了功能特性之外,RocketMQ 5.0 的流存储同时进行了云原生架构改造,引入逻辑队列的概念,解耦了数据逻辑分区跟物理存储之间的绑定关系,能够实现全局固定分区前提下进行无缝扩缩容,零数据迁移。 流分析方面,RocketMQ 5.0 全新发布了轻量的流计算引擎,它可以兼容 flink SQL,方便用户在不同场景无缝切换。如果用户需要大而全流计算能力,可以使用大型计算平台。如果用户有边缘计算、资源受限、简单流处理场景可以直接使用 RocketMQ 的 RSQL 来支持。 今天我们以 RocketMQ 5.0 事件流能力为基础,支撑了阿里云 Kafka 产品,存量 Kafka 用户实现无缝上云。同时基于 RocketMQ 的逻辑队列能力,阿里云 Kafka 具备快速弹性伸缩能力,提供了 Serverless 化的产品形态。让存量 Kafka 用户也能够体验到云原生架构的红利。 不断演进,RocketMQ 正式迈进 5.0 时代 在过去七年大规模云计算实践中,RocketMQ 不停自我演进。今天 RocketMQ 正式迈进了 5.0 的时代。从互联网业务消息中间扩展到“消息、事件、流”超融合处理平台,解锁了更全面能力。 在消息领域,全面拥抱云原生技术,以获得更好的弹性伸缩。在事件领域产品形态进行全面升级,拥抱行业标准,让事件驱动的架构无处不在,从单一业务的数字化系统扩展到跨组织跨业务的数字化商业生态事件驱动的架构,也同时让云计算原生的技术能够更大规模的落地,提高云产品跟用户业务的集中度。让 Serverless 的技术能够被更大范围的采纳,帮助企业客户去降本增效。在流存储和流计算领域,流存储增强批量的特性,大幅度提高 RocketMQ 数据吞吐量,新增逻辑队列能力,解耦逻辑资源跟物理资源,在流场景也具备无缝伸缩能力;新增轻量流处理引擎也提供了实时事件流处理、流分析能力。 RocketMQ 基于端云一体化架构,实现完整物联网消息队列能力,从原来连接应用扩展到连接物联网设备。同时 RocketMQ 5.0 也继续保持着极简架构原则,即便产品能力全面提升,也依然能够以最低资源消耗,最低运维代价去搭建服务。 现在 RocketMQ 已经真正具备连接一切,随处运行的能力,提供云、边端一体化实时数据解决方案。物联网设备持续的产生的数据,边缘 RocketMQ 可以进行实时数据分析,快速响应业务。通过实时 ETL,实时决策产生的高价值事件,或者数据可以传输到云端,通过 RocketMQ  eventing 能力连接更强大的公有云平台,利用云的一站式平台技术,进一步放大每份数据的价值。 不止于开源,RocketMQ 赋能海量行业客户 今天,基于 RocketMQ 5.0 为内核,阿里云也打造一站式消息平台,在统一云原生消息内核基础上,提供 6 种消息产品形态,有面向 IoT 场景的微消息队列 MQTT,有面向 EDA 场景的 EventBridge,有面向开源用户无缝上云的托管开源产品如 Kafka、RabbitMQ、RocketMQ。 通过多样化产品形态,RocketMQ 在阿里云上面已服务数万个企业用户,帮助其完成数字化转型的同时。RocketMQ 也得到业界的广泛认可。近期获得多个奖项,包括 OSCHINA 优秀技术团队奖、中国开源云联盟优秀基础软件、中国科学技术协会颁布的科创中国开源创新榜等,并进入 Apache 中国开源项目领导者象限。RocketMQ 成为第一个通过信通院可信云分布式消息队列服务的“增强级”认证,第一个通过信通院金融级稳定性评测的“先进级”认证。 随着潜在用户数的增大,RocketMQ 的商业价值也被进一步的放大。目前已经有十家的云厂商提供 Apache RocketMQ 的商业服务,它几乎覆盖了国内主流的公共云厂商。这样 RocketMQ 的用户就有了更多的选择,真正实现无厂商锁定,RocketMQ 已经成为原生消息的事实标准。 万物皆云的时代,RocketMQ 让数字化转型更简单高效,也将消息、事件、流的价值最大程度释放。Apache RocketMQ 将不断推动技术演进与落地实践,帮助企业真正实现高质量数字化转型与创新。
#社区动态

2022年2月22日

RocketMQ-Streams 首个版本发布,轻量级计算的新选择
RocketMQStreams 聚焦「大数据量高过滤轻窗口计算」场景,核心打造轻资源,高性能优势,在资源敏感场景有很大优势,最低 1Core,1G 可部署。通过大量过滤优化,性能比其他大数据提升 25 倍性能。广泛应用于安全,风控,边缘计算,消息队列流计算。 RocketMQStreams 兼容 Flink 的 SQL,udf/udtf/udaf,将来我们会和 Flink 生态做深度融合,即可以独立运行,也可发布成 Flink 任务,跑在 Flink 集群,对于有 Flink 集群的场景,即能享有轻资源优势,可以做到统一部署和运维。 01 _RocketMQStreams 特点及应用场景_  RocketMQStreams 应用场景 计算场景:适合大数据量高过滤轻窗口计算的场景。不同于主流计算引擎,需要先部署集群,写任务,发布,调优,运行这么复杂的过程。RocketMQStreams 本身就是一个 lib 包,基于 SDK 写完流任务,可以直接运行。支持大数据开发需要的计算特性:ExactlyONCE,灵活窗口(滚动、滑动、会话),双流Join,高吞吐、低延迟、高性能。最低 1Core,1G 可以运行。 SQL引擎:RocketMQStreams 可视作一个 SQL 引擎,兼容 Flink SQL 语法,支持 Flink udf/udtf/udaf 的扩展。支持 SQL 热升级,写完 SQL,通过 SDK 提交 SQL,就可以完成 SQL 的热发布。 ETL引擎:RocketMQStreams 还可视作 ETL 引擎,在很多大数据场景,需要完成数据从一个源经过 ETl,汇聚到统一存储,里面内置了 grok,正则解析等函数,可以结合 SQL 一块完成数据 ETL 。 开发 SDK,它也是一个数据开发 SDK 包,里面的大多数组件都可以单独使用,如 Source/sink,它屏蔽了数据源,数据存储细节,提供统一编程接口,一套代码,切换输入输出,不需要改变代码。  RocketMQStreams 设计思路 设计目标 依赖少,部署简单,1Core,1G 单实例可部署,可随意扩展规模。 实现需要的大数据特性:ExactlyONCE,灵活窗口(滚动、滑动、会话),双流 Join,高吞吐、低延迟、高性能。 实现成本可控,实现低资源,高性能。 兼容 Flink SQL,UDF/UDTF,让非技术人员更易上手。 设计思路 采用 sharednothing 的分布式架构设计,依赖消息队列做负载均衡和容错机制,单实例可启动,增加实例实现能力扩展。并发能力取决于分片数。 利用消息队列的分片做 shuffle,利用消息队列负载均衡实现容错。 利用存储实现状态备份,实现 ExactlyONCE 的语义。用结构化远程存储实现快速启动,不必等本地存储恢复。  RocketMQStreams 特点和创新 02 _RocketMQStreams SDK 详解_  Hello World 按照惯例,我们先从一个例子来了解 RocketMQStreams namespace:相同 namespace 的任务可以跑在一个进程里,可以共享配置 pipelineName:job name DataStreamSource:创建 source 节点 map:用户函数,可以通过实现 MapFunction 扩展功能 toPrint:结果打印出来 start:启动任务 运行上面代码就会启动一个实例。如果想多实例并发,可以启动多个实例,每个实例消费部分 RocketMQ 的数据。 运行结果:把原始消息拼接上“”,并打印出来  RocketMQStreams SDK StreamBuilder 做为起点,通过设置 namespace,jobName 创建一个 DataStreamSource 。 DataStreamSource 通过 from 方法,设置 source,创建 DataStream 对象。 DataStream 提供多种操作,会产生不同的流: to 操作产生 DataStreamAction window 操作产生 WindowStream 配置 window 参数 join 操作产生 JoinStream 配置 join 条件 Split 操作产生 SplitStream 配置 split 条件 其他操作产生 DataStream DataStreamAction 启动整个任务,也可以配置任务的各种策略参数。支持异步启动和同步启动。  RocketMQStreams 算子  RocketMQStreams 算子 SQL 有两种部署模式,1 是直接运行 client 启动 SQL,见第一个红框;2 是搭建 server 集群,通过 client 提交 SQL 实现热部署,见第二个红框。 RocketMQStreams SQL 扩展,支持多种扩展方式: 通过 FlinkUDF,UDTF,UDAF 扩展 SQL 能力,在 SQL 中通过 create function 引入,有个限制条件,即 UDF 在 open 时未用到 Flink FunctionContext 的内容。 通过内置函数扩展 SQL 的函数,语法同 Flink 语法,函数名是内置函数的名称,类名是固定的。如下图,引入了一个 now 的函数,输出当前时间。系统内置了 200 多个函数,可按需引入。 通过扩展函数实现,实现一个函数很简单,只需要在 class 上标注 Function,在需要发布成函数的方法上标注 FunctionMethod,并设置需要发布的函数名即可,如果需要系统信息,前面两个函数可以是 IMessage 和 Abstract,如果不需要,直接写参数即可,参数无格式要求。如下图,创建了一个 now 的函数,两种写法都可以。可以通过 currentTime=now()来调用,会在 Message 中增加一个 key=currentTime,value=当前时间的变量。 把现有 java 代码发布成函数,通过策略配置,把 java 代码的类名,方法名,期望用到的函数名,配置进去,把 java 的 jar 包 copy 到 jar 包目录即可。下图是几种扩展的应用实例。 03 _RocketMQStreams 架构及原理实现_  整体架构  Source 实现 Source 要求实现最少消费一次的语义,系统通过 checkpoint 系统消息实现,在提交 offset 前发送 checkpoint 消息,通知所有算子刷新内存。 Source 支持分片的自动负载均衡和容错。 数据源在分片移除时,发送移除系统消息,让算子完成分片清理工作。 当有新分片时, 发送新增分片消息,让算子完成分片的初始化。 数据源通过 start 方法,启动 consuemr 获取消息。 原始消息经过编码,附加头部信息包装成 Message 投递给后续算子。  Sink 实现 Sink 是实时性和吞吐的一个结合。 实现一个 Sink 只要继承 AbstractSink 类实现 batchInsert 方法即可。batchInsert 的含义是一批数据写入存储,需要子类调用存储接口实现,尽量应用存储的批处理接口,提高吞吐。 常规的使用方式是写 Messagecacheflush存储的方式,系统会严格保证,每次批次写入存储的量不超过 batchsize 的量,如果超了,会拆分成多批写入。 Sink 有一个 cache,数据默认写 cache,批次写入存储,提高吞吐量。(一个分片一个 cache)。 可以开启自动刷新,每个分片会有一个线程,定时刷新 cache 数据到存储,提高实时性。实现类:DataSourceAutoFlushTask 。 也可以通过调用 flush 方法刷新 cache 到存储。 Sink 的 cache 会有内存保护,当 cache 的消息条数batchSize,会强制刷新,释放内存。  RocketMQStreams ExactlyONCE Source 确保在 commit offset 时,会发送 checkpoint 系统消息,收到消息的组件会完成存盘操作。消息至少消费一次。 每条消息会有消息头部,里面封装了 QueueId 和 offset 。 组件在存储数据时,会把 QueueId 和处理的最大 offset 存储下来,当有消息重复时,根据 maxoffset 去重。 内存保护,一个 checkpoint 周期可能有多次 flush(条数触发),保障内存占用可控。  RocketMQStreams Window 支持滚动,滑动和会话窗口。支持事件时间和自然时间(消息进入算子的时间)。 支持高性能模式和高可靠模式,高性能模式不依赖远程存储,但在分片切换时的窗口数据会有丢失。 快速启动,无需等本地存储恢复,在发生错误或分片切换时,异步从远程存储恢复数据,同时直接访问远程存储计算。 利用消息队列负载均衡,实现扩容缩容,每个 Queue 是一个分组,一个分组同一刻只被一台机器消费。 正常计算依赖本地存储,具备 Flink 相似的计算性能。 支持三种触发模式,可以均衡 watermark 延迟和实时性要求 04 _RocketMQStreams 在云安全的应用_  在安全应用的背景 公共云转战专有云,在入侵检测计算方面遇到了资源问题,大数据集群默认不输出,输出最低 6 台高配机器,用户很难接受因为买云盾增配一套大数据集群。 专有云用户升级,运维困难,无法快速升级能力和修复 bug。  流计算在安全的应用 基于安全特点(大数据高过滤轻窗口计算)打造轻量级计算引擎:经过分析所有的规则都会做前置过滤,然后才会做较重的统计,窗口,join 操作,且过滤率比较高,基于此特点,可以用更轻的方案实现统计,join 操作。 通过 RocketMQStreams,覆盖 100%专有云规则(正则,join,统计)。 轻资源,内存是公共云引擎的 1/70,CPU 是 1/6,通过指纹过滤优化,性能提升 5 倍以上,且资源不随规则线性增加,新增规则无资源压力。复用以前的正则引擎资源,可支持 95%以上局点,不需要增加额外物理资源。 通过高压缩维表,支持千万情报。1000 W 数据只需要 330 M 内存。 通过 C/S 部署模式,SQL 和引擎可热发布,尤其护网场景,可快速上线规则。 05 _RocketMQStreams 未来规划_ 新版本下载地址:
作者:袁小栋、程君杰
#社区动态 #流处理

2022年1月26日

Apache RocketMQ + Hudi 快速构建 Lakehouse
本文目录 背景知识 大数据时代的构架演进 RocketMQ Connector&Stream Apache Hudi 构建Lakehouse实操 本文标题包含三个关键词:Lakehouse、RocketMQ、Hudi。我们先从整体Lakehouse架构入手,随后逐步分析架构产生的原因、架构组件特点以及构建Lakehouse架构的实操部分。 背景知识 1、Lakehouse架构 Lakehouse最初由Databrick提出,并对Lakehouse架构特征有如下要求: (1)事务支持 企业内部许多数据管道通常会并发读写数据。对ACID事务的支持确保了多方并发读写数据时的一致性问题; (2)Schema enforcement and governance Lakehouse应该有一种方式可以支持模式执行和演进、支持DW schema的范式(如星星或雪花模型),能够对数据完整性进行推理,并且具有健壮的治理和审计机制; (3)开放性 使用的存储格式是开放式和标准化的(如parquet),并且为各类工具和引擎,包括机器学习和Python/R库,提供API,以便它们可以直接有效地访问数据; (4)BI支持 Lakehouse可以直接在源数据上使用BI工具。这样可以提高数据新鲜度、减少延迟,并且降低了在数据池和数据仓库中操作两个数据副本的成本; (5)存储与计算分离 在实践中,这意味着存储和计算使用单独的集群,因此这些系统能够扩展到支持更大的用户并发和数据量。一些现代数仓也具有此属性; (6)支持从非结构化数据到结构化数据的多种数据类型 Lakehouse可用于存储、优化、分析和访问许多数据应用所需的包括image、video、audio、text以及半结构化数据; (7)支持各种工作负载 包括数据科学、机器学习以及SQL和分析。可能需要多种工具来支持这些工作负载,但它们底层都依赖同一数据存储库; (8)端到端流 实时报表是许多企业中的标准应用。对流的支持消除了需要构建单独系统来专门用于服务实时数据应用的需求。 从上述对Lakehouse架构的特点描述我们可以看出,针对单一功能,我们可以利用某些开源产品组合构建出一套解决方案。但对于全部功能的支持,目前好像没有一个通用的解决方案。接下来,我们先了解大数据时代主流的数据处理架构是怎样的。 大数据时代的架构演进 1、大数据时代的开源产品 大数据时代的开源产品种类繁多,消息领域的RocketMQ、Kafka;计算领域的flink、spark、storm;存储领域的HDFS、Hbase、Redis、ElasticSearch、Hudi、DeltaLake等等。 为什么会产生这么多开源产品呢?首先在大数据时代数据量越来越大,而且每个业务的需求也各不相同,因此就产生出各种类型的产品供架构师选择,用于支持各类场景。然而众多的品类产品也给架构师们带来一些困扰,比如选型困难、试错成本高、学习成本高、架构复杂等等。 2、当前主流的多层架构 大数据领域的处理处理场景包含数据分析、BI、科学计算、机器学习、指标监控等场景,针对不同场景,业务方会根据业务特点选择不同的计算引擎和存储引擎;例如交易指标可以采用binlog + CDC+ RocketMQ + Flink + Hbase + ELK组合,用于BI和Metric可视化。 (1)多层架构的优点:支持广泛的业务场景; (2)多层架构的缺点: 处理链路长,延迟高; 数据副本多,成本翻倍; 学习成本高; 造成多层架构缺点主要原因是存储链路和计算链路太长。 我们真的需要如此多的解决方案来支持广泛的业务场景吗?Lakehouse架构是否可以统一解决方案? 多层架构的存储层是否可以合并?Hudi产品是否能够支持多种存储需求? 多层架构的计算层是否可以合并?RocketMQ stream是否能够融合消息层和计算层? 当前主流的多层架构 3、Lakehouse架构产生 Lakehouse架构是多层架构的升级版本,将存储层复杂度继续降低到一层。再进一步压缩计算层,将消息层和计算层融合,RocketMQ stream充当计算的角色。我们得到如下图所示的新架构。新架构中,消息出入口通过RocketMQ connector实现,消息计算层由RocketMQ stream实现,在RocketMQ内部完成消息计算中间态的流转;计算结果通过RocketMQHudiconnector收口落库Hudi,Hudi支持多种索引,并提供统一的API输出给不同产品。 Lakehouse架构 下面我们分析下该架构的特点。 (1)Lakehouse架构的优点: 链路更短,更适合实时场景,数据新鲜感高; 成本可控,降低了存储成本; 学习成本低,对程序员友好; 运维复杂度大幅降低; (2)Lakehouse架构的缺点 对消息产品和数据湖产品的稳定性、易用性等要求高,同时消息产品需要支持计算场景,数据湖产品需要提供强大的索引功能。 (3)选择 在Lakehouse架构中我们选择消息产品RocketMQ和数据湖产品Hudi。 同时,可以利用RocketMQ stream在RocketMQ集群上将计算层放在其中集成,这样就将计算层降低到一层,能够满足绝大部分中小型大数据处理场景。 接下来我们逐步分析RocketMQ和Hudi两款产品的特点。 RocketMQ Connector & Stream RocketMQ 发展历程图 RocketMQ从2017年开始进入Apache孵化,2018年RocketMQ 4.0发布完成云原生化,2021年RocketMQ 5.0发布全面融合消息、事件、流。 1、业务消息领域首选 RocketMQ作为一款“让人睡得着觉的消息产品”成为业务消息领域的首选,这主要源于产品的以下特点: (1)金融级高可靠 经历了阿里巴巴双十一的洪峰检验; (2)极简架构 如下图所示, RocketMQ的架构主要包含两部分包括:源数据集群NameServer Cluster和计算存储集群Broker Cluster。 RocketMQ 构架图 NameServer节点无状态,可以非常简单的进行横向扩容。Broker节点采用主备方式保证数据高可靠性,支持一主多备的场景,配置灵活。 搭建方式:只需要简单的代码就可以搭建RocketMQ集群: Jar: nohup sh bin/mqnamesrv & nohup sh bin/mqbroker n localhost:9876 & On K8S: kubectl apply f example/rocketmq_cluster.yaml (3)极低运维成本 RocketMQ的运维成本很低,提供了很好的CLI工具MQAdmin,MQAdmin提供了丰富的命令支持,覆盖集群健康状态检查、集群进出流量管控等多个方面。例如,mqadmin clusterList一条命令可以获取到当前集群全部节点状态(生产消费流量、延迟、排队长度、磁盘水位等);mqadmin updateBrokerConfig命令可以实时设置broker节点或topic的可读可写状态,从而可以动态摘除临时不可用节点,达到生产消费的流量迁移效果。 (4)丰富的消息类型 RocketMQ支持的消息类型包括:普通消息、事务消息、延迟消息、定时消息、顺序消息等。能够轻松支持大数据场景和业务场景。 (5)高吞吐、低延迟 压测场景主备同步复制模式,每台Broker节点都可以将磁盘利用率打满,同时可以将p99延迟控制在毫秒级别。 2、RocketMQ 5.0概况 RocketMQ 5.0是生于云、长于云的云原生消息、事件、流超融合平台,它具有以下特点: (1)轻量级SDK 全面支持云原生通信标准 gRPC 协议; 无状态 Pop 消费模式,多语言友好,易集成; (2)极简架构 无外部依赖,降低运维负担; 节点间松散耦合,任意服务节点可随时迁移; (3)可分可合的存储计算分离 Broker 升级为真正的无状态服务节点,无 binding; Broker 和 Store节点分离部署、独立扩缩; 多协议标准支持,无厂商锁定; 可分可合,适应多种业务场景,降低运维负担; 如下图所示,计算集群(Broker)主要包括抽象模型和相对应的协议适配,以及消费能力和治理能力。存储集群(Store)主要分为消息存储CommitLog(多类型消息存储、多模态存储)和索引存储Index(多元索引)两部分,如果可以充分发挥云上存储的能力,将CommitLog和Index配置在云端的文件系统就可以天然的实现存储和计算分离。 (4)多模存储支持 满足不同基础场景下的高可用诉求; 充分利用云上基础设施,降低成本; (5)云原生基础设施: 可观测性能力云原生化,OpenTelemetry 标准化; Kubernetes 一键式部署扩容交付。 RocketMQ 5.02021年度大事件及未来规划 3、RocketMQConnector a、传统数据流 (1)传统数据流的弊端 生产者消费者代码需要自己实现,成本高; 数据同步的任务没有统一管理; 重复开发,代码质量参差不齐; (2)解决方案:RocketMQ Connector 合作共建,复用数据同步任务代码; 统一的管理调度,提高资源利用率; b、RocketMQ Connector数据同步流程 相比传统数据流,RocketMQ connector数据流的不同在于将 source 和 sink 进行统一管理,同时它开放源码,社区也很活跃。 4、RocketMQ Connector架构 如上图所示,RocketMQ Connector架构主要包含Runtime和Worker两部分,另外还有生态Source&Sink。 (1)标准:OpenMessaging (2)生态:支持ActiveMQ、Cassandra、ES、JDBC、JMS、MongoDB、Kafka、RabbitMQ、Mysql、Flume、Hbase、Redis等大数据领域的大部分产品; (3)组件:Manager统一管理调度,如果有多个任务可以将所有任务统一进行负载均衡,均匀的分配到不同Worker上,同时Worker可以进行横向扩容。 5、RocketMQ Stream RocketMQ Stream是一款将计算层压缩到一层的产品。它支持一些常见的算子如window、join、维表,兼容Flink SQL、UDF/UDAF/UDTF。 Apache Hudi Hudi 是一个流式数据湖平台,支持对海量数据快速更新。内置表格式,支持事务的存储层、一系列表服务、数据服务(开箱即用的摄取工具)以及完善的运维监控工具。Hudi 可以将存储卸载到阿里云上的 OSS、AWS 的S3这些存储上。 Hudi的特性包括: 事务性写入,MVCC/OCC并发控制; 对记录级别的更新、删除的原生支持; 面向查询优化:小文件自动管理,针对增量拉取优化的设计,自动压缩、聚类以优化文件布局; Apache Hudi是一套完整的数据湖平台。它的特点有: 各模块紧密集成,自我管理; 使用 Spark、Flink、Java 写入; 使用 Spark、Flink、Hive、Presto、Trino、Impala、 AWS Athena/Redshift等进行查询; 进行数据操作的开箱即用工具/服务。 Apache Hudi主要针对以下三类场景进行优化: 1、流式处理栈 (1) 增量处理; (2) 快速、高效; (3) 面向行; (4) 未优化扫描; 2、批处理栈 (1) 批量处理; (2) 低效; (3) 扫描、列存格式; 3、增量处理栈 (1) 增量处理; (2) 快速、高效; (3) 扫描、列存格式。 构建 Lakehouse 实操 该部分只介绍主流程和实操配置项,本机搭建的实操细节可以参考附录部分。 1、准备工作 RocketMQ version:4.9.0 rocketmqconnecthudi version:0.0.1SNAPSHOT Hudi version:0.8.0 2、构建RocketMQHudiconnector (1) 下载: _  git clone _ (2) 配置: /data/lakehouse/rocketmqexternals/rocketmqconnect/rocketmqconnectruntime/target/distribution/conf/connect.conf 中connectorplugin 路径 (3) 编译: cd rocketmqexternals/rocketmqconnecthudi mvn clean install DskipTest U rocketmqconnecthudi0.0.1SNAPSHOTjarwithdependencies.jar就是我们需要使用的rocketmqhudiconnector 3、运行 (1) 启动或使用现有的RocketMQ集群,并初始化元数据Topic: connectorclustertopic (集群信息) connectorconfigtopic (配置信息) connectoroffsettopic (sink消费进度) connectorpositiontopic (source数据处理进度 并且为了保证消息有序,每个topic可以只建一个queue) (2) 启动RocketMQ connector运行时 cd /data/lakehouse/rocketmqexternals/rocketmqconnect/rocketmqconnectruntime sh ./run_worker.sh Worker可以启动多个 (3) 配置并启动RocketMQhudiconnector任务 请求RocketMQ connector runtime创建任务 curl http://{runtimeip}:{runtimeport}/connectors/{rocketmqhudisinkconnectorname} ?config='{"connectorclass":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"topicc","tablePath":"file:///tmp/hudi_connector_test","tableName":"hudi_connector_test_table","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","sourcerecordconverter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","sourcerocketmq":"127.0.0.1:9876","srccluster":"DefaultCluster","refreshinterval":"10000","schemaPath":"/data/lakehouse/config/user.avsc"\}’ 启动成功会打印如下日志: 20210906 16:23:14 INFO pool2thread1 Open HoodieJavaWriteClient successfully (4) 此时向source topic生产的数据会自动写入到1Hudi对应的table中,可以通过Hudi的api进行查询。 4、配置解析 (1) RocketMQ connector需要配置RocketMQ集群信息和connector插件位置,包含:connect工作节点id标识workerid、connect服务命令接收端口httpPort、rocketmq集群namesrvAddr、connect本地配置储存目录storePathRootDir、connector插件目录pluginPaths 。 RocketMQ connector配置表 (2) Hudi任务需要配置Hudi表路径tablePath和表名称tableName,以及Hudi使用的Schema文件。 Hudi任务配置表 _点击__即可查看Lakehouse构建实操视频_ 附录:在本地Mac系统构建Lakehouse demo 涉及到的组件:rocketmq、rocketmqconnectorruntime、rocketmqconnecthudi、hudi、hdfs、avro、sparkshell0、启动hdfs 下载hadoop包 cd /Users/osgoo/Documents/hadoop2.10.1 vi coresite.xml fs.defaultFS hdfs://localhost:9000 vi hdfssite.xml dfs.replication 1 ./bin/hdfs namenode format ./sbin/startdfs.sh jps 看下namenode,datanode lsof i:9000 ./bin/hdfs dfs mkdir p /Users/osgoo/Downloads 1、启动rocketmq集群,创建rocketmqconnector内置topic QickStart:https://rocketmq.apache.org/docs/quickstart/ sh mqadmin updatetopic t connectorclustertopic n localhost:9876 c DefaultCluster sh mqadmin updatetopic t connectorconfigtopic n localhost:9876 c DefaultCluster sh mqadmin updatetopic t connectoroffsettopic n localhost:9876 c DefaultCluster sh mqadmin updatetopic t connectorpositiontopic n localhost:9876 c DefaultCluster 2、创建数据入湖的源端topic,testhudi1 sh mqadmin updatetopic t testhudi1 n localhost:9876 c DefaultCluster 3、编译rocketmqconnecthudi0.0.1SNAPSHOTjarwithdependencies.jar cd rocketmqconnecthudi mvn clean install DskipTest U 4、启动rocketmqconnector runtime 配置connect.conf workerId=DEFAULT_WORKER_1 storePathRootDir=/Users/osgoo/Downloads/storeRoot Http port for user to access REST API httpPort=8082 Rocketmq namesrvAddr namesrvAddr=localhost:9876 Source or sink connector jar file dir,The default value is rocketmqconnectsample pluginPaths=/Users/osgoo/Downloads/connectorplugins 拷贝 rocketmqhudiconnector.jar 到 pluginPaths=/Users/osgoo/Downloads/connectorplugins sh run_worker.sh 5、配置入湖config curl http://localhost:8082/connectors/rocketmqconnecthudi?config='\{"connectorclass":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"testhudi1","tablePath":"hdfs://localhost:9000/Users/osgoo/Documents/basepath7","tableName":"t7","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","sourcerecordconverter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","sourcerocketmq":"127.0.0.1:9876","sourcecluster":"DefaultCluster","refreshinterval":"10000","schemaPath":"/Users/osgoo/Downloads/user.avsc"\}' 6、发送消息到testhudi1 7、 利用spark读取 cd /Users/osgoo/Downloads/spark3.1.2binhadoop3.2/bin ./sparkshell \ packages org.apache.hudi:hudispark3bundle_2.12:0.9.0,org.apache.spark:sparkavro_2.12:3.0.1 \ conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ val tableName = "t7" val basePath = "hdfs://localhost:9000/Users/osgoo/Documents/basepath7" val tripsSnapshotDF = spark. read. format("hudi"). load(basePath + "/") tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select from hudi_trips_snapshot").show()
#社区动态 #生态集成
收藏
收藏暂无数据,请从小助手对话框添加
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
无疑 AI答疑专家
当前服务输出的内容均由人工智能模型生成,其生成内容的准确性和完整性无法保证,不代表我们的态度或观点。
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
专家答疑