开发和测试 Dataflow 流水线

本页面介绍了开发和测试 Dataflow 流水线的最佳实践。

概览

流水线代码的实现方式会对流水线在生产环境中的表现产生重大影响。为了帮助您创建正常运行且高效工作的流水线代码,本文档介绍了以下内容:

  • 流水线运行程序在开发和部署的不同阶段支持代码执行。
  • 部署环境可让您在开发、测试、预生产和生产过程中运行流水线。
  • 您可以按原样使用开放源代码流水线代码和模板,也可以用作新流水线的基础代码,从而加快代码开发速度。
  • 测试流水线代码的最佳做法。首先,本文档简要介绍了各种测试类型(例如单元测试、集成测试和端到端测试)的范围和关系。其次,详细了解每种测试,包括用于创建和集成测试数据的方法,以及每个测试要使用的流水线运行程序。

流水线运行程序

在开发和测试期间,您可以使用不同的 Apache Beam 运行程序来运行流水线代码。Apache Beam SDK 提供用于本地开发和测试的直接运行程序。您的发布自动化工具还可以使用 Direct Runner 进行单元测试和集成测试。例如,您可以在持续集成 (CI) 流水线中使用 Direct Runner。

部署到 Dataflow 的流水线使用 Dataflow 运行程序,并在生产环境中运行流水线。此外,您还可以使用 Dataflow Runner 进行临时开发测试以及端到端流水线测试。

虽然本页面重点介绍如何运行使用 Apache Beam Java SDK 构建的流水线,但 Dataflow 还支持使用 Python 和 Go 开发的 Apache Beam 流水线。Apache Beam Java、Python 和 Go SDK 已针对 Dataflow 正式发布。SQL 开发者还可以使用 Apache Beam SQL 来创建使用熟悉的 SQL 方言的流水线。

设置部署环境

如需在不同的开发阶段分隔用户、数据、代码和其他资源,请创建部署环境。如有可能,为了针对流水线开发的不同阶段提供隔离环境,请使用单独的 Google Cloud 项目

以下部分介绍了一组典型的部署环境。

本地环境

本地环境是开发者的工作站。在开发和快速测试中,您可以使用 Direct Runner 在本地运行流水线代码。

使用 Direct Runner 在本地运行的流水线可以与远程 Google Cloud 资源(如 Pub/Sub 主题或 BigQuery 表)进行交互。为各个开发者提供单独的 Google Cloud 项目,以便其可以使用沙盒通过 Google Cloud 服务进行临时测试。

某些 Google Cloud 服务(例如 Pub/SubBigtable)提供适用于本地开发的模拟器。您可以将这些模拟器与直接运行程序配合使用,以实现端到端本地开发和测试。

沙盒环境

沙盒环境是一个 Google Cloud 项目,可以让开发者在代码开发期间访问 Google Cloud 服务。流水线开发者可以与其他开发者共享 Google Cloud 项目,也可以使用自己的项目。使用单个项目可以降低与共享资源用量和配额管理相关的规划复杂性。

开发者使用沙盒环境通过 Dataflow 运行程序执行临时流水线执行。沙盒环境可用于在代码开发阶段针对生产运行程序调试和测试代码。例如,临时流水线执行可让开发者执行以下操作:

  • 观察代码更改对扩缩行为的影响。
  • 了解直接运行程序与 Dataflow 运行程序的行为之间的潜在差异。
  • 了解 Dataflow 如何应用图表优化

对于临时测试,开发者可以从本地环境部署代码,以在沙盒环境中运行 Dataflow。

预生产环境

预生产环境适用于在类似生产的环境(例如端到端测试)中运行的开发阶段。对预生产环境使用单独的项目,并尽可能将其配置为类似于生产环境。同样,如需允许具有类似生产规模的端到端测试,请将 Dataflow 和其他服务的 Google Cloud 项目配额设置为与生产环境尽可能类似。

根据您的需求,您可以将进一步将预生产分隔为多个环境。例如,质量控制环境可以支持质量分析师的工作,以测试不同工作负载条件下的服务等级目标 (SLO),例如数据正确性、新鲜度和性能。

