使用 Workflows 执行 Cloud Run 作业


借助 Workflows,您可以在工作流中执行 Cloud Run 作业,以执行更复杂的数据处理或编排现有作业的系统。

本教程演示了如何使用 Workflows 执行 Cloud Run 作业,该作业处理作为环境变量传递给作业的数据,以响应来自 Cloud Storage 的事件。

请注意,您还可以将事件数据存储在 Cloud Storage 存储桶中,从而使用客户管理的加密密钥加密数据。如需了解详情,请参阅执行 Cloud Run 作业以处理保存在 Cloud Storage 中的事件数据

目标

在此教程中,您将学习以下操作:

  1. 创建一个 Cloud Run 作业,用于处理 Cloud Storage 存储桶中的数据文件。
  2. 部署执行以下操作的工作流:
    1. 接受 Cloud Storage 事件作为参数。
    2. 检查事件中指定的 Cloud Storage 存储桶是否与 Cloud Run 作业使用的存储桶相同。
    3. 如果是这样,请使用 Cloud Run Admin API 连接器执行 Cloud Run 作业。
  3. 创建一个 Eventarc 触发器,用于执行工作流以响应影响 Cloud Storage 存储桶的事件。
  4. 通过更新 Cloud Storage 存储桶中的输入数据文件来触发工作流。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

控制台

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 启用 Artifact Registry、Cloud Build、Cloud Run、Cloud Storage、Eventarc 和 Workflows API。

    启用 API

  5. 创建服务帐号:

    1. 在 Google Cloud 控制台中,转到创建服务帐号页面。

      转到“创建服务帐号”
    2. 选择您的项目。
    3. 服务帐号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务帐号 ID 字段。

      服务帐号说明字段中,输入说明。例如,Service account for quickstart

    4. 点击创建并继续
    5. 向服务帐号授予以下角色: Cloud Run Admin, Eventarc Event Receiver, Logs Writer, Workflows Invoker.

      如需授予角色,请找到选择角色列表,然后选择相应角色。

      如需授予其他角色,请点击 添加其他角色,然后添加其他各个角色。

    6. 点击继续
    7. 点击完成以完成服务帐号的创建过程。

  6. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  7. 确保您的 Google Cloud 项目已启用结算功能

  8. 启用 Artifact Registry、Cloud Build、Cloud Run、Cloud Storage、Eventarc 和 Workflows API。

    启用 API

  9. 创建服务帐号:

    1. 在 Google Cloud 控制台中,转到创建服务帐号页面。

      转到“创建服务帐号”
    2. 选择您的项目。
    3. 服务帐号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务帐号 ID 字段。

      服务帐号说明字段中,输入说明。例如,Service account for quickstart

    4. 点击创建并继续
    5. 向服务帐号授予以下角色: Cloud Run Admin, Eventarc Event Receiver, Logs Writer, Workflows Invoker.

      如需授予角色,请找到选择角色列表,然后选择相应角色。

      如需授予其他角色,请点击 添加其他角色,然后添加其他各个角色。

    6. 点击继续
    7. 点击完成以完成服务帐号的创建过程。

  10. 在为来自 Cloud Storage 的直接事件创建触发器之前,请将 Pub/Sub Publisher 角色 (roles/pubsub.publisher) 授予 Cloud Storage 服务代理(一个由 Google 代管式服务帐号):
    1. 在 Google Cloud 控制台中,前往 IAM 页面。

      转到 IAM

    2. 选中包括 Google 提供的角色授权复选框。
    3. 主账号列中,找到格式为 service-PROJECT_NUMBER@gs-project-accounts.iam.gserviceaccount.com 的 Cloud Storage 服务代理,然后点击相应行中的 修改主账号
    4. 点击 添加角色添加其他角色
    5. 选择角色列表中,过滤出 Pub/Sub Publisher,然后选择该角色。
    6. 点击保存
  11. 如果您在 2021 年 4 月 8 日或之前启用了 Cloud Pub/Sub 服务代理,以支持经过身份验证的 Pub/Sub 推送请求,请向 Google 代管式服务帐号授予 Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator)。否则,系统会默认授予此角色:
    1. 在 Google Cloud 控制台中,前往 IAM 页面。

      转到 IAM

    2. 选中包括 Google 提供的角色授权复选框。
    3. 名称列中,找到 Cloud Pub/Sub 服务帐号,然后点击相应行中的 修改主帐号
    4. 点击 添加角色添加其他角色
    5. 选择角色列表中,过滤出 Service Account Token Creator,然后选择角色。
    6. 点击保存
  12. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在 Google Cloud 控制台的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟时间来完成初始化。

  13. Cloud Shell 支持本教程中用于生成伪随机数的 /dev/urandom 命令。

