使用 Flex 模板

使用集合让一切井井有条 根据您的偏好保存内容并对其进行分类。

本教程介绍如何通过 Google Cloud CLI 使用自定义 Docker 映像创建 Dataflow Flex 模板作业,然后运行该作业。本教程将指导您完成一个流式处理流水线示例,该示例从 Pub/Sub 读取 JSON 编码的消息,使用 Beam SQL 转换消息数据,再将结果写入 BigQuery 表。在本教程中,您将容器映像存储在 Artifact Registry 中。 Flex 模板还可以使用存储在私有注册表中的预构建映像

如需详细了解 Flex 模板,请参阅 Dataflow 模板

目标

  • 构建 Docker 容器映像。
  • 创建并运行 Dataflow Flex 模板。

费用

本教程使用 Google Cloud 的以下收费组件:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Artifact Registry
  • Cloud Build
  • BigQuery

请使用价格计算器根据您的预计使用情况来估算费用。

准备工作

本部分介绍如何启用 API、创建服务帐号以及向服务帐号授予 Owner 角色。在生产环境中,请勿授予 Owner 角色。请改用适当的 Dataflow 专用权限和角色。如需了解详情,请参阅了解 Flex 模板权限

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装初始化 Google Cloud CLI。
  3. 创建或选择 Google Cloud 项目。

    • 创建 Cloud 项目:

      gcloud projects create PROJECT_ID
    • 选择您创建的 Cloud 项目:

      gcloud config set project PROJECT_ID
  4. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

  5. Enable the Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Artifact Registry, Cloud Scheduler, and Cloud Build APIs:

    gcloud services enable dataflowcompute_componentloggingstorage_componentstorage_apibigquerypubsubcloudresourcemanager.googleapis.comappengine.googleapis.comartifactregistry.googleapis.com
    cloudscheduler.googleapis.comcloudbuild.googleapis.com
  6. 为您的 Google 帐号创建身份验证凭据:

    gcloud auth application-default login
  7. 向您的 Google 帐号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。
  8. 安装初始化 Google Cloud CLI。
  9. 创建或选择 Google Cloud 项目。

    • 创建 Cloud 项目:

      gcloud projects create PROJECT_ID
    • 选择您创建的 Cloud 项目:

      gcloud config set project PROJECT_ID
  10. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

  11. Enable the Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Artifact Registry, Cloud Scheduler, and Cloud Build APIs:

    gcloud services enable dataflowcompute_componentloggingstorage_componentstorage_apibigquerypubsubcloudresourcemanager.googleapis.comappengine.googleapis.comartifactregistry.googleapis.com
    cloudscheduler.googleapis.comcloudbuild.googleapis.com
  12. 为您的 Google 帐号创建身份验证凭据:

    gcloud auth application-default login
  13. 向您的 Google 帐号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。
  14. 向您的 Compute Engine 默认服务帐号授予角色。对以下每个 IAM 角色运行以下命令一次:roles/dataflow.adminroles/dataflow.workerroles/bigquery.dataEditorroles/pubsub.editorroles/storage.objectAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • PROJECT_NUMBER 替换为您的项目编号。 如需查找项目编号,请参阅标识项目
    • SERVICE_ACCOUNT_ROLE 替换为每个角色。

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

创建示例来源和接收器

本部分介绍如何创建以下内容:

  • 使用 Pub/Sub 的流式处理数据源
  • 将数据加载到 BigQuery 中的数据集

创建 Cloud Storage 存储桶

使用 gsutil mb 命令:

gsutil mb gs://BUCKET_NAME

BUCKET_NAME 替换为符合存储桶命名要求的 Cloud Storage 存储桶的名称。Cloud Storage 存储桶名称必须是全局唯一的。

创建一个 Pub/Sub 主题并订阅该主题

使用 Google Cloud CLI:

gcloud pubsub topics create TOPIC_ID
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
  • TOPIC_ID 替换为您的 Pub/Sub 主题的名称。
  • SUBSCRIPTION_ID 替换为您的 Pub/Sub 订阅的名称。

创建 Cloud Scheduler 作业

