2022年6月26日

EventBridge 在 SaaS 企业集成领域的探索与实践
当下降本增效是各行各业的主题,而 SaaS 应用作为更快触达和服务业务场景的方式则被更多企业熟知和采用。随着国内 SaaS 商业环境的逐渐成熟,传统企业中各个部门的工程师和管理者,能迅速决定采购提升效率的 SaaS 产品,然后快速投入生产和使用。但是随着行业 SaaS 越来越多,如何解决各个 SaaS 系统的数据孤岛,如何将SaaS 应用数据与现有系统数据进行打通,已然变成了企业使用 SaaS 的瓶颈。因此,业内也广泛提出 B2B integration 企业集成的概念。 本文将结合实际业务场景讲述在 SaaS 行业的洞察与 SaaS 集成的探索实践。 什么是 SaaS SaaS 概述 SaaS(SoftwareasaService,软件即服务)源自于美国 Salesforce 公司(1999 年创立)创造的新软件服务模式。相比于传统软件,用户使用的 SaaS 软件,其数据保存在云端(国内有很多行业 SaaS 由于其数据敏感会单独部署在客户 IDC)。而且,SaaS 公司提供给客户的服务通常是按需租用的,比如按年缴纳使用费5年,第二年再续费,如果不满意也可以不续费,这会大大激发 SaaS 创业公司持续的打磨产品、持续的为客户提供更大价值的动力。 SaaS 典型分类 SaaS 分类比较繁琐,一般有两个分类维度。一个维度按照使用场景,另一个维度按照商业价值。 SaaS 一般分类为 通用 SaaS 和 行业 SaaS 两个基础类。 通用 SaaS 顾名思义是通用的,跨行业的,比如钉钉即时通讯或者某司的 HR 产品,由于使用场景更广,因而客群也会更多。 行业 SaaS 是在某个行业内使用的产品,比如餐饮企业 SaaS、电商 SaaS 等。 当然,还有第二个维度是工具 SaaS 和 商业 SaaS。 工具 SaaS,为客户企业提供一个提高管理效率的工具;商业 SaaS,除了提供一部分“工具”价值外,还能为客户企业提供增值价值,比如增加营收、获得资金等。 商业 SaaS 产品虽然风险更大,但在国内特色的商业环境、管理水平及人才结构下,更容易快速实现客户价值和自我价值。 SaaS 在中国的发展历程 对于 SaaS 领域来讲,云服务的普及提振了大家对 SaaS 服务稳定性和数据安全性的信心。同时,人口红利消退使得 SaaS 成本优势凸显。当下疫情环境也加快了市场教育,企业主转变思路,降本增效的需求显著上升。随着整个行业的渗透率加快,SaaS 场景和行业越做越深,SaaS 市场可以遇见在未来会有高速的增长。很多企业会在新业务场景使用 SaaS 服务,小步快跑试错,解决活下来的问题,而不是重复造轮子。 什么是 B2B ? B2B 即 BusinesstoBusiness (B2B) integration 是指将两个或者多个组织之间的业务流程和通讯自动化,通过自动化关键业务流程,实现不用应用和组织关系的打通,有效促进应用提供方和客户之间的数据打通与合作。 可以断言,随着 SaaS 行业逐渐渗透,企业集成的诉求会逐渐增多。数据同步、用户同步、接口同步的诉求会逐步增多,包括自建服务与 SaaS 服务的打通,SaaS 服务与 SaaS 服务的打通等。 SaaS 集成领域场景分析 随着行业类 SaaS 的逐渐丰富,在企业生产实践中,应用和应用的数据集成和互通变得至关重要。包括 API 集成,数据集成,流程集成等场景。 API 集成 通过 API 将 SaaS 应用的业务流程串联,现阶段大部分 SaaS 集成对接都是通过标准 API 协议实现的。源端采用 WebHook 机制推送到指定 HTTP 端点,目标端则采用类似 API 接口调用的方式,主动调取执行动作。 实现结构如下: 业内通用方案通几乎均为同步方案,通过 API + 中间网关对调用做解耦和映射。该方案的主要问题是调用追溯难;其次如出现上下游接口限制不一致问题,会导致下游调用大量失败。 数据集成 数据集成场景主要是企业自建系统和 SaaS 系统的打通。当企业使用的行业 SaaS 逐渐增多,数据一致性问题就会变的迫在眉睫。 大部分企业通常会面临云上数据导入/同步到 SaaS 应用的场景,而业内对 SaaS 应用的数据集成方案并没有类似 CDC 场景下的 Debezium 那么标准和通用。 企业在 SaaS 集成领域的痛点 接入成本高 对大量使用 SaaS 应用的企业来讲, SaaS 集成是必须做的基础建设。但是该部分基础建设通常会消耗大量人力,由于各个行业的 SaaS 百花齐放,通常很难使用一套架构满足全部集成场景。意味着通常情况下,企业使用每一款 SaaS 都会面临 SaaS 系统与自身系统集成的困难。 异构数据多 异构数据多是集成领域又一个比较典型的特点,异构数据通常有结构化数据,非机构化数据,半结构化数据。比如企业自建关系型数据库就是典型的结构化数据,但是要被其他 SaaS 系统集成通常是 Json 这种半结构数据入参。当然这部分内容可以通过定制代码搞定,但这个思路一定不是做消息枢纽的思路; 异构数据如何高效的统一处理其实是当前 SaaS 集成亟待解决的问题,也是最大的冲突点。 分发/路由困难 当很多集成需求同时涌现时,如何对已集成数据进行合理分发,会变成集成领域又一个难以解决的问题。每个细分场景甚至每个集成链路所需要的数据内容甚至数据类型都不一样。如果路由/分发无法完成,那么企业统一集成将无法实现。 集成追踪困难 当全部采用同步链路时,这里的集成状态追踪就会变成玄学,除非将链路接入 Tracing ,但是这部分又回产生高额的改造成本,同时多源 Tracing 的复杂相对于单链路会呈几何倍数的增加。 老系统迁移困难 老系统迁移主要是数据集成部分,如果将新老系统对接,并构建统一的应用网是当下企业构建 SaaS 建设的难点。企业迫切的需要一种能将"新"“老”应用联接起来的方式,打破企业应用发展的时间与空间界限,协同企业原有核心数据资产及创新应用,构建可平滑演进的企业IT架构。 EventBridge 一站式企业 SaaS 集成方案 针对业内 SaaS 系统集成的种种痛点,EventBridge 推出一站式企业 SaaS 集成方案。通过收敛 SaaS 集成痛点诉求,EventBridge 推出 API 集成方案和数据集成方案,打通应用与应用,云与应用的连接。 低代码集成平台 提供完全托管的集成平台服务,仅需在控制台进行简单配置即可在几分钟内完成应用集成。客户无需关心底层资源及实现细节即可打通云下到云上,SaaS 到 SaaS 的集成与连接,轻松完成异构数据接入。 金融级稳定性 满足不同客户企业级集成项目的要求,提供高可用性、灾难恢复、安全性、服务等级协议(SLA)和全方面的技术支持。 全方位的集成能力 支持各种集成场景,打通云上云下企业应用、物联网、设备及合作伙伴之间的信息孤岛。支持事件规则,事件路由等多种路由方式,实现跨云跨地域互通和信息共享。同时强大的链路追踪能力可以帮助企业快速排障。 开放的平台 拥抱 CloudEvents 社区,提供标准化的事件集成方案。提供丰富的开发者工具,拥有海量的生态伙伴及开发者,丰富开箱即用的连接器和应用组件可以帮助加速企业业务创新。 EventBridge 在 SaaS 领域的典型应用场景 SaaS 应用同步 应用同步是指在特定时间点将一组特定的事件从一个系统迁移到另一个系统的行为。事件同步模式允许开发人员创建数据自动迁移集成服务;业务人员和开发人员可以通过配置集成应用,自动化的将特定范围内的数据传递到下游应用;创建可重用的服务可以为开发和运营团队节省大量时间。 例如: 把销售机会数据从一个旧式 CRM (客户关系管理) 系统迁移到新的 CRM 实例; 把销售订单数据从一个 CRM 组织迁移到另一个组织; 从 ERP (企业资源计划) 同步产品主数据到 CRM 系统中。 事件广播 事件广播是在连续的、近实时或实时的基础上将事件从单个源系统移动到多个目标系统的行为。本质上,它是一对多的单向同步。通常,“单向同步”表示 1:1 关系。但是,广播模式也可以是 1:n(n 大于 1)的关系。 例如: 当一个销售机会在 CRM 中被标记为成功关单的时候,应在 ERP 中创建销售订单。 SaaS 应用通知 事件通知是指当 SaaS 应用发生某个类型的事件,可以通过钉钉,短信等通知方式告知用户。用户可及时获取到关键事件信息。 例如: 当一个销售机会在 CRM 中被标记为重要商机的时候,会及时通知给其他同事进行跟进并关注。 自建系统到云上迁移 EventBridge 支持云上数据库、云上消息队列、云产品事件对接 SaaS 系统,完善企业用户建设应用一张网的诉求,打破企业应用发展的时间和空间界限,协同企业原有核心资产与 SaaS 系统,构建可演进的企业 IT 架构。 例如: 当引入一个新的 SaaS 应用时,可通过 EventBridge 将数据库/大数据平台的核心资产(如人员信息等)同步至 SaaS 应用。
作者:肯梦
#行业实践 #生态集成

2022年5月17日