gcloud

  1. 如需使用已设置 gcloud CLI 的在线终端,请激活 Cloud Shell:

    在此页面底部,一个 Cloud Shell 会话将启动并显示命令行提示符。该会话可能需要几秒钟来完成初始化。

    Cloud Shell 支持本教程中用于生成伪随机数的 /dev/urandom 命令。

  2. 创建或选择 Google Cloud 项目。
    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID
    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID
  3. 确保您的 Google Cloud 项目已启用结算功能
  4. 启用 Artifact Registry、Cloud Build、Cloud Run、Cloud Storage、Eventarc 和 Workflows API:
    gcloud services enable artifactregistry.googleapis.com \
        cloudbuild.googleapis.com \
        eventarc.googleapis.com \
        run.googleapis.com \
        storage.googleapis.com \
        workflows.googleapis.com
  5. 为您的工作流创建服务帐号以用于与其他 Google Cloud 服务进行身份验证,并向其授予适当的角色。
    1. 创建服务帐号:
      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
      

      SERVICE_ACCOUNT_NAME 替换为服务帐号的名称。

    2. 向您在上一步中创建的用户代管式服务帐号授予角色。请针对每个 IAM 角色运行以下命令一次,也可以在单个命令中多次使用 --role 标志:
      • roles/eventarc.eventReceiver:用于接收事件
      • roles/logging.logWriter:用于写入日志
      • roles/run.admin:用于执行 Cloud Run 作业
      • roles/workflows.invoker:用于调用工作流
      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com \
          --role=ROLE
      

      替换以下内容:

      • PROJECT_ID:您在其中创建了服务帐号的项目的 ID
      • ROLE:要授予用户管理的服务帐号的角色
  6. 在为 Cloud Storage 中的直接事件创建触发器之前,请将 Pub/Sub Publisher 角色 (roles/pubsub.publisher) 授予 Cloud Storage 服务代理(Google 管理的服务账号):

    SERVICE_ACCOUNT="$(gsutil kms serviceaccount -p PROJECT_ID)"
    
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:${SERVICE_ACCOUNT}" \
        --role='roles/pubsub.publisher'
    
  7. 如果您在 2021 年 4 月 8 日或之前启用了 Cloud Pub/Sub 服务代理,以支持经过身份验证的 Pub/Sub 推送请求,请向 Google 管理的服务账号授予 Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator)。否则,系统会默认授予此角色:
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com \
        --role=roles/iam.serviceAccountTokenCreator
  8. PROJECT_NUMBER 替换为您的 Google Cloud 项目编号。您可以在 Google Cloud 控制台的欢迎页面上或者通过运行以下命令找到项目编号:

    gcloud projects describe PROJECT_ID --format='value(projectNumber)'

