2025年5月29日

Apache RocketMQ 源码解析 —— 秒级定时消息介绍
背景 如今rocketmq的应用场景日益拓宽,延时消息的需求也在增加。原本的特定级别延时消息已经不足以支撑rocketmq灵活的使用场景。因此,我们需要一个支持任意时间的延迟消息feature。 支持任意时间延迟的feature能够让使用者在消息发出时指定其消费时间,在生活与生产中具有非常重要的意义。 目标 1. 支持任意时延的延迟消息。 2. 提供延迟消息可靠的存储方式。 3. 保证延迟消息具有可靠的收发性能。 4. 提供延迟消息的可观测性排查能力。 架构 存储数据结构 本方案主要通过时间轮实现任意时延的定时消息。在此过程中,涉及两个核心的数据结构:TimerLog(存储消息索引)和TimerWheel(时间轮,用于定时消息到时)。 TimerLog,为本RIP中所设计的定时消息的记录文件,Append Only。每条记录包含一个prev_pos,指向前一条定时到同样时刻的记录。每条记录的内容可以包含定时消息本身,也可以只包含定时消息的位置信息。每一条记录包含如下信息: | 名称 | 大小 | 备注 | | | | | | size | 4B | 保存记录的大小 | | prev_pos | 8B | 前一条记录的位置 | | next_Pos | 8B | 后一条记录的位置,暂时为1,作为保留字段 | | magic | 4B | magic value | | delayed_time | 4B | 该条记录的定时时间 | | offset_real | 8B | 该条消息在commitLog中的位置 | | size_real | 4B | 该条消息在commitLog中的大小 | | hash_topic | 4B | 该条消息topic的hash code | | varbody | | 存储可变的body,暂时没有 | TimerWheel是对时刻表的一种抽象,通常使用数组实现。时刻表上的每一秒,顺序对应到数组中的位置,然后数组循环使用。时间轮的每一格,指向了TimerLog中的对应位置,如果这一格的时间到了,则按TimerLog中的对应位置以及prev_pos位置依次读出每条消息。 时间轮一格一格向前推进,配合TimerLog,依次读出到期的消息,从而达到定时消息的目的。时间轮的每一格设计如下: | delayed_time(8B) 延迟时间 | first_pos(8B) 首条位置 | last_pos(8B) 最后位置 | num(4B)消息条数 | | | | | | 上述设计的TimerLog与TimerWheel的协作如下图所示。 pipeline 在存储方面,采用本地文件系统作为可靠的延时消息存储介质。延时消息另存TimerLog文件中。通过时间轮对定时消息进行定位以及存取。针对长时间定时消息,通过消息滚动的方式避免过大的消息存储量。其具体架构如下所示: 从图中可以看出,共有五个Service分别处理定时消息的放置和存储。工作流如下: 1. 针对放置定时消息的service,每50ms从commitLog读取指定主题(TIMER_TOPIC)的定时消息。 1. TimerEnqueueGetService从commitLog读取得到定时主题的消息,并先将其放入enqueuePutQueue。 2. 另一个线程TimerEnqueuePutService将其放入timerLog,更新时间轮的存储内容。将该任务放进时间轮的指定位置。 2. 针对取出定时消息的service,每50ms读取下一秒的slot。有三个线程将读取到的消息重新放回commitLog。 1. 首先,TimerDequeueGetService每50ms读一次下一秒的slot,从timerLog中得到指定的msgs,并放进dequeueGetQueue。 2. 而后TimerDequeueGetMessageService从dequeueGetQueue中取出msg,并将其放入队列中。该队列为待写入commitLog的队列,dequeuePutQueue。 3. 最后TimerDequeuePutMessageService将这个queue中的消息取出,若已到期则修改topic,放回commitlog,否则继续按原topic写回CommitLog滚动。 代码实现 TimerLog与TimerWheel的协作实现 定时消息的核心存储由TimerLog和TimerWheel协同完成。TimerLog作为顺序写入的日志文件,每条记录包含消息在CommitLog中的物理偏移量(offsetPy)和延迟时间(delayed_time)。当消息到达时,TimerEnqueuePutService会将其索引信息追加到TimerLog,并通过prev_pos字段构建链表结构,确保同一时刻的多个消息可被快速遍历。 ```java // TimerLog.java 核心写入逻辑 public long append(byte[] data, int pos, int len) { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // 处理文件切换:当当前文件剩余空间不足时,填充空白段并创建新文件 if (len + MIN_BLANK_LEN mappedFile.getFileSize() mappedFile.getWrotePosition()) { ByteBuffer blankBuffer = ByteBuffer.allocate(MIN_BLANK_LEN); blankBuffer.putInt(mappedFile.getFileSize() mappedFile.getWrotePosition()); blankBuffer.putLong(0); // prev_pos置空 blankBuffer.putInt(BLANK_MAGIC_CODE); // 标记空白段 mappedFile.appendMessage(blankBuffer.array()); mappedFile = this.mappedFileQueue.getLastMappedFile(0); // 切换到新文件 } // 写入实际数据并返回物理偏移量 long currPosition = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(); mappedFile.appendMessage(data, pos, len); return currPosition; } ``` 此代码展示了消息追加的核心流程: 1. 检查当前文件的剩余空间,不足时填充空白段并创建新文件 2. 将消息索引数据写入内存映射文件 3. 返回写入位置的全局偏移量,供时间轮记录 时间轮槽位管理 TimerWheel通过数组结构管理时间槽位,每个槽位记录该时刻的首尾指针和消息数量。当消息入队时,putSlot方法会更新对应槽位的链表结构: ```java // TimerWheel.java 槽位更新逻辑 public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic) { localBuffer.get().position(getSlotIndex(timeMs) Slot.SIZE); localBuffer.get().putLong(timeMs / precisionMs); // 标准化时间戳 localBuffer.get().putLong(firstPos); // 链表头指针 localBuffer.get().putLong(lastPos); // 链表尾指针 localBuffer.get().putInt(num); // 当前槽位消息总数 localBuffer.get().putInt(magic); // 特殊标记(如滚动/删除) } ``` 该方法的实现细节: + getSlotIndex(timeMs)将时间戳映射到环形数组的索引 + 同时记录首尾指针以实现O(1)复杂度插入 消息入队流程 TimerEnqueueGetService:消息扫描服务 该服务作为定时消息入口,持续扫描CommitLog中的TIMER_TOPIC消息。其核心逻辑通过enqueue方法实现: ```java // TimerMessageStore.java public boolean enqueue(int queueId) { ConsumeQueueInterface cq = this.messageStore.getConsumeQueue(TIMER_TOPIC, queueId); ReferredIterator iterator = cq.iterateFrom(currQueueOffset); while (iterator.hasNext()) { CqUnit cqUnit = iterator.next(); MessageExt msgExt = getMessageByCommitOffset(cqUnit.getPos(), cqUnit.getSize()); // 构造定时请求对象 TimerRequest timerRequest = new TimerRequest( cqUnit.getPos(), cqUnit.getSize(), Long.parseLong(msgExt.getProperty(TIMER_OUT_MS)), System.currentTimeMillis(), MAGIC_DEFAULT, msgExt ); // 放入入队队列(阻塞式) while (!enqueuePutQueue.offer(timerRequest, 3, TimeUnit.SECONDS)) { if (!isRunningEnqueue()) return false; } currQueueOffset++; // 更新消费进度 } } ``` 关键设计点: 1. 增量扫描:通过currQueueOffset记录消费位移,避免重复处理 2. 消息转换:将ConsumeQueue中的索引转换为包含完整元数据的TimerRequest` TimerEnqueuePutService:时间轮写入服务 从队列获取请求后,该服务执行核心的定时逻辑: ```java // TimerMessageStore.java public boolean doEnqueue(long offsetPy, int sizePy, long delayedTime, MessageExt messageExt) { // 计算目标时间槽位 Slot slot = timerWheel.getSlot(delayedTime); // 构造TimerLog记录 ByteBuffer buffer = ByteBuffer.allocate(TimerLog.UNIT_SIZE); buffer.putLong(slot.lastPos); // 前驱指针指向原槽位尾 buffer.putLong(offsetPy); // CommitLog物理偏移 buffer.putInt(sizePy); // 消息大小 buffer.putLong(delayedTime); // 精确到毫秒的延迟时间 // 写入TimerLog并更新时间轮 long pos = timerLog.append(buffer.array()); timerWheel.putSlot(delayedTime, slot.firstPos, pos, slot.num + 1); } ``` 写入优化策略: + 空间预分配:当检测到当前MappedFile剩余空间不足时,自动填充空白段并切换文件 + 链表式存储:通过prev_pos字段构建时间槽位的倒序链表,确保新消息快速插入 + 批量提交:积累多个请求后批量写入,减少文件I/O次数 TimerEnqueuePutService从队列获取消息请求,处理消息滚动逻辑。当检测到延迟超过时间轮窗口时,将消息重新写入并标记为滚动状态: ```java // TimerMessageStore.java 消息滚动处理 boolean needRoll = delayedTime tmpWriteTimeMs = (long) timerRollWindowSlots precisionMs; if (needRoll) { magic |= MAGIC_ROLL; // 调整延迟时间为时间轮窗口中间点,确保滚动后仍有处理时间 delayedTime = tmpWriteTimeMs + (long) (timerRollWindowSlots / 2) precisionMs; } ``` 此逻辑的关键设计: 1. 当延迟时间超过当前时间轮容量时触发滚动 2. 将消息的delayed_time调整为窗口中间点,避免频繁滚动 3. 设置MAGIC_ROLL标记,出队时识别滚动消息 消息出队处理 TimerDequeueGetService:到期扫描服务 该服务以固定频率推进时间指针,触发到期消息处理: ```java // TimerMessageStore.java public int dequeue() throws Exception { // 获取当前时间槽位 Slot slot = timerWheel.getSlot(currReadTimeMs); // 遍历TimerLog链表 long currPos = slot.lastPos; while (currPos != 1) { SelectMappedBufferResult sbr = timerLog.getTimerMessage(currPos); ByteBuffer buf = sbr.getByteBuffer(); // 解析记录元数据 long prevPos = buf.getLong(); long offsetPy = buf.getLong(); int sizePy = buf.getInt(); long delayedTime = buf.getLong(); // 分类处理(普通消息/删除标记) if (isDeleteMarker(buf)) { deleteMsgStack.add(new TimerRequest(offsetPy, sizePy, delayedTime)); } else { normalMsgStack.addFirst(new TimerRequest(offsetPy, sizePy, delayedTime)); } currPos = prevPos; // 前向遍历链表 } // 分发到处理队列 splitAndDispatch(normalMsgStack); splitAndDispatch(deleteMsgStack); moveReadTime(); // 推进时间指针 } ``` TimerDequeuePutMessageService:消息投递服务 TimerDequeuePutMessageService将到期消息重新写入CommitLog。对于滚动消息,会修改主题属性并增加重试计数: ```java // TimerMessageStore.java 消息转换逻辑 MessageExtBrokerInner convert(MessageExt messageExt, long enqueueTime, boolean needRoll) { if (needRoll) { // 增加滚动次数标记 MessageAccessor.putProperty(messageExt, TIMER_ROLL_TIMES, Integer.parseInt(messageExt.getProperty(TIMER_ROLL_TIMES)) + 1 + ""); } // 恢复原始主题和队列ID messageInner.setTopic(messageInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); messageInner.setQueueId(Integer.parseInt( messageInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); return messageInner; } ``` 此转换过程确保: + 滚动消息保留原始主题信息 + 每次滚动增加TIMER_ROLL_TIMES属性 该服务最终将到期消息重新注入CommitLog: ```java // TimerMessageStore.java public void run() { while (!stopped) { TimerRequest req = dequeuePutQueue.poll(10, TimeUnit.MILLISECONDS); MessageExtBrokerInner innerMsg = convert(req.getMsg(), req.needRoll()); // 消息重投递逻辑 int result = doPut(innerMsg, req.needRoll()); switch (result) { case PUT_OK: // 成功:更新监控指标 timerMetrics.recordDelivery(req.getTopic()); break; case PUT_NEED_RETRY: // 重试:重新放回队列头部 dequeuePutQueue.putFirst(req); break; case PUT_NO_RETRY: // 丢弃:记录错误日志 log.warn("Discard undeliverable message:{}", req); } } } ``` 故障恢复机制 系统重启时通过recover方法重建时间轮状态。关键步骤包括遍历TimerLog文件并修正槽位指针: ```java // TimerMessageStore.java 恢复流程 private long recoverAndRevise(long beginOffset, boolean checkTimerLog) { List mappedFiles = timerLog.getMappedFileQueue().getMappedFiles(); for (MappedFile mappedFile : mappedFiles) { SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0); ByteBuffer bf = sbr.getByteBuffer(); while (position Google Doc: + Shimo:
#技术探索

2025年5月29日

Apache RocketMQ 源码解析 —— Controller 高可用切换架构
一、原理及核心概念浅述 1.1 核心架构 1.2 核心概念 1. controller:负责管理broker间的主备关系,可以挂在namesrv中,不影响namesrv能力,支持独立部署。 2. master/slave:主备身份。 3. syncStateSet:字面意思为“同步状态集合”。当备节点能够及时跟上主节点,则会纳入syncStateSet。 4. epoch:用于记录每一次主备切换时的状态,避免切换后产生数据丢失或者不一致的情况。 为方便理解,在某些过程中可以把controller当作班主任,master作为小组长,slave作为小组成员。同步过程是各位同学向小组长抄作业的过程,位于syncStateSet中的是优秀作业。 二、相关代码文件及说明 核心是“controller+broker+复制过程”,因此分三块进行叙述。 2.1 Controller 该部分代码主要集中在rocketmqcontroller模块下,主要有如下代码文件: + ControllerManager: 负责管理controller,其中存储了许多controller相关配置,并负责了心跳管理等核心功能。(班主任管理条例) + DLederController: Controller的DLedger实现。包含了controller的基本功能。在其中实现了副本信息管理、broker存活情况探测、选举Master等核心功能。(某种班主任) + DefaultBrokerHeartbeatManager: 负责管理broker心跳,其中包含了broker存活情况表,以及在broker下线时的listeners,当副本掉线时,触发重新选举。(点名册) + ReplicasInfoManager: 负责controller中事件的处理。即各种选举事件、更换SyncStateSet事件等等。(小组登记册) + ControllerRequestProcessor: 处理向controller发送的requests,例如让controller选举、向controller注册broker、心跳、更换SyncStateSet等等。(班主任信箱) + DefaultElectPolicy: 选举Master的策略。可以选择从sync状态的副本中选,也可以支持从所有副本中(无论是否同步)的unclean选举。(班规) + ...... 2.2 Broker 该部分代码主要集中在rocketmqbroker模块中,可进入org/apache/rocketmq/broker/controller进行查看: + ReplicasManager: 完成自己作为一个replica的使命——找controller,角色管理,Master更新(Expand/Shrink)SyncStateSet等等。 2.3 复制模块 该部分代码主要集中在rocketmqstore模块中的ha文件夹下: + HAService: 每个Replica必备的的service,负责管理作为主、备的同步任务。 + HAClient: 每个Slave 的HAService中必备的client,负责管理同步任务中的读、写操作。 + HAConnection: 代表在Master中的HA连接,每个connection理论上对应一个slave。在该connection类中存储了传输过程中的诸多内容,包括channel、传输状态、当前传输位点等等信息。 三、核心流程 3.1 心跳 核心CODE:BROKER_HEARTBEAT Broker端: 该部分较简单,带上code向controller发request,不再赘述: BrokerController.sendHeartbeat() brokerOuterAPI.sendHeartbeat() Controller端: 1. 首先由ControllerRequestProcessor接收到code,进入处理逻辑: ```java private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class); if (requestHeader.getBrokerId() == null) { return RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST, "Heart beat with empty brokerId"); } this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(), requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority()); return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success"); } ``` 2. 之后在onBrokerHeartbeat()中,主要更新controller brokerHeartbeatManager中的brokerLiveTable: ```java public void onBrokerHeartbeat(String clusterName, String brokerName, String brokerAddr, Long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, Long confirmOffset, Integer electionPriority) { BrokerIdentityInfo brokerIdentityInfo = new BrokerIdentityInfo(clusterName, brokerName, brokerId); BrokerLiveInfo prev = this.brokerLiveTable.get(brokerIdentityInfo); ...... if (null == prev) { this.brokerLiveTable.put(...); log.info("new broker registered, {}, brokerId:{}", brokerIdentityInfo, realBrokerId); } else { prev.setXXX(......) } } ``` 3.2 选举 相关CODE: CONTROLLER_ELECT_MASTER 有如下几种情形可能触发选举: 1. controller主动发起,通过triggerElectMaster(): 1. HeartbeatManager监听到有broker心跳失效。 (班主任发现有小组同学退学了) 2. Controller检测到有一组Replica Set不存在master。(班主任发现有组长虽然在名册里,但是挂了) 2. broker发起将自己选为master,通过ReplicaManager.brokerElect(): 1. Broker向controller查metadata时,没找到master信息。(同学定期检查小组情况,问班主任为啥没小组长) 2. Broker向controller注册完后,仍未从controller获取到master信息。(同学报道后发现没小组长,汇报) 3. 通过tools发起: 1. 通过选举命令ReElectMasterSubCommand发起。(校长直接任命) 上述所有过程,最终均触发: controller.electMaster() replicasInfoManager.electMaster() // 即,所有小组长必须通过班主任任命 ```java public ControllerResult electMaster(final ElectMasterRequestHeader request, final ElectPolicy electPolicy) { ... // 从request中取信息 ... if (syncStateInfo.isFirstTimeForElect()) { // 从未注册,直接任命 newMaster = brokerId; } // 按选举政策选主 if (newMaster == null) { // we should assign this assignedBrokerId when the brokerAddress need to be elected by force Long assignedBrokerId = request.getDesignateElect() ? brokerId : null; newMaster = electPolicy.elect(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), syncStateSet, allReplicaBrokers, oldMaster, assignedBrokerId); } if (newMaster != null && newMaster.equals(oldMaster)) { // 老主 == 新主 // old master still valid, change nothing String err = String.format("The old master %s is still alive, not need to elect new master for broker %s", oldMaster, brokerReplicaInfo.getBrokerName()); LOGGER.warn("{}", err); // the master still exist response.setXXX() result.setBody(new ElectMasterResponseBody(syncStateSet).encode()); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err); return result; } // a new master is elected if (newMaster != null) { // 出现不一样的新主 final int masterEpoch = syncStateInfo.getMasterEpoch(); final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch(); final HashSet newSyncStateSet = new HashSet<(); //设置新的syncStateSet newSyncStateSet.add(newMaster); response.setXXX()... ElectMasterResponseBody responseBody = new ElectMasterResponseBody(newSyncStateSet); } result.setBody(responseBody.encode()); final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster); result.addEvent(event); return result; } // 走到这里,说明没有主,选举失败 // If elect failed and the electMaster is triggered by controller (we can figure it out by brokerAddress), // we still need to apply an ElectMasterEvent to tell the statemachine // that the master was shutdown and no new master was elected. if (request.getBrokerId() == null) { final ElectMasterEvent event = new ElectMasterEvent(false, brokerName); result.addEvent(event); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Old master has down and failed to elect a new broker master"); } else { result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, "Failed to elect a new master"); } return result; } ``` 3.3 更新SyncStateSet 核心CODE: CONTROLLER_ALTER_SYNC_STATE_SET 1. 由master发起,主动向controller更换syncStateSet(等价于小组长汇报优秀作业) 2. controllerRequestProcessor接收更换syncStateSet的请求,进入handleAlterSyncStateSet()方法: ```java private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class); final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class); final CompletableFuture future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet); if (future != null) { return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS); } return RemotingCommand.createResponseCommand(null); } ``` 3. 之后进入Controller.alterSyncStateSet() replicasInfoManager.alterSyncStateSet()方法: ```java public ControllerResult alterSyncStateSet( final AlterSyncStateSetRequestHeader request, final SyncStateSet syncStateSet, final BrokerValidPredicate brokerAlivePredicate) { final String brokerName = request.getBrokerName(); ... final Set newSyncStateSet = syncStateSet.getSyncStateSet(); final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); // 检查syncStateSet是否有变化 final Set oldSyncStateSet = syncStateInfo.getSyncStateSet(); if (oldSyncStateSet.size() == newSyncStateSet.size() && oldSyncStateSet.containsAll(newSyncStateSet)) { String err = "The newSyncStateSet is equal with oldSyncStateSet, no needed to update syncStateSet"; ... } // 检查是否是master发起的 if (!syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) { String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}", syncStateInfo.getMasterBrokerId(), request.getMasterBrokerId()); ... } // 检查master的任期epoch是否一致 if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) { String err = String.format("Rejecting alter syncStateSet request because the current master epoch is:{%d}, not {%d}", syncStateInfo.getMasterEpoch(), request.getMasterEpoch()); ... } // 检查syncStateSet的epoch if (syncStateSet.getSyncStateSetEpoch() != syncStateInfo.getSyncStateSetEpoch()) { String err = String.format("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{%d}, not {%d}", syncStateInfo.getSyncStateSetEpoch(), syncStateSet.getSyncStateSetEpoch()); ... } // 检查新的syncStateSet的合理性 for (Long replica : newSyncStateSet) { // 检查replica是否存在 if (!brokerReplicaInfo.isBrokerExist(replica)) { String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't exist", replica); ... } // 检查broker是否存活 if (!brokerAlivePredicate.check(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), replica)) { String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't alive", replica); ... } } // 检查是否包含master if (!newSyncStateSet.contains(syncStateInfo.getMasterBrokerId())) { String err = String.format("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {%s}", syncStateInfo.getMasterBrokerId()); ... } // 更新epoch int epoch = syncStateInfo.getSyncStateSetEpoch() + 1; ... // 生成事件,替换syncStateSet final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet); ... } ``` 4. 最后通过syncStateInfo.updateSyncStateSetInfo(),更新syncStateSetInfoTable.get(brokerName)得到的syncStateInfo信息(该过程可以理解为班主任在班级分组册上找到了组长的名字,拿出组员名单,更新)。 3.4 复制 该部分较复杂,其中HAService/HAClient/HAConnection以及其中的各种Service/Reader/Writer容易产生混淆,对阅读造成阻碍。因此绘制本图帮助理解(可在粗读源码后回头理解): 下面对HA复制过程作拆解,分别讲解: 1. 在各个replica的DefaultMessageStore中均注册了HAService,负责管理HA的复制。 2. 在Master的 HAService中有一个AcceptSocketService, 负责自动接收各个slave的连接: ```java protected abstract class AcceptSocketService extends ServiceThread { ... / Starts listening to slave connections. @throws Exception If fails. / public void beginAccept() throws Exception { ... } @Override public void shutdown(final boolean interrupt) { ... } @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); Set selected = this.selector.selectedKeys(); if (selected != null) { for (SelectionKey k : selected) { if (k.isAcceptable()) { SocketChannel sc = ((ServerSocketChannel) k.channel()).accept(); if (sc != null) { DefaultHAService.log.info("HAService receive new connection, " + sc.socket().getRemoteSocketAddress()); try { HAConnection conn = createConnection(sc); conn.start(); DefaultHAService.this.addConnection(conn); } catch (Exception e) { log.error("new HAConnection exception", e); sc.close(); } } } ... } } ``` 3. 在各个Slave 的HAService中存在一个HAClient,负责向master发起连接、传输请求。 ```java public class AutoSwitchHAClient extends ServiceThread implements HAClient { ... } public interface HAClient { void start(); void shutdown(); void wakeup(); void updateMasterAddress(String newAddress); void updateHaMasterAddress(String newAddress); String getMasterAddress(); String getHaMasterAddress(); long getLastReadTimestamp(); long getLastWriteTimestamp(); HAConnectionState getCurrentState(); void changeCurrentState(HAConnectionState haConnectionState); void closeMaster(); long getTransferredByteInSecond(); } ``` 4. 当master收到slave的连接请求后,将会创建一个HAConnection,负责收发内容。 ```java public interface HAConnection { void start(); void shutdown(); void close(); SocketChannel getSocketChannel(); HAConnectionState getCurrentState(); String getClientAddress(); long getTransferredByteInSecond(); long getTransferFromWhere(); long getSlaveAckOffset(); } ``` 5. Master的HAConnection会与Slave的HAClient建立连接,二者均通过HAWriter(较简单,不解读,位于HAWriter类)往socket中写内容,再通过HAReader读取socket中的内容。只不过一个是HAServerReader,一个是HAClientReader: ```java public abstract class AbstractHAReader { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); protected final List readHookList = new ArrayList<(); public boolean read(SocketChannel socketChannel, ByteBuffer byteBufferRead) { int readSizeZeroTimes = 0; while (byteBufferRead.hasRemaining()) { ... boolean result = processReadResult(byteBufferRead); ... } } ... protected abstract boolean processReadResult(ByteBuffer byteBufferRead); } ``` 6. 两种HAReader均实现了processReadResult()方法,负责处理从socket中得到的数据。client需要详细阐述该方法,因为涉及到如何将读进来的数据写入commitlog,client的processReadResult(): ```java @Override protected boolean processReadResult(ByteBuffer byteBufferRead) { int readSocketPos = byteBufferRead.position(); try { while (true) { ... switch (AutoSwitchHAClient.this.currentState) { case HANDSHAKE: { ... // 握手阶段,先检查commitlog完整性,截断 } break; case TRANSFER: { // 传输阶段,将body写入commitlog ... byte[] bodyData = new byte[bodySize]; ... if (bodySize 0) { // 传输阶段,将body写入commitlog AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length); } haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset())); ... break; } default: break; } if (isComplete) { continue; } } // 检查buffer中是否还有数据, 如果有, compact() ... break; } } ... } ``` 7. server的processReadResult()主要用于接收client的握手等请求,较简单。更需要解释其WriteSocketService如何向socket中调用HAwriter去写数据: ```java abstract class AbstractWriteSocketService extends ServiceThread { ... private void transferToSlave() throws Exception { ... int size = this.getNextTransferDataSize(); if (size 0) { ... buildTransferHeaderBuffer(this.transferOffset, size); this.lastWriteOver = this.transferData(size); } else { // 无需传输,直接更新caught up的时间 AutoSwitchHAConnection.this.haService.updateConnectionLastCaughtUpTime(AutoSwitchHAConnection.this.slaveId, System.currentTimeMillis()); haService.getWaitNotifyObject().allWaitForRunning(100); } } @Override public void run() { AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); switch (currentState) { case HANDSHAKE: // Wait until the slave send it handshake msg to master. // 等待slave的握手请求,并进行回复 break; case TRANSFER: ... transferToSlave(); break; default: ... } } catch (Exception e) { ... } } ... // 在service结束后的一些事情 } ... } ``` 此处同样附上server实现processReadResult(),读socket中数据的代码: ```java @Override protected boolean processReadResult(ByteBuffer byteBufferRead) { while (true) { ... HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)]; switch (slaveState) { case HANDSHAKE: // 收到了client的握手 ... LOGGER.info("Receive slave handshake, slaveBrokerId:{}, isSyncFromLastFile:{}, isAsyncLearner:{}", AutoSwitchHAConnection.this.slaveId, AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.isAsyncLearner); break; case TRANSFER: // 收到了client的transfer状态 ... // 更新client状态信息 break; default: ... } ... } ``` 3.5 Active Controller的选举 该选举主要通过DLedger实现,在DLedgerController中通过RoleChangeHandler.handle()更新自身身份: ```java class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler { private final String selfId; private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_")); private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER; public RoleChangeHandler(final String selfId) { this.selfId = selfId; } @Override public void handle(long term, MemberState.Role role) { Runnable runnable = () { switch (role) { case CANDIDATE: this.currentRole = MemberState.Role.CANDIDATE; // 停止扫描inactive broker任务 ... case FOLLOWER: this.currentRole = MemberState.Role.FOLLOWER; // 停止扫描inactive broker任务 ... case LEADER: { log.info("Controller {} change role to leader, try process a initial proposal", this.selfId); int tryTimes = 0; while (true) { // 将会开始扫描inactive brokers ... break; } } }; this.executorService.submit(runnable); } ... } ```
#技术探索

