将数据仓库迁移至 BigQuery:数据流水线

本文档是系列文章中的一篇,该系列讨论了如何迁移可将数据加载到数据仓库的上游数据流水线。本文档讨论了数据流水线:数据流水线的定义以及迁移时的注意事项。

该系列文章包含以下部分:

简介

本文档可帮助您了解什么是数据流水线、它可以采用哪些流程和模式,以及大型数据仓库迁移可使用哪些迁移方案和技术。

什么是数据流水线?

在计算中,数据流水线是一种通过一系列关联的处理步骤处理数据的应用。作为一个泛化概念,数据流水线可用于信息系统之间的数据转移、提取、转换和加载 (ETL)、数据丰富以及实时数据分析。通常,数据流水线以批处理的形式运作,可在运行时执行和处理数据;或者以流处理的形式运作,在数据可用时连续执行和处理数据

在数据仓储情境中,数据流水线通常用于读取事务系统中的数据、应用转换,然后将数据写入数据仓库。每个转换都由一个函数描述,而任何给定函数的输入都是前一个函数或前几个函数的输出。我们用一个图来描述这些关联的函数,该图通常被称为有向无环图 (DAG)。其中,图沿一个方向(从来源到目标),最后不形成环形,而任何函数的输入都不能依赖于 DAG 下游的另一个函数的输出。也就是说,不允许循环。图中的每个节点都是一个函数,每条边都表示从一个函数传递到下一个函数的数据。初始函数是来源或通向来源数据系统的连接。最终函数是接收器或通向目标数据系统的连接

在数据流水线情境中,来源通常是事务系统(例如 RDBMS)和连接到数据仓库的接收器。这种图称为数据流 DAG。您还可以使用 DAG 编排数据流水线与其他系统之间的数据移动。此用法称为编排或控制流 DAG

何时迁移数据流水线

用例迁移到 BigQuery 时,您可以选择分流完整迁移

一方面,当您分流用例时,您无需事先迁移上游数据流水线。您可以首先将用例架构和数据从现有数据仓库迁移到 BigQuery。然后,建立从旧版数据仓库到新版数据仓库的增量副本,使数据保持同步。最后,迁移并验证下游过程,例如脚本、查询、信息中心和业务应用。

此时,您的上游数据流水线未发生更改,仍在将数据写入现有数据仓库。您可以将分流的用例再次添加到迁移积压工作项,以便在后续的迭代中完整迁移。

另一方面,当您完整迁移用例时,用例所需的上游数据流水线会迁移到 Google Cloud。完整迁移需要您先分流用例。完整迁移后,您可以弃用本地数据仓库中的相应旧表,因为数据已直接提取到 BigQuery。

在迭代过程中,您可以选择以下某个选项:

  • 仅分流您的用例。
  • 完整迁移先前分流的用例。
  • 先在同一迭代中分流用例,然后从头开始完整迁移用例。

完整迁移所有用例后,您可以选择关闭旧仓库,这是减少开销和费用的重要步骤。

如何迁移数据流水线

本文档的其余部分介绍了如何迁移数据流水线,包括使用哪些方法和流程以及应用哪些技术。从修改现有数据流水线的用途(重定向它们来加载 BigQuery),到重写数据流水线来充分利用 Google Cloud 代管式服务,你有多种选项可使用。

数据流水线的流程和模式

您可以使用数据流水线来执行一些流程和模式。这些流水线在数据仓储中最常用。您可能有批量数据流水线或流式数据流水线。批量数据流水线处理一段时间内收集的数据(例如每天一次)。流式数据流水线处理操作系统生成的实时事件,例如由您的联机事务处理 (OLTP) 数据库生成的 CDC 行更改。

提取、转换和加载 (ETL)

在数据仓储情境中,数据流水线通常会执行提取、转换和加载 (ETL) 流程。ETL 技术在数据仓库外运行,这意味着数据仓库的资源主要用于并行查询,而不是用于准备和转换数据。在数据仓库外执行转换的一个缺点是,您需要学习(SQL 以外的)其他工具和语言来表达转换。

下图展示了典型的 ETL 流程。

流程显示来源(提取)转到一个或多个转换(转换),然后转到接收器,最后转到数据仓库(加载)

