Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面說明如何在 Cloud Composer 中啟用資料沿襲整合功能。
關於資料歷程整合功能
資料歷程是 Dataplex Universal Catalog 的功能,可追蹤資料在系統中的移動方式,包括資料來源、傳遞至何處,以及套用的轉換。
Cloud Composer 會使用 apache-airflow-providers-openlineage
套件產生沿襲事件,並傳送至 Data Lineage API。
這個套件已安裝在 Cloud Composer 環境中。如果您安裝其他版本的套件,支援的運算子清單可能會有所變更。建議您只在必要時才這麼做,否則請保留預先安裝的套件版本。
資料歷程適用於與 Dataplex Universal Catalog 區域相同的環境,且支援資料歷程。
如果 Cloud Composer 環境已啟用資料沿襲功能,Cloud Composer 會將使用支援的運算子的 DAG 沿襲資訊,回報給 Data Lineage API。如要為不受支援的運算子回報沿襲,也可以傳送自訂沿襲事件。
您可以透過下列方式存取沿襲資訊:
建立環境時,如果符合下列條件,系統會自動啟用資料沿襲整合功能:
專案已啟用 Data Lineage API。詳情請參閱 Dataplex Universal Catalog 說明文件的「啟用 Data Lineage API」。
Airflow 中未設定自訂 Lineage 後端。
您可以在建立環境時停用資料沿襲整合。
Cloud Composer 的功能考量事項
在下列情況下,Cloud Composer 會發出 RPC 呼叫來建立沿襲事件:
- Airflow 工作開始或完成時
- DAG 執行作業開始或完成時
如要瞭解這些實體的詳細資料,請參閱 Dataplex Universal Catalog 說明文件中的「歷程資訊模型」和「歷程 API 參考資料」。
發出的歷程流量須遵守 Data Lineage API 的配額。Cloud Composer 會耗用寫入配額。
處理沿襲資料的相關費用適用於沿襲定價。請參閱資料沿襲考量事項。
Cloud Composer 的效能考量
Airflow 工作執行完畢後,系統會回報資料歷程。平均而言,資料沿襲報表約需 1 到 2 秒即可產生。
這不會影響工作本身的效能:如果系統無法順利向 Lineage API 報告沿襲資訊,Airflow 工作不會失敗。主要運算子邏輯不會受到影響,但整個工作執行個體會執行較久,以納入報表沿襲資料。
如果環境會回報資料沿襲,相關聯的費用會稍微增加,因為回報資料沿襲需要額外時間。
法規遵循
資料沿襲功能對 VPC Service Controls 等功能提供不同層級的支援。請參閱資料沿襲注意事項,確保支援等級符合環境需求。
事前準備
這項功能支援多種法規遵循標準。請務必先查看Cloud Composer 專屬的功能注意事項,以及資料沿襲功能注意事項。
Composer Worker (
roles/composer.worker
) 角色已包含資料沿襲的所有必要 IAM 權限。這是環境服務帳戶的必要角色。如要進一步瞭解資料歷程權限,請參閱 Dataplex Universal Catalog 說明文件中的「歷程角色和權限」。
確認是否支援電信業者
資料歷程支援是由運算子所在供應商套件提供:
檢查運算子所在供應器套件的變更記錄,找出新增 OpenLineage 支援的項目。
舉例來說,BigQueryToBigQueryOperator 從
apache-airflow-providers-google
11.0.0 版開始支援 OpenLineage。檢查環境使用的供應商套件版本。如要這麼做,請參閱環境中使用的 Airflow 版本預先安裝的套件清單。您也可以在環境中安裝其他版本的套件。
此外,apache-airflow-providers-openlineage
說明文件中的「支援的類別」頁面列出了最新支援的運算子。
設定資料歷程整合
Cloud Composer 的資料歷程整合功能是以環境為單位進行管理。也就是說,啟用這項功能需要兩個步驟:
- 在專案中啟用 Data Lineage API。
- 在特定 Cloud Composer 環境中啟用資料歷程整合功能。
在 Cloud Composer 中啟用資料歷程
主控台
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
選取「環境設定」分頁標籤。
在「Dataplex 資料歷程整合」部分中,按一下「編輯」。
在「Dataplex 資料歷程整合功能」面板中,選取「啟用 Dataplex 資料歷程整合功能」,然後按一下「儲存」。
gcloud
使用 --enable-cloud-data-lineage-integration
引數。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
更改下列內容:
ENVIRONMENT_NAME
:環境名稱。LOCATION
:環境所在的區域。
範例:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
在 Cloud Composer 中停用資料歷程
在 Cloud Composer 環境中停用歷程整合功能,不會停用 Data Lineage API。如要徹底停用專案的歷程報表功能,請一併停用 Data Lineage API。詳情請參閱停用服務的相關說明。
主控台
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
選取「環境設定」分頁標籤。
在「Dataplex 資料歷程整合」部分中,按一下「編輯」。
在「Dataplex 資料歷程整合功能」面板中,選取「停用 Dataplex 資料歷程整合功能」,然後按一下「儲存」。
gcloud
使用 --disable-cloud-data-lineage-integration
引數。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
更改下列內容:
ENVIRONMENT_NAME
:環境名稱。LOCATION
:環境所在的區域。
範例:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
在支援的運算子中傳送沿襲事件
如果已啟用資料沿襲,系統會自動傳送沿襲事件。您不需要變更 DAG 程式碼。
舉例來說,執行下列工作:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': 'example-project',
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
在 Dataplex Universal Catalog UI 中建立下列歷程圖:

傳送自訂歷程事件
如要為不支援自動沿襲報告的運算子回報沿襲,可以傳送自訂沿襲事件。
舉例來說,如要傳送自訂事件,請使用:
- BashOperator:修改工作定義中的
inlets
或outlets
參數。 - PythonOperator:修改工作定義中的
task.inlets
或task.outlets
參數。 - 您可以將
AUTO
用於inlets
參數。這會將值設為上游工作的outlets
。
以下範例說明如何使用入口和出口:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
...
bash_task = BashOperator(
task_id="bash_task",
dag=dag,
bash_command="sleep 0",
inlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table1",
)
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table2",
)
],
)
def _python_task(task):
print("Python task")
python_task = PythonOperator(
task_id="python_task",
dag=dag,
python_callable=_python_task,
inlets=[
AUTO,
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table3",
),
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table4",
)
],
)
bash_task >> python_task
因此,Dataplex Universal Catalog UI 中會建立下列歷程圖:

在 Cloud Composer 中查看沿襲記錄
您可以使用「Dataplex Universal Catalog 資料歷程整合」部分「環境設定」頁面上的連結,檢查與資料歷程相關的記錄。
疑難排解
如果系統未向 Lineage API 回報歷程資料,或您無法在 Dataplex Universal Catalog 中查看歷程資料,請嘗試下列疑難排解步驟:
- 確認 Cloud Composer 環境的專案已啟用 Data Lineage API。
- 檢查 Cloud Composer 環境是否已啟用資料歷程整合功能。
- 檢查您使用的運算子是否包含在自動沿襲報表支援中。請參閱「支援的 Airflow 運算子」。
- 在 Cloud Composer 中檢查沿襲記錄,找出可能的問題。