2025年5月29日

Apache RocketMQ 源码解析 —— POP 消费模式逻辑介绍
一、背景 在RocketMQ 5.0之前,消费有两种方式可以从Broker获取消息,分别为Pull模式和Push模式。 1. Pull模式:消费需要不断的从阻塞队列中获取数据,如果没有数据就等待,这个阻塞队列中的数据由消息拉取线程从Broker拉取消息之后加入的,所以Pull模式下消费需要不断主动从Broker拉取消息。 2. Push模式:需要注册消息监听器,当有消息到达时会通过回调函数进行消息消费,从表面上看就像是Broker主动推送给消费者一样,所以叫做推模式,底层依旧是消费者从Broker拉取数据然后触发回调函数进行消息消费,只不过不需要像Pull模式一样不断判断是否有消息到来。 不过不管是Pull模式还是Push模式,在集群模式下,一个消息队列只能分配给同一个消费组内的某一个消费者进行消费,所以需要进行Rebalance负载均衡为每个消费者分配消息队列之后才可以进行消息消费。 Rebalance的工作是在每个消费者端进行的,消费端负责的工作太多,除了负载均衡还有消费位点管理等功能,如果新增一种语言的支持,就需要重新实现一遍对应的业务逻辑代码。 除此以外,在RocketMQ 5.0以前负载均衡是以消息队列为维度为每个消费者分配的,一个消息队列只能分给组内一个消费者消费,所以会存在以下问题: (1)队列一次只能分给组内一个消费者消费,扩展能力有限。 (2)消息队列数量与消费者数量比例不均衡时,可能会导致某些消费者没有消息队列可以分配或者某些消费者承担过多的消息队列,分配不均匀; (3)如果某个消费者hang住,会导致分配到该消费者的消息队列中的消息无法消费,导致消息积压; 在RocketMQ 5.0增加了Pop模式消费,将负载均衡、消费位点管理等功能放到了Broker端,减少客户端的负担,使其变得轻量级,并且5.0之后支持消息粒度的负载均衡。当前支持任意时延的定时消息已在开源社区开源,而其支持的任意时延定时特性可以帮助POP进行一定优化。主要优化点在于InvisibleTime的设置。 二、原理 POP模式下,消息的消费有如下几个过程: 客户端发起POP消费请求Broker处理POP请求,返回消息客户端消费完成,返回ACKBroker处理Ack请求,在内部进行消费完成的位点更新。 可以用下图表示: 三、核心代码 3.1 核心Processor 3.1.1 PopMessageProcessor 该线程用于处理客户端发出的POP请求。在收到POP请求后,将处理该请求,找到待POP的队列: ```java int index = (PermName.isPriority(topicConfig.getPerm()) ? i : randomQ + i) % queueIdList.size(); int queueId = queueIdList.get(index); if (brokerController.getBrokerConfig().isMarkTaintWhenPopQueueList()) { this.brokerController.getPullMessageProcessor().getPullRebalanceManager() .addOrUpdateTaint(requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId); } if (queueId = 0 && queueId 1) { // 通过位运算,标记指定下标的CK消息被ACK标记。 markBitCAS(pointWrapper.getBits(), indexOfAck); } ``` 在上述存储完成后,后续通过其它线程进行处理。处理包括CK、AK的存储,以及抵消等操作。主要在3.2节中阐述。 3.1.3 ChangeInvisibleTimeProcessor 该Processor主要用于修改消息不可见时间。此外,其中还定义了一些实用的方法,例如CK的buffer存储方法,以及持久化CK的方法。此处不过多展开。 3.2 核心Service 3.2.1 PopReviveService 该Service被AckMessageProcessor启动。该Service主要对重试队列中的消息进行merge,尝试消除此前没有CK、ACK的消息。 在该Service中,会定期消费Revive队列中的CK、ACK对。在其中会维护一个map,将CK和ACK进行存放与检索: ```java if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) { ...... // 将CK存放入map map.put(point.getT() + point.getC() + point.getQ() + point.getSo() + point.getPt(), point); point.setRo(messageExt.getQueueOffset()); ...... } else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) { ...... // 从map中检索是否有CK AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class); String mergeKey = ackMsg.getT() + ackMsg.getC() + ackMsg.getQ() + ackMsg.getSo() + ackMsg.getPt(); PopCheckPoint point = map.get(mergeKey); ...... } ``` 在完成消息的取出后,会进行revive过程中未ACK消息的重试,以及位点提交: 3.2.2 PopBufferMergeService 该Service主要对Buffer中的CK、ACK进行检查。根据检查结果将展开不同的操作。此外,在该Service中还定义了许多实用的CK、ACK操作方法,能够对CK、ACK进行检查以及存储等相关操作。 该Service会启动一个线程,定期执行`scan()`方法。此外,在扫描达到一定次数之后,将执行`scanGarbage()`方法清理数据。 在`scan()`中将会执行如下操作: 1. 获取buffer中所有键值对,作为CK集合。 ```java Iterator iterator = buffer.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); PopCheckPointWrapper pointWrapper = entry.getValue(); ``` 2. 针对每一个CK集合,检查其中的每个CK是否均被ACK,如果已经被ACK,则删除该CK集合。 ```java if ((pointWrapper.isJustOffset() && pointWrapper.isCkStored()) || isCkDone(pointWrapper) || (isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored())) { if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("[PopBuffer]ck done, {}", pointWrapper); } iterator.remove(); counter.decrementAndGet(); continue; } ``` 3. 针对未被ACK的CK,将CK、ACK信息进行存储,待一定时间后进行重试。 ```java if (removeCk) { // put buffer ak to store if (pointWrapper.getReviveQueueOffset() < 0) { putCkToStore(pointWrapper, false); countCk++; } if (!pointWrapper.isCkStored()) { continue; } for (byte i = 0; i < point.getN(); i++) { // reput buffer ak to store if (DataConverter.getBit(pointWrapper.getBits().get(), i) && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) { if (putAckToStore(pointWrapper, i)) { count++; markBitCAS(pointWrapper.getToStoreBits(), i); } } } ``` 在上述操作中,CK与ACK能够成对存储,且定时重试。直到消费者发送回成对的ACK、CK请求,这对请求方可抵消,也象征该消息被消费成功。 代码详解 可参考如下PPT中的介绍内容:
#技术探索

2024年11月18日

基于Apache RocketMQ 事件驱动架构的 AI 应用实践
AI 应用在商业化服务的阶段会面临诸多挑战,比如更快的服务交付速度,更实时、精准的结果以及更人性化的体验等,传统架构限制于同步交互,无法满足上述需求,本篇文章给大家分享一下如何基于事件驱动架构应对上述挑战。 盘点 AI 应用场景 在深入探讨事件驱动架构跟 AI 结合前,我们先梳理一下 AI 应用的现状。 从应用架构层面,大致可以把 AI 应用分为以下三类: 1)基于基础模型的扩展应用,典型的如 ChatGpt(文本生成)、StableDiffusion(图像生成)、CosyVoice(声音生成)等,这类应用通常会以模型能力为核心,提供相对原子化的服务。 2)智能知识库应用,如 Langchain chatchat,这类应用是以 LLM 为核心,基于 RAG(增强检索技术)构建的具有广泛的业务场景的应用。 3)智能体应用,智能体应用核心要点是应用以 LLM 为交互中枢,能够通过工具的调用联通外部世界,复杂的表现形式如多智能体协作等,是企业 AI 应用落地最具想象空间的一类应用。 浅析 AI “原生” 说到“原生”二字,它代表的是对某种概念的广泛认知,比如提移动原生应用立马可以联想到手机端的 APP,提云原生应用很多开发者立马可以想到容器化等,而对于 AI “原生”,除了 ChatGpt,Midjourney 等几款头部 AI 应用,我们似乎还没有看到像移动应用那样广泛的“原生”应用被定义出来,当然今天也没有办法给出明确的结论,只是通过一些事实,帮大家推演 AI “原生”的方向,希望能够帮助慢慢凝聚在内心中那个对“AI 原生”的影像。 AI 给应用架构带来的变化 当 AI 能力加入后,我们的应用架构发生了较大的变化。RAG,Agent 等编程范式被引入,传统的工作流也因为有了 AI 节点,变得与以往有所不同。 AI 应用架构RAG AI 应用架构Agent 加入 AI 节点的工作流 AI 应用的变化趋势 从观察知名 AI 厂商的产品形态演进看,AI 应用由前面提到的基础模型扩展、智能知识库、智能体三类叠加又相对分离,在慢慢向由智能体统一管控约束的方向发展。 比如 Open AI 的 Canvas,Claude Artifacts,Vercel v0 等产品特性。它们都表现出了一系列的共性:智能内核,多模态,LUI 交互。 从另外一个角度理解,AI 原生的应用只有突破之前的用户体验才有可能让用户买单。分散的基础模型能力,多模态能力都只能在某些场景下有体验提升,某些方面甚至不如传统应用的用户体验。所以需要整合,将对话式交互,智能模型和多模态叠加从而构建出超越传统应用的使用体验。 使用事件驱动构建 AI 原生 这里并不是单纯为了追求技术的先进性而使用事件驱动架构,是因为实践中顺序式的架构有时候无法满足业务需求。 传统顺序式的架构在构建 AI 原生的挑战 顺序调用无法保障推理体验 模型服务的推理耗时远高于传统意义的网络服务调用,比如在文生图这个场景下使用 StableDiffusion 服务,即使经过算法优化后最快也是秒级,并发量较大的时候,会很容易导致服务器宕机。此外如声音的合成,数字人的合成等耗时可能是分钟级的,此时顺序调用明显就不太合适。选择事件驱动的架构可以快速响应用户,推理服务按需执行,这样既能够保障用户体验,同时也降低系统宕机风险。 顺序调用无法支持实时数据构建的需求 在智能问答系统中,结果的好坏跟数据有很大的关系。问答召回数据的实时性和准确性很大程度影响着智能问答系统的用户体验,从系统架构层面,问答和数据的更新是分开的。靠人工去更新海量数据不现实,通过设置定时任务以及构建知识库数据更新的工作流能够更加有效的解决数据实时更新的问题,事件驱动架构在这个场景下优势非常明显。 双向互动场景无法实现 在问答服务场景下,拟人化的行为能够得到用户好感从而扩展商机,传统的问答式应用架构相对机械死板,而使用消息队列作为信息传输可以有效主动触达用户,通过合理的意图判断,主动向用户问好,是有效的留存手段。 事件驱动构建 AI 原生的实践 接下来分享一下基于事件驱动架构构建的 AI 应用的一些实践。 StableDiffusion 异步推理 前面提到了关于文生图模型 StableDiffusion 在服务客户中遇到的问题,我们利用事件驱动架构,使用函数计算和轻量消息队列(原 MNS)构建了 StableDiffusion 的异步推理架构,用户请求到来时经过函数计算网关到达 API 代理函数,API 代理函数对请求进行打标鉴权,之后将请求发送到 MNS 队列,同时记录请求的元数据和推理信息到表格存储 TableStore,推理函数根据任务队列进行消费,调度 GPU 实例启动 StableDiffusion 进行服务,结束后返回图片结果以及更新请求状态,端侧通过页面上的轮询告知用户。 VoiceAgent 实时对话 这是一个相对复杂的应用,使用者可以通过语音跟背后的智能问答服务实时对话,同时还能够接收到来自智能服务的主动询问。 整体依然采用事件驱动架构,其 RTC Server 部分安装 rocketmqclient,订阅中心化的服务 topic,由定时任务(主要是意图分析)触发向队列 topic 生产消息内容,然后由 rocketmqclient 消费,进行主动询问。 VoiceAgent 知识库实时数据流 对于问答的另外一端,知识库的自动更新,则是通过 Catch Data Capture 策略,比如由外部系统数据源触发,或者通过将文档上传 OSS 触发。数据经过切片,向量化之后存储到向量数据库以及全文检索数据库。 面向 AI 原生应用的事件驱动架构 最后分享一下作为 AI 应用开发者的一套组合方案:通过 阿里云云应用平台 CAP(Cloud Application Platform) 选出基础模型服务,如 Ollama,ComfyUI,Cosyvoice,Embedding 等进行快速托管,使用 RcoketMQ,Kafka,MNS, Eventbridge 等搭建数据流管道和消息中心,本地利用 Spring AI Alibaba 等框架开发后端服务,实现 RAG,Agent 等能力。前端使用 Nextjs 框架构建界面,之后将开发好的前后端通过 Serverless Devs 工具部署到 CAP 平台,进行线上调用访问,最终上生产采用云原生网关保驾护航,对于长期的知识库或者智能体的运维则通过 ARMS 进行指标监控。
作者:寒斜
#技术探索

2024年11月6日

