Cloud Composer 1 | Cloud Composer 2
このページでは、環境内の Airflow 接続を管理し、DAG からアクセスする方法について説明します。
Airflow 接続について
Aiflow 接続には、認証情報、その他の接続情報(ユーザー名、接続文字列、パスワードなど)が保存されます。DAG は接続を使用して、DAG から Google Cloud やその他のサービスのリソースと通信してアクセスします。
DAG の Airflow オペレーターは、オペレーターのデフォルト接続を使用するか、カスタム接続名を指定します。
接続のセキュリティについて
ほとんどの Airflow オペレーターは認証情報を直接受け入れません。代わりに Airflow 接続を使用します。
新しい環境を作成すると、Cloud Composer によってその環境用に一意で永続的な Fernet 鍵が生成され、デフォルトで接続エクストラが保護されます。fernet_key
は、Airflow UI の [構成] ページで確認できます。
Airflow で接続とパスワードを保護する方法の詳細については、接続の保護と機密データのマスキングをご覧ください。
接続タイプについて
Airflow は、さまざまなタイプの接続を使用して、特定のサービスに接続します。たとえば、Google Cloud の接続タイプは、Google Cloud 内の他のサービスに接続します。別の例として、S3 接続タイプが Amazon S3 バケットに接続します。
Airflow に接続タイプを追加するには、その接続タイプで PyPI パッケージをインストールします。一部のパッケージは環境にプリインストールされています。たとえば、カスタム PyPI パッケージをインストールせずに、apache-airflow-providers-google
パッケージからの接続を使用できます。
事前構成された接続
Cloud Composer では、環境内で次のデフォルト接続が構成されます。これらの接続を使用すると、構成しなくてもプロジェクト内のリソースにアクセスできます。
google_cloud_default
bigquery_default
google_cloud_datastore_default
google_cloud_storage_default
Secret Manager に接続を追加する
Airflow に追加せずに、Secret Manager に接続を保存できます。認証情報やその他の機密情報を保存する場合は、この方法を使用することをおすすめします。
Secret Manager に接続を追加するには:
接続のパターンと一致する名前で、シークレットを追加します。
例:
airflow-connections-example_connection
DAG では、接頭辞example_connection
のない接続名を使用します。接続のパラメータを追加します。
JSON 形式
接続の JSON 表現を Secret の値として追加します。例:
{ "conn_type": "mysql", "host": "example.com", "login": "login", "password": "password", "port": "9000" }
JSON 接続形式の詳細については、Airflow のドキュメントをご覧ください。
URI の形式
接続の URI 表現をシークレットの値として追加します。
シークレットには、接続の URI 表現を保存する必要があります。例:
mysql://login:password@example.com:9000
URI は URL エンコードする必要があります。たとえば、パスワードにスペース記号が含まれている場合は、
mysql://login:secret%20password@example.com:9000
のように URL エンコードする必要があります。
Airflow には、接続 URI を生成するための便利なメソッドがあります。JSON エクストラで複雑な URL をエンコードする例については、Airflow ドキュメントをご覧ください。
すべての接続パラメータが Secret Manager から正しく読み取られることを確認します。
Airflow に接続を追加する
Secret Manager に接続を保存する代わりに、Airflow に保存することもできます。
Airflow に接続を追加するには:
Airflow CLI
Google Cloud CLI で connections add
Airflow CLI コマンドを実行します。次に例を示します。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections add -- \
--conn-type "mysql" \
--conn-host "example.com" \
--conn-port "9000" \
--conn-login "login" \
--conn-password "password" \
example_connection
また、--conn-uri
引数を使用することもできます。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections add -- \
--conn-uri "mysql://login:password@example.com:9000" \
example_connection
以下を置き換えます。
ENVIRONMENT_NAME
: 環境の名前。LOCATION
: 環境が配置されているリージョン。
Airflow UI
接続の作成に関する Airflow ドキュメントに従います。
Airflow が接続を正しく読み取ることを確認する
Google Cloud CLI で connections get
Airflow CLI コマンドを実行して、接続が正しく読み取られていることを確認できます。たとえば、接続を Secret Manager に保存する場合、これによって、接続のすべてのパラメータが Airflow によって Secret から読み取られるかどうかを確認することができます。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections get \
-- CONNECTION_NAME
以下のように置き換えます。
ENVIRONMENT_NAME
を環境の名前にする。LOCATION
は、環境が配置されているリージョン。CONNECTION_NAME
は、接続の名前に置き換えます。接続が Secret Manager に保存されている場合は、接続接頭辞のない接続名を使用します。たとえば、airflow-connections-example_connection_json
ではなくexample_connection
と指定します。
例:
gcloud composer environments run example-environment \
--location us-central1 \
connections get \
-- example_connection -o json
DAG で Airflow 接続を使用する
このセクションでは、DAG から接続にアクセスする方法について説明します。
Secret Manager 接続を使用する
接頭辞のない接続の名前を使用します。たとえば、Secret の名前が airflow-connections-aws_s3
の場合、aws_s3
を指定します。
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')
デフォルトの接続を Secret Manager に保存する場合は、接続名を省略できます。オペレーターによって使用されるデフォルトの接続名を取得するには、特定のオペレーターについて Airflow のドキュメントをご覧ください。たとえば、S3ToGCSOperator
Airflow オペレーターはデフォルトで aws_default
接続を使用します。このデフォルト接続は airflow-connections-aws_default
という名前のシークレットに保存できます。
Airflow に保存されている接続を使用する
Airflow で定義されているので、接続の名前を使用します。
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')
オペレーターに対してデフォルトの接続を使用するには、接続名を省略します。オペレーターによって使用されるデフォルトの接続名を取得するには、特定のオペレーターについて Airflow のドキュメントをご覧ください。たとえば、S3ToGCSOperator
Airflow オペレーターはデフォルトで aws_default
接続を使用します。
トラブルシューティング
環境が Secret Manager に保存されているシークレットにアクセスできない場合:
Secret Manager が環境で構成されていることを確認します。
Secret Manager の接続名が Airflow で使用される接続に対応していることを確認します。たとえば、
example_connection
という名前の接続の場合、シークレット名はairflow-connections-example_connection
です。Airflow が接続を正しく読み取ることを確認します。