最新文章

2026年5月27日

Apache RocketMQ 5.5.0 开源 LiteTopic:百万级 AI 会话专属通道
Apache RocketMQ 5.5.0 已正式发布。本次版本的重要特性之一,是社区提案 RIP83 定义的全新消息模型 LiteTopic 进入开源版本。LiteTopic 面向 AI Agent、异步任务和海量轻量会话场景,支持百万级轻量会话通道共存,并在轻量通道管理、消费状态持久化和事件驱动分发等方面进行了针对性设计。此前,阿里云云消息队列 RocketMQ 版已围绕相关 AI 通信场景提供能力;随着 LiteTopic 进入开源版本,这一消息模型也首次以开源形式面向全球开发者开放。 01 行业正在收敛:Agent 异步通信成为共识 在 RocketMQ 5.5.0 发布前后,AI 行业的协议和框架演进,也在不约而同地指向同一个方向。  高度关注 Transport 可扩展性、Agent Communication、任务生命周期管理以及会话状态外化等问题。其核心矛盾在于:有状态 Agent 会话与负载均衡、水平扩展之间存在天然冲突,而任务失败后的重试、结果过期后的清理等生产级生命周期语义,也需要更稳定的运行时支撑。  也发布了 Long Running Agent 方案,指出标准对话循环并不适合所有长时任务场景,系统需要更显式的状态持久化、事件驱动唤醒,以及多 Agent 委托协同。 两条路径,收敛到同一组基础设施需求: 1. 海量会话通道:每个 Agent 会话都需要独立通信通道,不能因水平扩展而丢失 2. 状态持久化与断点续传:服务重启、冷启动后,需要能够从上次中断处精确恢复 3. 异步生命周期管理:任务失败有重试,结果过期有回收,调度由事件驱动而非客户端轮询 这三个需求,不只是协议层或框架层的问题,也对消息通信基础设施提出了更高要求。 02 LiteTopic:AI 时代消息通信的开源答案 面对 AI 时代海量轻量会话、状态持久化与事件驱动调度等消息通信基础设施需求,Apache RocketMQ 社区在 2025 年发起 RIP83(Lite Topic: A New Message Model)提案,从消息模型层面设计了应对方案。随着 5.5.0 发布,LiteTopic 已进入开源版本。 为什么不能用传统的 Topic + Consumer Group 组合来解决?核心原因是:当每个 Agent 会话都使用独立 Group 订阅独立 Topic 时,读请求会随 Topic 数量快速膨胀;Broker 轮询全量 Topic 检查新消息,无效扫描的 CPU 开销也会随规模线性增长;传统 Topic 需要预先创建,无法很好适配按需动态分配;而 Group 模型天然偏向共享消费,也不完全适合会话级独占订阅。 LiteTopic 的设计目标,就是绕过这些限制。三个行业需求,对应三类优化方向: 需求 1 → 父 Topic + 动态子 Topic LiteTopic 采用“父 Topic 命名空间 + 轻量子 Topic 会话通道”双层结构。父 Topic 除了起到隔离命名空间的作用之外,还承担同一业务域的集中管控,解决海量 Topic 场景下的服务发现问题。子 LiteTopic 支持按需创建和轻量化管理,底层以 RocksDB 替代传统 ConsumeQueue 文件,从而提升单个 Broker 对海量 LiteTopic 的承载能力。 需求 2 → Broker 侧消费位点持久化 LiteTopic 将消费位点以“内存快照 + 增量持久化”方式存储在 Broker 端。Agent 节点异常重启后重新订阅时,可以基于已保存状态继续消费,降低业务层自行维护会话状态的复杂度。与 Google ADK 的 `DatabaseSessionService` 解决的是同一类问题,区别在于 LiteTopic 将这类能力内置于消息队列层,无需业务层另行实现。 需求 3 → 事件驱动调度 + 生命周期自管理 面对大量低频、分散的轻量主题,如果仍采用传统全量轮询方式,系统会在无效扫描上消耗较多资源。LiteTopic 通过事件驱动的 Ready Set 结构,在有消息写入或可读事件触发时再进行精准唤醒,而非全量轮询。同时,RocketMQ 本身具备的消费重试能力,也为 Agent 通信提供了生产级的失败兜底机制。订阅关系更贴近 ClientID 维度而非传统 Consumer Group 维度,Agent 节点上下线时也更有利于降低集群级重平衡带来的开销。 LiteTopic 与传统 Topic 的核心差异如下: | 维度 | 传统 Topic | LiteTopic | | | | | | 创建方式 | 需预先配置 | 首次消息发送时自动创 | | 索引存储 | ConsumeQueue 文件(每 Queue 独立文件) | RocksDB(统一键值管理) | | 可承载规模 | 万级(性能开始衰退) | 百万级共存 | | 生命周期 | 手动创建/删除 | 动态创建,自动回收 | | 消费者绑定 | Consumer Group 集群级绑定 | ClientID 客户端机器级绑定 | | 重平衡 | 集群级全局重平衡 | 仅更新单 Topic 绑定记录,降低海量轻量主题场景下的调度开销 | | 消息分发 | 每个 Topic 单独读请求 | 合并读取,事件驱动 | 03 核心机制深度拆解 3.1 RocksDB:百万级共存的存储基础 传统 Topic 的每个 Queue 对应一个 ConsumeQueue 文件——百万个 Topic 意味着数百万个小文件,文件系统碎片化读写是性能衰退的根因。 LiteTopic 将索引层切换为 RocksDB,以键值对方式统一管理消费位点: 写入路径不变:消息仍走 CommitLog 顺序追加,不改变高性能写入链路 索引统一管理:一个 RocksDB 引擎支撑海量 LiteTopic 共存 结合 TTL 自动回收机制,开发者无需长期手动清理历史会话资源,也能降低历史会话持续累积带来的资源占用压力。这使 LiteTopic 更适合承载大量轻量、动态的会话通道,也更适合长期运行的 AI 应用场景。 3.2 事件驱动 Ready Set:减少无效扫描的调度跃迁 传统 Pop 模式以长轮询驱动,消费者周期性向 Broker 发起请求,Broker 遍历全量主题检查新消息。如果百万级 LiteTopic 仍使用这种读模式,全量扫描效率低,会消耗大量 CPU。 5.5.0 引入专用的 LiteTopic 专用的事件驱动读取机制: Broker 内部维护“就绪集合” 有新消息写入或者可读事件触发时,才将对应 LiteTopic 放入就绪队列 事件触发时直接定位订阅的客户端,实现按需精准唤醒 这与 Google ADK Long Running Agent 的“wake up only when an external event arrives”在设计思路上是一致的——区别在于 LiteTopic 是消息队列层面的原生机制。 3.3 消费位点持久化与会话续传 LiteTopic 的两个关键语义,构成了 Agent 通信稳定性的基础: 订阅关系更贴近单会话通信语义:与传统消费模式中 Consumer Group 共享消费进度不同,LiteTopic 的订阅关系更贴近特定客户端连接,有助于降低海量会话并发场景下的系统调度复杂度。Broker 仅对绑定连接投递消息,Agent 节点上下线只更新该条绑定记录,不触发集群级重平衡——这是百万级会话并发场景下系统稳定性的核心保障。 消费位点 Broker 持久化:内存快照 + 增量持久化存储在 Broker 端,Agent 异常重启后 Broker 自动定位断点、继续投递,无需业务层实现状态管理。 04 五分钟跑通 MultiAgent 异步通信 以“主控 Agent 分发任务 → 执行 Agent 处理 → 结果异步回传”场景为例。 注:具体配置项、启动方式和管理命令请以 Apache RocketMQ 5.5.0 官方文档及示例为准。完整示例参考  仓库中的 `LiteProducerExample` 和 `LitePushConsumerExample`。 Step 1:启动 RocketMQ 5.5.0 ```bash 1. 下载并解压 wget https://mirrors.aliyun.com/apache/rocketmq/5.5.0/rocketmqall5.5.0binrelease.zip unzip rocketmqall5.5.0binrelease.zip && cd rocketmqall5.5.0binrelease 2. 追加 LiteTopic 配置到 broker.conf cat conf/broker.conf { String result = llmService.invoke(new String(msg.getBody())); replyProducer.send(buildReply(PARENT_TOPIC, "RESULT_" + sessionId, result)); return ConsumeResult.SUCCESS; }) .build(); // 动态订阅专属 LiteTopic,首次订阅自动创建 worker.subscribeLite("TASK_" + executorAgentId); // 宕机重启后 Broker 自动恢复消费位点,从断点继续投递 ``` 父 Topic `AGENT_TASK_NS` 作为命名空间,每个执行 Agent 独占一个 LiteTopic 接收任务,结果通过另一个 LiteTopic 异步回传。动态上下线不触发集群重平衡,宕机后消费位点自动续接,TTL 到期自动清理。 ```typescript const consumer = await new LitePushConsumerBuilder() .setClientConfiguration({ endpoints, namespace, sessionCredentials }) .setConsumerGroup(consumerGroup) .bindTopic(inboundTopic) .setMessageListener({ async consume(messageView: MessageView) { await messageHandler({ accountId, messageView, cfg }); return ConsumeResult.SUCCESS; } }) .startup(); ``` 05 阿里云商业版:开源之上的企业级增强 开源 5.5.0 提供 LiteTopic 核心模型:轻量存储、事件驱动分发、Broker 侧会话续传。阿里云云消息队列 RocketMQ 版在此基础上,针对企业级 AI 生产诉求做了专项增强: | 能力 | 开源 5.5.0 | 阿里云商业版 | | | | | | 百万级 LiteTopic | ✅ | ✅ | | 首次消息发送自动创建 | ✅ | ✅ | | 会话续传 | ✅ | ✅ | | 事件驱动 Ready Set | ✅ | ✅ | | 通配符订阅 | ✅ | ✅ | | Consume Suspend | ✅ | ✅ | | Serverless 弹性 | — | ✅ 分钟级百万吞吐自动伸缩,按量付费 | | EventBridge生态集成 | — | ✅ AI 数据集成、实时数据洞察、实时 Agent Context | LiteTopic 还提供了 Consume Suspend 能力,在 GPU 资源紧缺的背景下,AI 服务平台基于这个能力能够实现“千人千面”的精细化流量控制,做到分级服务和优先级资源调度。传统限流的困境在于:单个用户的慢任务会阻塞其他用户的消息处理,限流等待直接阻塞消费线程,影响整体吞吐;而且预创建的 High/Mid/Low 队列粒度太粗。LiteTopic 为每个用户/会话创建独立通道后,限流策略就可以独立执行——AI 服务平台通过消费挂起(Suspend) 机制实现平滑限流,不粗暴拒绝,而是将超限请求顺延到下个时间窗口: | 限流级别 | Suspend 时长 | 适用场景 | | | | | | 轻度限流 | 50 ~ 200 ms | 突发限流,平滑到下个窗口 | | 中度限流 | 200 ~ 1000 ms | 明显降低消费速率 | | 重度限流 | 1 ~ 5 s | 接近暂停消费 | | 忙闲调度 | 5 min ~ 8 hrs | 批量任务闲时处理 | 目前,阿里云大模型服务平台百炼网关已基于 LiteTopic 构建异步事件工作流,在生产环境以“千人千面”流控管理 AI 推理请求的流量峰值与算力调度。 06 与社区共建 AI 原生消息基础设施 5.5.0 是 Apache RocketMQ 走向 AI 原生的重要一步。Anthropic 和 Google 在协议层和框架层定义异步 Agent 通信标准,消息队列在基础设施层提供运行时保障——两个层次彼此互补。 值得关注的是,MCP 已成立 Triggers & Events Working Group,方向是补齐“Server 主动推送”能力——这恰恰是消息队列发布/订阅模型的天然能力。同时 MCP 正在推进 Server Registry(类似 A2A Agent Card),意味着 MCP 正向通用 Agent 通信协议演进。 社区正在探索的下一阶段方向: LiteTopic 作为 MCP 会话外化层:为 MCP Streamable HTTP 提供持久化、可异步投递的会话状态管理 LiteTopic 与 MCP Tasks 协议层对接:探索 RocketMQ Transport 承接 MCP 的传输、Task 调度和状态保持 集成到 Openclaw/Hermes Agent/QwenPaw 等开源通用 Agent 框架:用 Litetopic 扩展 Agent Channel, 将对话型 Agent 升级为事件驱动型 Agent,更深入地嵌入企业业务流程中 如果你对 RocketMQ for AI 方向感兴趣,欢迎通过以下方式参与共建: 参与 RIP83 及后续 RIP 讨论,贡献对 AI Agent 通信场景的设计思路 在 GitHub 提交 Issue、PR,或分享使用案例 加入钉钉技术交流群:44552972 相关链接 Apache RocketMQ 5.5.0 Release Notes:https://github.com/apache/rocketmq/releases/tag/rocketmqall5.5.0 RIP83 提案原文:https://github.com/apache/rocketmq/wiki/RIP%E2%80%9083LiteTopic:ANewMessageModel 阿里云 RocketMQ for AI 解决方案:https://www.aliyun.com/solution/techsolution/rocketmqformultiagentcommunication 阿里云 RocketMQ for AI 内容合集:https://www.aliyun.com/activity/middleware/aimq Apache RocketMQ 中文社区:https://rocketmqlearning.com/ 延伸阅读

