高度并行的工作流的最佳做法

本页面提供了有关构建和运行 Dataflow HPC 高度并行工作流的最佳做法,包括如何在流水线中使用外部代码、如何运行流水线以及如何管理错误处理。

在流水线中包含外部代码

高度并行流水线的一个主要区别特征在于,它们在 DoFn 中使用 C++ 代码,而不是标准 Apache Beam SDK 语言之一。对于 Java 流水线,为了更轻松地在流水线中使用 C++ 库,建议您使用外部过程调用。本部分介绍在 Java 流水线中运行外部 (C++) 代码的常规方法。

Apache Beam 流水线定义有几个关键组成部分:

  • PCollections 是同构元素的不可变集合。
  • PTransforms 用于定义对生成其他 PCollectionPCollection 的转换。
  • 流水线是一种结构,可让您通过代码声明 PTransformsPCollections 之间的互动。流水线表现形式为有向无环图 (DAG)

当您使用非标准 Apache Beam SDK 语言的语言时,请将代码放在 PTransform 内的 DoFn 中,并使用一种标准 SDK 语言来定义流水线本身。 我们建议使用 Apache Beam Python SDK 来定义流水线,因为 Python SDK 具有一个实用程序类,可简化其他代码的使用。但是,您可以使用其他 Apache Beam SDK。

您可以使用该代码进行快速实验,而无需完整构建。对于生产系统,您通常会创建自己的二进制文件,以便根据需要自由调整处理流程。

下图演示了流水线数据的两种用法:

  • 使用数据驱动处理流程。
  • 在处理期间获取数据并将其合并到驱动程序数据中。

流水线数据的两个阶段

在此页面中,主要数据(来自来源)被称为驱动数据,而次要数据(来自处理阶段)被称为合并数据

在金融使用场景中,驱动数据可能是几十万笔交易。 每笔交易都需要连同市场数据一起处理。在这种情况下,市场数据是合并数据。在媒体使用场景中,驱动数据可能是需要处理但不需要其他数据来源的图片文件,因此不使用合并数据。

驱动数据的大小考虑因素

如果驱动数据元素的大小处于较小的 MB 级范围内,请使用常规 Beam 范式(即从来源创建 PCollection 对象并将该对象发送到 Apache Beam 转换进行处理)来处理该元素。

如果要处理的驱动数据元素的大小处于较大的 MB 级或 GB 级(如典型的媒体数据),则可以将驱动数据存入 Cloud Storage。然后在起始 PCollection 对象中,引用存储空间 URI,并且仅使用引用该数据的 URI。

合并数据的大小考虑因素

如果合并数据为几百 MB 或更少,请使用辅助输入将此数据加入 Apache Beam 转换。辅助输入将数据包发送给需要它的每个工作器。

如果合并数据处于 GB 或 TB 级范围内,请使用 Bigtable 或 Cloud Storage 将合并数据与驱动数据结合,使用的具体工具取决于数据的性质。例如,Bigtable 非常适用于金融使用场景,其中市场数据通常作为 Bigtable 的键值查询进行访问。如需详细了解如何设计 Bigtable 架构,包括处理时间序列数据的建议,请参阅以下 Bigtable 文档:

运行外部代码

您可以通过多种方式在 Apache Beam 中运行外部代码。

  • 创建一个从 Dataflow 转换中的 DoFn 对象调用的进程。

  • JNI 与 Java SDK 结合使用。

  • 直接从 DoFn 对象创建子进程。虽然这种方法不是最高效的,但易于实现且非常可靠。由于使用 JNI 存在潜在问题,因此本页面演示了如何使用子进程调用。

在设计工作流时,请考虑完整的端到端流水线。由于数据从来源一直到接收器的所有移动均通过单个流水线完成,因此弥补了运行此流程的方法的任何低效之处。如要将此方法与其他方法进行比较,请查看流水线的端到端时间以及端到端费用。

将二进制文件拉入主机

当您使用原生 Apache Beam 语言时,Apache Beam SDK 会自动将所有需要的代码移动到工作器。 但是,当您调用外部代码时,需要手动移动代码。

存储在存储分区中的二进制文件

如需移动代码,请执行以下操作。该示例演示了 Apache Beam Java SDK 的步骤。

  1. 将已编译的外部代码以及版本控制信息存储在 Cloud Storage 中。
  2. @Setup 方法中,创建一个同步块以检查代码文件在本地资源上是否可用。您可以在第一个线程完成时使用静态变量确认其可用性,而无需执行物理检查。
  3. 如果该文件不可用,请使用 Cloud Storage 客户端库将该文件从 Cloud Storage 存储分区拉取到本地工作器。建议使用 Apache Beam FileSystems 类来执行此任务。
  4. 移动文件后,确认在代码文件上设置了执行位。
  5. 在生产系统中,检查二进制文件的哈希值以确保该文件已正确复制。