云钉一体:EventBridge 联合钉钉连接器打通云钉生态
背景 以事件集成阿里云,从 EventBridge 开始”是 EventBridge 这款云产品的愿景和使命。作为一款无服务器事件总线服务,EventBridge 从发布以来,以标准化的 CloudEvents 1.0 协议连接了大量云产品和云事件,用户可以通过 EventBridge 轻松访问云上事件,驱动云上生态。 截止目前为止,EventBridge 已集成 85+ 阿里云产品,提供了 941+ 事件类型,集成 50+ SaaS产品,通过事件规则可轻松驱动 10+ 阿里系一方云产品的计算力。 另一方面,钉钉生态空前繁荣,拥有 4000+ 家的生态伙伴,包括 ISV 生态伙伴、硬件生态伙伴、服务商、咨询生态和交付生态伙伴等。通过事件将钉钉生态与阿里云生态联通,是践行「云钉一体」战略的重要途径,EventBridge 作为阿里云标准化的事件枢纽,其重要性不言而喻。 今天,EventBridge 联合钉钉连接器,打通了钉钉生态和阿里云生态,钉钉的生态伙伴可以通过通道的能力驱动阿里云上海量的计算力。 关键技术 EventBridge 集成阿里云和钉钉生态的方案,核心能力由钉钉连接器和 EventBridge 的 HTTP 事件源能力提供。 钉钉连接器 钉钉连接平台通过可视化拖拽配置、一键订阅等零代码方式,简单高效的实现钉钉、企业内部系统、知名厂商系统(金蝶、用友、SAP 等)、钉钉第三方企业应用之间数据互通和集成。 近期,钉钉连接器在「连接流」中发布了「HTTP Webhook」的执行动作能力,支持将钉钉生态开放给外部生态,EventBridge 正是通过该能力将钉钉生态接入到阿里云生态。 EventBridge HTTP 事件源 事件源是事件驱动的基石,如何获取更多事件源也是 EventBridge 一直在探索和尝试的方向。针对市场上其他云厂商和垂直领域的 Saas 服务,EventBridge 发布了 HTTP 事件源能力,提供简单且易于集成的三方事件推送 ,帮助客户更加高效、便捷地实现业务上云。 具体而言,HTTP 事件源是 EventBridge 支持的事件源的一种,它以 Webhook 形式暴露了发布事件的 HTTP 请求地址,用户可以在有 URL 回调的场景配置 HTTP  事件源,或者直接使用最简单的 HTTP 客户端来完成事件的发布。HTTP  事件源提供了支持 HTTP 与 HTTPS,公网与阿里云 VPC 等不同请求方式、不同网络环境的 Webhook URL,便于用户将其集成到各类应用中。接入时无需使用客户端,仅需保证应用可以访问到对应 Webhook URL 即可,这使得接入过程变得简单而高效。 在将 HTTP 请求转换为 CloudEvent 的时候,EventBridge 会将请求的头部和消息体部分置于 CloudEvent 字段中,其余字段会依据用户 EventBridge 资源属性以及系统默认规则进行填充。用户可以在事件规则中,对所需的内容进行过滤、提取,最终按照模板拼装成所需的消息内容投递给事件目标。 在安全方面,HTTP 事件源不需要用户进行复杂的签名鉴权,支持 3 种类型开箱即用的安全设置,分别是请求方法、源 IP 以及请求来源域名。 请求方法:用户可以配置当前请求此事件源时合法的 HTTP 请求方法,如果方法类型不满足配置规则,请求将被过滤,不会投递到事件总线。 源 IP:用户可以设置允许访问此事件源时合法的源 IP(支持 IP 段和 IP),当请求源 IP 不在设置的范围内时,请求将被过滤,不会投递到事件总线。 请求来源域名:即 HTTP 请求的 referer 字段,当请求的 referer 与用户配置不相符时,请求被过滤,不会投递到事件总线。 应用场景 钉钉连接器市场有数百款连接器,包含官方连接器和第三方生态连接器。 官方连接器,来源主要是钉钉官方的应用,比如视频会议、日程、通讯录、审批流、钉盘、宜搭等,企业和 SaaS 厂商可以充分利用这些官方应用的事件构建企业级的应用系统,也可以将钉钉的官方数据流与其他系统做深度集成。 第三方连接器,来源主要是钉钉的生态合作伙伴,比如金蝶、行翼云、集简云、用友、易快报、销帮帮等。SaaS 厂商可以通过开放连接器来开放数据,与其它应用互联互通。 如上图所示,借助钉钉连接器,可以将钉钉官方事件源和钉钉 SaaS 事件源连接到阿里云 EventBridge,从而能驱动云上的弹性资源。SaaS 厂商能够借助 EventBridge 连接的能力快速构建云原生的 SaaS 应用,借助云的弹性能力,采用云原生最新的技术栈,快速高效地开发 SaaS 应用,同时利用 EventBridge 获取钉钉和其它 SaaS 应用的数据源,轻松进行业务创新。 当钉钉生态和 EventBridge 联通后,能产生哪些应用场景呢? 分析场景:企业借助 EventBridge 事件分析能力,对钉钉官方事件进行分析,快速洞察企业运转数据。比如审批效率,员工变更趋势、会议效率等。 通知场景:钉钉连接器 + EventBridge  可覆盖绝大多数消息通知场景,帮助企业用户快速感知 审批,员工变动,会议室信息等一些列企业基础支持系统。 集成场景:基于阿里云基础建设,可快速提升钉钉生态和企业内部数据的互通。例如当公司需要对钉钉和企业内部 IT 系统进行数据打通时,EventBridge 解决方案可以毫不费力地将建立在阿里云体系的 IT 系统连通起来,比如函数计算,云数据库,消息队列等连接扩展阿里云生态。 EDA 场景:使用 EventBridge 快速构建 EDA 驱动的自动化业务流程。例如在新员工入职时,获取员工变动信息。并集中推送到邮箱系统,业务支持系统(DB),CRM 系统等。对企业新员工权限账户进行一站式授权,较少重复机械的业务审批流程。 最佳实践:新增员工 0 代码入库 本章节介绍使用钉钉连接器和 EventBridge 的最佳实践,通过一个例子展示如何 0 代码将钉钉的一个新员工入职记录录入到自定义的数据库当中,企业可以根据该数据库搭建各类员工管理系统。 方案简介 整个方案涉及到钉钉、钉钉连接器、EventBridge、阿里云数据库等产品,整个链路如下图所示: 前置条件: 拥有一个钉钉账号,并创建一个团队成为管理员,并能登陆钉钉开放平台。 拥有一个阿里云账号,并开通 EventBridge 和阿里云数据库。 实践步骤 整个实践过程分为以下几个步骤。 1)创建事件总线和 HTTP 事件源 首先登陆 EventBridge 控制台,创建一个事件总线和 HTTP 事件源,如下图所示,可以先跳过规则和目标的创建。 创建完成后,进入事件总线的详情列表,获取 HTTP 事件源的公网「Webhook 地址」,如下图所示: 2)创建钉钉连接流 登陆钉钉开放平台,进入连接平台,在「我的连接」下创建连接流,在创建界面,选择触发器为「官方通讯录通讯录用户增加」。 连接流创建完成后,进入编辑页面,添加一个「HTTP Webhook」的节点,在「请求地址」一栏填入上个步骤获取到的「HTTP 事件源」地址。 3)钉钉触发新增员工事件 打开钉钉,进入团队,邀请另一个账号加入团队,然后进入事件总线的「事件追踪」页面,可以发现该员工新增事件已经投递到了事件总线之上。 该事件被转换成了一个「CloudEvents」格式,其「$.data.body」为事件的详情,包含 dingId, userId, department 等字段。 { "datacontenttype": "application/json", "aliyunaccountid": "1148", "data": { "headers": { }, "path": "/webhook/putEvents", "body": { "syncAction": "user_add_org", "orderInDepts": "{1:1762632512}", "dingId": "$::$5RUQhP/pK+4A==", "active": true, "avatar": "", "isAdmin": false, "userId": "141146379", "isHide": false, "isLeaderInDepts": "{1:false}", "isBoss": false, "isSenior": false, "name": "小明", "department": [ 1 ] }, "httpMethod": "POST", "queryString": {} }, "subject": "acs:eventbridge:cnhangzhou::eventbus//eventsource/my.dingtalk", "aliyunoriginalaccountid": "11848", "source": "my.dingtalk", "type": "eventbridge:Events:HTTPEvent", "aliyunpublishtime": "20220513T07:28:29.505Z", "specversion": "1.0", "aliyuneventbusname": "chenyangbus", "id": "7059131cb232c4c3592120ae", "time": "20220513T15:28:29.504+08:00", "aliyunregionid": "cnhangzhou", "aliyunpublishaddr": "..61.88" } 4)数据库创建员工表 通过 RDS 控制台购买一个实例,并创建好数据库,然后根据上述新增员工事件的格式,提取部分字段对数据库进行建表。 CREATE TABLE 'user_info' ( 'dingId' varchar(256) NULL, 'active' varchar(256) NULL, 'isAdmin' varchar(256) NULL, 'userId' varchar(256) NULL, 'name' varchar(256) NULL ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8; 5)创建事件规则 数据库准备好后,返回 EventBridge 控制台,为第一步创建的事件总线创建事件规则,对「新增员工事件」进行转换并投递至数据库当中。 首先创建规则,过滤第一步创建的 HTTP 事件源。 然后选择 RDS 目标,做好参数映射。 6)触发事件入库 第三步触发事件时,因未配置规则和目标,事件没有被消费,故需要通过钉钉重新触发一次事件,然后从 EventBridge 控制台观察推送轨迹。 从轨迹中可以看出推送成功,然后通过 RDS 控制台可以查询到该条记录。 至此,一个钉钉团队新员工入职的记录通过 0 代码的方式入库到企业数据库当中,可以非常低的成本开发企业级管理应用。 _参考链接:_
作者:尘央
#行业实践 #生态集成

2022年4月20日

EventBridge 集成云服务实践
EvenBridge 集成概述 EventBridge 是阿里云所推出了一款无服务器事件总线,其目标是拓展事件生态,打破系统间的数据孤岛,建立事件集成生态。提供统一的事件标准化接入及管理能力,完善集成与被集成通路,帮助客户快速实现事件驱动的核心原子功能,可将 EventBridge 快速集成至 BPM、RPA、CRM 等系统。 EventBridge 通过事件标准化,接入标准化,组件标准化三个方向作为支点拓展 EventBridge 事件生态: 事件标准化:拥抱 CloudEvents 1.0 开源社区标准协议,原生支持 CloudEvents 社区 SDK 和 API,全面拥抱开源社区事件标准生态; 接入标准化:提供标准事件推送协议 PutEvent,并支持 Pull 和 Push 两种事件接入模型,可有效降低事件接入难度,提供云上完善的事件接入标准化流程; 组件标准化:封装标准的事件下游组件工具链体系,包括 Schema 注册、事件分析、事件检索、事件仪表盘等。提供完善的事件工具链生态。 在集成领域 EventBridge 重点打造事件集成和数据集成两类核心场景,下面将围绕这两类场景具体展开描述。 事件集成 目前 EventBridge 已经拥有 80+ 云产品的事件源,800+ 种事件类型。整个事件生态还正在逐步丰富中。 那么,EventBridge 如何实现云产品的事件集成呢? 首先在 EventBridge 控制台可以看见一个名为 default 的事件总线,云产品的事件都会投递到这个总线; 然后点击创建规则,就可以选择所关心的云产品以及它的相关事件进行事件的监听和投递。 下面以两个例子为例,来看下 EventBridge 事件集成的方式。 OSS 事件集成 以 OSS 事件源为例,来讲解一下如何集成 OSS 事件。 OSS 事件现在主要分为 4 类,操作审计相关、云监控相关、配置审计相关、以及云产品相关的事件例如 PutObject 上传文件等等。其他的云产品的事件源也类似,基本都可以分为这几个类型的事件。 下面演示一下事件驱动的在线文件解压服务: 在 OSS Bucket 下面会有一个  zip 文件夹存放需要解压的文件,一个 unzip 文件夹存放解压后的文件; 当上传文件到 OSS Bucket 之后,会触发文件上传的事件并投递到 EventBridge 的云服务专用总线; 然后会使用一个事件规则过滤 zip 这个 bucket 的事件并投递到解压服务的 HTTP Endpoint; 解压服务会在收到事件之后,根据事件里面的文件路径从 OSS 下载文件解压,并在解压之后将文件传到 unzip 目录下; 同时,还会有一个事件规则,监听 unzip 目录的文件上传事件,并将事件转换后推送到钉钉群。 一起来看下是如何实现的: 前往下方链接查看视频: 1)首先创建一个 bucket,下面有一个 zip 目录用于存放上传的压缩文件,一个 unzip 目录用于存放解压后的文件。 2) 部署解压服务,并且暴露公网访问的地址。 解压服务的源码地址为: 也可以使用 ASK 直接部署,yaml 文件地址为: 3)创建一个事件规则监听 zip 目录下的上传文件的事件,并投递到解压服务的 HTTP  Endpoint。 这里使用 subject,匹配 zip 目录。 4)再创建一个事件规则监听 unzip 目录的事件,投递解压事件到钉钉群。 这里同样使用 subject,匹配 unzip 目录。 对于变量和模板的配置可以参考官方文档 : 。 EventBridge 会通过 JSONPath 的方式从事件中提取参数,然后把这些值放到变量中,最后通过模板的定义渲染出最终的输出投递到事件目标。OSS 事件源的事件格式也可以参考官方文档 : _ _,并根据实际的业务需要使用 JSONPath 定义变量。5)最后,通过 oss 控制台上传一个文件进行验证。 可以看到刚刚上传的 eventbridge.zip 已经解压到并上传上来了,也可以在钉钉群里面,收到解压完成的通知。此外,还可以在事件追踪这边查看事件的内容已经投递的轨迹。 可以看到有两个上传事件:一个是通过控制台上传的事件,一个是解压文件后上传的事件。 可以查看轨迹,都成功投递到了解压服务的 HTTP Endpoint 以及钉钉机器人。 以自定义事件源以及云产品事件目标的方式集成云产品 刚才演示的 demo 是集成云服务的事件源,下面再通过一个 demo 看一下如何通过以自定义事件源以及云产品事件目标的方式集成云产品。 前往下方链接查看视频: 这个 demo 的最终效果是通过 EventBridge 自动进行数据的清洗,并投递到 RDS 中去。事件内容是一个 JSON,拥有两个字段一个名字一个年龄,现在希望将把大于 10 岁的用户过滤出来并存储到 RDS 中。 整体的架构如图所示,使用一个 MNS Queue 作为自定义事件源,并通过 EventBridge 过滤并转换事件最终直接输出到 RDS 中去。 1)首先已经创建好了一个 MNS Queue,创建好一个 RDS 实例以及数据库表,表结构如下所示: 2)创建一个自定事件总线,选择事件提供方为 MNS,队列为提前创建好的队列; 创建好了之后,我们就可以在事件源这里看见一个已经正在运行中的事件源; 3)接下来创建规则投递到 RDS 配置的事件模式内容如下: { "source": [ "my.user" ], "data": { "messageBody": { "age": [ { "numeric": [ "", 10 ] } ] } } } 数值匹配可以参考官方文档:   4) 点击下一步,选择事件目标为数据库,填写数据库信息,配置转化规则,完成创建。 5)最后,先用 MNS Queue 发送一个消息,这个的 age 是大于 10 的。 可以看见这条事件就输出到了 RDS 里面了。 下面再发一个小于 10 的消息到 MNS Queue。 这条事件就被过滤掉了,没有输出到 RDS。 也可通过事件追踪查看事件: 可以看到一条事件成功投递到了 RDS,一条事件被过滤掉了,没有进行投递。 数据集成 事件流是 EventBridge 为数据集成提供的一个更为轻量化、实时的端到端的事件流试的通道,主要目标是将事件在两个端点之间进行数据同步,同时提供过滤和转换的功能。目前已经支持阿里云各消息产品之间的事件流转。 不同于事件总线模型,在事件流中,并不需要事件总线,其 1:1 的模型更加的轻量,直接到目标的方式也让事件更加的实时;通过事件流,我们可以实现不同系统之间的协议转换,数据同步,跨地域备份的能力。 下面将通过一个例子讲解如何使用事件流,将 RocketMQ 的消息路由到 MNS Queue,将两个产品集成起来。 整体的结构如图所示,通过EventBridge 将 RocketMQ 中 TAG 为 MNS 的消息路由到 MNQ Queue。 一起看下怎么实现: 前往下方链接查看视频: 首先创建一个事件流,选择源 RocketMQ 实例,填写 Tag 为 mns。 事件模式内容留空表示匹配所有。 目标选择 MNS,选择目标队列完成创建。 完成创建之后,点击启动,启动事件流任务。 事件流启动完成之后,我们就可以通过控制台或者 SDK 发送消息到源 RocketMQ Topic 里面。当有 Tag 为 mns 的时候,我们可以看见消息路由到了 mns;当有 Tag 不为 mns 的时候,消息就不会路由到 mns。 总结 本篇文章主要向大家分享了通过 EventBridge 如何集成云产品事件源,如何集成云产品事件目标以及通过事件流如何集成消息产品.
作者:李凯(凯易)
#行业实践 #生态集成

