最新文章

2025年5月29日

Apache RocketMQ 源码解析 —— 秒级定时消息介绍
背景 如今rocketmq的应用场景日益拓宽,延时消息的需求也在增加。原本的特定级别延时消息已经不足以支撑rocketmq灵活的使用场景。因此,我们需要一个支持任意时间的延迟消息feature。 支持任意时间延迟的feature能够让使用者在消息发出时指定其消费时间,在生活与生产中具有非常重要的意义。 目标 1. 支持任意时延的延迟消息。 2. 提供延迟消息可靠的存储方式。 3. 保证延迟消息具有可靠的收发性能。 4. 提供延迟消息的可观测性排查能力。 架构 存储数据结构 本方案主要通过时间轮实现任意时延的定时消息。在此过程中,涉及两个核心的数据结构:TimerLog(存储消息索引)和TimerWheel(时间轮,用于定时消息到时)。 TimerLog,为本RIP中所设计的定时消息的记录文件,Append Only。每条记录包含一个prev_pos,指向前一条定时到同样时刻的记录。每条记录的内容可以包含定时消息本身,也可以只包含定时消息的位置信息。每一条记录包含如下信息: | 名称 | 大小 | 备注 | | | | | | size | 4B | 保存记录的大小 | | prev_pos | 8B | 前一条记录的位置 | | next_Pos | 8B | 后一条记录的位置,暂时为1,作为保留字段 | | magic | 4B | magic value | | delayed_time | 4B | 该条记录的定时时间 | | offset_real | 8B | 该条消息在commitLog中的位置 | | size_real | 4B | 该条消息在commitLog中的大小 | | hash_topic | 4B | 该条消息topic的hash code | | varbody | | 存储可变的body,暂时没有 | TimerWheel是对时刻表的一种抽象,通常使用数组实现。时刻表上的每一秒,顺序对应到数组中的位置,然后数组循环使用。时间轮的每一格,指向了TimerLog中的对应位置,如果这一格的时间到了,则按TimerLog中的对应位置以及prev_pos位置依次读出每条消息。 时间轮一格一格向前推进,配合TimerLog,依次读出到期的消息,从而达到定时消息的目的。时间轮的每一格设计如下: | delayed_time(8B) 延迟时间 | first_pos(8B) 首条位置 | last_pos(8B) 最后位置 | num(4B)消息条数 | | | | | | 上述设计的TimerLog与TimerWheel的协作如下图所示。 pipeline 在存储方面,采用本地文件系统作为可靠的延时消息存储介质。延时消息另存TimerLog文件中。通过时间轮对定时消息进行定位以及存取。针对长时间定时消息,通过消息滚动的方式避免过大的消息存储量。其具体架构如下所示: 从图中可以看出,共有五个Service分别处理定时消息的放置和存储。工作流如下: 1. 针对放置定时消息的service,每50ms从commitLog读取指定主题(TIMER_TOPIC)的定时消息。 1. TimerEnqueueGetService从commitLog读取得到定时主题的消息,并先将其放入enqueuePutQueue。 2. 另一个线程TimerEnqueuePutService将其放入timerLog,更新时间轮的存储内容。将该任务放进时间轮的指定位置。 2. 针对取出定时消息的service,每50ms读取下一秒的slot。有三个线程将读取到的消息重新放回commitLog。 1. 首先,TimerDequeueGetService每50ms读一次下一秒的slot,从timerLog中得到指定的msgs,并放进dequeueGetQueue。 2. 而后TimerDequeueGetMessageService从dequeueGetQueue中取出msg,并将其放入队列中。该队列为待写入commitLog的队列,dequeuePutQueue。 3. 最后TimerDequeuePutMessageService将这个queue中的消息取出,若已到期则修改topic,放回commitlog,否则继续按原topic写回CommitLog滚动。 代码实现 TimerLog与TimerWheel的协作实现 定时消息的核心存储由TimerLog和TimerWheel协同完成。TimerLog作为顺序写入的日志文件,每条记录包含消息在CommitLog中的物理偏移量(offsetPy)和延迟时间(delayed_time)。当消息到达时,TimerEnqueuePutService会将其索引信息追加到TimerLog,并通过prev_pos字段构建链表结构,确保同一时刻的多个消息可被快速遍历。 ```java // TimerLog.java 核心写入逻辑 public long append(byte[] data, int pos, int len) { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // 处理文件切换:当当前文件剩余空间不足时,填充空白段并创建新文件 if (len + MIN_BLANK_LEN mappedFile.getFileSize() mappedFile.getWrotePosition()) { ByteBuffer blankBuffer = ByteBuffer.allocate(MIN_BLANK_LEN); blankBuffer.putInt(mappedFile.getFileSize() mappedFile.getWrotePosition()); blankBuffer.putLong(0); // prev_pos置空 blankBuffer.putInt(BLANK_MAGIC_CODE); // 标记空白段 mappedFile.appendMessage(blankBuffer.array()); mappedFile = this.mappedFileQueue.getLastMappedFile(0); // 切换到新文件 } // 写入实际数据并返回物理偏移量 long currPosition = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(); mappedFile.appendMessage(data, pos, len); return currPosition; } ``` 此代码展示了消息追加的核心流程: 1. 检查当前文件的剩余空间,不足时填充空白段并创建新文件 2. 将消息索引数据写入内存映射文件 3. 返回写入位置的全局偏移量,供时间轮记录 时间轮槽位管理 TimerWheel通过数组结构管理时间槽位,每个槽位记录该时刻的首尾指针和消息数量。当消息入队时,putSlot方法会更新对应槽位的链表结构: ```java // TimerWheel.java 槽位更新逻辑 public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic) { localBuffer.get().position(getSlotIndex(timeMs) Slot.SIZE); localBuffer.get().putLong(timeMs / precisionMs); // 标准化时间戳 localBuffer.get().putLong(firstPos); // 链表头指针 localBuffer.get().putLong(lastPos); // 链表尾指针 localBuffer.get().putInt(num); // 当前槽位消息总数 localBuffer.get().putInt(magic); // 特殊标记(如滚动/删除) } ``` 该方法的实现细节: + getSlotIndex(timeMs)将时间戳映射到环形数组的索引 + 同时记录首尾指针以实现O(1)复杂度插入 消息入队流程 TimerEnqueueGetService:消息扫描服务 该服务作为定时消息入口,持续扫描CommitLog中的TIMER_TOPIC消息。其核心逻辑通过enqueue方法实现: ```java // TimerMessageStore.java public boolean enqueue(int queueId) { ConsumeQueueInterface cq = this.messageStore.getConsumeQueue(TIMER_TOPIC, queueId); ReferredIterator iterator = cq.iterateFrom(currQueueOffset); while (iterator.hasNext()) { CqUnit cqUnit = iterator.next(); MessageExt msgExt = getMessageByCommitOffset(cqUnit.getPos(), cqUnit.getSize()); // 构造定时请求对象 TimerRequest timerRequest = new TimerRequest( cqUnit.getPos(), cqUnit.getSize(), Long.parseLong(msgExt.getProperty(TIMER_OUT_MS)), System.currentTimeMillis(), MAGIC_DEFAULT, msgExt ); // 放入入队队列(阻塞式) while (!enqueuePutQueue.offer(timerRequest, 3, TimeUnit.SECONDS)) { if (!isRunningEnqueue()) return false; } currQueueOffset++; // 更新消费进度 } } ``` 关键设计点: 1. 增量扫描:通过currQueueOffset记录消费位移,避免重复处理 2. 消息转换:将ConsumeQueue中的索引转换为包含完整元数据的TimerRequest` TimerEnqueuePutService:时间轮写入服务 从队列获取请求后,该服务执行核心的定时逻辑: ```java // TimerMessageStore.java public boolean doEnqueue(long offsetPy, int sizePy, long delayedTime, MessageExt messageExt) { // 计算目标时间槽位 Slot slot = timerWheel.getSlot(delayedTime); // 构造TimerLog记录 ByteBuffer buffer = ByteBuffer.allocate(TimerLog.UNIT_SIZE); buffer.putLong(slot.lastPos); // 前驱指针指向原槽位尾 buffer.putLong(offsetPy); // CommitLog物理偏移 buffer.putInt(sizePy); // 消息大小 buffer.putLong(delayedTime); // 精确到毫秒的延迟时间 // 写入TimerLog并更新时间轮 long pos = timerLog.append(buffer.array()); timerWheel.putSlot(delayedTime, slot.firstPos, pos, slot.num + 1); } ``` 写入优化策略: + 空间预分配:当检测到当前MappedFile剩余空间不足时,自动填充空白段并切换文件 + 链表式存储:通过prev_pos字段构建时间槽位的倒序链表,确保新消息快速插入 + 批量提交:积累多个请求后批量写入,减少文件I/O次数 TimerEnqueuePutService从队列获取消息请求,处理消息滚动逻辑。当检测到延迟超过时间轮窗口时,将消息重新写入并标记为滚动状态: ```java // TimerMessageStore.java 消息滚动处理 boolean needRoll = delayedTime tmpWriteTimeMs = (long) timerRollWindowSlots precisionMs; if (needRoll) { magic |= MAGIC_ROLL; // 调整延迟时间为时间轮窗口中间点,确保滚动后仍有处理时间 delayedTime = tmpWriteTimeMs + (long) (timerRollWindowSlots / 2) precisionMs; } ``` 此逻辑的关键设计: 1. 当延迟时间超过当前时间轮容量时触发滚动 2. 将消息的delayed_time调整为窗口中间点,避免频繁滚动 3. 设置MAGIC_ROLL标记,出队时识别滚动消息 消息出队处理 TimerDequeueGetService:到期扫描服务 该服务以固定频率推进时间指针,触发到期消息处理: ```java // TimerMessageStore.java public int dequeue() throws Exception { // 获取当前时间槽位 Slot slot = timerWheel.getSlot(currReadTimeMs); // 遍历TimerLog链表 long currPos = slot.lastPos; while (currPos != 1) { SelectMappedBufferResult sbr = timerLog.getTimerMessage(currPos); ByteBuffer buf = sbr.getByteBuffer(); // 解析记录元数据 long prevPos = buf.getLong(); long offsetPy = buf.getLong(); int sizePy = buf.getInt(); long delayedTime = buf.getLong(); // 分类处理(普通消息/删除标记) if (isDeleteMarker(buf)) { deleteMsgStack.add(new TimerRequest(offsetPy, sizePy, delayedTime)); } else { normalMsgStack.addFirst(new TimerRequest(offsetPy, sizePy, delayedTime)); } currPos = prevPos; // 前向遍历链表 } // 分发到处理队列 splitAndDispatch(normalMsgStack); splitAndDispatch(deleteMsgStack); moveReadTime(); // 推进时间指针 } ``` TimerDequeuePutMessageService:消息投递服务 TimerDequeuePutMessageService将到期消息重新写入CommitLog。对于滚动消息,会修改主题属性并增加重试计数: ```java // TimerMessageStore.java 消息转换逻辑 MessageExtBrokerInner convert(MessageExt messageExt, long enqueueTime, boolean needRoll) { if (needRoll) { // 增加滚动次数标记 MessageAccessor.putProperty(messageExt, TIMER_ROLL_TIMES, Integer.parseInt(messageExt.getProperty(TIMER_ROLL_TIMES)) + 1 + ""); } // 恢复原始主题和队列ID messageInner.setTopic(messageInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); messageInner.setQueueId(Integer.parseInt( messageInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); return messageInner; } ``` 此转换过程确保: + 滚动消息保留原始主题信息 + 每次滚动增加TIMER_ROLL_TIMES属性 该服务最终将到期消息重新注入CommitLog: ```java // TimerMessageStore.java public void run() { while (!stopped) { TimerRequest req = dequeuePutQueue.poll(10, TimeUnit.MILLISECONDS); MessageExtBrokerInner innerMsg = convert(req.getMsg(), req.needRoll()); // 消息重投递逻辑 int result = doPut(innerMsg, req.needRoll()); switch (result) { case PUT_OK: // 成功:更新监控指标 timerMetrics.recordDelivery(req.getTopic()); break; case PUT_NEED_RETRY: // 重试:重新放回队列头部 dequeuePutQueue.putFirst(req); break; case PUT_NO_RETRY: // 丢弃:记录错误日志 log.warn("Discard undeliverable message:{}", req); } } } ``` 故障恢复机制 系统重启时通过recover方法重建时间轮状态。关键步骤包括遍历TimerLog文件并修正槽位指针: ```java // TimerMessageStore.java 恢复流程 private long recoverAndRevise(long beginOffset, boolean checkTimerLog) { List mappedFiles = timerLog.getMappedFileQueue().getMappedFiles(); for (MappedFile mappedFile : mappedFiles) { SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0); ByteBuffer bf = sbr.getByteBuffer(); while (position Google Doc: + Shimo:

