执行 Cloud Run 作业,用于处理 Cloud Storage 中保存的事件数据


您可以使用 Workflows 来 Cloud Run 作业(作为工作流的一部分) 执行更复杂的数据处理,或编排系统 现有作业

本教程演示了如何使用 Workflows 执行 用于处理 Cloud Storage 存储桶。将事件载荷存储在 Cloud Storage 中 你可以使用存储桶对数据进行加密 客户管理的加密密钥 但如果您是 将事件数据作为环境变量传递 分配给 Cloud Run 作业。

下图是简要概览:

Pub/Sub 事件由 Eventarc 触发器路由到 Workflows 并保存到 Cloud
Storage 存储桶。Cloud Run 作业处理存储在存储桶中的事件数据。

目标

在此教程中,您将学习以下操作:

  1. 创建 Cloud Run 作业来处理存储在 Cloud Storage 存储桶
  2. 部署执行以下操作的工作流:
    1. 接收事件作为参数。
    2. 将事件载荷数据写入 Cloud Storage 存储桶。
    3. 使用 Cloud Run Admin API 连接器 来执行 Cloud Run 作业。
  3. 创建一个 Pub/Sub 主题,以便向其发布消息。 本教程使用 Pub/Sub 事件作为示例来介绍如何 将事件保存到 Cloud Storage,以便 Cloud Run 作业可以处理 事件数据。
  4. 创建一个 Eventarc 触发器,用于在以下情况下执行工作流: 该消息会写入 Pub/Sub 主题
  5. 通过向 Pub/Sub 写入消息来触发工作流 主题。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

在开始学习本教程之前,您必须启用特定的 API 并创建一个 用户代管式服务账号。您必须向服务账号授予角色 这样就能使用 Eventarc 可触发并执行工作流。

控制台

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. 确保您的 Google Cloud 项目已启用结算功能

  3. 启用 Cloud Build, Cloud Run, Cloud Storage, Eventarc, and Workflows API。

    启用 API

  4. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the following roles to the service account: Cloud Run Admin, Eventarc Event Receiver, Logs Writer, Storage Object Creator, Workflows Invoker.

      To grant a role, find the Select a role list, then select the role.

      To grant additional roles, click Add another role and add each additional role.

    6. Click Continue.
    7. Click Done to finish creating the service account.

  5. 如果您在或 在 2021 年 4 月 8 日之前启用,以支持通过身份验证的 Pub/Sub 推送 请求,授予 服务 Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator) 发送给服务代理否则,系统会默认授予此角色:
    1. 在 Google Cloud 控制台中,前往 IAM 页面。

      转到 IAM

    2. 选中包括 Google 提供的角色授权复选框。
    3. 名称列中,找到 Cloud Pub/Sub 服务账号 然后点击 修改 主账号
    4. 点击 添加角色添加其他角色
    5. 选择角色列表中,过滤出 Service Account Token Creator,然后选择角色。
    6. 点击保存
  6. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  7. Cloud Shell 支持本教程中的命令行步骤。

gcloud

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  4. 确保您的 Google Cloud 项目已启用结算功能

  5. Enable the Cloud Build, Cloud Run, Cloud Storage, Eventarc, and Workflows APIs:

    gcloud services enable cloudbuild.googleapis.com run.googleapis.com storage.googleapis.com eventarc.googleapis.com workflows.googleapis.com
  6. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/eventarc.eventReceiver, roles/logging.logWriter, roles/run.admin, roles/storage.objectCreator, roles/workflows.invoker :

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
  7. 如果您在 2021 年 4 月 8 日或之前启用了 Cloud Pub/Sub 服务代理,以支持经过身份验证的 Pub/Sub 推送请求,请向该服务代理授予 Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator)。否则,系统会默认授予此角色:
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com \
        --role=roles/iam.serviceAccountTokenCreator
  8. PROJECT_NUMBER 替换为您的 Google Cloud 项目编号。您可以在 Google Cloud 控制台的欢迎页面上或者通过运行以下命令找到项目编号:

    gcloud projects describe PROJECT_ID --format='value(projectNumber)'

创建 Cloud Run 作业

