快速入门:使用 Python

在本快速入门中,您将学习如何使用 Python 版 Apache Beam SDK 构建定义流水线的程序。然后,您可以使用直接本地运行程序或云端运行程序(如 Dataflow)来运行流水线。

准备工作

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager API。

    启用 API

  5. 创建服务帐号:

    1. 在 Cloud Console 中,转到创建服务帐号页面。

      转到“创建服务帐号”
    2. 选择一个项目。
    3. 服务帐号名称字段中,输入一个名称。 Cloud Console 会根据此名称填充服务帐号 ID 字段。

      服务帐号说明字段中,输入说明。例如,Service account for quickstart

    4. 点击创建并继续
    5. 点击选择角色字段。

      快速访问下,点击基本,然后点击所有者

    6. 点击继续
    7. 点击完成以完成服务帐号的创建过程。

      不要关闭浏览器窗口。您将在下一步骤中用到它。

  6. 创建服务帐号密钥:

    1. 在 Cloud Console 中,点击您创建的服务帐号的电子邮件地址。
    2. 点击密钥
    3. 依次点击添加密钥创建新密钥
    4. 点击创建。JSON 密钥文件将下载到您的计算机上。
    5. 点击关闭
  7. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含您的服务帐号密钥的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

  8. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  9. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  10. 启用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager API。

    启用 API

  11. 创建服务帐号:

    1. 在 Cloud Console 中,转到创建服务帐号页面。

      转到“创建服务帐号”
    2. 选择一个项目。
    3. 服务帐号名称字段中,输入一个名称。 Cloud Console 会根据此名称填充服务帐号 ID 字段。

      服务帐号说明字段中,输入说明。例如,Service account for quickstart

    4. 点击创建并继续
    5. 点击选择角色字段。

      快速访问下,点击基本,然后点击所有者

    6. 点击继续
    7. 点击完成以完成服务帐号的创建过程。

      不要关闭浏览器窗口。您将在下一步骤中用到它。

  12. 创建服务帐号密钥:

    1. 在 Cloud Console 中,点击您创建的服务帐号的电子邮件地址。
    2. 点击密钥
    3. 依次点击添加密钥创建新密钥
    4. 点击创建。JSON 密钥文件将下载到您的计算机上。
    5. 点击关闭
  13. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含您的服务帐号密钥的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

  14. 创建 Cloud Storage 存储分区:
    1. 在 Cloud Console 中,转到 Cloud Storage 浏览器页面。

      转到浏览器

    2. 点击创建存储分区
    3. 创建存储分区页面上,输入您的存储分区信息。要转到下一步,请点击继续
      • 指定存储分区的名称中,输入唯一的存储分区名称。请勿在存储分区名称中添加敏感信息,因为存储分区命名空间是全局性的,公开可见。
      • 对于选择数据存储位置,执行以下操作:
        • 选择位置类型选项。
        • 选择位置选项。
      • 对于为数据选择一个默认存储类别,选择以下项: Standard
      • 对于选择如何控制对象的访问权限,请选择访问权限控制选项。
      • 对于高级设置(可选),请指定加密方法保留政策存储分区标签
    4. 点击创建
  15. 复制 Google Cloud 项目 ID 和 Cloud Storage 存储桶名称。您将在本文档的后面部分用到这些值。

设置环境

在本部分中,您将使用命令提示符,通过 venv 设置独立的 Python 虚拟环境来运行流水线项目。借助此过程,您可以将一个项目的依赖项与其他项目的依赖项隔离开来。

如果您没有便捷易用的命令提示符,可以使用 Cloud Shell。Cloud Shell 已经安装了适用于 Python 3 的软件包管理器,因此您可以跳过创建虚拟环境的过程。

如需安装 Python,然后创建虚拟环境,请按照以下步骤操作:

  1. 检查系统中是否已运行 Python 3 和 pip
    python --version
    python -m pip --version
    
  2. 如有必要,请安装 Python 3,然后设置 Python 虚拟环境:按照设置 Python 开发环境页面的安装 Python 和设置 venv部分中提供的说明操作。

完成快速入门后,您可以运行 deactivate 来停用虚拟环境。

获取 Apache Beam SDK

Apache Beam SDK 是一个用于数据流水线的开源编程模型。您可以使用 Apache Beam 程序定义流水线,然后选择 Dataflow 等运行程序来运行流水线。

