2025年6月16日

EventBridge 构建智能化时代的企业级云上事件枢纽
产品演进历程:在技术浪潮中的成长之路 早在 2018 年,Gartner 评估报告便将事件驱动模型(EventDriven Model)列为十大战略技术趋势之一,指出事件驱动架构(EDA,Eventdriven Architectures)将成为微服务架构未来的演进方向。 随着云原生与 Serverless 技术的迅猛发展,2020 年,阿里云重磅发布事件总线 EventBridge,构建了云原生环境下的统一事件枢纽。事件总线 EventBridge 支持阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助企业轻松构建松耦合、分布式的事件集成驱动架构。 自 6 月 3 日起,阿里云事件总线 EventBridge 正式商业化。历经五年迭代升级,事件总线 EventBridge 在产品功能和用户体验方面不断优化,积累了丰富的规模化生产实践经验。在数据智能化时代,事件总线 EventBridge 持续深耕企业云上事件集成,适用于各种规模和行业的事件集成场景。通过 Event 桥接各个系统,满足 AI 和企业集成等领域的各种数据集成需求,为企业提供便捷且创新的数据集成解决方案。 产品核心特性:构建企业级事件中枢的关键 从诞生到商业化,事件总线 EventBridge 不断优化产品核心特性,致力于定义企业级事件集成标准,为企业构建云上事件枢纽提供坚实可靠的支撑和保障。 + 稳定与安全:依托海量数据传输及运维经验,提供高稳定性且安全合规的企业集成服务; + 性能与成本:提供高性能且性价比高的企业集成方案,显著降低用户数据集成成本; + 开放与集成:提供丰富的跨产品、跨平台连接能力,促进云产品、应用程序、SaaS 服务相互集成; + 统一事件枢纽:统一事件界面,定义事件标准,打破云产品事件孤岛; + 事件驱动引擎:海量事件源,毫秒级触发能力,加速 EDA/Serverless 架构升级; + 流式事件通道:事件流提供轻量、实时、端到端的流式数据处理,对源端产生的事件进行实时抽取、转换和分析并加载至目标端。 多元应用场景:覆盖各类企业事件集成场景 EDA 事件驱动场景 事件总线 EventBridge 通过事件连接应用程序、云服务和 Serverless 服务,构建事件驱动架构(EDA,Eventdriven Architectures),实现应用与应用、应用与云服务之间的高效连接。 流式 ETL 场景 在企业集成场景中,事件总线 EventBridge 可以作为流式数据管道,提供基础的过滤与转换功能,支持不同数据仓库、数据处理程序、数据分析与处理系统之间的数据同步和跨地域备份,连接不同系统与服务。 AI 数据集成场景 事件总线 EventBridge 提供非结构化数据到结构化数据的链路集成,可处理多种数据源,如关系型数据库、API 数据、文件、ODPS 等,并支持将数据向量化后存储至向量数据库或其他数仓,同时支持数据清洗、转换和规范化。为 RAG 和模型数据准备等场景,提供一站式数据集成服务。 统一事件通知服务 事件总线 EventBridge 提供丰富的云产品事件源与事件的全生命周期管理工具,用户可以直接监听云产品产生的数据,并上报至监控和通知等下游服务,实现高效的事件管理和响应。 商用计费体系:计费模式灵活匹配企业需求 事件总线 EventBridge 已于 6 月 3 日起正式商业化,分为事件总线、事件流两类资源进行计费,计费模式如下: 计费组成说明 事件总线 EventBridge 各资源的计费组成信息请参见下图。 事件总线计费说明 事件总线【1】分为云服务专用事件总线和自定义事件总线。 + 云服务专用事件总线:是系统自动创建用于接收阿里云官方事件源的事件且不可修改的内置事件总线。 + 自定义事件总线:需要你自行创建并管理的事件总线。 事件源单价 | 计费项 | 计费单价 | | | | | 云服务专用总线事件发布 | 免费 | | 自定义总线事件发布 | 5.64元/百万次事件发布 | 每个 64KB 大小的事件计为 1 个事件。 事件目标单价 | 计费项 | 计费单价 | | | | | 阿里云服务目标事件推送/通知 | 免费 | | 自定义目标事件推送 | 1.29元/百万次事件推送 | 每个 64KB 大小的事件计为 1 个事件。 事件流计费说明 事件流【2】是端到端的流式事件通道,适用于端到端的流式数据处理场景。事件流属于收费服务,并支持按事件量计费和按 CU 额度计费,这两种计费【3】方式你只需二选一即可。 按事件量计费 | 计费项 | 计费单价 | | | | | 事件量 | 2.8元/百万条 | | 空置资源占用费 | 1元/天 | + 事件量:事件流拉取上游数据源的事件总量。每个 64KB 大小的事件计为 1 个事件。计算公式:事件量(向上取整)=单个事件大小/64KB。例如:批量投递了 300KB 的单个事件,则按照 5 个事件进行计费。若事件发生重试,则按照配置的重试规则每次重试计为 1 个事件。 + 空置资源占用量:若单个事件流在一个月内没有任何数据流入将会按照计费标准收取空置资源占用费,空置资源占用费将在有数据流入或者删除该事件流后取消计费。 按 CU 配额计费 名词介绍 CU 配额: CU(Capacity Unit)是事件总线任务的容量单位。若采用 CU 规格计费,则每个任务必须至少分配 1CU 作为最小配额。亦可在任务创建时指定允许弹性的最大 CU 规格和最小 CU 规格。 计费项和计费单价 + 计费项:CU 配额 + 计费单价:0.24元/小时/CU 说明 单个 CU 可支持的条件(条件为或): 1. 最大支持每秒事件量(EPS)5000Event/s(实际情况受限于链路上下游性能)。 2. 最大支持的峰值吞吐量(BPS)50MB/s 的容量。 为了方便你快速评估费用,事件总线(EventBridge)提供了价格计算器:事件总线(EventBridge)价格计算器【4】。 如果你在使用事件总线 EventBridge 的过程中有任何反馈或疑问,欢迎加入钉钉用户群(钉钉群号:31481771)与阿里云研发团队即时沟通。 【1】事件总线 https://help.aliyun.com/zh/eventbridge/userguide/eventbusoverview 【2】事件流 https://help.aliyun.com/zh/eventbridge/userguide/eventstreamoverview 【3】计费 https://help.aliyun.com/zh/eventbridge/productoverview/billingofeventstreams 【4】事件总线(EventBridge)价格计算器 https://eventbridge.console.aliyun.com/calculator
#社区动态

2025年6月11日