2025年5月29日

Apache RocketMQ 源码解析 —— Controller 高可用切换架构
一、原理及核心概念浅述 1.1 核心架构 1.2 核心概念 1. controller:负责管理broker间的主备关系,可以挂在namesrv中,不影响namesrv能力,支持独立部署。 2. master/slave:主备身份。 3. syncStateSet:字面意思为“同步状态集合”。当备节点能够及时跟上主节点,则会纳入syncStateSet。 4. epoch:用于记录每一次主备切换时的状态,避免切换后产生数据丢失或者不一致的情况。 为方便理解,在某些过程中可以把controller当作班主任,master作为小组长,slave作为小组成员。同步过程是各位同学向小组长抄作业的过程,位于syncStateSet中的是优秀作业。 二、相关代码文件及说明 核心是“controller+broker+复制过程”,因此分三块进行叙述。 2.1 Controller 该部分代码主要集中在rocketmqcontroller模块下,主要有如下代码文件: + ControllerManager: 负责管理controller,其中存储了许多controller相关配置,并负责了心跳管理等核心功能。(班主任管理条例) + DLederController: Controller的DLedger实现。包含了controller的基本功能。在其中实现了副本信息管理、broker存活情况探测、选举Master等核心功能。(某种班主任) + DefaultBrokerHeartbeatManager: 负责管理broker心跳,其中包含了broker存活情况表,以及在broker下线时的listeners,当副本掉线时,触发重新选举。(点名册) + ReplicasInfoManager: 负责controller中事件的处理。即各种选举事件、更换SyncStateSet事件等等。(小组登记册) + ControllerRequestProcessor: 处理向controller发送的requests,例如让controller选举、向controller注册broker、心跳、更换SyncStateSet等等。(班主任信箱) + DefaultElectPolicy: 选举Master的策略。可以选择从sync状态的副本中选,也可以支持从所有副本中(无论是否同步)的unclean选举。(班规) + ...... 2.2 Broker 该部分代码主要集中在rocketmqbroker模块中,可进入org/apache/rocketmq/broker/controller进行查看: + ReplicasManager: 完成自己作为一个replica的使命——找controller,角色管理,Master更新(Expand/Shrink)SyncStateSet等等。 2.3 复制模块 该部分代码主要集中在rocketmqstore模块中的ha文件夹下: + HAService: 每个Replica必备的的service,负责管理作为主、备的同步任务。 + HAClient: 每个Slave 的HAService中必备的client,负责管理同步任务中的读、写操作。 + HAConnection: 代表在Master中的HA连接,每个connection理论上对应一个slave。在该connection类中存储了传输过程中的诸多内容,包括channel、传输状态、当前传输位点等等信息。 三、核心流程 3.1 心跳 核心CODE:BROKER_HEARTBEAT Broker端: 该部分较简单,带上code向controller发request,不再赘述: BrokerController.sendHeartbeat() brokerOuterAPI.sendHeartbeat() Controller端: 1. 首先由ControllerRequestProcessor接收到code,进入处理逻辑: ```java private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class); if (requestHeader.getBrokerId() == null) { return RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST, "Heart beat with empty brokerId"); } this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(), requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority()); return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success"); } ``` 2. 之后在onBrokerHeartbeat()中,主要更新controller brokerHeartbeatManager中的brokerLiveTable: ```java public void onBrokerHeartbeat(String clusterName, String brokerName, String brokerAddr, Long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, Long confirmOffset, Integer electionPriority) { BrokerIdentityInfo brokerIdentityInfo = new BrokerIdentityInfo(clusterName, brokerName, brokerId); BrokerLiveInfo prev = this.brokerLiveTable.get(brokerIdentityInfo); ...... if (null == prev) { this.brokerLiveTable.put(...); log.info("new broker registered, {}, brokerId:{}", brokerIdentityInfo, realBrokerId); } else { prev.setXXX(......) } } ``` 3.2 选举 相关CODE: CONTROLLER_ELECT_MASTER 有如下几种情形可能触发选举: 1. controller主动发起,通过triggerElectMaster(): 1. HeartbeatManager监听到有broker心跳失效。 (班主任发现有小组同学退学了) 2. Controller检测到有一组Replica Set不存在master。(班主任发现有组长虽然在名册里,但是挂了) 2. broker发起将自己选为master,通过ReplicaManager.brokerElect(): 1. Broker向controller查metadata时,没找到master信息。(同学定期检查小组情况,问班主任为啥没小组长) 2. Broker向controller注册完后,仍未从controller获取到master信息。(同学报道后发现没小组长,汇报) 3. 通过tools发起: 1. 通过选举命令ReElectMasterSubCommand发起。(校长直接任命) 上述所有过程,最终均触发: controller.electMaster() replicasInfoManager.electMaster() // 即,所有小组长必须通过班主任任命 ```java public ControllerResult electMaster(final ElectMasterRequestHeader request, final ElectPolicy electPolicy) { ... // 从request中取信息 ... if (syncStateInfo.isFirstTimeForElect()) { // 从未注册,直接任命 newMaster = brokerId; } // 按选举政策选主 if (newMaster == null) { // we should assign this assignedBrokerId when the brokerAddress need to be elected by force Long assignedBrokerId = request.getDesignateElect() ? brokerId : null; newMaster = electPolicy.elect(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), syncStateSet, allReplicaBrokers, oldMaster, assignedBrokerId); } if (newMaster != null && newMaster.equals(oldMaster)) { // 老主 == 新主 // old master still valid, change nothing String err = String.format("The old master %s is still alive, not need to elect new master for broker %s", oldMaster, brokerReplicaInfo.getBrokerName()); LOGGER.warn("{}", err); // the master still exist response.setXXX() result.setBody(new ElectMasterResponseBody(syncStateSet).encode()); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err); return result; } // a new master is elected if (newMaster != null) { // 出现不一样的新主 final int masterEpoch = syncStateInfo.getMasterEpoch(); final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch(); final HashSet newSyncStateSet = new HashSet<(); //设置新的syncStateSet newSyncStateSet.add(newMaster); response.setXXX()... ElectMasterResponseBody responseBody = new ElectMasterResponseBody(newSyncStateSet); } result.setBody(responseBody.encode()); final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster); result.addEvent(event); return result; } // 走到这里,说明没有主,选举失败 // If elect failed and the electMaster is triggered by controller (we can figure it out by brokerAddress), // we still need to apply an ElectMasterEvent to tell the statemachine // that the master was shutdown and no new master was elected. if (request.getBrokerId() == null) { final ElectMasterEvent event = new ElectMasterEvent(false, brokerName); result.addEvent(event); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Old master has down and failed to elect a new broker master"); } else { result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, "Failed to elect a new master"); } return result; } ``` 3.3 更新SyncStateSet 核心CODE: CONTROLLER_ALTER_SYNC_STATE_SET 1. 由master发起,主动向controller更换syncStateSet(等价于小组长汇报优秀作业) 2. controllerRequestProcessor接收更换syncStateSet的请求,进入handleAlterSyncStateSet()方法: ```java private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class); final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class); final CompletableFuture future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet); if (future != null) { return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS); } return RemotingCommand.createResponseCommand(null); } ``` 3. 之后进入Controller.alterSyncStateSet() replicasInfoManager.alterSyncStateSet()方法: ```java public ControllerResult alterSyncStateSet( final AlterSyncStateSetRequestHeader request, final SyncStateSet syncStateSet, final BrokerValidPredicate brokerAlivePredicate) { final String brokerName = request.getBrokerName(); ... final Set newSyncStateSet = syncStateSet.getSyncStateSet(); final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); // 检查syncStateSet是否有变化 final Set oldSyncStateSet = syncStateInfo.getSyncStateSet(); if (oldSyncStateSet.size() == newSyncStateSet.size() && oldSyncStateSet.containsAll(newSyncStateSet)) { String err = "The newSyncStateSet is equal with oldSyncStateSet, no needed to update syncStateSet"; ... } // 检查是否是master发起的 if (!syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) { String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}", syncStateInfo.getMasterBrokerId(), request.getMasterBrokerId()); ... } // 检查master的任期epoch是否一致 if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) { String err = String.format("Rejecting alter syncStateSet request because the current master epoch is:{%d}, not {%d}", syncStateInfo.getMasterEpoch(), request.getMasterEpoch()); ... } // 检查syncStateSet的epoch if (syncStateSet.getSyncStateSetEpoch() != syncStateInfo.getSyncStateSetEpoch()) { String err = String.format("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{%d}, not {%d}", syncStateInfo.getSyncStateSetEpoch(), syncStateSet.getSyncStateSetEpoch()); ... } // 检查新的syncStateSet的合理性 for (Long replica : newSyncStateSet) { // 检查replica是否存在 if (!brokerReplicaInfo.isBrokerExist(replica)) { String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't exist", replica); ... } // 检查broker是否存活 if (!brokerAlivePredicate.check(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), replica)) { String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't alive", replica); ... } } // 检查是否包含master if (!newSyncStateSet.contains(syncStateInfo.getMasterBrokerId())) { String err = String.format("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {%s}", syncStateInfo.getMasterBrokerId()); ... } // 更新epoch int epoch = syncStateInfo.getSyncStateSetEpoch() + 1; ... // 生成事件,替换syncStateSet final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet); ... } ``` 4. 最后通过syncStateInfo.updateSyncStateSetInfo(),更新syncStateSetInfoTable.get(brokerName)得到的syncStateInfo信息(该过程可以理解为班主任在班级分组册上找到了组长的名字,拿出组员名单,更新)。 3.4 复制 该部分较复杂,其中HAService/HAClient/HAConnection以及其中的各种Service/Reader/Writer容易产生混淆,对阅读造成阻碍。因此绘制本图帮助理解(可在粗读源码后回头理解): 下面对HA复制过程作拆解,分别讲解: 1. 在各个replica的DefaultMessageStore中均注册了HAService,负责管理HA的复制。 2. 在Master的 HAService中有一个AcceptSocketService, 负责自动接收各个slave的连接: ```java protected abstract class AcceptSocketService extends ServiceThread { ... / Starts listening to slave connections. @throws Exception If fails. / public void beginAccept() throws Exception { ... } @Override public void shutdown(final boolean interrupt) { ... } @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); Set selected = this.selector.selectedKeys(); if (selected != null) { for (SelectionKey k : selected) { if (k.isAcceptable()) { SocketChannel sc = ((ServerSocketChannel) k.channel()).accept(); if (sc != null) { DefaultHAService.log.info("HAService receive new connection, " + sc.socket().getRemoteSocketAddress()); try { HAConnection conn = createConnection(sc); conn.start(); DefaultHAService.this.addConnection(conn); } catch (Exception e) { log.error("new HAConnection exception", e); sc.close(); } } } ... } } ``` 3. 在各个Slave 的HAService中存在一个HAClient,负责向master发起连接、传输请求。 ```java public class AutoSwitchHAClient extends ServiceThread implements HAClient { ... } public interface HAClient { void start(); void shutdown(); void wakeup(); void updateMasterAddress(String newAddress); void updateHaMasterAddress(String newAddress); String getMasterAddress(); String getHaMasterAddress(); long getLastReadTimestamp(); long getLastWriteTimestamp(); HAConnectionState getCurrentState(); void changeCurrentState(HAConnectionState haConnectionState); void closeMaster(); long getTransferredByteInSecond(); } ``` 4. 当master收到slave的连接请求后,将会创建一个HAConnection,负责收发内容。 ```java public interface HAConnection { void start(); void shutdown(); void close(); SocketChannel getSocketChannel(); HAConnectionState getCurrentState(); String getClientAddress(); long getTransferredByteInSecond(); long getTransferFromWhere(); long getSlaveAckOffset(); } ``` 5. Master的HAConnection会与Slave的HAClient建立连接,二者均通过HAWriter(较简单,不解读,位于HAWriter类)往socket中写内容,再通过HAReader读取socket中的内容。只不过一个是HAServerReader,一个是HAClientReader: ```java public abstract class AbstractHAReader { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); protected final List readHookList = new ArrayList<(); public boolean read(SocketChannel socketChannel, ByteBuffer byteBufferRead) { int readSizeZeroTimes = 0; while (byteBufferRead.hasRemaining()) { ... boolean result = processReadResult(byteBufferRead); ... } } ... protected abstract boolean processReadResult(ByteBuffer byteBufferRead); } ``` 6. 两种HAReader均实现了processReadResult()方法,负责处理从socket中得到的数据。client需要详细阐述该方法,因为涉及到如何将读进来的数据写入commitlog,client的processReadResult(): ```java @Override protected boolean processReadResult(ByteBuffer byteBufferRead) { int readSocketPos = byteBufferRead.position(); try { while (true) { ... switch (AutoSwitchHAClient.this.currentState) { case HANDSHAKE: { ... // 握手阶段,先检查commitlog完整性,截断 } break; case TRANSFER: { // 传输阶段,将body写入commitlog ... byte[] bodyData = new byte[bodySize]; ... if (bodySize 0) { // 传输阶段,将body写入commitlog AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length); } haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset())); ... break; } default: break; } if (isComplete) { continue; } } // 检查buffer中是否还有数据, 如果有, compact() ... break; } } ... } ``` 7. server的processReadResult()主要用于接收client的握手等请求,较简单。更需要解释其WriteSocketService如何向socket中调用HAwriter去写数据: ```java abstract class AbstractWriteSocketService extends ServiceThread { ... private void transferToSlave() throws Exception { ... int size = this.getNextTransferDataSize(); if (size 0) { ... buildTransferHeaderBuffer(this.transferOffset, size); this.lastWriteOver = this.transferData(size); } else { // 无需传输,直接更新caught up的时间 AutoSwitchHAConnection.this.haService.updateConnectionLastCaughtUpTime(AutoSwitchHAConnection.this.slaveId, System.currentTimeMillis()); haService.getWaitNotifyObject().allWaitForRunning(100); } } @Override public void run() { AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); switch (currentState) { case HANDSHAKE: // Wait until the slave send it handshake msg to master. // 等待slave的握手请求,并进行回复 break; case TRANSFER: ... transferToSlave(); break; default: ... } } catch (Exception e) { ... } } ... // 在service结束后的一些事情 } ... } ``` 此处同样附上server实现processReadResult(),读socket中数据的代码: ```java @Override protected boolean processReadResult(ByteBuffer byteBufferRead) { while (true) { ... HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)]; switch (slaveState) { case HANDSHAKE: // 收到了client的握手 ... LOGGER.info("Receive slave handshake, slaveBrokerId:{}, isSyncFromLastFile:{}, isAsyncLearner:{}", AutoSwitchHAConnection.this.slaveId, AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.isAsyncLearner); break; case TRANSFER: // 收到了client的transfer状态 ... // 更新client状态信息 break; default: ... } ... } ``` 3.5 Active Controller的选举 该选举主要通过DLedger实现,在DLedgerController中通过RoleChangeHandler.handle()更新自身身份: ```java class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler { private final String selfId; private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_")); private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER; public RoleChangeHandler(final String selfId) { this.selfId = selfId; } @Override public void handle(long term, MemberState.Role role) { Runnable runnable = () { switch (role) { case CANDIDATE: this.currentRole = MemberState.Role.CANDIDATE; // 停止扫描inactive broker任务 ... case FOLLOWER: this.currentRole = MemberState.Role.FOLLOWER; // 停止扫描inactive broker任务 ... case LEADER: { log.info("Controller {} change role to leader, try process a initial proposal", this.selfId); int tryTimes = 0; while (true) { // 将会开始扫描inactive brokers ... break; } } }; this.executorService.submit(runnable); } ... } ```

