执行 Cloud Run 作业,用于处理 Cloud Storage 中保存的事件数据


在执行更复杂的数据处理或编排现有作业系统的工作流中,您可以使用 Workflows 执行 Cloud Run 作业

本教程演示了如何使用 Workflows 执行 Cloud Run 作业,该作业用于处理存储在 Cloud Storage 存储桶中的事件数据。通过将事件载荷存储在 Cloud Storage 存储桶中,您可以使用客户管理的加密密钥对数据进行加密,如果您将事件数据作为环境变量传递到 Cloud Run 作业,则无法执行此操作。

下图是简要概览:

Pub/Sub 事件由 Eventarc 触发器路由到 Workflows,并保存在 Cloud Storage 存储桶中。Cloud Run 作业处理存储在存储桶中的事件数据。

目标

在此教程中,您将学习以下操作:

  1. 创建一个 Cloud Run 作业,用于处理存储在 Cloud Storage 存储桶中的事件数据。
  2. 部署执行以下操作的工作流:
    1. 接收事件作为参数。
    2. 将事件载荷数据写入 Cloud Storage 存储桶。
    3. 使用 Cloud Run Admin API 连接器执行 Cloud Run 作业。
  3. 创建 Pub/Sub 主题,以便向其发布消息。本教程以一个 Pub/Sub 事件为例,来说明如何使用 Workflows 路由事件,将事件保存到 Cloud Storage,以便 Cloud Run 作业可以处理事件数据。
  4. 创建一个 Eventarc 触发器,以在有消息写入 Pub/Sub 主题时执行工作流。
  5. 向 Pub/Sub 主题写入消息来触发工作流。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

在开始本教程之前,您必须启用特定的 API 并创建用户代管式服务帐号。您必须向服务帐号授予必要的角色和权限,以便使用 Eventarc 触发器路由事件并执行工作流。

控制台

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 启用 Cloud Build, Cloud Run, Cloud Storage, Eventarc, and Workflows API。

    启用 API

  5. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the following roles to the service account: Cloud Run Admin, Eventarc Event Receiver, Logs Writer, Storage Object Creator, Workflows Invoker.

      To grant a role, find the Select a role list, then select the role.

      To grant additional roles, click Add another role and add each additional role.

    6. Click Continue.
    7. Click Done to finish creating the service account.

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  7. 确保您的 Google Cloud 项目已启用结算功能

  8. 启用 Cloud Build, Cloud Run, Cloud Storage, Eventarc, and Workflows API。

    启用 API

  9. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the following roles to the service account: Cloud Run Admin, Eventarc Event Receiver, Logs Writer, Storage Object Creator, Workflows Invoker.

      To grant a role, find the Select a role list, then select the role.

      To grant additional roles, click Add another role and add each additional role.

    6. Click Continue.
    7. Click Done to finish creating the service account.

  10. 如果您在 2021 年 4 月 8 日或之前启用了 Cloud Pub/Sub 服务代理,那么为了支持经过身份验证的 Pub/Sub 推送请求,请向该服务代理授予 Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator)。否则,系统会默认授予此角色:
    1. 在 Google Cloud 控制台中,前往 IAM 页面。

      转到 IAM

    2. 选中包括 Google 提供的角色授权复选框。
    3. 名称列中,找到 Cloud Pub/Sub 服务帐号,然后点击相应行中的 修改主账号
    4. 点击 添加角色添加其他角色
    5. 选择角色列表中,过滤出 Service Account Token Creator,然后选择相应角色。
    6. 点击保存
  11. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在 Google Cloud 控制台的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟时间来完成初始化。

  12. Cloud Shell 支持本教程中的命令行步骤。

gcloud

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. Enable the Cloud Build, Cloud Run, Cloud Storage, Eventarc, and Workflows APIs:

    gcloud services enable cloudbuild.googleapis.com run.googleapis.com storage.googleapis.com eventarc.googleapis.com workflows.googleapis.com
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/eventarc.eventReceiver, roles/logging.logWriter, roles/run.admin, roles/storage.objectCreator, roles/workflows.invoker :

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
  8. 安装 Google Cloud CLI。
  9. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. 确保您的 Google Cloud 项目已启用结算功能

  12. Enable the Cloud Build, Cloud Run, Cloud Storage, Eventarc, and Workflows APIs:

    gcloud services enable cloudbuild.googleapis.com run.googleapis.com storage.googleapis.com eventarc.googleapis.com workflows.googleapis.com
  13. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/eventarc.eventReceiver, roles/logging.logWriter, roles/run.admin, roles/storage.objectCreator, roles/workflows.invoker :

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
  14. 如果您在 2021 年 4 月 8 日或之前启用了 Cloud Pub/Sub 服务代理,以支持经过身份验证的 Pub/Sub 推送请求,请向该服务代理授予 Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator)。否则,系统会默认授予此角色:
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com \
        --role=roles/iam.serviceAccountTokenCreator
  15. PROJECT_NUMBER 替换为您的 Google Cloud 项目编号。您可以在 Google Cloud 控制台的欢迎页面上或者通过运行以下命令找到项目编号:

    gcloud projects describe PROJECT_ID --format='value(projectNumber)'