2022年4月13日

基于 EventBridge 构建数据库应用集成
引言 事件总线 EventBridge 是阿里云提供的一款无服务器事件总线服务,支持将阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助您轻松构建松耦合、分布式的事件驱动架构。事件驱动架构是一种松耦合、分布式的驱动架构,收集到某应用产生的事件后实时对事件采取必要的处理,然后路由至下游系统,无需等待系统响应。使用事件总线 EventBridge 可以构建各种简单或复杂的事件驱动架构,以标准化的 CloudEvents 1.0 协议连接云产品和应用、应用和应用等。 事件目标(Target)负责事件的处理终端与消费事件,是 EventBridge 的核心模块。针对市场上其他云厂商和垂直领域的 DB 服务,EventBridge 发布基于事件目标模块的数据库 Sink,提供简单且易于集成的 DB 落库能力,帮助开发者更加高效、便捷地实现业务上云。 数据库 Sink 概述 数据库 Sink 事件目标是 EventBridge 支持的事件目标的一种,主要能力是通过 EventBridge 将数据投递至指定数据库表中。 得益于 EventBridge 生态体系,数据库 Sink 支持众多接入方式: 阿里云云产品事件,EventBridge 支持云服务总线,通过简单配置即可直接对云服务相关事件进行入库操作; SaaS 应用事件,EventBridge 支持三方 SaaS 事件接入,支持对 SaaS 触发事件落库、查询; 用户自定义应用,用户可以使用 EventBridge 官方的 API 接口、多语言客户端、HTTP Source 以及 CloudEvents 社区的开源客户端来完成接入。 数据库 Sink 能力重点聚焦在如何将 EventBridge 业务的半结构化 Json 数据转为结构化 SQL 语句,提供 LowCode 交互接入,帮助开发者一站式完成数据入库。 数据库 Sink 最佳实践 典型案例: 希望把一些 MNS 的消费消息或者 RocketMQ 的消费消息存储到指定的数据库表中,方便后面的数据分析和消息排查,也可以通过这种方式把数据新增到数据库表中; 通过 HTTP 的事件源把一些重要的日志或者是埋点数据直接存储到 DB 中,不需要经过用户业务系统,可以方便后续的客户场景分析。 使用介绍: 首先现阶段数据库 Sink For MySQL 支持两种方式:一种是基于阿里云的 RDS MySQL(VPC),另一种是用户自建的 MySQL(公网),可根据业务场景选择的不同方式接入。 步骤一 :点击事件规则并创建事件规则 步骤二 :选择事件源 可以选择阿里云官方或者自定义事件源 步骤三 :选择事件目标 1)在事件目标下面的服务类型选择数据库,这时会有两个选项就是一个是阿里云的 RDS MySQL,一个是自建 MySQL; 2)如果是阿里云 RDS MySQL,需要创建服务的关联角色。 3)授权以后就可以选择用户自己创建的 RDS MySQL 数据库的实例 ID 和数据库名称。 数据库账号和密码需手动填写,并发配置可以根据实际业务需要进行填写。因为 RDS MySQL 涉及到了跨地域访问,所以需要专有网络 VPC 的支持。 步骤四 :入库配置 入库配置支持快速配置与自定义 SQL 两种方式: 1)快速配置,支持 LowCode 方式快速选择入库内容。 2)自定义 SQL,支持自定义高级 SQL 语法。 步骤五:事件发布 当创建成功以后可以通过控制台进行事件发布: 步骤六 :事件状态追踪和查询 可以通过上个步骤中的事件 ID 可看到轨迹的详细信息,包括事件执行成功与否等信息。如果事件执行失败,会在页面展示异常信息。 通过事件追踪也可以看到详细的事件轨迹 : 总结 本文重点介绍 EventBridge 的新特性:数据库 Sink 事件目标。 作为一款无服务器事件总线服务,EventBridge 已经将阿里云云产品管控链路数据和消息产品业务数据整合到事件源生态中,提高了上云用户业务集成的便捷性,满足 Open API 与多语言 sdk 的支持,在此基础之上,通过 EventBridge 将数据投递至指定的数据库表中,为客户自身业务接入 EventBridge 提供了便利。 相关链接 [2] RDS 官方文档 [3] EventBridge 官方文档 想要了解更多 EventBridge 相关信息,扫描下方二维码加入钉钉群~
作者:赵海
#行业实践 #生态集成

2022年4月6日

EventBridge 与 FC 一站式深度集成解析
前言:事件总线 EventBridge 产品和 FC (Serverless 函数计算) 产品全面深度集成,意味着函数计算和阿里云生态各产品及业务 SaaS 系统有了统一标准的接入方式;依托 EventBridge 统一标准的事件源接入能力,结合 Serverless 函数计算高效敏捷的开发特点,能够帮助客户基于丰富的事件,结合 EDA 架构快速构建云上业务系统。为了帮助大家更好的理解,今天的介绍主要分为三部分:为什么需要一站式深度集成、FC 和 EventBridge 产品集成功能演示及场景介绍、EventBridge 和函数计算深度集成下一阶段规划。 为什么需要一站式深度集成? 首先让我们一起来看看什么是 EventBridge,什么是函数计算? 什么是 EventBridge? 阿里云事件总线(EventBridge)是一种无服务器事件总线,支持将用户的应用程序、第三方软件即服务 (SaaS)数据和阿里云服务的数据通过事件的方式轻松的连接到一起,这里汇聚了来自云产品及 SaaS 服务的丰富事件; 从整个架构来看,EventBridge 通过事件总线,事件规则将事件源和事件目标进行连接。首先,让我们快速普及下 EventBridge 架构中涉及的几个核心概念: 事件:状态变化的记录; 事件源:事件的来源,事件的产生者,产生事件的系统和服务, 事件源生产事件并将其发布到事件总线; 事件总线:负责接收来自事件源的事件;EventBridge支持两种类型的事件总线: 云服务专用事件总线:无需创建且不可修改的内置事件总线,用于接收您的阿里云官方事件源的事件。 自定义事件总线:标准存储态总线,用于接收自定义应用或存量消息数据的事件,一般事件驱动可选该总线。 事件规则:用于过滤,转化事件,帮助更好的投递事件; 事件目标:事件的消费者,负责具体事件的处理。 通过上面的流程,完成了事件的产生,事件的投递,事件的处理整个过程。当然事件并不是一个新的概念,事件驱动架构也不是一个新的概念,事件在我们的系统中无处不在,事件驱动架构同样伴随着整个计算机的架构演进,不断地被讨论。对于 EventBridge,采用云原生事件标准 CloudEvents 来描述事件;带来事件的标准化,这样的标准化和事件标准的开放性带来一个最显著的优势:接入的标准化,无论是对于事件源还是事件目标。 什么是函数计算(FC)? 函数计算是事件驱动的全托管计算服务。使用函数计算,您无需采购与管理服务器等基础设施,只需编写并上传代码。函数计算为您准备好计算资源,弹性地、可靠地运行任务,并提供日志查询、性能监控和报警等功能。 通过上面的描述,总结起来大家只需要记住几点: 简单易用:快速上线,极大提升业务研发效率; 无服务器运维:节省运维投入; 按需付费:沉稳应对突发流量场景; 事件驱动:云产品互通,快速联动。 为什么函数计算需要 EventBridge? 函数计算以其轻量,快捷,能够利用事件驱动的方式与其他云产品进行联动的特点, 成为很多客户利用事件驱动架构构建业务系统的首选,随着业务及客户需求的不断增加,客户对于函数计算和更多云产品及服务的连接需求变得越来越多,同时对于其他云产品的客户而言, 也希望能够利用Serverless函数计算的特点帮助处理一些系统任务和事件。 1)事件源多样性挑战 事件驱动作为函数计算产品核心竞争力,打通函数计算和其它云产品,以及用户自定义应用,SaaS 服务的连通成为函数计算生态集成的迫切需求,但系统集成,生态建设从来都不是一件容易的事情。函数计算系统在和 EventBridge 集成之前,已经和 OSS,SLS 等用户典型场景的云产品进行了集成,也和阿里云的其它大概十多款产品进行了集成,不同系统具有不同的事件格式,不同系统的注册通知机制也各不相同,以及上游不同系统的失败处理机制也各不相同;部分系统支持同步的调用方式,部分系统支持异步的调用方式,调用方式的差异主要取决于上游系统在接入函数计算的时候当时面临的产品业务场景,对于新的产品能力和业务场景的扩展支持,在当时并未有太多的考虑。随着和更多云产品的集成,集成的投入,集成的困难度和底层数据管理难度越来越大。面对多种事件源集成的客观困难,函数计算希望提高和其他云产品的集成效率。 2)授权复杂及安全隐患 除此之外, 函数计算希望提升用户体验,保证用户关心事件的处理;同时希望能够在面对大量的云产品时保证系统授权层面的复杂度。用户在使用事件触发的时候, 需要了解不同产品接入函数计算的权限要求, 对于客户使用函数计算带来了非常大的困难,为了加速产品接入,大量用户经常使用 FullAcees 权限,造成较大产品安全隐患。 3)通用能力难以沉淀 面对上游不同的事件源, 如何更好的投递事件、更好的消费事件?如何进行事件的错误处理?函数计算调用方式如何选择?以及函数计算后端错误 Backpressure 能力的反馈、重试策略和上游系统参数设置、触发器数量的限制等问题成为函数计算事件触发不得不面对的问题。为了更好的服务客户,提供可靠的消费处理能力,函数计算希望能够有一个统一的接入层,基于统一的接入层进行消费能力和流控能力的建设。通过沉淀在这样一个标准的层面,在保证调用灵活性的同时,提供可靠的服务质量。 为什么 EventBridge 同样需要函数计算? EventBridge 作为标准的事件中心,目的是希望能够帮助客户把这些事件利用起来,能够通过事件将产品的能力进行联动,为了达成这样的目的,势必需要帮助客户通过更便捷的路径来快速消费处理这些事件。EventBridge 和函数计算的深度集成正是为了这样的共同目标 —— 帮助客户快速的构建基于 EDA 架构的业务系统,促进业务获得成功。 FC 和 EventBridge 产品集成功能演示及场景介绍 EventBridge 具体支持的事件类型, 基本上包括了阿里云所有的官方产品。可以通过 EventBridge 官方主页查看目前支持的阿里云官方产品事件源类型 。 EventBridge 触发器及异步集成 点击下方链接跳转查看: 函数计算异步链路支持将处理结果直接投递到 MQ 和 EventBridge,用户可以利用 EventBridge 将相关的结果投递到 SAAS 服务; 点击下方链接跳转查看: 双向集成的变化 1. 函数计算支持 85+阿里云官方事件源; 2. 函数计算支持整个阿里云消息队列的事件触发,包括 RocketMQ, RabbitMQ,MNS 等; 1. EventBridge 和函数计算控制台数据互通,用户无需在函数计算控制台和事件总线控制台来回跳转; 2. 用户通过触发器详情,快速跳转,利用 EventBridge 事件追踪能力帮助用户快速排查问题; 官方事件源运维场景总结 基于官方事件源的事件驱动场景,大概可以总结抽象成四个场景。 场景一:单账号下某个云产品的运维需求。通常客户希望基于这样的一个事件,包括类似像云服务器事件 ECS,或者容器服务镜像事件,通过这样的事件监听做一些自动化诊断和运维操作。 场景二:实际是在场景一的基础上的一个扩展,针对多个云产品的事件,希望能够进一步分析,做一些故障处理。 场景三:我们观察到,大的一些企业,在使用云产品的时候,实际上是由多个账号去使用阿里云的产品。在多个账号,多个产品的情况下,希望能够对多个账号中的云资源使用情况有一个全局统一的视角进行实践分析,同时进行账号配额的一些调整。那这样的话就是可以利用到 EventBridge 跨账号事件投递的能力,然后再利用函数计算做一个统一处理。 场景四:这个场景实际上是一个账号跨域事件处理场景,EventBridge 目前并没有去提供这样一个跨域的能力,这种情况下,可以借助函数计算提供的 HTTP 函数能力,自动生成 HTTP Endpoint,通过 EventBridge 的 HTTP 事件源,完成事件的跨域消费。 自定义事件源场景总结 1)MNS 队列自定义事件源触发场景:客户在 OSS 中上传文件之后,根据文件上传事件对 ACK 进行扩容,目前通过 OSS 事件发送到 MNS 中,然后由 MNSQueue 消息通过 EventBridge 触发函数计算, 在函数计算中根据一定的逻辑进行 ECI 资源的创建;同时客户希望通过 MNS 进行通知服务;利用 EventBridge 订阅模式,通过事件规则的定义,让通知服务和函数计算共享同一个事件订阅规则,可以大大的简化用户的方案。 2)RabbitMQ 队列自定义事件源触发场景:鉴于 RabbitMQ 在稳定性和可靠性方面的表现,在 IOT 场景具有非常普遍的使用,客户通常会选择使用 RabbitMQ 来进行端设备数据采集和存储, 考虑到 IOT 相关的嵌入式设备性能使用环境,通常端设备采集的数据比较偏向底层裸数据,在实际业务层面,客户需要找到一种快速高效的途径对 RabbitMQ 中的数据进行加工,通过 EventBridge 提供的自定义事件总线,利用函数计算对 RabbitMQ 中的数据快速处理, 实现 ETL 目的。 EventBridge 和函数计算深度集成下一阶段规划 事件过滤高级 ETL 处理 将函数计算和 EventBridge 进行更紧密的集成,由函数计算提供一些高级的 ETL 能力,提升整个事件过滤转换的能力。 提供更丰富的事件目标 目前 EventBridge 整个下游的事件目标相对来说较少,我们希望能够通过函数计算和 EventBridge 的一个密切集成,利用函数计算敏捷的开发能力,分别通过大账号模式和用户自持的这样一个能力,构建一些更丰富的 EventBridge 下游事件目标,帮助丰富整个事件目标的生态。
作者:史明伟(世如)
#行业实践 #生态集成

