使用 Apache Beam 笔记本进行开发

借助 Apache Beam 交互式运行程序和 JruyterLab 笔记本,您可以迭代地开发流水线、检查流水线图,并在“读取-求值-输出”循环 (REPL) 工作流中解析独立的 PCollection。这些 Apache Beam 笔记本通过 Notebooks 提供,后者是一项代管式服务,用于托管预装了最新数据科学和机器学习框架的笔记本虚拟机。

本指南重点介绍 Apache Beam 笔记本的功能,但未说明如何构建。如需详细了解 Apache Beam,请参阅 Apache Beam 编程指南

准备工作

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 Compute Engine, Notebooks API。

    启用 API

完成本指南后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

启动 Apache Beam 笔记本实例

  1. 在 Google Cloud Console 的“项目选择器”页面上,选择或创建一个 Google Cloud 项目。
  2. 在侧边栏中导航到 Dataflow,然后点击笔记本
  3. 在工具栏中,点击 新实例
  4. 选择 Apache Beam > 不带 GPU (Without GPUs)。
  5. (可选)如果要在 GPU 上运行笔记本,您可以选择 Apache Beam > With 1 NVIDIA Tesla T4
  6. 新建笔记本实例页面上,选择笔记本虚拟机的网络,然后点击创建
  7. (可选)如果您选择使用 GPU 创建笔记本实例,在新建笔记本实例页面上,必须选中自动为我安装 NVIDIA GPU 驱动程序,然后点击创建
  8. (可选)如果您要设置自定义笔记本实例,请点击自定义。如需详细了解如何自定义实例属性,请参阅创建具有特定属性的 Notebooks 实例
  9. 链接生效后,点击打开 JupyterLab。Notebooks 会创建一个新的 Apache Beam 笔记本实例。

安装依赖项(可选)

Apache Beam 笔记本已经预装了 Apache Beam 和 Google Cloud 连接器依赖项。如果您的流水线包含依赖于第三方库的自定义连接器或自定义 PTransform,您可以在创建笔记本实例后进行安装。如需了解详情,请参阅 Notebooks 文档中的安装依赖项

Apache Beam 笔记本使用入门

打开 Notebooks 实例后,您可以在示例文件夹中找到示例笔记本。目前提供以下示例笔记本:

  • Word Count
  • Streaming Word Count
  • Streaming NYC Taxi Ride Data
  • Dataflow Word Count
  • 将 GPU 与 Apache Beam 搭配使用
  • 直观呈现数据

您可以在教程文件夹中找到介绍 Apache Beam 基础知识的其他教程。目前提供以下示例笔记本:

  • 基本运算
  • 元素相关操作
  • 聚合
  • Windows
  • IO 操作
  • 流式插入

这些笔记本包含说明性文字和注释代码块,可帮助您了解 Apache Beam 概念和 API 使用情况。 教程还提供实践练习,供您练习学到的概念。

创建笔记本实例

导航到文件 > 新建 > 笔记本,然后选择 Apache Beam 2.22 或更高版本的内核。

Apache Beam 安装到您的笔记本实例后,在笔记本中加入 interactive_runnerinteractive_beam 模块。

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

如果您的笔记本使用其他 Google API,请添加以下 import 语句:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

设置交互选项

以下命令会设置 InteractiveRunner 记录无界限来源中的数据的时间量。在此示例中,时长设置为 10 分钟。

ib.options.recording_duration = '10m'

您还可以通过 recording_size_limit 属性来更改无界限来源的记录大小限制(以字节为单位)。

# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9

如需了解其他交互选项,请参阅 interactive_beam.options 类

创建流水线

使用 InteractiveRunner 对象初始化流水线。

options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

p = beam.Pipeline(InteractiveRunner(), options=options)

读取和直观呈现数据

以下示例展示了一个 Apache Beam 流水线,该流水线会创建对指定 Pub/Sub 主题的订阅,并读取订阅。

words = p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear")

流水线按窗口对来源的字词进行统计。它会创建固定窗口,每个窗口的时长均为 10 秒。

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

