构建和运行 Flex 模板


借助 Dataflow Flex 模板,您可以打包 Dataflow 流水线以进行部署。本教程介绍了如何构建 Dataflow Flex 模板,然后使用该模板运行 Dataflow 作业。

目标

  • 构建 Dataflow Flex 模板。
  • 使用该模板运行 Dataflow 作业。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. Enable the Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, Resource Manager, Artifact Registry, and Cloud Build API:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  7. 为您的 Google 账号创建本地身份验证凭据:

    gcloud auth application-default login
  8. 向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。
  9. 安装 Google Cloud CLI。
  10. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  11. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  12. 确保您的 Google Cloud 项目已启用结算功能

  13. Enable the Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, Resource Manager, Artifact Registry, and Cloud Build API:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  14. 为您的 Google 账号创建本地身份验证凭据:

    gcloud auth application-default login
  15. 向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。
  16. 向您的 Compute Engine 默认服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    替换以下内容:

    • PROJECT_ID:您的项目 ID
    • PROJECT_NUMBER:您的项目编号
    • SERVICE_ACCOUNT_ROLE:每个角色

准备环境

安装 SDK 以及针对开发环境的任何要求。

Java

  1. 下载并安装 Java Development Kit (JDK) 版本 11。验证 JAVA_HOME 环境变量是否已设置并指向您的 JDK 安装。

  2. 下载 Apache Maven,并按照适用于您的具体操作系统的 Maven 安装指南进行安装。

Python

安装 Python 版 Apache Beam SDK

Go

使用 Go 的下载和安装指南下载并安装适用于您的具体操作系统的 Go。如需了解 Apache Beam 支持的 Go 运行时环境,请参阅 Apache Beam 运行时支持

下载代码示例。

Java

  1. 克隆 java-docs-samples 代码库

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    
  2. 找到本教程的代码示例。

    cd java-docs-samples/dataflow/flex-templates/getting_started
    
  3. 在 Uber JAR 文件中构建 Java 项目。

    mvn clean package

    此 Uber JAR 文件中嵌入了所有依赖项。您可以将此文件作为在其他库上没有外部依赖项的独立应用运行。

Python

  1. 克隆 python-docs-samples 代码库

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    
  2. 找到本教程的代码示例。

    cd python-docs-samples/dataflow/flex-templates/getting_started
    

Go

  1. 克隆 golang-samples 代码库

    git clone https://github.com/GoogleCloudPlatform/golang-samples.git
    
  2. 找到本教程的代码示例。

    cd golang-samples/dataflow/flex-templates/wordcount
    
  3. 编译 Go 二进制文件。

    GOOS=linux GOARCH=amd64 go build -o wordcount .

创建 Cloud Storage 存储桶

使用 gcloud storage buckets create 命令创建 Cloud Storage 存储桶:

gcloud storage buckets create gs://BUCKET_NAME

BUCKET_NAME 替换为您的 Cloud Storage 存储桶的名称。Cloud Storage 存储桶名称必须是全局唯一的,并且符合存储桶命名要求

创建 Artifact Registry 代码库

创建一个您将模板的 Docker 容器映像推送到其中的 Artifact Registry 制品库。

  1. 使用 gcloud artifacts repositories create 命令创建新的 Artifact Registry 制品库。

    gcloud artifacts repositories create REPOSITORY \
     --repository-format=docker \
     --location=LOCATION
    

    替换以下内容:

    • REPOSITORY:代码库的名称。制品库名称对于项目中的每个制品库位置必须是唯一的。
    • LOCATION:制品库的单区域或多区域位置
  2. 使用 gcloud auth configure-docker 命令配置 Docker 以对 Artifact Registry 的请求进行身份验证。此命令会更新 Docker 配置,以便您可以与 Artifact Registry 连接来推送映像。

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

Flex 模板还可以使用存储在私有注册表中的映像。如需了解详情,请参阅使用私有注册表中的映像

构建 Flex 模板

在此步骤中,您将使用 gcloud dataflow flex-template build 命令构建 Flex 模板。

Flex 模板包含以下组件:

  • 打包流水线代码的 Docker 容器映像。
  • 模板规范文件。此文件是一个 JSON 文档,其中包含容器映像的位置以及模板的相关元数据(例如流水线参数)。

Java

gcloud dataflow flex-template build gs://BUCKET_NAME/getting_started-java.json \
 --image-gcr-path "LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/getting-started-java:latest" \
 --sdk-language "JAVA" \
 --flex-template-base-image JAVA11 \
 --metadata-file "metadata.json" \
 --jar "target/flex-template-getting-started-1.0.jar" \
 --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="com.example.dataflow.FlexTemplateGettingStarted"

替换以下内容:

  • BUCKET_NAME:您之前创建的 Cloud Storage 存储桶的名称
  • LOCATION:位置
  • PROJECT_ID:Google Cloud 项目 ID
  • REPOSITORY:您之前创建的 Artifact Registry 制品库的名称

Python