Terraform

  1. 如需使用已设置 gcloud CLI 的在线终端,请激活 Cloud Shell:

    在此页面底部,一个 Cloud Shell 会话将启动并显示命令行提示符。该会话可能需要几秒钟来完成初始化。

    Cloud Shell 支持本教程中用于生成伪随机数的 /dev/urandom 命令。

  2. 创建或选择 Google Cloud 项目。
    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID
    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID
  3. 确保您的 Google Cloud 项目已启用结算功能
  4. 启用 Artifact Registry、Cloud Build、Cloud Run、Cloud Storage、Eventarc 和 Workflows API:
    gcloud services enable artifactregistry.googleapis.com \
        cloudbuild.googleapis.com \
        eventarc.googleapis.com \
        run.googleapis.com \
        storage.googleapis.com \
        workflows.googleapis.com
  5. 为您的工作流创建服务帐号以用于与其他 Google Cloud 服务进行身份验证,并向其授予适当的角色。此外,如需支持来自 Cloud Storage 的直接事件,请将 Pub/Sub Publisher 角色 (roles/pubsub.publisher) 授予 Cloud Storage 服务代理(即 Google 代管式服务帐号)。

    修改 main.tf 文件,如以下示例所示。如需了解详情,请参阅适用于 Terraform 的 Google 提供程序文档

    如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

    请注意,在典型的 Terraform 工作流中,您可以一次性应用整个计划。但在本教程中,您可以按特定资源作为目标。例如:

    terraform apply -target="google_service_account.workflows"

    # Used to retrieve project information later
    data "google_project" "project" {}
    
    # Create a dedicated service account
    resource "google_service_account" "workflows" {
      account_id   = "workflows-run-job-sa"
      display_name = "Workflows Cloud Run Job Service Account"
    }
    
    # Grant permission to receive Eventarc events
    resource "google_project_iam_member" "eventreceiver" {
      project = data.google_project.project.id
      role    = "roles/eventarc.eventReceiver"
      member  = "serviceAccount:${google_service_account.workflows.email}"
    }
    
    # Grant permission to write logs
    resource "google_project_iam_member" "logwriter" {
      project = data.google_project.project.id
      role    = "roles/logging.logWriter"
      member  = "serviceAccount:${google_service_account.workflows.email}"
    }
    
    # Grant permission to execute Cloud Run jobs
    resource "google_project_iam_member" "runadmin" {
      project = data.google_project.project.id
      role    = "roles/run.admin"
      member  = "serviceAccount:${google_service_account.workflows.email}"
    }
    
    # Grant permission to invoke workflows
    resource "google_project_iam_member" "workflowsinvoker" {
      project = data.google_project.project.id
      role    = "roles/workflows.invoker"
      member  = "serviceAccount:${google_service_account.workflows.email}"
    }
    
    # Grant the Cloud Storage service agent permission to publish Pub/Sub topics
    data "google_storage_project_service_account" "gcs_account" {}
    resource "google_project_iam_member" "pubsubpublisher" {
      project = data.google_project.project.id
      role    = "roles/pubsub.publisher"
      member  = "serviceAccount:${data.google_storage_project_service_account.gcs_account.email_address}"
    }
    
  6. 如果您在 2021 年 4 月 8 日或之前启用了 Cloud Pub/Sub 服务代理,以支持经过身份验证的 Pub/Sub 推送请求,请向 Google 管理的服务账号授予 Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator)。否则,系统会默认授予此角色:
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com \
        --role=roles/iam.serviceAccountTokenCreator
  7. PROJECT_NUMBER 替换为您的 Google Cloud 项目编号。您可以在 Google Cloud 控制台的欢迎页面上或者通过运行以下命令找到项目编号:

    gcloud projects describe PROJECT_ID --format='value(projectNumber)'

创建 Cloud Run 作业

