2022年3月18日

EventBridge 事件总线及 EDA 架构解析
作为 Gartner 定义的 10 大战略技术趋势之一,事件驱动架构(EDA)逐渐成为主流技术架构。根据 Gartner 的预估,在新型数字化商业的解决方案中,将有 60%使用 EDA,在商业组织参与的技术栈中,EDA 有一半的占比。 当下比较成功的企业已然认识到,要想最大限度提升运营效率和客户体验,务必要将业务和技术两方面的举措紧密结合起来。运营事件或业务形势的变化是时下众多企业关注的焦点,这些变化能够为企业领导者带来切实有用的信息,而架构设计的主旨恰恰是从客户联系人、交易、运营等方面的信息中获取洞见,两者相辅相成。传统技术历来对企业从事件中获取洞见的速度有着诸多限制,比如用于记录、收集和处理此类事件的批处理 ETL(提取、转换、加载)等。基于以上背景,阿里云 EventBridge 应运而生。 EventBridge 是事件驱动的具体落地产品,也是 EDA 的最佳实践方式。 事件驱动(EDA)是什么 早在 2018 年,Gartner 评估报告将 EventDriven Model 列为 10 大战略技术趋势之一,事件驱动架构(EDA)将成为未来微服务的主流。该报告同时做出了以下断言: 到 2022 年,事件通知的软件模型将成为超过 60% 的新型数字化商业的解决方案; 到 2022 年,超过 50% 的商业组织将参与到事件驱动的数字化商业服务的生态系统当中。 很喜欢 George Santayana 在《 The Life of Reason》说的一句话 Those who fail to learn History are doomed to repeat it.(不懂历史的人注定会重蹈覆辙)。我们以史为鉴,来看看为什么会架构会演进到事件驱动。 上图是关于架构演进时间轴线。架构本身没有优劣之分,它本身就是一组技术决策,决定后续项目的所有功能开发(框架,编码规范,文档,流程….),所以这里不谈选型好坏,只谈为什么会引入某些框架,这个框架解决了软件开发中的什么问题。 单体架构:在单节点服务中,单体应用的所有模块都封装在单个进程运行,通信通过相同堆栈调用完成。这种模式下非常容易导致结构和关系不明确,难以对系统进行更改和重构。就像一个不透明的,粘稠的,脆弱的,僵硬的 Big Ball of Mud! 分层架构:在经典的分层架构中,层以相当谨慎的方式使用。即一个层只能知道它下方层的数据。在随后的实际应用中,更多的方式是一个层可以访问它下面的任何层。分层架构解决了单体架构的的逻辑分离问题,每一层都可以被等效替换,是用层区分也更加标准化,同时一个层可以被几个不同/更高级别的层使用。当然,层也有比较明显的缺点,层不能封装掉一切,比如添加到 UI 的某个字段,可能也需要添加到 DB,而且额外多余的层会严重损害系统性能。 MVC 架构:MVC 架构产生的原因其实很简单,随着业务系统的复杂性增加,之前所谓“全栈工程师”已经不适用大部分场景。为了降低前端和后台的集成复杂性,故而开始推广 MVC 架构。其中,Model 代表业务逻辑;View 代表视图层,比如前端 UI 的某个小组件;Controller 提供 View 和 Model 的协调,比如将用户某项操作转为业务逻辑等。此外还有很多扩展架构,譬如 ModelViewPresenter,ModelViewPresenterViewModel,ResourceMethodRepresentation,ActionDomainResponder 就不在细说了,感兴趣的同学可以 wiki 搜索下。 EBI 架构:即 Entity,Boundary(接口),Interactor (控制)。EBI 架构将系统边界视为完整连接,而不仅仅是视图,控制器或接口。EBI 的实体代表持有数据并结束相关行为的实际实体,很类似阿里云的 POP API。EBI 主要还是后端概念,它是与 MVC 相辅相成的。 洋葱架构:洋葱架构是一种低耦合,高内聚的架构模型。所有的应用程序围绕独立的对象模型构建,内层定义接口,外层实现接口,耦合方向向中心内聚,所有代码都可以独立与基础设施进行编译和运行。 SOA 架构:SOA 是 Service Orientated Architure 的缩写,即面向服务架构。表示每一个功能都是通过一个独立的服务来提供,服务定义了明确的可调用接口,服务之间的编排调用可完成一个完整的业务。其实这个架构也是目前架构中最成熟的,日常使用最多的架构模式。 在介绍完之前全部的架构趋势后,在回过头看看什么是 EDA 架构。 EDA 事件驱动架构( EventDriven Architecture ) 是一种系统架构模型,它的核心能力在于能够发现系统“事件”或重要的业务时刻(例如交易节点、站点访问等)并实时或接近实时地对相应的事件采取必要行动。这种模式取代了传统的“ request/response ”模型,在这种传统架构中,服务必须等待回复才能进入下一个任务。事件驱动架构的流程是由事件提供运行的。 上图其实很好的解释了 EDA 架构的模型,但是其实还不够明确,所以这里我们和单体架构一起对比看看他们之间差异。 在如上对比图中,我们其实可以较为清楚看到它与传统架构的区别。在一般传统架构中,创建订单操作发生后,一系列的操作其实都是通过一个系统完成的。而事件驱动的概念则是将全部操作都转换为 “事件” 概念,下游通过捕获某个 “事件” 来决定调用什么系统完成什么样的操作。 我们回过头来看“事件”,刚刚介绍中比较的重要部分其实是将操作转换为某类事件进行分发。那这的事件我们怎么定义呢? 简单来看,其实事件就是状态的显著变化,当用户采取特定行动时触发。以 4S 店售卖汽车为例: 当客户购买汽车并且其状态从 For Sale 变为 Sold 是一个事件; 成功交易后,从帐户中扣除金额是一个事件; 单击预订试驾后,从将预约信息添加到指定用户就是一个事件; 每个事件都可能触发一个或多个选项作为响应。 事件其实云原生 CNCF 基金会在 2018 年托管了开源 CloudEvents 项目,该项目旨在用统一和规范的格式来描述事件,来加强不同的服务、平台以及系统之间的互操作性。在该项目定义下,通用的事件规范是这样的: 事件主要由 Json 体构成,通过不同字段描述发生的事件。 总结来看,事件驱动其实是将比较重要的业务时刻封装成“事件”,并通过某个 EventBus 将事件路由给下游系统。 了解了 EDA 架构的整个处理过程,但是还没解决这个所谓的“EventBus”到底是什么? 如上图就是 EventBus 的核心逻辑架构,它由 Event Producer 和 Event Consumer 两端组成,通过 Bus 解耦中间环节,是不是非常像某个传统的 MQ 架构?别着急,在接下来的落地实践部分会讲解这个架构的复杂部分。 EDA 架构的落地实践思考 在开始介绍落地实践时,我们先来看一个经典的 EDA 架构模型: 这是一个非常经典 EDA 订单架构,该架构主要使用了 EventBridge 和 FC 函数计算(如果不太熟悉 FaaS 的同学可以把 FC 节点当作 ECS 或 Kubernetes 的某个 POD 节点),通过事件驱动各个业务进行协作。 所以这块的中心节点(EventBridge)其实有三个比较重要的能力: 1. For Event Capturing(事件收集):具备采集事件的能力; 2. For Routing(事件路由):通过事件内容将事件路由分发至于下游的能力; 3. For Event Processing(事件过滤/替换):对事件进行脱敏或初步过滤&筛选的能力。 通常情况下,要实现这三个能力是比较困难的,比如:Event Capturing 可能需要熟悉 Dell Boomi, Snaplogic, MuleSoft, Dataflow, Apache Apex 等,Routing 部分可能通过 RocketMQ、RabbitMQ、ActiveMQ、Apache Kafka,Event Processing 需要了解 Apache Storm, Apache Flink 。所以之前讲的逻辑架构其实非常理想,要想实现完成的 EDA 事件驱动还需要包括这些核心能力。 其实,从刚刚的架构中我们也能窥探到一些信息,EDA 架构其实看起来没有那么简单,那它有何优劣呢? 下面简单罗列下 EDA 架构在实践中的优势: 松耦合:事件驱动架构是高度松耦合且高度分布式的架构模型,事件的创建者(来源)只知道发生的事件,并不知道事件的处理方式,也关心有多少相关方订阅该事件; 异步执行:EDA 架构是异步场景下最适合的执行工具,我们可以将需要事件保留在队列中,直到状态正常后执行; 可扩展性:事件驱动架构可以通过路由&过滤能力快速划分服务,提供更便捷的扩展与路由分发; 敏捷性:事件驱动架构可以通过将事件分发至任何地方,提供更敏捷高效的部署方案。 当然,劣势也很明显: 架构复杂:事件驱动架构复杂,路由节点多,系统结成复杂,功能要求多; 路由分发难:事件路由分发难,灵活的事件路由需要依赖强大的实时计算能力,对整体分发系统要求较高; 无法追踪:事件追踪是整个 EDA 架构的保证,EDA 架构中往往很难追踪到事件处理状态,需要大量的定制化开发; 可靠性差:事件驱动由于需要多系统集成,可靠性通常较差,且交付无法保障。 _ 针对 EDA 场景面临的这些问题,阿里云推出了 EventBridge,一款无服务器事件总线服务,其使命是作为云事件的枢纽,以标准化的 CloudEvents 1.0 协议连接云产品和应用、应用和应用,提供中心化的事件治理和驱动能力,帮助用户轻松构建松耦合、分布式的事件驱动架构;另外,在阿里云之外的云市场上有海量垂直领域的 SaaS 服务,EventBridge 将以出色的跨产品、跨组织以及跨云的集成与被集成能力,助力客户打造一个完整的、事件驱动的、高效可控的上云体验。 阿里云对 EventBridge 做了定义,核心价值包括: 统一事件枢纽:统一事件界面,定义事件标准,打破云产品事件孤岛; 事件驱动引擎:海量事件源,毫秒级触发能力,加速 EDA/Serverless 架构升级; 开放与集成:提供丰富的跨产品、跨平台连接能力,促进云产品、应用程序、SaaS 服务相互集成。 下面从架构层面和功能层面对 EventBridge 进行介绍: 架构层面 针对架构复杂问题,EventBridge 提供业内通用的 Source ,Buses,Rules,Targets 模块管理能力,同时支持 EventBus 和 EventStream 两种模式,大幅度降低事件驱动架构难度。 1)事件总线模型经典 EDA( 事件驱动)场景的 N:N 模型,提供多事件路由,事件匹配,事件转换等核心能力,帮助开发者快速搭建事件驱动架构。 2)事件流模型标准 Streaming(1:1) 流式处理场景,无总线概念,用于端到端的数据转储,数据同步及数据处理等,帮助轻松构建云上端到端的数据管道服务。 功能层面 在功能层面,EventBridge 的核心亮点应用包括: 1)事件规则驱动 针对基于事件的路由分发,EventBridge 通过事件规则驱动,支持 8 大事件模式,4 重转换器,满足路由分发的全部诉求。 2)事件追踪 针对事件无法追踪,独家提供事件追踪能力,事件分析/查询能力。为用户完善的全链路事件查询分析能力。 3)DLQ/重试机制、事件全流程触发 针对可靠性差,支持 DLQ/重试机制,与事件全流程触发,大幅度保证由于用户下游系统导致的事件故障与延迟。 4)Schema 注册中心 针对事件管理复杂,支持 Schema 注册中心,支持事件信息的解释、预览和上下游代码生成能力,帮助用户低代码完成事件的收发处理。解决跨部门信息沟通困难,业务代码冗余等一系列事件管理问题。 5)同时,基于以上功能 EventBridge 支持对接 85 种以上的阿里云产品,847 种事件类型。 更多产品功能介绍,可访问 EventBridge 官网 阿里云 EventBridge 更多场景介绍 经典 EDA 事件驱动 事件总线(EventBridge)最重要的能力是通过连接应用程序、云服务和 Serverless 服务来构建 EDA(Eventdriven Architectures) 事件驱动架构,驱动应用与应用,应用与云的连接。 流式 ETL 场景 EventBridge 另一个核心能力是为流式的数据管道的责任,提供基础的过滤和转换的能力,在不同的数据仓库之间、数据处理程序之间、数据分析和处理系统之间进行数据同步/跨地域备份等场景,连接不同的系统与不同服务。 统一事件通知服务 EventBridge 提供丰富的云产品事件源与事件的全生命周期管理工具,您可以通过总线直接监听云产品产生的数据,并上报至监控,通知等下游服务。
作者:肯梦
#技术探索 #事件驱动架构

2022年3月11日

