为数据处理工作流设置 CI/CD 流水线

Last reviewed 2023-04-20 UTC

本教程介绍如何设置持续集成/持续部署 (CI/CD) 流水线,以通过在 Google Cloud 上使用代管式产品实现 CI/CD 方法来处理数据。数据科学家和分析师可以调整从 CI/CD 实践中总结出来的方法,帮助确保数据流程和工作流的高质量、可维护性和适应性。您可以运用以下方法:

  • 源代码的版本控制。
  • 自动构建、测试和部署应用。
  • 与生产环境分开且隔离。
  • 可复制的环境设置程序。

本教程适用于担任下列工作的数据科学家和分析师:构建重复运行的数据处理作业以帮助他们构建系统性且自动维护数据处理工作负载的研发 (R&D)。

部署架构

在本指南中,您将使用以下 Google Cloud 产品:

  • Cloud Build,创建 CI/CD 流水线,用于构建、部署和测试数据处理工作流以及数据处理本身。 Cloud Build 是一项代管式服务,可在 Google Cloud 上运行构建。构建是指一系列构建步骤,其中每个步骤都在 Docker 容器中运行。
  • Cloud Composer,定义和运行工作流步骤,例如启动数据处理、测试和验证结果。Cloud Composer 是一项代管式 Apache Airflow 服务,提供环境供您创建、调度、监控和管理复杂工作流,例如本教程中的数据处理工作流。
  • Dataflow:用于将 Apache Beam WordCount 示例作为示例数据流程运行。

CI/CD 流水线

概括来讲,CI/CD 流水线包含以下步骤:

  1. Cloud Build 使用 Maven 构建器将 WordCount 示例打包进自运行的 Java 归档 (JAR) 文件中。 Maven Builder 是安装有 Maven 的容器。当构建步骤配置为使用 Maven 构建器时,Maven 会运行这些任务。
  2. Cloud Build 会将 JAR 文件上传到 Cloud Storage。
  3. Cloud Build 对数据处理工作流代码运行单元测试,并将工作流代码部署到 Cloud Composer。
  4. Cloud Composer 选取 JAR 文件并在 Dataflow 上运行数据处理作业。

下图显示了 CI/CD 流水线步骤的详细视图。

CI/CD 流水线的架构图。

在本教程中,测试环境和生产环境的部署分为两个不同的 Cloud Build 流水线 - 测试流水线和生产流水线。

在上图中,测试流水线包含以下步骤:

  1. 开发者对 Cloud Source Repositories 代码库的代码进行更改。
  2. 代码更改触发 Cloud Build 中的测试构建。
  3. Cloud Build 构建自动执行 JAR 文件,并将其部署到 Cloud Storage 上的测试 JAR 存储分区。
  4. Cloud Build 将测试文件部署到 Cloud Storage 上的测试文件存储分区。
  5. Cloud Build 在 Cloud Composer 中设置变量,以引用新部署的 JAR 文件。
  6. Cloud Build 测试数据处理工作流有向无环图 (DAG),并将其部署到 Cloud Storage 上的 Cloud Composer 存储分区。
  7. 此时工作流 DAG 文件已部署到 Cloud Composer。
  8. Cloud Build 触发新部署的数据处理工作流开始运行。
  9. 数据处理工作流集成测试通过后,消息会发布到 Pub/Sub,该消息包含对消息数据字段中最新自动执行 JAR(从 Airflow 变量中获取)的引用。

在上图中,生产流水线包含以下步骤:

  1. 当消息发布到 Pub/Sub 主题时,系统会触发生产部署流水线。
  2. 开发者手动批准生产部署流水线,构建运行。
  3. Cloud Build 将最新的自动执行 JAR 文件从测试 JAR 存储分区复制到 Cloud Storage 上的生产 JAR 存储分区。
  4. Cloud Build 测试生产数据处理工作流 DAG 并将其部署到 Cloud Storage 上的 Cloud Composer 存储分区。
  5. 此时生产工作流 DAG 文件已部署到 Cloud Composer。

