Plan your Dataflow pipeline

This page explains important considerations for planning your data pipeline before you begin code development. Data pipelines move data from one system to another and are often critical components of business information systems. The performance and reliability of your data pipeline can impact these broader systems and how effectively your business requirements are met.

If you plan your data pipelines before you develop them, you can improve their performance and reliability. This page explains various planning considerations for Dataflow pipelines, including:

  • Performance expectations for your pipelines, including standards for measurability
  • Integration of your pipelines with data sources, sinks, and other connected systems
  • Regionalization of pipelines, sources, and sinks
  • Security, such as data encryption and private networking

Define and measure SLOs

An important measure of performance is how well your pipeline meets your business requirements. Service level objectives (SLOs) provide tangible definitions of performance that you can compare against acceptable thresholds. For example, you might define the following SLOs for your system:

  • Data freshness: generate 90% of product recommendations from user website activity that occurred no more than 3 minutes ago.

  • Data correctness: within a calendar month, less than 0.5% of customer invoices contain errors.

  • Data isolation/load balancing: within a business day, process all high-priority payments within 10 minutes of lodgement, and complete standard-priority payments by the next business day.

You can use service level indicators (SLIs) to measure SLO compliance. SLIs are quantifiable metrics that indicate how well your system is meeting a given SLO. For example, you can measure the example data-freshness SLO by using the age of the most recently processed user activity as an SLI. If your pipeline generates recommendations from user activity events, and your SLI reports a 4-minute delay between the event time and the time the event is processed, the recommendations don't reflect the most recent 4 minutes of a user's website activity. If a pipeline that processes streaming data sustains a system latency above the 3-minute target, such as 4 minutes, you know that the SLO is not met.

Because system components beyond your pipeline affect your SLO, capture a range of SLIs that describe the end-to-end health of the system, not only the performance of the pipeline itself. For example, your Dataflow pipeline might compute results with acceptable delay, but a performance issue in a downstream system might still affect wider SLOs.
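The comparison between an SLI and its target can be sketched in a few lines. The following is a minimal illustration, not a Dataflow API: the function names and the fixed timestamps are hypothetical, and the 3-minute target is taken from the example data-freshness SLO.

```python
from datetime import datetime, timedelta, timezone

# Target from the example data-freshness SLO (an assumption for this sketch).
FRESHNESS_TARGET = timedelta(minutes=3)


def freshness_sli(last_processed_event_time: datetime, now: datetime) -> timedelta:
    """Return the age of the most recently processed event."""
    return now - last_processed_event_time


def meets_target(sli: timedelta, target: timedelta = FRESHNESS_TARGET) -> bool:
    """Return True when the measured age is within the target."""
    return sli <= target


# Hypothetical measurement: the newest processed event is 4 minutes old.
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
last_event = now - timedelta(minutes=4)

sli = freshness_sli(last_event, now)
print(sli, meets_target(sli))  # prints "0:04:00 False"
```

In practice you would read the event timestamps from your monitoring system rather than hard-coding them, and evaluate the result over many measurements to match the percentage in the SLO.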

For more information about important SLOs to consider, see the Site Reliability Engineering book.

Data freshness

Data freshness refers to the usability of data in relation to its age. The following data freshness SLOs are mentioned in the Site Reliability Engineering book as the most common pipeline data freshness SLO formats:

  • X% of data processed in Y [seconds, days, minutes]. This SLO refers to the percentage of data that's processed in a given period of time. It is commonly used for batch pipelines that process bounded data sources. The metrics for this type of SLO are the input and output data sizes at key processing steps relative to the elapsed pipeline runtime. You can choose a step that reads an input dataset and another step that processes each item of the input. An example SLO is "For the Shave the Yak game, 99% of user activities that impact players' scores are accounted for within 30 minutes of match completion."

  • The oldest data is no older than X [seconds, days, minutes]. This SLO refers to the age of data produced by the pipeline. It is commonly used for streaming pipelines that process data from unbounded sources. For this type of SLO, use metrics that indicate how long your pipeline takes to process data. Two possible metrics are the age of the oldest unprocessed item, that is, how long an unprocessed item has been in the queue, or the age of the most recently processed item. An example SLO is "Product recommendations are generated from user activity that is no older than 5 minutes."

  • The pipeline job completes successfully within X [seconds, days, minutes]. This SLO sets a deadline for successful completion and is commonly used for batch pipelines that process data from bounded data sources. This SLO requires the total pipeline-elapsed time and job-completion status, in addition to other signals that indicate the success of the job, such as the percentage of processed elements that result in errors. An example SLO is "Customer orders from the current business day are processed by 9 AM the next day."
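As a rough sketch of how the first two SLO formats might be evaluated from raw measurements, the following helper functions compute the fraction of items processed within a time limit and the age of the oldest unprocessed item. The function names and sample values are hypothetical and are not part of any Dataflow or Cloud Monitoring API.

```python
from typing import Iterable, Sequence


def fraction_within(latencies_s: Sequence[float], limit_s: float) -> float:
    """Fraction of items whose processing latency is at most limit_s."""
    if not latencies_s:
        return 1.0  # No items means nothing violated the limit.
    on_time = sum(1 for latency in latencies_s if latency <= limit_s)
    return on_time / len(latencies_s)


def oldest_unprocessed_age(enqueue_times_s: Iterable[float], now_s: float) -> float:
    """Age in seconds of the item that has waited longest in the queue."""
    return max((now_s - t for t in enqueue_times_s), default=0.0)


# "X% of data processed in Y": three of four items finish within 30 minutes.
print(fraction_within([60, 120, 1700, 2000], limit_s=1800))  # prints 0.75

# "The oldest data is no older than X": the oldest queued item is 200 s old.
print(oldest_unprocessed_age([100.0, 250.0, 290.0], now_s=300.0))  # prints 200.0
```

You would compare the first value against the X% in your SLO and the second against the maximum allowed age.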

For information about using Cloud Monitoring to measure data freshness, see Dataflow job metrics.

Data correctness

Data correctness refers to data being free of errors. You can determine data correctness through different means.

For running pipelines, defining a data correctness target usually involves measuring correctness over a period of time. For example:

  • On a per-job basis, less than X% of input items contain data errors. You can use this SLO to measure data correctness for batch pipelines. An example SLO is "For each daily batch job to process electricity meter readings, less than 3% of readings contain data entry errors."
  • Over an X-minute moving window, less than Y% of input items contain data errors. You can use this SLO to measure data correctness for streaming pipelines. An example SLO is "Less than 2% of electricity meter readings over the last hour contain negative values."

To measure these SLOs, use metrics that accumulate the number of errors by type over a suitable period of time. Examples of error types include data that is incorrect because of a malformed schema and data that is outside a valid range.
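One way to picture the moving-window measurement is a small tracker that keeps recent items, counts errors by type, and drops items that fall outside the window. This is a minimal sketch under stated assumptions: the class name, method names, and error-type labels are hypothetical, and items are assumed to be recorded in timestamp order.

```python
from collections import Counter, deque
from typing import Optional


class WindowedErrorRate:
    """Tracks error counts by type over a moving time window."""

    def __init__(self, window_seconds: float):
        self.window_seconds = window_seconds
        self._events = deque()  # Entries are (timestamp, error_type or None).

    def record(self, timestamp: float, error_type: Optional[str] = None) -> None:
        """Record one input item; error_type is None for a valid item."""
        self._events.append((timestamp, error_type))

    def _evict(self, now: float) -> None:
        # Drop items that are older than the window.
        while self._events and self._events[0][0] <= now - self.window_seconds:
            self._events.popleft()

    def error_rate(self, now: float) -> float:
        self._evict(now)
        if not self._events:
            return 0.0
        errors = sum(1 for _, error in self._events if error is not None)
        return errors / len(self._events)

    def errors_by_type(self, now: float) -> Counter:
        self._evict(now)
        return Counter(error for _, error in self._events if error is not None)


tracker = WindowedErrorRate(window_seconds=3600)
tracker.record(0, "out_of_range")       # Falls outside the window at t=4000.
tracker.record(1000)
tracker.record(2000, "malformed_schema")
tracker.record(3000)
tracker.record(3500, "out_of_range")
tracker.record(4000)

print(tracker.error_rate(now=4000))      # prints 0.4
print(tracker.errors_by_type(now=4000))
```

You would compare the error rate against the Y% threshold in the SLO, and use the per-type counts to see which kind of error is driving a violation.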

For information about using Cloud Monitoring to measure data correctness, see Dataflow job metrics.

Data isolation and load balancing

Data isolation involves segmenting data by attribute, which can make load balancing easier. For example, in an online payment-processing platform, you can segment data so that individual payments are either standard priority or high priority. Your pipeline can then use load balancing to ensure that high-priority payments are processed before standard-priority payments.

Imagine that you define the following SLOs for payment processing:

  • High-priority payments are processed within 10 minutes.
  • Standard-priority payments are processed by 9 AM the next business day.

If the payment platform complies with these SLOs, customers are able to see finalized high-priority payments on a reporting dashboard as they are completed. In contrast, standard payments might not appear on the dashboard until the next day.

In this example scenario, you have the following options:

  • Run a single pipeline to process both standard-priority and high-priority payments.
  • Isolate and load-balance the data based on priorities across multiple pipelines.

The following sections describe each option in detail.

Use a single pipeline to deliver against mixed SLOs

The following diagram illustrates a single pipeline that's used to process both high-priority and standard-priority payments. The pipeline receives notification of new payments from a streaming data source, such as a Pub/Sub topic or an Apache Kafka topic. It then immediately processes payments and writes events to BigQuery using streaming inserts.

Single pipeline for all processing, with an overall SLO of less than 10 minutes.

The advantage of a single pipeline is that it simplifies your operational requirements, because you need to manage only a single data source and pipeline. Dataflow uses automatic tuning features to help run your job as quickly and efficiently as possible.

A disadvantage of a single pipeline is that the shared pipeline can't prioritize high-priority payments over standard-priority payments, and pipeline resources are shared across both payment types. In the business scenario described previously, your pipeline must maintain the more stringent of the two SLOs. That is, the pipeline must use the SLO for high-priority payments regardless of the actual priority of the processed payments. Another disadvantage is that in the event of a work backlog, the streaming pipeline is unable to prioritize backlog processing according to the urgency of work.

Use multiple pipelines tailored for specific SLOs

You can use two pipelines to isolate resources and deliver against specific SLOs. The following diagram illustrates this approach.

Using two pipelines, one for high-priority payments (with SLO less than 10 minutes) and another one for lower-priority payments (with SLO less than 24 hours).

High-priority payments are isolated to a streaming pipeline for prioritized processing. Standard-priority payments are processed by a batch pipeline that runs daily and that uses BigQuery load jobs to write processed results.

Isolating data in different pipelines has advantages. To deliver high-priority payments against tighter SLOs, you can shorten processing times by assigning more resources to the pipeline dedicated to high-priority payments. Resource configurations include adding Dataflow workers, using larger machines, and enabling autoscaling. Isolating high-priority items to a separate processing queue can also mitigate processing delays if a sudden influx of standard-priority payments occurs.

When you use multiple pipelines to isolate and load-balance the data from batch and streaming sources, the Apache Beam programming model allows the high-priority (streaming) and standard-priority (batch) pipelines to share the same code. The only exception is the initial read transform, which reads from a bounded source for the batch pipeline. For more information, see Create libraries of reusable transforms.
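The idea of sharing processing logic while varying only the read step can be illustrated without any pipeline framework. The following plain-Python sketch is not Apache Beam code: the payment fields, the function names, and the "finalized" status are hypothetical, and a generator and a list stand in for unbounded and bounded sources.

```python
from typing import Iterable, Iterator, List, Tuple


def process_payment(payment: dict) -> dict:
    """Shared business logic (hypothetical): mark the payment as finalized."""
    return {**payment, "status": "finalized"}


def run_pipeline(source: Iterable[dict]) -> Iterator[dict]:
    """Apply the shared logic to any source, bounded or unbounded."""
    for payment in source:
        yield process_payment(payment)


def split_by_priority(payments: Iterable[dict]) -> Tuple[List[dict], List[dict]]:
    """Isolate payments into high-priority and standard-priority groups."""
    high, standard = [], []
    for payment in payments:
        (high if payment["priority"] == "high" else standard).append(payment)
    return high, standard


payments = [
    {"id": "p1", "priority": "high"},
    {"id": "p2", "priority": "standard"},
    {"id": "p3", "priority": "high"},
]
high, standard = split_by_priority(payments)

# A generator stands in for a streaming read; a list stands in for a batch read.
high_results = list(run_pipeline(p for p in high))
standard_results = list(run_pipeline(standard))
print([p["id"] for p in high_results], [p["id"] for p in standard_results])
```

Only the source differs between the two calls to run_pipeline, which mirrors how the two pipelines can differ only in their initial read transform.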

Plan for data sources and sinks

To process data, a data pipeline needs to be integrated with other systems. Those systems are referred to as sources and sinks. Data pipelines read data from sources and write data to sinks. In addition to sources and sinks, data pipelines might interact with external systems for data enrichment, filtering, or calling external business logic within a processing step.

For scalability, Dataflow runs the stages of your pipeline in parallel across multiple workers. Factors that are outside your pipeline code and the Dataflow service also impact the scalability of your pipeline. These factors might include the following:

  • Scalability of external systems: external systems that your pipeline interacts with can constrain performance and can form the upper bound of scalability. For example, an Apache Kafka topic configured with an insufficient number of partitions for the read throughput that you need can affect your pipeline's performance. To help ensure that the pipeline and its components meet your performance targets, refer to the best practices documentation for the external systems that you're using. You can also simplify infrastructure capacity planning by using Google Cloud services that provide built-in scalability. For more information, see Use Google Cloud managed sources and sinks on this page.

  • Choice of data formats: certain data formats might be faster to read than others. For example, using data formats that support parallelizable reads, such as Avro, is usually faster than using CSV files that have embedded newlines in fields, and is faster than using compressed files.

  • Data location and network topology: the geographic proximity and networking characteristics of data sources and sinks in relation to the data pipeline might impact performance. For more information, see Regional considerations on this page.

External service calls

Calling external services from your pipeline incurs per-call overheads that can decrease the performance and efficiency of your pipeline. To reduce these overheads, batch multiple data elements into single requests where possible. Many native Apache Beam I/O transforms automatically perform this task, including BigQueryIO and streaming insert operations. Aside from capacity limits, some external services also enforce quotas that limit the total number of calls over a period of time, such as a daily quota, or that restrict the rate of calling, such as the number of requests per second.

Because Dataflow parallelizes work across multiple workers, too much traffic can overwhelm an external service or exhaust available quotas. When autoscaling is used, Dataflow might attempt to compensate by adding workers to run a slow step like an external call. Adding workers can add further pressure on external systems. Ensure that external systems can support your anticipated load requirements, or limit the traffic from your pipeline to sustainable levels. For more information, see Limit batch sizes and concurrent calls to external services.
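Batching elements and capping concurrent calls can be sketched with the Python standard library. This is an illustration only: call_external_service is a hypothetical stand-in for a real client call, and the batch size and concurrency limit are arbitrary example values. The cap applies within a single process, so the total load on the external service also scales with the number of workers.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Iterable, Iterator, List


def batched(elements: Iterable[int], batch_size: int) -> Iterator[List[int]]:
    """Yield lists of up to batch_size elements."""
    iterator = iter(elements)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def call_external_service(batch: List[int]) -> List[int]:
    """Hypothetical stand-in for one request that carries a whole batch."""
    return [value * 2 for value in batch]


def enrich(elements: Iterable[int], batch_size: int = 3,
           max_concurrent_calls: int = 2) -> List[int]:
    # max_workers caps how many requests are in flight at the same time.
    with ThreadPoolExecutor(max_workers=max_concurrent_calls) as pool:
        results = pool.map(call_external_service, batched(elements, batch_size))
        return [item for batch in results for item in batch]


print(list(batched(range(7), 3)))  # prints [[0, 1, 2], [3, 4, 5], [6]]
print(enrich(range(7)))            # prints [0, 2, 4, 6, 8, 10, 12]
```

Seven elements result in three requests instead of seven, and at most two of those requests run at once.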

Use Google Cloud managed sources and sinks

Using Google Cloud managed services with your Dataflow pipeline removes the complexity of capacity management by providing built-in scalability, consistent performance, and quotas and limits that accommodate most requirements. You still need to be aware of different quotas and limits for pipeline operations. Dataflow itself imposes quotas and limits. You can increase some of these by contacting Google Cloud support.

Dataflow uses Compute Engine VM instances to run your jobs, so you need sufficient Compute Engine quota. Insufficient Compute Engine quota can hinder pipeline autoscaling or prevent jobs from starting.

The remaining parts of this section explore how different Google Cloud quotas and limits might influence how you design, develop, and monitor your pipeline. Pub/Sub and BigQuery are used as examples of pipeline sources and sinks.

Example 1: Pub/Sub

When you use Pub/Sub with Dataflow, Pub/Sub provides a scalable and durable event ingestion service for delivering messages to and from your streaming data pipelines. You can use the Google Cloud console to view Pub/Sub quota consumption and increase quota limits. We recommend that you request a quota increase if you have any single pipeline that exceeds the per-project quotas and limits.

Pub/Sub quotas and limits are designed around project-level usage. Specifically, publishers and subscribers in different projects are given independent data-throughput quotas. If multiple pipelines publish or subscribe to a single topic, you can get maximum allowable throughput on that topic by deploying each pipeline into its own project. In this configuration, each pipeline uses a different project-based service account to consume and publish messages.

In the following diagram, Pipeline 1 and Pipeline 2 share the same subscriber and publisher throughput quota that's available to Project A. In contrast, Pipeline 3 can use the entire subscriber and publisher throughput quota that's attached to Project B.

Three pipelines. Pipeline 1 and Pipeline 2 are in Pipeline Project A; each has its own subscription to a Pub/Sub topic. Pipeline 3 is in Pipeline Project B, which has its own subscription.

Multiple pipelines can read from a single Pub/Sub topic by using separate subscriptions to the topic, which allows Dataflow pipelines to pull and acknowledge messages independently of other subscribers, such as other pipelines. This feature makes it easy to clone pipelines by creating additional Pub/Sub subscriptions. Creating additional subscriptions is useful for creating replica pipelines for high availability (typically for streaming use cases), for running additional test pipelines against the same data, and for enabling pipeline updates.

Example 2: BigQuery

Reading and writing BigQuery data is supported by the Apache Beam SDK for multiple languages, including Java, Python, and Go. When you use Java, the BigQueryIO class provides this functionality. BigQueryIO supports two methods for reading data: EXPORT (table export) and DIRECT_READ. The different read methods consume different BigQuery quotas.

Table export is the default read method. It works as shown in the following diagram:

The pipeline sends an export request to BigQuery, which writes data to a temporary location in Cloud Storage. The pipeline then reads data from that temporary location.

The diagram shows the following flow:

  1. BigQueryIO invokes a BigQuery export request to export table data. The exported table data is written to a temporary Cloud Storage location.
  2. BigQueryIO reads the table data from the temporary Cloud Storage location.

BigQuery export requests are limited by export quotas. The export request must also complete before the pipeline can start processing data, which adds run time to the job.

In contrast, the direct read approach uses the BigQuery Storage API to read table data directly from BigQuery. The BigQuery Storage API provides high-throughput read performance for table row data using gRPC. Using the BigQuery Storage API makes the export step unnecessary, which avoids export quota restrictions and potentially decreases job run time.

The following diagram shows the flow if you use the BigQuery Storage API. In contrast to the flow that uses a BigQuery export request, this flow is simpler, because it only has a single direct-read step to get the data from the table to the pipeline.

The pipeline reads from a BigQuery table directly.

Writing data to BigQuery tables also has its own quota implications. Batch pipelines that use BigQuery load jobs consume different BigQuery load job quotas that apply at the table and project level. Similarly, streaming pipelines that use BigQuery streaming inserts consume BigQuery streaming insert quotas.

To determine the most appropriate methods to read and write data, consider your use case. For example, avoid using BigQuery load jobs to append data thousands of times per day into a table. Use a streaming pipeline to write near real-time data to BigQuery. Your streaming pipeline should use either streaming inserts or the Storage Write API for this purpose.
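The guidance above can be expressed as a simple rule of thumb. The following sketch is a heuristic for illustration, not an official selection rule: the function name is hypothetical, the threshold of 1,000 appends per day is an assumed stand-in for "thousands of times per day", and the returned strings are used only as labels that mirror the BigQueryIO write method names.

```python
from typing import Tuple

# Assumed threshold for this sketch; check the current BigQuery load job
# quotas for your project before you rely on a specific number.
MAX_DAILY_LOAD_JOB_APPENDS = 1000


def candidate_write_methods(near_real_time: bool, appends_per_day: int) -> Tuple[str, ...]:
    """Return write methods that fit the use case, as plain labels."""
    if near_real_time or appends_per_day > MAX_DAILY_LOAD_JOB_APPENDS:
        # Frequent or low-latency writes: avoid load jobs.
        return ("STORAGE_WRITE_API", "STREAMING_INSERTS")
    # Infrequent bulk writes, such as a daily batch job.
    return ("FILE_LOADS",)


print(candidate_write_methods(near_real_time=False, appends_per_day=1))
print(candidate_write_methods(near_real_time=True, appends_per_day=50000))
```

A daily batch pipeline lands on load jobs, while a pipeline that writes near real-time data lands on one of the streaming options.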

Regional considerations

Dataflow is offered as a managed service in multiple Google Cloud regions. When choosing a region to run your jobs, consider the following factors:

  • The location of data sources and sinks
  • Preferences or restrictions on data processing locations
  • Dataflow features that are offered only in specific regions
  • The region that's used to manage execution of a given job
  • The zone that's used for the job's workers

For a given job, the region setting that you use for the job and for the workers can differ. For more information, including when to specify regions and zones, see the Dataflow regions documentation.

By specifying regions to run your Dataflow jobs, you can plan around regional considerations for high availability and disaster recovery. For more information, see High availability and geographic redundancy.

Regions

Dataflow regions store and handle metadata relating to your job, such as information about the Apache Beam graph itself, like transform names. They also control worker behaviors such as autoscaling. Specifying a region helps you meet your needs for security and compliance, data locality, and the regional placement of a job. To avoid performance impact from cross-region network calls, we recommend that you use the same region for the job and for the workers when possible.

Dataflow workers

Dataflow jobs use Compute Engine VM instances, called Dataflow workers, to run your pipeline. Dataflow jobs can use any Compute Engine zone for workers, including regions where there are no Dataflow locations. By specifying a worker region for your job, you can control the regional placement of your workers. To specify a worker region or zone, do the following:

  • If you use the gcloud CLI to create a job from a Dataflow template, use the --worker-region flag to override the worker region, or use the --worker-zone flag to override the worker zone.
  • If you use the Apache Beam Java SDK to create your job, set regions and zones for workers using pipeline options. Use workerRegion to override the worker region or workerZone to override the worker zone.
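If you launch jobs from Python, one possible approach is to assemble the location arguments in a small helper before you pass them to your pipeline. The sketch below is an illustration under assumptions: the helper name is hypothetical, and the option names --region, --worker_region, and --worker_zone are assumed to be the Python SDK counterparts of the options described above, so verify them against the Pipeline options reference. The helper also assumes that a worker region and a worker zone are not set together.

```python
from typing import List, Optional


def worker_location_args(region: str,
                         worker_region: Optional[str] = None,
                         worker_zone: Optional[str] = None) -> List[str]:
    """Build command-line style arguments for job and worker placement."""
    if worker_region and worker_zone:
        # Assumption for this sketch: set one override or the other, not both.
        raise ValueError("Specify worker_region or worker_zone, not both.")
    args = [f"--region={region}"]
    if worker_region:
        args.append(f"--worker_region={worker_region}")
    if worker_zone:
        args.append(f"--worker_zone={worker_zone}")
    return args


print(worker_location_args("us-central1", worker_zone="us-central1-f"))
# prints ['--region=us-central1', '--worker_zone=us-central1-f']
```

When neither override is given, the helper emits only the job region, which matches the default behavior of placing workers in a zone in the job's region.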

To improve network latency and throughput, we recommend that you create workers in a region that's geographically close to your data sources and sinks. If you don't specify a region or zone for workers when you create a job, Dataflow automatically defaults to a zone that's in the same region as the job.

If you don't use the Dataflow Shuffle service or Streaming Engine, the data that's processed by the job (that is, data stored in any PCollection object) resides on the job's workers, assuming that no user code transmits data outside the workers. If either the Dataflow Shuffle service or Streaming Engine is enabled, the distributed dataset represented by a PCollection object can be transmitted between the workers and these services.

Data encryption considerations

As a fully managed service, Dataflow automatically encrypts data that moves through your data pipeline using Google-managed encryption keys for both in-flight data and at-rest data. Instead of using Google-managed encryption keys, you might prefer to manage your own encryption keys. For that case, Dataflow supports customer-managed encryption keys (CMEK) using the Cloud Key Management Service (Cloud KMS). You can also use Cloud HSM, a cloud-hosted hardware security module (HSM) service that allows you to host encryption keys and perform cryptographic operations in a cluster of FIPS 140-2 Level 3 certified HSMs.

When you use CMEK, Dataflow uses your Cloud KMS key to encrypt the data, except for data-key-based operations such as windowing, grouping, and joining. If data keys contain sensitive data, such as personally identifiable information (PII), you must hash or otherwise transform the keys before they enter the Dataflow pipeline.
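One way to transform sensitive keys before they enter the pipeline is a keyed hash. The following is a minimal sketch, assuming an HMAC-SHA256 scheme: the function name, the sample key, and the hard-coded secret are hypothetical, and a real deployment would load the secret from a secret manager. A keyed hash is deterministic, so equal keys still group and join together.

```python
import hashlib
import hmac


def pseudonymize_key(raw_key: str, secret: bytes) -> str:
    """Replace a sensitive key with a deterministic keyed hash."""
    return hmac.new(secret, raw_key.encode("utf-8"), hashlib.sha256).hexdigest()


# Hypothetical secret for illustration only; do not hard-code real secrets.
SECRET = b"example-secret-for-illustration"

hashed = pseudonymize_key("alice@example.com", SECRET)
print(len(hashed))                                             # prints 64
print(hashed == pseudonymize_key("alice@example.com", SECRET))  # prints True
```

Apply the transformation upstream of the pipeline, or in the first step after the read, so that grouping and joining operate only on the hashed values.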

Private networking considerations

Your networking and security requirements might mandate that VM-based workloads such as Dataflow jobs use only private IP addresses. Dataflow lets you specify that workers use private IP addresses for all network communication. If public IPs are disabled, you must enable Private Google Access on the subnetwork so that Dataflow workers can reach Google APIs and services.

We recommend that you disable public IPs for Dataflow workers, unless your Dataflow jobs require public IPs to access network resources outside of Google Cloud. Disabling public IPs prevents Dataflow workers from accessing resources that are outside the subnetwork or from accessing peer VPC networks. Similarly, network access to VM workers from outside the subnetwork or peer VPC networks is prevented.

For more information about using the --usePublicIps pipeline option to specify whether workers should have only private IPs, see Pipeline options.
