2024年7月24日

RocketMQ 快速上手体验
体验准备 体验开源 RocketMQ 准备 本教程旨在让开发者们快速体验开源 RocketMQ 环境配置、集群搭建、消息快速收发体验等环节。为降低部署门槛,提升上手体验,故仅在单机上部署整个 RocketMQ 集群,暂不考虑高可用部署形态。其它拓展部署形式,请参见第四章的参考资料自行尝试部署。因此,本部分需要您准备: 运行机器,64 位操作系统,推荐 Linux/Unix/macOS 64 位 JDK 1.8+ 具体体验内容请参见第二章。 体验阿里云云消息队列 RocketMQ 版准备 本部分教程旨在让开发者快速上手阿里云云消息队列 RocketMQ 版的资源创建、消息收发等流程。为降低使用门槛,优化上手体验,本教程将在阿里云云消息队列 RocketMQ 版推出的按量付费实例上进行。该类实例随买随用,用后即可释放,使用过程可能会产生极少量费用。因此,本部分需要您准备: 阿里云账号 确保账号余额能够支持按量付费类型实例的体验费用 具体体验内容请参见第三章。 快速体验开源 RocketMQ 环境配置 RocketMQ 的安装包分为两种,二进制包和源码包。二进制包是已经编译完成后可以直接运行的,源码包是需要编译后运行的。为提升体验过程的流畅性,这边建议下载二进制包,直接运行 RocketMQ。您可以点击下载二进制包。也可以点击下载 Apache RocketMQ 5.2.0 的源码包。若您想从源码包开始进行上手体验,需要您,进入下载源码的目录,执行如下编译命令: ```bash $ unzip rocketmqall5.2.0sourcerelease.zip $ cd rocketmqall5.2.0sourcerelease/ $ mvn Preleaseall DskipTests Dspotbugs.skip=true clean install U ``` 集群部署 若您直接下载 RocketMQ 的二进制包,则可以直接进入二进制包的目录中: ```bash $ cd rocketmqall5.2.0binrelease ``` 若您选择从源码开始体验,且已经在本地自行编译完成了二进制文件,则可进入源码目录下的 distribution/target 中的二进制文件目录: ```bash $ cd distribution/target/rocketmq5.2.0/rocketmq5.2.0 ``` 后续体验中的所有指令均在上述目录下执行。 启动 NameServer 安装完 RocketMQ 包后,我们执行下面的指令启动 NameServer: ```shell 启动namesrv $ nohup sh bin/mqnamesrv & 验证namesrv是否启动成功 $ cat ~/logs/rocketmqlogs/namesrv.log ``` 若一切正常,则会在执行完上述命令后,输出如下内容: ```shell The Name Server boot success... ``` 启动 Broker + Proxy NameServer 成功启动后,我们启动 Broker 和 Proxy。这里我们使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。5.x 版本也支持 Broker 和 Proxy 分离部署以实现更灵活的集群能力。详情参考。 ```shell 先启动broker $ nohup sh bin/mqbroker n localhost:9876 enableproxy & 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是brokera $ cat ~/logs/rocketmqlogs/proxy.log ``` 我们可以在执行完上述命令后看到 proxy.log 中的内容,若看到如下信息,则表明 broker 已成功启动: ```shell The broker[brokera,192.169.1.2:10911] boot success... ``` 至此,一个单节点副本的 RocketMQ 集群已经部署起来了,我们可以利用脚本进行简单的消息收发。 消息收发 工具测试消息收发 在进行工具测试消息收发之前,我们需要告诉客户端 NameServer 的地址,RocketMQ 有多种方式在客户端中设置 NameServer 地址,这里我们利用环境变量 NAMESRV_ADDR。 ```shell $ export NAMESRV_ADDR=localhost:9876 ``` 完成环境变量配置后,可以在命令行输入如下指令,启动生产者: ```shell $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer ``` 若生产成功,会输出如下内容: ```shell SendResult [sendStatus=SEND_OK, msgId= ... ``` 消息生产完成后,该消息便已经保存在本地 Broker 的存储中了。接下去再输入命令,启动消费者: ```shell $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ``` 若消费成功,则会出现如下的输出内容: ```shell ConsumeMessageThread_%d Receive New Messages: [MessageExt... ``` SDK 测试消息收发 工具测试完成后,我们可以尝试使用 SDK 收发消息。使用 SDK 进行消息收发的教程较为复杂,若有一定工程代码编写、运行经验,可以参考该自行尝试,本文不再赘述。 配套运维能力 mqadmin 工具介绍 mqadmin 是 RocketMQ 配套的运维工具,能够非常简便的查看集群状态,创建、修改 topic 等元数据。该工具的使用方式可以参考该。本文档仅举例如何使用该工具进行集群状态查看。 查看集群状态 对于刚刚启动的 Broker,我们可以尝试使用 mqadmin 工具对它状态进行查看,在二进制包目录下输入如下命令: ```shell sh bin/mqadmin clusterlist n localhost:9876 ``` 若集群运行正常,则输出如下:在该输出中,您可以看到该 NameServer 下的集群名称、Broker 名称、对应 IP 地址、Broker 代码版本、消息生产速度、消息消费速度、定时消息总数、刷盘等待时长、消息保留时长、磁盘使用率等信息。善用 mqadmin 工具,将能在集群故障时快速定位问题所在,并有能力人工介入作恢复。 关闭集群 当上述测试均完成后,您需要将集群进程(NameServer, Proxy, Broker)进行关闭,关闭方法如下: ```shell 关闭Broker $ sh bin/mqshutdown broker 若一切正常,则会输出如下内容: The mqbroker(36695) is running... Send shutdown request to mqbroker with proxy enable OK(36695) 关闭NameServer $ sh bin/mqshutdown namesrv 若一切正常,则会输出如下内容: The mqnamesrv(36664) is running... Send shutdown request to mqnamesrv(36664) OK ``` 快速体验阿里云云消息队列 RocketMQ 版 体验阿里云云消息队列 RocketMQ 版主要需要如下图所示的几个步骤。本文将按照下面的流程,分三部分引导您快速体验。 创建账号 & 授权 注意:若您的账号为阿里云账号,则默认拥有云消息队列 RocketMQ 版服务的所有权限,无需进行授权操作。账号角色查看方法如下: 登录阿里云控制台,页面右上角区域显示账号基本信息,若账号 ID 下显示主账号,表示该账号为阿里云账号,无需授权;若显示 RAM 用户,则该账号需要进行授权。 (左图为主账号,无需授权;右图为 RAM 角色账号,需要授权)若您使用的是 RAM 账号,则需要按进行授权。考虑到大部分体验者应是个人开发者,授权过程故不在本文中展开说明。 创建资源 在调用 SDK 收发消息前,您需要提前创建云消息队列 RocketMQ 版的相关资源,包括创建云消息队列 RocketMQ 版实例、获取实例的接入点、创建 Topic、创建 ConsumerGroup。调用 SDK 时,需要将这些资源信息填写到 SDK 代码中。需要注意的是,由于云消息队列 RocketMQ 版需要您预先准备网络、安全组等资源,所以在开通云消息队列 RocketMQ 版实例前,请尽量先参考如下教程做好准备工作: 当然,若您觉得跟着文档走比较复杂,云消息队列 RocketMQ 版在开通过程中也提供了全面的引导,辅助您在开通过程中自查资源准备情况,并立即补齐资源。下面我们直接以“零准备”的主账号进行云消息队列 RocketMQ 版实例购买、配置。 创建实例 1. 进入云消息队列 RocketMQ 版产品控制台。可以直接从阿里云官网的产品下拉框中进入,选择“中间件”,并从中找到“云消息队列 RocketMQ 版”。 2. 进入控制台后,点击“创建实例”按钮。 3. 选择“Serverless 按累积量”的实例类型,进入创建配置页面。请注意,若您要创建 Serverless 类型实例,请确保您的购买地域支持该实例类型。 4. 确认您在该地域是否已经有 VPC 资源。若无,则点击创建 VPC 专有网络。 5. 进入专有网络的创建页面后,请输入专有网络名称、网段、交换机名称等信息: 6. 请注意,由于云消息队列 RocketMQ 版的多可用区容灾高可用设计,需要您至少在两个可用区创建交换机。点击图中的“添加(1/10)”,可以同时创建多个交换机。 若您已经创建完成,仍可以进入专有网络控制台独立进行交换机的创建:需要注意的是,创建交换机时,请选取和已创建交换机不同可用区进行创建。 7. 创建完成后,重新返回云消息队列 RocketMQ 版控制台,即可在此处进行 VPC 专有网络的选择,以及 VSwitch(交换机)的选择。此处我们勾选两个可用区进行配置。 7. 若您未创建安全组,则可以在安全组选择栏下直接进入“创建安全组”的流程。 进入创建页面后,选择创建安全组。在网络栏选择刚刚配置的专有网络,其余安全组规则按默认即可,即可完成创建。 8. 返回云消息队列 RocketMQ 版控制台,查看“服务关联角色”是否已经创建。若未创建,则可点击该按钮直接进行创建。 创建完成的效果如下: 9. 若上述均已配置完成,但是购买按钮仍然显示灰色,且显示 PrivateLink 未开通,则点击进行开通即可。 注意,此处开通完成后,返回云消息队列 RocketMQ 版控制台,页面需要进行刷新才可正常购买。刷新可通过页面中“选择 VPC”等下拉框后面的“刷新”小按钮完成。 10. 刷新完成后,即可正常购买云消息队列 RocketMQ 版实例了,创建若干分钟后,您就拥有了一个按量付费的云消息队列 RocketMQ 版实例。 获取接入点 1. 在实例列表页面中单击目标实例名称。 2. 在实例详情页面的 TCP 协议接入点区域即可查看实例的接入点信息。 VPC 专有网络接入点:使用 VPC 专有网络访问云消息队列 RocketMQ 版时使用。云消息队列 RocketMQ 版默认提供的接入点。 公网接入点:使用公网访问云消息队列 RocketMQ 版时使用该接入点。仅当开启公网访问时显示。 获取账号密码 客户端接入云消息队列 RocketMQ 版服务端时,需要根据接入方式配置实例用户名密码。 使用公网访问云消息队列 RocketMQ 版服务端:需要配置实例的用户名密码。 使用VPC网络访问云消息队列 RocketMQ 版服务端:无需配置实例的用户名密码,系统会根据VPC接入点智能识别用户身份。 此处我们以公网访问为例,查看如何获取 Serverless 系列实例的账号密码:如上图所示,在您实例下点击“访问控制”按钮,进入“智能身份识别”一栏,下面便是您的实例账号、密码。后续若您需要用公网操作您的实例便需要填入此处的内容。 创建 Topic 现在我们已经拥有了一个 RocketMQ 实例,下面我们便在该实例下创建 Topic 资源。 1. 在实例列表页面中单击目标实例名称。 2. 在左侧导航栏单击 Topic 管理,然后在 Topic 管理页面单击创建 Topic。 3. 在创建 Topic 面板中填写Topic名称和描述,此处我们将 Topic 命名为"test", 选择消息类型为普通消息,然后单击确定,一个 Topic 便创建完成了。 创建订阅组(Group) 拥有一个 Topic 后,我们再创建一个订阅组(Group)。订阅组将被用于消息消费过程。 1. 在实例列表页面中单击目标实例名称。 2. 在左侧导航栏单击 Group 管理,然后在 Group 管理页面单击创建 Group。 3. 在创建 Group 面板填写Group ID,此处我们将 Group ID 设置为"testgroup"。其他参数可使用默认配置,然后单击确定。此时,一个订阅组便创建完成了。 收发消息 为方便体验,我们选择在控制台进行消息的发送,编写消费者代码并运行,以消费控制台发送的那条消息。 1. 控制台发送消息。首先进入 Topic 详情页面,点击右侧“快速体验”按钮。 2. 填入消息内容,即可点击发送。发送成功后,这条消息便已进入您实例所在的存储中,您可点击查看其消息轨迹。 3. 编写消费者代码,本教程将说明如何在 IntelliJ IDEA 中完成消费者的启动。本教程将从 0 开始教您从零开始构建一个 Java 项目。若您已有一定开发经验,请您根据真实情况选择性跳过。 1. 首先,安装 IntelliJ IDEA。点击该,下滑页面,选择社区版(Community)进行下载。 2. 新建 Java 工程: 3. 在运行代码前,请在您的工程中添加 pom 依赖: ```xml org.apache.rocketmq rocketmqclientjava 5.0.7 ``` 添加完成后,pom 文件如下所示: 4. 完成依赖添加后,您可以直接复制下面的代码并运行,但是需要注意的是,您要在代码中填入您的实例相关信息,这些信息均已经使用中括号({})框起。 ```java import org.apache.rocketmq.client.apis.; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.shaded.org.slf4j.Logger; import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; public class PushConsumerExample { private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class); private PushConsumerExample() { } public static void main(String[] args) throws ClientException, IOException, InterruptedException { / 实例接入点,从控制台实例详情页的接入点页签中获取。 如果是在阿里云ECS内网访问,建议填写VPC接入点。 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。 / String endpoints = "{实例接入点,如rmqcnxxx.cnzhangjiakou.rmq.aliyuncs.com:8080}"; //指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。 String topic = "{Topic名称,如test}"; //为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。 String consumerGroup = "{Group ID, 如testgroup}"; String instanceId = "{实例id,如rmqcnxxx}"; String userName = "{账号名}"; String passWord = "{密码}"; final ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setNamespace(instanceId) .setCredentialProvider(new StaticSessionCredentialsProvider(userName, passWord)) .build(); //订阅消息的过滤规则,表示订阅所有Tag的消息。 String tag = ""; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); //初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。 PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) //设置消费者分组。 .setConsumerGroup(consumerGroup) //设置预绑定的订阅关系。 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) //设置消费监听器。 .setMessageListener(messageView { //处理消息并返回消费结果。 // LOGGER.info("Consume message={}", messageView); System.out.println("Consume Message: " + messageView); return ConsumeResult.SUCCESS; }) .build(); Thread.sleep(Long.MAX_VALUE); //如果不需要再使用PushConsumer,可关闭该进程。 //pushConsumer.close(); } } ``` 启动后,消费成功即可拿到之前在控制台发送的消息: 可观测能力 刚刚发送消息后,我们可以在控制台进行消息轨迹的查看。进入仪表盘时,会提示您创建服务关联角色,点击创建、授权即可。阿里云云消息队列 RocketMQ 版的可观测能力多样,细粒度的有消息级别的查询、轨迹查询。粗粒度的有仪表盘,能够在实例维度查看消息的生产、发送、堆积等情况。 消息查询 & 轨迹 针对我们刚刚发送的消息,可以点击“消息查询”功能,查询该消息的具体内容、查看消息轨迹,并可指定消费者进行消费能力验证等。尤其是消息轨迹功能,我们能够支持对特定消息进行全生命周期的展示,包括其生产者、存储时间、存储 ID、投递事件、消费者等信息。通过该可观测能力,我们能够十分清晰地了解消息收发的细节。 仪表盘 相对于消息查询功能,仪表盘属于粗粒度的可观测能力。该能力可以展现实例维度、Topic 维度、Group 维度的整体情况,包括但不限于收发速率、堆积情况等数据。且依托于 Grafana 的可视化能力,这些指标的展示都是十分直观且灵活的。如下图,我们可以看到刚刚测试的消息在何时进入实例,消费延迟时间等信息。 其它拓展能力以及参考文档 开源 RocketMQ 在 GitHub 社区中不断迭代成长,定期发布版本,您可以在社区内查看最新特性、提出 Bug,甚至参与 Bug 的修复。 Apache RocketMQ 的官网为: Apache RocketMQ 的开源 Github 社区为: 此外,阿里云云消息队列 RocketMQ 版的更多特性、教程、最佳实践均可在官方文档中找到。基于 Serverless 系列可以让体验成本可控,若对其它消息队列特性感兴趣,请自行上手尝试。 阿里云云消息队列 RocketMQ 版的官方文档为:
#基础学习资料

