将自定义容器与 C++ 库搭配使用


在本教程中,您将创建一个流水线,该流水线使用自定义容器和 C++ 库来运行 Dataflow HPC 高并行工作流。在本教程中,您将了解如何使用 Dataflow 和 Apache Beam 运行网格计算应用,这些应用需要将数据分布到在许多核心上运行的函数。

本教程依次介绍如何使用直接运行程序和使用 Dataflow 运行程序运行流水线。 通过在本地运行流水线,您可以在部署流水线之前对其进行测试。

此示例使用 GMP 库中的 Cython 绑定和函数。 无论您使用哪个库或绑定工具,您都可以对流水线应用相同的原则。

可以在 GitHub 上找到示例代码。

目标

  • 创建使用自定义容器和 C++ 库的流水线。

  • 使用 Dockerfile 构建 Docker 容器映像。

  • 将代码和依赖项打包到 Docker 容器中。

  • 在本地运行流水线以进行测试。

  • 在分布式环境中运行流水线。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

  • Artifact Registry
  • Cloud Build
  • Cloud Storage
  • Compute Engine
  • Dataflow

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  7. 为您的 Google 账号创建本地身份验证凭据:

    gcloud auth application-default login
  8. 向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。
  9. 安装 Google Cloud CLI。
  10. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  11. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  12. 确保您的 Google Cloud 项目已启用结算功能

  13. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  14. 为您的 Google 账号创建本地身份验证凭据:

    gcloud auth application-default login
  15. 向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。
  16. 为新流水线创建用户管理的工作器服务账号,并向服务账号授予必要的角色。

    1. 如需创建服务账号,请运行 gcloud iam service-accounts create 命令。

      gcloud iam service-accounts create parallelpipeline \
          --description="Highly parallel pipeline worker service account" \
          --display-name="Highly parallel data pipeline access"
    2. 向服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.objectAdmin
      • roles/artifactregistry.reader
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      SERVICE_ACCOUNT_ROLE 替换为每个角色。

    3. 为您的 Google 账号授予一个可让您为服务账号创建访问令牌的角色:

      gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

下载代码示例并更改目录

下载代码示例,然后更改目录。 GitHub 代码库中的代码示例提供了运行此流水线所需的所有代码。当您准备好构建自己的流水线时,可以使用此示例代码作为模板。

克隆 beam-cp-example 代码库

  1. 使用 git clone 命令克隆 GitHub 代码库:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. 切换到应用目录:

    cd dataflow-sample-applications/beam-cpp-example
    

流水线代码

您可以自定义本教程中的流水线代码。此流水线完成了以下任务:

  • 动态生成输入范围内的所有整数。
  • 通过 C++ 函数运行整数并过滤无效值。
  • 将无效值写入辅助通道。
  • 计算每个停止时间的出现次数并对结果进行归一化。
  • 输出结果、设置结果格式并将其写入文本文件。
  • 创建包含一个元素的 PCollection
  • 使用 map 函数处理单个元素,并将频率 PCollection 作为辅助输入传递。
  • 处理 PCollection 并生成单个输出。

起始文件如下所示:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import logging
import os
import sys

def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools
  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count}, {freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

    # Define some helper functions.
    def make_scatter_plot(xy):
      x, y = zip(*xy)
      plt.plot(x, y, '.')
      png_bytes = io.BytesIO()
      plt.savefig(png_bytes, format='png')
      png_bytes.seek(0)
      return png_bytes.read()

    def write_to_path(path, content):
      """Most Beam IOs write multiple elements to some kind of a container
      file (e.g. strings to lines of a text file, avro records to an avro file,
      etc.)  This function writes each element to its own file, given by path.
      """
      # Write to a temporary path and to a rename for fault tolerence.
      tmp_path = path + '.tmp'
      fs = beam.io.filesystems.FileSystems.get_filesystem(path)
      with fs.create(tmp_path) as fout:
        fout.write(content)
      fs.rename([tmp_path], [path])

    (
        p
        # Create a PCollection with a single element.
        | 'CreateSingleton' >> beam.Create([None])
        # Process the single element with a Map function, passing the frequency
        # PCollection as a side input.
        # This will cause the normally distributed frequency PCollection to be
        # colocated and processed as a single unit, producing a single output.
        | 'MakePlot' >> beam.Map(
            lambda _,
            data: make_scatter_plot(data),
            data=beam.pvalue.AsList(frequencies))
        # Pair this with the desired filename.
        |
        'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
        # And actually write it out, using MapTuple to split the tuple into args.
        | 'WriteToOutput' >> beam.MapTuple(write_to_path))

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)

设置开发环境

  1. 使用 Python 版 Apache Beam SDK

  2. 安装 GMP 库:

    apt-get install libgmp3-dev
    
  3. 如需安装依赖项,请使用 requirements.txt 文件。

    pip install -r requirements.txt
    
  4. 如需构建 Python 绑定,请运行以下命令。

    python setup.py build_ext --inplace
    

您可以在本教程中自定义 requirements.txt 文件。起始文件包含以下依赖项:

#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3

