This page provides best practices for developing and testing your Dataflow pipeline.
Overview
How you implement your pipeline code has a significant influence on how well the pipeline performs in production. To help you create pipeline code that works correctly and efficiently, this document explains the following:
- Pipeline runners to support code execution in the different stages of development and deployment.
- Deployment environments that let you run pipelines during development, testing, preproduction, and production.
- Open source pipeline code and templates that you can use as is, or as the basis for new pipelines to accelerate code development.
- Coding best practices for developing your pipeline to improve pipeline observability and performance. Many of these practices are applicable to programming using the Apache Beam SDK (the examples use Java) and are not specific to Dataflow. However, in many cases, Dataflow provides features that complement these coding practices for improved production readiness.
- A best-practices approach for testing pipeline code. First, this document provides an overview that includes the scope and relationship of different test types, such as unit tests, integration tests, and end-to-end tests. Second, each type of test is explored in detail, including methods to create and integrate with test data, and which pipeline runners to use for each test.
Pipeline runners
During development and testing, you use different Apache Beam runners to run pipeline code. The Apache Beam SDK provides a Direct Runner for local development and testing. Your release automation tooling can also use the Direct Runner for unit tests and integration tests. For example, you can use the Direct Runner within your continuous integration (CI) pipeline.
Pipelines that are deployed to Dataflow use the Dataflow Runner, which runs your pipeline in production-like environments. Additionally, you can use the Dataflow Runner for ad hoc development testing and for end-to-end pipeline tests.
Although this page focuses on running pipelines built using the Apache Beam Java SDK, Dataflow also supports Apache Beam pipelines that were developed using Python and Go. The Apache Beam Java, Python, and Go SDKs are generally available for Dataflow. SQL developers can also use Apache Beam SQL to create pipelines that use familiar SQL dialects.
Set up a deployment environment
To separate users, data, code, and other resources across different stages of development, create deployment environments. When possible, to provide isolated environments for the different stages of pipeline development, use separate Google Cloud projects.
The following sections describe a typical set of deployment environments.
Local environment
The local environment is a developer's workstation. For development and rapid testing, use the Direct Runner to run pipeline code locally.
Pipelines that are run locally using the Direct Runner can interact with remote Google Cloud resources, such as Pub/Sub topics or BigQuery tables. Give individual developers separate Cloud projects so that they have a sandbox for ad hoc testing with Google Cloud services.
Some Google Cloud services, such as Pub/Sub and Cloud Bigtable, provide emulators for local development. You can use these emulators with the Direct Runner to enable end-to-end local development and testing.
Sandbox environment
The sandbox environment is a Google Cloud project that provides developers with access to Google Cloud services during code development. Pipeline developers can share a Google Cloud project with other developers, or use their own individual projects. Using individual projects reduces planning complexity relating to shared resource usage and quota management.
Developers use the sandbox environment to perform ad hoc pipeline execution with the Dataflow Runner. The sandbox environment is useful for debugging and testing code against a production runner during the code development phase. For example, ad hoc pipeline execution lets developers do the following:
- Observe the effect of code changes on scaling behavior.
- Understand potential differences between the behavior of the Direct Runner and the Dataflow Runner.
- Understand how Dataflow applies graph optimizations.
For ad hoc testing, developers can deploy code from their local environment in order to run Dataflow within their sandbox environment.
Preproduction environment
The preproduction environment is for development phases that need to run in production-like conditions, such as end-to-end testing. Use a separate project for the preproduction environment and configure it to be as similar to production as possible. Similarly, to allow end-to-end tests with production-like scale, make Cloud project quotas for Dataflow and other services as similar as possible to the production environment.
Depending on your requirements, you can further separate preproduction into multiple environments. For example, a quality control environment can support the work of quality analysts to test service level objectives (SLOs) such as data correctness, freshness, and performance under different workload conditions.
End-to-end tests include integration with data sources and sinks within the scope of testing. Consider how to make these available within the preproduction environment. You can store test data in the preproduction environment itself, for example, in a Cloud Storage bucket alongside your input data. In other cases, test data might originate from outside the preproduction environment, such as from a Pub/Sub topic in the production environment that is read through a separate subscription. For streaming pipelines, you can also run end-to-end tests using generated data, for example, using the Dataflow Streaming Data Generator to emulate production-like data characteristics and volumes.
For streaming pipelines, use the preproduction environment to test pipeline updates before any changes are made to production. It's important to test and verify update procedures for streaming pipelines, particularly if you need to coordinate multiple steps, such as when running parallel pipelines to avoid downtime.
Production environment
The production environment is a dedicated Cloud project. Continuous delivery copies deployment artifacts to the production environment when all end-to-end tests have passed.
Development best practices
This section discusses coding and development best practices. Many of these practices complement and enhance aspects of pipeline development and operationalization, such as improving developer productivity, promoting pipeline testability, increasing performance, and enabling deeper insights with monitoring.
Before you begin development, set up deployment environments that support your development, testing, and delivery lifecycle.
Use Google-provided templates
To accelerate pipeline development, check to see if Google provides an existing Dataflow template. Some templates allow you to add custom logic as a pipeline step. For example, the Pub/Sub topic to BigQuery template provides a parameter to run a JavaScript user-defined function (UDF) that's stored in Cloud Storage. Google-provided templates are open source under the Apache License 2.0, so you can use them as the basis for new pipelines. The templates are also useful as code examples for reference.
Create libraries of reusable transforms
The Apache Beam programming model unifies batch and streaming data processing, which makes it possible to reuse transforms. Creating a shared library of common transforms promotes reusability, testability, and code ownership by different teams.
Consider the following two Java code examples, which both read payment events. The first one is from an unbounded Pub/Sub source:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
// Initial read transform
PCollection<PaymentEvent> payments =
    p.apply("Read from topic",
            PubsubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
        .apply("Parse strings into payment events",
            ParDo.of(new ParsePaymentEventFn()));
The second one is from a bounded relational database source:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<PaymentEvent> payments =
    p.apply(
        "Read from database table",
        JdbcIO.<PaymentEvent>read()
            .withDataSourceConfiguration(...)
            .withQuery(...)
            .withRowMapper(new RowMapper<PaymentEvent>() {
              ...
            }));
Assuming that both pipelines perform the same processing, they can use the same transforms through a shared library for the remaining processing steps. How you implement code reusability best practices varies by programming language and build tool. For example, if you use Maven, you can separate transform code into its own module. You can then include the module as a submodule in larger multi-module projects for different pipelines. The following code example shows the shared transforms being applied to the payments collection from either pipeline:
// Reuse transforms across both pipelines
payments
    .apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
    .apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
  ...
For more information, see the Apache Beam documentation for best practices on writing user code for Apache Beam transforms and for the recommended style guide for PTransforms.
Use dead-letter queues for error handling
Your pipeline might encounter situations where it's not possible to process elements. This situation can occur for different reasons, but a common cause is data issues. For example, an element that contains badly formatted JSON can cause parsing failures.
In this situation, one approach is to catch an exception within the DoFn.ProcessElement method. In your exception block, you might log the error and drop the element. However, this causes the data to be lost and prevents the data from being inspected later for manual handling or troubleshooting.
A better approach is to use a pattern called a dead-letter queue (or dead-letter file). Catch exceptions in the DoFn.ProcessElement method and log errors as you normally would. Instead of dropping the failed element, use branching outputs to write failed elements into a separate PCollection object. These elements are then written to a data sink for later inspection and handling by using a separate transform.
The following Java code example shows how to implement the dead-letter queue pattern:
// LOG is assumed to be an SLF4J Logger; process() stands in for your own logic.
TupleTag<Output> successTag = new TupleTag<Output>() {};
TupleTag<Input> deadLetterTag = new TupleTag<Input>() {};
PCollection<Input> input = /* ... */;
PCollectionTuple outputTuple =
    input.apply(ParDo.of(new DoFn<Input, Output>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        try {
          // Elements that process cleanly go to the main (success) output.
          c.output(process(c.element()));
        } catch (Exception e) {
          LOG.error("Failed to process input {} -- adding to dead-letter file",
              c.element(), e);
          // Route the failed element to the dead-letter output instead of dropping it.
          c.output(deadLetterTag, c.element());
        }
      }
    }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
    .apply(BigQueryIO.write(...));
// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...
You can use Cloud Monitoring to apply different monitoring and alerting policies for your pipeline's dead-letter queue. For example, you can visualize the number and size of elements processed by your dead-letter transform and configure alerting to trigger if certain threshold conditions are met.
Handle schema mutations
You can handle data that has unexpected but valid schemas using a dead-letter pattern, which writes failed elements to a separate PCollection object. However, in some cases, you might want to automatically handle elements that reflect a mutated schema as valid elements. For example, if an element's schema reflects a mutation like the addition of new fields, you can adapt the schema of the data sink to accommodate mutations.
Automatic schema mutation relies on the branching-output approach used by the dead-letter pattern. However, in this case it triggers a transform that mutates the destination schema whenever additive schemas are encountered. For an example of this approach, see How to handle mutating JSON schemas in a streaming pipeline, with Square Enix on the Google Cloud blog.
Choose correctly between side inputs or CoGroupByKey for joins
Joining datasets is a common use case for data pipelines. Side inputs provide a flexible way to solve common data processing problems, such as data enrichment and keyed lookups. Unlike PCollection objects, side inputs are also mutable, and they can be determined at runtime. For example, the values in a side input might be computed by another branch in your pipeline or determined by calling a remote service.
Dataflow supports side inputs by persisting data into persistent storage (similar to a shared disk), which makes the complete side input available to all workers. Side input sizes can be very large and might not fit into worker memory. Reading from a large side input can cause performance issues if workers need to constantly read from persistent storage.
The CoGroupByKey transform is a core Apache Beam transform that merges (flattens) multiple PCollection objects and groups elements that have a common key. Unlike a side input, which makes the entire side input data available to each worker, CoGroupByKey performs a shuffle (grouping) operation to distribute data across workers. CoGroupByKey is therefore ideal when the PCollection objects you want to join are very large and don't fit into worker memory.
Follow these guidelines to help decide whether to use side inputs or CoGroupByKey:
- Use side inputs when one of the PCollection objects you are joining is disproportionately smaller than the other, and where the smaller PCollection object fits into worker memory. Caching the side input entirely into memory makes it fast and efficient to fetch elements.
- Use CoGroupByKey if you need to fetch a large proportion of a PCollection object that significantly exceeds worker memory.
- Use side inputs when you have a PCollection object that should be joined multiple times in your pipeline. Instead of using multiple CoGroupByKey transforms, you can create a single side input that can be reused by multiple ParDo transforms.
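The difference between the two join shapes can be sketched in plain Java, without any Beam APIs. In this hypothetical example, lookupJoin mimics a side input by holding the smaller dataset in an in-memory map, and coGroupJoin mimics CoGroupByKey by grouping both datasets by key before joining. The class name, method names, and record layout are assumptions made for illustration only. A real pipeline would use the Beam transforms, and the grouped data would be distributed across workers rather than held in one process.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch (no Beam APIs) contrasting the two join shapes. */
final class JoinStrategySketch {

  /** Side-input style: the small dataset is an in-memory lookup map. */
  static List<String> lookupJoin(List<String[]> orders, Map<String, String> customerNames) {
    List<String> joined = new ArrayList<>();
    for (String[] order : orders) { // order = {customerId, item}
      String name = customerNames.get(order[0]);
      if (name != null) {
        joined.add(name + ":" + order[1]);
      }
    }
    return joined;
  }

  /** CoGroupByKey style: group both datasets by key, then join within each key. */
  static List<String> coGroupJoin(List<String[]> orders, List<String[]> customers) {
    Map<String, List<String>> namesByKey = new TreeMap<>();
    Map<String, List<String>> itemsByKey = new TreeMap<>();
    for (String[] customer : customers) { // customer = {customerId, name}
      namesByKey.computeIfAbsent(customer[0], k -> new ArrayList<>()).add(customer[1]);
    }
    for (String[] order : orders) {
      itemsByKey.computeIfAbsent(order[0], k -> new ArrayList<>()).add(order[1]);
    }
    List<String> joined = new ArrayList<>();
    for (Map.Entry<String, List<String>> entry : namesByKey.entrySet()) {
      List<String> items =
          itemsByKey.getOrDefault(entry.getKey(), Collections.<String>emptyList());
      for (String name : entry.getValue()) {
        for (String item : items) {
          joined.add(name + ":" + item);
        }
      }
    }
    return joined;
  }
}
```

Both methods produce the same joined pairs. The lookup version depends on the whole map fitting in memory, which is the constraint that the guidelines above describe for side inputs.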
For more information, see Troubleshoot Dataflow out of memory errors.
Minimize expensive per-element operations
A DoFn instance processes batches of elements called bundles, which are atomic units of work that consist of zero or more elements. Individual elements are then processed by the DoFn.ProcessElement method, which runs for every element. Because the DoFn.ProcessElement method is called for every element, any time-consuming or computationally expensive operation that the method invokes runs for every single element that the method processes.
If you need to perform costly operations only once for a batch of elements, include those operations in the DoFn.Setup or DoFn.StartBundle method instead of in DoFn.ProcessElement. Examples include the following:
- Parsing a configuration file that controls some aspect of the DoFn instance's behavior. Only invoke this action one time, when the DoFn instance is initialized, by using the DoFn.Setup method.
- Instantiating a short-lived client that is reused across all elements in a bundle, such as when all elements in the bundle are sent over a single network connection. Invoke this action one time per bundle by using the DoFn.StartBundle method.
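The call frequencies described above can be sketched with a plain-Java stand-in for a DoFn. This is not the Beam API: the class LifecycleSketch, its method names, and the driver method run are hypothetical and only mirror the order in which a runner would invoke setup, start-bundle, and per-element logic. In a real pipeline, the runner decides how elements are split into bundles.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java stand-in for a DoFn; it mirrors only the call order, not the Beam API. */
final class LifecycleSketch {
  int setupCalls;
  int startBundleCalls;
  int processElementCalls;

  // Once per instance: a place for work such as parsing a configuration file.
  void setup() {
    setupCalls++;
  }

  // Once per bundle: a place for work such as opening a short-lived client.
  void startBundle() {
    startBundleCalls++;
  }

  // Once per element: keep this path as cheap as possible.
  void processElement(String element) {
    processElementCalls++;
  }

  /** Drives the hypothetical lifecycle over the given bundles. */
  static LifecycleSketch run(List<List<String>> bundles) {
    LifecycleSketch fn = new LifecycleSketch();
    fn.setup();
    for (List<String> bundle : bundles) {
      fn.startBundle();
      for (String element : bundle) {
        fn.processElement(element);
      }
    }
    return fn;
  }

  public static void main(String[] args) {
    LifecycleSketch fn = run(Arrays.asList(
        Arrays.asList("a", "b", "c"),
        Arrays.asList("d", "e")));
    // Prints "1 setup, 2 bundles, 5 elements".
    System.out.println(fn.setupCalls + " setup, " + fn.startBundleCalls
        + " bundles, " + fn.processElementCalls + " elements");
  }
}
```

With two bundles holding five elements in total, the setup work runs once and the per-bundle work runs twice, while anything placed in the per-element method would run five times.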
Limit batch sizes and concurrent calls to external services
When you call external services, you can reduce per-call overheads by using the GroupIntoBatches transform to create batches of elements of a specified size. Batching sends elements to an external service as one payload instead of individually.
In combination with batching, you can limit the maximum number of parallel (concurrent) calls to the external service by choosing appropriate keys to partition the incoming data. The number of partitions determines the maximum parallelization. For example, if every element is given the same key, a downstream transform for calling the external service does not run in parallel.
Consider one of the following approaches to produce keys for elements:
- Choose an attribute of the dataset to use as data keys, such as user IDs.
- Generate data keys to split elements randomly over a fixed number of partitions, where the number of possible key values determines the number of partitions. You need to create enough partitions for parallelism, and each partition needs to have enough elements for GroupIntoBatches to be useful.
The following Java code example shows how to randomly split elements over 10 partitions:
// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;
int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
    sensitiveData
        .apply("Assign data into partitions",
            ParDo.of(new DoFn<String, KV<Long, String>>() {
              private final Random random = new Random();
              @ProcessElement
              public void assignRandomPartition(ProcessContext context) {
                context.output(
                    KV.of(randomPartitionNumber(), context.element()));
              }
              // Returns a long so that the key type matches KV<Long, String>.
              private long randomPartitionNumber() {
                return random.nextInt(numPartitions);
              }
            }))
        .apply("Create batches of sensitive data",
            GroupIntoBatches.<Long, String>ofSize(100L));
// Use batched sensitive data to fully utilize Redaction API
// (which has a rate limit but allows large payloads)
batchedData
    .apply("Call Redaction API in batches", callRedactionApiOnBatch());
Identify performance issues caused by inappropriately fused steps
Dataflow builds a graph of steps that represents your pipeline, based on the transforms and data that you used to construct it. This graph is called the pipeline execution graph.
When you deploy your pipeline, Dataflow might modify your pipeline's execution graph to improve performance. For example, Dataflow might fuse some operations together, a process known as fusion optimization, to avoid the performance and cost impact of writing every intermediate PCollection object in your pipeline.
In some cases, Dataflow might incorrectly determine the optimal way to fuse operations in the pipeline, which can limit the Dataflow service's ability to make use of all available workers. In those cases, you might want to prevent some operations from being fused.
Consider the following example Apache Beam code. A GenerateSequence transform creates a small bounded PCollection object, which is then further processed by two downstream ParDo transforms.
import com.google.common.math.LongMath;
...
public class FusedStepsPipeline {
  // Declared static so that it can be instantiated from the static main method.
  static final class FindLowerPrimesFn extends DoFn<Long, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Long n = c.element();
      if (n > 1) {
        for (long i = 2; i < n; i++) {
          if (LongMath.isPrime(i)) {
            c.output(Long.toString(i));
          }
        }
      }
    }
  }
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);
    PCollection<Long> sequence = p.apply("Generate Sequence",
        GenerateSequence
            .from(0)
            .to(1000000));
    // Pipeline branch 1
    sequence.apply("Find Primes Less-than-N",
        ParDo.of(new FindLowerPrimesFn()));
    // Pipeline branch 2
    sequence.apply("Increment Number",
        MapElements.via(new SimpleFunction<Long, Long>() {
          public Long apply(Long n) {
            return n + 1;
          }
        }));
    p.run().waitUntilFinish();
  }
}
The Find Primes Less-than-N transform might be computationally expensive and is likely to run slowly for large numbers. In contrast, you would expect the Increment Number transform to complete quickly.
The following diagram shows a graphical representation of the pipeline in the Dataflow monitoring interface.
Monitoring the job using the Dataflow monitoring interface shows the same slow rate of processing for both transforms, namely 13 elements per second. You might expect the Increment Number transform to process elements quickly, but instead it appears to be tied to the same rate of processing as Find Primes Less-than-N.
The reason is that Dataflow fused the steps into a single stage, which prevents them from running independently. To see which steps were fused into each stage, use the following gcloud command:
gcloud dataflow jobs describe --full job-id --format json
In the resulting output, the fused steps are described in the ExecutionStageSummary object in the ComponentTransform array:
...
  "executionPipelineStage": [
    {
      "componentSource": [
        ...
      ],
      "componentTransform": [
        {
          "name": "s1",
          "originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
          "userName": "Generate Sequence/Read(BoundedCountingSource)"
        },
        {
          "name": "s2",
          "originalTransform": "Find Primes Less-than-N",
          "userName": "Find Primes Less-than-N"
        },
        {
          "name": "s3",
          "originalTransform": "Increment Number/Map",
          "userName": "Increment Number/Map"
        }
      ],
      "id": "S01",
      "kind": "PAR_DO_KIND",
      "name": "F0"
    }
...
In this scenario, the Find Primes Less-than-N transform is the slow step, so breaking the fusion before that step is an appropriate strategy. One method to unfuse steps is to insert a GroupByKey transform and ungroup before the step, as shown in the following Java code example:
sequence
    .apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
      public KV<Long, Void> apply(Long n) {
        return KV.of(n, null);
      }
    }))
    .apply("Group By Key", GroupByKey.<Long, Void>create())
    .apply("Emit Keys", Keys.<Long>create())
    .apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));
You can also combine these unfusing steps into a reusable composite transform.
After you unfuse the steps, when you run the pipeline, you see that Increment Number completes in a matter of seconds, and the much longer-running Find Primes Less-than-N transform executes in a separate stage.
This example applies a group and ungroup operation to unfuse steps. You can use other approaches for other circumstances. In this case, handling duplicate output is not an issue, given the consecutive output of the GenerateSequence transform. KV objects with duplicate keys are deduplicated to a single key in the group (GroupByKey) and ungroup (Keys) transforms. To retain duplicates after the group and ungroup operations, create KV pairs by using a random key and the original input as the value, group using the random key, and then emit the values for each key as the output.
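The effect of the two keying choices on duplicates can be sketched in plain Java, without Beam APIs. The class RegroupSketch and both method names are hypothetical. The first method keys by the element itself and emits the keys, which collapses duplicates. The second keys by a random number, keeps the element as the value, and emits the values, which preserves duplicates. The number of random keys is an assumption that you would tune for your pipeline.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Plain-Java sketch (no Beam APIs) of the two group-and-ungroup variants. */
final class RegroupSketch {

  /** Element as key, then emit keys: duplicates collapse to one element. */
  static List<Long> groupByElementThenEmitKeys(List<Long> input) {
    Map<Long, List<Void>> grouped = new LinkedHashMap<>();
    for (Long element : input) {
      grouped.computeIfAbsent(element, k -> new ArrayList<>()).add(null);
    }
    return new ArrayList<>(grouped.keySet());
  }

  /** Random key with the element as value, then emit values: duplicates survive. */
  static List<Long> groupByRandomKeyThenEmitValues(
      List<Long> input, int numKeys, Random random) {
    Map<Integer, List<Long>> grouped = new LinkedHashMap<>();
    for (Long element : input) {
      grouped.computeIfAbsent(random.nextInt(numKeys), k -> new ArrayList<>()).add(element);
    }
    List<Long> output = new ArrayList<>();
    for (List<Long> values : grouped.values()) {
      output.addAll(values);
    }
    return output;
  }
}
```

For an input that contains repeated values, the first method returns each distinct value once, and the second returns every input element, though possibly in a different order.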
Use Apache Beam metrics to collect pipeline insights
Apache Beam metrics is a utility class for producing various metrics for reporting the properties of a running pipeline. When you use Cloud Monitoring, Apache Beam metrics are available as Cloud Monitoring custom metrics.
The following Java snippet is an example of Counter metrics used in a DoFn subclass.
final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MyObject> successTag = new TupleTag<MyObject>(){};
final class ParseEventFn extends DoFn<String, MyObject> {
  private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
  private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
  private Gson gsonParser;
  // Create the parser once per DoFn instance rather than once per element.
  @Setup
  public void setup() {
    gsonParser = new Gson();
  }
  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
      if (myObj.getPayload() != null) {
        // Output the element if non-empty payload
        c.output(successTag, myObj);
      }
      else {
        // Increment empty payload counter
        emptyCounter.inc();
      }
    }
    catch (JsonParseException e) {
      // Increment malformed JSON counter
      malformedCounter.inc();
      // Output the element to dead-letter queue
      c.output(errorTag, c.element());
    }
  }
}
The example code uses two counters. One counter tracks JSON parsing failures (malformedCounter), and the other counter tracks whether the JSON message is valid but contains an empty payload (emptyCounter). In Cloud Monitoring, the custom metric names are custom.googleapis.com/dataflow/malformedJson and custom.googleapis.com/dataflow/emptyPayload. You can use the custom metrics to create visualizations and alerting policies in Cloud Monitoring.
Test your pipeline
In software development, unit tests, integration tests, and end-to-end tests are common types of software testing. These testing types are also applicable to data pipelines.
The Apache Beam SDK provides functionality to enable these tests. Ideally, each type of test targets a different deployment environment. The following diagram illustrates how unit tests, integration tests, and end-to-end tests apply to different parts of your pipeline and data.
The diagram shows the scope of different tests and how they relate to transforms (DoFn and PTransform subclasses), pipelines, data sources, and data sinks.
The following sections describe how various formal software tests apply to data pipelines using Dataflow. As you read through this section, refer back to the diagram to understand how the different types of tests are related.
Unit tests
Unit tests assess the correct functioning of DoFn subclasses and composite transforms (PTransform subclasses) by comparing the output of those transforms with a verified set of data inputs and outputs. Typically, developers can run these tests in the local environment. The tests can also run automatically through unit-test automation using continuous integration (CI) in the build environment.
The Direct Runner runs unit tests using a smaller subset of reference test data that focuses on testing the business logic of your transforms. The test data must be small enough to fit into local memory on the machine that runs the test.
The Apache Beam SDK provides a JUnit rule called TestPipeline for unit-testing individual transforms (DoFn subclasses), composite transforms (PTransform subclasses), and entire pipelines. You can use TestPipeline on an Apache Beam pipeline runner such as the Direct Runner or the Dataflow Runner to apply assertions on the contents of PCollection objects using PAssert, as shown in the following code snippet of a JUnit test class:
@Rule
public final transient TestPipeline p = TestPipeline.create();
@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
  final PCollection<String> pcol = p.apply(...);
  PAssert.that(pcol).containsInAnyOrder(...);
  p.run();
}
Unit tests for individual transforms
By factoring your code into reusable transforms, for example, as top-level or static nested classes, you can create targeted tests for different parts of your pipeline. Aside from the benefits of testing, reusable transforms enhance code maintainability and reusability by naturally encapsulating the business logic of your pipeline into component parts. In contrast, testing individual parts of your pipeline might be difficult if the pipeline uses anonymous inner classes to implement transforms.
The following Java snippet shows the implementation of transforms as anonymous inner classes, which doesn't easily allow testing.
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<KV<String, Long>> output =
    p.apply("Read from text", TextIO.read().from(...))
        .apply("Split words", ParDo.of(new DoFn<String, String>() {
          // Untestable anonymous transform 1
        }))
        .apply("Generate anagrams", ParDo.of(new DoFn<String, String>() {
          // Untestable anonymous transform 2
        }))
        .apply("Count words", Count.perElement());
Compare the previous example with the following one, where the anonymous inner classes are refactored into named concrete DoFn subclasses. You can create individual unit tests for each concrete DoFn subclass that makes up the end-to-end pipeline.
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<KV<String, Long>> output =
    p.apply("Read from text", TextIO.read().from(...))
        .apply("Split words", ParDo.of(new SplitIntoWordsFn()))
        .apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()))
        .apply("Count words", Count.perElement());
Testing each DoFn subclass is similar to unit testing a batch pipeline that contains a single transform. Use the Create transform to create a PCollection object of test data, and then pass it to the DoFn object. Use PAssert to assert that the contents of the PCollection object are correct. The following Java code example uses the PAssert class to check for the correct output.
@Rule
public final transient TestPipeline p = TestPipeline.create();
@Test
@Category(NeedsRunner.class)
public void testGenerateAnagramsFn() {
  // Create the test input
  PCollection<String> words = p.apply(Create.of("friend"));
  // Test a single DoFn using the test input
  PCollection<String> anagrams =
      words.apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()));
  // Assert correct output from GenerateAnagramsFn
  PAssert.that(anagrams).containsInAnyOrder(
      "finder", "friend", "redfin", "refind");
  p.run();
}
Integration tests
Integration tests verify the correct functioning of your entire pipeline. Consider the following types of integration tests:
- A transform integration test that assesses the integrated functionality of all the individual transforms that make up your data pipeline. Think of transform integration tests as a unit test for your entire pipeline, excluding integration with external data sources and sinks. The Apache Beam SDK provides methods to supply test data to your data pipeline and to verify the results of processing. The Direct Runner is used to run transform integration tests.
- A system integration test that assesses your data pipeline's integration with live data sources and sinks. For your pipeline to communicate with external systems, you need to configure your tests with appropriate credentials to access external services. Streaming pipelines run indefinitely, so you need to decide when and how to stop the running pipeline. By using the Direct Runner to run system integration tests, you quickly verify the integration between your pipeline and other systems without needing to submit a Dataflow job and wait for it to finish.
Design transform and system integration tests to provide rapid defect detection and feedback without slowing developer productivity. For longer-running tests, such as those that run as Dataflow jobs, you might want to use an end-to-end test that runs less frequently.
Think of a data pipeline as one or more related transforms. You can create an encapsulating composite transform for your pipeline and use TestPipeline to perform an integration test of your entire pipeline. Depending on whether you want to test the pipeline in batch or streaming mode, you supply test data using either the Create or TestStream transforms.
Use test data for integration tests
In your production environment, your pipeline likely integrates with different data sources and sinks. However, for unit tests and transform integration tests, focus on verifying the business logic of your pipeline code by providing test inputs and verifying the output directly. In addition to simplifying your tests, this approach allows you to isolate pipeline-specific issues from those that might be caused by data sources and sinks.
Test batch pipelines
For batch pipelines, use the Create transform to create a PCollection object of your input test data out of a standard in-memory collection, such as a Java List object. Using the Create transform is appropriate if your test data is small enough to include in code. You can then use PAssert on the output PCollection objects to determine the correctness of your pipeline code. This approach is supported by the Direct Runner and by the Dataflow Runner.
The following Java code snippet shows assertions against output PCollection objects from a composite transform that includes some or all of the individual transforms that constitute a pipeline (WeatherStatsPipeline). The approach is similar to unit-testing individual transforms in a pipeline.
private static class WeatherStatsPipeline extends
    PTransform<PCollection<Integer>, PCollection<WeatherSummary>> {
  @Override
  public PCollection<WeatherSummary> expand(PCollection<Integer> input) {
    // Pipeline transforms …
  }
}
@Rule
public final transient TestPipeline p = TestPipeline.create();
@Test
@Category(NeedsRunner.class)
public void testWeatherPipeline() {
  // Create test input consisting of temperature readings
  PCollection<Integer> tempCelsius =
      p.apply(Create.of(24, 22, 20, 22, 21, 21, 20));
  // WeatherStatsPipeline calculates the min, max, and average temperature
  PCollection<WeatherSummary> result =
      tempCelsius.apply("Calculate weather statistics", new WeatherStatsPipeline());
  // Assert correct output from WeatherStatsPipeline
  PAssert.thatSingleton(result).isEqualTo(new WeatherSummary.Builder()
      .withAverageTemp(21)
      .withMaxTemp(24)
      .withMinTemp(20)
      .build());
  p.run();
}
To test windowing behavior, you can also use the Create transform to create elements with timestamps, as shown in the following code snippet:
private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWindowedData() {
  PCollection<String> input =
      p.apply(
          Create.timestamped(
                  TimestampedValue.of("a", new Instant(0L)),
                  TimestampedValue.of("a", new Instant(0L)),
                  TimestampedValue.of("b", new Instant(0L)),
                  TimestampedValue.of("c", new Instant(0L)),
                  TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
              .withCoder(StringUtf8Coder.of()));

  PCollection<KV<String, Long>> windowedCount =
      input
          .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
          .apply(Count.perElement());

  PAssert.that(windowedCount)
      .containsInAnyOrder(
          // Output from first window
          KV.of("a", 2L),
          KV.of("b", 1L),
          KV.of("c", 1L),
          // Output from second window
          KV.of("c", 1L));

  p.run();
}
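To see why the final "c" element in the preceding test lands in a second window, it can help to model how fixed windows partition event time. The following is a minimal sketch in plain Java, not Beam SDK code. The class FixedWindowSketch and its windowStart method are hypothetical names, the sketch uses java.time instead of the Joda-Time types that the Beam Java SDK uses, and it assumes windows with no offset that cover the half-open interval from the window start up to, but not including, the start plus the window size.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper (not part of the Beam SDK) that models fixed-window assignment. */
final class FixedWindowSketch {

  /** Returns the start of the fixed window of the given size that contains the timestamp. */
  static Instant windowStart(Instant timestamp, Duration size) {
    long sizeMillis = size.toMillis();
    long ts = timestamp.toEpochMilli();
    // floorMod keeps the result correct for timestamps before the epoch.
    return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
  }

  public static void main(String[] args) {
    Duration size = Duration.ofMinutes(3);
    // An element at the epoch belongs to the window that starts at the epoch.
    System.out.println(windowStart(Instant.EPOCH, size)); // 1970-01-01T00:00:00Z
    // An element exactly one window size later starts the next window.
    System.out.println(windowStart(Instant.EPOCH.plus(size), size)); // 1970-01-01T00:03:00Z
  }
}
```

With a three-minute window, a timestamp of exactly three minutes after the epoch is the first instant of the second window, which is why the test expects a separate KV.of("c", 1L) result.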
Test streaming pipelines
Streaming pipelines contain assumptions that define how to handle unbounded data. These assumptions are often about the timeliness of data in real-world conditions, and therefore have an impact on correctness depending on whether the assumptions prove to be true or false. Integration tests for streaming pipelines ideally include tests that simulate the non-deterministic nature of streaming data arrival.
To enable such tests, the Apache Beam SDK provides the TestStream class to model the effects of element timings (early, on-time, or late data) on the results of your data pipeline. Use these tests together with the PAssert class to verify against expected results.
TestStream is supported by the Direct Runner and the Dataflow Runner. The following code sample creates a TestStream transform:
private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testDroppedLateData() {
  TestStream<String> input = TestStream.create(StringUtf8Coder.of())
      // Add elements arriving before the watermark
      .addElements(
          TimestampedValue.of("a", new Instant(0L)),
          TimestampedValue.of("a", new Instant(0L)),
          TimestampedValue.of("b", new Instant(0L)),
          TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
      // Advance the watermark past the end of the first window
      .advanceWatermarkTo(new Instant(0L).plus(WINDOW_DURATION).plus(Duration.standardMinutes(1)))
      // Add an element that is dropped due to lateness
      .addElements(
          TimestampedValue.of("c", new Instant(0L)))
      // Advance the watermark to infinity, which closes all windows
      .advanceWatermarkToInfinity();

  PCollection<KV<String, Long>> windowedCount =
      p.apply(input)
          .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
          .apply(Count.perElement());

  PAssert.that(windowedCount)
      .containsInAnyOrder(
          // Output from first window
          KV.of("a", 2L),
          KV.of("b", 1L),
          // Output from second window (the late "c" for the first window is dropped)
          KV.of("c", 1L));

  p.run();
}
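The dropped element in the preceding test can be reasoned about with a small model of lateness. The following sketch is a simplification in plain Java and is not the Beam implementation. LatenessSketch and isDropped are hypothetical names, the sketch uses java.time rather than Joda-Time, and it assumes that an element is dropped when the last timestamp in its fixed window, plus the allowed lateness, is earlier than the current watermark.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model (not part of the Beam SDK) of when a late element is dropped. */
final class LatenessSketch {

  /**
   * Returns true if an element with the given event timestamp would be dropped,
   * assuming fixed windows with no offset and the given allowed lateness.
   */
  static boolean isDropped(
      Instant elementTimestamp, Duration windowSize, Duration allowedLateness, Instant watermark) {
    long size = windowSize.toMillis();
    long ts = elementTimestamp.toEpochMilli();
    long windowStart = ts - Math.floorMod(ts, size);
    // Last timestamp that belongs to the window, which covers [start, start + size).
    long maxTimestamp = windowStart + size - 1;
    // The window is treated as expired once the watermark passes this point.
    long expiry = maxTimestamp + allowedLateness.toMillis();
    return expiry < watermark.toEpochMilli();
  }

  public static void main(String[] args) {
    Duration window = Duration.ofMinutes(3);
    Instant watermark = Instant.EPOCH.plus(Duration.ofMinutes(4));

    // Mirrors the test: an element at time 0 arrives after the watermark reached 4 minutes.
    System.out.println(isDropped(Instant.EPOCH, window, Duration.ZERO, watermark)); // true

    // With two minutes of allowed lateness, the same element would still be accepted.
    System.out.println(isDropped(Instant.EPOCH, window, Duration.ofMinutes(2), watermark)); // false
  }
}
```

The test relies on the default allowed lateness of zero, so the late "c" element contributes nothing to the first window and only the on-time "c" in the second window is counted.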
For more information about TestStream, see Testing Unbounded Pipelines in Apache Beam. For more information about how to use the Apache Beam SDK for unit testing, see the Apache Beam documentation.
Use Google Cloud services in integration tests
The Direct Runner can integrate with Google Cloud services, so ad hoc tests in the local environment and system integration tests can use Pub/Sub, BigQuery, and other services as needed. When you use the Direct Runner, your pipeline runs as the user account that you configured by using the gcloud command-line tool or as a service account that you specified using the GOOGLE_APPLICATION_CREDENTIALS environment variable. Therefore, you must grant sufficient permissions to this account for any required resources before you run your pipeline. For more details, see Dataflow security and permissions.
For entirely local integration tests, you can use local emulators for some Google Cloud services. Local emulators are available for Pub/Sub and Bigtable.
For system integration testing of streaming pipelines, you can use the setBlockOnRun method (defined in the DirectOptions interface) to have the Direct Runner run your pipeline asynchronously. Otherwise, pipeline execution blocks the calling parent process (for example, a script in your build pipeline) until the pipeline is manually stopped. If you run the pipeline asynchronously, you can use the returned PipelineResult instance to cancel execution of the pipeline, as shown in the following code example:
public interface StreamingIntegrationTestOptions extends
    DirectOptions, StreamingOptions, MyOtherPipelineOptions {
  ...
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testNonBlockingPipeline() throws IOException {
  StreamingIntegrationTestOptions options =
      p.getOptions().as(StreamingIntegrationTestOptions.class);

  options.setBlockOnRun(false); // Set non-blocking pipeline execution
  options.setStreaming(true); // Set streaming mode

  p.apply(...); // Apply pipeline transformations

  PipelineResult result = p.run(); // Run the pipeline

  // Generate input, verify output, etc
  ...

  // Later on, cancel the pipeline using the previously returned PipelineResult
  // (cancel() is declared to throw IOException)
  result.cancel();
}
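Between starting a non-blocking pipeline and cancelling it, a test usually has to wait for output to appear in the sink. One possible way to do that is a bounded polling loop. The helper below is a hypothetical sketch in plain Java (AwaitSketch and awaitCondition are not part of the Beam SDK), and the condition in main is only a stand-in for a real check, such as querying the sink for the expected rows.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical test helper (not part of the Beam SDK) for waiting on pipeline output. */
final class AwaitSketch {

  /**
   * Polls the condition until it returns true or the timeout elapses.
   * Returns true if the condition was met, false on timeout or interruption.
   */
  static boolean awaitCondition(
      BooleanSupplier condition, Duration timeout, Duration pollInterval) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollInterval.toMillis());
      } catch (InterruptedException e) {
        // Preserve the interrupt flag and stop waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  public static void main(String[] args) {
    long start = System.nanoTime();
    // Stand-in condition: becomes true about 50 ms after the test starts.
    boolean met = awaitCondition(
        () -> System.nanoTime() - start > Duration.ofMillis(50).toNanos(),
        Duration.ofSeconds(2),
        Duration.ofMillis(10));
    System.out.println("condition met: " + met);
  }
}
```

In a test like testNonBlockingPipeline, a call to a helper of this kind could sit between p.run() and result.cancel(), with the cancel call placed in a finally block so that the pipeline stops even when the condition is never met.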
End-to-end tests
End-to-end tests verify the correct operation of your end-to-end pipeline by running it on the Dataflow Runner under conditions that closely resemble production. The tests verify that the business logic functions correctly using the Dataflow Runner and test whether the pipeline performs as expected under production-like loads. You typically run end-to-end tests in a dedicated Google Cloud project that is designated as the preproduction environment.
To test your pipeline at different scales, use different types of end-to-end tests, for example:
- Run small-scale end-to-end tests using a small proportion (such as one percent) of your test dataset to quickly validate pipeline functionality in the preproduction environment.
- Run large-scale end-to-end tests using a full test dataset to validate pipeline functionality under production-like data volumes and conditions.
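One way to select a small, repeatable proportion of a test dataset is to hash a stable record key and keep only the records whose hash falls in a fixed range of buckets, so that the same records are chosen on every run. The following plain Java sketch illustrates that idea under stated assumptions: SampleSketch and inSample are hypothetical names, each record is assumed to have a stable string key, and CRC32 is used only because it ships with the JDK and produces the same value on every JVM.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical helper (not part of the Beam SDK) for deterministic percentage sampling. */
final class SampleSketch {

  /** Returns true if the record with this key belongs to the given percentage sample. */
  static boolean inSample(String recordKey, int percent) {
    if (percent < 0 || percent > 100) {
      throw new IllegalArgumentException("percent must be between 0 and 100");
    }
    CRC32 crc = new CRC32();
    crc.update(recordKey.getBytes(StandardCharsets.UTF_8));
    // getValue() is a non-negative 32-bit checksum, so the bucket is in [0, 100).
    return crc.getValue() % 100 < percent;
  }

  public static void main(String[] args) {
    int total = 100_000;
    int kept = 0;
    for (int i = 0; i < total; i++) {
      if (inSample("record-" + i, 1)) {
        kept++;
      }
    }
    // Reports how many of the generated keys fall into the one percent sample.
    System.out.println(kept + " of " + total + " records selected");
  }
}
```

Because the selection depends only on the key, a small-scale run and a later large-scale run over the same dataset see consistent records, which makes their results easier to compare.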
For streaming pipelines, we recommend that you run test pipelines in parallel with your production pipeline if they can use the same data. This process lets you compare results and operational behavior, such as autoscaling and performance.
End-to-end tests help to predict how well your pipeline will meet your production SLOs. The preproduction environment tests your pipeline under production-like conditions. Within end-to-end tests, pipelines run using the Dataflow Runner to process complete reference datasets that match or closely resemble datasets in production.
It might not be possible to generate synthetic data for testing that accurately simulates real data. To address this problem, one approach is to use cleansed extracts from production data sources to create reference datasets, in which any sensitive data is de-identified through appropriate transformations. We recommend using Cloud Data Loss Prevention (DLP) for this purpose. Cloud DLP can detect sensitive data from a range of content types and data sources and apply a range of de-identification techniques, including redaction, masking, format-preserving encryption, and date-shifting.
Differences in end-to-end tests for batch and streaming pipelines
Before you run a full end-to-end test against a large test dataset, you might want to run a test with a smaller percentage of the test data (such as one percent) and verify expected behavior in a shorter amount of time. As with integration tests that use the Direct Runner, you can use PAssert on PCollection objects when you run pipelines using the Dataflow Runner. For more information about PAssert, see the Unit tests section on this page.
Depending on your use case, verifying very large output from end-to-end tests might be impractical, costly, or otherwise challenging. In that case, you can verify representative samples from the output result set instead. For example, you can use BigQuery to sample and compare output rows with a reference dataset of expected results.
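The comparison of sampled output against a reference dataset can be sketched independently of where the rows are stored. The following plain Java example assumes that both the sampled output and the expected results have already been loaded into maps keyed by a row identifier. SampleCompareSketch and mismatches are hypothetical names, and the example does not call BigQuery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical helper (not part of any SDK) that checks sampled output against a reference. */
final class SampleCompareSketch {

  /**
   * Returns the keys of sampled output rows whose value is missing from,
   * or different from, the reference dataset of expected results.
   */
  static List<String> mismatches(Map<String, Long> sampledOutput, Map<String, Long> reference) {
    List<String> bad = new ArrayList<>();
    for (Map.Entry<String, Long> row : sampledOutput.entrySet()) {
      Long expected = reference.get(row.getKey());
      if (!Objects.equals(row.getValue(), expected)) {
        bad.add(row.getKey());
      }
    }
    return bad;
  }

  public static void main(String[] args) {
    // Sampled rows from the pipeline output (illustrative values only).
    Map<String, Long> sampled = Map.of("a", 2L, "b", 1L, "c", 2L);
    // Expected results; rows that were not sampled, such as "d", are ignored.
    Map<String, Long> reference = Map.of("a", 2L, "b", 1L, "c", 1L, "d", 5L);
    System.out.println(mismatches(sampled, reference)); // prints [c]
  }
}
```

Only the sampled rows are checked, so the cost of verification scales with the sample size rather than with the full output.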
For streaming pipelines, simulating realistic streaming conditions with synthetic data might be challenging. A common way to provide streaming data for end-to-end tests is to integrate testing with production data sources. If you're using Pub/Sub as a data source, you can enable a separate datastream for end-to-end tests through additional subscriptions to existing topics. You can then compare the results of different pipelines that consume the same data, which is useful for verifying candidate pipelines against other preproduction and production pipelines.
The following diagram shows how this method allows a production pipeline and test pipeline to run in parallel in different deployment environments.
In the diagram, both pipelines read from the same Pub/Sub topic, but they use separate subscriptions. This setup allows the two pipelines to process the same data independently and allows you to compare the results. The test pipeline uses a separate service account from the production project, and therefore avoids using the Pub/Sub subscriber quota for the production project.
Unlike batch pipelines, streaming pipelines continue to run until they are explicitly cancelled. In end-to-end tests, you need to decide whether to leave the pipeline running, perhaps until the next end-to-end test is run, or cancel the pipeline at a point that represents test completion so that you can inspect the results.
The type of test data you use influences this decision. For example, if you use a bounded set of test data that is provided to the streaming pipeline, you might cancel the pipeline when all elements have completed processing. Alternatively, if you use a real data source, such as an existing Pub/Sub topic that is used in production, or if you otherwise generate test data continually, you might want to keep test pipelines running over a longer period. The latter lets you compare the behavior against the production environment, or even against other test pipelines.