2022年3月11日

基于 EventBridge 构建 SaaS 应用集成方案
引言 事件驱动架构(EDA)是一种以事件为纽带,将不同系统进行解耦的异步架构设计模型。在 EDA 中,事件驱动的运行流程天然地划分了各个系统的业务语义,用户可以根据需求对事件与针对此事件做出的响应灵活定制,这使得基于 EDA 架构可以方便地构建出高伸缩性的应用。据 Daitan Group 的调研报告,早在 2017 年,例如 UBER、Deliveroo、Monzo 等公司就已经采用了 EDA 去设计他们的系统。 为了便于用户更加轻松地开发以 EDA 为架构的应用,在 2020 年云栖大会上,阿里云正式推出了 EventBridge。EventBridge 是一款无服务器事件总线服务,能够以标准化的 CloudEvents 1.0 协议在应用之间路由事件。目前,EventBridge 已经集成了众多成熟的阿里云产品,用户可以低代码甚至零代码完成各个阿里云产品和应用之间的打通,轻松高效地构建分布式事件驱动架构。 事件源是事件驱动的基石,如何获取更多事件源也是 EventBridge 一直在探索和尝试的方向。针对市场上其他云厂商和垂直领域的 Saas 服务,EventBridge 发布了 HTTP Source 能力,提供简单且易于集成的三方事件推送 ,帮助客户更加高效、便捷地实现业务上云。 HTTP Source 概述 接入 EventBridge 应用有多种情况:用户自定义应用、阿里云服务、其他云厂商服务或者其他 SaaS 产品。 对于用户自定义应用,用户可以使用 EventBridge 官方的 API 接口、多语言客户端以及 CloudEvents 社区的开源客户端来完成接入。 对于阿里云的云产品,EventBridge 原生支持,用户可以在默认事件总线中选择对应的云产品与其相关的触发事件。 而对于其他云厂商、SaaS 产品,EventBridge 同样也提供便捷的接入方式便于用户进行集成,HTTP Source 事件源便是一种典型的接入方式。 具体而言,HTTP Source 事件源是 EventBridge 支持的事件源的一种,它以 Webhook 形式暴露了发布事件的 HTTP 请求地址,用户可以在有 URL 回调的场景配置 HTTP Source 事件源,或者直接使用最简单的 HTTP 客户端来完成事件的发布。HTTP Source 事件源提供了支持 HTTP 与 HTTPS,公网与阿里云 VPC 等不同请求方式、不同网络环境的 Webhook URL,便于用户将其集成到各类应用中。接入时无需使用客户端,仅需保证应用可以访问到对应 Webhook URL 即可,这使得接入过程变得简单而高效。 在将 HTTP 请求转换为 CloudEvent 的时候,EventBridge 会将请求的头部和消息体部分置于 CloudEvent 字段中,其余字段会依据用户 EventBridge 资源属性以及系统默认规则进行填充。用户可以在事件规则中,对所需的内容进行过滤、提取,最终按照模板拼装成所需的消息内容投递给事件目标。 HTTP Source 事件源目前支持 3 种类型的安全设置,分别是请求方法、源 IP 以及请求来源域名。 请求方法:用户可以配置当前请求此事件源时合法的 HTTP 请求方法,如果方法类型不满足配置规则,请求将被过滤,不会投递到事件总线。 源 IP:用户可以设置允许访问此事件源时合法的源 IP(支持 IP 段和 IP),当请求源 IP 不在设置的范围内时,请求将被过滤,不会投递到事件总线。 请求来源域名:即 HTTP 请求的 referer 字段,当请求的 referer 与用户配置不相符时,请求被过滤,不会投递到事件总线。 抛砖引玉,下面就介绍如何使用 HTTP Source 来构建 SaaS 应用集成的最佳实践,帮助大家快速上手 SaaS 集成方案。 SaaS 集成最佳实践 钉钉监控 GitHub 代码推送事件 GitHub 提供了 Webhook 功能,代码仓库在发生某些特定操作(push、fork等)时,可以通过回调来帮助用户完成特定功能。针对多人开发的项目,将 GitHub 事件推送到特定钉钉群可以帮助成员有效关注代码变更,提高协同效率。 本节我们展示如何通过钉钉监控 GitHub 代码推送事件的最佳实践,主要包含以下几个步骤: 创建一个钉钉机器人; 创建 EventBridge 相关资源:事件总线、事件源(HTTP Source 类型)、事件规则、事件目标(钉钉); 创建自定义事件总线; 选择 GitHub 代码仓库创建 Webhook; 向 GitHub 代码仓库推送代码变更; 钉钉群接收此次代码推送相关信息。 1)创建钉钉机器人 参考钉钉官方文档[1],创建一个群机器人。创建群机器人时,安全设置请勾选“加签”并妥善保管密钥和稍后生成的机器人 Webhook 地址。 2)创建 EventBridge 相关资源 创建 EventBus 事件总线 创建事件源。事件源配置完成之后,点击跳过,我们接下来会专门配置事件规则与目标。 创建完成后,进入事件源详情页,保存刚刚生成的 Webhook URL。 在 EventBridge 控制台页面点击进入刚刚创建的 EventBus 详情页,在左侧一栏中“事件规则”选择“创建规则”。 创建时间目标。选择钉钉,并将钉钉机器人的 Webhook 地址和密钥填入,推送内容侧可以按照需求设计。 我们填写模板变量为: {"repo":"$.data.body.repository.full_name","branch":"$.data.body.ref","pusher":"$.data.body.pusher.name"} 模板为: {"msgtype": "text","text": {"content": "Github push event is triggered. repository: {repo}, git reference: {branch}, pusher: {pusher}." } } 3)在 GitHub 代码仓库创建 Webhook 登陆 GitHub,在 GitHub 代码仓库“setting”中选择左侧“Webhooks”,选择新建 Webhook。 在创建 Webhook 的配置项中填入 HTTP Source 事件源的 Webhook 地址,Content type 部分选择“application/json”,下方触发事件类型选择“Just the push event.”,随后点击“Add Webhook”,创建完成。 4)向 GitHub 代码仓库推送代码变更 本地仓库做一定变更,commit 后推送 GitHub。 5)钉钉群接收此次代码推送相关信息 _异步消费监控报警信息_ 业务上存在异步消费报警信息的场景,例如报警内容备份,根据报警频率自适应调整报警阈值等。而且对于多云业务的用户,如何将跨云服务的报警信息整合起来也是一个麻烦的问题。依托 HTTP Source,用户可以将不同云厂商(腾讯云、华为云等)、不同监控产品(Grafana、Zabbix、Nagios等)统一集成到 EventBridge 平台,以便于实现对报警信息的异步消费。 本节我们介绍如何使用 EventBridge 集成 Grafana,实现异步消费监控报警信息。Grafana 是一款开源数据可视化工具,也同时具有监控报警功能,具体使用可以参阅Grafana 官方文档[2]。本节主要包含以下步骤: 创建 MNS 队列; 创建 EventBridge 相关资源; Grafana 上配置 Webhook; 测试接收结果。 创建 MNS 队列 在 MNS 控制台,选择“队列列表创建队列”。 创建 EventBridge 相关资源 同上文所述,这里仅示例创建事件目标时相关配置。 Grafana 上配置 Webhook 点击 Grafana 控制台左侧“AlertingNotification channels”,选择“Add channel”。 在“type”一栏中选择“Webhook”,url 填写 HTTP Source 事件源的 Webhook 地址,点击下方“Test”。 测试接收结果 登陆 MNS 控制台,进入队列详情页,点击页面右上角“收发消息”,可以看到 MNS 已经接收到刚刚 Grafana 发送的消息。 点击对应消息详情可以看到消息内容,说明消息已经被成功消费。 _更多集成_ HTTP Source 支持的三方集成包括 Prometheus,Zabbix,Skywalking,Grafana,OpenFalcon,Cacti,Nagios,Dynatrace,Salesforce,Shopify,Gitee 等 SaaS 应用。通过简单配置 Webhook 无需开发既可实现事件接收能力。 _总结_ 本文重点介绍 EventBridge 的新特性:HTTP Source 事件源。作为一款无服务器事件总线服务,EventBridge 已经将阿里云云产品管控链路数据、消息产品业务数据整和到事件源生态中,提高了上云用户业务集成的便捷性,Open API 与多语言 sdk 的支持,为客户自身业务接入 EventBridge 提供了便利。 在此基础之上,HTTP Source 事件源更进一步,以 Webhook 形式开放了针对了其他云厂商、SaaS 应用的集成能力,无需代码改动,仅需要简单配置即可完成 EventBridge 集成操作。 _相关链接_
作者:昶风
#行业实践 #生态集成