本教程使用的示例代码可在 GitHub 上找到。Deployment 脚本构建容器映像,以创建 Cloud Run 作业。通过 脚本还会创建一个 Cloud Storage 存储桶。通过 Cloud Run 作业可读取存储在 Cloud Storage 存储桶,然后输出事件数据。

  1. 通过将示例应用代码库克隆到本地来获取示例代码 机器:

    git clone https://github.com/GoogleCloudPlatform/workflows-demos.git
    

    或者,您也可以 将示例下载为 ZIP 文件

  2. 转到包含示例代码的目录:

    cd workflows-demos/cloud-run-jobs-payload-gcs/message-payload-job
    
  3. 通过运行部署脚本创建 Cloud Run 作业:

    ./deploy-job.sh
    

该脚本会创建一个名为 message-payload-PROJECT_ID,其中 PROJECT_ID 是您的 Google Cloud 项目的 ID。 系统还会创建一个名为 message-payload-job 的 Cloud Run 作业。

部署执行 Cloud Run 作业的工作流

定义并部署用于执行 Cloud Run 作业的工作流 创建的新实例工作流程定义由一系列步骤组成 使用 Workflows 语法描述。

工作流接收事件,将事件数据保存到 Cloud Storage 然后执行 Cloud Run 作业来处理事件 数据。

控制台

  1. 在 Google Cloud 控制台中,前往 工作流程页面:

    进入 Workflows

  2. 点击 创建

  3. 输入新工作流的名称,例如 message-payload-workflow

  4. 选择合适的区域;例如 us-central1

  5. 服务账号字段中,选择您所需的服务账号 之前创建的

    服务账号将用作工作流的身份。您应该有 已向该服务账号授予以下角色:

    • Cloud Run Admin:用于执行 Cloud Run 作业
    • Logs Writer:用于写入日志条目
    • Storage Object Creator:在 Cloud Storage 中创建对象
  6. 点击下一步

  7. 在工作流编辑器中,为 工作流:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  8. 点击部署

gcloud

  1. 为工作流创建源代码文件:

    touch message-payload-workflow.yaml
    
  2. 将以下工作流定义复制到 message-payload-workflow.yaml

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy message-payload-workflow \
        --location=us-central1 \
        --source=message-payload-workflow.yaml \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    

    替换以下内容:

    • SERVICE_ACCOUNT_NAME: 您之前创建的服务账号
    • PROJECT_ID:您的 Google Cloud 项目

    服务账号将用作工作流的身份。您应该有 已向该服务账号授予以下角色:

    • roles/logging.logWriter:用于写入日志条目
    • roles/run.admin:用于执行 Cloud Run 作业
    • roles/storage.objectCreator:用于在 Cloud Storage 中创建对象