Apache RocketMQ + “太乙” = 开源贡献新体验
Apache RocketMQ 是 Apache 基金会托管的顶级项目,自 2012 年诞生于阿里巴巴,服务于淘宝等核心交易系统,历经多次双十一万亿级数据洪峰稳定性验证,至今已有十余年发展历程。RocketMQ 致力于构建低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台,覆盖云边端⼀体化数据处理场景,帮助企业和开发者在智能化时代,轻松构建事件驱动架构的云原生应用。 Apache RocketMQ 的茁壮成长离不开全球 800 多位开发者的积极参与和贡献。如今,Apache RocketMQ 开源社区将携手"太乙"平台,共同开启一场开源贡献竞赛,为广大开发者提供一个全新的体验平台和参与机会!新一轮开源竞赛于 6 月 1 日正式启动。 关于“太乙”平台 "太乙"是特色化示范性软件学院年度质量检测指标开源数据的官方唯一指定获取平台,服务于示范性软件学院联盟( https://www.pses.com.cn/home/publicresource )204 家高校成员单位。 “太乙”平台( https://www.taiyi.top/ )是浙江大学软件学院自主研发的开源能力评价与服务系统,提供开发者开源贡献价值评价与开源竞赛等服务,旨在精准衡量开发者的贡献价值、影响力和技能水平。 平台通过系统化分析开发者在开源社区中的各类可量化贡献,构建起定性与定量相结合的全维度评价体系,对开发者进行全面刻画。 + 在定性评价方面,平台从影响力、贡献度、语言能力、项目经验和活跃度五个维度对开发者进行宏观审视; + 在定量评价方面,则依据项目重要性、贡献类型、内容关键性、贡献体量及复杂度等指标,并结合程序语言分析与自然语言处理等技术,提供精准、客观、自动化的价值评估,充分认可每一份贡献。 “太乙”平台联合各大公司与知名社区,全年不间断、滚动式发布开源竞赛。基于科学的价值自动化评价系统,参赛者可根据贡献价值大小等比例获得奖金,甚至获得头部企业的实习与就业机会,太乙系统能够支持低成本、高效率的长周期开源竞赛组织,激发开发者的积极性,推动中国开源生态的发展。 Apache RocketMQ x “太乙” 开源竞赛 __ 本轮竞赛于 2025 年 6 月 1 日 启动,竞赛链接: https://www.taiyi.top/competitiondetails?id=6836d00651c0e4a2bd63770c 我们联合太乙平台,为 Apache RocketMQ 的开发者和学习者提供了一站式引导服务。我们不仅提供了详尽的原理介绍、文档说明以及部署教程,还特别设计了一键式自动搭建体验环境,帮助开发者轻松体验 Apache RocketMQ 的部署流程和消息收发过程。 为了进一步促进开发者融入社区生态,我们还精心准备了一份贡献指南:涵盖了社区生态、入门指引、issue 推荐等等。相信通过参与本次开源竞赛,开发者们能够快速掌握如何在 Apache RocketMQ 开源社区作出贡献,顺利成为一名 Contributor!欢迎开发者们踊跃参与~ 你可以获得的 参与 Apache RocketMQ x “太乙”开源竞赛,你将获得宝贵的开源社区贡献经历、可量化的贡献奖金以及成为 Apache 顶级社区 Contributor/Committer/PMC 的成长机会,这些都将为你未来的职业发展提供强大助力! 为确保开源竞赛奖金分配机制的公平性与合理性,兼顾开发者与项目发起方的共同利益,“太乙”平台采用了一套科学的公式来计算可分配的奖金额度。随着开发者提交贡献的增加,可分配的奖金总额将随之上涨,最高达 5000 元。 此次竞赛活动将进一步促进 Apache RocketMQ 技术生态的繁荣与发展,为参与者创造更多学习、交流与成长的机会。我们诚邀每一位热衷于分布式消息处理技术探索与实践的开发者加入,在实践中不断提升自我,共同推动 Apache RocketMQ 的技术进步与社区发展。 让我们在开源的道路上,携手共进,创造不凡! 点击:了解太乙开源服务系统更多详情
#社区动态

2025年5月29日

Apache RocketMQ 源码解析 —— 秒级定时消息介绍
背景 如今rocketmq的应用场景日益拓宽,延时消息的需求也在增加。原本的特定级别延时消息已经不足以支撑rocketmq灵活的使用场景。因此,我们需要一个支持任意时间的延迟消息feature。 支持任意时间延迟的feature能够让使用者在消息发出时指定其消费时间,在生活与生产中具有非常重要的意义。 目标 1. 支持任意时延的延迟消息。 2. 提供延迟消息可靠的存储方式。 3. 保证延迟消息具有可靠的收发性能。 4. 提供延迟消息的可观测性排查能力。 架构 存储数据结构 本方案主要通过时间轮实现任意时延的定时消息。在此过程中,涉及两个核心的数据结构:TimerLog(存储消息索引)和TimerWheel(时间轮,用于定时消息到时)。 TimerLog,为本RIP中所设计的定时消息的记录文件,Append Only。每条记录包含一个prev_pos,指向前一条定时到同样时刻的记录。每条记录的内容可以包含定时消息本身,也可以只包含定时消息的位置信息。每一条记录包含如下信息: | 名称 | 大小 | 备注 | | | | | | size | 4B | 保存记录的大小 | | prev_pos | 8B | 前一条记录的位置 | | next_Pos | 8B | 后一条记录的位置,暂时为1,作为保留字段 | | magic | 4B | magic value | | delayed_time | 4B | 该条记录的定时时间 | | offset_real | 8B | 该条消息在commitLog中的位置 | | size_real | 4B | 该条消息在commitLog中的大小 | | hash_topic | 4B | 该条消息topic的hash code | | varbody | | 存储可变的body,暂时没有 | TimerWheel是对时刻表的一种抽象,通常使用数组实现。时刻表上的每一秒,顺序对应到数组中的位置,然后数组循环使用。时间轮的每一格,指向了TimerLog中的对应位置,如果这一格的时间到了,则按TimerLog中的对应位置以及prev_pos位置依次读出每条消息。 时间轮一格一格向前推进,配合TimerLog,依次读出到期的消息,从而达到定时消息的目的。时间轮的每一格设计如下: | delayed_time(8B) 延迟时间 | first_pos(8B) 首条位置 | last_pos(8B) 最后位置 | num(4B)消息条数 | | | | | | 上述设计的TimerLog与TimerWheel的协作如下图所示。 pipeline 在存储方面,采用本地文件系统作为可靠的延时消息存储介质。延时消息另存TimerLog文件中。通过时间轮对定时消息进行定位以及存取。针对长时间定时消息,通过消息滚动的方式避免过大的消息存储量。其具体架构如下所示: 从图中可以看出,共有五个Service分别处理定时消息的放置和存储。工作流如下: 1. 针对放置定时消息的service,每50ms从commitLog读取指定主题(TIMER_TOPIC)的定时消息。 1. TimerEnqueueGetService从commitLog读取得到定时主题的消息,并先将其放入enqueuePutQueue。 2. 另一个线程TimerEnqueuePutService将其放入timerLog,更新时间轮的存储内容。将该任务放进时间轮的指定位置。 2. 针对取出定时消息的service,每50ms读取下一秒的slot。有三个线程将读取到的消息重新放回commitLog。 1. 首先,TimerDequeueGetService每50ms读一次下一秒的slot,从timerLog中得到指定的msgs,并放进dequeueGetQueue。 2. 而后TimerDequeueGetMessageService从dequeueGetQueue中取出msg,并将其放入队列中。该队列为待写入commitLog的队列,dequeuePutQueue。 3. 最后TimerDequeuePutMessageService将这个queue中的消息取出,若已到期则修改topic,放回commitlog,否则继续按原topic写回CommitLog滚动。 代码实现 TimerLog与TimerWheel的协作实现 定时消息的核心存储由TimerLog和TimerWheel协同完成。TimerLog作为顺序写入的日志文件,每条记录包含消息在CommitLog中的物理偏移量(offsetPy)和延迟时间(delayed_time)。当消息到达时,TimerEnqueuePutService会将其索引信息追加到TimerLog,并通过prev_pos字段构建链表结构,确保同一时刻的多个消息可被快速遍历。 ```java // TimerLog.java 核心写入逻辑 public long append(byte[] data, int pos, int len) { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // 处理文件切换:当当前文件剩余空间不足时,填充空白段并创建新文件 if (len + MIN_BLANK_LEN mappedFile.getFileSize() mappedFile.getWrotePosition()) { ByteBuffer blankBuffer = ByteBuffer.allocate(MIN_BLANK_LEN); blankBuffer.putInt(mappedFile.getFileSize() mappedFile.getWrotePosition()); blankBuffer.putLong(0); // prev_pos置空 blankBuffer.putInt(BLANK_MAGIC_CODE); // 标记空白段 mappedFile.appendMessage(blankBuffer.array()); mappedFile = this.mappedFileQueue.getLastMappedFile(0); // 切换到新文件 } // 写入实际数据并返回物理偏移量 long currPosition = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(); mappedFile.appendMessage(data, pos, len); return currPosition; } ``` 此代码展示了消息追加的核心流程: 1. 检查当前文件的剩余空间,不足时填充空白段并创建新文件 2. 将消息索引数据写入内存映射文件 3. 返回写入位置的全局偏移量,供时间轮记录 时间轮槽位管理 TimerWheel通过数组结构管理时间槽位,每个槽位记录该时刻的首尾指针和消息数量。当消息入队时,putSlot方法会更新对应槽位的链表结构: ```java // TimerWheel.java 槽位更新逻辑 public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic) { localBuffer.get().position(getSlotIndex(timeMs) Slot.SIZE); localBuffer.get().putLong(timeMs / precisionMs); // 标准化时间戳 localBuffer.get().putLong(firstPos); // 链表头指针 localBuffer.get().putLong(lastPos); // 链表尾指针 localBuffer.get().putInt(num); // 当前槽位消息总数 localBuffer.get().putInt(magic); // 特殊标记(如滚动/删除) } ``` 该方法的实现细节: + getSlotIndex(timeMs)将时间戳映射到环形数组的索引 + 同时记录首尾指针以实现O(1)复杂度插入 消息入队流程 TimerEnqueueGetService:消息扫描服务 该服务作为定时消息入口,持续扫描CommitLog中的TIMER_TOPIC消息。其核心逻辑通过enqueue方法实现: ```java // TimerMessageStore.java public boolean enqueue(int queueId) { ConsumeQueueInterface cq = this.messageStore.getConsumeQueue(TIMER_TOPIC, queueId); ReferredIterator iterator = cq.iterateFrom(currQueueOffset); while (iterator.hasNext()) { CqUnit cqUnit = iterator.next(); MessageExt msgExt = getMessageByCommitOffset(cqUnit.getPos(), cqUnit.getSize()); // 构造定时请求对象 TimerRequest timerRequest = new TimerRequest( cqUnit.getPos(), cqUnit.getSize(), Long.parseLong(msgExt.getProperty(TIMER_OUT_MS)), System.currentTimeMillis(), MAGIC_DEFAULT, msgExt ); // 放入入队队列(阻塞式) while (!enqueuePutQueue.offer(timerRequest, 3, TimeUnit.SECONDS)) { if (!isRunningEnqueue()) return false; } currQueueOffset++; // 更新消费进度 } } ``` 关键设计点: 1. 增量扫描:通过currQueueOffset记录消费位移,避免重复处理 2. 消息转换:将ConsumeQueue中的索引转换为包含完整元数据的TimerRequest` TimerEnqueuePutService:时间轮写入服务 从队列获取请求后,该服务执行核心的定时逻辑: ```java // TimerMessageStore.java public boolean doEnqueue(long offsetPy, int sizePy, long delayedTime, MessageExt messageExt) { // 计算目标时间槽位 Slot slot = timerWheel.getSlot(delayedTime); // 构造TimerLog记录 ByteBuffer buffer = ByteBuffer.allocate(TimerLog.UNIT_SIZE); buffer.putLong(slot.lastPos); // 前驱指针指向原槽位尾 buffer.putLong(offsetPy); // CommitLog物理偏移 buffer.putInt(sizePy); // 消息大小 buffer.putLong(delayedTime); // 精确到毫秒的延迟时间 // 写入TimerLog并更新时间轮 long pos = timerLog.append(buffer.array()); timerWheel.putSlot(delayedTime, slot.firstPos, pos, slot.num + 1); } ``` 写入优化策略: + 空间预分配:当检测到当前MappedFile剩余空间不足时,自动填充空白段并切换文件 + 链表式存储:通过prev_pos字段构建时间槽位的倒序链表,确保新消息快速插入 + 批量提交:积累多个请求后批量写入,减少文件I/O次数 TimerEnqueuePutService从队列获取消息请求,处理消息滚动逻辑。当检测到延迟超过时间轮窗口时,将消息重新写入并标记为滚动状态: ```java // TimerMessageStore.java 消息滚动处理 boolean needRoll = delayedTime tmpWriteTimeMs = (long) timerRollWindowSlots precisionMs; if (needRoll) { magic |= MAGIC_ROLL; // 调整延迟时间为时间轮窗口中间点,确保滚动后仍有处理时间 delayedTime = tmpWriteTimeMs + (long) (timerRollWindowSlots / 2) precisionMs; } ``` 此逻辑的关键设计: 1. 当延迟时间超过当前时间轮容量时触发滚动 2. 将消息的delayed_time调整为窗口中间点,避免频繁滚动 3. 设置MAGIC_ROLL标记,出队时识别滚动消息 消息出队处理 TimerDequeueGetService:到期扫描服务 该服务以固定频率推进时间指针,触发到期消息处理: ```java // TimerMessageStore.java public int dequeue() throws Exception { // 获取当前时间槽位 Slot slot = timerWheel.getSlot(currReadTimeMs); // 遍历TimerLog链表 long currPos = slot.lastPos; while (currPos != 1) { SelectMappedBufferResult sbr = timerLog.getTimerMessage(currPos); ByteBuffer buf = sbr.getByteBuffer(); // 解析记录元数据 long prevPos = buf.getLong(); long offsetPy = buf.getLong(); int sizePy = buf.getInt(); long delayedTime = buf.getLong(); // 分类处理(普通消息/删除标记) if (isDeleteMarker(buf)) { deleteMsgStack.add(new TimerRequest(offsetPy, sizePy, delayedTime)); } else { normalMsgStack.addFirst(new TimerRequest(offsetPy, sizePy, delayedTime)); } currPos = prevPos; // 前向遍历链表 } // 分发到处理队列 splitAndDispatch(normalMsgStack); splitAndDispatch(deleteMsgStack); moveReadTime(); // 推进时间指针 } ``` TimerDequeuePutMessageService:消息投递服务 TimerDequeuePutMessageService将到期消息重新写入CommitLog。对于滚动消息,会修改主题属性并增加重试计数: ```java // TimerMessageStore.java 消息转换逻辑 MessageExtBrokerInner convert(MessageExt messageExt, long enqueueTime, boolean needRoll) { if (needRoll) { // 增加滚动次数标记 MessageAccessor.putProperty(messageExt, TIMER_ROLL_TIMES, Integer.parseInt(messageExt.getProperty(TIMER_ROLL_TIMES)) + 1 + ""); } // 恢复原始主题和队列ID messageInner.setTopic(messageInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); messageInner.setQueueId(Integer.parseInt( messageInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); return messageInner; } ``` 此转换过程确保: + 滚动消息保留原始主题信息 + 每次滚动增加TIMER_ROLL_TIMES属性 该服务最终将到期消息重新注入CommitLog: ```java // TimerMessageStore.java public void run() { while (!stopped) { TimerRequest req = dequeuePutQueue.poll(10, TimeUnit.MILLISECONDS); MessageExtBrokerInner innerMsg = convert(req.getMsg(), req.needRoll()); // 消息重投递逻辑 int result = doPut(innerMsg, req.needRoll()); switch (result) { case PUT_OK: // 成功:更新监控指标 timerMetrics.recordDelivery(req.getTopic()); break; case PUT_NEED_RETRY: // 重试:重新放回队列头部 dequeuePutQueue.putFirst(req); break; case PUT_NO_RETRY: // 丢弃:记录错误日志 log.warn("Discard undeliverable message:{}", req); } } } ``` 故障恢复机制 系统重启时通过recover方法重建时间轮状态。关键步骤包括遍历TimerLog文件并修正槽位指针: ```java // TimerMessageStore.java 恢复流程 private long recoverAndRevise(long beginOffset, boolean checkTimerLog) { List mappedFiles = timerLog.getMappedFileQueue().getMappedFiles(); for (MappedFile mappedFile : mappedFiles) { SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0); ByteBuffer bf = sbr.getByteBuffer(); while (position Google Doc: + Shimo:
#技术探索

2025年5月29日

Apache RocketMQ 源码解析 —— Controller 高可用切换架构
一、原理及核心概念浅述 1.1 核心架构 1.2 核心概念 1. controller:负责管理broker间的主备关系,可以挂在namesrv中,不影响namesrv能力,支持独立部署。 2. master/slave:主备身份。 3. syncStateSet:字面意思为“同步状态集合”。当备节点能够及时跟上主节点,则会纳入syncStateSet。 4. epoch:用于记录每一次主备切换时的状态,避免切换后产生数据丢失或者不一致的情况。 为方便理解,在某些过程中可以把controller当作班主任,master作为小组长,slave作为小组成员。同步过程是各位同学向小组长抄作业的过程,位于syncStateSet中的是优秀作业。 二、相关代码文件及说明 核心是“controller+broker+复制过程”,因此分三块进行叙述。 2.1 Controller 该部分代码主要集中在rocketmqcontroller模块下,主要有如下代码文件: + ControllerManager: 负责管理controller,其中存储了许多controller相关配置,并负责了心跳管理等核心功能。(班主任管理条例) + DLederController: Controller的DLedger实现。包含了controller的基本功能。在其中实现了副本信息管理、broker存活情况探测、选举Master等核心功能。(某种班主任) + DefaultBrokerHeartbeatManager: 负责管理broker心跳,其中包含了broker存活情况表,以及在broker下线时的listeners,当副本掉线时,触发重新选举。(点名册) + ReplicasInfoManager: 负责controller中事件的处理。即各种选举事件、更换SyncStateSet事件等等。(小组登记册) + ControllerRequestProcessor: 处理向controller发送的requests,例如让controller选举、向controller注册broker、心跳、更换SyncStateSet等等。(班主任信箱) + DefaultElectPolicy: 选举Master的策略。可以选择从sync状态的副本中选,也可以支持从所有副本中(无论是否同步)的unclean选举。(班规) + ...... 2.2 Broker 该部分代码主要集中在rocketmqbroker模块中,可进入org/apache/rocketmq/broker/controller进行查看: + ReplicasManager: 完成自己作为一个replica的使命——找controller,角色管理,Master更新(Expand/Shrink)SyncStateSet等等。 2.3 复制模块 该部分代码主要集中在rocketmqstore模块中的ha文件夹下: + HAService: 每个Replica必备的的service,负责管理作为主、备的同步任务。 + HAClient: 每个Slave 的HAService中必备的client,负责管理同步任务中的读、写操作。 + HAConnection: 代表在Master中的HA连接,每个connection理论上对应一个slave。在该connection类中存储了传输过程中的诸多内容,包括channel、传输状态、当前传输位点等等信息。 三、核心流程 3.1 心跳 核心CODE:BROKER_HEARTBEAT Broker端: 该部分较简单,带上code向controller发request,不再赘述: BrokerController.sendHeartbeat() brokerOuterAPI.sendHeartbeat() Controller端: 1. 首先由ControllerRequestProcessor接收到code,进入处理逻辑: ```java private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class); if (requestHeader.getBrokerId() == null) { return RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST, "Heart beat with empty brokerId"); } this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(), requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority()); return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success"); } ``` 2. 之后在onBrokerHeartbeat()中,主要更新controller brokerHeartbeatManager中的brokerLiveTable: ```java public void onBrokerHeartbeat(String clusterName, String brokerName, String brokerAddr, Long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, Long confirmOffset, Integer electionPriority) { BrokerIdentityInfo brokerIdentityInfo = new BrokerIdentityInfo(clusterName, brokerName, brokerId); BrokerLiveInfo prev = this.brokerLiveTable.get(brokerIdentityInfo); ...... if (null == prev) { this.brokerLiveTable.put(...); log.info("new broker registered, {}, brokerId:{}", brokerIdentityInfo, realBrokerId); } else { prev.setXXX(......) } } ``` 3.2 选举 相关CODE: CONTROLLER_ELECT_MASTER 有如下几种情形可能触发选举: 1. controller主动发起,通过triggerElectMaster(): 1. HeartbeatManager监听到有broker心跳失效。 (班主任发现有小组同学退学了) 2. Controller检测到有一组Replica Set不存在master。(班主任发现有组长虽然在名册里,但是挂了) 2. broker发起将自己选为master,通过ReplicaManager.brokerElect(): 1. Broker向controller查metadata时,没找到master信息。(同学定期检查小组情况,问班主任为啥没小组长) 2. Broker向controller注册完后,仍未从controller获取到master信息。(同学报道后发现没小组长,汇报) 3. 通过tools发起: 1. 通过选举命令ReElectMasterSubCommand发起。(校长直接任命) 上述所有过程,最终均触发: controller.electMaster() replicasInfoManager.electMaster() // 即,所有小组长必须通过班主任任命 ```java public ControllerResult electMaster(final ElectMasterRequestHeader request, final ElectPolicy electPolicy) { ... // 从request中取信息 ... if (syncStateInfo.isFirstTimeForElect()) { // 从未注册,直接任命 newMaster = brokerId; } // 按选举政策选主 if (newMaster == null) { // we should assign this assignedBrokerId when the brokerAddress need to be elected by force Long assignedBrokerId = request.getDesignateElect() ? brokerId : null; newMaster = electPolicy.elect(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), syncStateSet, allReplicaBrokers, oldMaster, assignedBrokerId); } if (newMaster != null && newMaster.equals(oldMaster)) { // 老主 == 新主 // old master still valid, change nothing String err = String.format("The old master %s is still alive, not need to elect new master for broker %s", oldMaster, brokerReplicaInfo.getBrokerName()); LOGGER.warn("{}", err); // the master still exist response.setXXX() result.setBody(new ElectMasterResponseBody(syncStateSet).encode()); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err); return result; } // a new master is elected if (newMaster != null) { // 出现不一样的新主 final int masterEpoch = syncStateInfo.getMasterEpoch(); final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch(); final HashSet newSyncStateSet = new HashSet<(); //设置新的syncStateSet newSyncStateSet.add(newMaster); response.setXXX()... ElectMasterResponseBody responseBody = new ElectMasterResponseBody(newSyncStateSet); } result.setBody(responseBody.encode()); final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster); result.addEvent(event); return result; } // 走到这里,说明没有主,选举失败 // If elect failed and the electMaster is triggered by controller (we can figure it out by brokerAddress), // we still need to apply an ElectMasterEvent to tell the statemachine // that the master was shutdown and no new master was elected. if (request.getBrokerId() == null) { final ElectMasterEvent event = new ElectMasterEvent(false, brokerName); result.addEvent(event); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Old master has down and failed to elect a new broker master"); } else { result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, "Failed to elect a new master"); } return result; } ``` 3.3 更新SyncStateSet 核心CODE: CONTROLLER_ALTER_SYNC_STATE_SET 1. 由master发起,主动向controller更换syncStateSet(等价于小组长汇报优秀作业) 2. controllerRequestProcessor接收更换syncStateSet的请求,进入handleAlterSyncStateSet()方法: ```java private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class); final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class); final CompletableFuture future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet); if (future != null) { return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS); } return RemotingCommand.createResponseCommand(null); } ``` 3. 之后进入Controller.alterSyncStateSet() replicasInfoManager.alterSyncStateSet()方法: ```java public ControllerResult alterSyncStateSet( final AlterSyncStateSetRequestHeader request, final SyncStateSet syncStateSet, final BrokerValidPredicate brokerAlivePredicate) { final String brokerName = request.getBrokerName(); ... final Set newSyncStateSet = syncStateSet.getSyncStateSet(); final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); // 检查syncStateSet是否有变化 final Set oldSyncStateSet = syncStateInfo.getSyncStateSet(); if (oldSyncStateSet.size() == newSyncStateSet.size() && oldSyncStateSet.containsAll(newSyncStateSet)) { String err = "The newSyncStateSet is equal with oldSyncStateSet, no needed to update syncStateSet"; ... } // 检查是否是master发起的 if (!syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) { String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}", syncStateInfo.getMasterBrokerId(), request.getMasterBrokerId()); ... } // 检查master的任期epoch是否一致 if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) { String err = String.format("Rejecting alter syncStateSet request because the current master epoch is:{%d}, not {%d}", syncStateInfo.getMasterEpoch(), request.getMasterEpoch()); ... } // 检查syncStateSet的epoch if (syncStateSet.getSyncStateSetEpoch() != syncStateInfo.getSyncStateSetEpoch()) { String err = String.format("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{%d}, not {%d}", syncStateInfo.getSyncStateSetEpoch(), syncStateSet.getSyncStateSetEpoch()); ... } // 检查新的syncStateSet的合理性 for (Long replica : newSyncStateSet) { // 检查replica是否存在 if (!brokerReplicaInfo.isBrokerExist(replica)) { String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't exist", replica); ... } // 检查broker是否存活 if (!brokerAlivePredicate.check(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), replica)) { String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't alive", replica); ... } } // 检查是否包含master if (!newSyncStateSet.contains(syncStateInfo.getMasterBrokerId())) { String err = String.format("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {%s}", syncStateInfo.getMasterBrokerId()); ... } // 更新epoch int epoch = syncStateInfo.getSyncStateSetEpoch() + 1; ... // 生成事件,替换syncStateSet final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet); ... } ``` 4. 最后通过syncStateInfo.updateSyncStateSetInfo(),更新syncStateSetInfoTable.get(brokerName)得到的syncStateInfo信息(该过程可以理解为班主任在班级分组册上找到了组长的名字,拿出组员名单,更新)。 3.4 复制 该部分较复杂,其中HAService/HAClient/HAConnection以及其中的各种Service/Reader/Writer容易产生混淆,对阅读造成阻碍。因此绘制本图帮助理解(可在粗读源码后回头理解): 下面对HA复制过程作拆解,分别讲解: 1. 在各个replica的DefaultMessageStore中均注册了HAService,负责管理HA的复制。 2. 在Master的 HAService中有一个AcceptSocketService, 负责自动接收各个slave的连接: ```java protected abstract class AcceptSocketService extends ServiceThread { ... / Starts listening to slave connections. @throws Exception If fails. / public void beginAccept() throws Exception { ... } @Override public void shutdown(final boolean interrupt) { ... } @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); Set selected = this.selector.selectedKeys(); if (selected != null) { for (SelectionKey k : selected) { if (k.isAcceptable()) { SocketChannel sc = ((ServerSocketChannel) k.channel()).accept(); if (sc != null) { DefaultHAService.log.info("HAService receive new connection, " + sc.socket().getRemoteSocketAddress()); try { HAConnection conn = createConnection(sc); conn.start(); DefaultHAService.this.addConnection(conn); } catch (Exception e) { log.error("new HAConnection exception", e); sc.close(); } } } ... } } ``` 3. 在各个Slave 的HAService中存在一个HAClient,负责向master发起连接、传输请求。 ```java public class AutoSwitchHAClient extends ServiceThread implements HAClient { ... } public interface HAClient { void start(); void shutdown(); void wakeup(); void updateMasterAddress(String newAddress); void updateHaMasterAddress(String newAddress); String getMasterAddress(); String getHaMasterAddress(); long getLastReadTimestamp(); long getLastWriteTimestamp(); HAConnectionState getCurrentState(); void changeCurrentState(HAConnectionState haConnectionState); void closeMaster(); long getTransferredByteInSecond(); } ``` 4. 当master收到slave的连接请求后,将会创建一个HAConnection,负责收发内容。 ```java public interface HAConnection { void start(); void shutdown(); void close(); SocketChannel getSocketChannel(); HAConnectionState getCurrentState(); String getClientAddress(); long getTransferredByteInSecond(); long getTransferFromWhere(); long getSlaveAckOffset(); } ``` 5. Master的HAConnection会与Slave的HAClient建立连接,二者均通过HAWriter(较简单,不解读,位于HAWriter类)往socket中写内容,再通过HAReader读取socket中的内容。只不过一个是HAServerReader,一个是HAClientReader: ```java public abstract class AbstractHAReader { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); protected final List readHookList = new ArrayList<(); public boolean read(SocketChannel socketChannel, ByteBuffer byteBufferRead) { int readSizeZeroTimes = 0; while (byteBufferRead.hasRemaining()) { ... boolean result = processReadResult(byteBufferRead); ... } } ... protected abstract boolean processReadResult(ByteBuffer byteBufferRead); } ``` 6. 两种HAReader均实现了processReadResult()方法,负责处理从socket中得到的数据。client需要详细阐述该方法,因为涉及到如何将读进来的数据写入commitlog,client的processReadResult(): ```java @Override protected boolean processReadResult(ByteBuffer byteBufferRead) { int readSocketPos = byteBufferRead.position(); try { while (true) { ... switch (AutoSwitchHAClient.this.currentState) { case HANDSHAKE: { ... // 握手阶段,先检查commitlog完整性,截断 } break; case TRANSFER: { // 传输阶段,将body写入commitlog ... byte[] bodyData = new byte[bodySize]; ... if (bodySize 0) { // 传输阶段,将body写入commitlog AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length); } haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset())); ... break; } default: break; } if (isComplete) { continue; } } // 检查buffer中是否还有数据, 如果有, compact() ... break; } } ... } ``` 7. server的processReadResult()主要用于接收client的握手等请求,较简单。更需要解释其WriteSocketService如何向socket中调用HAwriter去写数据: ```java abstract class AbstractWriteSocketService extends ServiceThread { ... private void transferToSlave() throws Exception { ... int size = this.getNextTransferDataSize(); if (size 0) { ... buildTransferHeaderBuffer(this.transferOffset, size); this.lastWriteOver = this.transferData(size); } else { // 无需传输,直接更新caught up的时间 AutoSwitchHAConnection.this.haService.updateConnectionLastCaughtUpTime(AutoSwitchHAConnection.this.slaveId, System.currentTimeMillis()); haService.getWaitNotifyObject().allWaitForRunning(100); } } @Override public void run() { AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); switch (currentState) { case HANDSHAKE: // Wait until the slave send it handshake msg to master. // 等待slave的握手请求,并进行回复 break; case TRANSFER: ... transferToSlave(); break; default: ... } } catch (Exception e) { ... } } ... // 在service结束后的一些事情 } ... } ``` 此处同样附上server实现processReadResult(),读socket中数据的代码: ```java @Override protected boolean processReadResult(ByteBuffer byteBufferRead) { while (true) { ... HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)]; switch (slaveState) { case HANDSHAKE: // 收到了client的握手 ... LOGGER.info("Receive slave handshake, slaveBrokerId:{}, isSyncFromLastFile:{}, isAsyncLearner:{}", AutoSwitchHAConnection.this.slaveId, AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.isAsyncLearner); break; case TRANSFER: // 收到了client的transfer状态 ... // 更新client状态信息 break; default: ... } ... } ``` 3.5 Active Controller的选举 该选举主要通过DLedger实现,在DLedgerController中通过RoleChangeHandler.handle()更新自身身份: ```java class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler { private final String selfId; private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_")); private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER; public RoleChangeHandler(final String selfId) { this.selfId = selfId; } @Override public void handle(long term, MemberState.Role role) { Runnable runnable = () { switch (role) { case CANDIDATE: this.currentRole = MemberState.Role.CANDIDATE; // 停止扫描inactive broker任务 ... case FOLLOWER: this.currentRole = MemberState.Role.FOLLOWER; // 停止扫描inactive broker任务 ... case LEADER: { log.info("Controller {} change role to leader, try process a initial proposal", this.selfId); int tryTimes = 0; while (true) { // 将会开始扫描inactive brokers ... break; } } }; this.executorService.submit(runnable); } ... } ```
#技术探索

2025年5月29日

Apache RocketMQ 源码解析 —— POP 消费模式逻辑介绍
一、背景 在RocketMQ 5.0之前,消费有两种方式可以从Broker获取消息,分别为Pull模式和Push模式。 1. Pull模式:消费需要不断的从阻塞队列中获取数据,如果没有数据就等待,这个阻塞队列中的数据由消息拉取线程从Broker拉取消息之后加入的,所以Pull模式下消费需要不断主动从Broker拉取消息。 2. Push模式:需要注册消息监听器,当有消息到达时会通过回调函数进行消息消费,从表面上看就像是Broker主动推送给消费者一样,所以叫做推模式,底层依旧是消费者从Broker拉取数据然后触发回调函数进行消息消费,只不过不需要像Pull模式一样不断判断是否有消息到来。 不过不管是Pull模式还是Push模式,在集群模式下,一个消息队列只能分配给同一个消费组内的某一个消费者进行消费,所以需要进行Rebalance负载均衡为每个消费者分配消息队列之后才可以进行消息消费。 Rebalance的工作是在每个消费者端进行的,消费端负责的工作太多,除了负载均衡还有消费位点管理等功能,如果新增一种语言的支持,就需要重新实现一遍对应的业务逻辑代码。 除此以外,在RocketMQ 5.0以前负载均衡是以消息队列为维度为每个消费者分配的,一个消息队列只能分给组内一个消费者消费,所以会存在以下问题: (1)队列一次只能分给组内一个消费者消费,扩展能力有限。 (2)消息队列数量与消费者数量比例不均衡时,可能会导致某些消费者没有消息队列可以分配或者某些消费者承担过多的消息队列,分配不均匀; (3)如果某个消费者hang住,会导致分配到该消费者的消息队列中的消息无法消费,导致消息积压; 在RocketMQ 5.0增加了Pop模式消费,将负载均衡、消费位点管理等功能放到了Broker端,减少客户端的负担,使其变得轻量级,并且5.0之后支持消息粒度的负载均衡。当前支持任意时延的定时消息已在开源社区开源,而其支持的任意时延定时特性可以帮助POP进行一定优化。主要优化点在于InvisibleTime的设置。 二、原理 POP模式下,消息的消费有如下几个过程: 客户端发起POP消费请求Broker处理POP请求,返回消息客户端消费完成,返回ACKBroker处理Ack请求,在内部进行消费完成的位点更新。 可以用下图表示: 三、核心代码 3.1 核心Processor 3.1.1 PopMessageProcessor 该线程用于处理客户端发出的POP请求。在收到POP请求后,将处理该请求,找到待POP的队列: ```java int index = (PermName.isPriority(topicConfig.getPerm()) ? i : randomQ + i) % queueIdList.size(); int queueId = queueIdList.get(index); if (brokerController.getBrokerConfig().isMarkTaintWhenPopQueueList()) { this.brokerController.getPullMessageProcessor().getPullRebalanceManager() .addOrUpdateTaint(requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId); } if (queueId = 0 && queueId 1) { // 通过位运算,标记指定下标的CK消息被ACK标记。 markBitCAS(pointWrapper.getBits(), indexOfAck); } ``` 在上述存储完成后,后续通过其它线程进行处理。处理包括CK、AK的存储,以及抵消等操作。主要在3.2节中阐述。 3.1.3 ChangeInvisibleTimeProcessor 该Processor主要用于修改消息不可见时间。此外,其中还定义了一些实用的方法,例如CK的buffer存储方法,以及持久化CK的方法。此处不过多展开。 3.2 核心Service 3.2.1 PopReviveService 该Service被AckMessageProcessor启动。该Service主要对重试队列中的消息进行merge,尝试消除此前没有CK、ACK的消息。 在该Service中,会定期消费Revive队列中的CK、ACK对。在其中会维护一个map,将CK和ACK进行存放与检索: ```java if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) { ...... // 将CK存放入map map.put(point.getT() + point.getC() + point.getQ() + point.getSo() + point.getPt(), point); point.setRo(messageExt.getQueueOffset()); ...... } else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) { ...... // 从map中检索是否有CK AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class); String mergeKey = ackMsg.getT() + ackMsg.getC() + ackMsg.getQ() + ackMsg.getSo() + ackMsg.getPt(); PopCheckPoint point = map.get(mergeKey); ...... } ``` 在完成消息的取出后,会进行revive过程中未ACK消息的重试,以及位点提交: 3.2.2 PopBufferMergeService 该Service主要对Buffer中的CK、ACK进行检查。根据检查结果将展开不同的操作。此外,在该Service中还定义了许多实用的CK、ACK操作方法,能够对CK、ACK进行检查以及存储等相关操作。 该Service会启动一个线程,定期执行`scan()`方法。此外,在扫描达到一定次数之后,将执行`scanGarbage()`方法清理数据。 在`scan()`中将会执行如下操作: 1. 获取buffer中所有键值对,作为CK集合。 ```java Iterator iterator = buffer.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); PopCheckPointWrapper pointWrapper = entry.getValue(); ``` 2. 针对每一个CK集合,检查其中的每个CK是否均被ACK,如果已经被ACK,则删除该CK集合。 ```java if ((pointWrapper.isJustOffset() && pointWrapper.isCkStored()) || isCkDone(pointWrapper) || (isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored())) { if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("[PopBuffer]ck done, {}", pointWrapper); } iterator.remove(); counter.decrementAndGet(); continue; } ``` 3. 针对未被ACK的CK,将CK、ACK信息进行存储,待一定时间后进行重试。 ```java if (removeCk) { // put buffer ak to store if (pointWrapper.getReviveQueueOffset() < 0) { putCkToStore(pointWrapper, false); countCk++; } if (!pointWrapper.isCkStored()) { continue; } for (byte i = 0; i < point.getN(); i++) { // reput buffer ak to store if (DataConverter.getBit(pointWrapper.getBits().get(), i) && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) { if (putAckToStore(pointWrapper, i)) { count++; markBitCAS(pointWrapper.getToStoreBits(), i); } } } ``` 在上述操作中,CK与ACK能够成对存储,且定时重试。直到消费者发送回成对的ACK、CK请求,这对请求方可抵消,也象征该消息被消费成功。 代码详解 可参考如下PPT中的介绍内容:
#技术探索

2025年5月12日

开源之夏 2025|Apache RocketMQ 社区项目期待你的参与!
开源之夏 2025 开源之夏是由中国科学院软件研究所“开源软件供应链点亮计划”发起并长期支持的一项暑期开源活动,旨在鼓励在校学生积极参与开源软件的开发维护,培养和发掘更多优秀的开发者,促进优秀开源软件社区的蓬勃发展,助力开源软件供应链建设。开源之夏于 2020 年正式发起,开源之夏 2025 是第六届活动。 活动联合各大开源社区,针对重要开源软件的开发与维护提供项目开发任务,并向全球高校学生开放报名。学生可自主选择感兴趣的项目进行申请,中选后在项目开发者(社区导师)的指导下进行开发。 通过参与活动,不仅可以结识开源界小伙伴和技术大牛,获得社区导师的专业指导,与开源项目开发者深度交流,还能获得丰富的项目实践经验,提升项目开发技能,为学习深造提供助力,为职业发展积攒履历。 此外,根据项目的难易程度和完成情况,结项者将获取开源之夏活动劳务报酬和结项证书。项目难度分为基础和进阶两档,对应结项劳务报酬分别为:税前 8000 元人民币和税前 12000 元人民币。 Apache RocketMQ 社区项目 Apache RocketMQ 于 2012 年诞生于阿里巴巴核心电商系统,于 2016 年捐赠给 Apache 基金会,于 2017 年成为 Apache 顶级项目。现 Apache RocketMQ 致力于构建低延迟、低成本、高可用的分布式“消息、事件、流”统一处理平台,覆盖“云、边、端”⼀体化数据处理场景,帮助企业和开发者在智能化时代,轻松构建事件驱动架构的云原生 / AI 原生应用。 本次开源之夏共提供 2 个课题项目: 1. RocketMQ 主备副本利用系统内置 Topic 完成元数据增量同步,项目社区导师:金融通 2. RocketMQ 路由反向更新机制,项目社区导师:ShannonDing 如何参与项目? 欢迎扫描上方海报二维码,查看 Apache RocketMQ 社区项目详情,其中有项目导师的姓名与联系邮箱,可通过邮件与导师进行沟通,并准备项目申请材料、提交项目申请,每位同学可以申请一个项目。 以下是开源之夏的活动流程,更多参与指南请查看
#社区动态

2024年12月20日

Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
近日,以“新纪天工、开物焕彩——致敬开源的力量”为活动主题的“重大科技成就发布会(首场)”在国家科技传播中心成功举办,并隆重揭晓了 2024 开源创新榜单,旨在致敬中国开源力量,传播推广开源科技成就,营造中国开源创新生态。 2024 年开源创新榜单由中国科协科学技术传播中心、中国计算机学会、中国通信学会、中国科学院软件研究所共同主办,中国开发者社区承办,以王怀民院士为首组建评审委员会,进行研讨评审,面向中国开源行业领域,遴选具有创新性、贡献度和影响力的开源项目、社区、应用场景与开源事件。 在评审出的 10 个年度开源项目中,Apache RocketMQ 成功入选。 Apache RocketMQ 社区近况 Apache RocketMQ 创新论文连续被软件工程顶级会议录用 (1)2024 年 9 月,由阿里云消息队列团队发表的关于 RocketMQ 锁性能优化论文《Beyond the Bottleneck: Enhancing HighConcurrency Systems with Lock Tuning》被 CCFA 类软件工程顶级会议 FM 2024 录用。 高并发系统常常面临性能瓶颈,主要是由于线程间激烈竞争锁导致的等待和上下文切换。作为一家云计算公司,我们非常重视性能的最大化。为此,我们对轻量级自旋锁进行了改进,并提出了一种简洁的参数微调策略,能够在最低风险条件下突破系统性能瓶颈。该策略在高吞吐量消息队列系统 Apache RocketMQ 中得到了验证,实现了 X86 CPU 性能提升 37.58% 和 ARM CPU 性能提升 32.82%。此外,我们还确认了这种方法在不同代码版本和 IO 刷新策略下的一致有效性,显示出其在实际应用中的广泛适用性。这项工作不仅为解决高并发系统的性能问题提供了实用工具,还突显了形式化技术在工程问题解决中的实际价值。 (2)2023 年 9 月,由阿里云消息队列团队发表的关于 RocketMQ 高可用范式设计论文《RocketHA: A Logbased Storage High Availability Paradigm for Messaging and Streaming Scenarios》被软件工程 CCFA 类顶级会议 ASE 2023 录用。 该论文详细探讨了 RocketMQ 在其发展历程中所蕴含的高可用性设计理念,凝聚了团队在行业应用中积累的宝贵经验。为了应对分布式系统中常见的故障,如崩溃和网络分区,RocketHA 提出了一种基于日志存储的高可用性设计框架。该框架由六个基本组件构成,旨在实现系统在面对各种故障时的自动集群恢复。具体而言,RocketHA 通过模块化设计,实现了消息、事件及流场景的高可用性,确保系统能够在发生意外故障时迅速且有效地恢复。此外,该设计还优先考虑了高吞吐量与数据丢失防护,以保障系统在进行大规模数据处理时的稳定性和可靠性。评估结果表明,RocketMQ 在多种负载和故障场景下都表现出卓越的高可用性和快速恢复能力。本文提出的 RocketHA 的设计理念可为其他基于日志存储的系统提供参考和借鉴,推动相关领域的研究与开发。 GSoC(Google Summer of Code) 2024 在谷歌主办的 GSoC 2024 中,Apache RocketMQ 开源社区共提报通过两个选题: 1. RocketMQ Dashboard Supports RocketMQ 5.0 Architecture and Enhances Usability:该题目旨在强化 RocketMQ 的开源控制台能力。 2. Optimizing Lock Mechanisms in Apache RocketMQ:该题目旨在优化锁行为,优化 RocketMQ 的性能以及资源占用。 两个题目均成功结项,第一个题目为 Apache RocketMQ 发布了 rocketmqdashboard 2.0.0,自此RocketMQ Dashboard 支持 Apache RocketMQ 5.0 。第二个题目创新性地提出了 ABS 锁,为轻量化的自旋锁提供了一套退避策略,从而实现低成本、有限制的锁自旋行为,同时适应不同强度的资源争抢情况 Apache RocketMQ 社区 5.3.0、5.3.1 版本发布 Apache RocketMQ 社区近期发布了 5.3.0 和 5.3.1 两个版本,两个版本主要修复现有的 bug 并提升系统的整体稳定性和性能。值得一提的是,Apache RocketMQ 5.3.0 引入了 Apache RocketMQ ACL 2.0 支持,为用户带来了更加灵活和安全的访问控制机制。这些改进和新增功能将显著提升 Apache RocketMQ 在生产环境中的稳定性和安全性,进一步满足用户的业务需求。 Apache RocketMQ 中文社区全新升级 2024 年 7 月,Apache RocketMQ 中文社区(https://rocketmq.io)全新升级,致力于为每一位热衷于 RocketMQ 技术探索与实践的开发者,打造一个集时效性、全面性、深度于一体的一站式学习平台。 最全最新资讯: Apache RocketMQ 中文社区提供从基础到深入的全面学习资料,涵盖原理介绍、架构解读、源码分析等基础知识,高级性能使用、技术前沿探索、场景最佳实践等博客文章,用户反馈的真实答疑样例等,并及时更新版本发布、架构演进和功能迭代等社区动态,以及社区相关活动和会议信息,为您提供更多学习和交流的机会。 智能专家答疑: Apache RocketMQ 中文社区基于 Apache RocketMQ 领域专业知识库,并结合先进的大模型技术进行优化,为您提供 AI 问答助手,作为您的智能学习伴侣。通过自然语言问答,让您的疑问得到迅速解答,使您的学习之旅更加轻松有趣。 关于 Apache RocketMQ RocketMQ 致力于构建低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台,覆盖云边端⼀体化数据处理场景,帮助企业和开发者在智能化时代,轻松构建事件驱动架构的云原生应用。 RocketMQ 自 2012 年诞生于阿里巴巴集团的核心交易链路,至今已经历十余年“双十一”的万亿级数据洪峰验证。2015 年,阿里云面向企业提供商业化的消息队列服务,其中包括云消息队列 RocketMQ 版。2016 年,阿里巴巴向 Apache 软件基金会捐赠了 RocketMQ 项目,RocketMQ 进入 Apache 孵化器。2017 年,Apache RocketMQ 成为 Apache 顶级项目,在开源消息中间件领域占据领导地位。2022 年,Apache RocketMQ 5.0 正式发布,全面拥抱云原生架构、超融合架构,进一步拓展事件驱动、物联网等场景。
#社区动态

2024年11月18日

基于Apache RocketMQ 事件驱动架构的 AI 应用实践
AI 应用在商业化服务的阶段会面临诸多挑战,比如更快的服务交付速度,更实时、精准的结果以及更人性化的体验等,传统架构限制于同步交互,无法满足上述需求,本篇文章给大家分享一下如何基于事件驱动架构应对上述挑战。 盘点 AI 应用场景 在深入探讨事件驱动架构跟 AI 结合前,我们先梳理一下 AI 应用的现状。 从应用架构层面,大致可以把 AI 应用分为以下三类: 1)基于基础模型的扩展应用,典型的如 ChatGpt(文本生成)、StableDiffusion(图像生成)、CosyVoice(声音生成)等,这类应用通常会以模型能力为核心,提供相对原子化的服务。 2)智能知识库应用,如 Langchain chatchat,这类应用是以 LLM 为核心,基于 RAG(增强检索技术)构建的具有广泛的业务场景的应用。 3)智能体应用,智能体应用核心要点是应用以 LLM 为交互中枢,能够通过工具的调用联通外部世界,复杂的表现形式如多智能体协作等,是企业 AI 应用落地最具想象空间的一类应用。 浅析 AI “原生” 说到“原生”二字,它代表的是对某种概念的广泛认知,比如提移动原生应用立马可以联想到手机端的 APP,提云原生应用很多开发者立马可以想到容器化等,而对于 AI “原生”,除了 ChatGpt,Midjourney 等几款头部 AI 应用,我们似乎还没有看到像移动应用那样广泛的“原生”应用被定义出来,当然今天也没有办法给出明确的结论,只是通过一些事实,帮大家推演 AI “原生”的方向,希望能够帮助慢慢凝聚在内心中那个对“AI 原生”的影像。 AI 给应用架构带来的变化 当 AI 能力加入后,我们的应用架构发生了较大的变化。RAG,Agent 等编程范式被引入,传统的工作流也因为有了 AI 节点,变得与以往有所不同。 AI 应用架构RAG AI 应用架构Agent 加入 AI 节点的工作流 AI 应用的变化趋势 从观察知名 AI 厂商的产品形态演进看,AI 应用由前面提到的基础模型扩展、智能知识库、智能体三类叠加又相对分离,在慢慢向由智能体统一管控约束的方向发展。 比如 Open AI 的 Canvas,Claude Artifacts,Vercel v0 等产品特性。它们都表现出了一系列的共性:智能内核,多模态,LUI 交互。 从另外一个角度理解,AI 原生的应用只有突破之前的用户体验才有可能让用户买单。分散的基础模型能力,多模态能力都只能在某些场景下有体验提升,某些方面甚至不如传统应用的用户体验。所以需要整合,将对话式交互,智能模型和多模态叠加从而构建出超越传统应用的使用体验。 使用事件驱动构建 AI 原生 这里并不是单纯为了追求技术的先进性而使用事件驱动架构,是因为实践中顺序式的架构有时候无法满足业务需求。 传统顺序式的架构在构建 AI 原生的挑战 顺序调用无法保障推理体验 模型服务的推理耗时远高于传统意义的网络服务调用,比如在文生图这个场景下使用 StableDiffusion 服务,即使经过算法优化后最快也是秒级,并发量较大的时候,会很容易导致服务器宕机。此外如声音的合成,数字人的合成等耗时可能是分钟级的,此时顺序调用明显就不太合适。选择事件驱动的架构可以快速响应用户,推理服务按需执行,这样既能够保障用户体验,同时也降低系统宕机风险。 顺序调用无法支持实时数据构建的需求 在智能问答系统中,结果的好坏跟数据有很大的关系。问答召回数据的实时性和准确性很大程度影响着智能问答系统的用户体验,从系统架构层面,问答和数据的更新是分开的。靠人工去更新海量数据不现实,通过设置定时任务以及构建知识库数据更新的工作流能够更加有效的解决数据实时更新的问题,事件驱动架构在这个场景下优势非常明显。 双向互动场景无法实现 在问答服务场景下,拟人化的行为能够得到用户好感从而扩展商机,传统的问答式应用架构相对机械死板,而使用消息队列作为信息传输可以有效主动触达用户,通过合理的意图判断,主动向用户问好,是有效的留存手段。 事件驱动构建 AI 原生的实践 接下来分享一下基于事件驱动架构构建的 AI 应用的一些实践。 StableDiffusion 异步推理 前面提到了关于文生图模型 StableDiffusion 在服务客户中遇到的问题,我们利用事件驱动架构,使用函数计算和轻量消息队列(原 MNS)构建了 StableDiffusion 的异步推理架构,用户请求到来时经过函数计算网关到达 API 代理函数,API 代理函数对请求进行打标鉴权,之后将请求发送到 MNS 队列,同时记录请求的元数据和推理信息到表格存储 TableStore,推理函数根据任务队列进行消费,调度 GPU 实例启动 StableDiffusion 进行服务,结束后返回图片结果以及更新请求状态,端侧通过页面上的轮询告知用户。 VoiceAgent 实时对话 这是一个相对复杂的应用,使用者可以通过语音跟背后的智能问答服务实时对话,同时还能够接收到来自智能服务的主动询问。 整体依然采用事件驱动架构,其 RTC Server 部分安装 rocketmqclient,订阅中心化的服务 topic,由定时任务(主要是意图分析)触发向队列 topic 生产消息内容,然后由 rocketmqclient 消费,进行主动询问。 VoiceAgent 知识库实时数据流 对于问答的另外一端,知识库的自动更新,则是通过 Catch Data Capture 策略,比如由外部系统数据源触发,或者通过将文档上传 OSS 触发。数据经过切片,向量化之后存储到向量数据库以及全文检索数据库。 面向 AI 原生应用的事件驱动架构 最后分享一下作为 AI 应用开发者的一套组合方案:通过 阿里云云应用平台 CAP(Cloud Application Platform) 选出基础模型服务,如 Ollama,ComfyUI,Cosyvoice,Embedding 等进行快速托管,使用 RcoketMQ,Kafka,MNS, Eventbridge 等搭建数据流管道和消息中心,本地利用 Spring AI Alibaba 等框架开发后端服务,实现 RAG,Agent 等能力。前端使用 Nextjs 框架构建界面,之后将开发好的前后端通过 Serverless Devs 工具部署到 CAP 平台,进行线上调用访问,最终上生产采用云原生网关保驾护航,对于长期的知识库或者智能体的运维则通过 ARMS 进行指标监控。
作者:寒斜
#技术探索

