2022年2月22日

RocketMQ-Streams 首个版本发布,轻量级计算的新选择
RocketMQStreams 聚焦「大数据量高过滤轻窗口计算」场景,核心打造轻资源,高性能优势,在资源敏感场景有很大优势,最低 1Core,1G 可部署。通过大量过滤优化,性能比其他大数据提升 25 倍性能。广泛应用于安全,风控,边缘计算,消息队列流计算。 RocketMQStreams 兼容 Flink 的 SQL,udf/udtf/udaf,将来我们会和 Flink 生态做深度融合,即可以独立运行,也可发布成 Flink 任务,跑在 Flink 集群,对于有 Flink 集群的场景,即能享有轻资源优势,可以做到统一部署和运维。 01 _RocketMQStreams 特点及应用场景_  RocketMQStreams 应用场景 计算场景:适合大数据量高过滤轻窗口计算的场景。不同于主流计算引擎,需要先部署集群,写任务,发布,调优,运行这么复杂的过程。RocketMQStreams 本身就是一个 lib 包,基于 SDK 写完流任务,可以直接运行。支持大数据开发需要的计算特性:ExactlyONCE,灵活窗口(滚动、滑动、会话),双流Join,高吞吐、低延迟、高性能。最低 1Core,1G 可以运行。 SQL引擎:RocketMQStreams 可视作一个 SQL 引擎,兼容 Flink SQL 语法,支持 Flink udf/udtf/udaf 的扩展。支持 SQL 热升级,写完 SQL,通过 SDK 提交 SQL,就可以完成 SQL 的热发布。 ETL引擎:RocketMQStreams 还可视作 ETL 引擎,在很多大数据场景,需要完成数据从一个源经过 ETl,汇聚到统一存储,里面内置了 grok,正则解析等函数,可以结合 SQL 一块完成数据 ETL 。 开发 SDK,它也是一个数据开发 SDK 包,里面的大多数组件都可以单独使用,如 Source/sink,它屏蔽了数据源,数据存储细节,提供统一编程接口,一套代码,切换输入输出,不需要改变代码。  RocketMQStreams 设计思路 设计目标 依赖少,部署简单,1Core,1G 单实例可部署,可随意扩展规模。 实现需要的大数据特性:ExactlyONCE,灵活窗口(滚动、滑动、会话),双流 Join,高吞吐、低延迟、高性能。 实现成本可控,实现低资源,高性能。 兼容 Flink SQL,UDF/UDTF,让非技术人员更易上手。 设计思路 采用 sharednothing 的分布式架构设计,依赖消息队列做负载均衡和容错机制,单实例可启动,增加实例实现能力扩展。并发能力取决于分片数。 利用消息队列的分片做 shuffle,利用消息队列负载均衡实现容错。 利用存储实现状态备份,实现 ExactlyONCE 的语义。用结构化远程存储实现快速启动,不必等本地存储恢复。  RocketMQStreams 特点和创新 02 _RocketMQStreams SDK 详解_  Hello World 按照惯例,我们先从一个例子来了解 RocketMQStreams namespace:相同 namespace 的任务可以跑在一个进程里,可以共享配置 pipelineName:job name DataStreamSource:创建 source 节点 map:用户函数,可以通过实现 MapFunction 扩展功能 toPrint:结果打印出来 start:启动任务 运行上面代码就会启动一个实例。如果想多实例并发,可以启动多个实例,每个实例消费部分 RocketMQ 的数据。 运行结果:把原始消息拼接上“”,并打印出来  RocketMQStreams SDK StreamBuilder 做为起点,通过设置 namespace,jobName 创建一个 DataStreamSource 。 DataStreamSource 通过 from 方法,设置 source,创建 DataStream 对象。 DataStream 提供多种操作,会产生不同的流: to 操作产生 DataStreamAction window 操作产生 WindowStream 配置 window 参数 join 操作产生 JoinStream 配置 join 条件 Split 操作产生 SplitStream 配置 split 条件 其他操作产生 DataStream DataStreamAction 启动整个任务,也可以配置任务的各种策略参数。支持异步启动和同步启动。  RocketMQStreams 算子  RocketMQStreams 算子 SQL 有两种部署模式,1 是直接运行 client 启动 SQL,见第一个红框;2 是搭建 server 集群,通过 client 提交 SQL 实现热部署,见第二个红框。 RocketMQStreams SQL 扩展,支持多种扩展方式: 通过 FlinkUDF,UDTF,UDAF 扩展 SQL 能力,在 SQL 中通过 create function 引入,有个限制条件,即 UDF 在 open 时未用到 Flink FunctionContext 的内容。 通过内置函数扩展 SQL 的函数,语法同 Flink 语法,函数名是内置函数的名称,类名是固定的。如下图,引入了一个 now 的函数,输出当前时间。系统内置了 200 多个函数,可按需引入。 通过扩展函数实现,实现一个函数很简单,只需要在 class 上标注 Function,在需要发布成函数的方法上标注 FunctionMethod,并设置需要发布的函数名即可,如果需要系统信息,前面两个函数可以是 IMessage 和 Abstract,如果不需要,直接写参数即可,参数无格式要求。如下图,创建了一个 now 的函数,两种写法都可以。可以通过 currentTime=now()来调用,会在 Message 中增加一个 key=currentTime,value=当前时间的变量。 把现有 java 代码发布成函数,通过策略配置,把 java 代码的类名,方法名,期望用到的函数名,配置进去,把 java 的 jar 包 copy 到 jar 包目录即可。下图是几种扩展的应用实例。 03 _RocketMQStreams 架构及原理实现_  整体架构  Source 实现 Source 要求实现最少消费一次的语义,系统通过 checkpoint 系统消息实现,在提交 offset 前发送 checkpoint 消息,通知所有算子刷新内存。 Source 支持分片的自动负载均衡和容错。 数据源在分片移除时,发送移除系统消息,让算子完成分片清理工作。 当有新分片时, 发送新增分片消息,让算子完成分片的初始化。 数据源通过 start 方法,启动 consuemr 获取消息。 原始消息经过编码,附加头部信息包装成 Message 投递给后续算子。  Sink 实现 Sink 是实时性和吞吐的一个结合。 实现一个 Sink 只要继承 AbstractSink 类实现 batchInsert 方法即可。batchInsert 的含义是一批数据写入存储,需要子类调用存储接口实现,尽量应用存储的批处理接口,提高吞吐。 常规的使用方式是写 Messagecacheflush存储的方式,系统会严格保证,每次批次写入存储的量不超过 batchsize 的量,如果超了,会拆分成多批写入。 Sink 有一个 cache,数据默认写 cache,批次写入存储,提高吞吐量。(一个分片一个 cache)。 可以开启自动刷新,每个分片会有一个线程,定时刷新 cache 数据到存储,提高实时性。实现类:DataSourceAutoFlushTask 。 也可以通过调用 flush 方法刷新 cache 到存储。 Sink 的 cache 会有内存保护,当 cache 的消息条数batchSize,会强制刷新,释放内存。  RocketMQStreams ExactlyONCE Source 确保在 commit offset 时,会发送 checkpoint 系统消息,收到消息的组件会完成存盘操作。消息至少消费一次。 每条消息会有消息头部,里面封装了 QueueId 和 offset 。 组件在存储数据时,会把 QueueId 和处理的最大 offset 存储下来,当有消息重复时,根据 maxoffset 去重。 内存保护,一个 checkpoint 周期可能有多次 flush(条数触发),保障内存占用可控。  RocketMQStreams Window 支持滚动,滑动和会话窗口。支持事件时间和自然时间(消息进入算子的时间)。 支持高性能模式和高可靠模式,高性能模式不依赖远程存储,但在分片切换时的窗口数据会有丢失。 快速启动,无需等本地存储恢复,在发生错误或分片切换时,异步从远程存储恢复数据,同时直接访问远程存储计算。 利用消息队列负载均衡,实现扩容缩容,每个 Queue 是一个分组,一个分组同一刻只被一台机器消费。 正常计算依赖本地存储,具备 Flink 相似的计算性能。 支持三种触发模式,可以均衡 watermark 延迟和实时性要求 04 _RocketMQStreams 在云安全的应用_  在安全应用的背景 公共云转战专有云,在入侵检测计算方面遇到了资源问题,大数据集群默认不输出,输出最低 6 台高配机器,用户很难接受因为买云盾增配一套大数据集群。 专有云用户升级,运维困难,无法快速升级能力和修复 bug。  流计算在安全的应用 基于安全特点(大数据高过滤轻窗口计算)打造轻量级计算引擎:经过分析所有的规则都会做前置过滤,然后才会做较重的统计,窗口,join 操作,且过滤率比较高,基于此特点,可以用更轻的方案实现统计,join 操作。 通过 RocketMQStreams,覆盖 100%专有云规则(正则,join,统计)。 轻资源,内存是公共云引擎的 1/70,CPU 是 1/6,通过指纹过滤优化,性能提升 5 倍以上,且资源不随规则线性增加,新增规则无资源压力。复用以前的正则引擎资源,可支持 95%以上局点,不需要增加额外物理资源。 通过高压缩维表,支持千万情报。1000 W 数据只需要 330 M 内存。 通过 C/S 部署模式,SQL 和引擎可热发布,尤其护网场景,可快速上线规则。 05 _RocketMQStreams 未来规划_ 新版本下载地址:
作者:袁小栋、程君杰
#社区动态 #流处理