图 1. 典型的 ETL 流程。

典型的 ETL 数据流水线从一个或多个来源系统(越少越好,以免因系统不可用等问题引起故障)拉取数据。然后,流水线执行清理数据、向数据应用业务规则、检查数据完整性和创建聚合或分解等一系列转换。如需了解详情,请参阅实际的 ETL 周期

拥有多个数据流水线是很常见的。第一个流水线主要将数据从来源系统复制到数据仓库。后续的流水线应用业务逻辑并转换用于各种数据集市的数据,数据集市是数据仓库的一部分,侧重于特定业务部门或业务重点。

如果您有多个数据流水线,则需要对其进行编排。下图展示了此编排过程。

Orchestrator (DAG) 管理两个 ETL 进程(子 DAG)

图 2. 多个数据流水线的编排过程。

在该图中,每个数据流水线都被视为编排 DAG 的子 DAG。每个编排 DAG 都包含多个数据流水线以满足更大的目标。例如,为业务部门准备数据,使业务分析人员能够处理信息中心或报告。

提取、加载和转换 (ELT)

ELT 是 ETL 的替代方案。借助 ELT,数据流水线可分为两个部分。第一部分,ETL 技术从来源系统提取数据并将其加载到数据仓库中。第二部分,以数据仓库为基础的 SQL 脚本执行转换。这种方法的优点是,您可以使用 SQL 来表达转换;缺点是,这可能会消耗并行查询所需的数据仓库资源。因此,ELT 批处理通常在夜间(或非高峰期)运行,数据仓库的系统资源在那时的需求较少。

下图展示了典型的 ELT 流程。

流程显示来源(提取)转到一个或多个转换器(转换),然后转到接收器,最终转到数据仓库(加载)。

图 3. 典型的 ELT 流程。

当您采用 ELT 方法时,通常将提取和加载分到一个 DAG 中,将转换分到它们自己的 DAG 中。数据一次性加载到数据仓库中,然后经过多次转换生成各种表格,以在下游中用于报告等用途。而这些 DAG 在较大的编排 DAG 中又成为子 DAG(如 ETL 部分中所示)

在您将数据流水线从拥塞的本地数据仓库迁移到云端时,请务必注意,BigQuery 之类的云数据仓库系统是并行数据大规模处理技术。事实上,如果使用 BigQuery,您可以购买更多资源来支持不断增长的 ELT 和并行查询需求。如需了解详情,请参阅性能优化的槽部分

提取和加载 (EL)

您可单独使用提取和加载 (EL) 流程,也可后续进行转换(此时它会变为 ELT)。单独提到 EL 的原因是有多项自动化服务可执行此任务,让您无需创建自己的提取数据流水线。如需了解详情,请参阅 BigQuery Data Transfer Service

变更数据捕获 (CDC)

变更数据捕获 (CDC) 是用于跟踪数据变化的若干软件设计模式之一。它通常用于数据仓储,因为数据仓库用于整理和跟踪各种源系统中的数据以及数据在一段时间后在各种来源系统中的变动。

下图展示了 CDC 如何与 ELT 配合工作的示例。

ETL 流程显示各条记录在提取时分配了版本信息并且在加载时添加了时间戳。

图 4. CDC 如何与 ELT 配合工作。

CDC 非常适合 ELT,因为您希望先存储原始记录,然后再对下游进行更改。

为实现 EL 部分,您可以使用 Debezium 等 CDC 软件并通过 Dataflow 将记录写入 BigQuery,以此来处理数据库日志。然后,您可以使用 SQL 查询来确定最新版本,然后再应用进一步的转换。示例如下:

WITH ranked AS (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY <record_key>
      ORDER BY <event_timestamp> DESC
    ) AS rank
  FROM <table>
)
SELECT *
FROM ranked
WHERE rank = 1

当您重构或创建新的数据流水线时,请考虑将所应用的 CDC 模式作为一个 ELT 流程。这种方法可确保您拥有上游完整的数据更改历史记录,并且能够很好地分离职责,例如:

  • 来源系统团队确保其日志的可用性及其数据事件的发布。
  • 数据平台团队确保原始记录的提取排序规则包括数据仓库中的时间戳。
  • 数据工程和分析人员团队安排一系列转换来填充其数据集市。