基于 EventBridge 构建 SaaS 应用集成方案
引言 事件驱动架构(EDA)是一种以事件为纽带,将不同系统进行解耦的异步架构设计模型。在 EDA 中,事件驱动的运行流程天然地划分了各个系统的业务语义,用户可以根据需求对事件与针对此事件做出的响应灵活定制,这使得基于 EDA 架构可以方便地构建出高伸缩性的应用。据 Daitan Group 的调研报告,早在 2017 年,例如 UBER、Deliveroo、Monzo 等公司就已经采用了 EDA 去设计他们的系统。 为了便于用户更加轻松地开发以 EDA 为架构的应用,在 2020 年云栖大会上,阿里云正式推出了 EventBridge。EventBridge 是一款无服务器事件总线服务,能够以标准化的 CloudEvents 1.0 协议在应用之间路由事件。目前,EventBridge 已经集成了众多成熟的阿里云产品,用户可以低代码甚至零代码完成各个阿里云产品和应用之间的打通,轻松高效地构建分布式事件驱动架构。 事件源是事件驱动的基石,如何获取更多事件源也是 EventBridge 一直在探索和尝试的方向。针对市场上其他云厂商和垂直领域的 Saas 服务,EventBridge 发布了 HTTP Source 能力,提供简单且易于集成的三方事件推送 ,帮助客户更加高效、便捷地实现业务上云。 HTTP Source 概述 接入 EventBridge 应用有多种情况:用户自定义应用、阿里云服务、其他云厂商服务或者其他 SaaS 产品。 对于用户自定义应用,用户可以使用 EventBridge 官方的 API 接口、多语言客户端以及 CloudEvents 社区的开源客户端来完成接入。 对于阿里云的云产品,EventBridge 原生支持,用户可以在默认事件总线中选择对应的云产品与其相关的触发事件。 而对于其他云厂商、SaaS 产品,EventBridge 同样也提供便捷的接入方式便于用户进行集成,HTTP Source 事件源便是一种典型的接入方式。 具体而言,HTTP Source 事件源是 EventBridge 支持的事件源的一种,它以 Webhook 形式暴露了发布事件的 HTTP 请求地址,用户可以在有 URL 回调的场景配置 HTTP Source 事件源,或者直接使用最简单的 HTTP 客户端来完成事件的发布。HTTP Source 事件源提供了支持 HTTP 与 HTTPS,公网与阿里云 VPC 等不同请求方式、不同网络环境的 Webhook URL,便于用户将其集成到各类应用中。接入时无需使用客户端,仅需保证应用可以访问到对应 Webhook URL 即可,这使得接入过程变得简单而高效。 在将 HTTP 请求转换为 CloudEvent 的时候,EventBridge 会将请求的头部和消息体部分置于 CloudEvent 字段中,其余字段会依据用户 EventBridge 资源属性以及系统默认规则进行填充。用户可以在事件规则中,对所需的内容进行过滤、提取,最终按照模板拼装成所需的消息内容投递给事件目标。 HTTP Source 事件源目前支持 3 种类型的安全设置,分别是请求方法、源 IP 以及请求来源域名。 请求方法:用户可以配置当前请求此事件源时合法的 HTTP 请求方法,如果方法类型不满足配置规则,请求将被过滤,不会投递到事件总线。 源 IP:用户可以设置允许访问此事件源时合法的源 IP(支持 IP 段和 IP),当请求源 IP 不在设置的范围内时,请求将被过滤,不会投递到事件总线。 请求来源域名:即 HTTP 请求的 referer 字段,当请求的 referer 与用户配置不相符时,请求被过滤,不会投递到事件总线。 抛砖引玉,下面就介绍如何使用 HTTP Source 来构建 SaaS 应用集成的最佳实践,帮助大家快速上手 SaaS 集成方案。 SaaS 集成最佳实践 钉钉监控 GitHub 代码推送事件 GitHub 提供了 Webhook 功能,代码仓库在发生某些特定操作(push、fork等)时,可以通过回调来帮助用户完成特定功能。针对多人开发的项目,将 GitHub 事件推送到特定钉钉群可以帮助成员有效关注代码变更,提高协同效率。 本节我们展示如何通过钉钉监控 GitHub 代码推送事件的最佳实践,主要包含以下几个步骤: 创建一个钉钉机器人; 创建 EventBridge 相关资源:事件总线、事件源(HTTP Source 类型)、事件规则、事件目标(钉钉); 创建自定义事件总线; 选择 GitHub 代码仓库创建 Webhook; 向 GitHub 代码仓库推送代码变更; 钉钉群接收此次代码推送相关信息。 1)创建钉钉机器人 参考钉钉官方文档[1],创建一个群机器人。创建群机器人时,安全设置请勾选“加签”并妥善保管密钥和稍后生成的机器人 Webhook 地址。 2)创建 EventBridge 相关资源 创建 EventBus 事件总线 创建事件源。事件源配置完成之后,点击跳过,我们接下来会专门配置事件规则与目标。 创建完成后,进入事件源详情页,保存刚刚生成的 Webhook URL。 在 EventBridge 控制台页面点击进入刚刚创建的 EventBus 详情页,在左侧一栏中“事件规则”选择“创建规则”。 创建时间目标。选择钉钉,并将钉钉机器人的 Webhook 地址和密钥填入,推送内容侧可以按照需求设计。 我们填写模板变量为: {"repo":"$.data.body.repository.full_name","branch":"$.data.body.ref","pusher":"$.data.body.pusher.name"} 模板为: {"msgtype": "text","text": {"content": "Github push event is triggered. repository: {repo}, git reference: {branch}, pusher: {pusher}." } } 3)在 GitHub 代码仓库创建 Webhook 登陆 GitHub,在 GitHub 代码仓库“setting”中选择左侧“Webhooks”,选择新建 Webhook。 在创建 Webhook 的配置项中填入 HTTP Source 事件源的 Webhook 地址,Content type 部分选择“application/json”,下方触发事件类型选择“Just the push event.”,随后点击“Add Webhook”,创建完成。 4)向 GitHub 代码仓库推送代码变更 本地仓库做一定变更,commit 后推送 GitHub。 5)钉钉群接收此次代码推送相关信息 _异步消费监控报警信息_ 业务上存在异步消费报警信息的场景,例如报警内容备份,根据报警频率自适应调整报警阈值等。而且对于多云业务的用户,如何将跨云服务的报警信息整合起来也是一个麻烦的问题。依托 HTTP Source,用户可以将不同云厂商(腾讯云、华为云等)、不同监控产品(Grafana、Zabbix、Nagios等)统一集成到 EventBridge 平台,以便于实现对报警信息的异步消费。 本节我们介绍如何使用 EventBridge 集成 Grafana,实现异步消费监控报警信息。Grafana 是一款开源数据可视化工具,也同时具有监控报警功能,具体使用可以参阅Grafana 官方文档[2]。本节主要包含以下步骤: 创建 MNS 队列; 创建 EventBridge 相关资源; Grafana 上配置 Webhook; 测试接收结果。 创建 MNS 队列 在 MNS 控制台,选择“队列列表创建队列”。 创建 EventBridge 相关资源 同上文所述,这里仅示例创建事件目标时相关配置。 Grafana 上配置 Webhook 点击 Grafana 控制台左侧“AlertingNotification channels”,选择“Add channel”。 在“type”一栏中选择“Webhook”,url 填写 HTTP Source 事件源的 Webhook 地址,点击下方“Test”。 测试接收结果 登陆 MNS 控制台,进入队列详情页,点击页面右上角“收发消息”,可以看到 MNS 已经接收到刚刚 Grafana 发送的消息。 点击对应消息详情可以看到消息内容,说明消息已经被成功消费。 _更多集成_ HTTP Source 支持的三方集成包括 Prometheus,Zabbix,Skywalking,Grafana,OpenFalcon,Cacti,Nagios,Dynatrace,Salesforce,Shopify,Gitee 等 SaaS 应用。通过简单配置 Webhook 无需开发既可实现事件接收能力。 _总结_ 本文重点介绍 EventBridge 的新特性:HTTP Source 事件源。作为一款无服务器事件总线服务,EventBridge 已经将阿里云云产品管控链路数据、消息产品业务数据整和到事件源生态中,提高了上云用户业务集成的便捷性,Open API 与多语言 sdk 的支持,为客户自身业务接入 EventBridge 提供了便利。 在此基础之上,HTTP Source 事件源更进一步,以 Webhook 形式开放了针对了其他云厂商、SaaS 应用的集成能力,无需代码改动,仅需要简单配置即可完成 EventBridge 集成操作。 _相关链接_
作者:昶风
#行业实践 #生态集成

2022年2月22日

EventBridge消息路由|高效构建消息路由能力
企业数字化转型过程中,天然会遇到消息路由,异地多活,协议适配,消息备份等场景。本篇主要通过 EventBridge 消息路由的应用场景和应用实验介绍,帮助大家了解如何通过 EventBridge 的消息路由高效构建消息路由能力。 背景知识 EventBridge 消息路由主要涉及以下云产品和服务: 事件总线 EventBridge 事件总线 EventBridge 是阿里云提供的一款无服务器事件总线服务,支持阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助您轻松构建松耦合、分布式的事件驱动架构。 消息队列 RabbitMQ 版 阿里云消息队列 RabbitMQ 版支持 AMQP 协议,完全兼容 RabbitMQ 开源生态以及多语言客户端,打造分布式、高吞吐、低延迟、高可扩展的云消息服务。开箱即用,用户无需部署免运维,轻松实现快速上云,阿里云提供全托管服务,更专业、更可靠、更安全。 消息队列 MNS 版 阿里云消息服务 MNS 版是一款高效、可靠、安全、便捷、可弹性扩展的分布式消息通知服务。MNS 能够帮助应用开发者在他们应用的分布式组件上自由的传递数据、通知消息,构建松耦合系统。 场景应用 EventBridge 消息路由功能在构建在构建消息系统过程中主要应用于下面三个场景,一是消息路由场景,二是消息多活场景,三是多协议适配场景,下面对这三个场景进行简要介绍。 消息路由场景 该场景是指希望对消息进行二次分发,通过简单过滤或者筛选将消息分发到其他 Topic 或跨地域 Topic,实现消息共享 & 消息脱敏的场景。 通过一层转发将消息分发给不同的 Topic 消费,是消息路由的核心能力。随着企业转型遇到消息拆分且做业务脱敏的场景会越来越多。如下图是一个较为典型的路由分流场景。 消息多活场景 消息多活场景指每个数据中心均部署了完整、独立的 MQ 集群。数据中心内的应用服务只连接本地的 MQ 集群,不连接其他单元的 MQ 集群。MQ 集群中包含的消息路由模块,负责在不同单元 MQ 集群之间同步指定主题的消息。 根据应用服务是否具有单元化能力,可分为中心服务和单元服务两类。中心服务只在一个数据中心提供服务;单元服务在各个数据中心都提供服务,但只负责符合规则的部分用户,而非全量用户。 所有部署了单元服务的数据中心都是一个单元,所有单元的单元服务同时对外提供服务,从而形成一个异地多活架构或者叫单元化架构。通过多活管控平台可动态调整各个单元服务负责的流量。 多协议适配场景 随着业务团队的逐渐庞大,对消息的建设诉求与日俱增,由于部门技术栈的不同会导致部门间的消息协议也不尽相同。多协议适配是指用一种消息协议平滑迁移到多种消息协议的能力。 架构描述 使用 EventBridge 的事件流能力做消息路由,事件流模型是 EventBridge 在消息领域主打的处理模型,适用标准 Streaming(1:1)流式处理场景,无总线概念。用于端到端的消息路由,消息转储,消息同步及处理等,帮助开发者轻松构建云上数据管道服务。 下面的架构展示了如何通过桥接 EventBridge 实现 MNS 消息路由至 RabbitMQ Queues,MNS Queues。(A/B 链路任选其一进行试验) 应用实验 目标 通过本实验教程的操作,您可以通过阿里云控制台,在事件总线控制台中创建消息路由服务,在 EventBridge 控制台实现消息路由与简单的消息脱敏。 体验此实验后,可以掌握的知识有: 创建消息路由任务; 创建 RabbitMQ 实例、MNS 实例与简单的消息发送。 资源 使用到的资源如下:(本次实验资源遵循最小原则,使用满足场景需求的最小化资源) 资源一:EventBridge 事件总线 资源二:阿里云消息队列 RabbitMQ 版 资源三:阿里云消息队列 MNS 版 步骤 1)创建 MNS 资源 本实验分 A /B 两个可选场景: A 、场景通过 MNS Queues1 投递至 MNS Queues2 B 、场景通过 MNS Queues1 投递至 RabbitMQ Queues 可根据兴趣选择不同场景。 本步骤将指导您如何通过控制台创建消息队列 MNS 版。 使用您自己的阿里云账号登录阿里云控制台,然后访问消息队列MNS版控制台。[1] 在控制台左边导航栏中,单击队列列表。(资源地域为同地域即可,本次引导默认选杭州) 在列表页面,单击创建队列并填写名称信息“testmnsq” 创建完成后点击“详情” 找到 MNS 公网接入点信息,并记住该信息,后续实验会用到。 E.g. 注意:重复如上步骤即可创建 A 实验链路的 “testmnsq2” 2)创建 RabbitMQ 资源(B 实验可选) 本步骤将指导您如何通过控制台创建消息队列 RabbitMQ 版。 使用您自己的阿里云账号登录阿里云控制台,然后访问消息队列RabbitMQ版控制台。[2] 在控制台左边导航栏中,单击实例列表。(资源地域为同地域即可,本次引导默认选杭州) 在列表页面,单击创建实例,并完成创建。 创建完成后点击详情进入实例详情页; 在“Vhost 列表” 创建 “testamqpv”; 在“Queue 列表” ,选择 Vhost 为“testamqpv”,并创建 “testamqpq”; 3)创建 EventBridge 事件流任务   MNS TO MNS(A 实验可选) 本步骤将指导您如何通过控制台创建 EventBridge 事件流。 使用您自己的阿里云账号登录阿里云控制台,然后访问 EventBridge 控制台。[3] 注:第一次使用需开通。 单击“事件流”列表,并在列表创建任务 (资源地域为同地域即可,本次引导默认选杭州) 创建事件流名称为“testamqpmns2mns”,点击下一步; 指定事件源,事件提供方为“消息服务 MNS”,队列名称为“testmnsq”,点击下一步; 指定规则,规则部分可不做筛选,默认匹配全部,直接点击下一步; 注意:规则内容可根据需求自行指定,为降低难度本次实验默认投递全部,更多详情请查阅: 服务类型选择“消息服务 MNS”,队列名称选择“testmnsq2”,消息内容选择“部分事件”,点击创建 注意:消息内容可根据需求自行指定,本次实验默认投递 data 字段,更多详情请查阅: 创建完成后,可点击“启动”来启动事件流 4)创建 EventBridge 事件流任务 MNS TO RabbitMQ(B 实验可选) 本步骤将指导您如何通过控制台创建 EventBridge 事件流。 使用您自己的阿里云账号登录阿里云控制台,然后访问 EventBridge 控制台。[3]注:第一次使用需开通。 单击“事件流”列表,并在列表创建任务 (资源地域为同地域即可,本次引导默认选杭州) 创建事件流名称为“testamqpmns2rabbitmq”,点击下一步 指定事件源,事件提供方为“消息服务 MNS”,队列名称为“testmnsq”,点击下一步 指定规则,规则部分可不做筛选,默认匹配全部,直接点击下一步 注意:规则内容可根据需求自行指定,为降低难度本次实验默认投递全部,更多详情请查阅: 服务类型选择“消息队列 RabbitMQ 版本”,具体配置如下,点击创建 实例ID:选择创建好的RabbitMQ ID Vhost:选择“testamqpv” 目标类型:选择“Queue” Queue:选择“testamqpq” Body:选择“部分事件”,填写“$.data” MessageId:选择“常量”,填写“0” Properties:选择“部分事件”,填写“$.source” 注意:消息内容可根据需求自行指定,本次实验默认投递 data 字段,更多详情请查阅: 创建完成后,可点击“启动”来启动事件流 5)验证路由任务 向 MNS Source  “testmnsq ” 发送实验消息 点击下载 MNS SDK[4] 修改 sample.cfg 在 “sample.cfg ” 填写 AccessKeyId,AccessKeySecret,Endpoint 等信息 AccessKeyId,AccessKeySecret 可在阿里云 RAM 控制台[5]创建 Endpoint 即步骤 1 , MNS 公网接入点地址 AccessKeyId = xxxxxx AccessKeySecret = xxxxxxx Endpoint = http://xxxx.mns.cnhangzhou.aliyuncs.com 填完效果如下,保存 找到 sample 目录的“sendmessage.py” 示例 将循环参数调整为 200,并保存 (可选) 保存并运行 “python sendmessage.py testmnsq” python sendmessage.py testmnsq 在事件流控制台[6],分别点开 “testmnsq2”, “testamqpq” 查看详情转储详情。 注意:MNS Q 仅支持单订阅,不支持广播模式。故该测试需要将 MNS/RabbitMQ 两个实验,任选其一关停后进行实验。 如需广播模式,请创建 MNS Topic 资源。 A 链路实验结果: B 链路实验结果: 优势及总结 EventBridge 事件流提供端到端的消息路由能力,通过简单配置即可完成消息分发,消息同步,跨地域消息备份,跨产品消息同步等能力。具有运维简单,成本低,效率高,使用稳定等优势。同时使用 EventBridge 可以实现基础的数据过滤,数据脱敏等数据处理类能力。是消息路由场景下运维成本最低的解决方案。 相关链接 [1] 消息队列MNS版控制台 [2] 消息队列RabbitMQ版控制台 [3] EventBridge 控制台 [4] 点击下载 MNS SDK [5] 阿里云RAM 控制台 [6] 事件流控制台
作者:肯梦
#技术探索 #生态集成

