使用 Flex 模板

本教程介绍如何使用 gcloud 命令行工具创建和运行包含自定义 Docker 映像的 Dataflow Flex 模板作业。本教程将指导您完成一个流式处理流水线示例,该示例从 Pub/Sub 读取 JSON 编码的消息,使用 Beam SQL 转换消息数据,再将结果写入 BigQuery 表。

如需详细了解 Flex 模板,请参阅 Dataflow 模板

目标

  • 构建 Docker 容器映像。
  • 创建并运行 Dataflow Flex 模板。

费用

本教程使用 Google Cloud 的以下收费组件:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Container Registry
  • Cloud Build
  • BigQuery

请使用价格计算器根据您的预计使用情况来估算费用。

准备工作

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build API。

    启用 API

  5. 创建服务帐号:

    1. 在 Cloud Console 中,转到创建服务帐号页面。

      转到“创建服务帐号”
    2. 选择一个项目。
    3. 服务帐号名称字段中,输入一个名称。 Cloud Console 会根据此名称填充服务帐号 ID 字段。

      服务帐号说明字段中,输入说明。例如,Service account for quickstart

    4. 点击创建
    5. 点击选择角色字段。

      快速访问下,点击基本,然后点击所有者

    6. 点击继续
    7. 点击完成以完成服务帐号的创建过程。

      不要关闭浏览器窗口。您将在下一步骤中用到它。

  6. 创建服务帐号密钥:

    1. 在 Cloud Console 中,点击您创建的服务帐号的电子邮件地址。
    2. 点击密钥
    3. 依次点击添加密钥创建新密钥
    4. 点击创建。JSON 密钥文件将下载到您的计算机上。
    5. 点击关闭
  7. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含您的服务帐号密钥的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

创建示例来源和接收器

本部分介绍如何创建以下内容:

  • 使用 Pub/Sub 的流式处理数据源
  • 将数据加载到 BigQuery 中的数据集

创建 Cloud Storage 存储分区

使用 gsutil mb 命令:

export BUCKET="my-storage-bucket"
gsutil mb gs://$BUCKET

创建一个 Pub/Sub 主题并订阅该主题

使用 gcloud 命令行工具

export TOPIC="messages"
export SUBSCRIPTION="ratings"

gcloud pubsub topics create $TOPIC
gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION

创建 Cloud Scheduler 作业

在该步骤中,我们将使用 gcloud 命令行工具创建和运行一个 Cloud Scheduler 作业,该作业会发布“正面评分”和“负面评分”。

  1. 针对此 Google Cloud 项目创建一个 Cloud Scheduler 作业。
    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
    
  2. 这将创建并运行一个“正面评分”发布程序,该程序每分钟发布一条消息。
  3. 启动 Cloud Scheduler 作业。
    gcloud scheduler jobs run positive-ratings-publisher
    
  4. 再创建并运行一个类似的“负面评分”发布程序,该程序每两分钟发布一条消息。
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run negative-ratings-publisher
    

创建 BigQuery 数据集

使用 bq mk 命令:

export PROJECT="$(gcloud config get-value project)"
export DATASET="beam_samples"
export TABLE="streaming_beam_sql"

bq mk --dataset "$PROJECT:$DATASET"

下载代码示例

  1. 下载代码示例。

    Java

    克隆 Java-docs-samples 代码库,并导航到本教程的代码示例。

      git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
      cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

    Python

    克隆 python-docs-samples 代码库,并导航到本教程的代码示例。

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      cd python-docs-samples/dataflow/flex-templates/streaming_beam
  2. 导出本教程的 TEMPLATE_IMAGE
    export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/streaming-beam-sql:latest"
    

设置开发环境

