Apache RocketMQ 5.4 新版 POP 消费机制详解

作者|同程旅行、杨肖
2026年4月8日

从 RocketMQ 5.0 开始,社区引入了 POP 这种消费模式。客户端发起 POP 请求后,Broker 从 queue 中取出消息返回,并在 Broker 侧维护不可见期、ACK、续租和超时恢复这套状态——尤其在非 FIFO 场景下,它允许多个消费者并发处理同一 queue 上的消息。

在实际业务里,POP 往往不是因为“想换一种消费接口”才被拿出来,而是在消费并发已经碰到瓶颈时,才真正体现出价值。

一个很典型的场景是:某个 Topic 的 queue 数已经拉到很高,比如 500;消费客户端也已经拉到 500;但一个客户端处理一个 queue,吞吐还是不够。写入侧没有明显瓶颈,真正卡住的是消费侧并发。这个时候,业务真正需要的,往往不是继续把 Topic 切得更碎,而是让多个客户端共同处理同一个 queue 上的数据。

从这个场景往下看,POP 可以先理解成一种“消息被 Broker 借出一段时间”的消费模型:

  • 消息取走后,不会立刻算完成
  • 消息会先进入不可见期
  • 处理成功后再 ACK
  • 处理时间不够时可以续租
  • 超时后还要重新进入后续消费流程

顺着这个语义再来看源码,后面的 invisibleTime、ACK、changeInvisibleTime、revive,以及 Broker 里那套“已投递但未完成”的状态管理,就比较容易串起来。这里把 Broker 为消息保存的这类“消费中”状态统称为“投递记录”

图注:POP 消费的核心语义——消息被 Broker 借出一段时间,取走后进入不可见期,成功后 ACK,不够则续租,超时则 Revive 重投到 Retry Topic。

一、为什么需要 POP

▍1.1 传统 Pull 为什么不太够用

传统 Pull 模式下,消费进度主要由消费位点驱动;在同一消费组内,一个队列在同一时刻通常只会稳定地分配给一个消费者实例。

这套模型很适合“按位点往前推进”的流式消费,但在下面这类场景里,局限会暴露得更明显:

  • 单条消息处理时间明显长于一次普通消费循环
  • 处理成功与否不能只靠“位点推进了没有”来表达
  • 客户端需要显式告诉 Broker:我处理完了、还没处理完、需要再给我一点时间
  • 更希望消费语义接近“消息先领出处理、完成后再确认”,而不是单纯按 queue 上的位点顺序往前推进

问题不再只是“怎么把消息拉出来”,而是“怎么让消息被领出来处理一段时间,同时还能继续放大消费并发”

单靠继续加 queue,本质上还是在用更多 queue 换并发;但当 queue 数已经很高时,很多时候业务更需要的,其实是允许多个客户端共同处理同一个 queue 上的数据。

这也正是非 FIFO POP 真正有意义的地方:它把原来更接近“一个 queue 稳定分给一个消费者实例”的消费方式,往“同一 queue 上可以存在多条被不同消费者领走、但尚未最终确认的消息”这个方向推进了一步。

▍1.2 POP 的关键变化在哪里

POP 真正补上的,不是一个新的拉消息动作,而是 Broker 侧一层明确的“处理中状态”。

消息被取走后,不会立刻算消费完成,而是先进入一段受 Broker 管理的处理中窗口:这段时间里消息不可见,成功后 ACK,处理不完可以续租,超时则重新进入后续消费流程。

这样一来,Broker 就可以同时跟踪同一 queue 上多条“已经投递、但尚未最终完成”的消息,POP 能支撑更高消费并发的关键也在这里。

后面会反复出现的 invisibleTime、ACK、changeInvisibleTime、revive,本质上都是围绕这层状态展开的。

更详细的对比如下图:

二、POP 的几个核心概念