2022年2月22日

RocketMQ-Streams 首个版本发布,轻量级计算的新选择
RocketMQStreams 聚焦「大数据量高过滤轻窗口计算」场景,核心打造轻资源,高性能优势,在资源敏感场景有很大优势,最低 1Core,1G 可部署。通过大量过滤优化,性能比其他大数据提升 25 倍性能。广泛应用于安全,风控,边缘计算,消息队列流计算。 RocketMQStreams 兼容 Flink 的 SQL,udf/udtf/udaf,将来我们会和 Flink 生态做深度融合,即可以独立运行,也可发布成 Flink 任务,跑在 Flink 集群,对于有 Flink 集群的场景,即能享有轻资源优势,可以做到统一部署和运维。 01 _RocketMQStreams 特点及应用场景_  RocketMQStreams 应用场景 计算场景:适合大数据量高过滤轻窗口计算的场景。不同于主流计算引擎,需要先部署集群,写任务,发布,调优,运行这么复杂的过程。RocketMQStreams 本身就是一个 lib 包,基于 SDK 写完流任务,可以直接运行。支持大数据开发需要的计算特性:ExactlyONCE,灵活窗口(滚动、滑动、会话),双流Join,高吞吐、低延迟、高性能。最低 1Core,1G 可以运行。 SQL引擎:RocketMQStreams 可视作一个 SQL 引擎,兼容 Flink SQL 语法,支持 Flink udf/udtf/udaf 的扩展。支持 SQL 热升级,写完 SQL,通过 SDK 提交 SQL,就可以完成 SQL 的热发布。 ETL引擎:RocketMQStreams 还可视作 ETL 引擎,在很多大数据场景,需要完成数据从一个源经过 ETl,汇聚到统一存储,里面内置了 grok,正则解析等函数,可以结合 SQL 一块完成数据 ETL 。 开发 SDK,它也是一个数据开发 SDK 包,里面的大多数组件都可以单独使用,如 Source/sink,它屏蔽了数据源,数据存储细节,提供统一编程接口,一套代码,切换输入输出,不需要改变代码。  RocketMQStreams 设计思路 设计目标 依赖少,部署简单,1Core,1G 单实例可部署,可随意扩展规模。 实现需要的大数据特性:ExactlyONCE,灵活窗口(滚动、滑动、会话),双流 Join,高吞吐、低延迟、高性能。 实现成本可控,实现低资源,高性能。 兼容 Flink SQL,UDF/UDTF,让非技术人员更易上手。 设计思路 采用 sharednothing 的分布式架构设计,依赖消息队列做负载均衡和容错机制,单实例可启动,增加实例实现能力扩展。并发能力取决于分片数。 利用消息队列的分片做 shuffle,利用消息队列负载均衡实现容错。 利用存储实现状态备份,实现 ExactlyONCE 的语义。用结构化远程存储实现快速启动,不必等本地存储恢复。  RocketMQStreams 特点和创新 02 _RocketMQStreams SDK 详解_  Hello World 按照惯例,我们先从一个例子来了解 RocketMQStreams namespace:相同 namespace 的任务可以跑在一个进程里,可以共享配置 pipelineName:job name DataStreamSource:创建 source 节点 map:用户函数,可以通过实现 MapFunction 扩展功能 toPrint:结果打印出来 start:启动任务 运行上面代码就会启动一个实例。如果想多实例并发,可以启动多个实例,每个实例消费部分 RocketMQ 的数据。 运行结果:把原始消息拼接上“”,并打印出来  RocketMQStreams SDK StreamBuilder 做为起点,通过设置 namespace,jobName 创建一个 DataStreamSource 。 DataStreamSource 通过 from 方法,设置 source,创建 DataStream 对象。 DataStream 提供多种操作,会产生不同的流: to 操作产生 DataStreamAction window 操作产生 WindowStream 配置 window 参数 join 操作产生 JoinStream 配置 join 条件 Split 操作产生 SplitStream 配置 split 条件 其他操作产生 DataStream DataStreamAction 启动整个任务,也可以配置任务的各种策略参数。支持异步启动和同步启动。  RocketMQStreams 算子  RocketMQStreams 算子 SQL 有两种部署模式,1 是直接运行 client 启动 SQL,见第一个红框;2 是搭建 server 集群,通过 client 提交 SQL 实现热部署,见第二个红框。 RocketMQStreams SQL 扩展,支持多种扩展方式: 通过 FlinkUDF,UDTF,UDAF 扩展 SQL 能力,在 SQL 中通过 create function 引入,有个限制条件,即 UDF 在 open 时未用到 Flink FunctionContext 的内容。 通过内置函数扩展 SQL 的函数,语法同 Flink 语法,函数名是内置函数的名称,类名是固定的。如下图,引入了一个 now 的函数,输出当前时间。系统内置了 200 多个函数,可按需引入。 通过扩展函数实现,实现一个函数很简单,只需要在 class 上标注 Function,在需要发布成函数的方法上标注 FunctionMethod,并设置需要发布的函数名即可,如果需要系统信息,前面两个函数可以是 IMessage 和 Abstract,如果不需要,直接写参数即可,参数无格式要求。如下图,创建了一个 now 的函数,两种写法都可以。可以通过 currentTime=now()来调用,会在 Message 中增加一个 key=currentTime,value=当前时间的变量。 把现有 java 代码发布成函数,通过策略配置,把 java 代码的类名,方法名,期望用到的函数名,配置进去,把 java 的 jar 包 copy 到 jar 包目录即可。下图是几种扩展的应用实例。 03 _RocketMQStreams 架构及原理实现_  整体架构  Source 实现 Source 要求实现最少消费一次的语义,系统通过 checkpoint 系统消息实现,在提交 offset 前发送 checkpoint 消息,通知所有算子刷新内存。 Source 支持分片的自动负载均衡和容错。 数据源在分片移除时,发送移除系统消息,让算子完成分片清理工作。 当有新分片时, 发送新增分片消息,让算子完成分片的初始化。 数据源通过 start 方法,启动 consuemr 获取消息。 原始消息经过编码,附加头部信息包装成 Message 投递给后续算子。  Sink 实现 Sink 是实时性和吞吐的一个结合。 实现一个 Sink 只要继承 AbstractSink 类实现 batchInsert 方法即可。batchInsert 的含义是一批数据写入存储,需要子类调用存储接口实现,尽量应用存储的批处理接口,提高吞吐。 常规的使用方式是写 Messagecacheflush存储的方式,系统会严格保证,每次批次写入存储的量不超过 batchsize 的量,如果超了,会拆分成多批写入。 Sink 有一个 cache,数据默认写 cache,批次写入存储,提高吞吐量。(一个分片一个 cache)。 可以开启自动刷新,每个分片会有一个线程,定时刷新 cache 数据到存储,提高实时性。实现类:DataSourceAutoFlushTask 。 也可以通过调用 flush 方法刷新 cache 到存储。 Sink 的 cache 会有内存保护,当 cache 的消息条数batchSize,会强制刷新,释放内存。  RocketMQStreams ExactlyONCE Source 确保在 commit offset 时,会发送 checkpoint 系统消息,收到消息的组件会完成存盘操作。消息至少消费一次。 每条消息会有消息头部,里面封装了 QueueId 和 offset 。 组件在存储数据时,会把 QueueId 和处理的最大 offset 存储下来,当有消息重复时,根据 maxoffset 去重。 内存保护,一个 checkpoint 周期可能有多次 flush(条数触发),保障内存占用可控。  RocketMQStreams Window 支持滚动,滑动和会话窗口。支持事件时间和自然时间(消息进入算子的时间)。 支持高性能模式和高可靠模式,高性能模式不依赖远程存储,但在分片切换时的窗口数据会有丢失。 快速启动,无需等本地存储恢复,在发生错误或分片切换时,异步从远程存储恢复数据,同时直接访问远程存储计算。 利用消息队列负载均衡,实现扩容缩容,每个 Queue 是一个分组,一个分组同一刻只被一台机器消费。 正常计算依赖本地存储,具备 Flink 相似的计算性能。 支持三种触发模式,可以均衡 watermark 延迟和实时性要求 04 _RocketMQStreams 在云安全的应用_  在安全应用的背景 公共云转战专有云,在入侵检测计算方面遇到了资源问题,大数据集群默认不输出,输出最低 6 台高配机器,用户很难接受因为买云盾增配一套大数据集群。 专有云用户升级,运维困难,无法快速升级能力和修复 bug。  流计算在安全的应用 基于安全特点(大数据高过滤轻窗口计算)打造轻量级计算引擎:经过分析所有的规则都会做前置过滤,然后才会做较重的统计,窗口,join 操作,且过滤率比较高,基于此特点,可以用更轻的方案实现统计,join 操作。 通过 RocketMQStreams,覆盖 100%专有云规则(正则,join,统计)。 轻资源,内存是公共云引擎的 1/70,CPU 是 1/6,通过指纹过滤优化,性能提升 5 倍以上,且资源不随规则线性增加,新增规则无资源压力。复用以前的正则引擎资源,可支持 95%以上局点,不需要增加额外物理资源。 通过高压缩维表,支持千万情报。1000 W 数据只需要 330 M 内存。 通过 C/S 部署模式,SQL 和引擎可热发布,尤其护网场景,可快速上线规则。 05 _RocketMQStreams 未来规划_ 新版本下载地址:
作者:袁小栋、程君杰
#社区动态 #流处理

2022年2月5日

