Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本页介绍了如何在环境中管理 Airflow 连接,以及如何从 DAG 访问这些连接。
Airflow 连接简介
Airflow 连接会存储凭据和其他连接信息,例如用户名、连接字符串和密码。DAG 使用连接来在 DAG 中进行通信和访问 Google Cloud 及其他服务中的资源。
DAG 中的 Airflow 运算符可以使用运算符的默认连接,也可以指定自定义连接名称。
连接安全性简介
大多数 Airflow 操作符都不直接接受凭据。而是使用 Airflow 连接。
创建新环境时,Cloud Composer 会为该环境生成一个唯一的永久性 Fernet 密钥,并默认保护连接额外服务。您可以在 Airflow 界面的配置页面中查看 fernet_key
。
如需详细了解 Airflow 中如何保护连接和密码,请参阅 Airflow 文档中的保护连接和遮盖敏感数据。
连接类型简介
Airflow 使用不同类型的连接来连接到特定服务。例如,Google Cloud 连接类型用于连接到 Google Cloud中的其他服务。再举一个例子,S3 连接类型用于连接到 Amazon S3 存储分区。
如需向 Airflow 添加连接类型,请安装具有相应连接类型的 PyPI 软件包。您的环境中预安装了一些软件包。例如,您可以使用 apache-airflow-providers-google
软件包中的连接,而无需安装自定义 PyPI 软件包。
预配置的连接
Cloud Composer 会在您的环境中配置以下默认连接。您可以使用这些连接来访问项目中的资源,而无需进行配置。
google_cloud_default
bigquery_default
google_cloud_datastore_default
google_cloud_storage_default
在 Secret Manager 中添加连接
您可以在 Secret Manager 中存储连接,而无需将其添加到 Airflow。我们建议您在存储凭据和其他敏感信息时使用此方法。
如需在 Secret Manager 中添加连接,请执行以下操作:
添加一个 Secret,其名称与连接的模式匹配。
例如
airflow-connections-example_connection
。在 DAG 中,请使用不带前缀的连接名称:example_connection
。为连接添加参数:
JSON 格式
将连接的 JSON 表示法添加为密钥的值。例如:
{ "conn_type": "mysql", "host": "example.com", "login": "login", "password": "password", "port": "9000" }
如需详细了解 JSON 连接格式,请参阅 Airflow 文档。
URI 格式
将连接的 URI 表示法添加为密钥的值:
Secret 必须存储连接的 URI 表示法。例如
mysql://login:password@example.com:9000
。URI 必须进行网址编码。例如,包含空格符号的密码必须按照以下格式进行网址编码:
mysql://login:secret%20password@example.com:9000
。
Airflow 提供一种生成连接 URI 的便捷方法。Airflow 文档中提供了有关如何使用 JSON extras 对复杂网址进行编码的示例。
在 Airflow 中添加连接
除了在 Secret Manager 中存储连接之外,您还可以在 Airflow 中存储连接。
如需在 Airflow 中添加连接,请执行以下操作:
Airflow CLI
使用 Google Cloud CLI 运行 connections add
Airflow CLI 命令。例如:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections add -- \
--conn-type "mysql" \
--conn-host "example.com" \
--conn-port "9000" \
--conn-login "login" \
--conn-password "password" \
example_connection
您还可以使用 --conn-uri
参数:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections add -- \
--conn-uri "mysql://login:password@example.com:9000" \
example_connection
替换以下内容:
ENVIRONMENT_NAME
:您的环境的名称。LOCATION
:环境所在的区域。
Airflow 界面
按照 Airflow 文档中关于创建连接的部分操作。
检查 Airflow 是否正确读取连接
您可以通过 Google Cloud CLI 运行 connections get
Airflow CLI 命令,以检查是否正确读取了连接。例如,如果您在 Secret Manager 中存储连接,则可以通过此方法检查 Airflow 是否从 Secret 读取了连接的所有参数。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections get \
-- CONNECTION_NAME
您需要进行如下替换:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。- 将
CONNECTION_NAME
替换为连接的名称。如果您的连接存储在 Secret Manager 中,请使用不带连接前缀的连接名称。例如,指定example_connection
而不是airflow-connections-example_connection_json
。
示例:
gcloud composer environments run example-environment \
--location us-central1 \
connections get \
-- example_connection -o json
在 DAG 中使用 Airflow 连接
本部分介绍了如何从 DAG 访问连接。
使用 Secret Manager 连接
使用不带前缀的连接名称。例如,如果您的 Secret 名为 airflow-connections-aws_s3
,请指定 aws_s3
。
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')
如果您在 Secret Manager 中存储默认连接,则可以省略连接名称。如需获取某个运算符使用的默认连接名称,请参阅 Airflow 文档中关于该运算符的部分。例如,S3ToGCSOperator
Airflow 操作器默认使用 aws_default
连接。您可以将此默认连接存储在名为 airflow-connections-aws_default
的 Secret 中。
使用存储在 Airflow 中的连接
使用 Airflow 中定义的连接名称:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')
如需为运算符使用默认连接,请省略连接名称。如需获取运算符使用的默认连接名称,请参阅特定运算符的 Airflow 文档。例如,S3ToGCSOperator
Airflow 运算符默认使用 aws_default
连接。
问题排查
如果您的环境无法访问存储在 Secret Manager 中的 Secret,请执行以下操作:
确保在您的环境中配置了 Secret Manager。
检查 Secret Manager 中的连接名称是否与 Airflow 使用的连接相对应。例如,对于名为
example_connection
的连接,Secret 名称为airflow-connections-example_connection
。检查 Airflow 是否正确读取了连接。