使用 Dataplex 通用目錄追蹤資料歷程

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何在 Cloud Composer 中啟用資料沿襲整合功能。

關於資料歷程整合功能

資料歷程Dataplex Universal Catalog 的功能,可追蹤資料在系統中的移動方式,包括資料來源、傳遞至何處,以及套用的轉換。

Cloud Composer 會使用 apache-airflow-providers-openlineage 套件產生沿襲事件,並傳送至 Data Lineage API。

這個套件已安裝在 Cloud Composer 環境中。如果您安裝其他版本的套件,支援的運算子清單可能會有所變更。建議您只在必要時才這麼做,否則請保留預先安裝的套件版本。

  • 資料歷程適用於與 Dataplex Universal Catalog 區域相同的環境,且支援資料歷程

  • 如果 Cloud Composer 環境已啟用資料沿襲功能,Cloud Composer 會將使用支援的運算子的 DAG 沿襲資訊,回報給 Data Lineage API。如要為不受支援的運算子回報沿襲,也可以傳送自訂沿襲事件

  • 您可以透過下列方式存取沿襲資訊:

    • 資料沿襲 API
    • Dataplex Universal Catalog 中支援項目的歷程圖。 詳情請參閱 Dataplex Universal Catalog 說明文件的「歷程圖」一節。

建立環境時,如果符合下列條件,系統會自動啟用資料沿襲整合功能:

  • 專案已啟用 Data Lineage API。詳情請參閱 Dataplex Universal Catalog 說明文件的「啟用 Data Lineage API」。

  • Airflow 中未設定自訂 Lineage 後端

您可以在建立環境時停用資料沿襲整合。

對於現有環境,您可以隨時啟用停用資料歷程整合功能。

Cloud Composer 的功能考量事項

在下列情況下,Cloud Composer 會發出 RPC 呼叫來建立沿襲事件:

  • Airflow 工作開始或完成時
  • DAG 執行作業開始或完成時

如要瞭解這些實體的詳細資料,請參閱 Dataplex Universal Catalog 說明文件中的「歷程資訊模型」和「歷程 API 參考資料」。

發出的歷程流量須遵守 Data Lineage API 的配額。Cloud Composer 會耗用寫入配額。

處理沿襲資料的相關費用適用於沿襲定價。請參閱資料沿襲考量事項

Cloud Composer 的效能考量

Airflow 工作執行完畢後,系統會回報資料歷程。平均而言,資料沿襲報表約需 1 到 2 秒即可產生。

這不會影響工作本身的效能:如果系統無法順利向 Lineage API 報告沿襲資訊,Airflow 工作不會失敗。主要運算子邏輯不會受到影響,但整個工作執行個體會執行較久,以納入報表沿襲資料。

如果環境會回報資料沿襲,相關聯的費用會稍微增加,因為回報資料沿襲需要額外時間。

法規遵循

資料沿襲功能對 VPC Service Controls 等功能提供不同層級的支援。請參閱資料沿襲注意事項,確保支援等級符合環境需求。

事前準備

確認是否支援電信業者

資料歷程支援是由運算子所在供應商套件提供:

  1. 檢查運算子所在供應器套件的變更記錄,找出新增 OpenLineage 支援的項目。

    舉例來說,BigQueryToBigQueryOperator 從apache-airflow-providers-google 11.0.0 版開始支援 OpenLineage。

  2. 檢查環境使用的供應商套件版本。如要這麼做,請參閱環境中使用的 Airflow 版本預先安裝的套件清單。您也可以在環境中安裝其他版本的套件

此外,apache-airflow-providers-openlineage 說明文件中的「支援的類別」頁面列出了最新支援的運算子。

設定資料歷程整合

Cloud Composer 的資料歷程整合功能是以環境為單位進行管理。也就是說,啟用這項功能需要兩個步驟:

  1. 在專案中啟用 Data Lineage API。
  2. 在特定 Cloud Composer 環境中啟用資料歷程整合功能。

在 Cloud Composer 中啟用資料歷程

主控台

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。

  3. 選取「環境設定」分頁標籤。

  4. 在「Dataplex 資料歷程整合」部分中,按一下「編輯」

  5. 在「Dataplex 資料歷程整合功能」面板中,選取「啟用 Dataplex 資料歷程整合功能」,然後按一下「儲存」

gcloud

使用 --enable-cloud-data-lineage-integration 引數。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration

更改下列內容:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的區域。

範例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration

在 Cloud Composer 中停用資料歷程

在 Cloud Composer 環境中停用歷程整合功能,不會停用 Data Lineage API。如要徹底停用專案的歷程報表功能,請一併停用 Data Lineage API。詳情請參閱停用服務的相關說明。

主控台

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。

  3. 選取「環境設定」分頁標籤。

  4. 在「Dataplex 資料歷程整合」部分中,按一下「編輯」

  5. 在「Dataplex 資料歷程整合功能」面板中,選取「停用 Dataplex 資料歷程整合功能」,然後按一下「儲存」

gcloud

使用 --disable-cloud-data-lineage-integration 引數。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

更改下列內容:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的區域。

範例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration

在支援的運算子中傳送沿襲事件

如果已啟用資料沿襲,系統會自動傳送沿襲事件。您不需要變更 DAG 程式碼。

舉例來說,執行下列工作:

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            'destinationTable': {
                'project_id': 'example-project',
                'dataset_id': 'dataset',
                'table_id': 'tableB',
            },
        }
    },
)

在 Dataplex Universal Catalog UI 中建立下列歷程圖:

Dataplex UI 中的歷程圖範例。
圖 1. Dataplex Universal Catalog UI 中的 BigQuery 資料表歷程圖示例。

傳送自訂歷程事件

如要為不支援自動沿襲報告的運算子回報沿襲,可以傳送自訂沿襲事件。

舉例來說,如要傳送自訂事件,請使用:

  • BashOperator:修改工作定義中的 inletsoutlets 參數。
  • PythonOperator:修改工作定義中的 task.inletstask.outlets 參數。
  • 您可以將 AUTO 用於 inlets 參數。這會將值設為上游工作的 outlets

以下範例說明如何使用入口和出口:

from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO

...

bash_task = BashOperator(
    task_id="bash_task",
    dag=dag,
    bash_command="sleep 0",
    inlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table1",
        )
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table2",
        )
    ],
)


def _python_task(task):
    print("Python task")


python_task = PythonOperator(
    task_id="python_task",
    dag=dag,
    python_callable=_python_task,
    inlets=[
        AUTO,
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table3",
        ),
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table4",
        )
    ],
)

bash_task >> python_task

因此,Dataplex Universal Catalog UI 中會建立下列歷程圖:

Dataplex UI 中自訂事件的歷程圖範例。
圖 2. Dataplex Universal Catalog UI 中多個 BigQuery 資料表的歷程圖示例。

在 Cloud Composer 中查看沿襲記錄

您可以使用「Dataplex Universal Catalog 資料歷程整合」部分「環境設定」頁面上的連結,檢查與資料歷程相關的記錄。

疑難排解

如果系統未向 Lineage API 回報歷程資料,或您無法在 Dataplex Universal Catalog 中查看歷程資料,請嘗試下列疑難排解步驟:

  • 確認 Cloud Composer 環境的專案已啟用 Data Lineage API。
  • 檢查 Cloud Composer 環境是否已啟用資料歷程整合功能。
  • 檢查您使用的運算子是否包含在自動沿襲報表支援中。請參閱「支援的 Airflow 運算子」。
  • 在 Cloud Composer 中檢查沿襲記錄,找出可能的問題。

後續步驟