在本教程中,生产数据处理工作流部署到与测试工作流相同的 Cloud Composer 环境中,以就所有数据处理工作流提供统一视图。为了达到本教程的目的,我们使用不同的 Cloud Storage 分区来保存输入和输出数据,从而分离环境。

为了完全分离环境,您需要在不同项目中创建多个 Cloud Composer 环境,这些环境默认情况下彼此分离。这种分离有助于保护您的生产环境。此方法不在本教程探讨范围之内。如需详细了解如何访问多个 Google Cloud 项目中的资源,请参阅设置服务帐号权限

数据处理工作流

如需了解 Cloud Composer 如何运行数据处理工作流,请参阅用 Python 编写的有向无环图 (DAG)。在 DAG 中,数据处理工作流的所有步骤都通过它们之间的依赖关系一起定义。

CI/CD 流水线会在每个构建中自动将 DAG 定义从 Cloud Source Repositories 代码库部署到 Cloud Composer。此流程可确保 Cloud Composer 始终与最新的工作流定义保持同步,无需任何人工干预。

测试环境的 DAG 定义除了数据处理工作流之外,还定义了端到端测试步骤。测试步骤有助于确保数据处理工作流正确运行。

下图展示了数据处理工作流。

四步数据处理工作流。

数据处理工作流包括以下步骤:

  1. 在 Dataflow 中运行 WordCount 数据流程。
  2. 从 WordCount 流程中下载输出文件。WordCount 流程输出三个文件:

    • download_result_1
    • download_result_2
    • download_result_3
  3. 下载名为 download_ref_string 的参考文件。

  4. 根据参考文件验证结果。此集成测试汇总所有三份结果,并将汇总的结果与参考文件进行比较。

  5. 集成测试通过后,将消息发布到 Pub/Sub。

使用 Cloud Composer 等任务编排框架来管理数据处理工作流,这有助于缓解工作流的代码复杂性。

测试

除了验证端到端数据处理工作流的集成测试之外,本教程还包含两个单元测试。分别是数据处理代码自动测试和数据处理工作流代码自动测试。数据处理代码测试使用 Java 编写,在 Maven 构建过程中自动运行。数据处理工作流代码测试使用 Python 编写,作为独立的构建步骤运行。

目标

  • 配置 Cloud Composer 环境。
  • 为您的数据创建 Cloud Storage 存储分区。
  • 创建构建、测试和生产流水线。
  • 配置构建触发器。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 启用 Cloud Build, Cloud Source Repositories, Cloud Composer, and Dataflow API。

    启用 API

  5. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  6. 确保您的 Google Cloud 项目已启用结算功能

  7. 启用 Cloud Build, Cloud Source Repositories, Cloud Composer, and Dataflow API。

    启用 API

示例代码

示例代码位于以下两个文件夹中:

  • env-setup 文件夹包含用于初始设置 Google Cloud 环境的 Shell 脚本。
  • source-code 文件夹包含随时间推移而开发的代码,需要由源代码控制,可触发自动编译和测试流程。此文件夹包含以下子文件夹:

    • data-processing-code 文件夹包含 Apache Beam 流程源代码。
    • workflow-dag 文件夹包含数据处理工作流的 Composer DAG 定义,以及设计、实施和测试 Dataflow 流程的步骤。
    • build-pipeline 文件夹包含两个 Cloud Build 配置:一个用于测试流水线,另一个用于生产流水线。此文件夹还包含流水线的支持脚本。

为了实现本教程的目的,数据处理和 DAG 工作流的源代码文件位于同一源代码存储库中的不同文件夹中。 在生产环境中,源代码文件通常位于自己的源代码存储库中,由不同的团队管理。

设置您的环境

在本教程中,您将在 Cloud Shell 中运行命令。 Cloud Shell 显示为 Google Cloud Console 底部的一个窗口。

  1. 在 Google Cloud Console 中,打开 Cloud Shell:

    打开 Cloud Shell

  2. 克隆示例代码库:

    git clone https://github.com/GoogleCloudPlatform/ci-cd-for-data-processing-workflow.git
    
  3. 运行脚本以设置环境变量:

    cd ~/ci-cd-for-data-processing-workflow/env-setup
    source set_env.sh
    

    该脚本设置以下环境变量:

    • 您的 Google Cloud 项目 ID
    • 您的地区和区域
    • 构建流水线和数据处理工作流使用的 Cloud Storage 存储分区的名称。

    由于会话之间不保留环境变量,您完成本教程时,如果您的 Cloud Shell 会话关闭或断开连接,则需要重置环境变量。

