使用 Workflows 运行批量作业


Batch 是一项全代管式服务,可让您在 Compute Engine 虚拟机 (VM) 实例上安排工作负载、将工作负载加入队列并执行批处理。Batch 代表您预配资源和管理容量,允许批处理工作负载大规模运行。

借助 Workflows,您可以按照使用 Workflows 语法定义的顺序执行所需的服务。

在本教程中,您将使用适用于 Batch 的 Workflows 连接器安排和运行在两个 Compute Engine 虚拟机上并行执行六项任务的 Batch 作业。通过同时使用 Batch 和 Workflows,您可以结合它们提供的优势,并高效地预配和编排整个流程。

目标

在此教程中,您将学习以下操作:

  1. 为 Docker 容器映像创建 Artifact Registry 代码库。
  2. 从 GitHub 获取批处理工作负载的代码:一个示例程序,可批量生成 10000 个质数。
  3. 为工作负载构建 Docker 映像。
  4. 部署并执行执行以下操作的工作流:
    1. 创建一个 Cloud Storage 存储桶,用于存储质数生成器的结果。
    2. 安排并运行一个批处理作业,该作业在两个 Compute Engine 虚拟机上将 Docker 容器作为六个任务并行运行。
    3. (可选)在批量作业完成后将其删除。
  5. 确认结果符合预期,并且生成的质数批次存储在 Cloud Storage 中。

您可以在 Google Cloud 控制台中运行以下大部分命令,也可以在终端或 Cloud Shell 中使用 Google Cloud CLI 运行所有命令。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

控制台

  1. 在 Google Cloud 控制台的“项目选择器”页面上,选择或创建 Google Cloud 项目

    前往项目选择器

  2. 确保您的 Google Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

  3. 启用 Artifact Registry、Batch、Cloud Build、Compute Engine、Workflow Executions 和 Workflows API。

    启用 API

  4. 为您的工作流创建用于与其他 Google Cloud 服务进行身份验证的服务帐号,并向其授予适当的角色:

    1. 在 Google Cloud 控制台中,进入创建服务账号页面。

      转到“创建服务账号”

    2. 选择您的项目。

    3. 服务账号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务账号 ID 字段。

      服务账号说明字段中,输入说明。例如 Service account for tutorial

    4. 点击创建并继续

    5. 选择角色列表中,过滤出以下角色,以将其授予您在上一步中创建的用户代管式服务帐号:

      • 批量作业编辑器:用于修改批量作业。
      • Logs Writer:用于写入日志。
      • Storage Admin:用于控制 Cloud Storage 资源。

      如需添加其他角色,请点击 添加其他角色,然后添加其他各个角色。

    6. 点击继续

    7. 如需完成账号的创建过程,请点击完成

  5. 将默认服务帐号的 IAM Service Account User 角色授予上一步中创建的用户代管式服务帐号。启用 Compute Engine API 后,默认服务帐号是 Compute Engine 默认服务帐号 (PROJECT_NUMBER-compute@developer.gserviceaccount.com),且权限通常通过 roles/iam.serviceAccountUser 角色分配。

    1. 服务帐号页面上,点击默认服务帐号的电子邮件地址 (PROJECT_NUMBER-compute@developer.gserviceaccount.com)。

    2. 点击权限标签页。

    3. 点击 授予访问权限按钮。

    4. 如需添加新的主账号,请输入您的服务帐号 (SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com) 的电子邮件地址。

    5. 选择角色列表中,依次选择服务帐号 > Service Account User 角色。

    6. 点击保存

