Upgrade a streaming pipeline

This page provides guidance and recommendations for upgrading your streaming pipelines. For example, you might need to upgrade to a newer version of the Apache Beam SDK, or you might want to update your pipeline code. Different options are provided to suit different scenarios.

Whereas batch pipelines stop when the job is complete, streaming pipelines often run continuously in order to provide uninterrupted processing. Therefore, when you upgrade streaming pipelines, you need to account for the following considerations:

  • You might need to minimize or avoid disruption to the pipeline. In some cases, you might be able to tolerate a temporary disruption to processing while a new version of a pipeline is deployed. In other cases, your application might not be able to tolerate any disruption.
  • Pipeline update processes need to handle schema changes in a way that minimizes disruption to message processing and to other attached systems. For example, if the schema for messages in an event-processing pipeline changes, schema changes might also be necessary in downstream data sinks.

You can use one of the following methods to update streaming pipelines, depending on your pipeline and update requirements:

  • Perform an in-flight update: change certain job options, such as the autoscaling range, without stopping the job.
  • Launch a replacement job: replace the running job with a new job that runs the updated pipeline code, by using the update option.
  • Stop and replace the pipeline: drain or cancel the existing job, and then launch a new job that contains the updated pipeline code.
  • Run parallel pipelines: run the updated pipeline alongside the existing pipeline, and then retire the existing pipeline.

For more information about issues you might encounter during an update and how to prevent them, see Validate a replacement job and Job compatibility check.

Best practices

  • Upgrade the Apache Beam SDK version separately from any pipeline code changes.
  • Test your pipeline after each change before making additional updates.
  • Regularly upgrade the Apache Beam SDK version that your pipeline uses.
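As a minimal sketch of the first two practices, assuming a Python pipeline with pinned dependencies (the version number, test directory, and file layout are illustrative):

```shell
# Illustrative only: upgrade the Beam SDK by itself, without touching
# pipeline code. The version shown is a placeholder.
pip install --upgrade 'apache-beam[gcp]==2.53.0'

# Verify that the pipeline still behaves as expected before making any
# separate code changes.
python -m pytest tests/
```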

Perform in-flight updates

You can update some ongoing streaming pipelines without stopping the job. This scenario is called an in-flight job update. In-flight job updates are only available in limited circumstances:

  • The job must use Streaming Engine.
  • The job must be in the running state.
  • You are only changing the minimum or maximum number of workers that the job uses.

For more information, see Set the autoscaling range in the Horizontal Autoscaling page.

For instructions explaining how to perform an in-flight job update, see Update an existing pipeline.
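For example, an in-flight update of the autoscaling range might look like the following sketch, which assumes a Streaming Engine job in us-central1 (the job ID, region, and worker counts are placeholders):

```shell
# Sketch: adjust the autoscaling range of a running Streaming Engine job
# without stopping it. JOB_ID is a placeholder.
gcloud dataflow jobs update-options \
    --region=us-central1 \
    --min-num-workers=5 \
    --max-num-workers=20 \
    JOB_ID
```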

Launch a replacement job

If the updated job is compatible with the existing job, you can update your pipeline by using the update option. When you replace an existing job, a new job runs your updated pipeline code. The Dataflow service retains the job name but runs the replacement job with an updated Job ID. This process might cause downtime while the existing job stops, the compatibility check runs, and the new job starts. For more details, see The effects of replacing a job.

Dataflow performs a compatibility check to ensure that the updated pipeline code can be safely deployed to the running pipeline. Certain code changes cause the compatibility check to fail, such as when side inputs are added to or removed from an existing step. When the compatibility check fails, you can't perform an in-place job update.

For instructions explaining how to launch a replacement job, see Launch a replacement job.
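As a sketch, for a Beam Python pipeline the replacement launch might look like the following; the file name, project, region, job name, and transform mapping are placeholders, and the --update flag is what triggers Dataflow's compatibility check:

```shell
# Sketch: relaunch updated pipeline code as a replacement for a running
# job with the same job name. If a transform was renamed between
# versions, map the old name to the new one so state can be transferred.
python my_pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --streaming \
    --update \
    --job_name=EXISTING_JOB_NAME \
    --transform_name_mapping='{"OldTransformName":"NewTransformName"}'
```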

If the pipeline update is incompatible with the current job, you need to stop and replace the pipeline. If your pipeline can't tolerate downtime, run parallel pipelines.

Stop and replace pipelines

If you can temporarily halt processing, you can cancel or drain the pipeline, and then replace it with the updated pipeline. Canceling a pipeline causes Dataflow to immediately halt processing and shut down resources as quickly as possible, which can cause some loss of data that's being processed, known as in-flight data. To avoid data loss, in most cases, draining is the preferred action.

Draining a pipeline immediately closes any in-process windows and fires all triggers. Although in-flight data isn't lost, draining might cause windows to have incomplete data. If this happens, in-process windows emit partial or incomplete results. For more information, see Effects of draining a job. After the existing job completes, launch a new streaming job that contains your updated pipeline code, which enables processing to resume.

With this method, you incur some downtime between the time when the existing streaming job stops and the time when the replacement pipeline is ready to resume processing data. However, canceling or draining an existing pipeline and then launching a new job with the updated pipeline is less complicated than running parallel pipelines.

For more detailed instructions, see Drain a Dataflow job. After you drain the current job, start a new job with the same job name.
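The drain-then-replace sequence can be sketched as follows (the job ID and region are placeholders):

```shell
# Drain the running job so that in-flight data finishes processing.
gcloud dataflow jobs drain JOB_ID --region=REGION

# Poll the job state; launch the replacement job only after this
# reports JOB_STATE_DRAINED.
gcloud dataflow jobs describe JOB_ID --region=REGION \
    --format='value(currentState)'
```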

Message reprocessing with Pub/Sub Snapshot and Seek

In some situations, after you replace or cancel a drained pipeline, you might need to reprocess previously delivered Pub/Sub messages. For example, you might need to use updated business logic to reprocess data. Pub/Sub Seek is a feature that lets you replay messages from a Pub/Sub snapshot. You can use Pub/Sub Seek with Dataflow to reprocess messages from the time when the subscription snapshot is created.

During development and testing, you can also use Pub/Sub Seek to replay known messages repeatedly to verify the output from your pipeline. Don't seek a subscription snapshot while the subscription is being consumed by a pipeline. If you do, the seek can invalidate Dataflow's watermark logic and might impact the exactly-once processing of Pub/Sub messages.

A recommended workflow for using Pub/Sub Seek with Dataflow pipelines from the gcloud CLI is as follows:

  1. To create a snapshot of the subscription, use the gcloud pubsub snapshots create command:

    gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
    
  2. To drain or cancel the pipeline, use the gcloud dataflow jobs drain command or the gcloud dataflow jobs cancel command:

    gcloud dataflow jobs drain JOB_ID
    

    or

    gcloud dataflow jobs cancel JOB_ID
    
  3. To seek to the snapshot, use the gcloud pubsub subscriptions seek command:

    gcloud pubsub subscriptions seek PIPELINE_SUBSCRIPTION_NAME --snapshot=SNAPSHOT_NAME
    
  4. Deploy a new pipeline that consumes the subscription.

Run parallel pipelines

If you need to avoid disruption to your streaming pipeline during an update, run parallel pipelines. Create a new streaming job that has the updated pipeline code, and run the new pipeline in parallel with the existing pipeline.

When you create the new pipeline, use the same windowing strategy that you used for the existing pipeline. Let the existing pipeline continue to run until its watermark exceeds the timestamp of the earliest complete window processed by the updated pipeline. Then, drain or cancel the existing pipeline. The updated pipeline continues to run in its place and effectively takes over processing on its own.

The following diagram illustrates this process.

Pipeline A overlaps with Pipeline B for a 5-minute window.

In the diagram, Pipeline B is the updated job that takes over from Pipeline A. The value t is the timestamp of the earliest complete window processed by Pipeline B. The value w is the watermark for Pipeline A. For simplicity, a perfect watermark is assumed with no late data. Processing and wall time are represented on the horizontal axis. Both pipelines use five-minute fixed (tumbling) windows. Results are triggered after the watermark passes the end of each window.

Because concurrent output occurs during the time period where the two pipelines overlap, configure the two pipelines to write results to different destinations. Downstream systems can then use an abstraction over the two destination sinks, such as a database view, to query the combined results. These systems can also use the abstraction to deduplicate results from the overlapping period.
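As a sketch of such an abstraction, assuming both pipelines write windowed results that include a window_end timestamp to hypothetical tables results_a and results_b, a deduplicating façade view might prefer the updated pipeline's rows:

```shell
# Sketch: create a view that returns Pipeline B's output, plus Pipeline A's
# output only for windows that Pipeline B didn't produce. The project,
# dataset, table, and column names are hypothetical.
bq mk --use_legacy_sql=false \
    --view 'SELECT * FROM `PROJECT_ID.dataset.results_b`
            UNION ALL
            SELECT a.*
            FROM `PROJECT_ID.dataset.results_a` AS a
            WHERE a.window_end NOT IN (
              SELECT window_end FROM `PROJECT_ID.dataset.results_b`)' \
    PROJECT_ID:dataset.facade_view
```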

The following example describes this approach by using a pipeline that reads input data from Pub/Sub, performs some processing, and writes the results to BigQuery.

  1. In the initial state, the existing streaming pipeline (Pipeline A) is running and reading messages from a Pub/Sub topic (Topic) by using a subscription (Subscription A). The results are written to a BigQuery table (Table A). Results are consumed through a BigQuery view, which acts as a façade to mask underlying table changes. This process is an application of a design method called the façade pattern. The following diagram shows the initial state.

    One pipeline with one subscription, and writing to a single BigQuery table.

  2. Create a new subscription (Subscription B) for the updated pipeline. Deploy the updated pipeline (Pipeline B), which reads from the Pub/Sub topic (Topic) by using Subscription B and writes to a separate BigQuery table (Table B). The following diagram illustrates this flow.

    Two pipelines, each with one subscription. Each pipeline writes to a separate BigQuery table. A façade view reads from both tables.

    At this point, Pipeline A and Pipeline B are running in parallel and writing results to separate tables. You record time t as the timestamp of the earliest complete window processed by Pipeline B.

  3. When the watermark of Pipeline A exceeds time t, drain Pipeline A. When you drain the pipeline, any open windows close, and processing for in-flight data completes. If complete windows are important (assuming no late data), let both pipelines run until you have complete overlapping windows before draining Pipeline A. Stop the streaming job for Pipeline A after all in-flight data is processed and written to Table A. The following diagram shows this stage.

    Pipeline A drains and no longer reads Subscription A, and it no longer sends data to Table A after the drain is complete. All processing is handled by the second pipeline.

  4. At this point, only Pipeline B is running. You can query a BigQuery view (Façade View), which acts as a façade over Table A and Table B. For rows that have the same timestamp in both tables, configure the view to return the rows from Table B or, if the rows don't exist in Table B, to fall back to Table A. The following diagram shows the view (Façade View) reading from both Table A and Table B.

    Pipeline A is gone, and only Pipeline B runs.

    At this point, you can delete Subscription A.
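The CLI side of steps 2 through 4 can be sketched as follows (the subscription, topic, job, and region names are placeholders):

```shell
# Step 2: create the subscription that the updated pipeline consumes,
# then deploy Pipeline B so that it reads from this subscription.
gcloud pubsub subscriptions create SUBSCRIPTION_B --topic=TOPIC_NAME

# Step 3: after Pipeline A's watermark exceeds time t, drain Pipeline A.
gcloud dataflow jobs drain PIPELINE_A_JOB_ID --region=REGION

# Step 4: once Pipeline B has fully taken over, delete the old subscription.
gcloud pubsub subscriptions delete SUBSCRIPTION_A
```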

When issues are detected with a new pipeline deployment, having parallel pipelines can simplify rollback. In this example, you might want to keep Pipeline A running while you monitor Pipeline B for correct operation. If any issues occur with Pipeline B, you can roll back to Pipeline A.

Handle schema mutations

Data-handling systems often need to accommodate schema mutations over time, sometimes due to changes in business requirements and other times for technical reasons. Applying schema updates typically requires careful planning and execution to avoid disruptions to business information systems.

Consider a pipeline that reads messages that contain JSON payloads from a Pub/Sub topic. The pipeline converts each message into a TableRow instance and then writes the rows to a BigQuery table. The schema of the output table corresponds to the schema of the messages that are processed by the pipeline. In the following diagram, the schema is referred to as Schema A.

Pipeline that reads a subscription and writes to a BigQuery output table using Schema A.

Over time, the message schema might mutate in non-trivial ways. For example, fields are added, removed, or replaced. Schema A evolves into a new schema. In the discussion that follows, the new schema is referred to as Schema B. In this case, Pipeline A needs to be updated, and the output table schema needs to support Schema B.

For the output table, you can perform some schema mutations without downtime. For example, you can add new fields or relax column modes, such as changing REQUIRED to NULLABLE. These mutations don't usually impact existing queries. However, schema mutations that modify or remove existing schema fields break queries or cause other disruptions. The following approach accommodates changes without requiring downtime.

Separate the data that's written by the pipeline into a principal table and into one or more staging tables. The principal table stores historic data written by the pipeline. Staging tables store the latest pipeline output. You can define a BigQuery façade view over the principal and staging tables, which lets consumers query both historic and up-to-date data.

The following diagram revises the previous pipeline flow to include a staging table (Staging Table A), a principal table, and a façade view.

Pipeline that reads a subscription and writes to a BigQuery staging table. A second (principal) table has output from a previous version of the schema. A façade view reads from both the staging table and the principal table.

In the revised flow, Pipeline A processes messages that use Schema A and writes the output to Staging Table A, which has a compatible schema. The principal table contains historic data written by previous versions of the pipeline, as well as results that are periodically merged from the staging table. Consumers can query up-to-date data, including both historic and real-time data, by using the façade view.

When the message schema mutates from Schema A to Schema B, you need to update the pipeline code to be compatible with messages that use Schema B, and the existing pipeline needs to be replaced with the new implementation. By running parallel pipelines, you can ensure that streaming data processing continues without disruption. In contrast, terminating and replacing the pipeline results in a break in processing, because no pipeline is running for a period of time.

The updated pipeline writes to an additional staging table (Staging Table B) that uses Schema B. You can use an orchestrated workflow to create the new staging table before updating the pipeline. Update the façade view to include results from the new staging table, potentially using a related workflow step.
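One possible sketch of that orchestration step, assuming a JSON schema file for Schema B and hypothetical table and column names:

```shell
# Sketch: create the new staging table from a Schema B definition, then
# redefine the façade view to read from it as well. The file path,
# dataset, table, and column names are illustrative; here Schema B adds
# a new_field column that the older tables don't have.
bq mk --table PROJECT_ID:dataset.staging_table_b ./schema_b.json

bq update --use_legacy_sql=false \
    --view 'SELECT event_id, payload, NULL AS new_field
              FROM `PROJECT_ID.dataset.principal_table`
            UNION ALL
            SELECT event_id, payload, NULL AS new_field
              FROM `PROJECT_ID.dataset.staging_table_a`
            UNION ALL
            SELECT event_id, payload, new_field
              FROM `PROJECT_ID.dataset.staging_table_b`' \
    PROJECT_ID:dataset.facade_view
```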

The following diagram shows the updated flow: Staging Table B uses Schema B, and the façade view is updated to include content from the principal table and from both staging tables.

The pipeline now uses Schema B and writes to Staging Table B. A façade view reads from the Principal table, Staging Table A, and Staging Table B.

As a separate process from the pipeline update, you can merge the staging tables into the principal table, either periodically or as required. The following diagram shows how Staging Table A is merged into the principal table.

Staging Table A is merged into the principal table. The façade view reads from Staging Table B and from the principal table.
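The merge step can be sketched with a BigQuery MERGE statement; the table names and the event_id join key are hypothetical, and your merge conditions will depend on your schema:

```shell
# Sketch: fold Staging Table A's rows into the principal table, inserting
# rows that the principal table doesn't already contain.
bq query --use_legacy_sql=false '
  MERGE `PROJECT_ID.dataset.principal_table` AS p
  USING `PROJECT_ID.dataset.staging_table_a` AS s
  ON p.event_id = s.event_id
  WHEN NOT MATCHED THEN
    INSERT ROW'

# Optionally empty the staging table after the merge succeeds.
bq query --use_legacy_sql=false \
    'TRUNCATE TABLE `PROJECT_ID.dataset.staging_table_a`'
```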

What's next