Apache RocketMQ 打破锁性能瓶颈之道
背景 Apache RocketMQ 是一个云原生消息传递和流式处理平台,可简化创建事件驱动型应用程序的过程。多年来随着 RocketMQ 的迭代,已经编写了大量代码来利用多核处理器,通过并发提高程序效率。因此管理并发性能变得至关重要,同时锁对于确保在访问共享资源时多个执行线程安全同步至关重要。尽管锁对于确保多核系统中的互斥性是必不可少的,但它们的使用也可能带来优化挑战。随着并发系统内部变得越来越复杂,部署有效的锁管理策略是保持性能的关键。 因此,在 GSOC 2024 中,我们 Apache RocketMQ 开源社区提报了一个非常具有挑战性的题目:《GSOC269 : Optimizing Lock Mechanisms in Apache RocketMQ》。在这个题目中,我们旨在优化锁行为,优化 RocketMQ 的性能以及资源占用。通过这个题目,我们创新性地提出了 ABS 锁 —— Adaptive Backoff Spin Lock。ABS 锁的思想是通过为轻量化的自旋锁提供一套退避策略,从而实现低成本、有限制的锁自旋行为,同时适应不同强度的资源争抢情况。 之所以取其缩写 ABS,是因为该锁的设计思想与刹车系统中的 ABS 系统有一定相似性。在早期系统中使用的自旋锁会一直进行自旋,但是当临界区较大时会产生大量无效自旋,类似刹车制动中的“抱死”结果,这导致我们的资源利用急剧增长。为了避免这种资源损耗,当前社区使用互斥锁对其进行了替代,避免临界区较大时的资源浪费。但是在这种替代方案落实后,当消息较小时,互斥锁的阻塞唤醒机制反而影响到消息发送的响应时间,并带来了更高的 CPU 损耗。 由于自旋锁在临界区较小时的响应时间(RT)以及 CPU 利用方面都有较好的表现,唯一的不足是在资源争抢时会带来资源浪费。因此我们决定对自旋锁进行优化,降低无效的资源损耗,从而更好的利用自旋锁的优点,使其同时适合争抢激烈或不激烈的场景。在实践中,我们已经证明了调整锁策略会影响 RocketMQ 的消息发送性能,带来显著的性能优化结果。 通过 ABS 锁,我们还能解决开源使用者对锁的抉择问题:在 RocketMQ 消息投递时存在两种锁定机制,SpinLock(swapAndSet),以及 ReentrantLock 互斥锁,但并无文档对其进行分析使用的场景。所以我们通过 ABS 锁对其进行整合,达到最优状态并且完成服务端的锁定机制闭环,不需要用户去决定当前场景锁定机制的选取,自然而然地根据争抢情况对锁参数进行微调。ABS 锁可以根据运行时条件动态调整其行为,例如锁争用级别和争用同一资源的线程数。这可以通过最大限度地减少与锁获取和释放相关的开销来提高性能,尤其是在高争用的情况下。通过实时监控系统的性能指标,ABS 锁可以在不同的锁定策略之间切换。 我们目前已经实现自适应锁定机制(ABS 锁),通过实验结果成功验证:自适应锁定机制达到不同场景下单独使用互斥锁/自旋锁的最优效果,简而言之就是取得不同场景下最优锁定机制的效果。本文将详细介绍 Apache RocketMQ 的锁机制迭代过程,并介绍 ABS 锁的优化效果。 相关概念介绍 在文章正式开始前,需要介绍一些本文中可能频繁用到的概念:临界区、互斥锁、自旋锁。了解清楚这些概念将有助于阅读本文的优化思想。 临界区 临界区(Critical Section)是一段供线程独占式访问的代码,也就是说若有一线程正在访问该代码段,其它线程想要访问,只能等待当前线程离开该代码段方可进入,这样保证了线程安全。一般临界区大小是受多方面影响的,例如,本文中消息发送过程的临界区大小可能受消息体大小影响。 互斥锁 互斥锁是一种独占锁,当线程 A 加锁成功后,此时互斥锁已经被线程 A 独占了,只要线程 A 没有释放手中的锁,线程 B 就会失败,就会释放掉 CPU 给其他线程,线程 B 加锁的代码就会被阻塞。 互斥锁加锁失败而阻塞是由操作系统内核实现的,当加锁失败后,内核将线程置为睡眠状态,等到锁被释放后,内核会在合适的时机唤醒线程,当这个线程加锁成功后就可以继续执行。具体的互斥锁行为可以参考下图: 互斥锁带来的性能开销主要在两次线程上下文切换的成本。 1. 当线程加锁失败时,内核将线程的状态从【运行】切换到睡眠状态,然后把CPU切换给其他线程运行; 2. 当锁被释放时,之前睡眠状态的线程会变成就绪状态,然后内核就会在合适的时间把CPU切换给该线程运行。 自旋锁 自旋锁通过 CPU 提供的原子操作 CAS(CompareAndSet),在用户态完成加锁和解锁操作,不会主动产生线程上下文切换,所以相比互斥锁来说,会快一些开销小一些。它和互斥锁的主要区别在于,当加锁失败,互斥锁使用线程切换应对,而自旋锁用忙等待应对。 RocketMQ 的锁机制迭代 下面,我们将介绍 RocketMQ 的锁机制迭代过程。由于自旋锁、互斥锁有各自的优劣势:自旋锁的优势在于其轻量级和低上下文切换开销,适合短时间等待的情况;而互斥锁的优势在于能够节约资源,适合长时间占有锁的场景。 因此本文中将其优势分别比喻为“鱼”和“熊掌”,我们将一步步介绍如何同时拿到最优质的“鱼”和“熊掌”。 鱼和熊掌的抉择——互斥/自旋 在早期为了减小线程上下文切换带来的资源损耗,Apache RocketMQ 选取了自旋锁进行实现。随着版本迭代 RocketMQ 的并发压力日益增长,导致当临界区较大时,自旋锁会产生大量无效自旋,这导致 Broker 的资源利用急剧增长。因此后期 RocketMQ 又使用互斥锁对其进行了替代,避免临界区较大时的资源浪费。 但是当消息较小时,互斥锁的阻塞唤醒机制反而影响到 RT 以及上下文切换引起的更高的 CPU 损耗,自旋锁在临界区较小时的响应时间(RT)以及 CPU 利用方面都有较好的表现,因此在过去我们一直被锁定机制的正确选取所困扰。直到今日,RocketMQ 的内部仍然保留着这两种锁,且通过一个开关进行控制:useReentrantLockWhenPutMessage。 这意味着,使用者需要在启动 broker 时,决定自己的锁类型——如果消息体都比较小,且发送 TPS 并不大,则使用自旋锁;当消息体较大时,或者竞争极为激烈时,则启用互斥锁。 鱼和熊掌兼得—— k 次退避锁 为了解决这个问题,我们启动了锁优化的工作。我们希望有一把锁能够同时具备自旋锁、互斥锁的特点,同时适用于竞争激烈和不激烈的情况。我们最终决定改造自旋锁,通过一把特殊的自旋锁,使系统在各种竞争情况下都保持非常优质的锁行为。自旋锁由于无限自旋直到获取到锁,在临界区较大时会产生较多的空转,耗费大量的 CPU 资源。为了能有效利用自旋锁的优势,因此我们要在临界区较大时对其空转次数的控制,从而避免大量空转,最大程度兼容临界区较大的场景。 最终我们通过对自旋锁的行为建模,提出了 k 次退避锁:进行 K 次自旋后还未获得锁后,执行 Thread.yield() 将 CPU 执行权交给操作系统。这种行为能够避免互斥锁的无谓上下文切换,也能避免高压场景下的无限自旋带来的 CPU 损耗。 这种行为能够缓解系统压力,取得自旋和 CPU 上下文切换两中方法中的最低资源损耗。 在刚过去不久的 FM 24 会议上,Juntao Ji 已经做出 k 次自旋锁的相关理论建模分享,以及实验验证[3]。在k次自旋锁的作用下,我们能找到系统性能的局部最优点,达到最大的 tps 性能。结果如下表所示: 以 X86 架构,同步刷盘的行为为例。实验结果表明,在 k= 10^3 时,发送速度不仅达到峰值(155019.20),CPU 使用率也达到最低。这表明退避策略成功地节省了 CPU 资源。此时,CPU 支持更高的性能水平和较低的利用率水平,这表明性能瓶颈已经转移——例如,可能已经转移到了磁盘上。在表中可以观察到,在具有相同的 k(10^3)和配置参数(最新代码,SYNC 刷盘模式)的 ARM CPU 上,RocketMQ 的性能提高了 10.4%。此外,如上图所示,当 k= 10^3 时,CPU 使用量大幅下降,从平均超过 1000% 下降到 750% 左右。资源消耗的减少表明,减轻其他系统瓶颈可能会导致更显著的性能提高。 最优质的鱼和熊掌—— ABS 锁 在上文已经证明了 k 次退避锁的有效性。但是当前还有一个问题:我们发现当临界区足够大时,自旋锁的资源损耗依旧远超互斥锁(多次退避仍然获取不到)。这样 k 值其实带来的影响是负面的,也就是说,最终逃避不了上下文切换的成本,反而还带来了自旋的等待成本。 因此我们决定通过实现 k 的动态调整再次优化,当临界区较大时,对k进行自适应增大,当达到与互斥锁相同级别资源损耗时(即 k 达到自适应的最大值),此时自旋锁已经不再适合此种场景,因此我们将对其进行互斥锁的切换。这也是我们最终要实现的 ABS 锁。 1. ABS 锁实现 我们通过对不同的锁定机制进行理论分析其所适合的场景,并对其在所适合的场景进行大量实验测试得出多个场景的最优锁定机制。最后通过对运行时条件的动态变化(竞争线程数/TPS/消息大小/临界区大小)进行最优锁定机制的切换。 2. ABS 锁的 K 值自适应策略 上面我们论证出控制自旋次数对于性能优化有不错的效果,但是这个 K 值对于不同系统是不一样的,因此我们需要实现自旋次数 K 的自适应。 简而言之,K 的自适应策略就是一种从低频次自旋到高频次自旋的演化过程,对应临界区争抢变得不断严重或是临界区不断变大的过程。当 k 逐渐增大的过程中,可以增加在线程退避之前就获取到锁的概率,但是当自旋次数增加到一定数量级时,此时自旋成本已经高于线程上下文切换的成本,说明此时已经不适合使用自旋锁——所以此时退化为互斥锁。 通过实验,我们将自旋次数 K 自适应最大值设置为1万,因为在实验中,我们发现当自旋次数大于1万时,竞争激烈时带来的优化效果会受到显著影响,甚至大于一次 CPU 上下文切换的代价。所以此时我们将其切换为互斥阻塞等待锁定机制。 为了自适应调整 K 值,我们提出了一个闭环的工作流,如下图所示: 在图中我们可以看到,我们主要衡量当前 k 值下,自旋获取锁的成功率。如果在当前 k 值下获取锁的成功率不够高,则适当增加 k 值,这将带来更大的获取概率。但是如果一直增加都无法有效提高拿锁成功率,则将其转为互斥锁,会带来更高的效益。 成为互斥锁代表当前可能有较高的突发流量,导致对锁的竞争变得激烈了。但是互斥锁是不适用于低压场景的,所以我们还需要决定如何从互斥锁转回自旋锁。因此我们记录了从自旋锁切换为互斥锁的请求速率。当整体请求速率低于这个数值的 80% 时,则切换回自旋锁。 实验及结果 为了验证 ABS 锁的正确性以及性能优化效果,我们做了多组实验,包括性能测试以及混沌故障测试。本章将介绍具体的实验设计以及结果。 1. 性能实验配置 1. namesrv 一台,broker一台,openmessagebenchmark 一台压测机 2. 上述三台机器的配置为,处理器:8 vCPU,内存:16 GiB,规格族:ecs.c7.2xlarge,公网带宽:5Mbps,内网带宽:5/ 最高 10 Gbps。 3. 消息体大小分别设置了1KB以及2B两种场景,用于影响发送消息时的临界区大小。 2. 性能测试结果 CPU 与耗时的优化 消息 body 大小 1 kb 时,我们记录了不同消息发送速率下的 CPU 占用情况。结果如下图所示: 结果表明,在消息量不断增加时,我们的自适应锁带来了非常明显的优势,有效降低了 CPU 的使用率。 另外,我们还对消息发送时的 P9999 做了记录。P9999 代表发送过程中,发送耗时排在 99.99% 的尾部请求,一般反映了这段时间内的最慢请求速度。结果如下图所示: 可以看到,在不同 TPS 下,我们的自适应锁都能带来更优质的 P9999,有效降低了发送过程中由于锁争抢带来的耗时。 不过,尾部的请求情况一般代表竞争极为激烈时的极端场景。可能有的消息经过多次锁请求尝试但是都未获得锁,因此导致了这种长尾效应。我们还测试了在不同 TPS 下的平均发送耗时情况,发现在竞争极为严重时,我们的 ABS 锁由于自带了一定自旋,所以会让平均延时大约提升 0.5 ms。使用时需要仔细衡量具体场景,以确定是否在高压时开启自适应锁。 我们同时还针对消息体更小的实验场景做了实验,下表是完整的测试结果: 最大性能提升 此外,为了计算由自适应锁带来的性能提升,我们还测试了 Broker 的最大性能,结果如下所示: | CPU Arch | Flush Policy | Original QPS | Optimal QPS | Improvement | | | | | | | | X86 | ASYNC | 176312.35 | 184214.98 | +4.47% | | X86 | SYNC | 177403.12 | 187215.47 | +5.56% | | ARM | ASYNC | 185321.49 | 206431.82 | +11.44% | | ARM | SYNC | 188312.17 | 212314.43 | +12.85% | 根据该表,我们可以认为自适应锁同时能够在多个场景下均找到最大的性能点,实现性能的释放。 实验总结 经过如上所有实验数据,我们可以得出如下结论: 1. 在临界区较小时,ABS 锁提供更高的 TPS 以及更低的响应延时,极限 TPS 提高 12.85% 甚至更高,极限 TPS 下响应时间降低了 50% 左右; 2. 在临界区较大时,ABS 锁提供更低的 RT 以及更低的资源损耗,CPU 损耗从 500%400%,减少无效资源浪费。 3. 故障测试 在软件工程领域,提倡"拥抱故障"的理念意味着认识到错误和异常是正常的一部分,而非完全避免。像 RocketMQ 这样的关键系统,它在生产环境中承受的压力远超于理想化的实验室测试。为此,采用了混沌工程策略,这是一种主动探寻系统极限和脆弱性的实践。 混沌工程的核心目标是增强系统的鲁棒性和容错能力,它是通过在实际操作中人为制造故障,如网络延迟、资源限制等,来观察系统如何应对突发状况。这样做是为了确认系统是否能在不确定和复杂的现实中保持稳定,能否在面对真实世界的问题时依然能高效运作。 RocketMQ 引入自适应锁定机制后,进行了严格的混沌工程实验,包括但不限于模拟分布式节点间的通信故障、负载峰值等情况,目的是验证新机制在压力下的性能和恢复能力。只有当系统经受住这种高强度的“拷打”测试,证明它能在不断变化的环境中维持高可用性,我们才认为这是一个成熟的、可靠的解决方案。 总结起来,通过混沌工程,我们对 RocketMQ 进行了实战演练,以此来衡量其实现高可用性的真正实力。 并希望通过这样的故障测试,证明我们对锁的调整不影响其数据写入的正确性。 实验配置 我们混沌测试的验证实验环境如下: 1. namesrv 一台,内含 namesrv 进程。 2. openchaos 的混沌测试一台,向 broker 发出控制指令。 3. broker 三台,同属一个集群,内含 broker 进程。 上述五台机器的配置为,处理器:8 vCPU,内存:16 GiB,规格族:ecs.c7.2xlarge,公网带宽:5Mbps,内网带宽:5/ 最高 10 Gbps。 在测试中,我们设置了如下的若干种随机测试场景,每种场景都会持续至少 30 秒,且恢复后会保证 30 秒的时间间隔再注入下一次故障: 1. 机器宕机,这个混沌故障注入通过 kill 9 命令实现,将会杀死范围内的随机进程。 2. 机器夯机,这个混沌故障注入通过kill SIGSTOP 命令实现,模拟进程暂停的情况。 每组场景的测试至少重复 5 次,每次至少持续 60 分钟。 实验结论 针对上述提出的所有场景,混沌测试的总时长至少有: 2(场景数) 5(每组测试次数) 60(单组时长) = 600分钟 由于设置的注入时长 30s,恢复时长 30s,因此至少共计注入故障 600/1 = 600 次(实际上注入时长、注入次数远多于上述统计值)。 在这些记录在册的测试结果中,RocketMQ 无消息丢失,数据在故障注入前后均保持强一致。 总结 本文实现了 RocketMQ 锁定机制的迭代过程以及自适应退避自旋锁机制(ABS 锁)的设计以及实现,随着并发系统内部变得越来越复杂,部署有效的锁管理策略是保持性能的关键。因此,我们希望更深入地探索该领域性能优化的潜力,探索性能的极限。 同时我们希望在未来结合多种分布式锁定机制以及其他优秀思想,对其进行实现以及性能测试,期待达到多端本地锁定状态共识:单次通信即可得到锁的成果,减少无效网络通信以及更低的总线资源损耗。但目前专注于消息投递时的锁定机制实现,并未对其进行具体实现以及测试。其它分布式锁机制可以参考如下。 未来可能引入的其它锁优化思想 延迟槽位思想(Delay card slot idea): 对每个客户端分配不同槽位的延时,让客户端进行延迟后再次请求,从而避免总线的大量无效碰撞。 CLH 思想: 循环读取上一个位点的值,更改自身状态。 MCS 思想: 与 CLH 相似,但是时在本地自旋,更改下一个节点的状态。解决了 CLH 在 NUMA 系统架构中获取 locked 域状态内存过远的问题 SMP CPU 架构 基于 Ppersistent CSMA 思想: Ppersistent CSMA(Persistent Carrier Sense Multiple Access)是一种网络通信协议的思想,它通常用于描述分布式环境中如无线局域网(WiFi)中的冲突避免策略。这个思想起源于 CSMA/CD(Carrier Sense Multiple Access with Collision Detection)协议,但它引入了一种更持久的监听机制。 1. 基本流程: 每个设备(节点)在发送数据前会持续监听信道是否空闲。如果信道忙,节点就会等待一段时间后再次尝试。 2. P计数器: 当检测到信道忙时,节点不会立即退回到等待状态,而是启动一个名为 P(persistent period)的计数器。计数器结束后,再尝试发送。 3. 碰撞处理: 如果有多个节点同时开始发送,在信道上发生碰撞,所有参与碰撞的节点都会注意到并增加各自的P计数器后再试。这使得试图发送的数据包在更长的时间内有机会通过,提高了整体效率。 4. 恢复阶段: 当 P 计数器归零后,如果信道依然繁忙,节点会进入恢复阶段,选择一个新的随机延迟时间后再次尝试。 Ppersistent CSMA 通过这种设计减少不必要的冲突次数,但可能会导致网络拥塞情况下的长时间空闲等待。 参考链接 [1] Apache RocketMQ [2] OpenMessaging OpenChaos [3] Ji, Juntao & Gu, Yinyou & Fu, Yubao & Lin, Qingshan. (2024). Beyond the Bottleneck: Enhancing HighConcurrency Systems with Lock Tuning. 10.1007/9783031711770_20. [4] Chaos Engineering [5] T. E. Anderson, "The performance of spin lock alternatives for sharedmoney multiprocessors," in IEEE Transactions on Parallel and Distributed Systems, vol. 1, no. 1, pp. 616, Jan. 1990, doi: 10.1109/71.80120 [6] Y. Woo, S. Kim, C. Kim and E. Seo, "Catnap: A Backoff Scheme for Kernel Spinlocks in ManyCore Systems," in IEEE Access, vol. 8, pp. 2984229856, 2020, doi: 10.1109/ACCESS.2020.2970998 [7] L. Li, P. Wagner, A. Mayer, T. Wild and A. Herkersdorf, "A nonintrusive, operating system independent spinlock profiler for embedded multicore systems," Design, Automation & Test in Europe Conference & Exhibition (DATE), 2017, Lausanne, Switzerland, 2017, pp. 322325, doi: 10.23919/DATE.2017.7927009. [8] OpenMessaging benchmark
作者:王怀远、季俊涛
#技术探索

2024年10月21日

