2025年5月29日

Apache RocketMQ 源码解析 —— 秒级定时消息介绍
背景 如今rocketmq的应用场景日益拓宽,延时消息的需求也在增加。原本的特定级别延时消息已经不足以支撑rocketmq灵活的使用场景。因此,我们需要一个支持任意时间的延迟消息feature。 支持任意时间延迟的feature能够让使用者在消息发出时指定其消费时间,在生活与生产中具有非常重要的意义。 目标 1. 支持任意时延的延迟消息。 2. 提供延迟消息可靠的存储方式。 3. 保证延迟消息具有可靠的收发性能。 4. 提供延迟消息的可观测性排查能力。 架构 存储数据结构 本方案主要通过时间轮实现任意时延的定时消息。在此过程中,涉及两个核心的数据结构:TimerLog(存储消息索引)和TimerWheel(时间轮,用于定时消息到时)。 TimerLog,为本RIP中所设计的定时消息的记录文件,Append Only。每条记录包含一个prev_pos,指向前一条定时到同样时刻的记录。每条记录的内容可以包含定时消息本身,也可以只包含定时消息的位置信息。每一条记录包含如下信息: | 名称 | 大小 | 备注 | | | | | | size | 4B | 保存记录的大小 | | prev_pos | 8B | 前一条记录的位置 | | next_Pos | 8B | 后一条记录的位置,暂时为1,作为保留字段 | | magic | 4B | magic value | | delayed_time | 4B | 该条记录的定时时间 | | offset_real | 8B | 该条消息在commitLog中的位置 | | size_real | 4B | 该条消息在commitLog中的大小 | | hash_topic | 4B | 该条消息topic的hash code | | varbody | | 存储可变的body,暂时没有 | TimerWheel是对时刻表的一种抽象,通常使用数组实现。时刻表上的每一秒,顺序对应到数组中的位置,然后数组循环使用。时间轮的每一格,指向了TimerLog中的对应位置,如果这一格的时间到了,则按TimerLog中的对应位置以及prev_pos位置依次读出每条消息。 时间轮一格一格向前推进,配合TimerLog,依次读出到期的消息,从而达到定时消息的目的。时间轮的每一格设计如下: | delayed_time(8B) 延迟时间 | first_pos(8B) 首条位置 | last_pos(8B) 最后位置 | num(4B)消息条数 | | | | | | 上述设计的TimerLog与TimerWheel的协作如下图所示。 pipeline 在存储方面,采用本地文件系统作为可靠的延时消息存储介质。延时消息另存TimerLog文件中。通过时间轮对定时消息进行定位以及存取。针对长时间定时消息,通过消息滚动的方式避免过大的消息存储量。其具体架构如下所示: 从图中可以看出,共有五个Service分别处理定时消息的放置和存储。工作流如下: 1. 针对放置定时消息的service,每50ms从commitLog读取指定主题(TIMER_TOPIC)的定时消息。 1. TimerEnqueueGetService从commitLog读取得到定时主题的消息,并先将其放入enqueuePutQueue。 2. 另一个线程TimerEnqueuePutService将其放入timerLog,更新时间轮的存储内容。将该任务放进时间轮的指定位置。 2. 针对取出定时消息的service,每50ms读取下一秒的slot。有三个线程将读取到的消息重新放回commitLog。 1. 首先,TimerDequeueGetService每50ms读一次下一秒的slot,从timerLog中得到指定的msgs,并放进dequeueGetQueue。 2. 而后TimerDequeueGetMessageService从dequeueGetQueue中取出msg,并将其放入队列中。该队列为待写入commitLog的队列,dequeuePutQueue。 3. 最后TimerDequeuePutMessageService将这个queue中的消息取出,若已到期则修改topic,放回commitlog,否则继续按原topic写回CommitLog滚动。 代码实现 TimerLog与TimerWheel的协作实现 定时消息的核心存储由TimerLog和TimerWheel协同完成。TimerLog作为顺序写入的日志文件,每条记录包含消息在CommitLog中的物理偏移量(offsetPy)和延迟时间(delayed_time)。当消息到达时,TimerEnqueuePutService会将其索引信息追加到TimerLog,并通过prev_pos字段构建链表结构,确保同一时刻的多个消息可被快速遍历。 ```java // TimerLog.java 核心写入逻辑 public long append(byte[] data, int pos, int len) { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // 处理文件切换:当当前文件剩余空间不足时,填充空白段并创建新文件 if (len + MIN_BLANK_LEN mappedFile.getFileSize() mappedFile.getWrotePosition()) { ByteBuffer blankBuffer = ByteBuffer.allocate(MIN_BLANK_LEN); blankBuffer.putInt(mappedFile.getFileSize() mappedFile.getWrotePosition()); blankBuffer.putLong(0); // prev_pos置空 blankBuffer.putInt(BLANK_MAGIC_CODE); // 标记空白段 mappedFile.appendMessage(blankBuffer.array()); mappedFile = this.mappedFileQueue.getLastMappedFile(0); // 切换到新文件 } // 写入实际数据并返回物理偏移量 long currPosition = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(); mappedFile.appendMessage(data, pos, len); return currPosition; } ``` 此代码展示了消息追加的核心流程: 1. 检查当前文件的剩余空间,不足时填充空白段并创建新文件 2. 将消息索引数据写入内存映射文件 3. 返回写入位置的全局偏移量,供时间轮记录 时间轮槽位管理 TimerWheel通过数组结构管理时间槽位,每个槽位记录该时刻的首尾指针和消息数量。当消息入队时,putSlot方法会更新对应槽位的链表结构: ```java // TimerWheel.java 槽位更新逻辑 public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic) { localBuffer.get().position(getSlotIndex(timeMs) Slot.SIZE); localBuffer.get().putLong(timeMs / precisionMs); // 标准化时间戳 localBuffer.get().putLong(firstPos); // 链表头指针 localBuffer.get().putLong(lastPos); // 链表尾指针 localBuffer.get().putInt(num); // 当前槽位消息总数 localBuffer.get().putInt(magic); // 特殊标记(如滚动/删除) } ``` 该方法的实现细节: + getSlotIndex(timeMs)将时间戳映射到环形数组的索引 + 同时记录首尾指针以实现O(1)复杂度插入 消息入队流程 TimerEnqueueGetService:消息扫描服务 该服务作为定时消息入口,持续扫描CommitLog中的TIMER_TOPIC消息。其核心逻辑通过enqueue方法实现: ```java // TimerMessageStore.java public boolean enqueue(int queueId) { ConsumeQueueInterface cq = this.messageStore.getConsumeQueue(TIMER_TOPIC, queueId); ReferredIterator iterator = cq.iterateFrom(currQueueOffset); while (iterator.hasNext()) { CqUnit cqUnit = iterator.next(); MessageExt msgExt = getMessageByCommitOffset(cqUnit.getPos(), cqUnit.getSize()); // 构造定时请求对象 TimerRequest timerRequest = new TimerRequest( cqUnit.getPos(), cqUnit.getSize(), Long.parseLong(msgExt.getProperty(TIMER_OUT_MS)), System.currentTimeMillis(), MAGIC_DEFAULT, msgExt ); // 放入入队队列(阻塞式) while (!enqueuePutQueue.offer(timerRequest, 3, TimeUnit.SECONDS)) { if (!isRunningEnqueue()) return false; } currQueueOffset++; // 更新消费进度 } } ``` 关键设计点: 1. 增量扫描:通过currQueueOffset记录消费位移,避免重复处理 2. 消息转换:将ConsumeQueue中的索引转换为包含完整元数据的TimerRequest` TimerEnqueuePutService:时间轮写入服务 从队列获取请求后,该服务执行核心的定时逻辑: ```java // TimerMessageStore.java public boolean doEnqueue(long offsetPy, int sizePy, long delayedTime, MessageExt messageExt) { // 计算目标时间槽位 Slot slot = timerWheel.getSlot(delayedTime); // 构造TimerLog记录 ByteBuffer buffer = ByteBuffer.allocate(TimerLog.UNIT_SIZE); buffer.putLong(slot.lastPos); // 前驱指针指向原槽位尾 buffer.putLong(offsetPy); // CommitLog物理偏移 buffer.putInt(sizePy); // 消息大小 buffer.putLong(delayedTime); // 精确到毫秒的延迟时间 // 写入TimerLog并更新时间轮 long pos = timerLog.append(buffer.array()); timerWheel.putSlot(delayedTime, slot.firstPos, pos, slot.num + 1); } ``` 写入优化策略: + 空间预分配:当检测到当前MappedFile剩余空间不足时,自动填充空白段并切换文件 + 链表式存储:通过prev_pos字段构建时间槽位的倒序链表,确保新消息快速插入 + 批量提交:积累多个请求后批量写入,减少文件I/O次数 TimerEnqueuePutService从队列获取消息请求,处理消息滚动逻辑。当检测到延迟超过时间轮窗口时,将消息重新写入并标记为滚动状态: ```java // TimerMessageStore.java 消息滚动处理 boolean needRoll = delayedTime tmpWriteTimeMs = (long) timerRollWindowSlots precisionMs; if (needRoll) { magic |= MAGIC_ROLL; // 调整延迟时间为时间轮窗口中间点,确保滚动后仍有处理时间 delayedTime = tmpWriteTimeMs + (long) (timerRollWindowSlots / 2) precisionMs; } ``` 此逻辑的关键设计: 1. 当延迟时间超过当前时间轮容量时触发滚动 2. 将消息的delayed_time调整为窗口中间点,避免频繁滚动 3. 设置MAGIC_ROLL标记,出队时识别滚动消息 消息出队处理 TimerDequeueGetService:到期扫描服务 该服务以固定频率推进时间指针,触发到期消息处理: ```java // TimerMessageStore.java public int dequeue() throws Exception { // 获取当前时间槽位 Slot slot = timerWheel.getSlot(currReadTimeMs); // 遍历TimerLog链表 long currPos = slot.lastPos; while (currPos != 1) { SelectMappedBufferResult sbr = timerLog.getTimerMessage(currPos); ByteBuffer buf = sbr.getByteBuffer(); // 解析记录元数据 long prevPos = buf.getLong(); long offsetPy = buf.getLong(); int sizePy = buf.getInt(); long delayedTime = buf.getLong(); // 分类处理(普通消息/删除标记) if (isDeleteMarker(buf)) { deleteMsgStack.add(new TimerRequest(offsetPy, sizePy, delayedTime)); } else { normalMsgStack.addFirst(new TimerRequest(offsetPy, sizePy, delayedTime)); } currPos = prevPos; // 前向遍历链表 } // 分发到处理队列 splitAndDispatch(normalMsgStack); splitAndDispatch(deleteMsgStack); moveReadTime(); // 推进时间指针 } ``` TimerDequeuePutMessageService:消息投递服务 TimerDequeuePutMessageService将到期消息重新写入CommitLog。对于滚动消息,会修改主题属性并增加重试计数: ```java // TimerMessageStore.java 消息转换逻辑 MessageExtBrokerInner convert(MessageExt messageExt, long enqueueTime, boolean needRoll) { if (needRoll) { // 增加滚动次数标记 MessageAccessor.putProperty(messageExt, TIMER_ROLL_TIMES, Integer.parseInt(messageExt.getProperty(TIMER_ROLL_TIMES)) + 1 + ""); } // 恢复原始主题和队列ID messageInner.setTopic(messageInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); messageInner.setQueueId(Integer.parseInt( messageInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); return messageInner; } ``` 此转换过程确保: + 滚动消息保留原始主题信息 + 每次滚动增加TIMER_ROLL_TIMES属性 该服务最终将到期消息重新注入CommitLog: ```java // TimerMessageStore.java public void run() { while (!stopped) { TimerRequest req = dequeuePutQueue.poll(10, TimeUnit.MILLISECONDS); MessageExtBrokerInner innerMsg = convert(req.getMsg(), req.needRoll()); // 消息重投递逻辑 int result = doPut(innerMsg, req.needRoll()); switch (result) { case PUT_OK: // 成功:更新监控指标 timerMetrics.recordDelivery(req.getTopic()); break; case PUT_NEED_RETRY: // 重试:重新放回队列头部 dequeuePutQueue.putFirst(req); break; case PUT_NO_RETRY: // 丢弃:记录错误日志 log.warn("Discard undeliverable message:{}", req); } } } ``` 故障恢复机制 系统重启时通过recover方法重建时间轮状态。关键步骤包括遍历TimerLog文件并修正槽位指针: ```java // TimerMessageStore.java 恢复流程 private long recoverAndRevise(long beginOffset, boolean checkTimerLog) { List mappedFiles = timerLog.getMappedFileQueue().getMappedFiles(); for (MappedFile mappedFile : mappedFiles) { SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0); ByteBuffer bf = sbr.getByteBuffer(); while (position Google Doc: + Shimo:
#技术探索

2025年5月29日