创建 Cloud Composer 环境

在本教程中,您将设置一个 Cloud Composer 环境。

  1. 在 Cloud Shell 中,将 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) 角色添加到 Cloud Composer Service Agent 帐号:

    gcloud projects add-iam-policy-binding $GCP_PROJECT_ID \
        --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
        --role roles/composer.ServiceAgentV2Ext
    
  2. 在 Cloud Shell 中,创建 Cloud Composer 环境:

    gcloud composer environments create $COMPOSER_ENV_NAME \
        --location $COMPOSER_REGION \
        --image-version composer-2.0.14-airflow-2.2.5
    
  3. 运行脚本以在 Cloud Composer 环境中设置变量。数据处理 DAG 需要变量。

    cd ~/ci-cd-for-data-processing-workflow/env-setup
    chmod +x set_composer_variables.sh
    ./set_composer_variables.sh
    

    该脚本设置以下环境变量:

    • 您的 Google Cloud 项目 ID
    • 您的地区和区域
    • 构建流水线和数据处理工作流使用的 Cloud Storage 存储分区的名称。

提取 Cloud Composer 环境属性

Cloud Composer 使用 Cloud Storage 存储分区来存储 DAG。 将 DAG 定义文件移动到该存储分区会触发 Cloud Composer 自动读取文件。您在创建 Cloud Composer 环境时为 Cloud Composer 创建了 Cloud Storage 存储分区。 在以下程序中,您将提取存储分区的网址,然后配置 CI/CD 流水线以自动将 DAG 定义部署到 Cloud Storage 存储分区。

  1. 在 Cloud Shell 中,将存储分区的网址导出为环境变量:

    export COMPOSER_DAG_BUCKET=$(gcloud composer environments describe $COMPOSER_ENV_NAME \
        --location $COMPOSER_REGION \
        --format="get(config.dagGcsPrefix)")
    
  2. 导出 Cloud Composer 使用的服务帐号的名称,以便访问 Cloud Storage 存储分区:

    export COMPOSER_SERVICE_ACCOUNT=$(gcloud composer environments describe $COMPOSER_ENV_NAME \
        --location $COMPOSER_REGION \
        --format="get(config.nodeConfig.serviceAccount)")
    

创建 Cloud Storage 存储分区

在本节中,您将创建一组 Cloud Storage 存储分区以存储以下内容:

  • 构建过程的中间步骤的工件。
  • 数据处理工作流的输入和输出文件。
  • Dataflow 作业存储其二进制文件的暂存位置。

如需创建 Cloud Storage 分区,请完成以下步骤:

  • 在 Cloud Shell 中,创建 Cloud Storage 存储分区并授予 Cloud Composer 服务帐号运行数据处理工作流的权限:

    cd ~/ci-cd-for-data-processing-workflow/env-setup
    chmod +x create_buckets.sh
    ./create_buckets.sh
    

创建 Pub/Sub 主题

在本部分中,您将创建一个 Pub/Sub 主题,用于接收从数据处理工作流集成测试发送的消息,以自动触发生产构建流水线。

  1. 在 Google Cloud 控制台中,进入 Pub/Sub 主题页面。

    进入“主题”页面

  2. 点击创建主题

  3. 如需配置主题,请完成以下步骤:

    • 对于主题 ID,输入 integration-test-complete-topic
    • 确认添加默认订阅选项处于选中状态。
    • 将其余选项保留未选中状态。
    • 对于加密,选择 Google 管理的加密密钥
    • 点击创建主题

    创建 Pub/Sub 主题。

将源代码推送到 Cloud Source Repositories 代码库