创建 Cloud Run 作业

本教程使用的示例代码可在 GitHub 上找到。部署脚本会构建容器映像以创建 Cloud Run 作业。该脚本还会创建一个 Cloud Storage 存储桶。Cloud Run 作业会读取存储在 Cloud Storage 存储桶中的任何事件数据,然后输出事件数据。

  1. 通过将示例应用代码库克隆到本地机器来获取示例代码:

    git clone https://github.com/GoogleCloudPlatform/workflows-demos.git
    

    或者,您也可以下载该示例的 ZIP 文件

  2. 转到包含示例代码的目录:

    cd workflows-demos/cloud-run-jobs-payload-gcs/message-payload-job
    
  3. 通过运行部署脚本创建 Cloud Run 作业:

    ./deploy-job.sh
    

该脚本会创建一个名为 message-payload-PROJECT_ID 的 Cloud Storage 存储桶,其中 PROJECT_ID 是您的 Google Cloud 项目的 ID。系统还会创建一个名为 message-payload-job 的 Cloud Run 作业。

部署执行 Cloud Run 作业的工作流

定义并部署一个工作流,用于执行您刚刚创建的 Cloud Run 作业。工作流定义由使用 Workflows 语法描述的一系列步骤组成。

工作流接收事件,将事件数据保存到 Cloud Storage 存储桶,然后执行 Cloud Run 作业来处理事件数据。

控制台

  1. 在 Google Cloud 控制台中,前往工作流页面:

    进入 Workflows

  2. 点击 创建

  3. 输入新工作流的名称,例如 message-payload-workflow

  4. 选择合适的区域;例如 us-central1

  5. 服务帐号字段中,选择您之前创建的服务帐号。

    服务帐号将用作工作流的身份。您应该已向该服务帐号授予以下角色:

    • Cloud Run Admin:用于执行 Cloud Run 作业
    • Logs Writer:用于写入日志条目
    • Storage Object Creator:在 Cloud Storage 中创建对象
  6. 点击下一步

  7. 在工作流编辑器中,为工作流输入以下定义:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  8. 点击部署

gcloud

  1. 为工作流创建源代码文件:

    touch message-payload-workflow.yaml
    
  2. 将以下工作流定义复制到 message-payload-workflow.yaml

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy message-payload-workflow \
        --location=us-central1 \
        --source=message-payload-workflow.yaml \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    

    请替换以下内容:

    • SERVICE_ACCOUNT_NAME:您之前创建的服务帐号的名称
    • PROJECT_ID:您的 Google Cloud 项目的 ID

    服务帐号将用作工作流的身份。您应该已向该服务帐号授予以下角色:

    • roles/logging.logWriter:用于写入日志条目
    • roles/run.admin:用于执行 Cloud Run 作业
    • roles/storage.objectCreator:用于在 Cloud Storage 中创建对象

工作流会执行以下操作:

  1. init 步骤 - 接受事件作为参数,并设置必要的变量。

  2. log_event 步骤 - 使用函数 sys.log 在 Cloud Logging 中创建日志条目。

  3. write_payload_to_gcs 步骤 - 发出 HTTP POST 请求,并将事件载荷数据写入 Cloud Storage 存储桶文件。

  4. run_job_to_process_payload 步骤 - 使用 Cloud Run Admin API 连接器方法 googleapis.run.v1.namespaces.jobs.run 执行作业。Cloud Storage 存储桶和数据文件名会作为替换变量从工作流传递到作业。

  5. finish 步骤 - 返回有关作为工作流结果的作业执行的信息。

创建 Pub/Sub 主题

创建 Pub/Sub 主题,以便向其发布消息。Pub/Sub 事件用于演示如何使用 Workflows 路由事件,并将事件保存到 Cloud Storage,以便 Cloud Run 作业可以处理事件数据。

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    打开“主题”

  2. 点击 创建主题

  3. 主题 ID 字段中,输入主题的 ID,例如 message-payload-topic

  4. 保留添加默认订阅选项。

  5. 请勿选择其他选项。

  6. 点击创建

gcloud

如需创建 ID 为 message-payload-topic 的主题,请运行 gcloud pubsub topics create 命令:

gcloud pubsub topics create message-payload-topic

创建 Eventarc 触发器以将事件路由到工作流

如需自动执行工作流并进而执行 Cloud Run 作业,请创建一个 Eventarc 触发器来响应 Pub/Sub 事件,并将事件路由到工作流。每当有消息写入 Pub/Sub 主题时,该事件都会触发工作流的执行。

