执行用于处理存储在 Cloud Storage 中的事件数据的 Cloud Run 作业


您可以使用 Workflows 在执行更复杂的数据处理或编排现有作业的系统的工作流中执行 Cloud Run 作业

本教程演示了如何使用 Workflows 来执行 Cloud Run 作业,以处理存储在 Cloud Storage 存储桶中的事件数据。将事件载荷存储在 Cloud Storage 存储桶中后,您可以使用客户管理的加密密钥来加密数据;如果您将事件数据作为环境变量传递给 Cloud Run 作业,则无法执行此操作。

下图提供了简要概览:

Pub/Sub 事件由 Eventarc 触发器路由到 Workflows,并保存在 Cloud Storage 存储桶中。Cloud Run 作业会处理存储在存储桶中的事件数据。

目标

在此教程中,您将学习以下操作:

  1. 创建一个 Cloud Run 作业,用于处理存储在 Cloud Storage 存储桶中的事件数据。
  2. 部署一个工作流,以执行以下操作:
    1. 接收事件作为参数。
    2. 将事件载荷数据写入 Cloud Storage 存储桶。
    3. 使用 Cloud Run Admin API 连接器执行 Cloud Run 作业。
  3. 创建一个 Pub/Sub 主题,以便向其发布消息。本教程将 Pub/Sub 事件用作示例,介绍如何使用 Workflows 路由事件,将事件保存到 Cloud Storage,以便 Cloud Run 作业可以处理事件数据。
  4. 创建一个 Eventarc 触发器,用于在有消息写入 Pub/Sub 主题时执行工作流。
  5. 向 Pub/Sub 主题写入消息以触发工作流。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

在开始本教程之前,您必须启用特定 API 并创建用户代管式服务账号。您必须向服务账号授予必要的角色和权限,以便您可以使用 Eventarc 触发器路由事件并执行工作流。

请注意,如果您使用 Cloud Shell 试用本教程,则可能需要向 Compute Engine 默认服务账号授予其他角色。如需了解详情,请参阅本文档中的创建 Cloud Run 作业部分。

控制台

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Artifact Registry, Cloud Build, Cloud Run, Cloud Storage, Compute Engine, Eventarc, and Workflows APIs.

    Enable the APIs

  5. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the following roles to the service account: Cloud Run Admin, Eventarc Event Receiver, Logs Writer, Storage Object Creator, Workflows Invoker.

      To grant a role, find the Select a role list, then select the role.

      To grant additional roles, click Add another role and add each additional role.

    6. Click Continue.
    7. Click Done to finish creating the service account.

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  7. Make sure that billing is enabled for your Google Cloud project.

  8. Enable the Artifact Registry, Cloud Build, Cloud Run, Cloud Storage, Compute Engine, Eventarc, and Workflows APIs.

    Enable the APIs

  9. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the following roles to the service account: Cloud Run Admin, Eventarc Event Receiver, Logs Writer, Storage Object Creator, Workflows Invoker.

      To grant a role, find the Select a role list, then select the role.

      To grant additional roles, click Add another role and add each additional role.

    6. Click Continue.
    7. Click Done to finish creating the service account.

  10. 如果您在 2021 年 4 月 8 日当天或之前启用了 Cloud Pub/Sub 服务代理,以支持经过身份验证的 Pub/Sub 推送请求,请向该服务代理授予 Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator)。否则,系统会默认授予此角色:
    1. 在 Google Cloud 控制台中,前往 IAM 页面。

      转到 IAM

    2. 选中包括 Google 提供的角色授权复选框。
    3. 名称列中,找到 Cloud Pub/Sub 服务账号,然后点击相应行中的 修改主账号
    4. 点击 添加角色添加其他角色
    5. 选择角色列表中,过滤出 Service Account Token Creator,然后选择该角色。
    6. 点击保存
  11. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  12. Cloud Shell 支持本教程中的命令行步骤。