工作流会执行以下操作:

  1. init 步骤 - 接受事件作为参数,并设置必要的变量。

  2. log_event 步骤 - 使用函数在 Cloud Logging 中创建日志条目。 sys.log.

  3. write_payload_to_gcs 步骤 - 发出 HTTP POST 请求并写入事件 Cloud Storage 存储桶文件。

  4. run_job_to_process_payload 步骤 - 使用 Cloud Run Admin API 连接器 方法, googleapis.run.v1.namespaces.jobs.run, 以执行作业。Cloud Storage 存储桶和数据文件名 作为替换变量从工作流传递到作业。

  5. finish 步骤 - 返回有关作业执行的信息(作为 工作流。

创建 Pub/Sub 主题

创建一个 Pub/Sub 主题,以便向其发布消息。 Pub/Sub 事件用于演示如何路由事件 并将事件保存到 Cloud Storage 以便 Cloud Run 作业能够处理事件数据。

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    打开“主题”

  2. 点击 创建主题

  3. 主题 ID 字段中,输入主题的 ID,例如 message-payload-topic

  4. 保留添加默认订阅选项。

  5. 请勿选择其他选项。

  6. 点击创建

gcloud

如需创建 ID 为 message-payload-topic 的主题,请运行 gcloud pubsub topics create 命令:

gcloud pubsub topics create message-payload-topic

创建 Eventarc 触发器以将事件路由到工作流

自动执行工作流,进而执行 Cloud Run 创建一个用于响应 Pub/Sub 事件,用于将事件路由到工作流。 每当有消息写入 Pub/Sub 主题时 触发工作流的执行。

控制台

  1. 在 Google Cloud 控制台中,前往 工作流程页面:

    进入 Workflows

  2. 点击工作流的名称,例如 message-payload-workflow

  3. 工作流详情页面上,点击 修改

  4. 修改工作流页面的触发器部分, 点击添加新触发器 >Eventarc

    系统随即会打开 Eventarc 触发器窗格。

  5. 触发器名称字段中,输入触发器的名称,例如 message-payload-trigger

  6. 事件提供方列表中,选择 Cloud Pub/Sub

  7. 事件列表中,选择 google.cloud.pubsub.topic.v1.messagePublished

  8. 选择 Cloud Pub/Sub 主题列表中, 您之前创建的 Pub/Sub 主题。

  9. 服务账号字段中,选择您所需的服务账号 之前创建的

    服务账号充当触发器的身份。您应该有 已向该服务账号授予以下角色:

    • Eventarc 事件接收器:用于接收事件
    • Workflows Invoker:执行工作流
  10. 点击保存触发器

    Eventarc 触发器现在显示在触发器中 部分(位于修改工作流程页面上)。

  11. 点击下一步

  12. 点击部署

gcloud

运行以下命令,创建 Eventarc 触发器:

gcloud eventarc triggers create message-payload-trigger \
    --location=us-central1 \
    --destination-workflow=message-payload-workflow \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=projects/PROJECT_ID/topics/message-payload-topic \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

替换以下内容:

  • PROJECT_ID:您的 Google Cloud 项目的 ID
  • SERVICE_ACCOUNT_NAME:服务的名称 您之前创建的 Google Ads 账号。

服务账号充当触发器的身份。您应该有 已向该服务账号授予以下角色:

  • roles/eventarc.eventReceiver:用于接收事件
  • roles/workflows.invoker:用于执行工作流

触发工作流

通过向 Pub/Sub 发布消息来测试端到端系统 主题并生成事件。如需了解详情,请参阅 使用事件或 Pub/Sub 消息触发工作流

  1. 向 Pub/Sub 主题发布消息以生成事件:

    gcloud pubsub topics publish message-payload-topic --message="Hello World"
    

    系统将事件传送到工作流程,该工作流程会记录事件消息, 事件数据传输到 Cloud Storage 存储桶,并执行 Cloud Run 作业,用于处理保存在 Cloud Storage此过程可能需要一分钟时间。

  2. 通过查看 作业执行:

    gcloud run jobs executions list --job=message-payload-job
    

    您应该会在输出中看到新的作业执行。

  3. 如需查看因触发工作流而创建的与事件相关的日志条目,请运行以下命令: 以下命令:

    gcloud logging read "resource.type=cloud_run_job AND textPayload:Payload"
    
  4. 查找类似如下的日志条目:

    textPayload: "Payload: {'message': {'data': 'SGVsbG8gV29ybGQ=', 'messageId': '8834108235224238',\
    \ 'publishTime': '2023-09-20T17:07:52.921Z'}, 'subscription': 'projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741'}"
    ...
    resource:
    labels:
      job_name: message-payload-job
      location: us-central1
      project_id: MY_PROJECT
    type: cloud_run_job
    textPayload: Processing message payload gs://message-payload-MY_PROJECT/8254002311197919.data.json
    
  5. 您可以通过在以下位置查看事件数据来确认结果符合预期: Cloud Storage 存储桶对象。

    1. 检索存储桶名称:

      gcloud storage ls

      输出类似于以下内容:

      gs://message-payload-PROJECT_ID/

    2. 列出存储桶中的对象:

      gcloud storage ls gs://message-payload-PROJECT_ID/** --recursive

      输出应类似如下所示:

      gs://message-payload-PROJECT_ID/OBJECT_ID.data.json

      请记下要在下一步中使用的 OBJECT_ID

    3. 将存储桶中的对象下载为文件:

      gcloud storage cp gs://message-payload-PROJECT_ID/OBJECT_ID.data.json message-event.txt

      OBJECT_ID 替换为 上一步。

    4. 在文本编辑器中,打开 message-event.txt 文件。事件正文 文件应类似于以下内容:

      {
        "message": {
          "data": "SGVsbG8gV29ybGQ=",
          "messageId": "8254002311197919",
          "publishTime": "2023-09-20T16:54:29.312Z"
        },
        "subscription": "projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741"
      }
      

      请注意,如果您将 SGVsbG8gV29ybGQ= 的数据值从其 Base64 格式,“Hello World”。

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除教程资源

删除您在本教程中创建的资源:

  1. 删除 Pub/Sub 主题

  2. 删除 Eventarc 触发器

  3. 删除 Workflows 工作流

  4. 删除 Cloud Run 作业

  5. 删除 Cloud Storage 存储分区

后续步骤