2025年12月16日

AgentScope x RocketMQ:打造企业级高可靠 A2A 智能体通信基座
引言 Agentic AI 时代已至,在智能客服、代码生成、流程自动化等场景中,多智能体(MultiAgent)协作正从构想走向落地。然而,当多个 Agent 需要像一个团队那样高效协作时,脆弱的通信机制可能因网络抖动或服务宕机,就让整个系统瞬间瘫痪,导致昂贵的计算任务失败、会话状态丢失。 如何为这些聪明的“数字员工”们构建一个真正可靠、高效的通信基座? 本文将为您介绍 Apache RocketMQ 全新推出的轻量级通信模型 LiteTopic,如何在 AI 应用场景中有效简化系统架构、提升稳定性与可靠性,并结合 A2A(AgenttoAgent)协议与阿里巴巴 AgentScope 框架的生产实践案例,深入剖析面向智能体通信的落地实践与技术实现。 RocketMQ for AI:重新定义 AI 应用通信范式 1.1 传统应用:单向、无反馈的事件驱动模式 在传统应用的事件驱动场景中,业务逻辑编排通常由人工预先约定,消息生产方成功发送消息后,便无需关注后续的处理逻辑。 下图以注册系统为例:用户发起账户注册请求后,注册系统向 RocketMQ 发送“新用户注册”的消息后便立即返回,无需关心下游的邮件或短信通知系统如何处理。邮件或短信通知系统再分别从 RocketMQ 拉取消息,驱动各自的发送流程。整条业务链路为单向、无反馈的事件驱动模式。 1.2 从单向事件到双向交互:AI 应用对通信提出新挑战 在 AI 应用场景中,业务逻辑编排通常由大模型动态生成,消息生产方需等待并处理响应结果,才能驱动后续的逻辑执行。 下图以典型的 AI 会话场景为例:用户所连接的 Gateway 不仅需要发送请求,还需要处理推理响应结果,并将结果推送给浏览器,形成完整的交互闭环。 结合真实 AI 应用场景的深度调研,我们发现 AI 场景具有四个显著特征,对底层通信模式提出了全新且严苛的挑战: + 更长的响应时间:传统互联网应用追求毫秒级响应延时,而 AI 应用的响应时长普遍达到分钟级以上。更关键的是,AI 应用单次业务的运行时间具有高度不可预测性。 + 更复杂的交互:AI 应用的多轮对话持续时间长,对话历史可达数十轮甚至更多。单次上下文传输可能达到几十甚至上百 MB,上下文管理难度高。多 Agent 之间的协同编排逻辑更加复杂,需要精确的状态同步。 + 更昂贵的计算资源:AI 推理依赖昂贵的 GPU 资源,瞬时高并发流量可能冲击推理服务稳定性,导致算力资源浪费,并且任务失败重试的成本极高。 + 更精细化的事件驱动:由于计算能力有限,异步事件驱动需要更精准的消费速度控制。同时,必须实现分级的事件驱动策略,以确保高优先级任务优先获得宝贵的计算资源。 1.3 RocketMQ LiteTopic:专为 AI 场景设计的通信模型 为了应对上述挑战,Apache RocketMQ 推出了以轻量级通信模型 LiteTopic 为核心的一系列新特性: + 轻量级通信模型 —— 为海量会话而生 其核心是百万级轻量资源管理能力。基于极低的资源动态创建开销,可轻松支持海量会话(Session)场景,并提供更细粒度的订阅管理,适用于长时 Session、AI 工作流和 AgenttoAgent 交互等场景。 + 企业级上下文管理 —— 让会话状态可靠持久 以连续的消息流完整保存 Session 上下文,通过顺序保障、排他消费等机制严格确保上下文的完整性与一致性。同时原生支持大消息体(数十 MB 甚至更大),轻松满足 AI 场景下庞大数据负载的传输需求。 1.4 LiteTopic 技术解析:百万队列支撑海量并发会话 LiteTopic 基于 RocketMQ 业界领先的百万队列核心技术构建,其底层本质是独立的 Queue。 + 它为每个独立会话(Session)创建一个专属的、低成本的“私有通道”——即轻量主题(LiteTopic),从而能够以极低的资源开销支撑海量并发会话的需求。 + 轻量级的 LiteTopic 在消息分配与发送行为上与顺序 Topic 一致(其所属 Queue 由单一 Broker 独占,消息始终路由至该 Broker,而非在多个 Broker 间轮询发送),这种设计天然确保了消息的严格顺序性,并极大降低了资源管理和路由的复杂度。 1.4.1 LiteConsumer 支持单节点粒度的订阅关系管理 与传统消息队列中“同一 Consumer Group ID(CID)必须全局一致订阅相同 Topic”的强约束不同,LiteConsumer 创新性地支持 CID 内各节点按需进行差异化订阅。每个节点可根据实际负载、业务场景或运行时需求,独立订阅不同的 LiteTopic,从而构建更加灵活、弹性的消费拓扑。 这一机制从根本上规避了因订阅关系不一致所引发的消费异常、重复消费或 Rebalance 风暴等问题,显著提升了系统的灵活性、可扩展性与稳定性。同时,它更契合 AI 时代轻量、动态、点对点的交互模式,为构建轻量级请求响应式消息收发模型提供了原生支持。 1.4.2 LiteConsumer 的核心能力 + 多节点差异化订阅:同一 CID 下的不同节点可独立订阅各自的 LiteTopic,实现细粒度、个性化的订阅策略。 + 动态订阅扩展:支持在运行时实时为单个节点新增 LiteTopic 订阅,无需重启服务或影响其他节点的正常消费。 + 动态退订能力:支持在运行时实时取消单个节点对特定 LiteTopic 的订阅,实现精准的资源释放与流量治理。 1.5 生产案例:RocketMQ LiteTopic 如何重塑 AI 应用架构? 以下案例基于某客户真实的 AI 应用场景,通过架构对比直观展示采用传统 RocketMQ 通信模型与引入 LiteTopic 轻量级通信模型前后的显著差异。 采用 RocketMQ LiteTopic 轻量级通信模型后,客户架构实现了质的提升:不仅彻底移除了对 Redis 的依赖,还避免了广播推送带来的带宽与计算资源浪费。整体架构更轻量,系统稳定性与可靠性也得到显著提升。 1.5.1 改造前:依赖 Redis + 广播的臃肿架构 整体的业务流程步骤如下: 1. 任务提交:用户请求到达后,应用接入层节点将推理任务写入 Redis。 2. 任务处理:Worker 集群扫描 Redis 并处理推理任务,将推理过程中的中间结果以多条顺序消息的形式发送至 RocketMQ。 3. 结果持久化与通知:Consumer 集群顺序消费 RocketMQ 消息,将最终推理结果存入 Redis,并基于 RocketMQ 广播通知所有应用接入层节点。 4. 结果推送:应用接入层节点收到广播消息后,仅当结果归属于自身连接时,才从 Redis 获取完整结果并推送给客户端;否则直接忽略该消息。 传统架构采用“先存储、再广播、后过滤”的模式,在高并发 AI 场景下效率低下且成本高昂: + 架构臃肿且脆弱:强依赖外部组件 Redis,增加了系统的复杂度和潜在故障点,运维成本高,可用性受限。 + 资源浪费严重:无效的广播机制导致大量带宽被占用,且每个应用接入层节点都需进行计算密集型的过滤操作。 + 链路冗长低效:数据流转需多次读写 Redis,通信链路长、延迟高,应用接入层节点宕机后会话状态将全部丢失,严重影响用户体验。 1.5.2 改造后:基于 RocketMQ LiteTopic 的极简可靠架构 引入 LiteTopic 后,业务流程被大幅简化,实现了端到端的可靠、高效通信: 1. 会话绑定与动态订阅:应用接入层节点在发起推理请求时携带唯一身份标识(如 Session ID),并立即订阅该标识对应的 LiteTopic(无需预创建 consumer group、topic)。 2. 结果持久化发送:智能应用(Worker)根据请求中的身份标识,将推理结果直接发送至对应的 LiteTopic(同样无需预创建)。 3. 精准接收消费:应用接入层节点各自精准接收属于自己的 response 消息,无需过滤,无任何冗余消费。 1.5.3 核心价值:为 AI 会话注入“记忆”,实现断点续传与恢复 客户接入 LiteTopic 轻量级通信模型后,通过将 LiteTopic 与 Session 维度进行细粒度绑定,以极低成本实现了生产级的会话续传与恢复能力。 在按照上一小节的流程实现端到端的可靠通信后,在网关机器下线/宕机时: + 自动重连:客户端检测到连接断开后,自动发起重连请求。 + 动态订阅:新接管的应用接入层节点实例根据 Session ID,动态订阅原 session 对应的 LiteTopic(无需预创建)。 + 断点续传:新应用接入层节点从上次成功消费的 Offset 位点开始拉取消息,精准恢复到故障前的状态(不会丢消息,也不会重复消费已处理的消息)。 + 恢复会话:自动恢复 Session 的完整上下文,用户完全无感知,业务流程无缝衔接。 基于 RocketMQ LiteTopic 打造企业级 Session 管理 2.1 AI 场景下 Session 的四大核心要求 在 AI 应用场景下,业界对 Session 的特性提出了以下四项核心要求: + 低延迟:面向实时交互场景,要求快速响应。 + 时序性:必须严格按对话时间顺序组织内容,确保上下文的连续性与逻辑一致性。 + 单会话隔离:保障不同用户/会话间的数据隔离,避免消息串话或状态混淆。 + 上下文压缩:支持通过截断或摘要控制上下文长度,避免超出模型窗口限制导致溢出。 2.2 RocketMQ LiteTopic 实现 Session 的四大优势 基于 RocketMQ LiteTopic 实现 Session 的核心价值,在于将“Session”从内存易失状态转化为可持久、可追溯、可恢复的事件流,为多智能体系统提供企业级会话韧性,彻底解决传统架构中会话状态丢失、无法恢复等痛点。 1. 会话状态持久化 —— 进程重启不丢会话 消息天然持久化存储于 CommitLog,即使应用宕机或网络中断,也能通过消息重放完整重建会话上下文(如对话历史、任务状态、中间结果)。 如下图所示,应用 A 将响应输出的 TaskEvent / TaskUpdateEvent 转换为 RocketMQ LiteTopic 中存储的消息(Message)。当应用 A 重启后,可从 CommitLog 中重放所有消息,完整恢复会话状态。 2. 消息回溯与重放 —— 断点精准恢复 支持按时间 / Offset 回溯消费,应用重启后可从断点精确恢复会话,实现无缝续聊与任务接力,避免重复推理带来的算力浪费。 当应用宕机后重新启动,可以指定某个 Session(LiteTopic)中的具体位点开始继续消费,或从上次消费成功的位点开始消费。 3. Session 隔离与路由 —— 多会话并行无干扰 通过轻量级 LiteTopic 实现会话级隔离(如 Session ID 作为 LiteTopic 的唯一标识),确保多用户/多会话并行运行时互不干扰。 多用户多 Session 的消息存储于不同的 LiteTopic,在数据存储维度实现天然隔离,无需应用层手动过滤。 4. 流量削峰与缓冲 —— 保护下游应用稳定性 高并发会话请求被缓冲至 Broker,避免下游 Agent 瞬时过载崩溃,提升系统整体稳定性。下游应用根据自身处理能力按需消费消息,实现“削峰填谷”。 如下图所示,应用 A 发出的任务请求可在 Broker 中持久化堆积,下游应用 B 根据自身消费能力按需拉取并处理,有效保障系统稳定性。 基于 RocketMQ 构建高可靠 A2A 通信通道 在上一章,我们解决了单个会话的持久化与恢复问题。现在,让我们将视野放大:当成百上千个功能各异的 Agent 需要协作时,它们之间如何建立标准化的通信?这正是 A2A 协议诞生的意义所在。 3.1 A2A 协议 AgenttoAgent(简称 A2A)是一项由 Google 于 2025 年发起,并贡献至 Linux 基金会的开源通信协议。其核心目标是建立跨厂商、跨框架的标准化互操作机制,使异构 AI 智能体(Agents)能够自动发现、可靠通信并高效协作,从而构建开放、可组合、可扩展的多智能体系统生态。 3.2 单智能体 vs. 多智能体架构:能力边界与协同范式的演进 在深入探讨如何构建 A2A 通信之前,我们首先需要理解,为什么多智能体协同是必然趋势。我们从六个维度对比单智能体与多智能体的能力差异: 3.3 同步 RPC 与 RocketMQ 异步通信的对比 明确了多智能体架构的优势后,下一个关键问题是:如何实现 Agent 之间的通信? A2A 协议原生支持的同步 RPC 协议包括 JSONRPC、gRPC 和 REST。然而,在企业级的复杂场景下,这些同步协议面临诸多挑战。下表从多个维度对比同步 RPC 与 RocketMQ 异步通信模型的差异: 3.4 开箱即用:基于 RocketMQ 的 A2A 协议实现 为加速 A2A 协议在异步通信场景的落地,我们基于 RocketMQ SDK 实现了 A2A 协议的 ClientTransport 接口。该实现旨在帮助用户在搭建多智能体应用时,能够专注于自身业务逻辑,快速构建高可靠、开箱即用的 A2A 通信方案。 发送普通同步请求:EventKind sendMessage(MessageSendParams request, @Nullable ClientCallContext context)发送Stream请求:void sendMessageStreaming(MessageSendParams request, Consumer eventConsumer…)重订订阅任务数据:void resubscribe(TaskIdParams request, Consumer eventConsumer, Consumer errorConsumer查询任务完成状态:Task getTask(TaskQueryParams request, @Nullable ClientCallContext context)取消任务执行Task cancelTask(TaskIdParams request, @Nullable ClientCallContext context)以及其他方法 开源项目地址 基于 RocketMQ 实现的 A2A 通信 RocketMQTransport 部分代码现已开源,欢迎关注! 项目地址:_https://github.com/apache/rocketmqa2a_ 3.5 架构解析:如何通过 RocketMQ 实现 Agent 间通信? 在一个典型的多智能体协作架构中,通信流程如下: + 应用 A 扮演 Supervisor 角色,负责对用户输入的需求进行任务分解,并将拆分后的子任务分别发送至应用 B 的业务 Topic(Normal Topic1)和应用 C 的业务 Topic(Normal Topic2)。 + 应用 B 集群从 Normal Topic1 拉取消息并执行相应逻辑处理,随后将结果发布到应用 A 订阅的 LiteTopic。 + 应用 C 集群则从 Normal Topic2 拉取消息进行处理,并同样将结果写入该 LiteTopic。 + 应用 A 集群通过拉取 LiteTopic 中的消息,汇聚各子任务的响应结果,进而驱动后续的业务逻辑编排。 AgentScope × RocketMQ:构建多智能体应用的最佳组合 理论与架构已经铺垫完毕,接下来,让我们结合一个完整的实战案例,看看如何将这套强大的通信基座,与顶尖的智能体开发框架 AgentScope 相结合,构建一个真正可用的多智能体应用。 4.1 AgentScope:面向多智能体的开发者友好框架 AgentScope 是阿里巴巴继 AI 模型社区 ModelScope 后,在 Agent 领域的又一战略级开源力作。它以“开发者为中心”,专注于提供智能体开发的开源框架,为构建复杂的多智能体应用提供了从设计、开发到调试的全套解决方案。它具备以下核心优势: + 对开发者透明:拒绝隐式魔法,所有环节(提示、API、智能体、工作流)可见、可控。 + 实时可介入:原生支持运行时中断与自定义处理。 + 更智能:内置工具管理、长期记忆、智能 RAG 等能力。 + 模型无关:一次编写,无缝适配各类大模型。 + 乐高式构建:模块化设计,组件解耦、自由组合。 + 面向多智能体:显式消息传递与工作流编排,专为协作场景打造。 + 高度可定制:全面开放工具、提示、智能体、工作流及可视化扩展,鼓励深度定制。 4.2 AgentScope x RocketMQ 的集成架构与合作展望 在明确了 AgentScope 的功能定位与应用价值之后,我们将进一步探讨其通信层与 RocketMQ 的现有集成机制,并展望双方在技术协同与生态共建方面的未来合作方向。 4.2.1 AgentScope 与 RocketMQ 集成架构 当 AgentScope 作为 Agent 应用服务提供者时,其内部支持符合 A2A(AgenttoAgent)协议的多种通信方式,包括基于 JSONRPC 的 WebService 和 RocketMQ Service,用于接收并处理来自其他 Agent 的 A2A 协议请求。同时,AgentScope 通过 wellknown 服务接口向外标准化地透出其所承载 Agent 的核心能力信息,包括但不限于: name(名称) description(描述) capabilities(能力列表) additionalInterfaces(额外支持的接口或协议) 这些元数据使调用方能够清晰识别该 Agent 提供的主要功能、所支持的通信协议及其对应的接入方式。 当 AgentScope 作为 Agent 应用服务的调用者时,它首先通过访问目标 Agent 暴露的 wellknown 服务,动态获取其详细的能力描述、支持的协议类型及对应的服务接入点(如 JSONRPC 端点或 RocketMQ Topic 信息)。随后,在通信层,AgentScope 利用 A2A 协议定义的传输客户端(如 JSONRPCTransport 或 RocketMQTransport)发起请求,并对返回的响应结果进行统一解析与处理,从而实现跨 Agent 的标准化、可互操作的协同调用。 1. 基于 RocketMQ 协议通信架构图 2. 基于 JSONRPC 协议通信架构图 4.2.2 合作展望 随着人工智能与分布式系统技术的深度融合,消息中间件与智能体(Agent)平台的协同正成为构建下一代智能分布式应用的关键路径。作为 Apache 软件基金会顶级项目,RocketMQ 凭借高吞吐、低延迟和高可靠等核心特性,已成为全球广泛采用的分布式消息队列,在金融、电商、物联网等关键领域积累了深厚的技术积淀,并于近期推出了轻量级通信模型 LiteTopic,进一步拓展了其在 AI 应用场景中的适用性。与此同时,AgentScope 作为新兴的智能体编排与运行平台,专注于为多智能体系统提供统一的调度、通信与治理能力。二者在技术理念与应用场景上高度契合,展现出广阔的合作前景与协同创新潜力。 1. 技术互补:构建“消息驱动 + 智能决策”的新型架构 RocketMQ 提供了强大的异步通信、事件驱动和流式处理能力。AgentScope 则聚焦于智能体生命周期管理、任务分解、上下文感知与自主协作。未来,二者可深度融合,形成“消息即事件、事件触发智能体行为”的新型架构: + 智能体间通信的标准化通道:利用 RocketMQ 作为 AgentScope 内部或跨集群智能体之间的可靠通信总线,保障高并发、有序、可追溯的消息传递,提升多智能体系统的鲁棒性与扩展性。 2. 生态共建:推动开源与标准协同发展 双方可基于开源社区开展深度合作: + 集成适配器开发:共同开发 RocketMQ 与 AgentScope 的官方集成插件,简化开发者接入流程。 + 联合参考架构发布:推出面向典型场景(如智能客服等场景)的联合解决方案模板,加速行业落地。 + 参与标准制定:在事件驱动架构(EDA)、智能体通信协议等领域协同推进开放标准,促进生态互操作性。 4.3 场景案例:用 AgentScope 与 RocketMQ 打造“智能旅行助手” 本案例以 AgentScope 作为 AI 智能体应用开发框架,构建了三个智能体: + SupervisorAgent(总控):负责与用户交互,任务分解与逻辑编排。 + WeatherAgent(天气专家):负责查询天气信息。 + TravelAgent(旅行专家):负责依据天气进行用户的行程规划。 SupervisorAgent 应用具有如下逻辑: + 如果用户只查询天气情况,则直接请求 WeatherAgent 进行天气信息查询; + 如果用户希望做出行程规划,则先向 WeatherAgent 发出天气查询请求,获取对应天气信息后,再带着天气信息向 TravelAgent 发出行程规划请求,TravelAgent 对行程结果进行规划后将响应结果发送至 SupervisorAgent 订阅的 LiteTopic,SupervisorAgent 应用将结果发送至用户侧。 实战演练:三步构建高可靠多智能体应用 阿里云官网现已提供免费试用、一键部署的《》,带您亲手搭建并运行一个多智能体应用,并基于 RocketMQ LiteTopic 实现多智能体异步通信能力——具备持久化、高可靠、可追溯等特性,显著提升 AI 应用交互的稳定性与可观测性。 5.1 方案概览:技术架构与云资源 本方案将带领您搭建一个多智能体(MultiAgent)系统,能够根据用户的需求查询天气信息并制定行程规划。 为简化部署过程,我们将在 1 台云服务器 ECS 上部署 3 个独立的 Agent(SupervisorAgent,WeatherAgent 和 TravelAgent,具体功能可参考 4.3),并且通过 RocketMQ 消息服务实现 Agent 之间的异步通信。 本方案的技术架构包含构建一个完整多智能体应用所需的所有云资源: 5.2 三步体验:从创建资源到部署 Agent 1. 免费一键部署资源 访问体验方案页面,点击“免费试用”,进入实验操作界面后,点击“立即试用”即可领取免费试用点,自动开始创建资源。 2. 创建 Topic 和 Group 共创建 3 个 Topic,配置参数见下表,其余参数保持默认。 共创建 3 个 Group,配置参数见下表,其余参数保持默认。 3. 创建部署智能体应用 在阿里云百炼的应用管理页面,根据示例文档中提供的模型参数和提示词,分别创建并发布两个智能体应用(天气助手 Agent、行程助手 Agent)。 远程连接云服务器 ECS 根据提供的执行脚本部署示例应用程序。等待应用启动完毕,大约需要 3~5 分钟,直到终端显示 You 提示符,便可直接在终端中输入信息与智能体交互。 5.3 结果验证:任务执行与消息轨迹追踪 1. 在 You 提示符后,输入 帮我做一个下周三到下周日杭州周边自驾游方案 并回车。 2. 等待智能体执行任务,最终会返回结合天气信息的行程规划内容,过程如下: a. SupervisorAgent 接收用户输入,向消息队列发送一条消息 杭州下周三到周日的天气情况怎么样?。 b. WeatherAgent 监听到上述消息,执行天气查询,并将结果发往消息队列。 c. SupervisorAgent 监听到上述消息,获取了天气查询结果,然后向消息队列发送一条消息 杭州下周三至周日天气已知,天气为,请基于此制定一份从杭州出发的周边2人3天4晚自驾游行程规划(下周三出发,周日返回),包含住宿、餐饮与景点推荐。 d. TravelAgent 监听到上述消息,执行行程规划,并将结果发往消息队列。 3. 查看消息轨迹:在云消息队列 RocketMQ 版实例详情页,可以按 Topic 或按 LiteTopic 查询到相关的消息轨迹。 目前,该解决方案已在阿里云官网上线,欢迎点击即可部署体验~ 邀请您钉钉扫码加入 RocketMQ for AI 用户交流群,探索更多产品功能与应用场景,与我们共建 AI MQ 的未来!
#技术探索

2025年11月27日

多源 RAG 自动化处理:从 0 到 1 构建事件驱动的实时 RAG 应用
前言 当企业想用大模型和内部非公开信息打造智能问答系统时,RAG(RetrievalAugmented Generation,检索增强生成)已成为必备技术。然而,在实际落地中,构建 RAG 应用的数据准备过程繁琐复杂且充满挑战,让很多企业和开发者望而却步。本文将介绍构建 RAG 的最佳实践:通过阿里云事件总线 EventBridge 提供的多源 RAG 处理方案,基于事件驱动架构为企业 AI 应用打造高效、可靠、自动化的数据管道,轻松解决 RAG 数据处理难题。 为什么 RAG 是治愈模型幻觉的“良方”? 大语言模型(LLM)就像一个博览群书、记忆力超群的“学霸”,尽管文采斐然、对答如流,但偶尔也会犯一些令人啼笑皆非的错误,比如凭空编造事实或提供过时信息,这就是我们常说的“模型幻觉”。 这背后的原因很简单:这位“学霸”的知识完全来自于“毕业前”学过的海量教材(即训练数据),尽管覆盖了维基百科、新闻、书籍等通用知识和各领域的专业知识,但存在两个天然局限: + 知识领域局限:它对企业内部、垂直领域等私域知识知之甚少。比如,它不了解你公司内部的规章制度,也无法接触电商平台的用户数据等非公开信息。 + 知识时效局限:它的知识更新停留在训练数据截止的那个时间点,无法获取实时信息,比如股票行情、时事新闻等不断更新的动态数据。 为了治好大语言模型“一本正经胡说八道”的毛病,我们必须让它从“闭卷考试”升级为“开卷考试”,RAG(RetrievalAugmented Generation,检索增强生成)技术应运而生。 RAG 的核心理念,可以通俗地理解为“先查找资料,再生成答案”。当收到一个问题时,它不会让大模型直接凭记忆回答问题,而是分两步走: 1. 检索(Retrieval):从一个可随时更新的外部知识库(如企业内部文档、产品手册等)中,快速检索出与问题最相关的信息片段。 2. 生成(Generation):将检索到的信息片段连同用户问题一起作为上下文提供给大模型,引导它基于这些可靠的“证据”生成准确、有理有据且可追溯来源的回答。 _(来自阿里云大模型平台服务百炼 知识库功能 文档示例)_ 通过这种方式,不仅能有效减少模型幻觉,大幅提升生成答案的准确性与时效性,还让模型在无需耗费巨资和时间进行重新训练的情况下,就能轻松扩展知识边界。凭借这些显著优势,RAG 已成为企业构建可靠、智能 AI 应用的首选方案。 RAG 落地挑战:数据处理的“三重困境” 尽管 RAG 的原理听起来简单明了,但在实际落地时,无数企业和开发者却深陷数据处理的泥潭。 AI 时代的数据处理,与过去以结构化数据为主的传统数据处理模式截然不同。我们面对的是由海量、异构、多模态数据构成的洪流,数据处理的复杂度和挑战呈指数级增长。企业对实时性要求也不断提高,任何数据延迟都可能影响模型效果。 企业和开发者在落地 RAG 时,普遍会陷入数据处理的“三重困境”: 1. 扩展之困——异构化数据源的“接入鸿沟” 现代企业的数据通常散落在 ERP、CRM、OA、IoT 设备、社交媒体等数十个系统中,涵盖结构化数据(如数据库、表格)、非结构化数据(如 PDF、网页、图片、音视频)和半结构化数据(如 JSON、XML)。若采用传统点对点连接的数据集成方式,每接入一个新数据源,都需要复杂的定制化开发,扩展性极差,响应速度慢,严重拖慢 AI 应用的迭代速度。 2. 运维之难——脆弱数据管道的“运维噩梦” RAG 的数据处理链路漫长且复杂,涉及数据采集、清洗、切块、向量化、入库、检索等多个环节。整条链路如同一个脆弱的“黑箱”,任何一个环节的微小故障都可能导致全链路瘫痪。在实际运维过程中,数据源接口变更、数据质量问题、系统负载突增等突发状况层出不穷,数据管道的问题排查、修复和系统更新,都极其耗时耗力,让运维团队疲于奔命。 3. 稳定之痛——数据管道的“可靠性危机” 数据管道的稳定性是 AI 应用落地的基石。数据丢失、重复、延迟、质量下降以及系统故障等数据处理链路中的任何问题,都可能直接导致模型推理结果的偏差甚至错误,进而影响业务决策和用户体验。传统数据处理架构的紧耦合设计,导致任何一个组件故障都可能影响整个系统运行,并且缺乏有效的监控和告警机制,往往在造成严重影响后才发现问题。 因此,我们迫切需要一种全新的数据处理范式,来构建一个灵活、可扩展、实时、智能的数据处理管道。 破局之道:事件架构驱动重塑 AI 数据管道 事件驱动架构(EventDriven Architecture,EDA)为应对 AI 数据处理的复杂性挑战,提供了坚实的技术基础。在事件驱动架构中,“事件(Event)”是核心概念,它本质上是一次状态变化的数字化表达。在 AI 数据处理场景中,数据的产生、变更、处理、存储等各个环节都可以被抽象为事件。例如,当新的训练数据上传到系统时,产生数据接收事件;当数据经过清洗和转换后,产生数据处理完成事件;当向量化处理完成后,产生向量生成事件;当数据成功存储到向量数据库后,产生数据入库事件。 这种“事件化”的处理方式,使整个 AI 数据处理流程变得标准化、清晰、可控且可追溯,带来三大优势: 1. 松耦合 数据处理流程被分解为独立的事件和处理单元。数据工程、算法、平台等团队可以独立开发、部署和迭代各自负责的组件,无需关心对方的内部实现。一个组件的变更不会影响其他部分,系统容错能力和迭代效率更高。 2. 可扩展性与稳定性 每个组件都可以根据实际负载独立扩展,当某个组件成为瓶颈时,只需增加该组件的实例数量,而无需对整个系统进行扩容。同时,通过引入智能监控和自动恢复机制,系统能够及时发现和处理各种异常情况,保证数据链路稳定运行。 3. 端到端实时性 在智能客服、实时推荐等场景中,毫秒级的响应至关重要。事件驱动架构可以确保事件一旦发生,便能被立即捕获并触发后续处理。这使得 RAG 的知识库能够近乎实时地吸收新信息,让大模型始终掌握着最新“情报”。 综上所述,采用事件驱动架构的系统在敏捷性、可扩展性和可靠性方面实现了质的飞跃,这正是 AI 应用规模化落地的基石。 EventBridge 多源 RAG 处理方案:为 AI 场景提供高效数据管道 阿里云事件总线 EventBridge 基于事件驱动架构,将 AI 能力深度融入数据处理全链路,为企业和开发者提供专为 AI 应用设计的、端到端的、智能化的数据处理中间件。 EventBridge 通过一系列 ETL for AI Data 的全新能力,提供多源 RAG 处理方案:将 RAG 数据准备的全流程(从多源异构数据提取、清洗、切块、向量化再到入库)彻底实现自动化。 开发者现在可以通过 EventBridge 简单的“白屏化”配置,轻松实现: 1. 无缝对接多源数据 轻松接入主流的对象存储(OSS)、消息队列(如 Kafka、RocketMQ、MQTT)、日志服务(如 SLS)、数据库服务(如 MySQL)等多种数据源,覆盖结构化数据(如数据库、表格)、非结构化数据(如 PDF、网页、图片、音视频)和半结构化数据(如 JSON、XML)。 2. 智能化的数据处理 自动完成文档解析(Loader)、文本切分(Chunking)和向量化(Embedding)的完整数据转换流程,内置多种核心技术,支持多种非结构化数据(如 TEXT、JSON、XML、YAML、CSV)的智能解析和处理,提供完整的 Loader 技术体系,包括多种分块策略、单文档加载、批量数据加载,确保大规模数据的可靠处理;对结构化数据采用流式处理架构,能够实时处理高吞吐量的数据流,可实现复杂的流式数据转换和聚合操作。 3. 一键式向量入库 提供统一的向量数据库接入接口,支持将处理好的向量数据直接加载到主流向量数据库(如 DashVector、Milvus)中,也兼容传统数据库的向量扩展插件。只需简单的图形界面配置(拖拽方式配置数据源、处理逻辑、目标数据库等),系统会自动生成复杂的向量数据处理和入库流程。提供丰富的预置模板,可基于模板快速搭建数据处理流程。提供完善的监控仪表板和告警机制,可实时查看数据处理的状态、性能指标、错误信息等,及时发现和解决问题。 场景实践:从 0 到 1 构建基于事件驱动架构的实时 RAG 应用 接下来,我们将通过一个完整的实战场景,带你从零开始,利用阿里云事件总线 EventBridge、对象存储 OSS、函数计算 FC、向量检索服务 DashVector 和大模型服务平台百炼,快速构建一个实时的 RAG 应用。 方案概览 + 首先,通过 EventBridge 构建一个高效的 ETL 数据管道:能够自动从数据源(对象存储 OSS)中实时提取数据,通过函数计算 FC 灵活定义数据转换的逻辑,进行清洗、切块和向量化,并将处理结果持续加载到目标(向量检索服务 DashVector),形成一个动态更新的知识库。 + 然后,通过函数计算(FC)的 Web 函数构建一个简单的 RAG 应用,调用大模型服务平台百炼进行推理,以 DashVector 中的向量数据作为知识库。 + 最后,我们通过输入与知识库相关的用户问题,测试 RAG 应用的回答效果。 方案架构 方案提供的默认设置完成部署后,在阿里云上搭建的系统如下图所示。实际部署时您可以根据资源规划修改部分设置,但最终形成的运行环境与下图相似。 实施步骤 1. 构建自动化数据管道: 1. 创建事件流:在事件总线 EventBridge 控制台创建并配置一个事件流,作为数据处理管道的核心。 2. 配置数据源与目标:创建并配置对象存储 OSS Bucket 作为数据源(Source),创建并配置向量检索服务 DashVector 作为数据投递的目标(Sink)。 3. 配置数据转换逻辑(Transform):选择“内容向量化”的函数模板创建一个函数,并在函数代码中填写获取的百炼 APIKEY,这个函数将负责对数据进行切块和向量化。 2. 构建 RAG 应用: 1. 创建 Web 函数:创建一个 Web 函数(注意和之前创建的用于处理数据流的事件函数区分)。 2. 编写应用代码:这个函数将作为 RAG 应用的后端,负责接收用户查询,从 DashVector 检索知识,并调用百炼大模型生成回答。需要在函数代码中配置百炼和向量检索服务 DashVector 的相关访问凭证(如 APIKEY、Endpoint 等)。 3. 部署应用:部署代码成功后,RAG 应用即构建完成并可供访问。 效果验证 1. 更新知识库:将包含私有数据的文件(例如,一份名为百炼系列手机产品介绍.txt 的文档,包含了虚拟手机厂商的商品数据)上传到 OSS Bucket 中。 2. 查看向量生成:文件上传成功后,EventBridge 会自动捕获这一事件并触发数据处理流程。稍等片刻,即可在 DashVector 控制台查看已生成的向量。 3. 测试问答效果:通过创建的 RAG 应用发起访问,输入一个与你上传文档相关的问题,例如:“百炼 X1 手机的分辨率是多少?”。 4. 获取精准回答:RAG 应用会自动检索知识库,并将相关信息连同问题一起发送给百炼大模型。很快就会收到一个基于私有数据生成的精准回答。在函数的执行日志中,还可以看到向量检索召回的具体原文片段,从而验证整个 RAG 链路的有效性。 目前,该解决方案已在阿里云官网上线,欢迎点击即可部署体验~ 邀请您钉钉扫码加入 EventBridge 用户交流群,探索更多产品功能,与我们共同定义和构建 AI 数据处理的未来!
#技术探索

2025年11月27日

从 Transform 到 Transformer,用 EventBridge 与百炼构建实时智能的 ETL 数据管道
前言 作为数据处理领域的经典模式,ETL(ExtractTransformLoad)通过提取、转换、加载三个步骤,高效地处理着各类结构化数据。然而,面对 AI 时代海量、异构、实时的“数据洪流”,传统 ETL 链路,尤其是其核心的转换(Transform)环节,正面临严峻挑战。本文将从一个初级开发者也能理解和上手的视角,探讨 AI 时代的数据处理新范式:如何利用基于 Transformer 架构的大语言模型(LLM)重塑传统数据处理中的转换(Transform)环节,并结合事件驱动架构(EventDriven Architecture, EDA),为 AI 数据处理链路“注入实时智能”。 传统 ETL 在 AI 时代的困境:“T”不仅是“转换”,更是“痛点” ETL(ExtractTransformLoad)是数据处理的核心流程。它负责从不同数据源中提取数据,经过清洗、转换和整合后,加载到统一的数据仓库,为后续的数据分析与商业智能提供支撑。 我们可以将 ETL 生动地比作一个“中央厨房”:“提取(Extract)” 是采购生鲜食材(原始数据),“加载(Load)” 是将精美菜肴(可用数据)端上餐桌,而 “转换(Transform)” 则是至关重要的烹饪环节。在过去,厨房采购的多是规格统一的食材(结构化数据),依靠“老菜谱”(固定的规则代码)便能高效、稳定地完成处理。然而,AI 时代带来了形态各异、数量庞大的“山珍海味”——文本、图片、音频、视频等非结构化数据,且往往要求实时处理。这使得厨房里最关键的烹饪环节(Transform)不堪重负,那套为标准食材设计的“老菜谱”已然捉襟见肘。 以一个常见的地址信息标准化场景为例。用户输入的地址格式五花八门,可能是文本、文档、扫描图片以及客服对话等。后端系统为了便于入库与分析,必须将这些信息精准地解析为“省市区街道”的标准化结构。更棘手的是,原始地址信息还可能存在错别字、信息缺失或语义模糊等问题。在传统 ETL 模式下,工程师不得不编写数千行复杂的正则表达式和判断逻辑,并维护一个庞大的地址库。这种方式不仅开发和维护成本高、扩展性差,而且难以覆盖所有非标准格式和异常情况。 这正是传统 ETL 在 AI 时代“力不从心”的缩影,但也只是冰山一角。在 AI 应用中,数据转换的需求远不止于此,例如:从用户评论中提取情感倾向、为非结构化文档打标签、从图片中识别特定内容等。传统基于固定规则的 Transform 工具,面对这些充满“语义”和“不确定性”的任务,几乎束手无策。最终,数据处理链路变得难以扩展、运维复杂且稳定性差,严重拖慢了 AI 应用的创新步伐。 为 ETL 更换“AI 大脑”:用 Transformer(AI)代替 Transform 既然传统 Transform 难以理解数据背后的“语义”,为何不让擅长此道的 AI 来主导这个环节呢? 本文巧妙地运用了“Transformer”一词的双关含义:它既指代 ETL 中的“转换器”(Transformer),更特指驱动大语言模型(LLM)的核心架构——Transformer。 这正是“用 Transformer 代替 Transform”的核心思想:将大语言模型(LLM)的智能直接注入 ETL 的数据转换环节。许多过去需要复杂代码才能实现的数据转换任务,如今通过与 AI 交互即可轻松完成。 回到我们的“中央厨房”比喻,现在负责加工菜品的,不再是一本固化的菜谱,而是一位拥有米其林星级水准、能理解并处理任何食材的全能大厨。 以前文提到的“地址信息标准化”场景为例,集成了 LLM 的数据管道能做到: + 结构化解析:利用大模型的语义理解能力,即使地址格式不规范、字段顺序混乱,也能准确识别出省、市、区、街道等核心要素。例如,对于“北京市海淀区中关村大街1号”,大模型能准确解析出结构化字段:省份为“北京”,城市为“北京”,区县为“海淀区”,街道为“中关村大街1号”。 + 自动识别纠错:基于大模型强大的语言知识与上下文推理,自动识别并纠正地址信息中的错别字或格式错误,再结合地理知识库,能够进一步确保纠正的准确性。例如,将“北京市海定区”纠正为“北京市海淀区”,将“中关村大街一号”标准化为“中关村大街1号”。 + 智能信息补全:借助大模型的上下文推理能力,并集成完整的地理信息数据库,能够根据已有地址信息智能补全缺失的部分。例如,通过详细地址自动推断邮政编码,根据区县信息补全城市和省份。 + 格式标准化:最终,所有解析后的信息将被统一输出为标准格式,如结构化的字段信息(省、市、区、街道、邮政编码等)和规范化的地址字符串,确保下游系统能够直接、高效地进行数据处理和分析。 这样,我们就不再需要维护复杂的规则库,只需在数据流中集成大语言模型的能力,即可实现智能化的数据清洗和标准化处理。 事件驱动架构(EDA):ETL for AI Date 的“天选载体” 解决了数据“如何转换”的问题后,我们还需要一个现代化的架构来承载整个流程。相较于传统的批量处理模式,事件驱动架构(EventDriven Architecture, EDA)已被公认为是 AI 时代数据处理的理想选择。 如果说“用 Transformer 代替 Transform”是为数据管道植入“AI 大脑”,那么 EDA 就是构建这套智能系统的“神经网络”,让数据和 AI 指令高效地流动起来。它将系统中的每一次状态变化(如“新文件上传”、“新评论产生”)都视为一个独立的“事件”,实时触发相应的处理动作,并准确地将处理结果路由到指定的下游系统。 阿里云事件总线 EventBridge:AI 时代的事件中枢 作为 AI 时代的企业级事件枢纽,事件总线 EventBridge 为 AI 数据处理提供强大的事件驱动能力。它能够帮你轻松构建高效、智能的 ETL 数据管道,无缝集成 AI 能力,并且原生支持实时推理和异步推理两种模式,以满足不同场景的需求。其核心价值体现在以下几个方面: + 实时智能决策:将 AI 推理能力无缝融入数据处理流程,使系统能对实时数据进行智能分析与决策,极大提升业务的智能化水平。 + 系统解耦、灵活扩展:数据源、AI 模型和业务系统通过 EventBridge 完全解耦,任何组件的变更都不会影响其他部分,让系统更加灵活、更易于维护和扩展。 + AI 能力沉淀与复用:相同的 AI 推理服务可被多个业务场景复用,避免重复开发,最大化 AI 模型的价值与利用率。 + Serverless 与易用性:作为全托管的 Serverless 服务,EventBridge 免去了繁琐的部署和运维工作。通过简单的控制台配置或 API 调用,即可快速构建和管理事件驱动的数据处理流程,并支持事件过滤、转换、路由、重试等高级能力。 核心能力:从不确定到确定的“结构化输出” 在 ETL 的 “转换(Transform)” 环节中,EventBridge 允许用户直接调用大语言模型(例如通义千问或百炼大模型服务平台上的模型),甚至是更复杂的 AI Agent。 但问题来了:LLM 的输出具有不确定性,有时像“开盲盒”,这在要求稳定可靠的企业级应用中是无法接受的。 为了解决这个核心痛点,EventBridge 提供了强大的结构化输出(Structured Output)能力,确保 AI 的每一次处理结果都以稳定、可靠的格式返回。 例如,当需要模型分析用户评论的情感时,我们期望的不是“我觉得这句话是积极的”这类模棱两可的文本,而是一个清晰的 JSON 对象,如:{ "sentiment": "积极", "summary": "产品设计和性能出色,客户非常满意" }。 EventBridge 通过以下两项核心技术,产品化地实现了这一关键能力: + JsonSchema 原生支持:对于支持 JsonSchema 的模型,EventBridge 会利用其原生能力,确保模型输出严格遵循用户定义的格式,实现高性能与高可靠性。 + 智能提示词注入:对于不支持 JsonSchema 的模型,EventBridge 会采用智能提示词注入技术,引导模型生成稳定、可靠的结构化输出,并确保多轮对话格式的一致性。 这项能力极大地降低了在 ETL 流程中使用 AI 的门槛,为企业级应用的稳定性提供了坚实保障。 场景实践:EventBridge+百炼,让 AI 赋能数据链路 我们以一个具体的场景为例,展示如何通过阿里云的事件总线 EventBridge 与大模型服务平台百炼的结合,构建 AI 赋能的数据链路。 通过在 EventBridge 的事件流中原生集成百炼大模型服务,使开发者可以在数据流中直接调用 AI,从而轻松、高效地完成传统 ETL 中复杂的转换(Transform)任务。 应用场景:敏感信息过滤与数据脱敏 本方案将演示如何利用 AI 对实时数据流进行清洗与脱敏处理,自动发现并屏蔽业务数据中的敏感关键词,从而保障业务的数据合规性。 方案架构与实施步骤 该方案的核心是利用 EventBridge 作为数据中枢,将来自不同数据源的实时数据流,分发给百炼 AI 模型进行推理,再将推理结果通过 EventBridge 路由到下游的业务系统或数据存储中。 下图展示了该方案的参考架构: 实施该方案主要包括以下三个步骤: 1. 构建数据管道:创建一个 EventBridge 事件流,作为整个数据处理的核心链路。 2. 配置数据源与目标:创建两个轻量消息队列,分别作为事件流的源(Source)和目标(Sink)。 3. 集成 AI 能力:在事件流的转换(Transform)环节,配置调用已开通的阿里云百炼大模型服务,将 AI 的智能处理能力无缝嵌入数据管道中。 整个流程通过在 EventBridge 控制台进行简单的点击和配置即可完成,无需编写复杂的代码来连接数据源、AI 模型和下游系统,大大提升了开发效率。 实现效果 部署完成后,可以通过以下方式验证数据脱敏效果:从 Source(源)队列发送带有敏感信息的数据,然后在 Sink(目标)队列接收经过事件流处理后的数据。 + 输入示例:包含敏感信息的数据,如["客户张三(13812345678)反馈了一个问题..."] + 输出示例:经过 AI 脱敏后的数据,如["客户(1385678)反馈了一个问题..."] 注:由于大模型生成结果存在随机性,实际测试时输出结果的格式可能存在差异。在生产环境中,可以结合 EventBridge 的结构化输出能力与提示词工程,获得稳定可靠的输出结果。 开发者关心的问题(Q&A) 问:每次数据转换都调用 LLM,成本会不会很高? 答:成本是大家最关心的问题。首先,EventBridge 本身是 Serverless 服务,按实际处理的事件数量付费。主要的成本来自 LLM 调用,但你可以根据场景需求灵活选择不同规模和成本的模型。对于简单任务,轻量级模型已足够,成本较低。对于非实时场景,可采用异步批量调用的方式,进一步优化成本。 问:LLM 推理有延迟,会影响实时性吗? 答:事件驱动架构的异步和解耦特性天然适合这种情况。数据源发送事件后无需等待处理结果,整个系统流程不会被阻塞。EventBridge 支持实时和异步两种推理模式,你可以根据业务对延迟的敏感度进行选择,确保用户体验不受影响。 总结 用 Transformer(LLM)升级 Transform(转换),并以事件驱动架构(EDA)作为承载,是 AI 时代数据处理范式的一次“智”变。阿里云 EventBridge 与百炼的结合,为开发者提供了一条低门槛、高灵活性的路径,将强大的 AI 能力无缝融入实时数据流,让你的应用轻松实现智能化。 目前,该解决方案已在阿里云官网上线,欢迎点击即可部署体验~ 邀请您钉钉扫码加入 EventBridge 用户交流群,探索更多产品功能,与我们共同定义和构建 AI 数据处理的未来!
#技术探索

2025年11月7日

官宣上线!RocketMQ for AI:企业级 AI 应用异步通信首选方案
企业级 AI 应用开发面临新挑战 随着人工智能技术的飞速发展,模型迭代日新月异,企业正积极构建 AI 应用以提升用户体验和降低人力成本。然而,与传统微服务应用相比,企业在推进 AI 应用落地的过程中,普遍呈现出三个显著特征: + 任务处理耗时长:传统微服务应用通常能实现毫秒级响应,而 AI 应用的处理周期跨度极大——从几分钟到数小时不等。这种长耗时与不确定性,要求系统架构必须在任务调度、资源分配和用户体验设计上进行重新考量,避免同步调用带来的长时间阻塞。 + 算力资源稀缺性且成本高昂:AI 应用的训练与推理高度依赖 GPU 等稀缺且昂贵的算力资源。因此,任何因网络或应用异常导致的任务重复处理,都会直接造成算力资源浪费和成本增加。如何保障任务在异常情况下不丢失、不重复,成为控制成本的关键。 + 算力利用率与业务流量波动的矛盾:业务请求天然存在波峰波谷。为应对流量高峰以保障服务稳定,企业需要预留大量算力,导致流量低谷时资源闲置;反之,若为节约成本而缩减资源,又难以应对高峰请求,可能导致系统过载或任务积压。如何在有限算力下实现高效调度,既提高资源利用率,又保障高优任务及时响应和系统稳定性,构成了一个核心矛盾。 这些业务特点在 AI 应用的开发和集成过程中,引出了以下典型的业务场景问题: + 单智能体(Agent)局限性与多智能体(MultiAgent)协作:由于单智能体缺乏专业分工、难以整合多领域知识,无法在复杂场景中实现动态协作决策。因此,随着 AI 应用场景变得更加复杂,单 Agent 应用会逐步向多 Agent 应用演进。然而,在 AI 任务处理耗时长的背景下,智能体间的通信(Agent2Agent)必须解决长耗时同步调用带来的阻塞问题以及应用的协作扩展性问题。 + 大规模会话状态管理,并保障会话连续性和任务处理可靠性:在网络或应用节点发生异常时,如何保障用户会话的连续性体验,并确保会话任务不被重复处理以避免算力资源浪费,成为一大挑战。 + 在有限算力下实现高效调度,并保障高优任务的及时响应:如何在有限算力资源下实现高效任务调度,从而既能提高算力资源利用率,保障高优任务被及时处理,又能确保算力服务整体稳定性。 在上述场景中,消息队列能够起到至关重要的作用: + 首先,通过消息队列将同步调用改为异步通知,是解决长耗时阻塞的关键。 + 其次,消息队列天然的“削峰填谷”能力可以平滑请求流量,缓解算力资源的处理压力。 + 再结合定速消费和消息优先级等高级特性,便能更有效地调度有限的算力资源。 为能够有效解决上述问题,RocketMQ 推出了针对性的解决方案。 RocketMQ for AI 重磅发布 RocketMQ 专门为 AI 场景推出了全新 LiteTopic 模型,相较于 RocketMQ 其他类型的 Topic,LiteTopic 具备以下核心特点: + 轻量资源:LiteTopic 是轻量资源,支持在父 Topic 下创建百万数量级的 LiteTopic,满足大规模任务需求。 + 自动化生命周期管理:LiteTopic 可在收发请求时自动创建,并可设置过期时间,到期后自动删除,简化了业务开发和资源管理。 + 高性能订阅:在消费订阅方面,每个消费者可以动态订阅或取消订阅多达万级的 LiteTopic 集合。如图中所示,消费者 1 订阅列表是 LiteTopic 1 和 LiteTopic 2,消费者 2 订阅列表是 LiteTopic 3 和 LiteTopic 4。 + 排他消费:确保一个 LiteTopic 在同一时间只被一个消费者订阅,这在会话保持等场景中至关重要。 + 顺序性保障:每个 LiteTopic 内部的消息严格保证顺序存储。 目前,这些能力已在阿里云云消息队列 RocketMQ 版 5.x 系列实例上正式发布,并会逐步贡献到 Apache RocketMQ 开源社区,欢迎大家使用。 场景应用一:MultiAgent 异步通信__ 延续前文对多智能体(MultiAgent)通信场景的讨论,我们在此详细阐述 RocketMQ 如何解决多智能体应用开发中的长耗时阻塞问题。 图中展示了多智能体(MultiAgent)应用中一个 Supervisor Agent(主智能体)和两个 Sub Agent(子智能体)之间的异步通信流程: 1. 接收请求阶段:为每个 Sub Agent 创建一个 Topic 作为请求任务的缓冲队列,可以是优先级 Topic,从而保障高优任务能够被优先处理。 2. 返回结果阶段: a. 为 Supervisor Agent 创建一个用于接收响应结果的 Topic,并让其订阅这个 Response Topic。该 Topic 可采用 RocketMQ 专为 AI 场景新发布的 Lite Topic 类型; b. 当 SubAgent 完成任务后,它会将结果发送至该 Response Topic,可以为每个独立任务动态创建一个专属的子 LiteTopic(例如,以任务 ID 或问题 ID 命名); c. Supervisor Agent 通过 MQ 的异步通知机制实时获取这些子 LiteTopic 中的结果,并可通过 HTTP SSE(ServerSent Events)等协议推送给 Web 端。 这一架构充分利用了 Lite Topic 的以下核心能力,解决了长耗时调用的难题: + 轻量资源:支持创建百万量级的子 LiteTopic,可以满足海量请求任务的通信需求。 + 自动化生命周期管理:子 LiteTopic 支持自动创建和删除,可以简化业务代码,降低资源管理投入。 + 顺序性保障:每个子 LiteTopic 的消息均按顺序存储和消费,可以保证流式响应结果的顺序性。 场景应用二:分布式会话状态管理 LiteTopic 的能力还可以有效解决会话场景中的挑战,例如保障长耗时会话的状态连续性、避免任务重试带来的成本增加等。 实现原理如图所示:在一个多节点高可用集群的应用服务中,不同用户的会话被分发到不同节点上。与前述的返回响应结果场景类似,系统为每个会话分配一个专属 LiteTopic 来传递消息(如会话结果)。每个应用服务节点仅订阅其关联会话所对应的 LiteTopic 集合,并将接收到的消息按顺序推送至 Web 端。 在此基础上,系统通过分布式架构和 RocketMQ 的一系列核心特性,实现高可用性保障: + 故障切换:当网络异常等原因导致 Web 端 2 重连到集群中的另一个节点 2 时,节点 2 会立即订阅此会话对应的 LiteTopic 2。 + 消费转移:由于排他消费特性,LiteTopic 2 的消息将不再推送给节点 1,转为推送给节点 2。 + 无缝续传:得益于消息持久化和消费位点持久化两大特性,节点 2 能够从上一次中断的位置无缝衔接,推送的数据流会接着之前的消费进度推送给节点 2。 最终,用户在 Web 端感受到的是会话没有中断,从而获得连续的会话体验。同时系统也避免了因连接切换而触发不必要的任务重试,有效节约了宝贵的算力资源和成本。 场景应用三:算力资源高效调度 在算力资源成本高昂且供给有限的背景下,如何实现资源的高效调度,是一个典型的应用场景。消息队列在此扮演了关键角色: + 首先,利用其天然的异步解耦和“削峰填谷”能力,可以平滑波动的请求流量,平稳地调用模型服务或算力服务。 + 其次,通过消费者限流(定速消费)能力,可以有效保护核心算力资源的稳定性,防止其因瞬时流量冲击而过载。 + 最后,消息优先级能力能够确保有限的算力资源被优先分配给高优任务(如高价值或高紧急度的任务)使用。 值得一提的是,RocketMQ 的优先级能力具备一个独特优势:消息的优先级支持在投递后动态修改。 例如,一个普通用户的任务正在队列中排队,此时该用户付费充值将账号升级为 VIP 账号。系统便可以动态提高其已在排队中的任务消息的优先级,让任务立刻被优先执行。 LiteTopic 模型技术解析 为支持百万量级的 LiteTopic,同时保障高并发与低延迟的消息发送和消费流程,其技术实现的核心要点如下: 1. 发送流程: 为实现快速、自动创建与删除 LiteTopic,基于 RocketMQ 新版本 RocksDB 的 KV Store 存储能力,实现对海量元数据信息的高效管理。 + 统一存储、多路分发:RocketMQ服务端接收到消息后,将所有消息数据统一存储在底层的 CommitLog 文件中且仅存储一份,这种单一文件的追加模式(Append)避免了磁盘碎片化,保障了极致的写入性能。但通过多路分发机制,可以为不同的 LiteTopic 生成独立的消费索引(ConsumerQueue,简称 CQ)。 + 索引存储引擎升级:摒弃了传统的文件型 CQ 结构,而是替换为高性能的 KV 存储引擎 RocksDB。通过将队列索引信息和消息物理偏移量(Physical Offset)作为键值对存储,充分发挥 RocksDB 在顺序写入方面的高性能优势,从而实现对百万级队列的高效管理。 2. 消费流程: 消费流程的核心挑战是:当每个 LiteTopic 内仅有少量消息时,若逐一推送,将导致并发处理能力和系统性能大幅下降。 为解决此问题,RocketMQ 在 LiteTopic 存储模型的基础上,进一步对消息分发与投递机制进行优化,针对单个消费者订阅上万个 LiteTopic 的场景,重新设计了一套创新的事件驱动拉取(EventDriven Pull)机制: 每当有新消息到达时,系统会立即触发订阅关系匹配,并将所有符合订阅条件的消息聚合到一个“就绪集合”(Ready Set)中。消费者可以直接从这个 Ready Set 中合并批量拉取来自多个 LiteTopic 的消息。通过这种方式,有效提高了消费并发度,降低了网络开销,从而显著提升了整体性能。 为企业级 AI 应用提供全方面的异步通信保障 随着 AI 技术的快速发展和应用落地,RocketMQ 已完成向“AI MQ”方向的战略升级,不仅支持传统的微服务应用,也致力于为企业级 AI 应用的开发和集成提供一站式异步通信解决方案,涵盖会话管理、Agent 通信、知识库构建以及模型算力调度等典型场景。同时,阿里云云消息队列 RocketMQ 版产品通过在成本与稳定性方面的持续优化,进一步帮助用户降本增效。 目前,RocketMQ for AI 相关能力已在阿里巴巴集团内部以及阿里云大模型服务平台百炼、通义灵码等产品中经过大规模生产环境的验证,且取得显著成效,充分证明了其在高并发、复杂的 AI 场景下的成熟度与可靠性。 展望未来,RocketMQ 将持续在 AI 领域进行技术迭代与创新,赋能更多应用场景,并积极与 AgentScope、Spring AI alibaba、Dify 等主流 AI 生态系统/服务合作集成,共建高效、智能的 AI 应用基础设施,以及逐步将经过阿里集团 AI 业务验证过的方案与特性,持续反馈到开源社区。 诚邀您扫码参与问卷调研,反馈真实使用场景和痛点,帮助我们打造更符合 AI 时代需求的消息引擎。也欢迎钉钉扫码加入交流群(群号:110085036316),与我们交流探讨~ 目前,轻量主题(LiteTopic)[1]功能已在阿里云云消息队列 RocketMQ 版[2]非 Serverless 系列(包年包月、按量付费)和 Serverless 系列的独享实例支持,可提交工单申请白名单(提单时需要提供购买实例的主账号 uid 和实例所属地域)。 同时,阿里云官网已上线 RocketMQ for AI 的解决方案,欢迎! 相关链接: [1] 轻量主题(LiteTopic) _https://help.aliyun.com/zh/apsaramqforrocketmq/cloudmessagequeuerocketmq5xseries/developerreference/litetopic_ [2] 云消息队列 RocketMQ 版 _https://www.aliyun.com/product/rocketmq_
#技术探索

2025年9月28日

构建企业级 AI 应用:为什么我们需要 AI 中间件?
前言 9 月 26 日,2025 云栖大会 AI 中间件:AI 时代的中间件技术演进与创新实践论坛上,阿里云智能集团资深技术专家林清山发表主题演讲《未来已来:下一代 AI 中间件重磅发布,解锁 AI 应用架构新范式》,重磅发布阿里云 AI 中间件,提供面向分布式多 Agent 架构的基座,包括:AgentScopeJava(兼容 Spring AI Alibaba 生态),AI MQ(基于Apache RocketMQ 的 AI 能力升级),AI 网关 Higress,AI 注册与配置中心 Nacos,以及覆盖模型与算力的 AI 可观测体系。阿里云 AI 中间件核心技术全面开源,全面拥抱行业标准,加速企业级 AI 应用规模化落地。 AI Agent 大爆发 自从 ChatGPT 掀起了大模型浪潮,短短三年间,AI 技术几乎以月为单位迭代:从能对话、写文案的 Chatbot,进化到协助编码与办公的 Copilot,如今进一步迈向具备自主规划、工具调用、记忆与协作能力的 Agent。AI 不再只是“会说话的软件”,而是能理解目标、拆解任务、选择策略并执行的 “ 行动主体 ”,能够在复杂场景中进行多轮推理与闭环反馈。 如果说 2022 年是 Chatbot 的起点,2023 年是 Copilot 的浪潮,那么 2025 年,注定是 Agentic AI 全面爆发的元年。我们来回顾一下近几年的演进路线图。 + 2022 年,Chatbot 进入大众视野。以 GPT3 为代表,大家用它生成文本、做自然语言交互——简单客服、文案写作是典型场景。但它仍然是“被动应答”。 + 2023 年,Copilot 革命来了。GPT4 带来更长上下文、多模态输入,加上检索增强(RAG)的成熟,让 AI 从“被动回答”升级为“主动协作”。开发者使用代码 Copilot,写代码效率可以提升 50% 以上;Office Copilot 深度融合 AI 技术于 Word、Excel、PPT 等应用中,深刻改变传统办公模式,大幅减少重复性工作。AI 开始深度融入企业核心工作中。 + 2025 年,我们将迎来“能理解、会规划、能协作、敢行动”的 Agentic AI 的集中爆发。一方面,从 GPT5、Qwen3 到 DeepSeek,模型在自主规划、多模态融合能力上高速发展;另一方面,针对 Agent 和工具的集成,多 Agent 通信的能力,也在走向标准化,如 MCP、A2A 标准。应用层面,人形机器人走上春晚舞台;企业里的“数字员工”开始上岗,如财务 Agent 能够自动审批流程,AI 的角色从辅助者走向“能自主决策、能自动闭环”的主体。 有几组数字也在印证这一发展趋势:根据 Gartner 行业报告,全球 Agent 市场正以 44.5% 的年复合增长率扩张,预计到 2028 年将达到近三千亿美元规模;将有超过 15% 的企业日常决策由 Agent 自主完成;而有三分之一的企业软件,会原生嵌入 Agentic AI 能力。 这不是一阵风,而是一个长周期的结构性趋势。技术、政策、市场这三股力量,正在同频共振,形成加速度。 1. 技术突破:开源模型的迅速成熟降低了门槛,推理成本呈断崖式下降,多模态和大小模型的协同拓宽边界。这让 Agent 不再是“概念验证”,而是可以规模化落地。 2. 政策引导:中国“十四五”明确把 AI 纳入新基建,国务院“人工智能+行动意见”为中国 AI 产业指明方向;美国、欧盟也在同步发力,形成全球共振。 3. 市场需求:企业端急需提效降本,消费端渴望更沉浸的体验,资本市场面向 AI 赛道持续加码,大量 AI 创业公司涌现。 技术降低门槛,政策扫清障碍,市场反哺研发,形成闭环。算法、算力与场景叠加,AI 从实验室跃迁万千行业,已经势不可挡。 过去十年企业为了拥抱移动互联网浪潮,实现数字化转型,纷纷投入云原生应用的研发;而今天,企业要抓住 AI 的浪潮,实现智能化转型,更需要投入 AI 原生应用的研发,把 AI 的能力融入到自己的核心业务中。 AI 原生应用架构 AI 原生应用是指从架构设计到功能实现均以人工智能技术(大模型)为核心驱动力的应用。它将 AI 能力深度嵌入系统底层,通过数据驱动决策、动态模型演化和端到端 AI 流程重构业务逻辑。 和传统应用相比,它带来四方面的根本性变化。 + 第一是交互界面。过去我们点击按钮、填写表单;现在我们对话、拖拽、上传图像或音频,进行多模态的“共创”。 + 第二是业务逻辑。过去是规则驱动、静态代码、确定性执行;现在是数据驱动、动态推理、概率性决策,系统可以在不确定中找到更优路径。 + 第三是技术栈。传统的“微服务+关键词检索+关系型数据库+CPU”的组合,正被“Agent 智能体+语义检索+知识图谱+向量数据库+GPU”所取代。 + 第四是架构哲学。以前我们用 RPC、消息队列把微服务拼接起来;现在以 Agent 为执行单元,用上下文工程、记忆机制、工具调用来实现自主决策与行动。我们从“流程自动化”迈向“认知自动化”,软件范式从“人适应机器”,转向“机器理解人”。 这场变革不是空中楼阁,它有一条清晰的历史脉络。 回看传统应用架构的演进,从单体到微服务再到云原生,本质是业务复杂度上升和技术红利出现的合力。早期的单体应用,只要把交易、商品、支付、物流这些核心功能堆在一起就够了。但移动互联网爆发后,业务碎片化、场景多样化,迫使我们用微服务把系统“解耦”,让不同团队能并行迭代,敏捷开发;海量访问压力需要系统能够灵活弹性扩展;基于云计算提供的弹性基础设施,结合微服务、容器技术、和可观测体系,则让资源调度更智能、故障恢复更自动。那是一条从“大泥球”走向“乐高拼装”的道路,也为今天的 AI 原生应用打下了分布式协作的基石。 AI 应用自身也在沿着一条清晰的路径进化:从单体的 Chatbot,走向分布式的 Agentic AI。 业务层面的驱动力是,从“对话”走向“行动”。单体 Chatbot 只能回答问题;而在很多企业里,我们需要 AI 深度参与业务流程,承担企业不同业务模块的职责,不同业务板块有不同的上下文、工具、业务流程,大模型在注意力聚焦的情况下具备更好的准确性,这就推动架构走向多 Agent 协作,每个 Agent 专注于自己的核心能力。比如我们要构建一个全栈 Web 开发平台,支持用户用自然语言构建 Web 应用,Agentic AI 将构建一个“数字员工团队”,包含开发 Agent、产品 Agent、文档 Agent,多 Agent 自动协作,最终完成构建 Web 应用的完整任务。 技术层面,这种升级依赖几个关键突破:模型从理解到规划,从“会答”到“会想”(比如深度思考模式、ReAct);Agent 范式形成统一认知,它是一个具备“感知—决策—执行”能力的实体,记忆层存储历史经验(向量库+KV),决策层由大模型驱动进行规划(ReAct、思维链),行动层通过工具调用与多 Agent 协作来完成;关键标准也在逐渐形成,如 MCP 让智能体工具调用有了通用标准,A2A 把智能体分布式协作也定义了统一语言;而云原生能力,让这些智能体可以充分利用云的弹性能力,按需弹性、按量付费。 但是,要构建企业级 AI 应用,远不止“会调几个大模型 API”这么简单,将面临以下几个挑战。 + 第一类是开发效率。今天做一个像样的 Agent,往往要手工编织记忆、决策、工具调用的细节,缺少开箱即用的框架和工具,开发效率不高。更不用说随着业务发展,AI 应用持续扩展业务能力,如何实现业务敏捷迭代也是个难题。 + 第二类是集成。要提升模型准确性以及业务深度融合,不可避免要采用上下文工程技术,引入 RAG 架构。RAG 架构往往涉及多源数据接入,数据 ETL。要构建实时可用的企业知识库,将面对异构系统对接、数据治理等复杂工程。更别提如何让全新的 AI 应用和存量的传统系统交互集成,如何将存量系统纳入 Agent、MCP 的集成体系。 + 第三类是稳定运行和持续优化。多 Agent 协作带来复杂的调用链路,推理 Tokens 流量波动大,延迟不可控,传统体系难以灵活弹性扩展;稳定与安全也充满风险——模型幻觉可能造成决策偏差,工具调用可能越权,A2A 通信链路里可能泄露敏感数据;更糟糕的是,可观测性不足,一旦出现故障,你将在 10 个 Agent 协同的链路里找不到故障点。没有评估体系,很难衡量 Agent 的决策质量;数据质量会随着业务环境变化而衰减;模型一旦迭代,推理行为也可能漂移,今天还很稳的客服 Agent,明天可能输出不合规的内容。 而这些问题,单靠大模型内置的能力解决不了,它们需要一整套工程化、平台化的“中间层”来承接。 而这个中间层便是 AI 中间件的定位。它在 AI 应用与大模型之间,承担三件事。第一是连接与集成:把大模型、工具链、数据存储、微服务这些碎片打通,让 Agent 可以无缝调用知识、使用工具、对接业务。第二是能力抽象:把 A2A 通信、状态管理、数据集成这些“非业务共性能力”封装起来,屏蔽底层复杂度,让开发者聚焦业务。第三是工程化支撑:提供可观测、安全治理、弹性扩缩容等企业级能力,保障生产环境的稳定性与高效运维。 为什么说它是规模化落地的最后一块拼图?很简单。它能帮你突破 POC 陷阱——很多原型之所以上不了线,不是因为算法不行,而是工程体系不行。换句话说,AI 中间件是企业级 AI 应用的“操作系统”。它把 Agent 从“技术概念”变成“真正的生产力引擎” ,打通落地的最后一公里。 阿里云 AI 中间件全新发布 为了应对规模化落地企业级 AI 应用的挑战,我们发布了全新的阿里云 AI 中间件,它是一个面向分布式多 Agent 架构的基座,核心技术全面开源,全面拥抱行业标准。 它包括:AgentScopeJava(兼容 Spring AI Alibaba 生态),AI MQ(基于Apache RocketMQ 的 AI 能力升级),AI 网关 Higress,AI 注册与配置中心 Nacos,以及覆盖模型与算力的 AI 可观测体系。 我们的目标是:把复杂留下,把标准、能力、工具开放出来,让企业不用重复从头搭架子。 AI 编程框架AgentScope 首先是 AI 编程框架,Spring AI Alibaba 内核正式升级为 AgentScopeJava,这是 Java 生态拥抱 Agentic AI 的重要里程碑。对数百万 Java 开发者来说,一套熟悉的开发范式,就能构建企业级智能体应用。AgentScopeJava 为 Java 开发者提供最低学习门槛的 AI 开发框架。 它的架构有两层,在核心编排层,提供 Agentic API,开发者能够声明式地定义 Agent 的记忆、决策、工具调用等能力,代码复用明显提升;原生支持 MCP 协议,便于扩展工具与伙伴智能体;支持流式通信,端到端的交互延迟显著降低;支持 Humanintheloop,把关键决策交回给人审核。在安全运行时层面,它提供沙箱隔离,阻断越权工具调用;提供上下文管理,动态维护短期状态与长期记忆;并且原生支持 A2A,实现分布式多 Agent 自动编排与协作。 AI MQApsaraMQ 再说 AI 通信与集成的“中枢”——AI MQ。传统消息队列是为交易类、日志类场景设计的,到了 Agent 时代,它们很多特性不够用,因此我们对 ApsaraMQ 面向 Agentic AI 做了全方位能力升级,ApsaraMQ 是基于 Apache RocketMQ 构建的云原生消息服务。 在消息存储引擎(Apache RocketMQ 内核)方面,支持百万级 Topic 资源管理、百万级队列存储、百万级订阅分发。 得益于消息存储引擎的增强,我们推出了面向 AI 场景的消息模型 LiteTopic,它具备轻量资源、有状态异步通信的特性,可实现 AI 多轮对话 Session 保持、Session 级顺序流式输出、Agent 2 Agent 的可靠通信、多模态大消息体(50MB 以上)。提供面向 AI 稀缺算力的消费调度模式,包括优先级、定速、权重等模式,最大化资源有效利用率。提供 AI 数据集成,支持多数据源实时构建知识库,构建实时 RAG 架构;支持事件流异步推理,批量异步推理;支持流式 AI ETL 处理。 聊到分布式多 Agent 架构,就必须说 A2A 通信机制。目前在分布式多 Agent 通信实现上,一开始往往采用同步调用的方式,一旦业务继续发展,Agent 之间的交互、依赖关系也会变得更加复杂。同步调用的模式将面临多重痛点: + 多 Agent 多次 LLM 调用,全同步调用延迟高,降低客户体验。 + 上下游 Agent 吞吐量难以完全对齐,易出现部分 Agent 流量过载、雪崩。 + AI 任务涉及多 Agent 协作完成,单点 Agent 失败,没有可靠重试,导致任务整体失败,浪费中间过程的算力资源多。 + Agent 调用同步强依赖,如串联电路,可用性降低。 + 在算力资源不足的情况下,Agent 无法实现任务优先级处理。 基于 AI MQ 的全新特性 LiteTopic 实现 A2A 可靠异步通信模式则可以免除这些痛点,让系统具备更好的可扩展性: + 主 Agent 任务规划完成,可异步并发请求多个 Agent,缩短任务完成时间。 + Agent 解除调用强依赖和吞吐量强依赖,可用性提升。 + Agent 通信的请求和处理结果均持久化到 MQ,具备 checkpoint 能力,基于 MQ 的可靠重试,无中间资源浪费。 + 有限资源下,Agent 支持按优先级处理任务,比如优先处理付费用户。 + 基于 1 对多的发布订阅通信模式,系统具备良好的扩展能力,可以异步构建历史对话存储、Agent 记忆等。 AI 网关Higress 第三块是 AI 网关 Higress。它提供统一接入与管理,能同时接入不同的大模型、MCP 服务、Agent,做协议兼容与流量调度,实现智能路由与负载均衡。在 LLM 层,统一调度通用大模型与垂类小模型,支持语义缓存和灾备降级;在 MCP 层,支持通过协议转换复用存量微服务,让企业多年积累的数字化资产快速转换为 AI Agent 的工具;在 Agent 层,提供 REST 到 A2A 的协议桥接,打通传统微服务与智能体协作。 它也具备企业级安全治理能力,包括 Token 限流、敏感信息过滤、WebSocket 无损变更、零信任鉴权,保障服务稳定与数据安全。 最新版的 Higress 还发布了 AI 开放平台 HiMarket,让企业可以把自有 Agent/MCP 服务标准化发布,按调用量进行精细化运营。 AI 注册/配置中心Nacos 第四块是 AI 注册和配置中心 Nacos。在微服务时代,Nacos 国内市场占有率高达 70% 以上,包含 Azure 在内的海内外主流云厂商都提供了 Nacos 托管产品。面向 AI 时代的今天,Nacos 3.1.0 版本重磅发布,围绕 AI Agent 解决 AI A2A 场景和 AI Tools 场景的问题。 在 AI 工具方面,Nacos 支持了 MCP Registry 官方协议,无需任何代码改造,就能将传统应用快速转变为 MCP Server 并动态统一管理。 在多 Agent 协作方面,Nacos 是首个支持 A2A 协议的注册中心。Agent 可以将描述自身能力的 AgentCard 注册到 Nacos,其他 Agent 只需填写 Nacos 地址,即可实现分布式多 Agent 的能力编排,让 Agent 的分布式协作,像普通应用一样的顺滑和稳定。 在配置管理方面,AI 时代下,API Key 等凭证的安全至关重要。Nacos 提供动态加密配置能力,支持敏感数据的加密存储与安全推送,有效保障 AI 资产的安全性。此外,基于 Nacos 的动态配置推送能力,还能实现 Agent 能力的动态更新——无需重新部署,即可灵活调整 Agent 运行时的配置。 AI 可观测 第五块是AI 可观测。做 AI 应用的同学都有体会:传统可观测体系,面对大模型与智能体协作,往往力不从心。我们做的是一次“从 IaaS 到 MaaS”的全栈、一体化进化,目标是为企业提供一个智算应用的“上帝视角”。这个体系有四根支柱: + 端到端诊断:对主流 Agent 与应用框架实现无侵入追踪,不管是复杂的 RAG,还是多 Agent 协作,都能把调用链清清楚楚地呈现出来,帮助快速定位故障与性能瓶颈。 + 全栈智算监控:从底层 GPU、网络、存储,到容器编排,再到模型推理、训练,以及上层应用与智能体交互,建立多维度的指标与日志体系,实时感知全局健康度。 + 精细成本管理:对 Token 消耗、GPU 利用率做多维分析,与网关联动支持预算限额、访问限流、智能路由,帮助企业在探索 AI 价值的同时,把成本效益做到最好。 + 质量与安全评估:覆盖从输入、规划到输出的全流程,自动化评估语义准确性、内容质量、偏见与安全风险,为上线运行提供“持续体检”,让输出更安全、更可控、更合规。 展望 AI 从来不是一个单点技术,而是一张系统工程的“网”。大模型是大脑,工具是四肢,数据是血液,算力是肌肉,而中间件,是把这一切组织起来的“神经系统”和“骨架”。阿里云 AI 中间件核心技术目前已全面开源,包括 Nacos、Higress、Apache RocketMQ、AgentScopeJava 等等,未来也将持续围绕开源生态和社区开发者一起共建、共同成长。我们相信,只有把标准和能力开放出来,行业才会更快地形成“共识与共用”,企业才会更专注于业务创新,少做重复劳动。我们愿意和用户、社区一起,推动下一代 AI 基础设施的标准化、工程化。 未来两三年,我们将见证越来越多的企业,从一个个“单体 Chatbot”,走向一个个“可协作、可演进、可闭环”的数字员工团队;我们也将见证 AI 在生产线、仓储、金融风控、客服中台、研发协作等场景的落地,真正带来效率、质量与体验的跃迁。
#技术探索

2025年8月19日

基于 EventBridge 构筑 AI 领域高效数据集成方案
引言:AI 时代的数据处理变革 人工智能技术的发展经历了从感知智能到生成智能,再到智能体和具身智能的跨越式演进。这一过程不仅体现在算法模型的不断突破,更深刻地反映在对数据处理能力要求的根本性变化。根据麦肯锡的调研数据显示,2022 年,全球有 50% 的公司部署了 AI 技术,投资超过总预算的 4%。生成式 AI(GenAI)的崛起进一步推动了企业转型,其在流程优化、个性化服务等方面的应用已经超越了传统 AI 的范畴。 在这一技术变革的浪潮中,数据处理能力的重要性愈发凸显。传统的数据处理架构主要围绕结构化数据的批量处理而设计,采用的是相对静态的 ETL 模式。然而,AI 时代的数据处理需求呈现出截然不同的特征:数据源更加多样化,包括文本、图像、音频、视频等多模态数据;处理要求更加实时化,需要支持流式数据的即时处理和响应;应用场景更加智能化,需要结合大语言模型的推理能力进行数据的理解、转换和增强。 本文将从 AI 时代数据处理的挑战与机遇出发,深入分析事件驱动架构在 AI 数据处理中的技术优势,详细阐述 EventBridge for AI ETL 的实践案例,展示其在不同应用场景中的价值。我们希望能够为企业在 AI 转型过程中的数据基础设施建设,提供有价值的技术指导和实践参考,推动 AI 技术在更广泛领域的落地应用。 一、AI 时代数据处理的挑战与机遇 1. GenAI 的演进路径分析 生成式 AI 的发展经历了从简单到复杂、从单一到多元的演进过程,每个阶段都对数据处理能力提出了不同的要求。深入理解这一演进路径,对于把握 AI 数据处理的发展趋势具有重要意义。 最初起点:简单模型 API 调用阶段 在生成式 AI 发展的初期阶段,应用架构相对简单直接。用户通过 Query 向大语言模型发送请求,模型基于预训练的知识生成 Response 并返回给用户。这种架构虽然现在看来可能过于"简陋",但却是许多初期现象级 AI 应用产品的起点,如文本总结、AI 算命、AI 情感分析等应用都采用了这种直白的架构模式。 在这个阶段,数据处理的需求相对简单,主要集中在 Prompt 的优化上。开发者需要通过精心设计的提示词来引导模型生成期望的输出,数据处理更多体现在输入文本的预处理和输出结果的后处理上。然而,这种简单的架构很快就暴露出明显的局限性:模型的知识截止时间限制了其对最新信息的获取能力,缺乏领域专业知识导致在特定场景下的表现不佳,无法处理个性化和上下文相关的复杂查询。 增强上下文:RAG 技术的兴起 为了解决简单模型 API 调用的局限性,RAG(RetrievalAugmented Generation)技术应运而生。RAG 的核心思想是在模型生成回答之前,先从外部知识库通过之前用于搜广推的向量检索技术方案,检索相关信息,然后将检索到的信息作为上下文提供给模型,从而增强模型的生成能力。 RAG 技术的引入标志着 AI 数据处理进入了一个新的阶段,数据处理需求显著增加,主要体现在两个方面:首先是问题域特有信息的处理,例如在分析用户在某个平台的购买喜好时,需要实时获取和处理用户在该平台的购买数据;其次是时效信息的处理,如股票信息、实时新闻等需要不断更新的动态数据。 RAG 技术的实现需要构建一个完整的数据处理管道,包括数据收集、预处理、向量化、存储、检索和后处理等多个环节。这对数据处理系统的实时性、准确性和可扩展性提出了更高的要求。根据 Menlo Ventures 发布的市场调研报告,RAG 以 51% 的市场份额在企业市场中占据绝对优势,充分说明其在实际应用中的重要地位。 Agent 模式:智能体的规划与工具能力 随着 AI 技术的进一步发展,单纯的检索增强已经无法满足复杂应用场景的需求。Agent 模式的出现代表了 AI 应用架构的又一次重大演进。Agent 是在特定环境下具备 plan+tools 能力的智能体,其中"特定环境"限制了 Agent 的创建面向特定的场景和问题域,"plan" 说明 Agent 具有思考和规划能力,且能够根据反馈进行循环迭代,"tools" 则是指 Agent 具备与外部交互的能力。 Agent 模式对事件驱动和数据处理提出了更加复杂和多样化的需求。Agent 需要能够动态地选择和调用不同的工具来完成任务,这要求事件驱动系统具备高度的灵活性和可扩展性。同时,Agent 的规划和决策过程需要基于实时的环境信息和历史数据,这对数据的实时性和一致性提出了严格要求。 2. 数据种类的多样化挑战 AI 时代的数据处理面临着前所未有的数据种类多样化挑战。与传统的以结构化数据为主的处理模式不同,AI 应用需要处理包括文本、图像、音频、视频在内的多模态数据,每种数据类型都有其独特的处理要求和技术挑战。 结构化数据 结构化数据具有固定的格式和明确的字段定义,是传统数据处理系统最擅长处理的数据类型。在 AI 应用中,结构化数据主要来源于数据库、数据仓库、业务系统等,包括用户信息、交易记录、日志数据等。这类数据的处理相对成熟,主要挑战在于如何高效地进行数据清洗、转换和集成。 然而,即使是结构化数据的处理,在 AI 时代也面临新的挑战。首先是数据量的急剧增长,根据 IDC 的预测,全球数据量将从 2020 年的 64.2ZB 增长到 2025 年的 175ZB 。其次是数据源的多样化,企业需要整合来自不同系统、不同格式的结构化数据。最后是实时性要求的提高,AI 应用往往需要基于最新的数据进行推理和决策。 非结构化数据 非结构化数据在 AI 应用中占据越来越重要的地位。文本数据是较为通用的非结构化数据类型,包括文档、邮件、社交媒体内容、客服对话等。这类数据的处理需要运用自然语言处理技术,包括分词、实体识别、情感分析、语义理解等。 图像和视频数据的处理更加复杂,需要运用计算机视觉技术进行特征提取、目标检测、图像分类等。音频数据的处理则涉及语音识别、音频分类、声纹识别等技术。每种非结构化数据都需要专门的预处理、特征提取和向量化技术,这大大增加了数据处理系统的复杂性。 半结构化数据 半结构化数据是介于完全结构化的数据和完全无结构的数据之间的一种数据形式。它不符合关系数据库或其他数据表形式的严格结构,但包含标签或其他标记,用于分隔语义元素和执行记录和字段的层次结构。这使得它比非结构化数据更容易分析,也更具灵活性。 AI 系统,特别是机器学习和深度学习模型,需要大量的、多样化的数据进行训练和推理。半结构化数据凭借其灵活性和丰富的上下文信息,在 AI 的多个关键环节中扮演着核心角色。譬如标注信息存储,特征工程,A2A Message Events 等等。 多模态数据的融合处理 随着 AI 技术的发展,越来越多的应用需要同时处理多种模态的数据。例如,智能客服系统需要同时处理文本、语音和图像信息;智能推荐系统需要结合用户的行为数据、内容特征和社交关系等多维信息。多模态数据的融合处理不仅需要处理每种模态的数据,还需要建立不同模态之间的关联和映射关系。 这种融合处理的挑战在于如何保证不同模态数据的时间同步、语义一致和质量统一。同时,多模态数据的存储和检索也需要专门的技术支持,传统的关系型数据库往往无法满足这种需求,需要采用向量数据库、图数据库等新型存储技术。 3. 主流数据采集方式的演变 AI 时代的数据采集方式相比传统模式发生了显著变化。传统的数据采集主要依赖定期的批量抽取,而 AI 应用往往需要实时或近实时的数据流。这种变化对数据采集系统的架构和性能提出了新的要求。 + 实时数据流采集成为主流趋势。通过消息队列、流处理框架等技术,系统能够实时捕获和处理数据变化。Apache Kafka、Apache RocketMQ 等流处理平台在 AI 数据采集中发挥着越来越重要的作用。根据市场研究数据,流处理技术已经成为 2024 年数据集成的关键趋势。 + API 驱动的数据采集也变得越来越普遍。通过 RESTful API、GraphQL 等接口,系统能够按需获取外部数据源的信息。这种方式特别适合处理第三方服务的数据,如社交媒体数据、天气信息、金融数据等。 + 事件驱动的数据采集是另一个重要趋势。当特定事件发生时,系统自动触发数据采集和处理流程。这种方式能够大大提高数据处理的效率和实时性,特别适合处理用户行为数据、系统日志等事件型数据。 4. AI 时代的数据集成挑战 (图源:_https://x.com/RLanceMartin/status/1673380038274695169_) 上图是 Langchain 在 RAG 领域定义的数据集成。诚然它具有模块化、声明式设计,并为我们提供了大量实用程序和辅助功能,但是在工程化的复杂度依旧存在,我们依然会陷入针对 Data 领域的抽象和工程化实现。 所以,在 AI 与数据集成的实践过程中,我们总结出企业普遍面临三大核心痛点,这些痛点不仅影响了 AI 应用的开发效率,也制约了 AI 技术的规模化应用。 扩展难:数据源异构性挑战 随着企业数字化程度的提高,数据源的种类和数量呈爆炸式增长。企业需要整合来自 ERP、CRM、OA、电商平台、社交媒体、IoT 设备等各种系统的数据。这些数据源在数据格式、接口协议、更新频率、访问权限等方面存在巨大差异。 一个简单的数据集成项目在初期往往进展顺利,但随着需要接入的数据源增加,系统的复杂度呈指数级增长。每增加一个新的数据源,开发团队都需要了解其特定的数据格式和接口规范,开发相应的连接器和转换逻辑,并进行充分的测试和验证。这种线性增长的开发模式严重制约了 AI 项目的扩展能力。 更为严重的是,不同数据源之间往往存在数据格式不一致、字段命名不规范、数据质量参差不齐等问题。例如,同样是用户信息,不同系统可能使用不同的用户 ID 格式,时间字段可能采用不同的时区和格式,地址信息可能有不同的结构化程度。这些差异需要在数据集成过程中进行统一处理,进一步增加了系统的复杂性。 运维难:业务复杂性增长 AI 数据处理系统的运维复杂性远超传统的数据处理系统。首先,AI 应用对数据的实时性要求更高,任何数据延迟都可能影响模型的推理效果。其次,AI 数据处理涉及多个环节,包括数据采集、清洗、转换、向量化、存储、检索等,每个环节都可能出现问题。最后,AI 模型的迭代更新频繁,数据处理逻辑也需要相应调整。 在实际运维过程中,运维团队经常面临各种突发问题:数据源突然变更接口格式导致数据采集中断,数据质量问题导致模型推理结果异常,系统负载突增导致处理延迟,存储空间不足导致数据丢失等。这些问题往往需要跨团队协作解决,涉及数据工程师、算法工程师、运维工程师等多个角色。 传统的运维方式主要依赖人工监控和处理,这种方式在面对 AI 数据处理系统的复杂性时显得力不从心。企业迫切需要智能化的运维工具和自动化的故障处理机制,以降低运维成本和提高系统可靠性。 稳定性差:数据链路可靠性问题 数据链路的稳定性是 AI 应用能否成功上线生产环境的关键因素。在 AI 应用中,数据质量和处理链路的任何问题都可能导致模型推理结果的偏差甚至错误,进而影响业务决策和用户体验。 数据链路的稳定性问题主要体现在几个方面:数据丢失或重复,由于网络故障、系统异常等原因导致数据在传输过程中丢失或重复处理;数据延迟,由于处理能力不足、网络拥塞等原因导致数据处理延迟,影响 AI 应用的实时性;数据质量下降,由于数据源变更、处理逻辑错误等原因导致数据质量下降,影响模型的推理效果;系统故障,由于硬件故障、软件 bug 等原因导致整个数据处理链路中断。 这些稳定性问题的根本原因在于传统数据处理架构的紧耦合设计。在紧耦合架构中,任何一个组件的故障都可能影响整个系统的运行。同时,缺乏有效的监控和告警机制,问题往往在造成严重影响后才被发现。 为了解决这些问题,业界开始探索基于事件驱动架构的松耦合设计。通过将数据处理流程分解为独立的事件和处理单元,系统能够实现更好的容错能力和可扩展性。同时,通过引入智能监控和自动恢复机制,系统能够及时发现和处理各种异常情况,保证数据链路的稳定运行。 二、AI 数据处理的技术基石 事件驱动架构__ 1. 事件驱动架构的核心概念 事件驱动架构(EventDriven Architecture,EDA)作为一种现代软件架构模式,为解决 AI 时代数据处理的复杂性挑战提供了强有力的技术基础。 Event的本质:状态变化的数字化表达 在事件驱动架构中,Event(事件)是系统的核心概念。简单来说,事件就是状态的显著变化,是一切能够输入计算机中且能被处理的符号的数字化表达。这种定义看似简单,但却蕴含着深刻的技术内涵。 以一个典型的 4S 店售卖汽车的业务场景为例,我们可以清晰地看到事件的本质特征。当客户购买汽车并且其状态从"For Sale"变为"Sold"时,这构成了一个销售事件。成功交易后,从账户中扣除金额形成了一个支付事件。用户点击预订试驾后,将预约信息添加到指定用户的操作产生了一个预约事件。甚至用户资料和预约单本身也可以被视为事件的载体。 这种事件化的思维方式具有重要的技术优势。首先,事件提供了系统状态变化的完整记录,使得系统具备了天然的审计和回溯能力。其次,事件的异步特性使得系统组件之间能够实现松散耦合,提高了系统的可扩展性和容错能力。最后,事件的标准化格式使得不同系统之间的集成变得更加简单和可靠。 在 AI 数据处理场景中,事件的概念得到了进一步的扩展和深化。数据的产生、变更、处理、存储等各个环节都可以被抽象为事件。例如,当新的训练数据上传到系统时,产生数据接收事件;当数据经过清洗和转换后,产生数据处理完成事件;当向量化处理完成后,产生向量生成事件;当数据成功存储到向量数据库后,产生数据入库事件。这种事件化的处理方式使得整个 AI 数据处理流程变得清晰、可控和可监控。 2. EventBridge 架构深度解析 阿里云 EventBridge 作为事件驱动架构的具体实现,在技术架构设计上充分体现了 EDA 的核心理念,同时针对 AI 数据处理的特殊需求进行了深度优化。 源、过滤、转换、目标 EventBridge 的技术架构围绕四大核心能力构建:源(Source)、过滤(Filter)、转换(Transform)、目标(Sink)。这四大能力形成了完整的事件处理链路,为 AI 数据处理提供了全面的技术支撑。 源(Source)能力负责事件的接入和采集。EventBridge 支持多种类型的事件源,包括结构化数据源和非结构化数据源。结构化数据源涵盖了消息队列(Kafka、RocketMQ 等)、数据库(关系型数据库、数据仓库)、可观测性平台(SLS、Prometheus)、API 接口等。非结构化数据源则包括对象存储(CSV、PDF、TXT 等文件格式)以及各种自定义数据源。这种多样化的数据源支持使得 EventBridge 能够适应 AI 应用中复杂多变的数据接入需求。 过滤(Filter)能力提供了灵活的事件筛选机制。通过事件模式匹配,系统可以根据预定义的规则对事件进行筛选和路由。EventBridge 支持多种匹配模式,包括指定值匹配、前缀匹配、包含匹配、除外匹配、多模式匹配等。这种细粒度的过滤能力使得系统能够精确地控制事件的处理流程,避免不必要的计算资源消耗。 转换(Transform)能力是 EventBridge 在 AI 领域的核心创新。系统支持多种转换方式,包括自定义代码转换、自定义模型转换、自定义 API 转换等。特别值得注意的是,EventBridge 集成了百炼模型服务,能够调用大语言模型进行智能化的数据转换。这种 AI 驱动的转换能力使得系统能够处理传统 ETL 工具难以处理的复杂数据转换任务。 目标(Sink)能力负责处理后事件的输出和存储。EventBridge 支持多种目标类型,包括消息队列、数据库、数据仓库、可观测性平台、函数计算、API 接口、通知服务等。这种多样化的目标支持使得处理后的数据能够灵活地流向不同的下游系统,满足 AI 应用的多样化需求。 事件总线模型:N:M 的灵活路由 EventBridge 的事件总线模型采用了经典的 EDA(事件驱动)架构中的 N:M 模型,提供了多事件路由、事件匹配、事件转换等核心能力,帮助用户快速搭建事件驱动架构。 在事件总线模型中,多个事件源可以同时向事件总线发送事件,事件总线根据预定义的规则将事件路由到相应的目标服务。这种 N:M 的路由模式具有重要的技术优势。 + 首先,它实现了事件源和目标服务之间的完全解耦,事件源不需要知道有哪些目标服务在消费事件,目标服务也不需要知道事件来自哪个源。 + 其次,它支持动态的路由配置,可以在运行时添加或删除事件源和目标服务,而不影响系统的正常运行。 + 最后,它提供了强大的事件复制和广播能力,一个事件可以同时被多个目标服务处理,实现了数据的多路分发。 在 AI 数据处理场景中,事件总线模型的这些特性具有重要价值。例如,当新的训练数据到达时,可以同时触发数据预处理、质量检查、备份存储等多个处理流程。当模型推理完成时,可以同时更新缓存、记录日志、发送通知等。这种并行处理能力大大提高了 AI 数据处理的效率。 事件流模型:1:1 的高效传输 除了事件总线模型,EventBridge 还提供了事件流模型,采用标准的 Streaming(1:1)流式处理场景。事件流模型没有总线概念,适用于端到端的数据转储、数据同步及数据处理等场景,帮助用户轻松构建云上数据管道服务。 事件流模型的核心优势在于其高效的点对点传输能力。在这种模型中,事件从源直接流向目标,中间经过匹配和转换处理,但不需要经过复杂的路由逻辑。这种简化的处理流程使得事件流模型在处理大量数据时具有更高的性能和更低的延迟。 在 AI 数据处理中,事件流模型特别适合处理实时数据流。例如,将实时产生的用户行为数据直接流式处理并存储到向量数据库中,或者将传感器数据实时转换为模型输入格式。这种高效的流式处理能力为实时 AI 应用提供了重要的技术支撑。 3. 事件驱动架构在 AI 领域的应用价值 事件驱动架构在 AI 领域的应用价值不仅体现在技术层面的优势,更重要的是它为 AI 应用的规模化部署和运营提供了坚实的基础。 松散耦合设计 松散耦合是事件驱动架构的核心特征,也是其在 AI 领域应用的重要价值所在。在 AI 系统中,不同的组件往往由不同的团队开发和维护,包括数据工程团队、算法团队、平台团队等。松散耦合的设计使得这些团队能够独立地开发和部署各自的组件,而不需要过多地考虑其他组件的实现细节。 这种设计理念特别适合AI项目的迭代开发模式。AI 算法和模型往往需要频繁地更新和优化,如果系统采用紧耦合的设计,每次算法更新都可能需要修改多个组件。而在松散耦合的架构中,算法的更新只需要修改相应的事件处理逻辑,不会影响其他组件的正常运行。 可扩展性/稳定性保障 AI 应用的负载往往具有很强的不确定性和波动性。在某些时段,系统可能需要处理大量的数据和请求;而在其他时段,系统的负载可能相对较低。事件驱动架构的可扩展性特征使得系统能够根据实际负载动态调整资源配置。 在事件驱动架构中,每个组件都可以独立地进行扩展。当某个组件的处理能力不足时,可以增加该组件的实例数量,而不需要扩展整个系统。这种细粒度的扩展能力使得资源配置更加精确和高效。 端到端的实时传输 AI 应用往往对实时性有很高的要求,特别是在实时推理、智能客服等场景中。事件驱动架构的实时传输特性使得系统能够快速响应各种事件,大大提高了 AI 应用的实时性。 采用事件驱动架构的系统在可扩展性、可维护性和可靠性方面都有显著提升。在 AI 领域,这些优势更加明显,因为 AI 应用往往需要处理更加复杂和多变的数据处理需求。 三、解决方案详解EventBridge 多源 RAG 能力 EventBridge 多源 RAG 能力代表了事件驱动架构在 AI 数据处理领域的重要突破。这一能力的核心价值在于将传统的数据处理流程与现代 AI 技术深度融合,为企业构建智能化的数据管道提供了全新的技术范式。 1. 多源数据接入 在 AI 时代,数据源的多样性和复杂性达到了前所未有的程度。EventBridge 多源 RAG 能力通过统一的接入框架,实现了对各种异构数据源的无缝集成,为 AI 应用提供了丰富的数据基础。 非结构化数据 Loader 技术 非结构化数据在 AI 应用中占据越来越重要的地位,特别是在 RAG(检索增强生成)场景中。EventBridge 针对非结构化数据的处理需求,开发了一套完整的 Loader 技术体系,支持多种数据格式的智能解析和处理。 分块处理策略是非结构化数据处理的关键技术。在 RAG 应用中,长文档需要被分割成适当大小的文本块,以便进行向量化和检索。EventBridge 提供了多种分块策略,包括基于字符数的固定分块、基于段落结构的逻辑分块等。 单文档与批量加载是系统设计中的重要考虑因素。对于实时性要求较高的场景,系统支持单文档的即时加载和处理,确保新文档能够快速进入 RAG 系统。对于批量数据处理场景,系统提供了高效的批量加载机制,支持并行处理和断点续传,确保大规模数据的可靠处理。 结构化数据源集成 结构化数据源的集成是 EventBridge 多源 RAG 能力的另一个重要组成部分。与非结构化数据不同,结构化数据具有明确的格式定义和字段结构,但其集成挑战主要体现在数据源的多样性和实时性要求上。 数据系统集成是结构化数据接入的重要方式。EventBridge 支持主流的消息队列(如 Kafka、RocketMQ、MQTT),日志服务(如 SLS,Simple Log Service),数据库服务(如 MySQL)等。 实时流数据处理是 EventBridge 在结构化数据处理方面的重要创新。系统采用了流式处理架构,能够实时处理高吞吐量的数据流。可实现复杂的流式数据转换和聚合操作,为实时 RAG 应用提供丰富的事件源。 2. 向量数据库入库优化 向量数据库是 RAG 应用的核心基础设施,EventBridge 在向量数据库入库方面的优化为 RAG 应用提供了高效、可靠的数据存储支撑。 向量数据库支持 随着 AI 技术的发展,向量数据库市场呈现出百花齐放的态势。不同的向量数据库在性能特征、功能特性、成本结构等方面各有优势。EventBridge 通过提供统一的向量数据库接入接口,支持 Dashvector、Milvus 等主流向量数据库产品,为用户提供了灵活的选择空间。 传统数据库向量插件兼容 除了向量数据库,许多传统数据库也推出了向量扩展插件,如 PostgreSQL 的 PGvector、MySQL 的向量索引等。这些向量插件使得用户能够在现有的数据库基础设施上实现向量存储和检索功能,降低了系统的复杂度和成本。EventBridge 也即将支持向量插件方式入库,为开发者带来更多选择。 一键白屏化入库体验 为了降低向量数据库使用的技术门槛,EventBridge 提供了一键白屏化的入库体验。用户只需要通过简单的图形界面配置,就能够实现复杂的向量数据处理和入库流程。 + 直观的拖拽式配置界面:用户可以通过拖拽的方式配置数据源、处理逻辑、目标数据库等,系统会自动生成相应的处理流程。这种可视化的配置方式大大降低了系统使用的复杂度。 + 丰富的预置模板:涵盖常见的 RAG 应用场景,用户可以基于这些模板快速搭建自己的数据处理流程,然后根据具体需求进行定制化调整。 + 完善的监控仪表板和告警机制:用户可以实时查看数据处理的状态、性能指标、错误信息等,及时发现和解决问题。 四、解决方案详解实时推理与异步推理能力__ EventBridge 在推理接入方面提供了灵活的选择,支持实时推理和异步推理两种模式,以满足不同应用推理场景的需求。 1. 智能数据转换能力 数据转换是 ETL 流程中最复杂也是最关键的环节。EventBridge 多源 RAG 能力在数据转换方面的创新,主要通过深度集成大语言模型(LLM)的推理能力,将其自然语言理解和生成能力引入数据处理流程,实现传统 ETL 工具难以处理的复杂数据转换任务。 + 在数据清洗方面,LLM 能够智能识别和处理各种数据质量问题。例如,基于上下文自动纠错包含拼写错误的文本数据;将格式不规范的地址信息标准化为统一格式;将包含缩写和俚语的文本转换为标准的表达方式。 + 在数据增强方面,LLM 能够为原始数据添加语义信息,为 AI 应用提供更加丰富的数据基础。例如,从产品描述文本中自动提取产品的关键特征和属性;分析用户评论的情感倾向和关键观点;提取新闻文章的关键事件和实体信息。 + 在数据转换方面,LLM 能够实现复杂的格式转换和结构重组,大大降低了数据集成的复杂度。例如,将非结构化的文本转换为结构化的 JSON 格式;将表格数据转换为自然语言描述;将多种数据源的信息融合为统一的数据模型等。 2. 结构化输出技术 结构化输出是 EventBridge 在 AI 数据处理方面提供的重要能力。传统的大语言模型输出通常是非结构化的自然语言文本,往往需要进行二次解析和处理(例如,使用正则表达式、自然语言处理工具等方法从模型输出中提取结构化信息),这个过程不仅复杂而且容易出错。EventBridge 支持结构化输出,使得大语言模型能够直接生成 JSON、XML 等结构化的数据格式,避免了二次解析的复杂性,大大简化了数据处理流程。 在实际应用中,结构化输出技术能够支持复杂的数据结构定义。用户可以定义包含嵌套对象、数组、枚举值等复杂结构的输出格式,模型会严格按照定义的格式生成输出。这种精确的格式控制使得 AI 数据处理能够与下游系统无缝集成。 + JsonSchema 原生支持:JsonSchema 是 JSON 数据格式的标准化描述语言,广泛应用于 API 设计和数据验证。EventBridge 提供了对 JsonSchema 的原生支持,允许用户定义期望的输出格式,并确保模型输出严格符合定义的格式。JsonSchema 支持复杂的数据结构定义,包括字段类型、约束条件、默认值等详细信息,系统会在模型推理过程中进行实时验证和纠正,确保数据处理结果的一致性和可靠性。对于支持 JsonSchema 的模型,系统会优先使用其原生能力进行结构化输出,通常具有更好的性能和更高的准确性。 + 提示词注入优化:对于不支持 JsonSchema 原生能力的模型,EventBridge 采用智能提示词注入技术。系统会分析用户输出格式,并结合模型特性,生成相应的提示词模板并注入到模型的输入中,引导模型生成符合要求的结构化输出。系统将根据模型的输出质量动态调整提示词,优化输出效果。在多轮对话和上下文处理方面,系统能够智能管理提示词上下文,维护完整的对话历史,确保每轮输出的格式一致。因此,EventBridge 能够支持各类大语言模型,无论其是否原生支持结构化输出,都能生成高质量的结构化数据,为用户提供更广泛的模型兼容性和更大的选择空间。 五、EventBridge for AI ETL 的最佳实践 _ _ EventBridge 通过丰富的应用场景实践,为不同行业和业务需求提供了完整的解决方案。这些最佳实践不仅验证了技术方案的可行性,更为企业在 AI 数据转型过程中提供了宝贵的经验参考。 1. 数据预处理(Data Preprocessing) 数据预处理是机器学习和 AI 应用中的关键环节,数据质量直接影响模型的训练效果和推理准确性。EventBridge 在数据预处理方面提供了全面的解决方案,特别是在 SFT(Supervised FineTuning)训练数据准备方面展现出独特优势。 训练数据准备 监督微调(SFT)是大语言模型训练中的重要环节,需要高质量的标注数据来指导模型学习特定任务的能力。EventBridge 通过事件驱动的数据处理流程,能够高效地处理和准备 SFT 训练所需的数据集。 在数据收集阶段,系统能够从多个数据源实时收集原始数据,包括用户对话记录、文档库、知识库、API 响应等。通过事件驱动的方式,当新的数据产生时,系统能够立即触发数据处理流程,确保训练数据的时效性。例如,当客服系统产生新的对话记录时,系统会自动提取对话内容,进行格式标准化,并添加到训练数据集中。 在数据标注方面,EventBridge 集成了大语言模型的能力,能够实现半自动化的数据标注。系统可以使用预训练的模型对原始数据进行初步标注,然后通过人工审核和修正,形成高质量的训练样本。这种人机结合的标注方式大大提高了数据标注的效率和质量。 在数据格式转换方面,不同的模型训练框架往往需要特定的数据格式。EventBridge 能够借助中间函数计算节点,根据目标模型的要求,自动将数据转换为相应的格式。例如,将对话数据转换为 ChatML 格式,将问答数据转换为 Alpaca 格式等。这种自动化的格式转换能力大大简化了数据准备的工作量。 数据错误处理 数据错误是影响 AI 模型性能的重要因素,包括数据缺失、格式错误、逻辑错误等多种类型。EventBridge 通过 LLM 节点和函数节点,能够有效提高数据质量。 在数据缺失处理方面,系统采用了多种策略。对于数值型数据,可以使用均值、中位数、众数等统计方法进行填充;对于分类型数据,可以使用最频繁的类别进行填充;对于文本数据,可以使用语言模型生成合理的填充内容。系统会根据数据的特征和业务需求自动选择最合适的填充策略。 在格式错误处理方面,系统能够智能识别和修复各种格式问题。例如,对于日期时间数据,系统能够识别多种日期格式并统一转换为标准格式;对于数值数据,系统能够处理千分位分隔符、货币符号等格式问题;对于文本数据,系统能够处理编码问题、特殊字符等。 在逻辑错误处理方面,系统通过规则引擎和机器学习模型相结合的方式进行检测和修复。例如,检测年龄数据是否在合理范围内,检测地址信息是否符合地理逻辑,检测业务数据是否符合业务规则等。当发现逻辑错误时,系统会根据预定义的修复策略进行自动修复或标记为需要人工处理。 数据重复去除 数据重复是大规模数据处理中的常见问题,特别是在多源数据集成的场景中。EventBridge 提供了多过滤和中间过程转换,确保数据的唯一性和一致性。 在精确重复检测方面,系统通过哈希算法快速识别完全相同的记录。对于大规模数据集,系统采用了分布式哈希计算,能够在保证准确性的同时提高处理效率。 在近似重复检测方面,系统使用了多种相似度计算方法。对于文本数据,采用编辑距离、余弦相似度、Jaccard 相似度等方法;对于数值数据,采用欧氏距离、曼哈顿距离等方法;对于结构化数据,采用字段级别的相似度计算。系统会根据数据类型和业务需求自动选择最合适的相似度计算方法。 在重复处理策略方面,系统提供了多种选择。可以保留最新的记录,保留质量最高的记录,或者将重复记录合并为一条记录。系统还支持自定义的重复处理逻辑,用户可以根据具体的业务需求定义重复处理规则。 数据缺失补全 数据缺失是实际数据中的普遍现象,如何合理地处理缺失数据对模型性能有重要影响。EventBridge 可通过中间函数计算节点对数据进行处理和补全,提高数据的完整性和可用性。 在统计方法补全方面,系统支持多种经典的统计填充方法。对于数值型数据,可以使用均值、中位数、众数、线性插值、多项式插值等方法;对于分类型数据,可以使用众数、随机填充、基于分布的填充等方法。系统会根据数据的分布特征自动选择最合适的填充方法。 在机器学习方法补全方面,系统使用训练好的模型来预测缺失值。例如,使用回归模型预测数值型缺失值,使用分类模型预测分类型缺失值。这种方法能够考虑数据之间的复杂关系,通常能够获得更好的填充效果。 在深度学习方法补全方面,系统采用了自编码器、生成对抗网络等深度学习技术。这些方法能够学习数据的深层特征和复杂模式,对于高维数据和复杂缺失模式具有更好的处理效果。 2. RAG 检索增强生成(Retrievalaugmented Generation)数据入库 RAG 技术作为当前 AI 应用的重要范式,其数据入库过程的质量直接影响检索和生成的效果。如上文介绍 EventBridge 在 RAG 数据入库方面提供了从简单到复杂的完整解决方案。 简易数据入库流程 对于单一数据源的 RAG 应用场景,EventBridge 提供了简化的数据入库流程。这种流程特别适合快速原型开发和小规模应用部署。 在数据源配置方面,用户只需要指定数据源的类型和连接信息,系统会自动处理数据的读取和解析。支持的数据源包括对象存储(OSS)中的文档文件、数据库中的文本字段、API 接口返回的文本数据等。 在文档处理方面,系统提供了自动化的文档解析和分块处理。对于 PDF、Word、TXT 等常见文档格式,系统能够自动提取文本内容,并根据文档结构进行智能分块。分块策略可以基于段落、章节、固定长度等多种方式,用户可以根据具体需求进行配置。 在数据入库方面,系统支持多种向量数据库,用户可以选择最适合的存储方案。系统会自动处理向量数据的索引构建和存储优化,确保检索性能和存储效率。 增强多源数据入库 对于复杂的企业级 RAG 应用,往往需要整合来自多个数据源的信息。EventBridge 的增强多源数据入库能力能够处理这种复杂场景的需求。 在多源数据协调方面,系统提供了统一的数据处理框架。不同数据源的数据会被转换为统一的内部格式,然后进行统一的处理和存储。这种设计使得系统能够无缝地处理来自不同源的异构数据。 在数据融合方面,系统能够智能地合并来自不同源的相关信息。例如,将产品数据库中的产品信息与用户评论系统中的评论信息进行关联,形成更加完整的产品知识。系统使用实体识别和关系抽取技术来发现数据之间的关联关系。 在数据一致性方面,系统提供了多种一致性保证机制。包括数据版本管理、冲突检测和解决、数据同步等。当多个数据源包含相同实体的不同信息时,系统能够智能地进行信息合并和冲突解决。 在增量更新方面,系统支持实时的数据更新和同步。当源数据发生变化时,系统能够自动检测变化并更新相应的向量表示。这种增量更新能力确保了 RAG 系统能够获取最新的信息。 3. 数据清洗与标准化 (Data Cleansing & Standardization) 数据清洗与标准化是数据处理中的基础环节,EventBridge 通过集成大语言模型的能力,实现了智能化的数据清洗和标准化处理。 地址信息的标准化是一个典型的数据清洗场景。在实际应用中,来自不同渠道的地址信息往往格式不一、存在错别字、缺少关键信息等问题。EventBridge 通过 LLM 驱动的地址标准化能力,能够将各种格式的地址信息转换为统一的标准格式。 在地址解析方面,系统使用大语言模型来理解地址的语义结构。即使地址信息存在格式不规范、字段顺序混乱等问题,模型也能够准确识别出省、市、区、街道等各个组成部分。例如,对于"北京市海淀区中关村大街1号"这样的地址,系统能够准确解析出省份为"北京",城市为"北京",区县为"海淀区",街道为"中关村大街1号"。 在错误纠正方面,系统能够自动识别和纠正地址中的错别字和格式错误。例如,将"北京市海定区"纠正为"北京市海淀区",将"中关村大街一号"标准化为"中关村大街1号"。系统使用了地理知识库和语言模型相结合的方法,确保纠正的准确性。 在信息补全方面,系统能够根据已有的地址信息补全缺失的部分。例如,根据详细地址自动推断邮政编码,根据区县信息补全城市和省份信息。系统集成了完整的地理信息数据库,能够提供准确的地理信息补全。 在格式标准化方面,系统将所有地址信息转换为统一的标准格式。输出格式包括结构化的字段信息(省、市、区、街道、邮政编码等)和标准化的地址字符串。这种标准化的输出格式便于后续的数据处理和分析。 4. 合规与隐私保护 (Compliance & Privacy Protection) 在数据处理过程中,合规与隐私保护是不可忽视的重要方面。EventBridge 提供了全面的合规和隐私保护能力,确保数据处理符合相关法规要求。 数据脱敏是保护隐私的重要技术手段,EventBridge 可借助大模型,实现隐私信息识别,隐私信息处理等多种数据脱敏处理场景。为用户提供端到端的数据处理能力。 通过这些全面的合规和隐私保护措施,EventBridge 可确保了企业应用中的数据处理过程的合法性和安全性,为企业数据提供可靠的合规安全保证。 六、未来展望与发展路径__ AI 数据处理领域技术正处于快速发展的阶段,未来几年将会出现更多的技术创新和应用突破。EventBridge 作为这一领域的重要参与者,期待与更多开发者共同推动 AI 数据处理技术的发展和应用。 AI 数据处理技术的发展方向 在技术发展方向上,AI 数据处理将朝着更加智能化、自动化、实时化的方向发展。 智能化方面,未来的数据处理系统将更加依赖 AI 技术来实现自动化的数据理解、清洗、转换和增强。大语言模型的能力将进一步提升,能够处理更加复杂的数据处理任务。同时,多模态 AI 技术的发展将使得系统能够统一处理文本、图像、音频、视频等多种类型的数据。 自动化方面,数据处理流程将更加自动化,减少人工干预的需求。自动化的数据发现、数据治理、数据安全等功能将成为标准配置。机器学习技术将被广泛应用于数据处理流程的优化,系统能够自动学习和改进处理策略。 实时化方面,实时数据处理将成为主流需求。边缘计算技术的发展将使得数据处理能够在更接近数据源的地方进行,减少数据传输的延迟。流式处理技术将进一步成熟,能够支持更加复杂的实时数据处理场景。 行业标准化趋势 随着 AI 数据处理技术的成熟,行业标准化将成为重要趋势。标准化有助于降低技术门槛、促进技术交流、推动产业发展。在数据格式标准化方面,将出现更多的行业标准来规范数据的格式和结构,例如 CloudEvents 等。这些标准将有助于不同系统之间的数据交换和集成。 在 API 接口标准化方面,将出现统一的 API 规范来规范数据处理服务的接口。这些规范将有助于提高系统的互操作性和可移植性。 技术挑战与解决方案 未来的发展过程中,AI 数据处理技术仍将面临一些挑战,需要持续的技术创新来解决。 在数据质量挑战方面,随着数据源的增加和数据量的增长,数据质量问题将更加突出。需要开发更加智能的数据质量检测和修复技术,利用 AI 技术来自动识别和处理数据质量问题。 在性能挑战方面,随着数据处理需求的增长,系统的性能要求将更加严格。需要在算法优化、架构设计、硬件加速等方面持续创新,提升系统的处理能力。 在成本挑战方面,AI 数据处理的成本仍然较高,特别是大语言模型的调用成本。需要通过技术优化、资源调度、成本控制等手段来降低使用成本。 在安全挑战方面,数据安全和隐私保护的要求将更加严格。需要在数据加密、访问控制、隐私计算等方面持续投入,确保数据的安全性。 结语:构筑 AI 时代的数据传输基础设施 通过将事件驱动架构的技术优势与 AI 时代的数据处理需求深度融合,EventBridge 为企业构建智能化数据管道提供了全新的技术范式。 欢迎更多的数据侧伙伴加入 EventBridge 的生态体系,共同构建更多的数据源连接器、处理算法、应用模板等。通过开放的技术合作,更好地满足用户的多样化需求。AI 时代的数据基础设施建设是一个长期的过程,需要持续的技术创新和生态建设。我们相信,通过持续的努力和合作,我们能够构建更加智能、高效、可靠的 AI 数据集成基础设施,为人工智能技术的发展和应用提供强有力的支撑。 欢迎加入 EventBridge 用户交流群(钉钉群号:31481771)进行交流~
#技术探索

2025年8月13日

Apache RocketMQ EventBridge:为什么 GenAI 需要 EDA?
沈林,Apache RocketMQ PMC 成员,阿里云 EventBridge 负责人,专注于 EDA 研究。本文整理自作者在 Community Over Code Asia 2025 会议发表的主题演讲《Apache RocketMQ EventBridge: Why Your GenAI Needs EDA?》。 EDA 的核心特点是:以事件为中心,实时响应变化。它不像传统“请求响应”模式那样被动等待,而是“感知→触发→行动”全自动流转。在 AI 系统中,数据流、模型训练和推理、外部反馈等都可以作为“事件”,触发 AI 自动决策和联动执行。EDA 就像是 AI 时代的“神经系统”,让 AI 不仅能“思考”,还能“感知”和“行动”。它提升了系统的实时性、灵活性和自动化水平,是构建智能系统的关键支撑。AI 赋予系统“大脑”,EDA 构建系统的“神经”。 本文主要探讨在 AI 时代,EDA 的重要价值及它可以帮助我们解决的问题。 EDA 的第一重价值:通过 RAG 缓解 AI 幻觉 大家可能还有印象,2023 年上半年,Google 的早期 AI 模型发布时,回答一个关于詹姆斯·韦伯空间望远镜的问题时,犯了一个低级“错误”,这个答案本来在 Google 上很容易搜索到,但是 AI“一本正经”的给了一个错误答案,直接导致谷歌当天的股价跌了 8% 左右。但 AI 完全没有意识到自己的错误,这是为什么? 1. 为什么会有 AI 幻觉? AI 幻觉的产生机制比较复杂,可简单从训练和推理两个阶段进行分析: + 训练阶段: 数据覆盖不足:若训练数据不包含特定信息,模型无法“无师自通”; 过拟合:模型过度学习训练数据中的细节与噪声,导致在面对新数据时泛化能力差; 通用性与精度取舍:通用大模型为覆盖广泛领域,在特定垂直领域的准确性可能有所牺牲。 + 推理阶段: 自回归生成:LLM(大语言模型)推理本质上是一个自回归过程,基于现有 Token 预测下一个最可能的 Token,这种概率性生成机制使得幻觉成为其固有潜在分布的一部分。 连贯性优先于准确性:GenAI 输出的时候倾向于生成流畅连贯的答案,而非绝对准确的答案。 2. 如何减少 AI 幻觉? 为了解决 AI 幻觉,现在一般有三种主流的方式: + 模型微调(Finetuning): 模型不好?最能直接想到的方法就是优化模型:丰富模型训练的数据、优化模型的参数,让其在垂直场景领域,回答更加精准。这种方式在很多场景是非常有效的,而且依旧被广泛采用。但是这种方式,要求也是比较高的,如果没有一定的人力和算力成本投入,将很难实现。尤其是在知识更新频繁的领域,模型需要不断调整,长期维护,投入代价相对较高。 + 提示词工程(Prompt Engineering): 那可不可以不调整模型,而是在向 LLM 提问的时候,把相关的数据和限定条件一起给到 LLM?答案是可以的,这就是提示词工程。 但是如何构造一个好的提示词,把 LLM 需要的上下文信息给到它,这个要求也是非常高的。不同人使用,提示词的构造水平也不同:这种方式就像是把“问问题”变成了一件“手工艺术活”。而且提示词优化虽然可以“压平”部分幻觉,但只要模型权重未变,提示词没有带上相关数据,提示词只能暂时把幻觉“藏”起来,而无法真正去除。 + 检索增强生成(RAG): 那可不可以自动帮我们生成一个高质量的提示词呢?在这个提示词中,包含了 LLM 回答需要的关键信息。这个就和我们最后一个要讲的 RAG 非常像了,让我们看下 RAG 到底是什么。 3. 什么是 RAG? RAG 可以简单理解为:向 LLM 提问的时候,同时给这个问题,检索一个上下文,一起给到 LLM。比如:如果我们问 DeepSeek:本次 Apache 峰会有哪些讲师聊到了 RAG 这个话题?DeepSeek 肯定不知道,因为它没有这个数据,网上暂时也还没有相关数据。但如果给到它一个关于本次大会讲师的讲稿资料包。这样 DeepSeek 就有非常强相关的上下文,回答问题的时候,就不会跑题答偏。 那 AI 要如何做到这一点呢?主要分两步: + 建立索引:首先,需要提前把讲师资料包存起来。但资料包可能非常大,我们需要快速找到跟提问的问题相关联的数据,这里就需要用到向量化。向量化本质上是对一个事物,从多个特征维度,进行数值标记。比如,标记我这个人,可以从年龄、身高、性别等多个特征标记,标记越多越清晰。如果两个向量在多维空间中的“位置”越接近,说明它们越相似。所以,我们需要提前把数据进行向量化,存到向量数据库里。 + 检索生成:然后,当我们向 LLM 提问时,可以先把问题向量化,根据向量化后的结果,去向量数据库查询关联性最大的原始知识数据。最后,将查到的知识数据,作为上下文和问题一起传给 LLM,LLM 就可以给一个更加准确的回答了。 从这个过程中,我们会发现 RAG 有两个非常明显的优点: + 不需要用知识库数据给大模型训练,既节省了成本,又保证了数据隐私; + 不需要用提示词工程这样的“手工艺术活”,就可以让 AI 出现幻觉的概率变得足够低。 4. 为什么 EventBridge 适合做 RAG? 为什么是 EventBridge 适合来做 RAG 呢? 我们先来看下什么是 EventBridge: + EventBridge 的整个模型其实非常简洁:我们从下图左侧开始往右看,EventBridge 可以方便的把外部的数据,以标准化的事件格式,配合事件 Schema 集成到内部,中间可以存入事件总线(BUS),也可以选择不存储,然后通过过滤/转换,推送到下游服务中。 + 这个链路,正好可以满足 RAG 过程中需要的三要素:获取上游丰富的数据、自定义切分和向量化、持久化到多种向量化数据库中。 5. EventBridge 如何实现 RAG? 用一个场景举例,比如我们想建一个关于 EventBridge 知识的智能问答机器人,可以回答关于 EventBridge 的常见问题: + 我们需要把存在上游 OSS 的 EventBridge 文档,通过 EventBridge 的事件流,进行 Chunk 切分、Embedding 向量化,然后存储到向量数据库; + 完成之后,当我们向 EventBridge 智能机器人提问“EventBridge 是什么?”,智能机器人会先把这个问题向量化,然后去向量数据库查找匹配度最高的相关内容,并一起传递给 LLM,LLM 就能结合查到的资料,给出非常精准的回答,减少幻觉产生。 目前,阿里云大模型服务平台百炼的知识库 RAG 场景,已采用 EventBridge 的事件流能力,帮助众多客户减少了 LLM 问答中的幻觉问题,尤其在细分垂直领域效果显著。如果您感兴趣可以进行体验: _https://bailian.console.aliyun.com/?&tab=app/knowledgebase_ EDA 的第二重价值:推理触发器(Inference Trigger) 我们第二个想讨论的场景是推理触发器。 1. 程序使用 LLM 的规模将远超人类 目前,我们日常接触最多的 LLM 场景是人与 LLM 服务直接对话,如问 DeepSeek 一个问题或智能客服等。 但更常见且增长迅速的方式是程序触发 LLM。例如供应链优化和金融订单风控。 观察微服务就会发现,人调用 API 的量级远不如程序调用 API 的量级。相应地,我们可以想象,未来程序触发 LLM 的规模,也将远远超过人工使用 LLM。 这其中的机会,我们应该怎么把握? 2. 推理诉求无处不在 事实上,我们现有的商业系统中,已经存储了大量现成需要推理的场景。比如: + 消息服务里,存储了客户的评论,需要对其打标分析,这条评论是积极的还是消极的,并给个分数; + DB 里存储了产品的描述介绍信息,想让 AI 给一些产品描述优化建议; + OSS 或 S3 存储了大量的文档,想让 AI 对每个文档生成一个 100 字的文档摘要。 这些诉求在以前可能需要人工处理,但现在都可以交给 LLM,从而极大提升工作效率。那怎么让现有的商业系统,方便、快捷、低成本的使用 AI,甚至不需要写一行代码,这个就是 EventBridge 擅长的地方了。 3. 推理触发器:让模型被更好地使用 为此,EventBridge 提供了三把武器: + 第一把武器——实时推理并将结果存到目标端:通过 EventBridge 可以实时监听并获取存在 DB、消息、或者存储服务中的数据,然后实时调用 LLM 推理服务,并将推理结果输出存到目标端。此过程也可以结合上一部分讲到的 RAG,但中间不一定是一个 LLM,也可以是一个 Agent,甚至是一个 AI Workflow。 这个过程看似简单,但有很多需要注意的地方: 数据合并:我们刚才聊到,LLM 的推理本质上是一个自回归过程,这次的输出会作为下一次的输入,无法一次性拿到结果,很多 LLM 只能支持以流式的方式返回数据,但下游往往需要的是一个确定性的结果,所以我们需要对流式数据进行合并再输出; 数据格式:很多业务场景下有明确的格式要求。比如上面提到的例子,让 LLM 对客户评论打标和评分,需要输出一个 JSON 结构。但不是所有 LLM 在 API 层面都支持 JSON 结构输出,我们需要通过提示词进行优化,让它尽可能输出一个符合要求的 JSON 结构。 推理吞吐:LLM 的自回归生成方式,导致单次请求 RT 长、TPS 低。所以,我们需要提升高并发能力,把昂贵的 GPU 资源使用效率发挥到极致,同时需要做好 TPM 和 RPM 的限流,也就是每分钟请求次数和每分钟 Token 数的限流,以保证链路不会有大量限流异常。 可以看到,具体落地会遇到很多挑战,但 EventBridge 可以帮助客户便捷高效地解决这些问题。 + 第二把武器——基于推理结果触发任务执行: 除了让 LLM 推理输出结果存到目标端,EventBridge 还可以让 Agent 基于上游的某条消息,去调用某一个 Service,执行某一个动作,如发送邮件。 + 第三把武器——离线异步推理提高资源利用率: 对于实时性要求不高的推理场景,可以通过 EventBridge 实现离线异步推理,让稀缺的 GPU 资源被更好地调度利用,在云上的成本至少比实时推理便宜一半。 AI 的强大在于其应用,而 EDA(事件驱动架构)非常适合作为推理触发器,激活 AI 的价值。 EDA 的第三重价值:构建 Agent 通信基础设施 现在 AI Infra 非常热门,其概念非常广泛。对标 IT Infrastructure,我们这里讨论的话题是 AI 的通信。 1. 微服务的通信离不开 Messaging,Agent 间的通信应该如何? 在微服务时代,消息系统在微服务间的通信中扮演了重要角色。到了 AI 时代,消息系统是否依然起着关键作用?具体形式和产品又会有哪些变化? 2. Agent 和 Service 的通信:Function Calling、MCP 为了回答这个问题,我们先看下现在 AI 的通信是怎么做的。 首先,我们看下 Agent 和传统 Service 之间的通信。目前有两种主流的方式:Function Calling和MCP。 + Function Calling:是 OpenAI 公司在 2023 年提出的。因为 LLM 本身是文本生成器, 不具备访问外部系统的能力。但是我们可以对 LLM 进行训练微调,让 LLM 理解外部的一些工具函数的定义,这样在遇到提问时,就可以按需生成这些工具函数需要的参数,然后调用这些工具函数。 + MCP:是 2024 年 11 月 Anthropic 提出的,全称 Model Context Protocol‌,其本意是用来解决 LLM 无状态的问题。LLM 每次调用都是独立的,而 MCP 是用来给 LLM 提供运行上下文,相当于一个“Session 机制”。但是为什么 MCP 会被拿来和 Function Calling 放在一起呢?因为它也可以拿来调用工具函数。和 Function Calling 不同的是,它不需要 LLM 提前训练微调来理解函数的入参和返回值,而是通过上下文“提示词”告诉 LLM 参数返回值。所以 MCP 相比 Function Calling,对模型的依赖更小,更加通用,但效果相比 Function Calling 要差一点。 3. Agent 和 Agent 的通信:A2A 我们再来看下 Agent 与 Agent 之间是怎么通信的。 Google 在今年 4 月份的时候,提出了一个 A2A 的通讯协议,其核心运行机制分为四步: + 第一步:Client Agent 通过 Agent Card,看下远端 Agent 有哪些能力; + 第二步:根据 Agent Card 的能力描述,调用远端 Agent,创建一个 Task,让其帮忙完成一个任务; + 第三步:由于任务可能比较耗时,不一定能够立即响应。所以 A2A 协议允许远端 Agent 通过 SSE 协议,不间断的将任务的状态信息更新给 Client Agent; + 第四步:再将结果返回给 Client Agent,当然结果本身也是可以流式返回的。 4. MCP 和 A2A 之间的区别 那 MCP 和 A2A 协议有什么区别呢?Google 给它们的关系做了一个描述: + A2A 协议像一种沟通语言:如果把 Agent 比做人,一个人如果能力有限,想让其他 Agent 帮忙怎么办?A2A 协议就可以派上用场了,A2A 协议像一种沟通语言,可以让 Agent 和其他 Agent 用同一种语言交流,不至于说话的时候驴唇不对马嘴。 + MCP 就像工具使用说明书:Service 等价于人使用工具,可以提升人解决问题的能力。不过,使用工具也需要有些技巧,MCP 就像工具使用说明书,可以让Agent 更方便的使用这些 Service,来扩展 Agent 的能力。 我们把 Agent 比做人,把 Service 比做工具。但是,请人帮忙和请工具帮忙,真的可以分得这么清楚吗? 5. 预测 1:A2A and MCP 可能走向融合 A2A 和 MCP的职责,设想很完美,但是实际运行的时候会遇到很多挑战。我们先来一起看看在 MCP 和 A2A 协议下,两者用来声明一个 Agent 或者 Service 能力的时候是怎么样的? + 这里举例了一个“查询北京天气”的服务,会发现两者声明自己能力的时候,非常类似: + 其次,两者的传输层协议也都非常相似,都支持 SSE 和 JSONGRPC; + 最后,我们从工程师开发角度,推演一个场景:当一个 Agent 需要获取“查询天气”的能力时,它并不真正关心该能力是由一个 Service 还是另一个 Agent 提供的。Agent 的核心关注点在于能力的接口定义:即有哪些可用能力、如何调用,以及预期的返回结果是什么。至于该能力的后端提供者是 Service 还是 Agent,对于调用方而言是无需关注的实现细节。 这里我们的第一个预测是:A2A and MCP 未来可能会合并,但具体怎么发展,还要看生态的选择。 6. 预测 2: 点对点的通信是不够的 这里的第二个预测是:现有 MCP 和 A2A 协议中,只包含的点对点通信是不够的。按照 A2A 协议的推演,当一个系统中有很多 Agent 时,所有 Agent 都通过“长连接”集成在一起:大家第一个直观的感受是什么? 连接太多了! 如果两个 Agent 通过“长连接”集成在一起,感觉可能也没有什么。但是如果一个 Agent 同时需要和数百个甚至上千个 Agent 通信,系统中就会产生大量的长连接。 + 首先对每一个 Agent 来讲,资源开销就非常大; + 其次,网状的连接,一旦某一个 Agent 出现问题,hang 住了某些资源,会不会拖垮其他 Agent 的服务?甚至拖垮整个系统?这类问题在微服务中,再常见不过了。 + 最后,即使不会被出问题的 Agent 拖垮服务,但当这个出问题的 Agent 恢复时,之前的通讯是否依旧可以继续追踪?再次执行,是否已经幂等,是否有风险? 这里面有非常多的稳定、性能、成本、扩展性的挑战。这些问题在微服务中已经被多次验证过,有些经验我们可以学习过来。 7. EventBridge Super Agent 基于上面两个预测判断,我们给出了一个 RocketMQ EventBridge 的回答: 在这个模型中,我们引入了一个 EventBridge Agent Proxy 的角色。我们姑且称它为“Super”Agent ,但它不是一个真正的 Agent,而是可以代理 Agent 的能力。 + 首先,所有 Agent 都可以写一份自己的个人简历,把自己拥有的能力,注册到“Super”Agent上; + 如果某个 Agent 需要调用其他 Agent 的能力,它可以在 “Super” Agent 中查找是否有其需要的 Agent。如果有,就可以直接通过与 “Super” Agent 的交互,来获得这个能力; + 当这个 Agent 需要多个其他 Agent 的能力时,也不需要和每一个 Agent 交互,都可以通过 “Super” Agent 代理实现,将原本的 N:N 模型简化为 1:1 模型。 + 除此之外,“Super” Agent 中的 Proxy 除了 A2A 协议,还会路由和跟踪每一个 Task 的运行状态,即使在异常/重启/集群扩容等场景下,每一个 Task 都能被按预期处理,并把状态同步回 Agent。 “Super”Agent 和微服务注册中心有点类似,不过区别在于,它不光是提供了微服务查找寻址的作用,同时还起到了服务代理的作用。如果我们脑洞再大一点,可以不仅局限于 Task 级别的任务追踪和管理,甚至还可以往上考虑一层,提供“User”级别的上下文: + 我们现在的 Agent 都是没有记忆的,我们之前跟它说过的话,过几天再问它,它就不记得你了。 + 但是每个人使用工具的习惯是不一样的。如果 Agent 能更好的理解你,记得你,就可以提供更加人性化的服务。 + 作为 Agent 注册和代理中心,如果在为 Agent 提供代理的同时,还能同时提供“User”的上下文,并且用 Agent 的越多,“User”的身份画像越完善;反过来,Agent 越依赖,进入一个正向循环。 目前,EventBridge Agent 代理还处于理论探索阶段,欢迎大家一起交流。 参考文献与延伸阅读 + [RetrievalAugmented Generation for KnowledgeIntensive NLP Tasks, Lewis et al., 2020] + [Model Context Protocol (MCP) Specification, 2024] + [A2A: AgenttoAgent Communication Protocol, Google, 2024] + Apache RocketMQ EventBridge 官方文档: _https://rocketmq.apache.org/_
作者:沈林
#技术探索

2025年7月25日

Apache RocketMQ for AI 战略升级,开启 AI MQ 新时代
前言 随着 AIGC(生成式人工智能)浪潮席卷全球,大语言模型(LLM)正在深刻重塑千行百业、重构应用开发范式。这场由模型与算法驱动的技术革命,带来了前所未有的机遇,也为开发者构建 AI 应用带来了全新而严峻的工程挑战:如何保障长耗时对话的连续性?如何公平高效地调度有限的算力资源?如何避免多 AI Agent 或复杂工作流的级联阻塞问题?...... 这些挑战的核心诉求在于:我们需要一种可靠且高效的异步通信机制,来支撑应用、数据与模型之间的协同交互。作为分布式系统不可或缺的基础组件,Apache RocketMQ 在微服务异步解耦与数据流处理等方面表现出色。在 AI 时代,如何应对复杂多变的业务场景、满足更高的性能与体验要求,已成为 Apache RocketMQ 演进过程中的关键课题。 挑战显现:传统消息队列在 AI 场景中的局限性 在传统分布式架构中,消息队列作为实现异步解耦、流量削峰及数据流处理的成熟方案,其可靠性已得到广泛验证。然而,随着 AI 应用在交互模式、资源形态和应用架构上的根本性变革,如果客户采用同步阻塞架构、或者基于传统消息队列的异步化架构,都会面临很多新挑战。 + 交互模式:从“请求 响应”到“长时会话” 传统应用的交互模式一般是无状态,短平快的请求 响应模式,一个用户请求会在毫秒级返回结果,如收藏商品、加购物车、下单等场景。 而 AI 应用交互(如多轮对话,多模态)具有持续时间长(单次推理可达数秒至分钟级)、多轮次上下文依赖(对话历史可达数十轮)、计算资源消耗大等特征。现有的 AI 应用若采用 HTTP 长连接、 WebSocket 等协议结合后端同步阻塞架构,极易因为网络抖动、网关重启或连接超时等偶发问题,导致上下文丢失、推理任务中断,造成不可逆的算力浪费和用户体验的损害。 + 资源形态:从“通用服务器”到“稀缺算力” AI 推理依赖昂贵的 GPU 资源,瞬时高并发流量可能冲击推理服务稳定性,导致算力资源浪费。传统消息队列虽能实现流量削峰填谷,但在多租户共享资源池场景下,由于缺乏精细的消费流量控制机制,难以实现精细化、差异化的资源调度,导致资源利用率低下。 + 应用架构:从“服务调用”到“智能体协作” AI Agent 或多步工作流本质上是长周期任务的协同。若采用同步调用机制,任何单节点阻塞都可能引发整个任务链级联失败。因此,需要一个高效、可靠的异步通信枢纽,来连接这些独立且长时间运行的智能体或任务节点,实现非阻塞协同,保障分布式智能系统稳定运行。 此外,传统消息队列还面临其他挑战,如:在处理 AI 多模态等大负载时,因传统消息队列对消息大小有更严格的限制,需要采取繁琐的变通方案,从而增加了系统复杂度和故障风险;传统消息队列通常需要手动配置或复杂脚本进行 Topic 管理,会带来运维成本攀升与资源泄漏隐患等。 破局之道:Apache RocketMQ 进化为 AI 消息引擎 系化重构:采用存算分离架构实现资源弹性、通过存储层多副本机制保障高可用性、引入轻量级 SDK 提升客户端灵活性等等,最终达成了"高弹性、高可用、低成本"的核心目标,也为解决 AI 时代的工程难题打下了坚实的基础。 面对 AI 时代带来的全新挑战,Apache RocketMQ 进行了前瞻性战略升级,从传统消息中间件进化为专为 AI 时代打造的消息引擎,成为构建下一代 AI 应用不可或缺的关键基础设施。 这一演进的核心在于两大“颠覆性创新”: + 轻量化通信模型:支持动态创建百万级 LiteTopic,特别适用于长时会话、AI 工作流和 AgenttoAgent 交互等场景。显著提升系统的扩展性与灵活性,满足 AI 应用复杂的通信需求。 + 智能化资源调度:通过削峰填谷、定速消费、自适应负载均衡和优先级队列等功能,实现对稀缺算力资源的精细化管理和平稳高效调度,确保在高并发和多租户环境下高效利用资源。 这些创新使 Apache RocketMQ 成功突破了传统消息队列的局限,精准匹配 AI 应用的独特需求,为现代 AI 系统提供稳定且高效的消息中枢服务。 场景实践:RocketMQ for AI 如何破解 AI 工程挑战 “会话即主题”:用 LiteTopic 终结长会话状态管理难题 AI 应用的交互模式具有特殊性,即长耗时、多轮次且高度依赖高成本计算的会话。当应用依赖 SSE 或WebSocket 等长连接时,一旦连接中断(如网关重启、链接超时、网络不稳定触发),不仅会导致当前会话上下文的丢失,更会直接造成已投入的 AI 任务作废,从而浪费宝贵的算力资源。因此,构建一个健壮的会话管理机制,实现在长耗时的对话过程中保障会话上下文的连续性和完整性,减少重试带来的算力资源浪费,同时降低应用程序代码的复杂度,是该场景的核心技术攻坚点。 为解决长会话状态管理难题,RocketMQ for AI 提出了一种革命性的轻量化解决方案——“会话即主题”,系统可为每个独立会话(Session)或问题(Question)动态创建一个专属的轻量级主题(LiteTopic)。 当客户端与 AI 服务建立会话时,系统将动态创建一个以 SessionID 命名的专属队列(例如 chatbot/{sessionID}或 chatbot/{questionID})。该会话的所有交互历史和中间结果均以消息形式在该主题中有序传递 。即使客户端断连,重连后只需继续订阅原主题 LiteTopic chatbot/{sessionID},即可无缝恢复上下文,实现断点续传,继续推送响应结果。 该模型有效解决了“无状态后端”与“有状态体验”之间的矛盾,将开发者从繁琐的会话状态保持、重连处理与数据一致性校验中彻底解放出来。不仅大幅简化了工程实现,也从根本上避免了因任务中断重试造成的算力资源浪费,为用户带来流畅、连续、稳定的 AI 交互体验。 图 1 这一创新模式的实现,得益于 RocketMQ 专为 AI 场景设计的强大特性: + 百万级队列支持:RocketMQ 支持在单个集群中高效管理百万级 LiteTopic,能够为海量并发会话或任务提供独立 Topic,并且保障性能无损。 + 轻量化资源管理:RocketMQ 队列的创建和销毁极其轻量和自动化,系统可按需自动创建与回收 LiteTopic(如客户端连接断开或 TTL 到期时),避免资源泄漏和手动干预,显著降低运维复杂度和成本。 + 大消息体传输:RocketMQ 可处理数十 MB 甚至更大的消息体,充分满足 AIGC 场景中常见的庞大数据负载的传输需求,如大量上下文的 Prompt、高清图像或长篇文档等。 + 顺序消息保障:在单个会话队列中,通常采用 LLM 的流式输出模式以降低问答延迟,RocketMQ 原生支持顺序消息,确保推理结果流式输出到客户端的顺序性,保障会话体验连贯流畅。 + 全面可观测性:RocketMQ 全面支持 OpenTelemetry 标准的 Metrics 和 Tracing,可实时监控消息收发量、消息堆积等关键指标,查询消息收发轨迹详情,为多 Agent 系统的调试与优化提供有力支撑。 应用案例:阿里巴巴安全团队“安全小蜜”智能助手 阿里巴巴安全团队推出的“安全小蜜”智能助手,在应对大规模并发会话时,曾面临会话上下文丢失、任务中断导致资源浪费等挑战。 通过引入 RocketMQ 的 LiteTopic 能力重构会话保持机制,“安全小蜜”成功实现了会话状态的自动持久化与快速恢复。这不仅能够在多轮对话中,对用户的安全问题进行快速、精准的理解和响应,还大幅简化了工程实现复杂度,有效降低了因任务中断引发的资源浪费,整体提升了用户体验与业务处理效率。 目前,阿里云多个产品线的 AI 答疑机器人也已采用该方案完成升级,进一步验证了该架构在多样化 AI 场景下的通用性与有效性。 智能算力编排:不止于负载均衡,构建可控算力调度中枢 大模型服务在资源调度上,普遍面临两大核心挑战: + 负载不匹配:前端请求突发性强,而后端算力资源有限且相对稳定,直接对接易导致服务过载崩溃或算力资源浪费。 + 无差别分配:在实现流量平稳后,如何确保高优先级任务优先获得宝贵的计算资源,成为提升整体服务价值的关键。 在此背景下,Apache RocketMQ 发挥了关键作用:不仅作为前端请求与后端算力服务之间的缓冲调度层,将不规则的流量“整形”为平稳、可控的请求流,还通过定速消费、优先级队列等能力,提供“可控的算力调度中枢” ,实现对请求流量的细粒度控制,大幅提升资源利用效率与服务质量。 图 2 RocketMQ 所具备的一系列核心特性,为实现智能算力调度提供了坚实的基础: + 天然削峰填谷,保护核心 AI 算力:RocketMQ 天然具备“流量水库”的作用,能缓存突发请求,使后端 AI 模型服务根据自身处理能力,基于类似滑动窗口模式自适应消费负载均衡,避免系统过载或资源浪费。 + 定速消费,最大化 AI 算力利用率:RocketMQ 支持定速消费能力,可为消费者组 ConsumerGroup 设置消费 quota。开发者可灵活定义 AI 算力的每秒调用量,在保障核心 AI 算力不过载的前提下,最大限度提升吞吐量。 + 优先级队列,智能调度与分配算力资源:再进一步,RocketMQ 的消息优先级机制还为复杂的业务场景提供了灵活优雅的资源调度方案: 抢占式分配:当高价值任务(如 VIP 用户请求、关键系统分析)进入系统时,可将其标记为高优先级消息。RocketMQ 确保这些消息被优先消费,让宝贵的算力资源优先服务于最关键的任务。 按权重分配:在共享算力池场景下,可依据各业务请求的实时执行状态设置请求消息优先级,调整请求执行的先后顺序,既保障整体吞吐效率,又防止个别租户因资源饥饿而无法获得算力。 应用案例:阿里云大模型服务平台百炼、通义灵码 阿里云大模型服务平台百炼的网关系统通过引入 RocketMQ 实现了对请求流量的削峰填谷,有效将前端不规则的访问压力转化为平稳、可控的后端算力调度。同时,借助 RocketMQ 的消息优先级功能,根据用户的请求流量设置合理的优先级,避免了大流量用户请求导致小流量用户分配不到算力资源,显著提升了资源利用率和服务公平性。 通义灵码通过 RocketMQ 将其 codebase RAG 架构从原有的同步流程升级为异步流程,实现代码向量化与流量削峰填谷,保障了系统全链路的稳定性。 异步通信枢纽:LiteTopic 让 A2A 与 AI 工作流彻底告别同步阻塞 Google 提出的 A2A 协议推荐采用异步通信机制来解决 AI 任务长耗时带来的同步阻塞问题。其核心机制是将一次请求 响应(RequestReply)调用,解耦为一个初始请求和一个异步通知(pushNotificationConfig)。在各类 Agentic AI 平台的工作流中,每个节点执行完任务后都需要向下游节点通知执行结果,而异步通信正是支撑这种复杂协作的关键。 由于 AI 任务普遍运行时间长,工作流场景同样需要解决“同步调用导致级联阻塞”的问题。无论是 Agent 之间的外部通信,还是工作流内部的任务流转,都面临一个共同挑战:如何优雅地处理长耗时任务,避免系统阻塞?核心解决方案是采用统一的架构模式——将长耗时、有状态的交互,转化为由无状态、事件驱动的可靠异步通知机制来连接。 前文提到,Apache RocketMQ 全新推出的 LiteTopic 机制,凭借其轻量化、自动化的动态管理能力,可高效实现 RequestReply 模式的异步通信。核心流程如下: + 动态创建回复通道:当 Agent A 向 Agent B 发起请求时(如 message/send),无需同步等待响应。而是在请求中嵌入唯一的动态回复地址,例如 a2atopic/{taskID}。同时,Agent A 订阅该地址,RocketMQ 会在首次连接时自动创建这个轻量化的 SubTopic,相当于为本次任务开辟了一个专属的异步通信通道。 + 异步投递执行结果:Agent B 按照自己的节奏处理任务。在任务完成后,它将结果封装为消息,直接发布到请求中指定的回复地址 a2atopic/{taskID}。 + 自动回收通信资源:当 Agent A 成功接收并处理完结果后,会断开与该 LiteTopic 的连接。RocketMQ 的智能资源管理机制会检测到该 Topic 已无消费者,并在设定的 TTL(TimeToLive)后自动清理该 Topic 资源。整个过程完全自动化,无需人工干预,杜绝了资源泄露的风险。 RocketMQ 的 LiteTopic 方案优势在于其系统性的设计:百万级 LiteTopic 的海量并发能力,结合按需创建、用后即焚的零开销资源管理,从根本上解决了大规模 Agent 协作场景下的扩展性与易用性问题。同时,顺序消息保障机制确保了流式或多步任务的逻辑正确,而内置的持久化与高可用机制则保障了异步通信的最终一致性与可靠性。这些能力共同为 A2A 场景构建了一个真正健壮、高效且可扩展的异步通信基础设施。 应用案例:阿里 AI 实验室 阿里 AI 实验室在其多 AI Agent 工作流中,基于 RocketMQ 构建了一套高效、可靠的 Agent 编排体系。工作流中的每个节点均采用事件驱动架构,实现可靠、持久化的通信。借助 LiteTopic 机制,还能实现 Agent 之间的节点级通信,从而实现任务流程的精细化编排。 在多 Agent 协同执行 AI 任务的过程中,即使遇到 Agent 发布重启、调用超时等情况导致完整任务链中断,也能通过持久化事件流的可靠重试,继续推进中断的 AI 任务,既有效避免了资源浪费,又显著提升了用户体验。 架构解析:RocketMQ for AI 的关键技术升级 为实现前文所述的创新模型,Apache RocketMQ 需具备在单个集群中高效管理百万级 LiteTopic 的能力,但原有架构在支持该能力时面临两大核心挑战:在存储层面,原先基于文件的索引和元数据管理机制已难以支撑如此量级的 Topic;在消息分发投递过程中,当单个消费者订阅大量的 LiteTopic 时,旧有的长轮询通知机制在延迟和并发性能上也显得捉襟见肘。 因此,要实现海量 LiteTopic 的高效管理,必须攻克以下两个关键技术难题: + 百万级 LiteTopic 的元数据存储与索引结构的技术方案; + 面向海量 LiteTopic 订阅场景的高效消息分发与投递机制。 图 3 百万级 LiteTopic 的数量级跃升,意味着索引和元数据无法沿用之前的模型。若为每个主题维护一个或者多个基于物理文件的索引结构,将带来巨大的系统开销和运维负担。 为此,Apache RocketMQ 基于其 LMQ 存储引擎 和 KV Store 能力,重新设计了元数据管理和索引存储: + 统一存储、多路分发:所有消息在底层的 CommitLog 文件中仅存储一份,但通过多路分发机制,可以为不同的 LiteTopic 生成各自的消费索引(ConsumerQueue,简称 CQ)。 + 索引存储引擎升级:摒弃了传统的文件型 CQ 结构,替换为高性能的 KV 存储引擎 RocksDB。通过将队列索引信息和消息物理偏移量(Physical Offset)作为键值对存储,充分发挥 RocksDB 在顺序写入方面的高性能优势,从而实现对百万级队列的高效管理。 在 LiteTopic 存储模型的基础上,RocketMQ 进一步对消息分发与投递机制进行优化,针对单个消费者订阅上万个 LiteTopic 的场景,重新设计了一套创新的事件驱动拉取(EventDriven Pull)机制,如图 3 所示: + 订阅关系(Subscription Set)管理:Broker 负责管理消费者订阅关系 Subscription 的 LiteTopic Set,并支持增量更新,从而能够实时、主动地感知消息与订阅的匹配状态。 + 事件驱动与就绪集(Ready Set)维护:每当有新消息写入,Broker 会立即根据其维护的 Subscription Set 进行匹配,并将符合条件的消息(或其索引)添加到为消费者维护的 Ready Set 中。 + 高效 Poll Ready Set:消费者只需对 Ready Set 发起 poll 请求,即可从 Ready Set 中获取所有匹配的消息。这种方式允许 Broker 将来自不同主题、不同流量的消息进行合并与攒批,在一次响应中高效地返回给消费者,显著降低了网络交互频率,从而提升整体性能。 通过在存储层与分发机制的创新升级,Apache RocketMQ 有效解决了 LiteTopic 模型的关键挑战:在存储层面,采用高性能的 RocksDB 替代传统文件索引,实现了对百万级元数据的高效管理;在消息分发层面,通过创新的“事件驱动拉取”模型,由 Broker 主动维护订阅集与就绪集,将消费者的海量轮询转变为对聚合消息的单次高效拉取,确保了在海量订阅场景下的低延迟与高吞吐。 展望未来:开启 AI MQ 新时代,RocketMQ for AI 持续演进 Apache RocketMQ for AI 的演进,标志着其已从传统消息中间件,全面升级为专为 AI 时代打造的消息引擎。通过在轻量化通信模型与智能化资源调度方面的“颠覆性创新”,Apache RocketMQ 突破了传统消息中间件的能力边界,成为构建高可用、可扩展 AI 应用的关键基础设施,展现出其在 AI 工程化体系中的核心价值。 Apache RocketMQ for AI 的增强能力已在阿里巴巴集团内部以及阿里云大模型服务平台百炼、通义灵码等产品中经过大规模生产环境的验证,充分证明了其在高并发、复杂的 AI 场景下的成熟度与可靠性。 当然,这只是一个开始。AI 工程化仍处于快速发展阶段,Apache RocketMQ 作为核心基础设施,仍有广阔的优化与创新空间。未来,阿里云消息团队将持续围绕用户 AI 场景迭代升级,协同 Apache RocketMQ 开源社区的贡献者们打磨核心 AI 能力,并逐步将经过阿里集团 AI 业务验证过的方案与特性,持续反馈到开源社区。 我们坚信,通过持续的技术探索与开放共建,Apache RocketMQ for AI 将推动“AI 原生消息队列”(AI MQ)成为行业标准,助力全球开发者更轻松、更高效地构建下一代智能应用,共同推动 AI 工程实践的标准化、普及化与生态繁荣。
作者:文婷、不铭、墨岭、稚柳
#技术探索

2025年7月22日

Apache RocketMQ 5.0 架构解析:如何基于云原生架构支撑多元化场景
本文将从技术角度了解 RocketMQ 的云原生架构,了解 RocketMQ 如何基于一套统一的架构支撑多元化的场景。 文章主要包含三部分内容。首先介绍 RocketMQ 5.0 的核心概念和架构概览;然后从集群角度出发,从宏观视角学习 RocketMQ 的管控链路、数据链路、客户端和服务端如何交互;最后介绍消息队列最重要的模块存储系统,了解 RocketMQ 如何实现数据的存储和数据的高可用,以及如何利用云原生存储进一步提升竞争力。 概览 在介绍 RocketMQ 的架构之前,先从用户视角来看下 RocketMQ 的关键概念以及领域模型。如下图,这里按照消息的流转顺序来介绍。 在 RocketMQ 中,消息生产者一般对应业务系统的上游应用,在某个业务动作触发后发送消息到 Broker。Broker 是消息系统数据链路的核心,负责接收消息、存储消息、维护消息状态、消费者状态。多个 broker 组成一个消息服务集群,共同服务一个或多个 Topic。 生产者生产消息并发送到 Broker,消息是业务通信的载体,每个消息包含消息 ID、消息 Topic、消息体内容、消息属性、消息业务 key 等。每条消息都属于某个 Topic,表示同一个业务的语义。 在阿里内部,交易消息的 Topic 被称为 Trade,购物车消息称为 Cart,生产者应用会将消息发送到对应的 Topic 上。Topic 里还有 MessageQueue,用于消息服务的负载均衡与数据存储分片,每个 Topic 包含一个或多个 MessageQueue,分布在不同的消息 Broker。 生产者发送消息,Broker 存储消息,消费者负责消费消息。消费者一般对应业务系统的下游应用,同一个消费者应用集群共用一个 Consumer Group。消费者会与某个 Topic 产生订阅关系,订阅关系是 Consumer Group+Topic +过滤表达式的三元组,符合订阅关系的消息会被对应的消费者集群消费。 接下来就从技术实现角度进一步深入了解 RocketMQ。 架构概览 下图是一张 RocketMQ 5.0 的架构图,RocketMQ 5.0 的架构从上往下可分为 SDK、NameServer、Proxy 与 Store 层。 SDK 层包括 RocketMQ 的 SDK,用户基于 RocketMQ 自身的领域模型来使用 SDK。除了 RocketMQ 自身的 SDK 之外,还包括细分领域场景的业界标准 SDK,比如面向事件驱动的场景,RocketMQ 5.0 支持 CloudEvents 的 SDK;面向 IoT 的场景,RocketMQ 支持物联网 MQTT 协议的 SDK;为了方便更多传统应用迁移到 RocketMQ,还支持了 AMQP 协议,未来也会开源到社区版本里。 Nameserver 承担服务发现与负载均衡的职责。通过 NameServer,客户端能获取 Topic 的数据分片与服务地址,链接消息服务器进行消息收发。 消息服务包含计算层 Proxy 与存储层 RocketMQ Store。RocketMQ 5.0 是存算分离的架构,这里的存算分离强调的主要是模块和职责的分离。Proxy 与 RocketMQ Store 面向不同的业务场景可以合并部署,也可以分开部署。 计算层 Proxy 主要承载消息的上层业务逻辑,尤其是面向多场景、多协议的支持,比如承载 CloudEvents、MQTT、AMQP 的领域模型的实现逻辑与协议转换。面向不同的业务负载,还可将 Proxy 分离部署,独立弹性,比如在物联网场景,Proxy 层独立部署可以面向海量物联网设备连接数进行弹性伸缩,与存储流量扩缩容解耦。 RocketMQ Store 层则负责核心的消息存储,包括基于 Commitlog 的存储引擎、多元索引、多副本技术与云存储集成扩展。消息系统的状态全部下沉到 RocketMQ Store,其组件全部实现无状态化。 服务发现 下面详细看一下 RocketMQ 的服务发现,如下图所示。RocketMQ 的服务发现的核心是 NameServer,下图是 Proxy 与 Broker 合并部署的模式,也是 RocketMQ 最常见的模式。 每个 Broker 集群会负责某些 Topic 的服务,每个 broker 都会将自身服务的 topic 信息注册到 NameServer(下面简称 NS)集群,与每个 NameServer 进行通信,并定时与 NS 通过心跳机制来维持租约。服务注册的数据结构包含 topic 与 topic 分片。示例中 broker1 与 broker2 分别承载 topicA 的一个分片。在 NS 机器上会维护全局视图,topicA 有两个分片分别在 broker1 与 broker2。 RocketMQ SDK 在对 TopicA 进行正式的消息收发之前,会随机访问 NameServer 机器,从而获取到 topicA 有哪些分片,每个数据的分片在哪个 broker 上,与 broker 建立好长连接,然后再进行消息的收发。 大部分项目的服务发现机制会通过 zookeeper 或 etcd 等强一致的分布式协调组件来担任注册中心的角色,而 RocketMQ 有自己的特点,如果从 CAP 的角度来看,注册中心采用 AP 模式,NameServer 节点无状态,是 sharednothing 的架构,有更高的可用性。 如下图,RocketMQ 的存算分离可分可合,采用分离的部署模式,RocketMQ SDK 直接访问无状态的 Proxy 集群。该模式可以应对更复杂的网络环境,支持多网络类型的访问如公网访问,实现更好的安全控制。 在整个服务发现机制中,NameServer、Proxy 都为无状态,可以随时进行节点增减。有状态节点 Broker 的增减基于 NS 的注册机制,客户端可以实时感知、动态发现。在缩容过程中,RocketMQ Broker 还可以进行服务发现的读写权限控制,对缩容的节点禁写开读,待未读消息全消费后,再实现无损平滑下线。 负载均衡 通过上文的介绍了解了 SDK 是如何通过 NameServer 来发现 Topic 的分片信息 MessageQueue,以及 Broker 地址的,基于这些服务发现的元数据,下面再来详细介绍下消息流量是如何在生产者、RocketMQ Broker 和消费者集群进行负载均衡的。 生产链路的负载均衡如下图如所示:生产者通过服务发现机制获取到 Topic 的数据分片以及对应的 Broker 地址。服务发现机制是比较简单,在默认情况下采用 RoundRobin 的方式轮询发送到各个 Topic 队列,保证 Broker 集群的流量均衡。在顺序消息的场景下会略有不同,基于消息的业务主键 Hash 到某个队列发送,如果有热点业务主键,Broker 集群也可能出现热点。除此之外,基于元数据还能根据业务需要扩展更多的负载均衡算法,比如同机房优先算法,可以降低多机房部署场景下的延迟,提升性能。 消费者的负载均衡:拥有两种类型的负载均衡方式,包括队列级负载均衡和消息粒度的负载均衡。 最经典的模式是队列级负载均衡,消费者知道 Topic 的队列总数和同一个 Consumer Group 下的实例数,可以按照统一的分配算法,类似于一致性 hash 的方式,使每个消费者实例绑定对应队列,只消费绑定队列的消息,每个队列的消息也只会被消费者实例消费。该模式最大的缺点是负载不均衡,消费者实例要绑定队列且有临时状态。如果有三个队列,有两个消费者实例,则必然有消费者需要消费 2/3 的数据,如果有 4 个消费者,则第四个消费者会空跑。因此,RocketMQ 5.0 引入了消息粒度的负载均衡机制,无需绑定队列,消息在消费者集群随机分发,保障消费者集群的负载均衡。更重要的是,该模式更加符合未来 Serverless 化的趋势,Broker 的机器数、Topic 的队列数与消费者实例数完全解耦,可以独立扩缩容。 存储系统 前面通过架构概览和服务发现机制,已经对 RocketMQ 有比较全局性的了解,接下来将深入 RocketMQ 的存储系统。存储系统对 RocketMQ 的性能、成本、可用性有决定性作用。 RocketMQ 的存储核心由 commitlog、ConsumeQueue 与 index 文件组成。 消息存储首先写到 commitlog,刷盘并复制到 slave 节点完成持久化,commitlog 是 RocketMQ 存储的 source of true,可以通过它构建完整的消息索引。 相比于 Kafka,RocketMQ 将所有 topic 的数据都写到 commitlog 文件,最大化顺序 IO,使得 RocketMQ 单机可支撑万级的 topic。 写完 commitlog 之后,RocketMQ 会异步分发出多个索引,首先是 ConsumeQueue 索引,与 MessageQueue 对应,基于索引可以实现消息的精准定位,可以按照 topic、队列 ID 与位点定位到消息,消息回溯功能也是基于该能力实现的。 另外一个很重要的索引是哈希索引,它是消息可观测的基础。通过持久化的 hash 表来实现消息业务主键的查询能力,消息轨迹主要基于该能力实现。 除了消息本身的存储之外,broker 还承载了消息元数据的存储以及 topic 的文件,包括 broker 会对哪些 topic 提供服务,还维护了每个 topic 的队列数、读写权限、顺序性等属性,subscription、consumer offset 文件维护了 topic 的订阅关系以及每个消费者的消费进度,abort、checkpoint 文件则用于完成重启后的文件恢复,保障数据完整性。 Topic 高可用 前面站在单机的视角,从功能的层面学习 RocketMQ 的存储引擎,包括 commitlog 和索引。现在重新跳出来再从集群视角看 RocketMQ 的高可用。 RocketMQ 的高可用指当 RocketMQ 集群出现 NameServer、Broker 局部不可用时,指定的 topic 依然可读可写。 RocketMQ 可以应对三类故障场景。 场景 1:某对 Broker 的单机不可用 比如,当 Broker2 主节点宕机,备节点可用,TopicA 依然可读可写,其中分片 1 可读可写,分片 2 可读不可写,TopicA 在分片 2 的未读消息依然可以消费。总结来说,即只要 Broker 集群里任意一组 Broker 存活一个节点,则 Topic 的读写可用性不受影响。如果某组 Broker 主备全部宕机,则 Topic 新数据的读写也不受影响,未读消息会延迟,待任意主备启动才能继续消费。 场景 2:NameServer 集群部分不可用 由于 NameServer 是 sharednothing 架构,每个节点都为无状态,并且为 AP 模式,无需依赖多数派算法,因此只要有一台 NameServer 存活,则整个服务发现机制都正常,Topic 的读写可用性不受影响。 场景 3:NameServer 全部不可用 由于 RocketMQ 的 SDK 对服务发现元数据有缓存,只要 SDK 不重启,依然可以按照当下的 topic 元数据继续进行消息收发。 MessageQueue 的高可用基础概念 上一个小节中讲到 Topic 的高可用原理,从它的实现中可以发现虽然 Topic 持续可读可写,但是 Topic 的读写队列数发生变化。队列数变化,会对某些数据集成的业务有影响,比如说异构数据库 Binlog 同步,同一个记录的变更 binlog 会写入不同的队列,重放 binlog 可能会出现乱序,导致脏数据。所以还需要对现有的高可用进一步增强,要保障在局部节点不可用时,不仅 Topic 可读可写,并且 Topic 的可读写队列数量不变,指定的队列也是可读可写的。 如下图,NameServer 或 Broker 任意出现单点不可用,Topic A 依然保持 2 个队列,每个队列都具备读写能力。 5.0 HA 的特点 为了解决上述的场景,RocketMQ 5.0 引入全新的高可用机制,核心概念如下: + DLedger Controller:基于 raft 协议的强一致元数据组件,执行选主命令,维护状态机信息。 + SynStateSet:维护处于同步状态的副本组集合,集合里的节点都有完整的数据,主节点宕机后,从集合中选择新的主节点。 + Replication:用于不同副本之间的数据复制、数据校验、截断对齐等事项。 下面是 5.0 HA 的架构全景图,新的高可用架构具备多个优势。 + 在消息存储引入了朝代与开始位点的数据,基于这两个数据完成数据校验、截断对齐,在构建副本组的过程中简化数据一致性逻辑。 + 基于 DledgerController,无需引入 zk、etcd 等外部分布式一致性系统,并且 DledgerController 还可与 NameServer 合并部署,简化运维、节约机器资源。 + RocketMQ 对 DledgerController 是弱依赖,即便 Dledger 整体不可用,也只会影响选主,不影响正常的消息收发流程。 + 可定制,用户可以根据业务对数据可靠性、性能、成本综合选择,比如副本数可以是 2、3、4,副本直接可以是同步复制或异步复制。如 22 模式表示 2 副本并且两个副本的数据同步复制;23 模式表示 3 副本,只要有 2 个副本写成功即认为消息持久化成功。用户还可以将其中的副本部署在异地机房,异步复制实现容灾。如下图: 云原生存储对象存储 上文讲到的存储系统都是 RMQ 面向本地文件系统的实现,在云原生时代,将 RocketMQ 部署到云环境可以进一步利用云原生基础设施,比如云存储来进一步增强 RocketMQ 的存储能力。RocketMQ 5.0 提供了多级存储的特性,是内核级的存储扩展,面向对象存储扩展了对应的 Commitlog、ConsumeQueue 与 IndexFile。且采用了插件化的设计,多级存储可以有多种实现,在阿里云上基于 OSS 对象服务实现,在 AWS 上则可以面向 S3 的接口来实现。 通过引入了云原生的存储,RocketMQ 释放了很多红利。 第一个是无限存储能力,消息存储空间不受本地磁盘空间的限制,原来是保存几天,现在可以几个月、甚至存一年。另外对象存储也是业界成本最低的存储系统,特别适合冷数据存储。 第二个是 Topic 的 TTL,原来多个 Topic 的生命周期是和 Commitlog 绑定,统一的保留时间。现在每个 Topic 都会使用独立的对象存储 Commitlog 文件,可以有独立的 TTL。 第三个是存储系统进一步的存算分离,能把存储吞吐量的弹性和存储空间的弹性分离。 第四个是冷热数据隔离,分离了冷热数据的读链路,能大幅度提升冷读性能,不会影响在线业务。 总结 + RocketMQ 整体架构: + RocketMQ 负载均衡:AP 优先、分合模式、横向扩展、负载粒度; + RocketMQ 存储设计:存储引擎、高可用、云存储。
#技术探索

2025年7月22日

ApsaraMQ Serverless 能力再升级,事件驱动架构赋能 AI 应用
_本文整理于 2024 年云栖大会阿里云智能集团高级技术专家金吉祥(牟羽)带来的主题演讲《ApsaraMQ Serverless 能力再升级,事件驱动架构赋能 AI 应用》_ 云消息队列 ApsaraMQ 全系列产品 Serverless 化,支持按量付费、自适应弹性、跨可用区容灾,帮助客户降低使用和维护成本,专注业务创新。那 ApsaraMQ 是如何一步一步完成 Serverless 能力升级的?在智能化时代,我们的事件驱动架构又是如何拥抱 AI、赋能 AI 的? 本次分享将从以下四个方面展开: + 首先,回顾 ApsaraMQ Serverless 化的完整历程,即 ApsaraMQ 从阿里内部诞生、开源,到捐赠给 Apache 进行孵化,再到完成 Serverless 化升级的不断突破、与时俱进的过程。 + 然后,重点介绍 ApsaraMQ 的存算分离架构,这是全面 Serverless 化进程中不可或缺的前提。 + 接下来,会从技术层面重点解读近一年 ApsaraMQ Serverless 能力的演进升级,包括弹性能力的升级、基于 RDMA 进一步降低存储和计算层之间的 CPU 开销,以及对无感扩缩容的优化。 + 最后,介绍我们在面向 AI 原生应用的事件驱动架构上的探索,以及基于阿里云事件总线定向更新向量数据库,为 AI 应用融入实时最新数据的最佳实践。 ApsaraMQ Serverless 化历程 首先,我们来看 ApsaraMQ Serverless 化的整个发展历程。 + 2012 年,RocketMQ 在阿里巴巴内部诞生,并将代码在 Github 上开源; + 2016 年,云消息队列 RocketMQ 版开启商业化的同时,阿里云将 RocketMQ 捐赠给了 Apache,进入孵化期; + 2017 年,RocketMQ 以较短的时间孵化成为 Apache TLP,并快速迭代新功能特性,顺利发布 4.0 版本,支持了顺序、事务、定时等特殊类型的消息; + 2018 年,随着大数据、互联网技术的发展,数据量爆发式增长,云消息队列 Kafka 版商业化发布; + 2019 年,云消息队列 RabbitMQ 版、云消息队列 MQTT 版两款产品商业化发布,补齐了 ApsaraMQ 在 AMQP、MQTT 协议适配上的缺失; + 2021 年,经过一段时间的孵化,ApsaraMQ 家族中的另一款产品事件总线 EventBridge 开始公测,开始融合事件、消息、流处理; + 2023 年,ApsaraMQ 全系列产品依托存算分离架构,完成 Serverless 化升级,打造事件、消息、流一体的融合型消息处理平台; + 今年,我们专注提升核心技术能力,包括秒级弹性、无感发布、计算存储层之间的 RDMA 等,实现 Serverless 能力的进一步升级,并结合当下客户需求,通过事件驱动架构赋能 AI 原生应用。 存算分离架构全景 第二部分,我们来看 ApsaraMQ 存算分离架构的全景,这是 Serverless 化升级不可或缺的前序准备。 ApsaraMQ 存算分离架构全景:云原生时代的选择 ApsaraMQ 的存算分离架构,是云原生时代的选择。 + 从下往上看,这套架构是完全构建在云原生基础之上的,整个架构 K8s 化,充分利用 IaaS 层的资源弹性能力,同时也基于 OpenTelemetry 的标准实现了metrics、tracing 和 logging 的标准化。 + 再往上一层是基于阿里云飞天盘古构建的存储层,存储节点身份对等,Leaderless 化,副本数量灵活选择,可以在降低存储成本和保证消息可靠性之间实现较好的平衡。 + 再往上一层是在本次架构升级中独立抽出来的计算层,即无状态消息代理层,负责访问控制、模型抽象、协议适配、消息治理等。比较关键的点是,它是无状态的,可独立于存储层进行弹性。 + 在用户接入层,我们基于云原生的通信标准 gRPC 开发了一个全新的轻量级 SDK,与之前的富客户端形成了很好的互补。 “Proxy” 需要代理什么? 接下来我们重点看下这套架构里的核心点,即独立抽出来的 Proxy。 它是一层代理,那到底需要代理什么呢? 在之前的架构中,客户端与 Broker/NameServer 是直连模式,我们需要在中间插入一个代理层,由原先的二层变成三层。然后,将原先客户端侧部分业务逻辑下移,Broker、Namesrv 的部分业务逻辑上移,至中间的代理层,并始终坚持一个原则:往代理层迁移的能力必须是无状态的。 从这张图中,我们将原先客户端里面比较重的负载均衡、流量治理(小黑屋机制)以及 Push/Pull 的消费模型下移至了 Proxy,将原先 Broker 和 Namesrv 侧的访问控制(ACL)、客户端治理、消息路由等无状态能力上移至了 Proxy。然后在 Proxy 上进行模块化设计,抽象出访问控制、多协议适配、通用业务能力、流量治理以及通用的可观测性。 ApsaraMQ 存算分离的技术架构 接下来看 ApsaraMQ 存算分离的技术架构,在原先的架构基础上剥离出了无状态Proxy 组件,承接客户端侧所有的请求和流量;将无状态 Broker,即消息存储引擎和共享存储层进行剥离,让存储引擎的职责更加聚焦和收敛的同时,充分发挥共享存储层的冷热数据分离、跨可用区容灾的核心优势。 这个架构中无状态 Proxy 承担的职责包括: 1. 多协议适配:能够识别多种协议的请求,包括 remoting、gRPC 以及 Http 等协议,然后统一翻译成 Proxy 到 Broker、Namesrv 的 remoting 协议。 2. 流量治理、分发:Proxy 具备按照不同维度去识别客户端流量的能力,然后根据分发规则将客户端的流量导到后端正确的 Broker 集群上。 3. 通用业务能力扩展:包含但不限于访问控制、消息的 Tracing 以及可观测性等。 4. Topic 路由代理:Proxy 还代理了 Namesrv 的 TopicRoute 功能,客户端可以向 Proxy 问询某个 topic 的详细路由信息。 5. 消费模型升级:使用 Pop 模式来避免单客户端消费卡住导致消息堆积的历史问题。 无状态 Broker,即消息存储引擎的职责更加的聚焦和收敛,包括: 1. 核心存储能力的迭代:专注消息存储能力的演进,包括消息缓存、索引的构建、消费状态的维护以及高可用的切换等。 2. 存储模型的抽象:负责冷热存储的模型统一抽象。 共享存储为无状态 Broker 交付了核心的消息读写的能力,包括: 1. 热存储 EBS:热存储 EBS,提供高性能、高可用存储能力。 2. 冷存储 OSS:将冷数据卸载到对象存储 OSS,获取无限低成本存储空间。 3. 跨可用区容灾:基于 OSS 的同城冗余、Regional EBS 的核心能力构建跨可用区容灾,交付一写多读的消息存储能力。 基于云存储的存算分离架构,兼顾消息可靠性和成本 存算分离架构中的存储层是完全构建在阿里云飞天盘古存储体系之上的。基于这套架构,我们能够灵活控制消息的副本数量,在保证消息可靠性和降低存储成本之间做到一个比较好的平衡。 左图是存算分离存储架构中上传和读取的流程。可以看到,我们是在 CommitLog 异步构建 consumeQueue 和 Index 的过程中额外添加了按照 topic 拆分上传到对象存储的过程;在读取过程中智能识别读取消息的存储 Level,然后进行定向化读取。 基于云存储的架构,我们提供了 ApsaraMQ 的核心能力,包括: 1. 超长时间定时消息:超过一定时间的定时消息所在的时间轮会保存至对象存储上,快到期时时间轮再拉回到本地进行秒级精准定时。 2. 集群缩容自动接管:消息数据实时同步到对象存储,对象存储的数据能够动态挂载到任意 Broker,实现彻底存算分离,Broker 的无状态化。 3. 按需设置 TTL 成本优化:支持按需设置消息 TTL,不同重要程度的消息可设置不同的 TTL,满足消息保存时长需求的同时兼顾成本控制。 4. 冷热消息分离、分段预取:热数据的读取命中本地存储,速度快;冷数据的读取命中远端存储,速率恒定且不会影响热数据的读取。 5. 动态调控云存储的比例:动态调控 EBS 和 OSS 的比例,大比例的 EBS 能够具备更高的稳定性,应对 OSS 负载过高的背压,提升 EBS 作为 OSS 的前置容灾的能力;小比例的 EBS 则是可以最大化降低消息单位存储成本,让整体的存储成本趋同于 OSS 存储成本。 Serverless 能力再升级 有了存算分离架构的铺垫,ApsaraMQ 全系列产品 Serverless 化就更加顺畅了。接下来展开介绍 ApsaraMQ Serverless 能力的升级。 消息场景下的 Serverless 化 在消息场景下通常会有两个角色:消息服务的使用方和提供方。 在传统架构中通常是这样的流程:使用方会给提供方提需求:我要大促了,你保障下;提供方说给你部署 10 台够不够,使用方说好的;结果真到大促那一刻,真实流量比预估的要大很多倍,整个消息服务被击穿,硬生生挂了半小时。 在 Serverless 化改造前,仍需提前规划容量;但相比传统架构的提升点是,依托 IaaS 层的快速扩容能力,能够极大缩短扩容时间;再演进到当前的 Serverless 架构,我们期望消息服务提供方是能够非常淡定地应对大促。 Serverless 现在已经成为了一个趋势和云的发展方向。ApsaraMQ 全线产品实现弹性灵活的 Serverless 架构,既彰显了技术架构的先进性,也提升了客户的安全感。业务部门减少了容量评估的沟通成本,用多少付多少,更省成本;运维部门免去了容量规划,实现自动弹性扩缩,降低运维人员投入的同时,大大提升了资源的利用率。 Serverless 方案的 Why / What / How Serverless 化预期达到的收益是:省心——秒级弹性,免容量规划;省钱——用多少付多少;省力——少运维、免运维。 要解决的痛点是:用户使用容量模型比较难做精准预估;运维侧手动扩容,是一个非常耗时耗力的过程。 核心目标是: + 弹性要快:特别是针对一些脉冲型的秒杀业务,需要具备秒级万 TPS 的弹性能力。 + 缩容无感:应对 MQ 客户端与服务侧 TCP 长连接的特性,缩容、服务端发布时要无感。 + 成本要低:极致的性能优化,才能进一步降低用户侧的单位 TPS 成本。 通过如下几个关键路径构建 ApsaraMQ Serverless 核心能力: + 多租、安全隔离、业务流量隔离:是构建 Serverless 核心能力的基础。 + 物理预弹&逻辑弹性:物理预弹和逻辑弹性结合的极致弹性方案,通过镜像加速、元数据批量创建、主动路由更新等核心技术加速物理弹性,结合逻辑弹性最终交付秒级万 TPS 的极致弹性能力。 + RDMA:在存储和计算层之间引入 RDMA 技术,充分利用 RDMA 的零拷贝、内核旁路、CPU 卸载等特性进一步降低计算层与存储层之间的通信开销。 + 优雅上下线:依托 gRPC 协议的 GOAWAY 以及 Remoting 协议的扩展实现 TCP 长连接的优雅上下线。 + 控制资源水位:通过智能化的集群流量调度以及平滑 Topic 迁移方案,进一步控制单个集群的资源水位在一个合理的值。 这套 Serverless 方案可以同时满足如下几种场景: + 第一种是平稳型:流量上升到一定水位后会平稳运行。 + 第二种是稳中有“进”型:流量平稳运行的同时,偶尔会有一些小脉冲。 + 第三种是周期性“脉冲型”:流量会周期性地变化。 计算、存储与网络:充分利用云的弹性能力 我们前面也有提到,这套架构是完全构建在云原生基础设施之上的,我们在计算、存储、网络上都充分利用了云的弹性能力,具体来看: + 在计算侧,通过 K8s 充分利用 ECS 的弹性能力,通过弹性资源池、HPA 等核心技术支持计算层的快速弹性,并通过跨可用区部署提升可用性。 + 在存储侧,充分利用飞天盘古存储的弹性能力,支持自定义消息的存储时长,冷热数据分离,额外保障冷读的 SLA。 + 在网络侧,公网随开随用,安全和方便兼具,支持多种私网形态接入,并基于 CEN 构建全球互通的消息网络。 在这之上,结合 SRE 平台的智能集群流量调度、集群水位监控、物理预弹性等核心能力,最终交付了一套完整的 ApsaraMQ Serverless 化物理弹性能力。 秒级万 TPS 的极致弹性能力保障 依托于上面的基础物理资源的弹性能力,来看下我们是如何保障秒级万 TPS 的极致弹性能力的? 从两个维度来看用户视角的弹性: + 从限流维度看:在无损弹性上限以内的 TPS,都不会触发限流;超过无损弹性 TPS 上限后,会有秒级别的限流,通过逻辑弹性调整实例级别的限流阈值后,即可实现最终的 TPS 弹性。 + 从付费角度看:在预留规格内按规格进行预付费;超过预留规格的上限 TPS,超过部分按量付费,即用多少付多少。 为了满足用户视角的秒级弹性能力,我们通过物理弹性和逻辑弹性相结合的方式来进行保障: + 物理弹性是预弹的机制,弹性时间在分钟级,用户侧无任何感知,由服务侧来 Cover 成本。 + 逻辑弹性是实时生效的,弹性时间在秒级,同时在触发无损弹性 TPS 上限时,用户侧是会有秒级别的限流感知的,另一方面,用户是需要为弹出来的那部分流量进行按量付费的。 两者的关系是:物理弹性是逻辑弹性的支撑,逻辑弹性依赖物理弹性。从弹性流程上看,当用户的流量触发无损弹性上限时,优先判断物理资源是否充足,充足的话就进行秒级逻辑弹性,不充足的话就等待物理资源扩容后再进行逻辑弹性。当然这里会有个预弹的机制,尽量保障逻辑弹性时物理资源都是充足的。 从物理弹性加速来看,通过以下几个方面进行加速: + 计算、存储层按需弹性:计算层更关注 CPU、客户端连接数等核心指标;存储层更关注内存、磁盘 IO 等性能指标。 + 镜像下载加速:通过 PlaceHolder + 镜像缓存组件加速新节点的扩容。 + 新增元数据批量创建的机制:新增存储节点创建 5000 级别的 Topic 下降至 3 秒。 + 新增主动路由更新机制:降低存储节点物理扩容后承接读写流量的延迟。 我们的系统设计精密,旨在确保用户体验到极致的弹性能力,特别是实现每秒万次事务处理(TPS)的秒级弹性扩展。这一能力的保障策略围绕两个核心维度展开,并深度融合物理与逻辑弹性机制,确保在高吞吐需求下的无缝响应与成本效率。 ApsaraMQ on RDMA:降低计算与存储层之间通信开销 RDMA(全称远程内存直接访问)是一种高性能的网络通信技术,相比 TCP/IP 的网络模式,具有零拷贝、内核旁路、CPU 卸载等核心优势。ApsaraMQ Serverless 化架构具备存算分离的特点,非常适合在计算层和存储层之间引入 RDMA 技术,进一步降低内部组件之间的数据通信开销。 具体来看,计算层 Proxy 在接收到客户端各种协议的消息收发请求以后,通过内置的 Remoting Client 和存储层 Broker 进行数据交换。在原先的 TCP/IP 网络模式中,这些数据是需要从操作系统的用户态拷贝到内核态后,再经由网卡和对端进行交互。依托 RDMA 内核旁路特性,通过实现 RdmaEventLoop 的适配器,消息直接由用户态到 RDMA 网卡和对端进行交互。 从最终 BenchMark 的测试效果来看,引入 RDMA 技术后,存储层 Broker 的 CPU 资源消耗下降了 26.7%,计算层 Proxy 的 CPU 资源消耗下降了 10.1%,计算到存储的发送 RT 下降了 23.8%。 优雅上下线:ApsaraMQ Serverless 弹性如丝般顺滑 在 Serverless 场景下,物理资源的水平弹性扩缩是一个常态化的过程,同时结合 MQ 客户端和计算层 Proxy TCP 长连接的特点,在 Proxy 上下线过程中是比较容易出现连接断开的感知,比如客户端刚发出一个接收消息的请求,连接就被服务侧强行关闭了,是非常容易造成单次请求超时的异常的。 因此,我们通过 gRPC 协议 Http2 的 Go Away 机制以及 Remoting 协议层的扩展,结合 SLB 连接优雅终端的能力来实现 ApsaraMQ 在扩容态、缩容态、以及发布态的无感。 右图展示了缩容态下单台 Proxy 优雅下线的过程: 1. 当收到 Proxy0 需要下线的指令后,SLB 会将 Proxy0 摘除,保障新的连接不再建立到 Proxy0 上,同时 Proxy0 进入 Draining 状态,旧连接进行会话保持。 2. Proxy0 上收到新的请求后,返回的 Response Code 都更新为 Go Away;同时客户单收到 Go Away 的 Response 后,断开原先的连接,向 SLB 发起新建连接的请求。 3. SLB 收到新建连接的请求,会导流至 Proxy1。 4. 等待一段时间后 Proxy0 上的连接会自然消亡,最终达到无感下线的目的。 事件驱动架构赋能 AI 应用 接下来,将阐述面向 AI 原生应用的事件驱动架构如何拥抱 AI,赋能 AI 应用蓬勃发展。 + 面向 AI 应用的实时处理,具备实时 Embedding 至向量数据库、更新私有数据存储的能力,全面提升 AI 应用实时性、个性化和准确度。 + 面向 AI 应用的异步解耦,解耦 AI 推理链路的快慢服务,加快应用响应速度。 + 面向 AI 应用的消息负载,ApsaraMQ 支持主动 Pop 消费模式,允许动态设置每一条消息的个性化消费时长. 同时也支持优先级队列,满足下游多个大模型服务的优先级调度。 + 面向 AI 应用的消息弹性,ApsaraMQ 提供全模式的 Serverless 弹性能力,支持纯按量和预留+弹性、定时弹性等多种流量配置模型; 最后,让我们来看下阿里云事件总线 EventBridge 是如何实现数据实时向量化,提升 AI 应用的实时性和准确度的? 阿里云事件总线的 Event Streaming 事件流处理框架,具备监听多样化数据源,然后经过多次 Transform 之后再投递给下游数据目标的原子能力;依托这个框架,我们是很容易去实现数据的实时向量化的,从 RocketMQ、Kafka、OSS 等多个源监听到实时数据流入之后,经过文本化、切片、向量化等操作,最终存入向量数据库,作为 AI 应用实时问答的多维度数据检索的依据,最终提升了 AI 应用的实时性和准确度。
#技术探索