2022年1月23日

平安保险基于 SPI 机制的 RocketMQ 定制化应用
为什么选用 RocketMQ 首先跟大家聊聊我们为什么会选用 RocketMQ,在做技术选型的过程中,应用场景应该是最先考虑清楚的,只有确定好了应用场景在做技术选型的过程中才有明确的目标和衡量的标准。像异步、解耦、削峰填谷这些消息中间件共有的特性就不一一介绍了,这些特性是决定你的场景需不需要使用消息中间件,这里主要讲述下在确定使用消息中间件后,又是如何去选择哪款消息中间件的。 同步双写,确保业务数据安全可靠不丢失 我们在搭建消息中间件平台时的定位是给业务系统做业务数据的传输使用,对业务数据的很重要的一个要求就是不允许丢数据,所以选用 RocketMQ 的第一点就是他有同步双写机制,数据在主从服务器上都刷盘成功才算发送成功。同步双写条件下,MQ 的写入性能与异步刷盘异步赋值相比肯定会有所下降,与异步条件下大约会有 20% 左右的下降,单主从架构下,1K 的消息写入性能还是能达到 8W+ 的 TPS,对大部分业务场景而言性能是能完全满足要求的,另外对下降的这部分性能可以通过 broker 的横向扩招来弥补,所以在同步双写条件下,性能是能满足业务需求的。 多 topic 应用场景下,性能依旧强悍 第二点,业务系统的使用场景会特别多,使用场景广泛带来的问题就是会创建大量的 topic,所以这时候就得去衡量消息中间件在多 topic 场景下性能是否能满足需求。我自己在测试的时候呢,用 1K 的消息随机往 1 万个 topic 写数据,单 broker 状态下能达到2W左右的 TPS,这一点比 Kafka 要强很多。所以多 topic 应用场景下,性能依旧强悍是我们选用 topic 的第二个原因。这点也是由底层文件存储结构决定的,像 Kafka、RocketMQ 这类消息中间件能做到接近内存的读写能力,主要取决于文件的顺序读写和内存映射。RocketMQ 中的所有 topic 的消息都是写在同一个 commitLog 文件中的,但是 Kafka 中的消息是以 topic 为基本单位组织的,不同的 topic 之间是相互独立的。在多 topic 场景下就造成了大量的小文件,大量的小文件在读写时存在一个寻址的过程,就有点类似随机读写了,影响整体的性能。 支持事务消息、顺序消息、延迟消息、消息消费失败重试等 RocketMQ 支持事务消息、顺序消息、消息消费失败重试、延迟消息等,功能比较丰富,比较适合复杂多变的业务场景使用 社区建设活跃,阿里开源系统 另外,在选用消息中间件时也要考虑下社区的活跃度和源码所使用的开发语言,RocketMQ 使用 Java 开发,对 Java 开发人员就比较友好,不管是阅读源码排查问题还是在 MQ 的基础上做二次开发都比较容易一点。社区里同学大都是国内的小伙伴,对大家参与 RocketMQ 开源贡献也是比较亲近的,这里呢也是希望更多的小伙伴能参与进来,为国内开源项目多做贡献。 SPI 机制简介及应用 介绍完为什么选用 RocketMQ 后,接下来给大家介绍下我们是如何基于 SPI 机制应用 RocketMQ 的。SPI 全称为 (Service Provider Interface) ,是 JDK 内置的一种服务提供发现机制,我个人简单理解就是面向接口编程,留给使用者一个扩展的点,像 springBoot 中的 spring.factories 也是 SPI 机制的一个应用。如图给大家展示的是 RocketMQ 中 SPI 的一个应用。我们基于 SPI 机制的 RocketMQ 客户端的应用的灵感也是来自于 MQ 中 SPI 机制的应用。RocketMQ 在实现 ACL 权限校验的时候,是通过实现 AccessValidator 接口,PlainAccessValidator 是 MQ 中的默认实现。权限校验这一块,可能因为组织架构的不一样会有不同的实现方式,通过 SPI 机制提供一个接口,为开发者定制化开发提供扩展点。在有定制化需求时只需要重新实现 AccessValidator 接口,不需要对源码大动干戈。 接下来先给大家介绍下我们配置文件的一个简单模型,在这个配置文件中除了 sendMsgService、consumeMsgConcurrently、consumeMsgOrderly 这三个配置项外其余的都是 RocketMQ 原生的配置文件,发送消息和消费消息这三个配置项呢就是 SPI 机制的应用,是为具体实现提供的接口。可能有的同学会有疑问,SPI 的配置文件不是应该放在 METAINF.service 路径下么?这里呢我们是为了方便配置文件的管理,索性就跟 MQ 配置文件放在了一起。前面也提到了,METAINF.service 只是一个默认的路径而已,为了方便管理做相应的修改也没有违背SPI机制的思想。 我们再看下这个配置文件模型,这里的配置项呢囊括了使用 MQ 时所要配置的所有选项,proConfigs 支持所有的 MQ 原生配置,这样呢也就实现了配置与应用实现的解耦,应用端只需呀关注的具体的业务逻辑即可,生产者消费者的实现和消费者消费的 topic 都可以通过配置文件来指定。另外该配置文件也支持多 nameserver 的多环境使用,在较复杂的应用中支持往多套 RocketMQ 环境发送消息和消费多套不同环境下的消息。消费者提供了两个接口主要是为了支持 RocketMQ 的并发消费和顺序消费。接下来呢给大家分享下如何根据这个配置文件来初始化生产者消费者。首先给大家先介绍下我们抽象出来的客户端加载的一个核心流程。 客户端核心流程详情 图中大家可以看到,客户端的核心流程我们抽象成了三部分,分别是启动期、运行期和终止期。首先加载配置文件呢就是加载刚刚介绍的那个配置文件模型,在配置与应用完全解耦的状态下,必须先加载完配置文件才能初始化后续的流程。在初始化生产者和消费者之前应当先创建好应用实现的生产者和消费者的业务逻辑对象 供生产者和消费者使用。在运行期监听配置文件的变化,根据变化动态的调整生产者和消费者实例。这里还是要再强调下配置与应用的解耦为动态调整提供了可能。终止期就比较简单了,就是关闭生产者和消费者,并从容器中移除。这里的终止期指的生产者和消费者的终止,并不是整个应用的终止,生产者和消费者的终止可能出现在动态调整的过程中,所以终止了的实例一定要从容器中移除,方便初始化后续的生产者和消费者。介绍完基本流程后,接下来给大家介绍下配置文件的加载过程。 如何加载配置文件 配置文件加载这一块的话,流程是比较简单的。这里主要讲的是如何去兼容比较老的项目。RocketMQ 客户端支持的 JDK 最低版本是 1.6,所以在封装客户端时应该要考虑到新老项目兼容的问题。在这里呢我们客户端的核心包是支持 JDK1.6 的,spring 早期的项目配置文件一般都是放在在 resources 路径下,我们是自己实现了一套读取配置文件的和监听配置文件的方法,具体的大家可以参考 acl 中配置文件的读取和监听。在核心包的基础上用 springBoot 又封装了一套自动加载配置文件的包供微服务项目使用,配置文件的读取和监听都用的 spring 的那一套。配置文件加载完之后, 配置文件中应用实现的生产者和消费者是如何与 RocketMQ 的生产者和消费者相关联的呢?接下来给大家分享下这方面的内容。 如何将生产消费者与业务实现关联 首先先看下消费者是如何实现关联的,上图是 MQ 消费者的消息监听器,需要我们去实现具体的业务逻辑处理。通过将配置文件中实现的消费逻辑关联到这里就能实现配置文件中的消费者与 RocketMQ 消费者的关联。消费者的接口定义也是很简单,就是去消费消息。消费消息的类型可以通过泛型指定,在初始化消费者的时候获取具体实现的参数类型,并将 MQ 接受到的消息转换为具体的业务类型数据。由客户端统一封装好消息类型的转换。对消费消息的返回值大家可以根据需要与 MQ 提供的 status 做一个映射,这里的 demo 只是简单显示了下。在获取具体的应用消费者实例的时候,如果你的消费逻辑里使用了 spring 管理的对象,那么你实现的消费逻辑对象也要交给 spring 管理,通过 spring 上下文获取初始化好的对象;如果你的消费逻辑里没有使用 spring 进行管理,可以通过反射的方式自己创建具体的应用实例。 与消费者不一样的是生产者需要将初始化好的 producer 对象传递到应用代码中去,而消费者是去获取应用中实现的逻辑对象,那如何将 producer 传递到业务应用中去呢? 业务代码中实现的生产者需要继承 SendMessage,这样业务代码就获得了 RmqProducer 对象,这是一个被封装后的生产者对象,该对象对发送消息的方法进行的规范化定义,使之符合公司的相应规范制度,该对象中的方法也会对 topic 的命名规范进行检查,规范 topic 有一个统一的命名规范。 如何动态调整生产消费者 首先谈到动态调整就需要谈一下动态调整发生的场景,如果没有合适的使用场景的话实现动态调整就有点华而不实了。这里我列举了四个配置文件发生变化的场景: nameserver发生变化的时候,需要重新初始化所有的生产者和消费者,这个一般是在 MQ 做迁移或者当前 MQ 集群不可用是需要紧急切换 MQ; 增减实例的场景只要启动或关闭相应的实例即可,增加应用实例的场景一般是在需要增加一个消费者来消费新的 topic 的,减少消费者一般是在某个消费者发生异常时需要紧急关闭这个消费者,及时止损。 调整消费者线程的场景中我们对源码进行了一点修改,让应用端能获取到消费者的线程池对象,以便对线程池的核心线程数进行动态调整。这个的应用场景一般是在当某个消费者消费的数据比较多,占用过多的 CPU 资源时,导致优先级更高的消息得不到及时处理,可以先将该消费者的线程调小一些。 应用的优点
作者:孙园园
#行业实践

2022年1月20日

