启用数据沿袭集成

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

数据沿袭集成简介

数据沿袭Dataplex 的一项功能,可让您跟踪数据在系统中是如何移动的:数据的来源、传入的位置以及对数据应用了哪些转换。数据沿袭适用于:

在 Cloud Composer 环境中启用此功能后,如果运行使用任何受支持运算符的 DAG,则会导致 Cloud Composer 向 Data Lineage API 报告沿袭信息。

然后,您可以通过以下方式访问这些信息:

支持的运算符

以下运算符支持 Cloud Composer 中的自动沿袭报告:

  • airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
  • airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
  • airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
  • airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
  • airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
  • airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
  • airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
  • airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator

例如,运行以下任务:

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            'destinationTable': {
                'project_id': GCP_PROJECT,
                'dataset_id': 'dataset',
                'table_id': 'tableB',
            },
        }
    },
)

导致在 Dataplex 界面中创建以下沿袭图:

Dataplex 界面中的沿袭图示例。
图 1:Dataplex 界面中 BigQuery 表的沿袭图示例。

Cloud Composer 的功能注意事项

每项报告数据沿袭的 Airflow 任务执行都会执行以下操作:

  • 一个针对沿袭过程的创建或更新 RPC 请求
  • 一个针对沿袭运行作业的创建或更新 RPC 请求
  • 用于创建沿袭事件的一个或多个 RPC 请求(大多数情况下为 0 或 1)

如需详细了解这些实体,请参阅 Dataplex 文档中的沿袭信息模型Lineage API 参考文档

发出的沿袭流量受 Data Lineage API 中的配额限制。Cloud Composer 会消耗写入配额。

与处理沿袭数据相关的价格取决于沿袭价格。请参阅数据沿袭注意事项

性能影响

数据沿袭会在 Airflow 任务执行结束时报告。平均而言,数据沿袭报告大约需要 1-2 秒。

这不会影响任务本身的性能:如果未成功向 Lineage API 报告沿袭,Airflow 任务不会失败。对主操作器逻辑没有影响,但整个任务实例的执行时间会稍长一些,以便将沿袭数据纳入考虑范围。

由于报告数据沿袭需要额外的时间,因此报告数据沿袭的环境的相关费用将略有增加。

法规遵从

数据沿袭为 VPC Service Controls 等功能提供不同的支持级别。请查看数据沿袭注意事项,以确保支持级别符合您的环境要求。

使用数据沿袭集成

Cloud Composer 的数据沿袭集成是按环境进行管理的。这意味着,启用此功能需要执行两个步骤:

  1. 在项目中启用 Data Lineage API。
  2. 在特定 Cloud Composer 环境中启用数据沿袭集成。

准备工作

当您创建环境时,如果满足以下条件,系统就会自动启用数据沿袭集成:

  • 您的项目中已启用 Data Lineage API。如需了解详情,请参阅 Dataplex 文档中的启用 Data Lineage API

  • Airflow 中未配置自定义沿袭后端

对于现有环境,您可以随时enable停用数据沿袭集成。

所需的角色

如需与数据沿袭集成,您需要为 Cloud Composer 环境服务帐号添加以下权限:

  • 对于默认服务账号:无需进行任何更改。默认服务帐号包含所需的权限。
  • 对于用户代管式服务帐号:向您的服务帐号授予 Composer Worker (roles/composer.worker) 角色。此角色拥有所有必需的数据沿袭权限。

如需了解详情,请参阅 Dataplex 文档中的沿袭角色和权限

在 Cloud Composer 中启用数据沿袭

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 选择环境配置标签页。

  4. Dataplex 数据沿袭集成部分中,点击修改

  5. Dataplex 数据沿袭集成面板中,选择启用与 Dataplex 数据沿袭的集成,然后点击保存

gcloud

使用 --enable-cloud-data-lineage-integration 参数。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration

您需要在其中:

  • ENVIRONMENT_NAME 替换为环境的名称。

    该名称必须以小写字母开头,后面最多可跟 62 个小写字母、数字或连字符,但不能以连字符结尾。该环境名称用于创建环境的子组件,因此您必须提供一个有效的 Cloud Storage 存储桶名称。如需查看限制列表,请参阅存储桶命名准则

  • LOCATION 替换为环境的区域。

    位置是指环境的 GKE 集群所在的区域。

示例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration

发送自定义沿袭事件

如果您想报告自动沿袭报告不支持的运营商的沿袭,可以发送自定义沿袭事件。

例如,要通过以下代码发送自定义事件:

  • BashOperator 时,修改任务定义中的 inletsoutlets 参数。
  • PythonOperator 时,请修改任务定义中的 task.inletstask.outlets 参数。对 inlets 参数使用 AUTO 会将其值设为其上游任务的 outlets

例如,运行以下任务:


from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO

…

bash_task = BashOperator(
   task_id='bash_task',
   dag=dag,
   bash_command='sleep 0',
   inlets=[BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table1',
   )],
   outlets=[BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table2',
   )]
)


def _python_task(task):
   task.inlets.append(BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table3',
   ))

   task.outlets.append(BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table4',
   ))


python_task = PythonOperator(
   task_id='python_task',
   dag=dag,
   python_callable=_python_task,
   inlets=[AUTO],
)

bash_task >> python_task

导致在 Dataplex 界面中创建以下沿袭图:

Dataplex 界面中自定义事件的沿袭图示例。
图 2:Dataplex 界面中多个 BigQuery 表的沿袭图示例。

在 Cloud Composer 中停用数据沿袭

在 Cloud Composer 环境中停用沿袭集成不会停用 Data Lineage API。如果要为项目完全停用沿袭报告,请同时停用 Data Lineage API。请参阅停用服务

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 选择环境配置标签页。

  4. Dataplex 数据沿袭集成部分中,点击修改

  5. Dataplex 数据沿袭集成面板中,选择停用与 Dataplex 数据沿袭的集成,然后点击保存

gcloud

使用 --disable-cloud-data-lineage-integration 参数。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

您需要在其中:

  • ENVIRONMENT_NAME 替换为环境的名称。

    该名称必须以小写字母开头,后面最多可跟 62 个小写字母、数字或连字符,但不能以连字符结尾。该环境名称用于创建环境的子组件,因此您必须提供一个有效的 Cloud Storage 存储桶名称。如需查看限制列表,请参阅存储桶命名准则

  • LOCATION 替换为环境的区域。

    位置是指环境的 GKE 集群所在的区域。

示例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration

在 Cloud Composer 中查看沿袭日志

您可以使用 Dataplex 数据沿袭集成部分中环境配置页面上的链接,检查与数据沿袭相关的日志。

问题排查

如果沿袭数据未报告给 Lineage API,或者您无法在 Dataplex 中看到这些数据,请尝试执行以下问题排查步骤:

  • 确保在 Cloud Composer 环境的项目中启用了 Data Lineage API。
  • 检查 Cloud Composer 环境中是否启用了数据沿袭集成。
  • 检查您使用的运营商是否已纳入自动化沿袭报告支持中。请参阅支持的 Airflow 运算符
  • 查看 Cloud Composer 中的沿袭日志,了解可能存在的问题。