管理 Airflow 连接

本页面介绍如何使用 Airflow 连接

Airflow 连接可让您从 Cloud Composer 环境访问 Google Cloud 项目中的资源。您可创建 Airflow 连接 ID 来存储信息(例如登录名和主机名),并且您的工作流会引用连接 ID。推荐使用 Airflow 连接来存储工作流中使用的密钥和凭据。

Airflow 连接让您可存储 Cloud Composer 环境与其他 API(例如 Google Cloud 项目、其他云提供商或第三方服务)通信所需的连接信息。

Airflow 连接可以存储详细信息,例如凭据、主机名或其他 API 参数。每个连接都有一个关联的 ID,您可以在工作流任务中使用该 ID 来引用预设详细信息。我们建议您使用 Airflow 连接为工作流任务存储密钥和凭据。

Google Cloud 连接类型支持 Google Cloud 集成

Fernet 密钥和安全连接

创建新环境时,Cloud Composer 会为该环境生成一个唯一的永久性 Fernet 密钥,并默认保护连接额外服务。您可以在 Airflow 配置中查看 fernet_key。如需了解连接的保护方式,请参阅保护连接

使用默认连接

默认情况下,Cloud Composer 会为 Google Cloud Platform 配置以下 Airflow 连接

  • bigquery_default
  • google_cloud_default
  • google_cloud_datastore_default
  • google_cloud_storage_default

您可以使用默认连接 ID 在 DAG 中使用这些连接。以下示例将 BigQueryOperator 与默认连接搭配使用。

task_default = bigquery_operator.BigQueryOperator(
    task_id='task_default_connection',
    bql='SELECT 1', use_legacy_sql=False)

您也可以在创建运算符时显式指定连接 ID。

task_explicit = bigquery_operator.BigQueryOperator(
    task_id='task_explicit_connection',
    bql='SELECT 1', use_legacy_sql=False,
    # Composer creates a 'google_cloud_default' connection by default.
    bigquery_conn_id='google_cloud_default')

访问其他项目中的资源

允许 Cloud Composer 环境访问 Google Cloud 项目中的资源的推荐方法是,使用默认连接,并为与您的环境关联的服务帐号分配适当的 Cloud Identity and Access Management 权限。

以下几个部分提供了有关如何在项目 ID your-composer-project 中部署的 Cloud Composer 环境中允许对 your-storage-project 中的 Cloud Storage 存储分区执行读写操作的示例。

确定与您的环境相关联的服务帐号

控制台

  1. 在 Cloud Console 中,打开环境页面。

    打开“环境”页面

  2. 名称列中,点击环境名称以打开环境详情页面。
  3. 注意服务帐号。此值为电子邮件地址,例如 service-account-name@your-composer-project.iam.gserviceaccount.com

gcloud

输入以下命令,并将 VARIABLES 替换为适当的值:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.nodeConfig.serviceAccount)" 

输出会显示地址,例如 service-account-name@your-composer-project.iam.gserviceaccount.com

向服务帐号授予适当的 IAM 权限

要允许对 your-storage-project 中的 Cloud Storage 存储分区执行读写操作,请将 roles/storage.objectAdmin 角色授予与您的 Cloud Composer 环境关联的服务帐号。

控制台

  1. 在存储项目的 IAM 和管理员页面中。

    打开“IAM 和管理员”页面

  2. 点击添加成员

  3. 添加成员对话框中,指定与 Cloud Composer 环境关联的服务帐号的完整电子邮件地址。

  4. 选择角色下拉列表中,选择适当的权限。对于此示例,选择 Storage > Object Admin 角色。

  5. 点击添加

gcloud

使用 gcloud projects add-iam-policy-binding 命令添加项目级 IAM 权限。将 VARIABLES 替换为适当的值:

gcloud projects add-iam-policy-binding YOUR_STORAGE_PROJECT \
    --member=serviceAccount:SERVICE_ACCOUNT_EMAIL \
    --role=roles/storage.objectAdmin 

授予适当的权限后,您可以访问 your-storage-project 项目中的资源,其中使用的默认 Airflow 连接与您用于访问 your-composer-project 项目中的资源相同。

创建新的 Airflow 连接

准备工作

向与 Cloud Composer 环境关联的服务帐号授予适当的 Cloud IAM 权限,并在 DAG 定义中使用默认连接。如果无法做到这一点,请按照本部分中的步骤执行操作。

创建与其他项目的连接

