Building production-ready data pipelines using Dataflow: Deploying data pipelines

This document discusses how to deploy and update production-ready pipelines. It's part of a series that helps you improve the production readiness of your data pipelines by using Dataflow. The series is intended for a technical audience whose responsibilities include the development, deployment, and monitoring of Dataflow pipelines, and who have a working understanding of Dataflow and Apache Beam.

Introduction

Pipeline development involves different stages and tasks, such as code development, testing, and delivery into production. This document explains:

  • Considerations for continuous integration and continuous delivery (CI/CD) to support automated build, testing, and pipeline deployment into different environments.
  • Dataflow features to optimize performance and resource utilization in production.
  • Approaches and watchpoints for updating streaming pipelines in production.
  • Best practices for improving pipeline reliability in production.

Continuous integration (CI)

Continuous integration (CI) requires developers to merge code into a shared repository frequently, which is especially useful for applications that change often, such as frequently updated websites. Although data pipelines don't usually change as much as other types of applications, CI practices provide many benefits for pipeline development. For example, test automation provides rapid feedback when defects occur and reduces the likelihood that regressions enter the code base.

Test automation is an important part of CI. When it's combined with appropriate test coverage, test automation can rigorously execute your test suite on each code commit. Your CI server can work in conjunction with a build automation tool like Maven to execute your test suite as one or more steps of the CI pipeline. Code that has successfully passed unit tests and integration tests can be packaged into deployment artifacts from which pipelines can be launched. This is referred to as a passing build.

The number and types of deployment artifacts created from a passing build can vary depending on how pipelines are launched. Using the Apache Beam Java SDK, you can package your pipeline code into a self-executing JAR file. You can then store the JAR file in a bucket that's hosted in the project for a deployment environment, such as the preproduction or production Google Cloud project. If you use Classic Templates (a type of templated execution), the deployment artifacts include a JSON template file, the JAR file for your pipeline, and an optional metadata template. The artifacts can then be deployed into different deployment environments using continuous delivery, as explained in the next section.
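For example, with a Java pipeline that's built with Maven, the CI server might package and stage the artifact along the following lines. This is a minimal sketch; the artifact name, bucket, and Shade plugin configuration (assumed to be in the project's pom.xml) are illustrative assumptions rather than values from this document.

    # Build the self-executing (bundled) JAR for the pipeline.
    mvn clean package -DskipTests

    # Copy the passing build's artifact to the deployment bucket of the
    # target environment (a preproduction project in this sketch).
    gsutil cp target/my-pipeline-bundled-1.0.0.jar \
        gs://preprod-project-dataflow-artifacts/builds/my-pipeline-bundled-1.0.0.jar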

Continuous delivery and deployment (CD)

Continuous delivery copies deployment artifacts to one or more deployment environments, where they're ready to be launched manually. Typically, the artifacts that are built by the CI server are deployed to one or more preproduction environments to run end-to-end pipeline tests using Dataflow. The production environment is updated if all end-to-end tests pass.

For batch pipelines, continuous deployment can directly launch the pipeline as a new Dataflow job. Alternatively, other systems can use the artifacts to launch batch jobs when required. For example, you can use Cloud Composer to run batch jobs within a workflow, or use Cloud Scheduler to schedule batch jobs.
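As one possible sketch of scheduled execution, the following Cloud Scheduler job periodically launches a batch job from a Classic Template through the Dataflow REST API. The project, bucket, schedule, service account, and template parameters are illustrative assumptions, and the template is assumed to be staged already.

    gcloud scheduler jobs create http nightly-batch-launcher \
        --schedule="0 2 * * *" \
        --http-method=POST \
        --uri="https://dataflow.googleapis.com/v1b3/projects/my-project/locations/us-central1/templates:launch?gcsPath=gs://my-bucket/templates/batch-template" \
        --oauth-service-account-email="scheduler@my-project.iam.gserviceaccount.com" \
        --message-body='{"jobName": "nightly-batch", "parameters": {"inputFile": "gs://my-bucket/input/*.csv"}}'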

Streaming pipelines can be more complex to deploy than batch pipelines, and therefore can be more difficult to automate using continuous deployment. For example, you might need to determine how to replace or update an existing streaming pipeline. If you can't update a pipeline, or if you choose not to update it, you can use other methods such as coordinating multiple Dataflow jobs to minimize or prevent business disruption.

Identities and roles required for CI/CD

Your CI/CD pipeline interacts with different systems to build, test, and deploy pipelines. For example, your pipeline needs access to your source code repository. To enable these interactions, ensure that your pipeline has the proper identities and roles. The following pipeline activities might also require your pipeline to have specific identities and roles.

Integration testing with external services, including Google Cloud

When you use the Direct Runner for integration tests, your pipeline consumes Google Cloud data sources and sinks using either the Cloud SDK credentials or the credentials that are provided by the GOOGLE_APPLICATION_CREDENTIALS environment variable. Ensure that the service account that's used to obtain credentials has sufficient roles and permissions for the Google Cloud resources that the pipeline accesses.

Deploying artifacts to different deployment environments

Where possible, use unique credentials for each environment (effectively for each project) and limit access to resources accordingly.

Use unique service accounts for each project to read and write deployment artifacts to storage buckets. Depending on the type of pipeline execution, your deployment process might need to stage multiple artifacts. For example, creating and staging a Dataflow template requires permissions to write the deployment artifacts that are needed to launch your pipeline, such as the pipeline's template file, to a Cloud Storage bucket.
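For example, you might grant the deployment service account for a preproduction project write access to that project's artifact bucket, as in the following sketch (the account and bucket names are illustrative):

    gsutil iam ch \
        serviceAccount:deployer@preprod-project.iam.gserviceaccount.com:roles/storage.objectAdmin \
        gs://preprod-project-dataflow-artifacts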

Deploying pipelines to different deployment environments

Where possible, use unique service accounts for each project to access and manage Google Cloud resources within the project, including accessing Dataflow itself.

The service account that you use for creating and managing Dataflow jobs needs sufficient IAM permissions for job management. Assigning the roles/dataflow.admin role to the service account is sufficient to grant a minimal permission set for creating and managing jobs.

The controller service account that you use for running Dataflow jobs needs permissions for managing Compute Engine resources during execution and for managing interaction between the Apache Beam pipeline and the Dataflow service. Assigning the roles/dataflow.worker role to the controller service account is sufficient to grant a minimal permission set for running the Dataflow job. Depending on the resources that are used by your job (for example, reading from Pub/Sub topics), you might need to grant additional permissions beyond those provided by the roles/dataflow.worker role.

You use the --serviceAccount pipeline option to specify a user-managed controller service account to use for that job. For the job to run successfully, the service account that's used to create the job must be granted the roles/iam.serviceAccountUser role on the user-managed controller service account. If you don't specify a controller service account when you create a job, Dataflow attempts to use the Compute Engine default service account. We recommend instead that you specify a user-managed controller service account for production environments, because the Compute Engine default service account usually has a broader set of permissions than the permissions that are required for your Dataflow jobs.

In production scenarios, we recommend that you use separate service accounts for Dataflow job management and for the controller service account, because this enables improved security compared to using a single service account. For example, the service account that's used to create Dataflow jobs might not need to access data sources and sinks or to use other resources that are used by the pipeline. In this scenario, the controller service account that's used to run Dataflow jobs is granted permissions to use pipeline resources. A different service account for job creation is granted permissions to manage (including creating) only Dataflow jobs.
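The following commands sketch this separation with two user-managed service accounts, one for job management and one as the controller service account. The account names, project ID, and pipeline main class are illustrative assumptions.

    # Job-management account: create and manage Dataflow jobs.
    gcloud projects add-iam-policy-binding my-project \
        --member="serviceAccount:deployer@my-project.iam.gserviceaccount.com" \
        --role="roles/dataflow.admin"

    # Controller account: run the job's workers.
    gcloud projects add-iam-policy-binding my-project \
        --member="serviceAccount:worker@my-project.iam.gserviceaccount.com" \
        --role="roles/dataflow.worker"

    # Let the job-management account act as the controller account.
    gcloud iam service-accounts add-iam-policy-binding \
        worker@my-project.iam.gserviceaccount.com \
        --member="serviceAccount:deployer@my-project.iam.gserviceaccount.com" \
        --role="roles/iam.serviceAccountUser"

    # Launch the pipeline with the controller account specified through the
    # --serviceAccount pipeline option (Apache Beam Java SDK).
    mvn compile exec:java -Dexec.mainClass=com.example.MyPipeline \
        -Dexec.args="--runner=DataflowRunner \
            --project=my-project \
            --region=us-central1 \
            --serviceAccount=worker@my-project.iam.gserviceaccount.com"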

Example CI/CD pipeline

The following diagram provides a general and tool-agnostic view of CI/CD for data pipelines. Additionally, the diagram shows the relationship between development tasks, deployment environments, and the pipeline runners as discussed in previous sections.

Stages of a CI/CD pipeline.

The diagram shows the following stages:

  • Code development: During code development, a developer runs pipeline code locally using the Direct Runner. In addition, developers use a sandbox environment for ad hoc pipeline execution using the Dataflow Runner.

    In typical CI/CD pipelines, the continuous integration process is triggered when a developer makes a change to the source control system, such as pushing new code to a repository.

  • Build and test: The continuous integration process compiles the pipeline code and then executes unit tests and transform integration tests using the Direct Runner. System integration tests, which include integration testing with external sources and sinks using small test datasets, can also run.

    If the tests succeed, the CI process stores the deployment artifacts (which might include JAR files, Docker images, and template metadata, depending on the type of job execution) that are required for launching the pipeline to locations that are accessible to the continuous delivery process. Depending on the types of deployment artifacts that are generated, you might use Cloud Storage and Container Registry to store the different artifact types.

  • Deliver and deploy: The continuous delivery process copies the deployment artifacts to a preproduction environment. Developers can manually run end-to-end tests using the Dataflow Runner, or that test can be initiated automatically by using continuous deployment. Typically, a continuous deployment approach is simpler to enable for batch pipelines than for streaming pipelines, because batch pipelines don't run continuously and it's simpler to replace them with a new release.

    The process of updating streaming pipelines can be simple to complex, and you should test updates in the preproduction environment. Update procedures aren't always consistent between releases. For example, a pipeline might change in such a way that in-place updates are no longer possible. For this reason, there might be cases where it's difficult to automate pipeline updates using continuous deployment.

If all end-to-end tests pass, the deployment artifacts can be copied to the production environment as part of the continuous delivery process. If the new pipeline updates or replaces an existing streaming pipeline, use the procedures tested in the preproduction environment to deploy the new pipeline.

Non-templated versus templated job execution

You can create a Dataflow job by using the Apache Beam SDK directly from a development environment. This type of job is called a non-templated job. Although this approach is convenient for developers, you might prefer to have separation of concerns between the tasks of developing and running pipelines. To make this separation, you can use templated jobs, which allow you to stage and run your pipelines as independent tasks. After a template is staged, jobs can be executed from the template by other users, including non-developers, using the gcloud command-line tool, the Cloud Console, or the Dataflow REST API.

Dataflow offers the following types of job templates:

  • Classic Templates: Developers use the Apache Beam SDK to run the pipeline code and save the JSON serialized execution graph as the template. The Beam SDK stages the template file to a Cloud Storage location, along with any dependencies that are required by the pipeline code.
  • Flex Templates: Developers use the gcloud command-line tool to package the pipeline as a Docker image, which is then stored in Container Registry. A Flex Template spec file is also automatically generated and stored to a user-specified Cloud Storage location. The Flex Template spec file contains metadata that describes how to run the template, such as pipeline parameters.

For a detailed comparison of the template types, see the documentation on the similarities and differences between Classic Templates and Flex Templates.

In addition to the Flex Template features explained in the linked documentation, Flex Templates offer advantages over Classic Templates for managing templates. With Classic Templates, multiple artifacts (such as JAR files) might be stored in a Cloud Storage staging location, but without any features to manage these multiple artifacts. In comparison, a Flex Template is encapsulated within a Docker image. The image packages all dependencies (aside from the Flex Template spec) that are needed for your pipeline into one deployment artifact that's managed by Container Registry. You can use Docker image management features for your Flex Templates. For example, you can securely share Flex Templates by granting pull (and optionally push) permissions to Container Registry, and use Docker image tags for different versions of your Flex Templates.

Developers can use Classic Templates and Flex Templates to launch jobs in a project that's different from the template project—that is, different from the project that owns the registry and the storage bucket that hosts the template assets (or just the storage bucket if you use Classic Templates). This is useful if you need to isolate data processing for multiple customers into different projects and pipeline jobs. Using Flex Templates, you can further specify different versions of a Docker image to use when you launch a pipeline. This simplifies phased replacement of batch pipelines or streaming pipelines over multiple projects when you update templates later.
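As a sketch of this workflow, the following commands build a Flex Template whose Docker image carries a version tag and then launch a job from that template in a different project. The image path, bucket, main class, and parameters are illustrative assumptions.

    # Package the pipeline as a Flex Template with a versioned image.
    gcloud dataflow flex-template build \
        gs://template-project-templates/streaming-pipeline-v2.json \
        --image-gcr-path="gcr.io/template-project/streaming-pipeline:v2" \
        --sdk-language=JAVA \
        --flex-template-base-image=JAVA11 \
        --jar="target/my-pipeline-bundled-1.0.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="com.example.StreamingPipeline"

    # Launch a job from the template in a separate customer project.
    gcloud dataflow flex-template run "streaming-pipeline-v2-$(date +%Y%m%d-%H%M%S)" \
        --project=customer-project \
        --region=us-central1 \
        --template-file-gcs-location=gs://template-project-templates/streaming-pipeline-v2.json \
        --parameters inputSubscription=projects/customer-project/subscriptions/events-sub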

Dataflow features for optimizing resource usage

Dataflow provides the following runner-specific features to optimize resource utilization, which can translate into performance and cost improvements:

  • Streaming Engine: This moves execution of streaming pipelines out of VM workers and into a dedicated service. The benefits include improved autoscaling responsiveness, reductions in consumed worker VM resources, and automatic service updates without redeployment.
  • Dataflow Shuffle: This moves shuffle operations for batch pipelines out of VM workers and into a dedicated service. The benefits include faster execution for most batch pipelines, reduced resource consumption by worker VMs, improved autoscaling responsiveness, and improved fault tolerance.
  • Flexible resource scheduling (FlexRS): This reduces batch processing costs by using advanced scheduling techniques, the Dataflow Shuffle service, and a combination of preemptible VM instances and regular VMs.
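For example, with the Apache Beam Java SDK, you might opt in to these features through pipeline options, as in the following sketch. The project, region, and class names are illustrative, and feature availability and defaults can vary by region and SDK version.

    # Streaming pipeline with Streaming Engine enabled.
    mvn compile exec:java -Dexec.mainClass=com.example.StreamingPipeline \
        -Dexec.args="--runner=DataflowRunner \
            --project=my-project \
            --region=us-central1 \
            --enableStreamingEngine"

    # Batch pipeline submitted as a FlexRS job (FlexRS uses Dataflow Shuffle).
    mvn compile exec:java -Dexec.mainClass=com.example.BatchPipeline \
        -Dexec.args="--runner=DataflowRunner \
            --project=my-project \
            --region=us-central1 \
            --flexRSGoal=COST_OPTIMIZED"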

The Streaming Engine service and the Dataflow Shuffle service can be used in Google Cloud regions where Dataflow is available. For more information, see the Dataflow documentation.

Updating streaming pipelines in production

Unlike batch pipelines, which stop when the job is complete, streaming pipelines often need to run continuously in order to provide uninterrupted processing. After the initial deployment, you have to consider the following when you want to update existing pipelines:

  • The pipeline's business logic must be updated in a way that minimizes or avoids disruption to the pipeline. Depending on your requirements, you might be able to tolerate a temporary disruption to processing while a new version of a pipeline is deployed. However, your application might not be able to incur any disruption.
  • Pipeline update processes need to handle schema changes in a way that minimizes disruption to message processing and to other attached systems. For example, if the schema for messages in an event-processing pipeline changes, schema changes might also be necessary in downstream data sinks.

The rest of this section describes approaches to addressing these two concerns.

Performing in-place updates to streaming pipelines

In certain situations, Dataflow lets you update an ongoing streaming job directly, without having to cancel or drain the pipeline. You can update streaming jobs only for pipelines that are written using the Apache Beam Java SDK and for jobs that are not initiated from Dataflow templates. Dataflow also performs a compatibility check to ensure that updated pipeline code can be safely deployed to the running pipeline. Certain code changes can cause the compatibility check to fail—for example, when side inputs are added to or removed from an existing step. For more information, see Updating an existing pipeline.

In addition to applying code changes, you can use in-place updates to change the number of workers in a streaming Dataflow job. For more information, see the "Manual scaling in streaming mode" section of Deploying a pipeline.
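The following sketch shows an in-place update launched with the Apache Beam Java SDK; the job name must match the running streaming job. The project, job, and class names are illustrative assumptions.

    # Relaunch the updated pipeline code against the running streaming job.
    mvn compile exec:java -Dexec.mainClass=com.example.StreamingPipeline \
        -Dexec.args="--runner=DataflowRunner \
            --project=my-project \
            --region=us-central1 \
            --jobName=events-streaming-pipeline \
            --update"

    # If transform names changed between versions, also pass the
    # --transformNameMapping option to map old names to new names.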

Terminating and replacing streaming pipelines

If you're able to temporarily halt processing, you can cancel or drain the pipeline. Cancelling a pipeline causes Dataflow to immediately halt processing and shut down resources as quickly as possible. This causes some loss of in-flight data—that is, data that's currently being processed in the pipeline. To avoid data loss, in most cases, draining is the preferred action.

Draining a pipeline immediately closes any in-process windows and fires all triggers. Although there is no loss of in-flight data, draining can cause windows to have incomplete data. If this happens, in-process windows emit partial or incomplete results. For more information, see Effects of draining a pipeline. After the existing job completes, you can launch a new streaming job that contains your updated pipeline code, which enables processing to resume.

Another drawback is that you incur some downtime between the time when the existing streaming job terminates and the time when the replacement pipeline can be ready to resume processing data.

However, the advantage of this approach is that it's simple to cancel or drain the existing pipeline, and to then launch a new job to replace it with the updated pipeline.

Message reprocessing using Pub/Sub Snapshot and Seek

In some situations, after you replace or cancel a drained pipeline, you might need to reprocess previously delivered Pub/Sub messages. For example, you might need to reprocess data using updated business logic. Pub/Sub Seek is a feature that lets you replay messages from a Pub/Sub Snapshot. You can use the Pub/Sub Seek feature with Dataflow pipelines to reprocess messages from the time when the subscription snapshot is created.

During development and testing, you can also use Pub/Sub Seek to replay known messages repeatedly to verify the output from your pipeline. Don't seek a subscription to a snapshot while the subscription is being consumed by a pipeline. Doing so can invalidate Dataflow's watermark logic and can affect the exactly-once processing of Pub/Sub messages.

A recommended workflow for using Pub/Sub Seek with Dataflow pipelines from the gcloud command-line tool is as follows:

  1. Create a snapshot of the subscription:

    gcloud pubsub snapshots create snapshot-name --subscription=pipeline-subscription-name
    
  2. Drain or cancel the pipeline:

    gcloud dataflow jobs drain job-id
    

    or

    gcloud dataflow jobs cancel job-id
    
  3. Seek to the snapshot:

    gcloud pubsub subscriptions seek pipeline-subscription-name --snapshot=snapshot-name
    
  4. Deploy a new pipeline that consumes the subscription.

Running parallel pipelines

If Dataflow can't update your job directly, you can still avoid disruption of your streaming pipeline by creating a parallel pipeline. To use this method, create a new streaming job that has updated pipeline code and run it in parallel with the existing pipeline.

When you create the new pipeline, use the same windowing strategy that you used for the existing pipeline. Let the existing pipeline continue to run until its watermark exceeds the timestamp of the earliest complete window that's processed by the updated pipeline. At that point, drain or cancel the existing pipeline; the updated pipeline continues to run in its place and effectively takes over processing on its own.

The following diagram illustrates this process.

Pipeline B overlaps with Pipeline A for a 5-minute window.

In the diagram, Pipeline B is the updated job that takes over from Pipeline A. The value t is the timestamp of the earliest complete window processed by Pipeline B. The value w is the watermark for Pipeline A. For simplicity, a perfect watermark is assumed with no late data (processing and wall time is represented on the horizontal axis), and both pipelines use 5-minute fixed (tumbling) windows. Results are triggered after the watermark passes the end of each window.

Concurrent output occurs during the time period where the two pipelines overlap. Therefore, you should configure the updated pipeline to write results to a separate destination from where the existing pipeline writes results. Downstream systems can then use an abstraction over the two destination sinks, such as a database view, to query the combined results. These systems can also use the abstraction to deduplicate results from the overlapping execution period.

The following example describes the approach of using a simple pipeline that reads input data from Pub/Sub, performs some processing, and writes the results to BigQuery.

  1. In the initial state, the existing streaming pipeline (Pipeline A) is running and reading messages from a Pub/Sub topic (Topic) using a subscription (Subscription A). The results are written to a BigQuery table (Table A). Results are consumed through a BigQuery view, which acts as a façade to mask underlying table changes. This is an application of a design method called the façade pattern. The following diagram shows the initial state.

    1 pipeline with 1 subscription, and writing to a single BigQuery table.

  2. You create a new subscription (Subscription B) for the updated pipeline. You deploy the updated pipeline (Pipeline B), which reads from the Pub/Sub topic (Topic) using Subscription B and writes to a separate BigQuery table (Table B). The following diagram illustrates this flow.

    2 pipelines, each with 1 subscription. Each pipeline writes to a separate BigQuery table. A façade view reads from both tables.

    At this point, Pipeline A and Pipeline B are running in parallel and writing results to separate tables. You record time t as the timestamp of the earliest complete window that's processed by Pipeline B.

  3. You allow Pipeline A to drain when its watermark has exceeded time t. This closes any open windows and completes processing for any in-flight data. If complete windows are important (assuming no late data), you allow both pipelines to run until you have complete overlapping windows before draining the first pipeline. You stop the streaming job for Pipeline A after all in-flight data is processed and written to Table A. The following diagram shows this stage.

    Pipeline A drains and no longer reads Subscription A, and it no longer sends data to Table A after the drain is complete. All processing is handled by the second pipeline.

  4. At this point, only Pipeline B is running. You can query a BigQuery view (Facade View), which acts as a façade for Table A and Table B. For rows that have the same timestamp in both tables, configure the view to return the rows from Table B, or to fall back to Table A if the rows don't exist in Table B (a sketch of such a view follows this example). The following diagram shows the view (Facade View) reading from both Table A and Table B.

    Pipeline A is gone, and only Pipeline B runs.

    At this point, you can delete Subscription A if you want. You can merge the two BigQuery tables or keep them separate.

    Deploying overlapped (parallel) pipelines can simplify rollback if any issues are detected with a new pipeline deployment. In the previous example, you might want to keep Pipeline A running while you monitor Pipeline B for correct operation. If there are any issues with Pipeline B, you can roll back to using Pipeline A.
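As a sketch of the façade view described in step 4, the following query prefers rows from Table B and falls back to Table A. The dataset, table, and window_end column names are illustrative assumptions; adapt the join key to whatever uniquely identifies a window or row in your schema.

    bq query --use_legacy_sql=false '
    CREATE OR REPLACE VIEW `my-project.pipeline_results.facade_view` AS
    -- Prefer results written by Pipeline B; fall back to Pipeline A output
    -- for windows that Table B has not produced.
    SELECT * FROM `my-project.pipeline_results.table_b`
    UNION ALL
    SELECT a.*
    FROM `my-project.pipeline_results.table_a` AS a
    WHERE NOT EXISTS (
      SELECT 1
      FROM `my-project.pipeline_results.table_b` AS b
      WHERE b.window_end = a.window_end
    )'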

Handling schema mutations

Data-handling systems often need to accommodate schema mutations over time, perhaps due to changes in business requirements, or for technical reasons. Applying schema updates typically requires careful planning and execution to avoid disrupting business information systems. When you perform pipeline updates, an important consideration is handling schema mutations within streaming data pipelines without disrupting ingestion and without impacting connected systems that rely on existing schemas.

Consider a simple pipeline that reads messages that contain JSON payloads from a Pub/Sub topic. The pipeline converts each message into a TableRow instance and then writes the rows to a BigQuery table. The schema of the output table corresponds to the schema of the messages that are processed by the pipeline. In the following diagram, that schema is referred to as Schema A.

Pipeline that reads a subscription and writes to a BigQuery output table using Schema A.

Over time, suppose that the message schema mutates in non-trivial ways; fields are added, removed, or replaced, and Schema A evolves into a new schema. In the discussion that follows, the new schema is referred to as Schema B. In this case, Pipeline A needs to be updated, and the output table schema needs to support Schema B.

Handling schema mutations using principal and staging tables

For the output table, schema mutations that add new fields or that relax column modes (for example, from REQUIRED to NULLABLE) can be performed without downtime, and the mutations don't usually impact existing queries. However, schema mutations that modify or remove existing schema fields break queries or result in other disruptions. When existing schema fields are modified or removed, you need to plan your approach to accommodate changes without incurring downtime.

One approach to avoid disruption is to separate the data that's written by the pipeline into a principal table and into one or more staging tables. In this approach, the principal table stores historic data written by the pipeline, and staging tables store the latest pipeline output. You can define a BigQuery façade view over the principal and staging tables, which lets consumers query both historic and up-to-date data.

The following diagram revises the previous flow to include a staging table (Staging Table A), a principal table, and a façade view.

Pipeline that reads a subscription and writes to a BigQuery staging table. A second (principal) table has output from a previous version of the schema. A façade view reads from both the staging table and the principal table.

In the revised flow, Pipeline A processes messages that use Schema A and writes the output to Staging Table A, which has a compatible schema. The principal table contains historic data written by previous versions of the pipeline, plus results that are periodically merged from the staging table. Consumers can query up-to-date data, including both historic and real-time data, by using the façade view.

When the message schema mutates from Schema A to Schema B, you need to update the pipeline code so that it's compatible with messages that use Schema B, and you need to deploy that new implementation to the existing pipeline. By performing in-place updates or by running parallel pipelines, you can ensure that streaming data processing continues without disruption. In contrast, terminating and replacing the pipeline causes a break in processing, because there is a period of time when no pipeline is running.

The updated pipeline writes to an additional staging table (Staging Table B) that uses Schema B. The new staging table can be created before you update the pipeline, through an orchestrated workflow, or the updated pipeline code might create the table automatically (for example, by using a create disposition in BigQueryIO that creates the table if it doesn't exist). The façade view should also be updated (perhaps using a related workflow step) to include results from the new staging table. The following diagram shows the updated flow, in which Staging Table B uses Schema B and the façade view includes content from the principal table and from both staging tables.

The pipeline now uses Schema B and writes to Staging Table B. A façade view reads from the Principal table, Staging Table A, and Staging Table B.

As a separate process from the pipeline update, you can merge the staging tables into the principal table, either periodically or as required. The following diagram shows how Staging Table A is merged into the principal table.

Staging Table A is merged into the principal table. The façade view reads from Staging Table B and from the principal table.
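A merge of this kind could be run with a scheduled query or an orchestration tool. The following sketch merges Staging Table A into the principal table; the dataset and table names and the message_id key are illustrative assumptions.

    bq query --use_legacy_sql=false '
    -- Copy rows that are not yet in the principal table from the staging table.
    MERGE `my-project.pipeline_results.principal_table` AS p
    USING `my-project.pipeline_results.staging_table_a` AS s
    ON p.message_id = s.message_id
    WHEN NOT MATCHED THEN
      INSERT ROW'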

Life of a Dataflow job

A Dataflow job goes through a lifecycle that's represented by various job states. To run a Dataflow job, you submit it to a regional endpoint. From there it's routed to an available Dataflow backend in one of the zones within the region. Before Dataflow assigns a backend, it verifies that you have sufficient quota and permissions to run the job. When these preflight checks are complete and a backend has been assigned, the job moves to a JOB_STATE_PENDING state. For FlexRS jobs, the backend assignment could be delayed to a future time, and these jobs enter a JOB_STATE_QUEUED state.

The assigned backend picks up the job for execution and attempts to start Dataflow workers in your Google Cloud project. The zone that's chosen for the worker VMs depends on a number of factors. For batch jobs that use Dataflow Shuffle, the service also tries to ensure that the Dataflow backend and the worker VMs are located in the same zone to avoid cross-zone traffic.

After the Dataflow workers are started, they request work from the Dataflow backend. The backend is responsible for splitting the work into parallelizable chunks, called bundles, that are distributed among the workers. If the workers can't handle the existing work, and if autoscaling is enabled, the backend starts more workers in order to handle the work. Similarly, if there are more workers than needed, some of the workers are shut down.

After the Dataflow workers have started, the Dataflow backend acts as the control plane to orchestrate the job's execution. During processing, the job's data plane performs shuffle operations such as GroupByKey, CoGroupByKey, and Combine. Jobs use one of the following data-plane implementations for shuffle operations:

  • The data plane runs on the Dataflow workers, and shuffle data is stored on persistent disks. There are two variants of this implementation: Shuffle Appliance for batch pipelines and Streaming Appliance for streaming pipelines. The variant that's used for a job is selected automatically based on whether batch or streaming mode is specified when the job is created.
  • The data plane runs as a service, externalized from the worker VMs. There are two variants of this implementation, which you specify when you create the job: Dataflow Shuffle for batch pipelines and Streaming Engine for streaming pipelines. The service-based shuffle significantly improves the performance and scalability of data-shuffling operations compared to the worker-based shuffle.

Streaming jobs that enter a JOB_STATE_RUNNING state continue running indefinitely until they're cancelled or drained, unless a job failure occurs. Batch jobs terminate automatically when all processing is complete or when an unrecoverable error occurs. Depending on how the job is stopped, Dataflow sets the job's status to one of several terminal states, including JOB_STATE_CANCELLED, JOB_STATE_DRAINED, or JOB_STATE_DONE.
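You can observe these states from the gcloud command-line tool, for example to verify that a job reached a running or terminal state. The project, region, and job ID below are illustrative.

    # List active jobs in a region.
    gcloud dataflow jobs list --project=my-project --region=us-central1 --status=active

    # Inspect the current state of a specific job.
    gcloud dataflow jobs describe 2021-06-01_12_00_00-1234567890123456789 \
        --project=my-project --region=us-central1 \
        --format="value(currentState)"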

Pipeline reliability best practices

This section discusses failures that can occur when you work with Dataflow, and describes best practices for handling errors that occur during job submission and when a pipeline runs.

Follow isolation principles

A general recommendation for improving overall pipeline reliability is to follow the isolation principles behind regions and zones. Try to ensure that your pipelines don't have critical cross-region dependencies; if a pipeline has a critical dependency on services in multiple regions, a failure in any one of those regions is likely to affect the pipeline. To help avoid this issue, deploy to multiple regions for redundancy and backup.

Handling job submission failures

You submit traditional Dataflow jobs using the Apache Beam SDK. To submit the job, you run the pipeline using the Dataflow Runner, which is specified as part of the pipeline's options. The Apache Beam SDK stages files in Cloud Storage, creates a job request file, and submits the file to Dataflow.

Alternatively, jobs that are created from Dataflow templates use different submission methods, which commonly rely on the templates API.

You might see different errors returned by Dataflow that indicate job failure for traditional and templated jobs. This section discusses different types of job submission failures, and best practices for handling or mitigating them.

Retry job submissions after transient failures

If a job fails to start due to a Dataflow service issue, retry the job a few times. This improves pipeline resilience against transient service issues.

Mitigate zonal failures by specifying a worker region

Dataflow provides regional availability and is available in multiple regions. When a user submits a job to a regional endpoint without explicitly specifying a zone, Dataflow routes the job to a zone in the specified region based on resource availability.

The recommended option for job placement is to specify a worker region using the --region flag instead of the --zone flag whenever possible. This allows Dataflow to provide an additional level of fault tolerance for your pipelines by automatically choosing the best possible zone for that job. Jobs that specify an explicit zone don't have this benefit, and they will fail if problems occur within the zone—for example, with resource exhaustion. If a job submission fails due to a zonal issue, you can often resolve the problem by retrying the job without explicitly specifying a zone.

Mitigate regional failures by storing data in multiple regions

If an entire region is unavailable, try the job in a different region. It's important to think about the availability of your data when jobs fail across regions. To protect against single-region failures without manually copying data to multiple regions, use Google Cloud resources that automatically store data in multiple regions. For example, use BigQuery multi-regional locations for datasets or Cloud Storage dual-region and multi-region buckets. If one region becomes unavailable, you can rerun the pipeline in another region where the data is available.

For an example of using multi-regional services with Dataflow, see High availability and geographic redundancy.

Handling failures in running pipelines

After a job is submitted and has been accepted, the only valid operations for the job are the following:

  • cancel for batch jobs
  • update, drain, or cancel for streaming jobs

You can't change the location of running jobs after you've submitted the job. If you're not using FlexRS, jobs usually start processing data within a few minutes after submission. (FlexRS jobs can take up to 6 hours for data processing to begin.)

This section discusses failures for running jobs and best practices for handling them.

Monitor jobs to identify and resolve issues caused by transient errors

For batch jobs, bundles that include a failing item are retried 4 times, and Dataflow terminates the job when a single bundle has failed 4 times. This retry behavior takes care of many transient issues. However, if a prolonged failure occurs, the maximum retry limit is usually reached quickly, which lets the job fail fast.

For monitoring and incident management, configure alerting rules to detect failed jobs. If a job fails, inspect job logs to identify job failures that are caused by failed work items that exceeded the retry limit.

For streaming jobs, Dataflow retries failed work items indefinitely; it won't terminate the job. However, the job could stall until the issue is resolved. Create monitoring policies to detect signs of a stalled pipeline, such as an increase in system latency and a decrease in data freshness. Implement error logging in your pipeline code to help identify pipeline stalls caused by work items that fail repeatedly.

Restart jobs in a different zone if a zonal outage occurs

After a job starts, the Dataflow workers that run user code are constrained to a single zone. If there's a zonal outage, Dataflow jobs are likely to be affected, depending on the extent of the outage.

For outages that affect only Dataflow backends, the backends are automatically migrated to a different zone by the managed service so they can continue the job. (If the job uses Dataflow Shuffle, the backend cannot be moved across zones.) If a Dataflow backend migration occurs, jobs might be temporarily stalled.

If a zonal outage occurs, jobs that are running are likely to fail or become stalled until zone availability is restored. If a zone becomes unavailable for a long period, you should stop jobs (cancel for batch jobs and drain for streaming jobs) and then restart them to let Dataflow choose a new, healthy zone.

Restart batch jobs in a different region if a regional outage occurs

If a regional outage occurs in a region where your Dataflow jobs are running, the jobs can fail or become stalled. For batch jobs, restart the job in a different region if possible. As with handling job submission failures when a region is unavailable, it's important to ensure that your data is available in different regions.

Mitigate regional outages using high availability or failover

For streaming jobs, there are different options for mitigating failures, depending on your application's fault tolerance and budget. For a regional outage, the simplest and most cost-effective option is to wait until the outage ends. However, this might be unacceptable if your application is latency-sensitive, or if data processing must either not be disrupted or must resume with minimal delay. The following sections discuss the options.

High-availability: Latency-sensitive with no data loss

If your application can't tolerate data loss, run duplicate pipelines in parallel in two different regions, and have the pipelines consume the same data. This approach requires the same data sources to be available in both regions. The downstream applications that depend on the output of these pipelines must be able to switch between the output from the two regions. Because resources are duplicated, this option involves the highest cost of the approaches described here. For an example deployment, see High availability and geographic redundancy later in this document.

Failover: Latency-sensitive with some potential data loss

If your application can tolerate potential data loss, make the streaming data source available in multiple regions. For example, using Pub/Sub, maintain two independent subscriptions (one for each region) for the same topic. If a regional outage occurs, you can start a replacement pipeline in another region, and have the pipeline consume data from the backup subscription. You should replay the backup subscription to an appropriate time to keep data loss to a minimum without sacrificing latency. Downstream applications must know how to switch to the running pipeline's output (similar to the high-availability option). This option uses fewer resources than running duplicate pipelines because only the data is duplicated.
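For example, before the replacement pipeline starts consuming the backup subscription, you might seek that subscription to a time shortly before the outage. The subscription name and timestamp are illustrative.

    # Replay the backup subscription from just before the outage, before the
    # replacement pipeline begins consuming it.
    gcloud pubsub subscriptions seek failover-region-subscription \
        --time=2021-06-01T12:00:00Z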

High availability and geographic redundancy

You can run multiple streaming pipelines in parallel for high-availability data processing. For example, you can run two parallel streaming jobs in different regions, which provides geographical redundancy and fault tolerance for data processing.

By considering the geographic availability of data sources and sinks, you can operate end-to-end pipelines in a highly available, multi-region configuration. The following diagram shows an example deployment.

2 regional pipelines use separate subscriptions to read from a global Pub/Sub topic. The pipelines write to separate multi-regional BigQuery tables, one in the US and one in Europe.

The diagram shows the following flow:

  1. Pub/Sub runs in most regions around the world. This lets the service offer fast, global data access, while giving you control over where messages are stored. Pub/Sub can automatically store published messages in the Google Cloud region that's nearest to subscribers, or you can configure it to use a specific region or set of regions using message storage policies. Pub/Sub then delivers the messages to subscribers across the world, regardless of where the messages are stored. Pub/Sub clients don't need to be aware of the server locations they are connecting to, because global load-balancing mechanisms direct traffic to the nearest Google Cloud data center where messages are stored.
  2. Dataflow runs in specific Google Cloud regions. By running parallel pipelines in separate Google Cloud regions, you can isolate your jobs from failures that affect a single region. The diagram shows two instances of the same pipeline, each one running in a separate Google Cloud region.
  3. BigQuery provides regional and multi-regional dataset locations. When you choose a multi-regional location, your dataset is in at least two geographical regions. The diagram depicts two separate pipelines, each writing to a separate multi-regional dataset.

What's next