Java

  1. 下载并安装 Java Development Kit (JDK) 版本 11。验证 JAVA_HOME 环境变量是否已设置并指向您的 JDK 安装。
  2. 下载 Apache Maven,并按照适用于您的具体操作系统的 Maven 安装指南进行安装。
  3. (可选)在本地运行 Apache Beam 流水线以进行开发。
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSql \
        -Dexec.args="\
          --project=$PROJECT \
          --inputSubscription=$SUBSCRIPTION \
          --outputTable=$PROJECT:$DATASET.$TABLE \
          --tempLocation=gs://$BUCKET/samples/dataflow/temp"
  4. 在 Uber JAR 文件中构建 Java 项目。
      mvn clean package
  5. (可选)请记下 Uber JAR 文件相对于原始文件的大小。
      ls -lh target/*.jar
    此 Uber JAR 文件中嵌入了所有依赖项。您可以将此文件作为在其他库上没有外部依赖项的独立应用运行。

Python

使用 Python 版 Apache Beam SDK

仅限 Python:创建和构建容器映像

本部分包含面向 Python 用户的步骤。如果您使用的是 Java,请跳过以下步骤。

  1. (可选)默认启用对 Kaniko 缓存的使用。
    gcloud config set builds/use_kaniko True
    
    Kaniko 会缓存容器构建工件,因此使用此选项可加快后续构建的速度。
  2. (可选)创建 Dockerfile。您可以在本教程中自定义 Dockerfile。起始文件如下所示:

    Python

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      # Due to a change in the Apache Beam base image in version 2.24, you must to install
      # libffi-dev manually as a dependency. For more information:
      #   https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4891
      RUN apt-get update && apt-get install -y libffi-dev && rm -rf /var/lib/apt/lists/*
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      RUN pip install -U -r ./requirements.txt

    此 Dockerfile 包含 FROMENVCOPY 命令,您可以在 Dockerfile 参考 中阅读相关信息。

    gcr.io/PROJECT/ 开头的映像会保存到项目的 Container Registry 中,供其他 Google Cloud 产品访问。
  3. 结合使用 Dockerfile 和 Cloud Build 来构建 Docker 映像。
    gcloud builds submit --tag $TEMPLATE_IMAGE .
    

创建 Flex 模板

要运行模板,您需要在 Cloud Storage 中创建模板规范文件,其中包含运行作业所需的所有必要信息,例如 SDK 信息和元数据。

此示例中的 metadata.json 文件包含模板的其他信息,例如 namedescription 和输入 parameters 字段。

  1. 创建模板规范文件,其中包含运行作业所需的所有必要信息,例如 SDK 信息和元数据。
    export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/streaming-beam-sql.json"
    
  2. 构建 Flex 模板。

    Java

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image-gcr-path "$TEMPLATE_IMAGE" \
          --sdk-language "JAVA" \
          --flex-template-base-image JAVA11 \
          --metadata-file "metadata.json" \
          --jar "target/streaming-beam-sql-1.0.jar" \
          --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"

    Python

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image "$TEMPLATE_IMAGE" \
          --sdk-language "PYTHON" \
          --metadata-file "metadata.json"

现在,您可以通过您指定的 Cloud Storage 位置中的模板文件使用该模板。

运行 Flex 模板流水线

现在,您可以通过引用模板文件并传递流水线所需的模板参数,在 Dataflow 中运行 Apache Beam 流水线。

  1. 运行模板。

    Java

      export REGION="us-central1"
    
      gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters inputSubscription="$SUBSCRIPTION" \
        --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"

    Python

      export REGION="us-central1"
    
      gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters input_subscription="projects/$PROJECT_ID/subscriptions/$SUBSCRIPTION" \
        --parameters output_table="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"
    或者,通过 REST API 请求运行模板。
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/us-central1/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'$SUBSCRIPTION'",
            "outputTable": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'
    
  2. 在您执行运行 Flex 模板的命令后,Dataflow 将返回作业状态为已加入队列的作业 ID。可能几分钟后作业状态才会变为正在运行,您才可以访问作业图。
  3. 通过运行以下查询来查看 BigQuery 中的结果:
    bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'
    
    在此流水线的运行期间,您可以看到 BigQuery 表中每分钟都在附加新行。

清理

完成本教程后,您可清理在 Google Cloud 上创建的资源,以免这些资源将来产生费用。以下部分介绍如何删除或关闭这些资源。

清理 Flex 模板资源

  1. 停止 Dataflow 流水线。
    gcloud dataflow jobs list \
      --filter 'NAME=streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "$REGION" \
      | xargs gcloud dataflow jobs cancel --region "$REGION"
    
  2. 从 Cloud Storage 中删除模板规范文件。
    gsutil rm $TEMPLATE_PATH
    
  3. 从 Container Registry 中删除 Flex 模板容器映像。
    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags
    

清理 Google Cloud 项目资源

  1. 删除 Cloud Scheduler 作业。
    gcloud scheduler jobs delete negative-ratings-publisher
    gcloud scheduler jobs delete positive-ratings-publisher
    
  2. 删除 Pub/Sub 订阅和主题。
    gcloud pubsub subscriptions delete $SUBSCRIPTION
    gcloud pubsub topics delete $TOPIC
    
  3. 删除 BigQuery 表。
    bq rm -f -t $PROJECT:$DATASET.$TABLE
    
  4. 删除 BigQuery 数据集,只执行此操作不会产生任何费用。

    以下命令还会删除该数据集中的所有表。表和数据无法恢复。

    bq rm -r -f -d $PROJECT:$DATASET
    
  5. 删除 Cloud Storage 存储分区,只执行此操作不会产生任何费用。

    以下命令还会删除该存储分区中的所有对象。这些对象无法恢复。

    gsutil rm -r gs://$BUCKET
    

限制

以下限制适用于 Flex 模板作业:

  • 您必须使用 Google 提供的基础映像来使用 Docker 打包容器。
  • 不支持 waitUntilFinish (Java) 和 wait_until_finish (Python)。

后续步骤