测试 DAG(工作流)

在将 DAG 部署到生产环境之前,您可以执行 Airflow CLI 子命令,在执行 DAG 的同样上下文中解析 DAG 代码。

在创建 DAG 时进行测试

您可以在本地运行单个任务实例并查看日志输出。 查看输出可以找出语法和任务错误。 在本地进行测试不会检查依赖项或向数据库传递状态。

建议您将 DAG 放在测试环境的 data/test 文件夹中。

检查是否存在 PyPI 软件包错误

由于 PyPI 依赖项可能会导致与 Airflow 所需的依赖项冲突,因此我们建议您在 Airflow 工作器容器中本地安装所需的 Python 软件包并进行测试。

  1. 确定 Cloud Composer 环境的 GKE 集群

  2. 连接到该 GKE 集群

  3. 查看并选择一个 Airflow 工作器 Pod。

    kubectl get pods --all-namespaces

    查找名称类似于 airflow-worker-1a2b3c-x0yz 的 Pod。

  4. 连接到一个 Airflow 工作器容器中的远程 shell。

    kubectl -n composer-1-6-0-airflow-example-namespace \
      exec -it airflow-worker-1a2b3c-x0yz -c airflow-worker -- /bin/bash

    连接到远程 shell 时,命令提示符会显示 Airflow 工作器 pod 的名称,例如 airflow-worker-1a2b3c-x0yz:

  5. 对于在您的环境中运行的 Python 版本,请在 Airflow 工作器容器中安装 Python 软件包,例如:

    sudo python2 -m pip install "[PACKAGE]"

  6. 测试 Airflow 工作器容器中的兼容性。

    • 检查是否有语法错误。
      airflow list_dags
    • 显示此模板。
      airflow test --dry_run [DAG_ID] [TASK_ID] [EXECUTION_DATE]
    • 检查是否有任务错误。

      airflow test [DAG_ID] [TASK_ID] [EXECUTION_DATE]

  7. 从 Airflow 工作器容器中卸载 Python 软件包,例如:

    sudo python2 -m pip uninstall "[PACKAGE]"

检查是否有语法错误

  1. 在您环境的 Cloud Storage 存储分区中,创建一个测试目录。
  2. 如需检查语法错误,请输入以下 gcloud 命令:

    gcloud composer environments run ENVIRONMENT_NAME \
     --location LOCATION \
     list_dags -- -sd /home/airflow/gcs/data/test

    其中:

    • ENVIRONMENT_NAME 是环境的名称。
    • LOCATION 是环境所在的 Compute Engine 区域。

    例如:

    gcloud composer environments run \
     test-environment --location us-central1 \
     list_dags -- -sd /home/airflow/gcs/data/test

检查任务错误

如需检查是否存在任务特有的错误,请输入以下 gcloud 命令:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  test -- -sd /home/airflow/gcs/data/test DAG_ID \
  TASK_ID DAG_EXECUTION_DATE

其中:

  • ENVIRONMENT_NAME 是环境的名称。
  • LOCATION 是环境所在的 Compute Engine 区域。
  • DAG_ID 是 DAG 的 ID。
  • TASK_ID 是任务的 ID。
  • DAG_EXECUTION_DATE 是 DAG 的执行日期。此日期用于生成模板。无论您在此处指定什么日期,DAG 都会立即运行。

例如:

gcloud composer environments run test-environment --location us-central1 \
        -- -sd /home/airflow/gcs/data/test-dags hello_world print_date 2018-09-03

更新和测试已部署的 DAG

如需在测试环境中测试 DAG 更新,请执行以下操作:

  1. 将您要更新的已部署 DAG 复制到 data/test
  2. 更新此 DAG。
  3. 测试此 DAG。
    1. 检查是否有语法错误
    2. 检查任务特有的错误
  4. 确保 DAG 成功运行。
  5. 在测试环境中关闭 DAG。
    1. 转到 Airflow 界面 > DAG 页面。
    2. 如果要修改的 DAG 始终处于运行状态,请关闭此 DAG。
    3. 如需加快完成正在运行的任务,请点击任务并将其标记为成功
  6. 将此 DAG 部署到您的生产环境。
    1. 在生产环境中关闭此 DAG。
    2. 将更新的 DAG 上传到生产环境中的 dags/ 文件夹。

测试工作流程的常见问题解答

后续步骤