Apache RocketMQ 源码解析 —— Controller 高可用切换架构
一、原理及核心概念浅述 1.1 核心架构 1.2 核心概念 1. controller:负责管理broker间的主备关系,可以挂在namesrv中,不影响namesrv能力,支持独立部署。 2. master/slave:主备身份。 3. syncStateSet:字面意思为“同步状态集合”。当备节点能够及时跟上主节点,则会纳入syncStateSet。 4. epoch:用于记录每一次主备切换时的状态,避免切换后产生数据丢失或者不一致的情况。 为方便理解,在某些过程中可以把controller当作班主任,master作为小组长,slave作为小组成员。同步过程是各位同学向小组长抄作业的过程,位于syncStateSet中的是优秀作业。 二、相关代码文件及说明 核心是“controller+broker+复制过程”,因此分三块进行叙述。 2.1 Controller 该部分代码主要集中在rocketmqcontroller模块下,主要有如下代码文件: + ControllerManager: 负责管理controller,其中存储了许多controller相关配置,并负责了心跳管理等核心功能。(班主任管理条例) + DLederController: Controller的DLedger实现。包含了controller的基本功能。在其中实现了副本信息管理、broker存活情况探测、选举Master等核心功能。(某种班主任) + DefaultBrokerHeartbeatManager: 负责管理broker心跳,其中包含了broker存活情况表,以及在broker下线时的listeners,当副本掉线时,触发重新选举。(点名册) + ReplicasInfoManager: 负责controller中事件的处理。即各种选举事件、更换SyncStateSet事件等等。(小组登记册) + ControllerRequestProcessor: 处理向controller发送的requests,例如让controller选举、向controller注册broker、心跳、更换SyncStateSet等等。(班主任信箱) + DefaultElectPolicy: 选举Master的策略。可以选择从sync状态的副本中选,也可以支持从所有副本中(无论是否同步)的unclean选举。(班规) + ...... 2.2 Broker 该部分代码主要集中在rocketmqbroker模块中,可进入org/apache/rocketmq/broker/controller进行查看: + ReplicasManager: 完成自己作为一个replica的使命——找controller,角色管理,Master更新(Expand/Shrink)SyncStateSet等等。 2.3 复制模块 该部分代码主要集中在rocketmqstore模块中的ha文件夹下: + HAService: 每个Replica必备的的service,负责管理作为主、备的同步任务。 + HAClient: 每个Slave 的HAService中必备的client,负责管理同步任务中的读、写操作。 + HAConnection: 代表在Master中的HA连接,每个connection理论上对应一个slave。在该connection类中存储了传输过程中的诸多内容,包括channel、传输状态、当前传输位点等等信息。 三、核心流程 3.1 心跳 核心CODE:BROKER_HEARTBEAT Broker端: 该部分较简单,带上code向controller发request,不再赘述: BrokerController.sendHeartbeat() brokerOuterAPI.sendHeartbeat() Controller端: 1. 首先由ControllerRequestProcessor接收到code,进入处理逻辑: ```java private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class); if (requestHeader.getBrokerId() == null) { return RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST, "Heart beat with empty brokerId"); } this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(), requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority()); return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success"); } ``` 2. 之后在onBrokerHeartbeat()中,主要更新controller brokerHeartbeatManager中的brokerLiveTable: ```java public void onBrokerHeartbeat(String clusterName, String brokerName, String brokerAddr, Long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, Long confirmOffset, Integer electionPriority) { BrokerIdentityInfo brokerIdentityInfo = new BrokerIdentityInfo(clusterName, brokerName, brokerId); BrokerLiveInfo prev = this.brokerLiveTable.get(brokerIdentityInfo); ...... if (null == prev) { this.brokerLiveTable.put(...); log.info("new broker registered, {}, brokerId:{}", brokerIdentityInfo, realBrokerId); } else { prev.setXXX(......) } } ``` 3.2 选举 相关CODE: CONTROLLER_ELECT_MASTER 有如下几种情形可能触发选举: 1. controller主动发起,通过triggerElectMaster(): 1. HeartbeatManager监听到有broker心跳失效。 (班主任发现有小组同学退学了) 2. Controller检测到有一组Replica Set不存在master。(班主任发现有组长虽然在名册里,但是挂了) 2. broker发起将自己选为master,通过ReplicaManager.brokerElect(): 1. Broker向controller查metadata时,没找到master信息。(同学定期检查小组情况,问班主任为啥没小组长) 2. Broker向controller注册完后,仍未从controller获取到master信息。(同学报道后发现没小组长,汇报) 3. 通过tools发起: 1. 通过选举命令ReElectMasterSubCommand发起。(校长直接任命) 上述所有过程,最终均触发: controller.electMaster() replicasInfoManager.electMaster() // 即,所有小组长必须通过班主任任命 ```java public ControllerResult electMaster(final ElectMasterRequestHeader request, final ElectPolicy electPolicy) { ... // 从request中取信息 ... if (syncStateInfo.isFirstTimeForElect()) { // 从未注册,直接任命 newMaster = brokerId; } // 按选举政策选主 if (newMaster == null) { // we should assign this assignedBrokerId when the brokerAddress need to be elected by force Long assignedBrokerId = request.getDesignateElect() ? brokerId : null; newMaster = electPolicy.elect(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), syncStateSet, allReplicaBrokers, oldMaster, assignedBrokerId); } if (newMaster != null && newMaster.equals(oldMaster)) { // 老主 == 新主 // old master still valid, change nothing String err = String.format("The old master %s is still alive, not need to elect new master for broker %s", oldMaster, brokerReplicaInfo.getBrokerName()); LOGGER.warn("{}", err); // the master still exist response.setXXX() result.setBody(new ElectMasterResponseBody(syncStateSet).encode()); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err); return result; } // a new master is elected if (newMaster != null) { // 出现不一样的新主 final int masterEpoch = syncStateInfo.getMasterEpoch(); final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch(); final HashSet newSyncStateSet = new HashSet<(); //设置新的syncStateSet newSyncStateSet.add(newMaster); response.setXXX()... ElectMasterResponseBody responseBody = new ElectMasterResponseBody(newSyncStateSet); } result.setBody(responseBody.encode()); final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster); result.addEvent(event); return result; } // 走到这里,说明没有主,选举失败 // If elect failed and the electMaster is triggered by controller (we can figure it out by brokerAddress), // we still need to apply an ElectMasterEvent to tell the statemachine // that the master was shutdown and no new master was elected. if (request.getBrokerId() == null) { final ElectMasterEvent event = new ElectMasterEvent(false, brokerName); result.addEvent(event); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Old master has down and failed to elect a new broker master"); } else { result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, "Failed to elect a new master"); } return result; } ``` 3.3 更新SyncStateSet 核心CODE: CONTROLLER_ALTER_SYNC_STATE_SET 1. 由master发起,主动向controller更换syncStateSet(等价于小组长汇报优秀作业) 2. controllerRequestProcessor接收更换syncStateSet的请求,进入handleAlterSyncStateSet()方法: ```java private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class); final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class); final CompletableFuture future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet); if (future != null) { return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS); } return RemotingCommand.createResponseCommand(null); } ``` 3. 之后进入Controller.alterSyncStateSet() replicasInfoManager.alterSyncStateSet()方法: ```java public ControllerResult alterSyncStateSet( final AlterSyncStateSetRequestHeader request, final SyncStateSet syncStateSet, final BrokerValidPredicate brokerAlivePredicate) { final String brokerName = request.getBrokerName(); ... final Set newSyncStateSet = syncStateSet.getSyncStateSet(); final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); // 检查syncStateSet是否有变化 final Set oldSyncStateSet = syncStateInfo.getSyncStateSet(); if (oldSyncStateSet.size() == newSyncStateSet.size() && oldSyncStateSet.containsAll(newSyncStateSet)) { String err = "The newSyncStateSet is equal with oldSyncStateSet, no needed to update syncStateSet"; ... } // 检查是否是master发起的 if (!syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) { String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}", syncStateInfo.getMasterBrokerId(), request.getMasterBrokerId()); ... } // 检查master的任期epoch是否一致 if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) { String err = String.format("Rejecting alter syncStateSet request because the current master epoch is:{%d}, not {%d}", syncStateInfo.getMasterEpoch(), request.getMasterEpoch()); ... } // 检查syncStateSet的epoch if (syncStateSet.getSyncStateSetEpoch() != syncStateInfo.getSyncStateSetEpoch()) { String err = String.format("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{%d}, not {%d}", syncStateInfo.getSyncStateSetEpoch(), syncStateSet.getSyncStateSetEpoch()); ... } // 检查新的syncStateSet的合理性 for (Long replica : newSyncStateSet) { // 检查replica是否存在 if (!brokerReplicaInfo.isBrokerExist(replica)) { String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't exist", replica); ... } // 检查broker是否存活 if (!brokerAlivePredicate.check(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), replica)) { String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't alive", replica); ... } } // 检查是否包含master if (!newSyncStateSet.contains(syncStateInfo.getMasterBrokerId())) { String err = String.format("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {%s}", syncStateInfo.getMasterBrokerId()); ... } // 更新epoch int epoch = syncStateInfo.getSyncStateSetEpoch() + 1; ... // 生成事件,替换syncStateSet final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet); ... } ``` 4. 最后通过syncStateInfo.updateSyncStateSetInfo(),更新syncStateSetInfoTable.get(brokerName)得到的syncStateInfo信息(该过程可以理解为班主任在班级分组册上找到了组长的名字,拿出组员名单,更新)。 3.4 复制 该部分较复杂,其中HAService/HAClient/HAConnection以及其中的各种Service/Reader/Writer容易产生混淆,对阅读造成阻碍。因此绘制本图帮助理解(可在粗读源码后回头理解): 下面对HA复制过程作拆解,分别讲解: 1. 在各个replica的DefaultMessageStore中均注册了HAService,负责管理HA的复制。 2. 在Master的 HAService中有一个AcceptSocketService, 负责自动接收各个slave的连接: ```java protected abstract class AcceptSocketService extends ServiceThread { ... / Starts listening to slave connections. @throws Exception If fails. / public void beginAccept() throws Exception { ... } @Override public void shutdown(final boolean interrupt) { ... } @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); Set selected = this.selector.selectedKeys(); if (selected != null) { for (SelectionKey k : selected) { if (k.isAcceptable()) { SocketChannel sc = ((ServerSocketChannel) k.channel()).accept(); if (sc != null) { DefaultHAService.log.info("HAService receive new connection, " + sc.socket().getRemoteSocketAddress()); try { HAConnection conn = createConnection(sc); conn.start(); DefaultHAService.this.addConnection(conn); } catch (Exception e) { log.error("new HAConnection exception", e); sc.close(); } } } ... } } ``` 3. 在各个Slave 的HAService中存在一个HAClient,负责向master发起连接、传输请求。 ```java public class AutoSwitchHAClient extends ServiceThread implements HAClient { ... } public interface HAClient { void start(); void shutdown(); void wakeup(); void updateMasterAddress(String newAddress); void updateHaMasterAddress(String newAddress); String getMasterAddress(); String getHaMasterAddress(); long getLastReadTimestamp(); long getLastWriteTimestamp(); HAConnectionState getCurrentState(); void changeCurrentState(HAConnectionState haConnectionState); void closeMaster(); long getTransferredByteInSecond(); } ``` 4. 当master收到slave的连接请求后,将会创建一个HAConnection,负责收发内容。 ```java public interface HAConnection { void start(); void shutdown(); void close(); SocketChannel getSocketChannel(); HAConnectionState getCurrentState(); String getClientAddress(); long getTransferredByteInSecond(); long getTransferFromWhere(); long getSlaveAckOffset(); } ``` 5. Master的HAConnection会与Slave的HAClient建立连接,二者均通过HAWriter(较简单,不解读,位于HAWriter类)往socket中写内容,再通过HAReader读取socket中的内容。只不过一个是HAServerReader,一个是HAClientReader: ```java public abstract class AbstractHAReader { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); protected final List readHookList = new ArrayList<(); public boolean read(SocketChannel socketChannel, ByteBuffer byteBufferRead) { int readSizeZeroTimes = 0; while (byteBufferRead.hasRemaining()) { ... boolean result = processReadResult(byteBufferRead); ... } } ... protected abstract boolean processReadResult(ByteBuffer byteBufferRead); } ``` 6. 两种HAReader均实现了processReadResult()方法,负责处理从socket中得到的数据。client需要详细阐述该方法,因为涉及到如何将读进来的数据写入commitlog,client的processReadResult(): ```java @Override protected boolean processReadResult(ByteBuffer byteBufferRead) { int readSocketPos = byteBufferRead.position(); try { while (true) { ... switch (AutoSwitchHAClient.this.currentState) { case HANDSHAKE: { ... // 握手阶段,先检查commitlog完整性,截断 } break; case TRANSFER: { // 传输阶段,将body写入commitlog ... byte[] bodyData = new byte[bodySize]; ... if (bodySize 0) { // 传输阶段,将body写入commitlog AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length); } haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset())); ... break; } default: break; } if (isComplete) { continue; } } // 检查buffer中是否还有数据, 如果有, compact() ... break; } } ... } ``` 7. server的processReadResult()主要用于接收client的握手等请求,较简单。更需要解释其WriteSocketService如何向socket中调用HAwriter去写数据: ```java abstract class AbstractWriteSocketService extends ServiceThread { ... private void transferToSlave() throws Exception { ... int size = this.getNextTransferDataSize(); if (size 0) { ... buildTransferHeaderBuffer(this.transferOffset, size); this.lastWriteOver = this.transferData(size); } else { // 无需传输,直接更新caught up的时间 AutoSwitchHAConnection.this.haService.updateConnectionLastCaughtUpTime(AutoSwitchHAConnection.this.slaveId, System.currentTimeMillis()); haService.getWaitNotifyObject().allWaitForRunning(100); } } @Override public void run() { AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); switch (currentState) { case HANDSHAKE: // Wait until the slave send it handshake msg to master. // 等待slave的握手请求,并进行回复 break; case TRANSFER: ... transferToSlave(); break; default: ... } } catch (Exception e) { ... } } ... // 在service结束后的一些事情 } ... } ``` 此处同样附上server实现processReadResult(),读socket中数据的代码: ```java @Override protected boolean processReadResult(ByteBuffer byteBufferRead) { while (true) { ... HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)]; switch (slaveState) { case HANDSHAKE: // 收到了client的握手 ... LOGGER.info("Receive slave handshake, slaveBrokerId:{}, isSyncFromLastFile:{}, isAsyncLearner:{}", AutoSwitchHAConnection.this.slaveId, AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.isAsyncLearner); break; case TRANSFER: // 收到了client的transfer状态 ... // 更新client状态信息 break; default: ... } ... } ``` 3.5 Active Controller的选举 该选举主要通过DLedger实现,在DLedgerController中通过RoleChangeHandler.handle()更新自身身份: ```java class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler { private final String selfId; private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_")); private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER; public RoleChangeHandler(final String selfId) { this.selfId = selfId; } @Override public void handle(long term, MemberState.Role role) { Runnable runnable = () { switch (role) { case CANDIDATE: this.currentRole = MemberState.Role.CANDIDATE; // 停止扫描inactive broker任务 ... case FOLLOWER: this.currentRole = MemberState.Role.FOLLOWER; // 停止扫描inactive broker任务 ... case LEADER: { log.info("Controller {} change role to leader, try process a initial proposal", this.selfId); int tryTimes = 0; while (true) { // 将会开始扫描inactive brokers ... break; } } }; this.executorService.submit(runnable); } ... } ```
#技术探索

