Airflow 接続の管理

このページでは、Airflow 接続の使用方法について説明します。

Airflow 接続を使用すると、Cloud Composer 環境から Google Cloud プロジェクトのリソースにアクセスできます。ログインやホスト名などの情報を保存する Airflow 接続 ID を作成し、ワークフローで接続 ID を参照します。Airflow 接続は、ワークフローで使用されるシークレットと認証情報を格納するのに推奨される方法です。

Airflow 接続によって、Cloud Composer 環境が Google Cloud プロジェクト、他のクラウド プロバイダ、サードパーティ サービスなどと通信するために必要な接続情報を保存できます。

Airflow 接続は、認証情報、ホスト名、追加の API パラメータなどの詳細情報を保存できます。各接続には ID が関連付けられています。この ID を使用して、ワークフロー タスクでプリセットの詳細を参照できます。Airflow 接続を使用してワークフロー タスクのシークレットと認証情報を保存することをおすすめします。

Google Cloud の接続の種類によって、Google Cloud の統合が有効になります。

Fernet 鍵と保護された接続

新しい環境を作成すると、Cloud Composer によってその環境用に一意で永続的な Fernet 鍵が生成され、デフォルトで接続エクストラが保護されます。fernet_key は Airflow 構成で確認できます。接続を保護する方法については、接続の保護をご覧ください。

デフォルトの接続の使用

デフォルトでは、Cloud Composer は次の Google Cloud Platform の Airflow 接続を構成します。

  • bigquery_default
  • google_cloud_default
  • google_cloud_datastore_default
  • google_cloud_storage_default

デフォルトの接続 ID を使用すると、DAG からの接続を使用できます。次の例では、デフォルトの接続で BigQueryOperator を使用しています。

Airflow 2

task_default = bigquery.BigQueryInsertJobOperator(
    task_id='task_default_connection',
    configuration={
        "query": {
            "query": 'SELECT 1',
            "useLegacySql": False
        }
    }
)

Airflow 1

task_default = bigquery_operator.BigQueryOperator(
    task_id='task_default_connection',
    sql='SELECT 1', use_legacy_sql=False)

演算子を作成するときに接続 ID を明示的に指定することもできます。

Airflow 2

# Composer creates a 'google_cloud_default' connection by default.
task_explicit = bigquery.BigQueryInsertJobOperator(
    task_id='task_explicit_connection',
    gcp_conn_id='google_cloud_default',
    configuration={
        "query": {
            "query": 'SELECT 1',
            "useLegacySql": False
        }
    }
)

Airflow 1

task_explicit = bigquery_operator.BigQueryOperator(
    task_id='task_explicit_connection',
    sql='SELECT 1', use_legacy_sql=False,
    # Composer creates a 'google_cloud_default' connection by default.
    bigquery_conn_id='google_cloud_default')

別のプロジェクトのリソースへのアクセス

Cloud Composer 環境で Google Cloud プロジェクトのリソースにアクセスするための推奨される方法は、デフォルトの接続を使用して、環境に関連付けられているサービス アカウントへの適切な Identity and Access Management 権限を割り当てることです。

以下のセクションでは、プロジェクト ID your-composer-projectにデプロイされた Cloud Composer 環境で your-storage-project の Cloud Storage バケットに対する読み取りと書き込みを許可する方法の例を示します。

環境に関連付けられているサービス アカウントの決定

Console

  1. Cloud Console で [環境] ページを開きます。

    [環境] ページを開く

  2. [名前] 列で、環境の名前をクリックして [環境の詳細] ページを開きます。
  3. [サービス アカウント] をメモします。この値は service-account-name@your-composer-project.iam.gserviceaccount.com などのメールアドレスです。

gcloud

次のコマンドを入力し、VARIABLES を適切な値に置き換えます。

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.nodeConfig.serviceAccount)" 

出力に service-account-name@your-composer-project.iam.gserviceaccount.com などのアドレスが表示されます。

サービス アカウントへの適切な IAM 権限の付与

your-storage-project で Cloud Storage バケットの読み取りと書き込みを許可するには、Cloud Composer 環境に関連付けられたサービス アカウントに roles/storage.objectAdmin ロールを付与します。