在将数据划分窗口后,系统会按窗口统计字词。

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

show() 方法会直观呈现笔记本中生成的 PCollection。

ib.show(windowed_word_counts, include_window_info=True)

以表格形式直观呈现 PCollection 的 show 方法。

您可以通过设置两个可选参数 ndurationshow() 重新限制结果集范围。设置 n 会将结果集限制为最多显示 n 个元素,例如 20。如果未设置 n,则默认行为是列出来源记录结束之前捕获的最新元素。设置 duration 会将结果集限制为从来源记录开头处开始的指定秒数的数据。如果未设置 duration,则默认行为是记录结束之前的所有元素。

如果同时设置了这两个可选参数,则只要达到任一阈值,show() 就会停止。在以下示例中,show() 最多返回 20 个元素,这些元素根据记录的来源中首个 30 秒的数据进行计算。

ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)

如需以可视化效果显示数据,请向 show() 方法传递 visualize_data=True。您可以对可视化数据应用多个过滤条件。以下可视化数据允许您按标签和坐标轴进行过滤:

以丰富的可过滤界面元素直观呈现 PCollection 的 show 方法。

Apache Beam 笔记本中另一个实用的可视化工具是 Pandas DataFrame。以下示例首先将字词转换为小写,然后计算每个字词的频率。

windowed_lower_word_counts = (windowed_words
   | beam.Map(lambda word: word.lower())
   | "count" >> beam.combiners.Count.PerElement())

collect() 方法在 Pandas DataFrame 中提供输出。

ib.collect(windowed_lower_word_counts, include_window_info=True)

在 Pandas DataFrame 中展示 PCollection 的 collect 方法。

了解流水线的记录状态

除了可视化之外,您还可以通过调用 describe 检查笔记本实例中一个或所有流水线的记录状态。

# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)

describe() 方法可提供以下详细信息:

  • 磁盘上流水线的所有记录的总大小(以字节为单位)
  • 后台记录作业的开始时间(与 Unix 计时原点之间相隔的秒数)
  • 后台记录作业的当前流水线状态
  • 流水线的 Python 变量

从笔记本中创建的流水线启动 Dataflow 作业

  1. (可选)在使用笔记本运行 Dataflow 作业之前,请重启内核、重新运行所有单元并验证输出。如果您跳过此步骤,则笔记本中的隐藏状态可能会影响流水线对象中的作业图。
  2. 启用 Dataflow API
  3. 添加以下 import 语句:

    from apache_beam.runners import DataflowRunner
    
  4. 传递您的流水线选项

    # Set up Apache Beam pipeline options.
    options = pipeline_options.PipelineOptions()
    
    # Set the project to the default project in your current Google Cloud
    # environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
    # Set the Google Cloud region to run Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    # Choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://<change me>/dataflow'
    
    # Set the staging location. This location is used to stage the
    # Dataflow pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    
    # Set the temporary location. This location is used to store temporary files
    # or intermediate results before outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
    
    # Set the SDK location. This is used by Dataflow to locate the
    # SDK needed to run the pipeline.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
        beam.version.__version__)
    

    您可以调整参数值。例如,您可以从 us-central1 更改 region 值。

  5. 使用 DataflowRunner 运行流水线。这将在 Dataflow 服务上运行作业。

    runner = DataflowRunner()
    runner.run_pipeline(p, options=options)
    

    p创建流水线中的流水线对象。

如需查看如何在交互式笔记本上执行此转换的示例,请参阅笔记本实例中的 Dataflow Word Count 笔记本。

或者,您也可以将笔记本导出为可执行脚本,按照上述步骤修改生成的 .py 文件,然后将流水线部署到 Dataflow 服务。

保存笔记本

您创建的笔记本将本地保存到正在运行的笔记本实例中。如果您在开发期间重置或关停笔记本实例,则这些新笔记本将被删除。如需保留笔记本以供将来使用,请将它们本地下载到您的工作站,将其保存到 GitHub 或以其他文件格式导出。

清理

使用完 Apache Beam 笔记本实例后,请关停笔记本实例以清理您在 Google Cloud 上创建的资源。