gcloud

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Artifact Registry, Cloud Build, Cloud Run, Cloud Storage, Compute Engine, Eventarc, and Workflows APIs:

    gcloud services enable artifactregistry.googleapis.com cloudbuild.googleapis.com compute.googleapis.com run.googleapis.com storage.googleapis.com eventarc.googleapis.com workflows.googleapis.com
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/eventarc.eventReceiver, roles/logging.logWriter, roles/run.admin, roles/storage.objectCreator, roles/workflows.invoker :

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
  8. Install the Google Cloud CLI.
  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. Make sure that billing is enabled for your Google Cloud project.

  12. Enable the Artifact Registry, Cloud Build, Cloud Run, Cloud Storage, Compute Engine, Eventarc, and Workflows APIs:

    gcloud services enable artifactregistry.googleapis.com cloudbuild.googleapis.com compute.googleapis.com run.googleapis.com storage.googleapis.com eventarc.googleapis.com workflows.googleapis.com
  13. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/eventarc.eventReceiver, roles/logging.logWriter, roles/run.admin, roles/storage.objectCreator, roles/workflows.invoker :

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
  14. 如果您在 2021 年 4 月 8 日或之前启用了 Cloud Pub/Sub 服务代理,以支持经过身份验证的 Pub/Sub 推送请求,请向该服务代理授予 Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator)。否则,系统会默认授予此角色:
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com \
        --role=roles/iam.serviceAccountTokenCreator
  15. PROJECT_NUMBER 替换为您的 Google Cloud 项目编号。您可以在 Google Cloud 控制台的欢迎页面上或者通过运行以下命令找到项目编号:

    gcloud projects describe PROJECT_ID --format='value(projectNumber)'

创建 Cloud Run 作业

本教程使用了您可以在 GitHub 上找到的示例代码。部署脚本会构建容器映像以创建 Cloud Run 作业。该脚本还会创建一个 Cloud Storage 存储桶。Cloud Run 作业会读取存储在 Cloud Storage 存储桶中的所有事件数据,然后输出事件数据。

  1. 如果您在 Cloud Shell 中运行部署脚本,并且 Compute Engine 默认服务账号没有“Editor”角色,请向 Compute Engine 默认服务账号授予项目的以下角色。(或者,您也可以跳过此步骤,直接在下一步中克隆示例应用代码库。)

    1. 授予 Artifact Registry Writer 角色 (roles/artifactregistry.writer):

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/artifactregistry.writer

      PROJECT_NUMBER 替换为您的 Google Cloud 项目编号。您可以在 Google Cloud 控制台的欢迎页面上或者通过运行以下命令找到项目编号:

      gcloud projects describe PROJECT_ID --format='value(projectNumber)'

    2. 授予 Storage Object User 角色 (roles/storage.objectUser):

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/storage.objectUser
    3. 授予 Logging Logs Writer 角色 (roles/logging.logWriter):

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/logging.logWriter
  2. 将示例应用代码库克隆到本地机器,以获取示例代码:

    git clone https://github.com/GoogleCloudPlatform/workflows-demos.git

    或者,您也可以下载该示例的 ZIP 文件

  3. 转到包含示例代码的目录:

    cd workflows-demos/cloud-run-jobs-payload-gcs/message-payload-job
  4. 通过运行部署脚本创建 Cloud Run 作业:

    ./deploy-job.sh

该脚本会创建一个名为 message-payload-PROJECT_ID 的 Cloud Storage 存储桶,其中 PROJECT_ID 是您的 Google Cloud 项目的 ID。系统还会创建一个名为 message-payload-job 的 Cloud Run 作业。

部署用于执行 Cloud Run 作业的工作流

定义并部署一个工作流,用于执行您刚刚创建的 Cloud Run 作业。工作流定义由一系列使用 Workflows 语法描述的步骤组成。