如需下载并安装 Apache Beam SDK,请按照以下步骤操作:

  1. 验证您在上一部分中创建的 Python 虚拟环境中。确保提示符以 <env_name> 开头,其中 env_name 是虚拟环境的名称。
  2. 安装 Python wheel 打包标准:
    pip install wheel
    
  3. 安装 Python 版 Apache Beam SDK 的最新版本:
  4. pip install 'apache-beam[gcp]'

    根据连接情况,安装可能需要一段时间。

在本地运行流水线

如需查看流水线如何在本地运行,请使用 wordcount 示例的现成 Python 模块,该模块随 apache_beam 软件包提供。

wordcount 流水线示例会执行以下操作:

  1. 接收一个文本文件作为输入。

    此文本文件位于 Cloud Storage 存储分区中,其资源名称为 gs://dataflow-samples/shakespeare/kinglear.txt

  2. 将每一行解析为字词。
  3. 对标记化字词进行词频计数。

如需在本地暂存 wordcount 流水线,请按照以下步骤操作:

  1. 从本地终端运行 wordcount 示例:
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. 查看该流水线的输出:
    more outputs*
  3. 如需退出,请按 q
通过在本地运行流水线,您可以测试和调试 Apache Beam 程序。您可以在 Apache Beam GitHub 上查看 wordcount.py 源代码。

在 Dataflow 服务上运行流水线

在本部分中,将从 Dataflow 服务上的 apache_beam 软件包运行 wordcount 示例流水线。此示例指定 DataflowRunner 作为 --runner 的参数。
  • 运行流水线:
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://STORAGE_BUCKET/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://STORAGE_BUCKET/tmp/

    请替换以下内容:

    • DATAFLOW_REGION:要在其中部署 Dataflow 作业的区域端点,例如 europe-west1

      --region 标志会替换元数据服务器、本地客户端或环境变量中设置的默认区域。

    • STORAGE_BUCKET您之前复制的 Cloud Storage 名称
    • PROJECT_ID您之前复制的 Google Cloud 项目 ID

查看结果

使用 Dataflow 运行流水线时,您的结果存储在 Cloud Storage 存储桶中。在本部分中,使用 Cloud Console 或本地终端验证流水线正在运行。

Cloud Console

要在 Cloud Console 中查看结果,请按照以下步骤操作:

  1. 在 Cloud Console 中,转到 Dataflow 作业页面。

    转到作业

    作业页面会显示 wordcount 作业的详细信息,包括状态最初为正在运行,然后变为成功

  2. 转到 Cloud Storage 浏览器页面。

    转到浏览器

  3. 在项目的存储桶列表中,点击您之前创建的存储桶。

    wordcount 目录中,系统会显示已创建的作业的输出文件。

本地终端

如需通过终端查看结果,请使用 gsutil 工具。 您也可以从 Cloud Shell 运行这些命令。

  1. 列出输出文件:
    gsutil ls -lh "gs://STORAGE_BUCKET/results/outputs*"  
  2. STORAGE_BUCKET 替换为流水线程序中使用的 Cloud Storage 存储桶的名称。

  3. 查看输出文件中的结果:
    gsutil cat "gs://STORAGE_BUCKET/results/outputs*"

修改流水线代码

上述示例中的 wordcount 流水线区分大写和小写字词。以下步骤演示了如何修改流水线,以使 wordcount 流水线不区分大小写。
  1. 在本地机器上,从 Apache Beam GitHub 代码库下载 wordcount 代码的最新副本。
  2. 从本地终端运行流水线:
    python wordcount.py --output outputs
  3. 查看结果。
    more outputs*
  4. 如需退出,请按 q
  5. 在您选择的编辑器中,打开 wordcount.py 文件。
  6. run 函数中,检查流水线步骤:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    split 后面的几行被拆分为字符串形式的字词。

  7. 如需对字符串进行小写处理,请修改 split 后面的行:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    此修改会将 str.lower 函数映射到每个字词上。这一行相当于 beam.Map(lambda word: str.lower(word))
  8. 保存该文件并运行修改后的 wordcount 作业:
    python wordcount.py --output outputs
  9. 查看修改后的流水线的结果:
    more outputs*
  10. 如需退出,请按 q

清理

为避免因本页中使用的资源导致您的 Google Cloud 帐号产生费用,请按照以下步骤操作。

  1. 在 Cloud Console 中,转到 Cloud Storage 浏览器页面。

    转到浏览器

  2. 点击要删除的存储分区对应的复选框。
  3. 如需删除存储分区,请点击删除,然后按照说明操作。

后续步骤

Apache Beam™ 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的商标。