Airflow 接続の管理

このページでは、Airflow 接続の使用方法について説明します。

Airflow 接続を使用すると、Cloud Composer 環境から Google Cloud プロジェクトのリソースにアクセスできます。ログインやホスト名などの情報を保存する Airflow 接続 ID を作成し、ワークフローで接続 ID を参照します。Airflow 接続は、ワークフローで使用されるシークレットと認証情報を格納するのに推奨される方法です。

Airflow 接続によって、Cloud Composer 環境が Google Cloud プロジェクト、他のクラウド プロバイダ、サードパーティ サービスなどと通信するために必要な接続情報を保存できます。

Airflow 接続は、認証情報、ホスト名、追加の API パラメータなどの詳細情報を保存できます。各接続には ID が関連付けられています。この ID を使用して、ワークフロー タスクでプリセットの詳細を参照できます。Airflow 接続を使用してワークフロー タスクのシークレットと認証情報を保存することをおすすめします。

Google Cloud の接続の種類によって、Google Cloud の統合が有効になります。

Fernet 鍵と保護された接続

新しい環境を作成すると、Cloud Composer によってその環境用に一意で永続的な Fernet 鍵が生成され、デフォルトで connection extras が保護されます。fernet_key は Airflow 構成で確認できます。接続を保護する方法については、接続の保護をご覧ください。

デフォルトの接続の使用

デフォルトでは、Cloud Composer は次の Google Cloud Platform の Airflow 接続を構成します。

  • bigquery_default
  • google_cloud_default
  • google_cloud_datastore_default
  • google_cloud_storage_default

デフォルトの接続 ID を使用すると、DAG からの接続を使用できます。次の例では、デフォルトの接続で BigQueryOperator を使用しています。

task_default = bigquery_operator.BigQueryOperator(
    task_id='task_default_connection',
    bql='SELECT 1', use_legacy_sql=False)

演算子を作成するときに接続 ID を明示的に指定することもできます。

task_explicit = bigquery_operator.BigQueryOperator(
    task_id='task_explicit_connection',
    bql='SELECT 1', use_legacy_sql=False,
    # Composer creates a 'google_cloud_default' connection by default.
    bigquery_conn_id='google_cloud_default')

別のプロジェクトのリソースへのアクセス

Cloud Composer 環境で Google Cloud プロジェクトのリソースにアクセスするための推奨される方法は、デフォルトの接続を使用して、環境に関連付けられているサービス アカウントへの適切な Identity and Access Management 権限を割り当てることです。

以下のセクションでは、プロジェクト ID your-composer-projectにデプロイされた Cloud Composer 環境で your-storage-project の Cloud Storage バケットに対する読み取りと書き込みを許可する方法の例を示します。

環境に関連付けられているサービス アカウントの決定

Console

  1. Cloud Console で [環境] ページを開きます。

    [環境] ページを開く

  2. [名前] 列で、環境の名前をクリックして [環境の詳細] ページを開きます。
  3. [サービス アカウント] をメモします。この値は service-account-name@your-composer-project.iam.gserviceaccount.com などのメールアドレスです。

gcloud

次のコマンドを入力し、VARIABLES を適切な値に置き換えます。

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.nodeConfig.serviceAccount)" 

出力に service-account-name@your-composer-project.iam.gserviceaccount.com などのアドレスが表示されます。

サービス アカウントへの適切な IAM 権限の付与

your-storage-project で Cloud Storage バケットの読み取りと書き込みを許可するには、Cloud Composer 環境に関連付けられたサービス アカウントに roles/storage.objectAdmin ロールを付与します。

コンソール

  1. ストレージ プロジェクトの [IAM と管理] ページ。

    [IAM と管理] ページを開く

  2. [メンバーを追加] をクリックします。

  3. [メンバーを追加] ダイアログで、Cloud Composer 環境に関連付けられているサービス アカウントの完全なメールアドレスを指定します。

  4. [ロールを選択] プルダウンで、適切な権限を選択します。この例では、[ストレージ] > [オブジェクト管理者] のロールを選択します。

  5. [追加] をクリックします。