本教程使用 GitHub 中的示例 Cloud Run 作业。该作业从 Cloud Storage 中的输入文件读取数据,并对文件中的每一行执行一些任意处理。

  1. 通过将示例应用代码库克隆到本地机器来获取示例代码:

    git clone https://github.com/GoogleCloudPlatform/jobs-demos.git
    

    或者,您也可以下载该示例的 ZIP 文件并将其解压缩。

  2. 转到包含示例代码的目录:

    cd jobs-demos/parallel-processing
    
  3. 创建一个 Cloud Storage 存储桶来存储可写入并触发事件的输入文件:

    控制台

    1. 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。

      进入“存储桶”

    2. 点击 add 创建
    3. 创建存储桶页面上,输入存储桶的名称:
      input-PROJECT_ID
      PROJECT_ID 替换为您的 Google Cloud 项目的 ID。
    4. 保留其他默认值。
    5. 点击创建

    gcloud

    运行 gcloud storage buckets create 命令:

    gcloud storage buckets create gs://input-PROJECT_ID

    如果请求成功,该命令将返回以下消息:

    Creating gs://input-PROJECT_ID/...

    Terraform

    如需创建 Cloud Storage 存储桶,请使用 google_storage_bucket 资源并修改 main.tf 文件,如以下示例所示。

    如需了解如何应用或移除 Terraform 配置,请参阅基本的 Terraform 命令

    请注意,在典型的 Terraform 工作流中,您可以一次性应用整个计划。但在本教程中,您可以按特定资源作为目标。例如:

    terraform apply -target="random_id.bucket_name_suffix"

    terraform apply -target="google_storage_bucket.default"

    # Cloud Storage bucket names must be globally unique
    resource "random_id" "bucket_name_suffix" {
      byte_length = 4
    }
    
    # Create a Cloud Storage bucket
    resource "google_storage_bucket" "default" {
      name                        = "input-${data.google_project.project.name}-${random_id.bucket_name_suffix.hex}"
      location                    = "us-central1"
      storage_class               = "STANDARD"
      force_destroy               = false
      uniform_bucket_level_access = true
    }
  4. 创建一个用于存储容器映像的 Artifact Registry 标准代码库:

    控制台

    1. 在 Google Cloud 控制台中,转到 Artifact Registry 代码库页面:

      前往制品库

    2. 点击 创建代码库

    3. 输入代码库的名称,例如 my-repo。对于项目中的每个代码库位置,代码库名称不得重复。

    4. 保留默认格式,即应为 Docker

    5. 保留默认模式,即应为标准

    6. 在区域,选择 us-central1 (Iowa)(us-central1 [爱荷华])。

    7. 保留所有其他默认值。

    8. 点击创建

    gcloud

    运行以下命令:

    gcloud artifacts repositories create REPOSITORY \
        --repository-format=docker \
        --location=us-central1

    REPOSITORY 替换为代码库的唯一名称,例如 my-repo。对于项目中的每个代码库位置,代码库名称必须是唯一的。

    Terraform

    如需创建 Artifact Registry 代码库,请使用 google_artifact_registry_repository 资源并修改 main.tf 文件,如以下示例所示。

    请注意,在典型的 Terraform 工作流中,您可以一次性应用整个计划。但在本教程中,您可以按特定资源作为目标。例如:

    terraform apply -target="google_artifact_registry_repository.default"

    # Create an Artifact Registry repository
    resource "google_artifact_registry_repository" "default" {
      location      = "us-central1"
      repository_id = "my-repo"
      format        = "docker"
    }
  5. 使用默认的 Google Cloud Buildpack 构建容器映像:

    export SERVICE_NAME=parallel-job
    gcloud builds submit \
        --pack image=us-central1-docker.pkg.dev/PROJECT_ID/REPOSITORY/${SERVICE_NAME}
    

    REPOSITORY 替换为您的 Artifact Registry 代码库的名称。

    构建可能需要几分钟时间才能完成。

  6. 创建一个部署容器映像的 Cloud Run 作业:

    控制台

    1. 在 Google Cloud 控制台中,前往 Cloud Run 页面:

      转到 Cloud Run

    2. 点击创建作业,以显示创建作业表单。

      1. 在表单中,选择 us-central1-docker.pkg.dev/PROJECT_ID/REPOSITORY/parallel-job:latest 作为 Artifact Registry 容器映像网址。
      2. 可选:对于作业名称,请输入 parallel-job
      3. 可选:对于区域,选择 us-central1(爱荷华)
      4. 对于要在作业中运行的任务数,请输入 10。所有任务都必须成功,作业才能成功。默认情况下,任务会并行执行。
    3. 展开容器、变量和密钥、连接、安全性部分,并保留除以下设置之外的所有默认设置:

      1. 点击常规标签页。

        1. 对于容器命令,请输入 python
        2. 对于容器参数,请输入 process.py
      2. 点击变量和 Secret 标签页。

        1. 点击添加变量,然后输入 INPUT_BUCKET 作为名称和值。input-PROJECT_ID
        2. 点击添加变量,然后输入 INPUT_FILE 作为名称和值。input_file.txt
    4. 如需创建作业,请点击创建

    gcloud

    1. 运行以下命令:

      gcloud run jobs create parallel-job \
          --image us-central1-docker.pkg.dev/PROJECT_ID/REPOSITORY/parallel-job \
          --command python \
          --args process.py \
          --tasks 10 \
          --set-env-vars=INPUT_BUCKET=input-PROJECT_ID,INPUT_FILE=input_file.txt

      如需查看创建作业时可用的选项的完整列表,请参阅 gcloud run jobs create 命令行文档。

    2. 创建作业后,您应该会看到一条表示成功的消息。

    Terraform

    如需创建 Cloud Run 作业,请使用 google_cloud_run_v2_job 资源并修改 main.tf 文件,如以下示例所示。

    请注意,在典型的 Terraform 工作流中,您可以一次性应用整个计划。但在本教程中,您可以按特定资源作为目标。例如:

    terraform apply -target="google_cloud_run_v2_job.default"

    # Create a Cloud Run job
    resource "google_cloud_run_v2_job" "default" {
      name     = "parallel-job"
      location = "us-central1"
    
      template {
        task_count = 10
        template {
          containers {
            image   = "us-central1-docker.pkg.dev/${data.google_project.project.name}/${google_artifact_registry_repository.default.repository_id}/parallel-job:latest"
            command = ["python"]
            args    = ["process.py"]
            env {
              name  = "INPUT_BUCKET"
              value = google_storage_bucket.default.name
            }
            env {
              name  = "INPUT_FILE"
              value = "input_file.txt"
            }
          }
        }
      }
    }