Apache RocketMQ 创新论文被软件工程顶会 FM 2024 录用
近日,由阿里云消息队列团队发表的关于 RocketMQ 锁性能优化论文被 CCFA 类软件工程顶级会议 FM 2024 录用。 FM 2024 是由欧洲形式化方法协会(FME)组织的第 24 届国际研讨会,会议汇聚了来自各国的形式化研究学者,是形式化方法领域的顶级会议。FM 2021 强调形式化方法在广泛领域的开发和应用,包括软件、网络物理系统和基于计算机的综合系统。形式化方法以严格的数学化和机械化方法为基础来规约、构建和验证计算系统,是改善和确保计算系统质量的重要方法,其模型、技术和工具已延生成为计算思维的重要载体。 此次被录用的论文为《Beyond the Bottleneck: Enhancing HighConcurrency Systems with Lock Tuning》。此论文灵感来源于 RocketMQ 适配阿里云倚天 CPU 的性能优化过程中。RocketMQ 此前在发送消息的过程中存在两种锁:自旋锁和互斥锁。我们发现,不同 CPU 适合的锁行为并不相同。糟糕的锁行为可能导致性能的大幅下滑,而适配的锁行为能够在提升性能的同时降低资源损耗。这两种锁在版本迭代过程中,都在线上版本中使用过,且对于不同的版本来说,使用这两种锁可能带来截然不同的性能结果。 因此,本文旨在提出一种新的自适应 K 值退避锁,能够让高并发系统的部署者无需考虑两种锁的优劣势,只需使用一把锁即可实现性能的最优以及最低的资源损耗。 换言之,我们希望有一把锁能够同时具备自旋锁、互斥锁的特点,同时适用于竞争激烈和不激烈的情况。我们最终决定改造自旋锁,通过一把特殊的自旋锁,使系统在各种竞争情况下都保持非常优质的锁行为。自旋锁由于无限自旋直到获取到锁,在临界区较大时会产生较多的空转,耗费大量的 CPU 资源。为了能有效利用自旋锁的优势,因此我们要在临界区较大时对其空转次数的控制,从而避免大量空转,最大程度兼容临界区较大的场景。 最终,我们基于排队论,通过对自旋锁的行为建模,得到了自旋次数与系统负载的关系: 公式中,是一把锁的整体期望获取时间。它分别由两部分组成:期望自旋耗时以及期望上下文切换耗时。将二者与自旋次数 K 和系统负载 P 的关系代入,则得到了上述的最终公式。公式中的 Ts 是单次自旋耗时,Tc 是单次上下文切换耗时。 我们最终基于系统的最大压力场景提出了自适应 K 值退避锁:进行 K 次自旋后还未获得锁后,执行 Thread.yield() 将 CPU 执行权交给操作系统。 这种行为能够避免互斥锁的无谓上下文切换,也能避免高压场景下的无限自旋带来的 CPU 损耗。这种行为能够缓解系统压力,取得自旋和 CPU 上下文切换两种方法中的最低开销。 在自适应 K 值退避锁的作用下,我们能找到系统性能的局部最优点,达到最大的 TPS 性能。结果如下表所示: 消息发送最大 TPS 的性能优化结果 此外,我们还检查了各个 K 值下的 Broker 资源损耗情况,发现在最大 TPS 时的 K 值,同时也是资源占用相对最低时的 K 值: 各个 K 值下的 CPU 使用率 以 X86 架构,同步刷盘的行为为例。实验结果表明,在 k= 10^3 时,发送速度不仅达到峰值(155019.20),CPU 使用率也达到最低。这表明退避策略成功地节省了 CPU 资源。此时,CPU 支持更高的性能水平和较低的利用率水平,这表明性能瓶颈已经转移——例如,可能已经转移到了磁盘上。在表中可以观察到,在具有相同的 k(10^3)和配置参数(最新代码,SYNC 刷盘模式)的 ARM CPU 上,RocketMQ 的性能提高了 10.4%。此外,如上图所示,当 k= 10^3 时,CPU 使用量大幅下降,从平均超过 1000% 下降到 750% 左右。资源消耗的减少表明,减轻其他系统瓶颈可能可以带来更显著的性能提高。 附论文信息 录用论文题目: 《Beyond the Bottleneck: Enhancing HighConcurrency Systems with Lock Tuning》 作者: 季俊涛,古崟佑,傅玉宝,林清山 论文概述: 高并发系统常常面临性能瓶颈,主要是由于线程间激烈竞争锁导致的等待和上下文切换。作为一家云计算公司,我们非常重视性能的最大化。为此,我们对轻量级自旋锁进行了改进,并提出了一种简洁的参数微调策略,能够在最低风险条件下突破系统性能瓶颈。该策略在高吞吐量消息队列系统 Apache RocketMQ 中得到了验证,实现了 X86 CPU 性能提升 37.58% 和 ARM CPU 性能提升 32.82%。此外,我们还确认了这种方法在不同代码版本和 IO 刷新策略下的一致有效性,显示出其在实际应用中的广泛适用性。这项工作不仅为解决高并发系统的性能问题提供了实用工具,还突显了形式化技术在工程问题解决中的实际价值。 相关链接:
#技术探索

2024年8月30日

基于Apache RocketMQ 的云原生 MQTT 消息引擎设计
概述 随着智能家居、工业互联网和车联网的迅猛发展,面向 IoT(物联网)设备类的消息通讯需求正在经历前所未有的增长。在这样的背景下,高效和可靠的消息传输标准成为了枢纽。MQTT 协议作为新一代物联网场景中得到广泛认可的协议,正逐渐成为行业标准。 本次我们将介绍搭建在 RocketMQ 基础上实现的 MQTT 核心设计,本文重点分析 RocketMQ 如何适应这些变化,通过优化存储和计算架构、推送模型及服务器架构设计,推动 IoT 场景下消息处理的高效性和可扩展性以实现 MQTT 协议。 此外,阿里云 MQTT 以 RocketMQMQTT 为基础,不断进行迭代创新。阿里云是开源 RocketMQMQTT 的主要贡献者和使用者之一。面对设备通信峰谷时段差异性的挑战,本文将介绍阿里云如何将 Serverless 架构应用于消息队列,有效降低运营成本,同时利用云原生环境的特性,为 IoT 设备提供快速响应和灵活伸缩的通讯能力。 进一步地,我们将探讨介绍在云端生态体系中整合 MQTT 的实践,介绍基于统一存储的数据生态集成方案,展示其强大的技术能力和灵活的数据流转能力。 loT 消息场景 (消息场景对比) 物联网技术,作为当代科技领域的璀璨明星,其迅猛发展态势已成共识。据权威预测,至 2025 年,全球物联网设备安装基数有望突破 200 亿大关,这一数字无疑昭示着一个万物互联时代的到来。 更进一步,物联网数据量正以惊人的年增长率约 28% 蓬勃膨胀,预示着未来数据生态中超过 90% 的实时数据将源自物联网。这一趋势深刻改变了数据处理的格局,将实时流数据处理推向以物联网数据为核心的新阶段。 边缘计算的崛起,则是对这一变革的积极响应。预计未来,高达 75% 的数据处理任务将在远离传统数据中心或云端的边缘侧完成。鉴于物联网数据的海量特性,依赖云端进行全部数据处理不仅成本高昂,且难以满足低延迟要求。因此,有效利用边缘计算资源,就地进行数据初步处理,仅将提炼后的关键信息上传云端,成为了提升效率、优化用户体验的关键策略。 在此背景下,消息传递机制在物联网场景中的核心价值愈发凸显: + 桥梁作用:消息系统充当了物联网世界中的“神经网络”,无缝衔接设备与设备、设备与云端应用间的沟通渠道,构筑起云边端一体化的应用框架,确保了信息交流的即时与高效。 + 数据加工引擎:面对物联网持续涌动的数据洪流,基于消息队列(MQ)的事件流存储与流计算技术成为了解锁实时数据分析潜能的钥匙。这一机制不仅能够实时捕捉、存储数据流,还支持在数据产生的瞬间执行计算操作,为物联网应用提供了强大的数据处理基础架构,助力实现数据的即时洞察与决策响应。 总之,消息技术不仅是物联网架构的粘合剂,更是驱动数据流动与智能决策的核心动力,其在物联网领域的应用深度与广度,正随着技术迭代与市场需求的双重驱动而不断拓展。 同时传统消息场景和物联网消息场景有很多的不同,包含以下几个特点: 1)硬件资源差异 传统消息场景依托于高性能、高可靠性的服务集群,运算资源充沛,客户端部署环境多为容器、虚拟机乃至物理服务器,强调集中式计算能力。相比之下,物联网消息场景的客户端直接嵌入至网络边缘的微型设备中,如传感器、智能家电等,这些设备往往受限于极低的计算与存储资源,对能效比有着极高要求。 2)网络环境挑战 在经典的内部数据中心(IDC)环境中,消息处理享有稳定的网络条件和可控的带宽、延迟指标。而物联网环境则拓展至公共网络,面对的是复杂多变的网络状况,尤其在偏远地区或网络覆盖弱的区域,不稳定的连接成为常态,对消息传输的健壮性和效率提出了更高挑战。 3)客户端规模的量级跃迁 传统消息系统处理的日均消息量通常维持在百万级,适用于集中度较高的消息分发。物联网的兴起促使设备数量呈爆炸性增长,动辄涉及亿万级别的终端节点,这对系统的扩展性、消息路由的高效性提出了全新的要求。 4)生产与消费模式的演变 传统场景倾向于采用集中式同步生产模式,强调消息的一致性与有序处理。而物联网消息生成则体现出分散化的特性,每个设备根据其感知环境独立产生少量消息,这对消息收集与整合机制设计提出了新的思考。消费模式上,物联网场景经常涉及大规模广播或组播,单条消息可能触达数百万计的接收者,要求系统具备高效的广播能力和灵活的订阅管理机制。 RocketMQ 的融合架构设计 (融合架构设计图) 我们看到,物联网所需要的消息技术与经典的消息设计有很大的不同。接下来我们来看看基于 RocketMQ 的融合架构 MQTT 设计为了解决物联网的消息场景有哪些特点。 1)融入 MQTT 协议,适应物联网环境特性 RocketMQ 5.0 通过整合 RocketMQMQTT,紧密贴合了物联网领域广泛采用的MQTT协议标准。此协议针对低功耗、网络条件不稳定的情况优化,以其轻量级特性和丰富的功能集脱颖而出,支持不同的消息传递保障级别,满足了从“最多一次”到“仅一次”的多样化需求。协议的领域模型与 RocketMQ 的核心组件高度协调,促进了消息、主题、发布订阅模式的自然融合,为建立一个从设备到云端的无缝消息传递体系打下了稳固的基础。 2)灵活的存储与计算解耦架构 为了应对物联网场景下对高并发连接和大规模数据处理的需求,RocketMQ 5.0 采取了存储与计算分离的架构设计。RocketMQ Broker 作为核心存储组件,确保了数据的持久化与可靠性,而 MQTT 相关的逻辑操作则在专门的代理层实施,这不仅优化了对大量连接、复杂订阅关系的管理,也增强了实时消息推送的能力。这种架构允许根据业务负载动态调整代理层资源,通过增加代理节点来平滑应对设备连接数的增加,体现了系统良好的弹性和扩展潜力。 3)促进端云数据协同的整合架构 RocketMQMQTT 通过其整合的架构设计,促进了物联网设备与云端应用之间的高效数据共享。基于统一的消息存储策略,每条消息在系统内只需存储一次,即可供两端消费,减少了数据冗余,提高了数据流通的效率。此外,RocketMQ 作为数据流的存储中枢,自然而然地与流计算技术结合,为实时分析物联网生成的海量数据提供了便利,加速了数据价值的发掘过程。 存储设计 首先要解决的问题是物联网消息的存储模型。在发布订阅业务模型中,常用的存储模型有两种,写放大和读放大,我们将依次分析两种模型。 (写放大模型) (读放大模型) 写放大模型: 在此模型下,每位消费者拥有专属的消息队列,每条消息需要复制并分布到所有目标消费者的队列中,消费者仅关注并处理自己队列中的消息。以三级主题/Topic/subTopic/test 为例,若该主题吸引了大量客户端订阅,采取“一客一队列”的策略,即每个符合订阅规则的客户端或通配符订阅均维护一份消息副本,将导致消息的存储需求随订阅者数量线性增长。 尽管这种模式在某些传统消息场景,比如遵循 AMQP 协议的应用中表现得游刃有余,因为它确保了消息传递的隔离性和可靠性。但在物联网场景下,特别是当单个消息需被数以百万计的设备消费时,“写放大”策略将引发严重的存储资源消耗问题,迅速成为不可承受之重。 读放大的考量与挑战: 鉴于物联网场景的特殊需求,直接应用传统的“写放大”模型显然是不可行的。为解决这一难题,RocketMQMQTT 采取了更为高效与灵活的存储策略,旨在减少存储冗余,提高系统整体的可扩展性和资源利用率: 在“读放大”模式下,每条消息实际上被存储一次,而为了支持通配符订阅的高效检索,系统在消息存储阶段会创建额外的索引信息——即 consume queue(消费队列)。对于如/Topic/subTopic/这样的通配符订阅,系统会在每个匹配的通配符队列中生成相应的索引,使得订阅了不同通配符主题或具体主题的消费者,都能通过这些共享的存储实体找到并消费到消息。尽管这看似增加了“读”的复杂度,但实际上,每个 consume queue 作为索引,其体积远小于原始消息,显著降低了整体存储成本,同时提高了消息检索与分发的效率。 (原子分发示意图) 为此,我们设计了一种多维度分发的 Topic 队列模型,如上图所示,消息可以来自各个接入场景(如服务端的 MQ/AMQP、客户端的 MQTT),但只会写一份存到 commitlog 里面,然后分发出多个需求场景的队列索引(ConsumerQueue),如服务端场景(MQ/AMQP)可以按照一级 Topic 队列进行传统的服务端消费,客户端 MQTT 场景可以按照 MQTT 多级 Topic 以及通配符订阅进行消费消息。这样的一个队列模型就可以同时支持服务端和终端场景的接入和消息收发,达到一体化的目标。 实现这一模型,RocketMQ依托了两项关键技术特性: + 轻型队列(Light Message Queue) 这一特性允许一条消息被灵活地写入多个 topic queue 中,确保了消息能够高效地适应各种复杂的订阅模式,包括但不限于通配符订阅。它为读放大模型的实现提供了必要的灵活性和效率基础。 + 百万队列能力 RocketMQ 通过集成 RocksDB 这一高性能键值存储引擎,充分利用其在顺序写入方面的优势,实现了百万级别的队列管理能力。特别是通过定制化配置,去除了 RocksDB 内部的日志预写(WriteAhead Log, WAL),进一步优化了存储效率。RocksDB 不仅为 consume queue 提供了稳定高效的存储方案,还确保了即便在极端的队列数量下,系统依然能够保持高性能的索引处理能力。 (轻型队列的实现) 通过采用“读放大”模型,结合 RocketMQ 的轻型队列特性和百万队列的底层技术支持,我们不仅有效解决了物联网环境下消息存储与分发的挑战,还实现了存储成本与系统性能的双重优化。这种设计不仅减少了存储空间的占用,还通过高度优化的索引机制加快了消息检索速度,为大规模物联网设备的消息通信提供了一个既经济又高效的解决方案。 推送模型 (RocketMQMQTT 推送模型) 在介绍完底层队列存储模型之后,我们将重点探讨匹配查找和可靠送达的实现机制。在传统的消息队列 RocketMQ 中,经典的消费模式是消费者通过客户端直接发起长轮询请求,以精准地获取对应主题的队列消息。然而,在 MQTT 场景下,由于客户端数量众多且订阅关系复杂,长轮询模式显得不够有效,因此消费过程变得更加复杂。为此,我们采用了一种推拉结合的模型。 本模型的核心在于终端通过 MQTT 协议连接至代理节点,消息可以来源于多种场景(如MQ、AMQP、MQTT)。当消息存入主题队列后,通知逻辑模块将实时监测到新消息的到达,进而生成消息事件(即消息的主题名称),并推送至网关节点。网关节点根据连接终端的订阅状态进行内部匹配,识别能够接收这一消息的终端,随后触发拉取请求,以从存储层读取消息并推送至终端。 在这个流程中,一个关键问题是通知模块如何确定终端感兴趣的消息,以及哪些网关节点会对此类消息感兴趣。这实际上是一个核心的匹配搜索问题。常见的解决方案主要有两种:第一种是简单的事件广播,第二种是将线上订阅关系集中存储(例如图中的 Lookup 模块),然后进行匹配搜索,再执行精准推送。 虽然事件广播机制在扩展性上存在一定问题,但其性能表现仍然良好,因为我们推送的数据量相对较小,仅为 Topic 名称。此外,同一 Topic 的消息事件可以合并为一个事件,这是我们当前在生产环境中默认采用的方式。另一方面,将线上订阅关系集中存储在 RDS 或 Redis 中也是一种普遍的做法,但这需要保证数据的实性,匹配搜索的过程可能会对整体实时消息链的延迟产生影响。 在该模型中,还设计了一个缓存模块,以便在需要广播大量消息时,避免各个终端对存储层发起重复的读取请求,从而提高整体系统的效率。 阿里云 MQTT 在 Serverless 上的实践 随着云原生技术的不断发展,现代消息中间件逐渐以容器编排为基础,如何实现真正的无服务器架构及秒级弹性管理已成为一项重要的研究课题。 阿里云作为开源 RocketMQMQTT 的主要贡献者和使用者之一,在 MQTT 弹性设计上有很多优化方式和实践经验。我们将介绍阿里云在弹性上的设计思路,展示其如何实现高效、弹性强的 MQTT 消息中间件: 1)抽离网络连接层 阿里云 MQTT 采用类似 Sidecar 的模式,将网络连接层与核心业务逻辑进行分离,使用 Rust 语言来处理网络连接。与 Java 相比,Rust 在内存消耗和启动速度上具有显著优势,尤其在处理大规模 MQTT 连接时,能够有效降低内存占用。 2)秒级扩容 每个 Pod 的资源请求设置较低,同时预留部分 Pod 专门运行 Rust 进程。在扩容需求出现时,系统能够快速启动 MQTT Proxy 进程,省去 Pod 创建和资源挂载的时间,从而显著提升响应速度。 3)弹性预测与监控 利用连接数、TPS、内存、CPU 等白盒指标,以及 RT 等黑盒指标,阿里云 MQTT 依据指标联动规则,制定了合理的扩容策略。这使得系统能够提前预测负载变化并启动 Pod 扩容,确保长期平稳运行。 通过以上设计思路,阿里云能够构建一个高效、弹性强的基于 RocketMQ 的 MQTT 实现方案,充分利用 Rust 带来的性能优势,同时保持系统的稳定性与可扩展性。这种创新设计将在实际应用中显著提升用户体验,助力系统整体性能的优化。 阿里云 MQTT 在车联网中的实践架构 随着汽车出行领域新四化(电气化、智能化、网联化和共享化)的推进,各大汽车制造商正逐步构建以智能驾驶和智能网联为核心的车联网系统。这一新一代车联网系统对底层消息采集、传输和处理的平台架构提出了更高的要求。接下来,我们将介绍阿里云 MQTT 在车联网中的实践架构及其应用价值。 在架构图中,我们可以看到常见的车联网设备,包括车载终端、路测单元和手机端系统。这些设备确保了安全的连接与数据传输。车端的功能涵盖车机数据上报、POI 下发、文件推送、配置下发、消息推送等全新车联业务。这些操作将产生海量的消息 Topic,需要更加安全、稳定的接入与传输,以实现可靠的消息订阅与发布。路端则强调路侧 RSU 的安全接入,支持消息的采集、传输以及地图数据的实时更新。 接入端支持多种协议,包括 TCP、x509、TLS、WSS、WS、OpenAPI 和 AMQP,以满足不同应用场景的灵活需求。这种多协议支持确保了设备之间的无缝互联与高效通信。 在流转生态方面,物联网场景下,各种设备持续产生大量数据,业务方需要对这些数据进行深入分析与处理。采用 RocketMQ 作为存储层,系统能够只保存一份消息,并支持物联网设备和云端应用的共同消费。RocketMQ 的流存储特性使得流计算引擎能够无缝、实时地分析物联网数据,为关键决策提供及时支持。 借助阿里云 EventBridge,MQTT 物联网设备所生成的信息可以顺利流转至 Kafka、AMQP、FC、Flink 等其他中间件或数据处理平台,实现深度的数据分析与处理。事件总线 EventBridge 是阿里云提供的一款 Serverless 总线服务,支持阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助轻松构建松耦合、分布式的事件驱动架构。这种灵活的数据流转能力不仅提升了处理速度,还为未来智能化应用的创新和发展奠定了基础。 通过以上架构,可以清晰地看到阿里云 MQTT 在车联网领域的最佳实践,为实现未来智能出行提供了可靠的技术支撑。 结语 在物联网的蓬勃发展背景下,消息传递技术的不断演进已成为支撑智能家居、工业互联网以及车联网等领域的重要基石。通过对 RocketMQ 和 MQTT 协议的深度融合,我们不仅有效解决了物联网时代对高效、可靠消息传输的需求,也为设备通信带来了灵活的解决方案。 阿里云在这一领域的积极探索,通过引入 Serverless 架构,不断推进 MQTT 的技术迭代与创新。这样的设计能够在面对高并发连接和海量数据时,动态调整资源配置,降低成本并提升响应速度,确保了实时数据处理的高效性。 当前,社区正在推动 MQTT 5.0 协议方面已取得显著进展,新的协议特性如更丰富的错误码、更灵活的连接选项以及 will 消息、retain 消息、共享订阅功能都将进一步提升系统的灵活性和可靠性。与此同时,我们在致力于实现更快的弹性扩展能力,以便在面对突发流量时及时响应,提高系统的可用性和灵活性。 随着 IoT 生态体系的不断完善,面对日益复杂的消息场景,消息技术的价值愈发凸显。我们相信,未来通过不断优化的消息架构,能够推动更深层次的智能化应用,同时为构建万物互联的未来奠定坚实的基础。让我们期待 MQTT 在物联网技术的场景中展现其无限可能,同时也继续探索持续探索在确保安全、稳定、高效的消息中间件。
作者:沁君
#技术探索 #物联网