2025年5月29日

Apache RocketMQ 源码解析 —— POP 消费模式逻辑介绍
一、背景 在RocketMQ 5.0之前,消费有两种方式可以从Broker获取消息,分别为Pull模式和Push模式。 1. Pull模式:消费需要不断的从阻塞队列中获取数据,如果没有数据就等待,这个阻塞队列中的数据由消息拉取线程从Broker拉取消息之后加入的,所以Pull模式下消费需要不断主动从Broker拉取消息。 2. Push模式:需要注册消息监听器,当有消息到达时会通过回调函数进行消息消费,从表面上看就像是Broker主动推送给消费者一样,所以叫做推模式,底层依旧是消费者从Broker拉取数据然后触发回调函数进行消息消费,只不过不需要像Pull模式一样不断判断是否有消息到来。 不过不管是Pull模式还是Push模式,在集群模式下,一个消息队列只能分配给同一个消费组内的某一个消费者进行消费,所以需要进行Rebalance负载均衡为每个消费者分配消息队列之后才可以进行消息消费。 Rebalance的工作是在每个消费者端进行的,消费端负责的工作太多,除了负载均衡还有消费位点管理等功能,如果新增一种语言的支持,就需要重新实现一遍对应的业务逻辑代码。 除此以外,在RocketMQ 5.0以前负载均衡是以消息队列为维度为每个消费者分配的,一个消息队列只能分给组内一个消费者消费,所以会存在以下问题: (1)队列一次只能分给组内一个消费者消费,扩展能力有限。 (2)消息队列数量与消费者数量比例不均衡时,可能会导致某些消费者没有消息队列可以分配或者某些消费者承担过多的消息队列,分配不均匀; (3)如果某个消费者hang住,会导致分配到该消费者的消息队列中的消息无法消费,导致消息积压; 在RocketMQ 5.0增加了Pop模式消费,将负载均衡、消费位点管理等功能放到了Broker端,减少客户端的负担,使其变得轻量级,并且5.0之后支持消息粒度的负载均衡。当前支持任意时延的定时消息已在开源社区开源,而其支持的任意时延定时特性可以帮助POP进行一定优化。主要优化点在于InvisibleTime的设置。 二、原理 POP模式下,消息的消费有如下几个过程: 客户端发起POP消费请求Broker处理POP请求,返回消息客户端消费完成,返回ACKBroker处理Ack请求,在内部进行消费完成的位点更新。 可以用下图表示: 三、核心代码 3.1 核心Processor 3.1.1 PopMessageProcessor 该线程用于处理客户端发出的POP请求。在收到POP请求后,将处理该请求,找到待POP的队列: ```java int index = (PermName.isPriority(topicConfig.getPerm()) ? i : randomQ + i) % queueIdList.size(); int queueId = queueIdList.get(index); if (brokerController.getBrokerConfig().isMarkTaintWhenPopQueueList()) { this.brokerController.getPullMessageProcessor().getPullRebalanceManager() .addOrUpdateTaint(requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId); } if (queueId = 0 && queueId 1) { // 通过位运算,标记指定下标的CK消息被ACK标记。 markBitCAS(pointWrapper.getBits(), indexOfAck); } ``` 在上述存储完成后,后续通过其它线程进行处理。处理包括CK、AK的存储,以及抵消等操作。主要在3.2节中阐述。 3.1.3 ChangeInvisibleTimeProcessor 该Processor主要用于修改消息不可见时间。此外,其中还定义了一些实用的方法,例如CK的buffer存储方法,以及持久化CK的方法。此处不过多展开。 3.2 核心Service 3.2.1 PopReviveService 该Service被AckMessageProcessor启动。该Service主要对重试队列中的消息进行merge,尝试消除此前没有CK、ACK的消息。 在该Service中,会定期消费Revive队列中的CK、ACK对。在其中会维护一个map,将CK和ACK进行存放与检索: ```java if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) { ...... // 将CK存放入map map.put(point.getT() + point.getC() + point.getQ() + point.getSo() + point.getPt(), point); point.setRo(messageExt.getQueueOffset()); ...... } else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) { ...... // 从map中检索是否有CK AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class); String mergeKey = ackMsg.getT() + ackMsg.getC() + ackMsg.getQ() + ackMsg.getSo() + ackMsg.getPt(); PopCheckPoint point = map.get(mergeKey); ...... } ``` 在完成消息的取出后,会进行revive过程中未ACK消息的重试,以及位点提交: 3.2.2 PopBufferMergeService 该Service主要对Buffer中的CK、ACK进行检查。根据检查结果将展开不同的操作。此外,在该Service中还定义了许多实用的CK、ACK操作方法,能够对CK、ACK进行检查以及存储等相关操作。 该Service会启动一个线程,定期执行`scan()`方法。此外,在扫描达到一定次数之后,将执行`scanGarbage()`方法清理数据。 在`scan()`中将会执行如下操作: 1. 获取buffer中所有键值对,作为CK集合。 ```java Iterator iterator = buffer.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); PopCheckPointWrapper pointWrapper = entry.getValue(); ``` 2. 针对每一个CK集合,检查其中的每个CK是否均被ACK,如果已经被ACK,则删除该CK集合。 ```java if ((pointWrapper.isJustOffset() && pointWrapper.isCkStored()) || isCkDone(pointWrapper) || (isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored())) { if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("[PopBuffer]ck done, {}", pointWrapper); } iterator.remove(); counter.decrementAndGet(); continue; } ``` 3. 针对未被ACK的CK,将CK、ACK信息进行存储,待一定时间后进行重试。 ```java if (removeCk) { // put buffer ak to store if (pointWrapper.getReviveQueueOffset() < 0) { putCkToStore(pointWrapper, false); countCk++; } if (!pointWrapper.isCkStored()) { continue; } for (byte i = 0; i < point.getN(); i++) { // reput buffer ak to store if (DataConverter.getBit(pointWrapper.getBits().get(), i) && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) { if (putAckToStore(pointWrapper, i)) { count++; markBitCAS(pointWrapper.getToStoreBits(), i); } } } ``` 在上述操作中,CK与ACK能够成对存储,且定时重试。直到消费者发送回成对的ACK、CK请求,这对请求方可抵消,也象征该消息被消费成功。 代码详解 可参考如下PPT中的介绍内容:
#技术探索

2025年5月12日

开源之夏 2025|Apache RocketMQ 社区项目期待你的参与!
开源之夏 2025 开源之夏是由中国科学院软件研究所“开源软件供应链点亮计划”发起并长期支持的一项暑期开源活动,旨在鼓励在校学生积极参与开源软件的开发维护,培养和发掘更多优秀的开发者,促进优秀开源软件社区的蓬勃发展,助力开源软件供应链建设。开源之夏于 2020 年正式发起,开源之夏 2025 是第六届活动。 活动联合各大开源社区,针对重要开源软件的开发与维护提供项目开发任务,并向全球高校学生开放报名。学生可自主选择感兴趣的项目进行申请,中选后在项目开发者(社区导师)的指导下进行开发。 通过参与活动,不仅可以结识开源界小伙伴和技术大牛,获得社区导师的专业指导,与开源项目开发者深度交流,还能获得丰富的项目实践经验,提升项目开发技能,为学习深造提供助力,为职业发展积攒履历。 此外,根据项目的难易程度和完成情况,结项者将获取开源之夏活动劳务报酬和结项证书。项目难度分为基础和进阶两档,对应结项劳务报酬分别为:税前 8000 元人民币和税前 12000 元人民币。 Apache RocketMQ 社区项目 Apache RocketMQ 于 2012 年诞生于阿里巴巴核心电商系统,于 2016 年捐赠给 Apache 基金会,于 2017 年成为 Apache 顶级项目。现 Apache RocketMQ 致力于构建低延迟、低成本、高可用的分布式“消息、事件、流”统一处理平台,覆盖“云、边、端”⼀体化数据处理场景,帮助企业和开发者在智能化时代,轻松构建事件驱动架构的云原生 / AI 原生应用。 本次开源之夏共提供 2 个课题项目: 1. RocketMQ 主备副本利用系统内置 Topic 完成元数据增量同步,项目社区导师:金融通 2. RocketMQ 路由反向更新机制,项目社区导师:ShannonDing 如何参与项目? 欢迎扫描上方海报二维码,查看 Apache RocketMQ 社区项目详情,其中有项目导师的姓名与联系邮箱,可通过邮件与导师进行沟通,并准备项目申请材料、提交项目申请,每位同学可以申请一个项目。 以下是开源之夏的活动流程,更多参与指南请查看
#社区动态