在本教程中,您需要将一个源代码库添加到版本控制中。下列步骤展示了代码库是如何开发并随时间发生变化的。每当将更改推送到存储库时,系统都会触发流水线的构建、部署和测试过程。

  • 在 Cloud Shell 中,将 source-code 文件夹推送到 Cloud Source Repositories:

    gcloud source repos create $SOURCE_CODE_REPO
    cp -r ~/ci-cd-for-data-processing-workflow/source-code ~/$SOURCE_CODE_REPO
    cd ~/$SOURCE_CODE_REPO
    git init
    git remote add google \
        https://source.developers.google.com/p/$GCP_PROJECT_ID/r/$SOURCE_CODE_REPO
    git add .
    git commit -m 'initial commit'
    git push google master
    

    这些是在新目录中初始化 Git 并将内容推送到远程代码库的标准命令。

创建 Cloud Build 流水线

在本节中,您将创建用于构建、部署和测试数据处理工作流的构建流水线。

授予对 Cloud Build 服务帐号的访问权限

Cloud Build 会部署 Cloud Composer DAG 并触发工作流,这些工作流会在您向 Cloud Build 服务帐号添加其他访问权限时启用。如需详细了解使用 Cloud Composer 时可用的不同角色,请参阅访问控制文档

  1. 在 Cloud Shell 中,为 Cloud Build 服务帐号添加 composer.admin 角色,使 Cloud Build 作业可以在 Cloud Composer 中设置 Airflow 变量:

    gcloud projects add-iam-policy-binding $GCP_PROJECT_ID \
        --member=serviceAccount:$PROJECT_NUMBER@cloudbuild.gserviceaccount.com \
        --role=roles/composer.admin
    
  2. 为 Cloud Build 服务帐号添加 composer.worker 角色,使 Cloud Build 作业可以触发 Cloud Composer 中的数据工作流:

    gcloud projects add-iam-policy-binding $GCP_PROJECT_ID \
        --member=serviceAccount:$PROJECT_NUMBER@cloudbuild.gserviceaccount.com \
        --role=roles/composer.worker
    

创建构建和测试流水线

构建和测试流水线步骤在 YAML 配置文件中配置。 在本教程中,您将为 gitmavengsutilgcloud 使用预构建的构建器映像,以在每个构建步骤中运行任务。 您可以使用配置变量替代内容来定义构建时的环境设置。源代码存储库位置由变量替代内容以及 Cloud Storage 存储分区的位置定义。构建时,有了此信息才能部署 JAR 文件、测试文件和 DAG 定义。

  • 在 Cloud Shell 中,提交构建流水线配置文件以在 Cloud Build 中创建流水线:

    cd ~/ci-cd-for-data-processing-workflow/source-code/build-pipeline
    gcloud builds submit --config=build_deploy_test.yaml --substitutions=\
    REPO_NAME=$SOURCE_CODE_REPO,\
    _DATAFLOW_JAR_BUCKET=$DATAFLOW_JAR_BUCKET_TEST,\
    _COMPOSER_INPUT_BUCKET=$INPUT_BUCKET_TEST,\
    _COMPOSER_REF_BUCKET=$REF_BUCKET_TEST,\
    _COMPOSER_DAG_BUCKET=$COMPOSER_DAG_BUCKET,\
    _COMPOSER_ENV_NAME=$COMPOSER_ENV_NAME,\
    _COMPOSER_REGION=$COMPOSER_REGION,\
    _COMPOSER_DAG_NAME_TEST=$COMPOSER_DAG_NAME_TEST
    

    此命令指示 Cloud Build 通过以下步骤运行构建:

    1. 构建并部署 WordCount 自动执行 JAR 文件。

      1. 查看源代码。
      2. 将 WordCount Beam 源代码编译为自动执行 JAR 文件。
      3. 将 JAR 文件存储在 Cloud Storage 中,Cloud Composer 可以从中选取该文件以运行 WordCount 处理作业。
    2. 在 Cloud Composer 上部署和设置数据处理工作流。

      1. 对工作流 DAG 使用的自定义操作符代码运行单元测试。
      2. 在 Cloud Storage 上部署测试输入文件和测试参考文件。测试输入文件是 WordCount 处理作业的输入。使用测试参考文件作为参考,以验证 WordCount 处理作业的输出。
      3. 将 Cloud Composer 变量设置为指向新构建的 JAR 文件。
      4. 将工作流 DAG 定义部署到 Cloud Composer 环境。
    3. 在测试环境中运行数据处理工作流以触发测试处理工作流。