使用操作数据流水线实现反馈环

操作数据流水线是数据处理流水线,它从数据仓库获取数据,根据需要对其进行转换,并将结果写入操作系统,由此而得名

操作系统是指处理组织日常事务的系统,如 OLTP 数据库、客户关系管理 (CRM) 系统、产品目录管理 (PCM) 系统等。由于这些系统通常充当数据源,因此操作数据流水线实现了反馈环模式

操作数据流水线模式如下图所示:

ETL 流水线流向数据仓库,然后流向操作流水线,该流水线流回来源系统,来源系统流向 ETL 流水线。

图 5. 操作数据流水线的模式。

以下示例描述了将产品价格写入 PCM 系统的操作数据流水线。PCM 系统是适用于与销售有关的产品信息(例如颜色、销售渠道、价格和季节效应)的权威系统。以下是端到端数据流:

  • 与价格相关的数据来自多个来源。该数据可能包括 PCM 中各区域当前的价格、第三方服务中竞争者的价格,以及内部系统中的需求预测和供应商可靠性等。
  • ETL 流水线从来源中拉取数据,对其进行转换,并将结果写入数据仓库。本例中的转换是涉及所有来源的复杂计算,旨在为 PCM 中的每个产品生成最佳基本价格。
  • 最后,操作流水线从数据仓库中获取基本价格,执行轻度转换以针对季节性活动调整价格,并将最终价格写回 PCM 中。

PCM 系统流向 ETL 系统。

图 6. 将产品价格写入 PCM 系统的操作数据流水线。

操作数据流水线是一种下游过程,而实现 ETLELTCDC 的数据流水线是上游过程。不过,用于实施二者的工具可能会重叠。例如,您可以使用 Dataflow 来定义和运行所有数据处理 DAG,使用标准 SQL 来定义在 BigQuery 内执行的转换,使用 Cloud Composer 来编排端到端数据流。

选择迁移方式

本部分介绍了迁移数据流水线时可采用的不同方法。

重定向数据流水线以写入 BigQuery

如果旧版数据仓库的数据来自执行 ETL 流程的数据流水线,也就是在将数据存储到数据仓库之前执行转换逻辑时,请考虑所用的技术是否提供原生 BigQuery 接收器(写入连接器)。独立软件供应商 (ISV) 提供带有 BigQuery 连接器的数据处理技术,例如:

如果数据流水线技术不支持将数据提取到 BigQuery,请考虑使用此方法的变体将数据临时写入文件中,再由 BigQuery 提取这些文件。

数据流水线被阻止流向旧系统,改为流向 BigQuery。

图 7. 重写或重新配置数据流水线的最后一个函数以将数据写入 BigQuery。

概括来讲,所涉及的工作包括重写或重新配置数据流水线的最后一个函数,以将数据写入 BigQuery。不过,许多选项可能需要你另外更改或新增工作,例如:

功能性

  • 数据映射:鉴于目标数据库表架构可能发生更改,您可能需要重新配置这些映射。
  • 指标验证:您必须同时验证历史报告和新报告,因为架构和查询可能都会发生更改。

非功能性

  • 防火墙可能需要配置为允许从本地到 BigQuery 的数据出站流量。
  • 因为会有数据出站流量,可能需要更改网络才能容纳额外的带宽。

通过使用文件作为中间媒介来重定向数据流水线

如果现有的本地数据流水线技术不支持 Google API,或者您无法使用 Google API,则可以使用文件作为将数据传输到 BigQuery 的中间媒介。

此方法类似于重定向方法,但您使用的不是可以写入 BigQuery 的原生接收器,而是可以写入本地文件系统的接收器。当您的数据存储在文件系统中时,请将文件复制到 Cloud Storage。如需了解详情,请参阅 Cloud Storage 的提取选项概览以及选择提取选项时所涉及的标准。

最后一步是按照文档中列出的准则,将数据从 Cloud Storage 加载到 BigQuery。

下图展示了本部分中介绍的方法。

ETL 流水线流向文件系统而不是旧数据仓库;而文件系统又流向 Cloud Storage,然后再从此处流向 BigQuery。

