Python 使用快速入门

本页面介绍如何设置 Python 开发环境,获取 Python 版 Apache Beam SDK,以及运行和修改示例流水线。

准备工作

  1. 登录您的 Google 帐号。

    如果您还没有 Google 帐号,请注册新帐号

  2. 在 Cloud Console 的项目选择器页面上,选择或创建 Cloud 项目。

    转到项目选择器页面

  3. 确保您的 Google Cloud 项目已启用结算功能。 了解如何确认您的项目已启用结算功能

  4. 启用 Cloud Dataflow、Compute Engine、Stackdriver Logging、Cloud Storage、Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager API。

    启用 API

  5. 设置身份验证:
    1. 在 Cloud Console 中,转到创建服务帐号密钥页面。

      转到“创建服务帐号密钥”页面
    2. 服务帐号列表中,选择新的服务帐号
    3. 服务帐号名称字段中,输入一个名称。
    4. 角色列表中,选择项目 > 所有者

      注意角色字段授权您的服务帐号资源访问权限。您稍后可以使用 Cloud Console 查看和更改此字段。如果您开发的是正式版应用,请指定比项目 > Owner 更为精细的权限。如需了解详情,请参阅为服务帐号授予角色
    5. 点击创建。包含密钥的 JSON 文件就会下载到计算机。
  6. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含服务帐号密钥的 JSON 文件的路径。此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

  7. 创建 Cloud Storage 存储分区:
    1. 在 Cloud Console 中,转到 Cloud Storage 浏览器页面。

      转到“Cloud Storage 浏览器”页面

    2. 点击创建存储分区
    3. 创建存储分区对话框中,指定以下属性:
      • 名称:唯一的存储分区名称。请勿在存储分区名称中添加敏感信息,因为存储分区命名空间属于全局性质,并会公开显示。
      • 默认存储类别Standard
      • 存储分区数据存储的位置。
    4. 点击创建

设置环境

  1. 将 Python 版 Apache Beam SDKpip 和 Python 2.7、3.5、3.6 或 3.7 版搭配使用。请运行以下命令来检查您是否具有正常运行的 Python 以及是否已安装 pip
    python --version
        python -m pip --version
    如果您没有 Python,请在安装 Python 页面上查找适用于您的操作系统的安装步骤。
  2. 从 2020 年 10 月 7 日起,Dataflow 将停止支持使用 Python 2 的流水线。如需了解详情,请访问 Google Cloud 上的 Python 2 支持页面。

  3. 为初始实验安装 Python 虚拟环境。如果您没有 virtualenv 13.1.0 或更高版本,请在安装和使用 virtualenv 工具页面上查找适用于您的操作系统的安装步骤。

    运行以下命令以设置并激活新的虚拟环境:

    python -m virtualenv env
        source env/bin/activate
    在本快速入门中使用虚拟环境。完成快速入门后,您可以通过运行 deactivate 来停用虚拟环境。
设置 Python 开发环境页面上,详细了解如何在 Google Cloud 上使用 Python。

注意:为获得最佳结果,请使用 Apache Beam 2.16.0 或更高版本启动 Python 3 流水线。如需查看 Apache Beam 中的 Python 3 近期改进摘要,请参阅 Apache Beam 问题跟踪器

获取 Apache Beam SDK

Apache Beam SDK 是一个用于数据流水线的开源编程模型。您可以使用 Apache Beam 程序定义这些流水线,并且可以选择运行程序(如 Dataflow)来执行流水线。

通过在虚拟环境中运行以下命令,安装 Python 版 Apache Beam SDK 的最新版本:

pip install apache-beam[gcp]
    

在本地运行 WordCount

WordCount 示例演示了一个会执行以下步骤的流水线:
  1. 接收一个文本文件作为输入。
  2. 将每一行解析为字词。
  3. 对标记化字词进行词频计数。

使用以下命令,在本地机器上运行 apache_beam 软件包中的 wordcount 模块:

python -m apache_beam.examples.wordcount \
      --output outputs
此文本文件位于 Cloud Storage 存储分区中,其资源名称为 gs://dataflow-samples/shakespeare/kinglear.txt。要查看输出,请运行以下命令:
more outputs*