2024年7月24日

RocketMQ 原理和架构
Apache RocketMQ 是用于消息传递场景中高吞吐量、低延迟、高可用的分布式中间件,被广泛应用于搜索、社交网络活动流、数据管道、交易流程、物联网等领域,本文介绍 Apache RocketMQ 的核心原理和架构。 领域模型 以下是 RocketMQ 中的术语,更多细节请参考 。 Message(消息):Message 是 RocketMQ 传输的基本单元,包含了具体的业务数据以及一些元数据(如消息 ID、主题、标签、发送时间等)。消息可以是文本、二进制数据或其他任何序列化后的对象形式。 Topic(主题):Topic 是一类消息的逻辑分类名,是 Apache RocketMQ 中消息传输和存储的顶层容器。类似于邮件系统中的邮箱地址或发布/订阅模式中的“频道”。生产者向特定的 Topic 发送消息,消费者则根据 Topic 订阅并接收消息。一个 Topic 可以被多个生产者写入,同时也能被多个消费者订阅。 Queue(队列):每个 Topic 被划分为多个 Queue(队列),或称 MessageQueue,这些队列用于存储消息。生产者发送到 Topic 的消息会被分配到其下的各个 Queue 中;消费者则是从这些 Queue 中拉取消息进行消费。 Subscription(订阅):Subscription 表示消费者对某个 Topic 消息的兴趣表达。订阅关系由消费者分组动态注册到服务端系统,并在消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度的维护。 Producer(生产者):生产者是消息产生的源头,将消息发送到服务端指定 Topic。 Consumer(消费者):消费者负责从服务端中拉取消息并进行处理。 ProducerGroup(生产者组):ProducerGroup 是一组生产者的逻辑分组,共享同样的 Topic 发送配置,实现发送端的负载均衡和容错。如果组内某个生产者失败,其他生产者可以继续工作,保证消息发送的连续性。 ConsumerGroup(消费者组):消费者分组是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。分组中的消费者共同订阅同一个 Topic 并以某种策略(如广播、集群消费)消费消息。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。 技术架构 Apache RocketMQ 服务端基础组件包括 NameServer,Broker,Proxy,推荐使用存储计算分离模式部署。 直连模式部署 RocketMQ 是一个典型的发布订阅系统,通过 Broker 节点中转和持久化数据,解耦上下游。Broker 是真实存储数据的节点,由多个水平部署但不一定完全对等的副本组构成,单个副本组的不同节点上的数据会达到最终一致。单个副本组同一时间只有一个可读写的 Master 和若干个只读的 Slave,主故障时会进行选举来容忍故障,此时单个副本组可读不可写。NameServer 是独立的一个无状态组件,接受 Broker 的元数据注册并动态维护着一些映射关系,同时为客户端提供服务发现的能力。在这个模型中,我们使用不同主题 (Topic) 来区分不同类别的信息流,为消费者设置订阅组 (Group) 进行更好的管理与负载均衡。如下图中间部分所示: 1. 服务端 Broker Master1 和 Slave1 构成其中的一个副本组。 2. 服务端 Broker 1 和 Broker 2 两个副本组以负载均衡的形式共同为客户端提供读写。 注:Producer 和 Consumer 会和 NameServer,Broker 都维持长连接。Producer 只会向 Master 副本发送消息,Consumer 可以从 Master 或者 Slave 消费消息。 存储计算分离部署 存储和计算分离是一种良好的模块化设计。无状态的 Proxy 集群是数据流量的入口,提供签名校验与权限控制、计量与可观测、客户端连接管理、消息编解码处理、流量控制、多协议接入等能力。原 Broker 节点演化为以存储为核心的有状态集群,支持读写多类型消息,它们的底层是多模态存储和多元化的高效索引。存储计算分离的形态利于不同业务场景下单独调整存储或计算节点的数量,来实现扩容和缩容。网关模式接入还能带来升级简单,组网便利等好处。Proxy 和 Broker 都属于服务端组件,内网通信的延迟不会显著增加客户端收发消息的延迟。注:Proxy 自身会向 NameServer 和 Broker 都建立长连接,Producer 和 Consumer 仅连接到 Proxy。 通信机制 Apache RocketMQ 客户端使用 TCP 访问服务端,根据传输的数据格式分为 Remoting 协议和 gRPC 协议。 Remoting 协议诞生较早,是组件间通信默认的私有协议。其中 Remoting Java 客户端和主仓库同步演进和迭代,而多语言客户端(以下简称 SDK)归属于 Apache 社区多个独立仓库。 gRPC 协议自 RocketMQ 5.0 版本推出,以 Protobuf 定义了底层传输的数据格式(详见 ),旨在以云原生主流技术演进轻量、标准、易扩展的客户端服务端通信协议。使用 gRPC 协议的 SDK 是以独立仓库 方式演进,支持 Java/C++/.NET/Go/Rust 等众多语言。 RocketMQ 5.0 在服务端内部也提供了基于 Protobuf + gRPC 的管控 API 实现。 RocketMQ 的接入点是什么?为了简化客户端配置的复杂度,以直连模式部署的集群,客户端需要和服务端的 NameServer,Broker 进行点对点直连通信,客户端需要配置 NameServer 集群的负载均衡地址。对于以代理模式部署的集群,无论客户端使用 Remoting 还是 gRPC 协议,客户端仅需和 Proxy 进行通信,需要将配置接入点为 Proxy 的负载均衡地址。服务端会使用,自动区分 Remoting 和 gRPC 协议并处理客户端的请求。在受限网络环境中,客户端需要同时放通接入点的 8080 和 8081 端口。 存储机制 元数据管理 为了提升整体的吞吐量与提供跨副本组的高可用能力,RocketMQ 服务端一般会为单个 Topic 创建多个逻辑分区,即在多个副本组上各自维护部分分区 (Partition),我们把它称为队列 (MessageQueue)。同一个副本组上同一个 Topic 的队列数相同并从 0 开始连续编号,不同副本组上的 MessageQueue 数量可以不同。例如 topica 可以在 broker1 主副本上有 4 个队列,编号 (queueId) 是 03,在 broker1 备副本上完全相同,但是 broker2 上可能就只有 2 个队列,编号 01。在 Broker 上元数据的组织管理方式是与上述模型匹配的,每一个 Topic 的 TopicConfig,包含了几个核心的属性,名称,读写队列数,权限与许多元数据标识,这个模型类似于 K8s 的 StatefulSet,队列从 0 开始编号,扩缩队列都在尾部操作(例如 24 个队列缩分区到 16,是留下了编号为 015 的分区)。Broker 还管理着当前节点上 Group 的相关信息和消费进度(位点),当消费进度更新时 并不会像 Topic Group 那样立刻持久化,而是使用一个定时任务做 CheckPoint。这个周期默认是 5 秒,所以当客户端有上下线,服务端主备切换或者正常发布时,可能会有秒级的消息重复,并观察到堆积量的短暂上升。 高效的存储层实现 RocketMQ 存储的核心是极致优化的顺序写盘,数据以 append only 的形式不断的将新的消息追加到文件末尾。RocketMQ 使用了一种称为 MappedByteBuffer 的内存映射文件的办法,将一个文件映射到进程的地址空间,实现文件的磁盘地址和进程的一段虚拟地址关联,实际上是利用了NIO 中的 FileChannel 模型。在进行这种绑定后,用户进程就可以用指针(偏移量)的形式写入磁盘而不用进行 read / write 的系统调用,减少了数据在缓冲区之间来回拷贝的开销。当然这种内核实现的机制有一些限制,单个 mmap 的文件不能太大 (RocketMQ 选择了 1G),此时再把多个 mmap 的文件用一个链表串起来构成一个逻辑队列 (称为 MappedFileQueue),就可以在逻辑上实现一个无需考虑长度的存储空间来保存全部的消息。 单条消息的存储格式 RocketMQ 有一套相对复杂的消息存储编码用来将消息对象序列化,随后再将非定长的数据落到上述的真实的写入到文件中,存储格式中包括了索引队列的编号和位置。单条消息的存储格式如下:可以发现,单条消息本身元数据占用的存储空间为固定的描述信息和变长的 body 和 properties 部分,而消息的 payload 通常大于 2K,也就是说元数据带来的额外存储开销只增加了 5%10% 左右。很明显,单条消息越大,存储本身额外的开销(比例)就相对的越少。 构建消息的索引 在数据写入 CommitLog 后,有一个后端的 ReputMessageService 服务 (也被称为 dispatch 线程) 会异步的构建多种索引(例如 ConsumeQueue 和 Index),满足不同形式的读取和查询诉求。在 RocketMQ 的模型下,消息本身存在的逻辑队列称为 MessageQueue,而对应的物理索引文件称为 ConsumeQueue。其中 dispatch 线程会源源不断的将消息从 CommitLog 取出,再拿出消息在 CommitLog 中的物理偏移量,消息长度以及 Tag Hash 等信息作为单条消息的索引,分发到对应的消费队列,构成了对 CommitLog 的引用 (Reference)。ConsumeQueue 中单条消息占用的索引空间只有 20B。当客户端尝试从服务端拉取消息时,会先读取索引并进行过滤,随后根据索引从 CommitLog 中获得真实的消息并返回。 高可用机制 架构演进 最早的时候,RocketMQ 基于 MasterSlave 模式提供了主备部署的架构,这种模式提供了一定的高可用能力,在 Master 节点负载较高情况下,读流量可以被重定向到备机。备机在正常工作场景下资源使用率较低,造成一定的资源浪费。为了解决这个问题,社区提出了在一个 Broker 进程内运行多个 BrokerContainer,通过在单节点主备交叉部署来同时承担多份流量,该方案无外部依赖,自愈能力强。这种方式下隔离性弱于使用原生容器方式进行隔离,同时由于架构的复杂度增加导致了自愈流程较为复杂。另一条演进路线则是基于可切换的,RocketMQ 也尝试过依托于 Zookeeper 的分布式锁和通知机制进行 HA 状态的管理。引入外部依赖的同时给架构带来了复杂性,不容易做小型化部署,部署运维和诊断的成本较高。DLedger 方案是基于 Raft 的日志实现,集群内同一个副本组下的 Broker 会自动选主,Raft 中的副本身份被透出和复用到 Broker Role 层面,无外部依赖,然而强一致的 Raft 设计并未支持灵活的降级策略,无法在 C 和 A 之间灵活调整。而 RocketMQ DLedger 融合模式是 RocketMQ 5.0 演进中结合上述两条路线后的一个系统的解决方案,推荐使用两副本进行部署,很好的权衡了整体拥有成本和运维复杂度,其核心流程如下: 1. 利用可内嵌于 NameServer 的 Controller 进行选主,无外部依赖,对两副本支持友好。 2. 引入 EpochStartOffset 机制来计算日志分叉位点。 3. 消息在进行写入时,提供了灵活的配置来协调系统对于可用性还是一致性优先的诉求。 4. 简化日志复制协议使得日志复制为高效。 利用任期 Epoch 和偏移量 StartOffset 实现一个新的截断算法。这种 EpochStartOffset 满足如下原则: 1. 通过共识协议保证给定的一个任期 Epoch 只有一个Leader。 2. 只有 Leader 可以写入新的数据流,满足一定条件才会被提交。 3. Follower 只能从 Leader 获取最新的数据流,Follower 上线时按照选举算法进行截断。 下面是一个选举截断的具体案例,选举截断算法思想和流程如下: 主 CommitLog Min = 300,Max = 2500,EpochMap = {, , } 备 CommitLog Min = 300,Max = 2500,EpochMap = {, , } 1. 备节点连接到主节点进行 HA 协商,获取主节点的 EpochStartOffset 信息并比较 2. 备从后向前找到任期起始点相同的那个点作为分叉任期,在上述案例里是 3. 选择这个任期里主备结束位点的最小值(如果主副本没有切换且为最大任期,则主副本的结束位点是无穷大) 实现对比 | | 模式 | 优点 | 缺点 | | | | | | | 无切换架构 | MasterSlave 模式 | 实现简单,适用于中小型用户,人为干预管控力强。 | 冷备浪费资源,故障需要人工介入,由于故障的副本组不可写入消息,还会导致一些二级消息消费暂停,整体运维成本高。 | | 无切换架构 | Broker Container 模式 | 无需选主,无外部依赖,自愈能力强,故障转移时间从 ~30 秒级降低为 客户端 RocketMQ 提供了灵活的负载均衡机制,主要体现在消费者如何均衡地从消息队列中获取消息。主要分为三种消费模式:Push(推送模式),Pull(拉取模式),Pop(无状态消费模式)。 Push 和 Pull 消费 RocketMQ 中的 Push 并不是指传统意义上的客户端完全被动接收,底层是基于长轮询机制实现。 1. 长轮询:客户端与 Broker 建立长连接,并发送拉取消息的请求。如果当前没有新消息,Broker 不会立即响应,而是等待一段时间或直到有新消息到达再返回。 2. 消费位点:每个消费者维护自己的消费进度(消费位点),Broker 根据这些位点信息,只推送消费者尚未消费的消息。 3. 重平衡:当消费者组内的消费者实例发生变化时(如增加或减少消费者实例),RocketMQ会触发一次重平衡(Rebalance)操作,重新分配消息队列到各个消费者实例,以实现负载均衡。这个过程确保了消息的均匀消费,避免了消息积压或某些消费者空闲的情况。 Pull 模式更加主动,消费者根据自己的消费能力和需求,主动从 Broker 拉取消息。 1. 主动拉取:消费者主动向Broker发送拉取请求,指定要拉取的消息数量和偏移量(或时间戳),Broker 响应包含消息或空结果。 2. 位点管理和重平衡:与Push模式类似,每个消费者维护自己的消费进度,并在消费者实例变化时进行重平衡。但是,在Pull模式下,重平衡的逻辑更依赖于消费者的主动参与,消费者需要根据新的队列分配情况调整自己的拉取策略。 Push / Pull 消费模式的负载均衡是在客户端完成的,性能较高,但也有一些缺陷。 1. 客户端代码逻辑复杂,客户端要实现完整的负载均衡,拉消息,位点管理,消费失败后将消息发回 Broker 重试等逻辑。这给多语言客户端的支持造成很大的阻碍。 2. 消费者无法无限扩展,当消费者数量扩大到大于队列数量时,有的消费者将无法分配到队列。 3. 当某些消费者僵死(hang 住)时,会造成其消费的队列的消息堆积。 Pop 消费 在 RocketMQ 5.0 中,Pop 消费模式借助 gRPC 封装的接口,促进了轻量化多语言客户端的实现,无需在各客户端重复实现重平衡逻辑,显著提升了系统的灵活性和扩展性。该设计核心在于将重平衡、位点管理及消息重试等任务转移至服务端处理,有效避免单点故障引起的消息积压,优化了整体消息处理效率和系统的水平扩展能力。Push / Pull 模式下队列中有慢任务会阻塞整个队列。例如有位点为 34567 的 5 条消息,消费 offset = 5 时业务逻辑耗时非常久,并发消费模式下 67 两条消息消费较快,而观察到的堆积一直为 3 造成误判。消费者或者服务端宕机,业务对产生几秒的消费重复依然敏感,影响用户体验,例如短信推送场景。甚至,我们还有更有代表性的场景来命中这些 “缺陷”,例如渲染业务,队列中每一条消息代表一个渲染任务。 1. 消费者数量较多,同一个订阅组可能有成百上千台机器同时消费。 2. 该场景下单条数据的处理耗时较长,需要几秒至几个小时不等。 3. 由于消费者负载高和大量使用竞价实例,导致消费方进程假死和宕机率远高于一般业务。 传统的消息队列会遇到很经典的 “WorkStealing” 难题,任务的负载无法均衡的分配到所有消费方,单条消息的阻塞会影响后续消费成功消息位点的提交。此时我们想要的是一个基于不可见时间的投递算法,该算法大致的工作流程如下: 1. 客户端设置一个不可见时间,例如 5 分钟,并向服务端拉取一批消息。 2. 服务端返回一批消息,并在后台开始倒计时 5 分钟,消息上会附加一个字段用来标识,也称为 handle。 3. 如果客户端 5 分钟内没有提交消费成功(ack by handle),5 分钟后客户端再次可以获取到这批消息。 很快我们就会发现这个模型还是有缺陷的,假如消费者拉取消息 1 分钟后立刻宕机了,业务不得不忍受 4 分钟的延迟才能再次处理,哪怕此时其他消费者还是空闲状态。这个时候就可以选择将消息的不可见时间设置为 1 分钟,在客户端处理业务的同时不停的 refresh 不可见时间,例如每隔 30 秒就调用 change invisible time,使剩余的不可见时间更新为 1 分钟,此时无论客户端何时宕机,消息的延迟时间会控制在 1 分钟之内。在 RocketMQ 中,这种基于区间和单条消息进行消费的方式被称为 “pop 消费”,对应的客户端实现是 SimpleConsumer,它的简单性在于客户端不再需要关心复杂的负载均衡和位点管理,也更容易适配多语言。 高级特性 顺序消息 顺序消息是 Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。 相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系。Apache RocketMQ 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别。(注:这个概念在旧版本中被称为 ShardingKey)发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。Apache RocketMQ 的消息的顺序性分为两部分,生产顺序性和消费顺序性:如需保证消息生产的顺序性,则必须满足以下条件: 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。 串行发送:Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。 Apache RocketMQ 通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理。 如需保证消息消费的顺序性,则必须满足投递顺序和有限重试两个条件,详情请参考。 定时消息 在分布式场景中会有定时调度、任务超时处理等场景,使用 Apache RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。RocketMQ 服务端有两种实现延时的机制,一种是延时队列,另一种是可持久化单层时间轮。下面简述这两种方案的服务端实现: 1. 延时队列实现:通过划分延时 Level 的方式,将不同延迟级别的消息放入不同的延迟队列,排序操作转换为了 O(1) 的 ConsumeQueue 的 append 操作。broker 有一个定时任务不断从各个延迟队列 “消费消息”,如果到达预期时间,就取出消息并重新放入到 commitLog 中。这样设计的延时队列已经满足大部分场景下的需求,如 15 分钟的特定场景,消息会堆积于单个队列中。该方案性能非常高,缺陷是只支持固定间隔的定时消息,例如 1 分钟,15 分钟,60 分钟等。 2. 可持久化单层时间轮:每一秒为一个定时的 Slot,将所有消息通过持久化的方式放入 TimerLog 中,当定时时间有冲突时使用哈希拉链的方式解决冲突,后一条消息保存对前一条消息的引用。同时对时间轮提供内存锁定,对于所有追加只在 TimerLog 尾部操作。当取出消息的线程从时间轮中扫描 Slot 时,从 TimerLog 中以此拉取所有定时消息并出队。 事务消息 RocketMQ 提供了事务消息的功能,采用 2PC (两段式协议) + 补偿机制(事务回查)的分布式事务功能,通过这种方式能达到分布式事务的最终一致。这里需要先来理解两个概念: 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列 RocketMQ 版服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。 消息回查: 由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者组(ProducerGroup)的任意一个客户端询问该消息的最终状态(Commit 或是 Rollback),即消息回查。 事务消息发送步骤如下: 1. 发送方将半事务消息发送至消息队列 RocketMQ 版服务端。 2. 消息队列 RocketMQ 版服务端将消息持久化成功之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息。 3. 发送方开始执行本地事务逻辑。 4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。 事务消息回查步骤如下: 5. 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。 6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行操作。 消息查询 RocketMQ 作为业务消息的首选,除了上文中 ReputMessageService 线程除了构建消费队列的索引外,还同时为每条消息根据 id, key 构建了索引到 IndexFile。这是方便快速快速定位目标消息而产生的,当然这个构建随机索引的能力是可以降级的,IndexFile文件结构如下:IndexFile 也是定长的,从单个文件的数据结构来说,这是实现了一种简单原生的哈希拉链机制。当一条新的消息索引进来时,首先使用 hash 算法命中黄色部分 500w 个 slot 中的一个,如果存在冲突就使用拉链解决,将最新索引数据的 next 指向上一条索引位置。同时将消息的索引数据 append 至文件尾部(绿色部分),这样便形成了一条当前 slot 按照时间存入的倒序的链表。这里其实也是一种 LSM compaction 在消息模型下的改进,降低了写放大。当用户按照 UniqueKey(MsgId)或者业务 Key 来进行查询时,会先从索引查询消息报存在 CommitLog 中的位置并取回数据返回客户端。
#基础学习资料