验证构建和测试流水线

提交构建文件后,请验证构建步骤。

  1. 在 Google Cloud 控制台中,转到构建记录页面以查看过去和当前正在运行的所有构建的列表。

    转到“构建记录”页面

  2. 点击当前正在运行的构建。

  3. 构建详情页面上,验证构建步骤是否与上述步骤相符。

    构建步骤的详细信息。

    构建结束时,构建详情页面上的构建状态字段会显示 Build successful

  4. 在 Cloud Shell 中,验证 WordCount 示例 JAR 文件是否已复制到正确的存储分区:

    gsutil ls gs://$DATAFLOW_JAR_BUCKET_TEST/dataflow_deployment*.jar
    

    输出类似于以下内容:

    gs://…-composer-dataflow-source-test/dataflow_deployment_e88be61e-50a6-4aa0-beac-38d75871757e.jar
    
  5. 获取您的 Cloud Composer 网页界面的网址。请记下该网址,因为下一步需要使用它。

    gcloud composer environments describe $COMPOSER_ENV_NAME \
        --location $COMPOSER_REGION \
        --format="get(config.airflowUri)"
    
  6. 使用上一步获取的网址转到 Cloud Composer 界面,验证 DAG 是否成功运行。如果运行列未显示任何信息,请等待几分钟并重新加载页面。

    1. 如需验证数据处理工作流 DAG test_word_count 是否已部署且处于运行模式,请将指针悬停在运行下方的浅绿色圆圈上,并验证它是否显示正在运行

      DAG 处理的正在运行状态。

    2. 如要以图形方式查看正在运行的数据处理工作流,请点击浅绿色圆圈,然后点击 DAG 运行页面上的 Dag Id:test_word_count

    3. 重新加载图表视图页面以更新当前 DAG 运行的状态。工作流通常需要三到五分钟才能完成。如需验证 DAG 运行是否成功完成,请将指针放在每个任务上,以验证提示是否显示状态:成功。倒数第二个任务名为 do_comparison,是用于根据参考文件验证流程输出的集成测试。

  7. 集成测试完成后,最后一个任务(名为 publish_test_complete)会将消息发布到 integration-test-complete-topic Pub/Sub 主题,以用于触发生产构建流水线。

    1. 如需验证已发布的消息是否包含对最新 JAR 文件的正确引用,我们可以从默认的 integration-test-complete-topic-sub Pub/Sub 订阅拉取该消息。

    2. 在 Google Cloud 控制台中,进入订阅页面。

      进入“订阅”页面

    3. 点击 integration-test-complete-topic-sub,选择消息标签页,然后点击拉取

    4. 输出应类似如下所示:

      测试完成消息。

创建生产流水线

当测试处理工作流成功运行时,您可以将当前版本的工作流提升至生产环境。目前有以下几种方法可以将工作流部署到生产环境:

  • 手动。
  • 在测试或暂存环境中通过所有测试时自动触发。
  • 由预定作业自动触发。

在本教程中,当测试环境中所有测试都通过时,系统会自动触发生产构建。如需详细了解自动化方法,请参阅发布工程

在实现自动化方法之前,您需要手动部署到生产环境,以验证生产部署构建。生产部署构建遵循以下步骤:

  1. 将 WordCount JAR 文件从测试存储分区复制到生产存储分区。
  2. 将生产工作流的 Cloud Composer 变量设置为指向新提升的 JAR 文件。
  3. 在 Cloud Composer 环境中部署生产工作流 DAG 定义并运行该工作流。

