Airflow データベースにアクセスする

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、Cloud Composer 環境の Airflow データベースを実行する Cloud SQL インスタンスに接続して SQL クエリを実行する方法について説明します。

たとえば、Airflow データベースで直接クエリを実行する、データベースのバックアップを作成する、データベースのコンテンツに基づいて統計情報を収集する、またはデータベースからその他のカスタム情報を取得することが必要になる場合があります。

始める前に

Airflow データベースで SQL クエリを実行する

Airflow データベースに接続するには:

  1. 1 つ以上の PostgresOperator 演算子を使用して DAG を作成します。始めるにあたっては、サンプル DAG を使用できます。

  2. 演算子の sql パラメータで、SQL クエリを指定します。

  3. この DAG を環境にアップロードします。

  4. DAG をトリガーします。たとえば、手動でトリガーすることも、スケジュールに従って実行されるまで待つこともできます。

DAG の例:

import datetime
import os

import airflow
from airflow.providers.postgres.operators.postgres import PostgresOperator

SQL_DATABASE = os.environ["SQL_DATABASE"]

with airflow.DAG(
    "airflow_db_connection_example",
    start_date=datetime.datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False) as dag:

    PostgresOperator(
        task_id="run_airflow_db_query",
        dag=dag,
        postgres_conn_id="airflow_db",
        database=SQL_DATABASE,
        sql="SELECT * FROM dag LIMIT 10;",
    )

データベース コンテンツをダンプしてバケットに転送する

次のステップ