2024年7月24日

RocketMQ 应用场景
Apache RocketMQ 作为一款分布式消息中间件,具备异步通信的优势,系统拓扑简单、上下游耦合较弱,主要应用于异步解耦,流量削峰填谷等场景。广泛应用于各种大规模分布式系统和微服务架构中,以实现高性能、低延迟的消息传递。对于同步链路,需要实时返回调用结果的场景,建议使用 RPC 调用方案。如上图所示,RocketMQ 中消息的生命周期主要分为“消息生产、消息存储、消息消费”这三部分。生产者生产消息并发送至 RocketMQ 服务端,消息被存储在服务端的主题的队列中,消费者通过订阅主题消费消息。消息生产生产者(Producer):RocketMQ 中用于产生消息的运行实体,一般集成于业务调用链路的上游。生产者是轻量级匿名无身份的。消息存储 主题(Topic):RocketMQ 消息传输和存储的分组容器,主题内部由多个队列组成,消息的存储和水平扩展实际是通过主题内的队列实现的。 队列(MessageQueue):RocketMQ 消息传输和存储的实际单元容器,类比于 Kafka 中的分区。RocketMQ 通过流式特性的无限队列结构来存储消息,消息在队列内具备顺序性存储特征。 消息(Message):RocketMQ 的最小传输单元。消息具备不可变性,在初始化发送和完成存储后即不可变。 消息消费 消费者分组(ConsumerGroup):RocketMQ 发布订阅模型中定义的独立的消费身份分组,用于统一管理底层运行的多个消费者(Consumer)。同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。 消费者(Consumer):RocketMQ 消费消息的运行实体,一般集成在业务调用链路的下游。消费者必须被指定到某一个消费组中。 订阅关系(Subscription):RocketMQ 发布订阅模型中消息过滤、重试、消费进度的规则配置。订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。RocketMQ 的订阅关系除过滤表达式之外都是持久化的,即服务端重启或请求断开,订阅关系依然保留。 RocketMQ 典型应用场景 作为一款消息中间件,RocketMQ 的典型应用场景有:削峰填谷与流量控制、异步解耦。 削峰填谷与流量控制 在秒杀、大促等高并发场景下,RocketMQ 可以作为缓冲区,暂时存储大量涌入的请求,从而避免系统直接承受峰值压力,实现流量削峰。通过异步处理这些消息,系统可以平稳地处理高峰流量,保障服务稳定性。 异步解耦 RocketMQ 可以用来解耦系统组件,允许服务间的异步通信。这样服务就可以独立部署、升级,而不影响彼此,提高了系统的可扩展性和灵活性。 RocketMQ 特性消息的应用场景 顺序消息处理 在需要保证消息顺序的场景中,如金融交易系统中的订单处理(创建、支付、退款等),RocketMQ 支持顺序消息功能,确保消息严格按照发送顺序被消费,满足业务逻辑的一致性要求。RocketMQ 顺序消息的特性非常适合那些对消息处理顺序有严格要求的业务场景,确保消息按照特定的顺序被消费,以维持业务逻辑的连贯性和一致性。以下是一些典型的使用 RocketMQ 顺序消息的业务场景: 金融交易系统:在金融领域,如银行账户的转账、扣款、充值等操作,必须严格按照发生的顺序处理,以避免资金账户余额不准确,如先扣除金额再执行充值会导致短暂的账户透支问题。 订单处理流程:电商平台的订单处理,包括创建订单、支付订单、发货通知、完成订单等步骤,这些操作需要按照特定顺序执行,以确保用户体验和业务流程的完整性。 库存管理:在库存管理系统中,商品的入库、出库、盘点等操作需要顺序执行,以防止库存数据混乱,比如先扣减库存再进行商品销售确认,可能导致超卖现象。 消息日志记录:在需要保持操作记录顺序的场景,如审计日志、操作历史追踪等,顺序消息可以保证记录的时序性,便于追踪和回溯。 游戏状态同步:在线游戏中的用户操作,如移动、攻击、升级等,为了保持游戏世界的连贯性,这些操作的处理顺序至关重要,以免出现逻辑冲突或玩家体验受损。 计费系统:电信、云服务等领域的计费系统,需要按照用户使用服务的顺序来计算费用,避免因计费顺序错误导致的计费争议。 消息驱动的流程引擎:在工作流或业务流程自动化系统中,顺序消息可以作为流程节点之间的驱动器,确保每个步骤按照预定的顺序执行。 通过使用 RocketMQ 的顺序消息功能,可以确保在分布式系统中,即使消息被并行处理,也能维持业务逻辑所需的顺序,进而提高系统的可靠性和业务的正确性。 分布式事务支持 RocketMQ 提供了分布式事务消息功能,能够在分布式系统中实现事务的最终一致性。这使得跨服务的事务操作得以原子化执行,确保数据的一致性。RocketMQ 的事务消息特性主要用于解决分布式系统中需要保持数据一致性的复杂场景,特别是当业务操作需要跨多个服务或数据库,且这些操作必须保持原子性时。以下是事务消息的几个典型业务场景: 订单支付场景:用户在电商平台下单后,需要同时更新订单状态、扣减库存、处理支付等操作。事务消息可以确保这一系列操作要么全部成功,要么全部失败,避免出现订单已确认但库存未扣减或支付未完成的情况。 金融交易:在转账、理财购买、贷款发放等金融交易中,事务消息能确保资金的划转与账户余额更新、交易记录生成等一系列操作保持一致,防止资金丢失或重复记账。 库存扣减与订单创建:在库存管理中,当用户下单时,需确保库存扣减与订单创建同时成功或失败,避免超卖或订单错误。事务消息可以保证这一流程的原子性。 票务系统:火车票、飞机票的预订和出票过程中,座位锁定与订单生成必须同步,事务消息可以确保不会出现座位已分配但订单未创建或相反的情况。 用户注册与邮件/短信通知:在用户注册后,需要同时完成用户账户的创建与发送验证邮件/短信的操作,事务消息可以保证用户数据与通知消息的一致性。 通过事务消息,RocketMQ 提供了一种在分布式系统中实现事务一致性的有效手段,特别是在那些需要跨服务、跨数据库操作且保证数据一致性的场景中尤为重要。 延时消息与定时任务 RocketMQ 支持延时消息功能,可以设定消息在未来某一时间点被消费,适用于如订单超时自动取消、定时提醒等场景。RocketMQ 的定时消息功能为分布式系统提供了强大的消息调度能力,允许消息在特定时间点之后才被消费者处理。这一特性适用于多种业务场景,确保消息处理与时间相关联,以下是几个典型的使用场景: 订单超时处理:在电子商务平台中,可以利用定时消息来处理未支付的订单。例如,当用户下单后,系统发送一个延迟几分钟至几小时的定时消息,如果到时仍未支付,则自动取消订单,释放库存。 系统任务调度:定时消息可以作为轻量级的任务调度工具,比如定期数据统计、报表生成、缓存清理、数据库维护任务等,无需额外的复杂调度系统。 消息重试机制:在消息处理失败时,可以通过发送一个延迟的定时消息来实现自动重试机制,比如支付确认、通知发送失败等情况,避免即时重试带来的系统压力。 营销活动触发:在营销活动中,可以预先设定好活动开始或结束的通知,通过定时消息在特定时间点自动触发达成营销效果,如促销活动的开始提醒、优惠券到期提醒等。 系统通知与提醒:如用户生日祝福、账户到期提醒、定期报告发送等,通过定时消息自动触发,确保消息按时送达。 金融交易确认:在需要一定时间确认的金融交易中,如转账确认、贷款审批通知等,可以设置定时消息在预期的确认时间后通知用户或更新系统状态。 资源回收与清理:在云服务或资源分配场景中,可以安排定时消息来自动回收不再使用的资源,如临时生成的文件、过期的访问令牌等。 社交应用的消息定时发送:如定时发送节日祝福、生日祝福等,提升用户体验。 通过上述场景可以看出,RocketMQ 的定时消息功能极大地丰富了分布式系统处理时间敏感型任务的能力,提高了系统的灵活性和自动化水平。 日志与监控数据收集 RocketMQ 可以作为日志或监控数据的收集管道,从各个服务收集信息,统一处理后发送至数据分析或监控平台,便于系统监控和故障排查。 服务编排与工作流 通过消息驱动的方式,RocketMQ 可以作为服务编排的工具,实现复杂工作流的自动化,如订单处理流程中的多个服务协同工作。 消息驱动的微服务架构 RocketMQ 可以作为微服务间通信的基础设施,实现事件驱动的微服务架构,提高系统的响应速度和可维护性。这些应用场景展示了 RocketMQ 在现代分布式系统中的重要作用,它不仅帮助系统应对高并发挑战,也促进了系统的灵活扩展和高效协作。
#基础学习资料