2024年12月20日

Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
近日,以“新纪天工、开物焕彩——致敬开源的力量”为活动主题的“重大科技成就发布会(首场)”在国家科技传播中心成功举办,并隆重揭晓了 2024 开源创新榜单,旨在致敬中国开源力量,传播推广开源科技成就,营造中国开源创新生态。 2024 年开源创新榜单由中国科协科学技术传播中心、中国计算机学会、中国通信学会、中国科学院软件研究所共同主办,中国开发者社区承办,以王怀民院士为首组建评审委员会,进行研讨评审,面向中国开源行业领域,遴选具有创新性、贡献度和影响力的开源项目、社区、应用场景与开源事件。 在评审出的 10 个年度开源项目中,Apache RocketMQ 成功入选。 Apache RocketMQ 社区近况 Apache RocketMQ 创新论文连续被软件工程顶级会议录用 (1)2024 年 9 月,由阿里云消息队列团队发表的关于 RocketMQ 锁性能优化论文《Beyond the Bottleneck: Enhancing HighConcurrency Systems with Lock Tuning》被 CCFA 类软件工程顶级会议 FM 2024 录用。 高并发系统常常面临性能瓶颈,主要是由于线程间激烈竞争锁导致的等待和上下文切换。作为一家云计算公司,我们非常重视性能的最大化。为此,我们对轻量级自旋锁进行了改进,并提出了一种简洁的参数微调策略,能够在最低风险条件下突破系统性能瓶颈。该策略在高吞吐量消息队列系统 Apache RocketMQ 中得到了验证,实现了 X86 CPU 性能提升 37.58% 和 ARM CPU 性能提升 32.82%。此外,我们还确认了这种方法在不同代码版本和 IO 刷新策略下的一致有效性,显示出其在实际应用中的广泛适用性。这项工作不仅为解决高并发系统的性能问题提供了实用工具,还突显了形式化技术在工程问题解决中的实际价值。 (2)2023 年 9 月,由阿里云消息队列团队发表的关于 RocketMQ 高可用范式设计论文《RocketHA: A Logbased Storage High Availability Paradigm for Messaging and Streaming Scenarios》被软件工程 CCFA 类顶级会议 ASE 2023 录用。 该论文详细探讨了 RocketMQ 在其发展历程中所蕴含的高可用性设计理念,凝聚了团队在行业应用中积累的宝贵经验。为了应对分布式系统中常见的故障,如崩溃和网络分区,RocketHA 提出了一种基于日志存储的高可用性设计框架。该框架由六个基本组件构成,旨在实现系统在面对各种故障时的自动集群恢复。具体而言,RocketHA 通过模块化设计,实现了消息、事件及流场景的高可用性,确保系统能够在发生意外故障时迅速且有效地恢复。此外,该设计还优先考虑了高吞吐量与数据丢失防护,以保障系统在进行大规模数据处理时的稳定性和可靠性。评估结果表明,RocketMQ 在多种负载和故障场景下都表现出卓越的高可用性和快速恢复能力。本文提出的 RocketHA 的设计理念可为其他基于日志存储的系统提供参考和借鉴,推动相关领域的研究与开发。 GSoC(Google Summer of Code) 2024 在谷歌主办的 GSoC 2024 中,Apache RocketMQ 开源社区共提报通过两个选题: 1. RocketMQ Dashboard Supports RocketMQ 5.0 Architecture and Enhances Usability:该题目旨在强化 RocketMQ 的开源控制台能力。 2. Optimizing Lock Mechanisms in Apache RocketMQ:该题目旨在优化锁行为,优化 RocketMQ 的性能以及资源占用。 两个题目均成功结项,第一个题目为 Apache RocketMQ 发布了 rocketmqdashboard 2.0.0,自此RocketMQ Dashboard 支持 Apache RocketMQ 5.0 。第二个题目创新性地提出了 ABS 锁,为轻量化的自旋锁提供了一套退避策略,从而实现低成本、有限制的锁自旋行为,同时适应不同强度的资源争抢情况 Apache RocketMQ 社区 5.3.0、5.3.1 版本发布 Apache RocketMQ 社区近期发布了 5.3.0 和 5.3.1 两个版本,两个版本主要修复现有的 bug 并提升系统的整体稳定性和性能。值得一提的是,Apache RocketMQ 5.3.0 引入了 Apache RocketMQ ACL 2.0 支持,为用户带来了更加灵活和安全的访问控制机制。这些改进和新增功能将显著提升 Apache RocketMQ 在生产环境中的稳定性和安全性,进一步满足用户的业务需求。 Apache RocketMQ 中文社区全新升级 2024 年 7 月,Apache RocketMQ 中文社区(https://rocketmq.io)全新升级,致力于为每一位热衷于 RocketMQ 技术探索与实践的开发者,打造一个集时效性、全面性、深度于一体的一站式学习平台。 最全最新资讯: Apache RocketMQ 中文社区提供从基础到深入的全面学习资料,涵盖原理介绍、架构解读、源码分析等基础知识,高级性能使用、技术前沿探索、场景最佳实践等博客文章,用户反馈的真实答疑样例等,并及时更新版本发布、架构演进和功能迭代等社区动态,以及社区相关活动和会议信息,为您提供更多学习和交流的机会。 智能专家答疑: Apache RocketMQ 中文社区基于 Apache RocketMQ 领域专业知识库,并结合先进的大模型技术进行优化,为您提供 AI 问答助手,作为您的智能学习伴侣。通过自然语言问答,让您的疑问得到迅速解答,使您的学习之旅更加轻松有趣。 关于 Apache RocketMQ RocketMQ 致力于构建低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台,覆盖云边端⼀体化数据处理场景,帮助企业和开发者在智能化时代,轻松构建事件驱动架构的云原生应用。 RocketMQ 自 2012 年诞生于阿里巴巴集团的核心交易链路,至今已经历十余年“双十一”的万亿级数据洪峰验证。2015 年,阿里云面向企业提供商业化的消息队列服务,其中包括云消息队列 RocketMQ 版。2016 年,阿里巴巴向 Apache 软件基金会捐赠了 RocketMQ 项目,RocketMQ 进入 Apache 孵化器。2017 年,Apache RocketMQ 成为 Apache 顶级项目,在开源消息中间件领域占据领导地位。2022 年,Apache RocketMQ 5.0 正式发布,全面拥抱云原生架构、超融合架构,进一步拓展事件驱动、物联网等场景。
#社区动态

2024年11月18日

基于Apache RocketMQ 事件驱动架构的 AI 应用实践
AI 应用在商业化服务的阶段会面临诸多挑战,比如更快的服务交付速度,更实时、精准的结果以及更人性化的体验等,传统架构限制于同步交互,无法满足上述需求,本篇文章给大家分享一下如何基于事件驱动架构应对上述挑战。 盘点 AI 应用场景 在深入探讨事件驱动架构跟 AI 结合前,我们先梳理一下 AI 应用的现状。 从应用架构层面,大致可以把 AI 应用分为以下三类: 1)基于基础模型的扩展应用,典型的如 ChatGpt(文本生成)、StableDiffusion(图像生成)、CosyVoice(声音生成)等,这类应用通常会以模型能力为核心,提供相对原子化的服务。 2)智能知识库应用,如 Langchain chatchat,这类应用是以 LLM 为核心,基于 RAG(增强检索技术)构建的具有广泛的业务场景的应用。 3)智能体应用,智能体应用核心要点是应用以 LLM 为交互中枢,能够通过工具的调用联通外部世界,复杂的表现形式如多智能体协作等,是企业 AI 应用落地最具想象空间的一类应用。 浅析 AI “原生” 说到“原生”二字,它代表的是对某种概念的广泛认知,比如提移动原生应用立马可以联想到手机端的 APP,提云原生应用很多开发者立马可以想到容器化等,而对于 AI “原生”,除了 ChatGpt,Midjourney 等几款头部 AI 应用,我们似乎还没有看到像移动应用那样广泛的“原生”应用被定义出来,当然今天也没有办法给出明确的结论,只是通过一些事实,帮大家推演 AI “原生”的方向,希望能够帮助慢慢凝聚在内心中那个对“AI 原生”的影像。 AI 给应用架构带来的变化 当 AI 能力加入后,我们的应用架构发生了较大的变化。RAG,Agent 等编程范式被引入,传统的工作流也因为有了 AI 节点,变得与以往有所不同。 AI 应用架构RAG AI 应用架构Agent 加入 AI 节点的工作流 AI 应用的变化趋势 从观察知名 AI 厂商的产品形态演进看,AI 应用由前面提到的基础模型扩展、智能知识库、智能体三类叠加又相对分离,在慢慢向由智能体统一管控约束的方向发展。 比如 Open AI 的 Canvas,Claude Artifacts,Vercel v0 等产品特性。它们都表现出了一系列的共性:智能内核,多模态,LUI 交互。 从另外一个角度理解,AI 原生的应用只有突破之前的用户体验才有可能让用户买单。分散的基础模型能力,多模态能力都只能在某些场景下有体验提升,某些方面甚至不如传统应用的用户体验。所以需要整合,将对话式交互,智能模型和多模态叠加从而构建出超越传统应用的使用体验。 使用事件驱动构建 AI 原生 这里并不是单纯为了追求技术的先进性而使用事件驱动架构,是因为实践中顺序式的架构有时候无法满足业务需求。 传统顺序式的架构在构建 AI 原生的挑战 顺序调用无法保障推理体验 模型服务的推理耗时远高于传统意义的网络服务调用,比如在文生图这个场景下使用 StableDiffusion 服务,即使经过算法优化后最快也是秒级,并发量较大的时候,会很容易导致服务器宕机。此外如声音的合成,数字人的合成等耗时可能是分钟级的,此时顺序调用明显就不太合适。选择事件驱动的架构可以快速响应用户,推理服务按需执行,这样既能够保障用户体验,同时也降低系统宕机风险。 顺序调用无法支持实时数据构建的需求 在智能问答系统中,结果的好坏跟数据有很大的关系。问答召回数据的实时性和准确性很大程度影响着智能问答系统的用户体验,从系统架构层面,问答和数据的更新是分开的。靠人工去更新海量数据不现实,通过设置定时任务以及构建知识库数据更新的工作流能够更加有效的解决数据实时更新的问题,事件驱动架构在这个场景下优势非常明显。 双向互动场景无法实现 在问答服务场景下,拟人化的行为能够得到用户好感从而扩展商机,传统的问答式应用架构相对机械死板,而使用消息队列作为信息传输可以有效主动触达用户,通过合理的意图判断,主动向用户问好,是有效的留存手段。 事件驱动构建 AI 原生的实践 接下来分享一下基于事件驱动架构构建的 AI 应用的一些实践。 StableDiffusion 异步推理 前面提到了关于文生图模型 StableDiffusion 在服务客户中遇到的问题,我们利用事件驱动架构,使用函数计算和轻量消息队列(原 MNS)构建了 StableDiffusion 的异步推理架构,用户请求到来时经过函数计算网关到达 API 代理函数,API 代理函数对请求进行打标鉴权,之后将请求发送到 MNS 队列,同时记录请求的元数据和推理信息到表格存储 TableStore,推理函数根据任务队列进行消费,调度 GPU 实例启动 StableDiffusion 进行服务,结束后返回图片结果以及更新请求状态,端侧通过页面上的轮询告知用户。 VoiceAgent 实时对话 这是一个相对复杂的应用,使用者可以通过语音跟背后的智能问答服务实时对话,同时还能够接收到来自智能服务的主动询问。 整体依然采用事件驱动架构,其 RTC Server 部分安装 rocketmqclient,订阅中心化的服务 topic,由定时任务(主要是意图分析)触发向队列 topic 生产消息内容,然后由 rocketmqclient 消费,进行主动询问。 VoiceAgent 知识库实时数据流 对于问答的另外一端,知识库的自动更新,则是通过 Catch Data Capture 策略,比如由外部系统数据源触发,或者通过将文档上传 OSS 触发。数据经过切片,向量化之后存储到向量数据库以及全文检索数据库。 面向 AI 原生应用的事件驱动架构 最后分享一下作为 AI 应用开发者的一套组合方案:通过 阿里云云应用平台 CAP(Cloud Application Platform) 选出基础模型服务,如 Ollama,ComfyUI,Cosyvoice,Embedding 等进行快速托管,使用 RcoketMQ,Kafka,MNS, Eventbridge 等搭建数据流管道和消息中心,本地利用 Spring AI Alibaba 等框架开发后端服务,实现 RAG,Agent 等能力。前端使用 Nextjs 框架构建界面,之后将开发好的前后端通过 Serverless Devs 工具部署到 CAP 平台,进行线上调用访问,最终上生产采用云原生网关保驾护航,对于长期的知识库或者智能体的运维则通过 ARMS 进行指标监控。
作者:寒斜
#技术探索

2024年11月6日

