使用 Java 创建 Dataflow 流水线
本文档介绍了如何设置 Google Cloud 项目、创建使用 Java 版 Apache Beam SDK 构建的示例流水线,以及在 Dataflow 服务上运行示例流水线。该流水线会从 Cloud Storage 读取文本文件,计算该文件中不重复字词的数量,然后将字数统计写回 Cloud Storage。如需了解 WordCount 流水线,请观看如何在 Apache Beam 中使用 WordCount 视频。
本教程需要使用 Maven,但您也可以将示例项目从 Maven 转换为 Gradle。如需了解详情,请参阅可选:从 Maven 转换为 Gradle。
如需在 Google Cloud 控制台中直接遵循有关此任务的分步指导,请点击操作演示:
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
向您的 Compute Engine 默认服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- 将
PROJECT_ID
替换为您的项目 ID。 - 将
PROJECT_NUMBER
替换为您的项目编号。 如需查找项目编号,请参阅识别项目或使用gcloud projects describe
命令。 - 将
SERVICE_ACCOUNT_ROLE
替换为每个角色。
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S
(Standard)。 -
将存储位置设置为以下项:
US
(美国)。 -
将
BUCKET_NAME
替换为 唯一的存储桶名称。请勿在存储桶名称中添加敏感信息,因为存储桶命名空间是全局性的,公开可见。 - 在后续部分中根据需要复制以下内容:
- 您的 Cloud Storage 存储桶名称。
- 您的 Google Cloud 项目 ID。 如需查找此 ID,请参阅识别项目。
- 下载并安装 Java Development Kit (JDK) 版本 11。(Dataflow 继续支持版本 8。)验证
JAVA_HOME
环境变量已设置并指向您的 JDK 安装。 - 下载 Apache Maven,并按照适用于您的具体操作系统的 Maven 安装指南进行安装。
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Set the storage class to
获取流水线代码
Apache Beam SDK 是一个用于数据处理流水线的开源编程模型。您可使用 Apache Beam 程序定义这些流水线,还可以选择 Dataflow 等运行程序来运行流水线。
- 在您的 shell 或终端中,使用 Maven Archetype 插件在包含 Apache Beam SDK 的
WordCount
示例的计算机上创建 Maven 项目:mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=2.61.0 \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
该命令会在当前目录下创建一个名为
word-count-beam
的新目录。word-count-beam
目录包含一个简单的pom.xml
文件和一系列计算文本文件字数的示例流水线。 - 验证
word-count-beam
目录包含pom.xml
文件:Linux 或 macOS
cd word-count-beam/ ls
输出如下所示:
pom.xml src
Windows
cd word-count-beam/ dir
输出如下所示:
pom.xml src
- 验证您的 Maven 项目包含示例流水线:
Linux 或 macOS
ls src/main/java/org/apache/beam/examples/
输出如下所示:
DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
Windows
dir src/main/java/org/apache/beam/examples/
输出如下所示:
DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
如需查看这些示例中使用的 Apache Beam 概念的详细介绍,请参阅 Apache Beam WordCount 示例。后面部分中的说明使用 WordCount.java
。
在本地运行流水线
- 在您的 shell 或终端中,从
word-count-beam
目录在本地运行WordCount
流水线:mvn compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--output=counts"
输出文件的前缀为
counts
并写入word-count-beam
目录。它们包含输入文本中的不重复单词以及每个单词的出现次数。
在 Dataflow 服务上运行流水线
- 在您的 shell 或终端中,从
word-count-beam
目录在 Dataflow 服务上构建并运行WordCount
流水线:mvn -Pdataflow-runner compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--project=PROJECT_ID \ --gcpTempLocation=gs://BUCKET_NAME/temp/ \ --output=gs://BUCKET_NAME/output \ --runner=DataflowRunner \ --region=REGION"
请替换以下内容:
PROJECT_ID
:您的 Google Cloud 项目 IDBUCKET_NAME
:Cloud Storage 存储桶的名称REGION
:Dataflow 区域,例如us-central1
查看结果
清理
为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。
删除项目
避免产生费用的最简单方法是删除您为本快速入门创建的 Google Cloud 项目。
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
逐个删除资源
如果您希望保留本快速入门中使用的 Google Cloud 项目,请逐个删除资源:
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
撤消授予 Compute Engine 默认服务账号的角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
后续步骤
- 了解 Apache Beam 编程模型。
- 了解如何使用 Apache Beam 构建流水线。
- 完成 WordCount 和移动游戏示例。