gcloud

gcloud projects add-iam-policy-binding コマンドを使用して、プロジェクト レベルの IAM 権限を追加します。VARIABLES を適切な値に置き換えます。

gcloud projects add-iam-policy-binding YOUR_STORAGE_PROJECT \
    --member=serviceAccount:SERVICE_ACCOUNT_EMAIL \
    --role=roles/storage.objectAdmin 

適切な権限が付与されると、your-composer-project プロジェクトのリソースへのアクセスに使用するのと同じデフォルトの Airflow 接続で your-storage-project プロジェクトのリソースにアクセスできます。

新しい Airflow 接続の作成

始める前に

Cloud Composer 環境に関連付けられたサービス アカウントに適切な IAM 権限を付与し、DAG 定義でデフォルト接続を使用します。そのように処理できない場合は、このセクションの手順に沿って進めてください。

別のプロジェクトへの接続を作成する

以下の手順では、プロジェクト ID your-composer-projectにデプロイされた Cloud Composer 環境で your-storage-project の Cloud Storage バケットに対する読み取りと書き込みを許可する方法の例を示します。

  1. your-storage-project でサービス アカウントを作成し、JSON キーをダウンロードします。

    1. Cloud Console で、[サービス アカウント] ページを開きます。

      [サービス アカウント] ページを開く

    2. [プロジェクトを選択] をクリックします。

    3. プロジェクトを選択し、[開く] をクリックします。

    4. [サービス アカウントを作成] をクリックします。

    5. サービス アカウント名を入力し、サービス アカウントに付与する [Storage] > [Object Admin]などのロールを選択します。

    6. [新しい秘密鍵の提供] をオンにして、[保存] をクリックします。

    7. プレーン テキスト エディタで JSON ファイルを開きます。コンテンツは次のように表示されます。

      { "type": "service_account", "project_id": "your-storage-project", ... }

  2. 新しい接続を作成するには:

    Airflow UI

    1. Cloud Composer 環境で Airflow ウェブ インターフェースにアクセスする

    2. Airflow ウェブ インターフェースで、[管理者] > [接続] ページを開きます。

      Airflow のスクリーンショット。[管理者の接続] メニューを開きます。

    3. 新しい接続フォームを開くには、[作成] タブをクリックします。

      Airflow のスクリーンショット。[作成] タブをクリックします。

    4. 新しい接続を作成するには:

      1. 接続 ID を選択するには、[my_gcp_connection] などを [Conn Id] 項目に入力します。この ID を DAG 定義ファイルで使用します。
      2. [接続の種類] フィールドで、[Google Cloud Platform] オプションを選択します。
      3. サービス アカウントが属するプロジェクトに対応する [プロジェクト ID] の値を入力します。
      4. 次のいずれかを行います。

        1. ダウンロードしたサービス アカウントの JSON キーファイルを、お使いの環境の Cloud Storage バケットの data/ ディレクトリにコピーします。次に、[キーファイル パス] に、/home/airflow/gcs/data/keyfile.json などの JSON キーファイルの場所への Airflow ワーカー上のローカル ファイルパスを入力します。
        2. [JSON キーファイル] で、ダウンロードしたサービス アカウントの JSON キーファイルの内容をコピーします。

        CLI またはウェブ UI を使用して Airflow 接続にアクセスできるユーザーは、keyfile_dict に保存されている認証情報を読み取ることが可能です。こうした認証情報を保護するには、キーファイル パスを使用し、Cloud Storage ACL を使用してキーファイルへのアクセスを制限します。

      5. [スコープ] フィールドに値を入力します。Google Cloud リソースへのアクセスを制限するには、スコープとして https://www.googleapis.com/auth/cloud-platform を使用し、サービス アカウントの IAM 権限を使用することをおすすめします。

      6. 接続を作成するには、[保存] をクリックします。

        Airflow のスクリーンショット。[作成] タブをクリックします。

    gcloud

    次のコマンドを入力します。

    gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION connections -- --add \
    --conn_id=CONNECTION_ID --conn_type=google_cloud_platform \
    --conn_extra '{"extra\__google\_cloud\_platform\__CMD_ARGS": "...",
    "extra\__google\_cloud\_platform\__CMD_ARGS": "...", ...}'
    

    ここで

    • ENVIRONMENT_NAME は、環境の名前です。
    • LOCATION は、環境が配置される Compute Engine のリージョンです。
    • CONNECTION_ID は、接続の識別子です。小文字を使用し、単語をアンダースコアで区切ります。
    • CMD_ARGS は次のようになります。
      • project はプロジェクト ID です。extra__google_cloud_platform__project のみが必須です。
      • key_path は、/home/airflow/gcs/data/keyfile.json などの JSON キーファイルへの Airflow ワーカー上のローカル ファイルパスです。指定された場合は scope も必要です。key_pathkeyfile_dict のいずれかを使用します。両方は使用しまません。
      • keyfile_dict は、ダウンロードした JSON キーファイルの内容を指定する JSON オブジェクトです。指定された場合は scope も必要です。keyfile_dict または key_path を使用します。両方は使用しません。CLI またはウェブ UI を使用して Airflow 接続にアクセスできるユーザーは、keyfile_dict に保存されている認証情報を読み取ることが可能です。これらの認証情報を保護するには、key_path を使用して Cloud Storage ACL を適用してキーファイルへのアクセスを制限することをおすすめします。
      • scope は、OAuth スコープのカンマ区切りのリストです。

    例:

    gcloud composer environments run test-environment \
     --location us-central1 connections -- --add \
     --conn_id=my_gcp_connection --conn_type=google_cloud_platform \
     --conn_extra '{"extra\__google\_cloud\_platform\__project": "your-storage-project",
     "extra\__google\_cloud\_platform\__key_path": "/home/airflow/gcs/data/keyfile.json",
     "extra\__google\_cloud\_platform\__scope": "https://www.googleapis.com/auth/cloud-platform"}'