消息队列 RocketMQ 遇上可观测:业务核心链路可视化
引言:本篇文章主要介绍 RocketMQ 的可观测性工具在线上生产环境的最佳实践。RocketMQ的可观测性能力领先业界同类产品,RocketMQ 的 Dashboard 和消息轨迹等功能为业务核心链路保驾护航,有效应对线上大规模生产使用过程中遇到的容量规划、消息收发问题排查以及自定义监控等场景。 消息队列简介 进入主题之前,首先简要介绍下什么是阿里云的消息队列? 阿里云提供了丰富的消息产品家族,消息产品矩阵涵盖了互联网、大数据、物联网等各个业务场景的领域,为云上客户提供了多维度可选的消息解决方案。无论哪一款消息队列产品,核心都是帮助用户解决业务和系统的异步、解耦以及应对流量洪峰时的削峰填谷,同时具备分布式、高吞吐、低延迟、高可扩展等特性。 但是不同的消息产品在面向客户业务的应用中也有不同的侧重。简单来做,消息队列 RocketMQ 是业务领域的首选消息通道;Kafka 是大数据领域不可或缺的消息产品;MQTT 是物联网领域的消息解决方案;RabbitMQ 侧重于传统业务消息领域;云原生的产品集成和事件流通道是通过消息队列 MNS 来完成;最后事件总线 EventBridge 是一个阿里云上的一个事件枢纽,统一构建事件中心。 本篇主要讲的是业务领域的消息首选通道:消息队列 RocketMQ。RocketMQ 诞生于阿里的电商系统,具有高性能、低延迟、削峰填谷等能力,并且提供了丰富的在业务和消息场景上应对瞬时流量洪峰的功能,被集成在用户的核心业务链路上。 作为一个核心业务链路上的消息,就要求 RocketMQ 具备非常高的可观测性能力,用户能通过可观测性能力及时的监控定位异常波动,同时对具体的业务数据问题进行排查。由此,可观测性能力逐步成为消息队列 RocketMQ 的核心能力之一。 那么什么是可观测能力呢?下面简单对可观测能力进行介绍。 可观测能力 提到可观测能力,大家可能最先想到的是可观测的三要素:Metrics(指标)、Tracing(追踪)和 Logging(日志)。 结合消息队列的理解,可观测能力三要素的细化解释如下: Metrics:Dashborad 大盘 1)指标涵盖丰富:包含消息量、堆积量、各个阶段耗时等指标,每个指标从实例、Topic、消费 GroupID 多维度做聚合和展示; 2)消息团队最佳实践模板:为用户提供最佳模板,特别是在复杂的消费消息场景,提供了丰富的指标帮助快速定位问题,并持续迭代更新; 3)Prometheus + Grafana:Prometheus标准数据格式、利用Grafana展示,除了模板,用户也可以自定义展示大盘。 Tracing:消息轨迹 1)OpenTelemetry tracing标准:RocketMQ tracing 标准已经合并到 OpenTelemetry 开源标准,规范和丰富 messaging tracing 场景定义; 2)消息领域定制化展示:按照消息维度重新组织抽象的请求 span 数据,展示一对多的消费,多次消费信息,直观、方便理解; 3)可衔接 tracing链路上下游:消息的 tracing 可继承调用上下文,补充到完整调用链路中,消息链路信息串联了异步链路的上游和下游链路信息。 Logging:客户端日志标准化 1)Error Code标准化:不同的错误有唯一的 error code; 2)Error Message 完整:包含完整的错误信息和排序所需要的资源信息; 3)Error Level 标准化:细化了各种不同错误信息的日志级别,让用户根据 Error、Warn 等级别配置更合适和监控告警。 了解消息队列和可观测能力的基础概念,让我们来看看当消息队列 RocketMQ 遇到可观测,会产生什么样的火花? RocketMQ 的可观测性工具的概念介绍 从上文的介绍中可以看到 RocketMQ 的可观测能力能够帮助用户根据错误信息排查消息在生产和消费过程中哪些环节出了问题,为了帮助大家更好的理解功能的应用,先简要介绍下消息生产消费流程过程中的一些概念。 消息生产和消费流程概念 首先我们先明确以下几个概念: Topic:消息主题,一级消息类型,通过Topic对消息进行分类; 消息(Message):消息队列中信息传递的载体; Broker:消息中转角色,负责存储消息,转发消息; Producer:消息生产者,也称为消息发布者,负责生产并发送消息; Consumer:消息消费者,也称为消息订阅者,负责接收并消费消息。 消息生产和消费的流程简单来说就是生产者将消息发送到 topic 的 MessageQueue 上进行存储,然后消费者去消费这些 MessageQueue 上的消息,如果有多个消费者,那么一个完整的一次消息生产发生的生命周期是什么样子的? 这里我们以定时消息为例,生产者 Producer 发送消息经过一定的耗时到达 MQ Server,MQ 将消息存储在 MessageQueue,这时队列中有一个存储时间,如果是定时消息,还需要经过一定的定时时间之后才能被消费者消费,这个时间就是消息就绪的时间;经过定时的时间后消费者 Consumer 开始消费,消费者从 MessageQueue 中拉取消息,然后经过网络的耗时之后到达消费者客户端,这时候不是低码进行消费的,会有一个等待消费者资源线程的过程,等到消费者的线程资源后才开始进行真正的业务消息处理。 从上面的介绍中可以看出,业务消息有一定的耗时处理,完成之后才会向服务端返回ack的结果,在整个生产和消费的过程中,最复杂的便是消费的过程,因为耗时等原因,会经常有消息堆积的场景,下面来重点看一下在消息堆积场景下各个指标表示的含义。 消息堆积场景 如上图,消息队列中,灰色部分的消息表示是已完成的消息量,就是消费者已处理完成并返回 ack 的消息;橙色部分的消息表示这些消息已经被拉取到消费者客户端,正在被处理中,但是还没有返回处理结果的消息,这个消息其实有一个非常重要的指标,就是消息处理耗时;最后绿色的消息表示这些消息在已经发生的 MQ 队列中已存储完成,并且已经是可被消费者消费的一个状态,称为已就绪的消息。 _已就绪消息量(Ready messages):_ _含义:已就绪消息的消息的条数。_ _作用:消息量的大小反映还未被消费的消息规模,在消费者异常情况下,就绪消息量会变多。_ _消息排队时间(Queue time)_ _含义:最早一条就绪消息的就绪时间和当前时间差。_ _作用:这个时间大小反映了还未被处理消息的时间延迟情况,对于时间敏感的业务来说是非常重要的度量指标。_ RocketMQ 的可观测性工具的功能介绍 结合上文介绍的消息队列 RocketMQ 可观测概念,下面具体对 RocketMQ 的可观测性工具的两个核心功能进行介绍。 可观测功能介绍 Dashboard Dashboard 大盘可以根据各种参数查看指定的指标数据,主要的指标数据包含下面三点: 1)Overview(概览): 查看实例据总的消息收发量、TPS、消息类型分布情况。 查看是的各个指标当前的分布和排序情况:发送消息量最多的 Topic、消费消息量最多的 GroupID、堆积消息量最多的 GroupID、排队时间最长的 GroupID 等。 2)Topic(消息发送): 查看指定 Topic 的发送消息量曲线图。 查看指定 Topic 的发送成功率曲线图。 查看指定 Topic 的发送耗时曲线图。 3)GroupID(消息消费): 查看指定 Group 订阅指定 Topic 的消息量曲线图。 查看指定 Group 订阅指定 Topic 的消费成功率。 查看指定 Group 订阅指定 Topic 的消费耗时等指标。 查看指定 Group 订阅指定 Topic 的消息堆积相关指标。 可观测功能介绍 消息轨迹 在 Tracing 方面提供了消息轨迹功能,主要包含以下三方面能力: 1)便捷的查询能力:可根据消息基本信息查询相关的轨迹;二期还可以根据结果状态、耗时时长来过滤查询,过滤出有效轨迹快速定位问题。 2)详细的 tracing 信息:除了各个生命周期的时间和耗时数据,还包含了生产者、消费者的账号和机器信息。 3)优化展示效果:不同的消息类型轨迹;多个消费 GroupID 的场景;同个消费 GroupID 多次重投的场景等。 最佳实践 场景一:问题排查 1)目标:消息生产消费健康情况 2)原则 一级指标:用来报警的指标,公认的没有异议的指标。 二级指标:一级指标发生变化的时候,通过查看二级指标,能够快速定位问题的原因所在。 三级指标:定位二级指标波动原因。根据各自业务的特点和经验添加。 基于目标和原则,生产者用户和消费者用户问题排查和分析方式如下: 场景二:容量规划 容量规划场景下只要解决下面三个问题: 1)问题一:怎样评估实例容量? 解决方法: 实例详情页》查看指定实例数据统计,可以看到所选时间段内的最大消息收发的 TPS 峰值。 铂金版实例可以根据这个数据来添加报警监控和判断业务。 2)问题二:怎样查看标准版实例的消耗 解决方法: 可以查看概览总消息量模块 3)问题三:有哪些已下线,需要清理资源? 解决方法: 指定一段时间内(例如近一周),按 Topic 的消息发送量由小到大排序,查看是否有消息发送量为 0 的 Topic,这些 Topic 相关的业务或许已下线。 指定一段时间内(例如近一周),按 GroupID 的消息消费量由小到大排序,查看是否有消息消费量为 0 的 GroupID,这些 GroupID 相关的业务或许已下线。 场景三:业务规划 业务规划场景下主要解决以下三个问题: 1)问题一:如何查看业务峰值分布情况? 解决方法: 查看 Topic 消息接收量的每天的高峰时间段。 查看 Topic 消息接收量周末和非周某的消息量差别。 查看 Topic 消息接收量节假日的变化情况。 2)问题二:如何判断目前哪些业务有上升趋势? 解决方法: 查看消息量辅助判断业务量变化趋势。 3)问题三 :怎样优化消费者系统性能? 解决方法: 查看消息处理耗时,判断是否在合理范围内有提升的空间。 本篇文章通过消息队列、可观测能力、RocketMQ 可观测概念及功能和最佳实践的介绍,呈现了 RocketMQ 的可观测性工具在业务核心链路上的可视化能力,希望给大家在日常的线上的一些问题排查和运维过程中带来一些帮助。
作者:文婷、不周
#行业实践 #可观测

2022年1月14日