コンソール

  1. ストレージ プロジェクトの [IAM と管理] ページ。

    [IAM と管理] ページを開く

  2. [メンバーを追加] をクリックします。

  3. [メンバーを追加] ダイアログで、Cloud Composer 環境に関連付けられているサービス アカウントの完全なメールアドレスを指定します。

  4. [ロールを選択] プルダウンで、適切な権限を選択します。この例では、[ストレージ] > [オブジェクト管理者] のロールを選択します。

  5. [追加] をクリックします。

gcloud

gcloud projects add-iam-policy-binding コマンドを使用して、プロジェクト レベルの IAM 権限を追加します。VARIABLES を適切な値に置き換えます。

gcloud projects add-iam-policy-binding YOUR_STORAGE_PROJECT \
    --member=serviceAccount:SERVICE_ACCOUNT_EMAIL \
    --role=roles/storage.objectAdmin 

適切な権限が付与されると、your-composer-project プロジェクトのリソースへのアクセスに使用するのと同じデフォルトの Airflow 接続で your-storage-project プロジェクトのリソースにアクセスできます。

新しい Airflow 接続の作成

始める前に

Cloud Composer 環境に関連付けられたサービス アカウントに適切な IAM 権限を付与し、DAG 定義でデフォルト接続を使用します。これができない場合は、このセクションの手順をお試しください。

別のプロジェクトへの接続の作成

