快速入门:使用 Python
在本快速入门中,您将学习如何使用 Python 版 Apache Beam SDK 构建定义流水线的程序。然后,您可以使用直接本地运行程序或云端运行程序(如 Dataflow)来运行流水线。
准备工作
- 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager APIs.
-
Create a service account:
-
In the Google Cloud console, go to the Create service account page.
Go to Create service account - Select your project.
-
In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
In the Service account description field, enter a description. For example,
Service account for quickstart
. - Click Create and continue.
-
Grant the Project > Owner role to the service account.
To grant the role, find the Select a role list, then select Project > Owner.
- Click Continue.
-
Click Done to finish creating the service account.
Do not close your browser window. You will use it in the next step.
-
-
Create a service account key:
- In the Google Cloud console, click the email address for the service account that you created.
- Click Keys.
- Click Add key, and then click Create new key.
- Click Create. A JSON key file is downloaded to your computer.
- Click Close.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. -
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager APIs.
-
Create a service account:
-
In the Google Cloud console, go to the Create service account page.
Go to Create service account - Select your project.
-
In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
In the Service account description field, enter a description. For example,
Service account for quickstart
. - Click Create and continue.
-
Grant the Project > Owner role to the service account.
To grant the role, find the Select a role list, then select Project > Owner.
- Click Continue.
-
Click Done to finish creating the service account.
Do not close your browser window. You will use it in the next step.
-
-
Create a service account key:
- In the Google Cloud console, click the email address for the service account that you created.
- Click Keys.
- Click Add key, and then click Create new key.
- Click Create. A JSON key file is downloaded to your computer.
- Click Close.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. - 创建 Cloud Storage 存储桶:
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select the following: Standard.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
- 复制 Google Cloud 项目 ID 和 Cloud Storage 存储桶名称。您将在本文档的后面部分用到这些值。
设置环境
在本部分中,您将使用命令提示符,通过 venv 设置独立的 Python 虚拟环境来运行流水线项目。借助此过程,您可以将一个项目的依赖项与其他项目的依赖项隔离开来。
如果您没有便捷易用的命令提示符,可以使用 Cloud Shell。Cloud Shell 已经安装了适用于 Python 3 的软件包管理器,因此您可以跳过创建虚拟环境的过程。
如需安装 Python,然后创建虚拟环境,请按照以下步骤操作:
- 检查系统中是否已运行 Python 3 和
pip
:python --version python -m pip --version
- 如有必要,请安装 Python 3,然后设置 Python 虚拟环境:按照设置 Python 开发环境页面的安装 Python 和设置 venv部分中提供的说明操作。
完成快速入门后,您可以运行 deactivate
来停用虚拟环境。
获取 Apache Beam SDK
Apache Beam SDK 是一个用于数据流水线的开源编程模型。您可以使用 Apache Beam 程序定义流水线,然后选择 Dataflow 等运行程序来运行流水线。
如需下载并安装 Apache Beam SDK,请按照以下步骤操作:
- 验证您在上一部分中创建的 Python 虚拟环境中。确保提示符以
<env_name>
开头,其中env_name
是虚拟环境的名称。 - 安装 Python wheel 打包标准:
pip install wheel
- 安装 Python 版 Apache Beam SDK 的最新版本:
pip install 'apache-beam[gcp]'
根据连接情况,安装可能需要一段时间。
在本地运行流水线
如需查看流水线如何在本地运行,请使用 wordcount
示例的现成 Python 模块,该模块随 apache_beam
软件包提供。
wordcount
流水线示例会执行以下操作:
接收一个文本文件作为输入。
此文本文件位于 Cloud Storage 存储桶中,其资源名称为
gs://dataflow-samples/shakespeare/kinglear.txt
。- 将每一行解析为字词。
- 对标记化字词进行词频计数。
如需在本地暂存 wordcount
流水线,请按照以下步骤操作:
- 从本地终端运行
wordcount
示例:python -m apache_beam.examples.wordcount \ --output outputs
- 查看该流水线的输出:
more outputs*
- 如需退出,请按 q。
wordcount.py
源代码。在 Dataflow 服务上运行流水线
在本部分中,将从 Dataflow 服务上的apache_beam
软件包运行 wordcount
示例流水线。此示例指定 DataflowRunner
作为 --runner
的参数。- 运行流水线:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://STORAGE_BUCKET/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://STORAGE_BUCKET/tmp/
请替换以下内容:
查看结果
使用 Dataflow 运行流水线时,您的结果存储在 Cloud Storage 存储桶中。在本部分中,使用 Cloud Console 或本地终端验证流水线正在运行。
Cloud Console
要在 Cloud Console 中查看结果,请按照以下步骤操作:
本地终端
如需通过终端查看结果,请使用 gsutil
工具。
您也可以从 Cloud Shell 运行这些命令。
- 列出输出文件:
gsutil ls -lh "gs://STORAGE_BUCKET/results/outputs*"
- 查看输出文件中的结果:
gsutil cat "gs://STORAGE_BUCKET/results/outputs*"
将 STORAGE_BUCKET
替换为流水线程序中使用的 Cloud Storage 存储桶的名称。
修改流水线代码
上述示例中的wordcount
流水线区分大写和小写字词。以下步骤演示了如何修改流水线,以使 wordcount
流水线不区分大小写。- 在本地机器上,从 Apache Beam GitHub 代码库下载
wordcount
代码的最新副本。 - 从本地终端运行流水线:
python wordcount.py --output outputs
- 查看结果。
more outputs*
- 如需退出,请按 q。
- 在您选择的编辑器中,打开
wordcount.py
文件。 - 在
run
函数中,检查流水线步骤:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWIthOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
split
后面的几行被拆分为字符串形式的字词。 - 如需对字符串进行小写处理,请修改
split
后面的行: 此修改会将counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWIthOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
str.lower
函数映射到每个字词上。这一行相当于beam.Map(lambda word: str.lower(word))
。 - 保存该文件并运行修改后的
wordcount
作业:python wordcount.py --output outputs
- 查看修改后的流水线的结果:
more outputs*
- 如需退出,请按 q。
清理
为避免因本页中使用的资源导致您的 Google Cloud 帐号产生费用,请按照以下步骤操作。
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.