该工作流会接收事件,将事件数据保存到 Cloud Storage 存储桶,然后执行 Cloud Run 作业来处理事件数据。

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面:

    进入 Workflows

  2. 点击 创建

  3. 输入新工作流的名称,例如 message-payload-workflow

  4. 选择适当的区域;例如 us-central1

  5. 服务账号字段中,选择您之前创建的服务账号。

    服务账号充当工作流的身份。您应该已向服务账号授予以下角色:

    • Cloud Run Admin:用于执行 Cloud Run 作业
    • Logs Writer:用于写入日志条目
    • Storage Object Creator:用于在 Cloud Storage 中创建对象
  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的定义:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  8. 点击部署

gcloud

  1. 为工作流创建源代码文件:

    touch message-payload-workflow.yaml
  2. 将以下工作流定义复制到 message-payload-workflow.yaml

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy message-payload-workflow \
        --location=us-central1 \
        --source=message-payload-workflow.yaml \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    替换以下内容:

    • SERVICE_ACCOUNT_NAME:您之前创建的服务账号的名称
    • PROJECT_ID:您的 Google Cloud 项目的 ID

    服务账号充当工作流的身份。您应该已向服务账号授予以下角色:

    • roles/logging.logWriter:用于写入日志条目
    • roles/run.admin:用于执行 Cloud Run 作业
    • roles/storage.objectCreator:在 Cloud Storage 中创建对象

该工作流会执行以下操作:

  1. init 步骤 - 接受事件作为参数并设置必要的变量。

  2. log_event 步骤 - 使用函数 sys.log 在 Cloud Logging 中创建日志条目。

  3. write_payload_to_gcs 步骤 - 发出 HTTP POST 请求,并将事件载荷数据写入 Cloud Storage 存储桶文件。

  4. run_job_to_process_payload 步骤 - 使用 Cloud Run Admin API 连接器方法 googleapis.run.v1.namespaces.jobs.run 来执行作业。Cloud Storage 存储桶和数据文件名会作为替换变量从工作流传递给作业。

  5. finish 步骤 - 作为工作流的结果,返回有关作业执行的信息。

创建 Pub/Sub 主题

创建一个 Pub/Sub 主题,以便向其发布消息。我们将使用 Pub/Sub 事件来演示如何使用 Workflows 路由事件并将事件保存到 Cloud Storage,以便 Cloud Run 作业可以处理事件数据。

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    打开“主题”

  2. 点击 创建主题

  3. 主题 ID 字段中,输入主题的 ID,例如 message-payload-topic

  4. 保留添加默认订阅选项。

  5. 请勿选择其他选项。

  6. 点击创建

gcloud

如需创建 ID 为 message-payload-topic 的主题,请运行 gcloud pubsub topics create 命令:

gcloud pubsub topics create message-payload-topic

创建 Eventarc 触发器以将事件路由到工作流

如需自动执行工作流,进而执行 Cloud Run 作业,请创建一个 Eventarc 触发器来响应 Pub/Sub 事件,并将事件路由到工作流。每当有消息写入 Pub/Sub 主题时,该事件都会触发工作流的执行。

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面:

    进入 Workflows

  2. 点击工作流的名称,例如 message-payload-workflow

  3. 工作流详情页面上,点击 修改

  4. 修改工作流页面的触发器部分中,依次点击添加新触发器 > Eventarc

    系统随即会打开 Eventarc 触发器窗格。

  5. 触发器名称字段中,输入触发器的名称,例如 message-payload-trigger

  6. 事件提供方列表中,选择 Cloud Pub/Sub

  7. 事件列表中,选择 google.cloud.pubsub.topic.v1.messagePublished

  8. 选择 Cloud Pub/Sub 主题列表中,选择您之前创建的 Pub/Sub 主题。

  9. 服务账号字段中,选择您之前创建的服务账号。

    该服务账号充当触发器的身份。您应该已向服务账号授予以下角色:

    • Eventarc Event Receiver:用于接收事件
    • Workflows Invoker:用于执行工作流
  10. 点击保存触发器

    Eventarc 触发器现在显示在修改工作流页面的触发器部分中。

  11. 点击下一步

  12. 点击部署