Apache RocketMQ 打破锁性能瓶颈之道
背景 Apache RocketMQ 是一个云原生消息传递和流式处理平台,可简化创建事件驱动型应用程序的过程。多年来随着 RocketMQ 的迭代,已经编写了大量代码来利用多核处理器,通过并发提高程序效率。因此管理并发性能变得至关重要,同时锁对于确保在访问共享资源时多个执行线程安全同步至关重要。尽管锁对于确保多核系统中的互斥性是必不可少的,但它们的使用也可能带来优化挑战。随着并发系统内部变得越来越复杂,部署有效的锁管理策略是保持性能的关键。 因此,在 GSOC 2024 中,我们 Apache RocketMQ 开源社区提报了一个非常具有挑战性的题目:《GSOC269 : Optimizing Lock Mechanisms in Apache RocketMQ》。在这个题目中,我们旨在优化锁行为,优化 RocketMQ 的性能以及资源占用。通过这个题目,我们创新性地提出了 ABS 锁 —— Adaptive Backoff Spin Lock。ABS 锁的思想是通过为轻量化的自旋锁提供一套退避策略,从而实现低成本、有限制的锁自旋行为,同时适应不同强度的资源争抢情况。 之所以取其缩写 ABS,是因为该锁的设计思想与刹车系统中的 ABS 系统有一定相似性。在早期系统中使用的自旋锁会一直进行自旋,但是当临界区较大时会产生大量无效自旋,类似刹车制动中的“抱死”结果,这导致我们的资源利用急剧增长。为了避免这种资源损耗,当前社区使用互斥锁对其进行了替代,避免临界区较大时的资源浪费。但是在这种替代方案落实后,当消息较小时,互斥锁的阻塞唤醒机制反而影响到消息发送的响应时间,并带来了更高的 CPU 损耗。 由于自旋锁在临界区较小时的响应时间(RT)以及 CPU 利用方面都有较好的表现,唯一的不足是在资源争抢时会带来资源浪费。因此我们决定对自旋锁进行优化,降低无效的资源损耗,从而更好的利用自旋锁的优点,使其同时适合争抢激烈或不激烈的场景。在实践中,我们已经证明了调整锁策略会影响 RocketMQ 的消息发送性能,带来显著的性能优化结果。 通过 ABS 锁,我们还能解决开源使用者对锁的抉择问题:在 RocketMQ 消息投递时存在两种锁定机制,SpinLock(swapAndSet),以及 ReentrantLock 互斥锁,但并无文档对其进行分析使用的场景。所以我们通过 ABS 锁对其进行整合,达到最优状态并且完成服务端的锁定机制闭环,不需要用户去决定当前场景锁定机制的选取,自然而然地根据争抢情况对锁参数进行微调。ABS 锁可以根据运行时条件动态调整其行为,例如锁争用级别和争用同一资源的线程数。这可以通过最大限度地减少与锁获取和释放相关的开销来提高性能,尤其是在高争用的情况下。通过实时监控系统的性能指标,ABS 锁可以在不同的锁定策略之间切换。 我们目前已经实现自适应锁定机制(ABS 锁),通过实验结果成功验证:自适应锁定机制达到不同场景下单独使用互斥锁/自旋锁的最优效果,简而言之就是取得不同场景下最优锁定机制的效果。本文将详细介绍 Apache RocketMQ 的锁机制迭代过程,并介绍 ABS 锁的优化效果。 相关概念介绍 在文章正式开始前,需要介绍一些本文中可能频繁用到的概念:临界区、互斥锁、自旋锁。了解清楚这些概念将有助于阅读本文的优化思想。 临界区 临界区(Critical Section)是一段供线程独占式访问的代码,也就是说若有一线程正在访问该代码段,其它线程想要访问,只能等待当前线程离开该代码段方可进入,这样保证了线程安全。一般临界区大小是受多方面影响的,例如,本文中消息发送过程的临界区大小可能受消息体大小影响。 互斥锁 互斥锁是一种独占锁,当线程 A 加锁成功后,此时互斥锁已经被线程 A 独占了,只要线程 A 没有释放手中的锁,线程 B 就会失败,就会释放掉 CPU 给其他线程,线程 B 加锁的代码就会被阻塞。 互斥锁加锁失败而阻塞是由操作系统内核实现的,当加锁失败后,内核将线程置为睡眠状态,等到锁被释放后,内核会在合适的时机唤醒线程,当这个线程加锁成功后就可以继续执行。具体的互斥锁行为可以参考下图: 互斥锁带来的性能开销主要在两次线程上下文切换的成本。 1. 当线程加锁失败时,内核将线程的状态从【运行】切换到睡眠状态,然后把CPU切换给其他线程运行; 2. 当锁被释放时,之前睡眠状态的线程会变成就绪状态,然后内核就会在合适的时间把CPU切换给该线程运行。 自旋锁 自旋锁通过 CPU 提供的原子操作 CAS(CompareAndSet),在用户态完成加锁和解锁操作,不会主动产生线程上下文切换,所以相比互斥锁来说,会快一些开销小一些。它和互斥锁的主要区别在于,当加锁失败,互斥锁使用线程切换应对,而自旋锁用忙等待应对。 RocketMQ 的锁机制迭代 下面,我们将介绍 RocketMQ 的锁机制迭代过程。由于自旋锁、互斥锁有各自的优劣势:自旋锁的优势在于其轻量级和低上下文切换开销,适合短时间等待的情况;而互斥锁的优势在于能够节约资源,适合长时间占有锁的场景。 因此本文中将其优势分别比喻为“鱼”和“熊掌”,我们将一步步介绍如何同时拿到最优质的“鱼”和“熊掌”。 鱼和熊掌的抉择——互斥/自旋 在早期为了减小线程上下文切换带来的资源损耗,Apache RocketMQ 选取了自旋锁进行实现。随着版本迭代 RocketMQ 的并发压力日益增长,导致当临界区较大时,自旋锁会产生大量无效自旋,这导致 Broker 的资源利用急剧增长。因此后期 RocketMQ 又使用互斥锁对其进行了替代,避免临界区较大时的资源浪费。 但是当消息较小时,互斥锁的阻塞唤醒机制反而影响到 RT 以及上下文切换引起的更高的 CPU 损耗,自旋锁在临界区较小时的响应时间(RT)以及 CPU 利用方面都有较好的表现,因此在过去我们一直被锁定机制的正确选取所困扰。直到今日,RocketMQ 的内部仍然保留着这两种锁,且通过一个开关进行控制:useReentrantLockWhenPutMessage。 这意味着,使用者需要在启动 broker 时,决定自己的锁类型——如果消息体都比较小,且发送 TPS 并不大,则使用自旋锁;当消息体较大时,或者竞争极为激烈时,则启用互斥锁。 鱼和熊掌兼得—— k 次退避锁 为了解决这个问题,我们启动了锁优化的工作。我们希望有一把锁能够同时具备自旋锁、互斥锁的特点,同时适用于竞争激烈和不激烈的情况。我们最终决定改造自旋锁,通过一把特殊的自旋锁,使系统在各种竞争情况下都保持非常优质的锁行为。自旋锁由于无限自旋直到获取到锁,在临界区较大时会产生较多的空转,耗费大量的 CPU 资源。为了能有效利用自旋锁的优势,因此我们要在临界区较大时对其空转次数的控制,从而避免大量空转,最大程度兼容临界区较大的场景。 最终我们通过对自旋锁的行为建模,提出了 k 次退避锁:进行 K 次自旋后还未获得锁后,执行 Thread.yield() 将 CPU 执行权交给操作系统。这种行为能够避免互斥锁的无谓上下文切换,也能避免高压场景下的无限自旋带来的 CPU 损耗。 这种行为能够缓解系统压力,取得自旋和 CPU 上下文切换两中方法中的最低资源损耗。 在刚过去不久的 FM 24 会议上,Juntao Ji 已经做出 k 次自旋锁的相关理论建模分享,以及实验验证[3]。在k次自旋锁的作用下,我们能找到系统性能的局部最优点,达到最大的 tps 性能。结果如下表所示: 以 X86 架构,同步刷盘的行为为例。实验结果表明,在 k= 10^3 时,发送速度不仅达到峰值(155019.20),CPU 使用率也达到最低。这表明退避策略成功地节省了 CPU 资源。此时,CPU 支持更高的性能水平和较低的利用率水平,这表明性能瓶颈已经转移——例如,可能已经转移到了磁盘上。在表中可以观察到,在具有相同的 k(10^3)和配置参数(最新代码,SYNC 刷盘模式)的 ARM CPU 上,RocketMQ 的性能提高了 10.4%。此外,如上图所示,当 k= 10^3 时,CPU 使用量大幅下降,从平均超过 1000% 下降到 750% 左右。资源消耗的减少表明,减轻其他系统瓶颈可能会导致更显著的性能提高。 最优质的鱼和熊掌—— ABS 锁 在上文已经证明了 k 次退避锁的有效性。但是当前还有一个问题:我们发现当临界区足够大时,自旋锁的资源损耗依旧远超互斥锁(多次退避仍然获取不到)。这样 k 值其实带来的影响是负面的,也就是说,最终逃避不了上下文切换的成本,反而还带来了自旋的等待成本。 因此我们决定通过实现 k 的动态调整再次优化,当临界区较大时,对k进行自适应增大,当达到与互斥锁相同级别资源损耗时(即 k 达到自适应的最大值),此时自旋锁已经不再适合此种场景,因此我们将对其进行互斥锁的切换。这也是我们最终要实现的 ABS 锁。 1. ABS 锁实现 我们通过对不同的锁定机制进行理论分析其所适合的场景,并对其在所适合的场景进行大量实验测试得出多个场景的最优锁定机制。最后通过对运行时条件的动态变化(竞争线程数/TPS/消息大小/临界区大小)进行最优锁定机制的切换。 2. ABS 锁的 K 值自适应策略 上面我们论证出控制自旋次数对于性能优化有不错的效果,但是这个 K 值对于不同系统是不一样的,因此我们需要实现自旋次数 K 的自适应。 简而言之,K 的自适应策略就是一种从低频次自旋到高频次自旋的演化过程,对应临界区争抢变得不断严重或是临界区不断变大的过程。当 k 逐渐增大的过程中,可以增加在线程退避之前就获取到锁的概率,但是当自旋次数增加到一定数量级时,此时自旋成本已经高于线程上下文切换的成本,说明此时已经不适合使用自旋锁——所以此时退化为互斥锁。 通过实验,我们将自旋次数 K 自适应最大值设置为1万,因为在实验中,我们发现当自旋次数大于1万时,竞争激烈时带来的优化效果会受到显著影响,甚至大于一次 CPU 上下文切换的代价。所以此时我们将其切换为互斥阻塞等待锁定机制。 为了自适应调整 K 值,我们提出了一个闭环的工作流,如下图所示: 在图中我们可以看到,我们主要衡量当前 k 值下,自旋获取锁的成功率。如果在当前 k 值下获取锁的成功率不够高,则适当增加 k 值,这将带来更大的获取概率。但是如果一直增加都无法有效提高拿锁成功率,则将其转为互斥锁,会带来更高的效益。 成为互斥锁代表当前可能有较高的突发流量,导致对锁的竞争变得激烈了。但是互斥锁是不适用于低压场景的,所以我们还需要决定如何从互斥锁转回自旋锁。因此我们记录了从自旋锁切换为互斥锁的请求速率。当整体请求速率低于这个数值的 80% 时,则切换回自旋锁。 实验及结果 为了验证 ABS 锁的正确性以及性能优化效果,我们做了多组实验,包括性能测试以及混沌故障测试。本章将介绍具体的实验设计以及结果。 1. 性能实验配置 1. namesrv 一台,broker一台,openmessagebenchmark 一台压测机 2. 上述三台机器的配置为,处理器:8 vCPU,内存:16 GiB,规格族:ecs.c7.2xlarge,公网带宽:5Mbps,内网带宽:5/ 最高 10 Gbps。 3. 消息体大小分别设置了1KB以及2B两种场景,用于影响发送消息时的临界区大小。 2. 性能测试结果 CPU 与耗时的优化 消息 body 大小 1 kb 时,我们记录了不同消息发送速率下的 CPU 占用情况。结果如下图所示: 结果表明,在消息量不断增加时,我们的自适应锁带来了非常明显的优势,有效降低了 CPU 的使用率。 另外,我们还对消息发送时的 P9999 做了记录。P9999 代表发送过程中,发送耗时排在 99.99% 的尾部请求,一般反映了这段时间内的最慢请求速度。结果如下图所示: 可以看到,在不同 TPS 下,我们的自适应锁都能带来更优质的 P9999,有效降低了发送过程中由于锁争抢带来的耗时。 不过,尾部的请求情况一般代表竞争极为激烈时的极端场景。可能有的消息经过多次锁请求尝试但是都未获得锁,因此导致了这种长尾效应。我们还测试了在不同 TPS 下的平均发送耗时情况,发现在竞争极为严重时,我们的 ABS 锁由于自带了一定自旋,所以会让平均延时大约提升 0.5 ms。使用时需要仔细衡量具体场景,以确定是否在高压时开启自适应锁。 我们同时还针对消息体更小的实验场景做了实验,下表是完整的测试结果: 最大性能提升 此外,为了计算由自适应锁带来的性能提升,我们还测试了 Broker 的最大性能,结果如下所示: | CPU Arch | Flush Policy | Original QPS | Optimal QPS | Improvement | | | | | | | | X86 | ASYNC | 176312.35 | 184214.98 | +4.47% | | X86 | SYNC | 177403.12 | 187215.47 | +5.56% | | ARM | ASYNC | 185321.49 | 206431.82 | +11.44% | | ARM | SYNC | 188312.17 | 212314.43 | +12.85% | 根据该表,我们可以认为自适应锁同时能够在多个场景下均找到最大的性能点,实现性能的释放。 实验总结 经过如上所有实验数据,我们可以得出如下结论: 1. 在临界区较小时,ABS 锁提供更高的 TPS 以及更低的响应延时,极限 TPS 提高 12.85% 甚至更高,极限 TPS 下响应时间降低了 50% 左右; 2. 在临界区较大时,ABS 锁提供更低的 RT 以及更低的资源损耗,CPU 损耗从 500%400%,减少无效资源浪费。 3. 故障测试 在软件工程领域,提倡"拥抱故障"的理念意味着认识到错误和异常是正常的一部分,而非完全避免。像 RocketMQ 这样的关键系统,它在生产环境中承受的压力远超于理想化的实验室测试。为此,采用了混沌工程策略,这是一种主动探寻系统极限和脆弱性的实践。 混沌工程的核心目标是增强系统的鲁棒性和容错能力,它是通过在实际操作中人为制造故障,如网络延迟、资源限制等,来观察系统如何应对突发状况。这样做是为了确认系统是否能在不确定和复杂的现实中保持稳定,能否在面对真实世界的问题时依然能高效运作。 RocketMQ 引入自适应锁定机制后,进行了严格的混沌工程实验,包括但不限于模拟分布式节点间的通信故障、负载峰值等情况,目的是验证新机制在压力下的性能和恢复能力。只有当系统经受住这种高强度的“拷打”测试,证明它能在不断变化的环境中维持高可用性,我们才认为这是一个成熟的、可靠的解决方案。 总结起来,通过混沌工程,我们对 RocketMQ 进行了实战演练,以此来衡量其实现高可用性的真正实力。 并希望通过这样的故障测试,证明我们对锁的调整不影响其数据写入的正确性。 实验配置 我们混沌测试的验证实验环境如下: 1. namesrv 一台,内含 namesrv 进程。 2. openchaos 的混沌测试一台,向 broker 发出控制指令。 3. broker 三台,同属一个集群,内含 broker 进程。 上述五台机器的配置为,处理器:8 vCPU,内存:16 GiB,规格族:ecs.c7.2xlarge,公网带宽:5Mbps,内网带宽:5/ 最高 10 Gbps。 在测试中,我们设置了如下的若干种随机测试场景,每种场景都会持续至少 30 秒,且恢复后会保证 30 秒的时间间隔再注入下一次故障: 1. 机器宕机,这个混沌故障注入通过 kill 9 命令实现,将会杀死范围内的随机进程。 2. 机器夯机,这个混沌故障注入通过kill SIGSTOP 命令实现,模拟进程暂停的情况。 每组场景的测试至少重复 5 次,每次至少持续 60 分钟。 实验结论 针对上述提出的所有场景,混沌测试的总时长至少有: 2(场景数) 5(每组测试次数) 60(单组时长) = 600分钟 由于设置的注入时长 30s,恢复时长 30s,因此至少共计注入故障 600/1 = 600 次(实际上注入时长、注入次数远多于上述统计值)。 在这些记录在册的测试结果中,RocketMQ 无消息丢失,数据在故障注入前后均保持强一致。 总结 本文实现了 RocketMQ 锁定机制的迭代过程以及自适应退避自旋锁机制(ABS 锁)的设计以及实现,随着并发系统内部变得越来越复杂,部署有效的锁管理策略是保持性能的关键。因此,我们希望更深入地探索该领域性能优化的潜力,探索性能的极限。 同时我们希望在未来结合多种分布式锁定机制以及其他优秀思想,对其进行实现以及性能测试,期待达到多端本地锁定状态共识:单次通信即可得到锁的成果,减少无效网络通信以及更低的总线资源损耗。但目前专注于消息投递时的锁定机制实现,并未对其进行具体实现以及测试。其它分布式锁机制可以参考如下。 未来可能引入的其它锁优化思想 延迟槽位思想(Delay card slot idea): 对每个客户端分配不同槽位的延时,让客户端进行延迟后再次请求,从而避免总线的大量无效碰撞。 CLH 思想: 循环读取上一个位点的值,更改自身状态。 MCS 思想: 与 CLH 相似,但是时在本地自旋,更改下一个节点的状态。解决了 CLH 在 NUMA 系统架构中获取 locked 域状态内存过远的问题 SMP CPU 架构 基于 Ppersistent CSMA 思想: Ppersistent CSMA(Persistent Carrier Sense Multiple Access)是一种网络通信协议的思想,它通常用于描述分布式环境中如无线局域网(WiFi)中的冲突避免策略。这个思想起源于 CSMA/CD(Carrier Sense Multiple Access with Collision Detection)协议,但它引入了一种更持久的监听机制。 1. 基本流程: 每个设备(节点)在发送数据前会持续监听信道是否空闲。如果信道忙,节点就会等待一段时间后再次尝试。 2. P计数器: 当检测到信道忙时,节点不会立即退回到等待状态,而是启动一个名为 P(persistent period)的计数器。计数器结束后,再尝试发送。 3. 碰撞处理: 如果有多个节点同时开始发送,在信道上发生碰撞,所有参与碰撞的节点都会注意到并增加各自的P计数器后再试。这使得试图发送的数据包在更长的时间内有机会通过,提高了整体效率。 4. 恢复阶段: 当 P 计数器归零后,如果信道依然繁忙,节点会进入恢复阶段,选择一个新的随机延迟时间后再次尝试。 Ppersistent CSMA 通过这种设计减少不必要的冲突次数,但可能会导致网络拥塞情况下的长时间空闲等待。 参考链接 [1] Apache RocketMQ [2] OpenMessaging OpenChaos [3] Ji, Juntao & Gu, Yinyou & Fu, Yubao & Lin, Qingshan. (2024). Beyond the Bottleneck: Enhancing HighConcurrency Systems with Lock Tuning. 10.1007/9783031711770_20. [4] Chaos Engineering [5] T. E. Anderson, "The performance of spin lock alternatives for sharedmoney multiprocessors," in IEEE Transactions on Parallel and Distributed Systems, vol. 1, no. 1, pp. 616, Jan. 1990, doi: 10.1109/71.80120 [6] Y. Woo, S. Kim, C. Kim and E. Seo, "Catnap: A Backoff Scheme for Kernel Spinlocks in ManyCore Systems," in IEEE Access, vol. 8, pp. 2984229856, 2020, doi: 10.1109/ACCESS.2020.2970998 [7] L. Li, P. Wagner, A. Mayer, T. Wild and A. Herkersdorf, "A nonintrusive, operating system independent spinlock profiler for embedded multicore systems," Design, Automation & Test in Europe Conference & Exhibition (DATE), 2017, Lausanne, Switzerland, 2017, pp. 322325, doi: 10.23919/DATE.2017.7927009. [8] OpenMessaging benchmark
作者:王怀远、季俊涛
#技术探索

