使用 Flex 模板

本教程介绍如何使用 gcloud 命令行工具创建和运行包含自定义 Docker 映像的 Dataflow Flex 模板作业。本教程将指导您完成一个流式处理流水线示例,该示例从 Pub/Sub 读取 JSON 编码的消息,使用 Beam SQL 转换消息数据,再将结果写入 BigQuery 表。

如需详细了解 Flex 模板,请参阅 Dataflow 模板

目标

  • 构建 Docker 容器映像。
  • 创建并运行 Dataflow Flex 模板。

费用

本教程使用 Google Cloud 的以下收费组件:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Container Registry
  • Cloud Build
  • BigQuery

请使用价格计算器根据您的预计使用情况来估算费用。

准备工作

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build API。

    启用 API

  5. 创建服务帐号:

    1. 在 Cloud Console 中,转到创建服务帐号页面。

      转到“创建服务帐号”
    2. 选择一个项目。
    3. 服务帐号名称字段中,输入一个名称。 Cloud Console 会根据此名称填充服务帐号 ID 字段。

      服务帐号说明字段中,输入说明。例如,Service account for quickstart

    4. 点击创建并继续
    5. 点击选择角色字段。

      快速访问下,点击基本,然后点击所有者

    6. 点击继续
    7. 点击完成以完成服务帐号的创建过程。

      不要关闭浏览器窗口。您将在下一步骤中用到它。

  6. 创建服务帐号密钥:

    1. 在 Cloud Console 中,点击您创建的服务帐号的电子邮件地址。
    2. 点击密钥
    3. 依次点击添加密钥创建新密钥
    4. 点击创建。JSON 密钥文件将下载到您的计算机上。
    5. 点击关闭
  7. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含您的服务帐号密钥的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

  8. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  9. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  10. 启用 Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build API。

    启用 API

  11. 创建服务帐号:

    1. 在 Cloud Console 中,转到创建服务帐号页面。

      转到“创建服务帐号”
    2. 选择一个项目。
    3. 服务帐号名称字段中,输入一个名称。 Cloud Console 会根据此名称填充服务帐号 ID 字段。

      服务帐号说明字段中,输入说明。例如,Service account for quickstart

    4. 点击创建并继续
    5. 点击选择角色字段。

      快速访问下,点击基本,然后点击所有者

    6. 点击继续
    7. 点击完成以完成服务帐号的创建过程。

      不要关闭浏览器窗口。您将在下一步骤中用到它。

  12. 创建服务帐号密钥:

    1. 在 Cloud Console 中,点击您创建的服务帐号的电子邮件地址。
    2. 点击密钥
    3. 依次点击添加密钥创建新密钥
    4. 点击创建。JSON 密钥文件将下载到您的计算机上。
    5. 点击关闭
  13. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含您的服务帐号密钥的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

创建示例来源和接收器

本部分介绍如何创建以下内容:

  • 使用 Pub/Sub 的流式处理数据源
  • 将数据加载到 BigQuery 中的数据集

创建 Cloud Storage 存储分区

使用 gsutil mb 命令:

export BUCKET="my-storage-bucket"
gsutil mb gs://$BUCKET

创建一个 Pub/Sub 主题并订阅该主题

使用 gcloud 命令行工具

export TOPIC="messages"
export SUBSCRIPTION="ratings"

gcloud pubsub topics create $TOPIC
gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION

创建 Cloud Scheduler 作业

在该步骤中,我们将使用 gcloud 命令行工具创建和运行一个 Cloud Scheduler 作业,该作业会发布“正面评分”和“负面评分”。

  1. 针对此 Google Cloud 项目创建一个 Cloud Scheduler 作业。
    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
    
  2. 这将创建并运行一个“正面评分”发布程序,该程序每分钟发布一条消息。
  3. 启动 Cloud Scheduler 作业。
    gcloud scheduler jobs run positive-ratings-publisher
    
  4. 再创建并运行一个类似的“负面评分”发布程序,该程序每两分钟发布一条消息。
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run negative-ratings-publisher
    

创建 BigQuery 数据集

使用 bq mk 命令:

export PROJECT="$(gcloud config get-value project)"
export DATASET="beam_samples"
export TABLE="streaming_beam_sql"

bq mk --dataset "$PROJECT:$DATASET"