2026年4月29日

Agent 开发范式演进:从环境工程出发,“简化”多源实时上下文
当 Agent 从 AI Coding 走向更广泛的行业场景,一个越来越现实的问题开始浮现:为什么软件工程领域的 Agent 跑得飞快,但很多行业里的 Agent 迟迟没有真正爆发? 一个常见答案是模型能力还不够。但从大量落地实践来看,真正的瓶颈往往不只在模型,而在上下文供给能力:Agent 能不能持续、低成本、可信地接入真实业务世界,决定它能否从 Demo 进入生产。 基于阿里云 EventHouse 的实践,本文尝试从“环境工程”视角,重新理解企业级 Agent 上下文构建问题,并围绕信息完备性、感官管理、知识对账、变更治理、普惠门槛五个维度,讨论如何让 Agent 更简单、可靠地接入多源、实时的业务环境。 为什么 AI Coding 先跑通了,而行业 Agent 却难以落地? 今年 2 月,Anthropic 发布了一份关于其平台 AI 调用的分析报告。数据显示,软件工程行业的调用量占比高达 49.7%,接近一半。 这个结果说明了一件事:Agent 目前最容易跑通的场景,恰恰是那些本身已经高度数字化、上下文天然在线化的领域,软件工程就是一个典型例子。 在软件研发过程中,程序员天然工作在一个数字世界中: + 输入端有 PRD、交互设计、技术方案、代码、Issue、日志等; + 输出端则可以直接完成 Design、Coding、Test、Deploy 等工作。 也就是说,程序员原本就处在一个可被机器“看见”、可被系统“接入”的环境中。AI Coding 之所以能够快速嵌入,根本原因不是“代码更适合 AI”,而是它的工作环境本身就已经完成了数字化表达。 但如果把场景切换到零售、制造、金融、物流等行业,问题就不一样了。 一个超市店长 Agent,如果不知道货架是不是空了、商品标签是不是填错了、隔壁门店有没有在搞促销、今天的生鲜为什么损耗高,那么即便它背后接的是最强模型,也很难做出合理决策。因为在这样的环境里,Agent 实际上仍然处于一种“半失明”的状态——它看不见真实业务里到底发生了什么。 所以,企业级 Agent 落地过程中,一个经常被低估但绕不开的问题是:怎么简单、可靠地为 Agent 构建多源、实时、可信的上下文? 围绕这个问题,我们尝试总结了五个关键判断。 信息完备性是前提:先让 Agent 看见真实业务世界 很多行业里的 Agent 之所以很难真正发挥作用,一个很重要的卡点是:它没有足够的信息感知能力。 这个道理其实并不复杂。无论是人还是 Agent,决策能力的上限,首先都取决于其对环境的观测能力。关键信息缺失,问题在逻辑上就可能无法被充分求解。换句话说,看不见,就很难判断对。 在信息论语境中,也可以用“完备性”(Completeness)来理解这个问题:如果观测环境所能提供的数字信息不具备足够完备性,那么很多任务在底层上就会变得不可解,或者至少无法稳定求解。 AI Coding 为什么更容易成功?因为程序员本来就在一个完整的数字环境中工作;而很多行业里的 Agent 之所以跑不起来,是因为它们只能看到很有限的数字切面。 所以,想让 Agent 真正进入业务现场,第一步不是继续调 Prompt、加 Skill、做记忆优化,而是先解决信息感知问题。 围绕这一点,EventHouse 提供了三类信息感知方式: + 主动监听(Polling / Monitoring):通过长轮询或定时任务持续监控数据源,在数据变更发生时尽快完成捕捉; + 事件订阅(Event Subscription):基于事件驱动架构(EDA),当业务事件发生时主动推送给 Agent,实现异步实时响应; + 挂载查询(Mount Query):对于海量历史数据或归档冷数据,不做全量搬运,而是按需触发即席查询,让 Agent 像“挂载磁盘”一样按需访问。 它们共同解决的是同一个问题:让 Agent 不再停留在静态、片段化的信息环境中,而是能够持续接入真实业务的动态变化。_02_ 信息不是越多越好:要给 Agent 一本“图书馆馆藏目录” 有了信息感知能力,是不是问题就解决了?其实还没有。 虽然信息“完备性”很重要,但信息绝不是越多越好,也不是对物理世界做一份 1:1 的机械复制。 一方面,这本来就做不到;另一方面,也没有必要。人类进化出来的能力,并不是“接收所有信息”,而是会自动过滤掉大量无效信息,保持注意力聚焦。否则就会出现所谓“感官超载”:输入很多,但重点不清、关联混乱,最后反而无法形成有效判断。 Agent 也是一样。 还有一个更容易被忽视的问题:Agent 感知到的信息,不等于 Agent 真正拥有了这些信息。 比如给 Agent 挂了一个 PostgreSQL 的 MCP,它理论上可以知道数据库里有哪些表、字段和描述,也可以执行 SQL。但如果它每次都是等问题来了,才临时去查元数据、理解表结构、拼接查询,这种方式就像“考试前临时抱佛脚”:不仅速度慢、Token 消耗高,而且很容易产生语义偏差。 这里有一个很贴切的比喻:这就像给了你一座图书馆。书都在里面,但如果没有目录系统,你每次要用的时候都只能一层楼一层楼找、一排书架一排书架翻,效率会很低,也很容易找错。更进一步说,角落里一本书虽然归你所有,但如果你根本不知道它存在,那它在实际上就等于不存在。 所以,Agent 需要的不只是更多信息,而是一份可以快速定位、持续更新、统一理解的“图书馆馆藏目录”。 EventHouse 的做法,是通过统一 Catalog 管理 Agent 可使用的信息资产,提前记录并维护数据的语义、Schema、新鲜度、来源、适用范围、关联关系等。 这样一来,Agent 就能更清楚地知道:自己手里有哪些信息、每类信息意味着什么、需要时应该去哪里找、哪些内容值得优先使用。 如果说第一步解决的是“有没有上下文”,那么统一 Catalog 解决的就是“上下文能不能被正确消费”。 知识不等于“信息囤积”:要与 Agent 做好“知识对账” 即便拥有了统一 Catalog,问题也并没有完全解决。 我们不会因为一个人拥有一座图书馆,就认为他已经具备了判断力。同样,Agent 也不会因为接入了更多数据源,就自动变得更聪明。信息能否转化为知识,决定了 Agent 最终能否真正用好这些上下文。 从经典的 DIKW(DataInformationKnowledgeWisdom)模型来看: + 数据(Data):客观事实的原始记录,是现实世界的符号化映射; + 信息(Information):被赋予语境与结构的数据,用来回答“What”,即“发生了什么”; + 知识(Knowledge):在信息基础上提炼出的规则、经验和方法,用来回答“How”,即“应该如何找到、理解和使用这些信息”; + 智慧(Wisdom):在明确目标与价值取向的前提下,在复杂情境中,对知识进行综合运用和权衡,进而作出合理决策。 回到企业级 Agent 的上下文构建问题里,真正关键的不只是“拿到更多信息”,而是形成一套可复用、可解释、可审查的取数与用数机制。换句话说,知识的本质不是信息囤积,而是知道如何从多个数据源中准确找出所需信息,并在正确的语义边界内完成解释和行动。 围绕这一点,EventHouse 从两个方向来生成 Agent 可使用的“知识”: + 一方面,基于统一 Catalog 中的数据定义、Schema 描述和语义信息; + 另一方面,结合客户对 Agent 的业务设定,例如角色设定(SOUL)、Prompt、Gold Sample、Benchmark 等配置。 最终,这些内容会被组织成一份可读、可审查、可持续迭代的 Knowledge Wiki。 这份 Wiki 的作用很关键。它既能被 Agent 消费,也能被人类专家审阅,从而让人和 Agent 之间建立一种“知识对账”机制:确认 Agent 对取数逻辑的理解是否正确,而不是把所有逻辑都藏在一个黑盒背后。 这一步的价值在于:让 Agent 不只是“连上数据”,而是真正开始“理解数据”。 知识的每一次迭代,都是一次生产级变更 Agent 的知识不是静态资产,而是持续演化的生产资料。 人的成长需要知识升级,Agent 也是一样。上游数据源可能发生变化,Schema 可能更新,客户对 Agent 的角色和目标设定可能调整,运行中的反馈也会不断积累。这些因素都会推动 Agent 的知识体系持续演进。 问题在于:新的 Knowledge Wiki 一旦生成,能不能直接上线被 Agent 使用? 从软件工程实践看,大量生产故障都与变更有关。到了 AI 应用阶段,这个规律并没有消失,只是变更对象发生了变化:从代码、镜像、配置和基础设施,扩展到了 Prompt、Knowledge Wiki、工具与插件、模型能力、Agent 行为策略等。 虽然变更内容不同,但对稳定性的要求没变。企业级 Agent 同样需要一套完整的变更治理机制:发布前可回归、发布中可灰度、发布后可回滚。 基于这一思路,EventHouse 借鉴 CI/CD 的工程方法,将一次 Agent 更新封装为一个可管理的“制品”。这个“制品”可以包含 Prompt、Knowledge Wiki、Gold Sample、Benchmark 以及其他与 Agent 行为相关的关键配置,并围绕它构建完整的持续发布流程: + 发布前:对多个“制品”进行 Benchmark 回归测试,选择更合适的版本发布; + 发布中:采用蓝绿发布,监控并对比新旧“制品”的线上效果; + 发布后:若新“制品”不达标,可从制品仓库快速回滚至历史版本。 这套机制既支持人工审核,也支持自动化演进。它的意义,不只是让 Agent 可以持续更新,而是让更新本身变成一件可治理、可验证、可恢复的事情。 “简单”与“可靠”不是附加项,而是 Agent 惠普的入场券 回看前面四点,无论是多源信息感知、统一 Catalog、知识 Wiki,还是制品化发布,本质上都在为两个目标服务:简单和可靠。 为什么这两个词如此重要? 一个很好的参照是电力普及的历史。电力刚出现的时候,并不是所有企业都能立刻用起来。问题并不在于它没有价值,而在于接入门槛太高:企业要自己买发电机、配维护人员、改造厂房布局,还要承担供电不稳定的风险。直到后来电网成为统一基础设施,工厂只需要一个标准插座,就能获得稳定电力,电气化才真正从少数人的能力变成全行业的能力。 今天的 AI,尤其是企业级 Agent,其实也正处在类似阶段。 很多组织并不是不想做 Agent,而是没有能力长期折腾数据集成、语义对齐、架构选型、变更治理和运维保障。如果一套能力只能被少数 AI Native 团队掌握,那么它还谈不上真正普惠。只有当 Agent 接入业务世界这件事,变得像“接电”一样低门槛、标准化、可持续,AI 才有可能真正进入千行百业。 从这个意义上说,EventHouse 想做的,就是 AI 时代面向 Agent 的“标准插座”: + 在广度上:打通消息系统、数据库、对象存储、SaaS 服务等多源数据接入,让 Agent 获得足够的信息感知能力; + 在深度上:统一对齐结构化、半结构化和非结构化数据语义,构建知识 Wiki; + 在流程上:把数据集成、存储、查询、检索整合为一体化服务; + 在形态上:提供 Serverless 体验,按量付费、秒级弹性、零运维。 其目标不是把每家企业都变成基础设施专家,而是尽可能降低 Agent 接入真实业务世界的门槛。 结语:企业级 Agent 的竞争,最终会落到上下文供给能力 AI 的下半场,比拼的不只是模型参数和推理能力,更是谁能以更低成本、更高可靠性,把真实世界持续、准确地搬进数字系统。 从软件工程享受到的“数字化红利”,到传统行业仍然缺失的“上下文基础设施”,企业级 Agent 的真正分水岭,正在从模型能力转向环境能力。谁能构建多源、实时、可信、可治理的上下文供给体系,谁就更有机会让 Agent 从“能演示”走向“能生产”。 这也是 EventHouse 持续投入的方向:把多源数据接入、语义管理、知识沉淀、变更治理和 Serverless 体验整合在一起,修一条从真实业务世界通向 Agent 的“高速公路”,让更多行业都能像插上电源一样,更轻松地获得 AI 带来的效率变革。 EventHouse 正在公测,欢迎一起探索下一代事件数据平台 如果你的团队正面临以下挑战,欢迎参与 EventHouse 公测,与我们共同探索企业级 Agent 的数据底座能力: + 数据沉睡:实时事件数据已积累到海量规模,却难以被高效分析和利用; + 链路滞后:跨数据源查询仍依赖复杂 ETL 流水线,分析结果总是滞后于业务; + 智能断层:希望让 AI Agent 直接接入业务数据,实现自主分析与智能决策; + 落地门槛高:希望以更低成本获得统一、实时、可治理的上下文供给能力。 让我们一起定义下一代事件数据平台! 👉 点击此处参与公测 _https://eventbridge.console.aliyun.com/cnchengdu/eventhouse/overview_ 👥 钉钉交流群:44552972