2022年1月26日

Apache RocketMQ + Hudi 快速构建 Lakehouse
本文目录 背景知识 大数据时代的构架演进 RocketMQ Connector&Stream Apache Hudi 构建Lakehouse实操 本文标题包含三个关键词:Lakehouse、RocketMQ、Hudi。我们先从整体Lakehouse架构入手,随后逐步分析架构产生的原因、架构组件特点以及构建Lakehouse架构的实操部分。 背景知识 1、Lakehouse架构 Lakehouse最初由Databrick提出,并对Lakehouse架构特征有如下要求: (1)事务支持 企业内部许多数据管道通常会并发读写数据。对ACID事务的支持确保了多方并发读写数据时的一致性问题; (2)Schema enforcement and governance Lakehouse应该有一种方式可以支持模式执行和演进、支持DW schema的范式(如星星或雪花模型),能够对数据完整性进行推理,并且具有健壮的治理和审计机制; (3)开放性 使用的存储格式是开放式和标准化的(如parquet),并且为各类工具和引擎,包括机器学习和Python/R库,提供API,以便它们可以直接有效地访问数据; (4)BI支持 Lakehouse可以直接在源数据上使用BI工具。这样可以提高数据新鲜度、减少延迟,并且降低了在数据池和数据仓库中操作两个数据副本的成本; (5)存储与计算分离 在实践中,这意味着存储和计算使用单独的集群,因此这些系统能够扩展到支持更大的用户并发和数据量。一些现代数仓也具有此属性; (6)支持从非结构化数据到结构化数据的多种数据类型 Lakehouse可用于存储、优化、分析和访问许多数据应用所需的包括image、video、audio、text以及半结构化数据; (7)支持各种工作负载 包括数据科学、机器学习以及SQL和分析。可能需要多种工具来支持这些工作负载,但它们底层都依赖同一数据存储库; (8)端到端流 实时报表是许多企业中的标准应用。对流的支持消除了需要构建单独系统来专门用于服务实时数据应用的需求。 从上述对Lakehouse架构的特点描述我们可以看出,针对单一功能,我们可以利用某些开源产品组合构建出一套解决方案。但对于全部功能的支持,目前好像没有一个通用的解决方案。接下来,我们先了解大数据时代主流的数据处理架构是怎样的。 大数据时代的架构演进 1、大数据时代的开源产品 大数据时代的开源产品种类繁多,消息领域的RocketMQ、Kafka;计算领域的flink、spark、storm;存储领域的HDFS、Hbase、Redis、ElasticSearch、Hudi、DeltaLake等等。 为什么会产生这么多开源产品呢?首先在大数据时代数据量越来越大,而且每个业务的需求也各不相同,因此就产生出各种类型的产品供架构师选择,用于支持各类场景。然而众多的品类产品也给架构师们带来一些困扰,比如选型困难、试错成本高、学习成本高、架构复杂等等。 2、当前主流的多层架构 大数据领域的处理处理场景包含数据分析、BI、科学计算、机器学习、指标监控等场景,针对不同场景,业务方会根据业务特点选择不同的计算引擎和存储引擎;例如交易指标可以采用binlog + CDC+ RocketMQ + Flink + Hbase + ELK组合,用于BI和Metric可视化。 (1)多层架构的优点:支持广泛的业务场景; (2)多层架构的缺点: 处理链路长,延迟高; 数据副本多,成本翻倍; 学习成本高; 造成多层架构缺点主要原因是存储链路和计算链路太长。 我们真的需要如此多的解决方案来支持广泛的业务场景吗?Lakehouse架构是否可以统一解决方案? 多层架构的存储层是否可以合并?Hudi产品是否能够支持多种存储需求? 多层架构的计算层是否可以合并?RocketMQ stream是否能够融合消息层和计算层? 当前主流的多层架构 3、Lakehouse架构产生 Lakehouse架构是多层架构的升级版本,将存储层复杂度继续降低到一层。再进一步压缩计算层,将消息层和计算层融合,RocketMQ stream充当计算的角色。我们得到如下图所示的新架构。新架构中,消息出入口通过RocketMQ connector实现,消息计算层由RocketMQ stream实现,在RocketMQ内部完成消息计算中间态的流转;计算结果通过RocketMQHudiconnector收口落库Hudi,Hudi支持多种索引,并提供统一的API输出给不同产品。 Lakehouse架构 下面我们分析下该架构的特点。 (1)Lakehouse架构的优点: 链路更短,更适合实时场景,数据新鲜感高; 成本可控,降低了存储成本; 学习成本低,对程序员友好; 运维复杂度大幅降低; (2)Lakehouse架构的缺点 对消息产品和数据湖产品的稳定性、易用性等要求高,同时消息产品需要支持计算场景,数据湖产品需要提供强大的索引功能。 (3)选择 在Lakehouse架构中我们选择消息产品RocketMQ和数据湖产品Hudi。 同时,可以利用RocketMQ stream在RocketMQ集群上将计算层放在其中集成,这样就将计算层降低到一层,能够满足绝大部分中小型大数据处理场景。 接下来我们逐步分析RocketMQ和Hudi两款产品的特点。 RocketMQ Connector & Stream RocketMQ 发展历程图 RocketMQ从2017年开始进入Apache孵化,2018年RocketMQ 4.0发布完成云原生化,2021年RocketMQ 5.0发布全面融合消息、事件、流。 1、业务消息领域首选 RocketMQ作为一款“让人睡得着觉的消息产品”成为业务消息领域的首选,这主要源于产品的以下特点: (1)金融级高可靠 经历了阿里巴巴双十一的洪峰检验; (2)极简架构 如下图所示, RocketMQ的架构主要包含两部分包括:源数据集群NameServer Cluster和计算存储集群Broker Cluster。 RocketMQ 构架图 NameServer节点无状态,可以非常简单的进行横向扩容。Broker节点采用主备方式保证数据高可靠性,支持一主多备的场景,配置灵活。 搭建方式:只需要简单的代码就可以搭建RocketMQ集群: Jar: nohup sh bin/mqnamesrv & nohup sh bin/mqbroker n localhost:9876 & On K8S: kubectl apply f example/rocketmq_cluster.yaml (3)极低运维成本 RocketMQ的运维成本很低,提供了很好的CLI工具MQAdmin,MQAdmin提供了丰富的命令支持,覆盖集群健康状态检查、集群进出流量管控等多个方面。例如,mqadmin clusterList一条命令可以获取到当前集群全部节点状态(生产消费流量、延迟、排队长度、磁盘水位等);mqadmin updateBrokerConfig命令可以实时设置broker节点或topic的可读可写状态,从而可以动态摘除临时不可用节点,达到生产消费的流量迁移效果。 (4)丰富的消息类型 RocketMQ支持的消息类型包括:普通消息、事务消息、延迟消息、定时消息、顺序消息等。能够轻松支持大数据场景和业务场景。 (5)高吞吐、低延迟 压测场景主备同步复制模式,每台Broker节点都可以将磁盘利用率打满,同时可以将p99延迟控制在毫秒级别。 2、RocketMQ 5.0概况 RocketMQ 5.0是生于云、长于云的云原生消息、事件、流超融合平台,它具有以下特点: (1)轻量级SDK 全面支持云原生通信标准 gRPC 协议; 无状态 Pop 消费模式,多语言友好,易集成; (2)极简架构 无外部依赖,降低运维负担; 节点间松散耦合,任意服务节点可随时迁移; (3)可分可合的存储计算分离 Broker 升级为真正的无状态服务节点,无 binding; Broker 和 Store节点分离部署、独立扩缩; 多协议标准支持,无厂商锁定; 可分可合,适应多种业务场景,降低运维负担; 如下图所示,计算集群(Broker)主要包括抽象模型和相对应的协议适配,以及消费能力和治理能力。存储集群(Store)主要分为消息存储CommitLog(多类型消息存储、多模态存储)和索引存储Index(多元索引)两部分,如果可以充分发挥云上存储的能力,将CommitLog和Index配置在云端的文件系统就可以天然的实现存储和计算分离。 (4)多模存储支持 满足不同基础场景下的高可用诉求; 充分利用云上基础设施,降低成本; (5)云原生基础设施: 可观测性能力云原生化,OpenTelemetry 标准化; Kubernetes 一键式部署扩容交付。 RocketMQ 5.02021年度大事件及未来规划 3、RocketMQConnector a、传统数据流 (1)传统数据流的弊端 生产者消费者代码需要自己实现,成本高; 数据同步的任务没有统一管理; 重复开发,代码质量参差不齐; (2)解决方案:RocketMQ Connector 合作共建,复用数据同步任务代码; 统一的管理调度,提高资源利用率; b、RocketMQ Connector数据同步流程 相比传统数据流,RocketMQ connector数据流的不同在于将 source 和 sink 进行统一管理,同时它开放源码,社区也很活跃。 4、RocketMQ Connector架构 如上图所示,RocketMQ Connector架构主要包含Runtime和Worker两部分,另外还有生态Source&Sink。 (1)标准:OpenMessaging (2)生态:支持ActiveMQ、Cassandra、ES、JDBC、JMS、MongoDB、Kafka、RabbitMQ、Mysql、Flume、Hbase、Redis等大数据领域的大部分产品; (3)组件:Manager统一管理调度,如果有多个任务可以将所有任务统一进行负载均衡,均匀的分配到不同Worker上,同时Worker可以进行横向扩容。 5、RocketMQ Stream RocketMQ Stream是一款将计算层压缩到一层的产品。它支持一些常见的算子如window、join、维表,兼容Flink SQL、UDF/UDAF/UDTF。 Apache Hudi Hudi 是一个流式数据湖平台,支持对海量数据快速更新。内置表格式,支持事务的存储层、一系列表服务、数据服务(开箱即用的摄取工具)以及完善的运维监控工具。Hudi 可以将存储卸载到阿里云上的 OSS、AWS 的S3这些存储上。 Hudi的特性包括: 事务性写入,MVCC/OCC并发控制; 对记录级别的更新、删除的原生支持; 面向查询优化:小文件自动管理,针对增量拉取优化的设计,自动压缩、聚类以优化文件布局; Apache Hudi是一套完整的数据湖平台。它的特点有: 各模块紧密集成,自我管理; 使用 Spark、Flink、Java 写入; 使用 Spark、Flink、Hive、Presto、Trino、Impala、 AWS Athena/Redshift等进行查询; 进行数据操作的开箱即用工具/服务。 Apache Hudi主要针对以下三类场景进行优化: 1、流式处理栈 (1) 增量处理; (2) 快速、高效; (3) 面向行; (4) 未优化扫描; 2、批处理栈 (1) 批量处理; (2) 低效; (3) 扫描、列存格式; 3、增量处理栈 (1) 增量处理; (2) 快速、高效; (3) 扫描、列存格式。 构建 Lakehouse 实操 该部分只介绍主流程和实操配置项,本机搭建的实操细节可以参考附录部分。 1、准备工作 RocketMQ version:4.9.0 rocketmqconnecthudi version:0.0.1SNAPSHOT Hudi version:0.8.0 2、构建RocketMQHudiconnector (1) 下载: _  git clone _ (2) 配置: /data/lakehouse/rocketmqexternals/rocketmqconnect/rocketmqconnectruntime/target/distribution/conf/connect.conf 中connectorplugin 路径 (3) 编译: cd rocketmqexternals/rocketmqconnecthudi mvn clean install DskipTest U rocketmqconnecthudi0.0.1SNAPSHOTjarwithdependencies.jar就是我们需要使用的rocketmqhudiconnector 3、运行 (1) 启动或使用现有的RocketMQ集群,并初始化元数据Topic: connectorclustertopic (集群信息) connectorconfigtopic (配置信息) connectoroffsettopic (sink消费进度) connectorpositiontopic (source数据处理进度 并且为了保证消息有序,每个topic可以只建一个queue) (2) 启动RocketMQ connector运行时 cd /data/lakehouse/rocketmqexternals/rocketmqconnect/rocketmqconnectruntime sh ./run_worker.sh Worker可以启动多个 (3) 配置并启动RocketMQhudiconnector任务 请求RocketMQ connector runtime创建任务 curl http://{runtimeip}:{runtimeport}/connectors/{rocketmqhudisinkconnectorname} ?config='{"connectorclass":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"topicc","tablePath":"file:///tmp/hudi_connector_test","tableName":"hudi_connector_test_table","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","sourcerecordconverter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","sourcerocketmq":"127.0.0.1:9876","srccluster":"DefaultCluster","refreshinterval":"10000","schemaPath":"/data/lakehouse/config/user.avsc"\}’ 启动成功会打印如下日志: 20210906 16:23:14 INFO pool2thread1 Open HoodieJavaWriteClient successfully (4) 此时向source topic生产的数据会自动写入到1Hudi对应的table中,可以通过Hudi的api进行查询。 4、配置解析 (1) RocketMQ connector需要配置RocketMQ集群信息和connector插件位置,包含:connect工作节点id标识workerid、connect服务命令接收端口httpPort、rocketmq集群namesrvAddr、connect本地配置储存目录storePathRootDir、connector插件目录pluginPaths 。 RocketMQ connector配置表 (2) Hudi任务需要配置Hudi表路径tablePath和表名称tableName,以及Hudi使用的Schema文件。 Hudi任务配置表 _点击__即可查看Lakehouse构建实操视频_ 附录:在本地Mac系统构建Lakehouse demo 涉及到的组件:rocketmq、rocketmqconnectorruntime、rocketmqconnecthudi、hudi、hdfs、avro、sparkshell0、启动hdfs 下载hadoop包 cd /Users/osgoo/Documents/hadoop2.10.1 vi coresite.xml fs.defaultFS hdfs://localhost:9000 vi hdfssite.xml dfs.replication 1 ./bin/hdfs namenode format ./sbin/startdfs.sh jps 看下namenode,datanode lsof i:9000 ./bin/hdfs dfs mkdir p /Users/osgoo/Downloads 1、启动rocketmq集群,创建rocketmqconnector内置topic QickStart:https://rocketmq.apache.org/docs/quickstart/ sh mqadmin updatetopic t connectorclustertopic n localhost:9876 c DefaultCluster sh mqadmin updatetopic t connectorconfigtopic n localhost:9876 c DefaultCluster sh mqadmin updatetopic t connectoroffsettopic n localhost:9876 c DefaultCluster sh mqadmin updatetopic t connectorpositiontopic n localhost:9876 c DefaultCluster 2、创建数据入湖的源端topic,testhudi1 sh mqadmin updatetopic t testhudi1 n localhost:9876 c DefaultCluster 3、编译rocketmqconnecthudi0.0.1SNAPSHOTjarwithdependencies.jar cd rocketmqconnecthudi mvn clean install DskipTest U 4、启动rocketmqconnector runtime 配置connect.conf workerId=DEFAULT_WORKER_1 storePathRootDir=/Users/osgoo/Downloads/storeRoot Http port for user to access REST API httpPort=8082 Rocketmq namesrvAddr namesrvAddr=localhost:9876 Source or sink connector jar file dir,The default value is rocketmqconnectsample pluginPaths=/Users/osgoo/Downloads/connectorplugins 拷贝 rocketmqhudiconnector.jar 到 pluginPaths=/Users/osgoo/Downloads/connectorplugins sh run_worker.sh 5、配置入湖config curl http://localhost:8082/connectors/rocketmqconnecthudi?config='\{"connectorclass":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"testhudi1","tablePath":"hdfs://localhost:9000/Users/osgoo/Documents/basepath7","tableName":"t7","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","sourcerecordconverter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","sourcerocketmq":"127.0.0.1:9876","sourcecluster":"DefaultCluster","refreshinterval":"10000","schemaPath":"/Users/osgoo/Downloads/user.avsc"\}' 6、发送消息到testhudi1 7、 利用spark读取 cd /Users/osgoo/Downloads/spark3.1.2binhadoop3.2/bin ./sparkshell \ packages org.apache.hudi:hudispark3bundle_2.12:0.9.0,org.apache.spark:sparkavro_2.12:3.0.1 \ conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ val tableName = "t7" val basePath = "hdfs://localhost:9000/Users/osgoo/Documents/basepath7" val tripsSnapshotDF = spark. read. format("hudi"). load(basePath + "/") tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select from hudi_trips_snapshot").show()
#社区动态 #生态集成

