Airflow 接続を管理する

Cloud Composer 1 | Cloud Composer 2

このページでは、環境内の Airflow 接続を管理し、DAG からアクセスする方法について説明します。

Airflow 接続について

Aiflow 接続には、認証情報、その他の接続情報(ユーザー名、接続文字列、パスワードなど)が保存されます。DAG は接続を使用して、DAG から Google Cloud やその他のサービスのリソースと通信してアクセスします。

DAG の Airflow オペレーターは、オペレーターのデフォルト接続を使用するか、カスタム接続名を指定します。

接続のセキュリティについて

ほとんどの Airflow オペレーターは認証情報を直接受け入れません。代わりに Airflow 接続を使用します。

新しい環境を作成すると、Cloud Composer によってその環境用に一意で永続的な Fernet 鍵が生成され、デフォルトで接続エクストラが保護されます。fernet_key は、Airflow UI の [構成] ページで確認できます。

Airflow での接続とパスワードの保護方法の詳細については、接続の保護機密データのマスキングをご覧ください。

接続タイプについて

Airflow ではさまざまなタイプの接続を使用して特定のサービスに接続します。 たとえば、Google Cloud 接続タイプは、Google Cloud の他のサービスに接続します。別の例として、S3 接続タイプは Amazon S3 バケットに接続します。

Airflow に接続タイプを追加するには、その接続タイプで PyPI パッケージをインストールします。一部のパッケージは、環境にプリインストールされています。たとえば、カスタム PyPI パッケージをインストールせずに、apache-airflow-providers-google パッケージからの接続を使用できます。

事前構成された接続

Cloud Composer は、環境内の次のデフォルト接続を構成します。これらの接続を使用すると、リソースを構成せずにプロジェクト内のリソースにアクセスできます。

  • google_cloud_default
  • bigquery_default
  • google_cloud_datastore_default
  • google_cloud_storage_default

Secret Manager で接続を追加する

Airflow に追加せずに接続を Secret Manager に保存できます。認証情報やその他の機密情報を保存する際には、この方法を使用することをおすすめします。

Secret Manager で接続を追加するには:

  1. 環境に Secret Manager を構成する

  2. 接続のパターンと一致する名前のシークレットを追加します。

    例: airflow-connections-example_connectionDAG では、接頭辞 example_connection なしで接続名を使用します。

  3. 接続のパラメータを追加します。

    JSON 形式

    接続の JSON 表現をシークレットの値として追加します。例:

    {
      "conn_type": "mysql",
      "host": "example.com",
      "login": "login",
      "password": "password",
      "port": "9000"
    }
    

    JSON 接続形式の詳細については、Airflow のドキュメントをご覧ください。

    URI の形式

    接続の URI 表現をシークレットの値として追加します。

    • Secret には、接続の URI 表現を保存する必要があります。例: mysql://login:password@example.com:9000

    • URI は URL エンコードする必要があります。たとえば、スペース記号を含むパスワードは、mysql://login:secret%20password@example.com:9000 のように URL エンコードする必要があります。

    Airflow には、接続 URI を生成するための便利なメソッドがあります。JSON エクストラで複雑な URL をエンコードする例については、Airflow ドキュメントをご覧ください。

  4. すべての接続パラメータが Secret Manager から正しく読み取られることを確認します。

Airflow で接続を追加する

Secret Manager に接続を保存する代わりに、Airflow に保存することもできます。

Airflow で接続を追加するには:

Airflow CLI

Google Cloud CLI を使用して connections add Airflow CLI コマンドを実行します。例:

Airflow 2 では:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  connections add -- \
    --conn-type "mysql" \
    --conn-host "example.com" \
    --conn-port "9000" \
    --conn-login "login" \
    --conn-password "password" \
    example_connection

また、--conn-uri 引数を使用することもできます。

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  connections add -- \
    --conn-uri "mysql://login:password@example.com:9000" \
    example_connection

Airflow 1 では:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  connections -- \
    --add \
    --conn_type "mysql" \
    --conn_host "example.com" \
    --conn_port "9000" \
    --conn_login "login" \
    --conn_password "password" \
    --conn_id "example_connection"

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

Airflow UI

接続の作成に関する Airflow ドキュメントに従います。

Airflow が接続を正しく読み取ることを確認する

Google Cloud CLI を使用して connections get Airflow CLI コマンドを実行し、接続が正しく読み取られることを確認できます。たとえば、接続を Secret Manager に保存する場合、これによって、接続のすべてのパラメータが Airflow によって Secret から読み取られるかどうかを確認することができます。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    connections get \
    -- CONNECTION_NAME

以下のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。
  • LOCATION は、環境が配置されているリージョン。
  • CONNECTION_NAME は、接続の名前に置き換えます。接続が Secret Manager に保存されている場合は、接続接頭辞のない接続名を使用します。たとえば、airflow-connections-example_connection_json ではなく example_connection を指定します。

例:

gcloud composer environments run example-environment \
    --location us-central1 \
    connections get \
    -- example_connection -o json

DAG で Airflow 接続を使用する

このセクションでは、DAG から接続にアクセスする方法について説明します。

Secret Manager 接続を使用する

接頭辞のない接続名を使用します。たとえば、Secret の名前が airflow-connections-aws_s3 の場合、aws_s3 を指定します。

transfer_dir_from_s3 = S3ToGCSOperator(
    task_id='transfer_dir_from_s3',
    aws_conn_id='aws_s3',
    prefix='data-for-gcs',
    bucket='example-s3-bucket-transfer-operators',
    dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')

デフォルトの接続を Secret Manager に保存する場合は、接続名を省略できます。オペレーターによって使用されるデフォルトの接続名を取得するには、特定のオペレーターについて Airflow のドキュメントをご覧ください。たとえば、S3ToGCSOperator Airflow オペレーターはデフォルトで aws_default 接続を使用します。このデフォルトの接続は、airflow-connections-aws_default という名前のシークレットに保存できます。

Airflow に保存されている接続を使用する

Airflow で定義されている接続の名前を使用します。

transfer_dir_from_s3 = S3ToGCSOperator(
    task_id='transfer_dir_from_s3',
    aws_conn_id='aws_s3',
    prefix='data-for-gcs',
    bucket='example-s3-bucket-transfer-operators',
    dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')

オペレーターに対してデフォルトの接続を使用するには、接続名を省略します。オペレーターによって使用されるデフォルトの接続名を取得するには、特定のオペレーターについて Airflow のドキュメントをご覧ください。たとえば、S3ToGCSOperator Airflow オペレーターはデフォルトで aws_default 接続を使用します。

トラブルシューティング

Secret Manager に保存されているシークレットに環境がアクセスできない場合:

  1. Secret Manager が環境で構成されていることを確認します。

  2. Secret Manager の接続名が、Airflow で使用される接続に対応していることを確認します。たとえば、example_connection という名前の接続の場合、シークレット名は airflow-connections-example_connection です。

  3. Airflow が接続を正しく読み取ることを確認します。

次のステップ