gcloud

  1. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在 Google Cloud 控制台的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟时间来完成初始化。

  2. 确保您的 Google Cloud 项目已启用结算功能。 了解如何检查项目是否已启用结算功能

  3. 启用 Artifact Registry、Batch、Cloud Build、Compute Engine Workflow Executions 和 Workflows API。

    gcloud services enable artifactregistry.googleapis.com \
      batch.googleapis.com \
      cloudbuild.googleapis.com \
      compute.googleapis.com \
      workflowexecutions.googleapis.com \
      workflows.googleapis.com
    
  4. 为您的工作流创建用于与其他 Google Cloud 服务进行身份验证的服务帐号,并向其授予适当的角色。

    1. 创建服务账号:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
      

      SERVICE_ACCOUNT_NAME 替换为服务帐号的名称。

    2. 向您在上一步中创建的用户代管式服务帐号授予角色。针对以下每个 IAM 角色运行以下命令一次:

      • roles/batch.jobsEditor:用于修改批量作业。
      • roles/logging.logWriter:用于写入日志。
      • roles/storage.admin:用于控制 Cloud Storage 资源。
      gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
      

      替换以下内容:

      • PROJECT_ID:您在其中创建了服务帐号的项目的 ID
      • ROLE:要授予的角色
  5. 将默认服务帐号的 IAM Service Account User 角色授予您在上一步中创建的用户代管式服务帐号。启用 Compute Engine API 后,默认服务帐号是 Compute Engine 默认服务帐号 (PROJECT_NUMBER-compute@developer.gserviceaccount.com),且权限通常通过 roles/iam.serviceAccountUser 角色分配。

    PROJECT_NUMBER=$(gcloud projects describe PROJECT_ID --format='value(projectNumber)')
    gcloud iam service-accounts add-iam-policy-binding \
      $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
      --member=serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --role=roles/iam.serviceAccountUser
    

创建 Artifact Registry 仓库

创建一个存储 Docker 容器映像的代码库。

控制台

  1. 在 Google Cloud 控制台中,转到代码库页面。

    前往制品库

  2. 点击 创建代码库

  3. 输入 containers 作为代码库名称。

  4. 格式字段中,选择 Docker

  5. 对于位置类型,选择区域

  6. 区域列表中,选择 us-central1

  7. 点击创建

gcloud

运行以下命令:

  gcloud artifacts repositories create containers \
    --repository-format=docker \
    --location=us-central1

您在 us-central1 区域中创建了一个名为 containers 的 Artifact Registry 代码库。如需详细了解支持的区域,请参阅 Artifact Registry 位置

获取代码示例

Google Cloud 将本教程的应用源代码存储在 GitHub 中。您可以克隆该代码库或下载示例。

  1. 将示例应用代码库克隆到本地机器:

    git clone https://github.com/GoogleCloudPlatform/batch-samples.git
    

    或者,您也可以下载 main.zip 文件中的示例并将其解压缩。

  2. 转到包含示例代码的目录:

    cd batch-samples/primegen
    

现在,您已经拥有了开发环境中的应用源代码。

使用 Cloud Build 构建 Docker 映像

Dockerfile 包含使用 Cloud Build 构建 Docker 映像所需的信息。运行以下命令进行构建:

gcloud builds submit \
  -t us-central1-docker.pkg.dev/PROJECT_ID/containers/primegen-service:v1 PrimeGenService/

PROJECT_ID 替换为您的 Google Cloud 项目 ID。

构建完成后,您应该会看到类似于以下内容的输出:

DONE
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
ID: a54818cc-5d14-467b-bfda-5fc9590af68c
CREATE_TIME: 2022-07-29T01:48:50+00:00
DURATION: 48S
SOURCE: gs://project-name_cloudbuild/source/1659059329.705219-17aee3a424a94679937a7200fab15bcf.tgz
IMAGES: us-central1-docker.pkg.dev/project-name/containers/primegen-service:v1
STATUS: SUCCESS

使用 Dockerfile,您构建了名为 primegen-service 的 Docker 映像,并将该映像推送到了名为 containers 的 Artifact Registry 代码库。

部署用于安排和运行批量作业的工作流

以下工作流在两个 Compute Engine 虚拟机上安排并运行一个 Batch 作业,该作业将 Docker 容器作为六个任务并行运行。结果生成了六批质数,并存储在 Cloud Storage 存储桶中。