图 8. 通过使用文件作为中间媒介来重定向数据流水线。

关于 ETL 流水线的编排,您需要执行两个单独的步骤:

  1. 重复使用现有的本地流水线编排,将转换后的数据写入文件系统。扩展此编排,将文件从本地文件系统复制到 Cloud Storage,或者创建一个定期运行的附加脚本来执行复制步骤。
  2. 当数据存储在 Cloud Storage 中时,使用 Cloud Storage 转移作业来安排从 Cloud Storage 到 BigQuery 的周期性加载作业。Cloud Storage 转移作业的替代方案是 Cloud Storage 触发器Cloud Composer

注意,图 8 中 Google Cloud 上的编排通过利用 SFTP 之类的协议来检索文件,也可以使用拉取模型。

将现有 ELT 流水线迁移到 BigQuery

ELT 流水线由两部分组成:一部分将数据加载到数据仓库中,另一部分使用 SQL 转换数据,使其能够在下游使用。迁移 ELT 流水线时,每个部分都有自己的迁移方法。

对于将数据加载到数据仓库的部分(即 EL 部分),您可以遵循重定向数据流水线部分中的准则,其中的转换建议不属于 EL 流水线,因此不必遵循。

如果您的数据源直接BigQuery Data Transfer Service (DTS) 支持或通过第三方集成受到支持,您可以使用 DTS 替换您的 EL 流水线。Fivetran 解决方案介绍了 Fivetran 连接器如何自动从来源中提取数据、对数据进行归一化并应用一些简单的清理,然后将其路由到 BigQuery,从而在迁移过程中为您提供帮助。

在数据加载到数据仓库后进行转换的部分,使用 SQL 执行转换。如需了解如何将非标准 SQL 迁移到 BigQuery 支持的 ISO SQL 2011 标准,请参阅本系列中的查询转换文档,其中还建议了如何迁移存储过程

将现有 OSS 数据流水线迁移到 Dataproc

在将数据流水线迁移到 Google Cloud 时,建议您迁移使用 Apache HadoopApache SparkApache Flink 等开源软件框架编写的一些旧作业。

利用 Dataproc,您能够以简单、经济高效的方式部署快速、易用的全代管式 Hadoop 和 Spark 集群。Dataproc 集成了 BigQuery 连接器,这是一个 Java 库,使 Hadoop 和 Spark 能够使用精简版本的 Apache Hadoop InputFormatOutputFormat 类直接向 BigQuery 写入数据。

Dataproc 可轻松创建和删除集群,因此你可摒弃单个集群,而改用多个临时集群。这样做具有很多优势:

  • 您可以为各个作业使用不同的集群配置,从而消除跨作业管理工具的管理负担。
  • 您可以扩缩集群使其适合单个作业或一组作业。
  • 您只需在作业使用资源时为其付费。
  • 您无需随着时间的推移维护集群,因为每次使用它们时系统都会对其进行重新配置。
  • 您无需分别为开发、测试和生产维护不同的基础架构。您可以使用相同的定义在需要时按需创建任意数量、不同版本的集群。

迁移作业时,我们建议您采用增量方法。通过逐步迁移,您可以执行以下操作:

  • 将现有 Hadoop 基础架构中的单个作业与成熟环境中固有的复杂性隔离开来。
  • 单独检查每个作业以评估其需求并确定迁移的最佳途径。
  • 在出现意外问题时及时处理,同时不延迟相关任务。
  • 为每个复杂流程创建概念验证,而不影响生产环境。
  • 谨慎地将作业迁移到推荐的临时模型。

当您将现有 Hadoop 和 Spark 作业迁移到 Dataproc 时,可以检查受支持的 Dataproc 版本是否涵盖作业的依赖项。如需安装自定义软件,您可以考虑使用一些可用的初始化操作(例如 Apache Flink 的初始化操作)创建自定义 Dataproc 映像、编写自定义初始化操作,或指定自定义 Python 软件包要求

如需开始使用此工具,请参阅 Dataproc 快速入门指南BigQuery 连接器代码示例。另请参阅如何将 Hadoop 作业从本地迁移到 Dataproc 以及将 Apache Spark 作业迁移到 Dataproc 的指南。

