Develop and test Dataflow pipelines

This page provides best practices for developing and testing your Dataflow pipeline.

Overview

How you implement your pipeline code has a significant influence on how well the pipeline performs in production. To help you create pipeline code that works correctly and efficiently, this document explains the following:

  • Pipeline runners to support code execution in the different stages of development and deployment.
  • Deployment environments that let you run pipelines during development, testing, preproduction, and production.
  • Open source pipeline code and templates that you can use as is, or as the basis for new pipelines to accelerate code development.
  • Coding best practices for developing your pipeline to improve pipeline observability and performance. Many of these practices are applicable to programming using the Apache Beam SDK (the examples use Java) and are not specific to Dataflow. However, in many cases, Dataflow provides features that complement these coding practices for improved production readiness.
  • A best-practices approach for testing pipeline code. First, this document provides an overview that includes the scope and relationship of different test types, such as unit tests, integration tests, and end-to-end tests. Second, each type of test is explored in detail, including methods to create and integrate with test data, and which pipeline runners to use for each test.

Pipeline runners

During development and testing, you use different Apache Beam runners to run pipeline code. The Apache Beam SDK provides a Direct Runner for local development and testing. Your release automation tooling can also use the Direct Runner for unit tests and integration tests. For example, you can use the Direct Runner within your continuous integration (CI) pipeline.

Pipelines that are deployed to Dataflow use the Dataflow Runner, which runs your pipeline in production-like environments. Additionally, you can use the Dataflow Runner for ad hoc development testing and for end-to-end pipeline tests.

Although this page focuses on running pipelines built using the Apache Beam Java SDK, Dataflow also supports Apache Beam pipelines that were developed using Python and Go. The Apache Beam Java, Python, and Go SDKs are generally available for Dataflow. SQL developers can also use Apache Beam SQL to create pipelines that use familiar SQL dialects.

Set up a deployment environment

To separate users, data, code, and other resources across different stages of development, create deployment environments. When possible, to provide isolated environments for the different stages of pipeline development, use separate Google Cloud projects.

The following sections describe a typical set of deployment environments.

Local environment

The local environment is a developer's workstation. For development and rapid testing, use the Direct Runner to run pipeline code locally.

Pipelines that are run locally using the Direct Runner can interact with remote Google Cloud resources, such as Pub/Sub topics or BigQuery tables. Give individual developers separate Cloud projects so that they have a sandbox for ad hoc testing with Google Cloud services.

Some Google Cloud services, such as Pub/Sub and Cloud Bigtable, provide emulators for local development. You can use these emulators with the Direct Runner to enable end-to-end local development and testing.

Sandbox environment

The sandbox environment is a Google Cloud project that provides developers with access to Google Cloud services during code development. Pipeline developers can share a Google Cloud project with other developers, or use their own individual projects. Using individual projects reduces planning complexity relating to shared resource usage and quota management.

Developers use the sandbox environment to perform ad hoc pipeline execution with the Dataflow Runner. The sandbox environment is useful for debugging and testing code against a production runner during the code development phase. For example, ad hoc pipeline execution lets developers do the following:

  • Observe the effect of code changes on scaling behavior.
  • Understand potential differences between the behavior of the Direct Runner and the Dataflow Runner.
  • Understand how Dataflow applies graph optimizations.

For ad hoc testing, developers can deploy code from their local environment in order to run Dataflow within their sandbox environment.

Preproduction environment

The preproduction environment is for development phases that need to run in production-like conditions, such as end-to-end testing. Use a separate project for the preproduction environment and configure it to be as similar to production as possible. Similarly, to allow end-to-end tests with production-like scale, make Cloud project quotas for Dataflow and other services as similar as possible to the production environment.

Depending on your requirements, you can further separate preproduction into multiple environments. For example, a quality control environment can support the work of quality analysts to test service level objectives (SLOs) such as data correctness, freshness, and performance under different workload conditions.

End-to-end tests include integration with data sources and sinks within the scope of testing. Consider how to make these available within the preproduction environment. You can store test data in the preproduction environment itself. For example, you can store test data in a Cloud Storage bucket together with your input data. In other cases, test data might originate from outside the preproduction environment, such as from a Pub/Sub topic in the production environment that you read through a separate subscription. For streaming pipelines, you can also run end-to-end tests using generated data, for example, by using the Dataflow Streaming Data Generator to emulate production-like data characteristics and volumes.

For streaming pipelines, use the preproduction environment to test pipeline updates before any changes are made to production. It's important to test and verify update procedures for streaming pipelines, particularly if you need to coordinate multiple steps, such as when running parallel pipelines to avoid downtime.

Production environment

The production environment is a dedicated Cloud project. Continuous delivery copies deployment artifacts to the production environment when all end-to-end tests have passed.

Development best practices

This section discusses coding and development best practices. Many of these practices complement and enhance aspects of pipeline development and operationalization, such as improving developer productivity, promoting pipeline testability, increasing performance, and enabling deeper insights with monitoring.

Before you begin development, set up deployment environments that support your development, testing, and delivery lifecycle.

Use Google-provided templates

To accelerate pipeline development, check to see if Google provides an existing Dataflow template. Some templates allow you to add custom logic as a pipeline step. For example, the Pub/Sub topic to BigQuery template provides a parameter to run a JavaScript user-defined function (UDF) that's stored in Cloud Storage. Google-provided templates are open source under the Apache License 2.0, so you can use them as the basis for new pipelines. The templates are also useful as code examples for reference.

Create libraries of reusable transforms

The Apache Beam programming model unifies batch and streaming data processing, which makes it possible to reuse transforms. Creating a shared library of common transforms promotes reusability, testability, and code ownership by different teams.

Consider the following two Java code examples, which both read payment events. The first one is from an unbounded Pub/Sub source:

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);

// Initial read transform
PCollection<PaymentEvent> payments =
    p.apply("Read from topic",
        PubsubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
        .apply("Parse strings into payment events",
            ParDo.of(new ParsePaymentEventFn()));

The second one is from a bounded relational database source:

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);

PCollection<PaymentEvent> payments = 
    p.apply(
        "Read from database table", 
        JdbcIO.<PaymentEvent>read()
            .withDataSourceConfiguration(...)
            .withQuery(...)
            .withRowMapper(new RowMapper<PaymentEvent>() {
              ...
            }));

Assuming that both pipelines perform the same processing, they can use the same transforms through a shared library for the remaining processing steps. How you implement code reusability best practices varies by programming language and build tool. For example, if you use Maven, you can separate transform code into its own module and then include the module as a submodule in larger multi-module projects for different pipelines. With the transforms in a shared library, both pipelines can apply them as shown in the following code example:

// Reuse transforms across both pipelines
payments
    .apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
    .apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
  ...

For more information, see the Apache Beam documentation for best practices on writing user code for Apache Beam transforms and for the recommended style guide for PTransforms.

Use dead-letter queues for error handling

Your pipeline might encounter situations where it's not possible to process elements. This situation can occur for different reasons, but a common cause is data issues. For example, an element that contains badly formatted JSON can cause parsing failures.

In this situation, one approach is to catch an exception within the DoFn.ProcessElement method. In your exception block, you might log the error and drop the element. However, this causes the data to be lost and prevents the data from being inspected later for manual handling or troubleshooting.

A better approach is to use a pattern called a dead-letter queue (or dead-letter file). Catch exceptions in the DoFn.ProcessElement method and log errors as you normally would. Instead of dropping the failed element, use branching outputs to write failed elements into a separate PCollection object. These elements are then written to a data sink for later inspection and handling by using a separate transform.

The following Java code example shows how to implement the dead-letter queue pattern:

TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};

PCollection<Input> input = /* ... */;

PCollectionTuple outputTuple =
    input.apply(ParDo.of(new DoFn<Input, Output>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        try {
          // Successful results go to the main output (successTag)
          c.output(process(c.element()));
        } catch (Exception e) {
          // LOG is assumed to be an SLF4J Logger, which supports {} placeholders
          LOG.error("Failed to process input {} -- adding to dead-letter file",
              c.element(), e);
          // Failed inputs go to the additional output (deadLetterTag)
          c.output(deadLetterTag, c.element());
        }
      }
    }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
    .apply(BigQueryIO.write(...));

// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...

You can use Cloud Monitoring to apply different monitoring and alerting policies for your pipeline's dead-letter queue. For example, you can visualize the number and size of elements processed by your dead-letter transform and configure alerting to trigger if certain threshold conditions are met.

Handle schema mutations

You can handle data that has unexpected but valid schemas using a dead-letter pattern, which writes failed elements to a separate PCollection object. However, in some cases, you might want to automatically handle elements that reflect a mutated schema as valid elements. For example, if an element's schema reflects a mutation like the addition of new fields, you can adapt the schema of the data sink to accommodate mutations.

Automatic schema mutation relies on the branching-output approach used by the dead-letter pattern. However, in this case it triggers a transform that mutates the destination schema whenever additive schemas are encountered. For an example of this approach, see How to handle mutating JSON schemas in a streaming pipeline, with Square Enix on the Google Cloud blog.
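The routing decision behind this approach can be sketched in plain Java, independent of Beam. This is only an illustration under stated assumptions: the class name SchemaMutationRouter, the Route enum, and the representation of a schema as a set of field names are all hypothetical, and the sketch handles only additive changes (type conflicts and removed fields are not modeled).

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper that decides which output branch an element belongs to. */
final class SchemaMutationRouter {

  /** Output branches, analogous to the tagged outputs of the dead-letter pattern. */
  enum Route { MAIN_OUTPUT, SCHEMA_MUTATION }

  /** Returns the fields that the element has but the sink schema lacks. */
  static Set<String> addedFields(Set<String> sinkFields, Set<String> elementFields) {
    Set<String> added = new TreeSet<>(elementFields);
    added.removeAll(sinkFields);
    return added;
  }

  /** Routes elements with new fields to the branch that updates the sink schema. */
  static Route route(Set<String> sinkFields, Set<String> elementFields) {
    return addedFields(sinkFields, elementFields).isEmpty()
        ? Route.MAIN_OUTPUT
        : Route.SCHEMA_MUTATION;
  }
}
```

In a pipeline, a DoFn could make the same comparison per element and emit to a separate tagged output, whose downstream transform adds the fields returned by addedFields to the destination schema before the element is written.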

Choose correctly between side inputs or CoGroupByKey for joins

Joining datasets is a common use case for data pipelines. Side inputs provide a flexible way to solve common data processing problems, such as data enrichment and keyed lookups. Unlike PCollection objects, side inputs are also mutable, and they can be determined at runtime. For example, the values in a side input might be computed by another branch in your pipeline or determined by calling a remote service.

Dataflow supports side inputs by persisting data into persistent storage (similar to a shared disk), which makes the complete side input available to all workers. Side input sizes can be very large and might not fit into worker memory. Reading from a large side input can cause performance issues if workers need to constantly read from persistent storage.

The CoGroupByKey transform is a core Apache Beam transform that merges (flattens) multiple PCollection objects and groups elements that have a common key. Unlike a side input, which makes the entire side input data available to each worker, CoGroupByKey performs a shuffle (grouping) operation to distribute data across workers. CoGroupByKey is therefore ideal when the PCollection objects you want to join are very large and don't fit into worker memory.

Follow these guidelines to help decide whether to use side inputs or CoGroupByKey:

  • Use side inputs when one of the PCollection objects you are joining is disproportionately smaller than the other, and where the smaller PCollection object fits into worker memory. Caching the side input entirely into memory makes it fast and efficient to fetch elements.
  • Use CoGroupByKey if you need to fetch a large proportion of a PCollection object that significantly exceeds worker memory.
  • Use side inputs when you have a PCollection object that should be joined multiple times in your pipeline. Instead of using multiple CoGroupByKey transforms, you can create a single side input that can be reused by multiple ParDo transforms.

For more information, see Troubleshoot Dataflow out of memory errors.
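The difference between the two strategies can be modeled in plain Java. The following sketch is not Beam API; the class JoinStrategies and its methods are hypothetical, and key-value pairs are represented as two-element String arrays to keep the example short. The first method assumes that the whole lookup table fits in memory, as a side input should. The second groups both inputs by key, which is what CoGroupByKey does through a shuffle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of the two join strategies; not Beam API. */
final class JoinStrategies {

  /** Side-input style: the whole small table is in memory and keys are looked up. */
  static List<String> lookupJoin(List<String[]> events, Map<String, String> smallTable) {
    List<String> joined = new ArrayList<>();
    for (String[] event : events) { // event = {key, value}
      String match = smallTable.get(event[0]);
      if (match != null) {
        joined.add(event[0] + ":" + event[1] + ":" + match);
      }
    }
    return joined;
  }

  /** CoGroupByKey style: for each key, collect the values from both inputs. */
  static Map<String, List<List<String>>> coGroup(List<String[]> left, List<String[]> right) {
    Map<String, List<List<String>>> grouped = new TreeMap<>();
    addAll(grouped, left, 0);
    addAll(grouped, right, 1);
    return grouped;
  }

  private static void addAll(
      Map<String, List<List<String>>> grouped, List<String[]> input, int side) {
    for (String[] kv : input) {
      List<List<String>> sides = grouped.get(kv[0]);
      if (sides == null) {
        // Index 0 holds values from the left input, index 1 from the right input.
        sides = new ArrayList<>();
        sides.add(new ArrayList<>());
        sides.add(new ArrayList<>());
        grouped.put(kv[0], sides);
      }
      sides.get(side).add(kv[1]);
    }
  }
}
```

In the lookup version, every element pays only for a map access, but the table must be available in full wherever the lookup runs. In the grouped version, no single place needs the whole of either input, at the cost of regrouping both.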

Minimize expensive per-element operations

A DoFn instance processes batches of elements called bundles, which are atomic units of work that consist of zero or more elements. Individual elements are then processed by the DoFn.ProcessElement method, which runs for every element. As a result, any time-consuming or computationally expensive operation that this method invokes runs once for every element that the method processes.

If you need to perform costly operations only once per DoFn instance or once per bundle, include those operations in the DoFn.Setup or DoFn.StartBundle method instead of in DoFn.ProcessElement. Examples include the following:

  • Parsing a configuration file that controls some aspect of the DoFn instance's behavior. Invoke this action only once, when the DoFn instance is initialized, by using the DoFn.Setup method.

  • Instantiating a short-lived client that is reused across all elements in a bundle, such as when all elements in the bundle are sent over a single network connection. Invoke this action one time per bundle by using the DoFn.StartBundle method.
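To make the cost difference concrete, the following plain-Java sketch models how often each lifecycle method runs. It is a simplified model, not Beam code: the class LifecycleModel and its driver method are hypothetical, and in a real DoFn the corresponding methods are marked with the @Setup, @StartBundle, @ProcessElement, and @FinishBundle annotations and are invoked by the runner.

```java
import java.util.List;

/** Plain-Java model of the DoFn lifecycle; method names mirror Beam's annotations. */
final class LifecycleModel {
  int setupCalls;       // incremented once per instance
  int startBundleCalls; // incremented once per bundle
  int processCalls;     // incremented once per element

  void setup() {
    setupCalls++; // for example, parse a configuration file here
  }

  void startBundle() {
    startBundleCalls++; // for example, open a short-lived client here
  }

  void processElement(String element) {
    processCalls++; // keep only cheap per-element work here
  }

  void finishBundle() {
    // for example, flush and close the client opened in startBundle()
  }

  /** Drives one instance through the given bundles, as a runner would. */
  void run(List<List<String>> bundles) {
    setup();
    for (List<String> bundle : bundles) {
      startBundle();
      for (String element : bundle) {
        processElement(element);
      }
      finishBundle();
    }
  }
}
```

For two bundles that contain five elements in total, this model calls setup() once, startBundle() twice, and processElement() five times, which is why expensive work belongs in the first two methods whenever its result can be shared.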

Limit batch sizes and concurrent calls to external services

When you call external services, you can reduce per-call overheads by using the GroupIntoBatches transform to create batches of elements of a specified size. Batching sends elements to an external service as one payload instead of individually.

In combination with batching, you can limit the maximum number of parallel (concurrent) calls to the external service by choosing appropriate keys to partition the incoming data. The number of partitions determines the maximum parallelization. For example, if every element is given the same key, a downstream transform for calling the external service does not run in parallel.

Consider one of the following approaches to produce keys for elements:

  • Choose an attribute of the dataset to use as data keys, such as user IDs.
  • Generate data keys to split elements randomly over a fixed number of partitions, where the number of possible key values determines the number of partitions. You need to create enough partitions for parallelism, and each partition needs to have enough elements for GroupIntoBatches to be useful.

The following Java code example shows how to randomly split elements over 10 partitions:

// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;

final int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
    sensitiveData
        .apply("Assign data into partitions",
            ParDo.of(new DoFn<String, KV<Long, String>>() {
              // java.util.Random is serializable, so it can be a DoFn field.
              private final Random random = new Random();

              @ProcessElement
              public void assignRandomPartition(ProcessContext context) {
                context.output(
                  KV.of(randomPartitionNumber(), context.element()));
              }

              // Returns a key in the range [0, numPartitions).
              private long randomPartitionNumber() {
                return random.nextInt(numPartitions);
              }
            }))
        .apply("Create batches of sensitive data",
            GroupIntoBatches.<Long, String>ofSize(100L));

// Use batched sensitive data to fully utilize Redaction API
// (which has a rate limit but allows large payloads)
batchedData
    .apply("Call Redaction API in batches", callRedactionApiOnBatch());

Identify performance issues caused by inappropriately fused steps

Dataflow builds a graph of steps that represents your pipeline, based on the transforms and data that you used to construct it. This graph is called the pipeline execution graph.

When you deploy your pipeline, Dataflow might modify your pipeline's execution graph to improve performance. For example, Dataflow might fuse some operations together, a process known as fusion optimization, to avoid the performance and cost impact of writing every intermediate PCollection object in your pipeline.

In some cases, Dataflow might incorrectly determine the optimal way to fuse operations in the pipeline, which can limit the Dataflow service's ability to make use of all available workers. In those cases, you might want to prevent some operations from being fused.

Consider the following example Apache Beam code. A GenerateSequence transform creates a small bounded PCollection object, which is then further processed by two downstream ParDo transforms.

import com.google.common.math.LongMath;
...

public class FusedStepsPipeline {

  static final class FindLowerPrimesFn extends DoFn<Long, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Long n = c.element();
      if (n > 1) {
        for (long i = 2; i < n; i++) {
          if (LongMath.isPrime(i)) {
            c.output(Long.toString(i));
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    PCollection<Long> sequence = p.apply("Generate Sequence",
        GenerateSequence
            .from(0)
            .to(1000000));

    // Pipeline branch 1
    sequence.apply("Find Primes Less-than-N",
        ParDo.of(new FindLowerPrimesFn()));

    // Pipeline branch 2
    sequence.apply("Increment Number", 
        MapElements.via(new SimpleFunction<Long, Long>() {
          public Long apply(Long n) {
            return n + 1;
          }
        }));

    p.run().waitUntilFinish();
  }
}

The Find Primes Less-than-N transform might be computationally expensive and is likely to run slowly for large numbers. In contrast, you would expect the Increment Number transform to complete quickly.

The following diagram shows a graphical representation of the pipeline in the Dataflow monitoring interface.

Representation of pipeline flow in the Dataflow interface.

Monitoring the job using the Dataflow monitoring interface shows the same slow rate of processing for both transforms, namely 13 elements per second. You might expect the Increment Number transform to process elements quickly, but instead it appears to be tied to the same rate of processing as Find Primes Less-than-N.

The reason is that Dataflow fused the steps into a single stage, which prevents them from running independently. To see which steps were fused, you can use the following gcloud command:

gcloud dataflow jobs describe --full job-id --format json

In the resulting output, the fused steps are described in the ExecutionStageSummary object in the ComponentTransform array:

...

    "executionPipelineStage": [
      {
        "componentSource": [
          ...
        ],
        "componentTransform": [
          {
            "name": "s1",
            "originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
            "userName": "Generate Sequence/Read(BoundedCountingSource)"
          },
          {
            "name": "s2",
            "originalTransform": "Find Primes Less-than-N",
            "userName": "Find Primes Less-than-N"
          },
          {
            "name": "s3",
            "originalTransform": "Increment Number/Map",
            "userName": "Increment Number/Map"
          }
        ],
        "id": "S01",
        "kind": "PAR_DO_KIND",
        "name": "F0"
      }

...

In this scenario, the Find Primes Less-than-N transform is the slow step, so breaking the fusion before that step is an appropriate strategy. One method to unfuse steps is to insert a GroupByKey transform and ungroup before the step, as shown in the following Java code example:

sequence
    .apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
      public KV<Long, Void> apply(Long n) {
        return KV.of(n, null);
      }
    }))
    .apply("Group By Key", GroupByKey.<Long, Void>create())
    .apply("Emit Keys", Keys.<Long>create())
    .apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));

You can also combine these unfusing steps into a reusable composite transform.

After you unfuse the steps, when you run the pipeline, you see that Increment Number completes in a matter of seconds, and the much longer-running Find Primes Less-than-N transform executes in a separate stage.

This example applies a group and ungroup operation to unfuse steps. You can use other approaches for other circumstances. In this case, handling duplicate output is not an issue, given the consecutive output of the GenerateSequence transform. KV objects with duplicate keys are deduplicated to a single key in the group (GroupByKey) and ungroup (Keys) transforms. To retain duplicates after the group and ungroup operations, create KV pairs by using a random key and the original input as the value, group using the random key, and then emit the values for each key as the output.
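The effect of the two keying choices on duplicates can be modeled in plain Java. This sketch is not Beam code: the class UnfuseModel and its methods are hypothetical, and an in-memory map stands in for the grouping that GroupByKey performs. The number of random keys (numKeys) is an assumed parameter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

/** Plain-Java model of group-and-ungroup with two different keying choices. */
final class UnfuseModel {

  /** Key by the element itself, group, then emit keys: duplicates collapse. */
  static List<Long> groupThenEmitKeys(List<Long> input) {
    Map<Long, List<Long>> grouped = new TreeMap<>();
    for (Long element : input) {
      grouped.computeIfAbsent(element, k -> new ArrayList<>()).add(element);
    }
    return new ArrayList<>(grouped.keySet());
  }

  /** Key by a random number, group, then emit values: duplicates survive. */
  static List<Long> groupByRandomKeyThenEmitValues(
      List<Long> input, int numKeys, Random random) {
    Map<Integer, List<Long>> grouped = new TreeMap<>();
    for (Long element : input) {
      grouped.computeIfAbsent(random.nextInt(numKeys), k -> new ArrayList<>()).add(element);
    }
    List<Long> output = new ArrayList<>();
    for (List<Long> values : grouped.values()) {
      output.addAll(values); // every input element is emitted exactly once
    }
    return output;
  }
}
```

With the second method the output contains the same elements as the input, although not necessarily in the same order. In a Beam pipeline, the equivalent steps would be a transform that assigns the random key, a GroupByKey transform, and a transform that emits each grouped value.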

Use Apache Beam metrics to collect pipeline insights

Apache Beam metrics is a utility class for producing various metrics for reporting the properties of a running pipeline. When you use Cloud Monitoring, Apache Beam metrics are available as Cloud Monitoring custom metrics.

The following Java snippet is an example of Counter metrics used in a DoFn subclass.

final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MyObject> successTag = new TupleTag<MyObject>(){};

final class ParseEventFn extends DoFn<String, MyObject> {

  private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
  private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
  // Gson is not serializable, so it is created in setup() and marked transient
  private transient Gson gsonParser;

  @Setup
  public void setup() {
    gsonParser = new Gson();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
      if (myObj.getPayload() != null) {
        //  Output the element if non-empty payload
        c.output(successTag, myObj);
      }
      else {
        // Increment empty payload counter
        emptyCounter.inc();
      }
    }
    catch (JsonParseException e) {
      // Increment malformed JSON counter
      malformedCounter.inc();
      // Output the element to dead-letter queue
      c.output(errorTag, c.element());
    }
  }
}

The example code uses two counters. One counter tracks JSON parsing failures (malformedCounter), and the other counter tracks whether the JSON message is valid but contains an empty payload (emptyCounter). In Cloud Monitoring, the custom metric names are custom.googleapis.com/dataflow/malformedJson and custom.googleapis.com/dataflow/emptyPayload. You can use the custom metrics to create visualizations and alerting policies in Cloud Monitoring.

Test your pipeline

In software development, unit tests, integration tests, and end-to-end tests are common types of software testing. These testing types are also applicable to data pipelines.

The Apache Beam SDK provides functionality to enable these tests. Ideally, each type of test targets a different deployment environment. The following diagram illustrates how unit tests, integration tests, and end-to-end tests apply to different parts of your pipeline and data.

Test types and how they relate to transforms, pipelines, data sources, and data sinks.

The diagram shows the scope of different tests and how they relate to transforms (DoFn and PTransform subclasses), pipelines, data sources, and data sinks.

The following sections describe how various formal software tests apply to data pipelines using Dataflow. As you read through this section, refer back to the diagram to understand how the different types of tests are related.

Unit tests

Unit tests assess the correct functioning of DoFn subclasses and composite transforms (PTransform subclasses) by comparing the output of those transforms with a verified set of data inputs and outputs. Typically, developers can run these tests in the local environment. The tests can also run automatically through unit-test automation using continuous integration (CI) in the build environment.

The Direct Runner runs unit tests using a smaller subset of reference test data that focuses on testing the business logic of your transforms. The test data must be small enough to fit into local memory on the machine that runs the test.

The Apache Beam SDK provides a JUnit rule called TestPipeline for unit-testing individual transforms (DoFn subclasses), composite transforms (PTransform subclasses), and entire pipelines. You can use TestPipeline on an Apache Beam pipeline runner such as the Direct Runner or the Dataflow Runner to apply assertions on the contents of PCollection objects using PAssert, as shown in the following code snippet of a JUnit test class:

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
  final PCollection<String> pcol = p.apply(...);
  PAssert.that(pcol).containsInAnyOrder(...);
  p.run();
}

Unit tests for individual transforms

By factoring your code into reusable transforms, for example, as top-level or static nested classes, you can create targeted tests for different parts of your pipeline. Aside from the benefits of testing, reusable transforms enhance code maintainability and reusability by naturally encapsulating the business logic of your pipeline into component parts. In contrast, testing individual parts of your pipeline might be difficult if the pipeline uses anonymous inner classes to implement transforms.

The following Java snippet shows the implementation of transforms as anonymous inner classes, which doesn't easily allow testing.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);

// Count.perElement() outputs each distinct element with its count
PCollection<KV<String, Long>> output =
    p.apply("Read from text", TextIO.read().from(...))
        .apply("Split words", ParDo.of(new DoFn<String, String>() {
          // Untestable anonymous transform 1
        }))
        .apply("Generate anagrams", ParDo.of(new DoFn<String, String>() {
          // Untestable anonymous transform 2
        }))
        .apply("Count words", Count.perElement());

Compare the previous example with the following one, where the anonymous inner classes are refactored into named concrete DoFn subclasses. You can create individual unit tests for each concrete DoFn subclass that makes up the end-to-end pipeline.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);

// Count.perElement() outputs each distinct element with its count
PCollection<KV<String, Long>> output =
    p.apply("Read from text", TextIO.read().from(...))
        .apply("Split words", ParDo.of(new SplitIntoWordsFn()))
        .apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()))
        .apply("Count words", Count.perElement());

Testing each DoFn subclass is similar to unit testing a batch pipeline that contains a single transform. Use the Create transform to create a PCollection object of test data, and then pass it to the DoFn object. Use PAssert to assert that the contents of the PCollection object are correct. The following Java code example uses the PAssert class to check that the output is correct.

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testGenerateAnagramsFn() {
    // Create the test input
    PCollection<String> words = p.apply(Create.of("friend"));

    // Test a single DoFn using the test input
    PCollection<String> anagrams = 
        words.apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()));

    // Assert correct output from GenerateAnagramsFn
    PAssert.that(anagrams).containsInAnyOrder(
        "finder", "friend", "redfin", "refind");

    p.run();
}
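Named DoFn subclasses become easier still to test when their business logic lives in a plain function that the DoFn delegates to, because that function can then be checked without any runner. The following sketch shows one way this could look. The source doesn't show how GenerateAnagramsFn is implemented, so the class AnagramLogic, its methods, and the use of a dictionary of known words are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/** Hypothetical pure logic that a DoFn such as GenerateAnagramsFn could delegate to. */
final class AnagramLogic {

  /** Returns the letters of the word in sorted order; anagrams share this signature. */
  static String signature(String word) {
    char[] letters = word.toCharArray();
    Arrays.sort(letters);
    return new String(letters);
  }

  /** Returns the dictionary words that are anagrams of the input, sorted alphabetically. */
  static List<String> anagramsOf(String word, Set<String> dictionary) {
    String target = signature(word);
    List<String> matches = new ArrayList<>();
    for (String candidate : dictionary) {
      if (signature(candidate).equals(target)) {
        matches.add(candidate);
      }
    }
    Collections.sort(matches);
    return matches;
  }
}
```

A plain JUnit test can call anagramsOf directly, while a test based on TestPipeline and PAssert still verifies that the DoFn wires the logic into the pipeline correctly.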

Integration tests

Integration tests verify the correct functioning of your entire pipeline. Consider the following types of integration tests:

  • A transform integration test that assesses the integrated functionality of all the individual transforms that make up your data pipeline. Think of transform integration tests as a unit test for your entire pipeline, excluding integration with external data sources and sinks. The Apache Beam SDK provides methods to supply test data to your data pipeline and to verify the results of processing. The Direct Runner is used to run transform integration tests.
  • A system integration test that assesses your data pipeline's integration with live data sources and sinks. For your pipeline to communicate with external systems, you need to configure your tests with appropriate credentials to access external services. Streaming pipelines run indefinitely, so you need to decide when and how to stop the running pipeline. By using the Direct Runner to run system integration tests, you quickly verify the integration between your pipeline and other systems without needing to submit a Dataflow job and wait for it to finish.

Design transform and system integration tests to provide rapid defect detection and feedback without slowing developer productivity. For longer-running tests, such as those that run as Dataflow jobs, you might want to use an end-to-end test that runs less frequently.

Think of a data pipeline as one or more related transforms. You can create an encapsulating composite transform for your pipeline and use TestPipeline to perform an integration test of your entire pipeline. Depending on whether you want to test the pipeline in batch or streaming mode, you supply test data using either the Create or TestStream transforms.

Use test data for integration tests

In your production environment, your pipeline likely integrates with different data sources and sinks. However, for unit tests and transform integration tests, focus on verifying the business logic of your pipeline code by providing test inputs and verifying the output directly. In addition to simplifying your tests, this approach lets you isolate pipeline-specific issues from those that might be caused by data sources and sinks.

Test batch pipelines

For batch pipelines, use the Create transform to create a PCollection object of your input test data out of a standard in-memory collection, such as a Java List object. Using the Create transform is appropriate if your test data is small enough to include in code. You can then use PAssert on the output PCollection objects to determine the correctness of your pipeline code. This approach is supported by the Direct Runner and by the Dataflow Runner.

The following Java code snippet shows assertions against output PCollection objects from a composite transform that includes some or all of the individual transforms that constitute a pipeline (WeatherStatsPipeline). The approach is similar to unit-testing individual transforms in a pipeline.

private class WeatherStatsPipeline extends
    PTransform<PCollection<Integer>, PCollection<WeatherSummary>> {
  @Override
  public PCollection<WeatherSummary> expand(PCollection<Integer> input) {
    // Pipeline transforms …
  }
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWeatherPipeline() {
  // Create test input consisting of temperature readings
  PCollection<Integer> tempCelsius =
      p.apply(Create.of(24, 22, 20, 22, 21, 21, 20));

  // WeatherStatsPipeline calculates the min, max, and average temperature
  PCollection<WeatherSummary> result =
      tempCelsius.apply("Calculate weather statistics", new WeatherStatsPipeline());

  // Assert correct output from WeatherStatsPipeline
  PAssert.thatSingleton(result).isEqualTo(new WeatherSummary.Builder()
      .withAverageTemp(21)
      .withMaxTemp(24)
      .withMinTemp(20)
      .build());

  p.run();
}

To test windowing behavior, you can also use the Create transform to create elements with timestamps, as shown in the following code snippet:

private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWindowedData() {
    PCollection<String> input =
        p.apply(
            Create.timestamped(
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("b", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
                .withCoder(StringUtf8Coder.of()));

    PCollection<KV<String, Long>> windowedCount =
        input
            .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
            .apply(Count.perElement());

    PAssert.that(windowedCount)
        .containsInAnyOrder(
            // Output from first window
            KV.of("a", 2L),
            KV.of("b", 1L),
            KV.of("c", 1L),
            // Output from second window
            KV.of("c", 1L));

    p.run();
}
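
The expected output lists "c" twice because the two "c" elements have timestamps that fall into different fixed windows. The following plain-Java sketch (no Apache Beam dependency) illustrates that assignment under the assumption of epoch-aligned windows with no offset. The class and method names are hypothetical and exist only for this illustration; the sketch is not how the Apache Beam SDK is implemented.

```java
import java.time.Duration;
import java.util.Map;
import java.util.TreeMap;

final class FixedWindowSketch {

  /** Returns the start (epoch millis) of the fixed window that contains the timestamp. */
  static long windowStart(long timestampMillis, Duration windowSize) {
    long size = windowSize.toMillis();
    // floorMod keeps the result correct for negative timestamps as well.
    return timestampMillis - Math.floorMod(timestampMillis, size);
  }

  /** Counts each element per window. Keys have the form "windowStartMillis/element". */
  static Map<String, Long> countPerWindow(
      String[] elements, long[] timestampsMillis, Duration windowSize) {
    Map<String, Long> counts = new TreeMap<>();
    for (int i = 0; i < elements.length; i++) {
      String key = windowStart(timestampsMillis[i], windowSize) + "/" + elements[i];
      counts.merge(key, 1L, Long::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    Duration window = Duration.ofMinutes(3);
    String[] elements = {"a", "a", "b", "c", "c"};
    // The last "c" is stamped exactly one window duration after the others.
    long[] timestamps = {0L, 0L, 0L, 0L, window.toMillis()};
    // Prints {0/a=2, 0/b=1, 0/c=1, 180000/c=1}
    System.out.println(countPerWindow(elements, timestamps, window));
  }
}
```

The four entries correspond to the four KV values in the assertion: three from the window that starts at 0 and one from the window that starts three minutes later.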

Test streaming pipelines

Streaming pipelines contain assumptions that define how to handle unbounded data. These assumptions are often about the timeliness of data in real-world conditions, and therefore have an impact on correctness depending on whether the assumptions prove to be true or false. Integration tests for streaming pipelines ideally include tests that simulate the non-deterministic nature of streaming data arrival.

To enable such tests, the Apache Beam SDK provides the TestStream class to model the effects of element timings (early, on-time, or late data) on the results of your data pipeline. Use these tests together with the PAssert class to verify against expected results.

TestStream is supported by the Direct Runner and the Dataflow Runner. The following code sample creates a TestStream transform:

private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testDroppedLateData() {
    TestStream<String> input = TestStream.create(StringUtf8Coder.of())
        // Add elements arriving before the watermark
        .addElements(
            TimestampedValue.of("a", new Instant(0L)),
            TimestampedValue.of("a", new Instant(0L)),
            TimestampedValue.of("b", new Instant(0L)),
            TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
        // Advance the watermark past the end of the first window
        .advanceWatermarkTo(new Instant(0L).plus(WINDOW_DURATION).plus(Duration.standardMinutes(1)))
        // Add an element that is dropped because its window has already closed
        .addElements(
            TimestampedValue.of("c", new Instant(0L)))
        // Advance the watermark to infinity, which closes all windows
        .advanceWatermarkToInfinity();

    PCollection<KV<String, Long>> windowedCount =
        p.apply(input)
            .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
            .apply(Count.perElement());

    PAssert.that(windowedCount)
        .containsInAnyOrder(
            // Output from first window (the late "c" is dropped)
            KV.of("a", 2L),
            KV.of("b", 1L),
            // Output from second window
            KV.of("c", 1L));

    p.run();
}
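
To make the reasoning behind the expected result easier to follow, the following plain-Java sketch replays the same sequence of events against a simplified model of late-data handling. It assumes fixed, epoch-aligned windows and zero allowed lateness, and it treats an element as dropped when the watermark has already reached the end of the element's window. The class and its methods are hypothetical and don't use the Apache Beam SDK; real runners track considerably more state.

```java
import java.time.Duration;
import java.util.Map;
import java.util.TreeMap;

final class LateDataSketch {
  private final long windowSizeMillis;
  private final Map<String, Long> counts = new TreeMap<>();
  private long watermarkMillis = Long.MIN_VALUE;
  private int dropped = 0;

  LateDataSketch(Duration windowSize) {
    this.windowSizeMillis = windowSize.toMillis();
  }

  /** Moves the watermark forward. A watermark never moves backward. */
  void advanceWatermarkTo(long newWatermarkMillis) {
    watermarkMillis = Math.max(watermarkMillis, newWatermarkMillis);
  }

  /** Counts the element unless its window has already closed (zero allowed lateness). */
  void addElement(String value, long timestampMillis) {
    long windowStart = timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    long windowEnd = windowStart + windowSizeMillis;
    if (windowEnd <= watermarkMillis) {
      dropped++; // The window is closed, so the element is late and is discarded.
      return;
    }
    counts.merge(windowStart + "/" + value, 1L, Long::sum);
  }

  Map<String, Long> counts() {
    return counts;
  }

  int droppedCount() {
    return dropped;
  }

  public static void main(String[] args) {
    Duration window = Duration.ofMinutes(3);
    LateDataSketch sketch = new LateDataSketch(window);
    sketch.addElement("a", 0L);
    sketch.addElement("a", 0L);
    sketch.addElement("b", 0L);
    sketch.addElement("c", window.toMillis());
    // The watermark passes the end of the first window by one minute.
    sketch.advanceWatermarkTo(window.toMillis() + Duration.ofMinutes(1).toMillis());
    sketch.addElement("c", 0L); // Late: belongs to the closed first window.
    // Prints {0/a=2, 0/b=1, 180000/c=1} dropped=1
    System.out.println(sketch.counts() + " dropped=" + sketch.droppedCount());
  }
}
```

In this model, the only "c" that is counted is the on-time element in the second window, which matches the single KV.of("c", 1L) in the assertion.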

For more information about TestStream, see Testing Unbounded Pipelines in Apache Beam. For more information about how to use the Apache Beam SDK for unit testing, see the Apache Beam documentation.

Use Google Cloud services in integration tests

The Direct Runner can integrate with Google Cloud services, so ad hoc tests in the local environment and system integration tests can use Pub/Sub, BigQuery, and other services as needed. When you use the Direct Runner, your pipeline runs as the user account that you configured by using the gcloud command-line tool or as a service account that you specified using the GOOGLE_APPLICATION_CREDENTIALS environment variable. Therefore, you must grant sufficient permissions to this account for any required resources before you run your pipeline. For more details, see Dataflow security and permissions.

For entirely local integration tests, you can use local emulators for some Google Cloud services. Local emulators are available for Pub/Sub and Bigtable.

For system integration testing of streaming pipelines, you can use the setBlockOnRun method (defined in the DirectOptions interface) to have the Direct Runner run your pipeline asynchronously. Otherwise, pipeline execution blocks the calling parent process (for example, a script in your build pipeline) until the pipeline is manually stopped. If you run the pipeline asynchronously, you can use the returned PipelineResult instance to cancel execution of the pipeline, as shown in the following code example:

public interface StreamingIntegrationTestOptions extends
   DirectOptions, StreamingOptions, MyOtherPipelineOptions {
   ...
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testNonBlockingPipeline() throws IOException {
    StreamingIntegrationTestOptions options =
        p.getOptions().as(StreamingIntegrationTestOptions.class);

    options.setBlockOnRun(false); // Set non-blocking pipeline execution
    options.setStreaming(true); // Set streaming mode

    p.apply(...); // Apply pipeline transformations

    PipelineResult result = p.run(); // Run the pipeline

    // Generate input, verify output, etc
    ...

    // Later on, cancel the pipeline using the previously returned PipelineResult
    result.cancel();
}
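
Between starting the pipeline and cancelling it, the test needs some way to decide that verification is complete. One possible approach is to poll for an expected condition with a timeout, so that the test cancels the pipeline either when the output has been verified or when the deadline passes. The following sketch uses only the Java standard library. The awaitCondition helper and its parameters are hypothetical and aren't part of the Apache Beam SDK.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class AwaitSketch {

  /**
   * Polls the condition until it returns true or the timeout elapses.
   * Returns true if the condition was met, false on timeout or interruption.
   */
  static boolean awaitCondition(
      BooleanSupplier condition, Duration timeout, Duration pollInterval) {
    long deadlineNanos = System.nanoTime() + timeout.toNanos();
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadlineNanos >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollInterval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // Preserve the interrupt for the caller.
        return false;
      }
    }
  }

  public static void main(String[] args) {
    // Stand-in for a real check, such as querying a sink for the expected rows.
    AtomicInteger polls = new AtomicInteger();
    boolean met =
        awaitCondition(
            () -> polls.incrementAndGet() >= 3, Duration.ofSeconds(5), Duration.ofMillis(10));
    // Prints condition met: true
    System.out.println("condition met: " + met);
  }
}
```

In a test like testNonBlockingPipeline, a helper of this kind would run after p.run() returns, and the call to result.cancel() would typically go in a finally block so that the pipeline is stopped even when verification fails.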

End-to-end tests

End-to-end tests verify the correct operation of your end-to-end pipeline by running it on the Dataflow Runner under conditions that closely resemble production. The tests verify that the business logic functions correctly using the Dataflow Runner and test whether the pipeline performs as expected under production-like loads. You typically run end-to-end tests in a dedicated Google Cloud project that is designated as the preproduction environment.

To test your pipeline at different scales, use different types of end-to-end tests, for example:

  • Run small-scale end-to-end tests using a small proportion (such as one percent) of your test dataset to quickly validate pipeline functionality in the preproduction environment.
  • Run large-scale end-to-end tests using a full test dataset to validate pipeline functionality under production-like data volumes and conditions.
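
One way to derive the small-scale dataset is to sample records deterministically by key, so that repeated test runs select the same subset. The following sketch shows the idea in plain Java under the assumption that each record has a stable string key. The class, the method names, and the choice of CRC32 as the hash are illustrative only and aren't prescribed by Dataflow.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

final class SampleSketch {

  /** Returns true if the key falls into the sample for the given percentage (0 to 100). */
  static boolean inSample(String key, int percent) {
    CRC32 crc = new CRC32();
    crc.update(key.getBytes(StandardCharsets.UTF_8));
    // getValue() is non-negative, so the remainder is always in the range 0 to 99.
    return crc.getValue() % 100 < percent;
  }

  /** Returns the keys selected for the sample, in their original order. */
  static List<String> sample(List<String> keys, int percent) {
    List<String> selected = new ArrayList<>();
    for (String key : keys) {
      if (inSample(key, percent)) {
        selected.add(key);
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    List<String> keys = new ArrayList<>();
    for (int i = 0; i < 10_000; i++) {
      keys.add("record-" + i);
    }
    List<String> onePercent = sample(keys, 1);
    System.out.println("Selected " + onePercent.size() + " of " + keys.size() + " records");
  }
}
```

Because the selection depends only on the key, the same records are chosen on every run, which makes results from successive small-scale tests comparable.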

For streaming pipelines, we recommend that you run test pipelines in parallel with your production pipeline if they can use the same data. This process lets you compare results and operational behavior, such as autoscaling and performance.

End-to-end tests help to predict how well your pipeline will meet your production SLOs. The preproduction environment tests your pipeline under production-like conditions. Within end-to-end tests, pipelines run using the Dataflow Runner to process complete reference datasets that match or closely resemble datasets in production.

It might not be possible to generate synthetic data for testing that accurately simulates real data. To address this problem, one approach is to use cleansed extracts from production data sources to create reference datasets, in which any sensitive data is de-identified through appropriate transformations. We recommend using Cloud Data Loss Prevention (DLP) for this purpose. Cloud DLP can detect sensitive data from a range of content types and data sources and apply a range of de-identification techniques, including redaction, masking, format-preserving encryption, and date shifting.
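
To give a rough sense of what two of these techniques do to a record, the following plain-Java sketch applies character masking and date shifting to sample values. It is a simplified illustration only: it doesn't call Cloud DLP, the class and method names are hypothetical, and deriving the shift from String.hashCode is not secure. A managed service would use a keyed cryptographic approach instead.

```java
import java.time.LocalDate;

final class DeidentifySketch {

  /** Replaces every character except the last visibleChars characters with '#'. */
  static String maskAllButLast(String value, int visibleChars) {
    int keep = Math.min(Math.max(visibleChars, 0), value.length());
    int maskLength = value.length() - keep;
    StringBuilder masked = new StringBuilder(value.length());
    for (int i = 0; i < maskLength; i++) {
      masked.append('#');
    }
    masked.append(value.substring(maskLength));
    return masked.toString();
  }

  /**
   * Shifts a date by an offset between -maxDays and +maxDays. The offset is derived
   * from the entity key, so all dates for one entity move by the same amount and the
   * intervals between them are preserved.
   */
  static LocalDate shiftDate(LocalDate date, String entityKey, int maxDays) {
    int offset = Math.floorMod(entityKey.hashCode(), 2 * maxDays + 1) - maxDays;
    return date.plusDays(offset);
  }

  public static void main(String[] args) {
    // Prints ############1111
    System.out.println(maskAllButLast("4111111111111111", 4));
    System.out.println(shiftDate(LocalDate.of(2024, 1, 15), "customer-42", 30));
  }
}
```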

Differences in end-to-end tests for batch and streaming pipelines

Before you run a full end-to-end test against a large test dataset, you might want to run a test with a smaller percentage of the test data (such as one percent) and verify expected behavior in a shorter amount of time. As with integration tests that use the Direct Runner, you can use PAssert on PCollection objects when you run pipelines using the Dataflow Runner. For more information about PAssert, see the Unit tests section on this page.

Depending on your use case, verifying very large output from end-to-end tests might be impractical, costly, or otherwise challenging. In that case, you can verify representative samples from the output result set instead. For example, you can use BigQuery to sample and compare output rows with a reference dataset of expected results.
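
The comparison itself can be simple once the sampled rows and the reference rows are available. The following sketch shows the comparison logic in plain Java, with in-memory maps standing in for query results. The class name, the method name, and the row representation (a key mapped to a single string value) are assumptions made for this illustration; in practice the rows might come from BigQuery queries as described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SampleVerificationSketch {

  /** Compares each sampled output row with the reference row that has the same key. */
  static List<String> findMismatches(
      Map<String, String> sampledOutput, Map<String, String> reference) {
    List<String> mismatches = new ArrayList<>();
    // Iterate in key order so that the report is stable across runs.
    for (Map.Entry<String, String> row : new TreeMap<>(sampledOutput).entrySet()) {
      String expected = reference.get(row.getKey());
      if (expected == null) {
        mismatches.add(row.getKey() + ": missing from reference");
      } else if (!expected.equals(row.getValue())) {
        mismatches.add(
            row.getKey() + ": expected " + expected + " but was " + row.getValue());
      }
    }
    return mismatches;
  }

  public static void main(String[] args) {
    Map<String, String> sampledOutput = new TreeMap<>();
    sampledOutput.put("id-1", "21");
    sampledOutput.put("id-2", "24");
    sampledOutput.put("id-3", "19");

    Map<String, String> reference = new TreeMap<>();
    reference.put("id-1", "21");
    reference.put("id-2", "25");

    // Prints [id-2: expected 25 but was 24, id-3: missing from reference]
    System.out.println(findMismatches(sampledOutput, reference));
  }
}
```

An empty list means that every sampled row matched the reference dataset.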

For streaming pipelines, simulating realistic streaming conditions with synthetic data might be challenging. A common way to provide streaming data for end-to-end tests is to integrate testing with production data sources. If you're using Pub/Sub as a data source, you can enable a separate datastream for end-to-end tests through additional subscriptions to existing topics. You can then compare the results of different pipelines that consume the same data, which is useful for verifying candidate pipelines against other preproduction and production pipelines.

The following diagram shows how this method allows a production pipeline and test pipeline to run in parallel in different deployment environments.

Running a test pipeline in parallel with a production pipeline using a single Pub/Sub streaming source.

In the diagram, both pipelines read from the same Pub/Sub topic, but they use separate subscriptions. This setup allows the two pipelines to process the same data independently and allows you to compare the results. The test pipeline uses a separate service account from the production project, and therefore avoids using the Pub/Sub subscriber quota for the production project.

Unlike batch pipelines, streaming pipelines continue to run until they are explicitly cancelled. In end-to-end tests, you need to decide whether to leave the pipeline running, perhaps until the next end-to-end test is run, or cancel the pipeline at a point that represents test completion so that you can inspect the results.

The type of test data you use influences this decision. For example, if you use a bounded set of test data that is provided to the streaming pipeline, you might cancel the pipeline when all elements have completed processing. Alternatively, if you use a real data source, such as an existing Pub/Sub topic that is used in production, or if you otherwise generate test data continually, you might want to keep test pipelines running over a longer period. The latter lets you compare the behavior against the production environment, or even against other test pipelines.