2024年8月22日

一文详解Apache RocketMQ 如何利用 Raft 进行高可用保障
前言 Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。由于其业务场景愈加丰富,在工业界的使用率日益提高,开发者们也必须更完善地考虑 RocketMQ 的可靠性、可用性。 由于 RocketMQ 底层实际上是一种基于日志的存储系统,而前人为了避免这种存储系统中单个机器可能出现的数据丢失、单点故障等问题,已经有了相对成熟的解决方案——例如同时复制数据到多个机器上。在这个过程中,需要解决的问题便被简化了:如何保证多个机器上的数据是一致的,而且这种一致性强大到可以对抗宕机、脑裂等问题。而这些问题,可以通过分布式一致性算法来彻底解决。 在开源的 Apache RocketMQ 中,我们已经引入了 DLedger [2] 和 SOFAJRaft [3] 来作为 Raft [4] 算法的具体实现,以支撑系统高可用。本文将介绍 RocketMQ 如何利用Raft(一种简单有效的分布式一致性算法)进行高可用的保障。 分布式一致性算法:Raft 共识(Consensus)是分布式系统中实现容错的一个基本问题。它指的是在一个系统中,多个服务器需要就某些值达成一致的意见。一旦它们对一个值做出了决定,这个决定就是不可变更的 [1]。典型的共识算法能够在多数服务器可用的情况下继续运行;比如说,在一个有 5 台服务器的集群中,即使有 2 台服务器宕机,整个集群依然能够正常运作。如果宕机的服务器数量超过半数,集群就无法继续正常运行了(但它也绝不会返回错误的结果)。 业界比较有名的分布式一致性算法是 paxos [12],不过可惜的是它比较晦涩难懂,难懂的代价就是很少有人能掌握它然后基于它做出可靠的实现。不过幸好 Raft 及时出现,它易于理解,并且已经有非常多的业界使用先例,比如 tikv、etcd 等。 这是 Raft 的原始论文,详细描述了 Raft:《In Search of an Understandingable Consensus Algorithm》 [4]。本文的略短版本在 2014 年 USENIX 年度技术会议上获得了最佳论文奖。有意思的是,在论文的 Abstract 中,第一句话便是:Raft is a consensus algorithm for managing a replicated log. 在英文中,log 不仅有“日志”的意思,还有“木头”的意思,一组"replicated log"便组成了木筏,这和 Raft 的英文原意不谋而合。而且,Raft 的主页中,也采用了三根木头组成的筏作为 logo(如图 1)。当然,对 Raft 更正式的解释还是这些单词的首字母缩写:Re{liable | plicated | dundant} And FaultTolerant. 也就是 Raft 被提出时要解决的问题初衷。 (图 1:Raft 主页标题后附的头图) Raft 算法是一种为了管理复制日志的共识算法,它将整个共识过程分解为几个子问题:领导者选举、日志复制和安全性。整个 Raft 算法因此变得易理解、易论证、易实现,从而让分布式一致性协议可以较为简单地实现。Raft 和 Paxos 一样,只要保证 n/2+1 节点即多数派节点正常就可以对外提供服务。 在 Raft 算法中,集群中的每个节点(服务器)可以处于以下三种状态之一: a. Follower(跟随者):这是所有节点的初始状态。它们被动地响应来自领导者和候选者的请求。 b. Candidate(候选者):当跟随者在一段时间内没有收到来自领导者的消息时,它们会成为候选者,并开始选举过程以成为新的领导者。 c. Leader(领导者):集群中的管理节点,负责处理所有客户端请求,并将日志条目复制到其他节点。 Raft 算法通过任期的概念来分隔时间,每个任期开始都会进行一次领导者选举。任期是一个递增的数字,每次选举都会增加。如果跟随者在“选举超时”之内没有收到领导者的心跳,它会将自己的任期号加一,并转变为候选者状态来发起一次领导者选举。候选者首先给自己投票,并向其他节点发送请求投票的 RPC。如果接收节点在当前任期内还没有投票,它会同意投票给请求者。 如果候选者在一次选举中从集群的大多数节点获得了选票,它就会成为领导者。在此过程中生成的任期号用于节点之间的通信,以防止过时的信息导致错误。例如,如果节点收到任期号比自己小的请求,它会拒绝该请求。 极端情况下集群可能会出现脑裂或网络问题,此时集群可能会被分割成几个互不通信的子集。不过由于 Raft 算法要求一个领导者必须拥有集群大多数节点的支持,这保证了即使在脑裂的情况下,最多只有一个子集能够选出一个有效的领导者。 Raft 通过日志复制来保持节点间的一致性。对于一个无限增长的序列 a[1, 2, 3…],如果对于任意整数 i,a[i] 的值满足分布式一致性,这个系统就满足一致性状态机的要求 基本上所有的真实系统都会有源源不断的操作,这时候单独对某个特定的值达成一致显然是不够的。为了让真实系统保证所有的副本的一致性,通常会把操作转化为 writeaheadlog(WAL)。然后让系统中所有副本对 WAL 保持一致,这样每个副本按照顺序执行 WAL 里的操作,就能保证最终的状态是一致的,如图 2 所示。 (图 2:如何通过日志复制来确保节点间数据一致。阶段一为 Client 向 leader 发送写请求,阶段二为 Leader 把‘操作’转化为 WAL 并复制,阶段三为 Leader 收到多数派应答,并将操作应用到状态机) 领导者在收到客户端的请求后,会先将请求作为新的日志条目追加到它的日志中,然后并行地将该条目复制到其他节点。只有当大多数节点都写入了这个日志条目,领导者才会将该操作提交,并应用到它的状态机上,同时通知其他节点也提交这个日志条目。因此,Raft 确保了即使在领导者崩溃或网络分区的情况下,也不会有数据丢失。任何被提交的日志条目都保证在后续的任期中也存在于任意新的领导者的日志中。 简单来说,Raft 算法的特点就是 Strong Leader: a. 系统中必须存在且同一时刻只能有一个 Leader,只有 Leader 可以接受 Clients 发过来的请求; b. Leader 负责主动与所有 Followers 通信,负责将“提案”发送给所有 Followers,同时收集多数派的 Followers 应答; c. Leader 还需向所有 Followers 主动发送心跳维持领导地位(保持存在感)。 一句话总结 Strong Leader: "你们不要 BB! 按我说的做,做完了向我汇报!"。另外,身为 Leader 必须保持一直 BB(heartbeat)的状态,否则就会有别人跳出来想要 BB 。 为了更直观的感受到 Raft 算法的运行原理,笔者强烈推荐观看下面网站中的演示。它以动画的形式,非常直观地展示了 Raft 算法是如何运行的,以及如何应对脑裂等问题的:_https://thesecretlivesofdata.com/raft/_ (图 3:Raft 算法选举过程图示) 相信观看过上面网站中的图解后,读者应该了解了 Raft 的设计思想与具体算法,下面我们直接切入正题,讲解 RocketMQ 与 Raft 的前世今生。 RocketMQ 与 Raft 的前世今生 RocketMQ 尝试融合 Raft 算法已经非常之久,这期间的融合方式也经历过变革。发展至今,Raft 也只是 RocketMQ 高可用机制中的一小部分,RocketMQ 已然发展出了一套适合自身的高可用共识协议。 本章主要阐述 RocketMQ 为了在系统内实现 Raft 算法作出过哪些尝试,以及当前 Raft 在 RocketMQ 中的存在形态与具体作用。 Raft 在 RocketMQ 中的初期形态 RocketMQ 引入 Raft 协议的主要原因是为了增强系统的高可用性和故障自动恢复能力。在 4.5 版本之前,RocketMQ 只有 Master/Slave 一种部署方式,即一组 broker 中仅有一个 Master,有零到多个 Slave,这些 Slave 以同步或者异步的方式去复制 Master 中的数据。然而这种方式存在一些限制: 1. 故障转移不是完全自动的: 当 Master 节点出现故障时,需要人工介入进行手动重启或者切换到 Slave 节点。 2. 对外部依赖较高: 虽然可以通过第三方协调服务(如 ZooKeeper 或 etcd)实现自动选主,但这增加了部署和运维的复杂性,同时第三方服务本身的故障也可能影响到 RocketMQ 的集群。 为了解决上述问题,RocketMQ 引入了基于 Raft 协议的 DLedger 存储库 [5]。DLedger 是一个分布式日志复制技术,它使用 Raft 协议,可以在多个副本之间安全地复制和同步数据。RocketMQ 4.5 版本发布后,可以采用 RocketMQ on DLedger 方式进行部署。DLedger commitlog 代替了原来的 CommitLog,使得 CommitLog 拥有了选举复制能力,然后通过角色透传的方式,raft 角色透传给外部 broker 角色,leader 对应原来的 master,follower 和 candidate 对应原来的 slave: (图 4:RocketMQ on DLedger 部署形态,每个 broker 间的角色由 Raft CommitLog 向外透传) 因此 RocketMQ 的 broker 拥有了自动故障转移的能力。在一组 broker 中, Master 挂了以后,依靠 DLedger 自动选主能力,会重新选出 leader,然后通过角色透传变成新的 Master。DLedger 还可以构建高可用的嵌入式 KV 存储。我们把对一些数据的操作记录到 DLedger 中,然后根据数据量或者实际需求,恢复到hashmap 或者 rocksdb 中,从而构建一致的、高可用的 KV 存储系统,应用到元信息管理等场景。 我们测试了各种故障下 Dledger 表现情况,包括随机对称分区,随机杀死节点,随机暂停一些节点的进程模拟慢节点的状况,以及 bridge、partitionmajoritiesring 等复杂的非对称网络分区。在这些故障下,DLedger 都保证了一致性,验证了 DLedger 有很好可靠性 [6]。 总结来说,引入 Raft 协议后,RocketMQ 的多副本架构得以增强,提高了系统的可靠性和自我恢复能力,同时也简化了整个系统的架构,降低了运维的复杂性。这种架构通过 Master 故障后短时间内重新选出新的 Master 来解决单主问题,但是由于 Raft 选主和复制能力一同在数据链路(CommitLog)上,因此存在以下问题: 1. Broker 组内的副本数必须是 3 副本及以上才有切换能力,因此部署的最低成本是有上升的。 2. Raft 多数派限制导致三副本副本必须两副本响应才能返回,五副本需要三副本才能返回,因此 ACK 是不够灵活的,这也导致“发送延迟低”和“副本冗余小”两种要求很难做到。 3. 由于存储复制链路用的是 OpenMessaging DLedger 库,导致 RocketMQ 原生的一些存储能力没办法利用,包括像 TransientPool、零拷贝的能力,如果要在 Raft 模式下使用的话,就需要移植一遍到 DLedger 库,开发特性以及 bug 修复也需要做两次,这样的维护和开发成本是非常高的。 此外,将选举逻辑嵌入数据链路中可能会引发一连串的问题,这直接与我们追求的高可用和稳定性目标背道而驰。以选举发生在数据链路中的假设情景为起点,我们可以设想一个由多个节点构成的存储集群,其中节点需要定期进行选举来决定谁负责数据流的管理任务。在这种设计下,选举不仅是控制面的一部分,而且直接影响数据链路的稳定性。一旦发生选举失败或不一致的情况,整个数据链路可能会受阻,导致写入能力的丧失或数据丢失。 但是我们将目光看向 PolarStore 时就能发现,它的设计思想包含了“控制面和数据面分离”:数据面操作仅依赖于本地缓存的全量元数据,而对控制面的依赖最小化。这种设计的优势在于即使控制面完全不可用,数据面依然能够依据本地缓存维持正常的读写操作。在这种情况下,控制面的选举机制永远不会影响到数据面的可用性。这种分离架构为存储系统带来了相当强的鲁棒性,即使在遭遇故障时也能够保持业务的连续性。 总而言之,将选举逻辑与数据链路解耦是保障存储系统高可用性和稳定性的关键。通过将控制面的复杂性和潜在故障隔离,可以确保即使在面临控制面故障时,数据面依然能够保持其核心功能,从而为用户提供持续的服务。这种健壮的设计理念在现代分布式存储系统中是至关重要的——在控制面遭遇问题时,数据面能够以最小的影响继续运作。 这个例子告诉我们,数据面的可用性如果和控制面解耦,那么控制面挂掉对数据面的影响很轻微。否则,要么要不断去提高控制面的可用性,要么就要接受故障的级联发生 [7]。这也是我们后文中 RocketMQ 的演进方向。 现在的 Raft Controller:控制面/数据面分离 上文中提到,我们以 DLedger 的形式将 Raft 引入了 RocketMQ,但这种引入实际上是给了 CommitLog 选举的能力。这种设计固然直接有效,能够直接赋予一致性给最重要的组件。但是这样的设计也让选举、复制的过程被耦合到了一起。当二者耦合时,每次的选举、拷贝便都强依赖 DLedger 的 Raft 实现,这对于未来的扩展性是非常不友好的。 那么,有没有更可靠、更灵活的解决方案呢?我们不妨把目光转向学术界,看看他们的灵感。在 OSDI' 20 上,Meta 公司管控平面元数据存储统一平台 Delos [8] 相关论文获得了的 Best Paper Award。这篇论文提供了一个全新的视角与思路来解决选主过程中“控制面与数据面耦合”的问题:虚拟共识(Virtual Consensus)。它在论文中描述了在生产环境中实现在线切换共识协议的工作。这种设计旨在通过虚拟化共享日志 API 来实现虚拟化共识,从而允许服务在不停机的情况下更改共识协议。 论文的出发点是,在生产环境中,系统往往高度集成了共识机制,因此更换共识协议会涉及到非常复杂且深入的系统改动。 以前文提到的 DLedger CommitLog 为例,其分布式共识协议中的数据流(负责容错)和控制流(负责同步共识组配置)是密切相关的。在这种紧密耦合的系统中进行修改是极其困难的,这就使得开发和实施新共识协议的代价变得相当昂贵,甚至单纯的更新微小特性都面临重大挑战。更不用提在这种环境下引入更先进的共识算法(未来如果有的话),这必然花费相当重大的成本。 相比之下,Delos 提出了一种新方法,通过分离控制层和数据层来克服这些挑战。在这个架构中,控制层负责领导选举,而数据层则由 VirtualLog 和下层的 Loglets 组成,用于管理数据。VirtualLog 提供了一个共享日志的抽象层,将不同 Loglets(代表不同共识算法实例)串联起来。这种抽象层在 VirtualLog 和各个 Loglet 之间建立日志条目的映射关系。要切换到新的共识协议时,简单地指示 VirtualLog 将新的日志条目交由新 Loglet 处理以达成共识即可。这种设计实现了共识协议的无缝切换,并显著降低了更换共识协议的复杂性和成本。 (图 5:Delos 设计架构示意图,控制面和数据面被分开,以 VirtualLog 的形式进行协作) 这种设计既然已经在学术界开诚布公,那 RocketMQ 也可以在工业界将其落地并作验证。在 RocketMQ 5.0 中,我们提出了 Raft Controller 的概念:仅在上层协议中使用 Raft 与其它选主算法,而下层数据链路的复制则由 Broker 中的一套数据复制算法负责,用于响应上层的选主结果。 这个设计理念在行业中其实并不罕见,反而已经成为了一种成熟且广泛被验证的实践。以 Apache Kafka 为例,这个高性能的消息传递系统采用了分层的架构策略,在早期版本中使用了 ZooKeeper 来构建其元数据的控制平面,这一控制平面负责管理集群的状态和元数据信息。随着 Kafka 的发展,新版本引入了自研的 KRaft 模式,进一步内部化并提升了元数据管理。此外,Kafka 的 ISR(InSync Replicas) [15] 协议承担了数据传输的重任,提供了高吞吐量和可配置的复制机制,确保了数据平面的灵活性和可靠性。 同样地,Apache BookKeeper,一个低延迟且高吞吐的存储服务,也采用了类似的架构思想。它利用 ZooKeeper 来管理控制平面,包括维护一致的服务状态和元数据信息。在数据平面方面,BookKeeper 利用其自研的 Quorum 协议来处理写操作,并确保读操作的低延迟性能。 类似这种设计,我们的 Broker 不再自己负责自己的选举,而是由上层 Controller 对下层的角色进行指示,下层根据指示结果进行数据的拷贝,从而达到选举与复制分离的目的。 (图 6:来源于 ATA 文章《全新 RocketMQ 5.0 高可用设计解读》) 不过与 Delos 不同的是,我们在这里面其实有三层共识 —— Controller 间的共识协议(Raft),Controller 对 Broker 选主时的共识协议(SyncState Set),Broker 间复制时的共识协议(主备确认复制算法)。我们在这个过程中额外增加了 Controller 间的共识,以保证控制节点也是强一致的。这三种共识算法具体实现在这里不加以赘述,有兴趣的可以看我们 RocketMQ 社区中的 RIP31/32/34/44 几个说明文档。 这个设计也在我们被 ASE 23' 录用的论文 [9] 中得以体现:Controller 组件承担了切换链路中的核心角色,但是又不影响数据链路的正常运行,即便其面临宕机、夯机、网络分区等问题,也不会导致 broker 的数据丢失、不一致。我们在后续测试中甚至模拟了更加严苛的场景,例如 Controller 与 Broker 同时宕机、同时夯机、同时进行随机网络分区等等,我们的设计均有非常好的表现。 下面,我们对 RocketMQ 中的共识协议作展开,深入地剖析 RocketMQ 中的共识是如何实现的。 RocketMQ 中的共识协议 首先我们在这里放一张大图,用于描述 Raft Controller 具体是如何实现控制面、数据面的共识的。 (图 7:Raft Controller 具体设计架构及其运作原理) 上图中,绿色部分的是 Controller,也就是 RocketMQ 划分出来的控制面。它自身包含了两种共识算法,分别是保证 Controller 自身共识的 Raft 算法。以及保证 Broker 共识的选主算法,SyncState Set(后文简称 3S)算法,这个算法是我们参考 PacificA 算法 [10] 提出的一套用于数据面选主的分布式共识协议。红色部分是 Broker,也就是我们最核心的数据面,这里面忽略了 Broker 的其它存储结构,仅保留复制过程中实现共识的核心文件:epoch 文件。我们基于它实现了数据复制过程中的共识协议。 下面我们针对控制面和数据面的共识,分别进行阐述。 控制面共识 控制面共识共有两层: a. Raft Controller 自身的共识——用于保证 Controller 间的数据强一致性 b. 3S 算法——用于保证 Broker 选主结果的强一致。 下面对这两种共识算法分别进行解释。 Raft In Controller Raft 在 RocketMQ 5.0 前,只在 CommitLog 中存在,以 DLedger CommitLog 的方式向外透传角色。但是经过我们前文的分析,可以知道这种方式是有不容忽视的弊端在的:选举、复制过程强耦合,复制过程强依赖 DLedger 存储库,迭代难度高。因此,我们考虑将 Raft 的能力向上移动,让其用于在控制面中实现原数据强一致。这样一来,选举的过程便与日志复制的过程区分开了,而且每次选举的成本相对更低,只需要同步非常有限的数据量。 在 Controller 中,我们将 Raft 算法用于选举 Controller 中的 Active Controller,由它来负责处理数据面的选举、同步等任务,其余的 Controller 则只负责同步 Active Controller 的处理结果。这样的设计能够保证 Controller 本身也是高可用的,且保证了仅有一个 Controller 在处理 Broker 的选举事务。 在最新的 RocketMQ 中,在这一层共提供了两种 Controller 内的分布式共识算法的实现:DLedger 与 JRaft。这两种共识算法可以在 Controller 的配置文件中被非常简单地选择。本质上来说,这两者都是 Raft 算法的具体实现,只不过具体的实现方式有些差异: a. DLedger [2] 是一个基于 raft 协议的 commitlog 存储库,是一个 append only 的日志系统,早期针对 RocketMQ 的诸多场景有过相当多的适配。同时,它是一个轻量级的 java library。它对外提供的 API 非常简单,append 和 get。 b. JRaft 全称为 SOFAJRaft [3],它是一个基于 Raft 一致性算法的生产级高性能 Java 实现,支持 MULTIRAFTGROUP,适用于高负载低延迟的场景。SOFAJRaft 是从百度的 braft [11] 移植而来的,做了一些优化和改进。 这两种 Raft 的具体实现都对外提供了非常简单的 API 接口,所以我们可以把更多的精力放在处理 Active Controller 的事务上。 3S Algorithm 抛开 Controller 本身的共识算法,我们将目光聚焦于 Active Controller 在整个过程中起的作用,这也是我们控制面共识的核心——3S 算法。 3S 算法中的SyncState Set 概念与 Kafka 的 InSync Replica(ISR) [17] 机制类似,都参考了微软的 PacificA 算法。与以分区为维度的 ISR 不同,3S 算法以整个 Broker 的维度发起选举,且针对 RocketMQ 的需要选举场景作了系统的归纳。相比较来说,3S 算法更加简单,选举更加高效,面对大量分区场景能有更加强大的表现。 3S 算法主要作用在 Controller与 Broker 的交互过程中,Active Controller 会处理每个 Broker 的心跳与选举工作。和 Raft 类似的,3S 算法也有心跳机制来实现类似租约的功效——当 Master Broker 一定时长没有上报心跳,就可能触发重新选举。不过与 Raft 不同的是,3S 算法有一个共识处理的核心:Controller。这种中心化的设计能够让数据面的选主更加简单,达成共识更加迅速。 在这种设计下,Broker 的心跳不再向同级别(数据面)发送,而是统一向上(控制面)发送。发送选举请求后,由 Controller 来决定哪个 Broker 可以作为 Master 存在,而其它 Broker 自然退化为 Slave。Controller 的选择原则可以是多样的(同步进度、节点资源等指标),也可以是简单有效的(数据同步进度达到一定阈值),只需要这个节点位于 SyncState Set 中。这也是一种 Strong Leader 的形式,只不过和 Raft 不同的地方在于: Raft 像是小组作业,同学们(Broker)互相投票进行小组长的票选,而 3S 算法则由班主任(Controller)根据举手快慢直接任命。 (图 8:多个 Broker 集群向 Active Controller 汇报集群内的主备角色以及同步情况) 如上图所示,三个 Broker 集群中的 Leader 都会定期向 Active Controller 上报集群的同步状态: a. A 集群的所有节点的同步进度都很良好,因此 Leader 上报的 SyncState Set 是所有节点。 b. B 集群的 Follower2 可能刚刚启动,仍旧在同步历史消息,因此 SyncState Set 并不会包含它——当 Leader 宕机时,Controller 自然也不会选择它。 c. C 集群中,虽然 1 号 Leader 已经宕机,但是 Controller 迅速便能决定 SyncState Set 中的 3 号节点作为替代,提拔为主节点,整个集群便能正常运转,此时,即便 3 号节点又宕机,也能选择 2 号节点为主节点,不影响集群运行状态。 这种设计的好处在哪里呢?Raft 算法的实现原理其实是“投票”,同学间彼此平等,靠投票结果“少数服从多数”。因此,对于一个有 2n+1 节点的集群来说,Raft 最多只能容忍n个节点失效,至少需要保证有 n+1 个节点是持续运行的。但是 3S 算法有一个选举中心,每次选举的 RPC 都向上发送,它不需要得到其它节点的认可便可选举出一个节点。因此对于之前提到的 2n+1 节点的集群来说,最多能容忍 2n 个节点的失效,即副本的数量不需要超过副本总数的一半,不需要满足 “多数派” 原则。通常,副本数大于等于 2 即可, 如此,便在可靠性和吞吐量方面取得平衡。 上面的例子比较简明扼要地介绍了 3S 算法和 Raft 的关系与不同,可以说 3S 算法的设计思想来源于 Raft,但是在特定场景下又优于 Raft。 此外,3S 算法的共识以整个 Broker 为维度,因此我们对选主时机作了优化,有如下几种情形可能触发选举,括号内的红色是更加形象的描述,将选主过程具像化为选小组长的过程,以便理解: a. 控制面,Controller 主动发起 (班主任发起): i.HeartbeatManager 监听到有 broker 心跳失效。(班主任发现有小组同学退学了) ii.Controller 检测到有一组 SyncState Set 不存在 master。(班主任发现有组长虽然在名册里,但是旷课了) b. 数据面,Broker 发起将自己选为 master (同学毛遂自荐): i. Broker 向 controller 查询元数据时,没找到master信息。(同学定期检查小组情况,问班主任为啥没小组长) ii. Broker 向 controller 注册完后,仍未从 controller 获取到 master 信息。(同学报道后发现没小组长,汇报并自荐) c. 运维侧,通过 RocketMQ Admin Tools 发起,是运维能力的一部分 (校长直接任命)。 通过上述两方面优化,3S 算法在 RocketMQ 5.0 中,展露了非常强大的功能性,让 Controller 成为了高可用设计范式中不可或缺的组件。其实 3S 算法在实际使用场景中还有很多细节上的处理优化,能够容忍前文提到的更加严峻的场景:如控制面和数据面同时发生故障,且故障节点超过一半以上的场景。这部分结果会在后文的混沌实验中得以展示。 数据面共识 控制面通过 Raft 算法保障了 Controller 间的角色共识,以及通过 3S 算法保障了 Broker 中的角色共识。那么在 Broker 角色被确定后,其数据面该如何根据选举结果保障数据的强一致呢?这里的算法并不复杂,因此笔者从实现角度介绍一下 RocketMQ 的设计,RocketMQ 的数据面共识主要由下面两个组件构成: HAClient: 每个 Slave 的 HAService 中必备的 client,负责管理同步任务中的读、写操作。 HAConnection: 代表在 Master 中的 HA 连接,每个 connection 理论上对应一个 slave。在该 connection 类中存储了传输过程中的诸多内容,包括 channel、传输状态、当前传输位点等等信息。 为了更形象地描述清楚 RocketMQ 在这方面的设计结构,笔者绘制了下面这幅图,可以看出核心还是数据的传输过程,分别设计了一个 Reader 与一个 Writer: (图 9:数据面复制过程的具体实现,Master 与 Slave 分别设计,但选举完成后可互相切换) 这么简单的设计,是如何确保数据写入时的强一致的呢? 核心的共识其实存在于 HAConnection(也就是图中左下角那个深蓝色框)的建立、维护过程中。每个 Broker 集群的主节点都会维护和所有 Slave 的连接关系,并将其存于 Connection 表中,在每次 Slave 来请求代复制数据后,都会反馈复制的最后位点与结果,因此主节点也可以基于此来确定上报给 Controller 的 SyncState Set。在 HAConnection 的建立过程中,有一个确保数据一致性的 HandShake 阶段。这个阶段能够对 CommitLog 作截断,从而保障复制位点之前的所有数据都是强一致的。这个过程通过 epoch 文件的标记实现:Epoch 文件中包含了每一次选举的状态,每次选举完成后,主节点都会在 epoch 文件上留下自己的痕迹,即当前的选举代数 + 当前的初始位点。 从这里也可以看到,我们数据面的共识算法也有一些 Raft 的影子:Raft 算法在每次选举后也会给任期数自增一,这个任期数的大小决定了后续选主的权威性。而在数据面共识算法中,选主的结果已经认定,任期数被用于多次选主结果的共识表征——当任期数与日志位点一致时,代表这两台 broker 就选主这件事达成过一致,因此可以认为此前的数据是强一致的,只需要保证后续数据的强一致即可。为了方便理解,可以通过下图进行描述: (图 10:来源于文章《全新 RocketMQ 5.0 高可用设计解读》) 类似上面这张图,最上面那个方块长条实际是 RocketMQ 的日志存储形态 MappedFile。下面两条方块组成的长条分别是主和备的 commitlog,备节点会从后向前找到最大的 一致的位点,然后截断到这个位点,开始向后复制。这种复制在 RocketMQ 中有单独的一个 Service 去执行,因此主备节点的复制和选举过程其实是彻底解耦开的,只有当一个备节点尽可能跟上主节点时,这个备才会被纳入 SyncState Set,后续才有资格参加选举。 拥抱故障,把故障当作常态 俗话说的好,“空谈误国,实干兴邦”,设计究竟是先进还是冗余,需要通过各方面的检验。 对于 RocketMQ 这种大规模在生产中被使用的系统,我们必须模拟出足够接近现实情况的故障,才能检验其可用性在现实场景中究竟如何。在这里,我们需要引入一个新的概念——混沌工程。 混沌工程的原始定义 [13] 为:“Chaos engineering is the discipline of experimenting on a system in order to build confidence in the system's capability to withstand turbulent conditions in production. ” 从原始定义看,混沌工程实际上是一种软件工程方法,旨在通过在软件系统的生产环境中故意引入混乱来验证系统的可靠性。混沌工程的基本假设是,生产环境是复杂且不可预测的,而通过模拟各种失败,可以发现并解决潜在的问题。这种方式有助于确保系统能够在面临真实世界中的各种挑战时,持续并有效地运行。 大家都写过代码,也都深知一个精心设计过的系统总是能够巧妙通过各个测试样例,但是上线后总会遇到各种问题。因此对于一个系统来说,能够出色地通过复杂且不可预知的频繁故障的混沌工程的考验,而不是测试样例,才能说明这个系统是高可用的。 下面我们将详细介绍,我们为了验证 RocketMQ 的高可用,对其作过哪些“拷打”。 OpenChaos OpenChaos [14] 作为云原生场景量身定制的混沌“刑具”,位于 OpenMessaging 名下,托管于 Linux 基金会。目前,它支持以下平台的混沌测试:Apache RocketMQ、Apache Kafka、DLedger、Redis、Zookeeper、Etcd、Nacos。 目前 OpenChaos 支持注入多种故障类型,其中最主要的便是: 1. randompartition (fixedpartition):随机(固定)隔离节点与网络的其他部分。 2. randomloss:随机选定的节点丢失网络数据包。 3. randomkill (minorkill, majorkill, fixedkill):终止随机(少数、多数、固定)的进程并重启它们。 4. randomsuspend (minorsuspend, majorsuspend, fixedsuspend):使用 SIGSTOP/SIGCONT 暂停随机(少数、多数、固定)的节点。 在实际场景中,最常见的故障就是这四种:网络分区、丢包、宕机、夯机。此外,OpenChaos 还支持其它更复杂的特定场景,例如 Ring(每个节点能够看到大多数其他节点,但没有节点能看到与任何其他节点相同的多数节点)和 Bridge(网络一分为二,但保留中间的一个节点,该节点与两边的组件保有不间断的双向连通性),形成条件非常严苛,而且它们阻碍共识生效的原理都是“通过阻碍各节点间的可见性,来避免形成全局多数派”,理论上来说,通过足够久的网络分区、丢包,也能模拟出这些情况,甚至更复杂的情况。 因此,我们注入了大量上面罗列的四种混沌故障,观察集群是否有出现消息丢失的情况,并统计了故障恢复时间。 具体测试场景 我们混沌测试的验证实验环境如下: a. namesrv 一台,内含 namesrv 进程,openchaos 的混沌测试进程也在该机器上启动,向 controller/broker 发出控制指令。 b. controller 三台,内含 controller 进程。 c. broker 三台,同属一个集群,分别为主备,内含 broker 进程。 上述 7 台机器的配置为,处理器:8 vCPU,内存:16 GiB,规格族:ecs.c7.2xlarge,公网带宽:5Mbps,内网带宽:5/ 最高 10 Gbps。 在测试中,我们设置了如下的若干种随机测试场景,每种场景都会持续至少 60 秒,且恢复后会保证 60 秒的时间间隔再注入下一次故障: a. 机器宕机,这个混沌故障注入通过 kill 9 命令实现,将会杀死范围内的随机进程。 i. Broker 节点,随机宕机一半以上的节点,至少保留一台 Broker 工作。 ii. Controller 节点,随机宕机一半以上的节点,以及全部宕机。iii. Broker+Controller 节点,随机宕机一半以上的节点。 b. 机器夯机,这个混沌故障注入通过kill SIGSTOP 命令实现,模拟进程暂停的情况。 i. Broker 节点,随机夯机一半以上的节点,至少保留一台 Broker 工作。 ii. Controller 节点,随机夯机一半以上的节点,以及全部夯机。 iii. Broker+Controller 节点,随机夯机一半以上的节点。 c. 机器丢包,这个混沌故障注入通过 iptables 命令实现,可以指定机器间特定比例的丢包事件。 i. Broker 间随机丢包 80%。 ii. Controller 间随机丢包 80%。 iii. Broker 和 Controller 间随机丢包 80%。 d. 网络分区,这个混沌故障注入也是通过 iptables 命令实现,能够将节点间完全分区。 i. Broker 间随机网络分区。 ii. Controller 间随机网络分区。 iii. Broker 合 Controller 间随机网络分区。 实际测试场景、组别远多于上述罗列的所有故障场景,但是存在一些包含关系,例如单台 broker/controller 的启停,便不再单独罗列。此外我们均针对 Broker 的重要参数配置进行了交叉测试。测试的开关有:transientStorePoolEnable(是否使用直接内存),slaveReadEnable(备节点是否提供读消息能力)。我们还针对 Controller 的类型(DLedger/JRaft)也进行了分别的测试,每组场景的测试至少重复 5 次,每次至少持续 60 分钟。 实验结论 针对上述提出的所有场景,混沌测试的总时长至少有: 12(场景数) 2(Broker开关数) 2 (Controller类型) 5(每组测试次数) 60(单组时长) =14400 分钟。 由于设置的注入时长一分钟,恢复时长一分钟,因此至少共计注入故障 14400/2 = 7200 次(实际上注入时长、注入次数远多于上述统计值)。 在这些记录在册的测试结果中,有如下测试结论: a. RocketMQ 无消息丢失,数据在故障注入前后均保持强一致。 b. 恢复时长基本等于客户端的路由间隔时间,在路由及时的情况下,能够保证恢复 RTO 约等于 3 秒。 c. Controller 任意形式下的故障,包括宕机、夯机、网络故障等等,均不影响 Broker 的正常运转。 总结 本文总结了 RocketMQ 与 Raft 的前世今生。从最开始的忠实应用 Raft 发展 DLedger CommitLog,到如今的控制面/数据面分离,并分别基于 Raft 协议作专属于 RocketMQ 的演化。如今 RocketMQ 的高可用已经逐渐趋于成熟:基于三层共识协议,分别实现 Controller 间、Controller&Broker、Broker 间的共识。 在这种设计下,RocketMQ 的角色、数据共识被妥善地划分到了多个层次间,并能够彼此有序地协作。当选主与复制不再耦合,我们便能更好地腾出手脚发展各个层次间的共识协议——例如,当出现比 Raft 更加优秀的共识算法时,我们可以直接将其应用于 Controller 中,且对于我们的数据面无任何影响。 可以说 Raft 的设计给 RocketMQ 的高可用注入了非常多的养分,能够让 RocketMQ 在其基础上吸纳其设计思想,并作出适合自己的改进。RocketMQ 的共识算法与高可用设计 [9] 在 2023 年也得到了学术界的认可,被 CCFA 类学术会议 ASE 23' 录用。期待在未来能够出现更加优秀的共识算法,能够在 RocketMQ 的实际场景中被适配、发扬。 参考链接: _ _ _ [4] Diego Ongaro and John Ousterhout. 2014. In search of an understandable consensus algorithm. In Proceedings of the 2014 USENIX conference on USENIX Annual Technical Conference (USENIX ATC'14). USENIX Association, USA, 305–320. _ _ [8] Mahesh Balakrishnan, Jason Flinn, Chen Shen, Mihir Dharamshi, Ahmed Jafri, Xiao Shi, Santosh Ghosh, Hazem Hassan, Aaryaman Sagar, Rhed Shi, Jingming Liu, Filip Gruszczynski, Xianan Zhang, Huy Hoang, Ahmed Yossef, Francois Richard, and Yee Jiun Song. 2020. Virtual consensus in delos. In Proceedings of the 14th USENIX Conference on Operating Systems Design and Implementation (OSDI'20). USENIX Association, USA, Article 35, 617–632. [9] Juntao Ji, Rongtong Jin, Yubao Fu, Yinyou Gu, TsungHan Tsai, and Qingshan Lin. 2023. RocketHA: A High Availability Design Paradigm for Distributed LogBased Storage System. In Proceedings of the 38th IEEE/ACM International Conference on Automated Software Engineering (ASE 2023), Luxembourg, September 1115, 2023, pp. 18191824. IEEE. _ _ [12] Leslie Lamport. 2001. Paxos Made Simple. ACM SIGACT News (Distributed Computing Column) 32, 4 (Whole Number 121, December 2001), 5158. _ _ _
作者:季俊涛
#技术探索 #高可用

