Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
このページでは、環境内の Airflow 接続を管理し、DAG からアクセスする方法について説明します。
Airflow 接続について
Aiflow 接続には、認証情報、その他の接続情報(ユーザー名、接続文字列、パスワードなど)が保存されます。DAG は接続を使用して、DAG から Google Cloud やその他のサービスのリソースと通信してアクセスします。
DAG の Airflow オペレーターは、オペレーターのデフォルト接続を使用するか、カスタム接続名を指定します。
接続セキュリティについて
ほとんどの Airflow オペレーターは認証情報を直接受け入れません。代わりに、Airflow 接続を使用します。
新しい環境を作成すると、Cloud Composer によってその環境用に一意で永続的な Fernet 鍵が生成され、デフォルトで接続エクストラが保護されます。fernet_key
は、Airflow UI の [構成] ページで確認できます。
Airflow で接続とパスワードを保護する方法の詳細については、Airflow ドキュメントの接続の保護とセンシティブ データのマスキングをご覧ください。
接続タイプについて
Airflow は、さまざまな接続タイプを使用して特定のサービスに接続します。たとえば、Google Cloud 接続タイプは、Google Cloud 内の他のサービスに接続します。別の例として、S3 接続タイプは Amazon S3 バケットに接続します。
Airflow に接続タイプを追加するには、該当する接続タイプの PyPI パッケージをインストールします。一部のパッケージは環境にプリインストールされています。たとえば、カスタム PyPI パッケージをインストールせずに、apache-airflow-providers-google
パッケージの接続を使用できます。
事前構成された接続
Cloud Composer は、お使いの環境で次のデフォルト接続を構成します。これらの接続を使用すると、プロジェクト内のリソースを構成することなくそれらにアクセスできます。
google_cloud_default
bigquery_default
google_cloud_datastore_default
google_cloud_storage_default
Secret Manager に接続を追加する
接続を Airflow に追加することなく Secret Manager に保存できます。この方法は、認証情報などの機密情報を保存する場合におすすめです。
Secret Manager に接続を追加するには:
接続のパターンと一致する名前で、シークレットを追加します。
例:
airflow-connections-example_connection
DAG では、接頭辞のない接続名example_connection
を使用します。接続のパラメータを追加します。
JSON 形式
接続の JSON 表現をシークレットの値として追加します。次に例を示します。
{ "conn_type": "mysql", "host": "example.com", "login": "login", "password": "password", "port": "9000" }
JSON 接続形式の詳細については、Airflow のドキュメントをご覧ください。
URI の形式
接続の URI 表現をシークレットの値として追加します。
Secret に接続の URI 表現を保存する必要があります。例:
mysql://login:password@example.com:9000
URI は URL エンコードする必要があります。たとえば、スペース記号を含むパスワードは、次のように URL エンコードする必要があります:
mysql://login:secret%20password@example.com:9000
。
Airflow には、接続 URI を生成するための便利なメソッドがあります。JSON エクストラで複雑な URL をエンコードする例については、Airflow ドキュメントをご覧ください。
すべての接続パラメータが Secret Manager から正しく読み取られていることを確認します。
Airflow に接続を追加する
接続を Secret Manager に保存する代わりに、Airflow に保存することもできます。
Airflow に接続を追加するには:
Airflow CLI
Google Cloud CLI で connections add
Airflow CLI コマンドを実行します。次に例を示します。
Airflow 2 では:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections add -- \
--conn-type "mysql" \
--conn-host "example.com" \
--conn-port "9000" \
--conn-login "login" \
--conn-password "password" \
example_connection
また、--conn-uri
引数を使用することもできます。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections add -- \
--conn-uri "mysql://login:password@example.com:9000" \
example_connection
Airflow 1 では:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections -- \
--add \
--conn_type "mysql" \
--conn_host "example.com" \
--conn_port "9000" \
--conn_login "login" \
--conn_password "password" \
--conn_id "example_connection"
以下を置き換えます。
ENVIRONMENT_NAME
: 環境の名前。LOCATION
: 環境が配置されているリージョン。
Airflow UI
接続の作成に関する Airflow ドキュメントに従います。
Airflow が接続を正しく読み取ることを確認する
Google Cloud CLI で connections get
Airflow CLI コマンドを実行して、接続が正しく読み取られていることを確認できます。たとえば、接続を Secret Manager に保存する場合、これによって、接続のすべてのパラメータが Airflow によって Secret から読み取られるかどうかを確認することができます。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections get \
-- CONNECTION_NAME
以下のように置き換えます。
ENVIRONMENT_NAME
を環境の名前にする。LOCATION
は、環境が配置されているリージョン。CONNECTION_NAME
は、接続の名前に置き換えます。接続が Secret Manager に保存されている場合は、接続接頭辞のない接続名を使用します。たとえば、airflow-connections-example_connection_json
ではなくexample_connection
と指定します。
例:
gcloud composer environments run example-environment \
--location us-central1 \
connections get \
-- example_connection -o json
DAG の Airflow 接続を使用する
このセクションでは、DAG から接続にアクセスする方法について説明します。
Secret Manager 接続を使用する
接頭辞のない接続名を使用します。たとえば、シークレットの名前が airflow-connections-aws_s3
の場合は、aws_s3
を指定します。
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')
デフォルトの接続を Secret Manager に保存する場合は、接続名を省略できます。オペレーターによって使用されるデフォルトの接続名を取得するには、特定のオペレーターについて Airflow のドキュメントをご覧ください。たとえば、S3ToGCSOperator
Airflow オペレーターはデフォルトで aws_default
接続を使用します。このデフォルトの接続は、airflow-connections-aws_default
という名前のシークレットに保存できます。
Airflow に保存されている接続を使用する
Airflow で定義されているので、接続の名前を使用します。
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')
オペレーターに対してデフォルトの接続を使用するには、接続名を省略します。オペレーターによって使用されるデフォルトの接続名を取得するには、特定のオペレーターについて Airflow のドキュメントをご覧ください。たとえば、S3ToGCSOperator
Airflow オペレーターはデフォルトで aws_default
接続を使用します。
トラブルシューティング
環境が Secret Manager に保存されているシークレットにアクセスできない場合:
Secret Manager が環境で構成されていることを確認します。
Secret Manager の接続名が Airflow で使用されている接続に対応していることを確認します。たとえば、
example_connection
という名前の接続の場合、シークレット名はairflow-connections-example_connection
です。Airflow が接続を正しく読み取ることを確認する