2021年3月18日

如何在 Spring 生态中玩转Apache RocketMQ?
在 Spring 生态中玩转 RocketMQ 系列教程现已登陆知行动手实验室,立即体验! 移动端同学,需要在PC端登录 start.aliyun.com 进行体验。 RocketMQ 作为业务消息的首选,在消息和流处理领域被广泛应用。而微服务生态 Spring 框架也是业务开发中最受欢迎的框架,两者的完美契合使得 RocketMQ 成为 Spring Messaging 实现中最受欢迎的消息实现。本文展示了 5 种在 Spring 生态中文玩转 RocketMQ 的方式,并描述了每个项目的特点和使用场景。 一、前言 上世纪 90 年代末,随着 Java EE(Enterprise Edition)的出现,特别是 Enterprise Java Beans 的使用需要复杂的描述符配置和死板复杂的代码实现,增加了广大开发者的学习曲线和开发成本,由此基于简单的 XML 配置和普通 Java 对象(Plain Old Java Objects)的 Spring 技术应运而生,依赖注入(Dependency Injection),控制反转(Inversion of Control)和面向切面编程(AOP)的技术更加敏捷地解决了传统 Java 企业及版本的不足。随着 Spring 的持续演进,基于注解(Annotation)的配置逐渐取代了 XML 文件配置。除了依赖注入、控制翻转、AOP 这些技术,Spring 后续衍生出 AMQP、Transactional、Security、Batch、Data Access 等模块,涉及开发的各个领域。 2014 年 4 月 1 日,Spring Boot 1.0.0 正式发布。它基于“约定大于配置”(Convention over configuration)这一理念来快速地开发,测试,运行和部署 Spring 应用,并能通过简单地与各种启动器(如springbootwebstarter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。Spring Boot 的出现可以说是 Spring 框架的第二春,它不但简化了开发的流程,目前更是事实标准。下面这幅图可以看出相同功能的 Spring 和 Spring Boot 的代码实现对比。 Apache RocketMQ 是一款是业界知名的分布式消息和流处理中间件,它主要功能是消息分发、异步解耦、削峰填谷等。RocketMQ 是一款金融级消息及流数据平台,RocketMQ 在交易、支付链路上用的很多,主要是对消息链路质量要求非常高的场景,能够支持万亿级消息洪峰。RocketMQ 在业务消息中被广泛应用,并衍生出顺序消息、事务消息、延迟消息等匹配各类业务场景的特殊消息。 本文的主角就是 Spring 和 RocketMQ,那几乎每个 Java 程序员都会使用 Spring 框架与支持丰富业务场景的 RocketMQ 会碰撞出怎么样的火花? 二、RocketMQ 与 Spring 的碰撞 在介绍 RocketMQ 与 Spring 故事之前,不得不提到 Spring 中的两个关于消息的框架,Spring Messaging 和 Spring Cloud Stream。它们都能够与 Spring Boot 整合并提供了一些参考的实现。和所有的实现框架一样,消息框架的目的是实现轻量级的消息驱动的微服务,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。 1. Spring Messaging Spring Messaging 是 Spring Framework 4 中添加的模块,是 Spring 与消息系统集成的一个扩展性的支持。它实现了从基于 JmsTemplate 的简单的使用 JMS 接口到异步接收消息的一整套完整的基础架构,Spring AMQP 提供了该协议所要求的类似的功能集。在与 Spring Boot 的集成后,它拥有了自动配置能力,能够在测试和运行时与相应的消息传递系统进行集成。单纯对于客户端而言,Spring Messaging 提供了一套抽象的 API 或者说是约定的标准,对消息发送端和消息接收端的模式进行规定,比如消息 Messaging 对应的模型就包括一个消息体 Payload 和消息头 Header。不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。 在 Apache RocketMQ 生态中,RocketMQSpringBootStarter(下文简称 RocketMQSpring)就是一个支持 Spring Messaging API 标准的项目。该项目把 RocketMQ 的客户端使用 Spring Boot 的方式进行了封装,可以让用户通过简单的 annotation 和标准的 Spring Messaging API 编写代码来进行消息的发送和消费,也支持扩展出 RocketMQ 原生 API 来支持更加丰富的消息类型。在 RocketMQSpring 毕业初期,RocketMQ 社区同学请 Spring 社区的同学对 RocketMQSpring 代码进行 review,引出一段罗美琪(RocketMQ)和春波特(Spring Boot)故事的佳话[1],著名 Spring 布道师 Josh Long 向国外同学介绍如何使用 RocketMQSpring 收发消息[2]。RocketMQSpring 也在短短两年时间超越 SpringKafka 和 SpringAMQP(注:两者均由 Spring 社区维护),成为 Spring Messaging 生态中最活跃的消息项目。 2. Spring Cloud Stream Spring Cloud Stream 结合了 Spring Integration 的注解和功能,它的应用模型如下: Spring Cloud Stream 框架中提供一个独立的应用内核,它通过输入(@Input)和输出(@Output)通道与外部世界进行通信,消息源端(Source)通过输入通道发送消息,消费目标端(Sink)通过监听输出通道来获取消费的消息。这些通道通过专用的 Binder 实现与外部代理连接。开发人员的代码只需要针对应用内核提供的固定的接口和注解方式进行编程,而不需要关心运行时具体的 Binder 绑定的消息中间件。 在运行时,Spring Cloud Stream 能够自动探测并使用在 classpath 下找到的 Binder。这样开发人员可以轻松地在相同的代码中使用不同类型的中间件:仅仅需要在构建时包含进不同的 Binder。在更加复杂的使用场景中,也可以在应用中打包多个 Binder 并让它自己选择 Binder,甚至在运行时为不同的通道使用不同的 Binder。 Binder 抽象使得 Spring Cloud Stream 应用可以灵活的连接到中间件,加之 Spring Cloud Stream 使用利用了 Spring Boot 的灵活配置配置能力,这样的配置可以通过外部配置的属性和 Spring Boot 支持的任何形式来提供(包括应用启动参数、环境变量和 application.yml 或者 application.properties 文件),部署人员可以在运行时动态选择通道连接 destination(例如,RocketMQ 的 topic 或者 RabbitMQ 的 exchange)。 Spring Cloud Stream 屏蔽了底层消息中间件的实现细节,希望以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。Spring 官方实现了 Rabbit binder 和 Kafka Binder。Spring Cloud Alibaba 实现了 RocketMQ Binder[3],其主要实现原理是把发送消息最终代理给了 RocketMQSpring 的 RocketMQTemplate,在消费端则内部会启动 RocketMQSpring Consumer Container 来接收消息。以此为基础,Spring Cloud Alibaba 还实现了 Spring Cloud Bus RocketMQ, 用户可以使用 RocketMQ 作为 Spring Cloud 体系内的消息总线,来连接分布式系统的所有节点。通过 Spring Cloud Stream RocketMQ Binder,RocketMQ 可以与 Spring Cloud 生态更好的结合。比如与 Spring Cloud Data Flow、Spring Cloud Funtion 结合,让 RocketMQ 可以在 Spring 流计算生态、Serverless(FaaS) 项目中被使用。 如今 Spring Cloud Stream RocketMQ Binder 和 Spring Cloud Bus RocketMQ 做为 Spring Cloud Alibaba 的实现已登陆 Spring 的官网[4],Spring Cloud Alibaba 也成为 Spring Cloud 最活跃的实现。 三、如何在 Spring 生态中选择 RocketMQ 实现? 通过介绍 Spring 中的消息框架,介绍了以 RocketMQ 为基础与 Spring 消息框架结合的几个项目,主要是 RocketMQSpring、Spring Cloud Stream RocketMQ Binder、Spring Cloud Bus RocketMQ、Spring Data Flow 和 Spring Cloud Function。它们之间的关系可以如下图表示。 如何在实际业务开发中选择相应项目进行使用?下面分别列出每个项目的特点和使用场景。 1. RocketMQSpring 特点: 作为起步依赖,简单引入一个包就能在 Spring 生态用到 RocketMQ 客户端的所有功能。 利用了大量自动配置和注解简化了编程模型,并且支持 Spring Messaging API。 与 RocketMQ 原生 Java SDK 的功能完全对齐。 使用场景: 适合在 Spring Boot 中使用 RocketMQ 的用户,希望能用到 RocketMQ 原生 java 客户端的所有功能,并通过 Spring 注解和自动配置简化编程模型。 2. Spring Cloud Stream RocketMQ Binder 特点: 屏蔽底层 MQ 实现细节,上层 Spring Cloud Stream 的 API 是统一的。如果想从 Kafka 切到 RocketMQ,直接改个配置即可。 与 Spring Cloud 生态整合更加方便。比如 Spring Cloud Data Flow,这上面的流计算都是基于 Spring Cloud Stream;Spring Cloud Bus 消息总线内部也是用的 Spring Cloud Stream。 Spring Cloud Stream 提供的注解,编程体验都是非常棒。 使用场景: 在代码层面能完全屏蔽底层消息中间件的用户,并且希望能项目能更好的接入 Spring Cloud 生态(Spring Cloud Data Flow、Spring Cloud Funtcion 等)。 3. Spring Cloud Bus RocketMQ 特点: 将 RocketMQ 作为事件的“传输器”,通过发送事件(消息)到消息队列上,从而广播到订阅该事件(消息)的所有节点上,完成事件的分发和通知。 使用场景: 在 Spring 生态中希望用 RocketMQ 做消息总线的用户,可以用在应用间事件的通信,配置中心客户端刷新等场景。 4. Spring Cloud Data Flow 特点: 以 Source/Processor/Sink 组件进行流式任务处理。RocketMQ 作为流处理过程中的中间存储组件。 使用场景: 流处理,大数据处理场景。 5. Spring Cloud Function 特点: 消息的消费/生产/处理都是一次函数调用,融合 Java 生态的 Function 模型。 使用场景: Serverless 场景。 本文整体介绍了在 Spring 生态中接入 RockeMQ 的 5 种方法,让各位开发者对几种经典场景有宏观的了解。后续会有专栏详细介绍上述各个项目的具体使用方法和应用场景,真正地在 Spring 生态中玩转 RocketMQ! 在 Spring 生态中玩转 RocketMQ 系列教程现已登陆知行动手实验室,立即体验! 移动端同学,需要在PC端登录 start.aliyun.com 进行体验。 相关链接:
#社区动态 #微服务

2021年2月2日

RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现?
2019 年 1 月,孵化 6 个月的 RocketMQSpring 作为 Apache RocketMQ 的子项目正式毕业,发布了第一个 Release 版本 2.0.1。该项目是把 RocketMQ 的客户端使用 Spring Boot 的方式进行了封装,可以让用户通过简单的 annotation 和标准的 Spring Messaging API 编写代码来进行消息的发送和消费。当时 RocketMQ 社区同学请 Spring 社区的同学对 RocketMQSpring 代码进行 review,引出一段。 时隔两年,RocketMQSpring 正式发布 2.2.0。在这期间,RocketMQSpring 迭代了数个版本,以 RocketMQSpring 为基础实现的 Spring Cloud Stream RocketMQ Binder、Spring Cloud Bus RocketMQ 登上了 ,Spring 布道师 baeldung 向国外同学介绍,越来越多国内外的同学开始使用 RocketMQSpring 收发消息,RocketMQSpring 仓库的 star 数也在短短两年时间内超越了 SpringKafka 和 SpringAMQP(注:两者均由 Spring 社区维护),成为 Apache RocketMQ 最受欢迎的生态项目之一。 RocketMQSpring 的受欢迎一方面得益于支持丰富业务场景的 RocketMQ 与微服务生态 Spring 的完美契合,另一方面也与 RocketMQSpring 本身严格遵循 Spring Messaging API 规范,支持丰富的消息类型分不开。 遵循 Spring Messaging API 规范 Spring Messaging 提供了一套抽象的 API,对消息发送端和消息接收端的模式进行规定,不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。 1. 发送端 RocketMQSpring 在遵循 Spring Messaging API 规范的基础上结合 RocketMQ 自身的功能特点提供了相应的 API。在消息的发送端,RocketMQSpring 通过实现 RocketMQTemplate 完成消息的发送。如下图所示,RocketMQTemplate 继承 AbstractMessageSendingTemplate 抽象类,来支持 Spring Messaging API 标准的消息转换和发送方法,这些方法最终会代理给 doSend 方法,doSend 方法会最终调用 syncSend,由 DefaultMQProducer 实现。 除 Spring Messaging API 规范中的方法,RocketMQTemplate 还实现了 RocketMQ 原生客户端的一些方法,来支持更加丰富的消息类型。值得注意的是,相比于原生客户端需要自己去构建 RocketMQ Message(比如将对象序列化成 byte 数组放入 Message 对象),RocketMQTemplate 可以直接将对象、字符串或者 byte 数组作为参数发送出去(对象序列化操作由 RocketMQSpring 内置完成),在消费端约定好对应的 Schema 即可正常收发。 RocketMQTemplate Send API: SendResult syncSend(String destination, Object payload) SendResult syncSend(String destination, Message message) void asyncSend(String destination, Message message, SendCallback sendCallback) void asyncSend(String destination, Message message, SendCallback sendCallback) …… 2. 消费端 在消费端,需要实现一个包含 @RocketMQMessageListener 注解的类(需要实现 RocketMQListener 接口,并实现 onMessage 方法,在注解中进行 topic、consumerGroup 等属性配置),这个 Listener 会一对一的被放置到 DefaultRocketMQListenerContainer 容器对象中,容器对象会根据消费的方式(并发或顺序),将 RocketMQListener 封装到具体的 RocketMQ 内部的并发或者顺序接口实现。在容器中创建 RocketMQ DefaultPushConsumer 对象,启动并监听定制的 Topic 消息,完成约定 Schema 对象的转换,回调到 Listener 的 onMessage 方法。 @Service @RocketMQMessageListener(topic = "demo.rocketmq.topic", consumerGroup = "string_consumer", selectorExpression = "{demo.rocketmq.tag}") public class StringConsumer implements RocketMQListener { @Override public void onMessage(String message) { System.out.printf(" StringConsumer received: %s \n", message); } } 除此 Push 接口之外,在最新的 2.2.0 版本中,RocketMQSpring 实现了 RocketMQ Lite Pull Consumer。通过在配置文件中进行 consumer 的配置,利用 RocketMQTemplate 的 Recevie 方法即可主动 Pull 消息。 配置文件resource/application.properties: rocketmq.nameserver=localhost:9876 rocketmq.consumer.group=mygroup1 rocketmq.consumer.topic=test Pull Consumer代码: while(!isStop) { List messages = rocketMQTemplate.receive(String.class); System.out.println(messages); } 丰富的消息类型 RocketMQ Spring 消息类型支持方面与 RocketMQ 原生客户端完全对齐,包括同步/异步/oneway、顺序、延迟、批量、事务以及 RequestReply 消息。在这里,主要介绍较为特殊的事务消息和 requestreply 消息。 1. 事务消息 RocketMQ 的事务消息不同于 Spring Messaging 中的事务消息,依然采用 RocketMQ 原生事务消息的方案。如下所示,发送事务消息时需要实现一个包含 @RocketMQTransactionListener 注解的类,并实现 executeLocalTransaction 和 checkLocalTransaction 方法,从而来完成执行本地事务以及检查本地事务执行结果。 // Build a SpringMessage for sending in transaction Message msg = MessageBuilder.withPayload(..)...; // In sendMessageInTransaction(), the first parameter transaction name ("test") // must be same with the @RocketMQTransactionListener's member field 'transName' rocketMQTemplate.sendMessageInTransaction("testtopic", msg, null); // Define transaction listener with the annotation @RocketMQTransactionListener @RocketMQTransactionListener class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return bollback, commit or unknown return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return bollback, commit or unknown return RocketMQLocalTransactionState.COMMIT; } } 在 2.1.0 版本中,RocketMQSpring 重构了事务消息的实现,如下图所示,旧版本中每一个 group 对应一个 TransactionProducer,而在新版本中改为每一个 RocketMQTemplate 对应一个 TransationProducer,从而解决了并发使用多个事务消息的问题。当用户需要在单进程使用多个事务消息时,可以使用 ExtRocketMQTemplate 来完成(一般情况下,推荐一个进程使用一个 RocketMQTemplate,ExtRocketMQTemplate 可以使用在同进程中需要使用多个 Producer / LitePullConsumer 的场景,可以为 ExtRocketMQTemplate 指定与标准模版 RocketMQTemplate 不同的 nameserver、group 等配置),并在对应的 RocketMQTransactionListener 注解中指定 rocketMQTemplateBeanName 为 ExtRocketMQTemplate 的 BeanName。 2. RequestReply 消息 在 2.1.0 版本中,RocketMQSpring 开始支持 RequestReply 消息。RequestReply 消息指的是上游服务投递消息后进入等待被通知的状态,直到消费端返回结果并返回给发送端。在 RocketMQSpring 中,发送端通过 RocketMQTemplate 的 sendAndReceivce 方法进行发送,如下所示,主要有同步和异步两种方式。异步方式中通过实现 RocketMQLocalRequestCallback 进行回调。 // 同步发送request并且等待String类型的返回值 String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class); // 异步发送request并且等待User类型的返回值 rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback() { @Override public void onSuccess(User message) { …… } @Override public void onException(Throwable e) { …… } }); 在消费端,仍然需要实现一个包含 @RocketMQMessageListener 注解的类,但需要实现的接口是 RocketMQReplyListener 接口(普通消息为 RocketMQListener 接口),其中 T 表示接收值的类型,R 表示返回值的类型,接口需要实现带返回值的 onMessage 方法,返回值的内容返回给对应的 Producer。 @Service @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer") public class StringConsumerWithReplyString implements RocketMQReplyListener { @Override public String onMessage(String message) { …… return "reply string"; } } RocketMQSpring 遵循 Spring 约定大于配置(Convention over configuration)的理念,通过启动器(Spring Boot Starter)的方式,在 pom 文件引入依赖(groupId:org.apache.rocketmq,artifactId:rocketmqspringbootstarter)便可以在 Spring Boot 中集成所有 RocketMQ 客户端的所有功能,通过简单的注解使用即可完成消息的收发。在 中有更加详细的用法和常见问题解答。 据统计,从 RocketMQSpring 发布第一个正式版本以来,RocketMQSpring 完成 16 个 bug 修复,37 个 imporvement,其中包括事务消息重构,消息过滤、消息序列化、多实例 RocketMQTemplate 优化等重要优化,欢迎更多的小伙伴能参与到 RocketMQ 社区的建设中来,罗美琪(RocketMQ)和春波特(Spring Boot)的故事还在继续...钉钉搜索群号:21982288,即可进群和众多开发者交流!
作者:RocketMQ 官微
#社区动态 #微服务

2021年1月6日

再见 2020!Apache RocketMQ 发布 4.8.0,DLedger 模式全面提升!
“童年的雨天最是泥泞,却是记忆里最干净的曾经。凛冬散尽,星河长明,新的一年,万事顺遂,再见,2020!” 走过这个岁末,万众期待的 了,在这个版本中社区对 RocketMQ 完成大量的优化和问题修复。更重要的是,该版本从性能、稳定性、功能三个方面大幅度提升 DLedger 模式能力。 是 中一个基于 Raft 的 CommitLog 存储库实现,从 RocketMQ 4.5.0 版本开始,RocketMQ 引入 DLedger 模式来解决了 Broker 组内自动故障转移的问题,而在 4.8.0 版本中社区也对 RocketMQ DLedger 模式进行了全面升级。 性能升级 异步化 pipeline 模式 RocketMQ 4.7.0 重新升级了同步双写的架构,利用异步化 pipeline 模式大幅提升了同步双写的性能。在 RocketMQ 4.8.0 中,社区将这一改进应用到 DLedger 模式中, 下图展示了 DLedger 模式下 broker 处理发送消息的过程。在原本的架构中, SendMessageProcessor 线程对每一个消息的处理,都需要等待多数派复制成功确认,才会返回给客户端,而在新版本中,利用 CompletableFuture 对线程处理消息的过程进行异步化改造,不再等待多数派的确认即可对下一个请求进行处理,Ack 操作由其他线程确认之后再进行结果处理并返回给客户端。通过对复制过程进行切分并将其流水线化,减少线程的长时间等待,充分利用 CPU,从而大幅提高吞吐量。 批量日志复制 Batch 一直是性能优化的重要方式,在新版本中,可以通过设置 isEnableBatchPush=true 来开启 DLedger 模式的批量复制。通过将多条数据聚合在一个包中进行发送,可以降低收发包的个数,从而降低系统调用和上下文的切换。在数据发送压力比较大,并且可能达到系统收发包瓶颈的情况下,批量复制能显著提高吞吐量。值得注意的是,DLedger 模式下的批量复制并不会对单个包进行延时的攒批处理,因此不会影响单个消息的发送时延。 除了上述的性能优化,社区还对 DLedger 模式下影响性能的锁、缓存等做了数项性能优化,使 DLedger 模式下的性能提升数倍。 稳定性升级 为了验证和测试 Dledger 模式的可靠性,除了本地对 DLedger 模式进行了各种各样的测试,社区利用 框架对 RocketMQ DLedger 模式进行了大量 Chaos 测试。OpenMessagingChaos 是一个利用故障注入来验证各种消息平台一致性和高可用性的测试框架,在 OpenMessagingChaos 的测试中,客户端并发地向待测试集群发送和接收消息,中间会间隔性地对集群进行故障注入,最后给出测试结果,包括是否丢消息,是否有重复消息,集群平均故障恢复时间等。利用 OpenMessagingChaos,我们验证了 DLedger 模式在以下故障注入场景下的表现: randompartition(fixedpartition)故障随机挑选节点进行网络隔离,模拟常见的对称网络分区。 randomloss 故障随机挑选节点并对这些节点接收和发送的网络包进行按比例丢弃,模拟一些节点网络较差的情况。 randomkill(minorkill、majorkill、fixedkill)故障模拟常见的进程崩溃情况。 randomsuspend(minorsuspend、majorsuspend、fixedsuspend)故障模拟一些慢节点的情况,比如发生 Full GC、OOM 等。 bridge 和 partitionmajoritiesring 故障模拟比较极端的非对称网络分区。 以 minorkill 故障注入为例,我们部署 5 个节点组成一组 DLedger 模式的 RocketMQ broker 进行 Chaos 测试。minorkill 故障注入会随机挑选集群中少数节点进程杀死,由于杀死少数节点,即使集群不可用也能在一段时间内恢复,方便测试集群平均故障恢复时间。 测试过程中我们设置四个客户端并发向 RocketMQ DLedger 集群发送和接收消息,故障注入时间间隔为 100s,即 100s 正常运行,100s 故障注入,一直循环,整个阶段一共持续 2400s。测试结束后,OpenMessagingChaos 会给出测试结果和时延图。下图展示了整个测试过程中入队操作的时延情况。 图中纵坐标是是时延,横坐标是测试时间,绿色框表示数据发送成功,红色框表示数据发送失败,蓝色框表示不确定是否数据添加成功,灰色部分表示故障注入的时间段。可以看出一些故障注入时间段造成了集群短暂的不可用,一些故障时间段则没有,这是合理的。由于是随机网络分区,所以只有杀死的少数节点包含 leader 节点时才会造成集群重新选举,但即使造成集群重新选举, DLedger 模式在一段时间后也会恢复可用性。 下图是测试结束后 OpenMessagingChaos 给出的统计结果,可以看到一共成功发送了 11W 个数据,没有数据丢失,这表明即使在故障下,RocketMQ DLedger 模式仍旧满足 At Least Once 的投递语义。此外,RTOTestResult 表明 12 次故障时间段一共发生了 3 次集群不可用的情况(与上述时延图一致),但每次集群都能在 30 秒以内恢复,平均故障恢复时间在 22 秒左右。 在 OpenMessaging Chaos 测试过程中,我们发现了 DLedger 模式存在的数个隐性问题并进行了修复,提高了 DLedger 模式下对进程异常崩溃、慢节点、对称/非对称网络分区、网络丢包的容错能力,也进一步验证了 DLedger 模式的可靠性。 功能升级 DLedger 模式支持 Preferred Leader 在旧版本中一组 Broker 中选谁作为 Leader 完全是随机的。但是在新版中我们可以通过配置 preferredLeaderId 来指定优先选举某个节点为 Leader,如下图所示,通过在三个机房部署 DLedger 模式的 broker 组,利用 preferred leader 可以更好的实现机房对齐的功能,图中 DC1 中服务更好,我们可以优先选择在 DC1 中部署 Master。此外,利用 preferred leader 还可以更好实现 DLedger 集群混部,从而充分利用机器资源。 DLedger 模式支持批量消息 从 RocketMQ 4.8.0 开始,DLedger 模式支持批量消息发送,从而在消息类型的支持上完全与 MasterSlave 部署形态保持一致。 除了对 DLedger 模式的大量优化,本次 RocketMQ 版本一共包含 Improvement 25 个,Bug Fix 11 个,文档和代码格式优化 16 个。据不完全统计,这些贡献来自近 40 位 RocketMQ 社区的 Contributor,感谢大家的积极参与。也非常期待有更多的用户、厂商、开发者参与到 RocketMQ 建设中来,加入 Apache RocketMQ 社区,永远不会太迟!
作者:RocketMQ社区
#社区动态 #高可用