您也可以使用 Apache Beam filesToStage 函数,但它不具备运行程序能够自动封装和移动 Java 代码等优点。此外,由于对子进程的调用需要绝对文件位置,您需要使用代码来确定类路径,从而确定 filesToStage 移动的文件的位置。我们不推荐使用这种方法。

运行外部二进制文件

如需运行外部代码,您需要为它构建一个封装容器。使用与外部代码相同的语言(例如,C++)或 shell 脚本编写此封装容器。封装容器允许您传递文件句柄并实现优化,如本页面的设计小 CPU 周期的处理流程部分中所述。 请注意,封装容器不需要太复杂。 以下代码段简要展示了以 C++ 编写的封装容器。

int main(int argc, char* argv[])
{
    if(argc < 3){
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }

    std::string returnFile = argv[1];
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open (returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

此代码从参数列表中读取两个参数。第一个参数是推送数据所在的返回文件的位置。第二个参数是代码回显给用户的数据。(在实际实现中,这段代码不仅仅回显“Hello, world”!)

在编写完封装容器代码之后,请通过执行以下操作来运行外部代码:

  1. 将数据传输给外部代码二进制文件。
  2. 运行二进制文件,捕获其中的错误,并记录错误和结果。
  3. 处理日志记录信息。
  4. 从完成的处理中捕获数据。

将数据传输给二进制文件

如需开始运行库,请将数据传输给 C++ 代码。在该步骤中,您可以充分利用 Dataflow 与其他 Google Cloud 工具的集成。例如,像 Bigtable 这样的工具可以处理非常大的数据集以及短延迟、高并发的访问,这支持数千个核心同时访问该数据集。此外,Bigtable 可以预处理数据,进行数据构形、丰富和过滤。在运行外部代码之前,所有这些工作都可以在 Apache Beam 转换中完成。

对于生产系统,建议使用协议缓冲区来封装输入数据。您可以将输入数据转换为字节并对其进行 base64 编码,然后再将其传递到外部库。将此数据传递给外部库的两种方法如下所示:

  • 小型输入数据。对于不超过系统的命令参数最大长度的小型数据,将该值传递给使用 java.lang.ProcessBuilder 构建的进程的第 2 位参数。
  • 大型输入数据。对于较大的数据,创建一个名称包含 UUID 的文件,以便包含进程所需的数据。

运行 C++ 代码,捕获错误并进行日志记录

捕获和处理错误信息是流水线的关键部分。 这是因为 Dataflow 运行程序使用的资源是临时资源,并且通常很难检查工作器日志文件,所以您必须确保捕获所有有用的信息并将其推送到 Dataflow 运行程序日志记录,然后将日志记录数据存储在一个或多个 Cloud Storage 存储分区中。

建议将 stdoutstderr 重定向到文件,这样可以避免内存不足。例如,在调用 C++ 代码的 Dataflow 运行程序中,您可以添加如下几行:

Java

  import java.lang.ProcessBuilder.Redirect;
  ...
      processbuilder.redirectError(Redirect.appendTo(errfile));
      processbuilder.redirectOutput(Redirect.appendTo(outFile));

Python

# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
    integers
    | beam.Map(collatz.total_stopping_time).with_exception_handling(
        use_subprocess=True))

# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
    os.path.splitext(output_path)[0] + '-bad.txt')

处理日志记录信息

许多使用场景需要处理上百万个元素。成功处理所生成的日志价值较低或者没有价值,因此您必须制定有关保留日志数据的业务决策。例如,考虑下列替代方案以轻松保留所有日志数据:

  • 如果成功的元素处理过程生成的日志中包含的信息没有价值,请不要保留。
  • 创建对日志数据进行采样的逻辑,例如仅对每 10000 个日志条目进行一次采样。如果处理是同质的(例如代码的许多次迭代生成基本相同的日志数据),则该方法可在保留日志数据和优化处理之间提供有效平衡。

在出现故障的情况下,转储到日志的数据量可能很大。对于处理这种大量错误日志数据的有效策略是读取日志条目的前几行,并将这些行推送到 Cloud Logging,您可以将日志文件的其余部分加载到 Cloud Storage 存储桶中。 这种方法让您可以稍后查看错误日志的前几行,然后根据需要查看 Cloud Storage 中的整个文件。