下面讨论 RocketMQ 5.4.0 里的 KV 路径,前面说的“投递记录”在这里具体指 PopConsumerRecord。旧路径和 FIFO 放到后文再补。

▍2.1 invisibleTime

invisibleTime 是 POP 最核心的语义。消息被 POP 成功后,在 popTime + invisibleTime 之前,这条消息对该消费组后续的 POP 请求不可见。

如果只抓住一件事,那就是:Broker 把消息借出去之后,不会立刻把它算作完成,而是先给它挂上一段“处理中窗口”。这段窗口就是 invisibleTime

▍2.2 ACK

ACK 的核心语义不是简单推进消费位点,而是把“这条消息已经完成”这件事落到 Broker 侧状态里。

更具体一点说,就是把这条消息对应的 PopConsumerRecord 删除掉。它不是“把 queue 往前推一步”,而是把这条消息从“已投递但未完成”的集合里移出去。

▍2.3 changeInvisibleTime

如果业务处理超过原来的 invisibleTime,客户端可以调用 changeInvisibleTime 续租。从语义上看,它不是在改消息内容,而是在把这次投递的可见时间整体往后推。

放到实现上,它体现为:

  • 写一条新的 PopConsumerRecord
  • 再删除旧记录

所以它本质上是在切换“这次投递对应的超时点”

▍2.4 revive

revive 主要对应超时恢复机制。

如果一条消息在不可见窗口内一直没有被 ACK,Broker 后续会把它重新投递到该消费组对应的 retry topic,后续 POP 请求再按 popFromRetryProbability 的轮转策略从 retry topic 取消息,这个过程就是 revive。也就是说,revive 不是把消息直接放回原始 queue,而是走重试投递路径。

需要特别注意的是,revive 并不是一个“附加的重试能力”,而是这条消费模型下的必然结果。

从 Broker 视角看,一旦消息返回给客户端后,后续读取位置已经继续前移,而这条消息本身又仍处于“已投递但未确认”的状态,Broker 就不能再只靠 offset 把它重新取回来。

因此,如果客户端在不可见期内一直没有 ACK,Broker 就必须依赖这层消息级未确认状态的扫描与 revive 机制,把消息重新投递到 retry topic,再进入后续 POP 消费流程。

这也是为什么 revive 本质上是在弥补“offset 已前移,但消息尚未完成”的状态缺口。

▍2.5 POP 还有 offset 吗?

有。

Broker 至少同时维护两层状态:

  • pullOffset / consumeOffset —— 决定后续从哪里继续读或推进
  • PopConsumerRecord —— 表达“哪些消息已经投递出去、但还没有最终完成”

所以 offset 前移不等于消息已经真正消费完成,只有对应的 PopConsumerRecord 被 ACK 删除、续租切换或超时恢复后,这条消息的状态才算真正闭环。

很多人第一次看 POP,会误以为“既然已经按消息 ACK 了,那是不是就没有 offset 了”。实际上并不是。POP 里 offset 一直都在,只是进入 POP 之后,offset 不再足以单独表达全部消费状态。

如果把 Pull 和 POP 放在一起看,差别不在“有没有 offset”,而在“offset 是否还足够表达全部消费状态”:

  • 对 Pull 来说,offset 基本就能同时表达“从哪里继续读”和“消费推进到了哪里”
  • 但对 POP 来说,Broker 侧还多了一层“已投递但未完成”的消息级状态

后面整篇文章,其实就是在把这层状态是怎么建立、怎么变更、怎么收尾讲清楚。

三、先按一条主线读源码

RocketMQ 5.4.0 里新旧两条实现路径仍然并存,但如果一开始就把两套状态模型交叉着讲,读者很容易在“当前这一步到底落在哪套实现里”上迷路。

所以下面只顺着一条线往下走:PopMessageProcessor → PopConsumerService → PopConsumerRecord → ACK / changeInvisibleTime / revive。旧路径和 FIFO,都放到后面再统一说。