以下の手順では、プロジェクト ID your-composer-projectにデプロイされた Cloud Composer 環境で your-storage-project の Cloud Storage バケットに対する読み取りと書き込みを許可する方法の例を示します。

  1. your-storage-project でサービス アカウントを作成し、JSON キーをダウンロードします。

    1. Cloud Console で、[サービス アカウント] ページを開きます。

      [サービス アカウント] ページを開く

    2. [プロジェクトを選択] をクリックします。

    3. プロジェクトを選択し、[開く] をクリックします。

    4. [サービス アカウントを作成] をクリックします。

    5. サービス アカウント名を入力し、サービス アカウントに付与する [Storage] > [Object Admin]などのロールを選択します。

    6. [新しい秘密鍵の提供] をオンにして、[保存] をクリックします。

    7. プレーン テキスト エディタで JSON ファイルを開きます。コンテンツは次のように表示されます。

      { "type": "service_account", "project_id": "your-storage-project", ... }

  2. 新しい接続を作成するには:

    Airflow UI

    1. Cloud Composer 環境で Airflow ウェブ インターフェースにアクセスする

    2. Airflow ウェブ インターフェースで、[管理者] > [接続] ページを開きます。

      Airflow のスクリーンショット。[管理者の接続] メニューを開きます。

    3. 新しい接続フォームを開くには、[作成] タブをクリックします。

      Airflow のスクリーンショット。[作成] タブをクリックします。

    4. 新しい接続を作成するには:

      1. 接続 ID を選択するには、[Conn Id] フィールドに入力します(例: my_gcp_connection)。この ID は DAG 定義ファイルで使用します。
      2. [接続の種類] フィールドで、[Google Cloud Platform] オプションを選択します。
      3. サービス アカウントが属するプロジェクトに対応する [プロジェクト ID] の値を入力します。
      4. 次のいずれかを行います。

        1. ダウンロードしたサービス アカウントの JSON キーファイルを、お使いの環境の Cloud Storage バケットの data/ ディレクトリにコピーします。次に、[キーファイル パス] に、/home/airflow/gcs/data/keyfile.json などの JSON キーファイルの場所への Airflow ワーカー上のローカル ファイルパスを入力します。
        2. [JSON キーファイル] で、ダウンロードしたサービス アカウントの JSON キーファイルの内容をコピーします。

        CLI またはウェブ UI を使用して Airflow 接続にアクセスできるユーザーは、keyfile_dict に保存されている認証情報を読み取ることが可能です。こうした認証情報を保護するには、キーファイル パスを使用し、Cloud Storage ACL を使用してキーファイルへのアクセスを制限します。

      5. [スコープ] フィールドに値を入力します。Google Cloud リソースへのアクセスを制限するには、スコープとして https://www.googleapis.com/auth/cloud-platform を使用し、サービス アカウントの IAM 権限を使用することをおすすめします。

      6. 接続を作成するには、[保存] をクリックします。

        Airflow のスクリーンショット。[作成] タブをクリックします。

    gcloud

    次のコマンドを入力します。

    Airflow 1.10 CLI

    gcloud composer environments run \
      ENVIRONMENT_NAME \
      --location LOCATION \
      connections -- --add \
      --conn_id=CONNECTION_ID \
      --conn_type=google_cloud_platform \
      --conn_extra '{"extra\__google\_cloud\_platform\__CMD_ARGS": "...",
      "extra\__google\_cloud\_platform\__CMD_ARGS": "...", ...}'
    

    Airflow 2.0 CLI

    gcloud beta composer environments run \
    ENVIRONMENT_NAME \
      --location LOCATION \
      connections add -- \
      CONNECTION_ID \
      --conn-type=google_cloud_platform \
      --conn-extra '{"extra\__google\_cloud\_platform\__CMD_ARGS": "...",
      "extra\__google\_cloud\_platform\__CMD_ARGS": "...", ...}'
    

    ここで

    • ENVIRONMENT_NAME は、環境の名前です。
    • LOCATION は、環境が配置される Compute Engine のリージョンです。
    • CONNECTION_ID は、接続の識別子です。小文字を使用して、単語はアンダースコアで区切ります。
    • CMD_ARGS は次のようになります。
      • project はプロジェクト ID です。extra__google_cloud_platform__project のみが必須です。
      • key_path は、/home/airflow/gcs/data/keyfile.json などの JSON キーファイルへの Airflow ワーカー上のローカル ファイルパスです。指定された場合は scope も必要です。key_pathkeyfile_dict のいずれかを使用します。両方は使用しまません。
      • keyfile_dict は、ダウンロードした JSON キーファイルの内容を指定する JSON オブジェクトです。指定された場合は scope も必要です。keyfile_dictkey_path のいずれかを使用します。両方は使用しまません。CLI またはウェブ UI を使用して Airflow 接続にアクセスできるユーザーは、keyfile_dict に保存されている認証情報を読み取ることが可能です。これらの認証情報を保護するには、key_path を使用して Cloud Storage ACL を適用してキーファイルへのアクセスを制限することをおすすめします。
      • scope は、OAuth スコープのカンマ区切りのリストです。

    例:

    Airflow 1.10 CLI

    gcloud composer environments run test-environment \
       --location us-central1 connections -- --add \
       --conn_id=my_gcp_connection --conn_type=google_cloud_platform \
       --conn_extra '{"extra\__google\_cloud\_platform\__project": "your-storage-project",
       "extra\__google\_cloud\_platform\__key_path": "/home/airflow/gcs/data/keyfile.json",
       "extra\__google\_cloud\_platform\__scope": "https://www.googleapis.com/auth/cloud-platform"}'
    

    Airflow 2.0 CLI

    gcloud beta composer environments run test-environment \
       --location us-central1 connections add -- \
       --conn-id=my_gcp_connection --conn-type=google_cloud_platform \
       --conn-extra '{"extra\__google\_cloud\_platform\__project": "your-storage-project",
       "extra\__google\_cloud\_platform\__key_path": "/home/airflow/gcs/data/keyfile.json",
       "extra\__google\_cloud\_platform\__scope": "https://www.googleapis.com/auth/cloud-platform"}'
    

新しい Airflow 接続の使用

作成した接続を使用するには、Google Cloud Airflow 演算子を作成するときに、対応する接続 ID 引数として設定します。

Airflow 2

# Set a gcp_conn_id to use a connection that you have created.
task_custom = bigquery.BigQueryInsertJobOperator(
    task_id='task_custom_connection',
    gcp_conn_id='my_gcp_connection',
    configuration={
        "query": {
            "query": 'SELECT 1',
            "useLegacySql": False
        }
    }
)

Airflow 1

task_custom = bigquery_operator.BigQueryOperator(
    task_id='task_custom_connection',
    sql='SELECT 1', use_legacy_sql=False,
    # Set a connection ID to use a connection that you have created.
    bigquery_conn_id='my_gcp_connection')