2021年2月2日

RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现?
2019 年 1 月,孵化 6 个月的 RocketMQSpring 作为 Apache RocketMQ 的子项目正式毕业,发布了第一个 Release 版本 2.0.1。该项目是把 RocketMQ 的客户端使用 Spring Boot 的方式进行了封装,可以让用户通过简单的 annotation 和标准的 Spring Messaging API 编写代码来进行消息的发送和消费。当时 RocketMQ 社区同学请 Spring 社区的同学对 RocketMQSpring 代码进行 review,引出一段。 时隔两年,RocketMQSpring 正式发布 2.2.0。在这期间,RocketMQSpring 迭代了数个版本,以 RocketMQSpring 为基础实现的 Spring Cloud Stream RocketMQ Binder、Spring Cloud Bus RocketMQ 登上了 ,Spring 布道师 baeldung 向国外同学介绍,越来越多国内外的同学开始使用 RocketMQSpring 收发消息,RocketMQSpring 仓库的 star 数也在短短两年时间内超越了 SpringKafka 和 SpringAMQP(注:两者均由 Spring 社区维护),成为 Apache RocketMQ 最受欢迎的生态项目之一。 RocketMQSpring 的受欢迎一方面得益于支持丰富业务场景的 RocketMQ 与微服务生态 Spring 的完美契合,另一方面也与 RocketMQSpring 本身严格遵循 Spring Messaging API 规范,支持丰富的消息类型分不开。 遵循 Spring Messaging API 规范 Spring Messaging 提供了一套抽象的 API,对消息发送端和消息接收端的模式进行规定,不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。 1. 发送端 RocketMQSpring 在遵循 Spring Messaging API 规范的基础上结合 RocketMQ 自身的功能特点提供了相应的 API。在消息的发送端,RocketMQSpring 通过实现 RocketMQTemplate 完成消息的发送。如下图所示,RocketMQTemplate 继承 AbstractMessageSendingTemplate 抽象类,来支持 Spring Messaging API 标准的消息转换和发送方法,这些方法最终会代理给 doSend 方法,doSend 方法会最终调用 syncSend,由 DefaultMQProducer 实现。 除 Spring Messaging API 规范中的方法,RocketMQTemplate 还实现了 RocketMQ 原生客户端的一些方法,来支持更加丰富的消息类型。值得注意的是,相比于原生客户端需要自己去构建 RocketMQ Message(比如将对象序列化成 byte 数组放入 Message 对象),RocketMQTemplate 可以直接将对象、字符串或者 byte 数组作为参数发送出去(对象序列化操作由 RocketMQSpring 内置完成),在消费端约定好对应的 Schema 即可正常收发。 RocketMQTemplate Send API: SendResult syncSend(String destination, Object payload) SendResult syncSend(String destination, Message message) void asyncSend(String destination, Message message, SendCallback sendCallback) void asyncSend(String destination, Message message, SendCallback sendCallback) …… 2. 消费端 在消费端,需要实现一个包含 @RocketMQMessageListener 注解的类(需要实现 RocketMQListener 接口,并实现 onMessage 方法,在注解中进行 topic、consumerGroup 等属性配置),这个 Listener 会一对一的被放置到 DefaultRocketMQListenerContainer 容器对象中,容器对象会根据消费的方式(并发或顺序),将 RocketMQListener 封装到具体的 RocketMQ 内部的并发或者顺序接口实现。在容器中创建 RocketMQ DefaultPushConsumer 对象,启动并监听定制的 Topic 消息,完成约定 Schema 对象的转换,回调到 Listener 的 onMessage 方法。 @Service @RocketMQMessageListener(topic = "demo.rocketmq.topic", consumerGroup = "string_consumer", selectorExpression = "{demo.rocketmq.tag}") public class StringConsumer implements RocketMQListener { @Override public void onMessage(String message) { System.out.printf(" StringConsumer received: %s \n", message); } } 除此 Push 接口之外,在最新的 2.2.0 版本中,RocketMQSpring 实现了 RocketMQ Lite Pull Consumer。通过在配置文件中进行 consumer 的配置,利用 RocketMQTemplate 的 Recevie 方法即可主动 Pull 消息。 配置文件resource/application.properties: rocketmq.nameserver=localhost:9876 rocketmq.consumer.group=mygroup1 rocketmq.consumer.topic=test Pull Consumer代码: while(!isStop) { List messages = rocketMQTemplate.receive(String.class); System.out.println(messages); } 丰富的消息类型 RocketMQ Spring 消息类型支持方面与 RocketMQ 原生客户端完全对齐,包括同步/异步/oneway、顺序、延迟、批量、事务以及 RequestReply 消息。在这里,主要介绍较为特殊的事务消息和 requestreply 消息。 1. 事务消息 RocketMQ 的事务消息不同于 Spring Messaging 中的事务消息,依然采用 RocketMQ 原生事务消息的方案。如下所示,发送事务消息时需要实现一个包含 @RocketMQTransactionListener 注解的类,并实现 executeLocalTransaction 和 checkLocalTransaction 方法,从而来完成执行本地事务以及检查本地事务执行结果。 // Build a SpringMessage for sending in transaction Message msg = MessageBuilder.withPayload(..)...; // In sendMessageInTransaction(), the first parameter transaction name ("test") // must be same with the @RocketMQTransactionListener's member field 'transName' rocketMQTemplate.sendMessageInTransaction("testtopic", msg, null); // Define transaction listener with the annotation @RocketMQTransactionListener @RocketMQTransactionListener class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return bollback, commit or unknown return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return bollback, commit or unknown return RocketMQLocalTransactionState.COMMIT; } } 在 2.1.0 版本中,RocketMQSpring 重构了事务消息的实现,如下图所示,旧版本中每一个 group 对应一个 TransactionProducer,而在新版本中改为每一个 RocketMQTemplate 对应一个 TransationProducer,从而解决了并发使用多个事务消息的问题。当用户需要在单进程使用多个事务消息时,可以使用 ExtRocketMQTemplate 来完成(一般情况下,推荐一个进程使用一个 RocketMQTemplate,ExtRocketMQTemplate 可以使用在同进程中需要使用多个 Producer / LitePullConsumer 的场景,可以为 ExtRocketMQTemplate 指定与标准模版 RocketMQTemplate 不同的 nameserver、group 等配置),并在对应的 RocketMQTransactionListener 注解中指定 rocketMQTemplateBeanName 为 ExtRocketMQTemplate 的 BeanName。 2. RequestReply 消息 在 2.1.0 版本中,RocketMQSpring 开始支持 RequestReply 消息。RequestReply 消息指的是上游服务投递消息后进入等待被通知的状态,直到消费端返回结果并返回给发送端。在 RocketMQSpring 中,发送端通过 RocketMQTemplate 的 sendAndReceivce 方法进行发送,如下所示,主要有同步和异步两种方式。异步方式中通过实现 RocketMQLocalRequestCallback 进行回调。 // 同步发送request并且等待String类型的返回值 String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class); // 异步发送request并且等待User类型的返回值 rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback() { @Override public void onSuccess(User message) { …… } @Override public void onException(Throwable e) { …… } }); 在消费端,仍然需要实现一个包含 @RocketMQMessageListener 注解的类,但需要实现的接口是 RocketMQReplyListener 接口(普通消息为 RocketMQListener 接口),其中 T 表示接收值的类型,R 表示返回值的类型,接口需要实现带返回值的 onMessage 方法,返回值的内容返回给对应的 Producer。 @Service @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer") public class StringConsumerWithReplyString implements RocketMQReplyListener { @Override public String onMessage(String message) { …… return "reply string"; } } RocketMQSpring 遵循 Spring 约定大于配置(Convention over configuration)的理念,通过启动器(Spring Boot Starter)的方式,在 pom 文件引入依赖(groupId:org.apache.rocketmq,artifactId:rocketmqspringbootstarter)便可以在 Spring Boot 中集成所有 RocketMQ 客户端的所有功能,通过简单的注解使用即可完成消息的收发。在 中有更加详细的用法和常见问题解答。 据统计,从 RocketMQSpring 发布第一个正式版本以来,RocketMQSpring 完成 16 个 bug 修复,37 个 imporvement,其中包括事务消息重构,消息过滤、消息序列化、多实例 RocketMQTemplate 优化等重要优化,欢迎更多的小伙伴能参与到 RocketMQ 社区的建设中来,罗美琪(RocketMQ)和春波特(Spring Boot)的故事还在继续...钉钉搜索群号:21982288,即可进群和众多开发者交流!
作者:RocketMQ 官微
#社区动态 #微服务