全链路灰度之 RocketMQ 灰度
之前的系列文章中,我们已经通过全链路金丝雀发布这个功能来介绍了 MSE 对于全链路流量控制的场景,我们已经了解了 Spring Cloud 和 Dubbo 这一类 RPC 调用的全链路灰度应该如何实现,但是没有涉及到消息这类异步场景下的流量控制,今天我们将以上次介绍过的《》中的场景为基础,来进一步介绍消息场景的全链路灰度。 虽然绝大多数业务场景下对于消息的灰度的要求并不像 RPC 的要求得这么严格,但是在以下两个场景下,还是会对消息的全链路有一定的诉求的。 1、第一种场景是在消息消费时,可能会产生新的 RPC 调用,如果没有在消息这一环去遵循之前设定好的全链路流量控制的规则,会导致通过消息产生的这部分流量“逃逸”,从而导致全链路灰度的规则遭到破坏,导致出现不符合预期的情况。 为了防止出现这个情况,我们需要在消费时候将消息里原来的流量标复原,并在 RPC 调用的时候遵循原来的规则。我们通过架构图来详细描述一下,满足这个逻辑之后,调用链路是怎样的,从下图中我们可以看到,灰度和基线环境生产出来的消息,虽然在消息推送的时候是随机的,但是在消费过程中,产生的新的 RPC 调用,还是能够回到流量原来所属的环境。 2、第二种场景需要更加严格的消息灰度隔离。比如当消息的消费逻辑进行了修改时,这时候希望通过小流量的方式来验证新的消息消费逻辑的正确性,要严格地要求灰度的消息只能被推送给灰度的消息消费者。 今天我们就来实操一下第二种场景消息的全链路灰度,目前 MSE 仅支持 RocketMQ 消息的灰度。若您使用的是开源版 RocketMQ,那么版本需要在 4.5.0 及以上,若您使用的是阿里云商业版 RocketMQ,那么需要使用铂金版,且 Ons Client 版本在 1.8.0.Final 及以上。如果只是想使用第一种场景,只需要给 B 应用开启全链路灰度的功能即可,不需要做额外的消息灰度相关的配置。 在这次最佳实践的操作中,我们是将应用部署在阿里云容器服务 Kubernetes 版本,即 ACK 集群来演示,但是事实上,消息灰度对于应用的部署模式是没有限制性要求的,您可以参考 MSE 帮助文档,找到自己所使用的部署模式对应的接入方式,也能使用消息全链路灰度。 前提条件 1. 开通 MSE 专业版,请参见开通 MSE 微服务治理专业版[1]。 2. 创建 ACK 集群,请参见创建 Kubernetes 集群[2]。 操作步骤 步骤一:接入 MSE 微服务治理 1、安装 mseackpilot 1. 登录容器服务控制台[3]。 2. 在左侧导航栏单击市场 应用目录。 3. 在应用目录页面点击阿里云应用,选择微服务,并单击 ackmsepilot。 4. 在 ackmsepilot 页面右侧集群列表中选择集群,然后单击创建。 安装 MSE 微服务治理组件大约需要 2 分钟,请耐心等待。 创建成功后,会自动跳转到目标集群的 Helm 页面,检查安装结果。如果出现以下页面,展示相关资源,则说明安装成功。 2、为 ACK 命名空间中的应用开启 MSE 微服务治理 1. 登录 MSE 治理中心控制台[4],如果您尚未开通 MSE 微服务治理,请根据提示开通。 2. 在左侧导航栏选择微服务治理中心 Kubernetes 集群列表。 3. 在 Kubernetes 集群列表页面搜索框列表中选择集群名称或集群 ID,然后输入相应的关键字,单击搜索图标。 4. 单击目标集群操作列的管理。 5. 在集群详情页面命名空间列表区域,单击目标命名空间操作列下的开启微服务治理。 6. 在开启微服务治理对话框中单击确认。 步骤二:还原线上场景 首先,我们将分别部署  springcloudzuul、springclouda、springcloudb、springcloudc 这四个业务应用,以及注册中心 Nacos Server 和消息服务 RocketMQ Server,模拟出一个真实的调用链路。 Demo 应用的结构图下图,应用之间的调用,既包含了 Spring Cloud 的调用,也包含了 Dubbo 的调用,覆盖了当前市面上最常用的两种微服务框架。其中 C 应用会生产出 RocketMQ 消息,由 A 应用进行消费,A 在消费消息时,也会发起新的调用。这些应用都是最简单的 Spring Cloud 、 Dubbo 和 RocketMQ 的标准用法,您也可以直接在 项目上查看源码。 部署之前,简单介绍一下这个调用链路 springcloudzuul 应用在收到 “/A/dubbo” 的请求时,会把请求转发给 springclouda ,然后 springclouda 通过 dubbo 协议去访问 springcloudb, springcloudb 也通过 dubbo 协议去访问 springcloudc,springcloudc 在收到请求后,会生产一个消息,并返回自己的环境标签和 ip。这些生产出来的消息会由 springclouda 应用消费,springclouda 应用在消费消息的时候,会通过 spring cloud 去调用 B,B 进而通过 spring cloud 去调用 C,并且将结果输出到自己的日志中。 当我们调用 /A/dubbo 的时候 返回值是这样 A[10.25.0.32] B[10.25.0.152] C[10.25.0.30] 同时,A 应用在接收到消息之后,输出的日志如下 20211228 10:58:50.301 INFO 1 [essageThread_15] c.a.mse.demo.service.MqConsumer : topic:TEST_MQ,producer:C[10.25.0.30],invoke result:A[10.25.0.32] B[10.25.0.152] C[10.25.0.30] 熟悉了调用链路之后,我们继续部署应用,您可以使用 kubectl 或者直接使用 ACK 控制台来部署应用。部署所使用的 yaml 文件如下,您同样可以直接在 上获取对应的源码。 部署 Nacos Server apiVersion: apps/v1 kind: Deployment metadata: name: nacosserver spec: selector: matchLabels: app: nacosserver template: metadata: annotations: labels: app: nacosserver spec: containers: env: name: MODE value: "standalone" image: registry.cnshanghai.aliyuncs.com/yizhan/nacosserver:latest imagePullPolicy: IfNotPresent name: nacosserver ports: containerPort: 8848 apiVersion: v1 kind: Service metadata: name: nacosserver spec: type: ClusterIP selector: app: nacosserver ports: name: http port: 8848 targetPort: 8848 部署业务应用 apiVersion: apps/v1 kind: Deployment metadata: name: springcloudzuul spec: selector: matchLabels: app: springcloudzuul template: metadata: annotations: msePilotCreateAppName: springcloudzuul labels: app: springcloudzuul spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre name: enable.mq.invoke value: 'true' image: registry.cnshanghai.aliyuncs.com/yizhan/springcloudzuul:1.0.0 imagePullPolicy: Always name: springcloudzuul ports: containerPort: 20000 apiVersion: v1 kind: Service metadata: annotations: service.beta.kubernetes.io/alibabacloudloadbalancerspec: slb.s1.small service.beta.kubernetes.io/alicloudloadbalanceraddresstype: internet name: zuulslb spec: ports: port: 80 protocol: TCP targetPort: 20000 selector: app: springcloudzuul type: LoadBalancer status: loadBalancer: {} apiVersion: apps/v1 kind: Deployment metadata: name: springclouda spec: selector: matchLabels: app: springclouda template: metadata: annotations: msePilotCreateAppName: springclouda labels: app: springclouda spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springclouda:1.0.0 imagePullPolicy: Always name: springclouda ports: containerPort: 20001 livenessProbe: tcpSocket: port: 20001 initialDelaySeconds: 10 periodSeconds: 30 apiVersion: apps/v1 kind: Deployment metadata: name: springcloudb spec: selector: matchLabels: app: springcloudb template: metadata: annotations: msePilotCreateAppName: springcloudb labels: app: springcloudb spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springcloudb:1.0.0 imagePullPolicy: Always name: springcloudb ports: containerPort: 20002 livenessProbe: tcpSocket: port: 20002 initialDelaySeconds: 10 periodSeconds: 30 apiVersion: apps/v1 kind: Deployment metadata: name: springcloudc spec: selector: matchLabels: app: springcloudc template: metadata: annotations: msePilotCreateAppName: springcloudc labels: app: springcloudc spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springcloudc:1.0.0 imagePullPolicy: Always name: springcloudc ports: containerPort: 20003 livenessProbe: tcpSocket: port: 20003 initialDelaySeconds: 10 periodSeconds: 30 apiVersion: apps/v1 kind: Deployment metadata: name: rockectmqbroker spec: selector: matchLabels: app: rockectmqbroker template: metadata: labels: app: rockectmqbroker spec: containers: command: sh mqbroker 'n' 'mqnamesrv:9876' 'c /home/rocketmq/rocketmq4.5.0/conf/broker.conf' env: name: ROCKETMQ_HOME value: /home/rocketmq/rocketmq4.5.0 image: registry.cnshanghai.aliyuncs.com/yizhan/rocketmq:4.5.0 imagePullPolicy: Always name: rockectmqbroker ports: containerPort: 9876 protocol: TCP containerPort: 10911 protocol: TCP containerPort: 10912 protocol: TCP containerPort: 10909 apiVersion: apps/v1 kind: Deployment metadata: name: rocketmqnameserver spec: selector: matchLabels: app: rocketmqnameserver template: metadata: labels: app: rocketmqnameserver spec: containers: command: sh mqnamesrv env: name: ROCKETMQ_HOME value: /home/rocketmq/rocketmq4.5.0 image: registry.cnshanghai.aliyuncs.com/yizhan/rocketmq:4.5.0 imagePullPolicy: Always name: rocketmqnameserver ports: containerPort: 9876 protocol: TCP containerPort: 10911 protocol: TCP containerPort: 10912 protocol: TCP containerPort: 10909 protocol: TCP apiVersion: v1 kind: Service metadata: name: mqnamesrv spec: type: ClusterIP selector: app: rocketmqnameserver ports: name: mqnamesrv98769876 port: 9876 targetPort: 9876 安装成功后,示例如下: ➜ ~ kubectl get svc,deploy NAME TYPE CLUSTERIP EXTERNALIP PORT(S) AGE service/kubernetes ClusterIP 192.168.0.1 443/TCP 7d service/mqnamesrv ClusterIP 192.168.213.38 9876/TCP 47h service/nacosserver ClusterIP 192.168.24.189 8848/TCP 47h service/zuulslb LoadBalancer 192.168.189.111 123.56.253.4 80:30260/TCP 47h NAME READY UPTODATE AVAILABLE AGE deployment.apps/nacosserver 1/1 1 1 4m deployment.apps/rockectmqbroker 1/1 1 1 4m deployment.apps/rocketmqnameserver 1/1 1 1 5m deployment.apps/springclouda 1/1 1 1 5m deployment.apps/springcloudb 1/1 1 1 5m deployment.apps/springcloudc 1/1 1 1 5m deployment.apps/springcloudzuul 1/1 1 1 5m 同时这里我们可以通过 zuulslb 来验证一下刚才所说的调用链路 ➜ ~ curl http://123.56.253.4/A/dubbo A[10.25.0.32] B[10.25.0.152] C[10.25.0.30] 步骤三:开启消息灰度功能 现在根据控制台的提示,在消息的生产者 springcloudc 和消息的消费者 springclouda 都开启消息的灰度。我们直接通过 MSE 的控制台开启,点击进入应用的详情页,选择“消息灰度”标签。 可以看到,在未打标环境忽略的标签中,我们输入了 gray,这里意味着,带着 gray 环境标的消息,只能由 springcloudagray 消费,不能由 springclouda 来消费。 _1、这里需要额外说明一下,因为考虑到实际场景中,springcloudc 应用和 springclouda  应用的所有者可能不是同一个人,不一定能够做到两者同时进行灰度发布同步的操作,所以在消息的灰度中,未打标环境默认的行为是消费所有消息。这样 springcloudc 在进行灰度发布的时候,可以不需要强制 springclouda 应用也一定要同时灰度发布。_ _2、我们把未打标环境消费行为的选择权交给 springclouda 的所有者,如果需要实现未打标环境不消费 cgray 生产出来的消息,只需要在控制台进行配置即可,配置之后实时生效。_ 使用此功能您无需修改应用的代码和配置。 消息的生产者和消息的消费者,需要同时开启消息灰度,消息的灰度功能才能生效。 消息类型目前只支持 RocketMQ,包含开源版本和阿里云商业版。 如果您使用开源 RocketMQ,则 RocketMQ Server 和 RocketMQ Client 都需要使用 4.5.0 及以上版本。 如果您使用阿里云 RocketMQ,需要使用铂金版,且 Ons Client 使用 1.8.0.Final 及以上版本。 开启消息灰度后,MSE 会修改消息的 Consumer Group。例如原来的 Consumer Group 为 group1,环境标签为 gray,开启消息灰度后,则 group 会被修改成 group1_gray,如果您使用的是阿里云 RocketMQ ,请提前创建好 group。 默认使用 SQL92 的过滤方式,如果您使用的开源 RocketMQ,需要在服务端开启此功能(即在 broker.conf 中配置 enablePropertyFilter=true)。 默认情况下,未打标节点将消费所有环境的消息,若需要指定 未打标环节点 不消费 某个标签环境生产出来的消息,请配置“未打标环境忽略的标签”,修改此配置后动态生效,无需重启应用。 步骤四:重启节点,部署新版本应用,并引入流量进行验证 首先,因为开启和关闭应用的消息灰度功能后都需要重启节点才能生效,所以首先我们需要重启一下 springclouda 和 springcloudc 应用,重启的方式可以在控制台上选择重新部署,或者直接使用 kubectl 命令删除现有的 pod。 然后,继续使用 yaml 文件的方式在 Kubernetes 集群中部署新版本的 springcloudagray、springcloudbgray 和 springcloudcgray apiVersion: apps/v1 kind: Deployment metadata: name: springcloudagray spec: selector: matchLabels: app: springcloudagray template: metadata: annotations: alicloud.service.tag: gray msePilotCreateAppName: springclouda labels: app: springcloudagray spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springclouda:1.0.0 imagePullPolicy: Always name: springcloudagray ports: containerPort: 20001 livenessProbe: tcpSocket: port: 20001 initialDelaySeconds: 10 periodSeconds: 30 apiVersion: apps/v1 kind: Deployment metadata: name: springcloudbgray spec: selector: matchLabels: app: springcloudbgray template: metadata: annotations: alicloud.service.tag: gray msePilotCreateAppName: springcloudb labels: app: springcloudbgray spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springcloudb:1.0.0 imagePullPolicy: Always name: springcloudbgray ports: containerPort: 20002 livenessProbe: tcpSocket: port: 20002 initialDelaySeconds: 10 periodSeconds: 30 apiVersion: apps/v1 kind: Deployment metadata: name: springcloudcgray spec: selector: matchLabels: app: springcloudcgray template: metadata: annotations: alicloud.service.tag: gray msePilotCreateAppName: springcloudc labels: app: springcloudcgray spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springcloudc:1.0.0 imagePullPolicy: Always name: springcloudcgray ports: containerPort: 20003 livenessProbe: tcpSocket: port: 20003 initialDelaySeconds: 10 periodSeconds: 30 部署完成之后,我们引入流量,并进行验证 1. 登录 MSE 治理中心控制台[4],选择应用列表。 2. 单击应用 springclouda 应用详情菜单,此时可以看到,所有的流量请求都是去往 springclouda 应用未打标的版本,即稳定版本。 3. 点击页面下方的 标签路由中的添加按钮,为 springclouda 应用的 gray 版本设置灰度规则。 4. 发起流量调用,我们通过 zuulslb,分别发起流量调用,并查看灰度的情况。 我们通过 springclouda 和 springcloudagray 的日志去查看消息消费的情况。可以看到,消息的灰度功能已经生效, springcloudagray 这个环境,只会消费带有 gray 标的消息,springclouda 这个环境,只会消费未打标的流量生产出来的消息。 在截图中我们可以看见,springcloudagray 环境输出的日志  topic:TEST_MQ, producer: Cgray [10.25.0.102] , invoke result: Agray[10.25.0.101] Bgray[10.25.0.25] Cgray[10.25.0.102], springcloudagray 只会消费 Cgray 生产出来的消息,而且消费消息过程中发起的 Spring Cloud 调用,结果也是 Agray[10.25.0.101] Bgray[10.25.0.25] Cgray[10.25.0.102],即在灰度环境闭环。 而 springclouda 这个环境,输出的日志为 topic:TEST_MQ,producer:C[10.25.0.157],invoke result:A[10.25.0.100] B[10.25.0.152] C[10.25.0.157],只会消费 C 的基线环境生产出来的消息,且在这个过程中发起的 Spring Cloud 调用,也是在基线环境闭环。 步骤五:调整消息的标签过滤规则,并进行验证 因为考虑到实际场景中,springcloudc 应用和 springclouda  应用的所有者可能不是同一个人,不一定能够做到两者同时进行灰度发布同步的操作,所以在消息的灰度中,未打标环境默认的行为是消费所有消息。这样 springcloudc 在进行灰度发布的时候,可以不需要强制 springclouda 应用也一定要同时灰度发布,且使用相同的环境标。 springclouda 在消费时候,未打标环境的行为的选择权是交给 springclouda 的所有者,如果需要实现未打标环境不消费 cgray 生产出来的消息,只需要在控制台进行配置即可,配置之后实时生效。 1. 调整 springclouda 未打标环境的过滤规则。比如这里我们要选择未打标环境不再消费 gray 环境生产出来的消息,只需要在“未打标环境忽略的标签”里面选择 gray,然后点击确定即可。 2. 调整规则之后,规则是可以动态地生效,不需要进行重启的操作,我们直接查看 springclouda 的日志,验证规则调整生效。 从这个日志中,我们可以看到,此时基线环境可以同时消费 gray 和 基线环境生产出来的消息,而且在消费对应环境消息时产生的 Spring Cloud 调用分别路由到 gray 和 基线环境中。 操作总结 1. 全链路消息灰度的整个过程是不需要修改任何代码和配置的。 2. 目前仅支持 RocketMQ,Client 版本需要在 4.5.0 之后的版本。RocketMQ Server 端需要支持 SQL92 规则过滤,即开源 RocketMQ 需要配置 enablePropertyFilter=true,阿里云 RocketMQ 需要使用铂金版。 3. 开启消息灰度后,MSE Agent 会修改消息消费者的 group,如原来的消费 group 为 group1,环境标签为 gray,则 group 会被修改成 group1_gray,如果使用的是阿里云 RocketMQ,需要提前创建好修改后的 group。 4. 开启和关闭消息灰度后,应用需要重启才能生效;修改未打标环境忽略的标签功能可以动态生效,不需要重启。 相关链接 [1] MSE 微服务治理专业版: [2] Kubernetes 集群: [3] 容器服务控制台: [4] MSE 治理中心控制台
作者:亦盏
#行业实践 #功能特性

