使用版本控制来测试、同步和部署 DAG

本指南介绍了如何创建 CI/CD 流程以将 Cloud Composer 环境中的 DAG 与版本控制系统同步。此流程包括以下步骤:

  1. 对 DAG 进行更改,然后将该更改推送到代码库中的开发分支
  2. 打开针对代码库主分支的拉取请求
  3. Cloud Build 运行单元测试以检查 DAG 是否有效
  4. 您的拉取请求已获批准并会合并到主分支
  5. Cloud Build 将开发 Cloud Composer 环境与这些新更改同步
  6. 验证 DAG 在开发环境中是否按预期运行
  7. 如果您的 DAG 按预期运行,请手动将 DAG 同步到生产 Cloud Composer 环境
显示流程步骤的架构图。提交前审核和 PR 审核位于 GitHub 部分,DAG 同步和手动 DAG 验证位于 GCP 部分。
图 1. 显示流程步骤的架构图(点击可放大)

目标

  • 使用 Cloud Build 运行自动 DAG 单元测试预提交检查。此检查会对 DAG 运行单元测试。
  • 将开发版 Composer 环境中的 DAG 与版本控制系统中的 DAG 同步

费用

本教程使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

Google Cloud 新用户可能有资格申请免费试用

准备工作

本教程假定您的 DAG 及其测试存储在 GitHub 代码库中。在包含示例代码库内容的 composer/cicd_sample 目录中,DAG 和测试文件存储在 dags 文件夹中,该文件包含要求文件、限制条件文件和 Cloud Build 配置文件。DAG 同步实用程序及其要求位于 utils 文件夹中。此结构可用于 Airflow 1、Airflow 2、Cloud Composer 1 和 Cloud Composer 2 环境。

此外,本教程假定您使用的是两种相同的 Cloud Composer 环境:开发环境和生产环境。

  1. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  2. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

  3. 如果您尚未创建开发和生产 Cloud Composer 环境,请立即创建。在本教程中,您可以创建 Cloud Composer 2 环境Cloud Composer 1 环境

准备环境

在本部分中,您将设置两个 Cloud Build 作业。第一个作业运行提交前检查,该检查会为您的 DAG 执行单元测试。第二个作业会在 DAG 合并到代码库中的主分支后与 Cloud Composer 环境同步。

添加预提交检查

添加单元测试

为您的 DAG 编写单元测试(如果您尚未这样做)。将这些测试与 DAG 一起保存,每个测试都带有 _test 后缀。例如,example_dag.py 中 DAG 的测试文件是 example_dag_test.py。这些测试将作为提交前检查在代码库中运行。

添加 Cloud Build 预提交检查

在本部分中,您将为预提交检查创建一个 Cloud Build YAML 配置和一个 Cloud Build 触发器。

创建 Cloud Build YAML 配置

创建 YAML 文件以配置名为 test-dags.cloudbuild.yaml 的 Cloud Build 作业。其中包含以下三个步骤:

  1. 安装 DAG 所需的依赖项
  2. 安装单元测试所需的依赖项
  3. 执行 DAG 测试

    
    steps:
      # install dependencies
      - name: python:3.8-slim
        entrypoint: pip
        args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]
    
      - name: python:3.8-slim
        entrypoint: pip
        args: ["install", "-r", "requirements-test.txt", "--user"]
    
      # run in python 3.8 which is latest version in Cloud Composer
      - name: python:3.8-slim
        entrypoint: python3.8
        args: ["-m", "pytest", "-s", "dags/"]
    
创建 Cloud Build 触发器

按照通过 GitHub 构建代码库指南创建具有以下配置的基于 GitHub 应用的触发器:

  • 名称:test-dags
  • 事件:拉取请求
  • 来源 - 代码库:选择您的代码库
  • 来源 - 基本分支:^main$(如果需要,请将 main 更改为代码库的基本分支的名称)
  • 来源 - 评论控制:非必要操作
  • 构建配置 - Cloud Build 配置文件:/test-dags.cloudbuild.yaml(构建文件的路径)

添加 DAG 同步作业

接下来,您将配置运行实用程序脚本的 Cloud Build 作业。此作业中的实用程序脚本将合并到您代码库中主分支的 DAG 与 Cloud Composer 环境同步。

添加 DAG 实用程序脚本

此实用程序脚本将代码库的 dags/ 目录中的所有 DAG 文件都复制到临时目录中,而忽略所有非 DAG Python 文件。该脚本随后会使用 Cloud Storage 客户端库将临时目录中的所有文件都上传到 Cloud Composer 环境中的 dags/ 文件夹

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile
from typing import List, Tuple

# Imports the Google Cloud client library
from google.cloud import storage

