存取 Airflow 資料庫

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁說明如何連線至 Cloud SQL 執行個體,該執行個體會執行 Cloud Composer 環境的 Airflow 資料庫,並執行 SQL 查詢。

舉例來說,您可能想直接在 Airflow 資料庫上執行查詢、備份資料庫、根據資料庫內容收集統計資料,或是從資料庫擷取任何其他自訂資訊。

事前準備

在 Airflow 資料庫上執行 SQL 查詢

如要連線至 Airflow 資料庫,請按照下列步驟操作:

  1. 使用一或多個 SQLExecuteQueryOperator 運算子建立 DAG。如要開始使用,可以採用範例 DAG。

  2. 在運算子的 sql 參數中,指定您的 SQL 查詢。

  3. 將這個 DAG 上傳至環境。

  4. 觸發 DAG,例如手動觸發,或等待系統按照排程執行。

DAG 範例:

import datetime
import os

import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQL_DATABASE = os.environ["SQL_DATABASE"]

with airflow.DAG(
    "airflow_db_connection_example",
    start_date=datetime.datetime(2025, 1, 1),
    schedule_interval=None,
    catchup=False) as dag:

    SQLExecuteQueryOperator(
        task_id="run_airflow_db_query",
        dag=dag,
        conn_id="airflow_db",
        database=SQL_DATABASE,
        sql="SELECT * FROM dag LIMIT 10;",
    )

如要進一步瞭解如何使用 SQLExecuteQueryOperator,請參閱 Airflow 說明文件中的「How-to Guide for Postgres using SQLExecuteQueryOperator」。

轉儲資料庫內容並轉移至值區

後續步驟