变量替代内容通过生产处理工作流使用的 Cloud Storage 存储分区,定义部署到生产环境中的最新 JAR 文件的名称。如需创建部署生产工作流的 Cloud Build 流水线,请完成以下步骤:

  1. 在 Cloud Shell 中,通过输出 JAR 文件名的 Cloud Composer 变量来读取最新 JAR 文件的文件名:

    export DATAFLOW_JAR_FILE_LATEST=$(gcloud composer environments run $COMPOSER_ENV_NAME \
        --location $COMPOSER_REGION variables get -- \
        dataflow_jar_file_test 2>&1 | grep -i '.jar')
    
  2. 使用构建流水线配置文件 deploy_prod.yaml, 在 Cloud Build 中创建流水线:

    cd ~/ci-cd-for-data-processing-workflow/source-code/build-pipeline
    gcloud builds submit --config=deploy_prod.yaml --substitutions=\
    REPO_NAME=$SOURCE_CODE_REPO,\
    _DATAFLOW_JAR_BUCKET_TEST=$DATAFLOW_JAR_BUCKET_TEST,\
    _DATAFLOW_JAR_FILE_LATEST=$DATAFLOW_JAR_FILE_LATEST,\
    _DATAFLOW_JAR_BUCKET_PROD=$DATAFLOW_JAR_BUCKET_PROD,\
    _COMPOSER_INPUT_BUCKET=$INPUT_BUCKET_PROD,\
    _COMPOSER_ENV_NAME=$COMPOSER_ENV_NAME,\
    _COMPOSER_REGION=$COMPOSER_REGION,\
    _COMPOSER_DAG_BUCKET=$COMPOSER_DAG_BUCKET,\
    _COMPOSER_DAG_NAME_PROD=$COMPOSER_DAG_NAME_PROD
    

验证生产流水线创建的数据处理工作流

  1. 获取 Cloud Composer 界面的网址:

    gcloud composer environments describe $COMPOSER_ENV_NAME \
        --location $COMPOSER_REGION \
        --format="get(config.airflowUri)"
    
  2. 如需验证生产数据处理工作流 DAG 是否已部署,请转到在上一步中检索到的网址,并验证 prod_word_count DAG 是否包含在 DAG 列表中。

    1. DAG 页面的 prod_word_count 行中,点击 Trigger DAG(触发 DAG)。

      播放图标以触发 DAG。

  3. 点击 Airflow 徽标或重新加载页面以更新 DAG 运行状态。运行列中的浅绿色圆圈表示 DAG 当前正在运行。将指针悬停在圆圈上,以查看显示正在运行的提示。

    DAG 运行的正在运行状态。

  4. 运行成功后,将指针悬停在 DAG 运行列下方的深绿色圆圈上,并验证它是否显示成功

    DAG 运行的成功状态。

  5. 在 Cloud Shell 中,列出 Cloud Storage 存储分区中的结果文件:

    gsutil ls gs://$RESULT_BUCKET_PROD
    

    输出类似于以下内容:

    gs://…-composer-result-prod/output-00000-of-00003
    gs://…-composer-result-prod/output-00001-of-00003
    gs://…-composer-result-prod/output-00002-of-00003
    

创建 Cloud Build 触发器

在本部分中,您将创建 Cloud Build 触发器,以将源代码更改关联到测试构建流程以及测试流水线和生产构建流水线之间。

配置测试构建流水线触发器