2024年10月21日

基于 Apache RocketMQ 的 ApsaraMQ Serverless 架构升级
_本文整理于 2024 年云栖大会阿里云智能集团高级技术专家金吉祥(牟羽)带来的主题演讲《ApsaraMQ Serverless 能力再升级,事件驱动架构赋能 AI 应用》_ 云消息队列 ApsaraMQ 全系列产品 Serverless 化,支持按量付费、自适应弹性、跨可用区容灾,帮助客户降低使用和维护成本,专注业务创新。那 ApsaraMQ 是如何一步一步完成 Serverless 能力升级的?在智能化时代,我们的事件驱动架构又是如何拥抱 AI、赋能 AI 的? 本次分享将从以下四个方面展开: + 首先,回顾 ApsaraMQ Serverless 化的完整历程,即 ApsaraMQ 从阿里内部诞生、开源,到捐赠给 Apache 进行孵化,再到完成 Serverless 化升级的不断突破、与时俱进的过程。 + 然后,重点介绍 ApsaraMQ 的存算分离架构,这是全面 Serverless 化进程中不可或缺的前提。 + 接下来,会从技术层面重点解读近一年 ApsaraMQ Serverless 能力的演进升级,包括弹性能力的升级、基于 RDMA 进一步降低存储和计算层之间的 CPU 开销,以及对无感扩缩容的优化。 + 最后,介绍我们在面向 AI 原生应用的事件驱动架构上的探索,以及基于阿里云事件总线定向更新向量数据库,为 AI 应用融入实时最新数据的最佳实践。 ApsaraMQ Serverless 化历程 首先,我们来看 ApsaraMQ Serverless 化的整个发展历程。 + 2012 年,RocketMQ 在阿里巴巴内部诞生,并将代码在 Github 上开源; + 2016 年,云消息队列 RocketMQ 版开启商业化的同时,阿里云将 RocketMQ 捐赠给了 Apache,进入孵化期; + 2017 年,RocketMQ 以较短的时间孵化成为 Apache TLP,并快速迭代新功能特性,顺利发布 4.0 版本,支持了顺序、事务、定时等特殊类型的消息; + 2018 年,随着大数据、互联网技术的发展,数据量爆发式增长,云消息队列 Kafka 版商业化发布; + 2019 年,云消息队列 RabbitMQ 版、云消息队列 MQTT 版两款产品商业化发布,补齐了 ApsaraMQ 在 AMQP、MQTT 协议适配上的缺失; + 2021 年,经过一段时间的孵化,ApsaraMQ 家族中的另一款产品事件总线 EventBridge 开始公测,开始融合事件、消息、流处理; + 2023 年,ApsaraMQ 全系列产品依托存算分离架构,完成 Serverless 化升级,打造事件、消息、流一体的融合型消息处理平台; + 今年,我们专注提升核心技术能力,包括秒级弹性、无感发布、计算存储层之间的 RDMA 等,实现 Serverless 能力的进一步升级,并结合当下客户需求,通过事件驱动架构赋能 AI 原生应用。 存算分离架构全景 第二部分,我们来看 ApsaraMQ 存算分离架构的全景,这是 Serverless 化升级不可或缺的前序准备。 ApsaraMQ 存算分离架构全景:云原生时代的选择 ApsaraMQ 的存算分离架构,是云原生时代的选择。 + 从下往上看,这套架构是完全构建在云原生基础之上的,整个架构 K8s 化,充分利用 IaaS 层的资源弹性能力,同时也基于 OpenTelemetry 的标准实现了metrics、tracing 和 logging 的标准化。 + 再往上一层是基于阿里云飞天盘古构建的存储层,存储节点身份对等,Leaderless 化,副本数量灵活选择,可以在降低存储成本和保证消息可靠性之间实现较好的平衡。 + 再往上一层是在本次架构升级中独立抽出来的计算层,即无状态消息代理层,负责访问控制、模型抽象、协议适配、消息治理等。比较关键的点是,它是无状态的,可独立于存储层进行弹性。 + 在用户接入层,我们基于云原生的通信标准 gRPC 开发了一个全新的轻量级 SDK,与之前的富客户端形成了很好的互补。 “Proxy” 需要代理什么? 接下来我们重点看下这套架构里的核心点,即独立抽出来的 Proxy。 它是一层代理,那到底需要代理什么呢? 在之前的架构中,客户端与 Broker/NameServer 是直连模式,我们需要在中间插入一个代理层,由原先的二层变成三层。然后,将原先客户端侧部分业务逻辑下移,Broker、Namesrv 的部分业务逻辑上移,至中间的代理层,并始终坚持一个原则:往代理层迁移的能力必须是无状态的。 从这张图中,我们将原先客户端里面比较重的负载均衡、流量治理(小黑屋机制)以及 Push/Pull 的消费模型下移至了 Proxy,将原先 Broker 和 Namesrv 侧的访问控制(ACL)、客户端治理、消息路由等无状态能力上移至了 Proxy。然后在 Proxy 上进行模块化设计,抽象出访问控制、多协议适配、通用业务能力、流量治理以及通用的可观测性。 ApsaraMQ 存算分离的技术架构 接下来看 ApsaraMQ 存算分离的技术架构,在原先的架构基础上剥离出了无状态 Proxy 组件,承接客户端侧所有的请求和流量;将无状态 Broker,即消息存储引擎和共享存储层进行剥离,让存储引擎的职责更加聚焦和收敛的同时,充分发挥共享存储层的冷热数据分离、跨可用区容灾的核心优势。 这个架构中无状态 Proxy 承担的职责包括: 1. 多协议适配: 能够识别多种协议的请求,包括 remoting、gRPC 以及 Http 等协议,然后统一翻译成 Proxy 到 Broker、Namesrv 的 remoting 协议。 2. 流量治理、分发: Proxy 具备按照不同维度去识别客户端流量的能力,然后根据分发规则将客户端的流量导到后端正确的 Broker 集群上。 3. 通用业务能力扩展: 包含但不限于访问控制、消息的 Tracing 以及可观测性等。 4. Topic 路由代理: Proxy 还代理了 Namesrv 的 TopicRoute 功能,客户端可以向 Proxy 问询某个 topic 的详细路由信息。 5. 消费模型升级: 使用 Pop 模式来避免单客户端消费卡住导致消息堆积的历史问题。 无状态 Broker,即消息存储引擎的职责更加的聚焦和收敛,包括: 1. 核心存储能力的迭代: 专注消息存储能力的演进,包括消息缓存、索引的构建、消费状态的维护以及高可用的切换等。 2. 存储模型的抽象: 负责冷热存储的模型统一抽象。 共享存储为无状态 Broker 交付了核心的消息读写的能力,包括: 1. 热存储 EBS: 热存储 EBS,提供高性能、高可用存储能力。 2. 冷存储 OSS: 将冷数据卸载到对象存储 OSS,获取无限低成本存储空间。 3. 跨可用区容灾: 基于 OSS 的同城冗余、Regional EBS 的核心能力构建跨可用区容灾,交付一写多读的消息存储能力。 基于云存储的存算分离架构,兼顾消息可靠性和成本 存算分离架构中的存储层是完全构建在阿里云飞天盘古存储体系之上的。基于这套架构,我们能够灵活控制消息的副本数量,在保证消息可靠性和降低存储成本之间做到一个比较好的平衡。 左图是存算分离存储架构中上传和读取的流程。可以看到,我们是在 CommitLog 异步构建 consumeQueue 和 Index 的过程中额外添加了按照 topic 拆分上传到对象存储的过程;在读取过程中智能识别读取消息的存储 Level,然后进行定向化读取。 基于云存储的架构,我们提供了 ApsaraMQ 的核心能力,包括: 1. 超长时间定时消息: 超过一定时间的定时消息所在的时间轮会保存至对象存储上,快到期时时间轮再拉回到本地进行秒级精准定时。 2. 集群缩容自动接管: 消息数据实时同步到对象存储,对象存储的数据能够动态挂载到任意 Broker,实现彻底存算分离,Broker 的无状态化。 3. 按需设置 TTL 成本优化: 支持按需设置消息 TTL,不同重要程度的消息可设置不同的 TTL,满足消息保存时长需求的同时兼顾成本控制。 4. 冷热消息分离、分段预取: 热数据的读取命中本地存储,速度快;冷数据的读取命中远端存储,速率恒定且不会影响热数据的读取。 5. 动态调控云存储的比例: 动态调控 EBS 和 OSS 的比例,大比例的 EBS 能够具备更高的稳定性,应对 OSS 负载过高的背压,提升 EBS 作为 OSS 的前置容灾的能力;小比例的 EBS 则是可以最大化降低消息单位存储成本,让整体的存储成本趋同于 OSS 存储成本。 Serverless 能力再升级 有了存算分离架构的铺垫,ApsaraMQ 全系列产品 Serverless 化就更加顺畅了。接下来展开介绍 ApsaraMQ Serverless 能力的升级。 消息场景下的 Serverless 化 在消息场景下通常会有两个角色:消息服务的使用方和提供方。 在传统架构中通常是这样的流程:使用方会给提供方提需求:我要大促了,你保障下;提供方说给你部署 10 台够不够,使用方说好的;结果真到大促那一刻,真实流量比预估的要大很多倍,整个消息服务被击穿,硬生生挂了半小时。 在 Serverless 化改造前,仍需提前规划容量;但相比传统架构的提升点是,依托 IaaS 层的快速扩容能力,能够极大缩短扩容时间;再演进到当前的 Serverless 架构,我们期望消息服务提供方是能够非常淡定地应对大促。 Serverless 现在已经成为了一个趋势和云的发展方向。ApsaraMQ 全线产品实现弹性灵活的 Serverless 架构,既彰显了技术架构的先进性,也提升了客户的安全感。业务部门减少了容量评估的沟通成本,用多少付多少,更省成本;运维部门免去了容量规划,实现自动弹性扩缩,降低运维人员投入的同时,大大提升了资源的利用率。 Serverless 方案的 Why / What / How Serverless 化预期达到的收益是:省心——秒级弹性,免容量规划;省钱——用多少付多少;省力——少运维、免运维。 要解决的痛点是:用户使用容量模型比较难做精准预估;运维侧手动扩容,是一个非常耗时耗力的过程。 核心目标是: + 弹性要快: 特别是针对一些脉冲型的秒杀业务,需要具备秒级万 TPS 的弹性能力。 + 缩容无感: 应对 MQ 客户端与服务侧 TCP 长连接的特性,缩容、服务端发布时要无感。 + 成本要低: 极致的性能优化,才能进一步降低用户侧的单位 TPS 成本。 通过如下几个关键路径构建 ApsaraMQ Serverless 核心能力: + 多租、安全隔离、业务流量隔离: 是构建 Serverless 核心能力的基础。 + 物理预弹&逻辑弹性: 物理预弹和逻辑弹性结合的极致弹性方案,通过镜像加速、元数据批量创建、主动路由更新等核心技术加速物理弹性,结合逻辑弹性最终交付秒级万 TPS 的极致弹性能力。 + RDMA: 在存储和计算层之间引入 RDMA 技术,充分利用 RDMA 的零拷贝、内核旁路、CPU 卸载等特性进一步降低计算层与存储层之间的通信开销。 + 优雅上下线: 依托 gRPC 协议的 GOAWAY 以及 Remoting 协议的扩展实现 TCP 长连接的优雅上下线。 + 控制资源水位: 通过智能化的集群流量调度以及平滑 Topic 迁移方案,进一步控制单个集群的资源水位在一个合理的值。 这套 Serverless 方案可以同时满足如下几种场景: + 第一种是平稳型:流量上升到一定水位后会平稳运行。 + 第二种是稳中有“进”型:流量平稳运行的同时,偶尔会有一些小脉冲。 + 第三种是周期性“脉冲型”:流量会周期性地变化。 计算、存储与网络:充分利用云的弹性能力 我们前面也有提到,这套架构是完全构建在云原生基础设施之上的,我们在计算、存储、网络上都充分利用了云的弹性能力,具体来看: + 在计算侧,通过 K8s 充分利用 ECS 的弹性能力,通过弹性资源池、HPA 等核心技术支持计算层的快速弹性,并通过跨可用区部署提升可用性。 + 在存储侧,充分利用飞天盘古存储的弹性能力,支持自定义消息的存储时长,冷热数据分离,额外保障冷读的 SLA。 + 在网络侧,公网随开随用,安全和方便兼具,支持多种私网形态接入,并基于 CEN 构建全球互通的消息网络。 在这之上,结合 SRE 平台的智能集群流量调度、集群水位监控、物理预弹性等核心能力,最终交付了一套完整的 ApsaraMQ Serverless 化物理弹性能力。 秒级万 TPS 的极致弹性能力保障 依托于上面的基础物理资源的弹性能力,来看下我们是如何保障秒级万 TPS 的极致弹性能力的? 从两个维度来看用户视角的弹性: + 从限流维度看: 在无损弹性上限以内的 TPS,都不会触发限流;超过无损弹性 TPS 上限后,会有秒级别的限流,通过逻辑弹性调整实例级别的限流阈值后,即可实现最终的 TPS 弹性。 + 从付费角度看: 在预留规格内按规格进行预付费;超过预留规格的上限 TPS,超过部分按量付费,即用多少付多少。 为了满足用户视角的秒级弹性能力,我们通过物理弹性和逻辑弹性相结合的方式来进行保障: + 物理弹性是预弹的机制,弹性时间在分钟级,用户侧无任何感知,由服务侧来 Cover 成本。 + 逻辑弹性是实时生效的,弹性时间在秒级,同时在触发无损弹性 TPS 上限时,用户侧是会有秒级别的限流感知的,另一方面,用户是需要为弹出来的那部分流量进行按量付费的。 两者的关系是:物理弹性是逻辑弹性的支撑,逻辑弹性依赖物理弹性。从弹性流程上看,当用户的流量触发无损弹性上限时,优先判断物理资源是否充足,充足的话就进行秒级逻辑弹性,不充足的话就等待物理资源扩容后再进行逻辑弹性。当然这里会有个预弹的机制,尽量保障逻辑弹性时物理资源都是充足的。 从物理弹性加速来看,通过以下几个方面进行加速: + 计算、存储层按需弹性: 计算层更关注 CPU、客户端连接数等核心指标;存储层更关注内存、磁盘 IO 等性能指标。 + 镜像下载加速: 通过 PlaceHolder + 镜像缓存组件加速新节点的扩容。 + 新增元数据批量创建的机制: 新增存储节点创建 5000 级别的 Topic 下降至 3 秒。 + 新增主动路由更新机制: 降低存储节点物理扩容后承接读写流量的延迟。 我们的系统设计精密,旨在确保用户体验到极致的弹性能力,特别是实现每秒万次事务处理(TPS)的秒级弹性扩展。这一能力的保障策略围绕两个核心维度展开,并深度融合物理与逻辑弹性机制,确保在高吞吐需求下的无缝响应与成本效率。 ApsaraMQ on RDMA:降低计算与存储层之间通信开销 RDMA(全称远程内存直接访问)是一种高性能的网络通信技术,相比 TCP/IP 的网络模式,具有零拷贝、内核旁路、CPU 卸载等核心优势。ApsaraMQ Serverless 化架构具备存算分离的特点,非常适合在计算层和存储层之间引入 RDMA 技术,进一步降低内部组件之间的数据通信开销。 具体来看,计算层 Proxy 在接收到客户端各种协议的消息收发请求以后,通过内置的 Remoting Client 和存储层 Broker 进行数据交换。在原先的 TCP/IP 网络模式中,这些数据是需要从操作系统的用户态拷贝到内核态后,再经由网卡和对端进行交互。依托 RDMA 内核旁路特性,通过实现 RdmaEventLoop 的适配器,消息直接由用户态到 RDMA 网卡和对端进行交互。 从最终 BenchMark 的测试效果来看,引入 RDMA 技术后,存储层 Broker 的 CPU 资源消耗下降了 26.7%,计算层 Proxy 的 CPU 资源消耗下降了 10.1%,计算到存储的发送 RT 下降了 23.8%。 优雅上下线:ApsaraMQ Serverless 弹性如丝般顺滑 在 Serverless 场景下,物理资源的水平弹性扩缩是一个常态化的过程,同时结合 MQ 客户端和计算层 Proxy TCP 长连接的特点,在 Proxy 上下线过程中是比较容易出现连接断开的感知,比如客户端刚发出一个接收消息的请求,连接就被服务侧强行关闭了,是非常容易造成单次请求超时的异常的。 因此,我们通过 gRPC 协议 Http2 的 Go Away 机制以及 Remoting 协议层的扩展,结合 SLB 连接优雅终端的能力来实现 ApsaraMQ 在扩容态、缩容态、以及发布态的无感。 右图展示了缩容态下单台 Proxy 优雅下线的过程: 1. 当收到 Proxy0 需要下线的指令后,SLB 会将 Proxy0 摘除,保障新的连接不再建立到 Proxy0 上,同时 Proxy0 进入 Draining 状态,旧连接进行会话保持。 2. Proxy0 上收到新的请求后,返回的 Response Code 都更新为 Go Away;同时客户单收到 Go Away 的 Response 后,断开原先的连接,向 SLB 发起新建连接的请求。 3. SLB 收到新建连接的请求,会导流至 Proxy1。 4. 等待一段时间后 Proxy0 上的连接会自然消亡,最终达到无感下线的目的。 事件驱动架构赋能 AI 应用 AI 无疑是当今互联网行业的热门话题,同时也是本届云栖大会的核心议题之一。接下来,将阐述面向 AI 原生应用的事件驱动架构如何拥抱 AI,赋能 AI 应用蓬勃发展。 + 面向 AI 应用的实时处理,具备实时 Embedding 至向量数据库、更新私有数据存储的能力,全面提升 AI 应用实时性、个性化和准确度。 + 面向 AI 应用的异步解耦,解耦 AI 推理链路的快慢服务,加快应用响应速度。 + 面向 AI 应用的消息负载,ApsaraMQ 支持主动 Pop 消费模式,允许动态设置每一条消息的个性化消费时长. 同时也支持优先级队列,满足下游多个大模型服务的优先级调度。 + 面向 AI 应用的消息弹性,ApsaraMQ 提供全模式的 Serverless 弹性能力,支持纯按量和预留+弹性、定时弹性等多种流量配置模型; 最后,让我们来看下阿里云事件总线 EventBridge 是如何实现数据实时向量化,提升 AI 应用的实时性和准确度的? 阿里云事件总线的 Event Streaming 事件流处理框架,具备监听多样化数据源,然后经过多次 Transform 之后再投递给下游数据目标的原子能力;依托这个框架,我们是很容易去实现数据的实时向量化的,从 RocketMQ、Kafka、OSS 等多个源监听到实时数据流入之后,经过文本化、切片、向量化等操作,最终存入向量数据库,作为 AI 应用实时问答的多维度数据检索的依据,最终提升了 AI 应用的实时性和准确度。
作者:金吉祥
#行业实践 #云原生