新しい Airflow 接続の使用

作成した接続を使用するには、Google Cloud Airflow 演算子を作成するときに、対応する接続 ID 引数として設定します。

task_custom = bigquery_operator.BigQueryOperator(
    task_id='task_custom_connection',
    bql='SELECT 1', use_legacy_sql=False,
    # Set a connection ID to use a connection that you have created.
    bigquery_conn_id='my_gcp_connection')

外部データベースへの接続を構成

Cloud Composer は、お使いの環境内にデフォルトの Cloud SQL プロキシを提供し、アプリケーション、クライアント、その他の Google Cloud サービスから、環境の Cloud SQL データベースへのアクセスをリモートで承認します。

SQL データベースやプライベート IP Cloud SQL インスタンスなどの外部データベースに Cloud Composer を接続するには、新しい SQL プロキシ Pod(yaml)を環境の GKE クラスタにデプロイします。

新しい SQL プロキシ サービスをデプロイすると、お使いの環境の GKE クラスタが外部データベースへの接続の起点になります。 Airflow ウェブサーバーが、たとえば Airflow 接続の形式で、外部データベースにアクセスするためには、SQL プロキシ サービスにウェブサーバーからアクセスできるようにする必要があります。そうするには、次のオプションを使用できます。

  • LoadBalancer サービスを作成して、この追加の SQL プロキシ サービスを公開します。 また、この LoadBalancer サービスへのアクセスを制限したい場合もあります。代わりに、セルフマネージド Airflow ウェブサーバーをデプロイすることもできます。
  • プライベート IP Cloud Composer 環境を作成し、ClusterIP サービスを使用して SQL プロキシ サービスを公開します。Cloud Composer でプライベート IP モードを有効にすると、ウェブサーバーは Kubernetes サービスに直接アクセスできます。