2026年4月8日

Apache RocketMQ 5.4 新版 POP 消费机制详解
从 RocketMQ 5.0 开始,社区引入了 POP 这种消费模式。客户端发起 POP 请求后,Broker 从 queue 中取出消息返回,并在 Broker 侧维护不可见期、ACK、续租和超时恢复这套状态——尤其在非 FIFO 场景下,它允许多个消费者并发处理同一 queue 上的消息。 在实际业务里,POP 往往不是因为“想换一种消费接口”才被拿出来,而是在消费并发已经碰到瓶颈时,才真正体现出价值。 一个很典型的场景是:某个 Topic 的 queue 数已经拉到很高,比如 500;消费客户端也已经拉到 500;但一个客户端处理一个 queue,吞吐还是不够。写入侧没有明显瓶颈,真正卡住的是消费侧并发。这个时候,业务真正需要的,往往不是继续把 Topic 切得更碎,而是让多个客户端共同处理同一个 queue 上的数据。 从这个场景往下看,POP 可以先理解成一种“消息被 Broker 借出一段时间”的消费模型: + 消息取走后,不会立刻算完成 + 消息会先进入不可见期 + 处理成功后再 ACK + 处理时间不够时可以续租 + 超时后还要重新进入后续消费流程 顺着这个语义再来看源码,后面的 invisibleTime、ACK、changeInvisibleTime、revive,以及 Broker 里那套“已投递但未完成”的状态管理,就比较容易串起来。这里把 Broker 为消息保存的这类“消费中”状态统称为“投递记录”。 图注:POP 消费的核心语义——消息被 Broker 借出一段时间,取走后进入不可见期,成功后 ACK,不够则续租,超时则 Revive 重投到 Retry Topic。 一、为什么需要 POP ▍1.1 传统 Pull 为什么不太够用 传统 Pull 模式下,消费进度主要由消费位点驱动;在同一消费组内,一个队列在同一时刻通常只会稳定地分配给一个消费者实例。 这套模型很适合“按位点往前推进”的流式消费,但在下面这类场景里,局限会暴露得更明显: + 单条消息处理时间明显长于一次普通消费循环 + 处理成功与否不能只靠“位点推进了没有”来表达 + 客户端需要显式告诉 Broker:我处理完了、还没处理完、需要再给我一点时间 + 更希望消费语义接近“消息先领出处理、完成后再确认”,而不是单纯按 queue 上的位点顺序往前推进 问题不再只是“怎么把消息拉出来”,而是“怎么让消息被领出来处理一段时间,同时还能继续放大消费并发”。 单靠继续加 queue,本质上还是在用更多 queue 换并发;但当 queue 数已经很高时,很多时候业务更需要的,其实是允许多个客户端共同处理同一个 queue 上的数据。 这也正是非 FIFO POP 真正有意义的地方:它把原来更接近“一个 queue 稳定分给一个消费者实例”的消费方式,往“同一 queue 上可以存在多条被不同消费者领走、但尚未最终确认的消息”这个方向推进了一步。 ▍1.2 POP 的关键变化在哪里 POP 真正补上的,不是一个新的拉消息动作,而是 Broker 侧一层明确的“处理中状态”。 消息被取走后,不会立刻算消费完成,而是先进入一段受 Broker 管理的处理中窗口:这段时间里消息不可见,成功后 ACK,处理不完可以续租,超时则重新进入后续消费流程。 这样一来,Broker 就可以同时跟踪同一 queue 上多条“已经投递、但尚未最终完成”的消息,POP 能支撑更高消费并发的关键也在这里。 后面会反复出现的 invisibleTime、ACK、changeInvisibleTime、revive,本质上都是围绕这层状态展开的。 更详细的对比如下图: 二、POP 的几个核心概念 下面讨论 RocketMQ 5.4.0 里的 KV 路径,前面说的“投递记录”在这里具体指 PopConsumerRecord。旧路径和 FIFO 放到后文再补。 ▍2.1 invisibleTime invisibleTime 是 POP 最核心的语义。消息被 POP 成功后,在 popTime + invisibleTime 之前,这条消息对该消费组后续的 POP 请求不可见。 如果只抓住一件事,那就是:Broker 把消息借出去之后,不会立刻把它算作完成,而是先给它挂上一段“处理中窗口”。这段窗口就是 invisibleTime。 ▍2.2 ACK ACK 的核心语义不是简单推进消费位点,而是把“这条消息已经完成”这件事落到 Broker 侧状态里。 更具体一点说,就是把这条消息对应的 PopConsumerRecord 删除掉。它不是“把 queue 往前推一步”,而是把这条消息从“已投递但未完成”的集合里移出去。 ▍2.3 changeInvisibleTime 如果业务处理超过原来的 invisibleTime,客户端可以调用 changeInvisibleTime 续租。从语义上看,它不是在改消息内容,而是在把这次投递的可见时间整体往后推。 放到实现上,它体现为: + 写一条新的 PopConsumerRecord + 再删除旧记录 所以它本质上是在切换“这次投递对应的超时点”。 ▍2.4 revive revive 主要对应超时恢复机制。 如果一条消息在不可见窗口内一直没有被 ACK,Broker 后续会把它重新投递到该消费组对应的 retry topic,后续 POP 请求再按 popFromRetryProbability 的轮转策略从 retry topic 取消息,这个过程就是 revive。也就是说,revive 不是把消息直接放回原始 queue,而是走重试投递路径。 需要特别注意的是,revive 并不是一个“附加的重试能力”,而是这条消费模型下的必然结果。 从 Broker 视角看,一旦消息返回给客户端后,后续读取位置已经继续前移,而这条消息本身又仍处于“已投递但未确认”的状态,Broker 就不能再只靠 offset 把它重新取回来。 因此,如果客户端在不可见期内一直没有 ACK,Broker 就必须依赖这层消息级未确认状态的扫描与 revive 机制,把消息重新投递到 retry topic,再进入后续 POP 消费流程。 这也是为什么 revive 本质上是在弥补“offset 已前移,但消息尚未完成”的状态缺口。 ▍2.5 POP 还有 offset 吗? 有。 Broker 至少同时维护两层状态: + pullOffset / consumeOffset —— 决定后续从哪里继续读或推进 + PopConsumerRecord —— 表达“哪些消息已经投递出去、但还没有最终完成” 所以 offset 前移不等于消息已经真正消费完成,只有对应的 PopConsumerRecord 被 ACK 删除、续租切换或超时恢复后,这条消息的状态才算真正闭环。 很多人第一次看 POP,会误以为“既然已经按消息 ACK 了,那是不是就没有 offset 了”。实际上并不是。POP 里 offset 一直都在,只是进入 POP 之后,offset 不再足以单独表达全部消费状态。 如果把 Pull 和 POP 放在一起看,差别不在“有没有 offset”,而在“offset 是否还足够表达全部消费状态”: + 对 Pull 来说,offset 基本就能同时表达“从哪里继续读”和“消费推进到了哪里” + 但对 POP 来说,Broker 侧还多了一层“已投递但未完成”的消息级状态 后面整篇文章,其实就是在把这层状态是怎么建立、怎么变更、怎么收尾讲清楚。 三、先按一条主线读源码 RocketMQ 5.4.0 里新旧两条实现路径仍然并存,但如果一开始就把两套状态模型交叉着讲,读者很容易在“当前这一步到底落在哪套实现里”上迷路。 所以下面只顺着一条线往下走:PopMessageProcessor → PopConsumerService → PopConsumerRecord → ACK / changeInvisibleTime / revive。旧路径和 FIFO,都放到后面再统一说。 图注:POP 源码的三层架构——请求入口(PopMessageProcessor)、核心服务(PopConsumerService)、状态存储(PopConsumerCache + PopConsumerKVStore)。核心投递记录 PopConsumerRecord 贯穿全流程。 ▍3.1 统一入口:PopMessageProcessor POP 请求统一从 PopMessageProcessor.processRequest() 进入。它负责: + 校验 Broker / Topic / ConsumerGroup 权限 + 校验 maxMsgNums,当前限制最大 32 + 校验 queueId + 校验 timerWheelEnable,没开时 POP 直接拒绝 + 构造过滤表达式和 MessageFilter + 根据配置决定走哪条 POP 实现路径 + 在没取到消息时进入 PopLongPollingService 代码位置:broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java,入口方法 processRequest(...) 它是 POP 的入口,但不是全部逻辑所在。 ▍3.2 核心组件:PopConsumerService 当brokerConfig.isPopConsumerKVServiceEnable() 为 true 时,POP 主流程会委托给 PopConsumerService.popAsync(...)。 这一套是这里真正的主角,核心组件包括: PopConsumerContext 封装单次 POP 的上下文,保存本次返回的 GetMessageResult 和本次生成的 PopConsumerRecord,汇总返回给客户端的 startOffsetInfo、msgOffsetInfo。 PopConsumerRecord 表示“一条消息被 POP 出去但还没最终确认”的记录。核心字段:popTime / groupId / topicId / queueId / retryFlag / invisibleTime / offset / attemptTimes / attemptId。其中 attemptTimes 是 revive 重试次数,每次 revive 失败后递增,用于退避策略。visibilityTimeout = popTime + invisibleTime。 PopConsumerKVStore 默认是 RocksDB 存储实现。 PopConsumerCache enablePopBufferMerge 打开时的内存缓冲,缓冲区定期刷盘或触发 revive。 PopConsumerLockService 按 groupId + topicId 做互斥。 代码位置:broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java,入口方法 popAsync(...) 四、POP 请求主流程 下面把一次 POP 请求从入口到返回完整走一遍。 图注:一次完整 POP 请求的 10 步主流程 + 长轮询备选路径(A/B/C)。注意长轮询本质仍是“请求驱动的拉取”,唤醒时重新调用 processRequest()。 ▍4.1 请求进入 Broker 客户端发 POP_MESSAGE 请求后,PopMessageProcessor 先做一轮硬校验: + Broker 是否可读 + Topic 是否存在且可读 + ConsumerGroup 是否存在且允许消费 + requestHeader.getMaxMsgNums() ≤ 32 + Store 是否开启 timer wheel + queueId 是否合法 如果请求带过滤表达式,还会构建普通 Topic 和对应 retry topic 的 SubscriptionData,以及对应的 ExpressionMessageFilter。 ▍4.2 创建 PopConsumerContext 进入 PopConsumerService.popAsync(...) 后,会先创建 PopConsumerContext,里面记录:客户端地址、popTime、invisibleTime、消费组、attemptId、本次累积返回的消息结果和投递记录。这个对象贯穿单次 POP。 ▍4.3 加锁粒度:groupId + topicId PopConsumerService 会先尝试对 groupId + topicId 加锁。注意这里不是“每个队列一把锁”,而是消费组和 Topic 粒度的互斥。 这么做的目的,主要是保证一次 POP 紧密相关的状态变更不会互相踩踏:取消息、建投递记录、提交 offset,都在这把锁保护下串起来。 ▍4.4 先取重试还是先取普通消息 POP 不是永远先取普通 Topic。PopConsumerService 里会根据配置决定是否优先取重试消息: + popFromRetryProbability + popFromRetryProbabilityForPriority 并结合以下条件算出 preferRetry:Topic 类型是否为 PRIORITY,且 requestCount % 100 拿到 probability 后,preferRetry = (probability 0) && (requestCount % (100 / probability) == 0)。 取消息的实际顺序也与 preferRetry 和 queueId 模式有关: + preferRetry=true:先取 retry topic(v1/v2),再取普通 Topic,不再重复取 retry + preferRetry=false 且 queueId==1:先取普通 Topic,再取 retry topic(v1/v2) + preferRetry=false 且 queueId!=1:只取指定普通队列,这次请求里不会再补读 retry topic 也就是说,它不是简单的“普通队列空了再去重试队列”,而是带概率控制、并且受 queueId 模式约束的优先级轮转策略。 ▍4.5 支持 retry topic v1/v2 这里会根据配置决定是否从这些重试主题取消息:KeyBuilder.buildPopRetryTopicV1(...) 和 KeyBuilder.buildPopRetryTopicV2(...)。并且 retryFlag 会记在 PopConsumerRecord 里,后面 revive/返回消息时都要用到。 ▍4.6 queueId 的两种模式 指定队列:如果 queueId != 1,只从指定队列取。 全队列 POP:如果 queueId == 1,会遍历 Topic 的所有读队列。遍历起点和方向不是固定的:结合请求计数做轮转,尽量避免每次都从固定队列开始;同时受 priorityOrderAsc 控制,决定从低 queueId 向高递增,还是从高 queueId 向低递减。 ▍4.7 真正取消息 每个队列最终都会调用 getMessageAsync(...) 去读存储。这里有几个关键点: + POP 使用的是 pullOffset + 如果发现 offset 过小/过大/无效,会修正 offset 后重试读取 + 读到消息后,会同步更新 commitPullOffset(...) 和 commitOffset(...) 要注意:commitPullOffset 更像“下一次从哪里继续试探着拉”;打开 enablePopBufferMerge 时,commitOffset 可能会被压到 PopConsumerCache 里最小的未确认 offset,而不是这次读取的 nextBeginOffset。 commitOffset 仍然是 POP 在 Broker 侧维护的一部分消费进度,但真正决定一条消息是否已经消费完成的,仍然是后面的 ACK、续租和 revive。 ▍4.8 inflight 背压:isPopShouldStop 在每个队列真正取消息之前,PopConsumerService 还会先调用 isPopShouldStop(groupId, topicId, queueId) 做背压判断。 当 enablePopMessageThreshold=true 且该队列当前 inflight 消息数(由 PopConsumerCache 统计的未 ACK 记录数)已经达到 popInflightMessageThreshold 时,isPopShouldStop 返回 true,这次队列迭代直接跳过,不再读存储。 这和长轮询里的 POLLING_FULL 是两个独立的流控层: + POLLING_FULL:长轮询挂起队列容量已满,新请求无法挂起 + isPopShouldStop:某个队列的 inflight 消息过多,主动限制继续发送 ▍4.9 读到消息后如何建模 PopConsumerContext.addGetMessageResult(...) 会先在本次请求上下文里,为每条命中的消息挂一条 PopConsumerRecord。 可以把它理解成:Broker 在说“这条 offset=xxx 的消息,我在 popTime 时刻交给了这个消费组,它要在 invisibleTime 后才算超时。” 这是 POP 跟 Pull 最大的不同。但要注意,这一步只是先把“投递记录”挂在 PopConsumerContext 里;真正写到缓存或 KV Store,是下一步的事。 ▍4.10 投递记录写到哪里 如果这次 POP 确实取到了消息,这些 PopConsumerRecord 就会被真正保存下来: + 如果 enablePopBufferMerge=true 且缓存没满,先写 PopConsumerCache + 否则直接写 PopConsumerKVStore(默认 RocksDB) 所以这里真正的核心投递记录就是 PopConsumerRecord。 投递记录写入路径图 图注:PopConsumerRecord 从创建到持久化的完整写入路径。enablePopBufferMerge=true 时经内存缓冲刷盘,否则直写 RocksDB。存储键为 visibilityTimeout + groupId + topicId + queueId + offset。 ▍4.11 返回给客户端前的重试消息改写 如果取到的是 retry topic 上的消息,返回客户端前,Broker 可能会对“客户端看到的 topic”做一层重编码;但这里要特别注意两件事: + 这一层更多是“响应呈现层”的行为,不等于 ACK/续租时真正使用的 topic + 客户端后续 ACK、续租最终依赖的仍然是 PROPERTY_POP_CK / receipt handle 里携带的 retry 标记,客户端会据此恢复 real topic 所以从客户端视角看,“日志里看到的消息 topic”和“Broker 这次实际从哪个主题取到消息”,不一定始终一一对应。这个问题后面再单独展开。 ▍4.12 没取到消息时的长轮询 如果本次 POP 没取到消息,不一定立刻返回空。只要请求里的 pollTime 0,请求就会进入 PopLongPollingService.polling(...)。 长轮询服务会把请求按 topic + consumerGroup + queueId 挂到 pollingMap 里。已经挂起的请求,主要会在下面几种场景被重新处理: + 新消息到达 + 挂起超时 + 服务停止时的清理唤醒 这里的“重新处理”不是 Broker 手里已经攥着一批消息,等条件满足后直接把它推给挂起请求,而是重新调用一次 processor.processRequest(...),也就是再走一遍 POP 处理流程。 需要特别注意的是,POP 的长轮询本质上仍然是“请求驱动的拉取”,而不是“服务端主动推送”。 要额外注意的是,POLLING_FULL 不是唤醒场景。如果长轮询队列本身已经满了,请求不会进入挂起队列,而是会直接返回 POLLING_FULL。 五、ACK、续租和 revive 消息返回客户端之后,还剩三件事要闭环:处理完成怎么 ACK,时间不够怎么续租,超时之后又怎么恢复。 ▍5.1 ACK ACK 请求入口是 AckMessageProcessor。最重要的是它最终会落到 PopConsumerService.ackAsync(...)。它的主流程可以理解成这样: 1. AckMessageProcessor.appendAckNew(...) 先从 extraInfo 或 BatchAck 里还原这次 POP 的关键信息 2. 然后调用 PopConsumerService.ackAsync(...) 3. ackAsync(...) 会根据 popTime/groupId/topicId/queueId/invisibleTime/offset 构造对应的 PopConsumerRecord 作为查找 key;底层存储键最终会折算成 visibilityTimeout + groupId + topicId + queueId + offset 4. 然后先尝试从 PopConsumerCache 删除;删不到再去 PopConsumerKVStore 删除 所以这里的 ACK,就是把这条消息对应的“进行中记录”删掉。 ▍5.2 为什么 ACK 不是简单提交 offset 因为 POP 是“消息级确认”,不是“队列级顺序推进”。同一批次返回的多条消息里,可能有的已经处理完成,有的还在处理中,所以 Broker 必须保留消息级的未确认状态,而不能只靠一个队列 offset 表达完成状态。 放在这里看,这层状态就是 PopConsumerRecord。 ▍5.3 changeInvisibleTime ChangeInvisibleTimeProcessor 处理续租请求。它最终会进入 PopConsumerService.changeInvisibilityDuration(...)。这条分支会做两件事: + 写入一条新的 PopConsumerRecord(changedPopTime + changedInvisibleTime) + 删除旧的 PopConsumerRecord 本质上等于把原来的“超时点”整体往后挪。 要注意,这里新生成的记录不是重新写回 PopConsumerCache,而是直接写 PopConsumerKVStore,然后再删除旧记录。 ▍5.4 超时未 ACK 后如何 revive PopConsumerService 自身就是一个背景线程服务,会定期扫描 PopConsumerKVStore 里的过期记录: + 实现上通过 scanExpiredRecords(...) 扫描已经进入过期窗口的 PopConsumerRecord + 再去读原始消息 + 把原始消息重写到 retry topic + 删除旧记录 如果打开了 enablePopBufferMerge,还要再补一层理解:部分记录会先进入 PopConsumerCache,缓存线程会先把未过期记录刷到 store,把已过期记录直接交给 revive 逻辑处理。所以“后台线程只扫 KVStore”是主路径,但不是全部。 如果 revive 失败,不是无限重试,而是会按 attemptTimes 做退避重试。超过最大尝试次数后,日志里会出现 message may be lost。 ▍5.5 revive 重投到哪里 如果原消息本来就来自重试主题,则继续回到对应重试主题。如果原消息来自普通 Topic,则会重投到 POP retry topic(v1 或 v2,取决于 Broker 配置),并且会带上 PROPERTY_FIRST_POP_TIME、PROPERTY_ORIGIN_GROUP 和最新的重试次数。 图注:ACK 删除 PopConsumerRecord 标记完成,changeInvisibleTime 写新记录+删旧记录将超时点后推。两者操作路径、写入目标、最终效果均不同。 六、最后补三个容易混的点 ▍6.1 FIFO 为什么不是这套模型 前面讲的这套模型里,核心一直是“每条消息对应一条 PopConsumerRecord,再围绕它做 ACK、续租和 revive”。FIFO 不是这套思路。 FIFO 更像“带不可见期的顺序消费”。Broker 关心的不是每条消息各自有没有一条独立投递记录,而是同一队列上的顺序窗口有没有被前面的未 ACK 消息卡住。对应到源码上,核心状态从 PopConsumerRecord 换成了 OrderInfo,由 ConsumerOrderInfoManager 维护。 所以 FIFO 不走前面那套消息级 retry/revive 主流程。它的超时恢复也不是后台扫描,而是后续 POP 通过 OrderInfo.needBlock() 判断阻塞窗口是否已经过期;一旦阻塞解除,因为 committedOffset 还没前进,消息会从原始 queue 原地重新投递,而不是进 retry topic。 ▍6.2 旧路径在做什么 旧路径和前面讲的是同一个问题的另一套实现。如果 isPopConsumerKVServiceEnable() 为 false,Broker 不再用 PopConsumerRecord 表示“已投递未确认”,而是换成 PopCheckPoint / AckMsg / BatchAckMsg 这一组对象。 大致可以把它理解成: + POP 取消息后生成 PopCheckPoint + ACK 通过 AckMsg / BatchAckMsg + PopBufferMergeService 表达 + changeInvisibleTime 会追加新的 PopCheckPoint,再去 ACK 旧 checkpoint + 超时恢复依赖 revive topic 和 PopReviveService 旧路径和前文解决的是同一个问题,但中间状态模型已经完全换了一套。理解到 checkpoint、ack 日志和 revive topic 这一层就够了。 ▍6.3 涉及重试消息时,客户端看到的 topic 为什么会不同 这里最容易混淆的一点是:客户端看到的消息 topic,不等于 ACK / 续租真正作用的 real topic。 POP 返回消息时,Broker 可能会对响应里的 topic 做重编码,尤其是消息实际来自 retry topic 时更容易让人误会。但客户端后续 ACK / changeInvisibleTime 依赖的并不是当前消息对象上的 topic,而是 PROPERTY_POP_CK / receipt handle 里的 retry 标记,再据此恢复 real topic。 所以仅凭客户端日志里看到的 topic,不能反推 Broker 这次一定是从普通 topic 读到的,还是从 retry topic 读到的。把这件事分开看就不容易绕:日志里的 topic 更像是响应呈现层,ACK / 续租用的 topic 才是语义层。 图注:客户端日志中的 topic 属于“呈现层”(可能被 Broker 重编码),而 ACK/续租使用的 real topic 属于“语义层”(来自 PROPERTY_POP_CK 中的 retry 标记)。两者不能混为一谈。 七、再往外看:为什么 Kafka 也在演进这类能力 如果把 POP 从 RocketMQ 自己的实现细节里跳出来看,它解决的其实不是一个 RocketMQ 独有的问题,而是一类更普遍的消息消费诉求: + 共享消费 + 显式确认 + 处理中窗口控制 + 超时后的再次投递 RocketMQ POP 是其中一种实现方式。它仍然建立在 RocketMQ 自己的 Topic、queue、retry topic、FIFO/非 FIFO 分支之上,但消费语义上已经明显引入了“消息被借出一段时间、完成后确认、超时后恢复”的队列式特征。 Kafka 近几个版本也在沿着这个方向演进。按照 Apache Kafka 官方文档和 KIP932 的表述: + Kafka 4.1 把 share groups 作为 preview 形态引入 + Kafka 4.2 把 Queues for Kafka / share groups 标记为 productionready + 它提供的是一种不同于传统 consumer group 的共享消费模型 + 不再要求每个 partition 只能分配给一个消费者实例处理 + 消费者可以做逐条 ACK,并跟踪 delivery attempts;同时系统仍然针对 batch processing 做了优化 + 官方文档也明确说,这类能力更适合“records are processed one at a time”,而不是典型的 ordered stream 如果只看高层语义,会发现它和 POP 想解决的问题其实很像:都在补“共享消费 + 显式确认 + 失败后继续处理”这类能力。当然,两者并不是一套实现: + RocketMQ POP 的关键语义是 invisibleTime、ACK、续租、revive,以及围绕这些语义建立的 Broker 侧投递记录 + Kafka share groups 的关键语义是共享消费组、逐条 ACK、delivery attempts,以及围绕 share group 建立的新协调与状态管理 这也说明,“共享消费、逐条确认、处理中窗口控制、失败后继续处理”并不是某一个消息系统偶然长出来的特性,而是消息系统里真实存在的一类业务需求。不同消息系统都在沿着各自的体系往这个方向演进,只是具体实现路径和抽象方式并不相同。 八、关键源码索引 以下以 release5.4.0 分支、提交 b5da00ad0 为代码基线。 | 功能 | 关键类 / 方法 | | :: | | | POP 请求入口 | PopMessageProcessor.processRequest() | | KV 路径主流程 | PopConsumerService.popAsync() | | 重试消息重编码 | PopConsumerService.recodeRetryMessage() | | ACK 主流程 | AckMessageProcessor.processRequest()→ appendAckNew()→ PopConsumerService.ackAsync() | | 续租主流程 | ChangeInvisibleTimeProcessor.processRequestAsync()→ PopConsumerService.changeInvisibilityDuration() | | revive 主流程 | PopConsumerService.revive(PopConsumerRecord) / revive(AtomicLong, int) | | 旧路径buffer merge | PopBufferMergeService | | 旧路径 revive | PopReviveService | | FIFO 顺序状态 | QueueLevelConsumerManager | | 长轮询挂起 | PopLongPollingService.polling() | | 长轮询唤醒 | PopLongPollingService.wakeUp() | 总结 POP 消费机制的核心可以概括为三句话: 第一,POP 补上了 Broker 侧的“处理中状态”。 消息被取走后不立刻算完成,而是进入 invisibleTime 不可见窗口,Broker 通过 PopConsumerRecord 跟踪每条“已投递但未完成”的消息。 第二,消费进度不再仅靠 offset 表达。 offset 决定从哪里继续读,PopConsumerRecord 决定哪些消息还在处理中。ACK 删除记录、续租切换超时点、revive 处理超时恢复,三者共同构成状态闭环。 第三,这是消息系统的一类共性需求。 从 RocketMQ 的 POP 到 Kafka 的 share groups,不同系统都在用各自的方式实现“共享消费 + 显式确认 + 处理中窗口控制”,因为这是实际业务中的真实诉求。
查看全部文章
ABOUT US
Apache RocketMQ事件驱动架构全景图
微服务
Higress
Dubbo
Sentinel
Seata
Spring Cloud
Nacos
物联网
家电
汽车
穿戴设备
充电桩
工业设备
手机
事件驱动架构平台
RabbitMQ
Kafka
EventBridge
MQTT
RocketMQ
MNS
Apache RocketMQ as Core
计算
模型服务
函数计算
容器
存储
对象存储
数据库
NoSQL
分析
Flink
Spark
Elastic Search
事件
云服务器
对象存储
云监控
SaaS事件
通知
语音
短信
邮箱
移动推送

产品特点

为什么学习Apache RocketMQ

云原生
生于云,长于云,无限弹性扩缩,K8S 友好
高吞吐
万亿级吞吐保证,同时满足微服务于大数据场景
流处理
提供轻量、高扩展、高性能和丰富功能的流计算引擎
金融级
金融级的稳定性,广泛用于交易核心链路
架构极简
零外部依赖,Shared-nothing 架构
生态友好
无缝对接微服务、实时计算、数据湖等周边生态
浙ICP备12022327号-1120