下载代码示例

  1. 下载代码示例。

    Java

    克隆 Java-docs-samples 代码库,并导航到本教程的代码示例。

      git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
      cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

    Python

    克隆 python-docs-samples 代码库,并导航到本教程的代码示例。

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      cd python-docs-samples/dataflow/flex-templates/streaming_beam
  2. 导出本教程的 TEMPLATE_IMAGE
    export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/streaming-beam-sql:latest"
    

设置开发环境

Java

  1. 下载并安装 Java Development Kit (JDK) 版本 11。验证 JAVA_HOME 环境变量是否已设置并指向您的 JDK 安装。
  2. 下载 Apache Maven,并按照适用于您的具体操作系统的 Maven 安装指南进行安装。
  3. (可选)在本地运行 Apache Beam 流水线以进行开发。
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSql \
        -Dexec.args="\
          --project=$PROJECT \
          --inputSubscription=$SUBSCRIPTION \
          --outputTable=$PROJECT:$DATASET.$TABLE \
          --tempLocation=gs://$BUCKET/samples/dataflow/temp"
  4. 在 Uber JAR 文件中构建 Java 项目。
      mvn clean package
  5. (可选)请记下 Uber JAR 文件相对于原始文件的大小。
      ls -lh target/*.jar
    此 Uber JAR 文件中嵌入了所有依赖项。您可以将此文件作为在其他库上没有外部依赖项的独立应用运行。

Python

使用 Python 版 Apache Beam SDK

仅限 Python:创建和构建容器映像

本部分包含面向 Python 用户的步骤。如果您使用的是 Java,请跳过以下步骤。

如果您的作业无法运行,并显示错误消息 A Timeout in polling error message,请参阅问题排查步骤

  1. (可选)默认启用对 Kaniko 缓存的使用。
    gcloud config set builds/use_kaniko True
    
    Kaniko 会缓存容器构建工件,因此使用此选项可加快后续构建的速度。
  2. (可选)创建 Dockerfile。您可以在本教程中自定义 Dockerfile。起始文件如下所示:

    Python

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      # Do not include `apache-beam` in requirements.txt
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      # Install apache-beam and other dependencies to launch the pipeline
      RUN pip install apache-beam[gcp]
      RUN pip install -U -r ./requirements.txt

    此 Dockerfile 包含 FROMENVCOPY 命令,您可以在 Dockerfile 参考 中阅读相关信息。

    gcr.io/PROJECT/ 开头的映像会保存到项目的 Container Registry 中,供其他 Google Cloud 产品访问。
  3. 结合使用 Dockerfile 和 Cloud Build 来构建 Docker 映像。
    gcloud builds submit --tag $TEMPLATE_IMAGE .
    

元数据

您可以使用附加元数据扩展模板,以便在运行模板时验证自定义参数。如果要为模板创建元数据,请按照以下步骤操作:

  1. 使用元数据参数中的参数创建 metadata.json 文件。

    如需查看示例,请参阅示例元数据文件

  2. 将元数据文件存储在 Cloud Storage 中模板所在的文件夹内。

元数据参数

参数键 必需 值的说明
name 模板的名称。
description 对模板进行说明的一小段文本。
parameters 模板使用的一组附加参数。默认情况下,使用空数组。
name 模板中使用的参数的名称。
label 人类可读的字符串,用于在 Cloud Console 中标记参数。
helpText 对参数进行说明的一小段文本。
isOptional 如果参数是必需的,则为 false;如果参数是可选的,则为 true。除非设置了值,否则 isOptional 默认为 false。 如果您没有为元数据添加此参数键,则元数据会成为必需参数。
regexes 一组字符串形式的 POSIX-egrep 正则表达式(用于验证参数的值)。例如,["^[a-zA-Z][a-zA-Z0-9]+"] 是一个正则表达式,用于验证以字母开头,并且包含一个或多个字符的值。默认使用空数组。

示例元数据文件

Java

{
  "name": "Streaming Beam SQL",
  "description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
  "parameters": [
    {
      "name": "inputSubscription",
      "label": "Pub/Sub input subscription.",
      "helpText": "Pub/Sub subscription to read from.",
      "regexes": [
        "[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "outputTable",
      "label": "BigQuery output table",
      "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

Python

{
  "name": "Streaming beam Python flex template",
  "description": "Streaming beam example for python flex template.",
  "parameters": [
    {
      "name": "input_subscription",
      "label": "Input PubSub subscription.",
      "helpText": "Name of the input PubSub subscription to consume from.",
      "regexes": [
        "projects/[^/]+/subscriptions/[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "output_table",
      "label": "BigQuery output table name.",
      "helpText": "Name of the BigQuery output table name.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

您可以从 Dataflow 模板目录下载 Google 提供的模板的元数据文件。

创建 Flex 模板

要运行模板,您需要在 Cloud Storage 中创建模板规范文件,其中包含运行作业所需的所有必要信息,例如 SDK 信息和元数据。

本教程使用示例元数据文件,该文件包含模板的其他信息,例如 namedescription 和输入 parameters 字段。

  1. 创建模板规范文件,其中包含运行作业所需的所有必要信息,例如 SDK 信息和元数据。
    export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/streaming-beam-sql.json"
    
  2. 构建 Flex 模板。

    Java

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image-gcr-path "$TEMPLATE_IMAGE" \
          --sdk-language "JAVA" \
          --flex-template-base-image JAVA11 \
          --metadata-file "metadata.json" \
          --jar "target/streaming-beam-sql-1.0.jar" \
          --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"

    Python

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image "$TEMPLATE_IMAGE" \
          --sdk-language "PYTHON" \
          --metadata-file "metadata.json"

现在,您可以通过您指定的 Cloud Storage 位置中的模板文件使用该模板。

运行 Flex 模板流水线

现在,您可以通过引用模板文件并传递流水线所需的模板参数,在 Dataflow 中运行 Apache Beam 流水线。

  1. 在 shell 或终端中,运行模板:

    Java

    export REGION="us-central1"
    
    gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters inputSubscription="$SUBSCRIPTION" \
        --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"

    Python

    export REGION="us-central1"
    
    gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters input_subscription="projects/$PROJECT/subscriptions/$SUBSCRIPTION" \
        --parameters output_table="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"
    或者,您可以使用 REST API 请求运行模板:

    Java

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/$REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'$SUBSCRIPTION'",
            "outputTable": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'

    Python

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/$REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "input_subscription": "projects/'$PROJECT'/subscriptions/'$SUBSCRIPTION'",
            "output_table": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'
  2. 在您执行运行 Flex 模板的命令后,Dataflow 将返回作业状态为已加入队列的作业 ID。可能几分钟后作业状态才会变为正在运行,您才可以访问作业图。
  3. 通过运行以下查询来查看 BigQuery 中的结果:
    bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'
    
    在此流水线的运行期间,您可以看到 BigQuery 表中每分钟都在附加新行。

清理

完成本教程后,您可清理在 Google Cloud 上创建的资源,以免这些资源将来产生费用。以下部分介绍如何删除或关闭这些资源。

清理 Flex 模板资源

  1. 停止 Dataflow 流水线。
    gcloud dataflow jobs list \
      --filter 'NAME=streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "$REGION" \
      | xargs gcloud dataflow jobs cancel --region "$REGION"
    
  2. 从 Cloud Storage 中删除模板规范文件。
    gsutil rm $TEMPLATE_PATH
    
  3. 从 Container Registry 中删除 Flex 模板容器映像。
    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags
    

清理 Google Cloud 项目资源

  1. 删除 Cloud Scheduler 作业。
    gcloud scheduler jobs delete negative-ratings-publisher
    gcloud scheduler jobs delete positive-ratings-publisher
    
  2. 删除 Pub/Sub 订阅和主题。
    gcloud pubsub subscriptions delete $SUBSCRIPTION
    gcloud pubsub topics delete $TOPIC
    
  3. 删除 BigQuery 表。
    bq rm -f -t $PROJECT:$DATASET.$TABLE
    
  4. 删除 BigQuery 数据集,只执行此操作不会产生任何费用。

    以下命令还会删除该数据集中的所有表。表和数据无法恢复。

    bq rm -r -f -d $PROJECT:$DATASET
    
  5. 删除 Cloud Storage 存储分区,只执行此操作不会产生任何费用。

    以下命令还会删除该存储分区中的所有对象。这些对象无法恢复。

    gsutil rm -r gs://$BUCKET
    

限制

以下限制适用于 Flex 模板作业:

  • 您必须使用 Google 提供的基础映像来使用 Docker 打包容器。 如需查看适用映像的列表,请参阅 Flex 模板基础映像
  • 必须在调用 run 后退出用于构建流水线的程序,流水线才能启动。
  • 不支持 waitUntilFinish (Java) 和 wait_until_finish (Python)。

后续步骤