控制台

  1. 在 Google Cloud 控制台中,前往工作流页面。

    进入 Workflows

  2. 点击 创建

  3. 输入新工作流的名称,例如 batch-workflow

  4. 区域列表中,选择 us-central1

  5. 选择您之前创建的服务帐号

  6. 点击下一步

  7. 在工作流编辑器中,为工作流输入以下定义:

    YAML

    main:
      params: [args]
      steps:
        - init:
            assign:
              - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - region: "us-central1"
              - imageUri: ${region + "-docker.pkg.dev/" + projectId + "/containers/primegen-service:v1"}
              - jobId: ${"job-primegen-" + string(int(sys.now()))}
              - bucket: ${projectId + "-" + jobId}
        - createBucket:
            call: googleapis.storage.v1.buckets.insert
            args:
              query:
                project: ${projectId}
              body:
                name: ${bucket}
        - logCreateBucket:
            call: sys.log
            args:
              data: ${"Created bucket " + bucket}
        - logCreateBatchJob:
            call: sys.log
            args:
              data: ${"Creating and running the batch job " + jobId}
        - createAndRunBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.create
            args:
                parent: ${"projects/" + projectId + "/locations/" + region}
                jobId: ${jobId}
                body:
                  taskGroups:
                    taskSpec:
                      runnables:
                        - container:
                            imageUri: ${imageUri}
                          environment:
                            variables:
                              BUCKET: ${bucket}
                    # Run 6 tasks on 2 VMs
                    taskCount: 6
                    parallelism: 2
                  logsPolicy:
                    destination: CLOUD_LOGGING
            result: createAndRunBatchJobResponse
        # You can delete the batch job or keep it for debugging
        - logDeleteBatchJob:
            call: sys.log
            args:
              data: ${"Deleting the batch job " + jobId}
        - deleteBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.delete
            args:
                name: ${"projects/" + projectId + "/locations/" + region + "/jobs/" + jobId}
            result: deleteResult
        - returnResult:
            return:
              jobId: ${jobId}
              bucket: ${bucket}

    JSON

    {
      "main": {
        "params": [
          "args"
        ],
        "steps": [
          {
            "init": {
              "assign": [
                {
                  "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                },
                {
                  "region": "us-central1"
                },
                {
                  "imageUri": "${region + \"-docker.pkg.dev/\" + projectId + \"/containers/primegen-service:v1\"}"
                },
                {
                  "jobId": "${\"job-primegen-\" + string(int(sys.now()))}"
                },
                {
                  "bucket": "${projectId + \"-\" + jobId}"
                }
              ]
            }
          },
          {
            "createBucket": {
              "call": "googleapis.storage.v1.buckets.insert",
              "args": {
                "query": {
                  "project": "${projectId}"
                },
                "body": {
                  "name": "${bucket}"
                }
              }
            }
          },
          {
            "logCreateBucket": {
              "call": "sys.log",
              "args": {
                "data": "${\"Created bucket \" + bucket}"
              }
            }
          },
          {
            "logCreateBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Creating and running the batch job \" + jobId}"
              }
            }
          },
          {
            "createAndRunBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.create",
              "args": {
                "parent": "${\"projects/\" + projectId + \"/locations/\" + region}",
                "jobId": "${jobId}",
                "body": {
                  "taskGroups": {
                    "taskSpec": {
                      "runnables": [
                        {
                          "container": {
                            "imageUri": "${imageUri}"
                          },
                          "environment": {
                            "variables": {
                              "BUCKET": "${bucket}"
                            }
                          }
                        }
                      ]
                    },
                    "taskCount": 6,
                    "parallelism": 2
                  },
                  "logsPolicy": {
                    "destination": "CLOUD_LOGGING"
                  }
                }
              },
              "result": "createAndRunBatchJobResponse"
            }
          },
          {
            "logDeleteBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Deleting the batch job \" + jobId}"
              }
            }
          },
          {
            "deleteBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.delete",
              "args": {
                "name": "${\"projects/\" + projectId + \"/locations/\" + region + \"/jobs/\" + jobId}"
              },
              "result": "deleteResult"
            }
          },
          {
            "returnResult": {
              "return": {
                "jobId": "${jobId}",
                "bucket": "${bucket}"
              }
            }
          }
        ]
      }
    }
    
  8. 点击部署