要退出,请按 Q 键。

通过在本地执行流水线,您可以测试和调试 Apache Beam 程序。 您可以在 Apache Beam GitHub 上查看 wordcount.py 源代码。

在 Dataflow 服务上运行 WordCount

通过在 runner 字段中指定 DataflowRunner 并选择要在其中执行流水线的区域,您可以在 Dataflow 服务上运行 apache_beam 软件包中的 wordcount 模块。

首先,定义 PROJECT、BUCKET 和 REGION 变量:

    PROJECT=PROJECT_ID
    BUCKET=GCS_BUCKET
    REGION=DATAFLOW_REGION
通过运行以下命令来运行此流水线:
    python -m apache_beam.examples.wordcount \
      --region $REGION \
      --input gs://dataflow-samples/shakespeare/kinglear.txt \
      --output gs://$BUCKET/wordcount/outputs \
      --runner DataflowRunner \
      --project $PROJECT \
      --temp_location gs://$BUCKET/tmp/

使用 GCP 查看结果

使用 Dataflow 运行流水线时,您的结果位于 Cloud Storage 存储分区中。

您可以通过终端使用 gsutil 工具查看结果。

通过运行以下命令来列出输出文件:

    gsutil ls -lh "gs://$BUCKET/wordcount/outputs*"  
通过运行以下命令来查看这些文件中的结果:
    gsutil cat "gs://$BUCKET/wordcount/outputs*"

要通过监控界面查看结果,请执行以下操作:
  1. 打开 Dataflow 监控界面。
    转到 Dataflow 网页界面

    您应该看到 wordcount 作业的状态最初为正在运行,然后变为成功

    状态为“成功”的 Cloud Dataflow WordCount 作业。
  2. 在 Google Cloud Console 中打开 Cloud Storage 浏览器。
    转到 Cloud Storage 浏览器

    wordcount 目录中,您应该会看到作业创建的输出文件:

    结果目录,其中包含 WordCount 作业的输出文件。

修改流水线代码

上述示例中的 WordCount 流水线区分大写和小写字词。 下面将演示如何修改流水线,以便 WordCount 流水线不区分大小写。
  1. 从 Apache Beam GitHub 代码库下载 WordCount 代码的最新副本。
  2. 在本地机器上运行流水线:
    python wordcount.py --output outputs
  3. 通过运行以下命令来查看结果:
    more outputs*
    要退出,请按 Q 键。
  4. 使用您所选的编辑器打开 wordcount.py 文件。
  5. 检查 run 函数中的流水线步骤。split 下面的几行被拆分为字符串形式的字词。
    counts = (lines
                  | 'split' >> (beam.ParDo(WordExtractingDoFn())
                                .with_output_types(unicode))
                  | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
                  | 'group' >> beam.GroupByKey()
                  | 'count' >> beam.Map(count_ones))
  6. 修改 split 下面的一行,将这些字符串改为小写形式。
    counts = (lines
                  | 'split' >> (beam.ParDo(WordExtractingDoFn())
                                .with_output_types(unicode))
                  | 'lowercase' >> beam.Map(unicode.lower)     # Add this line to modify the pipeline
                  | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
                  | 'group' >> beam.GroupByKey()
                  | 'count' >> beam.Map(count_ones)) 
    此修改会将 str.lower 函数映射到每个字词上。这一行相当于 beam.Map(lambda word: str.lower(word))
  7. 在本地机器上保存该文件并运行修改后的 WordCount 作业:
    python wordcount.py --output outputs
  8. 通过运行以下命令来查看修改后流水线的结果:
    more outputs*
    要退出,请按 Q 键。

清理

为避免系统因本快速入门中使用的资源向您的 Google Cloud 帐号收取费用,请按照以下步骤操作。

  1. 在 Cloud Console 中,转到 Cloud Storage 浏览器页面。

    转到“Cloud Storage 浏览器”页面

  2. 点击要删除的存储分区对应的复选框。
  3. 如需删除存储分区,请点击删除

后续步骤

Apache Beam™ 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的商标。