重新托管要在 Google Cloud 上运行的第三方数据流水线

在构建本地数据流水线时,一种常见的情况是使用第三方软件来管理流水线的执行和计算资源的分配。

为了将这些流水线迁移到云端,您可以根据所用软件的功能及许可、支持和维护条款从多种备选方案中进行选择。

以下部分介绍了其中部分备选方案。

概括来讲,您在 Google Cloud 中执行第三方软件时具有以下备选方案(复杂度由低到高排列):

  • 您的软件供应商已与 Google Cloud 合作,可在 Google Cloud Marketplace 中提供其软件。
  • 您的第三方软件供应商可在 Kubernetes 上运行。
  • 您的第三方软件可在一个或多个虚拟机 (VM) 上运行。

如果您的第三方软件提供了 Cloud Marketplace 解决方案,则涉及的工作如下所示:

这种备选方案最为简单,因为您可以使用供应商提供的熟悉平台将数据流水线添加到云端。您还可以使用供应商提供的专有工具,更快地在原始环境与 Google Cloud 上的新环境之间进行迁移。

如果您的供应商未提供 Cloud Marketplace 解决方案,但他们的产品能够在 Kubernetes 上运行,您可以使用 Google Kubernetes Engine (GKE) 托管您的流水线。涉及的工作如下:

  • 按照供应商的建议创建 GKE 集群,确保第三方产品可以利用 Kubernetes 提供的任务并行功能。
  • 按照供应商的建议,将您的第三方软件安装到 GKE 集群上。
  • 按照本系列概览部分中介绍的迭代方法选择和迁移您的用例。

此备选方案的复杂程度处于中等水平。它利用供应商对 Kubernetes 的原生支持来扩缩和并行执行流水线。不过,您需要创建并管理 GKE 集群。

如果您的供应商不支持 Kubernetes,您必须将其软件安装到虚拟机池上,以实现工作的横向扩容和并行化处理。如果供应商软件原生支持将工作分配到多个虚拟机,请使用提供的这些工具,可能会将虚拟机实例分到托管实例组 (MIG) 中,以根据需要进行扩缩。

处理工作的并行化非常重要。如果供应商不提供将任务分配到不同虚拟机的功能,我们建议您使用任务设定模式,将工作分配到 MIG 中的虚拟机。下图对这种方法进行了说明。

多项输入会转到创建主题的 Pub/Sub。主题由不同的 MIG 读取。

图 9. 包含三个虚拟机的托管实例组 (MIG)。

在该图中,MIG 中的每个虚拟机都执行第三方流水线软件。您可以通过多种方式触发流水线执行:

实质上,这些方法都会向预定义的 Pub/Sub 主题发送消息。您创建一个要安装到每个虚拟机中的简单代理。该代理会侦听一个或多个 Pub/Sub 主题。只要主题收到消息,代理就会从主题中拉取消息,在第三方软件中启动流水线并侦听其完成情况。流水线完成后,代理会从其正在侦听的主题中检索下一条消息。

在各种情况下,我们都建议您与您的供应商合作,按照关于在 Google Cloud 上使用流水线的相应许可条款操作。

重写数据流水线以使用 Google Cloud 代管式服务

在某些情况下,您可以选择重写一些现有数据流水线,来使用完全托管在 Google Cloud 上的新框架和服务。如果您的现有流水线最初是使用现在已被弃用的技术实现的,或者您预计在云端移植并继续维护未修改的流水线很不实际或费用太高,那么此选项就比较适合。

以下部分介绍了让您能够大规模执行高级数据转换的全代管式 Google Cloud 服务:Cloud Data Fusion 和 Dataflow。

Cloud Data Fusion

Cloud Data Fusion 是一项基于开源 CDAP 项目的全代管式数据集成服务,可通过图形界面构建和管理数据流水线。

您可将来源连接到转换、接收器和其他节点以构建 DAG,从而在 Cloud Data Fusion 界面中开发数据流水线。部署数据流水线时,Cloud Data Fusion 计划程序会将此 DAG 转换为一系列并行计算,这些计算将作为 Dataproc 上的 Apache Spark 作业执行。

