测试 Airflow DAG

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

在将 DAG 部署到生产环境之前,您可以 执行 Airflow CLI 子命令 在执行 DAG 的同一上下文中解析 DAG 代码。

使用 Composer 本地开发 CLI 工具在本地测试 DAG

Composer Local Development CLI 工具简化了 通过运行 Airflow 为 Cloud Composer 2 开发 Apache Airflow DAG 环境此本地 Airflow 环境使用 特定 Cloud Composer 2 版本。

您可以使用此本地 Airflow 环境开发和测试 DAG,然后将 DAG 转移到测试 Cloud Composer 环境。本指南的其余部分介绍了如何在测试 Cloud Composer 环境中测试 DAG。

在创建 DAG 时进行测试

您可以在本地运行单个任务实例并查看日志输出。 查看输出可以找出语法和任务错误。 在本地进行测试不会检查依赖项或向数据库传递状态。

建议您将 DAG 放在测试环境的 data/test 文件夹中。

创建测试目录

在环境的存储桶中,创建一个测试目录并将 DAG 复制到该目录。

gcloud storage cp BUCKET_NAME/dags \
  BUCKET_NAME/data/test --recursive

替换以下内容:

  • BUCKET_NAME:与您的 Cloud Composer 环境关联的存储桶的名称。

示例:

gcloud storage cp gs://us-central1-example-environment-a12bc345-bucket/dags \
  gs://us-central1-example-environment-a12bc345-bucket/data/test --recursive

如需详细了解如何上传 DAG,请参阅 添加和更新 DAG

检查是否有语法错误

如需检查您上传到 /data/test 文件夹的 DAG 中是否存在语法错误,请输入以下 gcloud 命令:

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  dags list -- --subdir /home/airflow/gcs/data/test

替换以下内容:

  • ENVIRONMENT_NAME:环境的名称。
  • ENVIRONMENT_LOCATION:环境所在的区域。

检查任务错误

检查您上传到 /data/test 的 DAG 中是否存在特定于任务的错误 文件夹中,请运行以下 gcloud 命令:

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  DAG_ID TASK_ID \
  DAG_EXECUTION_DATE

替换以下内容:

  • ENVIRONMENT_NAME:环境的名称。
  • ENVIRONMENT_LOCATION:环境所在的区域。
  • DAG_ID:DAG 的 ID。
  • TASK_ID:任务的 ID。
  • DAG_EXECUTION_DATE:DAG 的执行日期。此日期用于生成模板。无论您在此处指定什么日期,DAG 都会立即运行。

示例:

gcloud composer environments run \
  example-environment \
  --location us-central1 \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  hello_world print_date 2021-04-22

更新和测试已部署的 DAG

如需在测试环境中测试 DAG 更新,请执行以下操作:

  1. 将您要更新的已部署 DAG 复制到 data/test
  2. 更新此 DAG。
  3. 测试此 DAG。
    1. 检查是否有语法错误
    2. 检查任务特有的错误
  4. 确保 DAG 成功运行。
  5. 在测试环境中关闭 DAG。
    1. 转到 Airflow 界面 > DAG 页面。
    2. 如果要修改的 DAG 始终处于运行状态,请关闭此 DAG。
    3. 如需加快完成正在运行的任务,请点击任务并将其标记为成功
  6. 将此 DAG 部署到您的生产环境。
    1. 在生产环境中关闭此 DAG。
    2. 将更新的 DAG 上传到生产环境中的 dags/ 文件夹。

DAG 测试常见问题解答

如何隔离生产环境和测试环境中运行的 DAG?

例如,Airflow 在 dags/ 文件夹中有一个全局源代码库,由所有 DAG 运行共享。您想要更新生产环境或测试环境中的源代码,但不希望正在运行的 DAG 受到干扰。

Airflow 不具备强大的 DAG 隔离机制。我们建议您分开维护 Cloud Composer 生产环境和测试环境,以防止测试 DAG 干扰生产 DAG。

当我从不同的 GitHub 分支运行集成测试时,如何避免产生 DAG 干扰?

您可以使用唯一的任务名称来防止这种干扰。例如,您可以使用分支名称作为任务 ID 的前缀。

进行 Airflow 集成测试的最佳做法是什么?

我们建议您使用专用环境来进行 Airflow 集成测试。如需确定 DAG 运行是否成功,一种方法是向 Cloud Storage 文件夹下的一个文件写入数据,然后检查您自己的集成测试用例中的内容。

如何与其他 DAG 贡献者高效协作?

每个贡献者可以在 data/ 文件夹中建立一个子目录进行开发。

Airflow 调度器或网络服务器不会自动提取添加到 data/ 文件夹中的 DAG

DAG 贡献者可以使用 gcloud composer environments run 命令和带有 --subdir 标志的 test 子命令指定贡献者的开发目录,来创建手动 DAG 运行。

例如:

gcloud composer environments run test-environment-name \
  tasks test -- dag-id task-id execution-date \
  --subdir /home/airflow/gcs/data/alice_dev

如何使部署和生产环境保持同步?

您可以按如下方式管理访问权限:

  • 对于身份验证,请使用服务账号

  • 对于访问权限控制,请使用 Cloud Identity and Access Management 和 Cloud Composer 角色和权限

如需从开发环境部署到生产环境,请确保以下事项:

  • 确保一致的配置,例如环境变量和 PyPI 软件包。

  • 确保一致的 DAG 参数。为避免硬编码,我们建议您使用 Airflow 宏和变量。

    例如:

    gcloud composer environments run test-environment-name \
      variables set -- DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
    

后续步骤