gcloud dataflow flex-template build gs://BUCKET_NAME/getting_started-py.json \
 --image-gcr-path "LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/getting-started-python:latest" \
 --sdk-language "PYTHON" \
 --flex-template-base-image "PYTHON3" \
 --metadata-file "metadata.json" \
 --py-path "." \
 --env "FLEX_TEMPLATE_PYTHON_PY_FILE=getting_started.py" \
 --env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt"

替换以下内容:

  • BUCKET_NAME:您之前创建的 Cloud Storage 存储桶的名称
  • LOCATION:位置
  • PROJECT_ID:Google Cloud 项目 ID
  • REPOSITORY:您之前创建的 Artifact Registry 制品库的名称

Go

  1. 通过 gcloud builds submit 命令,结合使用 Dockerfile 和 Cloud Build 来构建 Docker 映像。此命令会构建文件并将其推送到 Artifact Registry 制品库。

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/wordcount-go:latest .
    

    替换以下内容:

    • LOCATION:位置
    • PROJECT_ID:Google Cloud 项目 ID
    • REPOSITORY:您之前创建的 Artifact Registry 制品库的名称
  2. 使用 gcloud dataflow flex-template build 命令在 Cloud Storage 存储桶中创建名为 wordcount-go.json 的 Flex 模板。

    gcloud dataflow flex-template build gs://BUCKET_NAME/samples/dataflow/templates/wordcount-go.json \
      --image "LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/wordcount-go:latest" \
      --sdk-language "GO" \
      --metadata-file "metadata.json"

    BUCKET_NAME 替换为您之前创建的 Cloud Storage 存储桶的名称。

运行 Flex 模板

在此步骤中,您将使用该模板运行 Dataflow 作业。

Java

  1. 使用 gcloud dataflow flex-template run 命令运行使用 Flex 模板的 Dataflow 作业。

    gcloud dataflow flex-template run "getting-started-`date +%Y%m%d-%H%M%S`" \
     --template-file-gcs-location "gs://BUCKET_NAME/getting_started-java.json" \
     --parameters output="gs://BUCKET_NAME/output-" \
     --region "REGION"

    替换以下内容:

    • BUCKET_NAME:您之前创建的 Cloud Storage 存储桶的名称
    • REGION:区域
  2. 如需在 Google Cloud 控制台中查看 Dataflow 作业的状态,请进入 Dataflow 作业页面。

    打开“作业”

如果作业成功运行,则会将输出写入 Cloud Storage 存储桶中名为 gs://BUCKET_NAME/output--00000-of-00001.txt 的文件。

Python

  1. 使用 gcloud dataflow flex-template run 命令运行使用 Flex 模板的 Dataflow 作业。

    gcloud dataflow flex-template run "getting-started-`date +%Y%m%d-%H%M%S`" \
     --template-file-gcs-location "gs://BUCKET_NAME/getting_started-py.json" \
     --parameters output="gs://BUCKET_NAME/output-" \
     --region "REGION"
    

    替换以下内容:

    • BUCKET_NAME:您之前创建的 Cloud Storage 存储桶的名称
    • REGION:区域
  2. 如需在 Google Cloud 控制台中查看 Dataflow 作业的状态,请进入 Dataflow 作业页面。

    打开“作业”

如果作业成功运行,则会将输出写入 Cloud Storage 存储桶中名为 gs://BUCKET_NAME/output--00000-of-00001.txt 的文件。

Go

  1. 使用 gcloud dataflow flex-template run 命令运行使用 Flex 模板的 Dataflow 作业。

    gcloud dataflow flex-template run "wordcount-go-`date +%Y%m%d-%H%M%S`" \
     --template-file-gcs-location "gs://BUCKET_NAME/samples/dataflow/templates/wordcount-go.json" \
     --parameters output="gs://BUCKET_NAME/samples/dataflow/templates/counts.txt" \
     --region "REGION"
    

    替换以下内容:

    • BUCKET_NAME:您之前创建的 Cloud Storage 存储桶的名称
    • REGION:区域
  2. 如需在 Google Cloud 控制台中查看 Dataflow 作业的状态,请进入 Dataflow 作业页面。

    打开“作业”

如果作业成功运行,则会将输出写入 Cloud Storage 存储桶中名为 gs://BUCKET_NAME/samples/dataflow/templates/count.txt 的文件。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

    删除 Google Cloud 项目:

    gcloud projects delete PROJECT_ID

删除各个资源

  1. 删除 Cloud Storage 存储桶以及存储桶中的所有对象。
    gcloud storage rm gs://BUCKET_NAME --recursive
  2. 删除 Artifact Registry 代码库。
    gcloud artifacts repositories delete REPOSITORY \
        --location=LOCATION
  3. 撤消您授予 Compute Engine 默认服务账号的角色。对以下每个 IAM 角色运行以下命令一次:
    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  4. 可选:撤消您创建的身份验证凭据,并删除本地凭据文件。

    gcloud auth application-default revoke
  5. 可选:从 gcloud CLI 撤消凭据。

    gcloud auth revoke

后续步骤