Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3
数据沿袭集成简介
数据沿袭是一个 Dataplex 功能,您可以跟踪数据在系统中是如何移动的: 数据来源、传递到数据的位置以及对数据应用了哪些转换。 数据沿袭适用于:
运行 2.1.2 及更高版本的 Cloud Composer 2 环境,以及运行 2.2.5 及更高版本的 Airflow。
Cloud Composer 2 环境 支持数据沿袭的 Data Catalog 区域。
在 Cloud Composer 环境中启用此功能后,运行利用任何受支持运算符的 DAG 会导致 Cloud Composer 向 Data Lineage API 报告谱系信息。
然后,您可以通过以下方式访问这些信息:
- Data Lineage API
- 支持的 Data Catalog 条目的沿袭可视化图 数据。请参阅 Dataplex 文档中的沿袭可视化图。
支持的运算符
以下运算符支持 Cloud Composer:
airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator
例如,运行以下任务:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': GCP_PROJECT,
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
这会在 Dataplex 界面中创建以下谱系图:
Cloud Composer 的功能注意事项
每项报告数据沿袭的 Airflow 任务执行都会执行以下操作:
- 针对谱系进程的一次创建或更新 RPC 请求
- 一个针对沿袭运行作业的创建或更新 RPC 请求
- 用于创建沿袭事件的一个或多个 RPC 请求(大多数情况下为 0 或 1)
如需详细了解这些实体,请参阅 Dataplex 文档中的谱系信息模型和谱系 API 参考。
发出的沿袭流量受制于 Data Lineage API 中的配额。 Cloud Composer 会消耗写入配额。
与处理沿袭数据相关的价格取决于沿袭价格。 请参阅数据沿袭注意事项。
性能影响
系统会在 Airflow 任务执行结束时报告数据沿袭。平均而言,数据传承报告大约需要 1-2 秒。
这不会影响任务本身的性能:Airflow 任务不会 如果沿袭未成功报告给 Lineage API,则会失败。 对主操作器逻辑没有影响,但整个任务实例会 执行时间略长一点,以便将沿袭数据纳入考虑范围。
在报告数据沿袭的环境中, ,因为报告数据沿袭需要额外的时间,从而节省费用。
合规性
数据源流可为 VPC Service Controls 等功能提供不同的支持级别。评价 数据沿袭注意事项 以确保支持级别符合您的环境要求。
使用数据沿袭集成
Cloud Composer 的数据沿袭集成通过 。这意味着,启用该功能需要完成以下两个步骤:
- 在项目中启用 Data Lineage API。
- 在特定 Cloud Composer 环境中启用数据沿袭集成。
准备工作
创建环境时,数据沿袭集成 如果满足以下条件,则会自动启用:
您的项目已启用 Data Lineage API。如需了解详情,请参阅 启用 Data Lineage API: Dataplex 文档。
未在 Airflow 中配置自定义谱系后端。
所需的角色
若要与数据源流水线集成,您需要为 Cloud Composer 环境服务账号添加以下权限:
- 对于默认服务账号:无需进行任何更改。默认服务账号包含所需权限。
- 对于用户代管式服务账号:向 Composer Worker 授予
(
roles/composer.worker
) 角色分配给您的服务账号。此角色包括 所有必要的数据沿袭权限
如需了解详情,请参阅 Dataplex 文档中的沿袭角色和权限。
在 Cloud Composer 中启用数据沿袭
控制台
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
选择环境配置标签页。
在 Dataplex 数据沿袭集成部分中,点击修改。
在 Dataplex 数据沿袭集成面板中,选择启用与 Dataplex 数据沿袭的集成,然后点击保存。
gcloud
使用 --enable-cloud-data-lineage-integration
参数。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
您需要将其中的:
将
ENVIRONMENT_NAME
替换为环境的名称。该名称必须以小写字母开头,后面最多可跟 62 个小写字母、数字或连字符,但不能以连字符结尾。该环境名称用于创建环境的子组件,因此您必须提供一个有效的 Cloud Storage 存储桶名称。如需查看限制列表,请参阅存储桶命名准则。
LOCATION
替换为环境的区域。该位置是环境的 GKE 集群所在的区域。
示例:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
发送自定义谱系事件
如果您想报告某个运营商的沿袭,可以发送自定义沿袭事件 自动沿袭报告不支持的数据。
例如,要通过以下代码发送自定义事件:
BashOperator
,修改任务中的inlets
或outlets
参数 定义。PythonOperator
,请修改task.inlets
或task.outlets
参数 。对inlets
参数使用AUTO
可设置其 值等于其上游任务的outlets
。
例如,运行以下任务:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
…
bash_task = BashOperator(
task_id='bash_task',
dag=dag,
bash_command='sleep 0',
inlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table1',
)],
outlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table2',
)]
)
def _python_task(task):
task.inlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table3',
))
task.outlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table4',
))
python_task = PythonOperator(
task_id='python_task',
dag=dag,
python_callable=_python_task,
inlets=[AUTO],
)
bash_task >> python_task
这会在 Dataplex 界面中创建以下谱系图:
在 Cloud Composer 中停用数据沿袭
在 Cloud Composer 环境中停用谱系集成不会停用 Data Lineage API。如果要完全停用沿袭 报告项目的数据,也要停用 Data Lineage API。请参阅停用服务。
控制台
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
选择环境配置标签页。
在 Dataplex 数据沿袭集成部分中,点击修改。
在 Dataplex 数据沿袭集成面板中,选择停用与 Dataplex 数据沿袭的集成,然后点击保存。
gcloud
使用 --disable-cloud-data-lineage-integration
参数。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
您需要将其中的:
将
ENVIRONMENT_NAME
替换为环境的名称。该名称必须以小写字母开头,后面最多可跟 62 个小写字母、数字或连字符,但不能以连字符结尾。该环境名称用于创建环境的子组件,因此您必须提供一个有效的 Cloud Storage 存储桶名称。如需查看限制列表,请参阅存储桶命名准则。
LOCATION
替换为环境的区域。该位置是环境的 GKE 集群所在的区域。
示例:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
在 Cloud Composer 中查看谱系日志
您可以使用 Dataplex 中的环境配置页面 数据沿袭集成部分。
问题排查
如果沿袭数据未报告给 Lineage API,或者您在 Dataplex,请尝试执行以下问题排查步骤:
- 确保在项目的项目中启用了 Data Lineage API Cloud Composer 环境。
- 检查是否已在 Cloud Composer 中启用数据沿袭集成 环境
- 检查您使用的运营商是否包含在自动沿袭中 报告支持。请参阅支持的 Airflow 运算符。
- 查看 Cloud Composer 中的沿袭日志,了解可能存在的问题。