技术盘点:消息中间件的过去、现在和未来
操作系统、数据库、中间件是基础软件的三驾马车,而消息队列属于最经典的中间件之一,已经有30多年的历史。其发展主要经历了以下几个阶段: 第一个阶段,2000年之前。80年代诞生了第一款消息队列是 The Information Bus,第一次提出发布订阅模式来解决软件之间的通信问题;到了90年代,则是国际商业软件巨头的时代,IBM、Oracle、Microsoft纷纷推出了自己的 MQ,其中最具代表性的是IBM MQ,价格昂贵,面向高端企业,如大型金融、电信等企业;这类商业MQ一般采用高端硬件,软硬件一体机交付,MQ本身的架构是单机架构。 第二阶段,2000~2007年。进入00年代后,初代开源消息队列崛起,诞生了JMS、AMQP两大标准,与之对应的两个实现分别为 ActiveMQ、RabbitMQ,开源极大的促进了消息队列的流行度,降低了使用门槛,逐渐成为了企业级架构的标配。相比于今天而言,这类MQ主要还是面向传统企业级应用,面向小流量场景,横向扩展能力比较弱。 第三阶段,2007~2018年。PC互联网、移动互联网爆发式发展。由于传统的消息队列无法承受亿级用户的访问流量和海量数据传输,诞生了互联网消息中间件,核心能力是全面采用分布式架构、具备很强的横向扩展能力,开源典型代表有 Kafka、RocketMQ,还有淘宝的 Notify。Kafka 的诞生还将消息中间件从Messaging领域延伸到了 Streaming 领域,从分布式应用的异步解耦场景延伸到大数据领域的流存储和流计算场景。 第四阶段,2014~至今。IoT、云计算、云原生引领了新的技术趋势。面向IoT的场景,消息队列开始从云内服务端应用通信,延伸到边缘机房和物联网终端设备,支持MQTT等物联网标准协议也成了各大消息队列的标配。 随着云计算的普及,云原生的理念深入人心,各种云原生代表技术层出不穷,包括容器、微服务、Serverless、Service Mesh、事件驱动等。云原生的核心问题是如何重新设计应用,才能充分释放云计算的技术红利,实现业务成功最短路径。 消息队列本身作为云计算的PaaS服务之一,要进一步发挥“解耦”的能力,帮助业务构建现代化应用,这里最关键的一个能力演进是Eventing的演进。通过将消息升华为“事件”,提供面向标准 CloudEvent 的编排过滤、发布订阅等能力构建更大范围的解耦,包括云服务事件和业务应用的解耦、跨组织SaaS业务事件的解耦、遗留应用和现代化应用的解耦等,同时事件驱动也是天然符合云计算 Serverless 函数计算的范式,是应用 Serverless 化演进的催化剂。 云原生对于消息中间件而言,还有另一层含义就是消息队列自身架构的云原生化演进,如何充分发挥云的弹性计算、存储、网络,让自己获得更强的技术指标和 Serverless 弹性能力。 消息中间件在技术上有哪些进展与突破? 阿里云 MQ 是基于 RocketMQ 打造的一站式消息服务,以 RocketMQ 作为统一内核,实现业界标准、主流的消息协议,包括MQTT、Kafka、RabbitMQ、AMQP、CloudEvent、HTTP等,满足客户多样化场景诉求。为了提高易用性,我们分别对不同的协议进行了产品化,以独立产品的模式提供消息服务(如阿里云RabbitMQ、阿里云Kafka),开箱即用、免运维、完备的可观测体系,帮助开源客户无缝迁云。 在经历数万企业客户多样化场景的持续打磨,数年的超大规模云计算的生产实践,我们的内核RocketMQ逐渐往一体化架构和云原生架构演进。 1. 一体化架构 微服务、大数据、实时计算、IoT、事件驱动等技术潮流,不断的扩展消息的业务边界,业界有不同的消息队列满足不同的业务场景,比如RabbitMQ侧重满足微服务场景,Kafka则是侧重于满足大数据、事件流场景,EMQ则是满足了IoT垂直领域场景。而随着数字化转型的深入,客户的业务往往同时涉及交叉场景,比如来自物联网设备的消息、或者微服务系统产生的业务消息要进行实时计算,如果是引入多套系统,会带来额外的机器、运维、学习等成本。 在过去“分”往往是技术实现的妥协,而现在“合”才是用户的真正需求。RocketMQ 5.0基于统一Commitlog扩展多元化索引,包括时间索引、百万队列索引、事务索引、KV索引、批量索引、逻辑队列等技术。在场景上同时支撑了RabbitMQ、Kafka、MQTT、边缘轻量计算等产品能力,真正实现了“消息、事件、流”,“云边端”一体化架构。 2. 云原生架构   云原生架构是指云上原生的架构,云计算是云原生的“源动力”,脱离了云计算谈云原生如同纸上谈兵。RocketMQ 过去几年正是立足于阿里云超大规模的云计算生产实践,帮助数万企业完成数字化转型的经验中吸取养分,从而完成互联网消息中间件到云原生消息中间件的进化。这也是 RocketMQ 和其他消息中间件最大的区别,他是实践出来的云原生架构,下面我们盘点一下 RocketMQ 在云原生架构的关键技术演进。 RocketMQ 是 2011 年诞生于淘宝核心电商系统,一开始是定位于服务集团业务,面向单一超大规模互联网企业设计。原来的架构并不能很好的满足云计算的场景,有不少的痛点,比如重型 SDK,客户端逻辑复杂、多语言 SDK 开发成本高、商业特性迭代慢;弹性能力差,计算存储耦合、客户端和物理队列数耦合、队列数无法扩展到百万级、千万级;而其他主流的开源消息项目也同样未进行云原生架构的转型,比如 RabbitMQ 单队列能力无法横向扩展、Kafka 弹性扩容会面临大量的数据拷贝均衡等,都不适用于在公共云为大规模客户提供弹性服务。 为此,RocketMQ 5.0 面向云计算的场景进行重新设计,期望从架构层面解决根本性问题,对客户端、Broker到存储引擎全面升级: 客户端轻量化。RocketMQ 5.0 SDK 把大量逻辑下沉到服务端,代码行数精简三分之二,开发维护多语言 SDK 的成本大幅度降低;轻量的 SDK 更容易被 Service Mesh、Dapr等云原生代表技术集成。 可分可合的存算分离架构。用户根据不同的场景诉求,既可以同一进程启动存储和计算的功能,也可以将两者分开部署。分开部署后的计算节点可以做到“无状态”,一个接入点可代理所有流量,在云上结合新硬件内核旁路技术,可以降低分离部署带来的性能及延迟问题。而选择“存储计算一体化”架构,则具备“就近计算”的优势,性能更优。在云上多租、多VPC复杂网络、多协议接入方式的场景下,采用存储计算分离模式能够避免后端存储服务直接暴露给客户端,便于实现流量的管控、隔离、调度、权限管理、协议转换等。 但是有利必有弊,存算分离也同时带来了链路变长、延迟增大、机器成本上升等问题,运维也没得到简化,除了要运维有状态存储节点外,还要多运维无状态计算节点。其实在大多数简单消息收发场景,数据链路基本上就是写Log、读Log,无复杂计算逻辑(计算逻辑和数据库相比太简单),这个时候优选存储计算一体化架构,简单够用、性能高、延迟低。特别是在大数据传输场景下,存算一体能够极大降低机器及流量成本,这个从 Kafka 的架构演进也可以得到印证。总的来说不要为了存算分离而分离,还是要回归客户、业务场景的本质诉求。 弹性存储引擎。面向 IoT 海量设备、云上大规模小客户场景,我们引入 LSM 的 KV 索引,实现单机海量队列的能力,队列数量可以无限扩展;为了进一步释放云存储的能力,我们实现分级存储,消息存储时长从3天提高到月、年级别,存储空间可以无限扩展,同时还分离了冷热数据,冷数据存储成本降低了80%。 Serverless化。在老架构里面,客户感知物理队列,物理队列绑定固定存储节点,强状态。Broker、客户端、物理队列的扩缩容互相耦合,负载均衡粒度是队列级,对Serverless的技术演进很不友好。为了实现极致弹性 Serverless,RocketMQ 5.0 对逻辑资源和物理资源做进一步的解耦。 在 Messaging/无序消息的场景,客户指定 Topic 进行消息无序收发,新架构对客户端屏蔽队列概念,只暴露逻辑资源 Topic。负载均衡粒度从队列级到消息级,实现了客户端的无状态化,客户端、服务端弹性伸缩解耦。 在 Streaming/顺序消息的场景,客户端需要指定 Topic 下的某个队列(也称分区)进行消息顺序收发。在新架构里,对客户端屏蔽物理队列,引入逻辑队列概念,一个逻辑队列通过横向分片和纵向分段,分散在不同的物理存储节点。横向分片解决了高可用问题,同一个逻辑队列的多个分片多点随机可写,基于 Happen before 的原理保序,秒级 Failover,无需主备切换;纵向分段,解决逻辑队列的扩容问题,通过多级队列映射,实现 0 数据迁移的秒级扩容,逻辑资源和物理资源的弹性伸缩解耦。   如何看待消息领域生态玩家? 在云原生、IoT、大数据的趋势引导下,消息成为现代化应用架构的刚需,使用场景更加广泛,可应用于微服务的异步解耦、事件驱动、物联网设备数据上下行、大数据流存储、轻量流计算等场景。客户需求旺盛、市场活跃,吸引了不少厂商加入角逐。 从好的角度来看,厂商的充分竞争,会进一步激活创新,培养更多用户,共同做大消息的市场,用户看起来也有更多的选择; 从坏的角度来看,未来部分竞争失利的消息队列会进入停滞期、下线期,用户的应用就会面临迁移大改造和稳定性风险,所以建议用户在满足自身业务需求的情况下,尽可能选择标准接口、协议的方式接入,或者直接使用业界事实标准的消息队列。 消息中间件未来的发展趋势是什么?   随着 IoT、5G 网络的持续发展,数据量增速28%,预计到2025年物联网设备将达到 400 亿台,进入万物互联的时代。物联网时代的消息存储量和计算量会爆发式增长,消息系统将面临巨大的成本压力。未来消息系统,需要深挖新硬件的红利,比如持久内存、DPU等技术,采用软硬结合的方式深度优化,将消息的存储计算成本进一步降低。 IoT时代还有另外一个很重要的趋势是边缘计算,Gartner 预计到 2025 年,75%的数据将在传统数据中心或云环境之外进行处理,消息系统需要进一步轻量化、降低资源消耗以适应边缘计算环境。这也意味着,消息中间件的一体化架构,要具备良好的插件化设计,能够根据场景的特点实现多形态输出。比如公共云的形态可以和公共云的基础设施深度集成,充分利用云盘、对象存储增强存储能力,集成日志服务、应用监控等服务提升可观测能力;而边缘计算的形态则是以最小的资源代价输出核心存储、轻量计算的能力,简单够用即可。 近几年云计算高速发展,得益于全球范围内大量企业在进行数字化转型,通过业务在线化、业务数据化、数据智能化来提升企业竞争力。数据化转型也伴随着商业思维的转型,越来越多的企业采用“事件驱动”的模式来构建商业逻辑和数字化系统。 Gartner预测,未来超过60%的新型数字化商业的解决方案会采用“事件驱动”模式,从业务角度看,“事件驱动”的模式能够帮助企业实时响应客户,抓住更多的商业机会,创造增量价值;从技术角度看,“事件驱动”的架构,能够以动态、灵活、解耦的方式来链接跨组织、跨环境的异构系统,天然适合用于构建大型的跨组织数字化商业生态。 为了应对这个趋势,Messaing 往 Eventing 演进,出现了 EventBridge (EventBroker)的产品形态。在 EventBridge 里,“事件”这个概念成为一等公民,事件的发布者和订阅者不耦合任何一种具体的消息队列SDK和实现。EventBroker 围绕标准的 CloudEvent 规范构建更加泛化的发布订阅模式,能够链接一切跨组织、跨环境的异构事件源和事件处理目标。 目前以“事件驱动”构建的数字化商业生态才刚起步,未来 EventBridge 将围绕事件这一抽象层次实现更强大的能力,比如事件的全链路可观测、事件分析计算、低代码开发等特性,帮助企业全面落地云时代的“事件驱动”架构。 作者介绍: 林清山(花名:隆基),阿里云资深技术专家,阿里云消息产品线负责人。国际消息领域专家,致力于消息、实时计算、事件驱动等方向的研究与探索,推进 RocketMQ 云原生架构、超融合架构的演进。
作者:林清山(花名:隆基)
#技术探索

2022年1月26日