2024年11月6日

Apache RocketMQ 打破锁性能瓶颈之道
背景 Apache RocketMQ 是一个云原生消息传递和流式处理平台,可简化创建事件驱动型应用程序的过程。多年来随着 RocketMQ 的迭代,已经编写了大量代码来利用多核处理器,通过并发提高程序效率。因此管理并发性能变得至关重要,同时锁对于确保在访问共享资源时多个执行线程安全同步至关重要。尽管锁对于确保多核系统中的互斥性是必不可少的,但它们的使用也可能带来优化挑战。随着并发系统内部变得越来越复杂,部署有效的锁管理策略是保持性能的关键。 因此,在 GSOC 2024 中,我们 Apache RocketMQ 开源社区提报了一个非常具有挑战性的题目:《GSOC269 : Optimizing Lock Mechanisms in Apache RocketMQ》。在这个题目中,我们旨在优化锁行为,优化 RocketMQ 的性能以及资源占用。通过这个题目,我们创新性地提出了 ABS 锁 —— Adaptive Backoff Spin Lock。ABS 锁的思想是通过为轻量化的自旋锁提供一套退避策略,从而实现低成本、有限制的锁自旋行为,同时适应不同强度的资源争抢情况。 之所以取其缩写 ABS,是因为该锁的设计思想与刹车系统中的 ABS 系统有一定相似性。在早期系统中使用的自旋锁会一直进行自旋,但是当临界区较大时会产生大量无效自旋,类似刹车制动中的“抱死”结果,这导致我们的资源利用急剧增长。为了避免这种资源损耗,当前社区使用互斥锁对其进行了替代,避免临界区较大时的资源浪费。但是在这种替代方案落实后,当消息较小时,互斥锁的阻塞唤醒机制反而影响到消息发送的响应时间,并带来了更高的 CPU 损耗。 由于自旋锁在临界区较小时的响应时间(RT)以及 CPU 利用方面都有较好的表现,唯一的不足是在资源争抢时会带来资源浪费。因此我们决定对自旋锁进行优化,降低无效的资源损耗,从而更好的利用自旋锁的优点,使其同时适合争抢激烈或不激烈的场景。在实践中,我们已经证明了调整锁策略会影响 RocketMQ 的消息发送性能,带来显著的性能优化结果。 通过 ABS 锁,我们还能解决开源使用者对锁的抉择问题:在 RocketMQ 消息投递时存在两种锁定机制,SpinLock(swapAndSet),以及 ReentrantLock 互斥锁,但并无文档对其进行分析使用的场景。所以我们通过 ABS 锁对其进行整合,达到最优状态并且完成服务端的锁定机制闭环,不需要用户去决定当前场景锁定机制的选取,自然而然地根据争抢情况对锁参数进行微调。ABS 锁可以根据运行时条件动态调整其行为,例如锁争用级别和争用同一资源的线程数。这可以通过最大限度地减少与锁获取和释放相关的开销来提高性能,尤其是在高争用的情况下。通过实时监控系统的性能指标,ABS 锁可以在不同的锁定策略之间切换。 我们目前已经实现自适应锁定机制(ABS 锁),通过实验结果成功验证:自适应锁定机制达到不同场景下单独使用互斥锁/自旋锁的最优效果,简而言之就是取得不同场景下最优锁定机制的效果。本文将详细介绍 Apache RocketMQ 的锁机制迭代过程,并介绍 ABS 锁的优化效果。 相关概念介绍 在文章正式开始前,需要介绍一些本文中可能频繁用到的概念:临界区、互斥锁、自旋锁。了解清楚这些概念将有助于阅读本文的优化思想。 临界区 临界区(Critical Section)是一段供线程独占式访问的代码,也就是说若有一线程正在访问该代码段,其它线程想要访问,只能等待当前线程离开该代码段方可进入,这样保证了线程安全。一般临界区大小是受多方面影响的,例如,本文中消息发送过程的临界区大小可能受消息体大小影响。 互斥锁 互斥锁是一种独占锁,当线程 A 加锁成功后,此时互斥锁已经被线程 A 独占了,只要线程 A 没有释放手中的锁,线程 B 就会失败,就会释放掉 CPU 给其他线程,线程 B 加锁的代码就会被阻塞。 互斥锁加锁失败而阻塞是由操作系统内核实现的,当加锁失败后,内核将线程置为睡眠状态,等到锁被释放后,内核会在合适的时机唤醒线程,当这个线程加锁成功后就可以继续执行。具体的互斥锁行为可以参考下图: 互斥锁带来的性能开销主要在两次线程上下文切换的成本。 1. 当线程加锁失败时,内核将线程的状态从【运行】切换到睡眠状态,然后把CPU切换给其他线程运行; 2. 当锁被释放时,之前睡眠状态的线程会变成就绪状态,然后内核就会在合适的时间把CPU切换给该线程运行。 自旋锁 自旋锁通过 CPU 提供的原子操作 CAS(CompareAndSet),在用户态完成加锁和解锁操作,不会主动产生线程上下文切换,所以相比互斥锁来说,会快一些开销小一些。它和互斥锁的主要区别在于,当加锁失败,互斥锁使用线程切换应对,而自旋锁用忙等待应对。 RocketMQ 的锁机制迭代 下面,我们将介绍 RocketMQ 的锁机制迭代过程。由于自旋锁、互斥锁有各自的优劣势:自旋锁的优势在于其轻量级和低上下文切换开销,适合短时间等待的情况;而互斥锁的优势在于能够节约资源,适合长时间占有锁的场景。 因此本文中将其优势分别比喻为“鱼”和“熊掌”,我们将一步步介绍如何同时拿到最优质的“鱼”和“熊掌”。 鱼和熊掌的抉择——互斥/自旋 在早期为了减小线程上下文切换带来的资源损耗,Apache RocketMQ 选取了自旋锁进行实现。随着版本迭代 RocketMQ 的并发压力日益增长,导致当临界区较大时,自旋锁会产生大量无效自旋,这导致 Broker 的资源利用急剧增长。因此后期 RocketMQ 又使用互斥锁对其进行了替代,避免临界区较大时的资源浪费。 但是当消息较小时,互斥锁的阻塞唤醒机制反而影响到 RT 以及上下文切换引起的更高的 CPU 损耗,自旋锁在临界区较小时的响应时间(RT)以及 CPU 利用方面都有较好的表现,因此在过去我们一直被锁定机制的正确选取所困扰。直到今日,RocketMQ 的内部仍然保留着这两种锁,且通过一个开关进行控制:useReentrantLockWhenPutMessage。 这意味着,使用者需要在启动 broker 时,决定自己的锁类型——如果消息体都比较小,且发送 TPS 并不大,则使用自旋锁;当消息体较大时,或者竞争极为激烈时,则启用互斥锁。 鱼和熊掌兼得—— k 次退避锁 为了解决这个问题,我们启动了锁优化的工作。我们希望有一把锁能够同时具备自旋锁、互斥锁的特点,同时适用于竞争激烈和不激烈的情况。我们最终决定改造自旋锁,通过一把特殊的自旋锁,使系统在各种竞争情况下都保持非常优质的锁行为。自旋锁由于无限自旋直到获取到锁,在临界区较大时会产生较多的空转,耗费大量的 CPU 资源。为了能有效利用自旋锁的优势,因此我们要在临界区较大时对其空转次数的控制,从而避免大量空转,最大程度兼容临界区较大的场景。 最终我们通过对自旋锁的行为建模,提出了 k 次退避锁:进行 K 次自旋后还未获得锁后,执行 Thread.yield() 将 CPU 执行权交给操作系统。这种行为能够避免互斥锁的无谓上下文切换,也能避免高压场景下的无限自旋带来的 CPU 损耗。 这种行为能够缓解系统压力,取得自旋和 CPU 上下文切换两中方法中的最低资源损耗。 在刚过去不久的 FM 24 会议上,Juntao Ji 已经做出 k 次自旋锁的相关理论建模分享,以及实验验证[3]。在k次自旋锁的作用下,我们能找到系统性能的局部最优点,达到最大的 tps 性能。结果如下表所示: 以 X86 架构,同步刷盘的行为为例。实验结果表明,在 k= 10^3 时,发送速度不仅达到峰值(155019.20),CPU 使用率也达到最低。这表明退避策略成功地节省了 CPU 资源。此时,CPU 支持更高的性能水平和较低的利用率水平,这表明性能瓶颈已经转移——例如,可能已经转移到了磁盘上。在表中可以观察到,在具有相同的 k(10^3)和配置参数(最新代码,SYNC 刷盘模式)的 ARM CPU 上,RocketMQ 的性能提高了 10.4%。此外,如上图所示,当 k= 10^3 时,CPU 使用量大幅下降,从平均超过 1000% 下降到 750% 左右。资源消耗的减少表明,减轻其他系统瓶颈可能会导致更显著的性能提高。 最优质的鱼和熊掌—— ABS 锁 在上文已经证明了 k 次退避锁的有效性。但是当前还有一个问题:我们发现当临界区足够大时,自旋锁的资源损耗依旧远超互斥锁(多次退避仍然获取不到)。这样 k 值其实带来的影响是负面的,也就是说,最终逃避不了上下文切换的成本,反而还带来了自旋的等待成本。 因此我们决定通过实现 k 的动态调整再次优化,当临界区较大时,对k进行自适应增大,当达到与互斥锁相同级别资源损耗时(即 k 达到自适应的最大值),此时自旋锁已经不再适合此种场景,因此我们将对其进行互斥锁的切换。这也是我们最终要实现的 ABS 锁。 1. ABS 锁实现 我们通过对不同的锁定机制进行理论分析其所适合的场景,并对其在所适合的场景进行大量实验测试得出多个场景的最优锁定机制。最后通过对运行时条件的动态变化(竞争线程数/TPS/消息大小/临界区大小)进行最优锁定机制的切换。 2. ABS 锁的 K 值自适应策略 上面我们论证出控制自旋次数对于性能优化有不错的效果,但是这个 K 值对于不同系统是不一样的,因此我们需要实现自旋次数 K 的自适应。 简而言之,K 的自适应策略就是一种从低频次自旋到高频次自旋的演化过程,对应临界区争抢变得不断严重或是临界区不断变大的过程。当 k 逐渐增大的过程中,可以增加在线程退避之前就获取到锁的概率,但是当自旋次数增加到一定数量级时,此时自旋成本已经高于线程上下文切换的成本,说明此时已经不适合使用自旋锁——所以此时退化为互斥锁。 通过实验,我们将自旋次数 K 自适应最大值设置为1万,因为在实验中,我们发现当自旋次数大于1万时,竞争激烈时带来的优化效果会受到显著影响,甚至大于一次 CPU 上下文切换的代价。所以此时我们将其切换为互斥阻塞等待锁定机制。 为了自适应调整 K 值,我们提出了一个闭环的工作流,如下图所示: 在图中我们可以看到,我们主要衡量当前 k 值下,自旋获取锁的成功率。如果在当前 k 值下获取锁的成功率不够高,则适当增加 k 值,这将带来更大的获取概率。但是如果一直增加都无法有效提高拿锁成功率,则将其转为互斥锁,会带来更高的效益。 成为互斥锁代表当前可能有较高的突发流量,导致对锁的竞争变得激烈了。但是互斥锁是不适用于低压场景的,所以我们还需要决定如何从互斥锁转回自旋锁。因此我们记录了从自旋锁切换为互斥锁的请求速率。当整体请求速率低于这个数值的 80% 时,则切换回自旋锁。 实验及结果 为了验证 ABS 锁的正确性以及性能优化效果,我们做了多组实验,包括性能测试以及混沌故障测试。本章将介绍具体的实验设计以及结果。 1. 性能实验配置 1. namesrv 一台,broker一台,openmessagebenchmark 一台压测机 2. 上述三台机器的配置为,处理器:8 vCPU,内存:16 GiB,规格族:ecs.c7.2xlarge,公网带宽:5Mbps,内网带宽:5/ 最高 10 Gbps。 3. 消息体大小分别设置了1KB以及2B两种场景,用于影响发送消息时的临界区大小。 2. 性能测试结果 CPU 与耗时的优化 消息 body 大小 1 kb 时,我们记录了不同消息发送速率下的 CPU 占用情况。结果如下图所示: 结果表明,在消息量不断增加时,我们的自适应锁带来了非常明显的优势,有效降低了 CPU 的使用率。 另外,我们还对消息发送时的 P9999 做了记录。P9999 代表发送过程中,发送耗时排在 99.99% 的尾部请求,一般反映了这段时间内的最慢请求速度。结果如下图所示: 可以看到,在不同 TPS 下,我们的自适应锁都能带来更优质的 P9999,有效降低了发送过程中由于锁争抢带来的耗时。 不过,尾部的请求情况一般代表竞争极为激烈时的极端场景。可能有的消息经过多次锁请求尝试但是都未获得锁,因此导致了这种长尾效应。我们还测试了在不同 TPS 下的平均发送耗时情况,发现在竞争极为严重时,我们的 ABS 锁由于自带了一定自旋,所以会让平均延时大约提升 0.5 ms。使用时需要仔细衡量具体场景,以确定是否在高压时开启自适应锁。 我们同时还针对消息体更小的实验场景做了实验,下表是完整的测试结果: 最大性能提升 此外,为了计算由自适应锁带来的性能提升,我们还测试了 Broker 的最大性能,结果如下所示: | CPU Arch | Flush Policy | Original QPS | Optimal QPS | Improvement | | | | | | | | X86 | ASYNC | 176312.35 | 184214.98 | +4.47% | | X86 | SYNC | 177403.12 | 187215.47 | +5.56% | | ARM | ASYNC | 185321.49 | 206431.82 | +11.44% | | ARM | SYNC | 188312.17 | 212314.43 | +12.85% | 根据该表,我们可以认为自适应锁同时能够在多个场景下均找到最大的性能点,实现性能的释放。 实验总结 经过如上所有实验数据,我们可以得出如下结论: 1. 在临界区较小时,ABS 锁提供更高的 TPS 以及更低的响应延时,极限 TPS 提高 12.85% 甚至更高,极限 TPS 下响应时间降低了 50% 左右; 2. 在临界区较大时,ABS 锁提供更低的 RT 以及更低的资源损耗,CPU 损耗从 500%400%,减少无效资源浪费。 3. 故障测试 在软件工程领域,提倡"拥抱故障"的理念意味着认识到错误和异常是正常的一部分,而非完全避免。像 RocketMQ 这样的关键系统,它在生产环境中承受的压力远超于理想化的实验室测试。为此,采用了混沌工程策略,这是一种主动探寻系统极限和脆弱性的实践。 混沌工程的核心目标是增强系统的鲁棒性和容错能力,它是通过在实际操作中人为制造故障,如网络延迟、资源限制等,来观察系统如何应对突发状况。这样做是为了确认系统是否能在不确定和复杂的现实中保持稳定,能否在面对真实世界的问题时依然能高效运作。 RocketMQ 引入自适应锁定机制后,进行了严格的混沌工程实验,包括但不限于模拟分布式节点间的通信故障、负载峰值等情况,目的是验证新机制在压力下的性能和恢复能力。只有当系统经受住这种高强度的“拷打”测试,证明它能在不断变化的环境中维持高可用性,我们才认为这是一个成熟的、可靠的解决方案。 总结起来,通过混沌工程,我们对 RocketMQ 进行了实战演练,以此来衡量其实现高可用性的真正实力。 并希望通过这样的故障测试,证明我们对锁的调整不影响其数据写入的正确性。 实验配置 我们混沌测试的验证实验环境如下: 1. namesrv 一台,内含 namesrv 进程。 2. openchaos 的混沌测试一台,向 broker 发出控制指令。 3. broker 三台,同属一个集群,内含 broker 进程。 上述五台机器的配置为,处理器:8 vCPU,内存:16 GiB,规格族:ecs.c7.2xlarge,公网带宽:5Mbps,内网带宽:5/ 最高 10 Gbps。 在测试中,我们设置了如下的若干种随机测试场景,每种场景都会持续至少 30 秒,且恢复后会保证 30 秒的时间间隔再注入下一次故障: 1. 机器宕机,这个混沌故障注入通过 kill 9 命令实现,将会杀死范围内的随机进程。 2. 机器夯机,这个混沌故障注入通过kill SIGSTOP 命令实现,模拟进程暂停的情况。 每组场景的测试至少重复 5 次,每次至少持续 60 分钟。 实验结论 针对上述提出的所有场景,混沌测试的总时长至少有: 2(场景数) 5(每组测试次数) 60(单组时长) = 600分钟 由于设置的注入时长 30s,恢复时长 30s,因此至少共计注入故障 600/1 = 600 次(实际上注入时长、注入次数远多于上述统计值)。 在这些记录在册的测试结果中,RocketMQ 无消息丢失,数据在故障注入前后均保持强一致。 总结 本文实现了 RocketMQ 锁定机制的迭代过程以及自适应退避自旋锁机制(ABS 锁)的设计以及实现,随着并发系统内部变得越来越复杂,部署有效的锁管理策略是保持性能的关键。因此,我们希望更深入地探索该领域性能优化的潜力,探索性能的极限。 同时我们希望在未来结合多种分布式锁定机制以及其他优秀思想,对其进行实现以及性能测试,期待达到多端本地锁定状态共识:单次通信即可得到锁的成果,减少无效网络通信以及更低的总线资源损耗。但目前专注于消息投递时的锁定机制实现,并未对其进行具体实现以及测试。其它分布式锁机制可以参考如下。 未来可能引入的其它锁优化思想 延迟槽位思想(Delay card slot idea): 对每个客户端分配不同槽位的延时,让客户端进行延迟后再次请求,从而避免总线的大量无效碰撞。 CLH 思想: 循环读取上一个位点的值,更改自身状态。 MCS 思想: 与 CLH 相似,但是时在本地自旋,更改下一个节点的状态。解决了 CLH 在 NUMA 系统架构中获取 locked 域状态内存过远的问题 SMP CPU 架构 基于 Ppersistent CSMA 思想: Ppersistent CSMA(Persistent Carrier Sense Multiple Access)是一种网络通信协议的思想,它通常用于描述分布式环境中如无线局域网(WiFi)中的冲突避免策略。这个思想起源于 CSMA/CD(Carrier Sense Multiple Access with Collision Detection)协议,但它引入了一种更持久的监听机制。 1. 基本流程: 每个设备(节点)在发送数据前会持续监听信道是否空闲。如果信道忙,节点就会等待一段时间后再次尝试。 2. P计数器: 当检测到信道忙时,节点不会立即退回到等待状态,而是启动一个名为 P(persistent period)的计数器。计数器结束后,再尝试发送。 3. 碰撞处理: 如果有多个节点同时开始发送,在信道上发生碰撞,所有参与碰撞的节点都会注意到并增加各自的P计数器后再试。这使得试图发送的数据包在更长的时间内有机会通过,提高了整体效率。 4. 恢复阶段: 当 P 计数器归零后,如果信道依然繁忙,节点会进入恢复阶段,选择一个新的随机延迟时间后再次尝试。 Ppersistent CSMA 通过这种设计减少不必要的冲突次数,但可能会导致网络拥塞情况下的长时间空闲等待。 参考链接 [1] Apache RocketMQ [2] OpenMessaging OpenChaos [3] Ji, Juntao & Gu, Yinyou & Fu, Yubao & Lin, Qingshan. (2024). Beyond the Bottleneck: Enhancing HighConcurrency Systems with Lock Tuning. 10.1007/9783031711770_20. [4] Chaos Engineering [5] T. E. Anderson, "The performance of spin lock alternatives for sharedmoney multiprocessors," in IEEE Transactions on Parallel and Distributed Systems, vol. 1, no. 1, pp. 616, Jan. 1990, doi: 10.1109/71.80120 [6] Y. Woo, S. Kim, C. Kim and E. Seo, "Catnap: A Backoff Scheme for Kernel Spinlocks in ManyCore Systems," in IEEE Access, vol. 8, pp. 2984229856, 2020, doi: 10.1109/ACCESS.2020.2970998 [7] L. Li, P. Wagner, A. Mayer, T. Wild and A. Herkersdorf, "A nonintrusive, operating system independent spinlock profiler for embedded multicore systems," Design, Automation & Test in Europe Conference & Exhibition (DATE), 2017, Lausanne, Switzerland, 2017, pp. 322325, doi: 10.23919/DATE.2017.7927009. [8] OpenMessaging benchmark
作者:王怀远、季俊涛
#技术探索

