Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3
本页将介绍如何管理 Airflow 连接 并从 DAG 访问它们。
Airflow 连接简介
Aiflow 连接会存储凭据和其他连接信息,例如 用户名、连接字符串和密码。您的 DAG 使用连接来 通信和访问 Google Cloud 和其他服务中的资源 从 DAG 中获取。
DAG 中的 Airflow 操作器为 运算符,您也可以指定自定义连接名称。
关于连接安全性
大多数 Airflow 操作器都不直接接受凭据。而是使用 Airflow 连接。
当您创建新环境时,Cloud Composer 会生成
用于环境的唯一永久性 fernet 密钥,并保护连接额外项
默认情况。您可以在以下位置的配置页面中查看 fernet_key
:
Airflow 界面。
若要详细了解如何保护连接和密码的安全,请访问 Airflow,请参见 保护连接 和 遮盖敏感数据。
关于连接类型
Airflow 使用不同类型的连接来连接到特定服务。 例如, Google Cloud 连接类型 连接到 Google Cloud 中的其他服务。再举一例,S3 连接 类型连接到 Amazon S3 存储桶。
如需向 Airflow 添加连接类型,请执行以下操作:
安装该连接类型的 PyPI 软件包。
您的环境中已预安装一些软件包。对于
例如,您可以使用 apache-airflow-providers-google
中的连接
软件包,而无需安装自定义 PyPI 软件包。
预配置的连接
Cloud Composer 会在您的 环境您可以使用这些连接访问项目中的资源 而无需对其进行配置
google_cloud_default
bigquery_default
google_cloud_datastore_default
google_cloud_storage_default
在 Secret Manager 中添加连接
您可以将连接存储在 Secret Manager 中,无需添加 将其发送到 Airflow我们建议在存储凭据和 其他敏感信息。
如需在 Secret Manager 中添加连接,请执行以下操作:
添加一个密钥,名称为 与连接的模式匹配。
例如
airflow-connections-example_connection
。在 DAG 中,使用不带前缀的连接名称:example_connection
。为连接添加参数:
JSON 格式
将您的连接的 JSON 表示形式添加为 密钥。例如:
{ "conn_type": "mysql", "host": "example.com", "login": "login", "password": "password", "port": "9000" }
如需详细了解 JSON 连接格式,请参阅 Airflow 文档。
URI 格式
将您的连接的 URI 表示形式添加为 Secret:
该密钥必须存储 URI 表示法 连接。例如
mysql://login:password@example.com:9000
。URI 必须经过网址编码。对于 例如,包含空格符号的密码必须经过网址编码 如下所示:
mysql://login:secret%20password@example.com:9000
.
Airflow 具有 便利方法 用于生成连接 URI示例:如何对复杂网址进行编码 中提供了包含 JSON extra 的 Airflow 文档。
检查所有连接参数是否 从 Secret Manager 中正确读取。
在 Airflow 中添加连接
除了将连接存储在 您可以将它们存储在 Airflow 中。
如需在 Airflow 中添加连接,请执行以下操作:
Airflow CLI
运行
connections add
Airflow CLI
Google Cloud CLI 命令例如:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections add -- \
--conn-type "mysql" \
--conn-host "example.com" \
--conn-port "9000" \
--conn-login "login" \
--conn-password "password" \
example_connection
您还可以使用 --conn-uri
参数:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections add -- \
--conn-uri "mysql://login:password@example.com:9000" \
example_connection
替换以下内容:
ENVIRONMENT_NAME
:您的环境的名称。LOCATION
:环境所在的区域。
Airflow 界面
按照有关创建连接的 Airflow 文档进行操作。
检查 Airflow 是否正确读取了连接
您可以通过执行以下操作来运行 connections get
Airflow CLI 命令:
Google Cloud CLI
连接已被正确读取。例如,如果您将连接存储在
这样就可以检查
Airflow 从 Secret 读取连接的参数。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections get \
-- CONNECTION_NAME
您需要将其中的:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。- 将
CONNECTION_NAME
替换为连接的名称。如果您的连接是 使用存储在 Secret Manager 中的连接名称,而不使用 连接前缀。例如,请指定example_connection
,而不是airflow-connections-example_connection_json
.
示例:
gcloud composer environments run example-environment \
--location us-central1 \
connections get \
-- example_connection -o json
在 DAG 中使用 Airflow 连接
本部分介绍如何从 DAG 访问连接。
使用 Secret Manager 连接
使用不带前缀的连接名称。例如,如果您的密钥
名为 airflow-connections-aws_s3
,请指定
aws_s3
.
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')
如果您将默认连接存储在 Secret Manager 中,则可以省略
。如需获取特定操作器,请参阅 Airflow 文档
运营商使用的默认连接名称。例如,
S3ToGCSOperator
Airflow 操作使用 aws_default
连接,
默认值。您可以将这个默认连接存储在名为
airflow-connections-aws_default
.
使用存储在 Airflow 中的连接
使用在 Airflow 中定义的连接名称:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')
要使用运算符的默认连接,请省略连接名称。请参阅
特定操作器(用于获取默认连接)的 Airflow 文档
运算符使用的名称。例如,S3ToGCSOperator
Airflow 运算符
默认使用 aws_default
连接。
问题排查
如果您的环境无法访问存储在 Secret Manager 中的 Secret,请执行以下操作:
确保在 环境
检查 Secret Manager 中的连接名称是否与 连接到 Airflow 使用的连接。例如,对于名为
example_connection
,密钥名称为airflow-connections-example_connection
.检查 Airflow 是否正确读取连接。