Airflow 接続を管理する

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、環境内の Airflow 接続を管理し、DAG からアクセスする方法について説明します。

Airflow 接続について

Aiflow 接続には、認証情報、その他の接続情報(ユーザー名、接続文字列、パスワードなど)が保存されます。DAG は接続を使用して、DAG から Google Cloud やその他のサービスのリソースと通信してアクセスします。

DAG の Airflow オペレーターは、オペレーターのデフォルト接続を使用するか、カスタム接続名を指定します。

接続のセキュリティについて

ほとんどの Airflow オペレーターは認証情報を直接受け入れません。代わりに、Airflow 接続を使用します。

新しい環境を作成すると、Cloud Composer によってその環境用に一意で永続的な Fernet 鍵が生成され、デフォルトで接続エクストラが保護されます。fernet_key は、Airflow UI の [構成] ページで確認できます。

Airflow で接続とパスワードを保護する方法の詳細については、接続の保護センシティブ データのマスキングをご覧ください。

接続タイプについて

Airflow は、さまざまなタイプの接続を使用して特定のサービスに接続します。たとえば、Google Cloud の接続タイプは、Google Cloud 内の他のサービスに接続します。別の例として、S3 接続タイプは Amazon S3 バケットに接続します。

Airflow に接続タイプを追加するには、該当する接続タイプの PyPI パッケージをインストールします。一部のパッケージは環境にプリインストールされています。たとえば、カスタム PyPI パッケージをインストールせずに、apache-airflow-providers-google パッケージの接続を使用できます。

事前構成された接続

Cloud Composer は、環境で次のデフォルト接続を構成します。これらの接続を使用すると、構成しなくても、プロジェクト内のリソースにアクセスできます。

  • google_cloud_default
  • bigquery_default
  • google_cloud_datastore_default
  • google_cloud_storage_default

Secret Manager に接続を追加する

接続を Airflow に追加せずに、Secret Manager に保存できます。このアプローチは、認証情報などの機密情報を保存する場合におすすめします。

Secret Manager に接続を追加するには:

  1. 環境に Secret Manager を構成する

  2. 接続のパターンと一致する名前で、シークレットを追加します

    例: airflow-connections-example_connectionDAG では、接頭辞のない接続名 example_connection を使用します。

  3. 接続のパラメータを追加します。

    JSON 形式

    接続の JSON 表現をシークレットの値として追加します。次に例を示します。

    {
      "conn_type": "mysql",
      "host": "example.com",
      "login": "login",
      "password": "password",
      "port": "9000"
    }
    

    JSON 接続形式の詳細については、Airflow のドキュメントをご覧ください。

    URI の形式

    接続の URI 表現をシークレットの値として追加します。

    • Secret には、接続の URI 表現を保存する必要があります。例: mysql://login:password@example.com:9000

    • URI は URL エンコードする必要があります。たとえば、スペース記号を含むパスワードは、次のように URL エンコードする必要があります。mysql://login:secret%20password@example.com:9000

    Airflow には、接続 URI を生成するための便利なメソッドがあります。JSON エクストラで複雑な URL をエンコードする例については、Airflow ドキュメントをご覧ください。

  4. すべての接続パラメータが Secret Manager から正しく読み取られていることを確認します。

Airflow に接続を追加する

接続を Secret Manager に保存する代わりに、Airflow に保存することもできます。

Airflow に接続を追加するには:

Airflow CLI

Google Cloud CLI で connections add Airflow CLI コマンドを実行します。次に例を示します。

Airflow 2 では:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  connections add -- \
    --conn-type "mysql" \
    --conn-host "example.com" \
    --conn-port "9000" \
    --conn-login "login" \
    --conn-password "password" \
    example_connection

また、--conn-uri 引数を使用することもできます。

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  connections add -- \
    --conn-uri "mysql://login:password@example.com:9000" \
    example_connection

Airflow 1 では:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  connections -- \
    --add \
    --conn_type "mysql" \
    --conn_host "example.com" \
    --conn_port "9000" \
    --conn_login "login" \
    --conn_password "password" \
    --conn_id "example_connection"

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

Airflow UI

接続の作成に関する Airflow ドキュメントに従います。

Airflow が接続を正しく読み取ることを確認する

Google Cloud CLI で connections get Airflow CLI コマンドを実行して、接続が正しく読み取られていることを確認できます。たとえば、接続を Secret Manager に保存する場合、これによって、接続のすべてのパラメータが Airflow によって Secret から読み取られるかどうかを確認することができます。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    connections get \
    -- CONNECTION_NAME

以下のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。
  • LOCATION は、環境が配置されているリージョン。
  • CONNECTION_NAME は、接続の名前に置き換えます。接続が Secret Manager に保存されている場合は、接続接頭辞のない接続名を使用します。たとえば、airflow-connections-example_connection_json ではなく example_connection と指定します。

例:

gcloud composer environments run example-environment \
    --location us-central1 \
    connections get \
    -- example_connection -o json

DAG の Airflow 接続を使用する

このセクションでは、DAG から接続にアクセスする方法について説明します。

Secret Manager 接続を使用する

接頭辞を付けない接続の名前を使用します。たとえば、シークレットの名前が airflow-connections-aws_s3 の場合は、aws_s3 を指定します。

transfer_dir_from_s3 = S3ToGCSOperator(
    task_id='transfer_dir_from_s3',
    aws_conn_id='aws_s3',
    prefix='data-for-gcs',
    bucket='example-s3-bucket-transfer-operators',
    dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')

デフォルトの接続を Secret Manager に保存する場合は、接続名を省略できます。オペレーターによって使用されるデフォルトの接続名を取得するには、特定のオペレーターについて Airflow のドキュメントをご覧ください。たとえば、S3ToGCSOperator Airflow オペレーターはデフォルトで aws_default 接続を使用します。このデフォルトの接続は、airflow-connections-aws_default という名前のシークレットに保存できます。

Airflow に保存されている接続を使用する

Airflow で定義されているので、接続の名前を使用します。

transfer_dir_from_s3 = S3ToGCSOperator(
    task_id='transfer_dir_from_s3',
    aws_conn_id='aws_s3',
    prefix='data-for-gcs',
    bucket='example-s3-bucket-transfer-operators',
    dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')

オペレーターに対してデフォルトの接続を使用するには、接続名を省略します。オペレーターによって使用されるデフォルトの接続名を取得するには、特定のオペレーターについて Airflow のドキュメントをご覧ください。たとえば、S3ToGCSOperator Airflow オペレーターはデフォルトで aws_default 接続を使用します。

トラブルシューティング

環境で Secret Manager に保存されているシークレットにアクセスできない場合:

  1. Secret Manager が環境で構成されていることを確認します。

  2. Secret Manager の接続名が Airflow で使用されている接続に対応していることを確認します。たとえば、example_connection という名前の接続の場合、シークレット名は airflow-connections-example_connection です。

  3. Airflow が接続を正しく読み取ることを確認する

次のステップ