gcloud

运行以下命令以创建 Eventarc 触发器:

gcloud eventarc triggers create message-payload-trigger \
    --location=us-central1 \
    --destination-workflow=message-payload-workflow \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=projects/PROJECT_ID/topics/message-payload-topic \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

替换以下内容:

  • PROJECT_ID:您的 Google Cloud 项目的 ID
  • SERVICE_ACCOUNT_NAME:您之前创建的服务账号的名称。

该服务账号充当触发器的身份。您应该已向服务账号授予以下角色:

  • roles/eventarc.eventReceiver:接收事件
  • roles/workflows.invoker:用于执行工作流

触发工作流

通过向 Pub/Sub 主题发布消息并生成事件来测试端到端系统。如需了解详情,请参阅使用事件或 Pub/Sub 消息触发工作流

  1. 向 Pub/Sub 主题发布消息以生成事件:

    gcloud pubsub topics publish message-payload-topic --message="Hello World"
    

    系统会将事件路由到工作流,该工作流会记录事件消息、将事件数据保存到 Cloud Storage 存储桶,并执行 Cloud Run 作业以处理保存在 Cloud Storage 中的数据。此过程可能需要 1 分钟左右。

  2. 查看作业执行情况,确认 Cloud Run 作业是否按预期运行:

    gcloud run jobs executions list --job=message-payload-job

    您应该会在输出中看到新的作业执行。

  3. 如需查看触发工作流程时创建的与事件相关的日志条目,请运行以下命令:

    gcloud logging read "resource.type=cloud_run_job AND textPayload:Payload"
    
  4. 查找类似如下的日志条目:

    textPayload: "Payload: {'message': {'data': 'SGVsbG8gV29ybGQ=', 'messageId': '8834108235224238',\
    \ 'publishTime': '2023-09-20T17:07:52.921Z'}, 'subscription': 'projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741'}"
    ...
    resource:
    labels:
      job_name: message-payload-job
      location: us-central1
      project_id: MY_PROJECT
    type: cloud_run_job
    textPayload: Processing message payload gs://message-payload-MY_PROJECT/8254002311197919.data.json
    
  5. 您可以通过查看 Cloud Storage 存储桶对象中的事件数据,确认结果是否符合预期。

    1. 检索您的存储桶名称:

      gcloud storage ls

      输出类似于以下内容:

      gs://message-payload-PROJECT_ID/

    2. 列出存储桶中的对象:

      gcloud storage ls gs://message-payload-PROJECT_ID/** --recursive

      输出应类似如下所示:

      gs://message-payload-PROJECT_ID/OBJECT_ID.data.json

      请记下要在下一步中使用的 OBJECT_ID

    3. 将存储桶中的对象作为文件下载:

      gcloud storage cp gs://message-payload-PROJECT_ID/OBJECT_ID.data.json message-event.txt

      OBJECT_ID 替换为上一步中返回的 ID。

    4. 在文本编辑器中,打开 message-event.txt 文件。写入文件的事件正文应类似于以下内容:

      {
        "message": {
          "data": "SGVsbG8gV29ybGQ=",
          "messageId": "8254002311197919",
          "publishTime": "2023-09-20T16:54:29.312Z"
        },
        "subscription": "projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741"
      }
      

      请注意,如果您从 Base64 格式对 SGVsbG8gV29ybGQ= 的数据值进行解码,则返回“Hello World”。

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除教程资源

删除您在本教程中创建的资源:

  1. 删除 Pub/Sub 主题

  2. 删除 Eventarc 触发器

  3. 删除 Workflows 工作流

  4. 删除 Cloud Run 作业

  5. 删除 Cloud Storage 存储桶

后续步骤