Building production-ready data pipelines using Dataflow: Planning data pipelines

This document explains important planning considerations for your data pipeline before you begin code development. It's part of a series that helps you improve the production readiness of your data pipelines by using Dataflow. The series is intended for a technical audience whose responsibilities include developing, deploying, and monitoring Dataflow pipelines and who have a working understanding of Dataflow and Apache Beam.

The documents in the series include the following parts:

  • Overview
  • Planning data pipelines (this document)
  • Developing and testing data pipelines
  • Deploying data pipelines
  • Monitoring data pipelines

Introduction

Data pipelines move data from one system to another; they're often critical components of a business information system. The performance and reliability of your data pipelines can impact these broader systems, and therefore impact how effectively your business requirements can be met.

As with any system, if you plan your data pipelines before you develop them, you can improve their performance and reliability. This guide explains various planning considerations for Dataflow pipelines, including:

  • Performance expectations for your pipelines, including standards for measurability
  • Integration of your pipelines with data sources, sinks, and other connected systems
  • Regionalization of pipelines, sources, and sinks
  • Security, such as data encryption and private networking

Defining and measuring SLOs

An important measure of performance is how well your pipeline meets your business requirements. Service level objectives (SLOs) provide tangible definitions of performance that you can compare against acceptable thresholds. For example, you might define the following SLOs for your system:

  • Data freshness: 90% of product recommendations should be generated from user website activity that occurred no more than 3 minutes ago.

  • Data correctness: within a calendar month, less than 0.5% of customer invoices should contain errors.

  • Data isolation/load balancing: within a business day, all high-priority payments should be processed within 10 minutes of lodgement, and standard-priority payments should be completed by the next business day.

SLO compliance can be measured using service level indicators (SLIs), which are quantifiable metrics that indicate how well your system is meeting a given SLO. For example, you could measure the example data-freshness SLO by using the age of the most recently processed user activity as an SLI. If your pipeline generates recommendations from user activity events, and your SLI reports a 4-minute delay between the time an event occurs and the time it's processed, the recommendations ignore any user website activity from the most recent 4 minutes. If a pipeline that processes streaming data exceeds a system latency of 4 minutes, you know that the SLO is not met.

It's important to capture a range of SLIs that describe the overall performance of the system beyond the performance of the pipeline itself, including metrics that describe the end-to-end health of your system. This is because system components beyond your pipeline can affect your SLO. For example, your Dataflow pipeline might compute results with acceptable delay, but a performance issue might occur with a downstream system that can impact wider SLOs.

For more information about important SLOs that you should consider, see the Site Reliability Engineering book.

Data freshness

Data freshness refers to the usability of data in relation to its age. The Site Reliability Engineering book lists the following as the most common pipeline data freshness SLO formats:

  • X% of data processed in Y [seconds, days, minutes]. This SLO refers to the percentage of data that's processed in a given period of time, and is commonly used for batch pipelines that process bounded data sources. The metrics for this type of SLO are the input and output data sizes at key processing steps relative to the elapsed pipeline runtime. You can choose a step that reads an input dataset and another step that processes each item of the input. An example SLO could be "For the Shave the Yak game, 99% of user activities that impact players' scores should be accounted for within 30 minutes of match completion."

  • The oldest data is no older than X [seconds, days, minutes]. This SLO refers to the age of data produced by the pipeline, and is commonly used for streaming pipelines that process data from unbounded sources. For this type of SLO, you use metrics that indicate how long your pipeline takes to process data. Two possible metrics are either the age of the oldest unprocessed item (how long an unprocessed item has been in the queue) or the age of the most recently processed item. An example SLO could be "Product recommendations should be generated from user activity that is no older than 5 minutes."

  • The pipeline job has completed successfully within X [seconds, days, minutes]. This SLO sets a deadline for successful completion and is commonly used for batch pipelines that process data from bounded data sources. This SLO requires the total pipeline-elapsed time and job-completion status, in addition to other signals that indicate the success of the job (for example, the percentage of processed elements that result in errors). An example SLO could be "Customer orders from the current business day should be processed by 9 AM the next day."

For practical examples of measuring data freshness SLOs like these by using Cloud Monitoring, see Monitoring data pipelines.
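To make a freshness SLI measurable, a pipeline can expose a custom metric that records each element's age at processing time. The following sketch uses the Apache Beam Java SDK; the metric namespace, metric name, and DoFn name are illustrative.

```java
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

/** Records how far behind the current time each element is when it's processed. */
class RecordElementAgeFn<T> extends DoFn<T, T> {
  // Custom distribution metric; visible in the Dataflow monitoring interface and Cloud Monitoring.
  private final Distribution elementAgeMs =
      Metrics.distribution("freshness_sli", "element_age_ms");

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Age = wall-clock time now minus the element's event timestamp.
    elementAgeMs.update(new Duration(c.timestamp(), Instant.now()).getMillis());
    c.output(c.element());
  }
}
```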

Data correctness

Data correctness refers to data being free of errors. You can determine data correctness through different means. One method is to check whether the data is consistent by using a set of validation rules, such as data validation rules that use regular expressions. Another method is to have a domain expert verify that the data is correct.

You can check data against reference data. One challenge is that reference data for validating correctness might not always be available. Therefore, you might need to generate this data using automated tools like Dataflow Data Generator, or you might need to create the reference datasets manually. You can store the reference datasets in BigQuery or Cloud Storage, where they can be used for different pipeline tests.

When you have a reference dataset, you can use it to verify data correctness both during pipeline testing and for running pipelines in production.

For running pipelines, defining a data correctness target usually involves measuring correctness over a period of time. For example:

  • On a per-job basis, less than X% of input items contain data errors. This SLO can be used to measure data correctness for batch pipelines. An example SLO could be "For each daily batch job to process electricity meter readings, less than 3% of readings contain data entry errors."
  • Over an X-minute moving window, less than Y% of input items contain data errors. This SLO can be used to measure data correctness for streaming pipelines. An example SLO could be "Less than 2% of electricity meter readings over the last hour contain negative values."

To measure these SLOs, you can use metrics over a suitable period of time to accumulate the number of errors by type—for example, the data is incorrect due to a malformed schema, or the data is outside a valid range.
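As an illustration of accumulating errors by type, the following Apache Beam Java sketch validates raw meter-reading records and increments one counter per error category; the record format, regular expression, and metric names are illustrative assumptions.

```java
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

/** Validates raw meter-reading records and counts data errors by type. */
class ValidateMeterReadingFn extends DoFn<String, String> {
  private final Counter malformedRecords = Metrics.counter("correctness_sli", "malformed_records");
  private final Counter outOfRangeRecords = Metrics.counter("correctness_sli", "out_of_range_records");

  @ProcessElement
  public void processElement(ProcessContext c) {
    String record = c.element();
    // Illustrative validation rule: each record must look like "<meterId>,<reading>".
    if (!record.matches("^[A-Za-z0-9-]+,-?\\d+(\\.\\d+)?$")) {
      malformedRecords.inc();
      return;
    }
    // Illustrative range rule: meter readings can't be negative.
    double reading = Double.parseDouble(record.split(",")[1]);
    if (reading < 0) {
      outOfRangeRecords.inc();
      return;
    }
    c.output(record);
  }
}
```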

For practical examples of measuring data correctness SLOs like these by using Cloud Monitoring, see Monitoring data pipelines.

Data isolation and load balancing

Data isolation involves segmenting data by attribute, which can make load balancing easier. For example, in an online payment-processing platform, you could segment data so that individual payments are either standard priority or high priority. Your pipeline could then use load balancing to ensure that high-priority payments are processed before standard-priority payments.

Imagine that you've defined the following SLOs for payment processing:

  • High-priority payments are processed within 10 minutes.
  • Standard-priority payments are processed by 9 AM the next business day.

If the payment platform complies with these SLOs, customers are able to see finalized high-priority payments on a reporting dashboard as they are completed. In contrast, standard payments might not appear on the dashboard until the next day.

In this example scenario, you have the following options:

  • Run a single pipeline to process both standard-priority and high-priority payments.
  • Isolate and load-balance the data based on priorities across multiple pipelines.

The following sections describe each option in detail.

Using a single pipeline to deliver against mixed SLOs

The following diagram illustrates a single pipeline that's used to process both high-priority and standard-priority payments. The pipeline receives notification of new payments from a streaming data source—for example, a Pub/Sub topic or an Apache Kafka topic. It then immediately processes payments and writes events to BigQuery using streaming inserts.

Single pipeline for all processing, with an overall SLO of less than 10 minutes.

The advantage of a single pipeline is that it simplifies your operational requirements because you need to manage only a single data source and pipeline. Dataflow uses automatic tuning features to help execute your job as quickly and efficiently as possible.

However, a disadvantage is that the shared pipeline can't prioritize high-priority payments over standard-priority payments, and pipeline resources are shared across both payment types. In the business scenario described earlier, your pipeline must maintain the more stringent of the two SLOs (the one for high-priority payments), regardless of the actual priority of the payments being processed. If a work backlog builds up, the streaming pipeline also can't prioritize the backlog according to the urgency of the work.

Using multiple pipelines tailored for specific SLOs

As an alternative, you can use two pipelines to isolate resources and deliver against specific SLOs. The following diagram illustrates this approach.

Using two pipelines, one for high-priority payments (with SLO less than 10 minutes) and another one for lower-priority payments (with SLO less than 24 hours).

As shown in the diagram, high-priority payments are isolated to a streaming pipeline for prioritized processing. Standard-priority payments are processed by a batch pipeline that runs daily (for example, after the close of business) and that uses BigQuery load jobs to write processed results.

There are advantages to isolating data in different pipelines. For example, to deliver high-priority payments against tighter SLOs, you can shorten processing times by assigning more resources to the dedicated high-priority Dataflow pipeline. This can include adding more Dataflow workers, configuring larger machines, or enabling autoscaling. Isolating high-priority items to a separate processing queue can also mitigate processing delays if there's a sudden influx of standard-priority payments. (This document discusses Dataflow workers in more detail later.)

When you use multiple pipelines to isolate and load-balance the data from batch and streaming sources, the Apache Beam programming model allows the high-priority (streaming) and standard-priority (batch) pipelines to share the same code. The only exception is the initial read transform, which reads from a bounded source for the batch pipeline. For more information, see Create libraries of reusable transforms later in this series.
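The following sketch illustrates that structure in the Apache Beam Java SDK. It assumes a hypothetical ProcessPayments composite transform that contains the shared logic; the bucket, subscription, and pipeline options are also illustrative.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;

// ProcessPayments is a hypothetical composite PTransform that holds all of the shared
// parsing, validation, and enrichment logic used by both pipelines.

// Batch pipeline (standard priority): bounded read, then the shared transform.
Pipeline batchPipeline = Pipeline.create(batchOptions);
batchPipeline
    .apply("ReadPaymentFiles", TextIO.read().from("gs://example-bucket/payments/*.csv"))
    .apply("ProcessPayments", new ProcessPayments());

// Streaming pipeline (high priority): unbounded read, then the same shared transform.
Pipeline streamingPipeline = Pipeline.create(streamingOptions);
streamingPipeline
    .apply("ReadPaymentEvents",
        PubsubIO.readStrings().fromSubscription(
            "projects/example-project/subscriptions/high-priority-payments"))
    .apply("ProcessPayments", new ProcessPayments());
```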

Planning for data sources and sinks

To process data, a data pipeline needs to be integrated with other systems. Those systems are referred to as sources and sinks. Data pipelines read data from sources and write data to sinks. In addition to sources and sinks, data pipelines might interact with external systems for data enrichment, filtering, or calling external business logic within a processing step.

For scalability, Dataflow executes the stages of your pipeline in parallel across multiple workers. The scalability of your pipeline can be influenced by factors outside your pipeline code and outside the Dataflow service itself. These factors might include the following:

  • Scalability of the external systems: external systems that your pipeline interacts with can be a performance constraint and can form the upper bound of scalability. For example, an Apache Kafka topic can affect your pipeline's performance if it's configured with an insufficient number of partitions for the read throughput that you need. To help ensure that the pipeline and its components can meet your performance targets, refer to the best practices documentation for the external systems that you're using. You can also simplify infrastructure capacity planning by using Google Cloud services that provide built-in scalability. For more information, see Using Google Cloud managed sources and sinks later in this document.

  • Choice of data formats: certain data formats might be faster to read than others. For example, data formats that support parallelizable reads, such as Avro, are usually faster to read than CSV files that have embedded newlines in fields, and faster than compressed files.

  • Data location and network topology: the geographic proximity and networking characteristics of data sources and sinks in relation to the data pipeline can impact performance. See Regional considerations later in this document.

Calling external services

Calling external services from your pipeline incurs per-call overheads that can decrease the performance and efficiency of your pipeline. If your data pipeline calls external services, batch multiple data elements into single requests where possible to reduce these overheads. Many built-in Apache Beam I/O transforms perform this batching automatically, for example, BigQueryIO when it uses streaming inserts. Aside from capacity limits, some external services also enforce quotas that limit the total number of calls over a period of time (for example, a daily quota), or restrict the rate of calling (such as the number of requests per second).
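One way to batch elements before calling an external service in the Apache Beam Java SDK is the GroupIntoBatches transform, sketched below. The Order type, the key, the batch size of 100, and the EnrichOrdersFn DoFn are illustrative assumptions.

```java
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// keyedOrders is an existing PCollection<KV<String, Order>>, for example keyed by customer ID.
// Group elements into batches of up to 100 so that each call to the external service
// carries many elements instead of one.
PCollection<KV<String, Iterable<Order>>> batchedOrders =
    keyedOrders.apply("BatchForExternalCall", GroupIntoBatches.<String, Order>ofSize(100));

// EnrichOrdersFn is a hypothetical DoFn that sends one request per batch.
batchedOrders.apply("CallExternalService", ParDo.of(new EnrichOrdersFn()));
```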

Because Dataflow parallelizes work across multiple workers, it's possible to overwhelm an external service with too much traffic, or to exhaust available quotas. When autoscaling is used, Dataflow might attempt to compensate by adding workers to execute a slow step like an external call; this can add further pressure on external systems. You should ensure that external systems can support your anticipated load requirements, or you should limit the traffic from your pipeline to sustainable levels. For more information, see Limit batch sizes and concurrent calls to external services later in this series.

Using Google Cloud managed sources and sinks

Using Google Cloud managed services with your Dataflow pipeline removes the complexity of capacity management by providing built-in scalability, consistent performance, and quotas and limits that accommodate most requirements. You should still be mindful of different quotas and limits that apply for different operations. Dataflow itself imposes quotas and limits. You can increase some of these by contacting Google Cloud support.

Dataflow uses Compute Engine VM instances to execute your jobs, so it's important to make sure that you have sufficient Compute Engine quota. Insufficient Compute Engine quota can also hinder pipeline autoscaling or prevent jobs from starting.

The remaining parts of this section explore how different Google Cloud quotas and limits might influence how you design, develop, and monitor your pipeline. Pub/Sub and BigQuery are used as examples of pipeline sources and sinks.

Example 1: Pub/Sub

When you use Pub/Sub with Dataflow, Pub/Sub provides a scalable and durable event ingestion service for delivering messages to and from your streaming data pipelines. You can use the Google Cloud console to view Pub/Sub quota consumption and increase quota limits. We recommend that you request a quota increase if you have any single pipeline that exceeds the per-project quotas and limits.

Pub/Sub quotas and limits are designed around project-level usage; that is, publishers and subscribers in different projects are given independent data-throughput quotas. If multiple pipelines publish or subscribe to a single topic, you can get maximum allowable throughput on that topic by deploying each pipeline into its own project. In this configuration, each pipeline uses a different project-based service account to consume and publish messages.

In the following diagram, Pipeline 1 and Pipeline 2 share the same subscriber and publisher throughput quota that's available to Project A. In contrast, Pipeline 3 can use the entire subscriber and publisher throughput quota that's attached to Project B.

Three pipelines. Pipeline 1 and Pipeline 2 are in Pipeline Project A; each has its own subscription to a Pub/Sub topic. Pipeline 3 is in Pipeline Project B, which has its own subscription.

Multiple pipelines can read from a single Pub/Sub topic by using separate subscriptions to the topic, which allows Dataflow pipelines to pull and acknowledge messages independently of other subscribers (in this case, other pipelines). This makes it easy to clone pipelines by creating additional Pub/Sub subscriptions. Creating additional subscriptions is useful for creating replica pipelines for high availability (typically for streaming use cases), for running additional test pipelines against the same data, and for enabling pipeline updates.
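For example, a clone of an existing pipeline might read through its own subscription on the shared topic, as in the following Java sketch (the subscription name is illustrative):

```java
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;

// This pipeline attaches to the shared topic through its own subscription, so it pulls and
// acknowledges messages independently of other pipelines that subscribe to the same topic.
pipeline.apply("ReadPayments",
    PubsubIO.readMessages().fromSubscription(
        "projects/example-project/subscriptions/payments-pipeline-1"));
```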

Example 2: BigQuery

The Apache Beam Java SDK provides the BigQueryIO connector for reading and writing BigQuery data. BigQueryIO supports two methods for reading data: EXPORT (table export) and DIRECT_READ. The different read methods consume different BigQuery quotas.

Table export is the default read method. It works as shown in the following diagram:

The pipeline sends an export request to BigQuery, which writes data to a temporary location in Cloud Storage. The pipeline then reads data from that temporary location.

The diagram shows the following flow:

  1. BigQueryIO invokes a BigQuery export request to export table data. The exported table data is written to a temporary Cloud Storage location.
  2. BigQueryIO reads the table data from the temporary Cloud Storage location.

BigQuery export requests are limited by export quotas. The export request must also complete before the pipeline can start processing data, which adds additional execution time for the job.

In contrast, the direct read approach uses the BigQuery Storage API to directly read table data from BigQuery. The BigQuery Storage API provides high-throughput read performance for table row data using gRPC. Using the BigQuery Storage API makes the export step unnecessary. This avoids export quota restrictions and potentially decreases job run time.

The following diagram shows the flow if you use the BigQuery Storage API. In contrast to the flow that uses a BigQuery export request, as shown previously, this flow is simpler; there's a single direct-read step to get the data from the table to the pipeline.

The pipeline reads directly from a BigQuery table.
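The following Java sketch shows how a pipeline might select between the two read methods with BigQueryIO; the project, dataset, and table names are illustrative.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.values.PCollection;

// Default EXPORT method: BigQuery first exports the table to temporary files in Cloud Storage.
PCollection<TableRow> exportedRows =
    pipeline.apply("ReadViaExport",
        BigQueryIO.readTableRows().from("example-project:example_dataset.orders"));

// DIRECT_READ method: rows are read through the BigQuery Storage API, with no export step
// and no consumption of export quota.
PCollection<TableRow> directRows =
    pipeline.apply("ReadViaStorageApi",
        BigQueryIO.readTableRows()
            .from("example-project:example_dataset.orders")
            .withMethod(TypedRead.Method.DIRECT_READ));
```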

Writing data to BigQuery tables also has its own quota implications. For example, batch pipelines that use BigQuery load jobs consume different BigQuery load job quotas that apply at the table and project level. Similarly, streaming pipelines that use BigQuery streaming inserts consume BigQuery streaming insert quotas.

Consider your use case to determine the most appropriate methods to read and write data. For example, you should avoid using BigQuery load jobs to append data thousands of times per day into a table, which is a common requirement for streaming pipelines. BigQuery streaming inserts are suited for this use case, because there are no quota limits on rows inserted per day.
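As an illustration, the following Java sketch writes the same PCollection of TableRow elements using each method; the table name and dispositions are illustrative, and the appropriate choice depends on whether the pipeline is streaming or batch.

```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;

// paymentRows is an existing PCollection<TableRow>; the target table is assumed to exist already.

// Streaming pipeline: streaming inserts avoid consuming load-job quotas for frequent small appends.
paymentRows.apply("WriteViaStreamingInserts",
    BigQueryIO.writeTableRows()
        .to("example-project:example_dataset.payments")
        .withMethod(Write.Method.STREAMING_INSERTS)
        .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND));

// Batch pipeline: load jobs (the default for bounded input) consume load-job quotas instead.
paymentRows.apply("WriteViaLoadJobs",
    BigQueryIO.writeTableRows()
        .to("example-project:example_dataset.payments")
        .withMethod(Write.Method.FILE_LOADS)
        .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND));
```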

Regional considerations

Dataflow is offered as a managed service in multiple Google Cloud regions. The regions where you choose to execute your jobs can depend on multiple factors. For example, you might consider the location of data sources and sinks, preferences or restrictions on data processing locations, and Dataflow features that are offered only in specific regions.

There are two considerations relating to the regional placement of Dataflow jobs. The first consideration is the regional endpoint that's used to manage execution of a given job. The second is the zone that's used for the job's workers. For a given job, the region setting that you use for the regional endpoint and workers can differ. For more information, including when to specify regions and zones, see the regional endpoints documentation.

By specifying regions to run your Dataflow jobs, you can plan around regional considerations for high availability and disaster recovery. For more information, see High availability and geographic redundancy.

Regional endpoints

Regional endpoints store and handle metadata relating to your job—for example, information about the Beam graph itself, such as transform names. They also control worker behaviors such as autoscaling. Specifying a regional endpoint helps you meet your needs for security and compliance, data locality, and the regional placement of a job. Where possible, to avoid performance impact from cross-region network calls, we recommend that you use the same region for the job's regional endpoint and for workers.

Currently, Dataflow Shuffle and Streaming Engine are available only in specific regions. Enabling these features can improve the performance of your job. For more information, see Dataflow features for optimizing resource usage later in this series.

Dataflow workers

Dataflow jobs use Compute Engine VM instances (called Dataflow workers) to run your pipeline. Dataflow jobs can use any Compute Engine zone for workers, including regions where there are no Dataflow regional endpoints. By specifying a worker region for your job, you can control the regional placement of your workers. To specify a worker region or zone, do the following:

  • If you use the gcloud CLI to create a job from a Dataflow template, use the --worker-region flag to override the worker region, or use the --worker-zone flag to override the worker zone.
  • If you use the Apache Beam Java SDK to create your job, you set regions and zones for workers using pipeline options. Use workerRegion to override the worker region or workerZone to override the worker zone.

To improve network latency and throughput, we recommend that you create workers in a region that's geographically close to your data sources and sinks. If you do not specify a region or zone for workers when you create a job, Dataflow automatically defaults to a zone that's in the same region as the job's regional endpoint.
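The following Java sketch shows one way to set the regional endpoint and the worker region through pipeline options when the pipeline is constructed; the project ID and regions are illustrative.

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Illustrative values: the regional endpoint runs in us-central1, worker VMs run in us-west1.
String[] args = {
  "--runner=DataflowRunner",
  "--project=example-project",
  "--region=us-central1",      // regional endpoint that manages the job
  "--workerRegion=us-west1"    // region where the worker VMs run
  // Or pin workers to a single zone instead: "--workerZone=us-west1-a"
};
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
```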

If you don't use the Dataflow Shuffle service or Streaming Engine, the data that's processed by the job (that is, data stored in any PCollection object) resides on the job's workers, assuming that no user code transmits data outside the workers. If either the Dataflow Shuffle service or Streaming Engine is enabled, the distributed dataset represented by a PCollection object can be transmitted between the workers and these services. In that case, you should use workers that are in the same location as the regional endpoint in order to maximize performance.

Data encryption considerations

As a fully managed service, Dataflow automatically encrypts data that moves through your data pipeline using Google-managed encryption keys for both in-flight data and at-rest data. Instead of using Google-managed encryption keys, you might prefer to manage your own encryption keys. For that case, Dataflow supports customer-managed encryption keys (CMEK) using the Cloud Key Management Service (Cloud KMS). You can also use Cloud HSM, a cloud-hosted hardware security module (HSM) service that allows you to host encryption keys and perform cryptographic operations in a cluster of FIPS 140-2 Level 3 certified HSMs.

When you use CMEK, Dataflow uses your Cloud KMS key to encrypt the data, except for data-key-based operations such as windowing, grouping, and joining. If data keys contain sensitive data, such as personally identifiable information (PII), you must hash or otherwise transform the keys before they enter the Dataflow pipeline.
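For example, a Java pipeline can be launched with CMEK by passing the Cloud KMS key resource name through the dataflowKmsKey pipeline option, as in the following sketch (the project, key ring, and key names are illustrative):

```java
// Illustrative launch flags for a Java pipeline that uses a customer-managed encryption key.
String[] args = {
  "--runner=DataflowRunner",
  "--project=example-project",
  "--region=us-central1",
  "--tempLocation=gs://example-bucket/tmp",   // temporary files are also protected by the key
  "--dataflowKmsKey=projects/example-project/locations/us-central1/keyRings/example-ring/cryptoKeys/example-key"
};
```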

Private networking considerations

Your networking and security requirements might mandate that VM-based workloads such as Dataflow jobs use only private IP addresses. Dataflow allows you to specify that only private IP addresses be used for all network communication by workers. If public IPs are disabled, you must enable Private Google Access on the subnetwork so that Dataflow workers can reach Google APIs and services, including the Dataflow regional endpoint.

Disabling public IPs also prevents Dataflow workers from accessing resources that are outside the subnetwork or in peered VPC networks. Similarly, network access to the worker VMs from outside the subnetwork or from peered VPC networks is prevented.

For more information about using the --usePublicIps pipeline option to specify whether workers should have only private IPs, see the Dataflow documentation.
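For example, a Java pipeline might be launched with flags like the following sketch to disable public IPs and run workers on a subnetwork that has Private Google Access enabled (the project and subnetwork names are illustrative):

```java
// Illustrative launch flags for workers that use only private IP addresses.
String[] args = {
  "--runner=DataflowRunner",
  "--project=example-project",
  "--region=us-central1",
  "--usePublicIps=false",
  // The subnetwork must have Private Google Access enabled so that workers can reach Google APIs.
  "--subnetwork=regions/us-central1/subnetworks/example-subnet"
};
```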

What's next