使用 Java 创建 Dataflow 流水线

本文档介绍了如何设置 Google Cloud 项目、创建使用 Java 版 Apache Beam SDK 构建的示例流水线,以及在 Dataflow 服务上运行示例流水线。该流水线会从 Cloud Storage 读取文本文件,计算该文件中不重复字词的数量,然后将字数统计写回 Cloud Storage。如需了解 WordCount 流水线,请观看如何在 Apache Beam 中使用 WordCount 视频。

本教程需要使用 Maven,但您也可以将示例项目从 Maven 转换为 Gradle。如需了解详情,请参阅可选:从 Maven 转换为 Gradle


如需在 Google Cloud 控制台中直接遵循有关此任务的分步指导,请点击操作演示

操作演示


准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. Enable the Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  7. 为您的 Google 账号创建本地身份验证凭据:

    gcloud auth application-default login
  8. 向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。
  9. 安装 Google Cloud CLI。
  10. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  11. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  12. 确保您的 Google Cloud 项目已启用结算功能

  13. Enable the Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  14. 为您的 Google 账号创建本地身份验证凭据:

    gcloud auth application-default login
  15. 向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。
  16. 向您的 Compute Engine 默认服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • PROJECT_NUMBER 替换为您的项目编号。 如需查找项目编号,请参阅识别项目或使用 gcloud projects describe 命令。
    • SERVICE_ACCOUNT_ROLE 替换为每个角色。
  17. 创建一个 Cloud Storage 存储分区并按如下所示进行配置:
    • 将存储类别设置为 S (Standard)。
    • 将存储位置设置为以下项: US(美国)。
    • BUCKET_NAME 替换为唯一的存储分区名称。请勿在存储分区名称中添加敏感信息,因为存储分区命名空间是全局性的,公开可见。
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  18. 在后续部分中根据需要复制以下内容:
    • 您的 Cloud Storage 存储桶名称。
    • 您的 Google Cloud 项目 ID。 如需查找此 ID,请参阅识别项目
  19. 下载并安装 Java Development Kit (JDK) 版本 11。(Dataflow 继续支持版本 8。)验证 JAVA_HOME 环境变量已设置并指向您的 JDK 安装。
  20. 下载 Apache Maven,并按照适用于您的具体操作系统的 Maven 安装指南进行安装。

获取流水线代码

Apache Beam SDK 是一个用于数据处理流水线的开源编程模型。您可使用 Apache Beam 程序定义这些流水线,还可以选择 Dataflow 等运行程序来运行流水线。

  1. 在您的 shell 或终端中,使用 Maven Archetype 插件在包含 Apache Beam SDK 的 WordCount 示例的计算机上创建 Maven 项目:
    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=2.55.1 \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    

    该命令会在当前目录下创建一个名为 word-count-beam 的新目录。word-count-beam 目录包含一个简单的 pom.xml 文件和一系列计算文本文件字数的示例流水线。

  2. 验证 word-count-beam 目录包含 pom.xml 文件:

    Linux 或 macOS

    cd word-count-beam/
    ls

    输出如下所示:

    pom.xml   src

    Windows

    cd word-count-beam/
    dir

    输出如下所示:

    pom.xml   src
  3. 验证您的 Maven 项目包含示例流水线:

    Linux 或 macOS

    ls src/main/java/org/apache/beam/examples/

    输出如下所示:

    DebuggingWordCount.java   WindowedWordCount.java   common
    MinimalWordCount.java   WordCount.java

    Windows

    dir src/main/java/org/apache/beam/examples/

    输出如下所示:

    DebuggingWordCount.java   WindowedWordCount.java   common
    MinimalWordCount.java   WordCount.java

如需查看这些示例中使用的 Apache Beam 概念的详细介绍,请参阅 Apache Beam WordCount 示例。后面部分中的说明使用 WordCount.java

在本地运行流水线

  • 在您的 shell 或终端中,从 word-count-beam 目录在本地运行 WordCount 流水线:
    mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="--output=counts"
    

    输出文件的前缀为 counts 并写入 word-count-beam 目录。它们包含输入文本中的不重复单词以及每个单词的出现次数。

在 Dataflow 服务上运行流水线

  • 在您的 shell 或终端中,从 word-count-beam 目录在 Dataflow 服务上构建并运行 WordCount 流水线:
    mvn -Pdataflow-runner compile exec:java \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="--project=PROJECT_ID \
        --gcpTempLocation=gs://BUCKET_NAME/temp/ \
        --output=gs://BUCKET_NAME/output \
        --runner=DataflowRunner \
        --region=REGION"
    

    请替换以下内容:

    • PROJECT_ID:您的 Google Cloud 项目 ID
    • BUCKET_NAME:Cloud Storage 存储桶的名称
    • REGIONDataflow 区域,例如 us-central1

查看结果

  1. 在 Google Cloud 控制台中,前往 Dataflow 作业页面。

    前往作业

    作业页面显示所有可用作业的详细信息,其中包括状态。wordcount 作业的状态最初为正在运行,然后更新为成功

  2. 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。

    前往存储桶

    存储桶页面会显示项目中所有存储桶的列表。

  3. 点击您创建的存储桶。

    存储桶详情页面显示 Dataflow 作业创建的输出文件和暂存文件。

清理

为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。

删除项目

避免产生费用的最简单方法是删除您为本快速入门创建的 Google Cloud 项目。

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

逐个删除资源

如果您希望保留本快速入门中使用的 Google Cloud 项目,请逐个删除资源:

  1. 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。

    进入“存储桶”

  2. 点击要删除的存储分区对应的复选框。
  3. 如需删除存储分区,请点击删除,然后按照说明操作。
  4. 撤消授予 Compute Engine 默认服务账号的角色。对以下每个 IAM 角色运行以下命令一次:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. 可选:撤消您创建的身份验证凭据,并删除本地凭据文件。

    gcloud auth application-default revoke
  6. 可选:从 gcloud CLI 撤消凭据。

    gcloud auth revoke

后续步骤