您可以设置 Cloud Build 触发器,在将更改推送到源代码库的主分支时触发新构建。

  1. 在 Google Cloud 控制台中,进入构建触发器页面。

    转到“构建触发器”页面

  2. 点击创建触发器

  3. 如需配置触发器设置,请完成以下步骤:

    • 名称字段中,输入 trigger-build-in-test-environment
    • 区域下拉列表中,选择全球(非区域级)
    • 对于事件,点击推送到分支
    • 对于来源,选择 data-pipeline-source
    • 分支名称字段中,输入 master
    • 配置部分,点击 Cloud Build 配置文件(yaml 或 json)
    • 对于位置,点击代码库
    • Cloud Build 配置文件位置字段中,输入 build-pipeline/build_deploy_test.yaml
  4. 在 Cloud Shell 中,运行以下命令以获取进行构建所需的所有替代变量。记下下列值,因为在以后的步骤中需要用到。

    echo "_COMPOSER_DAG_BUCKET : ${COMPOSER_DAG_BUCKET}
    _COMPOSER_DAG_NAME_TEST : ${COMPOSER_DAG_NAME_TEST}
    _COMPOSER_ENV_NAME : ${COMPOSER_ENV_NAME}
    _COMPOSER_INPUT_BUCKET : ${INPUT_BUCKET_TEST}
    _COMPOSER_REF_BUCKET : ${REF_BUCKET_TEST}
    _COMPOSER_REGION : ${COMPOSER_REGION}
    _DATAFLOW_JAR_BUCKET : ${DATAFLOW_JAR_BUCKET_TEST}"
    

    注意:输出名称/值对用于以下步骤

  5. 触发器设置页面的 Advanced, Substitution variables(高级替代变量)下,将相应变量替换为您在上一步从环境中获得的值。一次性添加以下内容,然后点击每个名称-值对对应的 + 添加项

    • _COMPOSER_DAG_BUCKET
    • _COMPOSER_DAG_NAME_TEST
    • _COMPOSER_ENV_NAME
    • _COMPOSER_INPUT_BUCKET
    • _COMPOSER_REF_BUCKET
    • _COMPOSER_REGION
    • _DATAFLOW_JAR_BUCKET

      映射名称-值对。

  6. 点击创建

配置生产构建流水线触发器

您可以设置 Cloud Build 触发器,以在测试环境中通过测试且消息发布到 tests-complete Pub/Sub 主题后触发生产构建。此触发器包括一个批准步骤,其中在运行生产流水线之前需要手动批准构建。

  1. 在 Google Cloud 控制台中,进入构建触发器页面。

    转到“构建触发器”页面

  2. 点击创建触发器

  3. 如需配置触发器设置,请完成以下步骤:

    • 名称字段中,输入 trigger-build-in-prod-environment
    • 区域下拉列表中,选择全球(非区域级)
    • 对于事件,点击 Pub/Sub 消息
    • 对于订阅,选择 integration-test-complete-topic
    • 对于来源,选择 data-pipeline-source
    • 对于修订版本,选择分支
    • 分支名称字段中,输入 master
    • 配置部分,点击 Cloud Build 配置文件(yaml 或 json)
    • 对于位置,点击代码库
    • Cloud Build 配置文件位置字段中,输入 build-pipeline/deploy_prod.yaml
  4. 在 Cloud Shell 中,运行以下命令以获取进行构建所需的所有替代变量。记下下列值,因为在以后的步骤中需要用到。

    echo "_COMPOSER_DAG_BUCKET : ${COMPOSER_DAG_BUCKET}
    _COMPOSER_DAG_NAME_PROD : ${COMPOSER_DAG_NAME_PROD}
    _COMPOSER_ENV_NAME : ${COMPOSER_ENV_NAME}
    _COMPOSER_INPUT_BUCKET : ${INPUT_BUCKET_PROD}
    _COMPOSER_REGION : ${COMPOSER_REGION}
    _DATAFLOW_JAR_BUCKET_PROD : ${DATAFLOW_JAR_BUCKET_PROD}
    _DATAFLOW_JAR_BUCKET_TEST : ${DATAFLOW_JAR_BUCKET_TEST}"
    

    注意:输出名称/值对用于以下步骤

  5. 触发器设置页面的 Advanced, Substitution variables(高级替代变量)下,将相应变量替换为您在上一步从环境中获得的值。一次性添加以下内容,然后点击每个名称-值对对应的 + 添加项

    • _COMPOSER_DAG_BUCKET
    • _COMPOSER_DAG_NAME_PROD
    • _COMPOSER_ENV_NAME
    • _COMPOSER_INPUT_BUCKET
    • _COMPOSER_REGION
    • _DATAFLOW_JAR_BUCKET_PROD
    • _DATAFLOW_JAR_BUCKET_TEST
    • _DATAFLOW_JAR_FILE_LATEST = $(body.message.data)

      映射名称-值对。

  6. 对于批准,选中构建执行前需要获得批准

  7. 点击创建

测试触发器