2024年10月21日

基于 Apache RocketMQ 的 ApsaraMQ Serverless 架构升级
_本文整理于 2024 年云栖大会阿里云智能集团高级技术专家金吉祥(牟羽)带来的主题演讲《ApsaraMQ Serverless 能力再升级,事件驱动架构赋能 AI 应用》_ 云消息队列 ApsaraMQ 全系列产品 Serverless 化,支持按量付费、自适应弹性、跨可用区容灾,帮助客户降低使用和维护成本,专注业务创新。那 ApsaraMQ 是如何一步一步完成 Serverless 能力升级的?在智能化时代,我们的事件驱动架构又是如何拥抱 AI、赋能 AI 的? 本次分享将从以下四个方面展开: + 首先,回顾 ApsaraMQ Serverless 化的完整历程,即 ApsaraMQ 从阿里内部诞生、开源,到捐赠给 Apache 进行孵化,再到完成 Serverless 化升级的不断突破、与时俱进的过程。 + 然后,重点介绍 ApsaraMQ 的存算分离架构,这是全面 Serverless 化进程中不可或缺的前提。 + 接下来,会从技术层面重点解读近一年 ApsaraMQ Serverless 能力的演进升级,包括弹性能力的升级、基于 RDMA 进一步降低存储和计算层之间的 CPU 开销,以及对无感扩缩容的优化。 + 最后,介绍我们在面向 AI 原生应用的事件驱动架构上的探索,以及基于阿里云事件总线定向更新向量数据库,为 AI 应用融入实时最新数据的最佳实践。 ApsaraMQ Serverless 化历程 首先,我们来看 ApsaraMQ Serverless 化的整个发展历程。 + 2012 年,RocketMQ 在阿里巴巴内部诞生,并将代码在 Github 上开源; + 2016 年,云消息队列 RocketMQ 版开启商业化的同时,阿里云将 RocketMQ 捐赠给了 Apache,进入孵化期; + 2017 年,RocketMQ 以较短的时间孵化成为 Apache TLP,并快速迭代新功能特性,顺利发布 4.0 版本,支持了顺序、事务、定时等特殊类型的消息; + 2018 年,随着大数据、互联网技术的发展,数据量爆发式增长,云消息队列 Kafka 版商业化发布; + 2019 年,云消息队列 RabbitMQ 版、云消息队列 MQTT 版两款产品商业化发布,补齐了 ApsaraMQ 在 AMQP、MQTT 协议适配上的缺失; + 2021 年,经过一段时间的孵化,ApsaraMQ 家族中的另一款产品事件总线 EventBridge 开始公测,开始融合事件、消息、流处理; + 2023 年,ApsaraMQ 全系列产品依托存算分离架构,完成 Serverless 化升级,打造事件、消息、流一体的融合型消息处理平台; + 今年,我们专注提升核心技术能力,包括秒级弹性、无感发布、计算存储层之间的 RDMA 等,实现 Serverless 能力的进一步升级,并结合当下客户需求,通过事件驱动架构赋能 AI 原生应用。 存算分离架构全景 第二部分,我们来看 ApsaraMQ 存算分离架构的全景,这是 Serverless 化升级不可或缺的前序准备。 ApsaraMQ 存算分离架构全景:云原生时代的选择 ApsaraMQ 的存算分离架构,是云原生时代的选择。 + 从下往上看,这套架构是完全构建在云原生基础之上的,整个架构 K8s 化,充分利用 IaaS 层的资源弹性能力,同时也基于 OpenTelemetry 的标准实现了metrics、tracing 和 logging 的标准化。 + 再往上一层是基于阿里云飞天盘古构建的存储层,存储节点身份对等,Leaderless 化,副本数量灵活选择,可以在降低存储成本和保证消息可靠性之间实现较好的平衡。 + 再往上一层是在本次架构升级中独立抽出来的计算层,即无状态消息代理层,负责访问控制、模型抽象、协议适配、消息治理等。比较关键的点是,它是无状态的,可独立于存储层进行弹性。 + 在用户接入层,我们基于云原生的通信标准 gRPC 开发了一个全新的轻量级 SDK,与之前的富客户端形成了很好的互补。 “Proxy” 需要代理什么? 接下来我们重点看下这套架构里的核心点,即独立抽出来的 Proxy。 它是一层代理,那到底需要代理什么呢? 在之前的架构中,客户端与 Broker/NameServer 是直连模式,我们需要在中间插入一个代理层,由原先的二层变成三层。然后,将原先客户端侧部分业务逻辑下移,Broker、Namesrv 的部分业务逻辑上移,至中间的代理层,并始终坚持一个原则:往代理层迁移的能力必须是无状态的。 从这张图中,我们将原先客户端里面比较重的负载均衡、流量治理(小黑屋机制)以及 Push/Pull 的消费模型下移至了 Proxy,将原先 Broker 和 Namesrv 侧的访问控制(ACL)、客户端治理、消息路由等无状态能力上移至了 Proxy。然后在 Proxy 上进行模块化设计,抽象出访问控制、多协议适配、通用业务能力、流量治理以及通用的可观测性。 ApsaraMQ 存算分离的技术架构 接下来看 ApsaraMQ 存算分离的技术架构,在原先的架构基础上剥离出了无状态 Proxy 组件,承接客户端侧所有的请求和流量;将无状态 Broker,即消息存储引擎和共享存储层进行剥离,让存储引擎的职责更加聚焦和收敛的同时,充分发挥共享存储层的冷热数据分离、跨可用区容灾的核心优势。 这个架构中无状态 Proxy 承担的职责包括: 1. 多协议适配: 能够识别多种协议的请求,包括 remoting、gRPC 以及 Http 等协议,然后统一翻译成 Proxy 到 Broker、Namesrv 的 remoting 协议。 2. 流量治理、分发: Proxy 具备按照不同维度去识别客户端流量的能力,然后根据分发规则将客户端的流量导到后端正确的 Broker 集群上。 3. 通用业务能力扩展: 包含但不限于访问控制、消息的 Tracing 以及可观测性等。 4. Topic 路由代理: Proxy 还代理了 Namesrv 的 TopicRoute 功能,客户端可以向 Proxy 问询某个 topic 的详细路由信息。 5. 消费模型升级: 使用 Pop 模式来避免单客户端消费卡住导致消息堆积的历史问题。 无状态 Broker,即消息存储引擎的职责更加的聚焦和收敛,包括: 1. 核心存储能力的迭代: 专注消息存储能力的演进,包括消息缓存、索引的构建、消费状态的维护以及高可用的切换等。 2. 存储模型的抽象: 负责冷热存储的模型统一抽象。 共享存储为无状态 Broker 交付了核心的消息读写的能力,包括: 1. 热存储 EBS: 热存储 EBS,提供高性能、高可用存储能力。 2. 冷存储 OSS: 将冷数据卸载到对象存储 OSS,获取无限低成本存储空间。 3. 跨可用区容灾: 基于 OSS 的同城冗余、Regional EBS 的核心能力构建跨可用区容灾,交付一写多读的消息存储能力。 基于云存储的存算分离架构,兼顾消息可靠性和成本 存算分离架构中的存储层是完全构建在阿里云飞天盘古存储体系之上的。基于这套架构,我们能够灵活控制消息的副本数量,在保证消息可靠性和降低存储成本之间做到一个比较好的平衡。 左图是存算分离存储架构中上传和读取的流程。可以看到,我们是在 CommitLog 异步构建 consumeQueue 和 Index 的过程中额外添加了按照 topic 拆分上传到对象存储的过程;在读取过程中智能识别读取消息的存储 Level,然后进行定向化读取。 基于云存储的架构,我们提供了 ApsaraMQ 的核心能力,包括: 1. 超长时间定时消息: 超过一定时间的定时消息所在的时间轮会保存至对象存储上,快到期时时间轮再拉回到本地进行秒级精准定时。 2. 集群缩容自动接管: 消息数据实时同步到对象存储,对象存储的数据能够动态挂载到任意 Broker,实现彻底存算分离,Broker 的无状态化。 3. 按需设置 TTL 成本优化: 支持按需设置消息 TTL,不同重要程度的消息可设置不同的 TTL,满足消息保存时长需求的同时兼顾成本控制。 4. 冷热消息分离、分段预取: 热数据的读取命中本地存储,速度快;冷数据的读取命中远端存储,速率恒定且不会影响热数据的读取。 5. 动态调控云存储的比例: 动态调控 EBS 和 OSS 的比例,大比例的 EBS 能够具备更高的稳定性,应对 OSS 负载过高的背压,提升 EBS 作为 OSS 的前置容灾的能力;小比例的 EBS 则是可以最大化降低消息单位存储成本,让整体的存储成本趋同于 OSS 存储成本。 Serverless 能力再升级 有了存算分离架构的铺垫,ApsaraMQ 全系列产品 Serverless 化就更加顺畅了。接下来展开介绍 ApsaraMQ Serverless 能力的升级。 消息场景下的 Serverless 化 在消息场景下通常会有两个角色:消息服务的使用方和提供方。 在传统架构中通常是这样的流程:使用方会给提供方提需求:我要大促了,你保障下;提供方说给你部署 10 台够不够,使用方说好的;结果真到大促那一刻,真实流量比预估的要大很多倍,整个消息服务被击穿,硬生生挂了半小时。 在 Serverless 化改造前,仍需提前规划容量;但相比传统架构的提升点是,依托 IaaS 层的快速扩容能力,能够极大缩短扩容时间;再演进到当前的 Serverless 架构,我们期望消息服务提供方是能够非常淡定地应对大促。 Serverless 现在已经成为了一个趋势和云的发展方向。ApsaraMQ 全线产品实现弹性灵活的 Serverless 架构,既彰显了技术架构的先进性,也提升了客户的安全感。业务部门减少了容量评估的沟通成本,用多少付多少,更省成本;运维部门免去了容量规划,实现自动弹性扩缩,降低运维人员投入的同时,大大提升了资源的利用率。 Serverless 方案的 Why / What / How Serverless 化预期达到的收益是:省心——秒级弹性,免容量规划;省钱——用多少付多少;省力——少运维、免运维。 要解决的痛点是:用户使用容量模型比较难做精准预估;运维侧手动扩容,是一个非常耗时耗力的过程。 核心目标是: + 弹性要快: 特别是针对一些脉冲型的秒杀业务,需要具备秒级万 TPS 的弹性能力。 + 缩容无感: 应对 MQ 客户端与服务侧 TCP 长连接的特性,缩容、服务端发布时要无感。 + 成本要低: 极致的性能优化,才能进一步降低用户侧的单位 TPS 成本。 通过如下几个关键路径构建 ApsaraMQ Serverless 核心能力: + 多租、安全隔离、业务流量隔离: 是构建 Serverless 核心能力的基础。 + 物理预弹&逻辑弹性: 物理预弹和逻辑弹性结合的极致弹性方案,通过镜像加速、元数据批量创建、主动路由更新等核心技术加速物理弹性,结合逻辑弹性最终交付秒级万 TPS 的极致弹性能力。 + RDMA: 在存储和计算层之间引入 RDMA 技术,充分利用 RDMA 的零拷贝、内核旁路、CPU 卸载等特性进一步降低计算层与存储层之间的通信开销。 + 优雅上下线: 依托 gRPC 协议的 GOAWAY 以及 Remoting 协议的扩展实现 TCP 长连接的优雅上下线。 + 控制资源水位: 通过智能化的集群流量调度以及平滑 Topic 迁移方案,进一步控制单个集群的资源水位在一个合理的值。 这套 Serverless 方案可以同时满足如下几种场景: + 第一种是平稳型:流量上升到一定水位后会平稳运行。 + 第二种是稳中有“进”型:流量平稳运行的同时,偶尔会有一些小脉冲。 + 第三种是周期性“脉冲型”:流量会周期性地变化。 计算、存储与网络:充分利用云的弹性能力 我们前面也有提到,这套架构是完全构建在云原生基础设施之上的,我们在计算、存储、网络上都充分利用了云的弹性能力,具体来看: + 在计算侧,通过 K8s 充分利用 ECS 的弹性能力,通过弹性资源池、HPA 等核心技术支持计算层的快速弹性,并通过跨可用区部署提升可用性。 + 在存储侧,充分利用飞天盘古存储的弹性能力,支持自定义消息的存储时长,冷热数据分离,额外保障冷读的 SLA。 + 在网络侧,公网随开随用,安全和方便兼具,支持多种私网形态接入,并基于 CEN 构建全球互通的消息网络。 在这之上,结合 SRE 平台的智能集群流量调度、集群水位监控、物理预弹性等核心能力,最终交付了一套完整的 ApsaraMQ Serverless 化物理弹性能力。 秒级万 TPS 的极致弹性能力保障 依托于上面的基础物理资源的弹性能力,来看下我们是如何保障秒级万 TPS 的极致弹性能力的? 从两个维度来看用户视角的弹性: + 从限流维度看: 在无损弹性上限以内的 TPS,都不会触发限流;超过无损弹性 TPS 上限后,会有秒级别的限流,通过逻辑弹性调整实例级别的限流阈值后,即可实现最终的 TPS 弹性。 + 从付费角度看: 在预留规格内按规格进行预付费;超过预留规格的上限 TPS,超过部分按量付费,即用多少付多少。 为了满足用户视角的秒级弹性能力,我们通过物理弹性和逻辑弹性相结合的方式来进行保障: + 物理弹性是预弹的机制,弹性时间在分钟级,用户侧无任何感知,由服务侧来 Cover 成本。 + 逻辑弹性是实时生效的,弹性时间在秒级,同时在触发无损弹性 TPS 上限时,用户侧是会有秒级别的限流感知的,另一方面,用户是需要为弹出来的那部分流量进行按量付费的。 两者的关系是:物理弹性是逻辑弹性的支撑,逻辑弹性依赖物理弹性。从弹性流程上看,当用户的流量触发无损弹性上限时,优先判断物理资源是否充足,充足的话就进行秒级逻辑弹性,不充足的话就等待物理资源扩容后再进行逻辑弹性。当然这里会有个预弹的机制,尽量保障逻辑弹性时物理资源都是充足的。 从物理弹性加速来看,通过以下几个方面进行加速: + 计算、存储层按需弹性: 计算层更关注 CPU、客户端连接数等核心指标;存储层更关注内存、磁盘 IO 等性能指标。 + 镜像下载加速: 通过 PlaceHolder + 镜像缓存组件加速新节点的扩容。 + 新增元数据批量创建的机制: 新增存储节点创建 5000 级别的 Topic 下降至 3 秒。 + 新增主动路由更新机制: 降低存储节点物理扩容后承接读写流量的延迟。 我们的系统设计精密,旨在确保用户体验到极致的弹性能力,特别是实现每秒万次事务处理(TPS)的秒级弹性扩展。这一能力的保障策略围绕两个核心维度展开,并深度融合物理与逻辑弹性机制,确保在高吞吐需求下的无缝响应与成本效率。 ApsaraMQ on RDMA:降低计算与存储层之间通信开销 RDMA(全称远程内存直接访问)是一种高性能的网络通信技术,相比 TCP/IP 的网络模式,具有零拷贝、内核旁路、CPU 卸载等核心优势。ApsaraMQ Serverless 化架构具备存算分离的特点,非常适合在计算层和存储层之间引入 RDMA 技术,进一步降低内部组件之间的数据通信开销。 具体来看,计算层 Proxy 在接收到客户端各种协议的消息收发请求以后,通过内置的 Remoting Client 和存储层 Broker 进行数据交换。在原先的 TCP/IP 网络模式中,这些数据是需要从操作系统的用户态拷贝到内核态后,再经由网卡和对端进行交互。依托 RDMA 内核旁路特性,通过实现 RdmaEventLoop 的适配器,消息直接由用户态到 RDMA 网卡和对端进行交互。 从最终 BenchMark 的测试效果来看,引入 RDMA 技术后,存储层 Broker 的 CPU 资源消耗下降了 26.7%,计算层 Proxy 的 CPU 资源消耗下降了 10.1%,计算到存储的发送 RT 下降了 23.8%。 优雅上下线:ApsaraMQ Serverless 弹性如丝般顺滑 在 Serverless 场景下,物理资源的水平弹性扩缩是一个常态化的过程,同时结合 MQ 客户端和计算层 Proxy TCP 长连接的特点,在 Proxy 上下线过程中是比较容易出现连接断开的感知,比如客户端刚发出一个接收消息的请求,连接就被服务侧强行关闭了,是非常容易造成单次请求超时的异常的。 因此,我们通过 gRPC 协议 Http2 的 Go Away 机制以及 Remoting 协议层的扩展,结合 SLB 连接优雅终端的能力来实现 ApsaraMQ 在扩容态、缩容态、以及发布态的无感。 右图展示了缩容态下单台 Proxy 优雅下线的过程: 1. 当收到 Proxy0 需要下线的指令后,SLB 会将 Proxy0 摘除,保障新的连接不再建立到 Proxy0 上,同时 Proxy0 进入 Draining 状态,旧连接进行会话保持。 2. Proxy0 上收到新的请求后,返回的 Response Code 都更新为 Go Away;同时客户单收到 Go Away 的 Response 后,断开原先的连接,向 SLB 发起新建连接的请求。 3. SLB 收到新建连接的请求,会导流至 Proxy1。 4. 等待一段时间后 Proxy0 上的连接会自然消亡,最终达到无感下线的目的。 事件驱动架构赋能 AI 应用 AI 无疑是当今互联网行业的热门话题,同时也是本届云栖大会的核心议题之一。接下来,将阐述面向 AI 原生应用的事件驱动架构如何拥抱 AI,赋能 AI 应用蓬勃发展。 + 面向 AI 应用的实时处理,具备实时 Embedding 至向量数据库、更新私有数据存储的能力,全面提升 AI 应用实时性、个性化和准确度。 + 面向 AI 应用的异步解耦,解耦 AI 推理链路的快慢服务,加快应用响应速度。 + 面向 AI 应用的消息负载,ApsaraMQ 支持主动 Pop 消费模式,允许动态设置每一条消息的个性化消费时长. 同时也支持优先级队列,满足下游多个大模型服务的优先级调度。 + 面向 AI 应用的消息弹性,ApsaraMQ 提供全模式的 Serverless 弹性能力,支持纯按量和预留+弹性、定时弹性等多种流量配置模型; 最后,让我们来看下阿里云事件总线 EventBridge 是如何实现数据实时向量化,提升 AI 应用的实时性和准确度的? 阿里云事件总线的 Event Streaming 事件流处理框架,具备监听多样化数据源,然后经过多次 Transform 之后再投递给下游数据目标的原子能力;依托这个框架,我们是很容易去实现数据的实时向量化的,从 RocketMQ、Kafka、OSS 等多个源监听到实时数据流入之后,经过文本化、切片、向量化等操作,最终存入向量数据库,作为 AI 应用实时问答的多维度数据检索的依据,最终提升了 AI 应用的实时性和准确度。
作者:金吉祥
#行业实践 #云原生