管理 Airflow 连接

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

本页面介绍如何管理环境中的 Airflow 连接以及如何从 DAG 访问这些连接。

Airflow 连接简介

Aiflow 连接会存储凭据和其他连接信息,例如用户名、连接字符串和密码。DAG 通过连接来通信和访问 Google Cloud 中的资源以及通过 DAG 访问的其他服务。

DAG 中的 Airflow 操作器要么使用该运算符的默认连接,要么由您指定自定义连接名称。

关于连接安全性

大多数 Airflow 操作器都不直接接受凭据。而是会使用 Airflow 连接。

当您创建新环境时,Cloud Composer 会为环境生成一个唯一的永久铁网络密钥,并默认保护连接额外项。您可以在 Airflow 界面配置页面查看 fernet_key

如需详细了解如何在 Airflow 中保护连接和密码,请参阅保护连接遮盖敏感数据

关于连接类型

Airflow 使用不同类型的连接来连接到特定服务。 例如,Google Cloud 连接类型可连接到 Google Cloud 中的其他服务。再举一例,S3 连接类型连接到 Amazon S3 存储桶。

如需向 Airflow 添加连接类型,请安装使用该连接类型的 PyPI 软件包。您的环境中已预安装一些软件包。例如,您可以使用来自 apache-airflow-providers-google 软件包的连接,而无需安装自定义 PyPI 软件包。

预配置的连接

Cloud Composer 会在您的环境中配置以下默认连接。您可以使用这些连接访问项目中的资源,而无需进行配置。

  • google_cloud_default
  • bigquery_default
  • google_cloud_datastore_default
  • google_cloud_storage_default

在 Secret Manager 中添加连接

您可以将连接存储在 Secret Manager 中,而无需将其添加到 Airflow。我们建议在存储凭据和其他敏感信息时使用此方法。

如需在 Secret Manager 中添加连接,请执行以下操作:

  1. 为您的环境配置 Secret Manager

  2. 添加一个密钥,其名称与连接模式匹配。

    例如 airflow-connections-example_connection。在 DAG 中,使用不带前缀的连接名称:example_connection

  3. 为连接添加参数:

    JSON 格式

    将您的连接的 JSON 表示法添加为 Secret 的值。例如:

    {
      "conn_type": "mysql",
      "host": "example.com",
      "login": "login",
      "password": "password",
      "port": "9000"
    }
    

    如需详细了解 JSON 连接格式,请参阅 Airflow 文档

    URI 格式

    将您的连接的 URI 表示形式添加为 Secret 的值:

    • 该 Secret 必须存储连接的 URI 表示法。例如 mysql://login:password@example.com:9000

    • URI 必须经过网址编码。例如,包含空格符号的密码必须按如下方式进行网址编码:mysql://login:secret%20password@example.com:9000

    Airflow 提供了一种用于生成连接 URI 的便捷方法Airflow 文档提供了如何使用 JSON extra 对复杂网址进行编码的示例。

  4. 检查是否从 Secret Manager 正确读取了所有连接参数

在 Airflow 中添加连接

除了将连接存储在 Secret Manager 中,您也可以将连接存储在 Airflow 中。

如需在 Airflow 中添加连接,请执行以下操作:

Airflow CLI

通过 Google Cloud CLI 运行 connections add Airflow CLI 命令。例如:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  connections add -- \
    --conn-type "mysql" \
    --conn-host "example.com" \
    --conn-port "9000" \
    --conn-login "login" \
    --conn-password "password" \
    example_connection

您还可以使用 --conn-uri 参数:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  connections add -- \
    --conn-uri "mysql://login:password@example.com:9000" \
    example_connection

请替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。

Airflow 界面

按照有关创建连接的 Airflow 文档进行操作。

检查 Airflow 是否正确读取了连接

您可以通过 Google Cloud CLI 运行 connections get Airflow CLI 命令,以检查是否已正确读取连接。例如,如果您将连接存储在 Secret Manager 中,则可以检查 Airflow 是否从 Secret 读取了连接的所有参数。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    connections get \
    -- CONNECTION_NAME

您需要在其中:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • CONNECTION_NAME 替换为连接的名称。如果您的连接存储在 Secret Manager 中,请使用不带连接前缀的连接名称。例如,指定 example_connection 而不是 airflow-connections-example_connection_json

示例:

gcloud composer environments run example-environment \
    --location us-central1 \
    connections get \
    -- example_connection -o json

在 DAG 中使用 Airflow 连接

本部分介绍如何从 DAG 访问连接。

使用 Secret Manager 连接

使用不带前缀的连接名称。例如,如果您的 Secret 名为 airflow-connections-aws_s3,请指定 aws_s3

transfer_dir_from_s3 = S3ToGCSOperator(
    task_id='transfer_dir_from_s3',
    aws_conn_id='aws_s3',
    prefix='data-for-gcs',
    bucket='example-s3-bucket-transfer-operators',
    dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')

如果您将默认连接存储在 Secret Manager 中,则可以省略连接名称。如需获取特定操作器的默认连接名称,请参阅 Airflow 文档。例如,S3ToGCSOperator Airflow 操作器默认使用 aws_default 连接。您可以将此默认连接存储在名为 airflow-connections-aws_default 的 Secret 中。

使用存储在 Airflow 中的连接

使用在 Airflow 中定义的连接名称:

transfer_dir_from_s3 = S3ToGCSOperator(
    task_id='transfer_dir_from_s3',
    aws_conn_id='aws_s3',
    prefix='data-for-gcs',
    bucket='example-s3-bucket-transfer-operators',
    dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')

要使用运算符的默认连接,请省略连接名称。如需获取特定操作使用的默认连接名称,请参阅 Airflow 文档。例如,S3ToGCSOperator Airflow 操作器默认使用 aws_default 连接。

问题排查

如果您的环境无法访问存储在 Secret Manager 中的 Secret,请执行以下操作:

  1. 确保您的环境中已配置 Secret Manager。

  2. 请检查 Secret Manager 中的连接名称是否与 Airflow 使用的连接相对应。例如,对于名为 example_connection 的连接,密钥名称为 airflow-connections-example_connection

  3. 检查 Airflow 是否正确读取连接

后续步骤