Apache RocketMQ + Hudi 快速构建 Lakehouse
本文目录 背景知识 大数据时代的构架演进 RocketMQ Connector&Stream Apache Hudi 构建Lakehouse实操 本文标题包含三个关键词:Lakehouse、RocketMQ、Hudi。我们先从整体Lakehouse架构入手,随后逐步分析架构产生的原因、架构组件特点以及构建Lakehouse架构的实操部分。 背景知识 1、Lakehouse架构 Lakehouse最初由Databrick提出,并对Lakehouse架构特征有如下要求: (1)事务支持 企业内部许多数据管道通常会并发读写数据。对ACID事务的支持确保了多方并发读写数据时的一致性问题; (2)Schema enforcement and governance Lakehouse应该有一种方式可以支持模式执行和演进、支持DW schema的范式(如星星或雪花模型),能够对数据完整性进行推理,并且具有健壮的治理和审计机制; (3)开放性 使用的存储格式是开放式和标准化的(如parquet),并且为各类工具和引擎,包括机器学习和Python/R库,提供API,以便它们可以直接有效地访问数据; (4)BI支持 Lakehouse可以直接在源数据上使用BI工具。这样可以提高数据新鲜度、减少延迟,并且降低了在数据池和数据仓库中操作两个数据副本的成本; (5)存储与计算分离 在实践中,这意味着存储和计算使用单独的集群,因此这些系统能够扩展到支持更大的用户并发和数据量。一些现代数仓也具有此属性; (6)支持从非结构化数据到结构化数据的多种数据类型 Lakehouse可用于存储、优化、分析和访问许多数据应用所需的包括image、video、audio、text以及半结构化数据; (7)支持各种工作负载 包括数据科学、机器学习以及SQL和分析。可能需要多种工具来支持这些工作负载,但它们底层都依赖同一数据存储库; (8)端到端流 实时报表是许多企业中的标准应用。对流的支持消除了需要构建单独系统来专门用于服务实时数据应用的需求。 从上述对Lakehouse架构的特点描述我们可以看出,针对单一功能,我们可以利用某些开源产品组合构建出一套解决方案。但对于全部功能的支持,目前好像没有一个通用的解决方案。接下来,我们先了解大数据时代主流的数据处理架构是怎样的。 大数据时代的架构演进 1、大数据时代的开源产品 大数据时代的开源产品种类繁多,消息领域的RocketMQ、Kafka;计算领域的flink、spark、storm;存储领域的HDFS、Hbase、Redis、ElasticSearch、Hudi、DeltaLake等等。 为什么会产生这么多开源产品呢?首先在大数据时代数据量越来越大,而且每个业务的需求也各不相同,因此就产生出各种类型的产品供架构师选择,用于支持各类场景。然而众多的品类产品也给架构师们带来一些困扰,比如选型困难、试错成本高、学习成本高、架构复杂等等。 2、当前主流的多层架构 大数据领域的处理处理场景包含数据分析、BI、科学计算、机器学习、指标监控等场景,针对不同场景,业务方会根据业务特点选择不同的计算引擎和存储引擎;例如交易指标可以采用binlog + CDC+ RocketMQ + Flink + Hbase + ELK组合,用于BI和Metric可视化。 (1)多层架构的优点:支持广泛的业务场景; (2)多层架构的缺点: 处理链路长,延迟高; 数据副本多,成本翻倍; 学习成本高; 造成多层架构缺点主要原因是存储链路和计算链路太长。 我们真的需要如此多的解决方案来支持广泛的业务场景吗?Lakehouse架构是否可以统一解决方案? 多层架构的存储层是否可以合并?Hudi产品是否能够支持多种存储需求? 多层架构的计算层是否可以合并?RocketMQ stream是否能够融合消息层和计算层? 当前主流的多层架构 3、Lakehouse架构产生 Lakehouse架构是多层架构的升级版本,将存储层复杂度继续降低到一层。再进一步压缩计算层,将消息层和计算层融合,RocketMQ stream充当计算的角色。我们得到如下图所示的新架构。新架构中,消息出入口通过RocketMQ connector实现,消息计算层由RocketMQ stream实现,在RocketMQ内部完成消息计算中间态的流转;计算结果通过RocketMQHudiconnector收口落库Hudi,Hudi支持多种索引,并提供统一的API输出给不同产品。 Lakehouse架构 下面我们分析下该架构的特点。 (1)Lakehouse架构的优点: 链路更短,更适合实时场景,数据新鲜感高; 成本可控,降低了存储成本; 学习成本低,对程序员友好; 运维复杂度大幅降低; (2)Lakehouse架构的缺点 对消息产品和数据湖产品的稳定性、易用性等要求高,同时消息产品需要支持计算场景,数据湖产品需要提供强大的索引功能。 (3)选择 在Lakehouse架构中我们选择消息产品RocketMQ和数据湖产品Hudi。 同时,可以利用RocketMQ stream在RocketMQ集群上将计算层放在其中集成,这样就将计算层降低到一层,能够满足绝大部分中小型大数据处理场景。 接下来我们逐步分析RocketMQ和Hudi两款产品的特点。 RocketMQ Connector & Stream RocketMQ 发展历程图 RocketMQ从2017年开始进入Apache孵化,2018年RocketMQ 4.0发布完成云原生化,2021年RocketMQ 5.0发布全面融合消息、事件、流。 1、业务消息领域首选 RocketMQ作为一款“让人睡得着觉的消息产品”成为业务消息领域的首选,这主要源于产品的以下特点: (1)金融级高可靠 经历了阿里巴巴双十一的洪峰检验; (2)极简架构 如下图所示, RocketMQ的架构主要包含两部分包括:源数据集群NameServer Cluster和计算存储集群Broker Cluster。 RocketMQ 构架图 NameServer节点无状态,可以非常简单的进行横向扩容。Broker节点采用主备方式保证数据高可靠性,支持一主多备的场景,配置灵活。 搭建方式:只需要简单的代码就可以搭建RocketMQ集群: Jar: nohup sh bin/mqnamesrv & nohup sh bin/mqbroker n localhost:9876 & On K8S: kubectl apply f example/rocketmq_cluster.yaml (3)极低运维成本 RocketMQ的运维成本很低,提供了很好的CLI工具MQAdmin,MQAdmin提供了丰富的命令支持,覆盖集群健康状态检查、集群进出流量管控等多个方面。例如,mqadmin clusterList一条命令可以获取到当前集群全部节点状态(生产消费流量、延迟、排队长度、磁盘水位等);mqadmin updateBrokerConfig命令可以实时设置broker节点或topic的可读可写状态,从而可以动态摘除临时不可用节点,达到生产消费的流量迁移效果。 (4)丰富的消息类型 RocketMQ支持的消息类型包括:普通消息、事务消息、延迟消息、定时消息、顺序消息等。能够轻松支持大数据场景和业务场景。 (5)高吞吐、低延迟 压测场景主备同步复制模式,每台Broker节点都可以将磁盘利用率打满,同时可以将p99延迟控制在毫秒级别。 2、RocketMQ 5.0概况 RocketMQ 5.0是生于云、长于云的云原生消息、事件、流超融合平台,它具有以下特点: (1)轻量级SDK 全面支持云原生通信标准 gRPC 协议; 无状态 Pop 消费模式,多语言友好,易集成; (2)极简架构 无外部依赖,降低运维负担; 节点间松散耦合,任意服务节点可随时迁移; (3)可分可合的存储计算分离 Broker 升级为真正的无状态服务节点,无 binding; Broker 和 Store节点分离部署、独立扩缩; 多协议标准支持,无厂商锁定; 可分可合,适应多种业务场景,降低运维负担; 如下图所示,计算集群(Broker)主要包括抽象模型和相对应的协议适配,以及消费能力和治理能力。存储集群(Store)主要分为消息存储CommitLog(多类型消息存储、多模态存储)和索引存储Index(多元索引)两部分,如果可以充分发挥云上存储的能力,将CommitLog和Index配置在云端的文件系统就可以天然的实现存储和计算分离。 (4)多模存储支持 满足不同基础场景下的高可用诉求; 充分利用云上基础设施,降低成本; (5)云原生基础设施: 可观测性能力云原生化,OpenTelemetry 标准化; Kubernetes 一键式部署扩容交付。 RocketMQ 5.02021年度大事件及未来规划 3、RocketMQConnector a、传统数据流 (1)传统数据流的弊端 生产者消费者代码需要自己实现,成本高; 数据同步的任务没有统一管理; 重复开发,代码质量参差不齐; (2)解决方案:RocketMQ Connector 合作共建,复用数据同步任务代码; 统一的管理调度,提高资源利用率; b、RocketMQ Connector数据同步流程 相比传统数据流,RocketMQ connector数据流的不同在于将 source 和 sink 进行统一管理,同时它开放源码,社区也很活跃。 4、RocketMQ Connector架构 如上图所示,RocketMQ Connector架构主要包含Runtime和Worker两部分,另外还有生态Source&Sink。 (1)标准:OpenMessaging (2)生态:支持ActiveMQ、Cassandra、ES、JDBC、JMS、MongoDB、Kafka、RabbitMQ、Mysql、Flume、Hbase、Redis等大数据领域的大部分产品; (3)组件:Manager统一管理调度,如果有多个任务可以将所有任务统一进行负载均衡,均匀的分配到不同Worker上,同时Worker可以进行横向扩容。 5、RocketMQ Stream RocketMQ Stream是一款将计算层压缩到一层的产品。它支持一些常见的算子如window、join、维表,兼容Flink SQL、UDF/UDAF/UDTF。 Apache Hudi Hudi 是一个流式数据湖平台,支持对海量数据快速更新。内置表格式,支持事务的存储层、一系列表服务、数据服务(开箱即用的摄取工具)以及完善的运维监控工具。Hudi 可以将存储卸载到阿里云上的 OSS、AWS 的S3这些存储上。 Hudi的特性包括: 事务性写入,MVCC/OCC并发控制; 对记录级别的更新、删除的原生支持; 面向查询优化:小文件自动管理,针对增量拉取优化的设计,自动压缩、聚类以优化文件布局; Apache Hudi是一套完整的数据湖平台。它的特点有: 各模块紧密集成,自我管理; 使用 Spark、Flink、Java 写入; 使用 Spark、Flink、Hive、Presto、Trino、Impala、 AWS Athena/Redshift等进行查询; 进行数据操作的开箱即用工具/服务。 Apache Hudi主要针对以下三类场景进行优化: 1、流式处理栈 (1) 增量处理; (2) 快速、高效; (3) 面向行; (4) 未优化扫描; 2、批处理栈 (1) 批量处理; (2) 低效; (3) 扫描、列存格式; 3、增量处理栈 (1) 增量处理; (2) 快速、高效; (3) 扫描、列存格式。 构建 Lakehouse 实操 该部分只介绍主流程和实操配置项,本机搭建的实操细节可以参考附录部分。 1、准备工作 RocketMQ version:4.9.0 rocketmqconnecthudi version:0.0.1SNAPSHOT Hudi version:0.8.0 2、构建RocketMQHudiconnector (1) 下载: _  git clone _ (2) 配置: /data/lakehouse/rocketmqexternals/rocketmqconnect/rocketmqconnectruntime/target/distribution/conf/connect.conf 中connectorplugin 路径 (3) 编译: cd rocketmqexternals/rocketmqconnecthudi mvn clean install DskipTest U rocketmqconnecthudi0.0.1SNAPSHOTjarwithdependencies.jar就是我们需要使用的rocketmqhudiconnector 3、运行 (1) 启动或使用现有的RocketMQ集群,并初始化元数据Topic: connectorclustertopic (集群信息) connectorconfigtopic (配置信息) connectoroffsettopic (sink消费进度) connectorpositiontopic (source数据处理进度 并且为了保证消息有序,每个topic可以只建一个queue) (2) 启动RocketMQ connector运行时 cd /data/lakehouse/rocketmqexternals/rocketmqconnect/rocketmqconnectruntime sh ./run_worker.sh Worker可以启动多个 (3) 配置并启动RocketMQhudiconnector任务 请求RocketMQ connector runtime创建任务 curl http://{runtimeip}:{runtimeport}/connectors/{rocketmqhudisinkconnectorname} ?config='{"connectorclass":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"topicc","tablePath":"file:///tmp/hudi_connector_test","tableName":"hudi_connector_test_table","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","sourcerecordconverter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","sourcerocketmq":"127.0.0.1:9876","srccluster":"DefaultCluster","refreshinterval":"10000","schemaPath":"/data/lakehouse/config/user.avsc"\}’ 启动成功会打印如下日志: 20210906 16:23:14 INFO pool2thread1 Open HoodieJavaWriteClient successfully (4) 此时向source topic生产的数据会自动写入到1Hudi对应的table中,可以通过Hudi的api进行查询。 4、配置解析 (1) RocketMQ connector需要配置RocketMQ集群信息和connector插件位置,包含:connect工作节点id标识workerid、connect服务命令接收端口httpPort、rocketmq集群namesrvAddr、connect本地配置储存目录storePathRootDir、connector插件目录pluginPaths 。 RocketMQ connector配置表 (2) Hudi任务需要配置Hudi表路径tablePath和表名称tableName,以及Hudi使用的Schema文件。 Hudi任务配置表 _点击__即可查看Lakehouse构建实操视频_ 附录:在本地Mac系统构建Lakehouse demo 涉及到的组件:rocketmq、rocketmqconnectorruntime、rocketmqconnecthudi、hudi、hdfs、avro、sparkshell0、启动hdfs 下载hadoop包 cd /Users/osgoo/Documents/hadoop2.10.1 vi coresite.xml fs.defaultFS hdfs://localhost:9000 vi hdfssite.xml dfs.replication 1 ./bin/hdfs namenode format ./sbin/startdfs.sh jps 看下namenode,datanode lsof i:9000 ./bin/hdfs dfs mkdir p /Users/osgoo/Downloads 1、启动rocketmq集群,创建rocketmqconnector内置topic QickStart:https://rocketmq.apache.org/docs/quickstart/ sh mqadmin updatetopic t connectorclustertopic n localhost:9876 c DefaultCluster sh mqadmin updatetopic t connectorconfigtopic n localhost:9876 c DefaultCluster sh mqadmin updatetopic t connectoroffsettopic n localhost:9876 c DefaultCluster sh mqadmin updatetopic t connectorpositiontopic n localhost:9876 c DefaultCluster 2、创建数据入湖的源端topic,testhudi1 sh mqadmin updatetopic t testhudi1 n localhost:9876 c DefaultCluster 3、编译rocketmqconnecthudi0.0.1SNAPSHOTjarwithdependencies.jar cd rocketmqconnecthudi mvn clean install DskipTest U 4、启动rocketmqconnector runtime 配置connect.conf workerId=DEFAULT_WORKER_1 storePathRootDir=/Users/osgoo/Downloads/storeRoot Http port for user to access REST API httpPort=8082 Rocketmq namesrvAddr namesrvAddr=localhost:9876 Source or sink connector jar file dir,The default value is rocketmqconnectsample pluginPaths=/Users/osgoo/Downloads/connectorplugins 拷贝 rocketmqhudiconnector.jar 到 pluginPaths=/Users/osgoo/Downloads/connectorplugins sh run_worker.sh 5、配置入湖config curl http://localhost:8082/connectors/rocketmqconnecthudi?config='\{"connectorclass":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"testhudi1","tablePath":"hdfs://localhost:9000/Users/osgoo/Documents/basepath7","tableName":"t7","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","sourcerecordconverter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","sourcerocketmq":"127.0.0.1:9876","sourcecluster":"DefaultCluster","refreshinterval":"10000","schemaPath":"/Users/osgoo/Downloads/user.avsc"\}' 6、发送消息到testhudi1 7、 利用spark读取 cd /Users/osgoo/Downloads/spark3.1.2binhadoop3.2/bin ./sparkshell \ packages org.apache.hudi:hudispark3bundle_2.12:0.9.0,org.apache.spark:sparkavro_2.12:3.0.1 \ conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ val tableName = "t7" val basePath = "hdfs://localhost:9000/Users/osgoo/Documents/basepath7" val tripsSnapshotDF = spark. read. format("hudi"). load(basePath + "/") tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select from hudi_trips_snapshot").show()
#社区动态 #生态集成

2022年1月23日