检查日志文件的大小也很有用。如果文件大小为零,则可以安全地忽略它,或者记录一则表明此文件没有数据的简单日志消息。

从完成的处理中捕获数据

不建议您使用 stdout 将计算结果传递回 DoFn 函数。这是因为您的 C++ 代码调用的其他代码,甚至您自己的代码,均可能向 stdout 发送消息,导致包含日志记录数据的 stdoutput 流受到影响。更好的做法是对 C++ 封装容器代码进行更改,以允许代码接受一个参数,指明在何处创建存储该值的文件。理想情况下,此文件应使用协议缓冲区以与语言无关的方式存储,这样一来,C++ 代码就可以将对象传递回 Java 或 Python 代码。DoFn 对象可以直接从文件中读取结果,并将结果信息传递给自己的 output 调用。

经验表明了运行针对该进程本身的单元测试的重要性。实现一个与 Dataflow 流水线无关、仅仅运行该外部进程的单元测试非常重要。独立并且无需运行整个流水线的单元测试,可以更有效地对库进行调试。

针对小 CPU 周期进行设计处理

调用子进程会产生开销。根据您的工作负载,您可能需要做额外的工作来降低正在进行的工作与启动和关闭进程的管理开销之间的比率。

在媒体使用场景中,驱动数据元素的大小可能处于较大的 MB 级或 GB 级。因此,每个数据元素的处理可能需要很长时间。 在这种情况下,与总处理时间相比,调用子进程的费用可以忽略不计。 因此,最好的方法是让单个元素启动自己的进程。

但是,在其他使用场景(例如金融)中,处理流程需要的 CPU 时间单位非常小(几十毫秒)。在这种情况下,调用子进程的开销就会大得不成比例。此问题的解决方案是使用 Apache Beam 的 GroupByKey 转换创建 50 至 100 个元素的批次,然后将这些批次送往进程。例如,您可以按照以下步骤操作:

  • DoFn 函数中,创建键值对。如果您正在处理金融交易,则可以使用交易号作为键。或者,如果您没有唯一的数字能够用作键,则可以根据数据生成校验和,并使用取模函数创建包含 50 个元素的分区。
  • 将键发送给 GroupByKey.create 函数,该函数会返回一个 KV<key,Iterable<data>> 集合,其中包含您随后可以发送给进程的 50 个元素。

限制工作器的并行性

当您使用 Dataflow 运行程序中原生支持的语言时,您永远无需担心工作器的运行状况。这是因为 Dataflow 有许多进程负责监控流控制以及批量和流式模式的线程。

但是,如果您使用的是 C++ 之类的外部语言,应清楚启动子进程是一个有些不同寻常的做法。与流式模式相比,在批量模式下,Dataflow 运行程序使用的工作线程占用 CPU 的比率较小。建议您在类中创建一个信号量,以便更直接地控制单个工作器的并行性,特别是在流式模式下。

例如,对于媒体处理,您可能不希望单个工作器并行处理数百个转码元素。在这些情况下,您可以创建一个实用程序类,为正在执行的工作提供 DoFn 函数的许可。使用此类可以直接控制流水线中的工作器线程。

在 Google Cloud 中使用高容量数据接收器

数据处理完后,会被发送到数据接收器,因此接收器需要能够处理由网格处理解决方案创建的大量结果。

下图展示了 Dataflow 运行网格工作负载时 Google Cloud 中可用的一些接收器。

Google Cloud 中可用的接收器

Bigtable、BigQuery 和 Pub/Sub 均可以处理庞大的数据流。例如,每个 Bigtable 节点每秒可以处理 10000 次插入,最大大小为 1K,并且可轻松实现横向扩缩。因此,具有 100 个节点的 Bigtable 集群每秒可处理由 Dataflow 网格生成的 1000000 条消息。

管理分段

在流水线中使用 C++ 代码时,您需要决定如何管理分段,因为如果处理不当,它们会出现非局部化影响。Dataflow 运行程序会根据需要采用 Java、Python 或 Go 创建进程,然后以软件包的形式将工作分配给进程。

如果使用紧密耦合的工具(如 JNI 或 Cython)以及 C++ 进程片段故障调用 C++ 代码,则调用进程和 Java 虚拟机 JVM)也会崩溃。在这种情况下,无法捕获错误数据点。为使错误数据点可捕获,请使用松散耦合,从而分离错误数据并允许流水线继续。但是,通过使用已针对所有数据变体进行全面测试的成熟 C++ 代码,您可以使用 Cython 等机制。

后续步骤