从 GitHub 测试、同步和部署您的 DAG

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本指南介绍如何创建 CI/CD 流水线以进行测试、同步和 从 GitHub 将 DAG 部署到 Cloud Composer 环境 存储库

如果您只想同步其他服务中的数据,请参阅从其他服务转移数据

CI/CD 流水线概览

显示流程步骤的架构图。提交前检查和 PR 审核在 GitHub 部分,DAG 同步和手动 DAG 验证在 Google Cloud 部分。
图 1. 显示流程步骤的架构图(点击可放大)

用于测试、同步和部署 DAG 的 CI/CD 流水线包含以下步骤:

  1. 对 DAG 进行更改,然后将该更改推送到代码库中的开发分支。

  2. 您针对代码库的主分支打开一个拉取请求。

  3. Cloud Build 会运行单元测试,以检查 DAG 是否有效。

  4. 您的拉取请求已获批准并会合并到代码库的主分支。

  5. Cloud Build 将开发 Cloud Composer 环境与这些新更改同步。

  6. 您可以验证 DAG 在您的开发中是否按预期运行 环境

  7. 如果您的 DAG 按预期运行,请将该 DAG 上传到您的生产环境中 Cloud Composer 环境。

目标

准备工作

  • 本指南假定您使用的是两个完全相同的 Cloud Composer 环境:一个开发环境和一个 生产环境

    在本指南中,您将仅为开发环境配置 CI/CD 流水线。确保您使用的环境不是生产环境。

  • 本指南假定您已将 DAG 及其测试存储在 GitHub 代码库中。

    通过 CI/CD 流水线示例 演示了示例代码库的内容。DAG 和测试存储在 dags/ 目录中,要求文件、约束条件文件和 Cloud Build 配置文件存储在顶层。DAG 同步实用程序及其要求位于 utils 目录中。

    此结构可用于 Airflow 1、Airflow 2、Cloud Composer 1 和 Cloud Composer 2 环境。

创建提交前检查作业和单元测试

第一个 Cloud Build 作业会运行提交前检查,该检查会为 DAG 执行单元测试。

添加单元测试

如果您尚未为 DAG 编写单元测试,请立即编写。将这些测试与 代码库中的 DAG,每个 DAG 都具有 _test 后缀。例如,example_dag.py 中 DAG 的测试文件为 example_dag_test.py。这些是 这些测试是在您的代码库中作为提交前检查运行的

为提交前检查创建 Cloud Build YAML 配置

在代码库中,创建一个名为 test-dags.cloudbuild.yaml 的 YAML 文件,用于配置 Cloud Build 作业以进行预提交检查。其中包含三个步骤:

  1. 安装 DAG 所需的依赖项。
  2. 安装单元测试所需的依赖项。
  3. 执行 DAG 测试。

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

为提交前检查创建 Cloud Build 触发器

按照在 GitHub 中构建代码库中的说明操作 创建具有以下配置的基于 GitHub 应用的触发器的指南:

  • 名称test-dags

  • 事件:拉取请求

  • Source(来源)- Repository(代码库):选择您的代码库

  • 来源 - 基本分支:^main$(如有必要,请将 main 更改为代码库基本分支的名称)

  • 来源 - 评论控制:非必要操作

  • 构建配置 - Cloud Build 配置文件:/test-dags.cloudbuild.yaml(build 文件的路径)

创建 DAG 同步作业并添加 DAG 实用程序脚本

接下来,配置运行 DAG 实用程序脚本的 Cloud Build 作业。此作业中的实用程序脚本将合并到您代码库中主分支的 DAG 与 Cloud Composer 环境同步。

添加 DAG 实用程序脚本

将 DAG 实用程序脚本添加到您的代码库。此实用程序脚本会复制 dags/ 目录中的所有 DAG 文件 代码库复制到临时目录,并忽略所有非 DAG Python 文件。该脚本随后会使用 Cloud Storage 客户端库将临时目录中的所有文件都上传到 Cloud Composer 环境存储桶中的 dags/ 目录。

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage


def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

创建 Cloud Build YAML 配置以同步 DAG