平安保险基于 SPI 机制的 RocketMQ 定制化应用
为什么选用 RocketMQ 首先跟大家聊聊我们为什么会选用 RocketMQ,在做技术选型的过程中,应用场景应该是最先考虑清楚的,只有确定好了应用场景在做技术选型的过程中才有明确的目标和衡量的标准。像异步、解耦、削峰填谷这些消息中间件共有的特性就不一一介绍了,这些特性是决定你的场景需不需要使用消息中间件,这里主要讲述下在确定使用消息中间件后,又是如何去选择哪款消息中间件的。 同步双写,确保业务数据安全可靠不丢失 我们在搭建消息中间件平台时的定位是给业务系统做业务数据的传输使用,对业务数据的很重要的一个要求就是不允许丢数据,所以选用 RocketMQ 的第一点就是他有同步双写机制,数据在主从服务器上都刷盘成功才算发送成功。同步双写条件下,MQ 的写入性能与异步刷盘异步赋值相比肯定会有所下降,与异步条件下大约会有 20% 左右的下降,单主从架构下,1K 的消息写入性能还是能达到 8W+ 的 TPS,对大部分业务场景而言性能是能完全满足要求的,另外对下降的这部分性能可以通过 broker 的横向扩招来弥补,所以在同步双写条件下,性能是能满足业务需求的。 多 topic 应用场景下,性能依旧强悍 第二点,业务系统的使用场景会特别多,使用场景广泛带来的问题就是会创建大量的 topic,所以这时候就得去衡量消息中间件在多 topic 场景下性能是否能满足需求。我自己在测试的时候呢,用 1K 的消息随机往 1 万个 topic 写数据,单 broker 状态下能达到2W左右的 TPS,这一点比 Kafka 要强很多。所以多 topic 应用场景下,性能依旧强悍是我们选用 topic 的第二个原因。这点也是由底层文件存储结构决定的,像 Kafka、RocketMQ 这类消息中间件能做到接近内存的读写能力,主要取决于文件的顺序读写和内存映射。RocketMQ 中的所有 topic 的消息都是写在同一个 commitLog 文件中的,但是 Kafka 中的消息是以 topic 为基本单位组织的,不同的 topic 之间是相互独立的。在多 topic 场景下就造成了大量的小文件,大量的小文件在读写时存在一个寻址的过程,就有点类似随机读写了,影响整体的性能。 支持事务消息、顺序消息、延迟消息、消息消费失败重试等 RocketMQ 支持事务消息、顺序消息、消息消费失败重试、延迟消息等,功能比较丰富,比较适合复杂多变的业务场景使用 社区建设活跃,阿里开源系统 另外,在选用消息中间件时也要考虑下社区的活跃度和源码所使用的开发语言,RocketMQ 使用 Java 开发,对 Java 开发人员就比较友好,不管是阅读源码排查问题还是在 MQ 的基础上做二次开发都比较容易一点。社区里同学大都是国内的小伙伴,对大家参与 RocketMQ 开源贡献也是比较亲近的,这里呢也是希望更多的小伙伴能参与进来,为国内开源项目多做贡献。 SPI 机制简介及应用 介绍完为什么选用 RocketMQ 后,接下来给大家介绍下我们是如何基于 SPI 机制应用 RocketMQ 的。SPI 全称为 (Service Provider Interface) ,是 JDK 内置的一种服务提供发现机制,我个人简单理解就是面向接口编程,留给使用者一个扩展的点,像 springBoot 中的 spring.factories 也是 SPI 机制的一个应用。如图给大家展示的是 RocketMQ 中 SPI 的一个应用。我们基于 SPI 机制的 RocketMQ 客户端的应用的灵感也是来自于 MQ 中 SPI 机制的应用。RocketMQ 在实现 ACL 权限校验的时候,是通过实现 AccessValidator 接口,PlainAccessValidator 是 MQ 中的默认实现。权限校验这一块,可能因为组织架构的不一样会有不同的实现方式,通过 SPI 机制提供一个接口,为开发者定制化开发提供扩展点。在有定制化需求时只需要重新实现 AccessValidator 接口,不需要对源码大动干戈。 接下来先给大家介绍下我们配置文件的一个简单模型,在这个配置文件中除了 sendMsgService、consumeMsgConcurrently、consumeMsgOrderly 这三个配置项外其余的都是 RocketMQ 原生的配置文件,发送消息和消费消息这三个配置项呢就是 SPI 机制的应用,是为具体实现提供的接口。可能有的同学会有疑问,SPI 的配置文件不是应该放在 METAINF.service 路径下么?这里呢我们是为了方便配置文件的管理,索性就跟 MQ 配置文件放在了一起。前面也提到了,METAINF.service 只是一个默认的路径而已,为了方便管理做相应的修改也没有违背SPI机制的思想。 我们再看下这个配置文件模型,这里的配置项呢囊括了使用 MQ 时所要配置的所有选项,proConfigs 支持所有的 MQ 原生配置,这样呢也就实现了配置与应用实现的解耦,应用端只需呀关注的具体的业务逻辑即可,生产者消费者的实现和消费者消费的 topic 都可以通过配置文件来指定。另外该配置文件也支持多 nameserver 的多环境使用,在较复杂的应用中支持往多套 RocketMQ 环境发送消息和消费多套不同环境下的消息。消费者提供了两个接口主要是为了支持 RocketMQ 的并发消费和顺序消费。接下来呢给大家分享下如何根据这个配置文件来初始化生产者消费者。首先给大家先介绍下我们抽象出来的客户端加载的一个核心流程。 客户端核心流程详情 图中大家可以看到,客户端的核心流程我们抽象成了三部分,分别是启动期、运行期和终止期。首先加载配置文件呢就是加载刚刚介绍的那个配置文件模型,在配置与应用完全解耦的状态下,必须先加载完配置文件才能初始化后续的流程。在初始化生产者和消费者之前应当先创建好应用实现的生产者和消费者的业务逻辑对象 供生产者和消费者使用。在运行期监听配置文件的变化,根据变化动态的调整生产者和消费者实例。这里还是要再强调下配置与应用的解耦为动态调整提供了可能。终止期就比较简单了,就是关闭生产者和消费者,并从容器中移除。这里的终止期指的生产者和消费者的终止,并不是整个应用的终止,生产者和消费者的终止可能出现在动态调整的过程中,所以终止了的实例一定要从容器中移除,方便初始化后续的生产者和消费者。介绍完基本流程后,接下来给大家介绍下配置文件的加载过程。 如何加载配置文件 配置文件加载这一块的话,流程是比较简单的。这里主要讲的是如何去兼容比较老的项目。RocketMQ 客户端支持的 JDK 最低版本是 1.6,所以在封装客户端时应该要考虑到新老项目兼容的问题。在这里呢我们客户端的核心包是支持 JDK1.6 的,spring 早期的项目配置文件一般都是放在在 resources 路径下,我们是自己实现了一套读取配置文件的和监听配置文件的方法,具体的大家可以参考 acl 中配置文件的读取和监听。在核心包的基础上用 springBoot 又封装了一套自动加载配置文件的包供微服务项目使用,配置文件的读取和监听都用的 spring 的那一套。配置文件加载完之后, 配置文件中应用实现的生产者和消费者是如何与 RocketMQ 的生产者和消费者相关联的呢?接下来给大家分享下这方面的内容。 如何将生产消费者与业务实现关联 首先先看下消费者是如何实现关联的,上图是 MQ 消费者的消息监听器,需要我们去实现具体的业务逻辑处理。通过将配置文件中实现的消费逻辑关联到这里就能实现配置文件中的消费者与 RocketMQ 消费者的关联。消费者的接口定义也是很简单,就是去消费消息。消费消息的类型可以通过泛型指定,在初始化消费者的时候获取具体实现的参数类型,并将 MQ 接受到的消息转换为具体的业务类型数据。由客户端统一封装好消息类型的转换。对消费消息的返回值大家可以根据需要与 MQ 提供的 status 做一个映射,这里的 demo 只是简单显示了下。在获取具体的应用消费者实例的时候,如果你的消费逻辑里使用了 spring 管理的对象,那么你实现的消费逻辑对象也要交给 spring 管理,通过 spring 上下文获取初始化好的对象;如果你的消费逻辑里没有使用 spring 进行管理,可以通过反射的方式自己创建具体的应用实例。 与消费者不一样的是生产者需要将初始化好的 producer 对象传递到应用代码中去,而消费者是去获取应用中实现的逻辑对象,那如何将 producer 传递到业务应用中去呢? 业务代码中实现的生产者需要继承 SendMessage,这样业务代码就获得了 RmqProducer 对象,这是一个被封装后的生产者对象,该对象对发送消息的方法进行的规范化定义,使之符合公司的相应规范制度,该对象中的方法也会对 topic 的命名规范进行检查,规范 topic 有一个统一的命名规范。 如何动态调整生产消费者 首先谈到动态调整就需要谈一下动态调整发生的场景,如果没有合适的使用场景的话实现动态调整就有点华而不实了。这里我列举了四个配置文件发生变化的场景: nameserver发生变化的时候,需要重新初始化所有的生产者和消费者,这个一般是在 MQ 做迁移或者当前 MQ 集群不可用是需要紧急切换 MQ; 增减实例的场景只要启动或关闭相应的实例即可,增加应用实例的场景一般是在需要增加一个消费者来消费新的 topic 的,减少消费者一般是在某个消费者发生异常时需要紧急关闭这个消费者,及时止损。 调整消费者线程的场景中我们对源码进行了一点修改,让应用端能获取到消费者的线程池对象,以便对线程池的核心线程数进行动态调整。这个的应用场景一般是在当某个消费者消费的数据比较多,占用过多的 CPU 资源时,导致优先级更高的消息得不到及时处理,可以先将该消费者的线程调小一些。 应用的优点
作者:孙园园
#行业实践

2022年1月20日

消息队列 RocketMQ 遇上可观测:业务核心链路可视化
引言:本篇文章主要介绍 RocketMQ 的可观测性工具在线上生产环境的最佳实践。RocketMQ的可观测性能力领先业界同类产品,RocketMQ 的 Dashboard 和消息轨迹等功能为业务核心链路保驾护航,有效应对线上大规模生产使用过程中遇到的容量规划、消息收发问题排查以及自定义监控等场景。 消息队列简介 进入主题之前,首先简要介绍下什么是阿里云的消息队列? 阿里云提供了丰富的消息产品家族,消息产品矩阵涵盖了互联网、大数据、物联网等各个业务场景的领域,为云上客户提供了多维度可选的消息解决方案。无论哪一款消息队列产品,核心都是帮助用户解决业务和系统的异步、解耦以及应对流量洪峰时的削峰填谷,同时具备分布式、高吞吐、低延迟、高可扩展等特性。 但是不同的消息产品在面向客户业务的应用中也有不同的侧重。简单来做,消息队列 RocketMQ 是业务领域的首选消息通道;Kafka 是大数据领域不可或缺的消息产品;MQTT 是物联网领域的消息解决方案;RabbitMQ 侧重于传统业务消息领域;云原生的产品集成和事件流通道是通过消息队列 MNS 来完成;最后事件总线 EventBridge 是一个阿里云上的一个事件枢纽,统一构建事件中心。 本篇主要讲的是业务领域的消息首选通道:消息队列 RocketMQ。RocketMQ 诞生于阿里的电商系统,具有高性能、低延迟、削峰填谷等能力,并且提供了丰富的在业务和消息场景上应对瞬时流量洪峰的功能,被集成在用户的核心业务链路上。 作为一个核心业务链路上的消息,就要求 RocketMQ 具备非常高的可观测性能力,用户能通过可观测性能力及时的监控定位异常波动,同时对具体的业务数据问题进行排查。由此,可观测性能力逐步成为消息队列 RocketMQ 的核心能力之一。 那么什么是可观测能力呢?下面简单对可观测能力进行介绍。 可观测能力 提到可观测能力,大家可能最先想到的是可观测的三要素:Metrics(指标)、Tracing(追踪)和 Logging(日志)。 结合消息队列的理解,可观测能力三要素的细化解释如下: Metrics:Dashborad 大盘 1)指标涵盖丰富:包含消息量、堆积量、各个阶段耗时等指标,每个指标从实例、Topic、消费 GroupID 多维度做聚合和展示; 2)消息团队最佳实践模板:为用户提供最佳模板,特别是在复杂的消费消息场景,提供了丰富的指标帮助快速定位问题,并持续迭代更新; 3)Prometheus + Grafana:Prometheus标准数据格式、利用Grafana展示,除了模板,用户也可以自定义展示大盘。 Tracing:消息轨迹 1)OpenTelemetry tracing标准:RocketMQ tracing 标准已经合并到 OpenTelemetry 开源标准,规范和丰富 messaging tracing 场景定义; 2)消息领域定制化展示:按照消息维度重新组织抽象的请求 span 数据,展示一对多的消费,多次消费信息,直观、方便理解; 3)可衔接 tracing链路上下游:消息的 tracing 可继承调用上下文,补充到完整调用链路中,消息链路信息串联了异步链路的上游和下游链路信息。 Logging:客户端日志标准化 1)Error Code标准化:不同的错误有唯一的 error code; 2)Error Message 完整:包含完整的错误信息和排序所需要的资源信息; 3)Error Level 标准化:细化了各种不同错误信息的日志级别,让用户根据 Error、Warn 等级别配置更合适和监控告警。 了解消息队列和可观测能力的基础概念,让我们来看看当消息队列 RocketMQ 遇到可观测,会产生什么样的火花? RocketMQ 的可观测性工具的概念介绍 从上文的介绍中可以看到 RocketMQ 的可观测能力能够帮助用户根据错误信息排查消息在生产和消费过程中哪些环节出了问题,为了帮助大家更好的理解功能的应用,先简要介绍下消息生产消费流程过程中的一些概念。 消息生产和消费流程概念 首先我们先明确以下几个概念: Topic:消息主题,一级消息类型,通过Topic对消息进行分类; 消息(Message):消息队列中信息传递的载体; Broker:消息中转角色,负责存储消息,转发消息; Producer:消息生产者,也称为消息发布者,负责生产并发送消息; Consumer:消息消费者,也称为消息订阅者,负责接收并消费消息。 消息生产和消费的流程简单来说就是生产者将消息发送到 topic 的 MessageQueue 上进行存储,然后消费者去消费这些 MessageQueue 上的消息,如果有多个消费者,那么一个完整的一次消息生产发生的生命周期是什么样子的? 这里我们以定时消息为例,生产者 Producer 发送消息经过一定的耗时到达 MQ Server,MQ 将消息存储在 MessageQueue,这时队列中有一个存储时间,如果是定时消息,还需要经过一定的定时时间之后才能被消费者消费,这个时间就是消息就绪的时间;经过定时的时间后消费者 Consumer 开始消费,消费者从 MessageQueue 中拉取消息,然后经过网络的耗时之后到达消费者客户端,这时候不是低码进行消费的,会有一个等待消费者资源线程的过程,等到消费者的线程资源后才开始进行真正的业务消息处理。 从上面的介绍中可以看出,业务消息有一定的耗时处理,完成之后才会向服务端返回ack的结果,在整个生产和消费的过程中,最复杂的便是消费的过程,因为耗时等原因,会经常有消息堆积的场景,下面来重点看一下在消息堆积场景下各个指标表示的含义。 消息堆积场景 如上图,消息队列中,灰色部分的消息表示是已完成的消息量,就是消费者已处理完成并返回 ack 的消息;橙色部分的消息表示这些消息已经被拉取到消费者客户端,正在被处理中,但是还没有返回处理结果的消息,这个消息其实有一个非常重要的指标,就是消息处理耗时;最后绿色的消息表示这些消息在已经发生的 MQ 队列中已存储完成,并且已经是可被消费者消费的一个状态,称为已就绪的消息。 _已就绪消息量(Ready messages):_ _含义:已就绪消息的消息的条数。_ _作用:消息量的大小反映还未被消费的消息规模,在消费者异常情况下,就绪消息量会变多。_ _消息排队时间(Queue time)_ _含义:最早一条就绪消息的就绪时间和当前时间差。_ _作用:这个时间大小反映了还未被处理消息的时间延迟情况,对于时间敏感的业务来说是非常重要的度量指标。_ RocketMQ 的可观测性工具的功能介绍 结合上文介绍的消息队列 RocketMQ 可观测概念,下面具体对 RocketMQ 的可观测性工具的两个核心功能进行介绍。 可观测功能介绍 Dashboard Dashboard 大盘可以根据各种参数查看指定的指标数据,主要的指标数据包含下面三点: 1)Overview(概览): 查看实例据总的消息收发量、TPS、消息类型分布情况。 查看是的各个指标当前的分布和排序情况:发送消息量最多的 Topic、消费消息量最多的 GroupID、堆积消息量最多的 GroupID、排队时间最长的 GroupID 等。 2)Topic(消息发送): 查看指定 Topic 的发送消息量曲线图。 查看指定 Topic 的发送成功率曲线图。 查看指定 Topic 的发送耗时曲线图。 3)GroupID(消息消费): 查看指定 Group 订阅指定 Topic 的消息量曲线图。 查看指定 Group 订阅指定 Topic 的消费成功率。 查看指定 Group 订阅指定 Topic 的消费耗时等指标。 查看指定 Group 订阅指定 Topic 的消息堆积相关指标。 可观测功能介绍 消息轨迹 在 Tracing 方面提供了消息轨迹功能,主要包含以下三方面能力: 1)便捷的查询能力:可根据消息基本信息查询相关的轨迹;二期还可以根据结果状态、耗时时长来过滤查询,过滤出有效轨迹快速定位问题。 2)详细的 tracing 信息:除了各个生命周期的时间和耗时数据,还包含了生产者、消费者的账号和机器信息。 3)优化展示效果:不同的消息类型轨迹;多个消费 GroupID 的场景;同个消费 GroupID 多次重投的场景等。 最佳实践 场景一:问题排查 1)目标:消息生产消费健康情况 2)原则 一级指标:用来报警的指标,公认的没有异议的指标。 二级指标:一级指标发生变化的时候,通过查看二级指标,能够快速定位问题的原因所在。 三级指标:定位二级指标波动原因。根据各自业务的特点和经验添加。 基于目标和原则,生产者用户和消费者用户问题排查和分析方式如下: 场景二:容量规划 容量规划场景下只要解决下面三个问题: 1)问题一:怎样评估实例容量? 解决方法: 实例详情页》查看指定实例数据统计,可以看到所选时间段内的最大消息收发的 TPS 峰值。 铂金版实例可以根据这个数据来添加报警监控和判断业务。 2)问题二:怎样查看标准版实例的消耗 解决方法: 可以查看概览总消息量模块 3)问题三:有哪些已下线,需要清理资源? 解决方法: 指定一段时间内(例如近一周),按 Topic 的消息发送量由小到大排序,查看是否有消息发送量为 0 的 Topic,这些 Topic 相关的业务或许已下线。 指定一段时间内(例如近一周),按 GroupID 的消息消费量由小到大排序,查看是否有消息消费量为 0 的 GroupID,这些 GroupID 相关的业务或许已下线。 场景三:业务规划 业务规划场景下主要解决以下三个问题: 1)问题一:如何查看业务峰值分布情况? 解决方法: 查看 Topic 消息接收量的每天的高峰时间段。 查看 Topic 消息接收量周末和非周某的消息量差别。 查看 Topic 消息接收量节假日的变化情况。 2)问题二:如何判断目前哪些业务有上升趋势? 解决方法: 查看消息量辅助判断业务量变化趋势。 3)问题三 :怎样优化消费者系统性能? 解决方法: 查看消息处理耗时,判断是否在合理范围内有提升的空间。 本篇文章通过消息队列、可观测能力、RocketMQ 可观测概念及功能和最佳实践的介绍,呈现了 RocketMQ 的可观测性工具在业务核心链路上的可视化能力,希望给大家在日常的线上的一些问题排查和运维过程中带来一些帮助。
作者:文婷、不周
#行业实践 #可观测