控制台

  1. 在 Google Cloud 控制台中,前往工作流页面:

    进入 Workflows

  2. 点击工作流的名称,例如 message-payload-workflow

  3. 工作流详情页面上,点击 修改

  4. 修改工作流页面的触发器部分中,点击添加新触发器 > Eventarc

    系统随即会打开 Eventarc 触发器窗格。

  5. 触发器名称字段中,输入触发器的名称,例如 message-payload-trigger

  6. 事件提供方列表中,选择 Cloud Pub/Sub

  7. 事件列表中,选择 google.cloud.pubsub.topic.v1.messagePublish

  8. 选择 Cloud Pub/Sub 主题列表中,选择您之前创建的 Pub/Sub 主题。

  9. 服务帐号字段中,选择您之前创建的服务帐号。

    服务帐号充当触发器的身份。您应该已向该服务帐号授予以下角色:

    • Eventarc 事件接收器:用于接收事件
    • Workflows Invoker:执行工作流
  10. 点击保存触发器

    Eventarc 触发器现在显示在修改工作流页面上的触发器部分中。

  11. 点击下一步

  12. 点击部署

gcloud

运行以下命令,创建 Eventarc 触发器:

gcloud eventarc triggers create message-payload-trigger \
    --location=us-central1 \
    --destination-workflow=message-payload-workflow \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=projects/PROJECT_ID/topics/message-payload-topic \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

请替换以下内容:

  • PROJECT_ID:您的 Google Cloud 项目的 ID
  • SERVICE_ACCOUNT_NAME:您之前创建的服务帐号的名称。

服务帐号充当触发器的身份。您应该已向该服务帐号授予以下角色:

  • roles/eventarc.eventReceiver:用于接收事件
  • roles/workflows.invoker:用于执行工作流

触发工作流

通过向 Pub/Sub 主题发布消息并生成事件来测试端到端系统。如需了解详情,请参阅使用事件或 Pub/Sub 消息触发工作流

  1. 向 Pub/Sub 主题发布消息以生成事件:

    gcloud pubsub topics publish message-payload-topic --message="Hello World"
    

    系统将事件路由到用于记录事件消息、将事件数据保存到 Cloud Storage 存储桶并执行 Cloud Run 作业以处理 Cloud Storage 中保存数据的工作流。此过程可能需要一分钟时间。

  2. 通过查看作业执行情况,确认 Cloud Run 作业按预期运行:

    gcloud run jobs executions list --job=message-payload-job
    

    您应该会在输出中看到新的作业执行。

  3. 如需查看因触发工作流而创建的与事件相关的日志条目,请运行以下命令:

    gcloud logging read "resource.type=cloud_run_job AND textPayload:Payload"
    
  4. 查找类似如下的日志条目:

    textPayload: "Payload: {'message': {'data': 'SGVsbG8gV29ybGQ=', 'messageId': '8834108235224238',\
    \ 'publishTime': '2023-09-20T17:07:52.921Z'}, 'subscription': 'projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741'}"
    ...
    resource:
    labels:
      job_name: message-payload-job
      location: us-central1
      project_id: MY_PROJECT
    type: cloud_run_job
    textPayload: Processing message payload gs://message-payload-MY_PROJECT/8254002311197919.data.json
    
  5. 您可以通过查看 Cloud Storage 存储桶对象中的事件数据来确认结果是否符合预期。

    1. 检索存储桶名称:

      gsutil ls

      输出类似于以下内容:

      gs://message-payload-PROJECT_ID/

    2. 列出存储桶中的对象:

      gsutil ls -r gs://message-payload-PROJECT_ID/**

      输出应类似如下所示:

      gs://message-payload-PROJECT_ID/OBJECT_ID.data.json

      请记下要在下一步中使用的 OBJECT_ID

    3. 将存储桶中的对象下载为文件:

      gcloud storage cp gs://message-payload-PROJECT_ID/OBJECT_ID.data.json message-event.txt

      OBJECT_ID 替换为上一步中返回的 ID。

    4. 在文本编辑器中,打开 message-event.txt 文件。写入该文件的事件正文应如下所示:

      {
        "message": {
          "data": "SGVsbG8gV29ybGQ=",
          "messageId": "8254002311197919",
          "publishTime": "2023-09-20T16:54:29.312Z"
        },
        "subscription": "projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741"
      }
      

      请注意,如果您从 SGVsbG8gV29ybGQ= 的 Base64 格式中解码其数据值,系统会返回“Hello World”。

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

若要避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除教程资源

删除您在本教程中创建的资源:

  1. 删除 Pub/Sub 主题

  2. 删除 Eventarc 触发器

  3. 删除 Workflows 工作流

  4. 删除 Cloud Run 作业

  5. 删除 Cloud Storage 存储桶

后续步骤