2025年12月16日

AgentScope x RocketMQ:打造企业级高可靠 A2A 智能体通信基座
引言 Agentic AI 时代已至,在智能客服、代码生成、流程自动化等场景中,多智能体(MultiAgent)协作正从构想走向落地。然而,当多个 Agent 需要像一个团队那样高效协作时,脆弱的通信机制可能因网络抖动或服务宕机,就让整个系统瞬间瘫痪,导致昂贵的计算任务失败、会话状态丢失。 如何为这些聪明的“数字员工”们构建一个真正可靠、高效的通信基座? 本文将为您介绍 Apache RocketMQ 全新推出的轻量级通信模型 LiteTopic,如何在 AI 应用场景中有效简化系统架构、提升稳定性与可靠性,并结合 A2A(AgenttoAgent)协议与阿里巴巴 AgentScope 框架的生产实践案例,深入剖析面向智能体通信的落地实践与技术实现。 RocketMQ for AI:重新定义 AI 应用通信范式 1.1 传统应用:单向、无反馈的事件驱动模式 在传统应用的事件驱动场景中,业务逻辑编排通常由人工预先约定,消息生产方成功发送消息后,便无需关注后续的处理逻辑。 下图以注册系统为例:用户发起账户注册请求后,注册系统向 RocketMQ 发送“新用户注册”的消息后便立即返回,无需关心下游的邮件或短信通知系统如何处理。邮件或短信通知系统再分别从 RocketMQ 拉取消息,驱动各自的发送流程。整条业务链路为单向、无反馈的事件驱动模式。 1.2 从单向事件到双向交互:AI 应用对通信提出新挑战 在 AI 应用场景中,业务逻辑编排通常由大模型动态生成,消息生产方需等待并处理响应结果,才能驱动后续的逻辑执行。 下图以典型的 AI 会话场景为例:用户所连接的 Gateway 不仅需要发送请求,还需要处理推理响应结果,并将结果推送给浏览器,形成完整的交互闭环。 结合真实 AI 应用场景的深度调研,我们发现 AI 场景具有四个显著特征,对底层通信模式提出了全新且严苛的挑战: + 更长的响应时间:传统互联网应用追求毫秒级响应延时,而 AI 应用的响应时长普遍达到分钟级以上。更关键的是,AI 应用单次业务的运行时间具有高度不可预测性。 + 更复杂的交互:AI 应用的多轮对话持续时间长,对话历史可达数十轮甚至更多。单次上下文传输可能达到几十甚至上百 MB,上下文管理难度高。多 Agent 之间的协同编排逻辑更加复杂,需要精确的状态同步。 + 更昂贵的计算资源:AI 推理依赖昂贵的 GPU 资源,瞬时高并发流量可能冲击推理服务稳定性,导致算力资源浪费,并且任务失败重试的成本极高。 + 更精细化的事件驱动:由于计算能力有限,异步事件驱动需要更精准的消费速度控制。同时,必须实现分级的事件驱动策略,以确保高优先级任务优先获得宝贵的计算资源。 1.3 RocketMQ LiteTopic:专为 AI 场景设计的通信模型 为了应对上述挑战,Apache RocketMQ 推出了以轻量级通信模型 LiteTopic 为核心的一系列新特性: + 轻量级通信模型 —— 为海量会话而生 其核心是百万级轻量资源管理能力。基于极低的资源动态创建开销,可轻松支持海量会话(Session)场景,并提供更细粒度的订阅管理,适用于长时 Session、AI 工作流和 AgenttoAgent 交互等场景。 + 企业级上下文管理 —— 让会话状态可靠持久 以连续的消息流完整保存 Session 上下文,通过顺序保障、排他消费等机制严格确保上下文的完整性与一致性。同时原生支持大消息体(数十 MB 甚至更大),轻松满足 AI 场景下庞大数据负载的传输需求。 1.4 LiteTopic 技术解析:百万队列支撑海量并发会话 LiteTopic 基于 RocketMQ 业界领先的百万队列核心技术构建,其底层本质是独立的 Queue。 + 它为每个独立会话(Session)创建一个专属的、低成本的“私有通道”——即轻量主题(LiteTopic),从而能够以极低的资源开销支撑海量并发会话的需求。 + 轻量级的 LiteTopic 在消息分配与发送行为上与顺序 Topic 一致(其所属 Queue 由单一 Broker 独占,消息始终路由至该 Broker,而非在多个 Broker 间轮询发送),这种设计天然确保了消息的严格顺序性,并极大降低了资源管理和路由的复杂度。 1.4.1 LiteConsumer 支持单节点粒度的订阅关系管理 与传统消息队列中“同一 Consumer Group ID(CID)必须全局一致订阅相同 Topic”的强约束不同,LiteConsumer 创新性地支持 CID 内各节点按需进行差异化订阅。每个节点可根据实际负载、业务场景或运行时需求,独立订阅不同的 LiteTopic,从而构建更加灵活、弹性的消费拓扑。 这一机制从根本上规避了因订阅关系不一致所引发的消费异常、重复消费或 Rebalance 风暴等问题,显著提升了系统的灵活性、可扩展性与稳定性。同时,它更契合 AI 时代轻量、动态、点对点的交互模式,为构建轻量级请求响应式消息收发模型提供了原生支持。 1.4.2 LiteConsumer 的核心能力 + 多节点差异化订阅:同一 CID 下的不同节点可独立订阅各自的 LiteTopic,实现细粒度、个性化的订阅策略。 + 动态订阅扩展:支持在运行时实时为单个节点新增 LiteTopic 订阅,无需重启服务或影响其他节点的正常消费。 + 动态退订能力:支持在运行时实时取消单个节点对特定 LiteTopic 的订阅,实现精准的资源释放与流量治理。 1.5 生产案例:RocketMQ LiteTopic 如何重塑 AI 应用架构? 以下案例基于某客户真实的 AI 应用场景,通过架构对比直观展示采用传统 RocketMQ 通信模型与引入 LiteTopic 轻量级通信模型前后的显著差异。 采用 RocketMQ LiteTopic 轻量级通信模型后,客户架构实现了质的提升:不仅彻底移除了对 Redis 的依赖,还避免了广播推送带来的带宽与计算资源浪费。整体架构更轻量,系统稳定性与可靠性也得到显著提升。 1.5.1 改造前:依赖 Redis + 广播的臃肿架构 整体的业务流程步骤如下: 1. 任务提交:用户请求到达后,应用接入层节点将推理任务写入 Redis。 2. 任务处理:Worker 集群扫描 Redis 并处理推理任务,将推理过程中的中间结果以多条顺序消息的形式发送至 RocketMQ。 3. 结果持久化与通知:Consumer 集群顺序消费 RocketMQ 消息,将最终推理结果存入 Redis,并基于 RocketMQ 广播通知所有应用接入层节点。 4. 结果推送:应用接入层节点收到广播消息后,仅当结果归属于自身连接时,才从 Redis 获取完整结果并推送给客户端;否则直接忽略该消息。 传统架构采用“先存储、再广播、后过滤”的模式,在高并发 AI 场景下效率低下且成本高昂: + 架构臃肿且脆弱:强依赖外部组件 Redis,增加了系统的复杂度和潜在故障点,运维成本高,可用性受限。 + 资源浪费严重:无效的广播机制导致大量带宽被占用,且每个应用接入层节点都需进行计算密集型的过滤操作。 + 链路冗长低效:数据流转需多次读写 Redis,通信链路长、延迟高,应用接入层节点宕机后会话状态将全部丢失,严重影响用户体验。 1.5.2 改造后:基于 RocketMQ LiteTopic 的极简可靠架构 引入 LiteTopic 后,业务流程被大幅简化,实现了端到端的可靠、高效通信: 1. 会话绑定与动态订阅:应用接入层节点在发起推理请求时携带唯一身份标识(如 Session ID),并立即订阅该标识对应的 LiteTopic(无需预创建 consumer group、topic)。 2. 结果持久化发送:智能应用(Worker)根据请求中的身份标识,将推理结果直接发送至对应的 LiteTopic(同样无需预创建)。 3. 精准接收消费:应用接入层节点各自精准接收属于自己的 response 消息,无需过滤,无任何冗余消费。 1.5.3 核心价值:为 AI 会话注入“记忆”,实现断点续传与恢复 客户接入 LiteTopic 轻量级通信模型后,通过将 LiteTopic 与 Session 维度进行细粒度绑定,以极低成本实现了生产级的会话续传与恢复能力。 在按照上一小节的流程实现端到端的可靠通信后,在网关机器下线/宕机时: + 自动重连:客户端检测到连接断开后,自动发起重连请求。 + 动态订阅:新接管的应用接入层节点实例根据 Session ID,动态订阅原 session 对应的 LiteTopic(无需预创建)。 + 断点续传:新应用接入层节点从上次成功消费的 Offset 位点开始拉取消息,精准恢复到故障前的状态(不会丢消息,也不会重复消费已处理的消息)。 + 恢复会话:自动恢复 Session 的完整上下文,用户完全无感知,业务流程无缝衔接。 基于 RocketMQ LiteTopic 打造企业级 Session 管理 2.1 AI 场景下 Session 的四大核心要求 在 AI 应用场景下,业界对 Session 的特性提出了以下四项核心要求: + 低延迟:面向实时交互场景,要求快速响应。 + 时序性:必须严格按对话时间顺序组织内容,确保上下文的连续性与逻辑一致性。 + 单会话隔离:保障不同用户/会话间的数据隔离,避免消息串话或状态混淆。 + 上下文压缩:支持通过截断或摘要控制上下文长度,避免超出模型窗口限制导致溢出。 2.2 RocketMQ LiteTopic 实现 Session 的四大优势 基于 RocketMQ LiteTopic 实现 Session 的核心价值,在于将“Session”从内存易失状态转化为可持久、可追溯、可恢复的事件流,为多智能体系统提供企业级会话韧性,彻底解决传统架构中会话状态丢失、无法恢复等痛点。 1. 会话状态持久化 —— 进程重启不丢会话 消息天然持久化存储于 CommitLog,即使应用宕机或网络中断,也能通过消息重放完整重建会话上下文(如对话历史、任务状态、中间结果)。 如下图所示,应用 A 将响应输出的 TaskEvent / TaskUpdateEvent 转换为 RocketMQ LiteTopic 中存储的消息(Message)。当应用 A 重启后,可从 CommitLog 中重放所有消息,完整恢复会话状态。 2. 消息回溯与重放 —— 断点精准恢复 支持按时间 / Offset 回溯消费,应用重启后可从断点精确恢复会话,实现无缝续聊与任务接力,避免重复推理带来的算力浪费。 当应用宕机后重新启动,可以指定某个 Session(LiteTopic)中的具体位点开始继续消费,或从上次消费成功的位点开始消费。 3. Session 隔离与路由 —— 多会话并行无干扰 通过轻量级 LiteTopic 实现会话级隔离(如 Session ID 作为 LiteTopic 的唯一标识),确保多用户/多会话并行运行时互不干扰。 多用户多 Session 的消息存储于不同的 LiteTopic,在数据存储维度实现天然隔离,无需应用层手动过滤。 4. 流量削峰与缓冲 —— 保护下游应用稳定性 高并发会话请求被缓冲至 Broker,避免下游 Agent 瞬时过载崩溃,提升系统整体稳定性。下游应用根据自身处理能力按需消费消息,实现“削峰填谷”。 如下图所示,应用 A 发出的任务请求可在 Broker 中持久化堆积,下游应用 B 根据自身消费能力按需拉取并处理,有效保障系统稳定性。 基于 RocketMQ 构建高可靠 A2A 通信通道 在上一章,我们解决了单个会话的持久化与恢复问题。现在,让我们将视野放大:当成百上千个功能各异的 Agent 需要协作时,它们之间如何建立标准化的通信?这正是 A2A 协议诞生的意义所在。 3.1 A2A 协议 AgenttoAgent(简称 A2A)是一项由 Google 于 2025 年发起,并贡献至 Linux 基金会的开源通信协议。其核心目标是建立跨厂商、跨框架的标准化互操作机制,使异构 AI 智能体(Agents)能够自动发现、可靠通信并高效协作,从而构建开放、可组合、可扩展的多智能体系统生态。 3.2 单智能体 vs. 多智能体架构:能力边界与协同范式的演进 在深入探讨如何构建 A2A 通信之前,我们首先需要理解,为什么多智能体协同是必然趋势。我们从六个维度对比单智能体与多智能体的能力差异: 3.3 同步 RPC 与 RocketMQ 异步通信的对比 明确了多智能体架构的优势后,下一个关键问题是:如何实现 Agent 之间的通信? A2A 协议原生支持的同步 RPC 协议包括 JSONRPC、gRPC 和 REST。然而,在企业级的复杂场景下,这些同步协议面临诸多挑战。下表从多个维度对比同步 RPC 与 RocketMQ 异步通信模型的差异: 3.4 开箱即用:基于 RocketMQ 的 A2A 协议实现 为加速 A2A 协议在异步通信场景的落地,我们基于 RocketMQ SDK 实现了 A2A 协议的 ClientTransport 接口。该实现旨在帮助用户在搭建多智能体应用时,能够专注于自身业务逻辑,快速构建高可靠、开箱即用的 A2A 通信方案。 发送普通同步请求:EventKind sendMessage(MessageSendParams request, @Nullable ClientCallContext context)发送Stream请求:void sendMessageStreaming(MessageSendParams request, Consumer eventConsumer…)重订订阅任务数据:void resubscribe(TaskIdParams request, Consumer eventConsumer, Consumer errorConsumer查询任务完成状态:Task getTask(TaskQueryParams request, @Nullable ClientCallContext context)取消任务执行Task cancelTask(TaskIdParams request, @Nullable ClientCallContext context)以及其他方法 开源项目地址 基于 RocketMQ 实现的 A2A 通信 RocketMQTransport 部分代码现已开源,欢迎关注! 项目地址:_https://github.com/apache/rocketmqa2a_ 3.5 架构解析:如何通过 RocketMQ 实现 Agent 间通信? 在一个典型的多智能体协作架构中,通信流程如下: + 应用 A 扮演 Supervisor 角色,负责对用户输入的需求进行任务分解,并将拆分后的子任务分别发送至应用 B 的业务 Topic(Normal Topic1)和应用 C 的业务 Topic(Normal Topic2)。 + 应用 B 集群从 Normal Topic1 拉取消息并执行相应逻辑处理,随后将结果发布到应用 A 订阅的 LiteTopic。 + 应用 C 集群则从 Normal Topic2 拉取消息进行处理,并同样将结果写入该 LiteTopic。 + 应用 A 集群通过拉取 LiteTopic 中的消息,汇聚各子任务的响应结果,进而驱动后续的业务逻辑编排。 AgentScope × RocketMQ:构建多智能体应用的最佳组合 理论与架构已经铺垫完毕,接下来,让我们结合一个完整的实战案例,看看如何将这套强大的通信基座,与顶尖的智能体开发框架 AgentScope 相结合,构建一个真正可用的多智能体应用。 4.1 AgentScope:面向多智能体的开发者友好框架 AgentScope 是阿里巴巴继 AI 模型社区 ModelScope 后,在 Agent 领域的又一战略级开源力作。它以“开发者为中心”,专注于提供智能体开发的开源框架,为构建复杂的多智能体应用提供了从设计、开发到调试的全套解决方案。它具备以下核心优势: + 对开发者透明:拒绝隐式魔法,所有环节(提示、API、智能体、工作流)可见、可控。 + 实时可介入:原生支持运行时中断与自定义处理。 + 更智能:内置工具管理、长期记忆、智能 RAG 等能力。 + 模型无关:一次编写,无缝适配各类大模型。 + 乐高式构建:模块化设计,组件解耦、自由组合。 + 面向多智能体:显式消息传递与工作流编排,专为协作场景打造。 + 高度可定制:全面开放工具、提示、智能体、工作流及可视化扩展,鼓励深度定制。 4.2 AgentScope x RocketMQ 的集成架构与合作展望 在明确了 AgentScope 的功能定位与应用价值之后,我们将进一步探讨其通信层与 RocketMQ 的现有集成机制,并展望双方在技术协同与生态共建方面的未来合作方向。 4.2.1 AgentScope 与 RocketMQ 集成架构 当 AgentScope 作为 Agent 应用服务提供者时,其内部支持符合 A2A(AgenttoAgent)协议的多种通信方式,包括基于 JSONRPC 的 WebService 和 RocketMQ Service,用于接收并处理来自其他 Agent 的 A2A 协议请求。同时,AgentScope 通过 wellknown 服务接口向外标准化地透出其所承载 Agent 的核心能力信息,包括但不限于: name(名称) description(描述) capabilities(能力列表) additionalInterfaces(额外支持的接口或协议) 这些元数据使调用方能够清晰识别该 Agent 提供的主要功能、所支持的通信协议及其对应的接入方式。 当 AgentScope 作为 Agent 应用服务的调用者时,它首先通过访问目标 Agent 暴露的 wellknown 服务,动态获取其详细的能力描述、支持的协议类型及对应的服务接入点(如 JSONRPC 端点或 RocketMQ Topic 信息)。随后,在通信层,AgentScope 利用 A2A 协议定义的传输客户端(如 JSONRPCTransport 或 RocketMQTransport)发起请求,并对返回的响应结果进行统一解析与处理,从而实现跨 Agent 的标准化、可互操作的协同调用。 1. 基于 RocketMQ 协议通信架构图 2. 基于 JSONRPC 协议通信架构图 4.2.2 合作展望 随着人工智能与分布式系统技术的深度融合,消息中间件与智能体(Agent)平台的协同正成为构建下一代智能分布式应用的关键路径。作为 Apache 软件基金会顶级项目,RocketMQ 凭借高吞吐、低延迟和高可靠等核心特性,已成为全球广泛采用的分布式消息队列,在金融、电商、物联网等关键领域积累了深厚的技术积淀,并于近期推出了轻量级通信模型 LiteTopic,进一步拓展了其在 AI 应用场景中的适用性。与此同时,AgentScope 作为新兴的智能体编排与运行平台,专注于为多智能体系统提供统一的调度、通信与治理能力。二者在技术理念与应用场景上高度契合,展现出广阔的合作前景与协同创新潜力。 1. 技术互补:构建“消息驱动 + 智能决策”的新型架构 RocketMQ 提供了强大的异步通信、事件驱动和流式处理能力。AgentScope 则聚焦于智能体生命周期管理、任务分解、上下文感知与自主协作。未来,二者可深度融合,形成“消息即事件、事件触发智能体行为”的新型架构: + 智能体间通信的标准化通道:利用 RocketMQ 作为 AgentScope 内部或跨集群智能体之间的可靠通信总线,保障高并发、有序、可追溯的消息传递,提升多智能体系统的鲁棒性与扩展性。 2. 生态共建:推动开源与标准协同发展 双方可基于开源社区开展深度合作: + 集成适配器开发:共同开发 RocketMQ 与 AgentScope 的官方集成插件,简化开发者接入流程。 + 联合参考架构发布:推出面向典型场景(如智能客服等场景)的联合解决方案模板,加速行业落地。 + 参与标准制定:在事件驱动架构(EDA)、智能体通信协议等领域协同推进开放标准,促进生态互操作性。 4.3 场景案例:用 AgentScope 与 RocketMQ 打造“智能旅行助手” 本案例以 AgentScope 作为 AI 智能体应用开发框架,构建了三个智能体: + SupervisorAgent(总控):负责与用户交互,任务分解与逻辑编排。 + WeatherAgent(天气专家):负责查询天气信息。 + TravelAgent(旅行专家):负责依据天气进行用户的行程规划。 SupervisorAgent 应用具有如下逻辑: + 如果用户只查询天气情况,则直接请求 WeatherAgent 进行天气信息查询; + 如果用户希望做出行程规划,则先向 WeatherAgent 发出天气查询请求,获取对应天气信息后,再带着天气信息向 TravelAgent 发出行程规划请求,TravelAgent 对行程结果进行规划后将响应结果发送至 SupervisorAgent 订阅的 LiteTopic,SupervisorAgent 应用将结果发送至用户侧。 实战演练:三步构建高可靠多智能体应用 阿里云官网现已提供免费试用、一键部署的《》,带您亲手搭建并运行一个多智能体应用,并基于 RocketMQ LiteTopic 实现多智能体异步通信能力——具备持久化、高可靠、可追溯等特性,显著提升 AI 应用交互的稳定性与可观测性。 5.1 方案概览:技术架构与云资源 本方案将带领您搭建一个多智能体(MultiAgent)系统,能够根据用户的需求查询天气信息并制定行程规划。 为简化部署过程,我们将在 1 台云服务器 ECS 上部署 3 个独立的 Agent(SupervisorAgent,WeatherAgent 和 TravelAgent,具体功能可参考 4.3),并且通过 RocketMQ 消息服务实现 Agent 之间的异步通信。 本方案的技术架构包含构建一个完整多智能体应用所需的所有云资源: 5.2 三步体验:从创建资源到部署 Agent 1. 免费一键部署资源 访问体验方案页面,点击“免费试用”,进入实验操作界面后,点击“立即试用”即可领取免费试用点,自动开始创建资源。 2. 创建 Topic 和 Group 共创建 3 个 Topic,配置参数见下表,其余参数保持默认。 共创建 3 个 Group,配置参数见下表,其余参数保持默认。 3. 创建部署智能体应用 在阿里云百炼的应用管理页面,根据示例文档中提供的模型参数和提示词,分别创建并发布两个智能体应用(天气助手 Agent、行程助手 Agent)。 远程连接云服务器 ECS 根据提供的执行脚本部署示例应用程序。等待应用启动完毕,大约需要 3~5 分钟,直到终端显示 You 提示符,便可直接在终端中输入信息与智能体交互。 5.3 结果验证:任务执行与消息轨迹追踪 1. 在 You 提示符后,输入 帮我做一个下周三到下周日杭州周边自驾游方案 并回车。 2. 等待智能体执行任务,最终会返回结合天气信息的行程规划内容,过程如下: a. SupervisorAgent 接收用户输入,向消息队列发送一条消息 杭州下周三到周日的天气情况怎么样?。 b. WeatherAgent 监听到上述消息,执行天气查询,并将结果发往消息队列。 c. SupervisorAgent 监听到上述消息,获取了天气查询结果,然后向消息队列发送一条消息 杭州下周三至周日天气已知,天气为,请基于此制定一份从杭州出发的周边2人3天4晚自驾游行程规划(下周三出发,周日返回),包含住宿、餐饮与景点推荐。 d. TravelAgent 监听到上述消息,执行行程规划,并将结果发往消息队列。 3. 查看消息轨迹:在云消息队列 RocketMQ 版实例详情页,可以按 Topic 或按 LiteTopic 查询到相关的消息轨迹。 目前,该解决方案已在阿里云官网上线,欢迎点击即可部署体验~ 邀请您钉钉扫码加入 RocketMQ for AI 用户交流群,探索更多产品功能与应用场景,与我们共建 AI MQ 的未来!
#技术探索

2025年12月1日

打造你的专属 AI 导游:基于 RocketMQ 的多智能体异步通信实战
前言 在现代 AI 应用中,多智能体(MultiAgent)系统已成为解决复杂问题的关键架构。然而,随着智能体数量增多和任务复杂度提升,传统的同步通信模式逐渐暴露出级联阻塞、资源利用率低和可扩展性差等瓶颈。为应对这些挑战,RocketMQ for AI 提供了面向 AI 场景的异步通信解决方案,通过事件驱动架构实现智能体间的高效协作。本文将探讨和演示如何利用 RocketMQ 构建一个高效、可靠且可扩展的多智能体系统,以解决企业级 AI 应用中的核心通信难题。 多智能体系统的通信需求与核心挑战 随着 AI 应用的复杂度不断提升,单智能体(AI Agent)因其能力边界和知识局限,已难以独立胜任动态、多维度的决策任务。因此,多智能体(MultiAgent)系统正迅速成为构建复杂 AI 应用的核心范式。MultiAgent 系统通常由一个主智能体(Supervisor Agent)负责将复杂任务分解,并分发给多个具备特定领域能力的子智能体(Sub Agent)并行执行,最终汇聚结果以达成共同目标。 整个系统的智能与效能,高度依赖于智能体间通信的效率与可靠性。为了实现不同厂商、不同技术栈开发的智能体高效协作,行业需要为它们建立一套标准化的“交互协议”与“工作流程”,例如 Google 提出的 A2A(AgenttoAgent)协议。然而,底层的通信模式仍是决定系统性能、可靠性和成本效率的关键。传统的同步调用模式在简单的“一对一”交互中尚可应对,但在 MultiAgent 系统这种涉及多个长周期任务并行协作的复杂场景下,其弊端逐渐凸显,主要体现为三大核心挑战: 1. 同步阻塞与性能瓶颈:在同步调用模式下,主智能体分发任务后必须等待子智能体返回执行结果,才能继续下一步规划。在包含多个长耗时任务的复杂链路中,这极易引发“级联阻塞”,严重限制了系统的并发处理能力和整体吞吐量,导致协作效率低下,系统难以扩展。 2. 系统可用性挑战:同步通信的强依赖特性,使得智能体间的调用关系如同“串联电路”,且通常缺乏可靠的重试与容错机制。任何一个智能体节点的故障或超时,都可能导致整个任务链路中断。任务失败不仅影响用户体验,还会造成中间过程消耗的宝贵算力资源被浪费。 3. 消费调度与成本效率困境:MultiAgent 系统中,上下游智能体的吞吐量差异巨大,任务负载也常出现波峰波谷。若缺乏精细化的流量控制与差异化调度能力,流量洪峰可能导致部分智能体服务过载甚至“雪崩”。同时,在算力资源有限的情况下,系统无法保证高优任务被优先处理,难以实现算力利用率的最大化,最终陷入“忙时过载、闲时浪费”的资源困境。 这些挑战共同制约了多智能体系统的性能、可靠性与成本效率,成为阻碍复杂 AI 应用规模化落地的重要因素。 RocketMQ for AI:构建智能体高效协作的异步通信引擎 要解决上述挑战,核心在于将系统架构从“请求响应(RequestReply)”的同步调用模式,转变为基于事件驱动的异步通信模式。RocketMQ for AI 通过一系列专为 AI 场景设计的特性,为多智能体系统的可靠通信与高效协作构建了一个强大的异步通信引擎。 1. 异步通信,提升协作扩展性:在异步通信模式下,主智能体将任务作为“消息”发送至消息队列后,便可立即返回处理其他工作,无需等待子智能体处理和反馈;子智能体作为“消费者”独立地从队列中获取任务并进行处理。这种“发布订阅”模式彻底消除了级联阻塞,使主智能体可以轻松地向多个子智能体并发分发任务,极大提升了协作效率与系统吞吐量,缩短了复杂任务的端到端时长。RocketMQ 专为 AI 场景推出的轻量主题模型(LiteTopic),支持百万级轻量资源与高性能动态订阅,为系统的动态扩展提供了坚实基础。 2. 持久化与重试机制,提升系统可用性:异步解耦打破了智能体间的调用强依赖,显著提升了系统整体可用性。RocketMQ 将智能体通信的请求和结果均持久化到消息队列,这相当于为任务处理流程提供了 checkpoint 能力。即使某个智能体服务短暂宕机或网络故障,任务消息也不会丢失,待服务恢复后可继续处理。结合 RocketMQ 内置的可靠重试与死信队列机制,可以确保任务最终成功交付,避免因瞬时故障导致整个任务链路失败和算力资源浪费,极大提升了系统的韧性和可用性。 3. 精细化调度,保障稳定性与优化成本效率:面对稀缺且昂贵的 AI 算力资源,RocketMQ 提供了丰富的消息调度策略,以实现成本与效率的最优平衡。通过控制消息的消费速率,可以对任务请求进行缓冲,起到“削峰填谷”的作用,防止下游智能体被突发流量冲垮,保护服务稳定性。通过优先级队列,可以确保在有限的算力资源下,高优先级任务能够被智能体优先处理,实现资源利用率的最大化。 场景实践:通过 RocketMQ 实现 MultiAgent 系统异步通信 下图展示了一个基于 RocketMQ LiteTopic 实现的多智能体异步通信的典型流程,包含一个主智能体(Supervisor Agent)和两个子智能体(SubAgent)。 1. 接收请求阶段:为每个 Sub Agent 创建一个 Topic 作为请求任务的缓冲队列。 2. 返回结果阶段: a. 为 Supervisor Agent 创建一个用于接收响应结果的 Topic,并让其订阅这个 Response Topic。该 Topic 可采用 RocketMQ 专为 AI 场景新发布的 Lite Topic 类型; b. 当 SubAgent 完成任务后,它会将结果发送至该 Response Topic,可以为每个独立任务动态创建一个专属的子 LiteTopic(例如,以任务 ID 或问题 ID 命名); c. Supervisor Agent 通过 MQ 的异步通知机制实时获取这些子 LiteTopic 中的结果,并可通过 HTTP SSE(ServerSent Events)等协议推送给 Web 端。 场景示例: 现在,我们通过一个具体的天气查询与行程规划 MultiAgent 系统实例,展示如何利用 RocketMQ 实现智能体间的异步通信与高效协作。 1. 方案架构 为简化 MultiAgent 系统的部署过程,我们将在 1 台云服务器 ECS 上部署 3 个独立的 Agent—— 1 个主智能体(Supervisor Agent)、一个负责天气查询的子智能体(Weather Agent) 和一个负责行程规划的子智能体(TravelAgent),并且通过云消息队列 RocketMQ 版实现 Agent 之间的异步通信。 2. 实施步骤 a. 创建资源: i. 创建专有网络 VPC(为云服务器 ECS 等云资源构建云上私有网络)、云服务器 ECS(用于部署 MultiAgent 系统)、云消息队列 RocketMQ 版(提供消息队列服务,实现 Agent 之间的异步通信)。 ii. 在云消息队列 RocketMQ 版实例下创建 3 个 Topic:WeatherAgentTask(普通消息,用于 WeatherAgent 接收任务消息)、TravelAgentTask(普通消息,用于 TravelAgent 接收任务消息),WorkerAgentResponse(轻量消息,用于 SupervisorAgent 接收各个子 Agent 返回的任务结果)。 iii. 在云消息队列 RocketMQ 版实例下创建 3 个 Group:WeatherAgentTaskConsumerGroup(消费模式 CLUSTERING,并发投递,用于消费 WeatherAgentTask 的普通消息)、TravelAgentTaskConsumerGroup(消费模式 CLUSTERING,并发投递,用于消费 TravelAgentTask 的普通消息)、WorkerAgentResponseConsumerGroup(消费模式 LITE_SELECTIVE,顺序投递,用于消费 WorkerAgentResponse 的轻量消息)。 b. 创建智能体应用: i. 开通大模型服务平台百炼(用于调用模型服务),并获取百炼 API Key。 ii. 在百炼的应用管理页面,根据示例文档中(在此不详细展开)提供的模型参数和提示词,分别创建并发布两个智能体应用(天气助手 Agent、行程助手 Agent)。 c. 部署智能体应用:远程连接云服务器 ECS 根据提供的执行脚本部署示例应用程序。等待应用启动完毕,大约需要 3~5 分钟,直到终端显示 You 提示符,便可直接在终端中输入信息与智能体交互。 3. 效果验证 a. 输入帮我做一个下周三到下周日杭州周边自驾游方案。 b. 等待智能体执行任务,最终会返回结合天气信息的行程规划内容,过程如下: i. Supervisor Agent 接收用户输入,向消息队列发送一条消息杭州下周三到周日的天气情况怎么样?。 ii. Weather Agent 监听到上述消息,执行天气查询,并将结果发往消息队列。 iii.Supervisor Agent 监听到上述消息,获取了天气查询结果,然后向消息队列发送一条消息杭州下周三至周日天气已知,天气为,请基于此制定一份从杭州出发的周边2人3天4晚自驾游行程规划(下周三出发,周日返回),包含住宿、餐饮与景点推荐。 iv. Travel Agent 监听到上述消息,执行行程规划,并将结果发往消息队列。 v. Supervisor Agent 监听到上述消息,获取了行程规划结果并返回给用户。 c. 查看消息轨迹:在云消息队列 RocketMQ 版实例详情页,可以按 Topic 或按 LiteTopic 查询到相关的消息轨迹。 目前,该解决方案已在阿里云官网上线,欢迎点击即可部署体验~ 邀请您钉钉扫码加入 RocketMQ for AI 用户交流群,探索更多产品功能与应用场景,与我们共建 AI MQ 的未来!
#行业实践

2025年11月27日

多源 RAG 自动化处理:从 0 到 1 构建事件驱动的实时 RAG 应用
前言 当企业想用大模型和内部非公开信息打造智能问答系统时,RAG(RetrievalAugmented Generation,检索增强生成)已成为必备技术。然而,在实际落地中,构建 RAG 应用的数据准备过程繁琐复杂且充满挑战,让很多企业和开发者望而却步。本文将介绍构建 RAG 的最佳实践:通过阿里云事件总线 EventBridge 提供的多源 RAG 处理方案,基于事件驱动架构为企业 AI 应用打造高效、可靠、自动化的数据管道,轻松解决 RAG 数据处理难题。 为什么 RAG 是治愈模型幻觉的“良方”? 大语言模型(LLM)就像一个博览群书、记忆力超群的“学霸”,尽管文采斐然、对答如流,但偶尔也会犯一些令人啼笑皆非的错误,比如凭空编造事实或提供过时信息,这就是我们常说的“模型幻觉”。 这背后的原因很简单:这位“学霸”的知识完全来自于“毕业前”学过的海量教材(即训练数据),尽管覆盖了维基百科、新闻、书籍等通用知识和各领域的专业知识,但存在两个天然局限: + 知识领域局限:它对企业内部、垂直领域等私域知识知之甚少。比如,它不了解你公司内部的规章制度,也无法接触电商平台的用户数据等非公开信息。 + 知识时效局限:它的知识更新停留在训练数据截止的那个时间点,无法获取实时信息,比如股票行情、时事新闻等不断更新的动态数据。 为了治好大语言模型“一本正经胡说八道”的毛病,我们必须让它从“闭卷考试”升级为“开卷考试”,RAG(RetrievalAugmented Generation,检索增强生成)技术应运而生。 RAG 的核心理念,可以通俗地理解为“先查找资料,再生成答案”。当收到一个问题时,它不会让大模型直接凭记忆回答问题,而是分两步走: 1. 检索(Retrieval):从一个可随时更新的外部知识库(如企业内部文档、产品手册等)中,快速检索出与问题最相关的信息片段。 2. 生成(Generation):将检索到的信息片段连同用户问题一起作为上下文提供给大模型,引导它基于这些可靠的“证据”生成准确、有理有据且可追溯来源的回答。 _(来自阿里云大模型平台服务百炼 知识库功能 文档示例)_ 通过这种方式,不仅能有效减少模型幻觉,大幅提升生成答案的准确性与时效性,还让模型在无需耗费巨资和时间进行重新训练的情况下,就能轻松扩展知识边界。凭借这些显著优势,RAG 已成为企业构建可靠、智能 AI 应用的首选方案。 RAG 落地挑战:数据处理的“三重困境” 尽管 RAG 的原理听起来简单明了,但在实际落地时,无数企业和开发者却深陷数据处理的泥潭。 AI 时代的数据处理,与过去以结构化数据为主的传统数据处理模式截然不同。我们面对的是由海量、异构、多模态数据构成的洪流,数据处理的复杂度和挑战呈指数级增长。企业对实时性要求也不断提高,任何数据延迟都可能影响模型效果。 企业和开发者在落地 RAG 时,普遍会陷入数据处理的“三重困境”: 1. 扩展之困——异构化数据源的“接入鸿沟” 现代企业的数据通常散落在 ERP、CRM、OA、IoT 设备、社交媒体等数十个系统中,涵盖结构化数据(如数据库、表格)、非结构化数据(如 PDF、网页、图片、音视频)和半结构化数据(如 JSON、XML)。若采用传统点对点连接的数据集成方式,每接入一个新数据源,都需要复杂的定制化开发,扩展性极差,响应速度慢,严重拖慢 AI 应用的迭代速度。 2. 运维之难——脆弱数据管道的“运维噩梦” RAG 的数据处理链路漫长且复杂,涉及数据采集、清洗、切块、向量化、入库、检索等多个环节。整条链路如同一个脆弱的“黑箱”,任何一个环节的微小故障都可能导致全链路瘫痪。在实际运维过程中,数据源接口变更、数据质量问题、系统负载突增等突发状况层出不穷,数据管道的问题排查、修复和系统更新,都极其耗时耗力,让运维团队疲于奔命。 3. 稳定之痛——数据管道的“可靠性危机” 数据管道的稳定性是 AI 应用落地的基石。数据丢失、重复、延迟、质量下降以及系统故障等数据处理链路中的任何问题,都可能直接导致模型推理结果的偏差甚至错误,进而影响业务决策和用户体验。传统数据处理架构的紧耦合设计,导致任何一个组件故障都可能影响整个系统运行,并且缺乏有效的监控和告警机制,往往在造成严重影响后才发现问题。 因此,我们迫切需要一种全新的数据处理范式,来构建一个灵活、可扩展、实时、智能的数据处理管道。 破局之道:事件架构驱动重塑 AI 数据管道 事件驱动架构(EventDriven Architecture,EDA)为应对 AI 数据处理的复杂性挑战,提供了坚实的技术基础。在事件驱动架构中,“事件(Event)”是核心概念,它本质上是一次状态变化的数字化表达。在 AI 数据处理场景中,数据的产生、变更、处理、存储等各个环节都可以被抽象为事件。例如,当新的训练数据上传到系统时,产生数据接收事件;当数据经过清洗和转换后,产生数据处理完成事件;当向量化处理完成后,产生向量生成事件;当数据成功存储到向量数据库后,产生数据入库事件。 这种“事件化”的处理方式,使整个 AI 数据处理流程变得标准化、清晰、可控且可追溯,带来三大优势: 1. 松耦合 数据处理流程被分解为独立的事件和处理单元。数据工程、算法、平台等团队可以独立开发、部署和迭代各自负责的组件,无需关心对方的内部实现。一个组件的变更不会影响其他部分,系统容错能力和迭代效率更高。 2. 可扩展性与稳定性 每个组件都可以根据实际负载独立扩展,当某个组件成为瓶颈时,只需增加该组件的实例数量,而无需对整个系统进行扩容。同时,通过引入智能监控和自动恢复机制,系统能够及时发现和处理各种异常情况,保证数据链路稳定运行。 3. 端到端实时性 在智能客服、实时推荐等场景中,毫秒级的响应至关重要。事件驱动架构可以确保事件一旦发生,便能被立即捕获并触发后续处理。这使得 RAG 的知识库能够近乎实时地吸收新信息,让大模型始终掌握着最新“情报”。 综上所述,采用事件驱动架构的系统在敏捷性、可扩展性和可靠性方面实现了质的飞跃,这正是 AI 应用规模化落地的基石。 EventBridge 多源 RAG 处理方案:为 AI 场景提供高效数据管道 阿里云事件总线 EventBridge 基于事件驱动架构,将 AI 能力深度融入数据处理全链路,为企业和开发者提供专为 AI 应用设计的、端到端的、智能化的数据处理中间件。 EventBridge 通过一系列 ETL for AI Data 的全新能力,提供多源 RAG 处理方案:将 RAG 数据准备的全流程(从多源异构数据提取、清洗、切块、向量化再到入库)彻底实现自动化。 开发者现在可以通过 EventBridge 简单的“白屏化”配置,轻松实现: 1. 无缝对接多源数据 轻松接入主流的对象存储(OSS)、消息队列(如 Kafka、RocketMQ、MQTT)、日志服务(如 SLS)、数据库服务(如 MySQL)等多种数据源,覆盖结构化数据(如数据库、表格)、非结构化数据(如 PDF、网页、图片、音视频)和半结构化数据(如 JSON、XML)。 2. 智能化的数据处理 自动完成文档解析(Loader)、文本切分(Chunking)和向量化(Embedding)的完整数据转换流程,内置多种核心技术,支持多种非结构化数据(如 TEXT、JSON、XML、YAML、CSV)的智能解析和处理,提供完整的 Loader 技术体系,包括多种分块策略、单文档加载、批量数据加载,确保大规模数据的可靠处理;对结构化数据采用流式处理架构,能够实时处理高吞吐量的数据流,可实现复杂的流式数据转换和聚合操作。 3. 一键式向量入库 提供统一的向量数据库接入接口,支持将处理好的向量数据直接加载到主流向量数据库(如 DashVector、Milvus)中,也兼容传统数据库的向量扩展插件。只需简单的图形界面配置(拖拽方式配置数据源、处理逻辑、目标数据库等),系统会自动生成复杂的向量数据处理和入库流程。提供丰富的预置模板,可基于模板快速搭建数据处理流程。提供完善的监控仪表板和告警机制,可实时查看数据处理的状态、性能指标、错误信息等,及时发现和解决问题。 场景实践:从 0 到 1 构建基于事件驱动架构的实时 RAG 应用 接下来,我们将通过一个完整的实战场景,带你从零开始,利用阿里云事件总线 EventBridge、对象存储 OSS、函数计算 FC、向量检索服务 DashVector 和大模型服务平台百炼,快速构建一个实时的 RAG 应用。 方案概览 + 首先,通过 EventBridge 构建一个高效的 ETL 数据管道:能够自动从数据源(对象存储 OSS)中实时提取数据,通过函数计算 FC 灵活定义数据转换的逻辑,进行清洗、切块和向量化,并将处理结果持续加载到目标(向量检索服务 DashVector),形成一个动态更新的知识库。 + 然后,通过函数计算(FC)的 Web 函数构建一个简单的 RAG 应用,调用大模型服务平台百炼进行推理,以 DashVector 中的向量数据作为知识库。 + 最后,我们通过输入与知识库相关的用户问题,测试 RAG 应用的回答效果。 方案架构 方案提供的默认设置完成部署后,在阿里云上搭建的系统如下图所示。实际部署时您可以根据资源规划修改部分设置,但最终形成的运行环境与下图相似。 实施步骤 1. 构建自动化数据管道: 1. 创建事件流:在事件总线 EventBridge 控制台创建并配置一个事件流,作为数据处理管道的核心。 2. 配置数据源与目标:创建并配置对象存储 OSS Bucket 作为数据源(Source),创建并配置向量检索服务 DashVector 作为数据投递的目标(Sink)。 3. 配置数据转换逻辑(Transform):选择“内容向量化”的函数模板创建一个函数,并在函数代码中填写获取的百炼 APIKEY,这个函数将负责对数据进行切块和向量化。 2. 构建 RAG 应用: 1. 创建 Web 函数:创建一个 Web 函数(注意和之前创建的用于处理数据流的事件函数区分)。 2. 编写应用代码:这个函数将作为 RAG 应用的后端,负责接收用户查询,从 DashVector 检索知识,并调用百炼大模型生成回答。需要在函数代码中配置百炼和向量检索服务 DashVector 的相关访问凭证(如 APIKEY、Endpoint 等)。 3. 部署应用:部署代码成功后,RAG 应用即构建完成并可供访问。 效果验证 1. 更新知识库:将包含私有数据的文件(例如,一份名为百炼系列手机产品介绍.txt 的文档,包含了虚拟手机厂商的商品数据)上传到 OSS Bucket 中。 2. 查看向量生成:文件上传成功后,EventBridge 会自动捕获这一事件并触发数据处理流程。稍等片刻,即可在 DashVector 控制台查看已生成的向量。 3. 测试问答效果:通过创建的 RAG 应用发起访问,输入一个与你上传文档相关的问题,例如:“百炼 X1 手机的分辨率是多少?”。 4. 获取精准回答:RAG 应用会自动检索知识库,并将相关信息连同问题一起发送给百炼大模型。很快就会收到一个基于私有数据生成的精准回答。在函数的执行日志中,还可以看到向量检索召回的具体原文片段,从而验证整个 RAG 链路的有效性。 目前,该解决方案已在阿里云官网上线,欢迎点击即可部署体验~ 邀请您钉钉扫码加入 EventBridge 用户交流群,探索更多产品功能,与我们共同定义和构建 AI 数据处理的未来!
#技术探索

2025年11月27日

从 Transform 到 Transformer,用 EventBridge 与百炼构建实时智能的 ETL 数据管道
前言 作为数据处理领域的经典模式,ETL(ExtractTransformLoad)通过提取、转换、加载三个步骤,高效地处理着各类结构化数据。然而,面对 AI 时代海量、异构、实时的“数据洪流”,传统 ETL 链路,尤其是其核心的转换(Transform)环节,正面临严峻挑战。本文将从一个初级开发者也能理解和上手的视角,探讨 AI 时代的数据处理新范式:如何利用基于 Transformer 架构的大语言模型(LLM)重塑传统数据处理中的转换(Transform)环节,并结合事件驱动架构(EventDriven Architecture, EDA),为 AI 数据处理链路“注入实时智能”。 传统 ETL 在 AI 时代的困境:“T”不仅是“转换”,更是“痛点” ETL(ExtractTransformLoad)是数据处理的核心流程。它负责从不同数据源中提取数据,经过清洗、转换和整合后,加载到统一的数据仓库,为后续的数据分析与商业智能提供支撑。 我们可以将 ETL 生动地比作一个“中央厨房”:“提取(Extract)” 是采购生鲜食材(原始数据),“加载(Load)” 是将精美菜肴(可用数据)端上餐桌,而 “转换(Transform)” 则是至关重要的烹饪环节。在过去,厨房采购的多是规格统一的食材(结构化数据),依靠“老菜谱”(固定的规则代码)便能高效、稳定地完成处理。然而,AI 时代带来了形态各异、数量庞大的“山珍海味”——文本、图片、音频、视频等非结构化数据,且往往要求实时处理。这使得厨房里最关键的烹饪环节(Transform)不堪重负,那套为标准食材设计的“老菜谱”已然捉襟见肘。 以一个常见的地址信息标准化场景为例。用户输入的地址格式五花八门,可能是文本、文档、扫描图片以及客服对话等。后端系统为了便于入库与分析,必须将这些信息精准地解析为“省市区街道”的标准化结构。更棘手的是,原始地址信息还可能存在错别字、信息缺失或语义模糊等问题。在传统 ETL 模式下,工程师不得不编写数千行复杂的正则表达式和判断逻辑,并维护一个庞大的地址库。这种方式不仅开发和维护成本高、扩展性差,而且难以覆盖所有非标准格式和异常情况。 这正是传统 ETL 在 AI 时代“力不从心”的缩影,但也只是冰山一角。在 AI 应用中,数据转换的需求远不止于此,例如:从用户评论中提取情感倾向、为非结构化文档打标签、从图片中识别特定内容等。传统基于固定规则的 Transform 工具,面对这些充满“语义”和“不确定性”的任务,几乎束手无策。最终,数据处理链路变得难以扩展、运维复杂且稳定性差,严重拖慢了 AI 应用的创新步伐。 为 ETL 更换“AI 大脑”:用 Transformer(AI)代替 Transform 既然传统 Transform 难以理解数据背后的“语义”,为何不让擅长此道的 AI 来主导这个环节呢? 本文巧妙地运用了“Transformer”一词的双关含义:它既指代 ETL 中的“转换器”(Transformer),更特指驱动大语言模型(LLM)的核心架构——Transformer。 这正是“用 Transformer 代替 Transform”的核心思想:将大语言模型(LLM)的智能直接注入 ETL 的数据转换环节。许多过去需要复杂代码才能实现的数据转换任务,如今通过与 AI 交互即可轻松完成。 回到我们的“中央厨房”比喻,现在负责加工菜品的,不再是一本固化的菜谱,而是一位拥有米其林星级水准、能理解并处理任何食材的全能大厨。 以前文提到的“地址信息标准化”场景为例,集成了 LLM 的数据管道能做到: + 结构化解析:利用大模型的语义理解能力,即使地址格式不规范、字段顺序混乱,也能准确识别出省、市、区、街道等核心要素。例如,对于“北京市海淀区中关村大街1号”,大模型能准确解析出结构化字段:省份为“北京”,城市为“北京”,区县为“海淀区”,街道为“中关村大街1号”。 + 自动识别纠错:基于大模型强大的语言知识与上下文推理,自动识别并纠正地址信息中的错别字或格式错误,再结合地理知识库,能够进一步确保纠正的准确性。例如,将“北京市海定区”纠正为“北京市海淀区”,将“中关村大街一号”标准化为“中关村大街1号”。 + 智能信息补全:借助大模型的上下文推理能力,并集成完整的地理信息数据库,能够根据已有地址信息智能补全缺失的部分。例如,通过详细地址自动推断邮政编码,根据区县信息补全城市和省份。 + 格式标准化:最终,所有解析后的信息将被统一输出为标准格式,如结构化的字段信息(省、市、区、街道、邮政编码等)和规范化的地址字符串,确保下游系统能够直接、高效地进行数据处理和分析。 这样,我们就不再需要维护复杂的规则库,只需在数据流中集成大语言模型的能力,即可实现智能化的数据清洗和标准化处理。 事件驱动架构(EDA):ETL for AI Date 的“天选载体” 解决了数据“如何转换”的问题后,我们还需要一个现代化的架构来承载整个流程。相较于传统的批量处理模式,事件驱动架构(EventDriven Architecture, EDA)已被公认为是 AI 时代数据处理的理想选择。 如果说“用 Transformer 代替 Transform”是为数据管道植入“AI 大脑”,那么 EDA 就是构建这套智能系统的“神经网络”,让数据和 AI 指令高效地流动起来。它将系统中的每一次状态变化(如“新文件上传”、“新评论产生”)都视为一个独立的“事件”,实时触发相应的处理动作,并准确地将处理结果路由到指定的下游系统。 阿里云事件总线 EventBridge:AI 时代的事件中枢 作为 AI 时代的企业级事件枢纽,事件总线 EventBridge 为 AI 数据处理提供强大的事件驱动能力。它能够帮你轻松构建高效、智能的 ETL 数据管道,无缝集成 AI 能力,并且原生支持实时推理和异步推理两种模式,以满足不同场景的需求。其核心价值体现在以下几个方面: + 实时智能决策:将 AI 推理能力无缝融入数据处理流程,使系统能对实时数据进行智能分析与决策,极大提升业务的智能化水平。 + 系统解耦、灵活扩展:数据源、AI 模型和业务系统通过 EventBridge 完全解耦,任何组件的变更都不会影响其他部分,让系统更加灵活、更易于维护和扩展。 + AI 能力沉淀与复用:相同的 AI 推理服务可被多个业务场景复用,避免重复开发,最大化 AI 模型的价值与利用率。 + Serverless 与易用性:作为全托管的 Serverless 服务,EventBridge 免去了繁琐的部署和运维工作。通过简单的控制台配置或 API 调用,即可快速构建和管理事件驱动的数据处理流程,并支持事件过滤、转换、路由、重试等高级能力。 核心能力:从不确定到确定的“结构化输出” 在 ETL 的 “转换(Transform)” 环节中,EventBridge 允许用户直接调用大语言模型(例如通义千问或百炼大模型服务平台上的模型),甚至是更复杂的 AI Agent。 但问题来了:LLM 的输出具有不确定性,有时像“开盲盒”,这在要求稳定可靠的企业级应用中是无法接受的。 为了解决这个核心痛点,EventBridge 提供了强大的结构化输出(Structured Output)能力,确保 AI 的每一次处理结果都以稳定、可靠的格式返回。 例如,当需要模型分析用户评论的情感时,我们期望的不是“我觉得这句话是积极的”这类模棱两可的文本,而是一个清晰的 JSON 对象,如:{ "sentiment": "积极", "summary": "产品设计和性能出色,客户非常满意" }。 EventBridge 通过以下两项核心技术,产品化地实现了这一关键能力: + JsonSchema 原生支持:对于支持 JsonSchema 的模型,EventBridge 会利用其原生能力,确保模型输出严格遵循用户定义的格式,实现高性能与高可靠性。 + 智能提示词注入:对于不支持 JsonSchema 的模型,EventBridge 会采用智能提示词注入技术,引导模型生成稳定、可靠的结构化输出,并确保多轮对话格式的一致性。 这项能力极大地降低了在 ETL 流程中使用 AI 的门槛,为企业级应用的稳定性提供了坚实保障。 场景实践:EventBridge+百炼,让 AI 赋能数据链路 我们以一个具体的场景为例,展示如何通过阿里云的事件总线 EventBridge 与大模型服务平台百炼的结合,构建 AI 赋能的数据链路。 通过在 EventBridge 的事件流中原生集成百炼大模型服务,使开发者可以在数据流中直接调用 AI,从而轻松、高效地完成传统 ETL 中复杂的转换(Transform)任务。 应用场景:敏感信息过滤与数据脱敏 本方案将演示如何利用 AI 对实时数据流进行清洗与脱敏处理,自动发现并屏蔽业务数据中的敏感关键词,从而保障业务的数据合规性。 方案架构与实施步骤 该方案的核心是利用 EventBridge 作为数据中枢,将来自不同数据源的实时数据流,分发给百炼 AI 模型进行推理,再将推理结果通过 EventBridge 路由到下游的业务系统或数据存储中。 下图展示了该方案的参考架构: 实施该方案主要包括以下三个步骤: 1. 构建数据管道:创建一个 EventBridge 事件流,作为整个数据处理的核心链路。 2. 配置数据源与目标:创建两个轻量消息队列,分别作为事件流的源(Source)和目标(Sink)。 3. 集成 AI 能力:在事件流的转换(Transform)环节,配置调用已开通的阿里云百炼大模型服务,将 AI 的智能处理能力无缝嵌入数据管道中。 整个流程通过在 EventBridge 控制台进行简单的点击和配置即可完成,无需编写复杂的代码来连接数据源、AI 模型和下游系统,大大提升了开发效率。 实现效果 部署完成后,可以通过以下方式验证数据脱敏效果:从 Source(源)队列发送带有敏感信息的数据,然后在 Sink(目标)队列接收经过事件流处理后的数据。 + 输入示例:包含敏感信息的数据,如["客户张三(13812345678)反馈了一个问题..."] + 输出示例:经过 AI 脱敏后的数据,如["客户(1385678)反馈了一个问题..."] 注:由于大模型生成结果存在随机性,实际测试时输出结果的格式可能存在差异。在生产环境中,可以结合 EventBridge 的结构化输出能力与提示词工程,获得稳定可靠的输出结果。 开发者关心的问题(Q&A) 问:每次数据转换都调用 LLM,成本会不会很高? 答:成本是大家最关心的问题。首先,EventBridge 本身是 Serverless 服务,按实际处理的事件数量付费。主要的成本来自 LLM 调用,但你可以根据场景需求灵活选择不同规模和成本的模型。对于简单任务,轻量级模型已足够,成本较低。对于非实时场景,可采用异步批量调用的方式,进一步优化成本。 问:LLM 推理有延迟,会影响实时性吗? 答:事件驱动架构的异步和解耦特性天然适合这种情况。数据源发送事件后无需等待处理结果,整个系统流程不会被阻塞。EventBridge 支持实时和异步两种推理模式,你可以根据业务对延迟的敏感度进行选择,确保用户体验不受影响。 总结 用 Transformer(LLM)升级 Transform(转换),并以事件驱动架构(EDA)作为承载,是 AI 时代数据处理范式的一次“智”变。阿里云 EventBridge 与百炼的结合,为开发者提供了一条低门槛、高灵活性的路径,将强大的 AI 能力无缝融入实时数据流,让你的应用轻松实现智能化。 目前,该解决方案已在阿里云官网上线,欢迎点击即可部署体验~ 邀请您钉钉扫码加入 EventBridge 用户交流群,探索更多产品功能,与我们共同定义和构建 AI 数据处理的未来!
#技术探索

2025年11月7日

官宣上线!RocketMQ for AI:企业级 AI 应用异步通信首选方案
企业级 AI 应用开发面临新挑战 随着人工智能技术的飞速发展,模型迭代日新月异,企业正积极构建 AI 应用以提升用户体验和降低人力成本。然而,与传统微服务应用相比,企业在推进 AI 应用落地的过程中,普遍呈现出三个显著特征: + 任务处理耗时长:传统微服务应用通常能实现毫秒级响应,而 AI 应用的处理周期跨度极大——从几分钟到数小时不等。这种长耗时与不确定性,要求系统架构必须在任务调度、资源分配和用户体验设计上进行重新考量,避免同步调用带来的长时间阻塞。 + 算力资源稀缺性且成本高昂:AI 应用的训练与推理高度依赖 GPU 等稀缺且昂贵的算力资源。因此,任何因网络或应用异常导致的任务重复处理,都会直接造成算力资源浪费和成本增加。如何保障任务在异常情况下不丢失、不重复,成为控制成本的关键。 + 算力利用率与业务流量波动的矛盾:业务请求天然存在波峰波谷。为应对流量高峰以保障服务稳定,企业需要预留大量算力,导致流量低谷时资源闲置;反之,若为节约成本而缩减资源,又难以应对高峰请求,可能导致系统过载或任务积压。如何在有限算力下实现高效调度,既提高资源利用率,又保障高优任务及时响应和系统稳定性,构成了一个核心矛盾。 这些业务特点在 AI 应用的开发和集成过程中,引出了以下典型的业务场景问题: + 单智能体(Agent)局限性与多智能体(MultiAgent)协作:由于单智能体缺乏专业分工、难以整合多领域知识,无法在复杂场景中实现动态协作决策。因此,随着 AI 应用场景变得更加复杂,单 Agent 应用会逐步向多 Agent 应用演进。然而,在 AI 任务处理耗时长的背景下,智能体间的通信(Agent2Agent)必须解决长耗时同步调用带来的阻塞问题以及应用的协作扩展性问题。 + 大规模会话状态管理,并保障会话连续性和任务处理可靠性:在网络或应用节点发生异常时,如何保障用户会话的连续性体验,并确保会话任务不被重复处理以避免算力资源浪费,成为一大挑战。 + 在有限算力下实现高效调度,并保障高优任务的及时响应:如何在有限算力资源下实现高效任务调度,从而既能提高算力资源利用率,保障高优任务被及时处理,又能确保算力服务整体稳定性。 在上述场景中,消息队列能够起到至关重要的作用: + 首先,通过消息队列将同步调用改为异步通知,是解决长耗时阻塞的关键。 + 其次,消息队列天然的“削峰填谷”能力可以平滑请求流量,缓解算力资源的处理压力。 + 再结合定速消费和消息优先级等高级特性,便能更有效地调度有限的算力资源。 为能够有效解决上述问题,RocketMQ 推出了针对性的解决方案。 RocketMQ for AI 重磅发布 RocketMQ 专门为 AI 场景推出了全新 LiteTopic 模型,相较于 RocketMQ 其他类型的 Topic,LiteTopic 具备以下核心特点: + 轻量资源:LiteTopic 是轻量资源,支持在父 Topic 下创建百万数量级的 LiteTopic,满足大规模任务需求。 + 自动化生命周期管理:LiteTopic 可在收发请求时自动创建,并可设置过期时间,到期后自动删除,简化了业务开发和资源管理。 + 高性能订阅:在消费订阅方面,每个消费者可以动态订阅或取消订阅多达万级的 LiteTopic 集合。如图中所示,消费者 1 订阅列表是 LiteTopic 1 和 LiteTopic 2,消费者 2 订阅列表是 LiteTopic 3 和 LiteTopic 4。 + 排他消费:确保一个 LiteTopic 在同一时间只被一个消费者订阅,这在会话保持等场景中至关重要。 + 顺序性保障:每个 LiteTopic 内部的消息严格保证顺序存储。 目前,这些能力已在阿里云云消息队列 RocketMQ 版 5.x 系列实例上正式发布,并会逐步贡献到 Apache RocketMQ 开源社区,欢迎大家使用。 场景应用一:MultiAgent 异步通信__ 延续前文对多智能体(MultiAgent)通信场景的讨论,我们在此详细阐述 RocketMQ 如何解决多智能体应用开发中的长耗时阻塞问题。 图中展示了多智能体(MultiAgent)应用中一个 Supervisor Agent(主智能体)和两个 Sub Agent(子智能体)之间的异步通信流程: 1. 接收请求阶段:为每个 Sub Agent 创建一个 Topic 作为请求任务的缓冲队列,可以是优先级 Topic,从而保障高优任务能够被优先处理。 2. 返回结果阶段: a. 为 Supervisor Agent 创建一个用于接收响应结果的 Topic,并让其订阅这个 Response Topic。该 Topic 可采用 RocketMQ 专为 AI 场景新发布的 Lite Topic 类型; b. 当 SubAgent 完成任务后,它会将结果发送至该 Response Topic,可以为每个独立任务动态创建一个专属的子 LiteTopic(例如,以任务 ID 或问题 ID 命名); c. Supervisor Agent 通过 MQ 的异步通知机制实时获取这些子 LiteTopic 中的结果,并可通过 HTTP SSE(ServerSent Events)等协议推送给 Web 端。 这一架构充分利用了 Lite Topic 的以下核心能力,解决了长耗时调用的难题: + 轻量资源:支持创建百万量级的子 LiteTopic,可以满足海量请求任务的通信需求。 + 自动化生命周期管理:子 LiteTopic 支持自动创建和删除,可以简化业务代码,降低资源管理投入。 + 顺序性保障:每个子 LiteTopic 的消息均按顺序存储和消费,可以保证流式响应结果的顺序性。 场景应用二:分布式会话状态管理 LiteTopic 的能力还可以有效解决会话场景中的挑战,例如保障长耗时会话的状态连续性、避免任务重试带来的成本增加等。 实现原理如图所示:在一个多节点高可用集群的应用服务中,不同用户的会话被分发到不同节点上。与前述的返回响应结果场景类似,系统为每个会话分配一个专属 LiteTopic 来传递消息(如会话结果)。每个应用服务节点仅订阅其关联会话所对应的 LiteTopic 集合,并将接收到的消息按顺序推送至 Web 端。 在此基础上,系统通过分布式架构和 RocketMQ 的一系列核心特性,实现高可用性保障: + 故障切换:当网络异常等原因导致 Web 端 2 重连到集群中的另一个节点 2 时,节点 2 会立即订阅此会话对应的 LiteTopic 2。 + 消费转移:由于排他消费特性,LiteTopic 2 的消息将不再推送给节点 1,转为推送给节点 2。 + 无缝续传:得益于消息持久化和消费位点持久化两大特性,节点 2 能够从上一次中断的位置无缝衔接,推送的数据流会接着之前的消费进度推送给节点 2。 最终,用户在 Web 端感受到的是会话没有中断,从而获得连续的会话体验。同时系统也避免了因连接切换而触发不必要的任务重试,有效节约了宝贵的算力资源和成本。 场景应用三:算力资源高效调度 在算力资源成本高昂且供给有限的背景下,如何实现资源的高效调度,是一个典型的应用场景。消息队列在此扮演了关键角色: + 首先,利用其天然的异步解耦和“削峰填谷”能力,可以平滑波动的请求流量,平稳地调用模型服务或算力服务。 + 其次,通过消费者限流(定速消费)能力,可以有效保护核心算力资源的稳定性,防止其因瞬时流量冲击而过载。 + 最后,消息优先级能力能够确保有限的算力资源被优先分配给高优任务(如高价值或高紧急度的任务)使用。 值得一提的是,RocketMQ 的优先级能力具备一个独特优势:消息的优先级支持在投递后动态修改。 例如,一个普通用户的任务正在队列中排队,此时该用户付费充值将账号升级为 VIP 账号。系统便可以动态提高其已在排队中的任务消息的优先级,让任务立刻被优先执行。 LiteTopic 模型技术解析 为支持百万量级的 LiteTopic,同时保障高并发与低延迟的消息发送和消费流程,其技术实现的核心要点如下: 1. 发送流程: 为实现快速、自动创建与删除 LiteTopic,基于 RocketMQ 新版本 RocksDB 的 KV Store 存储能力,实现对海量元数据信息的高效管理。 + 统一存储、多路分发:RocketMQ服务端接收到消息后,将所有消息数据统一存储在底层的 CommitLog 文件中且仅存储一份,这种单一文件的追加模式(Append)避免了磁盘碎片化,保障了极致的写入性能。但通过多路分发机制,可以为不同的 LiteTopic 生成独立的消费索引(ConsumerQueue,简称 CQ)。 + 索引存储引擎升级:摒弃了传统的文件型 CQ 结构,而是替换为高性能的 KV 存储引擎 RocksDB。通过将队列索引信息和消息物理偏移量(Physical Offset)作为键值对存储,充分发挥 RocksDB 在顺序写入方面的高性能优势,从而实现对百万级队列的高效管理。 2. 消费流程: 消费流程的核心挑战是:当每个 LiteTopic 内仅有少量消息时,若逐一推送,将导致并发处理能力和系统性能大幅下降。 为解决此问题,RocketMQ 在 LiteTopic 存储模型的基础上,进一步对消息分发与投递机制进行优化,针对单个消费者订阅上万个 LiteTopic 的场景,重新设计了一套创新的事件驱动拉取(EventDriven Pull)机制: 每当有新消息到达时,系统会立即触发订阅关系匹配,并将所有符合订阅条件的消息聚合到一个“就绪集合”(Ready Set)中。消费者可以直接从这个 Ready Set 中合并批量拉取来自多个 LiteTopic 的消息。通过这种方式,有效提高了消费并发度,降低了网络开销,从而显著提升了整体性能。 为企业级 AI 应用提供全方面的异步通信保障 随着 AI 技术的快速发展和应用落地,RocketMQ 已完成向“AI MQ”方向的战略升级,不仅支持传统的微服务应用,也致力于为企业级 AI 应用的开发和集成提供一站式异步通信解决方案,涵盖会话管理、Agent 通信、知识库构建以及模型算力调度等典型场景。同时,阿里云云消息队列 RocketMQ 版产品通过在成本与稳定性方面的持续优化,进一步帮助用户降本增效。 目前,RocketMQ for AI 相关能力已在阿里巴巴集团内部以及阿里云大模型服务平台百炼、通义灵码等产品中经过大规模生产环境的验证,且取得显著成效,充分证明了其在高并发、复杂的 AI 场景下的成熟度与可靠性。 展望未来,RocketMQ 将持续在 AI 领域进行技术迭代与创新,赋能更多应用场景,并积极与 AgentScope、Spring AI alibaba、Dify 等主流 AI 生态系统/服务合作集成,共建高效、智能的 AI 应用基础设施,以及逐步将经过阿里集团 AI 业务验证过的方案与特性,持续反馈到开源社区。 诚邀您扫码参与问卷调研,反馈真实使用场景和痛点,帮助我们打造更符合 AI 时代需求的消息引擎。也欢迎钉钉扫码加入交流群(群号:110085036316),与我们交流探讨~ 目前,轻量主题(LiteTopic)[1]功能已在阿里云云消息队列 RocketMQ 版[2]非 Serverless 系列(包年包月、按量付费)和 Serverless 系列的独享实例支持,可提交工单申请白名单(提单时需要提供购买实例的主账号 uid 和实例所属地域)。 同时,阿里云官网已上线 RocketMQ for AI 的解决方案,欢迎! 相关链接: [1] 轻量主题(LiteTopic) _https://help.aliyun.com/zh/apsaramqforrocketmq/cloudmessagequeuerocketmq5xseries/developerreference/litetopic_ [2] 云消息队列 RocketMQ 版 _https://www.aliyun.com/product/rocketmq_
#技术探索

2025年10月28日

PalmPay 基于 Apache RocketMQ 搭建非洲普惠金融“高速通道”
PalmPay:非洲领先的移动支付平台 PalmPay 是非洲知名的移动支付平台,目前主要在尼日利亚、加纳、坦桑尼亚、肯尼亚开展金融科技服务,提供包括电子支付、转账汇款、手机话费及流量充值、水电煤及有线电视等便民缴费服务。 自 2018 年成立以来,PalmPay 深耕非洲市场,也迅速成为非洲领先的金融科技公司,对非洲的金融格局产生了切实的影响。PalmPay 致力于提供安全、易用、创新的数字支付服务,获得了数百万用户和商家的信任与支持,推动了整个非洲大陆普惠金融的发展。 随着非洲基础设施的不断完善和互联网消费需求的持续增长,PalmPay 未来会拓展更多国家,不断实现更强大的技术创新和更广阔的地域覆盖,为更多非洲用户提供便捷的支付服务。通过在本地化内容领域的不懈深耕,致力于为非洲用户带来更方便、更多元的互联网体验。 高速增长下的“阵痛”:支付业务面临的技术挑战 作为一家金融科技公司,PalmPay 致力于为用户提供便捷、安全且灵活的移动支付与金融服务。随着业务规模持续扩张,用户基数与终端设备数量激增,对后台系统提出了更高要求。为了提升运营效率和服务质量,PalmPay 面临着以下技术挑战: + 交易事务一致性:在分布式系统中,业务逻辑通常涉及数据库操作和消息发送(如支付后发送扣款通知)。若数据库操作成功但消息发送失败,会导致数据不一致。传统方式需自行实现补偿机制,复杂且容易出错。 + 高效的消息处理:在高并发支付场景下,消息系统的性能至关重要。当业务量突增导致消息中间件性能下降时,会大幅增加平均响应时间,致使业务处理出现明显延迟,从而影响用户体验。 + 资源的弹性伸缩:按照业务峰值配置资源的传统方式,在业务低谷期会造成资源闲置浪费,当面对突发流量时,实例扩容速度较慢,可能无法在短时间内完成扩容,进而影响服务稳定性。 为应对上述挑战,PalmPay 采用了基于阿里云 RocketMQ 消息中间件——云消息队列 RocketMQ 版,显著提升了整体架构的稳定性和可扩展性,提高了消息处理效率,确保了高并发场景下的业务连续性,最终优化了用户体验。 破局之道:阿里云 RocketMQ 如何化解三大挑战 PalmPay 通过云消息队列 RocketMQ 版与业务系统集成的解决方案,结合其支付核心系统的特点,显著提升了整体架构的稳定性与可扩展性,优化了消息处理效率,确保了高并发交易场景下的业务连续性与数据一致性,从而进一步提升了用户体验和服务质量。此外,通过在本地进行私有化部署,满足了其合规性要求。 + RocketMQ 事务消息在支付业务中的应用:在支付业务中,用户完成交易后,系统需将支付结果(如支付成功/失败)实时推送至用户端(如 App 通知、短信或邮件),并确保支付系统内部的交易状态与消息推送系统保持一致。RocketMQ 事务消息通过“半消息(Half Message)”与“事务回查(Transaction Check)”机制,确保了本地事务提交与消息发送紧密耦合。只有本地事务提交成功,消息才会被真正发送并对消费者可见;若本地事务失败,消息则会被回滚或丢弃。该机制极大地保证了本地数据与消息通知之间的一致性。 + RocketMQ 在高并发交易处理中的作用:业务高峰期,PalmPay 面临巨大的交易并发压力。传统架构下,交易请求直接打到业务系统,容易造成系统拥堵甚至雪崩。为此,PalmPay 将 RocketMQ 作为交易异步处理的核心组件,通过消息队列实现交易请求的缓冲与削峰填谷。RocketMQ 的高性能写入能力和横向扩展架构,使其能够轻松应对突发流量,确保系统在高负载下依然保持稳定运行。同时,RocketMQ 的广播与集群消费模式支持多种消费策略,帮助 PalmPay 实现了灵活的负载均衡机制,进而提升了系统整体的吞吐能力和响应速度,SLA 稳定性支持 99.99%。 + 资源弹性伸缩与运维效率提升:结合阿里云 RocketMQ Serverless 的技术优势,PalmPay 实现了消息队列资源的按需弹性伸缩。系统可在业务低谷时自动释放闲置资源以降低运营成本,并在流量高峰时快速扩容以保障服务稳定性。同时,RocketMQ 提供了完善的消息追踪、监控告警和自动运维能力,显著降低了系统运维的复杂度,提升了整体运维效率。 通过这一系列基于 RocketMQ 的技术优化,PalmPay 成功构建了一个高可用、高可靠、高弹性的消息中间件体系。这不仅为非洲地区日益增长的数字支付需求提供了坚实的技术支撑,也为未来业务的持续扩展和全球化布局奠定了坚实的基础。 云消息队列 RocketMQ 版 5.x Serverless 系列核心优势 云消息队列 RocketMQ 版 5.x Serverless 系列基于存算分离架构,可在保证稳定性的前提下,通过资源快速伸缩实现资源使用量与实际业务负载紧密匹配,并支持按照实际使用量计费,从而有效降低运维压力和使用成本。 在业务波动较大的场景下,非 Serverless 实例(包年包月和按量付费)与 Serverless 实例在使用规格上存在明显差异,具体变化情况如下图所示: 云消息队列 RocketMQ 版 Serverless 实例具备灵活的资源伸缩能力,能够满足业务在不同发展阶段的资源需求。其核心优势如下: + 开箱即用,兼容开源版本:以业务应用为中心,使开发人员无需关注 Serverless 实例的资源规模与稳定性,能更专注于核心业务代码的开发,进而降低企业的运维成本。 + 自适应弹性:Serverless 实例采用动态资源调整策略,可根据实时业务负载自动弹性伸缩,企业因此无需预先估算并配置实例规格,减少资源闲置浪费。 + 按实际使用量付费:根据实际使用的消息量、Topic 资源、网络流量、存储等资源计算费用,并按小时结算,真正实现按量付费,从而节省使用成本。 展望未来:加速非洲普惠金融发展创新 通过采用阿里云云消息队列 RocketMQ 版,PalmPay 成功构建了一套高可用、高可靠、高弹性的消息中间件体系,全面提升了系统的稳定性、消息处理效率与业务连续性。云消息队列 RocketMQ 版在支付消息通知、高并发交易处理以及资源弹性伸缩等方面发挥了关键作用,有力支撑了 PalmPay 在非洲市场快速增长的数字支付需求。 借助云消息队列 RocketMQ 版的高性能、低延迟和灵活扩展能力,PalmPay 实现了支付业务的异步化、解耦化与智能化升级,不仅优化了用户体验,也显著提升了系统运维效率和业务响应能力。未来,随着 PalmPay 持续拓展更多金融服务场景,云消息队列 RocketMQ 版将继续为其提供坚实的技术底座,助力其在非洲乃至全球推动普惠金融的深入发展与数字化创新。
作者:横槊、建源、文婷、稚柳
#行业实践

2025年10月28日

Apache RocketMQ for AI 荣获 2025 年度 OSCAR “开源+人工智能”典型案例
2025 年 10 月 28 日,中国通信标准化协会在北京举办第九届 “OSCAR 开源产业大会”,重磅发布 2025 年度 OSCAR“开源+”典型案例评选结果。 近年来,开源技术驱动各领域飞速发展,深刻影响信息技术产业发展格局。为深化开源技术在各行业的融合应用,普及开源文化,推动形成“众研、众用、众创”的开源生态格局,中国信息通信研究院联合中国互联网协会启动 2025 年度 OSCAR“开源+”典型案例征集工作,旨在挖掘具有行业范式价值的典型实践,促进各行各业经验共享,为数字中国建设注入新动能。 其中,Apache RocketMQ for AI:AI 应用全链路异步解决方案 荣获 “开源+人工智能”专项类别典型案例。 2025 OSCAR “开源+人工智能”专项类别典型案例聚焦人工智能领域开源突破,重点征集大模型及具身智能开源、AI 数据集开放、AI 原生应用开发工具链等前沿实践。 Apache RocketMQ 正在从传统消息中间件演进为专为 AI 时代打造的消息引擎,通过 LiteTopic 轻量模型、消息优先级、定速消费等核心能力,为企业级 AI 应用的多智能体(MultiAgent)通信、大规模任务调度、长会话状态管理等场景提供全链路异步解决方案。 MultiAgent 异步通信 LiteTopic 模型凭借其百万级资源创建、自动化生命周期管理和顺序性保障,高效适配 Agent2Agent 异步通信需求,有效解决长耗时调用阻塞和 MultiAgent 应用协作扩展性难题。 AI 任务智能调度 RocketMQ 作为请求和后端算力服务的缓冲区,可平滑请求流量,最大化 AI 算力利用率;消费者限流能力保障核心 AI 算力服务稳定性;消息优先级将有限算力资源优先分配给高优先级请求任务。 分布式会话状态管理 当用户与应用服务开启会话时,RocketMQ 动态为该会话创建一个以会话 ID(SessionID)作为唯一标识的专属消息队列(LiteTopic)。所有会话相关数据,如历史记录和中间结果,均作为消息在此队列中有序传递。该架构实现了应用“无状态化”,显著简化开发复杂度、提升用户交互体验,并有效减少会话重试资源浪费。 目前,RocketMQ for AI 的核心特性已在阿里云云消息队列 RocketMQ 版产品中发布,后续将逐步贡献到开源社区。相关能力已在阿里巴巴集团内部以及阿里云大模型服务平台百炼、通义灵码等产品中经过大规模生产环境的验证,且取得显著成效,充分证明了其在高并发、复杂的 AI 场景下的成熟度与可靠性。 展望未来,我们将持续在 AI 领域进行技术迭代与创新,并积极与生态伙伴(如业务框架、AI 平台等)合作集成,共建高效、智能的 AI 应用基础设施,并逐步将经过阿里集团 AI 业务验证过的方案与特性,持续反馈到开源社区。 欢迎钉钉搜索扫码加入 RocketMQ for AI 用户交流群(群号:110085036316),与我们交流探讨~
#社区动态

2025年10月23日

2025 OSCAR丨与创新者同频!Apache RocketMQ 邀您共赴开源之约
10 月 28 日,北京将迎来开源行业的一场重磅盛会——2025 OSCAR 开源产业大会。 本次大会汇聚全球开源领域的思想领袖、技术专家、企业高管以及创新实践者,围绕开源技术的未来、产业创新及生态协同展开深度讨论。 大会将围绕人工智能开源、软件供应链安全、开源商业化等多个核心议题展开深度对话,通过顶级专家的演讲、精彩的分论坛以及技术展示,与会者将获得关于开源技术的最新洞察和行业趋势,并探索开源技术如何在数字化转型、行业创新及社会价值创造中发挥关键作用。 现在,我们诚挚邀请您参与 OSCAR 开源产业大会,与开源人相聚,与创新者同频,一起探索开源技术的发展,共同见证开源生态的繁荣。 关于 OSCAR “OSCAR 开源产业大会”是由中国通信标准化协会主办,中国信息通信研究院承办,中国信息通信研究院云计算开源产业联盟、金融行业开源技术应用社区、通信行业开源社区、科技制造开源社区、汽车行业开源社区、可信开源社区共同体、可信开源合规计划支持的开源领域顶级盛会。 大会旨在汇聚全球开源领域的专家、学者、企业代表及社区成员,共同探讨开源技术的最新趋势、治理实践,以有效应对技术风险、法律合规、供应链韧性和人才短缺等挑战,从而寻求推动中国开源生态健康、有序发展的策略,促进开源与产业深度融合,为全球开源事业的繁荣贡献中国的智慧与力量。 开源项目及商业化论坛 Apache RocketMQ 议题 AI 应用的高延迟、长周期、强状态和资源密集等特性,对底层基础设施提出了前所未有的工程挑战。 传统消息队列在应对 AI 原生(AINative)场景的动态性、状态性和资源敏感性方面已显局限,面对这一变革,ApacheRocketMQ 完成了战略性升级,从传统消息中间件演进为面向 AI 时代的消息引擎。 本次分享将介绍如何基于 Apache RocketMQ 的新特性构建异步化 MultiAgent 系统,深入探讨 Agent 间的异步通信、上下文隔离、状态恢复与任务编排机制,并通过实际案例展示如何利用 RocketMQ 实现 MultiAgent 的任务调度。 (温馨提示:大会开源集市设置了 Apache RocketMQ 展位,欢迎前来交流,参与有奖互动~) 报名链接 扫描下方二维码 或【】立即报名 大会议程
#社区动态

2025年9月28日

海量接入、毫秒响应:易易互联基于 Apache RocketMQ + MQTT 构筑高可用物联网消息中枢
易易互联:打造安全、便捷、便宜的智能换电网络 易易互联科技有限公司成立于 2017 年,是吉利集团旗下汽车产业战略布局换电生态的全资子公司。依托吉利正向开发的 GBRC 换电平台架构,基于电池共享、车辆全生命周期运营,沉淀千项专利技术,积极参与国家换电标准制定,打造安全、便捷、便宜的智能换电网络,为营运市场提供更降本、更高效、更绿色的综合解决方案,让换电成为营运补能第一选择。 截至 2025 年 4 月,易易互联已布局和运营超 470 座换电站,覆盖了重庆、杭州、广州、成都、天津等 40 多个城市。计划到 2027 年,在全国建设并运营 2000 座换电站。 业务挑战:物联网通信架构如何支撑大规模换电网络? 随着换电网络的快速扩张,终端设备数量呈指数级增长,对底层物联网通信架构提出了前所未有的挑战。易易互联在采用 MQTT 协议支撑智能换电系统的过程中,面临三大核心业务难题: 1. 海量设备高并发连接与系统稳定性挑战 每座换电站集成机械臂、电池仓、充电模块、车辆识别系统、温控系统等多种物联网设备,同时需接入数万辆支持换电的营运车辆,形成一个终端数量庞大、分布广泛、持续在线的超大规模物联网网络。在此背景下,MQTT 服务必须支持数十万级设备的长连接并发接入。任何连接抖动或异常中断,都可能导致换电流程失败、车辆排队积压,严重影响用户体验。尤其在运营高峰时段,连接稳定性直接关系到换电效率与服务可用性,一旦出现系统级服务中断,将造成巨大的损失。 2. 高实时性与低延迟通信的严苛要求 换电流程高度依赖自动化与系统协同:从车辆进站识别、人车电池三者信息核验,换电指令下发、机械臂执行操作、车辆驶离,支付结算完成,整个流程需在几十秒内高效闭环。这对通信链路的实时性与端到端延迟控制提出了极高要求。尽管 MQTT 协议支持 QoS 机制保障消息可靠性,但在复杂场景下,仍可能出现延迟上升、响应滞后等问题。如何在确保消息不丢失的前提下,实现毫秒级指令响应与状态同步,成为系统架构设计的核心技术难点。通信延迟过高将直接拉长单次换电耗时,降低站点吞吐能力,削弱“高效补能”的核心竞争力。 3. 数据洪峰冲击与消息积压风险 在早晚交接班等用电高峰时段,多个换电站可能同时迎来大量车辆集中换电,短时间内产生海量设备状态数据上报,包括电池 SOC、温度、健康状态(SOH)、换电次数、故障码等关键信息。这种瞬时数据洪峰对 MQTT Broker 的消息吞吐能力构成巨大压力。若后端的数据分析、计费系统、调度平台等消费端处理能力不足,极易导致消息在 Broker 端积压,造成数据处理延迟。这不仅影响电池调度策略的实时优化和异常预警的及时触发,还可能引发计费延迟或错误,进而导致用户投诉,损害服务可信度。 综上所述,易易互联的业务高速增长对 MQTT 通信平台提出了“高并发、低延迟、强可靠、大吞吐”的综合要求。如何构建一个稳定、高效、可扩展的物联网消息中枢,已成为支撑其全国换电网络规模化运营与智能化升级的关键基础设施挑战。 解决方案:基于阿里云 MQTT+RocketMQ构建物联网消息中枢 为应对上述挑战,易易互联依托阿里云消息队列 MQTT 版与云消息队列 RocketMQ 版,构建了“边缘高效接入 + 中心弹性处理”的物联网通信新范式,全面提升系统的稳定性、可扩展性与业务连续性。 1. 基于阿里云 MQTT,实现海量终端的高效、稳定接入 阿里云消息队列 MQTT 版具备百万级并发连接能力与高可用集群架构,完美匹配易易互联换电站规模、数十万终端设备(换电站设备 + 营运车辆)的规模化接入需求。 + 高并发长连接支持 :通过云消息队列 MQTT 版的分布式集群架构,轻松支撑数万至数十万设备的稳定长连接,确保换电站机械臂、电池仓、温控系统及车辆终端始终在线、指令可达。 + 弱网优化与断线重连 :云消息队列 MQTT 版内置智能心跳机制与断线自动重连策略,有效应对地下车库、郊区等弱网环境,保障换电流程不因网络抖动中断。 + 端到端安全认证 :云消息队列 MQTT 版支持基于 X.509 证书、Token 的设备级身份认证,结合 Topic 级别的访问控制策略(ACL),杜绝非法设备接入,保障电池资产与系统安全。 2. 集成阿里云 RocketMQ,实现消息缓存与负载均衡 在高并发换电场景下,瞬时数据洪峰易导致后端系统过载。通过将阿里云消息队列 MQTT 版的消息流转至云消息队列 RocketMQ 版 ,构建“MQTT 接入 + RocketMQ 中转”的协同架构,实现消息的高效解耦与弹性处理。 + 消息缓冲与削峰填谷 :当早晚高峰出现大量车辆集中换电时,云消息队列 MQTT 版将设备上报的状态数据(如电池 SOC、温度、故障码)实时转发至云消息队列 RocketMQ 版。RocketMQ 作为高性能消息中间件,可缓冲突发流量,避免后端计费、调度、监控系统因瞬时压力过大而崩溃。 + 异步解耦与负载均衡 :后端业务系统(如电池调度平台、故障预警引擎、用户计费系统)以订阅方式从云消息队列 RocketMQ 版消费消息,实现生产与消费的异步解耦,提升系统整体吞吐能力与响应速度。 + 消息有序与可靠投递 :针对换电流程中的关键指令(如“换电抬杆”、“开始换电”、“电池锁定”),可通过云消息队列 RocketMQ 版的顺序消息能力保障执行顺序;结合事务消息机制,确保支付结算等关键操作的最终一致性。 业务价值:海量接入、实时响应、弹性处理、安全可信 面对换电生态高速发展的通信挑战,易易互联通过采用阿里云 MQTT + RocketMQ 的融合解决方案,成功构建了“海量接入、实时响应、弹性处理、安全可信”的物联网通信底座。该架构不仅显著提升了系统稳定性与可扩展性,更保障了高并发场景下的业务连续性,为实现“让换电成为营运补能第一选择”的战略目标提供了坚实的技术支撑。 核心业务价值如下: + 实现实时数据处理 :阿里云 MQTT 通过轻量级发布/订阅模式,实现终端设备与云端的毫秒级通信,确保换电状态、车辆行为、电池健康等数据的实时采集与响应,支撑全流程可视化与自动化控制。 + 实现高度可扩展性 :支持横向弹性扩展,轻松应对从 470 座到 2000 座换电站的设备接入需求,无需重构通信架构,支撑业务持续扩张。 + 保障系统可靠性与稳定性 :MQTT 支持多级 QoS 服务质量,结合 RocketMQ 的持久化与重试机制,确保关键消息不丢失、不重复,保障计费准确、指令可靠、资产安全。 + 提升整体性能与吞吐能力 :RocketMQ 的集群消费模式提供原生负载均衡机制,支持多消费者并行处理,显著提升后端系统的消息处理效率,确保高并发场景下的高效稳定运行。 展望未来,随着易易互联向“2027 年在全国建设并运营 2000 座换电站”的目标稳步迈进,这套基于阿里云构建的物联网消息中枢将持续发挥其核心引擎作用。它不仅为当前的业务运营提供了坚实保障,更为未来的智能化升级奠定了核心基础。易易互联与阿里云的成功合作,不仅是技术与业务的深度融合,也为整个新能源换电行业树立了数字化转型的标杆。通过持续的技术创新与架构演进,易易互联正加速推动其“让换电成为营运补能第一选择”的愿景成为现实,引领绿色出行新时代。
#行业实践

2025年9月28日

构建企业级 AI 应用:为什么我们需要 AI 中间件?
前言 9 月 26 日,2025 云栖大会 AI 中间件:AI 时代的中间件技术演进与创新实践论坛上,阿里云智能集团资深技术专家林清山发表主题演讲《未来已来:下一代 AI 中间件重磅发布,解锁 AI 应用架构新范式》,重磅发布阿里云 AI 中间件,提供面向分布式多 Agent 架构的基座,包括:AgentScopeJava(兼容 Spring AI Alibaba 生态),AI MQ(基于Apache RocketMQ 的 AI 能力升级),AI 网关 Higress,AI 注册与配置中心 Nacos,以及覆盖模型与算力的 AI 可观测体系。阿里云 AI 中间件核心技术全面开源,全面拥抱行业标准,加速企业级 AI 应用规模化落地。 AI Agent 大爆发 自从 ChatGPT 掀起了大模型浪潮,短短三年间,AI 技术几乎以月为单位迭代:从能对话、写文案的 Chatbot,进化到协助编码与办公的 Copilot,如今进一步迈向具备自主规划、工具调用、记忆与协作能力的 Agent。AI 不再只是“会说话的软件”,而是能理解目标、拆解任务、选择策略并执行的 “ 行动主体 ”,能够在复杂场景中进行多轮推理与闭环反馈。 如果说 2022 年是 Chatbot 的起点,2023 年是 Copilot 的浪潮,那么 2025 年,注定是 Agentic AI 全面爆发的元年。我们来回顾一下近几年的演进路线图。 + 2022 年,Chatbot 进入大众视野。以 GPT3 为代表,大家用它生成文本、做自然语言交互——简单客服、文案写作是典型场景。但它仍然是“被动应答”。 + 2023 年,Copilot 革命来了。GPT4 带来更长上下文、多模态输入,加上检索增强(RAG)的成熟,让 AI 从“被动回答”升级为“主动协作”。开发者使用代码 Copilot,写代码效率可以提升 50% 以上;Office Copilot 深度融合 AI 技术于 Word、Excel、PPT 等应用中,深刻改变传统办公模式,大幅减少重复性工作。AI 开始深度融入企业核心工作中。 + 2025 年,我们将迎来“能理解、会规划、能协作、敢行动”的 Agentic AI 的集中爆发。一方面,从 GPT5、Qwen3 到 DeepSeek,模型在自主规划、多模态融合能力上高速发展;另一方面,针对 Agent 和工具的集成,多 Agent 通信的能力,也在走向标准化,如 MCP、A2A 标准。应用层面,人形机器人走上春晚舞台;企业里的“数字员工”开始上岗,如财务 Agent 能够自动审批流程,AI 的角色从辅助者走向“能自主决策、能自动闭环”的主体。 有几组数字也在印证这一发展趋势:根据 Gartner 行业报告,全球 Agent 市场正以 44.5% 的年复合增长率扩张,预计到 2028 年将达到近三千亿美元规模;将有超过 15% 的企业日常决策由 Agent 自主完成;而有三分之一的企业软件,会原生嵌入 Agentic AI 能力。 这不是一阵风,而是一个长周期的结构性趋势。技术、政策、市场这三股力量,正在同频共振,形成加速度。 1. 技术突破:开源模型的迅速成熟降低了门槛,推理成本呈断崖式下降,多模态和大小模型的协同拓宽边界。这让 Agent 不再是“概念验证”,而是可以规模化落地。 2. 政策引导:中国“十四五”明确把 AI 纳入新基建,国务院“人工智能+行动意见”为中国 AI 产业指明方向;美国、欧盟也在同步发力,形成全球共振。 3. 市场需求:企业端急需提效降本,消费端渴望更沉浸的体验,资本市场面向 AI 赛道持续加码,大量 AI 创业公司涌现。 技术降低门槛,政策扫清障碍,市场反哺研发,形成闭环。算法、算力与场景叠加,AI 从实验室跃迁万千行业,已经势不可挡。 过去十年企业为了拥抱移动互联网浪潮,实现数字化转型,纷纷投入云原生应用的研发;而今天,企业要抓住 AI 的浪潮,实现智能化转型,更需要投入 AI 原生应用的研发,把 AI 的能力融入到自己的核心业务中。 AI 原生应用架构 AI 原生应用是指从架构设计到功能实现均以人工智能技术(大模型)为核心驱动力的应用。它将 AI 能力深度嵌入系统底层,通过数据驱动决策、动态模型演化和端到端 AI 流程重构业务逻辑。 和传统应用相比,它带来四方面的根本性变化。 + 第一是交互界面。过去我们点击按钮、填写表单;现在我们对话、拖拽、上传图像或音频,进行多模态的“共创”。 + 第二是业务逻辑。过去是规则驱动、静态代码、确定性执行;现在是数据驱动、动态推理、概率性决策,系统可以在不确定中找到更优路径。 + 第三是技术栈。传统的“微服务+关键词检索+关系型数据库+CPU”的组合,正被“Agent 智能体+语义检索+知识图谱+向量数据库+GPU”所取代。 + 第四是架构哲学。以前我们用 RPC、消息队列把微服务拼接起来;现在以 Agent 为执行单元,用上下文工程、记忆机制、工具调用来实现自主决策与行动。我们从“流程自动化”迈向“认知自动化”,软件范式从“人适应机器”,转向“机器理解人”。 这场变革不是空中楼阁,它有一条清晰的历史脉络。 回看传统应用架构的演进,从单体到微服务再到云原生,本质是业务复杂度上升和技术红利出现的合力。早期的单体应用,只要把交易、商品、支付、物流这些核心功能堆在一起就够了。但移动互联网爆发后,业务碎片化、场景多样化,迫使我们用微服务把系统“解耦”,让不同团队能并行迭代,敏捷开发;海量访问压力需要系统能够灵活弹性扩展;基于云计算提供的弹性基础设施,结合微服务、容器技术、和可观测体系,则让资源调度更智能、故障恢复更自动。那是一条从“大泥球”走向“乐高拼装”的道路,也为今天的 AI 原生应用打下了分布式协作的基石。 AI 应用自身也在沿着一条清晰的路径进化:从单体的 Chatbot,走向分布式的 Agentic AI。 业务层面的驱动力是,从“对话”走向“行动”。单体 Chatbot 只能回答问题;而在很多企业里,我们需要 AI 深度参与业务流程,承担企业不同业务模块的职责,不同业务板块有不同的上下文、工具、业务流程,大模型在注意力聚焦的情况下具备更好的准确性,这就推动架构走向多 Agent 协作,每个 Agent 专注于自己的核心能力。比如我们要构建一个全栈 Web 开发平台,支持用户用自然语言构建 Web 应用,Agentic AI 将构建一个“数字员工团队”,包含开发 Agent、产品 Agent、文档 Agent,多 Agent 自动协作,最终完成构建 Web 应用的完整任务。 技术层面,这种升级依赖几个关键突破:模型从理解到规划,从“会答”到“会想”(比如深度思考模式、ReAct);Agent 范式形成统一认知,它是一个具备“感知—决策—执行”能力的实体,记忆层存储历史经验(向量库+KV),决策层由大模型驱动进行规划(ReAct、思维链),行动层通过工具调用与多 Agent 协作来完成;关键标准也在逐渐形成,如 MCP 让智能体工具调用有了通用标准,A2A 把智能体分布式协作也定义了统一语言;而云原生能力,让这些智能体可以充分利用云的弹性能力,按需弹性、按量付费。 但是,要构建企业级 AI 应用,远不止“会调几个大模型 API”这么简单,将面临以下几个挑战。 + 第一类是开发效率。今天做一个像样的 Agent,往往要手工编织记忆、决策、工具调用的细节,缺少开箱即用的框架和工具,开发效率不高。更不用说随着业务发展,AI 应用持续扩展业务能力,如何实现业务敏捷迭代也是个难题。 + 第二类是集成。要提升模型准确性以及业务深度融合,不可避免要采用上下文工程技术,引入 RAG 架构。RAG 架构往往涉及多源数据接入,数据 ETL。要构建实时可用的企业知识库,将面对异构系统对接、数据治理等复杂工程。更别提如何让全新的 AI 应用和存量的传统系统交互集成,如何将存量系统纳入 Agent、MCP 的集成体系。 + 第三类是稳定运行和持续优化。多 Agent 协作带来复杂的调用链路,推理 Tokens 流量波动大,延迟不可控,传统体系难以灵活弹性扩展;稳定与安全也充满风险——模型幻觉可能造成决策偏差,工具调用可能越权,A2A 通信链路里可能泄露敏感数据;更糟糕的是,可观测性不足,一旦出现故障,你将在 10 个 Agent 协同的链路里找不到故障点。没有评估体系,很难衡量 Agent 的决策质量;数据质量会随着业务环境变化而衰减;模型一旦迭代,推理行为也可能漂移,今天还很稳的客服 Agent,明天可能输出不合规的内容。 而这些问题,单靠大模型内置的能力解决不了,它们需要一整套工程化、平台化的“中间层”来承接。 而这个中间层便是 AI 中间件的定位。它在 AI 应用与大模型之间,承担三件事。第一是连接与集成:把大模型、工具链、数据存储、微服务这些碎片打通,让 Agent 可以无缝调用知识、使用工具、对接业务。第二是能力抽象:把 A2A 通信、状态管理、数据集成这些“非业务共性能力”封装起来,屏蔽底层复杂度,让开发者聚焦业务。第三是工程化支撑:提供可观测、安全治理、弹性扩缩容等企业级能力,保障生产环境的稳定性与高效运维。 为什么说它是规模化落地的最后一块拼图?很简单。它能帮你突破 POC 陷阱——很多原型之所以上不了线,不是因为算法不行,而是工程体系不行。换句话说,AI 中间件是企业级 AI 应用的“操作系统”。它把 Agent 从“技术概念”变成“真正的生产力引擎” ,打通落地的最后一公里。 阿里云 AI 中间件全新发布 为了应对规模化落地企业级 AI 应用的挑战,我们发布了全新的阿里云 AI 中间件,它是一个面向分布式多 Agent 架构的基座,核心技术全面开源,全面拥抱行业标准。 它包括:AgentScopeJava(兼容 Spring AI Alibaba 生态),AI MQ(基于Apache RocketMQ 的 AI 能力升级),AI 网关 Higress,AI 注册与配置中心 Nacos,以及覆盖模型与算力的 AI 可观测体系。 我们的目标是:把复杂留下,把标准、能力、工具开放出来,让企业不用重复从头搭架子。 AI 编程框架AgentScope 首先是 AI 编程框架,Spring AI Alibaba 内核正式升级为 AgentScopeJava,这是 Java 生态拥抱 Agentic AI 的重要里程碑。对数百万 Java 开发者来说,一套熟悉的开发范式,就能构建企业级智能体应用。AgentScopeJava 为 Java 开发者提供最低学习门槛的 AI 开发框架。 它的架构有两层,在核心编排层,提供 Agentic API,开发者能够声明式地定义 Agent 的记忆、决策、工具调用等能力,代码复用明显提升;原生支持 MCP 协议,便于扩展工具与伙伴智能体;支持流式通信,端到端的交互延迟显著降低;支持 Humanintheloop,把关键决策交回给人审核。在安全运行时层面,它提供沙箱隔离,阻断越权工具调用;提供上下文管理,动态维护短期状态与长期记忆;并且原生支持 A2A,实现分布式多 Agent 自动编排与协作。 AI MQApsaraMQ 再说 AI 通信与集成的“中枢”——AI MQ。传统消息队列是为交易类、日志类场景设计的,到了 Agent 时代,它们很多特性不够用,因此我们对 ApsaraMQ 面向 Agentic AI 做了全方位能力升级,ApsaraMQ 是基于 Apache RocketMQ 构建的云原生消息服务。 在消息存储引擎(Apache RocketMQ 内核)方面,支持百万级 Topic 资源管理、百万级队列存储、百万级订阅分发。 得益于消息存储引擎的增强,我们推出了面向 AI 场景的消息模型 LiteTopic,它具备轻量资源、有状态异步通信的特性,可实现 AI 多轮对话 Session 保持、Session 级顺序流式输出、Agent 2 Agent 的可靠通信、多模态大消息体(50MB 以上)。提供面向 AI 稀缺算力的消费调度模式,包括优先级、定速、权重等模式,最大化资源有效利用率。提供 AI 数据集成,支持多数据源实时构建知识库,构建实时 RAG 架构;支持事件流异步推理,批量异步推理;支持流式 AI ETL 处理。 聊到分布式多 Agent 架构,就必须说 A2A 通信机制。目前在分布式多 Agent 通信实现上,一开始往往采用同步调用的方式,一旦业务继续发展,Agent 之间的交互、依赖关系也会变得更加复杂。同步调用的模式将面临多重痛点: + 多 Agent 多次 LLM 调用,全同步调用延迟高,降低客户体验。 + 上下游 Agent 吞吐量难以完全对齐,易出现部分 Agent 流量过载、雪崩。 + AI 任务涉及多 Agent 协作完成,单点 Agent 失败,没有可靠重试,导致任务整体失败,浪费中间过程的算力资源多。 + Agent 调用同步强依赖,如串联电路,可用性降低。 + 在算力资源不足的情况下,Agent 无法实现任务优先级处理。 基于 AI MQ 的全新特性 LiteTopic 实现 A2A 可靠异步通信模式则可以免除这些痛点,让系统具备更好的可扩展性: + 主 Agent 任务规划完成,可异步并发请求多个 Agent,缩短任务完成时间。 + Agent 解除调用强依赖和吞吐量强依赖,可用性提升。 + Agent 通信的请求和处理结果均持久化到 MQ,具备 checkpoint 能力,基于 MQ 的可靠重试,无中间资源浪费。 + 有限资源下,Agent 支持按优先级处理任务,比如优先处理付费用户。 + 基于 1 对多的发布订阅通信模式,系统具备良好的扩展能力,可以异步构建历史对话存储、Agent 记忆等。 AI 网关Higress 第三块是 AI 网关 Higress。它提供统一接入与管理,能同时接入不同的大模型、MCP 服务、Agent,做协议兼容与流量调度,实现智能路由与负载均衡。在 LLM 层,统一调度通用大模型与垂类小模型,支持语义缓存和灾备降级;在 MCP 层,支持通过协议转换复用存量微服务,让企业多年积累的数字化资产快速转换为 AI Agent 的工具;在 Agent 层,提供 REST 到 A2A 的协议桥接,打通传统微服务与智能体协作。 它也具备企业级安全治理能力,包括 Token 限流、敏感信息过滤、WebSocket 无损变更、零信任鉴权,保障服务稳定与数据安全。 最新版的 Higress 还发布了 AI 开放平台 HiMarket,让企业可以把自有 Agent/MCP 服务标准化发布,按调用量进行精细化运营。 AI 注册/配置中心Nacos 第四块是 AI 注册和配置中心 Nacos。在微服务时代,Nacos 国内市场占有率高达 70% 以上,包含 Azure 在内的海内外主流云厂商都提供了 Nacos 托管产品。面向 AI 时代的今天,Nacos 3.1.0 版本重磅发布,围绕 AI Agent 解决 AI A2A 场景和 AI Tools 场景的问题。 在 AI 工具方面,Nacos 支持了 MCP Registry 官方协议,无需任何代码改造,就能将传统应用快速转变为 MCP Server 并动态统一管理。 在多 Agent 协作方面,Nacos 是首个支持 A2A 协议的注册中心。Agent 可以将描述自身能力的 AgentCard 注册到 Nacos,其他 Agent 只需填写 Nacos 地址,即可实现分布式多 Agent 的能力编排,让 Agent 的分布式协作,像普通应用一样的顺滑和稳定。 在配置管理方面,AI 时代下,API Key 等凭证的安全至关重要。Nacos 提供动态加密配置能力,支持敏感数据的加密存储与安全推送,有效保障 AI 资产的安全性。此外,基于 Nacos 的动态配置推送能力,还能实现 Agent 能力的动态更新——无需重新部署,即可灵活调整 Agent 运行时的配置。 AI 可观测 第五块是AI 可观测。做 AI 应用的同学都有体会:传统可观测体系,面对大模型与智能体协作,往往力不从心。我们做的是一次“从 IaaS 到 MaaS”的全栈、一体化进化,目标是为企业提供一个智算应用的“上帝视角”。这个体系有四根支柱: + 端到端诊断:对主流 Agent 与应用框架实现无侵入追踪,不管是复杂的 RAG,还是多 Agent 协作,都能把调用链清清楚楚地呈现出来,帮助快速定位故障与性能瓶颈。 + 全栈智算监控:从底层 GPU、网络、存储,到容器编排,再到模型推理、训练,以及上层应用与智能体交互,建立多维度的指标与日志体系,实时感知全局健康度。 + 精细成本管理:对 Token 消耗、GPU 利用率做多维分析,与网关联动支持预算限额、访问限流、智能路由,帮助企业在探索 AI 价值的同时,把成本效益做到最好。 + 质量与安全评估:覆盖从输入、规划到输出的全流程,自动化评估语义准确性、内容质量、偏见与安全风险,为上线运行提供“持续体检”,让输出更安全、更可控、更合规。 展望 AI 从来不是一个单点技术,而是一张系统工程的“网”。大模型是大脑,工具是四肢,数据是血液,算力是肌肉,而中间件,是把这一切组织起来的“神经系统”和“骨架”。阿里云 AI 中间件核心技术目前已全面开源,包括 Nacos、Higress、Apache RocketMQ、AgentScopeJava 等等,未来也将持续围绕开源生态和社区开发者一起共建、共同成长。我们相信,只有把标准和能力开放出来,行业才会更快地形成“共识与共用”,企业才会更专注于业务创新,少做重复劳动。我们愿意和用户、社区一起,推动下一代 AI 基础设施的标准化、工程化。 未来两三年,我们将见证越来越多的企业,从一个个“单体 Chatbot”,走向一个个“可协作、可演进、可闭环”的数字员工团队;我们也将见证 AI 在生产线、仓储、金融风控、客服中台、研发协作等场景的落地,真正带来效率、质量与体验的跃迁。
#技术探索