端到端测试包括与测试范围内的数据源和接收器集成。请考虑如何在预生产环境中实现这些。您可以将测试数据存储在预生产环境中。例如,测试数据与输入数据一起存储在 Cloud Storage 存储桶中。在其他情况下,测试数据可能来自预生产环境外部,例如生产环境中的 Pub/Sub 主题(通过单独订阅)。对于流处理流水线,您还可以使用生成的数据(例如,使用 Dataflow Streaming Data Generator)运行端到端测试,以模拟类似生产的数据特征和数量。

对于流处理流水线,在对生产环境进行任何更改之前,请使用预生产环境来测试流水线更新。测试和验证流处理流水线的更新过程非常重要,特别是在需要协调多个步骤(例如,使用运行并行流水线时)时必须如此,以避免停机。

生产环境

生产环境是专门的 Google Cloud 项目。所有端到端测试都通过时,持续交付会将部署工件复制到生产环境。

开发最佳做法

请参阅 Dataflow 流水线最佳做法

测试流水线

在软件开发中,单元测试、集成测试和端到端测试是常见的软件测试类型。这些测试类型也适用于数据流水线。

Apache Beam SDK 提供了启用这些测试的功能。理想情况下,每种类型的测试都针对不同的部署环境。下图展示了单元测试、集成测试和端到端测试如何应用于流水线和数据的不同部分。

测试类型及其与转换、流水线、数据源和数据接收器的关系。

该图显示了不同测试的范围及其与转换(DoFnPTransform 子类)、流水线、数据源和数据接收器之间的关系。

以下部分介绍了各种正式软件测试如何应用于使用 Dataflow 的数据流水线。在您通读本部分时,请返回此图表,了解不同类型的测试之间的相关性。

数据抽样

如需在 Dataflow 流水线的每个步骤中观察数据,请在测试期间启用数据抽样。这样您就可以查看转换的输出,以确保输出正确无误。

单元测试

通过将这些转换的输出与经过验证的数据输入和输出集进行比较,单元测试可评估 DoFn 子类和复合转换PTransform 子类)的正确运行。通常,开发者可以在本地环境中运行这些测试。这些测试还可以使用构建环境中的持续集成 (CI) 通过单元测试自动功能,自动运行。

直接运行程序用于使用较小的参考测试子集来运行单元测试,重点是测试转换的业务逻辑。测试数据必须足够小,以适合运行测试的计算机上的本地内存。

Apache Beam SDK 提供了一个名为 TestPipeline 的 JUnit 规则,用于单元测试各个转换(DoFn 子类)、复合转换(PTransform 子类)以及整个流水线。您可以在 Apache Beam 流水线运行程序(例如,直接运行程序或 Dataflow 运行程序)上使用 TestPipeline,以便通过 PAssertPCollection 对象内容应用断言,如以下 JUnit 测试类代码段所示:

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
  final PCollection<String> pcol = p.apply(...)
  PAssert.that(pcol).containsInAnyOrder(...);
  p.run();
}

单个转换的单元测试

通过将代码分解为可重复使用的转换(例如,作为顶级或静态嵌套类),您可以为流水线的不同部分创建目标测试。除了测试的优势之外,可重复使用的转换通过将流水线的业务逻辑封装成组件部分来促进代码的可维护性和可重用性。相比之下,如果流水线使用匿名内部类来实现转换,则测试各部分测试可能比较困难。

以下 Java 代码段展示了转换(作为匿名内部类)的实现,但不轻易允许测试。

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new DoFn() {
          // Untestable anonymous transform 1
        }))
        .apply("Generate anagrams", ParDo.of(new DoFn() {
          // Untestable anonymous transform 2
        }))
        .apply("Count words", Count.perElement());

将上一示例与以下示例进行比较,其中匿名内部类已被重构为已命名的具体 DoFn 子类。您可以为构成端到端流水线的每个具体 DoFn 子类创建单独的单元测试。

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new SplitIntoWordsFn()))
        .apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()))
        .apply("Count words", Count.perElement());