使用 Cloud Data Fusion 时,您可以使用 Java Database Connectivity (JDBC) 驱动程序连接到来源系统的数据库,以读取和转换数据,然后将数据加载到您选择的目标位置(例如,BigQuery),无需编写任何代码。为此,您需要将 JDBC 驱动程序上传到 Cloud Data Fusion 实例并对其进行配置,以便在数据流水线中使用。如需了解详情,请参阅如何将 JDBC 驱动程序与 Cloud Data Fusion 搭配使用的指南。

Cloud Data Fusion 将用于来源、转换、聚合、接收器、错误收集器、提醒发布器、操作和运行后操作的插件公开为可自定义的组件。预置的插件可以访问各种数据源。如果插件不存在,您可以使用 Cloud Data Fusion 插件 API 构建自定义插件。如需了解详情,请参阅插件概览

借助 Cloud Data Fusion 流水线,您可以同时创建批量和流式数据流水线。数据流水线提供对日志和指标的访问权限,因此管理员在不使用自定义工具的情况下也可以操作数据处理工作流。

如需开始使用此功能,请参阅 Cloud Data Fusion 概念性概览。如需实例,请参阅快速入门指南和关于创建定位广告系列流水线的教程。

Dataflow

Dataflow 是一种用于大规模运行 Apache Beam 作业的全代管式服务。Apache Beam 是一种开源框架,提供一组丰富的数据选取和会话分析基本功能,以及一个包含来源连接器与接收器连接器(包括 BigQuery 的连接器)的生态系统。您可以通过 Apache Beam 以流式(实时)模式和批量(历史)模式对数据进行转换并丰富数据内容,同时保持同等的可靠性和表现力。

Dataflow 的无服务器方案可自动处理性能、扩缩、可用性、安全性和合规性方面的问题,省去了运营开销。因此您可专注于编程,而不用去管理服务器集群。

您可以通过命令行界面Java SDKPython SDK 提交 Dataflow 作业。此外,我们正在开发一种可移植性框架,在所有 SDK 和运行程序之间实现完全互操作性。

如果您想将数据查询和流水线从其他框架迁移到 Apache Beam 和 Dataflow,请参阅 Apache Beam 编程模型并浏览官方的 Dataflow 文档

如需实例,请参阅 Dataflow 快速入门教程

编排和调度

概括来讲,编排是多个系统的自动协调,而调度是指自动触发编排工作

  • 放大:数据流水线本身是一种由 DAG 描述的数据转换编排,而 DAG 是一种数据处理 DAG
  • 缩小:当数据流水线依赖于其他数据流水线的输出时,您需要编排多个流水线。每个流水线在较大的 DAG(即编排 DAG)中构成一个子 DAG

此设置在数据仓储中很常见。ETL 部分中的图 1 显示了一个示例设置。以下部分主要介绍几种数据流水线的编排。

依赖项

依赖项可以扇入,即多个数据流水线合并到一个编排 DAG 的顶点;也可以扇出,即单个数据流水线触发多个其他流水线;通常也可同时扇入和扇出,如下图所示

标有 A、B 和 C 的多个流水线扇入 D 流水线。D 流水线扇出到 E、F 和 G 流水线。这一切都由编排 DAG 进行编排。

图 10. 组合使用扇入和扇出依赖项。

在状况欠佳的环境中,某些依赖项因可用资源量有限而产生。例如,某个数据流水线会运行和产生一些共用数据作为副产物。其他数据流水线依赖这些共用数据只是为了避免重新计算,而与创建数据的数据流水线无关。如果第一个流水线遇到任何功能性或非功能性问题,则故障会向下级联到依赖它的数据流水线,导致的结果轻则让这些数据流水线等待,重则阻止它们运行,如下图所示。

A 流水线遇到故障。B 和 C 流水线依赖于 A 流水线的输出,因此它们也会发生故障。

图 11. 向下级联到某一数据流水线的故障导致从属流水线无法运行。

在 Google Cloud 中,您可以利用丰富的计算资源和专业工具来优化流水线的执行及其编排。其余部分将介绍这些资源和工具。

涉及的迁移工作