2024年8月13日

谈谈Apache RocketMQ 5.0 分级存储背后一些有挑战的技术优化
RocketMQ 5.0 提出了分级存储的新方案,经过数个版本的深度打磨,RocketMQ 的分级存储日渐成熟,并成为降低存储成本的重要特性之一。事实上,几乎所有涉及到存储的产品都会尝试转冷降本,如何针对消息队列的业务场景去做一些有挑战的技术优化, 是非常有意思的事。 这篇文章就跟大家探讨下,在消息系统这样一个数据密集型应用的模型下,技术架构选型的分析与权衡,以及分级存储实现与未来演进,让云计算的资源红利真正传达给用户。 1. 背景与需求 RocketMQ 诞生于 2012 年,存储节点采用 sharednothing 的架构读写自己的本地磁盘,单节点上不同 topic 的消息数据会顺序追加写 CommitLog 再异步构建多种索引,这种架构的高水平扩展能力和易维护性带来了非常强的竞争力。 随着存储技术的发展和各种百G网络的普及,RocketMQ 存储层的瓶颈逐渐显现,一方面是数据量的膨胀远快于单体硬件,另一方面存储介质速度和单位容量价格始终存在矛盾。在云原生和 Serverless 的技术趋势下,只有通过技术架构的演进才能彻底解决单机磁盘存储空间上限的问题,同时带来更灵活的弹性与成本的下降,做到 “鱼与熊掌兼得”。 在设计分级存储时,希望能在以下方面做出一些技术优势: 实时: RocketMQ 在消息场景下往往是一写多读的,热数据会被缓存在内存中,如果能做到 “准实时” 而非选用基于时间或容量的淘汰算法将数据转储,可以减小数据复制的开销,利于缩短故障恢复的 RTO。读取时产生冷读请求被重定向,数据取回不需要“解冻时间”,且流量会被严格限制以防止对热数据写入的影响。 弹性: sharednothing 架构虽然简单,缩容或替换节点的场景下待下线节点的数据无法被其他节点读取,节点需要保持相当长时间只读时间,待消费者消费完全部数据,或者执行复杂的迁移流程才能缩容,这种 “扩容很快,缩容很慢” 的形态一点都不云原生,更长久的消息保存能力也会放大这个问题。分级存储设计如果能通过 shareddisk (共享存储) 的方式让在线节点实现代理读取下线节点的数据,既能节约成本也能简化运维。 差异化: 廉价介质随机读写能力较差,类 LSM 的结构都需要大量的 compation 来压缩回收空间。在满足针对不同 topic 设置不同的生命周期(消息保留时间,TTL)等业务需求的前提下,结合消息系统数据不可变和有序的特点,RocketMQ 自身需要尽量少的做格式 “规整” 来避免反复合并的写放大,节约计算资源。 竞争力: 分级存储还应考虑归档压缩,数据导出,列式存储和交互式查询分析能力等高阶技术演进。 2. 技术架构选型 2.1. 不同视角 不妨让我们站在一个新的视角看问题,消息系统对用户暴露的是收发消息,位点管理等一系列的 API,为用户提供了一种能够优雅处理动态数据流的方式,从这个角度说:消息系统拓宽了存储系统的边界。 其实服务端应用大多数是更底层 SQL,POSIX API 的封装,而封装的目的在于简化复杂度的同时,又实现了信息隐藏。 消息系统本身关注的是高可用,高吞吐和低成本,想尽量少的关心存储介质的选择和存储自身的系统升级,分片策略,迁移备份,进一步冷热分层压缩等问题,减少存储层的长期维护成本。 一个高效的、实现良好的存储层应该对不同存储后端有广泛的支持能力,消息系统的存储后端可以是本地磁盘,可以是各类数据库,也可以是分布式文件系统,对象存储,他们是可以轻松扩展的。 2.2. 存储后端调研 幸运的是,几乎所有的“分布式文件系统”或者“对象存储”都提供了“对象一旦上传或复制成功,即可立即读取”的强一致语义,就像 CAP 理论中的描述 “Every read receives the most recent write or an error” 保证了“分布式存储系统之内多副本之间的一致性”。对于应用来说,没有 “拜占庭错误” 是非常幸福的(本来有的数据变没了,破坏了存储节点的数据持久性),更容易做到“应用和分布式存储系统之间是一致的”,并显著减少应用的开发和维护成本。 常见的分布式文件系统有 Ali Pangu,HDFS,GlusterFS,Ceph,Lustre 等。对象存储有 Amazon S3,Aliyun OSS,Azure Blob Storage,Google Cloud Storage,OpenStack Swift 等。他们的简单对比如下: API 支持: 选用对象存储作为后端,通常无法像 HDFS 一样提供充分的 POSIX 能力支持,对于非 KV 型的操作往往存在一定性能问题,例如列出大量对象时需要数十秒,而在分布式文件系统中这类操作只需要毫秒甚至微秒。如果选用对象存储作为后端,弱化的 API 语义要求消息系统本身能够有序管理好这些对象的元数据。 容量与水平扩展: 对于云产品或者大规模企业的存储底座来说,以 HDFS 为例,当集群节点超过数百台,文件达到数亿量级以上时,NameNode 会产生性能瓶颈。一旦底层存储由于容量可用区等因素出现多套存储集群,这种 “本质复杂度” 在一定程度上削弱了 shareddisk 的架构简单性,并将这种复杂度向上传递给应用,影响消息产品本身的多租,迁移,容灾设计。典型的情况就是大型企业为了减少爆炸半径,往往会部署多套 K8s 并定制上层的 Cluster Federation(联邦)。 成本: 以国内云厂商官网公开的典型目录价为例: 本地磁盘,无副本 0.060.08 元/GB/月 云盘,SSD 1元/GB/月,高效云盘 0.35 元/GB/月 对象存储单 AZ 版 0.12 元/GB/月,多 AZ 版本 0.15 元/GB/月,低频 0.08 元/GB/月 分布式文件系统,如盘古 HDFS 接口,支持进一步转冷和 EC。 生态链: 对象存储和类 HDFS 都有足够多的经过生产验证的工具,监控报警层面对象存储的支持更产品化。 2.3. 直写还是转写 方案里,备受瞩目的点在于选择直写还是转写,我认为他们不冲突,两个方案 “可以分开有,都可以做强”。 多年来 RocketMQ 运行在基于本地存储的系统中,本地磁盘通常 IOPS 较高,成本较低但可靠性较差,大规模的生产实践中遇到的问题包括但不限于垂直扩容较难,坏盘,宿主机故障等。 直写: 指使用高可用的存储替换本地块存储,例如使用云盘多点挂载(分布式块存储形态,透明 rdma)或者直写分布式文件系统(下文简称 DFS)作为存储后端,此时主备节点可以共享存储,broker 的高可用中的数据流同步简化为只同步位点,在很大程度上减化了 RocketMQ 高可用的实现。 转写: 对于大部分数据密集型应用,出于故障恢复的考虑必须实时写日志,意味着无法对数据很好的进行攒批压缩,如果仅使用廉价介质,会带来更高的延迟以及更多的内存使用,无法满足生产需要。一个典型思路就是热数据使用容量小的高速介质先顺序写,compation 后转储到更廉价的存储系统中。 直写的目的是池化存储,转写的目的是降低数据的长期保存成本, 所以我认为一个理想的终态可以是两者的结合。RocketMQ 自己来做数据转冷,那有同学就会提出反问了,如果让 DFS 自身支持透明转冷,岂不是更好? 我的理解是 RocketMQ 希望在转冷这个动作时,能够做一些消息系统内部的格式变化来加速冷数据的读取,减少 IO 次数,配置不同 TTL 等。 相对于通用算法,消息系统自身对如何更好的压缩数据和加速读取的细节更加了解。 而且主动转冷的方案在审计和入湖的一些场景下,也可以被用于服务端批量转储数据到不同的平台,到 NoSQL 系统,到 ES,Click House,到对象存储,这一切是如此的自然~ 2.4. 技术架构演进 那么分级存储是一个尽善尽美的最终解决方案吗? 理想很美好,让我们来看一组典型生产场景的数据。 RocketMQ 在使用块存储时,存储节点存储成本大约会占到 30%50%。开启分级存储时,由于数据转储会产生一定的计算开销,主要包括数据复制,数据编解码,crc 校验等,不同场景下计算成本会上升 10%40%,通过换算,我们发现存储节点的总体拥有成本节约了 30% 左右。 考虑到商业和开源技术架构的一致性,选择了先实现转写模式,热数据的存储成本中随着存储空间显著减小,这能够更直接的降低存储成本,在我们充分建设好当前的转写逻辑时再将热数据的 WAL 机制和索引构建移植过来,实现基于分布式系统的直写技术,这种分阶段迭代会更加简明高效,这个阶段我们更加关注通用性和可用性。 可移植性: 直写分布式系统通常需要依赖特定 sdk,配合 rdma 等技术来降低延迟,对应用不完全透明,运维,人力,技术复杂度都有一定上升。保留成熟的本地存储,只需要实现存储插件就可以轻松的切换多种存储后端,不针对 IaaS 做深度绑定在可移植性上会有一定优势。 延迟与性能: 直写模式下存储紧密结合,应用层 ha 的简化也能降低延迟(写多数派成功才被消费者可见),但无论写云盘或者本地磁盘(同区域)延迟都会小于跨可用区的延迟,存储延迟在热数据收发链路不是瓶颈。 可用性: 存储后端往往都有复杂的容错和故障转移策略,直写与转写模式在公有云下可用性都满足诉求。考虑到转写模式下系统是弱依赖二级存储的,更适合开源与非公共云场景。 我们为什么不进一步压缩块存储的磁盘容量,做到几乎极致的成本呢? 事实上,在分级存储的场景下,一味的追求过小的本地磁盘容量价值不大。 主要有以下原因: 故障冗余,消息队列作为基础设施中重要的一环,稳定性高于一切。对象存储本身可用性较高,如果遇到网络波动等问题时,使用对象存储作为主存储,非常容易产生反压导致热数据无法写入, 而热数据属于在线生产业务,这对于可用性的影响是致命的。 过小的本地磁盘,在价格上没有明显的优势。 众所周知,云计算是注重普惠和公平的, 如果选用 50G 左右的块存储,又需要等价 150G 的 ESSD 级别的块存储能提供的 IOPS,则其单位成本几乎是普通块存储的数倍。 本地磁盘容量充足的情况下,上传时能够更好的通过 “攒批” 减少对象存储的请求费用。读取时能够对“温热” 数据提供更低的延迟和节约读取成本。 仅使用对象存储,难以对齐 RocketMQ 当前已经存在的丰富特性, 例如用于问题排查的随机消息索引,定时消息特性等,如果为了节约少量成本,极大的削弱基础设施的能力,反向要求业务方自建复杂的中间件体系是得不偿失的。 3. 分级存储的数据模型与实现 3.1. 模型与抽象 RocketMQ 本地存储数据模型如下: MappedFile:单个真实文件的句柄,也可以理解为 handle 或者说 fd,通过 mmap 实现内存映射文件。是一个 AppendOnly 的定长字节流语义的 Stream,支持字节粒度的追加写、随机读。每个 MappedFile 拥有自己的类型,写位点,创建更新时间等元数据。 MappedFileQueue:可以看做是零个或多个定长 MappedFile 组成的链表,提供了流的无边界语义。Queue 中最多只有最后一个文件可以是 Unseal 的状态(可写)。前面的文件都必须都是 Sealed 状态(只读),Seal 操作完成后 MappedFile 是 immutable(不可变)的。 CommitLog:MappedFileQueue 的封装,每个 “格子” 存储一条序列化的消息到无界的流中。 ConsumeQueue:顺序索引,指向 CommitLog 中消息在 FileQueue 中的偏移量(offset)。 RocketMQ 分级存储提供的数据模型和本地模型类似,改变了 CommitLog 和 ConsumeQueue 的概念: TieredFileSegment:和 MappedFile 类似,描述一个分级存储系统中文件的句柄。 TieredFlatFile:和 MappedFileQueue 类似。 TieredCommitLog 和本地 CommitLog 混合写不同,按照单个 Topic 单个队列的粒度拆分多条 CommitLog。 TieredConsumeQueue 指向 TieredCommitLog 偏移量的一个索引,是严格连续递增的。实际索引的位置会从指向的 CommitLog 的位置改为 TieredCommitLog 的偏移量。 CompositeFlatFile:组合 TieredCommitLog 和 TieredConsumeQueue 对象,并提供概念的封装。 3.2. 消息上传流程 RocketMQ 的存储实现了一个 Pipeline,类似于拦截器链,Netty 的 handler 链,读写请求会经过这个 Pipeline 的多个处理器。 Dispatcher 的概念是指为写入的数据构建索引,在分级存储模块初始化时,会创建 TieredDispatcher 注册为 CommitLog 的 dispatcher 链的一个处理器。每当有消息发送到 Broker 会调用 TieredDispatcher 进行消息分发。下面我们来追踪单条消息进入存储层的流程: 1. 消息被顺序追加到本地 commitlog 并更新本地 max offset(图中黄色部分),为了防止宕机时多副本产生“读摆动”,多副本中多数派的最小位点会作为“低水位”被确认,这个位点被称为 commit offset(图中 2500)。换句话说,commit offset 与 max offset 之间的数据是正在等待多副本同步的。 2. 当 commit offset = message offset 之后,消息会被上传到二级存储的 commitlog 的缓存中(绿色部分)并更新这个队列的 max offset。 3. 消息的索引会被追加到这个队列的 consume queue 中并更新 consume queue 的 max offset。 4. 一旦 commitlog 中缓存大小超过阈值或者等待达到一定时间,消息的缓存将被上传至 commitlog,之后才会将索引信息提交,这里有一个隐含的数据依赖,使索引被晚于原始数据更新。这个机制保证了所有 cq 索引中的数据都能在 commitlog 中找到。宕机场景下,分级存储中的 commitlog 可能会重复构建,此时没有 cq 指向这段数据。由于文件本身还是被使用 Queue 的模型管理的,使得整段数据在达到 TTL 时能被回收,此时并不会产生数据流的“泄漏”。 5. 当索引也上传完成的时候,更新分级存储中的 commit offset(绿色部分被提交)。 6. 系统重启或者宕机时,会选择多个 dispatcher 的最小位点向 max offset 重新分发,确保数据不丢失。 在实际执行中,上传部分由三组线程协同工作。 1. store dispatch 线程,由于该线程负责本地 cq 的分发,我们不能长时间阻塞该线程,否则会影响消息进入本地存储的“可见性延迟”。因此 store dispatch 每次只会尝试对拆分后的文件短暂加锁,如果加锁成功,将消息数据放入拆分后的 commitlog 文件的缓冲区则立即退出,该操作不会阻塞。若获取锁失败则立即返回。 2. store compensate 线程组,负责对本地 cq 进行定时扫描,当写入压力较高时,步骤 1 可能获取锁失败,这个环节会批量的将落后的数据放入 commitlog 中。原始数据被放入后会将 dispatch request 放入 write map。 3. build cq index 线程。write map 和 read map 是一个双缓冲队列的设计,该线程负责将 read map 中的数据构建 cq 并上传。如果 read map 为空,则交换缓冲区,这个双缓冲队列在多个线程共享访问时减少了互斥和竞争操作。 各类存储系统的缓冲攒批策略大同小异,而线上的 topic 写入流量往往是存在热点的,根据经典的二八原则,RocketMQ 分级存储模块目前采用了 “达到一定数据量”,“达到一定时间”两者取其小的合并方式。 这种方式简单可靠,对于大流量的 topic 很容易就可以达到批的最小数据量,对于流量较低的 topic 也不会占用过多的内存。从而减少了对象存储的请求数,其开销主要包括 restful 协议请求头,签名和传输等。诚然,攒批的逻辑仍然存在较大的优化空间,例如 IOT,数据分片同步等各个 topic 流量较为平均的场景使用类似 “滑动窗口” 的加权平均算法,或者基于信任值的流量控制策略可以更好的权衡延迟和吞吐。 3.3. NonStopWrite 特性 NonStopWrite 模型实际上是一致性模型的一部分。实际生产中,后端分布式存储系统的断连和网络问题偶尔会不可避免,而 Append 模型实际上一种强顺序的模型,参考 HDFS 的 23 异步写,我们提出了一种基于 Append 和 Put 的混合模型。 例如:对于如下图片中的 stream,commit / confirm offset = 150,max offset = 200。此时写出缓冲区中的数据包括 150200 的 uncommitted 部分,还有 200 以后源源不断的写入的新数据。 假设后端存储系统支持原子性写入,单个上传请求的数据内容是 150200 这个区间,当单次上传失败时,我们需要向服务端查询上一次写入的位点并进行错误处理。 如果返回的长度是 150,说明上传失败,应用需要重传 buffer。 如果返回的长度是 200,说明前一次上传成功但没有收到成功的 response,提升 commit offset 至 200。 而另一种解决方案是,使用 NonStopWrite 机制立刻新切换一个文件,以 150 作为文件名,立刻重传 150 至 200 的数据,如果有新的数据也可以立刻与这些数据一起上传,我们发现混合模型存在显著优势: 对于绝大部分没有收到成功的响应时,上传是失败的而不是超时,立刻切换文件可以不去 check in 文件长度,减少 rpc 数量。 立刻重传不会阻塞后续新的数据上传,不容易由于后端数据无法写出造成反压,导致前端写失败。 无论 150200 这段数据在第一个文件是到底是写成功还是失败都无关紧要,因为不会去读取这段数据。尤其是对于不支持请求粒度原子写入的模型来说,如果上一次请求的结果是 180,那么错误处理将会非常复杂。 3.4. 随机索引重排 21 年的时候,我第一次听到用“读扩散”或者“写扩散”来描述一个设计方案, 这两个词简洁的概括了应用性能设计的本质。各种业务场景下,我们总是选择通过读写扩散, 选择通过格式的变化,将数据额外转储到一份性能更好或者更廉价的存储, 或者通过读扩散减少数据冗余(减少索引提高了平均查询代价)。 RocketMQ 会在先内存构建基于 hash 的持久化索引文件 IndexFile(非 AppendOnly),再通过 mmap 异步的将数据持久化到磁盘。这个文件是为了支持用户通过 key,消息 ID 等信息来追踪一条消息。 对于单条消息会先计算 hash(topickey) % slot_num 选择 hash slot (黄色部分) 作为随机索引的指针,对象索引本身会附加到 index item 中,hash slot 使用“哈希拉链”的方式解决冲突,这样便形成了一条当前 slot 按照时间存入的倒序的链表。不难发现,查询时需要多次随机读取链表节点。 由于冷存储的 IOPS 代价是非常昂贵的,在设计上我们希望可以面向查询进行优化。新的文件结构类似于维护没有 GC 和只有一次 compation 的 LSM 树,数据结构的调整如下: 1. 等待本地一个 IndexFile 完全写满,规避修改操作,在高 IOPS 的存储介质上异步 compation,完成后删除原来的文件。 2. 从冷存储查询延迟高,而单次返回的数据量大小(不太大的场景)并不会明显改变延迟。compation 时优化数据结构,做到用一次查询连续的一段数据替换多次随机点查。 3. hash slot 的指向的 List 是连续的,查询时可以根据 hash slot 中的 item offset 和 item size 一次取出所有 hashcode 相同的记录并在内存中过滤。 3.5. 消息读取流程 3.5.1 读取策略 读取是写入的逆过程,优先从哪里取回想要的数据必然存在很多的工程考虑与权衡。如图所示,近期的数据被缓存在内存中,稍久远的数据存在与内存和二级存储上,更久远的数据仅存在于二级存储。当被访问的数据存在于内存中,由于内存的速度快速存储介质,直接将这部分数据通过网络写会给客户端即可。如果被访问的数据如图中 request 的指向,存在于本地磁盘又存在于二级存储,此时应该根据一二级存储的特性综合权衡请求落到哪一层。 有两种典型的想法: 1. 数据存储被视为多级缓存,越上层的介质随机读写速度快,请求优先向上层存储进行查询,当内存中不存在了就查询本地磁盘,如果还不存在才向二级存储查询。 2. 由于在转冷时主动对数据做了 compation,从二级存储读取的数据是连续的,此时可以把更宝贵一级存储的 IOPS 留给在线业务。 RocketMQ 的分级存储将这个选择抽象为了读取策略,通过请求中的逻辑位点(queue offset)判断数据处于哪个区间,再根据具体的策略进行选择: DISABLE:禁止从多级存储中读取消息,可能是数据源不支持。 NOT_IN_DISK:不在一级存储的的消息都会从二级存储中读取。 NOT_IN_MEM:不在内存中的消息即冷数据从多级存储读取。 FORCE:强制所有消息从多级存储中读取,目前仅供测试使用。 3.5.2 预读设计 TieredMessageFetcher 是 RocketMQ 分级存储取回数据的具体实现。 为了加速从二级存储读取的速度和减少整体上对二级存储请求数,采用了预读缓存的设计: 即 TieredMessageFetcher 读取消息时会预读更多的消息数据,预读缓存的设计参考了 TCP Tahoe 拥塞控制算法,每次预读的消息量类似拥塞窗口采用加法增、乘法减的流量控制机制。 加法增:从最小窗口开始,每次增加等同于客户端 batchSize 的消息量。 乘法减:当缓存的消息超过了缓存过期时间仍未被全部拉取,此时一般是客户端缓存满,消息数据反压到服务端,在清理缓存的同时会将下次预读消息量减半。 此外,在客户端消费速度较快时,向二级存储读取的消息量较大,此时会使用分段策略并发取回数据。 3.6. 定时消息的分级存储 除了普通消息,RocketMQ 支持设置未来几十天的长定时消息,而这部分数据严重挤占了热数据的存储空间。 RocketMQ 实现了基于本地文件系统的时间轮,整体设计如左侧所示。单节点上所有的定时消息会先写入 rmq_sys_wheel_timer 的系统 topic,进入时间轮,出队后这些消息的 topic 会被还原为真实的业务 topic。 “从磁盘读取数据”和“将消息索引放入时间轮”这两个动作涉及到 IO 与计算,为了减少这两个阶段的锁竞争引入了 Enqueue 作为中转的等待队列,EnqueuGet 和 EnqueuePut 分别负责写入和读取数据,这个设计简单可靠。 不难发现,所有的消息都会进入时间轮,这也是挤占存储空间的根本原因。 写入时,RocketMQ 的分级存储定时消息针对 EnqueuePut 做了一个分流,对于大于当前时间数小时的消息会被写入到基于分级存储的 TimerFlatFile 文件中,我们维护了一个 ConcurrentSkipListMap timerFlatFileTable; 每间隔 1 小时,设置一个 TimerFlatFile,对于 T+n 至 T+n+1 的定时消息,会先被混合追加到 T+n 所对应的文件中。 读取时,当前时间 + 1 小时的消息将被提前出队,这些消息又会重新进入本地 TimerStore 的系统 topic 中/此时,由于定时时间都是将来一小段时间的,他们不再会进入时间轮的结构中。 在这个设计上有一些工程性的考虑: timerFlatFileTable 中的 Key 很多,会不会让分级存储上的数据碎片化?分布式文件系统底层一般使用类 LSM 结构,RocketMQ 只关心 LBA 结构,可以通过优化 Enqueue 的 buffer 让写分级存储时数据达到攒批的效果。 可靠的位点,Enqueue 到“时间轮”和 timerFlatFileTable 可以共用一个 commit offset。对于单条消息来说,只要它进入时间轮或者被上传成功,我们就认为一条消息已经持久化了。由于更新到二级存储本身需要一些攒批缓冲的过程,会延迟 commit offset 的更新,但是这个缓冲时间是可控的。 我们发现偶尔本地存储转储到二级存储会较慢,使用双缓冲队列实现读写分离(如图片中绿色部分)此时消息被放入写缓存,随后转入读缓存队列,最后进入上传流程。 4. 分级存储企业级竞争力 4.1. 冷数据的压缩与归档 压缩是一种经典的时间与空间交换的权衡,其目的在于通过较小的 CPU 开销,实现更少的磁盘占用或网络 I/O 传输。目前,RocketMQ 的热存储在考虑延迟的情况下,仅对单条大于 4K 的消息进行单条压缩存储。对于冷存储服务其实可以做两个层面的压缩与归档处理。 消息队列业务层面,对于大多数业务 Topic,其 Body 通常存在相似性,可将其压缩至原大小的几分之一至几十分之一。 底层存储层面,使用 EC 纠删码,数据被分成若干个数据块,然后再根据一定的算法,生成一些冗余块。当数据丢失时,可以使用其余的数据块和冗余块来恢复丢失的数据块,从而保证数据的完整性和可靠性。典型的 EC 算法后存储空间的使用可以降低到 1.375 副本。 业界也有一些基于 FPGA 实现存储压缩加速的案例,我们将持续探索这方面的尝试。 4.2. 原生的只读挂载能力实现 Serverless 业界对 Serverless 有不同的理解,过去 RocketMQ 多节点之间不共享存储,导致“扩容快,缩容慢”,例如 A 机器需要下线,则必须等普通消息消费完,定时消息全部出队才能进行运维操作。分级存储设计通过 shareddisk的方式实现跨节点代理读取下线节点的数据,如右图所示:A 的数据此时可以被 B 节点读取,彻底释放了 A 的计算资源和一级存储资源。 这种缩容的主要流程如下: 1. RocketMQ 实现了一个简单的选举算法,正常情况下集群内每一个节点都持有对自己数据独占的写锁。 2. 待下线的节点做优雅下线,确保近期定时消息,事务消息,pop retry 消息都已被完整处理。上传自己的元数据信息到共享的二级存储,并释放自己的写锁。 3. 集群使用一定的负载均衡算法,新的节点获取写锁,将该 Broker 的数据以只读的形式挂载。 4. 将原来节点的元数据注册到 NameServer 对客户端暴露。 5. 对于原节点的写请求,例如位点更新,将在内存中处理并周期性快照到共享存储中。 5. 总结 RocketMQ 的存储在云原生时代的演进中遇到了更多有趣的场景和挑战,这是一个需要全链路调优的复杂工程。出于可移植性和通用性的考虑,我们还没有非常有效的使用 DPDK + SPDK + RDMA 这些新颖的技术,但我们解决了许多工程实践中会遇到的问题并构建了整个分级存储的框架。在后续的发展中,我们会推出更多的存储后端实现,针对延迟和吞吐量等细节做深度优化。 参考文档: [1] Chang, F., Dean, J., Ghemawat, S., et al. Bigtable: A distributed storage system for structured data. ACM Transactions on Computer Systems, 2008, 26(2): 4.[2] Liu, Y., Zhang, K., & Spear, M. DynamicSized Nonblocking Hash Tables. In Proceedings of the ACM Symposium on Principles of Distributed Computing, 2014.[3] Ongaro, D., & Ousterhout, J. In Search of an Understandable Consensus Algorithm. Proceedings of the USENIX Conference on Operating Systems Design and Implementation, 2014, 305320.[4] Apache RocketMQ. GitHub, _https://github.com/apache/rocketmq_[5] Verbitski, A., Gupta, A., Saha, D., et al. Amazon aurora: On avoiding distributed consensus for i/os, commits, and membership changes. In Proceedings of the 2018 International Conference on Management of Data, 2018, 789796.[6] Antonopoulos, P., Budovski, A., Diaconu, C., et al. Socrates: The new sql server in the cloud. In Proceedings of the 2019 International Conference on Management of Data, 2019, 17431756.[7] Li, Q. More Than Capacity: Performanceoriented Evolution of Pangu in Alibaba. Fast 2023_https://www.usenix.org/conference/fast23/presentation/liqiangdeployed_[8] Lu, S. Perseus: A FailSlow Detection Framework for Cloud Storage Systems. Fast 2023
作者:斜阳
#技术探索 #云原生