2021年1月6日

再见 2020!Apache RocketMQ 发布 4.8.0,DLedger 模式全面提升!
“童年的雨天最是泥泞,却是记忆里最干净的曾经。凛冬散尽,星河长明,新的一年,万事顺遂,再见,2020!” 走过这个岁末,万众期待的 了,在这个版本中社区对 RocketMQ 完成大量的优化和问题修复。更重要的是,该版本从性能、稳定性、功能三个方面大幅度提升 DLedger 模式能力。 是 中一个基于 Raft 的 CommitLog 存储库实现,从 RocketMQ 4.5.0 版本开始,RocketMQ 引入 DLedger 模式来解决了 Broker 组内自动故障转移的问题,而在 4.8.0 版本中社区也对 RocketMQ DLedger 模式进行了全面升级。 性能升级 异步化 pipeline 模式 RocketMQ 4.7.0 重新升级了同步双写的架构,利用异步化 pipeline 模式大幅提升了同步双写的性能。在 RocketMQ 4.8.0 中,社区将这一改进应用到 DLedger 模式中, 下图展示了 DLedger 模式下 broker 处理发送消息的过程。在原本的架构中, SendMessageProcessor 线程对每一个消息的处理,都需要等待多数派复制成功确认,才会返回给客户端,而在新版本中,利用 CompletableFuture 对线程处理消息的过程进行异步化改造,不再等待多数派的确认即可对下一个请求进行处理,Ack 操作由其他线程确认之后再进行结果处理并返回给客户端。通过对复制过程进行切分并将其流水线化,减少线程的长时间等待,充分利用 CPU,从而大幅提高吞吐量。 批量日志复制 Batch 一直是性能优化的重要方式,在新版本中,可以通过设置 isEnableBatchPush=true 来开启 DLedger 模式的批量复制。通过将多条数据聚合在一个包中进行发送,可以降低收发包的个数,从而降低系统调用和上下文的切换。在数据发送压力比较大,并且可能达到系统收发包瓶颈的情况下,批量复制能显著提高吞吐量。值得注意的是,DLedger 模式下的批量复制并不会对单个包进行延时的攒批处理,因此不会影响单个消息的发送时延。 除了上述的性能优化,社区还对 DLedger 模式下影响性能的锁、缓存等做了数项性能优化,使 DLedger 模式下的性能提升数倍。 稳定性升级 为了验证和测试 Dledger 模式的可靠性,除了本地对 DLedger 模式进行了各种各样的测试,社区利用 框架对 RocketMQ DLedger 模式进行了大量 Chaos 测试。OpenMessagingChaos 是一个利用故障注入来验证各种消息平台一致性和高可用性的测试框架,在 OpenMessagingChaos 的测试中,客户端并发地向待测试集群发送和接收消息,中间会间隔性地对集群进行故障注入,最后给出测试结果,包括是否丢消息,是否有重复消息,集群平均故障恢复时间等。利用 OpenMessagingChaos,我们验证了 DLedger 模式在以下故障注入场景下的表现: randompartition(fixedpartition)故障随机挑选节点进行网络隔离,模拟常见的对称网络分区。 randomloss 故障随机挑选节点并对这些节点接收和发送的网络包进行按比例丢弃,模拟一些节点网络较差的情况。 randomkill(minorkill、majorkill、fixedkill)故障模拟常见的进程崩溃情况。 randomsuspend(minorsuspend、majorsuspend、fixedsuspend)故障模拟一些慢节点的情况,比如发生 Full GC、OOM 等。 bridge 和 partitionmajoritiesring 故障模拟比较极端的非对称网络分区。 以 minorkill 故障注入为例,我们部署 5 个节点组成一组 DLedger 模式的 RocketMQ broker 进行 Chaos 测试。minorkill 故障注入会随机挑选集群中少数节点进程杀死,由于杀死少数节点,即使集群不可用也能在一段时间内恢复,方便测试集群平均故障恢复时间。 测试过程中我们设置四个客户端并发向 RocketMQ DLedger 集群发送和接收消息,故障注入时间间隔为 100s,即 100s 正常运行,100s 故障注入,一直循环,整个阶段一共持续 2400s。测试结束后,OpenMessagingChaos 会给出测试结果和时延图。下图展示了整个测试过程中入队操作的时延情况。 图中纵坐标是是时延,横坐标是测试时间,绿色框表示数据发送成功,红色框表示数据发送失败,蓝色框表示不确定是否数据添加成功,灰色部分表示故障注入的时间段。可以看出一些故障注入时间段造成了集群短暂的不可用,一些故障时间段则没有,这是合理的。由于是随机网络分区,所以只有杀死的少数节点包含 leader 节点时才会造成集群重新选举,但即使造成集群重新选举, DLedger 模式在一段时间后也会恢复可用性。 下图是测试结束后 OpenMessagingChaos 给出的统计结果,可以看到一共成功发送了 11W 个数据,没有数据丢失,这表明即使在故障下,RocketMQ DLedger 模式仍旧满足 At Least Once 的投递语义。此外,RTOTestResult 表明 12 次故障时间段一共发生了 3 次集群不可用的情况(与上述时延图一致),但每次集群都能在 30 秒以内恢复,平均故障恢复时间在 22 秒左右。 在 OpenMessaging Chaos 测试过程中,我们发现了 DLedger 模式存在的数个隐性问题并进行了修复,提高了 DLedger 模式下对进程异常崩溃、慢节点、对称/非对称网络分区、网络丢包的容错能力,也进一步验证了 DLedger 模式的可靠性。 功能升级 DLedger 模式支持 Preferred Leader 在旧版本中一组 Broker 中选谁作为 Leader 完全是随机的。但是在新版中我们可以通过配置 preferredLeaderId 来指定优先选举某个节点为 Leader,如下图所示,通过在三个机房部署 DLedger 模式的 broker 组,利用 preferred leader 可以更好的实现机房对齐的功能,图中 DC1 中服务更好,我们可以优先选择在 DC1 中部署 Master。此外,利用 preferred leader 还可以更好实现 DLedger 集群混部,从而充分利用机器资源。 DLedger 模式支持批量消息 从 RocketMQ 4.8.0 开始,DLedger 模式支持批量消息发送,从而在消息类型的支持上完全与 MasterSlave 部署形态保持一致。 除了对 DLedger 模式的大量优化,本次 RocketMQ 版本一共包含 Improvement 25 个,Bug Fix 11 个,文档和代码格式优化 16 个。据不完全统计,这些贡献来自近 40 位 RocketMQ 社区的 Contributor,感谢大家的积极参与。也非常期待有更多的用户、厂商、开发者参与到 RocketMQ 建设中来,加入 Apache RocketMQ 社区,永远不会太迟!
作者:RocketMQ社区
#社区动态 #高可用