gcloud

  1. 为工作流创建源代码文件:

    touch batch-workflow.JSON_OR_YAML
    

    根据工作流的格式,将 JSON_OR_YAML 替换为 yamljson

  2. 在文本编辑器中,将以下工作流复制到源代码文件:

    YAML

    main:
      params: [args]
      steps:
        - init:
            assign:
              - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - region: "us-central1"
              - imageUri: ${region + "-docker.pkg.dev/" + projectId + "/containers/primegen-service:v1"}
              - jobId: ${"job-primegen-" + string(int(sys.now()))}
              - bucket: ${projectId + "-" + jobId}
        - createBucket:
            call: googleapis.storage.v1.buckets.insert
            args:
              query:
                project: ${projectId}
              body:
                name: ${bucket}
        - logCreateBucket:
            call: sys.log
            args:
              data: ${"Created bucket " + bucket}
        - logCreateBatchJob:
            call: sys.log
            args:
              data: ${"Creating and running the batch job " + jobId}
        - createAndRunBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.create
            args:
                parent: ${"projects/" + projectId + "/locations/" + region}
                jobId: ${jobId}
                body:
                  taskGroups:
                    taskSpec:
                      runnables:
                        - container:
                            imageUri: ${imageUri}
                          environment:
                            variables:
                              BUCKET: ${bucket}
                    # Run 6 tasks on 2 VMs
                    taskCount: 6
                    parallelism: 2
                  logsPolicy:
                    destination: CLOUD_LOGGING
            result: createAndRunBatchJobResponse
        # You can delete the batch job or keep it for debugging
        - logDeleteBatchJob:
            call: sys.log
            args:
              data: ${"Deleting the batch job " + jobId}
        - deleteBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.delete
            args:
                name: ${"projects/" + projectId + "/locations/" + region + "/jobs/" + jobId}
            result: deleteResult
        - returnResult:
            return:
              jobId: ${jobId}
              bucket: ${bucket}

    JSON

    {
      "main": {
        "params": [
          "args"
        ],
        "steps": [
          {
            "init": {
              "assign": [
                {
                  "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                },
                {
                  "region": "us-central1"
                },
                {
                  "imageUri": "${region + \"-docker.pkg.dev/\" + projectId + \"/containers/primegen-service:v1\"}"
                },
                {
                  "jobId": "${\"job-primegen-\" + string(int(sys.now()))}"
                },
                {
                  "bucket": "${projectId + \"-\" + jobId}"
                }
              ]
            }
          },
          {
            "createBucket": {
              "call": "googleapis.storage.v1.buckets.insert",
              "args": {
                "query": {
                  "project": "${projectId}"
                },
                "body": {
                  "name": "${bucket}"
                }
              }
            }
          },
          {
            "logCreateBucket": {
              "call": "sys.log",
              "args": {
                "data": "${\"Created bucket \" + bucket}"
              }
            }
          },
          {
            "logCreateBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Creating and running the batch job \" + jobId}"
              }
            }
          },
          {
            "createAndRunBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.create",
              "args": {
                "parent": "${\"projects/\" + projectId + \"/locations/\" + region}",
                "jobId": "${jobId}",
                "body": {
                  "taskGroups": {
                    "taskSpec": {
                      "runnables": [
                        {
                          "container": {
                            "imageUri": "${imageUri}"
                          },
                          "environment": {
                            "variables": {
                              "BUCKET": "${bucket}"
                            }
                          }
                        }
                      ]
                    },
                    "taskCount": 6,
                    "parallelism": 2
                  },
                  "logsPolicy": {
                    "destination": "CLOUD_LOGGING"
                  }
                }
              },
              "result": "createAndRunBatchJobResponse"
            }
          },
          {
            "logDeleteBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Deleting the batch job \" + jobId}"
              }
            }
          },
          {
            "deleteBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.delete",
              "args": {
                "name": "${\"projects/\" + projectId + \"/locations/\" + region + \"/jobs/\" + jobId}"
              },
              "result": "deleteResult"
            }
          },
          {
            "returnResult": {
              "return": {
                "jobId": "${jobId}",
                "bucket": "${bucket}"
              }
            }
          }
        ]
      }
    }
    
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy batch-workflow \
      --source=batch-workflow.yaml \
      --location=us-central1 \
      --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    

    SERVICE_ACCOUNT_NAME 替换为您之前创建的服务帐号的名称。