在本地运行流水线

在本地运行流水线对于测试非常有用。通过在本地运行流水线,您可以在将流水线部署到分布式环境之前确认该流水线按预期运行和工作。

您可以使用以下命令在本地运行流水线。 此命令会输出名为 out.png 的映像。

python pipeline.py

创建 Google Cloud 资源

本部分介绍如何创建以下资源:

  • 用作临时存储位置和输出位置的 Cloud Storage 存储桶。
  • 用于打包流水线代码和依赖项的 Docker 容器。

创建 Cloud Storage 存储桶

首先使用 Google Cloud CLI 创建 Cloud Storage 存储桶。此存储桶由 Dataflow 流水线用作临时存储位置。

如需创建存储桶,请使用 gcloud storage buckets create 命令

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

替换以下内容:

  • BUCKET_NAME:符合存储桶命名要求的 Cloud Storage 存储桶的名称。Cloud Storage 存储桶名称必须是全局唯一的。
  • LOCATION:存储桶的位置

创建和构建容器映像

您可以在本教程中自定义 Dockerfile。 起始文件如下所示:

FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install

此 Dockerfile 包含 FROMCOPYRUN 命令,您可以在 Dockerfile 参考 中阅读相关信息。

  1. 如需上传工件,请创建一个 Artifact Registry 代码库。每个代码库可以包含一种受支持格式的工件。

    所有代码库内容都已使用 Google 管理的加密密钥或客户管理的加密密钥进行加密。Artifact Registry 默认使用 Google 管理的加密密钥,此选项无需进行任何配置。

    您必须至少具有代码库的 Artifact Registry Writer 权限

    运行以下命令创建新代码库。该命令使用 --async 标志并立即返回,无需等待正在进行的操作完成。

    gcloud artifacts repositories create REPOSITORY \
       --repository-format=docker \
       --location=LOCATION \
       --async
    

    REPOSITORY 替换为您的代码库的名称。对于项目中的每个代码库位置,代码库名称不得重复。

  2. 创建 Dockerfile。

    如果要使软件包成为 Beam 容器的一部分,您必须在 requirements.txt 文件中指定这些软件包。切勿在 requirements.txt 文件中指定 apache-beam。Apache Beam 容器已经有 apache-beam

  3. 请先配置 Docker 以对 Artifact Registry 的请求进行身份验证,然后再推送或拉取映像。如需为 Docker 代码库设置身份验证,请运行以下命令:

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    该命令将更新您的 Docker 配置。现在,您可以在 Google Cloud 项目中与 Artifact Registry 连接以推送映像。

  4. 结合使用 Dockerfile 和 Cloud Build 来构建 Docker 映像。

    更新以下命令中的路径,以匹配您创建的 Dockerfile。此命令会构建文件并将其推送到您的 Artifact Registry 代码库。

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
    

将代码和依赖项打包到 Docker 容器中

  1. 如需在分布式环境中运行此流水线,请将代码和依赖项打包到 Docker 容器中。

    docker build . -t cpp_beam_container
    
  2. 打包代码和依赖项后,您可以在本地运行流水线以进行测试。

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container"
    

    此命令会在 Docker 映像中写入输出。如需查看输出,请使用 --output 运行流水线,并将输出写入 Cloud Storage 存储桶。例如,运行以下命令。

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container" \
       --output=gs://BUCKET_NAME/out.png
    

运行流水线

现在,您可以通过引用包含流水线代码的文件并传递流水线所需的参数,在 Dataflow 中运行 Apache Beam 流水线。

在您的 shell 或终端中,使用 Dataflow Runner 运行流水线。

python pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --temp_location=gs://BUCKET_NAME/tmp \
    --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
    --experiment=use_runner_v2 \
    --output=gs://BUCKET_NAME/out.png

在您执行运行流水线的命令后,Dataflow 将返回作业状态为已加入队列的作业 ID。可能几分钟后作业状态才会变为正在运行,您才可以访问作业图

查看结果

查看写入 Cloud Storage 存储桶的数据。使用 gcloud storage ls 命令列出存储桶顶层的内容:

gcloud storage ls gs://BUCKET_NAME

如果成功,该命令将返回类似于以下内容的消息:

gs://BUCKET_NAME/out.png

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的 Google Cloud 项目。

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

逐个删除资源

如果您希望重复使用该项目,请删除为本教程创建的资源。

清理 Google Cloud 项目资源

  1. 删除 Artifact Registry 代码库。

    gcloud artifacts repositories delete REPOSITORY \
       --location=LOCATION --async
    
  2. 删除 Cloud Storage 存储桶。单独这一个存储桶不会产生任何费用。

    gcloud storage rm gs://BUCKET_NAME --recursive
    

撤销凭据

  1. 撤消您授予用户管理的工作器服务账号的角色。对以下每个 IAM 角色运行以下命令一次:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE
  2. 可选:撤消您创建的身份验证凭据,并删除本地凭据文件。

    gcloud auth application-default revoke
  3. 可选:从 gcloud CLI 撤消凭据。

    gcloud auth revoke

后续步骤