部署执行 Cloud Run 作业的工作流

定义和部署工作流,用于执行您刚刚创建的 Cloud Run 作业。工作流定义由使用 Workflows 语法描述的一系列步骤组成。

控制台

  1. 在 Google Cloud 控制台中,转到工作流页面:

    进入 Workflows

  2. 点击 创建

  3. 输入新工作流的名称,例如 cloud-run-job-workflow

  4. 在区域,选择 us-central1 (Iowa)(us-central1 [爱荷华])。

  5. 服务帐号字段中,选择您之前创建的服务帐号。

    服务帐号充当工作流的身份。您应该已将 Cloud Run Admin 角色授予服务帐号,以便工作流可以执行 Cloud Run 作业。

  6. 点击下一步

  7. 在工作流编辑器中,为您的工作流输入以下定义:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${event.data.bucket}
                    - event_file: ${event.data.name}
                    - target_bucket: ${"input-" + project_id}
                    - job_name: parallel-job
                    - job_location: us-central1
            - check_input_file:
                switch:
                    - condition: ${event_bucket == target_bucket}
                      next: run_job
                    - condition: true
                      next: end
            - run_job:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  8. 点击部署

gcloud

  1. 为工作流创建源代码文件:

    touch cloud-run-job-workflow.yaml
    
  2. 将以下工作流定义复制到源代码文件:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${event.data.bucket}
                    - event_file: ${event.data.name}
                    - target_bucket: ${"input-" + project_id}
                    - job_name: parallel-job
                    - job_location: us-central1
            - check_input_file:
                switch:
                    - condition: ${event_bucket == target_bucket}
                      next: run_job
                    - condition: true
                      next: end
            - run_job:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy cloud-run-job-workflow \
        --location=us-central1 \
        --source=cloud-run-job-workflow.yaml \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    

    替换以下内容:

    • SERVICE_ACCOUNT_NAME:您之前创建的服务帐号的名称
    • PROJECT_ID:您的 Google Cloud 项目的 ID

    服务帐号充当工作流的身份。您应该已将 roles/run.admin 角色授予服务帐号,以便工作流可以执行 Cloud Run 作业。

