使用 Java 创建 Dataflow 流水线
本文档介绍了如何设置 Google Cloud 项目、创建使用 Java 版 Apache Beam SDK 构建的示例流水线,以及在 Dataflow 服务上运行示例流水线。该流水线会从 Cloud Storage 读取文本文件,计算该文件中不重复字词的数量,然后将字数统计写回 Cloud Storage。如需了解 WordCount 流水线,请观看如何在 Apache Beam 中使用 WordCount 视频。
本教程需要使用 Maven,但您也可以将示例项目从 Maven 转换为 Gradle。如需了解详情,请参阅可选:从 Maven 转换为 Gradle。
如需在 Google Cloud 控制台中直接遵循有关此任务的分步指导,请点击操作演示:
准备工作
- 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
- 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Enable the Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
为您的 Google 账号创建本地身份验证凭据:
gcloud auth application-default login
-
向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- 将
PROJECT_ID
替换为您的项目 ID。 - 将
EMAIL_ADDRESS
替换为您的电子邮件地址。 - 将
ROLE
替换为每个角色。
- 将
- 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Enable the Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
为您的 Google 账号创建本地身份验证凭据:
gcloud auth application-default login
-
向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- 将
PROJECT_ID
替换为您的项目 ID。 - 将
EMAIL_ADDRESS
替换为您的电子邮件地址。 - 将
ROLE
替换为每个角色。
- 将
向您的 Compute Engine 默认服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- 将
PROJECT_ID
替换为您的项目 ID。 - 将
PROJECT_NUMBER
替换为您的项目编号。 如需查找项目编号,请参阅识别项目或使用gcloud projects describe
命令。 - 将
SERVICE_ACCOUNT_ROLE
替换为每个角色。
-
创建一个 Cloud Storage 存储分区并按如下所示进行配置:
-
将存储类别设置为
S
(Standard)。 -
将存储位置设置为以下项:
US
(美国)。 -
将
BUCKET_NAME
替换为唯一的存储分区名称。请勿在存储分区名称中添加敏感信息,因为存储分区命名空间是全局性的,公开可见。
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
将存储类别设置为
- 在后续部分中根据需要复制以下内容:
- 您的 Cloud Storage 存储桶名称。
- 您的 Google Cloud 项目 ID。 如需查找此 ID,请参阅识别项目。
- 下载并安装 Java Development Kit (JDK) 版本 11。(Dataflow 继续支持版本 8。)验证
JAVA_HOME
环境变量已设置并指向您的 JDK 安装。 - 下载 Apache Maven,并按照适用于您的具体操作系统的 Maven 安装指南进行安装。
获取流水线代码
Apache Beam SDK 是一个用于数据处理流水线的开源编程模型。您可使用 Apache Beam 程序定义这些流水线,还可以选择 Dataflow 等运行程序来运行流水线。
- 在您的 shell 或终端中,使用 Maven Archetype 插件在包含 Apache Beam SDK 的
WordCount
示例的计算机上创建 Maven 项目:mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=2.55.1 \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
该命令会在当前目录下创建一个名为
word-count-beam
的新目录。word-count-beam
目录包含一个简单的pom.xml
文件和一系列计算文本文件字数的示例流水线。 - 验证
word-count-beam
目录包含pom.xml
文件:Linux 或 macOS
cd word-count-beam/ ls
输出如下所示:
pom.xml src
Windows
cd word-count-beam/ dir
输出如下所示:
pom.xml src
- 验证您的 Maven 项目包含示例流水线:
Linux 或 macOS
ls src/main/java/org/apache/beam/examples/
输出如下所示:
DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
Windows
dir src/main/java/org/apache/beam/examples/
输出如下所示:
DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
如需查看这些示例中使用的 Apache Beam 概念的详细介绍,请参阅 Apache Beam WordCount 示例。后面部分中的说明使用 WordCount.java
。
在本地运行流水线
- 在您的 shell 或终端中,从
word-count-beam
目录在本地运行WordCount
流水线:mvn compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--output=counts"
输出文件的前缀为
counts
并写入word-count-beam
目录。它们包含输入文本中的不重复单词以及每个单词的出现次数。
在 Dataflow 服务上运行流水线
- 在您的 shell 或终端中,从
word-count-beam
目录在 Dataflow 服务上构建并运行WordCount
流水线:mvn -Pdataflow-runner compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--project=PROJECT_ID \ --gcpTempLocation=gs://BUCKET_NAME/temp/ \ --output=gs://BUCKET_NAME/output \ --runner=DataflowRunner \ --region=REGION"
请替换以下内容:
PROJECT_ID
:您的 Google Cloud 项目 IDBUCKET_NAME
:Cloud Storage 存储桶的名称REGION
:Dataflow 区域,例如us-central1
查看结果
清理
为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。
删除项目
避免产生费用的最简单方法是删除您为本快速入门创建的 Google Cloud 项目。
- 在 Google Cloud 控制台中,进入管理资源页面。
- 在项目列表中,选择要删除的项目,然后点击删除。
- 在对话框中输入项目 ID,然后点击关闭以删除项目。
逐个删除资源
如果您希望保留本快速入门中使用的 Google Cloud 项目,请逐个删除资源:
- 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。
- 点击要删除的存储分区对应的复选框。
- 如需删除存储分区,请点击 删除,然后按照说明操作。
撤消授予 Compute Engine 默认服务账号的角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
可选:撤消您创建的身份验证凭据,并删除本地凭据文件。
gcloud auth application-default revoke
-
可选:从 gcloud CLI 撤消凭据。
gcloud auth revoke
后续步骤
- 了解 Apache Beam 编程模型。
- 了解如何使用 Apache Beam 构建流水线。
- 完成 WordCount 和移动游戏示例。