使用 Java 创建 Dataflow 流水线

本文档介绍了如何设置 Google Cloud 项目、创建使用 Java 版 Apache Beam SDK 构建的示例流水线,以及在 Dataflow 服务上运行示例流水线。该流水线会从 Cloud Storage 读取文本文件,计算该文件中不重复字词的数量,然后将字数统计写回 Cloud Storage。如需了解 WordCount 流水线,请观看如何在 Apache Beam 中使用 WordCount 视频。

本教程需要使用 Maven,但您也可以将示例项目从 Maven 转换为 Gradle。如需了解详情,请参阅可选:从 Maven 转换为 Gradle


如需在 Google Cloud 控制台中直接遵循有关此任务的分步指导,请点击操作演示

操作演示


准备工作

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. 向您的 Compute Engine 默认服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • PROJECT_NUMBER 替换为您的项目编号。 如需查找项目编号,请参阅识别项目或使用 gcloud projects describe 命令。
    • SERVICE_ACCOUNT_ROLE 替换为每个角色。
  17. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (Standard)。
    • 将存储位置设置为以下项: US(美国)。
    • BUCKET_NAME 替换为 唯一的存储桶名称。请勿在存储桶名称中添加敏感信息,因为存储桶命名空间是全局性的,公开可见。
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • 在后续部分中根据需要复制以下内容:
      • 您的 Cloud Storage 存储桶名称。
      • 您的 Google Cloud 项目 ID。 如需查找此 ID,请参阅识别项目
    • 下载并安装 Java Development Kit (JDK) 版本 11。(Dataflow 继续支持版本 8。)验证 JAVA_HOME 环境变量已设置并指向您的 JDK 安装。
    • 下载 Apache Maven,并按照适用于您的具体操作系统的 Maven 安装指南进行安装。

获取流水线代码

Apache Beam SDK 是一个用于数据处理流水线的开源编程模型。您可使用 Apache Beam 程序定义这些流水线,还可以选择 Dataflow 等运行程序来运行流水线。

  1. 在您的 shell 或终端中,使用 Maven Archetype 插件在包含 Apache Beam SDK 的 WordCount 示例的计算机上创建 Maven 项目:
    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=2.61.0 \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    

    该命令会在当前目录下创建一个名为 word-count-beam 的新目录。word-count-beam 目录包含一个简单的 pom.xml 文件和一系列计算文本文件字数的示例流水线。

  2. 验证 word-count-beam 目录包含 pom.xml 文件:

    Linux 或 macOS

    cd word-count-beam/
    ls

    输出如下所示:

    pom.xml   src

    Windows

    cd word-count-beam/
    dir

    输出如下所示:

    pom.xml   src
  3. 验证您的 Maven 项目包含示例流水线:

    Linux 或 macOS

    ls src/main/java/org/apache/beam/examples/

    输出如下所示:

    DebuggingWordCount.java   WindowedWordCount.java   common
    MinimalWordCount.java   WordCount.java

    Windows

    dir src/main/java/org/apache/beam/examples/

    输出如下所示:

    DebuggingWordCount.java   WindowedWordCount.java   common
    MinimalWordCount.java   WordCount.java

如需查看这些示例中使用的 Apache Beam 概念的详细介绍,请参阅 Apache Beam WordCount 示例。后面部分中的说明使用 WordCount.java

在本地运行流水线

  • 在您的 shell 或终端中,从 word-count-beam 目录在本地运行 WordCount 流水线:
    mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="--output=counts"
    

    输出文件的前缀为 counts 并写入 word-count-beam 目录。它们包含输入文本中的不重复单词以及每个单词的出现次数。

在 Dataflow 服务上运行流水线

  • 在您的 shell 或终端中,从 word-count-beam 目录在 Dataflow 服务上构建并运行 WordCount 流水线:
    mvn -Pdataflow-runner compile exec:java \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="--project=PROJECT_ID \
        --gcpTempLocation=gs://BUCKET_NAME/temp/ \
        --output=gs://BUCKET_NAME/output \
        --runner=DataflowRunner \
        --region=REGION"
    

    请替换以下内容:

    • PROJECT_ID:您的 Google Cloud 项目 ID
    • BUCKET_NAME:Cloud Storage 存储桶的名称
    • REGIONDataflow 区域,例如 us-central1

查看结果

  1. 在 Google Cloud 控制台中,前往 Dataflow 作业页面。

    前往作业

    作业页面显示所有可用作业的详细信息,其中包括状态。wordcount 作业的状态最初为正在运行,然后更新为成功

  2. 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。

    前往存储桶

    存储桶页面会显示项目中所有存储桶的列表。

  3. 点击您创建的存储桶。

    存储桶详情页面显示 Dataflow 作业创建的输出文件和暂存文件。

清理

为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。

删除项目

避免产生费用的最简单方法是删除您为本快速入门创建的 Google Cloud 项目。

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

逐个删除资源

如果您希望保留本快速入门中使用的 Google Cloud 项目,请逐个删除资源:

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.
  4. 撤消授予 Compute Engine 默认服务账号的角色。对以下每个 IAM 角色运行以下命令一次:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  6. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

后续步骤