快速入门:使用 Python
在本快速入门中,您将学习如何使用 Python 版 Apache Beam SDK 构建定义流水线的程序。然后,您可以使用直接本地运行程序或云端运行程序(如 Dataflow)来运行流水线。
准备工作
- 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
-
在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目。
-
启用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager API。
-
创建服务帐号:
-
在 Google Cloud 控制台中,转到创建服务帐号页面。
转到“创建服务帐号” - 选择您的项目。
-
在服务帐号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务帐号 ID 字段。
在服务帐号说明字段中,输入说明。例如,
Service account for quickstart
。 - 点击创建并继续。
-
将 Project > Owner 角色授予服务帐号。
如需授予该角色,请找到选择角色列表,然后选择 Project > Owner。
- 点击继续。
-
点击完成以完成服务帐号的创建过程。
不要关闭浏览器窗口。您将在下一步骤中用到它。
-
-
创建服务帐号密钥:
- 在 Google Cloud 控制台中,点击您创建的服务帐号的电子邮件地址。
- 点击密钥。
- 点击添加密钥,然后点击创建新密钥。
- 点击创建。JSON 密钥文件将下载到您的计算机上。
- 点击关闭。
-
将环境变量
GOOGLE_APPLICATION_CREDENTIALS
设置为包含凭据的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。 -
在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目。
-
启用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager API。
-
创建服务帐号:
-
在 Google Cloud 控制台中,转到创建服务帐号页面。
转到“创建服务帐号” - 选择您的项目。
-
在服务帐号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务帐号 ID 字段。
在服务帐号说明字段中,输入说明。例如,
Service account for quickstart
。 - 点击创建并继续。
-
将 Project > Owner 角色授予服务帐号。
如需授予该角色,请找到选择角色列表,然后选择 Project > Owner。
- 点击继续。
-
点击完成以完成服务帐号的创建过程。
不要关闭浏览器窗口。您将在下一步骤中用到它。
-
-
创建服务帐号密钥:
- 在 Google Cloud 控制台中,点击您创建的服务帐号的电子邮件地址。
- 点击密钥。
- 点击添加密钥,然后点击创建新密钥。
- 点击创建。JSON 密钥文件将下载到您的计算机上。
- 点击关闭。
-
将环境变量
GOOGLE_APPLICATION_CREDENTIALS
设置为包含凭据的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。 - 创建 Cloud Storage 存储桶:
- 复制 Google Cloud 项目 ID 和 Cloud Storage 存储桶名称。您将在本文档的后面部分用到这些值。
设置环境
在本部分中,您将使用命令提示符,通过 venv 设置独立的 Python 虚拟环境来运行流水线项目。借助此过程,您可以将一个项目的依赖项与其他项目的依赖项隔离开来。
如果您没有便捷易用的命令提示符,可以使用 Cloud Shell。Cloud Shell 已经安装了适用于 Python 3 的软件包管理器,因此您可以跳过创建虚拟环境的过程。
如需安装 Python,然后创建虚拟环境,请按照以下步骤操作:
- 检查系统中是否已运行 Python 3 和
pip
:python --version python -m pip --version
- 如有必要,请安装 Python 3,然后设置 Python 虚拟环境:按照设置 Python 开发环境页面的安装 Python 和设置 venv部分中提供的说明操作。
完成快速入门后,您可以运行 deactivate
来停用虚拟环境。
获取 Apache Beam SDK
Apache Beam SDK 是一个用于数据流水线的开源编程模型。您可以使用 Apache Beam 程序定义流水线,然后选择 Dataflow 等运行程序来运行流水线。
如需下载并安装 Apache Beam SDK,请按照以下步骤操作:
- 验证您在上一部分中创建的 Python 虚拟环境中。确保提示符以
<env_name>
开头,其中env_name
是虚拟环境的名称。 - 安装 Python wheel 打包标准:
pip install wheel
- 安装 Python 版 Apache Beam SDK 的最新版本:
pip install 'apache-beam[gcp]'
根据连接情况,安装可能需要一段时间。
在本地运行流水线
如需查看流水线如何在本地运行,请使用 wordcount
示例的现成 Python 模块,该模块随 apache_beam
软件包提供。
wordcount
流水线示例会执行以下操作:
接收一个文本文件作为输入。
此文本文件位于 Cloud Storage 存储桶中,其资源名称为
gs://dataflow-samples/shakespeare/kinglear.txt
。- 将每一行解析为字词。
- 对标记化字词进行词频计数。
如需在本地暂存 wordcount
流水线,请按照以下步骤操作:
- 从本地终端运行
wordcount
示例:python -m apache_beam.examples.wordcount \ --output outputs
- 查看该流水线的输出:
more outputs*
- 如需退出,请按 q。
wordcount.py
源代码。在 Dataflow 服务上运行流水线
在本部分中,将从 Dataflow 服务上的apache_beam
软件包运行 wordcount
示例流水线。此示例指定 DataflowRunner
作为 --runner
的参数。- 运行流水线:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://STORAGE_BUCKET/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://STORAGE_BUCKET/tmp/
请替换以下内容:
查看结果
使用 Dataflow 运行流水线时,您的结果存储在 Cloud Storage 存储桶中。在本部分中,使用 Cloud Console 或本地终端验证流水线正在运行。
Cloud Console
要在 Cloud Console 中查看结果,请按照以下步骤操作:
本地终端
如需通过终端查看结果,请使用 gsutil
工具。
您也可以从 Cloud Shell 运行这些命令。
- 列出输出文件:
gsutil ls -lh "gs://STORAGE_BUCKET/results/outputs*"
- 查看输出文件中的结果:
gsutil cat "gs://STORAGE_BUCKET/results/outputs*"
将 STORAGE_BUCKET
替换为流水线程序中使用的 Cloud Storage 存储桶的名称。
修改流水线代码
上述示例中的wordcount
流水线区分大写和小写字词。以下步骤演示了如何修改流水线,以使 wordcount
流水线不区分大小写。- 在本地机器上,从 Apache Beam GitHub 代码库下载
wordcount
代码的最新副本。 - 从本地终端运行流水线:
python wordcount.py --output outputs
- 查看结果。
more outputs*
- 如需退出,请按 q。
- 在您选择的编辑器中,打开
wordcount.py
文件。 - 在
run
函数中,检查流水线步骤:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWIthOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
split
后面的几行被拆分为字符串形式的字词。 - 如需对字符串进行小写处理,请修改
split
后面的行:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWIthOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
此修改会将str.lower
函数映射到每个字词上。这一行相当于beam.Map(lambda word: str.lower(word))
。 - 保存该文件并运行修改后的
wordcount
作业:python wordcount.py --output outputs
- 查看修改后的流水线的结果:
more outputs*
- 如需退出,请按 q。
清理
为避免因本页中使用的资源导致您的 Google Cloud 帐号产生费用,请按照以下步骤操作。
- 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。
- 点击要删除的存储分区对应的复选框。
- 如需删除存储分区,请点击 删除,然后按照说明操作。