测试每个 DoFn 子类类似于对包含单个转换的批处理流水线进行单元测试。使用 Create 转换创建测试数据的 PCollection 对象,然后将其传递给 DoFn 对象。您可以使用 PAssert 断言 PCollection 对象的内容是否正确。以下 Java 代码示例使用 PAssert 类检查是否存在正确的输出表单。

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testGenerateAnagramsFn() {
    // Create the test input
    PCollection<String> words = p.apply(Create.of("friend"));

    // Test a single DoFn using the test input
    PCollection<String> anagrams =
        words.apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()));

    // Assert correct output from
    PAssert.that(anagrams).containsInAnyOrder(
        "finder", "friend", "redfin", "refind");

    p.run();
}

集成测试

集成测试可验证整个流水线的正常运行。 请考虑以下类型的集成测试:

  • 转换集成测试可评估构成数据流水线的所有单个转换的集成功能。您可以将转换集成测试视为整个流水线的单元测试,不包括与外部数据源和接收器的集成。Apache Beam SDK 提供了为数据流水线提供测试数据并验证处理结果的方法。直接运行程序用于运行转换集成测试。
  • 此系统集成测试可评估数据流水线与实时数据源和接收器的集成。要使流水线能够与外部系统通信,您需要配置测试以使用这些凭据来访问外部服务。流处理流水线是无限期运行的,因此您需要确定何时以及如何停止正在运行的流水线。通过使用直接运行程序运行系统集成测试,您可以快速验证流水线与其他系统之间的集成,而无需提交 Dataflow 作业并等待它完成。

您可以设计转换和系统集成测试,以便快速进行缺陷检测和反馈,而无需拖慢开发者工作效率。对于运行时间较长的测试(例如作为 Dataflow 作业运行的测试),建议您使用运行频率较低的端到端测试。

可以将数据流水线视为一个或多个相关转换。您可以为流水线创建封装复合转换,并使用 TestPipeline 对整个流水线执行集成测试。根据您是要在批量模式还是流式模式下测试流水线,请使用 CreateTestStream 转换。

使用测试数据进行集成测试

在生产环境中,您的流水线可能与不同的数据源和接收器集成。但是,对于单元测试和转换集成测试,请通过提供测试输入并直接验证输出,专注于验证流水线代码的业务逻辑。除了简化测试之外,此方法还可让您将流水线特有问题与可能被数据源和接收器引发的问题隔离开来。

测试批处理流水线

对于批处理流水线,您可以使用 Create 转换从标准内存集合(例如 Java List 对象)创建输入测试数据的 PCollection 对象。如果测试数据足够小可包含在代码中,则使用 Create 转换是合适的。然后,您可以对输出 PCollection 对象使用 PAssert 来确定流水线代码的正确性。直接运行程序和 Dataflow Runner 支持此方法。

以下 Java 代码段显示了复合转换中 PCollection 对象(包含构成流水线的部分或所有转换)的断言 (WeatherStatsPipeline)。该方法类似于流水线中单个转换的组合。