2024年10月21日

Apache RocketMQ 创新论文被软件工程顶会 FM 2024 录用
近日,由阿里云消息队列团队发表的关于 RocketMQ 锁性能优化论文被 CCFA 类软件工程顶级会议 FM 2024 录用。 FM 2024 是由欧洲形式化方法协会(FME)组织的第 24 届国际研讨会,会议汇聚了来自各国的形式化研究学者,是形式化方法领域的顶级会议。FM 2021 强调形式化方法在广泛领域的开发和应用,包括软件、网络物理系统和基于计算机的综合系统。形式化方法以严格的数学化和机械化方法为基础来规约、构建和验证计算系统,是改善和确保计算系统质量的重要方法,其模型、技术和工具已延生成为计算思维的重要载体。 此次被录用的论文为《Beyond the Bottleneck: Enhancing HighConcurrency Systems with Lock Tuning》。此论文灵感来源于 RocketMQ 适配阿里云倚天 CPU 的性能优化过程中。RocketMQ 此前在发送消息的过程中存在两种锁:自旋锁和互斥锁。我们发现,不同 CPU 适合的锁行为并不相同。糟糕的锁行为可能导致性能的大幅下滑,而适配的锁行为能够在提升性能的同时降低资源损耗。这两种锁在版本迭代过程中,都在线上版本中使用过,且对于不同的版本来说,使用这两种锁可能带来截然不同的性能结果。 因此,本文旨在提出一种新的自适应 K 值退避锁,能够让高并发系统的部署者无需考虑两种锁的优劣势,只需使用一把锁即可实现性能的最优以及最低的资源损耗。 换言之,我们希望有一把锁能够同时具备自旋锁、互斥锁的特点,同时适用于竞争激烈和不激烈的情况。我们最终决定改造自旋锁,通过一把特殊的自旋锁,使系统在各种竞争情况下都保持非常优质的锁行为。自旋锁由于无限自旋直到获取到锁,在临界区较大时会产生较多的空转,耗费大量的 CPU 资源。为了能有效利用自旋锁的优势,因此我们要在临界区较大时对其空转次数的控制,从而避免大量空转,最大程度兼容临界区较大的场景。 最终,我们基于排队论,通过对自旋锁的行为建模,得到了自旋次数与系统负载的关系: 公式中,是一把锁的整体期望获取时间。它分别由两部分组成:期望自旋耗时以及期望上下文切换耗时。将二者与自旋次数 K 和系统负载 P 的关系代入,则得到了上述的最终公式。公式中的 Ts 是单次自旋耗时,Tc 是单次上下文切换耗时。 我们最终基于系统的最大压力场景提出了自适应 K 值退避锁:进行 K 次自旋后还未获得锁后,执行 Thread.yield() 将 CPU 执行权交给操作系统。 这种行为能够避免互斥锁的无谓上下文切换,也能避免高压场景下的无限自旋带来的 CPU 损耗。这种行为能够缓解系统压力,取得自旋和 CPU 上下文切换两种方法中的最低开销。 在自适应 K 值退避锁的作用下,我们能找到系统性能的局部最优点,达到最大的 TPS 性能。结果如下表所示: 消息发送最大 TPS 的性能优化结果 此外,我们还检查了各个 K 值下的 Broker 资源损耗情况,发现在最大 TPS 时的 K 值,同时也是资源占用相对最低时的 K 值: 各个 K 值下的 CPU 使用率 以 X86 架构,同步刷盘的行为为例。实验结果表明,在 k= 10^3 时,发送速度不仅达到峰值(155019.20),CPU 使用率也达到最低。这表明退避策略成功地节省了 CPU 资源。此时,CPU 支持更高的性能水平和较低的利用率水平,这表明性能瓶颈已经转移——例如,可能已经转移到了磁盘上。在表中可以观察到,在具有相同的 k(10^3)和配置参数(最新代码,SYNC 刷盘模式)的 ARM CPU 上,RocketMQ 的性能提高了 10.4%。此外,如上图所示,当 k= 10^3 时,CPU 使用量大幅下降,从平均超过 1000% 下降到 750% 左右。资源消耗的减少表明,减轻其他系统瓶颈可能可以带来更显著的性能提高。 附论文信息 录用论文题目: 《Beyond the Bottleneck: Enhancing HighConcurrency Systems with Lock Tuning》 作者: 季俊涛,古崟佑,傅玉宝,林清山 论文概述: 高并发系统常常面临性能瓶颈,主要是由于线程间激烈竞争锁导致的等待和上下文切换。作为一家云计算公司,我们非常重视性能的最大化。为此,我们对轻量级自旋锁进行了改进,并提出了一种简洁的参数微调策略,能够在最低风险条件下突破系统性能瓶颈。该策略在高吞吐量消息队列系统 Apache RocketMQ 中得到了验证,实现了 X86 CPU 性能提升 37.58% 和 ARM CPU 性能提升 32.82%。此外,我们还确认了这种方法在不同代码版本和 IO 刷新策略下的一致有效性,显示出其在实际应用中的广泛适用性。这项工作不仅为解决高并发系统的性能问题提供了实用工具,还突显了形式化技术在工程问题解决中的实际价值。 相关链接:
#技术探索

2024年8月30日