这是简化编排需求的最佳做法。编排复杂度与数据流水线之间的依赖项数量成正比。您可以通过迁移到 Google Cloud 来检查编排 DAG、识别依赖项并确定如何优化这些依赖项。

我们建议您逐步优化依赖项,具体如下:

  1. 在第一次迭代中,将编排按原样移动到 Google Cloud。
  2. 在之后的迭代中,分析依赖项并在可能的情况下将其并行化。
  3. 最后,将常规任务提取到它们自己的 DAG 中,以重新整理您的编排。

下一部分将通过一个实例介绍此方法。

实例

假设某个组织有两个相关流水线:

  • 第一个流水线用于计算整个组织的损益 (P&L) 情况。此流水线涉及许多转换,非常复杂。该流水线的部分工作包括计算月销售额,该值会用于后续转换步骤并最终写入表中。
  • 第二个流水线计算不同产品的年同比和月同比销售增长,以便营销部门调整其广告系列工作。此流水线需要之前由损益数据流水线计算出的月销售额数据。

该组织认为损益数据流水线的优先级高于营销流水线。遗憾的是,由于损益是一种复杂的数据流水线,会消耗大量资源,从而导致其他流水线无法同时运行。此外,如果损益流水线出现故障,营销流水线和其他从属流水线将由于没有必需的数据而无法运行,并且必须等待损益流水线重试。下图对这种情况进行了说明。

损益流水线创建营销流水线所需的“月销售额”软件工件。损益流水线可能会遇到延时和其他问题。

图 12. 复杂的数据流水线可防止优先级较低的流水线运行。

该组织正计划迁移到 BigQuery。它确定了两个用例(损益和营销销售增长)并将其添加到迁移积压工作项中。在规划下一次迭代时,该组织优先考虑损益用例并将其添加在迭代积压工作项中,因为该用例受到当前本地资源的严重限制,并且经常造成延迟。此外,还添加了一些依赖它的用例,其中包括营销用例。

迁移团队运行第一次迭代。他们选择使用重定向方法将损益和营销用例移至 Google Cloud,且未对流水线步骤或编排进行更改。一个重要的区别是,现在的损益流水线可以运用几乎无限的计算能力,因此执行速度远快于本地。该流水线将销售月度数据写入营销增长流水线使用的 BigQuery 表。下图对这些变化进行了说明。

损益流水线与之前相同,但不会遇到延时。

图 13. 使用重定向方法加速复杂的数据流水线。

虽然 Google Cloud 帮助解决了非功能性损益问题,但仍然存在功能性问题。在计算月销售额之前执行的一些不相关任务常常会导致错误,这些错误会阻止计算,并导致从属流水线无法启动。

在第二次迭代中,团队希望在迭代积压工作项中同时添加这两个用例来提高性能。他们确定了用于在损益流水线中计算月销售额的流水线步骤。这些步骤构成了子 DAG,如下图所示。迁移团队将子 DAG 复制到营销流水线中,使该流水线独立于损益流水线运行。Google Cloud 拥有足够的计算能力,可以同时运行这两个流水线。

损益流水线和营销流水线现在作为单独的子 DAG 运行,因此,如果损益流水线遇到问题,营销流水线将不再受影响。

图 14. 使用子 DAG 并行运行的流水线。

但缺点是复制子 DAG 逻辑会产生代码管理开销,因为团队现在需要将子 DAG 逻辑的两个副本保持同步。

在第三次迭代中,团队重新审视用例,并将月销售额子 DAG 提取到独立的流水线中。新的月销售额流水线完成后,它会触发或扇出到损益、营销增长和其他相关流水线。此配置会创建一个新的整体编排 DAG,其中每个流水线都是它的一个子 DAG。

现在,月销售额流水线首先流向损益流水线和营销流水线。

图 15. 每个子 DAG 中包含一个流水线的整体编排 DAG。

在后续迭代中,迁移团队可解决任何遗留的功能性问题,并迁移流水线来使用以下 Google Cloud 代管式服务及其他服务:

虽然 Airflow 本身支持子 DAG,但此功能有可能会限制其性能,因此不建议使用。您可以通过搭配使用独立的 DAG 和 TriggerDagRunOperator 运算符来取代此功能。

后续步骤