在代码库中,创建一个名为 add-dags-to-composer.cloudbuild.yaml 的 YAML 文件,用于配置 Cloud Build 作业以同步 DAG。其中包含两个步骤:

  1. 安装 DAG 实用程序脚本所需的依赖项。

  2. 运行实用程序脚本,将代码库中的 DAG 与 Cloud Composer 环境同步。

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

创建 Cloud Build 触发器

按照在 GitHub 中构建代码库中的说明操作 创建具有以下配置的基于 GitHub 应用的触发器的指南:

  • 名称add-dags-to-composer

  • 事件:推送到分支

  • Source(来源)- Repository(代码库):选择您的代码库

  • 来源 - 基本分支:^main$(将 main 更改为您的 代码库的基础分支(如果需要)

  • 来源 - 包含的文件过滤条件 (glob):dags/**

  • 构建配置 - Cloud Build 配置文件:/add-dags-to-composer.cloudbuild.yaml(build 文件的路径)

在“高级”配置中,添加两个替代变量

  • _DAGS_DIRECTORY - DAG 在代码库中所在的目录。如果您使用的是本指南中的示例代码库,则它为 dags/

  • _DAGS_BUCKET - 在开发 Cloud Composer 环境中包含 dags/ 目录的 Cloud Storage 存储桶。省略 gs:// 前缀。例如:us-central1-example-env-1234ab56-bucket

测试 CI/CD 流水线

在本部分中,我们将介绍一个 DAG 开发流程,此流程利用新技术 创建了 Cloud Build 触发器

运行预提交作业

向主分支创建拉取请求以测试 build。找到您的 提交前检查点击详细信息,然后选择 查看有关 Google Cloud Build 的更多详情,以便在 Google Cloud 控制台。

一个名为 test-dags 的 GitHub 检查的屏幕截图,其中有一个红色箭头指向括号中的项目名称
图 2.GitHub 上的 Cloud Build 提交前检查状态的屏幕截图(点击可放大)

如果预提交检查失败,请参阅解决构建失败的问题

验证 DAG 是否在开发 Cloud Composer 环境中正常运行

拉取请求获得批准后,将其合并到主分支。使用 Google Cloud 控制台查看构建结果。如果您有多个 Cloud Build 触发器,可以按触发器名称 add-dags-to-composer 过滤构建。

Cloud Build 同步作业成功后,系统会显示同步的 DAG Cloud Composer 开发环境中。您可以在其中验证 DAG 是否按预期运行。

将 DAG 添加到您的生产环境

在 DAG 按预期运行后,请手动将其添加到生产环境。为此,请将 DAG 文件上传到生产 Cloud Composer 环境存储桶中的 dags/ 目录。

如果您的 DAG 同步作业失败或 DAG 在开发 Cloud Composer 环境中未按预期运行,请参阅解决构建失败的问题

解决构建失败的问题

本部分介绍了如何解决常见的构建失败场景。

如果预提交检查失败,该怎么办?

在拉取请求中,点击详细信息,然后选择在 Google Cloud Build 中查看更多详情,即可在 Google Cloud 控制台中查看构建日志。使用这些日志可帮助您调试 DAG。解决问题后,提交修复程序并推送到 分支。系统会再次运行提交前检查,您可以继续迭代,并将日志用作调试工具。

如果我的 DAG 同步作业失败,该怎么办?

使用 Google Cloud 控制台 查看构建结果。如果您有多个 Cloud Build 触发器,可以按触发器名称 add-dags-to-composer 过滤构建。检查构建作业的日志并解决 错误。如果您在解决错误时需要其他帮助,请使用支持渠道

如果我的 DAG 在 Cloud Composer 环境中无法正常运行,该怎么办?

如果您的 DAG 在开发 Cloud Composer 环境中未按预期运行,请不要手动将 DAG 提升到 Cloud Composer 生产环境。请改为采用以下方法之一:

  • 还原拉取请求 包含破坏 DAG 以将其恢复为状态的更改 (这也会还原该拉取请求中的所有其他文件)。
  • 创建新的拉取请求,以手动还原对损坏的 DAG 进行的更改。
  • 创建新的拉取请求以修复 DAG 中的错误。

完成以下任何步骤后,系统都会触发新的预提交检查,并在合并后执行 DAG 同步作业。

后续步骤