图注:POP 源码的三层架构——请求入口(PopMessageProcessor)、核心服务(PopConsumerService)、状态存储(PopConsumerCache + PopConsumerKVStore)。核心投递记录 PopConsumerRecord 贯穿全流程。

▍3.1 统一入口:PopMessageProcessor

POP 请求统一从 PopMessageProcessor.processRequest() 进入。它负责:

  • 校验 Broker / Topic / ConsumerGroup 权限
  • 校验 maxMsgNums,当前限制最大 32
  • 校验 queueId
  • 校验 timerWheelEnable,没开时 POP 直接拒绝
  • 构造过滤表达式和 MessageFilter
  • 根据配置决定走哪条 POP 实现路径
  • 在没取到消息时进入 PopLongPollingService

代码位置:broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java,入口方法 processRequest(…)

它是 POP 的入口,但不是全部逻辑所在。

3.2 核心组件:PopConsumerService

brokerConfig.isPopConsumerKVServiceEnable()
为 true 时,POP 主流程会委托给 PopConsumerService.popAsync(…)。

这一套是这里真正的主角,核心组件包括:

PopConsumerContext

封装单次 POP 的上下文,保存本次返回的 GetMessageResult 和本次生成的 PopConsumerRecord,汇总返回给客户端的 startOffsetInfo、msgOffsetInfo。

PopConsumerRecord

表示“一条消息被 POP 出去但还没最终确认”的记录。核心字段:popTime / groupId / topicId / queueId / retryFlag / invisibleTime / offset / attemptTimes / attemptId。其中 attemptTimes 是 revive 重试次数,每次 revive 失败后递增,用于退避策略。visibilityTimeout = popTime + invisibleTime。

PopConsumerKVStore

默认是 RocksDB 存储实现。

PopConsumerCache

enablePopBufferMerge 打开时的内存缓冲,缓冲区定期刷盘或触发 revive。

PopConsumerLockService

按 groupId + topicId 做互斥。

代码位置:broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java,入口方法 popAsync(…)

四、POP 请求主流程

下面把一次 POP 请求从入口到返回完整走一遍。

图注:一次完整 POP 请求的 10 步主流程 + 长轮询备选路径(A/B/C)。注意长轮询本质仍是“请求驱动的拉取”,唤醒时重新调用 processRequest()。

4.1 请求进入 Broker

客户端发 POP_MESSAGE 请求后,PopMessageProcessor 先做一轮硬校验:

  • Broker 是否可读
  • Topic 是否存在且可读
  • ConsumerGroup 是否存在且允许消费
  • requestHeader.getMaxMsgNums() ≤ 32
  • Store 是否开启 timer wheel
  • queueId 是否合法

如果请求带过滤表达式,还会构建普通 Topic 和对应 retry topic 的 SubscriptionData,以及对应的 ExpressionMessageFilter。

4.2 创建 PopConsumerContext

进入 PopConsumerService.popAsync(…) 后,会先创建 PopConsumerContext,里面记录:客户端地址、popTime、invisibleTime、消费组、attemptId、本次累积返回的消息结果和投递记录。这个对象贯穿单次 POP。

4.3 加锁粒度:groupId + topicId

PopConsumerService 会先尝试对 groupId + topicId 加锁。注意这里不是“每个队列一把锁”,而是消费组和 Topic 粒度的互斥。

这么做的目的,主要是保证一次 POP 紧密相关的状态变更不会互相踩踏:取消息、建投递记录、提交 offset,都在这把锁保护下串起来。

4.4 先取重试还是先取普通消息

POP 不是永远先取普通 Topic。PopConsumerService 里会根据配置决定是否优先取重试消息:

  • popFromRetryProbability
  • popFromRetryProbabilityForPriority

并结合以下条件算出 preferRetry:Topic 类型是否为 PRIORITY,且 requestCount % 100 < subscriptionGroupConfig.getPriorityFactor()(满足时进入 priority 模式,使用 popFromRetryProbabilityForPriority),否则使用 popFromRetryProbability 作为概率基数。

