使用 Dataplex 执行数据沿袭

Cloud Composer 1 | Cloud Composer 2

数据沿袭集成简介

数据沿袭Dataplex 的一项功能,可让您跟踪数据在系统中的移动方式:数据的来源、传入的位置以及已应用于数据的转换。数据沿袭适用于:

在 Cloud Composer 环境中启用该功能后,如果运行使用任何受支持运算符的 DAG,Cloud Composer 就会向 Data Lineage API 报告沿袭信息。

然后,您可以通过以下方式访问这些信息:

支持的运算符

以下运算符支持在 Cloud Composer 中自动报告沿袭:

  • airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
  • airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
  • airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
  • airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
  • airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
  • airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
  • airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
  • airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator

例如,运行以下任务:

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            'destinationTable': {
                'project_id': GCP_PROJECT,
                'dataset_id': 'dataset',
                'table_id': 'tableB',
            },
        }
    },
)

使在 Dataplex 界面中创建以下沿袭图:

Dataplex 界面中的示例沿袭图。
图 1:Dataplex 界面中 BigQuery 表的示例沿袭图。

Cloud Composer 的功能注意事项

每次报告数据沿袭的 Airflow 任务执行都会执行以下操作:

  • 一个沿袭进程的创建或更新 RPC 请求
  • 一个针对沿袭运行的创建或更新 RPC 请求
  • 创建沿袭事件的一个或多个 RPC 请求(大多数时候为 0 或 1)

如需详细了解这些实体,请参阅 Dataplex 文档中的沿袭信息模型Lineage API 参考文档

发出的沿袭流量受 Data Lineage API 中的配额限制。Cloud Composer 会消耗写入配额。

与处理沿袭数据相关的价格取决于沿袭价格。请参阅数据沿袭注意事项

性能影响

数据沿袭会在 Airflow 任务执行结束时报告。平均而言,数据沿袭报告大约需要 1-2 秒。

这不会影响任务本身的性能:如果未成功向 Lineage API 报告沿袭,则 Airflow 任务不会失败。这对主操作器逻辑没有影响,但为了解释报告沿袭数据,整个任务实例执行的时间略长一些。

由于报告数据沿袭需要额外的时间,因此报告数据沿袭的环境将略微增加相关费用。

合规性

数据沿袭为 VPC Service Controls 等功能提供了不同的支持级别。请查看数据沿袭注意事项,以确保支持级别符合您的环境要求。

使用数据沿袭集成

Cloud Composer 的数据沿袭集成按环境进行管理。这意味着要启用此功能,需要执行两个步骤:

  1. 在项目中启用 Data Lineage API。
  2. 在特定 Cloud Composer 环境中启用数据沿袭集成。

准备工作

创建环境时,如果满足以下条件,数据沿袭集成将自动启用

  • 您的项目已启用 Data Lineage API。如需了解详情,请参阅 Dataplex 文档中的启用 Data Lineage API

  • 未在 Airflow 中配置自定义沿袭后端

  • 此环境中未启用客户管理的加密密钥 (CMEK)。数据沿袭不支持对提取的元数据使用 CMEK。 在使用 CMEK 的 Cloud Composer 环境中,您无法启用数据沿袭集成。如需了解详情和其他限制,请参阅数据沿袭注意事项

对于现有环境,您可以随时enable停用数据沿袭集成。

所需的角色

若要与数据沿袭集成,您需要为 Cloud Composer 环境服务帐号添加以下权限:

  • 对于默认服务帐号:无需进行任何更改。默认服务帐号包含所需的权限。
  • 对于用户代管式服务帐号:向您的服务帐号授予 Composer Worker (roles/composer.worker) 角色。此角色包含所有必需的数据沿袭权限。

如需了解详情,请参阅 Dataplex 文档中的沿袭角色和权限

在 Cloud Composer 中启用数据沿袭

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 选择环境配置标签页。

  4. Dataplex 数据沿袭集成部分中,点击修改

  5. Dataplex 数据沿袭集成面板中,选择启用与 Dataplex 数据沿袭的集成,然后点击保存

gcloud

使用 --enable-cloud-data-lineage-integration 参数。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration

您需要在其中:

  • ENVIRONMENT_NAME 替换为环境的名称。

    该名称必须以小写字母开头,后面最多可跟 62 个小写字母、数字或连字符,但不能以连字符结尾。该环境名称用于创建环境的子组件,因此您必须提供一个有效的 Cloud Storage 存储桶名称。如需查看限制列表,请参阅存储桶命名准则

  • LOCATION 替换为环境的区域。

    位置是环境的 GKE 集群所在的区域。

示例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration

发送自定义沿袭事件

如果要为自动沿袭报告不支持的运算符报告沿袭,您可以发送自定义沿袭事件。

例如,要通过以下代码发送自定义事件:

  • BashOperator,请修改任务定义中的 inletsoutlets 参数。
  • PythonOperator,请修改任务定义中的 task.inletstask.outlets 参数。为 inlets 参数使用 AUTO 会将其值设置为等于其上游任务的 outlets

例如,运行此任务:


from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO

…

bash_task = BashOperator(
   task_id='bash_task',
   dag=dag,
   bash_command='sleep 0',
   inlets=[BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table1',
   )],
   outlets=[BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table2',
   )]
)

def _python_task(task):
   task.inlets.append(BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table3',
   ))

   task.outlets.append(BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table4',
   ))

python_task = PythonOperator(
   task_id='python_task',
   dag=dag,
   python_callable=_python_task,
   inlets=[AUTO],
)

bash_task >> python_task

使在 Dataplex 界面中创建以下沿袭图:

Dataplex 界面中自定义事件的沿袭图示例。
图 2:Dataplex 界面中多个 BigQuery 表的示例沿袭图。

在 Cloud Composer 中停用数据沿袭

在 Cloud Composer 环境中停用沿袭集成不会停用 Data Lineage API。如果要为项目完全停用沿袭报告,请同时停用 Data Lineage API。请参阅停用服务

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 选择环境配置标签页。

  4. Dataplex 数据沿袭集成部分中,点击修改

  5. Dataplex 数据沿袭集成面板中,选择停用与 Dataplex 数据沿袭的集成,然后点击保存

gcloud

使用 --disable-cloud-data-lineage-integration 参数。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

您需要在其中:

  • ENVIRONMENT_NAME 替换为环境的名称。

    该名称必须以小写字母开头,后面最多可跟 62 个小写字母、数字或连字符,但不能以连字符结尾。该环境名称用于创建环境的子组件,因此您必须提供一个有效的 Cloud Storage 存储桶名称。如需查看限制列表,请参阅存储桶命名准则

  • LOCATION 替换为环境的区域。

    位置是环境的 GKE 集群所在的区域。

示例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration

在 Cloud Composer 中查看沿袭日志

您可以使用 Dataplex 数据沿袭集成部分中环境配置页面上的链接来检查与数据沿袭相关的日志。

问题排查

如果沿袭数据未报告给 Lineage API,或者您在 Dataplex 中看不到这些数据,请尝试执行以下问题排查步骤:

  • 确保在 Cloud Composer 环境的项目中启用了 Data Lineage API。
  • 检查 Cloud Composer 环境中是否启用了数据沿袭集成。
  • 检查您使用的运算符是否包含在自动沿袭报告支持中。请参阅支持的 Airflow 运算符
  • 查看 Cloud Composer 中的沿袭日志,看看是否存在潜在问题。