2021年3月18日

如何在 Spring 生态中玩转Apache RocketMQ?
在 Spring 生态中玩转 RocketMQ 系列教程现已登陆知行动手实验室,立即体验! 移动端同学,需要在PC端登录 start.aliyun.com 进行体验。 RocketMQ 作为业务消息的首选,在消息和流处理领域被广泛应用。而微服务生态 Spring 框架也是业务开发中最受欢迎的框架,两者的完美契合使得 RocketMQ 成为 Spring Messaging 实现中最受欢迎的消息实现。本文展示了 5 种在 Spring 生态中文玩转 RocketMQ 的方式,并描述了每个项目的特点和使用场景。 一、前言 上世纪 90 年代末,随着 Java EE(Enterprise Edition)的出现,特别是 Enterprise Java Beans 的使用需要复杂的描述符配置和死板复杂的代码实现,增加了广大开发者的学习曲线和开发成本,由此基于简单的 XML 配置和普通 Java 对象(Plain Old Java Objects)的 Spring 技术应运而生,依赖注入(Dependency Injection),控制反转(Inversion of Control)和面向切面编程(AOP)的技术更加敏捷地解决了传统 Java 企业及版本的不足。随着 Spring 的持续演进,基于注解(Annotation)的配置逐渐取代了 XML 文件配置。除了依赖注入、控制翻转、AOP 这些技术,Spring 后续衍生出 AMQP、Transactional、Security、Batch、Data Access 等模块,涉及开发的各个领域。 2014 年 4 月 1 日,Spring Boot 1.0.0 正式发布。它基于“约定大于配置”(Convention over configuration)这一理念来快速地开发,测试,运行和部署 Spring 应用,并能通过简单地与各种启动器(如springbootwebstarter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。Spring Boot 的出现可以说是 Spring 框架的第二春,它不但简化了开发的流程,目前更是事实标准。下面这幅图可以看出相同功能的 Spring 和 Spring Boot 的代码实现对比。 Apache RocketMQ 是一款是业界知名的分布式消息和流处理中间件,它主要功能是消息分发、异步解耦、削峰填谷等。RocketMQ 是一款金融级消息及流数据平台,RocketMQ 在交易、支付链路上用的很多,主要是对消息链路质量要求非常高的场景,能够支持万亿级消息洪峰。RocketMQ 在业务消息中被广泛应用,并衍生出顺序消息、事务消息、延迟消息等匹配各类业务场景的特殊消息。 本文的主角就是 Spring 和 RocketMQ,那几乎每个 Java 程序员都会使用 Spring 框架与支持丰富业务场景的 RocketMQ 会碰撞出怎么样的火花? 二、RocketMQ 与 Spring 的碰撞 在介绍 RocketMQ 与 Spring 故事之前,不得不提到 Spring 中的两个关于消息的框架,Spring Messaging 和 Spring Cloud Stream。它们都能够与 Spring Boot 整合并提供了一些参考的实现。和所有的实现框架一样,消息框架的目的是实现轻量级的消息驱动的微服务,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。 1. Spring Messaging Spring Messaging 是 Spring Framework 4 中添加的模块,是 Spring 与消息系统集成的一个扩展性的支持。它实现了从基于 JmsTemplate 的简单的使用 JMS 接口到异步接收消息的一整套完整的基础架构,Spring AMQP 提供了该协议所要求的类似的功能集。在与 Spring Boot 的集成后,它拥有了自动配置能力,能够在测试和运行时与相应的消息传递系统进行集成。单纯对于客户端而言,Spring Messaging 提供了一套抽象的 API 或者说是约定的标准,对消息发送端和消息接收端的模式进行规定,比如消息 Messaging 对应的模型就包括一个消息体 Payload 和消息头 Header。不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。 在 Apache RocketMQ 生态中,RocketMQSpringBootStarter(下文简称 RocketMQSpring)就是一个支持 Spring Messaging API 标准的项目。该项目把 RocketMQ 的客户端使用 Spring Boot 的方式进行了封装,可以让用户通过简单的 annotation 和标准的 Spring Messaging API 编写代码来进行消息的发送和消费,也支持扩展出 RocketMQ 原生 API 来支持更加丰富的消息类型。在 RocketMQSpring 毕业初期,RocketMQ 社区同学请 Spring 社区的同学对 RocketMQSpring 代码进行 review,引出一段罗美琪(RocketMQ)和春波特(Spring Boot)故事的佳话[1],著名 Spring 布道师 Josh Long 向国外同学介绍如何使用 RocketMQSpring 收发消息[2]。RocketMQSpring 也在短短两年时间超越 SpringKafka 和 SpringAMQP(注:两者均由 Spring 社区维护),成为 Spring Messaging 生态中最活跃的消息项目。 2. Spring Cloud Stream Spring Cloud Stream 结合了 Spring Integration 的注解和功能,它的应用模型如下: Spring Cloud Stream 框架中提供一个独立的应用内核,它通过输入(@Input)和输出(@Output)通道与外部世界进行通信,消息源端(Source)通过输入通道发送消息,消费目标端(Sink)通过监听输出通道来获取消费的消息。这些通道通过专用的 Binder 实现与外部代理连接。开发人员的代码只需要针对应用内核提供的固定的接口和注解方式进行编程,而不需要关心运行时具体的 Binder 绑定的消息中间件。 在运行时,Spring Cloud Stream 能够自动探测并使用在 classpath 下找到的 Binder。这样开发人员可以轻松地在相同的代码中使用不同类型的中间件:仅仅需要在构建时包含进不同的 Binder。在更加复杂的使用场景中,也可以在应用中打包多个 Binder 并让它自己选择 Binder,甚至在运行时为不同的通道使用不同的 Binder。 Binder 抽象使得 Spring Cloud Stream 应用可以灵活的连接到中间件,加之 Spring Cloud Stream 使用利用了 Spring Boot 的灵活配置配置能力,这样的配置可以通过外部配置的属性和 Spring Boot 支持的任何形式来提供(包括应用启动参数、环境变量和 application.yml 或者 application.properties 文件),部署人员可以在运行时动态选择通道连接 destination(例如,RocketMQ 的 topic 或者 RabbitMQ 的 exchange)。 Spring Cloud Stream 屏蔽了底层消息中间件的实现细节,希望以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。Spring 官方实现了 Rabbit binder 和 Kafka Binder。Spring Cloud Alibaba 实现了 RocketMQ Binder[3],其主要实现原理是把发送消息最终代理给了 RocketMQSpring 的 RocketMQTemplate,在消费端则内部会启动 RocketMQSpring Consumer Container 来接收消息。以此为基础,Spring Cloud Alibaba 还实现了 Spring Cloud Bus RocketMQ, 用户可以使用 RocketMQ 作为 Spring Cloud 体系内的消息总线,来连接分布式系统的所有节点。通过 Spring Cloud Stream RocketMQ Binder,RocketMQ 可以与 Spring Cloud 生态更好的结合。比如与 Spring Cloud Data Flow、Spring Cloud Funtion 结合,让 RocketMQ 可以在 Spring 流计算生态、Serverless(FaaS) 项目中被使用。 如今 Spring Cloud Stream RocketMQ Binder 和 Spring Cloud Bus RocketMQ 做为 Spring Cloud Alibaba 的实现已登陆 Spring 的官网[4],Spring Cloud Alibaba 也成为 Spring Cloud 最活跃的实现。 三、如何在 Spring 生态中选择 RocketMQ 实现? 通过介绍 Spring 中的消息框架,介绍了以 RocketMQ 为基础与 Spring 消息框架结合的几个项目,主要是 RocketMQSpring、Spring Cloud Stream RocketMQ Binder、Spring Cloud Bus RocketMQ、Spring Data Flow 和 Spring Cloud Function。它们之间的关系可以如下图表示。 如何在实际业务开发中选择相应项目进行使用?下面分别列出每个项目的特点和使用场景。 1. RocketMQSpring 特点: 作为起步依赖,简单引入一个包就能在 Spring 生态用到 RocketMQ 客户端的所有功能。 利用了大量自动配置和注解简化了编程模型,并且支持 Spring Messaging API。 与 RocketMQ 原生 Java SDK 的功能完全对齐。 使用场景: 适合在 Spring Boot 中使用 RocketMQ 的用户,希望能用到 RocketMQ 原生 java 客户端的所有功能,并通过 Spring 注解和自动配置简化编程模型。 2. Spring Cloud Stream RocketMQ Binder 特点: 屏蔽底层 MQ 实现细节,上层 Spring Cloud Stream 的 API 是统一的。如果想从 Kafka 切到 RocketMQ,直接改个配置即可。 与 Spring Cloud 生态整合更加方便。比如 Spring Cloud Data Flow,这上面的流计算都是基于 Spring Cloud Stream;Spring Cloud Bus 消息总线内部也是用的 Spring Cloud Stream。 Spring Cloud Stream 提供的注解,编程体验都是非常棒。 使用场景: 在代码层面能完全屏蔽底层消息中间件的用户,并且希望能项目能更好的接入 Spring Cloud 生态(Spring Cloud Data Flow、Spring Cloud Funtcion 等)。 3. Spring Cloud Bus RocketMQ 特点: 将 RocketMQ 作为事件的“传输器”,通过发送事件(消息)到消息队列上,从而广播到订阅该事件(消息)的所有节点上,完成事件的分发和通知。 使用场景: 在 Spring 生态中希望用 RocketMQ 做消息总线的用户,可以用在应用间事件的通信,配置中心客户端刷新等场景。 4. Spring Cloud Data Flow 特点: 以 Source/Processor/Sink 组件进行流式任务处理。RocketMQ 作为流处理过程中的中间存储组件。 使用场景: 流处理,大数据处理场景。 5. Spring Cloud Function 特点: 消息的消费/生产/处理都是一次函数调用,融合 Java 生态的 Function 模型。 使用场景: Serverless 场景。 本文整体介绍了在 Spring 生态中接入 RockeMQ 的 5 种方法,让各位开发者对几种经典场景有宏观的了解。后续会有专栏详细介绍上述各个项目的具体使用方法和应用场景,真正地在 Spring 生态中玩转 RocketMQ! 在 Spring 生态中玩转 RocketMQ 系列教程现已登陆知行动手实验室,立即体验! 移动端同学,需要在PC端登录 start.aliyun.com 进行体验。 相关链接:
#社区动态 #微服务

2021年2月2日

RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现?
2019 年 1 月,孵化 6 个月的 RocketMQSpring 作为 Apache RocketMQ 的子项目正式毕业,发布了第一个 Release 版本 2.0.1。该项目是把 RocketMQ 的客户端使用 Spring Boot 的方式进行了封装,可以让用户通过简单的 annotation 和标准的 Spring Messaging API 编写代码来进行消息的发送和消费。当时 RocketMQ 社区同学请 Spring 社区的同学对 RocketMQSpring 代码进行 review,引出一段。 时隔两年,RocketMQSpring 正式发布 2.2.0。在这期间,RocketMQSpring 迭代了数个版本,以 RocketMQSpring 为基础实现的 Spring Cloud Stream RocketMQ Binder、Spring Cloud Bus RocketMQ 登上了 ,Spring 布道师 baeldung 向国外同学介绍,越来越多国内外的同学开始使用 RocketMQSpring 收发消息,RocketMQSpring 仓库的 star 数也在短短两年时间内超越了 SpringKafka 和 SpringAMQP(注:两者均由 Spring 社区维护),成为 Apache RocketMQ 最受欢迎的生态项目之一。 RocketMQSpring 的受欢迎一方面得益于支持丰富业务场景的 RocketMQ 与微服务生态 Spring 的完美契合,另一方面也与 RocketMQSpring 本身严格遵循 Spring Messaging API 规范,支持丰富的消息类型分不开。 遵循 Spring Messaging API 规范 Spring Messaging 提供了一套抽象的 API,对消息发送端和消息接收端的模式进行规定,不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。 1. 发送端 RocketMQSpring 在遵循 Spring Messaging API 规范的基础上结合 RocketMQ 自身的功能特点提供了相应的 API。在消息的发送端,RocketMQSpring 通过实现 RocketMQTemplate 完成消息的发送。如下图所示,RocketMQTemplate 继承 AbstractMessageSendingTemplate 抽象类,来支持 Spring Messaging API 标准的消息转换和发送方法,这些方法最终会代理给 doSend 方法,doSend 方法会最终调用 syncSend,由 DefaultMQProducer 实现。 除 Spring Messaging API 规范中的方法,RocketMQTemplate 还实现了 RocketMQ 原生客户端的一些方法,来支持更加丰富的消息类型。值得注意的是,相比于原生客户端需要自己去构建 RocketMQ Message(比如将对象序列化成 byte 数组放入 Message 对象),RocketMQTemplate 可以直接将对象、字符串或者 byte 数组作为参数发送出去(对象序列化操作由 RocketMQSpring 内置完成),在消费端约定好对应的 Schema 即可正常收发。 RocketMQTemplate Send API: SendResult syncSend(String destination, Object payload) SendResult syncSend(String destination, Message message) void asyncSend(String destination, Message message, SendCallback sendCallback) void asyncSend(String destination, Message message, SendCallback sendCallback) …… 2. 消费端 在消费端,需要实现一个包含 @RocketMQMessageListener 注解的类(需要实现 RocketMQListener 接口,并实现 onMessage 方法,在注解中进行 topic、consumerGroup 等属性配置),这个 Listener 会一对一的被放置到 DefaultRocketMQListenerContainer 容器对象中,容器对象会根据消费的方式(并发或顺序),将 RocketMQListener 封装到具体的 RocketMQ 内部的并发或者顺序接口实现。在容器中创建 RocketMQ DefaultPushConsumer 对象,启动并监听定制的 Topic 消息,完成约定 Schema 对象的转换,回调到 Listener 的 onMessage 方法。 @Service @RocketMQMessageListener(topic = "demo.rocketmq.topic", consumerGroup = "string_consumer", selectorExpression = "{demo.rocketmq.tag}") public class StringConsumer implements RocketMQListener { @Override public void onMessage(String message) { System.out.printf(" StringConsumer received: %s \n", message); } } 除此 Push 接口之外,在最新的 2.2.0 版本中,RocketMQSpring 实现了 RocketMQ Lite Pull Consumer。通过在配置文件中进行 consumer 的配置,利用 RocketMQTemplate 的 Recevie 方法即可主动 Pull 消息。 配置文件resource/application.properties: rocketmq.nameserver=localhost:9876 rocketmq.consumer.group=mygroup1 rocketmq.consumer.topic=test Pull Consumer代码: while(!isStop) { List messages = rocketMQTemplate.receive(String.class); System.out.println(messages); } 丰富的消息类型 RocketMQ Spring 消息类型支持方面与 RocketMQ 原生客户端完全对齐,包括同步/异步/oneway、顺序、延迟、批量、事务以及 RequestReply 消息。在这里,主要介绍较为特殊的事务消息和 requestreply 消息。 1. 事务消息 RocketMQ 的事务消息不同于 Spring Messaging 中的事务消息,依然采用 RocketMQ 原生事务消息的方案。如下所示,发送事务消息时需要实现一个包含 @RocketMQTransactionListener 注解的类,并实现 executeLocalTransaction 和 checkLocalTransaction 方法,从而来完成执行本地事务以及检查本地事务执行结果。 // Build a SpringMessage for sending in transaction Message msg = MessageBuilder.withPayload(..)...; // In sendMessageInTransaction(), the first parameter transaction name ("test") // must be same with the @RocketMQTransactionListener's member field 'transName' rocketMQTemplate.sendMessageInTransaction("testtopic", msg, null); // Define transaction listener with the annotation @RocketMQTransactionListener @RocketMQTransactionListener class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return bollback, commit or unknown return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return bollback, commit or unknown return RocketMQLocalTransactionState.COMMIT; } } 在 2.1.0 版本中,RocketMQSpring 重构了事务消息的实现,如下图所示,旧版本中每一个 group 对应一个 TransactionProducer,而在新版本中改为每一个 RocketMQTemplate 对应一个 TransationProducer,从而解决了并发使用多个事务消息的问题。当用户需要在单进程使用多个事务消息时,可以使用 ExtRocketMQTemplate 来完成(一般情况下,推荐一个进程使用一个 RocketMQTemplate,ExtRocketMQTemplate 可以使用在同进程中需要使用多个 Producer / LitePullConsumer 的场景,可以为 ExtRocketMQTemplate 指定与标准模版 RocketMQTemplate 不同的 nameserver、group 等配置),并在对应的 RocketMQTransactionListener 注解中指定 rocketMQTemplateBeanName 为 ExtRocketMQTemplate 的 BeanName。 2. RequestReply 消息 在 2.1.0 版本中,RocketMQSpring 开始支持 RequestReply 消息。RequestReply 消息指的是上游服务投递消息后进入等待被通知的状态,直到消费端返回结果并返回给发送端。在 RocketMQSpring 中,发送端通过 RocketMQTemplate 的 sendAndReceivce 方法进行发送,如下所示,主要有同步和异步两种方式。异步方式中通过实现 RocketMQLocalRequestCallback 进行回调。 // 同步发送request并且等待String类型的返回值 String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class); // 异步发送request并且等待User类型的返回值 rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback() { @Override public void onSuccess(User message) { …… } @Override public void onException(Throwable e) { …… } }); 在消费端,仍然需要实现一个包含 @RocketMQMessageListener 注解的类,但需要实现的接口是 RocketMQReplyListener 接口(普通消息为 RocketMQListener 接口),其中 T 表示接收值的类型,R 表示返回值的类型,接口需要实现带返回值的 onMessage 方法,返回值的内容返回给对应的 Producer。 @Service @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer") public class StringConsumerWithReplyString implements RocketMQReplyListener { @Override public String onMessage(String message) { …… return "reply string"; } } RocketMQSpring 遵循 Spring 约定大于配置(Convention over configuration)的理念,通过启动器(Spring Boot Starter)的方式,在 pom 文件引入依赖(groupId:org.apache.rocketmq,artifactId:rocketmqspringbootstarter)便可以在 Spring Boot 中集成所有 RocketMQ 客户端的所有功能,通过简单的注解使用即可完成消息的收发。在 中有更加详细的用法和常见问题解答。 据统计,从 RocketMQSpring 发布第一个正式版本以来,RocketMQSpring 完成 16 个 bug 修复,37 个 imporvement,其中包括事务消息重构,消息过滤、消息序列化、多实例 RocketMQTemplate 优化等重要优化,欢迎更多的小伙伴能参与到 RocketMQ 社区的建设中来,罗美琪(RocketMQ)和春波特(Spring Boot)的故事还在继续...钉钉搜索群号:21982288,即可进群和众多开发者交流!
作者:RocketMQ 官微
#社区动态 #微服务

2021年1月6日

再见 2020!Apache RocketMQ 发布 4.8.0,DLedger 模式全面提升!
“童年的雨天最是泥泞,却是记忆里最干净的曾经。凛冬散尽,星河长明,新的一年,万事顺遂,再见,2020!” 走过这个岁末,万众期待的 了,在这个版本中社区对 RocketMQ 完成大量的优化和问题修复。更重要的是,该版本从性能、稳定性、功能三个方面大幅度提升 DLedger 模式能力。 是 中一个基于 Raft 的 CommitLog 存储库实现,从 RocketMQ 4.5.0 版本开始,RocketMQ 引入 DLedger 模式来解决了 Broker 组内自动故障转移的问题,而在 4.8.0 版本中社区也对 RocketMQ DLedger 模式进行了全面升级。 性能升级 异步化 pipeline 模式 RocketMQ 4.7.0 重新升级了同步双写的架构,利用异步化 pipeline 模式大幅提升了同步双写的性能。在 RocketMQ 4.8.0 中,社区将这一改进应用到 DLedger 模式中, 下图展示了 DLedger 模式下 broker 处理发送消息的过程。在原本的架构中, SendMessageProcessor 线程对每一个消息的处理,都需要等待多数派复制成功确认,才会返回给客户端,而在新版本中,利用 CompletableFuture 对线程处理消息的过程进行异步化改造,不再等待多数派的确认即可对下一个请求进行处理,Ack 操作由其他线程确认之后再进行结果处理并返回给客户端。通过对复制过程进行切分并将其流水线化,减少线程的长时间等待,充分利用 CPU,从而大幅提高吞吐量。 批量日志复制 Batch 一直是性能优化的重要方式,在新版本中,可以通过设置 isEnableBatchPush=true 来开启 DLedger 模式的批量复制。通过将多条数据聚合在一个包中进行发送,可以降低收发包的个数,从而降低系统调用和上下文的切换。在数据发送压力比较大,并且可能达到系统收发包瓶颈的情况下,批量复制能显著提高吞吐量。值得注意的是,DLedger 模式下的批量复制并不会对单个包进行延时的攒批处理,因此不会影响单个消息的发送时延。 除了上述的性能优化,社区还对 DLedger 模式下影响性能的锁、缓存等做了数项性能优化,使 DLedger 模式下的性能提升数倍。 稳定性升级 为了验证和测试 Dledger 模式的可靠性,除了本地对 DLedger 模式进行了各种各样的测试,社区利用 框架对 RocketMQ DLedger 模式进行了大量 Chaos 测试。OpenMessagingChaos 是一个利用故障注入来验证各种消息平台一致性和高可用性的测试框架,在 OpenMessagingChaos 的测试中,客户端并发地向待测试集群发送和接收消息,中间会间隔性地对集群进行故障注入,最后给出测试结果,包括是否丢消息,是否有重复消息,集群平均故障恢复时间等。利用 OpenMessagingChaos,我们验证了 DLedger 模式在以下故障注入场景下的表现: randompartition(fixedpartition)故障随机挑选节点进行网络隔离,模拟常见的对称网络分区。 randomloss 故障随机挑选节点并对这些节点接收和发送的网络包进行按比例丢弃,模拟一些节点网络较差的情况。 randomkill(minorkill、majorkill、fixedkill)故障模拟常见的进程崩溃情况。 randomsuspend(minorsuspend、majorsuspend、fixedsuspend)故障模拟一些慢节点的情况,比如发生 Full GC、OOM 等。 bridge 和 partitionmajoritiesring 故障模拟比较极端的非对称网络分区。 以 minorkill 故障注入为例,我们部署 5 个节点组成一组 DLedger 模式的 RocketMQ broker 进行 Chaos 测试。minorkill 故障注入会随机挑选集群中少数节点进程杀死,由于杀死少数节点,即使集群不可用也能在一段时间内恢复,方便测试集群平均故障恢复时间。 测试过程中我们设置四个客户端并发向 RocketMQ DLedger 集群发送和接收消息,故障注入时间间隔为 100s,即 100s 正常运行,100s 故障注入,一直循环,整个阶段一共持续 2400s。测试结束后,OpenMessagingChaos 会给出测试结果和时延图。下图展示了整个测试过程中入队操作的时延情况。 图中纵坐标是是时延,横坐标是测试时间,绿色框表示数据发送成功,红色框表示数据发送失败,蓝色框表示不确定是否数据添加成功,灰色部分表示故障注入的时间段。可以看出一些故障注入时间段造成了集群短暂的不可用,一些故障时间段则没有,这是合理的。由于是随机网络分区,所以只有杀死的少数节点包含 leader 节点时才会造成集群重新选举,但即使造成集群重新选举, DLedger 模式在一段时间后也会恢复可用性。 下图是测试结束后 OpenMessagingChaos 给出的统计结果,可以看到一共成功发送了 11W 个数据,没有数据丢失,这表明即使在故障下,RocketMQ DLedger 模式仍旧满足 At Least Once 的投递语义。此外,RTOTestResult 表明 12 次故障时间段一共发生了 3 次集群不可用的情况(与上述时延图一致),但每次集群都能在 30 秒以内恢复,平均故障恢复时间在 22 秒左右。 在 OpenMessaging Chaos 测试过程中,我们发现了 DLedger 模式存在的数个隐性问题并进行了修复,提高了 DLedger 模式下对进程异常崩溃、慢节点、对称/非对称网络分区、网络丢包的容错能力,也进一步验证了 DLedger 模式的可靠性。 功能升级 DLedger 模式支持 Preferred Leader 在旧版本中一组 Broker 中选谁作为 Leader 完全是随机的。但是在新版中我们可以通过配置 preferredLeaderId 来指定优先选举某个节点为 Leader,如下图所示,通过在三个机房部署 DLedger 模式的 broker 组,利用 preferred leader 可以更好的实现机房对齐的功能,图中 DC1 中服务更好,我们可以优先选择在 DC1 中部署 Master。此外,利用 preferred leader 还可以更好实现 DLedger 集群混部,从而充分利用机器资源。 DLedger 模式支持批量消息 从 RocketMQ 4.8.0 开始,DLedger 模式支持批量消息发送,从而在消息类型的支持上完全与 MasterSlave 部署形态保持一致。 除了对 DLedger 模式的大量优化,本次 RocketMQ 版本一共包含 Improvement 25 个,Bug Fix 11 个,文档和代码格式优化 16 个。据不完全统计,这些贡献来自近 40 位 RocketMQ 社区的 Contributor,感谢大家的积极参与。也非常期待有更多的用户、厂商、开发者参与到 RocketMQ 建设中来,加入 Apache RocketMQ 社区,永远不会太迟!
作者:RocketMQ社区
#社区动态 #高可用