def _create_dags_list(dags_directory: str) -> Tuple[str, List[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)

def upload_dags_to_composer(dags_directory: str, bucket_name: str) -> None:
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            # if/else is a relative directory workaround for our tests
            current_directory = os.listdir()
            if "dags" in current_directory:
                dag = dag.replace(f"{temp_dir}/", "dags/")
            else:
                dag = dag.replace(f"{temp_dir}/", "../dags/")

            # Upload to your bucket
            blob = bucket.blob(dag)
            blob.upload_from_filename(dag)
            print(f"File {dag} uploaded to {bucket_name}/{dag}.")

    else:
        print("No DAGs to upload.")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

添加 Cloud Build 作业以同步 DAG

在本部分中,您将为 DAG 同步作业创建一个 Cloud Build YAML 配置和一个 Cloud Build 触发器。

创建 Cloud Build YAML 配置

创建 YAML 文件以配置名为 add-dags-to-composer.cloudbuild.yaml 的 Cloud Build 作业。其中包含以下两个步骤:

  1. 安装 DAG 实用程序脚本所需的依赖项
  2. 运行实用程序脚本,将代码库中的 DAG 与 Cloud Composer 环境同步

    steps:
      # install dependencies
      - name: python
        entrypoint: pip
        args: ["install", "-r", "utils/requirements.txt", "--user"]
    
      # run
      - name: python
        entrypoint: python
        args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]
    
创建 Cloud Build 触发器

按照本指南的操作,使用以下配置创建基于 GitHub 应用的触发器:

  • 名称:add-dags-to-composer
  • 事件:推送至分支
  • 来源 - 代码库:选择您的代码库
  • 来源 - 基本分支:^main$(如果需要,请将 main 更改为代码库的基本分支的名称)
  • 来源 - 包含的文件过滤条件 (glob):dags/**
  • 构建配置 - Cloud Build 配置文件:/add-dags-to-composer.cloudbuild.yaml(构建文件的路径)

在高级配置中,添加两个替代变量

  • _DAGS_DIRECTORY - DAG 在代码库中所在的目录。如果您完全按照本教程操作,则目录为 dags/
  • _DAGS_BUCKET - 在开发 Cloud Composer 环境中包含 dags/ 文件夹的 Cloud Storage 存储桶(不带 gs:// 前缀)。例如:us-east1-my-env-1234ab56-bucket

综合应用

在本部分中,您将按照 DAG 开发流程操作以利用新创建的 Cloud Build 触发器。

运行预提交作业

为您的 main 分支创建一个拉取请求,以测试您的 build。您的检查位于页面底部。点击“详细信息”,然后选择“查看有关 Google Cloud Build 的更多详情”,以查看 Google Cloud Console 中的构建日志。

名为 test-dags 的 GitHub 检查的屏幕截图,其中的红色箭头指向括号中的项目名称
图 2. GitHub 上 Cloud Build 预提交检查状态的屏幕截图(点击可放大)

如果预提交检查失败,请参阅“解决构建失败的问题”部分

在开发 Cloud Composer 环境中验证 DAG 成功

拉取请求获得批准后,请将其合并到您的主要分支。使用 Google Cloud Console 查看构建结果。如果您有许多 Cloud Build 触发器,则可以按触发器名称 add-dags-to-composer 过滤构建。

Cloud Build 同步作业成功后,DAG 会显示在开发版 Cloud Composer 环境中。在该页面中,您可以验证 DAG 是否按预期运行。在该 DAG 按预期运行后,在 Cloud Composer 生产环境中,通过手动将该 DAG 文件添加dags/ 文件夹,以将其提升到生产环境。

如果您的 DAG 同步作业失败或 DAG 在开发 Cloud Composer 环境中未按预期运行,请参阅“解决构建失败的问题”部分

解决构建失败的问题

在本部分中,您将学习如何解决常见的构建失败场景。

如果预提交检查失败,该怎么办?

在拉取请求中,点击“详细信息”,然后选择“查看有关 Google Cloud Build 的更多详细信息”,以便在 Google Cloud Console 中查看构建日志。这些日志可帮助您调试 DAG 的问题。解决问题后,请提交修复程序并推送到您的分支。提交前检查将重新运行,您可以继续将日志用作调试工具。

如果我的 DAG 同步作业失败,该怎么办?

使用 Google Cloud Console 查看构建结果。如果您有许多 Cloud Build 触发器,则可以按触发器名称 add-dags-to-composer 过滤构建。检查构建作业的日志并解决错误。如果您在解决错误方面需要其他帮助,请使用支持渠道

如果我的 DAG 在 Cloud Composer 环境中无法正常运行,该怎么办?

如果您的 DAG 在开发 Cloud Composer 环境中未按预期运行,请不要手动将 DAG 提升到 Cloud Composer 生产环境。请改为执行以下操作之一:

  1. 使用破坏 DAG 的更改还原拉取请求,以将其恢复到更改之前的状态(这也将还原该拉取请求中的所有其他文件)
  2. 创建新的拉取请求,以手动还原对损坏的 DAG 进行的更改
  3. 创建新的拉取请求以修正 DAG 中的错误

完成以下任何步骤后将再次触发预提交检查,并在合并后执行 DAG 同步作业。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 帐号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除各个资源

清理 Cloud Build 资源

使用 Google Cloud Console 或 Google Cloud CLI 删除两个构建触发器

  • test-dags
  • add-dags-to-composer

清理 Cloud Composer 资源

删除 Cloud Composer 环境

如果您为本教程创建了 Cloud Composer 环境,但不想继续使用,请删除环境以避免产生费用。如果您想保留环境,请移除 DAG

从 Cloud Composer 中移除 DAG

从开发和生产 Cloud Composer 环境中删除 DAG

后续步骤