Terraform

如需创建工作流,请使用 google_workflows_workflow 资源并修改 main.tf 文件,如以下示例所示。

如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

请注意,在典型的 Terraform 工作流中,您可以一次性应用整个计划。但在本教程中,您可以按特定资源作为目标。例如:

terraform apply -target="google_workflows_workflow.default"

# Create a workflow
resource "google_workflows_workflow" "default" {
  name        = "cloud-run-job-workflow"
  region      = "us-central1"
  description = "Workflow that routes a Cloud Storage event and executes a Cloud Run job"

  # Note that $$ is needed for Terraform
  source_contents = <<EOF
  main:
      params: [event]
      steps:
          - init:
              assign:
                  - project_id: $${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                  - event_bucket: $${event.data.bucket}
                  - event_file: $${event.data.name}
                  - target_bucket: "${google_storage_bucket.default.name}"
                  - job_name: parallel-job
                  - job_location: us-central1
          - check_input_file:
              switch:
                  - condition: $${event_bucket == target_bucket}
                    next: run_job
                  - condition: true
                    next: end
          - run_job:
              call: googleapis.run.v1.namespaces.jobs.run
              args:
                  name: $${"namespaces/" + project_id + "/jobs/" + job_name}
                  location: $${job_location}
                  body:
                      overrides:
                          containerOverrides:
                              env:
                                  - name: INPUT_BUCKET
                                    value: $${event_bucket}
                                  - name: INPUT_FILE
                                    value: $${event_file}
              result: job_execution
          - finish:
              return: $${job_execution}
  EOF
}

工作流会执行以下操作:

  1. init 步骤 - 接受 Cloud Storage 事件作为参数,然后设置必要的变量。

  2. check_input_file 步骤 - 检查事件中指定的 Cloud Storage 存储桶是否为 Cloud Run 作业使用的存储桶。

    • 如果答案为“是”,则工作流会继续执行 run_job 步骤。
    • 如果否,工作流将终止,并停止任何进一步处理。
  3. run_job 步骤 - 使用 Cloud Run Admin API 连接器的 googleapis.run.v1.namespaces.jobs.run 方法执行作业。Cloud Storage 存储桶和数据文件名会作为工作流到作业的替换变量进行传递。

  4. finish 步骤 - 因工作流而返回有关作业执行的信息。

为工作流创建 Eventarc 触发器

若要在每次更新输入数据文件时自动执行该工作流并反过来再执行 Cloud Run 作业,请创建一个 Eventarc 触发器以响应包含输入数据文件的存储桶中的 Cloud Storage 事件。

控制台

  1. 在 Google Cloud 控制台中,转到工作流页面:

    进入 Workflows

  2. 点击工作流的名称,例如 cloud-run-job-workflow

  3. 工作流详情页面上,点击 修改

  4. 修改工作流页面的触发器部分中,点击添加新触发器 > Eventarc

    此时会打开 Eventarc 触发器窗格。

  5. 触发器名称字段中,输入触发器的名称,例如 cloud-run-job-workflow-trigger

  6. 事件提供方列表中,选择 Cloud Storage

  7. 事件列表中,选择 google.cloud.storage.object.v1. finalized

  8. 存储桶字段中,选择包含输入数据文件的存储桶。存储桶名称的格式为 input-PROJECT_ID

  9. 服务帐号字段中,选择您之前创建的服务帐号。

    该服务帐号充当触发器的身份。您应该已向服务帐号授予以下角色:

    • Eventarc 事件接收器:用于接收事件
    • Workflows Invoker:用于执行工作流
  10. 点击保存触发器

    Eventarc 触发器现在会显示在修改工作流页面的触发器部分中。

  11. 点击下一步

  12. 点击部署

gcloud

通过运行以下命令创建 Eventarc 触发器:

gcloud eventarc triggers create cloud-run-job-workflow-trigger \
    --location=us \
    --destination-workflow=cloud-run-job-workflow  \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.storage.object.v1.finalized" \
    --event-filters="bucket=input-PROJECT_ID" \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

替换以下内容:

  • PROJECT_ID:您的 Google Cloud 项目的 ID
  • SERVICE_ACCOUNT_NAME:您之前创建的服务帐号的名称。

该服务帐号充当触发器的身份。您应该已向服务帐号授予以下角色:

  • roles/eventarc.eventReceiver:用于接收事件
  • roles/workflows.invoker:用于执行工作流

Terraform

如需创建触发器,请使用 google_eventarc_trigger 资源并修改 main.tf 文件,如以下示例所示。

如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

请注意,在典型的 Terraform 工作流中,您可以一次性应用整个计划。但在本教程中,您可以按特定资源作为目标。例如:

terraform apply -target="google_eventarc_trigger.default"

# Create an Eventarc trigger that routes Cloud Storage events to Workflows
resource "google_eventarc_trigger" "default" {
  name     = "cloud-run-job-trigger"
  location = google_workflows_workflow.default.region

  # Capture objects changed in the bucket
  matching_criteria {
    attribute = "type"
    value     = "google.cloud.storage.object.v1.finalized"
  }
  matching_criteria {
    attribute = "bucket"
    value     = google_storage_bucket.default.name
  }

  # Send events to Workflows
  destination {
    workflow = google_workflows_workflow.default.id
  }

  service_account = google_service_account.workflows.email

}

每当在包含输入数据文件的 Cloud Storage 存储桶中上传或覆盖文件时,系统就会以相应的 Cloud Storage 事件作为参数来执行工作流。

触发工作流

通过更新 Cloud Storage 中的输入数据文件来测试端到端系统。

  1. 为输入文件生成新数据,并将其上传到 Cloud Storage 中的 Cloud Run 作业预期位置:

    base64 /dev/urandom | head -c 100000 >input_file.txt
    gsutil cp input_file.txt gs://BUCKET_NAME/input_file.txt
    

    BUCKET_NAME 替换为您的 Cloud Storage 存储桶的名称。

    如果您使用 Terraform 创建了 Cloud Storage 存储桶,则可以通过运行以下命令来检索存储桶的名称:

    gcloud storage buckets list gs://input*
    

    Cloud Run 作业可能需要几分钟才能运行完毕。

  2. 查看作业执行情况,确认 Cloud Run 作业按预期运行:

    gcloud config set run/region us-central1
    gcloud run jobs executions list --job=parallel-job
    

    您应该会在输出中看到作业执行成功,表示 10/10 任务已完成。

详细了解如何使用事件或 Pub/Sub 消息触发工作流

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

若要避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除教程资源

删除您在本教程中创建的资源:

  1. 删除 Eventarc 触发器:

    gcloud eventarc triggers delete cloud-run-job-workflow-trigger --location=us
    
  2. 删除工作流:

    gcloud workflows delete cloud-run-job-workflow --location=us-central1
    
  3. 删除 Cloud Run 作业:

    gcloud run jobs delete parallel-job
    
  4. 删除为输入数据创建的 Cloud Storage 存储桶:

    gcloud storage rm --recursive gs://input-PROJECT_ID/
    
  5. 删除 Artifact Registry 代码库:

    gcloud artifacts repositories delete REPOSITORY --location=us-central1
    

后续步骤