使用 Dataplex 通用目录实现数据沿袭

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页面介绍如何在 Cloud Composer 中启用数据沿袭集成。

数据沿袭集成简介

数据沿袭Dataplex Universal Catalog 的一项功能,可跟踪数据在系统中的移动方式:数据来自何处、传递到何处以及对其应用了哪些转换。

Cloud Composer 使用 apache-airflow-providers-openlineage 软件包生成发送到 Data Lineage API 的沿袭事件。

此软件包已安装在 Cloud Composer 环境中。如果您安装此软件包的其他版本,支持的运营商列表可能会发生变化。我们建议您仅在必要时执行此操作,否则请保留预安装的软件包版本。

  • 数据沿袭适用于与支持数据沿袭的 Dataplex Universal Catalog 区域位于同一区域的环境。

  • 如果您的 Cloud Composer 环境中启用了数据沿袭,Cloud Composer 会将使用任何受支持的运算符的 DAG 的沿袭信息报告给 Data Lineage API。如果您想报告不受支持的运算符的沿袭信息,还可以发送自定义沿袭事件

  • 您可以使用以下方式访问沿袭信息:

    • Data Lineage API
    • Dataplex Universal Catalog 中受支持条目的沿袭图。 如需了解详情,请参阅 Dataplex Universal Catalog 文档中的沿袭图

创建环境时,如果满足以下条件,系统会自动启用数据沿袭集成:

  • 您的项目中已启用 Data Lineage API。如需了解详情,请参阅 Dataplex Universal Catalog 文档中的启用 Data Lineage API

  • 未在 Airflow 中配置自定义 Lineage 后端

您可以在创建环境时停用数据沿袭集成。

对于现有环境,您可以随时启用停用数据沿袭集成。

Cloud Composer 中的功能注意事项

在以下情况下,Cloud Composer 会进行 RPC 调用以创建谱系事件:

  • Airflow 任务开始或完成时
  • 当 DAG 运行开始或结束时

如需详细了解这些实体,请参阅 Dataplex Universal Catalog 文档中的沿袭信息模型沿袭 API 参考文档

发出的沿袭流量受 Data Lineage API 中的配额限制。 Cloud Composer 会消耗写入配额。

与处理谱系数据相关的价格需遵守谱系价格。 请参阅数据沿袭注意事项

Cloud Composer 中的性能注意事项

数据沿袭是在 Airflow 任务执行结束时报告的。 平均而言,数据沿袭报告大约需要 1-2 秒。

这不会影响任务本身的性能:如果沿袭未成功报告给 Lineage API,Airflow 任务不会失败。 这不会影响主要运算符逻辑,但整个任务实例的执行时间会略长,以纳入报告沿袭数据。

如果环境会报告数据沿袭,则相关费用会略有增加,因为报告数据沿袭需要额外的时间。

合规性

数据沿袭对 VPC Service Controls 等功能提供不同的支持级别。查看数据沿袭注意事项,确保支持级别符合您的环境要求。

准备工作

检查运营商是否受支持

数据沿袭支持由运算符所在的提供程序软件包提供:

  1. 检查运算符所在提供程序软件包的更改日志,查找添加了 OpenLineage 支持的条目。

    例如,BigQueryToBigQueryOperator 从 apache-airflow-providers-google 版本 11.0.0 开始支持 OpenLineage。

  2. 检查您的环境使用的提供程序软件包的版本。为此,请参阅您环境中使用的 Airflow build 的预安装软件包列表。您还可以在环境中安装其他版本的软件包。

此外,apache-airflow-providers-openlineage 文档中的支持的类页面列出了最新支持的运算符。

配置数据沿袭集成

Cloud Composer 的数据沿袭集成是按环境管理的。这意味着,启用此功能需要执行两个步骤:

  1. 在项目中启用 Data Lineage API。
  2. 在特定的 Cloud Composer 环境中启用数据沿袭集成。

在 Cloud Composer 中启用数据沿袭

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 选择环境配置标签页。

  4. Dataplex 数据沿袭集成部分中,点击修改

  5. Dataplex 数据沿袭集成面板中,选择启用与 Dataplex 数据沿袭的集成,然后点击保存

gcloud

使用 --enable-cloud-data-lineage-integration 实参。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。

示例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration

在 Cloud Composer 中停用数据沿袭

在 Cloud Composer 环境中停用沿袭集成不会停用 Data Lineage API。如果您想彻底停用项目的沿袭报告功能,请同时停用 Data Lineage API。请参阅停用服务

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 选择环境配置标签页。

  4. Dataplex 数据沿袭集成部分中,点击修改

  5. Dataplex 数据沿袭集成面板中,选择停用与 Dataplex 数据沿袭的集成,然后点击保存

gcloud

使用 --disable-cloud-data-lineage-integration 实参。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。

示例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration

在受支持的运算符中发送沿袭事件

如果启用了数据沿袭,受支持的运算符会自动发送沿袭事件。您无需更改 DAG 代码。

例如,运行以下任务:

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            'destinationTable': {
                'project_id': 'example-project',
                'dataset_id': 'dataset',
                'table_id': 'tableB',
            },
        }
    },
)

在 Dataplex Universal Catalog 界面中创建以下沿袭图:

Dataplex 界面中的沿袭图示例。
图 1. Dataplex Universal Catalog 界面中 BigQuery 表的沿袭图示例。

发送自定义沿袭事件

如果您想报告未支持自动沿袭报告的运算符的沿袭,可以发送自定义沿袭事件。

例如,如需发送具有以下内容的自定义事件:

  • BashOperator:修改任务定义中的 inletsoutlets 参数。
  • PythonOperator:修改任务定义中的 task.inletstask.outlets 参数。
  • 您可以为 inlets 参数使用 AUTO。此属性的值设置为等于其上游任务的 outlets

以下示例展示了入口和出口的用法:

from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO

...

bash_task = BashOperator(
    task_id="bash_task",
    dag=dag,
    bash_command="sleep 0",
    inlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table1",
        )
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table2",
        )
    ],
)


def _python_task(task):
    print("Python task")


python_task = PythonOperator(
    task_id="python_task",
    dag=dag,
    python_callable=_python_task,
    inlets=[
        AUTO,
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table3",
        ),
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table4",
        )
    ],
)

bash_task >> python_task

因此,Dataplex Universal Catalog 界面中会创建以下沿袭图:

Dataplex 界面中自定义事件的沿袭图示例。
图 2. Dataplex Universal Catalog 界面中多个 BigQuery 表的沿袭图示例。

在 Cloud Composer 中查看谱系日志

您可以在 Dataplex Universal Catalog 数据沿袭集成部分的环境配置页面上,使用相关链接检查与数据沿袭相关的日志。

问题排查

如果沿袭数据未报告给 Lineage API,或者您在 Dataplex Universal Catalog 中看不到该数据,请尝试执行以下问题排查步骤:

  • 确保在 Cloud Composer 环境的项目中启用 Data Lineage API。
  • 检查 Cloud Composer 环境中是否已启用数据沿袭集成。
  • 检查您使用的运算符是否包含在自动沿袭报告支持中。请参阅受支持的 Airflow 运算符
  • 在 Cloud Composer 中检查沿袭日志,了解可能存在的问题。

后续步骤