以下步骤提供了有关如何在项目 ID your-composer-project 中部署的 Cloud Composer 环境中允许对 your-storage-project 中的 Cloud Storage 存储分区执行读写操作的示例。

  1. your-storage-project 中创建服务帐号并下载 JSON 密钥:

    1. 在 Cloud Console 中,打开服务帐号页面。

      打开“服务帐号”页面

    2. 点击选择项目

    3. 选择您的项目,然后点击打开

    4. 点击创建服务帐号

    5. 输入服务帐号名称,选择要授予该服务帐号的角色,例如 Storage > Object Admin

    6. 选中提供新的私钥,然后点击保存

    7. 在纯文本编辑器中打开 JSON 文件。内容应如下所示:

      { "type": "service_account", "project_id": "your-storage-project", ... }

  2. 创建新连接:

    Airflow 界面

    1. 访问 Cloud Composer 环境的 Airflow 网页界面

    2. 在 Airflow 网页界面中,打开管理员 > 连接页面。

      Airflow 屏幕截图。打开“管理员连接”菜单。

    3. 要打开新的连接表单,请点击创建标签页。

      Airflow 屏幕截图。点击“创建”标签页。

    4. 创建新连接:

      1. 要选择连接 ID,请填写连接 ID 字段,例如 my_gcp_connection。在您的 DAG 定义文件中使用此 ID。
      2. 连接类型字段中,选择 Google Cloud Platform 选项。
      3. 输入项目 ID 的值,该值与您的服务帐号所属的项目相对应。
      4. 执行下列其中一项操作:

        1. 将您下载的服务帐号 JSON 密钥文件复制到您环境的 Cloud Storage 存储分区的 data/ 目录中。然后,在密钥文件路径中,输入 Airflow 工作器上的本地文件路径,以指向 JSON 密钥文件的位置,例如 /home/airflow/gcs/data/keyfile.json
        2. 密钥文件 JSON 中,复制您下载的服务帐号 JSON 密钥文件的内容

        通过 CLI 或网页界面访问 Airflow 连接的用户可以读取存储在 keyfile_dict 中的凭据。要保护这些凭据,我们建议您使用密钥文件路径并使用 Cloud Storage ACL 限制对密钥文件的访问。

      5. 范围字段中输入一个值。建议使用 https://www.googleapis.com/auth/cloud-platform 作为范围,并使用服务帐号的 Cloud IAM 权限来限制对 Google Cloud 资源的访问权限。

      6. 要创建连接,请点击保存

        Airflow 屏幕截图。点击“创建”标签页。

    gcloud

    输入以下命令:

    gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION connections -- --add \
    --conn_id=CONNECTION_ID --conn_type=google_cloud_platform \
    --conn_extra '{"extra\__google\_cloud\_platform\__CMD_ARGS": "...",
    "extra\__google\_cloud\_platform\__CMD_ARGS": "...", ...}'
    

    其中:

    • ENVIRONMENT_NAME 是环境的名称。
    • LOCATION 是环境所在的 Compute Engine 区域。
    • CONNECTION_ID 是连接的标识符。使用小写字符,并用下划线分隔字词。
    • CMD_ARGS 为以下内容:
      • project 是项目 ID。只有 extra__google_cloud_platform__project 是必需的。
      • key_path 是 Airflow 工作器上指向 JSON 密钥文件(如 /home/airflow/gcs/data/keyfile.json)的本地文件路径。如果提供,则还需要 scope。使用 key_pathkeyfile_dict,但不要同时使用这两者。
      • keyfile_dict 是一个 JSON 对象,用于指定您下载的 JSON 密钥文件的内容。如果提供,则还需要 scope。使用 keyfile_dictkey_path,但不要同时使用这两者。通过 CLI 或网页界面访问 Airflow 连接的用户可以读取存储在 keyfile_dict 中的凭据。为保护这些凭据,我们建议您使用 key_path 并应用 Cloud Storage ACL 来限制对密钥文件的访问。
      • scope 是 OAuth 范围的逗号分隔列表。

    例如:

    gcloud composer environments run test-environment \
     --location us-central1 connections -- --add \
     --conn_id=my_gcp_connection --conn_type=google_cloud_platform \
     --conn_extra '{"extra\__google\_cloud\_platform\__project": "your-storage-project",
     "extra\__google\_cloud\_platform\__key_path": "/home/airflow/gcs/data/keyfile.json",
     "extra\__google\_cloud\_platform\__scope": "https://www.googleapis.com/auth/cloud-platform"}'

使用新的 Airflow 连接

要使用您创建的连接,请在构建 Google Cloud Airflow 操作器时将其设置为相应的连接 ID 参数。

task_custom = bigquery_operator.BigQueryOperator(
    task_id='task_custom_connection',
    bql='SELECT 1', use_legacy_sql=False,
    # Set a connection ID to use a connection that you have created.
    bigquery_conn_id='my_gcp_connection')

配置与外部数据库的连接

Cloud Composer 在您的环境中提供默认 Cloud SQL 代理,以通过应用、客户端或其他 Google Cloud 服务远程授权对该环境的 Cloud SQL 数据库的访问权限。

要将 Cloud Composer 连接到外部数据库,例如 SQL 数据库或专用 IP Cloud SQL 实例,您必须将 SQL 代理 Pod (yaml) 部署到您环境的 GKE 集群中。

部署新的 SQL 代理服务后,与外部数据库的连接将源自环境的 GKE 集群。如需让 Airflow 网络服务器访问外部数据库(例如采用 Airflow 连接形式),应该可以从网络服务器访问 SQL 代理服务。为此,您可以使用以下选项: