执行 Cloud Run 作业,以处理保存在 Cloud Storage 中的事件数据


在执行更复杂的数据处理或编排现有作业系统的工作流中,您可以使用 Workflows 执行 Cloud Run 作业

本教程演示了如何使用 Workflows 执行 Cloud Run 作业,以处理存储在 Cloud Storage 存储桶中的事件数据。通过将事件载荷存储在 Cloud Storage 存储桶中,您可以使用客户管理的加密密钥来加密数据,如果将事件数据作为环境变量传递给 Cloud Run 作业,则无法使用这种方法。

下图是简要概览:

Pub/Sub 事件由 Eventarc 触发器路由到 Workflows,并保存在 Cloud Storage 存储桶中。Cloud Run 作业会处理存储在存储桶中的事件数据。

目标

在此教程中,您将学习以下操作:

  1. 创建一个 Cloud Run 作业,用于处理存储在 Cloud Storage 存储桶中的事件数据。
  2. 部署执行以下操作的工作流:
    1. 接收事件作为参数。
    2. 将事件载荷数据写入 Cloud Storage 存储桶。
    3. 使用 Cloud Run Admin API 连接器执行 Cloud Run 作业。
  3. 创建一个 Pub/Sub 主题,以便您可以向该主题发布消息。本教程使用 Pub/Sub 事件作为示例,说明如何使用 Workflows 路由事件,并将事件保存到 Cloud Storage,以便 Cloud Run 作业可以处理事件数据。
  4. 创建一个 Eventarc 触发器,用于在消息写入 Pub/Sub 主题时执行工作流。
  5. 向 Pub/Sub 主题写入消息来触发工作流。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

在开始本教程之前,您必须启用特定 API 并创建用户代管式服务帐号。您必须向该服务帐号授予必要的角色和权限,以便您可以使用 Eventarc 触发器路由事件并执行工作流。

控制台

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 启用 Cloud Build, Cloud Run, Cloud Storage, Eventarc, and Workflows API。

    启用 API

  5. 创建服务帐号:

    1. 在 Google Cloud 控制台中,转到创建服务帐号页面。

      转到“创建服务帐号”
    2. 选择您的项目。
    3. 服务帐号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务帐号 ID 字段。

      服务帐号说明字段中,输入说明。例如,Service account for quickstart

    4. 点击创建并继续
    5. 向服务帐号授予以下角色: Cloud Run Admin, Eventarc Event Receiver, Logs Writer, Storage Object Creator, Workflows Invoker.

      如需授予角色,请找到选择角色列表,然后选择相应角色。

      如需授予其他角色,请点击 添加其他角色,然后添加其他各个角色。

    6. 点击继续
    7. 点击完成以完成服务帐号的创建过程。

  6. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  7. 确保您的 Google Cloud 项目已启用结算功能

  8. 启用 Cloud Build, Cloud Run, Cloud Storage, Eventarc, and Workflows API。

    启用 API

  9. 创建服务帐号:

    1. 在 Google Cloud 控制台中,转到创建服务帐号页面。

      转到“创建服务帐号”
    2. 选择您的项目。
    3. 服务帐号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务帐号 ID 字段。

      服务帐号说明字段中,输入说明。例如,Service account for quickstart

    4. 点击创建并继续
    5. 向服务帐号授予以下角色: Cloud Run Admin, Eventarc Event Receiver, Logs Writer, Storage Object Creator, Workflows Invoker.

      如需授予角色,请找到选择角色列表,然后选择相应角色。

      如需授予其他角色,请点击 添加其他角色,然后添加其他各个角色。

    6. 点击继续
    7. 点击完成以完成服务帐号的创建过程。

  10. 如果您在 2021 年 4 月 8 日或之前启用了 Cloud Pub/Sub 服务代理,以支持经过身份验证的 Pub/Sub 推送请求,请向 Google 代管式服务帐号授予 Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator)。否则,系统会默认授予此角色:
    1. 在 Google Cloud 控制台中,前往 IAM 页面。

      转到 IAM

    2. 选中包括 Google 提供的角色授权复选框。
    3. 名称列中,找到 Cloud Pub/Sub 服务帐号,然后点击相应行中的 修改主帐号
    4. 点击 添加角色添加其他角色
    5. 选择角色列表中,过滤出 Service Account Token Creator,然后选择角色。
    6. 点击保存
  11. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在 Google Cloud 控制台的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟时间来完成初始化。

  12. Cloud Shell 支持本教程中的命令行步骤。

gcloud

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. Enable the Cloud Build, Cloud Run, Cloud Storage, Eventarc, and Workflows APIs:

    gcloud services enable cloudbuild.googleapis.com run.googleapis.com storage.googleapis.com eventarc.googleapis.com workflows.googleapis.com
  7. 设置身份验证:

    1. 创建服务帐号:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      SERVICE_ACCOUNT_NAME 替换为服务帐号的名称。

    2. 向服务帐号授予角色。对以下每个 IAM 角色运行以下命令一次:roles/eventarc.eventReceiver, roles/logging.logWriter, roles/run.admin, roles/storage.objectCreator, roles/workflows.invoker

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      请替换以下内容:

      • SERVICE_ACCOUNT_NAME:服务帐号的名称
      • PROJECT_ID:您在其中创建服务帐号的项目的 ID
      • ROLE:要授予的角色
  8. 安装 Google Cloud CLI。
  9. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  10. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  11. 确保您的 Google Cloud 项目已启用结算功能

  12. Enable the Cloud Build, Cloud Run, Cloud Storage, Eventarc, and Workflows APIs:

    gcloud services enable cloudbuild.googleapis.com run.googleapis.com storage.googleapis.com eventarc.googleapis.com workflows.googleapis.com
  13. 设置身份验证:

    1. 创建服务帐号:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      SERVICE_ACCOUNT_NAME 替换为服务帐号的名称。

    2. 向服务帐号授予角色。对以下每个 IAM 角色运行以下命令一次:roles/eventarc.eventReceiver, roles/logging.logWriter, roles/run.admin, roles/storage.objectCreator, roles/workflows.invoker

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      请替换以下内容:

      • SERVICE_ACCOUNT_NAME:服务帐号的名称
      • PROJECT_ID:您在其中创建服务帐号的项目的 ID
      • ROLE:要授予的角色
  14. 如果您在 2021 年 4 月 8 日或之前启用了 Cloud Pub/Sub 服务代理,以支持经过身份验证的 Pub/Sub 推送请求,请向 Google 管理的服务账号授予 Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator)。否则,系统会默认授予此角色:
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com \
        --role=roles/iam.serviceAccountTokenCreator
  15. PROJECT_NUMBER 替换为您的 Google Cloud 项目编号。您可以在 Google Cloud 控制台的欢迎页面上或者通过运行以下命令找到项目编号:

    gcloud projects describe PROJECT_ID --format='value(projectNumber)'

创建 Cloud Run 作业

本教程使用了示例代码,您可以在 GitHub 上找到这些代码。部署脚本会构建容器映像以创建 Cloud Run 作业。该脚本还会创建一个 Cloud Storage 存储桶。Cloud Run 作业会读取存储在 Cloud Storage 存储桶中的任何事件数据,然后输出事件数据。

  1. 通过将示例应用代码库克隆到本地机器来获取示例代码:

    git clone https://github.com/GoogleCloudPlatform/workflows-demos.git
    

    或者,您也可以将示例下载为 ZIP 文件

  2. 转到包含示例代码的目录:

    cd workflows-demos/cloud-run-jobs-payload-gcs/message-payload-job
    
  3. 通过运行部署脚本来创建 Cloud Run 作业:

    ./deploy-job.sh
    

该脚本会创建一个名为 message-payload-PROJECT_ID 的 Cloud Storage 存储桶,其中 PROJECT_ID 是您的 Google Cloud 项目的 ID。系统还会创建一个名为 message-payload-job 的 Cloud Run 作业。

部署执行 Cloud Run 作业的工作流

定义和部署工作流,用于执行您刚刚创建的 Cloud Run 作业。工作流定义由使用 Workflows 语法描述的一系列步骤组成。

工作流接收到事件,将事件数据保存到 Cloud Storage 存储桶,然后执行 Cloud Run 作业来处理事件数据。

控制台

  1. 在 Google Cloud 控制台中,转到工作流页面:

    进入 Workflows

  2. 点击 创建

  3. 输入新工作流的名称,例如 message-payload-workflow

  4. 选择合适的区域;例如 us-central1

  5. 服务帐号字段中,选择您之前创建的服务帐号。

    服务帐号充当工作流的身份。您应该已向服务帐号授予以下角色:

    • Cloud Run Admin:用于执行 Cloud Run 作业
    • Logs Writer:用于写入日志条目
    • Storage Object Creator:用于在 Cloud Storage 中创建对象
  6. 点击下一步

  7. 在工作流编辑器中,为您的工作流输入以下定义:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  8. 点击部署

gcloud

  1. 为工作流创建源代码文件:

    touch message-payload-workflow.yaml
    
  2. 将以下工作流定义复制到 message-payload-workflow.yaml

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy message-payload-workflow \
        --location=us-central1 \
        --source=message-payload-workflow.yaml \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    

    替换以下内容:

    • SERVICE_ACCOUNT_NAME:您之前创建的服务帐号的名称
    • PROJECT_ID:您的 Google Cloud 项目的 ID

    服务帐号充当工作流的身份。您应该已向服务帐号授予以下角色:

    • roles/logging.logWriter:用于写入日志条目
    • roles/run.admin:用于执行 Cloud Run 作业
    • roles/storage.objectCreator:用于在 Cloud Storage 中创建对象

工作流会执行以下操作:

  1. init 步骤 - 接受事件作为参数,并设置必要的变量。

  2. log_event 步骤 - 使用函数 sys.log 在 Cloud Logging 中创建日志条目。

  3. write_payload_to_gcs 步骤 - 发出 HTTP POST 请求,并将事件载荷数据写入 Cloud Storage 存储桶文件。

  4. run_job_to_process_payload 步骤 - 使用 Cloud Run Admin API 连接器方法 (googleapis.run.v1.namespaces.jobs.run) 执行作业。Cloud Storage 存储桶和数据文件名会作为工作流到作业的替换变量进行传递。

  5. finish 步骤 - 因工作流而返回有关作业执行的信息。

创建 Pub/Sub 主题

创建一个 Pub/Sub 主题,以便您可以向该主题发布消息。Pub/Sub 事件用于演示如何使用 Workflows 路由事件并将事件保存到 Cloud Storage,以便 Cloud Run 作业可以处理事件数据。

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    打开“主题”

  2. 点击 创建主题

  3. 主题 ID 字段中,输入主题的 ID,例如 message-payload-topic

  4. 保留添加默认订阅选项。

  5. 请勿选择其他选项。

  6. 点击创建

gcloud

如需创建 ID 为 message-payload-topic 的主题,请运行 gcloud pubsub topics create 命令:

gcloud pubsub topics create message-payload-topic

创建 Eventarc 触发器以将事件路由到工作流

如需自动执行工作流并转而执行 Cloud Run 作业,请创建一个响应 Pub/Sub 事件并将事件路由到工作流的 Eventarc 触发器。每当有消息写入 Pub/Sub 主题时,该事件都会触发工作流的执行。

控制台

  1. 在 Google Cloud 控制台中,转到工作流页面:

    进入 Workflows

  2. 点击工作流的名称,例如 message-payload-workflow

  3. 工作流详情页面上,点击 修改

  4. 修改工作流页面的触发器部分中,点击添加新触发器 > Eventarc

    此时会打开 Eventarc 触发器窗格。

  5. 触发器名称字段中,输入触发器的名称,例如 message-payload-trigger

  6. 事件提供方列表中,选择 Cloud Pub/Sub

  7. 事件列表中,选择 google.cloud.pubsub.topic.v1.messagePublish

  8. 选择 Cloud Pub/Sub 主题列表中,选择您之前创建的 Pub/Sub 主题。

  9. 服务帐号字段中,选择您之前创建的服务帐号。

    该服务帐号充当触发器的身份。您应该已向服务帐号授予以下角色:

    • Eventarc 事件接收器:用于接收事件
    • Workflows Invoker:用于执行工作流
  10. 点击保存触发器

    Eventarc 触发器现在会显示在修改工作流页面的触发器部分中。

  11. 点击下一步

  12. 点击部署

gcloud

通过运行以下命令创建 Eventarc 触发器:

gcloud eventarc triggers create message-payload-trigger \
    --location=us-central1 \
    --destination-workflow=message-payload-workflow \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=projects/PROJECT_ID/topics/message-payload-topic \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

替换以下内容:

  • PROJECT_ID:您的 Google Cloud 项目的 ID
  • SERVICE_ACCOUNT_NAME:您之前创建的服务帐号的名称。

该服务帐号充当触发器的身份。您应该已向服务帐号授予以下角色:

  • roles/eventarc.eventReceiver:用于接收事件
  • roles/workflows.invoker:用于执行工作流

触发工作流

通过向 Pub/Sub 主题发布消息并生成事件来测试端到端系统。如需了解详情,请参阅使用事件或 Pub/Sub 消息触发工作流

  1. 向 Pub/Sub 主题发布消息以生成事件:

    gcloud pubsub topics publish message-payload-topic --message="Hello World"
    

    该事件会被路由到用于记录事件消息的工作流,将事件数据保存到 Cloud Storage 存储桶,并执行 Cloud Run 作业来处理保存在 Cloud Storage 中的数据。请稍等片刻。

  2. 查看作业执行情况,确认 Cloud Run 作业按预期运行:

    gcloud run jobs executions list --job=message-payload-job
    

    您应该会在输出中看到新的作业执行。

  3. 如需查看通过触发工作流创建的与事件相关的日志条目,请运行以下命令:

    gcloud logging read "resource.type=cloud_run_job AND textPayload:Payload"
    
  4. 查找类似如下的日志条目:

    textPayload: "Payload: {'message': {'data': 'SGVsbG8gV29ybGQ=', 'messageId': '8834108235224238',\
    \ 'publishTime': '2023-09-20T17:07:52.921Z'}, 'subscription': 'projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741'}"
    ...
    resource:
    labels:
      job_name: message-payload-job
      location: us-central1
      project_id: MY_PROJECT
    type: cloud_run_job
    textPayload: Processing message payload gs://message-payload-MY_PROJECT/8254002311197919.data.json
    
  5. 您可以查看 Cloud Storage 存储桶对象中的事件数据,确认结果是否符合预期。

    1. 检索您的存储桶名称:

      gsutil ls

      输出类似于以下内容:

      gs://message-payload-PROJECT_ID/

    2. 列出存储桶中的对象:

      gsutil ls -r gs://message-payload-PROJECT_ID/**

      输出应类似如下所示:

      gs://message-payload-PROJECT_ID/OBJECT_ID.data.json

      请记下要在下一步中使用的 OBJECT_ID

    3. 将存储桶中的对象下载为文件:

      gcloud storage cp gs://message-payload-PROJECT_ID/OBJECT_ID.data.json message-event.txt

      OBJECT_ID 替换为上一步中返回的 ID。

    4. 在文本编辑器中,打开 message-event.txt 文件。写入该文件的事件正文应类似于以下内容:

      {
        "message": {
          "data": "SGVsbG8gV29ybGQ=",
          "messageId": "8254002311197919",
          "publishTime": "2023-09-20T16:54:29.312Z"
        },
        "subscription": "projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741"
      }
      

      请注意,如果您从 Base64 格式中解码 SGVsbG8gV29ybGQ= 的数据值,将返回“Hello World”。

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

若要避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除教程资源

删除您在本教程中创建的资源:

  1. 删除 Pub/Sub 主题

  2. 删除 Eventarc 触发器

  3. 删除 Workflows 工作流

  4. 删除 Cloud Run 作业

  5. 删除 Cloud Storage 存储桶

后续步骤