基于Apache RocketMQ 的云原生 MQTT 消息引擎设计
概述 随着智能家居、工业互联网和车联网的迅猛发展,面向 IoT(物联网)设备类的消息通讯需求正在经历前所未有的增长。在这样的背景下,高效和可靠的消息传输标准成为了枢纽。MQTT 协议作为新一代物联网场景中得到广泛认可的协议,正逐渐成为行业标准。 本次我们将介绍搭建在 RocketMQ 基础上实现的 MQTT 核心设计,本文重点分析 RocketMQ 如何适应这些变化,通过优化存储和计算架构、推送模型及服务器架构设计,推动 IoT 场景下消息处理的高效性和可扩展性以实现 MQTT 协议。 此外,阿里云 MQTT 以 RocketMQMQTT 为基础,不断进行迭代创新。阿里云是开源 RocketMQMQTT 的主要贡献者和使用者之一。面对设备通信峰谷时段差异性的挑战,本文将介绍阿里云如何将 Serverless 架构应用于消息队列,有效降低运营成本,同时利用云原生环境的特性,为 IoT 设备提供快速响应和灵活伸缩的通讯能力。 进一步地,我们将探讨介绍在云端生态体系中整合 MQTT 的实践,介绍基于统一存储的数据生态集成方案,展示其强大的技术能力和灵活的数据流转能力。 loT 消息场景 (消息场景对比) 物联网技术,作为当代科技领域的璀璨明星,其迅猛发展态势已成共识。据权威预测,至 2025 年,全球物联网设备安装基数有望突破 200 亿大关,这一数字无疑昭示着一个万物互联时代的到来。 更进一步,物联网数据量正以惊人的年增长率约 28% 蓬勃膨胀,预示着未来数据生态中超过 90% 的实时数据将源自物联网。这一趋势深刻改变了数据处理的格局,将实时流数据处理推向以物联网数据为核心的新阶段。 边缘计算的崛起,则是对这一变革的积极响应。预计未来,高达 75% 的数据处理任务将在远离传统数据中心或云端的边缘侧完成。鉴于物联网数据的海量特性,依赖云端进行全部数据处理不仅成本高昂,且难以满足低延迟要求。因此,有效利用边缘计算资源,就地进行数据初步处理,仅将提炼后的关键信息上传云端,成为了提升效率、优化用户体验的关键策略。 在此背景下,消息传递机制在物联网场景中的核心价值愈发凸显: + 桥梁作用:消息系统充当了物联网世界中的“神经网络”,无缝衔接设备与设备、设备与云端应用间的沟通渠道,构筑起云边端一体化的应用框架,确保了信息交流的即时与高效。 + 数据加工引擎:面对物联网持续涌动的数据洪流,基于消息队列(MQ)的事件流存储与流计算技术成为了解锁实时数据分析潜能的钥匙。这一机制不仅能够实时捕捉、存储数据流,还支持在数据产生的瞬间执行计算操作,为物联网应用提供了强大的数据处理基础架构,助力实现数据的即时洞察与决策响应。 总之,消息技术不仅是物联网架构的粘合剂,更是驱动数据流动与智能决策的核心动力,其在物联网领域的应用深度与广度,正随着技术迭代与市场需求的双重驱动而不断拓展。 同时传统消息场景和物联网消息场景有很多的不同,包含以下几个特点: 1)硬件资源差异 传统消息场景依托于高性能、高可靠性的服务集群,运算资源充沛,客户端部署环境多为容器、虚拟机乃至物理服务器,强调集中式计算能力。相比之下,物联网消息场景的客户端直接嵌入至网络边缘的微型设备中,如传感器、智能家电等,这些设备往往受限于极低的计算与存储资源,对能效比有着极高要求。 2)网络环境挑战 在经典的内部数据中心(IDC)环境中,消息处理享有稳定的网络条件和可控的带宽、延迟指标。而物联网环境则拓展至公共网络,面对的是复杂多变的网络状况,尤其在偏远地区或网络覆盖弱的区域,不稳定的连接成为常态,对消息传输的健壮性和效率提出了更高挑战。 3)客户端规模的量级跃迁 传统消息系统处理的日均消息量通常维持在百万级,适用于集中度较高的消息分发。物联网的兴起促使设备数量呈爆炸性增长,动辄涉及亿万级别的终端节点,这对系统的扩展性、消息路由的高效性提出了全新的要求。 4)生产与消费模式的演变 传统场景倾向于采用集中式同步生产模式,强调消息的一致性与有序处理。而物联网消息生成则体现出分散化的特性,每个设备根据其感知环境独立产生少量消息,这对消息收集与整合机制设计提出了新的思考。消费模式上,物联网场景经常涉及大规模广播或组播,单条消息可能触达数百万计的接收者,要求系统具备高效的广播能力和灵活的订阅管理机制。 RocketMQ 的融合架构设计 (融合架构设计图) 我们看到,物联网所需要的消息技术与经典的消息设计有很大的不同。接下来我们来看看基于 RocketMQ 的融合架构 MQTT 设计为了解决物联网的消息场景有哪些特点。 1)融入 MQTT 协议,适应物联网环境特性 RocketMQ 5.0 通过整合 RocketMQMQTT,紧密贴合了物联网领域广泛采用的MQTT协议标准。此协议针对低功耗、网络条件不稳定的情况优化,以其轻量级特性和丰富的功能集脱颖而出,支持不同的消息传递保障级别,满足了从“最多一次”到“仅一次”的多样化需求。协议的领域模型与 RocketMQ 的核心组件高度协调,促进了消息、主题、发布订阅模式的自然融合,为建立一个从设备到云端的无缝消息传递体系打下了稳固的基础。 2)灵活的存储与计算解耦架构 为了应对物联网场景下对高并发连接和大规模数据处理的需求,RocketMQ 5.0 采取了存储与计算分离的架构设计。RocketMQ Broker 作为核心存储组件,确保了数据的持久化与可靠性,而 MQTT 相关的逻辑操作则在专门的代理层实施,这不仅优化了对大量连接、复杂订阅关系的管理,也增强了实时消息推送的能力。这种架构允许根据业务负载动态调整代理层资源,通过增加代理节点来平滑应对设备连接数的增加,体现了系统良好的弹性和扩展潜力。 3)促进端云数据协同的整合架构 RocketMQMQTT 通过其整合的架构设计,促进了物联网设备与云端应用之间的高效数据共享。基于统一的消息存储策略,每条消息在系统内只需存储一次,即可供两端消费,减少了数据冗余,提高了数据流通的效率。此外,RocketMQ 作为数据流的存储中枢,自然而然地与流计算技术结合,为实时分析物联网生成的海量数据提供了便利,加速了数据价值的发掘过程。 存储设计 首先要解决的问题是物联网消息的存储模型。在发布订阅业务模型中,常用的存储模型有两种,写放大和读放大,我们将依次分析两种模型。 (写放大模型) (读放大模型) 写放大模型: 在此模型下,每位消费者拥有专属的消息队列,每条消息需要复制并分布到所有目标消费者的队列中,消费者仅关注并处理自己队列中的消息。以三级主题/Topic/subTopic/test 为例,若该主题吸引了大量客户端订阅,采取“一客一队列”的策略,即每个符合订阅规则的客户端或通配符订阅均维护一份消息副本,将导致消息的存储需求随订阅者数量线性增长。 尽管这种模式在某些传统消息场景,比如遵循 AMQP 协议的应用中表现得游刃有余,因为它确保了消息传递的隔离性和可靠性。但在物联网场景下,特别是当单个消息需被数以百万计的设备消费时,“写放大”策略将引发严重的存储资源消耗问题,迅速成为不可承受之重。 读放大的考量与挑战: 鉴于物联网场景的特殊需求,直接应用传统的“写放大”模型显然是不可行的。为解决这一难题,RocketMQMQTT 采取了更为高效与灵活的存储策略,旨在减少存储冗余,提高系统整体的可扩展性和资源利用率: 在“读放大”模式下,每条消息实际上被存储一次,而为了支持通配符订阅的高效检索,系统在消息存储阶段会创建额外的索引信息——即 consume queue(消费队列)。对于如/Topic/subTopic/这样的通配符订阅,系统会在每个匹配的通配符队列中生成相应的索引,使得订阅了不同通配符主题或具体主题的消费者,都能通过这些共享的存储实体找到并消费到消息。尽管这看似增加了“读”的复杂度,但实际上,每个 consume queue 作为索引,其体积远小于原始消息,显著降低了整体存储成本,同时提高了消息检索与分发的效率。 (原子分发示意图) 为此,我们设计了一种多维度分发的 Topic 队列模型,如上图所示,消息可以来自各个接入场景(如服务端的 MQ/AMQP、客户端的 MQTT),但只会写一份存到 commitlog 里面,然后分发出多个需求场景的队列索引(ConsumerQueue),如服务端场景(MQ/AMQP)可以按照一级 Topic 队列进行传统的服务端消费,客户端 MQTT 场景可以按照 MQTT 多级 Topic 以及通配符订阅进行消费消息。这样的一个队列模型就可以同时支持服务端和终端场景的接入和消息收发,达到一体化的目标。 实现这一模型,RocketMQ依托了两项关键技术特性: + 轻型队列(Light Message Queue) 这一特性允许一条消息被灵活地写入多个 topic queue 中,确保了消息能够高效地适应各种复杂的订阅模式,包括但不限于通配符订阅。它为读放大模型的实现提供了必要的灵活性和效率基础。 + 百万队列能力 RocketMQ 通过集成 RocksDB 这一高性能键值存储引擎,充分利用其在顺序写入方面的优势,实现了百万级别的队列管理能力。特别是通过定制化配置,去除了 RocksDB 内部的日志预写(WriteAhead Log, WAL),进一步优化了存储效率。RocksDB 不仅为 consume queue 提供了稳定高效的存储方案,还确保了即便在极端的队列数量下,系统依然能够保持高性能的索引处理能力。 (轻型队列的实现) 通过采用“读放大”模型,结合 RocketMQ 的轻型队列特性和百万队列的底层技术支持,我们不仅有效解决了物联网环境下消息存储与分发的挑战,还实现了存储成本与系统性能的双重优化。这种设计不仅减少了存储空间的占用,还通过高度优化的索引机制加快了消息检索速度,为大规模物联网设备的消息通信提供了一个既经济又高效的解决方案。 推送模型 (RocketMQMQTT 推送模型) 在介绍完底层队列存储模型之后,我们将重点探讨匹配查找和可靠送达的实现机制。在传统的消息队列 RocketMQ 中,经典的消费模式是消费者通过客户端直接发起长轮询请求,以精准地获取对应主题的队列消息。然而,在 MQTT 场景下,由于客户端数量众多且订阅关系复杂,长轮询模式显得不够有效,因此消费过程变得更加复杂。为此,我们采用了一种推拉结合的模型。 本模型的核心在于终端通过 MQTT 协议连接至代理节点,消息可以来源于多种场景(如MQ、AMQP、MQTT)。当消息存入主题队列后,通知逻辑模块将实时监测到新消息的到达,进而生成消息事件(即消息的主题名称),并推送至网关节点。网关节点根据连接终端的订阅状态进行内部匹配,识别能够接收这一消息的终端,随后触发拉取请求,以从存储层读取消息并推送至终端。 在这个流程中,一个关键问题是通知模块如何确定终端感兴趣的消息,以及哪些网关节点会对此类消息感兴趣。这实际上是一个核心的匹配搜索问题。常见的解决方案主要有两种:第一种是简单的事件广播,第二种是将线上订阅关系集中存储(例如图中的 Lookup 模块),然后进行匹配搜索,再执行精准推送。 虽然事件广播机制在扩展性上存在一定问题,但其性能表现仍然良好,因为我们推送的数据量相对较小,仅为 Topic 名称。此外,同一 Topic 的消息事件可以合并为一个事件,这是我们当前在生产环境中默认采用的方式。另一方面,将线上订阅关系集中存储在 RDS 或 Redis 中也是一种普遍的做法,但这需要保证数据的实性,匹配搜索的过程可能会对整体实时消息链的延迟产生影响。 在该模型中,还设计了一个缓存模块,以便在需要广播大量消息时,避免各个终端对存储层发起重复的读取请求,从而提高整体系统的效率。 阿里云 MQTT 在 Serverless 上的实践 随着云原生技术的不断发展,现代消息中间件逐渐以容器编排为基础,如何实现真正的无服务器架构及秒级弹性管理已成为一项重要的研究课题。 阿里云作为开源 RocketMQMQTT 的主要贡献者和使用者之一,在 MQTT 弹性设计上有很多优化方式和实践经验。我们将介绍阿里云在弹性上的设计思路,展示其如何实现高效、弹性强的 MQTT 消息中间件: 1)抽离网络连接层 阿里云 MQTT 采用类似 Sidecar 的模式,将网络连接层与核心业务逻辑进行分离,使用 Rust 语言来处理网络连接。与 Java 相比,Rust 在内存消耗和启动速度上具有显著优势,尤其在处理大规模 MQTT 连接时,能够有效降低内存占用。 2)秒级扩容 每个 Pod 的资源请求设置较低,同时预留部分 Pod 专门运行 Rust 进程。在扩容需求出现时,系统能够快速启动 MQTT Proxy 进程,省去 Pod 创建和资源挂载的时间,从而显著提升响应速度。 3)弹性预测与监控 利用连接数、TPS、内存、CPU 等白盒指标,以及 RT 等黑盒指标,阿里云 MQTT 依据指标联动规则,制定了合理的扩容策略。这使得系统能够提前预测负载变化并启动 Pod 扩容,确保长期平稳运行。 通过以上设计思路,阿里云能够构建一个高效、弹性强的基于 RocketMQ 的 MQTT 实现方案,充分利用 Rust 带来的性能优势,同时保持系统的稳定性与可扩展性。这种创新设计将在实际应用中显著提升用户体验,助力系统整体性能的优化。 阿里云 MQTT 在车联网中的实践架构 随着汽车出行领域新四化(电气化、智能化、网联化和共享化)的推进,各大汽车制造商正逐步构建以智能驾驶和智能网联为核心的车联网系统。这一新一代车联网系统对底层消息采集、传输和处理的平台架构提出了更高的要求。接下来,我们将介绍阿里云 MQTT 在车联网中的实践架构及其应用价值。 在架构图中,我们可以看到常见的车联网设备,包括车载终端、路测单元和手机端系统。这些设备确保了安全的连接与数据传输。车端的功能涵盖车机数据上报、POI 下发、文件推送、配置下发、消息推送等全新车联业务。这些操作将产生海量的消息 Topic,需要更加安全、稳定的接入与传输,以实现可靠的消息订阅与发布。路端则强调路侧 RSU 的安全接入,支持消息的采集、传输以及地图数据的实时更新。 接入端支持多种协议,包括 TCP、x509、TLS、WSS、WS、OpenAPI 和 AMQP,以满足不同应用场景的灵活需求。这种多协议支持确保了设备之间的无缝互联与高效通信。 在流转生态方面,物联网场景下,各种设备持续产生大量数据,业务方需要对这些数据进行深入分析与处理。采用 RocketMQ 作为存储层,系统能够只保存一份消息,并支持物联网设备和云端应用的共同消费。RocketMQ 的流存储特性使得流计算引擎能够无缝、实时地分析物联网数据,为关键决策提供及时支持。 借助阿里云 EventBridge,MQTT 物联网设备所生成的信息可以顺利流转至 Kafka、AMQP、FC、Flink 等其他中间件或数据处理平台,实现深度的数据分析与处理。事件总线 EventBridge 是阿里云提供的一款 Serverless 总线服务,支持阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助轻松构建松耦合、分布式的事件驱动架构。这种灵活的数据流转能力不仅提升了处理速度,还为未来智能化应用的创新和发展奠定了基础。 通过以上架构,可以清晰地看到阿里云 MQTT 在车联网领域的最佳实践,为实现未来智能出行提供了可靠的技术支撑。 结语 在物联网的蓬勃发展背景下,消息传递技术的不断演进已成为支撑智能家居、工业互联网以及车联网等领域的重要基石。通过对 RocketMQ 和 MQTT 协议的深度融合,我们不仅有效解决了物联网时代对高效、可靠消息传输的需求,也为设备通信带来了灵活的解决方案。 阿里云在这一领域的积极探索,通过引入 Serverless 架构,不断推进 MQTT 的技术迭代与创新。这样的设计能够在面对高并发连接和海量数据时,动态调整资源配置,降低成本并提升响应速度,确保了实时数据处理的高效性。 当前,社区正在推动 MQTT 5.0 协议方面已取得显著进展,新的协议特性如更丰富的错误码、更灵活的连接选项以及 will 消息、retain 消息、共享订阅功能都将进一步提升系统的灵活性和可靠性。与此同时,我们在致力于实现更快的弹性扩展能力,以便在面对突发流量时及时响应,提高系统的可用性和灵活性。 随着 IoT 生态体系的不断完善,面对日益复杂的消息场景,消息技术的价值愈发凸显。我们相信,未来通过不断优化的消息架构,能够推动更深层次的智能化应用,同时为构建万物互联的未来奠定坚实的基础。让我们期待 MQTT 在物联网技术的场景中展现其无限可能,同时也继续探索持续探索在确保安全、稳定、高效的消息中间件。
作者:沁君
#技术探索 #物联网
收藏
收藏暂无数据,请从小助手对话框添加
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
无疑 AI答疑专家
当前服务输出的内容均由人工智能模型生成,其生成内容的准确性和完整性无法保证,不代表我们的态度或观点。
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
专家答疑