在该步骤中,我们将使用 Google Cloud CLI 创建和运行一个 Cloud Scheduler 作业,该作业会发布“正面评分”和“负面评分”。

  1. 针对此 Google Cloud 项目创建一个 Cloud Scheduler 作业。
         gcloud scheduler jobs create pubsub positive-ratings-publisher \
           --schedule="* * * * *" \
           --location=REGION \
           --topic="TOPIC_ID" \
           --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
  2. REGION 替换为用于部署 Dataflow 作业的区域端点REGION 变量的值必须是有效的区域名称。如需详细了解区域和位置,请参阅 Dataflow 位置。这将创建并运行一个“正面评分”发布者,该发布者每分钟发布一条消息。
  3. 启动 Cloud Scheduler 作业。
    gcloud scheduler jobs run --location=REGION positive-ratings-publisher
    
  4. 再创建并运行一个类似的“负面评分”发布者,该发布者每两分钟发布一条消息。
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --location=REGION  \
      --topic="TOPIC_ID" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run --location=REGION negative-ratings-publisher
    

创建 BigQuery 数据集

使用 bq mk 命令:

  bq --location=REGION mk \
  PROJECT_ID:DATASET_ID
  • PROJECT_ID 替换为项目的项目 ID。
  • DATASET_ID 替换为您的数据集的名称。如需详细了解如何命名数据集,请参阅“创建数据集”中的命名数据集

下载代码示例并更改目录

下载代码示例,然后更改目录。

Java

克隆 java-docs-samples 代码库

  git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git

找到本教程的代码示例。

  cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

Python

克隆 python-docs-samples 代码库

  git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

找到本教程的代码示例。

  cd python-docs-samples/dataflow/flex-templates/streaming_beam

设置开发环境