2024年8月13日

Apache RocketMQ 批处理模型演进之路
RocketMQ 的目标,是致力于打造一个消息、事件、流一体的超融合处理平台。这意味着它需要满足各个场景下各式各样的要求,而批量处理则是流计算领域对于极致吞吐量要求的经典解法,这当然也意味着 RocketMQ 也有一套属于自己风格的批处理模型。 至于什么样的批量模型才叫“属于自己风格”呢,且听我娓娓道来。 什么是批处理 首先,既然谈 RocketMQ 的批处理模型,那就得聊聊什么是“批处理”,以及为什么批处理是极致吞吐量要求下的经典解法。在我看来,批处理是一种泛化的方法论,它处在各个系统的方方面面,无论是传统工业还是互联网,甚至在日常生活中,都能看到它的身影。 批处理的核心思想是将多个任务或数据集合在一起,进行统一处理。这种方法的优势在于可以充分利用系统资源,减少任务切换带来的开销,从而提高整体效率。比如在工业制造中,工厂通常会将相同类型的零部件批量生产,以降低生产成本和提高生产速度。在互联网领域,批处理则表现为批量数据的存储、传输和处理,以优化性能和提升系统吞吐量。 批处理在极致吞吐量需求下的应用,更加显著。例如,在大数据分析中,海量的数据需要集中处理才能得出有意义的结果。如果逐条处理数据,不仅效率低下,还可能造成系统瓶颈。通过批处理,可以将数据划分为若干批次,在预定的时间窗口内统一处理,从而提高系统的并行处理能力,提升整体吞吐量。 此外,批处理其实并不意味着牺牲延时,就比如在 CPU Cache 中,对单个字节的操作无论如何时间上都是会优于多个字节,但是这样的比较并没有意义,因为延时的感知并不是无穷小的,用户常常并不关心 CPU 执行一条指令需要花多长时间,而是执行完单个“任务/作业”需要多久,在宏观的概念上,反而批处理具有更低的延时。 RocketMQ 批处理模型演进 接下来我们看看,RocketMQ 与批处理的“如胶似漆、形影相随”吧,其实在 RocketMQ 的诞生之初,就已经埋下了批处理的种子,这颗种子,我们暂且叫它——早期的批处理模型。 早期批处理模型 下图,是作为用户视角上感知比较强的老三样,分别是 Producer、Consumer、Broker: 而早期批处理模型,实际上只和 Producer、Broker 有关,在这条链路上会有批量消息的概念,当消息到达 Broker 后这个概念就会消失。基于这点我们来看具体是怎么回事。首先批量消息的源头实际上就是 Producer 端的 Send 接口,在大部分场景下,我们发送一条消息都会使用以下的形式去操作: ```java SendResult send(Message msg); ``` 非常地简明扼要,将一条消息发送到 Broker,如果我们要使用上早期的批处理模型,也只需要稍作修改: ```java SendResult send(Collection msgs) ``` 可以看到,将多条消息串成一个集合,然后依旧是调用 send 接口,就可以完成早期批处理模型的使用了(从用户侧视角看就已经 ok 了),就像下图一样,两军交战,谁火力更猛高下立判~ 那么真就到此为止了吗?当然不是,首先这里的集合是有讲究的,并不是随意将多条消息放在一起,就可以 send 出去的,它需要满足一些约束条件: 相同 Topic。 不能是 RetryTopic。 不能是定时消息。 相同 isWaitStoreMsgOK 标记。 这些约束条件暂时先不展开,因为就如同它字面意思一样浅显易懂,但是这也意味着它的使用并不是随心所欲的,有一定的学习成本,也有一定的开发要求,使用前需要根据这些约束条件自行分类,然后再装进“大炮”中点火发射。这里可能有人会问,这不是为难我胖虎吗?为什么要加这么多约束?是不是故意的?实际上并非如此,我们可以想象一下,假如我们是商家: 客户 A 买了两件物品,在发货阶段我们很自然的就可以将其打包在一起(将多个 Message 串成一个 ArrayList),然后一次性交给快递小哥给它 Send 出去,甚至还能省一笔邮费呢~ 客户 B 和客户 C 各买了一件物品,此时我效仿之前的行为打包到一起,然后告诉快递小哥这里面一个发到黑龙江,一个发到海南,然后掏出一笔邮费,然后。。。就没有然后了。 很显然,第二个场景很可能会收到快递小哥一个大大的白眼,这种事情理所应当的做不了,这也是为什么属于同一个 Collection 的消息必须要满足各种各样的约束条件了,在 Broker 实际收到一个“批量消息”时,会做以下处理: 首先它会根据这一批消息的某些属性,挑选出对应的队列,也就是上图中最底下的「p1、p2......」,在选定好队列之后,就可以进行后续的写入等操作了,这也是为什么必须要求相同 Topic,因为不同的 Topic 是没法选定同一个队列的。 接下来就到了上图所示流程,可以看到这里分别来了三个消息,分别是 《四条消息》《一条消息》《三条消息》,接下来他们会依次进入 unPack 流程,这个流程有点像序列化过程,因为从客户端发送上来的消息都是内存结构的,距离实际存储在文件系统中的结构还有一些不同。在 unPack 过程中,会分别解包成:四条消息、一条消息、三条消息;此时和连续 Send 八条消息是没有任何区别的,也就是在这一刻,批量消息的生命周期就走到了尽头,此刻往后,“众生平等、不分你我”。 也正是这个机制,Consumer 其实并不知道 Producer 发送的时候“到底是发射弓箭,还是点燃大炮”。这么做有个非常好的优点,那就是有着最高的兼容性,一切的一切好像和单条消息 Send 的经典用法没有任何区别,在这种情况下,每条消息都有最高的自由度,例如各自独立的 tag、独立的 keys、唯一的 msgId 等等,而基于这些所衍生出来的生态(例如消息轨迹)都是无缝衔接的。也就是说:只需要更换发送者使用的 Send 接口,就可以获得极大的发送性能提升,而消费者端无需任何改动。 索引构建流水线改造 我一向用词都非常的严谨,可以看到上一段的结尾:“获得极大的发送性能提升”,至于为什么这么讲,是因为距离整体系统的提升还有一些距离,也就是这一段的标题“索引构建流水线改造”。 首先我们要有一个共识,那就是对于消息队列这种系统,整体性能上限比值“消费/生产”应该要满足至少大于等于一,因为大部分情况下,我们的生产出来的消息至少应该被消费一次(否则直接都不用 Send 了岂不美哉)。 其实在以往,发送性能没有被拔高之前,它就是整个生产到消费链路上的短板,也就是说消费速率可以轻松超过生产速率,整个过程也就非常协调。but!在使用早期批处理模型后,生产速率的大幅度提升就暴露了另外一个问题,也就是会出现消费速率跟不上生产的情况,这种情况下,去谈整个系统的性能都是“无稽之谈”。 而出现消费速率短板的原因,还要从索引构建讲起。由于消费是要找到具体的消息位置,那就必须依赖于索引,也就是说,一条消息的索引构建完成之前,是无法被消费到的。 下图就是索引构建流程的简易图: 这是整个直接决定消费速率上限的流程。通过一个叫 ReputMessageService 的线程,顺序扫描 CommitLog 文件,将其分割为一条一条的消息,再对这些消息进行校验等行为,将其转换成一条条的索引信息,并写入对应分区的 ConsumeQueue 文件。 整个过程是完全串行的,从分割消息,到转换索引,到写入文件,每一条消息都要经过这么一次流转。因为一开始是串行实现,所以改造起来也非常的自然,那就是通过流水线改造,提高它的并发度,这里面有几个需要解决的问题: CommitLog 的扫描过程并行难度高,因为每条消息的长度是不一致的,无法简单地分割出消息边界来分配任务。 单条消息的索引构建任务并不重,因此不能简单忽略掉任务流转过程中的开销(队列入队出队)。 写入 ConsumeQueue 文件的时候要求写入时机队列维度有序,否则会带来额外的检查开销等。 针对这几个难点,在设计中也引入了“批量处理”的思路,其实大到架构设计、小到实现细节,处处都体现了这一理念,下图就是改造后的流程: 由于 CommitLog 扫描过程很难并行化处理,那就干脆不做并行化改造了,就使用单线程去顺序扫描,但是扫描的时候会进行一个简单的批处理,扫描出来的消息并不是单条的,而是尽可能凑齐一个较大的 buffer 块,默认是 4MB,这个由多条消息构成的 buffer 块我们不妨将其称为一个 batch msg。 然后就是对这些 batch msg 进行并行解析,将 batch msg 以单条消息的粒度扫描出来,并构建对应的 DispatchRequest 结构,最终依次落盘到 ConsumeQueue 文件中。其中的关键点在于 batch msg 的顺序如何保证,以及 DispatchRequest 在流转时怎么保证顺序和效率。为此我专门实现了一个轻量级的队列 DispatchRequestOrderlyQueue,这个 Queue 采用环状结构,可以随着顺序标号不断递进,并且能做到 “无序入队,有序出队”,详细设计和实现均在开源 RocketMQ 仓库中,这里就不多赘述。 在经过改造后,索引构建流程不再成为扯后腿的一员,从原本眼中钉的角色美美隐身了~ BatchCQ 模型 经过上述索引构建流水线改造后,整个系统也就实现了最基本的批处理模型,可以在最小修改、最高兼容性的情况下让性能获得质的飞跃。 但是这并不够!因为早期的模型出于兼容性等考虑,所以依旧束手束脚的,于是 BatchCQ 模型诞生了,主要原因分为两个维度: 性能上: 早期模型中,Broker 端在准备写入阶段需要进行解包,会有一定的额外开销。 CommitLog 文件中不具备批量信息,索引需要分多次构建。 能力上: 无法实现端到端的批量行为,如加密、压缩。 那 BatchCQ 又是如何改进上述的问题的呢?其实也非常地直观,那就是“见字如面”,将 ConsumeQueue 也批量化。这个模型去掉 Broker 端写入前的解包行为,索引也只进行一次构建: 就像上图所示,如果把索引比做信封,原先每个信封只能包含一份索引信息,在批量化后则可以塞下任意数量的索引信息,具体的存储结构也发生了较大变化: 比如说如果来了两批消息,分别是(3+2)条,在普通的 CQ 模型里会分别插入 5 个 slot,分别索引到 5 条消息。但是在 BatchCQ 模型中,(3+2)条消息会只插入 2 个 slot,分别索引到 3 条以及 2 条。 也是因为这个特点,所以 CQ 原有的格式也发生了变化,为了记录更多信息不得不加入 Base Offset、Batch Num 等元素,而这些更改也让原来定位索引位置的逻辑发生了变化。 普通 CQ:每个 Slot 定长,【Slot 长度 QueueOffset】位点可以直接找到索引,复杂度 O(1)。 BatchCQ:通过二分法查找,复杂度 O(log n)。 虽然这部分只涉及到了 ConsumeQueue 的修改,但是它作为核心链路的一环,影响是很大的,首先一批的消息会被当作同一条消息来处理,不需要重新 unPack ,而且这些消息都会具有相同的 TAG、Keys 甚至 MessageId,想唯一区分同一批的消息,只能根据它们的 QueueOffset 了,这一点会让消息轨迹等依靠 MessageId 的能力无法直接兼容使用,但是消息的处理粒度依然可以保持不变(依赖的是 QueueOffset)。 AutoBatch 模型 通过 BatchCQ 改造之后,我们其实已经获得极致的吞吐量了。那个 AutoBatch 又是个啥呢? 这里又要从头说起,在早期批处理模型的总结里,提到了一个比较大的缺陷,那就是“使用起来不够顺手”,用户是需要关心各种约束条件的,就像前面提到的 Topic、消息类型、特殊 Flag 等,在 BatchCQ 里面其实是新增了 Keys、Tag 等维度的限制,错误使用会出现一些非预期的情况。 不难看出,无论是早期批处理模型、还是 BatchCQ 模型,使用起来都有一定的学习成本,除了需要关注各种使用方式外,想要用好,还有一些隐藏在暗处的问题需要主动去解决: 无论是早期的批处理模型,还是 batchCQ 模型,都需要发送端自行将消息分类打包。 消息分类和打包成本高,分类需要关心分类依据,打包需要关心触发时机。 分类依据复杂,早期批处理模型需要关注多个属性,batchCQ 在这基础上新增了多个限制。 打包时机不易掌握,使用不当容易出现性能下降、时延不稳定、分区不均衡等问题。 为了解决以上问题,AutoBatch 应运而生,它就是一台能自动分拣的无情打包机器,全天候运转,精密又高效,将以往需要用户关注的细节统统屏蔽,它具有以下几个优点: AutoBatch 托管分类和打包能力,只需要简单配置即可使用。 用户侧不感知托管的过程,使用原有发送接口即可享受批处理带来的性能提升,同时兼容同步发送和异步发送。 AutoBatch 同时兼容早期的批处理模型和 batchCQ 模型。 实现轻量,性能优秀,设计上优化延时抖动、小分区等问题。 首先到底有多简单呢?让我们来看一下: ```java // 发送端开启 AutoBatch 能力 rmqProducer.setAutoBatch(true); ``` 也就是说,只需要加入这么一行,就可以开启 RocketMQ 的性能模式,获得早期的批处理模型或者 BatchCQ 模型带来的极致吞吐量提升。在开启 AutoBatch 的开关后,用户所有已有的行为都不需要作出改变,使用原来经典的 Send(Message msg)即可;当然也可以进行更精细的内存控制和延时控制: ```java // 设置单个 MessageBatch 大小(kb) rmqProducer.batchMaxBytes(32 1024); // 设置最大聚合等待时间(ms) rmqProducer.batchMaxDelayMs(10); // 设置所有聚合器最大内存使用(kb) rmqProducer.totalBatchMaxBytes(32 1024 1024); ``` 那么它具体轻量在哪?又高效在哪?下面这个简易的流程图应该能给大家一个答案: 首先它只引入了一个单线程的背景线程——background thread,这个背景线程以 1/2 的 maxDelayMs 周期运行,将扫描到超过等待时机缓冲区的消息提交到异步发送的线程池中,此时就完成了时间维度的聚合。空间维度的聚合则是由发送线程在传递时进行检查,如果满足 maxBytes,则原地发送。 整个设计非常地精简,只额外引入了一个周期运行的线程,这样做可以避免因为 AutoBatch 模型本身出现性能短板,而且 batchMessage 的序列化过程也做了精简,去掉了发送时候所有的检测(在聚合过程中已提前分类)。 才艺展示 上面分享了 RocketMQ 在批处理模型上的演进,那么它们具体效果也就必须拉出来给大家做一个才艺展示了,以下所有的压测结果均来自于 OpenmessagingBenchmark 框架,压测中使用的各项配置如下所示: | | 压测机器 | x86芯片机器 | | | | | | 规格 | 32核(vCPU)64 GiB20 Mbpsecs.c7.8xlarge | 8核(vCPU)64 GiB20 Mbpsecs.r7.2xlarge | | 云盘 | 无 | ESSD云盘 PL1 965GiB (50000 IOPS) | | 操作系统 | Alibaba Cloud Linux 3.2104 LTS 64位 | Alibaba Cloud Linux 3.2104 LTS 64位 | | JDK版本 | openjdk version "11.0.19" 20230418 LTSOpenJDK Runtime Environment (Red_Hat11.0.19.0.71.0.1.al8) (build 11.0.19+7LTS) | openjdk version "11.0.19" 20230418 LTSOpenJDK Runtime Environment (Red_Hat11.0.19.0.71.0.1.al8) (build 11.0.19+7LTS) | 准备工作 为 OpenmessagingBenchmark 进行压测环境,首先部署一套开源社区上最新的 RocketMQ,然后配置好 Namesrv 接入点等信息,然后打开 RocketMQ 的性能模式——AutoBatch,将 autoBatch 字段设置为 true: 早期批处理模型 ```java bin/benchmark drivers driverrocketmq/rocketmq.yaml workloads/1topic100partitions1kb4p4c1000k.yaml ``` 开启 autobatch 能力后,就会使用早期批处理模型进行性能提升,可以看到提升幅度非常大,由原来的 8w 提升至 27w 附近,为原来的 300%。 索引构建流水线优化 流水线优化是需要在服务端开启的,下面是一个简单的配置例子: ```java // 开启索引构建流水线优化 enableBuildConsumeQueueConcurrently=true // 调整内存中消息最大消费阈值 maxTransferBytesOnMessageInMemory=256M maxTransferCountOnMessageInMemory=32K // 调整磁盘中消息最大消费阈值 maxTransferBytesOnMessageInDisk=64M maxTransferCountOnMessageInDisk=32K ``` 可以看到,只有开启索引构建优化,才能做到 稳稳地达到 27w 的吞吐,在没有开启的时候,消费速率不足会触发冷读直至影响到整个系统的稳定性,同时也不具备生产意义,所以在使用批量模型的时候也务必需要开启索引构建优化。 BatchCQ模型 BatchCQ 模型的使用与前面提到的两者不同,它不是通过开关开启的,BatchCQ 其实是一种 Topic 类型,当创建 topic 的时候指定其为 BatchCQ 类型,既可拥有最极致的吞吐量优势。 ```java // Topic 的各种属性在 TopicAttributes 中设置 public static final EnumAttribute QUEUE_TYPE_ATTRIBUTE = new EnumAttribute("queue.type", false, newHashSet("BatchCQ", "SimpleCQ"), "SimpleCQ"); topicConfig.getAttributes().put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ"); ``` 当使用 BatchCQ 模型的时候,与早期批处理模型已经有了天壤之别,因此我们寻求了和开源 Kafka 的对比,部署架构如下: RocketMQ 3 主 3 备架构,使用轻量级 Container 部署。 节点 1: MasterA,SlaveC 节点 2: MasterC,SlaveB 节点 3: MasterB,SlaveA Kafka 3 个节点,设置分区副本数为 2。 压测结果 | | MQ | Kafka | | | | | | 16partions | TPS: 251439.34P99: 264.0 | TPS: 267296.34P99: 1384.01 | | 10000partiotions | TPS: 249981.94P99: 1341.01 | 报错无数据 | 可以看到,在使用 BatchCQ 类型的 Topic 时,RocketMQ 与 Kafka 的性能基本持平: 16partitions,二者吞吐量相差 5% 以内,且 RocketMQ 则具有明显更低的延时表现。 10000partitions,得益于 RocketMQ 的存储结构更为集中,在大量分区场景下吞吐量几乎保持不变。而Kafka在默认配置的情况下出现报错无法使用。 因此在极致吞吐量的需求下,BatchCQ 模型能够很好地承接极致需求的流量,而且如果更换性能更好的本地磁盘,同样的机器配置能达到更高的上限。
作者:谷乂
#技术探索 #功能特性
收藏
收藏暂无数据,请从小助手对话框添加
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
无疑 AI答疑专家
当前服务输出的内容均由人工智能模型生成,其生成内容的准确性和完整性无法保证,不代表我们的态度或观点。
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
专家答疑