拿到 probability 后,preferRetry = (probability > 0) && (requestCount % (100 / probability) == 0)。

取消息的实际顺序也与 preferRetry 和 queueId 模式有关:

  • preferRetry=true:先取 retry topic(v1/v2),再取普通 Topic,不再重复取 retry
  • preferRetry=false 且 queueId==-1:先取普通 Topic,再取 retry topic(v1/v2)
  • preferRetry=false 且 queueId!=-1:只取指定普通队列,这次请求里不会再补读 retry topic

也就是说,它不是简单的“普通队列空了再去重试队列”,而是带概率控制、并且受 queueId 模式约束的优先级轮转策略。

4.5 支持 retry topic v1/v2

这里会根据配置决定是否从这些重试主题取消息:KeyBuilder.buildPopRetryTopicV1(…) 和 KeyBuilder.buildPopRetryTopicV2(…)。并且 retryFlag 会记在 PopConsumerRecord 里,后面 revive/返回消息时都要用到。

4.6 queueId 的两种模式

指定队列:如果 queueId != -1,只从指定队列取。

全队列 POP:如果 queueId == -1,会遍历 Topic 的所有读队列。遍历起点和方向不是固定的:结合请求计数做轮转,尽量避免每次都从固定队列开始;同时受 priorityOrderAsc 控制,决定从低 queueId 向高递增,还是从高 queueId 向低递减。

4.7 真正取消息

每个队列最终都会调用 getMessageAsync(…) 去读存储。这里有几个关键点:

  • POP 使用的是 pullOffset
  • 如果发现 offset 过小/过大/无效,会修正 offset 后重试读取
  • 读到消息后,会同步更新 commitPullOffset(…) 和 commitOffset(…)

要注意:commitPullOffset 更像“下一次从哪里继续试探着拉”;打开 enablePopBufferMerge 时,commitOffset 可能会被压到 PopConsumerCache 里最小的未确认 offset,而不是这次读取的 nextBeginOffset。

commitOffset 仍然是 POP 在 Broker 侧维护的一部分消费进度,但真正决定一条消息是否已经消费完成的,仍然是后面的 ACK、续租和 revive。

4.8 in-flight 背压:isPopShouldStop

在每个队列真正取消息之前,PopConsumerService 还会先调用 isPopShouldStop(groupId, topicId, queueId) 做背压判断。

当 enablePopMessageThreshold=true 且该队列当前 in-flight 消息数(由 PopConsumerCache 统计的未 ACK 记录数)已经达到 popInflightMessageThreshold 时,isPopShouldStop 返回 true,这次队列迭代直接跳过,不再读存储。

这和长轮询里的 POLLING_FULL 是两个独立的流控层:

  • POLLING_FULL:长轮询挂起队列容量已满,新请求无法挂起
  • isPopShouldStop:某个队列的 in-flight 消息过多,主动限制继续发送

4.9 读到消息后如何建模

PopConsumerContext.addGetMessageResult(…) 会先在本次请求上下文里,为每条命中的消息挂一条 PopConsumerRecord。

可以把它理解成:Broker 在说“这条 offset=xxx 的消息,我在 popTime 时刻交给了这个消费组,它要在 invisibleTime 后才算超时。”

这是 POP 跟 Pull 最大的不同。但要注意,这一步只是先把“投递记录”挂在 PopConsumerContext 里;真正写到缓存或 KV Store,是下一步的事。

4.10 投递记录写到哪里

如果这次 POP 确实取到了消息,这些 PopConsumerRecord 就会被真正保存下来:

  • 如果 enablePopBufferMerge=true 且缓存没满,先写 PopConsumerCache
  • 否则直接写 PopConsumerKVStore(默认 RocksDB)

所以这里真正的核心投递记录就是 PopConsumerRecord。

投递记录写入路径图