2022年1月14日

全链路灰度之 RocketMQ 灰度
之前的系列文章中,我们已经通过全链路金丝雀发布这个功能来介绍了 MSE 对于全链路流量控制的场景,我们已经了解了 Spring Cloud 和 Dubbo 这一类 RPC 调用的全链路灰度应该如何实现,但是没有涉及到消息这类异步场景下的流量控制,今天我们将以上次介绍过的《》中的场景为基础,来进一步介绍消息场景的全链路灰度。 虽然绝大多数业务场景下对于消息的灰度的要求并不像 RPC 的要求得这么严格,但是在以下两个场景下,还是会对消息的全链路有一定的诉求的。 1、第一种场景是在消息消费时,可能会产生新的 RPC 调用,如果没有在消息这一环去遵循之前设定好的全链路流量控制的规则,会导致通过消息产生的这部分流量“逃逸”,从而导致全链路灰度的规则遭到破坏,导致出现不符合预期的情况。 为了防止出现这个情况,我们需要在消费时候将消息里原来的流量标复原,并在 RPC 调用的时候遵循原来的规则。我们通过架构图来详细描述一下,满足这个逻辑之后,调用链路是怎样的,从下图中我们可以看到,灰度和基线环境生产出来的消息,虽然在消息推送的时候是随机的,但是在消费过程中,产生的新的 RPC 调用,还是能够回到流量原来所属的环境。 2、第二种场景需要更加严格的消息灰度隔离。比如当消息的消费逻辑进行了修改时,这时候希望通过小流量的方式来验证新的消息消费逻辑的正确性,要严格地要求灰度的消息只能被推送给灰度的消息消费者。 今天我们就来实操一下第二种场景消息的全链路灰度,目前 MSE 仅支持 RocketMQ 消息的灰度。若您使用的是开源版 RocketMQ,那么版本需要在 4.5.0 及以上,若您使用的是阿里云商业版 RocketMQ,那么需要使用铂金版,且 Ons Client 版本在 1.8.0.Final 及以上。如果只是想使用第一种场景,只需要给 B 应用开启全链路灰度的功能即可,不需要做额外的消息灰度相关的配置。 在这次最佳实践的操作中,我们是将应用部署在阿里云容器服务 Kubernetes 版本,即 ACK 集群来演示,但是事实上,消息灰度对于应用的部署模式是没有限制性要求的,您可以参考 MSE 帮助文档,找到自己所使用的部署模式对应的接入方式,也能使用消息全链路灰度。 前提条件 1. 开通 MSE 专业版,请参见开通 MSE 微服务治理专业版[1]。 2. 创建 ACK 集群,请参见创建 Kubernetes 集群[2]。 操作步骤 步骤一:接入 MSE 微服务治理 1、安装 mseackpilot 1. 登录容器服务控制台[3]。 2. 在左侧导航栏单击市场 应用目录。 3. 在应用目录页面点击阿里云应用,选择微服务,并单击 ackmsepilot。 4. 在 ackmsepilot 页面右侧集群列表中选择集群,然后单击创建。 安装 MSE 微服务治理组件大约需要 2 分钟,请耐心等待。 创建成功后,会自动跳转到目标集群的 Helm 页面,检查安装结果。如果出现以下页面,展示相关资源,则说明安装成功。 2、为 ACK 命名空间中的应用开启 MSE 微服务治理 1. 登录 MSE 治理中心控制台[4],如果您尚未开通 MSE 微服务治理,请根据提示开通。 2. 在左侧导航栏选择微服务治理中心 Kubernetes 集群列表。 3. 在 Kubernetes 集群列表页面搜索框列表中选择集群名称或集群 ID,然后输入相应的关键字,单击搜索图标。 4. 单击目标集群操作列的管理。 5. 在集群详情页面命名空间列表区域,单击目标命名空间操作列下的开启微服务治理。 6. 在开启微服务治理对话框中单击确认。 步骤二:还原线上场景 首先,我们将分别部署  springcloudzuul、springclouda、springcloudb、springcloudc 这四个业务应用,以及注册中心 Nacos Server 和消息服务 RocketMQ Server,模拟出一个真实的调用链路。 Demo 应用的结构图下图,应用之间的调用,既包含了 Spring Cloud 的调用,也包含了 Dubbo 的调用,覆盖了当前市面上最常用的两种微服务框架。其中 C 应用会生产出 RocketMQ 消息,由 A 应用进行消费,A 在消费消息时,也会发起新的调用。这些应用都是最简单的 Spring Cloud 、 Dubbo 和 RocketMQ 的标准用法,您也可以直接在 项目上查看源码。 部署之前,简单介绍一下这个调用链路 springcloudzuul 应用在收到 “/A/dubbo” 的请求时,会把请求转发给 springclouda ,然后 springclouda 通过 dubbo 协议去访问 springcloudb, springcloudb 也通过 dubbo 协议去访问 springcloudc,springcloudc 在收到请求后,会生产一个消息,并返回自己的环境标签和 ip。这些生产出来的消息会由 springclouda 应用消费,springclouda 应用在消费消息的时候,会通过 spring cloud 去调用 B,B 进而通过 spring cloud 去调用 C,并且将结果输出到自己的日志中。 当我们调用 /A/dubbo 的时候 返回值是这样 A[10.25.0.32] B[10.25.0.152] C[10.25.0.30] 同时,A 应用在接收到消息之后,输出的日志如下 20211228 10:58:50.301 INFO 1 [essageThread_15] c.a.mse.demo.service.MqConsumer : topic:TEST_MQ,producer:C[10.25.0.30],invoke result:A[10.25.0.32] B[10.25.0.152] C[10.25.0.30] 熟悉了调用链路之后,我们继续部署应用,您可以使用 kubectl 或者直接使用 ACK 控制台来部署应用。部署所使用的 yaml 文件如下,您同样可以直接在 上获取对应的源码。 部署 Nacos Server apiVersion: apps/v1 kind: Deployment metadata: name: nacosserver spec: selector: matchLabels: app: nacosserver template: metadata: annotations: labels: app: nacosserver spec: containers: env: name: MODE value: "standalone" image: registry.cnshanghai.aliyuncs.com/yizhan/nacosserver:latest imagePullPolicy: IfNotPresent name: nacosserver ports: containerPort: 8848 apiVersion: v1 kind: Service metadata: name: nacosserver spec: type: ClusterIP selector: app: nacosserver ports: name: http port: 8848 targetPort: 8848 部署业务应用 apiVersion: apps/v1 kind: Deployment metadata: name: springcloudzuul spec: selector: matchLabels: app: springcloudzuul template: metadata: annotations: msePilotCreateAppName: springcloudzuul labels: app: springcloudzuul spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre name: enable.mq.invoke value: 'true' image: registry.cnshanghai.aliyuncs.com/yizhan/springcloudzuul:1.0.0 imagePullPolicy: Always name: springcloudzuul ports: containerPort: 20000 apiVersion: v1 kind: Service metadata: annotations: service.beta.kubernetes.io/alibabacloudloadbalancerspec: slb.s1.small service.beta.kubernetes.io/alicloudloadbalanceraddresstype: internet name: zuulslb spec: ports: port: 80 protocol: TCP targetPort: 20000 selector: app: springcloudzuul type: LoadBalancer status: loadBalancer: {} apiVersion: apps/v1 kind: Deployment metadata: name: springclouda spec: selector: matchLabels: app: springclouda template: metadata: annotations: msePilotCreateAppName: springclouda labels: app: springclouda spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springclouda:1.0.0 imagePullPolicy: Always name: springclouda ports: containerPort: 20001 livenessProbe: tcpSocket: port: 20001 initialDelaySeconds: 10 periodSeconds: 30 apiVersion: apps/v1 kind: Deployment metadata: name: springcloudb spec: selector: matchLabels: app: springcloudb template: metadata: annotations: msePilotCreateAppName: springcloudb labels: app: springcloudb spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springcloudb:1.0.0 imagePullPolicy: Always name: springcloudb ports: containerPort: 20002 livenessProbe: tcpSocket: port: 20002 initialDelaySeconds: 10 periodSeconds: 30 apiVersion: apps/v1 kind: Deployment metadata: name: springcloudc spec: selector: matchLabels: app: springcloudc template: metadata: annotations: msePilotCreateAppName: springcloudc labels: app: springcloudc spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springcloudc:1.0.0 imagePullPolicy: Always name: springcloudc ports: containerPort: 20003 livenessProbe: tcpSocket: port: 20003 initialDelaySeconds: 10 periodSeconds: 30 apiVersion: apps/v1 kind: Deployment metadata: name: rockectmqbroker spec: selector: matchLabels: app: rockectmqbroker template: metadata: labels: app: rockectmqbroker spec: containers: command: sh mqbroker 'n' 'mqnamesrv:9876' 'c /home/rocketmq/rocketmq4.5.0/conf/broker.conf' env: name: ROCKETMQ_HOME value: /home/rocketmq/rocketmq4.5.0 image: registry.cnshanghai.aliyuncs.com/yizhan/rocketmq:4.5.0 imagePullPolicy: Always name: rockectmqbroker ports: containerPort: 9876 protocol: TCP containerPort: 10911 protocol: TCP containerPort: 10912 protocol: TCP containerPort: 10909 apiVersion: apps/v1 kind: Deployment metadata: name: rocketmqnameserver spec: selector: matchLabels: app: rocketmqnameserver template: metadata: labels: app: rocketmqnameserver spec: containers: command: sh mqnamesrv env: name: ROCKETMQ_HOME value: /home/rocketmq/rocketmq4.5.0 image: registry.cnshanghai.aliyuncs.com/yizhan/rocketmq:4.5.0 imagePullPolicy: Always name: rocketmqnameserver ports: containerPort: 9876 protocol: TCP containerPort: 10911 protocol: TCP containerPort: 10912 protocol: TCP containerPort: 10909 protocol: TCP apiVersion: v1 kind: Service metadata: name: mqnamesrv spec: type: ClusterIP selector: app: rocketmqnameserver ports: name: mqnamesrv98769876 port: 9876 targetPort: 9876 安装成功后,示例如下: ➜ ~ kubectl get svc,deploy NAME TYPE CLUSTERIP EXTERNALIP PORT(S) AGE service/kubernetes ClusterIP 192.168.0.1 443/TCP 7d service/mqnamesrv ClusterIP 192.168.213.38 9876/TCP 47h service/nacosserver ClusterIP 192.168.24.189 8848/TCP 47h service/zuulslb LoadBalancer 192.168.189.111 123.56.253.4 80:30260/TCP 47h NAME READY UPTODATE AVAILABLE AGE deployment.apps/nacosserver 1/1 1 1 4m deployment.apps/rockectmqbroker 1/1 1 1 4m deployment.apps/rocketmqnameserver 1/1 1 1 5m deployment.apps/springclouda 1/1 1 1 5m deployment.apps/springcloudb 1/1 1 1 5m deployment.apps/springcloudc 1/1 1 1 5m deployment.apps/springcloudzuul 1/1 1 1 5m 同时这里我们可以通过 zuulslb 来验证一下刚才所说的调用链路 ➜ ~ curl http://123.56.253.4/A/dubbo A[10.25.0.32] B[10.25.0.152] C[10.25.0.30] 步骤三:开启消息灰度功能 现在根据控制台的提示,在消息的生产者 springcloudc 和消息的消费者 springclouda 都开启消息的灰度。我们直接通过 MSE 的控制台开启,点击进入应用的详情页,选择“消息灰度”标签。 可以看到,在未打标环境忽略的标签中,我们输入了 gray,这里意味着,带着 gray 环境标的消息,只能由 springcloudagray 消费,不能由 springclouda 来消费。 _1、这里需要额外说明一下,因为考虑到实际场景中,springcloudc 应用和 springclouda  应用的所有者可能不是同一个人,不一定能够做到两者同时进行灰度发布同步的操作,所以在消息的灰度中,未打标环境默认的行为是消费所有消息。这样 springcloudc 在进行灰度发布的时候,可以不需要强制 springclouda 应用也一定要同时灰度发布。_ _2、我们把未打标环境消费行为的选择权交给 springclouda 的所有者,如果需要实现未打标环境不消费 cgray 生产出来的消息,只需要在控制台进行配置即可,配置之后实时生效。_ 使用此功能您无需修改应用的代码和配置。 消息的生产者和消息的消费者,需要同时开启消息灰度,消息的灰度功能才能生效。 消息类型目前只支持 RocketMQ,包含开源版本和阿里云商业版。 如果您使用开源 RocketMQ,则 RocketMQ Server 和 RocketMQ Client 都需要使用 4.5.0 及以上版本。 如果您使用阿里云 RocketMQ,需要使用铂金版,且 Ons Client 使用 1.8.0.Final 及以上版本。 开启消息灰度后,MSE 会修改消息的 Consumer Group。例如原来的 Consumer Group 为 group1,环境标签为 gray,开启消息灰度后,则 group 会被修改成 group1_gray,如果您使用的是阿里云 RocketMQ ,请提前创建好 group。 默认使用 SQL92 的过滤方式,如果您使用的开源 RocketMQ,需要在服务端开启此功能(即在 broker.conf 中配置 enablePropertyFilter=true)。 默认情况下,未打标节点将消费所有环境的消息,若需要指定 未打标环节点 不消费 某个标签环境生产出来的消息,请配置“未打标环境忽略的标签”,修改此配置后动态生效,无需重启应用。 步骤四:重启节点,部署新版本应用,并引入流量进行验证 首先,因为开启和关闭应用的消息灰度功能后都需要重启节点才能生效,所以首先我们需要重启一下 springclouda 和 springcloudc 应用,重启的方式可以在控制台上选择重新部署,或者直接使用 kubectl 命令删除现有的 pod。 然后,继续使用 yaml 文件的方式在 Kubernetes 集群中部署新版本的 springcloudagray、springcloudbgray 和 springcloudcgray apiVersion: apps/v1 kind: Deployment metadata: name: springcloudagray spec: selector: matchLabels: app: springcloudagray template: metadata: annotations: alicloud.service.tag: gray msePilotCreateAppName: springclouda labels: app: springcloudagray spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springclouda:1.0.0 imagePullPolicy: Always name: springcloudagray ports: containerPort: 20001 livenessProbe: tcpSocket: port: 20001 initialDelaySeconds: 10 periodSeconds: 30 apiVersion: apps/v1 kind: Deployment metadata: name: springcloudbgray spec: selector: matchLabels: app: springcloudbgray template: metadata: annotations: alicloud.service.tag: gray msePilotCreateAppName: springcloudb labels: app: springcloudbgray spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springcloudb:1.0.0 imagePullPolicy: Always name: springcloudbgray ports: containerPort: 20002 livenessProbe: tcpSocket: port: 20002 initialDelaySeconds: 10 periodSeconds: 30 apiVersion: apps/v1 kind: Deployment metadata: name: springcloudcgray spec: selector: matchLabels: app: springcloudcgray template: metadata: annotations: alicloud.service.tag: gray msePilotCreateAppName: springcloudc labels: app: springcloudcgray spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springcloudc:1.0.0 imagePullPolicy: Always name: springcloudcgray ports: containerPort: 20003 livenessProbe: tcpSocket: port: 20003 initialDelaySeconds: 10 periodSeconds: 30 部署完成之后,我们引入流量,并进行验证 1. 登录 MSE 治理中心控制台[4],选择应用列表。 2. 单击应用 springclouda 应用详情菜单,此时可以看到,所有的流量请求都是去往 springclouda 应用未打标的版本,即稳定版本。 3. 点击页面下方的 标签路由中的添加按钮,为 springclouda 应用的 gray 版本设置灰度规则。 4. 发起流量调用,我们通过 zuulslb,分别发起流量调用,并查看灰度的情况。 我们通过 springclouda 和 springcloudagray 的日志去查看消息消费的情况。可以看到,消息的灰度功能已经生效, springcloudagray 这个环境,只会消费带有 gray 标的消息,springclouda 这个环境,只会消费未打标的流量生产出来的消息。 在截图中我们可以看见,springcloudagray 环境输出的日志  topic:TEST_MQ, producer: Cgray [10.25.0.102] , invoke result: Agray[10.25.0.101] Bgray[10.25.0.25] Cgray[10.25.0.102], springcloudagray 只会消费 Cgray 生产出来的消息,而且消费消息过程中发起的 Spring Cloud 调用,结果也是 Agray[10.25.0.101] Bgray[10.25.0.25] Cgray[10.25.0.102],即在灰度环境闭环。 而 springclouda 这个环境,输出的日志为 topic:TEST_MQ,producer:C[10.25.0.157],invoke result:A[10.25.0.100] B[10.25.0.152] C[10.25.0.157],只会消费 C 的基线环境生产出来的消息,且在这个过程中发起的 Spring Cloud 调用,也是在基线环境闭环。 步骤五:调整消息的标签过滤规则,并进行验证 因为考虑到实际场景中,springcloudc 应用和 springclouda  应用的所有者可能不是同一个人,不一定能够做到两者同时进行灰度发布同步的操作,所以在消息的灰度中,未打标环境默认的行为是消费所有消息。这样 springcloudc 在进行灰度发布的时候,可以不需要强制 springclouda 应用也一定要同时灰度发布,且使用相同的环境标。 springclouda 在消费时候,未打标环境的行为的选择权是交给 springclouda 的所有者,如果需要实现未打标环境不消费 cgray 生产出来的消息,只需要在控制台进行配置即可,配置之后实时生效。 1. 调整 springclouda 未打标环境的过滤规则。比如这里我们要选择未打标环境不再消费 gray 环境生产出来的消息,只需要在“未打标环境忽略的标签”里面选择 gray,然后点击确定即可。 2. 调整规则之后,规则是可以动态地生效,不需要进行重启的操作,我们直接查看 springclouda 的日志,验证规则调整生效。 从这个日志中,我们可以看到,此时基线环境可以同时消费 gray 和 基线环境生产出来的消息,而且在消费对应环境消息时产生的 Spring Cloud 调用分别路由到 gray 和 基线环境中。 操作总结 1. 全链路消息灰度的整个过程是不需要修改任何代码和配置的。 2. 目前仅支持 RocketMQ,Client 版本需要在 4.5.0 之后的版本。RocketMQ Server 端需要支持 SQL92 规则过滤,即开源 RocketMQ 需要配置 enablePropertyFilter=true,阿里云 RocketMQ 需要使用铂金版。 3. 开启消息灰度后,MSE Agent 会修改消息消费者的 group,如原来的消费 group 为 group1,环境标签为 gray,则 group 会被修改成 group1_gray,如果使用的是阿里云 RocketMQ,需要提前创建好修改后的 group。 4. 开启和关闭消息灰度后,应用需要重启才能生效;修改未打标环境忽略的标签功能可以动态生效,不需要重启。 相关链接 [1] MSE 微服务治理专业版: [2] Kubernetes 集群: [3] 容器服务控制台: [4] MSE 治理中心控制台
作者:亦盏
#行业实践 #功能特性