执行工作流

执行某个工作流会运行与该工作流关联的当前工作流定义。

控制台

  1. 在 Google Cloud 控制台中,前往工作流页面。

    进入 Workflows

  2. 工作流页面上,点击批处理工作流工作流以转到其详情页面。

  3. Workflow details 页面上,点击 Execute

  4. 再次点击执行

    工作流执行过程应该需要几分钟。

  5. 输出窗格中查看工作流的结果。

    结果应如下所示:

    {
      "bucket": "project-name-job-primegen-TIMESTAMP",
      "jobId": "job-primegen-TIMESTAMP"
    }
    

gcloud

  1. 执行工作流:

    gcloud workflows run batch-workflow \
      --location=us-central1

    工作流执行过程应该需要几分钟。

  2. 您可以检查长时间运行的执行的状态

  3. 如需获取上次已完成的执行的状态,请运行以下命令:

    gcloud workflows executions describe-last

    结果应类似于以下内容:

    name: projects/PROJECT_NUMBER/locations/us-central1/workflows/batch-workflow/executions/EXECUTION_ID
    result: '{"bucket":"project-name-job-primegen-TIMESTAMP","jobId":"job-primegen-TIMESTAMP"}'
    startTime: '2022-07-29T16:08:39.725306421Z'
    state: SUCCEEDED
    status:
      currentSteps:
      - routine: main
        step: returnResult
    workflowRevisionId: 000001-9ba
    

列出输出存储桶中的对象

您可以通过列出 Cloud Storage 输出存储桶中的对象来确认结果是否符合预期。

控制台

  1. 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。

    进入“存储桶”

  2. 在存储桶列表中,点击您要查看其内容的存储桶的名称。

    结果应类似于以下内容,其中总共包含六个文件,每个文件都列出一批 10,000 个质数:

    primes-1-10000.txt
    primes-10001-20000.txt
    primes-20001-30000.txt
    primes-30001-40000.txt
    primes-40001-50000.txt
    primes-50001-60000.txt
    

gcloud

  1. 检索输出存储桶名称:

    gsutil ls

    输出类似于以下内容:

    gs://PROJECT_ID-job-primegen-TIMESTAMP/

  2. 列出输出存储桶中的对象:

    gsutil ls -r gs://PROJECT_ID-job-primegen-TIMESTAMP/**

    TIMESTAMP 替换为上一条命令返回的时间戳。

    输出内容应类似于以下内容,其中总共包含六个文件,每个文件都列出一批 10,000 个质数:

    gs://project-name-job-primegen-TIMESTAMP/primes-1-10000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-10001-20000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-20001-30000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-30001-40000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-40001-50000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-50001-60000.txt
    

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

若要避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除在本教程中创建的资源

  1. 删除批量作业:

    1. 首先检索作业名称:

      gcloud batch jobs list --location=us-central1
      

      输出应类似如下所示:

      NAME: projects/project-name/locations/us-central1/jobs/job-primegen-TIMESTAMP
      STATE: SUCCEEDED
      

      其中,job-primegen-TIMESTAMP 是批处理作业的名称。

    2. 删除作业:

      gcloud batch jobs delete BATCH_JOB_NAME --location us-central1
      
  2. 删除工作流:

    gcloud workflows delete WORKFLOW_NAME
    
  3. 删除容器代码库:

    gcloud artifacts repositories delete REPOSITORY_NAME --location=us-central1
    
  4. Cloud Build 使用 Cloud Storage 来存储构建资源。 如需删除 Cloud Storage 存储桶,请参阅删除存储分区

后续步骤