图注:PopConsumerRecord 从创建到持久化的完整写入路径。enablePopBufferMerge=true 时经内存缓冲刷盘,否则直写 RocksDB。存储键为 visibilityTimeout + groupId + topicId + queueId + offset。

4.11 返回给客户端前的重试消息改写

如果取到的是 retry topic 上的消息,返回客户端前,Broker 可能会对“客户端看到的 topic”做一层重编码;但这里要特别注意两件事:

  • 这一层更多是“响应呈现层”的行为,不等于 ACK/续租时真正使用的 topic
  • 客户端后续 ACK、续租最终依赖的仍然是 PROPERTY_POP_CK / receipt handle 里携带的 retry 标记,客户端会据此恢复 real topic

所以从客户端视角看,“日志里看到的消息 topic”和“Broker 这次实际从哪个主题取到消息”,不一定始终一一对应。这个问题后面再单独展开。

4.12 没取到消息时的长轮询

如果本次 POP 没取到消息,不一定立刻返回空。只要请求里的 pollTime > 0,请求就会进入 PopLongPollingService.polling(…)。

长轮询服务会把请求按 topic + consumerGroup + queueId 挂到 pollingMap 里。已经挂起的请求,主要会在下面几种场景被重新处理:

  • 新消息到达
  • 挂起超时
  • 服务停止时的清理唤醒

这里的“重新处理”不是 Broker 手里已经攥着一批消息,等条件满足后直接把它推给挂起请求,而是重新调用一次 processor.processRequest(…),也就是再走一遍 POP 处理流程。

需要特别注意的是,POP 的长轮询本质上仍然是“请求驱动的拉取”,而不是“服务端主动推送”。

要额外注意的是,POLLING_FULL 不是唤醒场景。如果长轮询队列本身已经满了,请求不会进入挂起队列,而是会直接返回 POLLING_FULL。

五、ACK、续租和 revive

消息返回客户端之后,还剩三件事要闭环:处理完成怎么 ACK,时间不够怎么续租,超时之后又怎么恢复。

5.1 ACK

ACK 请求入口是 AckMessageProcessor。最重要的是它最终会落到 PopConsumerService.ackAsync(…)。它的主流程可以理解成这样:

  1. AckMessageProcessor.appendAckNew(…) 先从 extraInfo 或 BatchAck 里还原这次 POP 的关键信息
  2. 然后调用 PopConsumerService.ackAsync(…)
  3. ackAsync(…) 会根据 popTime/groupId/topicId/queueId/invisibleTime/offset 构造对应的 PopConsumerRecord 作为查找 key;底层存储键最终会折算成 visibilityTimeout + groupId + topicId + queueId + offset
  4. 然后先尝试从 PopConsumerCache 删除;删不到再去 PopConsumerKVStore 删除

所以这里的 ACK,就是把这条消息对应的“进行中记录”删掉。

5.2 为什么 ACK 不是简单提交 offset

因为 POP 是“消息级确认”,不是“队列级顺序推进”。同一批次返回的多条消息里,可能有的已经处理完成,有的还在处理中,所以 Broker 必须保留消息级的未确认状态,而不能只靠一个队列 offset 表达完成状态。

放在这里看,这层状态就是 PopConsumerRecord。

5.3 changeInvisibleTime

ChangeInvisibleTimeProcessor 处理续租请求。它最终会进入 PopConsumerService.changeInvisibilityDuration(…)。这条分支会做两件事:

  • 写入一条新的 PopConsumerRecord(changedPopTime + changedInvisibleTime)
  • 删除旧的 PopConsumerRecord

本质上等于把原来的“超时点”整体往后挪。

要注意,这里新生成的记录不是重新写回 PopConsumerCache,而是直接写 PopConsumerKVStore,然后再删除旧记录。

5.4 超时未 ACK 后如何 revive