2025年5月29日

Apache RocketMQ 源码解析 —— POP 消费模式逻辑介绍
一、背景 在RocketMQ 5.0之前,消费有两种方式可以从Broker获取消息,分别为Pull模式和Push模式。 1. Pull模式:消费需要不断的从阻塞队列中获取数据,如果没有数据就等待,这个阻塞队列中的数据由消息拉取线程从Broker拉取消息之后加入的,所以Pull模式下消费需要不断主动从Broker拉取消息。 2. Push模式:需要注册消息监听器,当有消息到达时会通过回调函数进行消息消费,从表面上看就像是Broker主动推送给消费者一样,所以叫做推模式,底层依旧是消费者从Broker拉取数据然后触发回调函数进行消息消费,只不过不需要像Pull模式一样不断判断是否有消息到来。 不过不管是Pull模式还是Push模式,在集群模式下,一个消息队列只能分配给同一个消费组内的某一个消费者进行消费,所以需要进行Rebalance负载均衡为每个消费者分配消息队列之后才可以进行消息消费。 Rebalance的工作是在每个消费者端进行的,消费端负责的工作太多,除了负载均衡还有消费位点管理等功能,如果新增一种语言的支持,就需要重新实现一遍对应的业务逻辑代码。 除此以外,在RocketMQ 5.0以前负载均衡是以消息队列为维度为每个消费者分配的,一个消息队列只能分给组内一个消费者消费,所以会存在以下问题: (1)队列一次只能分给组内一个消费者消费,扩展能力有限。 (2)消息队列数量与消费者数量比例不均衡时,可能会导致某些消费者没有消息队列可以分配或者某些消费者承担过多的消息队列,分配不均匀; (3)如果某个消费者hang住,会导致分配到该消费者的消息队列中的消息无法消费,导致消息积压; 在RocketMQ 5.0增加了Pop模式消费,将负载均衡、消费位点管理等功能放到了Broker端,减少客户端的负担,使其变得轻量级,并且5.0之后支持消息粒度的负载均衡。当前支持任意时延的定时消息已在开源社区开源,而其支持的任意时延定时特性可以帮助POP进行一定优化。主要优化点在于InvisibleTime的设置。 二、原理 POP模式下,消息的消费有如下几个过程: 客户端发起POP消费请求Broker处理POP请求,返回消息客户端消费完成,返回ACKBroker处理Ack请求,在内部进行消费完成的位点更新。 可以用下图表示: 三、核心代码 3.1 核心Processor 3.1.1 PopMessageProcessor 该线程用于处理客户端发出的POP请求。在收到POP请求后,将处理该请求,找到待POP的队列: ```java int index = (PermName.isPriority(topicConfig.getPerm()) ? i : randomQ + i) % queueIdList.size(); int queueId = queueIdList.get(index); if (brokerController.getBrokerConfig().isMarkTaintWhenPopQueueList()) { this.brokerController.getPullMessageProcessor().getPullRebalanceManager() .addOrUpdateTaint(requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId); } if (queueId = 0 && queueId 1) { // 通过位运算,标记指定下标的CK消息被ACK标记。 markBitCAS(pointWrapper.getBits(), indexOfAck); } ``` 在上述存储完成后,后续通过其它线程进行处理。处理包括CK、AK的存储,以及抵消等操作。主要在3.2节中阐述。 3.1.3 ChangeInvisibleTimeProcessor 该Processor主要用于修改消息不可见时间。此外,其中还定义了一些实用的方法,例如CK的buffer存储方法,以及持久化CK的方法。此处不过多展开。 3.2 核心Service 3.2.1 PopReviveService 该Service被AckMessageProcessor启动。该Service主要对重试队列中的消息进行merge,尝试消除此前没有CK、ACK的消息。 在该Service中,会定期消费Revive队列中的CK、ACK对。在其中会维护一个map,将CK和ACK进行存放与检索: ```java if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) { ...... // 将CK存放入map map.put(point.getT() + point.getC() + point.getQ() + point.getSo() + point.getPt(), point); point.setRo(messageExt.getQueueOffset()); ...... } else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) { ...... // 从map中检索是否有CK AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class); String mergeKey = ackMsg.getT() + ackMsg.getC() + ackMsg.getQ() + ackMsg.getSo() + ackMsg.getPt(); PopCheckPoint point = map.get(mergeKey); ...... } ``` 在完成消息的取出后,会进行revive过程中未ACK消息的重试,以及位点提交: 3.2.2 PopBufferMergeService 该Service主要对Buffer中的CK、ACK进行检查。根据检查结果将展开不同的操作。此外,在该Service中还定义了许多实用的CK、ACK操作方法,能够对CK、ACK进行检查以及存储等相关操作。 该Service会启动一个线程,定期执行`scan()`方法。此外,在扫描达到一定次数之后,将执行`scanGarbage()`方法清理数据。 在`scan()`中将会执行如下操作: 1. 获取buffer中所有键值对,作为CK集合。 ```java Iterator iterator = buffer.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); PopCheckPointWrapper pointWrapper = entry.getValue(); ``` 2. 针对每一个CK集合,检查其中的每个CK是否均被ACK,如果已经被ACK,则删除该CK集合。 ```java if ((pointWrapper.isJustOffset() && pointWrapper.isCkStored()) || isCkDone(pointWrapper) || (isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored())) { if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("[PopBuffer]ck done, {}", pointWrapper); } iterator.remove(); counter.decrementAndGet(); continue; } ``` 3. 针对未被ACK的CK,将CK、ACK信息进行存储,待一定时间后进行重试。 ```java if (removeCk) { // put buffer ak to store if (pointWrapper.getReviveQueueOffset() < 0) { putCkToStore(pointWrapper, false); countCk++; } if (!pointWrapper.isCkStored()) { continue; } for (byte i = 0; i < point.getN(); i++) { // reput buffer ak to store if (DataConverter.getBit(pointWrapper.getBits().get(), i) && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) { if (putAckToStore(pointWrapper, i)) { count++; markBitCAS(pointWrapper.getToStoreBits(), i); } } } ``` 在上述操作中,CK与ACK能够成对存储,且定时重试。直到消费者发送回成对的ACK、CK请求,这对请求方可抵消,也象征该消息被消费成功。 代码详解 可参考如下PPT中的介绍内容:
查看全部文章
ABOUT US
Apache RocketMQ事件驱动架构全景图
微服务
Higress
Dubbo
Sentinel
Seata
Spring Cloud
Nacos
物联网
家电
汽车
穿戴设备
充电桩
工业设备
手机
事件驱动架构平台
RabbitMQ
Kafka
EventBridge
MQTT
RocketMQ
MNS
Apache RocketMQ as Core
计算
模型服务
函数计算
容器
存储
对象存储
数据库
NoSQL
分析
Flink
Spark
Elastic Search
事件
云服务器
对象存储
云监控
SaaS事件
通知
语音
短信
邮箱
移动推送

产品特点

为什么学习Apache RocketMQ

云原生
生于云,长于云,无限弹性扩缩,K8S 友好
高吞吐
万亿级吞吐保证,同时满足微服务于大数据场景
流处理
提供轻量、高扩展、高性能和丰富功能的流计算引擎
金融级
金融级的稳定性,广泛用于交易核心链路
架构极简
零外部依赖,Shared-nothing 架构
生态友好
无缝对接微服务、实时计算、数据湖等周边生态
浙ICP备12022327号-1120
收藏
收藏暂无数据,请从小助手对话框添加
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
无疑 AI答疑专家
当前服务输出的内容均由人工智能模型生成,其生成内容的准确性和完整性无法保证,不代表我们的态度或观点。
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
专家答疑