借助 Apache Beam 交互式运行程序和 JruyterLab 笔记本,您可以迭代地开发流水线、检查流水线图,并在“读取-求值-输出”循环 (REPL) 工作流中解析独立的 PCollection。这些 Apache Beam 笔记本通过 AI Platform Notebooks 提供,后者是一项代管式服务,用于托管预装了最新数据科学和机器学习框架的笔记本虚拟机。
本指南重点介绍 Apache Beam 笔记本的功能,但未说明如何构建。如需详细了解 Apache Beam,请参阅 Apache Beam 编程指南。
准备工作
-
登录您的 Google 帐号。
如果您还没有 Google 帐号,请注册新帐号。
-
在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。
-
确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能。
- 启用 Compute Engine API。
完成本指南后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理。
启动 Apache Beam 笔记本实例
- 在 Google Cloud Console 的“项目选择器”页面上,选择或创建一个 Google Cloud 项目。
- 在侧边栏中导航到 Dataflow,然后点击笔记本。
- 在工具栏中,点击 新实例。
- 选择 Apache Beam。
- 在新建笔记本实例页面上,选择笔记本虚拟机的网络,然后点击创建。
- (可选)如果您要设置自定义笔记本实例,请点击自定义。如需详细了解自定义实例属性,请参阅创建具有特定属性的 AI Platform Notebooks 实例。
- 链接生效后,点击打开 JupyterLab。AI Platform Notebooks 会创建一个新的 Apache Beam 笔记本实例。
安装依赖项(可选)
Apache Beam 笔记本已经预装了 Apache Beam 和 Google Cloud 连接器依赖项。如果您的流水线包含依赖于第三方库的自定义连接器或自定义 PTransform,您可以在创建笔记本实例后进行安装。如需了解详情,请参阅 AI Platform Notebooks 文档中的安装依赖项。
Apache Beam 笔记本使用入门
打开 AI Platform Notebooks 实例后,您可以在示例文件夹中找到示例笔记本。目前提供以下示例笔记本:
- Word Count
- Streaming Word Count
- Streaming NYC Taxi Ride Data
- Dataflow Word Count
这些笔记本包含说明性文字和注释代码块,可帮助您了解 Apache Beam 概念和 API 使用情况。
创建笔记本实例
导航到文件 > 新建 > 笔记本,然后选择 Apache Beam 2.22 或更高版本的内核。
Apache Beam 安装到您的笔记本实例后,在笔记本中加入 interactive_runner
和 interactive_beam
模块。
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
如果您的笔记本使用其他 Google API,请添加以下 import 语句:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
设置交互选项
以下命令会设置 InteractiveRunner 记录无界限来源中的数据的时间量。在此示例中,时长设置为 10 分钟。
ib.options.recording_duration = '10m'
您还可以通过 recording_size_limit
属性来更改无界限来源的记录大小限制(以字节为单位)。
# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9
如需了解其他交互选项,请参阅 interactive_beam.options 类。
创建流水线
使用 InteractiveRunner
对象初始化流水线。
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
p = beam.Pipeline(InteractiveRunner(), options=options)
读取和直观呈现数据
以下示例展示了一个 Apache Beam 流水线,此流水线为给定 Pub/Sub 主题创建一个订阅并从此订阅中读取数据。
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
流水线按窗口对来源的字词进行统计。它会创建固定窗口,每个窗口的时长均为 10 秒。
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
在将数据划分窗口后,系统会按窗口统计字词。
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
show()
方法会直观呈现笔记本中生成的 PCollection。
ib.show(windowed_word_counts, include_window_info=True)
您可以通过设置两个可选参数 n
和 duration
从 show()
重新限制结果集范围。设置 n
会将结果集限制为最多显示 n
个元素,例如 20。如果未设置 n
,则默认行为是列出来源记录结束之前捕获的最新元素。设置 duration
会将结果集限制为从来源记录开头处开始的指定秒数的数据。如果未设置 duration
,则默认行为是记录结束之前的所有元素。
如果同时设置了这两个可选参数,则只要达到任一阈值,show()
就会停止。在以下示例中,show()
最多可返回 20 个元素,这些元素是根据从记录的来源开始的前 30 秒数据计算的。
ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)
如需以可视化效果显示数据,请向 show()
方法传递 visualize_data=True
。您可以对可视化数据应用多个过滤条件。以下可视化数据允许您按标签和坐标轴进行过滤:
Apache Beam 笔记本中另一个实用的可视化工具是 Pandas DataFrame。以下示例首先将字词转换为小写,然后计算每个字词的频率。
windowed_lower_word_counts = (windowed_words
| beam.Map(lambda word: word.lower())
| "count" >> beam.combiners.Count.PerElement())
collect()
方法在 Pandas DataFrame 中提供输出。
ib.collect(windowed_lower_word_counts, include_window_info=True)
了解流水线的记录状态
除了可视化之外,您还可以通过调用 describe 检查笔记本实例中一个或所有流水线的记录状态。
# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)
describe()
方法可提供以下详细信息:
- 磁盘上流水线的所有记录的总大小(以字节为单位)
- 后台记录作业的开始时间(与 Unix 计时原点之间相隔的秒数)
- 后台记录作业的当前流水线状态
- 流水线的 Python 变量
从笔记本中创建的流水线启动 Dataflow 作业
- (可选)在使用笔记本运行 Dataflow 作业之前,请重启内核、重新运行所有单元并验证输出。如果您跳过此步骤,则笔记本中的隐藏状态可能会影响流水线对象中的作业图。
- 启用 Dataflow API。
添加以下 import 语句:
from apache_beam.runners import DataflowRunner
传递您的流水线选项。
# Set up Apache Beam pipeline options. options = pipeline_options.PipelineOptions() # Set the project to the default project in your current Google Cloud # environment. _, options.view_as(GoogleCloudOptions).project = google.auth.default() # Set the Google Cloud region to run Dataflow. options.view_as(GoogleCloudOptions).region = 'us-central1' # Choose a Cloud Storage location. dataflow_gcs_location = 'gs://<change me>/dataflow' # Set the staging location. This location is used to stage the # Dataflow pipeline and SDK binary. options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location # Set the temporary location. This location is used to store temporary files # or intermediate results before outputting to the sink. options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location # Set the SDK location. This is used by Dataflow to locate the # SDK needed to run the pipeline. options.view_as(pipeline_options.SetupOptions).sdk_location = ( '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' % beam.version.__version__)
您可以调整参数值。例如,您可以从
us-central1
更改region
值。使用
DataflowRunner
运行流水线。这将在 Dataflow 服务上运行作业。runner = DataflowRunner() runner.run_pipeline(p, options=options)
p
是创建流水线中的流水线对象。
如需查看如何在交互式笔记本上执行此转换的示例,请参阅笔记本实例中的 Dataflow Word Count 笔记本。
或者,您也可以将笔记本导出为可执行脚本,按照上述步骤修改生成的 .py
文件,然后将流水线部署到 Dataflow 服务。
保存笔记本
您创建的笔记本将本地保存到正在运行的笔记本实例中。如果您在开发期间重置或关停笔记本实例,则这些新笔记本将被删除。如需保留笔记本以供将来使用,请将它们本地下载到您的工作站,将其保存到 GitHub 或以其他文件格式导出。
清理
使用完 Apache Beam 笔记本实例后,请关停笔记本实例以清理您在 Google Cloud 上创建的资源。