PopConsumerService 自身就是一个背景线程服务,会定期扫描 PopConsumerKVStore 里的过期记录:

  • 实现上通过 scanExpiredRecords(…) 扫描已经进入过期窗口的 PopConsumerRecord
  • 再去读原始消息
  • 把原始消息重写到 retry topic
  • 删除旧记录

如果打开了 enablePopBufferMerge,还要再补一层理解:部分记录会先进入 PopConsumerCache,缓存线程会先把未过期记录刷到 store,把已过期记录直接交给 revive 逻辑处理。所以“后台线程只扫 KVStore”是主路径,但不是全部。

如果 revive 失败,不是无限重试,而是会按 attemptTimes 做退避重试。超过最大尝试次数后,日志里会出现 message may be lost。

5.5 revive 重投到哪里

如果原消息本来就来自重试主题,则继续回到对应重试主题。如果原消息来自普通 Topic,则会重投到 POP retry topic(v1 或 v2,取决于 Broker 配置),并且会带上 PROPERTY_FIRST_POP_TIME、PROPERTY_ORIGIN_GROUP 和最新的重试次数。

图注:ACK 删除 PopConsumerRecord 标记完成,changeInvisibleTime 写新记录+删旧记录将超时点后推。两者操作路径、写入目标、最终效果均不同。

六、最后补三个容易混的点

6.1 FIFO 为什么不是这套模型

前面讲的这套模型里,核心一直是“每条消息对应一条 PopConsumerRecord,再围绕它做 ACK、续租和 revive”。FIFO 不是这套思路。

FIFO 更像“带不可见期的顺序消费”。Broker 关心的不是每条消息各自有没有一条独立投递记录,而是同一队列上的顺序窗口有没有被前面的未 ACK 消息卡住。对应到源码上,核心状态从 PopConsumerRecord 换成了 OrderInfo,由 ConsumerOrderInfoManager 维护。

所以 FIFO 不走前面那套消息级 retry/revive 主流程。它的超时恢复也不是后台扫描,而是后续 POP 通过 OrderInfo.needBlock() 判断阻塞窗口是否已经过期;一旦阻塞解除,因为 committedOffset 还没前进,消息会从原始 queue 原地重新投递,而不是进 retry topic。

6.2 旧路径在做什么

旧路径和前面讲的是同一个问题的另一套实现。如果 isPopConsumerKVServiceEnable() 为 false,Broker 不再用 PopConsumerRecord 表示“已投递未确认”,而是换成 PopCheckPoint / AckMsg / BatchAckMsg 这一组对象。

大致可以把它理解成:

  • POP 取消息后生成 PopCheckPoint
  • ACK 通过 AckMsg / BatchAckMsg + PopBufferMergeService 表达
  • changeInvisibleTime 会追加新的 PopCheckPoint,再去 ACK 旧 checkpoint
  • 超时恢复依赖 revive topic 和 PopReviveService

旧路径和前文解决的是同一个问题,但中间状态模型已经完全换了一套。理解到 checkpoint、ack 日志和 revive topic 这一层就够了。

▍6.3 涉及重试消息时,客户端看到的 topic 为什么会不同

这里最容易混淆的一点是:客户端看到的消息 topic,不等于 ACK / 续租真正作用的 real topic。

POP 返回消息时,Broker 可能会对响应里的 topic 做重编码,尤其是消息实际来自 retry topic 时更容易让人误会。但客户端后续 ACK / changeInvisibleTime 依赖的并不是当前消息对象上的 topic,而是 PROPERTY_POP_CK / receipt handle 里的 retry 标记,再据此恢复 real topic。

所以仅凭客户端日志里看到的 topic,不能反推 Broker 这次一定是从普通 topic 读到的,还是从 retry topic 读到的。把这件事分开看就不容易绕:日志里的 topic 更像是响应呈现层,ACK / 续租用的 topic 才是语义层。

图注:客户端日志中的 topic 属于“呈现层”(可能被 Broker 重编码),而 ACK/续租使用的 real topic 属于“语义层”(来自 PROPERTY_POP_CK 中的 retry 标记)。两者不能混为一谈。