private class WeatherStatsPipeline extends
    PTransform<PCollection<Integer>, PCollection<WeatherSummary>> {
  @Override
  public PCollection<WeatherSummary> expand(PCollection<Integer> input) {
    // Pipeline transforms 
  }
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWeatherPipeline() {
  // Create test input consisting of temperature readings
  PCollection<Integer> tempCelsius =
      p.apply(Create.of(24, 22, 20, 22, 21, 21, 20));

  // CalculateWeatherStats calculates the min, max, and average temperature
  PCollection<WeatherSummary> result =
      tempCelsius.apply("Calculate weather statistics", new WeatherStatsPipeline());

   // Assert correct output from CalculateWeatherStats
   PAssert.thatSingleton(result).isEqualTo(new WeatherSummary.Builder()
       .withAverageTemp(21)
       .withMaxTemp(24)
       .withMinTemp(20)
       .build());

   p.run();
}

要测试数据选取行为,您还可以使用 Create 转换来创建带有时间戳的元素,如以下代码段所示:

private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWindowedData() {
    PCollection<String> input =
        p.apply(
            Create.timestamped(
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("b", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
                .withCoder(StringUtf8Coder.of()));

   PCollection<KV<String, Long>> windowedCount =
       input
           .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
           .apply(Count.perElement());

    PAssert.that(windowedCount)
        .containsInAnyOrder(
            // Output from first window
            KV.of("a", 2L),
            KV.of("b", 1L),
            KV.of("c", 1L),
            // Output from second window
            KV.of("c", 1L));

   p.run();
}

测试流式传输流水线

流式传输流水线包含假设数据,定义如何处理无界限数据。这些假设通常与真实情况中数据的时间线有关,因此假设证明的是正确还是错误会影响正确性。理想情况下,流式传输流水线的集成测试包括模拟流式数据到达的不确定性测试。

为了启用此类测试,Apache Beam SDK 提供了 TestStream 类来模拟元素计时对于数据流水现结果的效果(提前、及时或延迟数据)。您可以将这些测试与 PAssert 类结合使用,以验证预期的结果。

Direct Runner 和 Dataflow Runner 支持 TestStream。以下代码示例创建 TestStream 转换:

final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testDroppedLateData() {
   TestStream<String> input = TestStream.create(StringUtf8Coder.of())
      // Add elements arriving before the watermark
      .addElements(
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("b", new Instant(0L)),
         TimestampedValue.of("c", new Instant(0L).plus(Duration.standardMinutes(3))))
         // Advance the watermark past the end of the window
      .advanceWatermarkTo(new Instant(0L).plus(WINDOW_DURATION).plus(Duration.standardMinutes(1)))
      // Add elements which will be dropped due to lateness
      .addElements(
         TimestampedValue.of("c", new Instant(0L)))
      // Advance the watermark to infinity which will close all windows
      .advanceWatermarkToInfinity();

      PCollection<KV<String, Long>> windowedCount =
          p.apply(input)
             .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
             .apply(Count.perElement());

   PAssert.that(windowedCount)
      .containsInAnyOrder(
          // Output from first window
          KV.of("a", 2L),
          KV.of("b", 1L),
          KV.of("c", 1L));

   p.run();
}

如需详细了解 TestStream,请参阅在 Apache Beam 中测试无界限流水线。如需详细了解如何使用 Apache Beam SDK 进行单元测试,请参阅 Apache Beam 文档

在集成测试中使用 Google Cloud 服务

Direct Runner 可以与 Google Cloud 服务集成,因此本地环境中的临时测试和系统集成测试可以根据需要使用 Pub/Sub、BigQuery 和其他服务。使用直接运行程序时,您的流水线将作为您使用 gcloud 命令行工具配置的用户账号,或以您使用 GOOGLE_APPLICATION_CREDENTIALS 环境变量指定的用户账号。因此,在运行流水线之前,您必须为此账号授予针对任何必需资源所需的足够权限。如需了解详情,请参阅 Dataflow 安全性和权限

对于完全本地集成测试,您可以针对某些 Google Cloud 服务使用本地模拟器。本地模拟器适用于 Pub/SubBigtable

对于流式传输流水线的系统集成测试,您可以使用 setBlockOnRun 方法(在 DirectOptions 界面中定义),让直接运行程序异步运行流水线。否则,流水线执行将阻止调用父级进程(例如,构建流水线中的脚本),直到手动停止流水线为止。如果您异步运行流水线,则可以使用返回的 PipelineResult 实例取消流水线的执行,如以下代码示例所示:

public interface StreamingIntegrationTestOptions extends
   DirectOptions, StreamingOptions, MyOtherPipelineOptions {
   ...
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testNonBlockingPipeline() {
    StreamingIntegrationTestOptions options =
        p.getOptions().as(StreamingIntegrationOptions.class);

    options.setBlockOnRun(false); // Set non-blocking pipeline execution
    options.setStreaming(true); // Set streaming mode

    p.apply(...); // Apply pipeline transformations

    PipelineResult result = p.run(); // Run the pipeline

    // Generate input, verify output, etc
    ...

    // Later on, cancel the pipeline using the previously returned
    result.cancel();
}

端到端测试

端到端测试在与生产环境非常相似的条件下,通过在 Dataflow Runner 上运行您的端到端流水线来验证其正确操作。这些测试会验证流水线逻辑是否使用 Dataflow 运行程序正常运行,以及测试流水线在生产级负载下是否按预期运行。您通常在已指定为预生产环境的专用 Google Cloud 项目中运行端到端测试。

如需在不同规模下测试流水线,请使用不同类型的端到端测试,例如:

  • 使用测试数据集的一小部分(例如 1%)运行小规模的端到端测试,以在预生产环境中快速验证流水线功能。
  • 使用完整测试数据集运行大规模的端到端测试,以验证类似生产数据卷和条件下的流水线功能。

对于流式处理流水线,如果生产流水线可以使用相同的数据,我们建议与生产流水线并行运行。此过程可让您比较结果和操作行为,例如自动扩缩和性能。

端到端测试可帮助预测流水线满足生产 SLO 的能力。预生产环境会在类似生产环境的条件下测试流水线。在端到端测试中,流水线使用 Dataflow Runner 运行,以处理与生产数据集匹配或非常相似的完整参考数据集。

可能无法生成用于测试的合成数据(可准确模拟真实数据)要解决此问题,一种方法是使用生产数据源中的清理提取来创建参考数据集,其中任何敏感数据均通过适当转换进行去标识化。我们建议使用敏感数据保护来实现此目的。敏感数据保护可以检测一系列内容类型和数据源中的敏感数据,并应用一系列去标识化方法,包括隐去、遮盖、保留加密格式和日期偏移。

批处理流水线和流式处理流水线的端到端测试之间的差异

在对大型测试数据集运行完整的端到端测试之前,您可能需要使用一小部分测试数据(例如 1%)运行测试,并验证可在短时间内收到预期行为。与使用直接运行程序进行集成测试一样,在使用 Dataflow Runner 运行流水线时,您可以在 PCollection 对象上使用 PAssert。如需详细了解 PAssert,请参阅本页面的单元测试部分。

根据您的用例,从端到端测试验证非常大的输出可能不现实且成本高昂,或者存在其他难题。在这种情况下,您可以改为从输出结果集中验证代表性样本。例如,您可以使用 BigQuery 对输出行进行采样,并将输出行与预期结果的参考数据集进行比较。

对于流式处理流水线,使用合成数据模拟实际流式传输条件可能并非易事。为端到端测试提供流式数据的常见方法是将测试与生产数据源集成。如果您使用 Pub/Sub 作为数据源,则可以通过对现有主题进行额外订阅,针对端到端测试启用单独的数据流。然后,您可以比较使用相同数据的不同流水线的结果,这对于验证其他生产流水线和生产流水线非常有用。

下图展示了生产流水线和测试流水线如何在不同的部署环境中并行运行。

使用单个 Pub/Sub 流式来源与生产流水线并行运行测试流水线。

在该图中,两个流水线从同一 Pub/Sub 主题读取数据,但它们使用单独的订阅。此设置可让这两个流水线单独处理相同的数据,还可以比较结果。测试流水线在生产项目中使用单独的服务账号,因此无需对生产项目使用 Pub/Sub 订阅者配额。

与批处理流水线不同,流式处理流水线在明确取消之前会继续运行。在端到端测试中,您需要决定是让流水线运行(可能在下一个端到端测试运行之前),还是在代表测试完成的点取消流水线,以便检查结果。

您使用的测试数据类型会影响此决定。例如,如果您使用提供给流式传输流水线的有界限测试数据,则当所有元素都处理完毕后,您可能会取消流水线。或者,如果您使用实际数据源(例如生产环境中使用的现有 Pub/Sub 主题),或者如果您连续生成测试数据,则可能需要使测试流水线继续运行。在后一种情况下,您可以比较生产环境中的行为,甚至可以比较其他测试流水线中的行为。