Java

  1. 下载并安装 Java Development Kit (JDK) 版本 11。验证 JAVA_HOME 环境变量是否已设置并指向您的 JDK 安装。
  2. 下载 Apache Maven,并按照适用于您的具体操作系统的 Maven 安装指南进行安装。
  3. (可选)在本地运行 Apache Beam 流水线以进行开发。
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSql \
        -Dexec.args="\
          --project=PROJECT_ID \
          --inputSubscription=SUBSCRIPTION_ID \
          --outputTable=PROJECT_ID:DATASET_ID.TABLE_ID \
          --tempLocation=gs://BUCKET_NAME/samples/dataflow/temp"

    TABLE_ID 替换为您的表的名称。

  4. 在 Uber JAR 文件中构建 Java 项目。
      mvn clean package
  5. (可选)请记下 Uber JAR 文件相对于原始文件的大小。
      ls -lh target/*.jar
    此 Uber JAR 文件中嵌入了所有依赖项。您可以将此文件作为在其他库上没有外部依赖项的独立应用运行。

Python

使用 Python 版 Apache Beam SDK

创建和构建容器映像

  1. 您必须先创建 Artifact Registry 代码库,然后才能上传工件。每个代码库可以包含一种受支持格式的工件。

    所有代码库内容都已使用 Google 管理的加密密钥或客户管理的加密密钥进行加密。Artifact Registry 默认使用 Google 管理的加密密钥,此选项无需进行任何配置。

    您必须至少具有代码库的 Artifact Registry Writer 权限

    运行以下命令创建新代码库。该命令使用 --async 标志并立即返回,无需等待正在进行的操作完成。

    gcloud artifacts repositories create REPOSITORY \
        --repository-format=docker \
        --location=REGION \
        --async 

    REPOSITORY 替换为您的代码库的名称。对于项目中的每个代码库位置,代码库名称不得重复。

  2. (可选)创建 Dockerfile。

    Java

    请注意,使用 Dockerfile 构建的 Flex 模板容器仅用于创建作业图以及启动 Dataflow 作业。在 Flex 模板容器中安装的软件包在 Beam 容器中无法使用。

    如果要使软件包成为 Beam 容器的一部分,您必须在 pom.xml 文件中指定这些软件包。

    您可以在本教程中自定义 Dockerfile。起始文件如下所示:

      FROM gcr.io/dataflow-templates-base/java11-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      COPY pom.xml .
      COPY src .
    
      ENV FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"
      ENV FLEX_TEMPLATE_JAVA_CLASSPATH="${WORKDIR}/src/main/java/org/apache/beam/samples/StreamingBeamSql.java"
    
      ENTRYPOINT ["/opt/google/dataflow/java_template_launcher"]

    此 Dockerfile 包含 FROMENVCOPY 命令,您可以在 Dockerfile 参考 中阅读相关信息。

    Python

    在创建 Dockerfile 期间,您需要添加用以安装 apache-beam 来生成作业图的相关命令。请注意,使用 Dockerfile 构建的 Flex 模板容器仅用于创建作业图以及启动 Dataflow 作业。在 Flex 模板容器中安装的软件包在 Beam 容器中无法使用。

    如果要使软件包成为 Beam 容器的一部分,您必须在 requirements.txt 文件中指定这些软件包。切勿在 requirements.txt 文件中指定 apache-beam。Beam 容器已经有 apache-beam

    您可以在本教程中自定义 Dockerfile。起始文件如下所示:

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      # Do not include `apache-beam` in requirements.txt
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      # Install apache-beam and other dependencies to launch the pipeline
      RUN pip install apache-beam[gcp]
      RUN pip install -U -r ./requirements.txt

    此 Dockerfile 包含 FROMENVCOPY 命令,您可以在 Dockerfile 参考 中阅读相关信息。

  3. 请先配置 Docker 以对 Artifact Registry 的请求进行身份验证,然后再推送或拉取映像。如需为 Docker 代码库设置身份验证,请运行以下命令:
    gcloud auth configure-docker REGION-docker.pkg.dev

    该命令将更新您的 Docker 配置。现在,您可以在 Google Cloud 项目中与 Artifact Registry 连接以推送映像。

  4. 结合使用 Dockerfile 和 Cloud Build 来构建 Docker 映像。

    gcloud builds submit --tag REGION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/streaming-beam-sql:latest .
    

    此命令会构建文件并将其推送到您的 Artifact Registry 代码库。

元数据

您可以使用附加元数据扩展模板,以便在运行模板时验证自定义参数。如果要为模板创建元数据,请按照以下步骤操作:

  1. 使用元数据参数中的参数创建 metadata.json 文件。

    如需查看示例,请参阅示例元数据文件

  2. 将元数据文件存储在 Cloud Storage 中模板所在的文件夹内。

元数据参数

参数键 必需 值的说明
name 模板的名称。
description 对模板进行说明的一小段文本。
parameters 模板使用的一组附加参数。默认情况下,使用空数组。
name 模板中使用的参数的名称。
label 人类可读的字符串,用于在 Google Cloud 控制台中标记参数。
helpText 对参数进行说明的一小段文本。
isOptional 如果参数是必需的,则为 false;如果参数是可选的,则为 true。除非设置了值,否则 isOptional 默认为 false。 如果您没有为元数据添加此参数键,则元数据会成为必需参数。
regexes 一组字符串形式的 POSIX-egrep 正则表达式(用于验证参数的值)。例如,["^[a-zA-Z][a-zA-Z0-9]+"] 是一个正则表达式,用于验证以字母开头,并且包含一个或多个字符的值。默认使用空数组。

示例元数据文件

Java

{
  "name": "Streaming Beam SQL",
  "description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
  "parameters": [
    {
      "name": "inputSubscription",
      "label": "Pub/Sub input subscription.",
      "helpText": "Pub/Sub subscription to read from.",
      "regexes": [
        "[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "outputTable",
      "label": "BigQuery output table",
      "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

Python

{
  "name": "Streaming beam Python flex template",
  "description": "Streaming beam example for python flex template.",
  "parameters": [
    {
      "name": "input_subscription",
      "label": "Input PubSub subscription.",
      "helpText": "Name of the input PubSub subscription to consume from.",
      "regexes": [
        "projects/[^/]+/subscriptions/[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "output_table",
      "label": "BigQuery output table name.",
      "helpText": "Name of the BigQuery output table name.",
      "isOptional": true,
      "regexes": [
        "([^:]+:)?[^.]+[.].+"
      ]
    }
  ]
}

您可以从 Dataflow 模板目录下载 Google 提供的模板的元数据文件。

创建 Flex 模板

要运行模板,您需要在 Cloud Storage 存储桶中创建模板规范文件,其中包含运行作业所需的所有必要信息,例如 SDK 信息和元数据。

本教程使用示例元数据文件,该文件包含模板的其他信息,例如 namedescription 和输入 parameters 字段。

使用 gcloud dataflow flex-template build 命令在 Cloud Storage 存储桶中创建名为 streaming-beam-sql.json 的 Flex 模板。在此命令中,您可以指定文件参数,包括文件名和存储位置。

Java

  gcloud dataflow flex-template build gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json \
      --image-gcr-path "REGION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/streaming-beam-sql:latest" \
      --sdk-language "JAVA" \
      --flex-template-base-image JAVA11 \
      --metadata-file "metadata.json" \
      --jar "target/streaming-beam-sql-1.0.jar" \
      --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"

Python

  gcloud dataflow flex-template build gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json \
     --image "REGION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/streaming-beam-sql:latest" \
     --sdk-language "PYTHON" \
     --metadata-file "metadata.json"
现在,您可以通过您指定的 Cloud Storage 位置中的模板文件使用该模板。

运行 Flex 模板流水线

现在,您可以通过引用模板文件并传递流水线所需的模板参数,在 Dataflow 中运行 Apache Beam 流水线。
  1. 在 shell 或终端中,运行模板:

    Java

    gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json" \
        --parameters inputSubscription="SUBSCRIPTION_ID" \
        --parameters outputTable="PROJECT_ID:DATASET_ID.TABLE_ID" \
        --region "REGION"

    Python

    gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json" \
        --parameters input_subscription="projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID" \
        --parameters output_table="PROJECT_ID:DATASET_ID.TABLE_ID" \
        --region "REGION"

    TABLE_ID 替换为您的表的名称。

    或者,您可以使用

    REST API 请求运行模板:

    Java

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'SUBSCRIPTION_ID'",
            "outputTable": "'PROJECT_ID:DATASET_ID.TABLE_ID'"
          },
          "containerSpecGcsPath": "'gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json'"
        }
      }'

    Python

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "input_subscription": "projects/'PROJECT_ID'/subscriptions/'SUBSCRIPTION_ID'",
            "output_table": "'PROJECT_ID:DATASET_ID.TABLE_ID'"
          },
          "containerSpecGcsPath": "'gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json'"
        }
      }'
  2. 在您执行运行 Flex 模板的命令后,Dataflow 将返回作业状态为已加入队列的作业 ID。可能几分钟后作业状态才会变为正在运行,您才可以访问作业图。
  3. 通过运行以下查询来查看 BigQuery 中的结果:
    bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.DATASET_ID.TABLE_ID"'`'
    
    在此流水线的运行期间,您可以看到 BigQuery 表中每分钟都在附加新行。

清理

完成本教程后,您可清理在 Google Cloud 上创建的资源,以免这些资源将来产生费用。以下部分介绍如何删除或关闭这些资源。

清理 Flex 模板资源

  1. 停止 Dataflow 流水线。
    gcloud dataflow jobs list \
      --filter 'NAME=streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "REGION" \
      | xargs gcloud dataflow jobs cancel --region "REGION"
  2. 从 Cloud Storage 中删除模板规范文件。
    gsutil rm gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json
  3. 删除 Artifact Registry 代码库。
    gcloud artifacts repositories delete REPOSITORY \
    --location=REGION --async

清理 Google Cloud 项目资源

  1. 删除 Cloud Scheduler 作业。
    gcloud scheduler jobs delete negative-ratings-publisher --location=REGION
    
    gcloud scheduler jobs delete positive-ratings-publisher --location=REGION
    
  2. 删除 Pub/Sub 订阅和主题。
    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  3. 删除 BigQuery 表。
    bq rm -f -t PROJECT_ID:DATASET_ID.TABLE_ID
    
  4. 删除 BigQuery 数据集,只执行此操作不会产生任何费用。

    以下命令还会删除该数据集中的所有表。表和数据无法恢复。

    bq rm -r -f -d PROJECT_ID:DATASET_ID
    
  5. 删除 Cloud Storage 存储桶,只执行此操作不会产生任何费用。

    以下命令还会删除该存储桶中的所有对象。这些对象无法恢复。

    gsutil rm -r gs://BUCKET_NAME
    

撤消凭据

  1. 可选:撤消您创建的身份验证凭据,并删除本地凭据文件。

    gcloud auth application-default revoke
  2. 可选:从 gcloud CLI 撤消凭据。

    gcloud auth revoke
  3. 撤消授予 Compute Engine 默认服务帐号的角色。

    对以下每个 IAM 角色运行以下命令一次:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/bigquery.dataEditor
    • roles/pubsub.editor
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE

限制

以下限制适用于 Flex 模板作业:

  • 您必须使用 Google 提供的基础映像来使用 Docker 打包容器。 如需查看适用映像的列表,请参阅 Flex 模板基础映像
  • 必须在调用 run 后退出用于构建流水线的程序,流水线才能启动。
  • 不支持 waitUntilFinish (Java) 和 wait_until_finish (Python)。

后续步骤