2021年12月11日

“全”事件触发:阿里云函数计算与事件总线产品完成全面深度集成
随着云原生技术的普及和落地,企业在构建业务系统时,往往需要依赖多个云产品和服务,产品互联、系统协同的需求越来越强。事件驱动架构将事件应用于解耦服务之间的触发和交互, 能够帮助用户很好实现产品、系统之间的互联互动。函数计算作为事件驱动架构的最佳选择,需要为用户提供丰富的事件源触发能力。 对于函数计算而言,事件源接入需要清晰地了解上游每一个事件源的诸多细节和鉴权要求,同时事件处理和系统错误追踪变得越加困难,集成效率成为阻碍产品能力的最大障碍。为了加速事件源集成的效率,函数计算需要找到一种统一标准的事件源接入方式,基于通用的接入层进行基础能力和可观测性的建设,为客户提供丰富的事件源触发选择。 在这样的背景和需求下,阿里云函数计算(Function Compute)和阿里云事件总线(EventBridge)产品完成全面深度集成。这意味着函数计算和阿里云生态各产品及业务 SaaS 系统有了统一标准的接入方式,意味着函数计算将具备接入 EventBridge 所有事件源的触发能力,Serverless 函数计算将实现触达阿里云全系产品服务的“最后一公里”,为基于阿里云生态产品提供重要的架构扩展能力。 为什么是 EventBridge? 阿里云事件总线(EventBridge)是一种无服务器事件总线,支持将用户的应用程序、第三方软件即服务(SaaS)数据和阿里云服务的数据通过事件的方式轻松的连接到一起,这里汇聚了来自云产品及 SaaS 服务的丰富事件,EventBridge 具备事件标准化和接入标准化的能力: 事件标准化:EventBridge 遵循业界标准的 CloudEvent 事件规范,汇聚了来自阿里云生态和 EventBridge 合作伙伴丰富事件源的各种事件,同时提供了完善的事件投递机制和消费策略,整个系统事件流转遵循统一的事件格式; 接入标准化:函数计算选择和 EventBridge 集成,无论是产品服务类型众多的阿里云官方事件源,还是第三方 SaaS 系统,EventBridge 都能够为函数计算和其它系统集成提供统一的集成界面,函数计算无需关注上游事件源的具体实现细节,只需要专注于事件处理,将事件的集成和投递全部交给 EventBridge 来处理; EventBridge  + Function Compute 的结合让事件驱动型应用程序的构建变得简单,因为它可以为您完成事件摄取和交付、安全保障、授权以及错误处理工作。允许您构建松散耦合和分布的事件驱动型架构,帮助提高开发人员敏捷性和应用程序弹性。函数计算系统提供了完善的函数创建, 发布和运行体系,灵活的构建能力结合极致的运行时弹性能力将帮助业务构建云原生时代最富显著特征的事件驱动型架构。 同时,EventBridge 能够提供来自事件源(例如 MQ、OSS、RDB等)的实时数据流,并将该数据路由到阿里云函数计算作为目标。您可以设置路由规则来确定发送数据的目的地,以便构建能够实时响应所有数据源的应用程序架构。 函数计算 + EventBridge 带来的变化? 提供 90+ 事件源接入 在和 EventBridge 集成之前, 函数计算已经实现了和阿里云部分核心系统的集成,随着函数计算 EventBridge 的深度集成,阿里云生态大量服务实现了和函数计算集成, 这些服务或产品的事件将作为事件源触发函数;目前函数计算触发器类型已经从原来的 15+ 增加到 90+,并随着 EventBridge 上游接入系统的增加而不断丰富; 控制台享受一站式服务 EventBridge 和函数计算控制台数据互通,用户在 EventBridge 控制台能够以事件为主体选择函数计算作为事件处理目标,在 EventBridge 控制台享受一站式服务;同样在函数计算控制台,用户能够根据不同触发器类型根据对应的事件类型编写函数;用户无需在函数计算控制台和事件总线控制台来回跳转; 保证数据一致性和稳定性 用户无论是在函数计算控制台上通过创建触发器的方式处理指定事件源的事件;还是在 EventBridge 控制台使用函数计算作为事件处理目标,提供统一的资源视图;同时在底层系统实现上,由于后端系统 API 的深度集成,能够保证上层业务逻辑采用统一的 API 及处理逻辑,从技术层面确保了多个入口功能实现的一致性,为客户系统稳定运行奠定坚实的基础; 简化数据消费投递的复杂度 对于数据消费场景,EventBridge 负责了上游系统的对接和数据消费,用户无需关心事件源系统数据具体消费方式,这部分工作统一由 EventBridge 完成;对于函数计算用户,只需要考虑数据投递的逻辑;用户可以直接选择 EventBridge 提供的下游 Target 实现数据投递,也可以在代码层面仅使用 EventBridge 提供的 SDK 实现数据的投递,大大简化了数据投递的复杂度。 触发器业务应用场景 下面就让我们一起探索, 实际的业务生产环境,我们如何利用这两把利器让这一切简单的发生: 自动化运营分析和展示 业务系统会产生大量动态指标数据,需要提取指标数据做运营分析和展示,通过 EventBridge 和 FC 异步化串联实现自动化运营分析和展示。传统方案需要基于实时计算或者离线计算产品做数据提取和分析,整个方案较重,配置复杂。数据分析结果需要做预定义的展示渲染和推送,需要手工对接业务系统,步骤繁琐。 采用新的 EDA 架构,采用 EventBridge 对接业务自定义事件数据,规则驱动过滤逻辑简单。采用 FC 可以轻量化实现常见的数据分析操作,代码编写调试更简单;同时利用EventBridge 丰富的推送能力,可以实现分析结果快速触达受众。 异步解耦 以交易引擎为例,交易系统引擎作为最核心的系统,每笔交易订单数据需要被几十几个下游业务系统关注,包括物品批价、发货、积分、流计算分析等等,多个系统对消息的处理逻辑不一致,单个系统不可能去适配每一个关联业务。结合 EventBridge 事件中心和函数计算灵活的逻辑扩展能力构建业务逻辑。 新零售大促场景 Serverless + EDA 整合 大型新零售场景会伴随不定期大促,平时流量不大的业务在大促场景也会产生系统流量突增,极致弹性和稳定解耦的架构至关重要。基于传统模式开发稳定可靠、高弹性的后台服务人力不足、工期紧张;大促场景保障峰值流量需要预留大量资源,平时低峰期资源闲置浪费。新零售大促场景利用函数计算 + EventBridge + API 网关搭建 Serverless 模式服务中台,支撑海量请求访问, 系统具备极致弹性,无需预留管理 IaaS 资源,极大程度降低闲置成本;同时函数计算提供敏捷开发结合 EventBridge 低代码异步驱动,业务迭代效率大幅提升。 总结 如果说事件背后的服务是阿里云生态服务的积木, 那么 Serverless 函数计算将是能够将这些积木通过轻巧的方式组合起来艺术化的最佳手段;你可以利用函数计算为这些积木涂上更绚丽的色彩,同时能够将他们串联起来,搭建一个具有无比想象空间的 SaaS/PaaS 服务艺术品。 EventBridge 触发器现已在阿里云函数计算控制台所有地域(Region)开放,欢迎大家点击进行使用体验! 关于触发器具体创建,配置,参考阿里云函数计算官方帮助文档:
作者:史明伟(世如)
#行业实践 #生态集成 #云原生