Airflow データベースにアクセスする

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、Cloud Composer 環境の Airflow データベースを実行する Cloud SQL インスタンスに接続して SQL クエリを実行する方法について説明します。

たとえば、Airflow データベースで直接クエリを実行する、データベースのバックアップを作成する、データベースのコンテンツに基づいて統計情報を収集する、またはデータベースからその他のカスタム情報を取得することが必要になる場合があります。

Cloud Composer 環境の場合、これを行うための 1 つの方法は、環境の GKE クラスタ内の VM から Airflow データベースに接続することです。

環境のクラスタの名前とゾーンを取得する

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境を選択します。

  3. [環境の設定] タブに移動します。

  4. [GKE クラスタ] 項目に、環境の GKE クラスタの名前とゾーンが表示されます。

    たとえば、このパラメータの値が projects/example-project/zones/europe-west3-a/clusters/europe-west3-composer-exam--7b206598-gke の場合、クラスタ名は europe-west3-composer-exam--7b206598-gke、クラスタゾーンは europe-west3-a です。

データベース接続パラメータを取得する

Airflow 2

  1. 環境の Airflow ウェブ インターフェースにアクセスします。
  2. Airflow ウェブ インターフェースで、[管理者] > [構成] に移動します。

  3. sql_alchemy_conn パラメータを見つけます。

  4. このパラメータの値から、ユーザー名、パスワード、データベース名を取得します。

    たとえば、このパラメータの値が postgresql+psycopg2://root:example-password@127.0.0.1:3306/composer-1-17-0-airflow-2-0-1-400fa094、ユーザー名が root、パスワードが example-password の場合、データベース名は composer-1-17-0-airflow-2-0-1-400fa094 です。

Airflow 1

  1. 環境の Airflow ウェブ インターフェースにアクセスします。
  2. Airflow ウェブ インターフェースで、[管理者] > [構成] に移動します。
  3. sql_alchemy_conn パラメータを見つけます。
  4. このパラメータの値から、ユーザー名、パスワード、データベース名を取得します。

    たとえば、このパラメータの値が mysql+mysqldb://root:example-password@127.0.0.1/composer-1-16-2-airflow-1-10-14-3e9e2312?charset=utf8、ユーザー名が root、パスワードが example-password の場合、データベース名は composer-1-16-2-airflow-1-10-14-3e9e2312 です。

データベース エンドポイント アドレスを取得する

  1. Google Cloud コンソールで、[Kubernetes Engine] > [Gateways、Services、Ingress] ページに移動します。

    [Service と Ingress] に移動

  2. クラスタの airflow-sqlproxy-service サービスを選択します。クラスタの名前または表示されているエンドポイントの IP 範囲を使用して、正しい項目を検索できます。

  3. [サービスの詳細] ページで、処理元の Pod の IP アドレスを探します。これは [処理元の Pod] の [エンドポイント] 列に表示されます。

VM インスタンスを作成する

次のパラメータを使用して新しい VM インスタンスを作成します。

  • Region を使用します。クラスタと同じリージョンを選択します。

  • ゾーン。クラスタと同じゾーンを選択します。

  • ブートディスクLinux ディストリビューション(Ubuntu 21.04 LTS など)を選択します。

  • クラスタの IP 範囲内の IP アドレスを指定します。

    1. [管理、セキュリティ、ディスク、ネットワーク、単一テナンシー] メニューを展開します。
    2. [ネットワーキング] タブに移動します。
    3. [ネットワーク インターフェース] で、[デフォルト] インターフェース項目を展開します。
    4. [エイリアス IP 範囲を表示] を展開します。
    5. [サブネット範囲] で、クラスタの gke-services IP 範囲を選択します。例: gke-europe-west3-composer-exam7b206598-gke-services-115b26e7前のステップでクラスタの名前を取得しました。
    6. [エイリアス IP 範囲] で、指定したサブネット範囲の IP アドレス範囲内の VM インスタンスの IP アドレスを指定します。

VM インスタンスに接続し、SQL クライアント パッケージをインストールする

Airflow 2

  1. 前のステップで作成した VM インスタンスに接続します。

  2. 次の方法で、postgresql-client パッケージをインストールします。

    sudo apt-get update
    sudo apt-get install postgresql-client
    

Airflow 1

  1. 前のステップで作成した VM インスタンスに接続します。

  2. 次の方法で、mysql-client パッケージをインストールします。

    sudo apt-get update
    sudo apt-get install mysql-client
    

Airflow データベースに接続する

VM インスタンスに接続した状態で、Airflow データベースに接続するには、次のコマンドを実行します。

Airflow 2

psql --user=USERNAME --password \
  --host=DB_ENDPOINT \
  --port=3306 \
  DB_NAME

Airflow 1

mysql --user=USERNAME --password \
  --host=DB_ENDPOINT \
  DB_NAME

前のステップで取得した値に置き換えます。

  • USERNAME は、ユーザー名に置き換えます。
  • DB_ENDPOINT は、先にこのガイドで取得したデータベース エンドポイントの IP アドレスに置き換えます。
  • DB_NAME は、データベース名に置き換えます。

SQL クエリを実行する

プロンプトが表示されます。このプロンプトで SQL クエリを実行できます。次に例を示します。

SELECT * FROM dag LIMIT 10;

データベース コンテンツをダンプしてバケットに転送する

データベース コンテンツをファイルにダンプするには、次のコマンドを実行します。

Airflow 2

pg_dump --user=USERNAME --password \
  --host=DB_ENDPOINT \
  --port=3306 \
  DB_NAME > DUMP_FILE

Airflow 1

mysqldump --user=USERNAME --password \
  --host=DB_ENDPOINT \
  DB_NAME > DUMP_FILE

以下のように置き換えます。

  • USERNAME は、ユーザー名に置き換えます。
  • DB_ENDPOINT は、データベース エンドポイントの IP アドレスに置き換えます。
  • DB_NAME は、データベース名に置き換えます。
  • DUMP_FILE は、ダンプファイルの名前に置き換えます。例: dump.sql

ダンプファイルを Cloud Storage バケットに転送するには:

gcloud storage cp DUMP_FILE BUCKET_ADDRESS

以下のように置き換えます。

  • DUMP_FILE は、ダンプファイルの名前に置き換えます。
  • BUCKET_ADDRESS は、バケットのアドレスに置き換えます。例: gs://europe-west3-db-dump

その代わりに、gcloud compute scp を使用してダンプファイルをローカルに転送することもできます。

次のステップ