Design Dataflow pipeline workflows

Pipeline development involves different stages and tasks, such as code development, testing, and delivery into production. This document explains:

  • Considerations for continuous integration and continuous delivery (CI/CD) to support automated build, testing, and pipeline deployment into different environments.
  • Dataflow features to optimize performance and resource utilization in production.
  • Approaches and watchpoints for updating streaming pipelines in production.
  • Best practices for improving pipeline reliability in production.

Continuous integration

Continuous integration (CI) requires developers to merge code into a shared repository frequently, which is useful for applications that change often, such as frequently updated websites. Although data pipelines don't usually change as much as other types of applications, CI practices provide many benefits for pipeline development. For example, test automation provides rapid feedback when defects are encountered and reduces the likelihood that regressions enter the code base.

Test automation is an important part of CI. When it's combined with appropriate test coverage, test automation runs your test suite on each code commit. Your CI server can work in conjunction with a build automation tool like Maven to run your test suite as one or more steps of the CI pipeline. You can package code that successfully passes unit tests and integration tests into deployment artifacts from which pipelines are launched. This build is referred to as a passing build.

The number and types of deployment artifacts created from a passing build vary depending on how pipelines are launched. Using the Apache Beam Java SDK, you can package your pipeline code into a self-executing JAR file. You can then store the JAR file in a bucket that is hosted in the project for a deployment environment, such as the preproduction or production Google Cloud project. If you use Classic Templates (a type of templated execution), the deployment artifacts include a JSON template file, the JAR file for your pipeline, and an optional metadata template. You can then deploy the artifacts into different deployment environments using continuous delivery, as explained in the following section.

Continuous delivery and deployment

Continuous delivery (CD) copies deployment artifacts to one or more deployment environments that are ready to be launched manually. Typically, the artifacts built by the CI server are deployed to one or more preproduction environments for running end-to-end tests. The production environment is updated if all end-to-end tests pass successfully.

For batch pipelines, continuous deployment can directly launch the pipeline as a new Dataflow job. Alternatively, other systems can use the artifacts to launch batch jobs when required. For example, you can use Cloud Composer to run batch jobs within a workflow, or Cloud Scheduler to schedule batch jobs.

Streaming pipelines can be more complex to deploy than batch pipelines, and therefore can be more difficult to automate using continuous deployment. For example, you might need to determine how to replace or update an existing streaming pipeline. If you can't update a pipeline, or if you choose not to update it, you can use other methods such as coordinating multiple Dataflow jobs to minimize or prevent business disruption.

Identities and roles required for CI/CD

Your CI/CD pipeline interacts with different systems to build, test, and deploy pipelines. For example, your pipeline needs access to your source code repository. To enable these interactions, ensure that your pipeline has the proper identities and roles. The following pipeline activities might also require your pipeline to have specific identities and roles.

Integration testing with external services, including Google Cloud

When you use the Direct Runner for running ad hoc tests or system integration tests, your pipeline uses either the Google Cloud CLI credentials to consume Google Cloud data sources and sinks, or it uses the credentials provided by the GOOGLE_APPLICATION_CREDENTIALS environment variable. Ensure that the service account that's used to obtain credentials for Google Cloud resources accessed by the pipeline has sufficient roles and permissions.

Deploy artifacts to different deployment environments

Where possible, use unique credentials for each environment (effectively for each project) and limit access to resources accordingly.

Use unique service accounts for each project to read and write deployment artifacts to storage buckets. Depending on whether your pipeline uses a template, your deployment process might need to stage multiple artifacts. For example, creating and staging a Dataflow template requires permissions to write deployment artifacts that are needed to launch your pipeline, such as the pipeline's template file, to a Cloud Storage bucket.

Deploy pipelines to different deployment environments

Where possible, use unique service accounts for each project to access and manage Google Cloud resources within the project, including accessing Dataflow itself.

The service account that you use to create and manage Dataflow jobs needs to have sufficient IAM permissions for job management. For details, see the Dataflow service account section in the Dataflow security and permissions page.

The worker service account that you use to run Dataflow jobs needs permission to manage Compute Engine resources while the job runs and to manage the interaction between the Apache Beam pipeline and the Dataflow service. For details, see the Worker service account section in the Dataflow security and permissions page.

To specify a user-managed worker service account for a job, use the --serviceAccount pipeline option (Java SDK) or the --service_account_email pipeline option (Python SDK). If you don't specify a worker service account when you create a job, Dataflow attempts to use the Compute Engine default service account. We recommend instead that you specify a user-managed worker service account for production environments, because the Compute Engine default service account usually has a broader set of permissions than the permissions that are required for your Dataflow jobs.

In production scenarios, we recommend that you use separate service accounts for Dataflow job management and for the worker service account, which provides improved security compared to using a single service account. For example, the service account that's used to create Dataflow jobs might not need to access data sources and sinks or to use other resources that are used by the pipeline. In this scenario, the worker service account that's used to run Dataflow jobs is granted permissions to use pipeline resources. A different service account for job creation is granted permissions to manage (including creating) Dataflow jobs.
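As a rough illustration of that separation, the following Python sketch builds the pipeline arguments that a launcher identity might pass when it creates a job that runs as a dedicated worker service account. It assumes the Apache Beam Python SDK, where the worker account is set with --service_account_email (the counterpart of the Java --serviceAccount option). The project ID, bucket name, and account email are hypothetical placeholders.

```python
def worker_launch_args(project, region, worker_sa_email, temp_bucket):
    """Build Apache Beam Python SDK arguments for a Dataflow job.

    Every value is supplied by the caller; nothing here contacts Google Cloud.
    """
    return [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        f"--temp_location=gs://{temp_bucket}/tmp",
        # The job runs as this user-managed worker service account instead of
        # the Compute Engine default service account.
        f"--service_account_email={worker_sa_email}",
    ]


args = worker_launch_args(
    project="my-prod-project",  # hypothetical project ID
    region="us-central1",
    worker_sa_email="dataflow-worker@my-prod-project.iam.gserviceaccount.com",
    temp_bucket="my-prod-dataflow-temp",  # hypothetical bucket name
)
print(args)
```

The identity that executes this launcher only needs permissions to create and manage Dataflow jobs. Access to sources and sinks is granted to the worker service account.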

Example CI/CD pipeline

The following diagram provides a general and tool-agnostic view of CI/CD for data pipelines. Additionally, the diagram shows the relationship between development tasks, deployment environments, and the pipeline runners.

Stages of a CI/CD pipeline.

The diagram shows the following stages:

  • Code development: During code development, a developer runs pipeline code locally using the Direct Runner. In addition, developers use a sandbox environment for ad hoc pipeline execution using the Dataflow Runner.

    In typical CI/CD pipelines, the continuous integration process is triggered when a developer makes a change to the source control system, such as pushing new code to a repository.

  • Build and test: The continuous integration process compiles the pipeline code and then runs unit tests and transform integration tests using the Direct Runner. Optional system integration tests, which include integration testing with external sources and sinks using small test datasets, can also run.

    If the tests succeed, the CI process stores the deployment artifacts, which might include JAR files, Docker images, and template metadata, required to launch the pipeline to locations that are accessible to the continuous delivery process. Depending on the types of deployment artifacts generated, you might use Cloud Storage and Artifact Registry to store the different artifact types.

  • Deliver and deploy: The continuous delivery process copies the deployment artifacts to a preproduction environment or makes these artifacts available for use within that environment. Developers can manually run end-to-end tests using the Dataflow Runner, or they can use continuous deployment to initiate the test automatically. Typically, a continuous deployment approach is easier to enable for batch pipelines than for streaming pipelines. Because batch pipelines don't run continuously, it's easier to replace them with a new release.

    The process of updating streaming pipelines might be simple or complex, and you should test updates in the preproduction environment. Update procedures might not always be consistent between releases. For example, a pipeline might change in such a way that makes in-place updates impossible. For this reason, it's sometimes difficult to automate pipeline updates using continuous deployment.

    If all end-to-end tests pass, you can copy the deployment artifacts or make them available to the production environment as part of the continuous delivery process. If the new pipeline updates or replaces an existing streaming pipeline, use the procedures tested in the preproduction environment to deploy the new pipeline.

Non-templated versus templated job execution

You can create a Dataflow job by using the Apache Beam SDK directly from a development environment. This type of job is called a non-templated job. Although this approach is convenient for developers, you might prefer to separate the tasks of developing and running pipelines. To make this separation, you can use Dataflow templates, which allow you to stage and run your pipelines as independent tasks. After a template is staged, other users, including non-developers, can run the jobs from the template using the Google Cloud CLI, the Google Cloud console, or the Dataflow REST API.

Dataflow offers the following types of job templates:

  • Classic Templates: Developers use the Apache Beam SDK to run the pipeline code and save the JSON serialized execution graph as the template. The Apache Beam SDK stages the template file to a Cloud Storage location, along with any dependencies that are required by the pipeline code.
  • Flex Templates: Developers use the Google Cloud CLI to package the pipeline as a Docker image, which is then stored in Artifact Registry. A Flex Template spec file is also automatically generated and stored to a user-specified Cloud Storage location. The Flex Template spec file contains metadata that describes how to run the template, such as pipeline parameters.

In addition to the Flex Template features explained in the linked documentation, Flex Templates offer advantages over Classic Templates for managing templates.

  • With Classic Templates, multiple artifacts, such as JAR files, might be stored in a Cloud Storage staging location, but without any features to manage these multiple artifacts. In comparison, a Flex Template is encapsulated within a Docker image. The image packages all dependencies, aside from the Flex Template spec, that are needed for your pipeline into one deployment artifact that's managed by Artifact Registry.
  • You can use Docker image management features for your Flex Templates. For example, you can securely share Flex Templates by granting pull (and optionally push) permissions to Artifact Registry, and use Docker image tags for different versions of your Flex Templates.

Developers can use Classic Templates and Flex Templates to launch jobs in a project that's different from the project that owns the registry and the storage bucket that hosts the template assets, or just the storage bucket if you use Classic Templates. This feature is useful if you need to isolate data processing for multiple customers into different projects and pipeline jobs. Using Flex Templates, you can further specify different versions of a Docker image to use when you launch a pipeline. This feature simplifies phased replacement of batch pipelines or streaming pipelines over multiple projects when you update templates later.

Dataflow features for optimizing resource usage

Dataflow provides the following runner-specific features to optimize resource usage, which can improve performance and lower costs:

  • Streaming Engine: Streaming Engine moves the execution of streaming pipelines out of VM workers and into a dedicated service. The benefits include improved autoscaling responsiveness, reductions in consumed worker VM resources, and automatic service updates without redeployment. Enabling Streaming Engine is recommended for streaming pipelines. The feature is enabled by default when you use the latest versions of the Apache Beam Java SDK or the Python SDK.
  • Dataflow Shuffle: Dataflow Shuffle moves shuffle operations for batch pipelines out of VM workers and into a dedicated service. The benefits include faster execution for most batch pipelines, reduced resource consumption by worker VMs, improved autoscaling responsiveness, and improved fault tolerance. Enabling Dataflow Shuffle is recommended for batch pipelines. The feature is enabled by default using the Apache Beam Java SDK and the latest Python SDK.
  • Flexible resource scheduling (FlexRS): FlexRS reduces batch processing costs by using advanced scheduling techniques, the Dataflow Shuffle service, and a combination of preemptible VM instances and regular VMs.

Update streaming pipelines in production

Unlike batch pipelines, which stop when the job is complete, streaming pipelines often run continuously in order to provide uninterrupted processing. After the initial deployment, consider the following facts when you want to update existing pipelines:

  • The pipeline's business logic must be updated in a way that minimizes or avoids disruption to the pipeline. Depending on your requirements, you might be able to tolerate a temporary disruption to processing while a new version of a pipeline is deployed. However, your application might not be able to incur any disruption.
  • Pipeline update processes need to handle schema changes in a way that minimizes disruption to message processing and to other attached systems. For example, if the schema for messages in an event-processing pipeline changes, schema changes might also be necessary in downstream data sinks.

The rest of this section describes approaches to addressing these two concerns.

Perform in-place updates to streaming pipelines

Under certain circumstances, Dataflow lets you update an ongoing streaming job directly, without having to cancel or drain the pipeline. Dataflow performs a compatibility check to ensure that the updated pipeline code can be safely deployed to the running pipeline. Certain code changes cause the compatibility check to fail, such as when side inputs are added to or removed from an existing step. For more information, see Update an existing pipeline.

In addition to applying code changes, you can use in-place updates to change the number of workers in a streaming Dataflow job. For more information, see Manually scale a streaming pipeline in the Horizontal Autoscaling page.

Stop and replace streaming pipelines

If you're able to temporarily halt processing, you can cancel or drain the pipeline. Cancelling a pipeline causes Dataflow to immediately halt processing and shut down resources as quickly as possible, which causes some loss of data that's currently being processed (in-flight data) in the pipeline. To avoid data loss, in most cases, draining is the preferred action.

Draining a pipeline immediately closes any in-process windows and fires all triggers. Although in-flight data isn't lost, draining might cause windows to have incomplete data. If this happens, in-process windows emit partial or incomplete results. For more information, see Effects of draining a job. After the existing job completes, you can launch a new streaming job that contains your updated pipeline code, which enables processing to resume.

Another drawback is that you incur some downtime between the time when the existing streaming job stops and the time when the replacement pipeline is ready to resume processing data.

However, the advantage of this approach is that it's relatively simple to cancel or drain the existing pipeline and then launch a new job with the updated pipeline.

Message reprocessing using Pub/Sub Snapshot and Seek

In some situations, after you replace or cancel a drained pipeline, you might need to reprocess previously delivered Pub/Sub messages. For example, you might need to reprocess data using updated business logic. Pub/Sub Seek is a feature that lets you replay messages from a Pub/Sub Snapshot. You can use the Pub/Sub Seek feature with Dataflow pipelines to reprocess messages from the time when the subscription snapshot is created.

During development and testing, you can also use Pub/Sub Seek to replay the known messages repeatedly to verify the output from your pipeline. When you use Pub/Sub Seek, don't seek a subscription snapshot when the subscription is being consumed by a pipeline. If you do, it can invalidate Dataflow's watermark logic and might impact the exactly-once processing of Pub/Sub messages.

A recommended gcloud CLI workflow for using Pub/Sub Seek with Dataflow pipelines in a terminal window is as follows:

  1. Create a snapshot of the subscription:

    gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
  2. Drain or cancel the pipeline:

    gcloud dataflow jobs drain JOB_ID

    or

    gcloud dataflow jobs cancel JOB_ID
  3. Seek the subscription to the snapshot:

    gcloud pubsub subscriptions seek PIPELINE_SUBSCRIPTION_NAME --snapshot=SNAPSHOT_NAME
  4. Deploy a new pipeline that consumes the subscription.
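If you want to script this workflow, the following Python sketch assembles the three gcloud invocations in order so that an orchestration step can run them one at a time. The function name, the resource names, and the region value are hypothetical. The sketch only builds and prints the commands; it doesn't execute them.

```python
import shlex


def seek_workflow_commands(snapshot, subscription, job_id, region, drain=True):
    """Return the gcloud commands for steps 1 to 3 as argument lists."""
    stop_action = "drain" if drain else "cancel"
    return [
        # Step 1: capture the acknowledgment state of the subscription.
        ["gcloud", "pubsub", "snapshots", "create", snapshot,
         f"--subscription={subscription}"],
        # Step 2: stop the job. Wait until it reaches a terminal state
        # before step 3, so that nothing is consuming the subscription.
        ["gcloud", "dataflow", "jobs", stop_action, job_id,
         f"--region={region}"],
        # Step 3: rewind the subscription to the snapshot.
        ["gcloud", "pubsub", "subscriptions", "seek", subscription,
         f"--snapshot={snapshot}"],
    ]


commands = seek_workflow_commands(
    snapshot="my-snapshot",          # hypothetical names
    subscription="my-subscription",
    job_id="JOB_ID",
    region="us-central1",
)
for argv in commands:
    print(shlex.join(argv))
```

Step 4, deploying the new pipeline, depends on how your pipeline is packaged and is left to the caller.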

Run parallel pipelines

If Dataflow can't update your job directly, you can avoid disruption of your streaming pipeline by creating a parallel pipeline. To use this method, create a new streaming job that has the updated pipeline code and run it in parallel with the existing pipeline.

When you create the new pipeline, use the same windowing strategy that you used for the existing pipeline. Let the existing pipeline continue to run until its watermark exceeds the timestamp of the earliest complete window processed by the updated pipeline. Then, drain or cancel the existing pipeline. The updated pipeline continues to run in its place and effectively takes over processing on its own.

The following diagram illustrates this process.

Pipeline A overlaps with Pipeline B for a 5-minute window.

In the diagram, Pipeline B is the updated job that takes over from Pipeline A. The value t is the timestamp of the earliest complete window processed by Pipeline B. The value w is the watermark for Pipeline A. For simplicity, a perfect watermark is assumed with no late data (processing time and wall time are represented on the horizontal axis), and both pipelines use 5-minute fixed (tumbling) windows. Results are triggered after the watermark passes the end of each window.
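The hand-over condition can be sketched in a few lines of Python. This is a simplified model, not Dataflow code: timestamps are integer seconds, fixed windows are assumed to be aligned to multiples of the window size, and t is taken to be the start of the first window that Pipeline B sees in full. That choice of t is an assumption; using the end of that window instead keeps one fully overlapping window.

```python
def first_complete_window(start_ts, window_size):
    """Return (start, end) of the first fixed window fully seen by a
    pipeline that begins consuming at start_ts."""
    # Round start_ts up to the next window boundary (ceiling division).
    window_start = -(-start_ts // window_size) * window_size
    return window_start, window_start + window_size


def ready_to_drain(watermark_a, t):
    """Pipeline A can be drained once its watermark w exceeds t."""
    return watermark_a > t


WINDOW = 5 * 60                      # 5-minute fixed windows
b_start = 12 * 3600 + 3 * 60         # Pipeline B starts at 12:03:00
t, window_end = first_complete_window(b_start, WINDOW)

print(t, window_end)                 # window boundaries in seconds since midnight
print(ready_to_drain(t - 1, t))      # watermark still behind t
print(ready_to_drain(t + 1, t))      # watermark has passed t
```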

Concurrent output occurs during the time period where the two pipelines overlap. Therefore, configure the updated pipeline to write results to a separate destination from where the existing pipeline writes results. Downstream systems can then use an abstraction over the two destination sinks, such as a database view, to query the combined results. These systems can also use the abstraction to deduplicate results from the overlapping period.

The following example describes the approach of using a simple pipeline that reads input data from Pub/Sub, performs some processing, and writes the results to BigQuery.

  1. In the initial state, the existing streaming pipeline (Pipeline A) is running and reading messages from a Pub/Sub topic (Topic) using a subscription (Subscription A). The results are written to a BigQuery table (Table A). Results are consumed through a BigQuery view, which acts as a façade to mask underlying table changes. This process is an application of a design method called the façade pattern. The following diagram shows the initial state.

    One pipeline with one subscription, and writing to a single BigQuery table.

  2. You create a new subscription (Subscription B) for the updated pipeline. You deploy the updated pipeline (Pipeline B), which reads from the Pub/Sub topic (Topic) using Subscription B and writes to a separate BigQuery table (Table B). The following diagram illustrates this flow.

    Two pipelines, each with one subscription. Each pipeline writes to a separate BigQuery table. A façade view reads from both tables.

    At this point, Pipeline A and Pipeline B are running in parallel and writing results to separate tables. You record time t as the timestamp of the earliest complete window processed by Pipeline B.

  3. You allow Pipeline A to drain when its watermark exceeds time t, which closes any open windows and completes processing for any in-flight data. If complete windows are important (assuming no late data), you allow both pipelines to run until you have complete overlapping windows before draining the first pipeline. You stop the streaming job for Pipeline A after all in-flight data is processed and written to Table A. The following diagram shows this stage.

    Pipeline A drains and no longer reads Subscription A, and it no longer sends data to Table A after the drain is complete. All processing is handled by the second pipeline.

  4. At this point, only Pipeline B is running. You can query from a BigQuery view (Façade View), which acts as a façade for Table A and Table B. For rows that have the same timestamp in both tables, you configure the view to return the rows from Table B, or fall back to Table A if the rows don't exist in Table B. The following diagram shows the view (Façade View) reading from both Table A and Table B.

    Pipeline A is gone, and only Pipeline B runs.

    At this point, you can delete Subscription A. You can merge the two BigQuery tables or keep them separate.

    Deploying overlapping (parallel) pipelines can simplify rollback if any issues are detected with a new pipeline deployment. In the previous example, you might want to keep Pipeline A running while you monitor Pipeline B for correct operation. If there are any issues with Pipeline B, you can roll back to Pipeline A.
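The row-selection rule from step 4 can be modeled in plain Python. In practice the rule lives in the BigQuery view definition; this sketch only illustrates the logic, and the table contents and the timestamp key format are made up for the example.

```python
def facade_view(table_a, table_b):
    """Combine two result tables keyed by window timestamp.

    For timestamps present in both tables, the row from Table B wins;
    otherwise the row from Table A is used.
    """
    merged = dict(table_a)    # start from Pipeline A's results
    merged.update(table_b)    # Pipeline B's rows override duplicates
    return [merged[ts] for ts in sorted(merged)]


table_a = {
    "12:00": {"window": "12:00", "count": 10, "source": "A"},
    "12:05": {"window": "12:05", "count": 12, "source": "A"},
}
table_b = {
    "12:05": {"window": "12:05", "count": 12, "source": "B"},
    "12:10": {"window": "12:10", "count": 7, "source": "B"},
}

rows = facade_view(table_a, table_b)
for row in rows:
    print(row)
```

Because the overlapping 12:05 window is served from Table B only, consumers of the view see each window once even though both pipelines wrote it.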

Handle schema mutations

Data-handling systems often need to accommodate schema mutations over time, sometimes due to changes in business requirements and other times for technical reasons. Applying schema updates typically requires careful planning and execution to avoid disruptions to business information systems. When you perform pipeline updates, it's important to handle schema mutations within streaming data pipelines without disrupting ingestion and without impacting connected systems that rely on existing schemas.

Consider a simple pipeline that reads messages that contain JSON payloads from a Pub/Sub topic. The pipeline converts each message into a TableRow instance and then writes the rows to a BigQuery table. The schema of the output table is similar to messages that are processed by the pipeline. In the following diagram, the schema is referred to as Schema A.

Pipeline that reads a subscription and writes to a BigQuery output table using Schema A.

Over time, suppose that the message schema mutates in non-trivial ways; fields are added, removed, or replaced, and Schema A evolves into a new schema. In the discussion that follows, the new schema is referred to as Schema B. In this case, Pipeline A needs to be updated, and the output table schema needs to support Schema B.

Handling schema mutations using principal and staging tables

For the output table, schema mutations that add new fields or that relax column modes (for example, from REQUIRED to NULLABLE) can be performed without downtime, and the mutations don't usually impact existing queries. However, schema mutations that modify or remove existing schema fields break queries or result in other disruptions. When existing schema fields are modified or removed, you need to plan your approach to accommodate changes without incurring downtime.
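To decide ahead of a release whether a schema change falls into the disruptive category, you could compare the old and new schemas programmatically. The following Python sketch is one way to do that under simplifying assumptions: a schema is a flat dictionary that maps each field name to a (type, mode) pair, nested fields are ignored, and the only mode change treated as safe is REQUIRED to NULLABLE. The function name and the sample schemas are hypothetical.

```python
def breaking_changes(old, new):
    """Return descriptions of changes that modify or remove existing fields.

    Added fields and REQUIRED -> NULLABLE relaxations are not reported.
    """
    problems = []
    for name, (old_type, old_mode) in old.items():
        if name not in new:
            problems.append(f"removed field: {name}")
            continue
        new_type, new_mode = new[name]
        if new_type != old_type:
            problems.append(f"changed type of {name}: {old_type} -> {new_type}")
        elif new_mode != old_mode and (old_mode, new_mode) != ("REQUIRED", "NULLABLE"):
            problems.append(f"changed mode of {name}: {old_mode} -> {new_mode}")
    return problems


schema_a = {"id": ("STRING", "REQUIRED"), "amount": ("FLOAT", "REQUIRED")}

# Adds a field and relaxes a mode: no disruptive changes expected.
additive = {
    "id": ("STRING", "NULLABLE"),
    "amount": ("FLOAT", "REQUIRED"),
    "currency": ("STRING", "NULLABLE"),
}

# Changes a type and removes a field: both are reported.
schema_b = {"id": ("INTEGER", "REQUIRED")}

print(breaking_changes(schema_a, additive))
print(breaking_changes(schema_a, schema_b))
```

An empty result suggests that the change can be applied to the existing output table. A non-empty result is a signal to plan for an approach such as the one described next.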

One approach to avoid disruption is to separate the data that's written by the pipeline into a principal table and into one or more staging tables. In this approach, the principal table stores historic data written by the pipeline, and staging tables store the latest pipeline output. You can define a BigQuery façade view over the principal and staging tables, which lets consumers query both historic and up-to-date data.

The following diagram revises the previous flow to include a staging table (Staging Table A), a principal table, and a façade view.

Pipeline that reads a subscription and writes to a BigQuery staging table. A second (principal) table has output from a previous version of the schema. A façade view reads from both the staging table and the principal table.

In the revised flow, Pipeline A processes messages that use Schema A and writes the output to Staging Table A, which has a compatible schema. The principal table contains historic data written by previous versions of the pipeline, as well as results that are periodically merged from the staging table. Consumers can query up-to-date data, including both historic and real-time data, by using the façade view.

When the message schema mutates from Schema A to Schema B, you might update the pipeline code to be compatible with messages that use Schema B. The existing pipeline needs to be updated with the new implementation. By performing in-place updates or by running parallel pipelines, you can ensure that streaming data processing continues without disruption. On the other hand, terminating and replacing pipelines results in a break in processing because no pipeline is running for a period of time.

The updated pipeline writes to an additional staging table (Staging Table B) that uses Schema B. You can use an orchestrated workflow to create the new staging table before updating the pipeline. Also update the façade view to include results from the new staging table, potentially using a related workflow step.

The following diagram shows the updated flow, which includes Staging Table B with Schema B, and how the façade view has been updated to include content from the principal table and from both staging tables.

The pipeline now uses Schema B and writes to Staging Table B. A façade view reads from the Principal table, Staging Table A, and Staging Table B.

As a separate process from the pipeline update, you can merge the staging tables into the principal table, either periodically or as required. The following diagram shows how Staging Table A is merged into the principal table.

Staging Table A is merged into the principal table. The façade view reads from Staging Table B and from the principal table.

Life of a Dataflow job

A Dataflow job goes through a lifecycle that's represented by various job states. To run a Dataflow job, submit it to a regional endpoint. The job is then routed to an available Dataflow backend in one of the zones within the region. Before Dataflow assigns a backend, it verifies that you have sufficient quota and permissions to run the job. When these preflight checks are complete and a backend has been assigned, the job moves to a JOB_STATE_PENDING state. For FlexRS jobs, the backend assignment might be delayed to a future time, and these jobs enter a JOB_STATE_QUEUED state.

The assigned backend picks up the job to run and attempts to start Dataflow workers in your Google Cloud project. The zone that's chosen for the worker VMs depends on a number of factors. For batch jobs that use Dataflow Shuffle, the service also tries to ensure that the Dataflow backend and worker VMs are located in the same zone to avoid cross-zone traffic.

After the Dataflow workers start, they request work from the Dataflow backend. The backend is responsible for splitting the work into parallelizable chunks, called bundles, that are distributed among the workers. If the workers can't handle the existing work, and if autoscaling is enabled, the backend starts more workers in order to handle the work. Similarly, if more workers are started than are needed, some of the workers are shut down.

After the Dataflow workers start, the Dataflow backend acts as the control plane to orchestrate the job's execution. During processing, the job's data plane performs shuffle operations such as GroupByKey, CoGroupByKey, and Combine. Jobs use one of the following data-plane implementations for shuffle operations:

  • The data plane runs on the Dataflow workers, and shuffle data is stored on persistent disks.
  • The data plane runs as a service, externalized from the worker VMs. This implementation has two variants, which you specify when you create the job: Dataflow Shuffle for batch pipelines and Streaming Engine for streaming pipelines. The service-based shuffle significantly improves the performance and scalability of data-shuffling operations compared to the worker-based shuffle.

Streaming jobs that enter a JOB_STATE_RUNNING state continue to run indefinitely until they're cancelled or drained, unless a job failure occurs. Batch jobs automatically stop when all processing is completed or if an unrecoverable error occurs. Depending on how the job is stopped, Dataflow sets the job's status to one of multiple terminal states, including JOB_STATE_CANCELLED, JOB_STATE_DRAINED, or JOB_STATE_DONE.
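Automation that stops and replaces jobs usually needs to wait for one of these terminal states before it continues. The following Python sketch shows a generic polling loop for that purpose. The get_state callable is a hypothetical hook that you would back with your own call to the Dataflow API or CLI. JOB_STATE_FAILED is included in the terminal set as an addition to the states named above.

```python
import time

TERMINAL_STATES = {
    "JOB_STATE_DONE",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_DRAINED",
    "JOB_STATE_FAILED",  # not listed in the text above; added for completeness
}


def wait_for_terminal_state(get_state, poll_seconds=30, max_polls=120,
                            sleep=time.sleep):
    """Poll get_state() until it returns a terminal state, then return it."""
    for _ in range(max_polls):
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)
    raise TimeoutError("job did not reach a terminal state")


# Demonstration with a scripted sequence of states and a no-op sleep.
fake_states = iter([
    "JOB_STATE_RUNNING",
    "JOB_STATE_DRAINING",
    "JOB_STATE_DRAINED",
])
final_state = wait_for_terminal_state(lambda: next(fake_states),
                                      sleep=lambda _seconds: None)
print(final_state)
```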

Pipeline reliability best practices

This section discusses failures that might occur when you work with Dataflow and best practices for Dataflow jobs.

Follow isolation principles

A general recommendation to improve overall pipeline reliability is to follow the isolation principles behind regions and zones. Ensure that your pipelines do not have critical cross-region dependencies. If you have a pipeline that has critical dependency on services from multiple regions, a failure in any one of those regions can impact your pipeline. To help avoid this issue, deploy to multiple regions for redundancy and backup.

Create Dataflow snapshots

Dataflow offers a snapshot feature that provides a backup of a pipeline's state. You can restore the pipeline snapshot into a new streaming Dataflow pipeline in another zone or region. You can then start the reprocessing of messages in the Pub/Sub or Kafka topics starting at the snapshot timestamp. If you set up regular snapshots of your pipelines, you can minimize your Recovery Time Objective (RTO).

For more information about Dataflow snapshots, see Use Dataflow snapshots.

Handle job submission failures

You submit non-templated Dataflow jobs using the Apache Beam SDK. To submit the job, you run the pipeline using the Dataflow Runner, which is specified as part of the pipeline's options. The Apache Beam SDK stages files in Cloud Storage, creates a job request file, and submits the file to Dataflow.

Alternatively, jobs created from Dataflow templates use different submission methods, which commonly rely on the templates API.

You might see different errors returned by Dataflow that indicate job failure for non-templated and templated jobs. This section discusses different types of job submission failures and best practices for handling or mitigating them.

Retry job submissions after transient failures

If a job fails to start due to a Dataflow service issue, retry the job a few times. Retrying mitigates transient service issues.
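A retry wrapper with exponential backoff is one common way to implement this advice. The following Python sketch is illustrative only: submit is a hypothetical callable that launches your job, TransientSubmissionError is a made-up exception type that your launcher would raise for errors it considers retryable, and the attempt count and delays are arbitrary defaults.

```python
import time


class TransientSubmissionError(Exception):
    """Hypothetical marker for submission errors that are worth retrying."""


def submit_with_retries(submit, max_attempts=4, base_delay=5.0,
                        sleep=time.sleep):
    """Call submit() until it succeeds or max_attempts is reached."""
    for attempt in range(1, max_attempts + 1):
        try:
            return submit()
        except TransientSubmissionError:
            if attempt == max_attempts:
                raise  # give up and surface the last error
            # Wait 5 s, 10 s, 20 s, ... between attempts.
            sleep(base_delay * 2 ** (attempt - 1))


# Demonstration: the fake launcher fails twice, then succeeds.
outcomes = iter([TransientSubmissionError(), TransientSubmissionError(), "job-123"])
delays = []


def fake_submit():
    outcome = next(outcomes)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


job_id = submit_with_retries(fake_submit, sleep=delays.append)
print(job_id, delays)
```

Errors that aren't transient, such as missing permissions or insufficient quota, should fail immediately rather than be retried, which is why the wrapper catches only the marker exception.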

Mitigate zonal failures by specifying a worker region

Dataflow provides regional availability and is available in multiple regions. When a user submits a job to a regional endpoint without explicitly specifying a zone, Dataflow routes the job to a zone in the specified region based on resource availability.

The recommended option for job placement is to specify a worker region using the --region flag instead of the --zone flag whenever possible. This step allows Dataflow to provide an additional level of fault tolerance for your pipelines by automatically choosing the best possible zone for that job. Jobs that specify an explicit zone don't have this benefit, and they fail if problems occur within the zone. If a job submission fails due to a zonal issue, you can often resolve the problem by retrying the job without explicitly specifying a zone.
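The following Python sketch shows the placement recommendation as pipeline arguments. The helper dataflow_args and the project, bucket, and zone values are hypothetical. The zone option is written as --worker_zone, which is assumed here to be the name used by recent Apache Beam Python SDKs; your SDK version might use --zone, as in the text above.

```python
from typing import List, Optional


def dataflow_args(
    project: str,
    region: str,
    temp_location: str,
    worker_zone: Optional[str] = None,
) -> List[str]:
    """Build pipeline arguments that let Dataflow pick the zone by default.

    Pass worker_zone only when a job truly must run in one zone, because
    pinning the zone gives up automatic zone selection.
    """
    args = [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        f"--temp_location={temp_location}",
    ]
    if worker_zone is not None:
        # Assumed flag name; see the note in the paragraph above.
        args.append(f"--worker_zone={worker_zone}")
    return args


if __name__ == "__main__":
    # Preferred: region only, no zone.
    print(dataflow_args("my-project", "us-central1", "gs://my-bucket/tmp"))
```

If a submission that pins a zone fails because of a zonal issue, calling the same helper without worker_zone produces the arguments for the retry described above.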

Mitigate regional failures by storing data in multiple regions

If an entire region is unavailable, try the job in a different region. It's important to think about the availability of your data when jobs fail across regions. To protect against single-region failures without manually copying data to multiple regions, use Google Cloud resources that automatically store data in multiple regions. For example, use BigQuery multi-regional locations for datasets or Cloud Storage dual-region and multi-region buckets. If one region becomes unavailable, you can rerun the pipeline in another region where the data is available.

For an example of using multi-regional services with Dataflow, see High availability and geographic redundancy.
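Trying a job in a different region can be expressed as an ordered list of candidate regions. The Python sketch below is a minimal illustration: submit_in_first_available_region and RegionUnavailableError are hypothetical names, and the submit callable is assumed to raise RegionUnavailableError when a region can't accept the job. The sketch also assumes that the pipeline's data is readable from every region in the list.

```python
from typing import Callable, List, Sequence


class RegionUnavailableError(Exception):
    """Hypothetical error raised when a region cannot accept the job."""


def submit_in_first_available_region(
    submit: Callable[[str], str],
    regions: Sequence[str],
) -> str:
    """Try each region in order and return the first successful result."""
    failures: List[str] = []
    for region in regions:
        try:
            return submit(region)
        except RegionUnavailableError as err:
            failures.append(f"{region}: {err}")  # Remember why it failed.
    raise RuntimeError("no region accepted the job: " + "; ".join(failures))
```

Ordering the list by preference keeps the job in its usual region whenever that region is healthy.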

Handle failures in running pipelines

After a job is submitted and has been accepted, the only valid operations for the job are the following:

  • cancel for batch jobs
  • update, drain, or cancel for streaming jobs

You can't change the location of running jobs after you submit the job. If you're not using FlexRS, jobs usually start processing data within a few minutes after submission. FlexRS jobs can take up to six hours for data processing to begin.

This section discusses failures for running jobs and best practices for handling them.

Monitor jobs to identify and resolve issues caused by transient errors

For batch jobs, bundles that include a failing item are retried four times. Dataflow terminates the job when a single bundle has failed four times. This process takes care of many transient issues. However, if a prolonged failure occurs, the maximum retry limit is usually reached quickly, which allows the job to fail quickly.

For monitoring and incident management, configure alerting rules to detect failed jobs. If a job fails, inspect the job logs to identify job failures caused by failed work items that exceeded the retry limit.

For streaming jobs, Dataflow retries failed work items indefinitely. The job is not terminated. However, the job might stall until the issue is resolved. Create monitoring policies to detect signs of a stalled pipeline, such as an increase in system latency and a decrease in data freshness. Implement error logging in your pipeline code to help identify pipeline stalls caused by work items that fail repeatedly.
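A monitoring policy for stalls usually reduces to a rule over recent metric samples. The following Python sketch shows one possible rule, under assumptions: is_stalling is a hypothetical helper, the samples are system latency values in seconds that you have already fetched from your monitoring system, and the threshold is a placeholder. In practice, a managed alerting policy is likely a better home for this logic than custom code.

```python
from typing import Sequence


def is_stalling(system_latency_s: Sequence[float], max_latency_s: float) -> bool:
    """Return True when latency rose on every sample and now exceeds the limit.

    Requiring a steady rise avoids alerting on a single short spike.
    """
    if len(system_latency_s) < 2:
        return False  # Not enough history to call it a trend.
    pairs = zip(system_latency_s, system_latency_s[1:])
    rising = all(later > earlier for earlier, later in pairs)
    return rising and system_latency_s[-1] > max_latency_s
```

The same rule can be applied to a data freshness series, because a work item that fails repeatedly tends to hold back both metrics.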

Restart jobs in a different zone if a zonal outage occurs

After a job starts, the Dataflow workers that run user code are constrained to a single zone. If a zonal outage occurs, Dataflow jobs are often impacted, depending on the extent of the outage.

For outages that impact only Dataflow backends, the backends are automatically migrated to a different zone by the managed service so that they can continue the job. If the job uses Dataflow Shuffle, the backend cannot be moved across zones. If a Dataflow backend migration occurs, jobs might be temporarily stalled.

If a zonal outage occurs, running jobs are likely to fail or stall until zone availability is restored. If a zone becomes unavailable for a long period, stop jobs (cancel for batch jobs and drain for streaming jobs) and then restart them to let Dataflow choose a new, healthy zone.
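The stop step depends on the job type: cancel for batch jobs and drain for streaming jobs. As a small illustration, the Python sketch below builds the matching gcloud command. The helper name stop_command and the job ID are hypothetical, and the command is only built, not run.

```python
from typing import List


def stop_command(job_id: str, region: str, streaming: bool) -> List[str]:
    """Build the gcloud command that stops a job before it is restarted.

    Streaming jobs are drained so that in-flight data is processed first;
    batch jobs are cancelled.
    """
    action = "drain" if streaming else "cancel"
    return ["gcloud", "dataflow", "jobs", action, job_id, f"--region={region}"]


if __name__ == "__main__":
    print(" ".join(stop_command("JOB_ID", "us-central1", streaming=True)))
```

After the job stops, resubmitting it with a region but no explicit zone lets Dataflow place the workers in a healthy zone.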

Restart batch jobs in a different region if a regional outage occurs

If a regional outage occurs in a region where your Dataflow jobs are running, the jobs can fail or stall. For batch jobs, restart the job in a different region if possible. It's important to ensure that your data is available in different regions.

Mitigate regional outages by using high availability or failover

For streaming jobs, you have different options for mitigating failures, depending on the fault tolerance and budget for your application. For a regional outage, the simplest and most cost-effective option is to wait until the outage ends. However, if your application is latency-sensitive, or if data processing must either continue without disruption or resume with minimal delay, consider the options in the following sections.

High-availability: Latency-sensitive with no data loss

If your application cannot tolerate data loss, run duplicate pipelines in parallel in two different regions, and have the pipelines consume the same data. The same data sources need to be available in both regions. The downstream applications that depend on the output of these pipelines must be able to switch between the output from these two regions. Due to the duplication of resources, this option involves the highest cost compared to other options. For an example deployment, see the section High availability and geographic redundancy.
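How a downstream application switches between the two outputs is specific to each application. The Python sketch below shows one possible rule, purely as an illustration: choose_output is a hypothetical helper that prefers one region's output and falls back to the most recently written output when the preferred one is stale. The last-write timestamps are assumed to come from your own bookkeeping, such as the newest row timestamp in each output table.

```python
from datetime import datetime, timedelta
from typing import Mapping, Optional


def choose_output(
    last_write_by_region: Mapping[str, datetime],
    preferred: str,
    now: datetime,
    max_staleness: timedelta,
) -> Optional[str]:
    """Pick the region whose output a downstream reader should use.

    Returns the preferred region while its output is fresh, otherwise the
    freshest other region, or None if every output is stale.
    """
    def is_fresh(region: str) -> bool:
        written = last_write_by_region.get(region)
        return written is not None and now - written <= max_staleness

    if is_fresh(preferred):
        return preferred
    candidates = [r for r in last_write_by_region if is_fresh(r)]
    if not candidates:
        return None
    return max(candidates, key=lambda r: last_write_by_region[r])
```

Preferring one region while it is healthy keeps readers from flapping between outputs when both pipelines are working normally.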

Failover: Latency-sensitive with some potential data loss

If your application can tolerate potential data loss, make the streaming data source available in multiple regions. For example, using Pub/Sub, maintain two independent subscriptions for the same topic, one for each region. If a regional outage occurs, start a replacement pipeline in another region, and have the pipeline consume data from the backup subscription.

Replay the backup subscription to an appropriate time to keep data loss to a minimum without sacrificing latency. Downstream applications must know how to switch to the running pipeline's output, similar to the high-availability option. This option uses fewer resources than running duplicate pipelines because only the data is duplicated.
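Replaying the backup subscription amounts to choosing a timestamp slightly before the outage began and seeking the subscription to it. The Python sketch below computes that timestamp and builds a gcloud pubsub subscriptions seek command. The helper name seek_command, the subscription name, and the ten-minute safety margin are assumptions. Seeking to a time only replays messages that the subscription still retains, so the result depends on your retention settings.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def seek_command(
    subscription: str,
    outage_start: datetime,
    safety_margin: timedelta = timedelta(minutes=10),
) -> List[str]:
    """Build a gcloud command that rewinds a subscription to before an outage.

    A larger safety_margin lowers the risk of data loss but replays more
    messages, which can produce more duplicates downstream.
    """
    if outage_start.tzinfo is None:
        raise ValueError("outage_start must be timezone-aware")
    replay_from = outage_start.astimezone(timezone.utc) - safety_margin
    stamp = replay_from.strftime("%Y-%m-%dT%H:%M:%SZ")
    return ["gcloud", "pubsub", "subscriptions", "seek", subscription, f"--time={stamp}"]


if __name__ == "__main__":
    start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    print(" ".join(seek_command("backup-subscription", start)))
```

The safety margin is the knob that trades data loss against reprocessing: a margin of zero replays the least data but risks missing messages that the failed pipeline received without finishing.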

High availability and geographic redundancy

You can run multiple streaming pipelines in parallel for high-availability data processing. For example, you can run two parallel streaming jobs in different regions, which provides geographical redundancy and fault tolerance for data processing.

By considering the geographic availability of data sources and sinks, you can operate end-to-end pipelines in a highly available, multi-region configuration. The following diagram shows an example deployment.

Diagram: Two regional pipelines use separate subscriptions to read from a global Pub/Sub topic. The pipelines write to separate multi-regional BigQuery tables, one in the US and one in Europe.

The diagram shows the following flow:

  1. Pub/Sub runs in most regions around the world, which lets the service offer fast, global data access, while giving you control over where messages are stored. Pub/Sub can automatically store published messages in the Google Cloud region that's nearest to subscribers, or you can configure it to use a specific region or set of regions by using message storage policies.

    Pub/Sub then delivers the messages to subscribers across the world, regardless of where the messages are stored. Pub/Sub clients don't need to be aware of the server locations they are connecting to, because global load-balancing mechanisms direct traffic to the nearest Google Cloud data center where messages are stored.

  2. Dataflow runs in specific Google Cloud regions. By running parallel pipelines in separate Google Cloud regions, you can isolate your jobs from failures that affect a single region. The diagram shows two instances of the same pipeline, each one running in a separate Google Cloud region.

  3. BigQuery provides regional and multi-regional dataset locations. When you choose a multi-regional location, your dataset is in at least two geographical regions. The diagram depicts two separate pipelines, each writing to a separate multi-regional dataset.