2021年12月15日

重新定义分析 - EventBridge实时事件分析平台发布
对于日志分析大家可能并不陌生,在分布式计算、大数据处理和 Spark 等开源分析框架的支持下,每天可以对潜在的数百万日志进行分析。 事件分析则和日志分析是两个完全不同的领域,事件分析对实时性的要求更高,需要磨平事件领域中从半结构化到结构化的消息转换管道,实现查询检索,可视化等功能。但是目前针对流式的事件做分析的可用工具非常少,这对于期望使用Serverless架构或 EDA(事件驱动)架构的开发者会非常不便。(更多 EDA 架构介绍参考 :) 基于事件的特征,无法追溯事件内容,无法跟踪事件流转,无法对事件做可视化分析成为了事件驱动架构演进的绊脚石。为了解决事件领域中针对流式事件做分析的难题,EventBridge 近日发布了针对事件/消息领域的全新分析工具EventBridge 实时事件分析平台。下面简要对 EventBridge 实时事件分析平台的内容进行介绍。 EventBridge 实时事件分析平台简介_ EventBridge 实时事件分析平台依托基于事件的实时处理引擎,提供数值检索、可视化分析、多组态分析、事件轨迹、事件溯源和 Schema 管理等能力。EventBridge 实时事件分析平台具有无入侵、无需数据上报,低成本,操作快捷等特点,通过简单的引导式交互,即可快速实现基于事件的流式查询与分析。 EventBridge 实时事件分析平台依托基于事件的实时处理引擎,提供数值检索,可视化分析,多组态分析,事件轨迹,事件溯源,Schema 管理等能力。EventBridge 实时事件具有无入侵,无需数据上报,低成本,操作快捷等特点,通过简单的引导式交互,即可快速实现基于事件的流式查询与分析。 核心功能 多场景支持 目前市面上比较流行的是事件查询平台,但是分析和查询还是有些本质区别,分析基于查询,但是查询并不是分析的全部。 EventBridge 构建了一套完整的事件工具链,帮助开发,运维,甚至运营团队更高效的使用分析工具,统一在一个分析平台上无缝整合全部事件,提供高效、可靠、通用的事件分析能力。 Serverless 领域:得益于 Serverless 架构的推广,事件驱动被更多用在企业核心链路。无服务器的定义是不必管理任何基础设施,但是无服务器的不透明且难以调试却是整个架构必需解决的痛点,当我们配置完触发器后不会知道什么数据在什么时刻触发了函数,触发链路是否异常。EventBridge 事件分析能力将彻底解决 Serverless触发数据黑箱的问题,让所有事件触发都清晰可见。 微服务领域:微服务在现代开发架构中比较常见,该架构由小型、松耦合、可独立部署的服务集合而成,这导致微服务架构很难调试,系统中某一部分的小故障可能会导致大规模服务崩溃。很多时候不得不跳过某些正常服务来调试单个请求。EventBridge 事件分析可将全部链路微服务消息通过事件 ID 染色做有效追踪与排障,帮助微服务做可视化排障。 消息领域:在传统消息领域,消息 Schema 管理、消息内容检索一直是无法解决的难题,大部分情况下需要增加订阅者来对消息做离线分析。EventBridge 事件分析平台提供消息 Schema 管理与消息内容查询能力,为消息可视化提供更完全的解决方案。 云产品领域:云产品在极大程度降低了企业对基础设施建设的复杂性,但同样带来了诸多问题,以 ECS 为例,很多情况会因系统错误或云盘性能受损而触发故障类事件,这类事件通常会涉及到周边产品(比如 ACK 等),捕获全部云上事件做基础排障的挑战性比较大。EventBridge 支持全部云服务事件无缝接入,更大程度降低由云产品变更导致的运维故障。 EventBridge 提供更高效、通用的事件分析平台,基于该平台可以解决大部分场景对事件分析、事件查询、事件轨迹的诉求。 开箱即用 支持提供 Schema 管理,数值检索,可视化分析,多组态分析,事件轨迹,事件溯源等核心能力,无需额外部署,即开即用。 数值检索:提供基础数值检索能力,支持键入 key,value ,= ,!= , exists ,AND,OR 等参数,满足事件检索场景的基本诉求。 可视化分析:提供 GROUP BY,ORDER BY 等可视化分析能力,支持多组态,多图表,多维度分析能力。 链路追踪:提供事件轨迹能力,还原事件整体链路状态。帮助开发者快速排障,快速定位链路问题。 低成本接入 EventBridge 支持以事件总线(EventBus)形式接入,分为云服务事件总线和自定义事件总线。云服务总线支持几乎全部阿里云产品事件,无缝支持云服务事件接入事件分析平台;自定义事件总线支持 RocketMQ、Kafka 或其他自定义事件接入(当前版本仅支持少量云服务事件)。 整体接入流程较为简单,对原有业务入侵小,可随时关闭或开启事件分析,同时实现在线配置,且具备实时生效功能。 总结_ EventBridge 提供更便捷高效的事件分析工具,可以帮助开发人员简单定义查询条件,及时进行可视化的事件内容分析。
作者:肯梦
#技术探索 #生态集成
收藏
收藏暂无数据,请从小助手对话框添加
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
无疑 AI答疑专家
当前服务输出的内容均由人工智能模型生成,其生成内容的准确性和完整性无法保证,不代表我们的态度或观点。
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
账号:CNPilot
专家答疑