管理 Airflow 连接

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本页将介绍如何管理 Airflow 连接 并从 DAG 访问它们。

Airflow 连接简介

Aiflow 连接会存储凭据和其他连接信息,例如 用户名、连接字符串和密码。您的 DAG 使用连接来 通信和访问 Google Cloud 和其他服务中的资源 从 DAG 中获取。

DAG 中的 Airflow 操作器为 运算符,您也可以指定自定义连接名称。

关于连接安全性

大多数 Airflow 操作器都不直接接受凭据。而是使用 Airflow 连接。

当您创建新环境时,Cloud Composer 会生成 用于环境的唯一永久性 fernet 密钥,并保护连接额外项 默认情况。您可以在以下位置的配置页面中查看 fernet_keyAirflow 界面

若要详细了解如何保护连接和密码的安全,请访问 Airflow,请参见 保护连接遮盖敏感数据

关于连接类型

Airflow 使用不同类型的连接来连接到特定服务。 例如, Google Cloud 连接类型 连接到 Google Cloud 中的其他服务。再举一例,S3 连接 类型连接到 Amazon S3 存储桶。

如需向 Airflow 添加连接类型,请执行以下操作: 安装该连接类型的 PyPI 软件包。 您的环境中已预安装一些软件包。对于 例如,您可以使用 apache-airflow-providers-google 中的连接 软件包,而无需安装自定义 PyPI 软件包。

预配置的连接

Cloud Composer 会在您的 环境您可以使用这些连接访问项目中的资源 而无需对其进行配置

  • google_cloud_default
  • bigquery_default
  • google_cloud_datastore_default
  • google_cloud_storage_default

在 Secret Manager 中添加连接

您可以将连接存储在 Secret Manager 中,无需添加 将其发送到 Airflow我们建议在存储凭据和 其他敏感信息。

如需在 Secret Manager 中添加连接,请执行以下操作:

  1. 为您的环境配置 Secret Manager

  2. 添加一个密钥,名称为 与连接的模式匹配。

    例如 airflow-connections-example_connection。在 DAG 中,使用不带前缀的连接名称:example_connection

  3. 为连接添加参数:

    JSON 格式

    将您的连接的 JSON 表示形式添加为 密钥。例如:

    {
      "conn_type": "mysql",
      "host": "example.com",
      "login": "login",
      "password": "password",
      "port": "9000"
    }
    

    如需详细了解 JSON 连接格式,请参阅 Airflow 文档

    URI 格式

    将您的连接的 URI 表示形式添加为 Secret:

    • 该密钥必须存储 URI 表示法 连接。例如 mysql://login:password@example.com:9000

    • URI 必须经过网址编码。对于 例如,包含空格符号的密码必须经过网址编码 如下所示: mysql://login:secret%20password@example.com:9000.

    Airflow 具有 便利方法 用于生成连接 URI示例:如何对复杂网址进行编码 中提供了包含 JSON extra 的 Airflow 文档

  4. 检查所有连接参数是否 从 Secret Manager 中正确读取

在 Airflow 中添加连接

除了将连接存储在 您可以将它们存储在 Airflow 中。

如需在 Airflow 中添加连接,请执行以下操作:

Airflow CLI

运行 connections add Airflow CLI Google Cloud CLI 命令例如:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  connections add -- \
    --conn-type "mysql" \
    --conn-host "example.com" \
    --conn-port "9000" \
    --conn-login "login" \
    --conn-password "password" \
    example_connection

您还可以使用 --conn-uri 参数:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  connections add -- \
    --conn-uri "mysql://login:password@example.com:9000" \
    example_connection

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。

Airflow 界面

按照有关创建连接的 Airflow 文档进行操作。

检查 Airflow 是否正确读取了连接

您可以通过执行以下操作来运行 connections get Airflow CLI 命令: Google Cloud CLI 连接已被正确读取。例如,如果您将连接存储在 这样就可以检查 Airflow 从 Secret 读取连接的参数。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    connections get \
    -- CONNECTION_NAME

您需要将其中的:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • CONNECTION_NAME 替换为连接的名称。如果您的连接是 使用存储在 Secret Manager 中的连接名称,而不使用 连接前缀。例如,请指定 example_connection,而不是 airflow-connections-example_connection_json.

示例:

gcloud composer environments run example-environment \
    --location us-central1 \
    connections get \
    -- example_connection -o json

在 DAG 中使用 Airflow 连接

本部分介绍如何从 DAG 访问连接。

使用 Secret Manager 连接

使用不带前缀的连接名称。例如,如果您的密钥 名为 airflow-connections-aws_s3,请指定 aws_s3.

transfer_dir_from_s3 = S3ToGCSOperator(
    task_id='transfer_dir_from_s3',
    aws_conn_id='aws_s3',
    prefix='data-for-gcs',
    bucket='example-s3-bucket-transfer-operators',
    dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')

如果您将默认连接存储在 Secret Manager 中,则可以省略 。如需获取特定操作器,请参阅 Airflow 文档 运营商使用的默认连接名称。例如, S3ToGCSOperator Airflow 操作使用 aws_default 连接, 默认值。您可以将这个默认连接存储在名为 airflow-connections-aws_default.

使用存储在 Airflow 中的连接

使用在 Airflow 中定义的连接名称:

transfer_dir_from_s3 = S3ToGCSOperator(
    task_id='transfer_dir_from_s3',
    aws_conn_id='aws_s3',
    prefix='data-for-gcs',
    bucket='example-s3-bucket-transfer-operators',
    dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')

要使用运算符的默认连接,请省略连接名称。请参阅 特定操作器(用于获取默认连接)的 Airflow 文档 运算符使用的名称。例如,S3ToGCSOperator Airflow 运算符 默认使用 aws_default 连接。

问题排查

如果您的环境无法访问存储在 Secret Manager 中的 Secret,请执行以下操作:

  1. 确保在 环境

  2. 检查 Secret Manager 中的连接名称是否与 连接到 Airflow 使用的连接。例如,对于名为 example_connection,密钥名称为 airflow-connections-example_connection.

  3. 检查 Airflow 是否正确读取连接

后续步骤