如需测试触发器,请在测试输入文件中添加一个新单词,并对测试参考文件进行相应的调整。您验证构建流水线是否已由向 Cloud Source Repositories 代码库进行的推送触发,同时验证数据处理工作流是否正随更新后的测试文件正确运行。

  1. 在 Cloud Shell 中,在测试文件末尾添加一个测试字词:

    echo "testword" >>  ~/$SOURCE_CODE_REPO/workflow-dag/support-files/input.txt
    
  2. 更新测试结果参考文件 ref.txt,以匹配测试输入文件中完成的更改:

    echo "testword: 1" >>  ~/$SOURCE_CODE_REPO/workflow-dag/support-files/ref.txt
    
  3. 提交并推送更改至 Cloud Source Repositories 代码库:

    cd ~/$SOURCE_CODE_REPO
    git add .
    git commit -m 'change in test files'
    git push google master
    
  4. 在 Google Cloud 控制台中,进入历史记录页面。

    进入“历史记录”页面

  5. 如需验证新测试构建是否是由上一向主分支进行的推送触发的,请在当前正在运行的构建中查看参考列是否显示为

  6. 在 Cloud Shell 中,获取 Cloud Composer 网页界面的网址:

    gcloud composer environments describe $COMPOSER_ENV_NAME \
        --location $COMPOSER_REGION --format="get(config.airflowUri)"
    
  7. 构建完成后,转到从上一命令获得的网址,验证 test_word_count DAG 是否正在运行。

    等 DAG 运行完毕,DAG 运行列中的浅绿色圆圈消失时即表示运行完毕。此过程通常需要三到五分钟才能完成。

  8. 在 Cloud Shell 中,下载测试结果文件:

    mkdir ~/result-download
    cd ~/result-download
    gsutil cp gs://$RESULT_BUCKET_TEST/output* .
    
  9. 验证新添加的字词是否位于其中一个结果文件中:

    grep testword output*
    

    输出类似于以下内容:

    output-00000-of-00003:testword: 1
    
  10. 在 Google Cloud 控制台中,进入历史记录页面。

    进入“历史记录”页面

  11. 验证集成测试完成是否触发了新生产构建,以及该构建是否正在等待批准。

    已触发生产构建。

  12. 选中构建旁边的复选框,点击批准,然后点击确认框中的批准,以运行生产构建流水线。

  13. 构建完成后,前往上一个命令输出的网址并手动触发 prod_word_count DAG 以运行生产流水线。

    播放图标以触发 DAG。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 帐号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

逐个删除资源

如果您希望保留本教程中使用的项目,请按照以下步骤删除您在本教程中创建的资源。

  1. 如需删除 Cloud Build 触发器,请完成以下步骤:

    1. 在 Google Cloud 控制台中,转到触发器页面。

      进入“触发器”页面

    2. 在您创建的触发器旁边,点击更多,然后点击删除

  2. 在 Cloud Shell 中,删除 Cloud Composer 环境:

    gcloud -q composer environments delete $COMPOSER_ENV_NAME \
        --location $COMPOSER_REGION
    
  3. 删除 Cloud Storage 存储分区及其文件:

    gsutil -m rm -r gs://$DATAFLOW_JAR_BUCKET_TEST \
        gs://$INPUT_BUCKET_TEST \
        gs://$REF_BUCKET_TEST \
        gs://$RESULT_BUCKET_TEST \
        gs://$DATAFLOW_STAGING_BUCKET_TEST \
        gs://$DATAFLOW_JAR_BUCKET_PROD \
        gs://$INPUT_BUCKET_PROD \
        gs://$RESULT_BUCKET_PROD \
        gs://$DATAFLOW_STAGING_BUCKET_PROD
    
  4. 如需删除 Pub/Sub 主题和默认订阅,请在 Cloud Shell 中运行以下命令:

    gcloud pubsub topics delete integration-test-complete-topic
    gcloud pubsub subscriptions delete integration-test-complete-topic-sub
    
  5. 删除代码库:

    gcloud -q source repos delete $SOURCE_CODE_REPO
    
  6. 删除您创建的文件和文件夹:

    rm -rf ~/ci-cd-for-data-processing-workflow
    rm -rf ~/$SOURCE_CODE_REPO
    rm -rf ~/result-download
    

后续步骤