七、再往外看:为什么 Kafka 也在演进这类能力

如果把 POP 从 RocketMQ 自己的实现细节里跳出来看,它解决的其实不是一个 RocketMQ 独有的问题,而是一类更普遍的消息消费诉求:

  • 共享消费
  • 显式确认
  • 处理中窗口控制
  • 超时后的再次投递

RocketMQ POP 是其中一种实现方式。它仍然建立在 RocketMQ 自己的 Topic、queue、retry topic、FIFO/非 FIFO 分支之上,但消费语义上已经明显引入了“消息被借出一段时间、完成后确认、超时后恢复”的队列式特征。

Kafka 近几个版本也在沿着这个方向演进。按照 Apache Kafka 官方文档和 KIP-932 的表述:

  • Kafka 4.1 把 share groups 作为 preview 形态引入
  • Kafka 4.2 把 Queues for Kafka / share groups 标记为 production-ready
  • 它提供的是一种不同于传统 consumer group 的共享消费模型
  • 不再要求每个 partition 只能分配给一个消费者实例处理
  • 消费者可以做逐条 ACK,并跟踪 delivery attempts;同时系统仍然针对 batch processing 做了优化
  • 官方文档也明确说,这类能力更适合“records are processed one at a time”,而不是典型的 ordered stream

如果只看高层语义,会发现它和 POP 想解决的问题其实很像:都在补“共享消费 + 显式确认 + 失败后继续处理”这类能力。当然,两者并不是一套实现:

  • RocketMQ POP 的关键语义是 invisibleTime、ACK、续租、revive,以及围绕这些语义建立的 Broker 侧投递记录
  • Kafka share groups 的关键语义是共享消费组、逐条 ACK、delivery attempts,以及围绕 share group 建立的新协调与状态管理

这也说明,“共享消费、逐条确认、处理中窗口控制、失败后继续处理”并不是某一个消息系统偶然长出来的特性,而是消息系统里真实存在的一类业务需求。不同消息系统都在沿着各自的体系往这个方向演进,只是具体实现路径和抽象方式并不相同。

八、关键源码索引

以下以 release-5.4.0 分支、提交 b5da00ad0 为代码基线。

功能关键类 / 方法
POP 请求入口PopMessageProcessor.processRequest()
KV 路径主流程PopConsumerService.popAsync()
重试消息重编码PopConsumerService.recodeRetryMessage()
ACK 主流程AckMessageProcessor.processRequest()→ appendAckNew()→ PopConsumerService.ackAsync()
续租主流程ChangeInvisibleTimeProcessor.processRequestAsync()→ PopConsumerService.changeInvisibilityDuration()
revive 主流程PopConsumerService.revive(PopConsumerRecord) / revive(AtomicLong, int)
旧路径buffer mergePopBufferMergeService
旧路径 revivePopReviveService
FIFO 顺序状态QueueLevelConsumerManager
长轮询挂起PopLongPollingService.polling()
长轮询唤醒PopLongPollingService.wakeUp()

总结

POP 消费机制的核心可以概括为三句话:

第一,POP 补上了 Broker 侧的“处理中状态”。 消息被取走后不立刻算完成,而是进入 invisibleTime 不可见窗口,Broker 通过 PopConsumerRecord 跟踪每条“已投递但未完成”的消息。

第二,消费进度不再仅靠 offset 表达。 offset 决定从哪里继续读,PopConsumerRecord 决定哪些消息还在处理中。ACK 删除记录、续租切换超时点、revive 处理超时恢复,三者共同构成状态闭环。

第三,这是消息系统的一类共性需求。 从 RocketMQ 的 POP 到 Kafka 的 share groups,不同系统都在用各自的方式实现“共享消费 + 显式确认 + 处理中窗口控制”,因为这是实际业务中的真实诉求。