Update an existing pipeline

The Apache Beam SDKs provide a way to update an ongoing streaming job on the Dataflow managed service with new pipeline code.

There are various reasons why you might want to update your existing Dataflow job:

  • You want to enhance or otherwise improve your pipeline code.
  • You want to fix bugs in your pipeline code.
  • You want to update your pipeline to handle changes in data format, or to account for version or other changes in your data source.
  • You want to patch a security vulnerability related to Container-Optimized OS for all the Dataflow workers.
  • You want to scale a streaming Apache Beam pipeline to use a different number of workers. See Manual Scaling in Streaming Mode for instructions and restrictions.

When you update your job, the Dataflow service performs a compatibility check between your currently running job and your potential replacement job. The compatibility check ensures that things like intermediate state information and buffered data can be transferred from your prior job to your replacement job.

You can also use the built-in logging infrastructure of the Apache Beam SDK to log information when you update your job. For more information, see Work with pipeline logs. To identify problems with the pipeline code, use the DEBUG logging level.
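
For example, in a Python pipeline you can raise the log level with the standard logging module. The following is a minimal sketch; your own logging setup might differ.

  import logging

  import apache_beam as beam

  # Minimal sketch: emit DEBUG-level messages from a pipeline step so that
  # they can be surfaced in the pipeline logs. Adjust to your logging setup.
  logging.getLogger().setLevel(logging.DEBUG)

  with beam.Pipeline() as pipeline:
      (pipeline
       | beam.Create(['a', 'b', 'c'])
       | beam.Map(lambda element: logging.debug('Processing %s', element)))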

The update process and its effects

When you update a job on the Dataflow service, you replace the existing job with a new job that runs your updated pipeline code. The Dataflow service retains the job name but runs the replacement job with an updated Job ID. This process can cause downtime while the existing job stops, the compatibility check runs, and the new job starts.

The replacement job preserves the following items:

  • Any intermediate state data from the prior job.
  • Any buffered data records or metadata currently "in-flight" from the prior job. For example, some records in your pipeline might be buffered while waiting for a window to resolve.

In-flight data

"In-flight" data is still processed by the transforms in your new pipeline. However, additional transforms that you add in your replacement pipeline code might or might not take effect, depending on where the records are buffered. In this example, your existing pipeline has the following transforms:

Java

  p.apply("Read", ReadStrings())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Format' >> FormatStrings()

Go

   beam.ParDo(s, ReadStrings)
   beam.ParDo(s, FormatStrings)

You can replace your job with new pipeline code, as follows:

Java

  p.apply("Read", ReadStrings())
   .apply("Remove", RemoveStringsStartingWithA())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Remove' >> RemoveStringsStartingWithA()
    | 'Format' >> FormatStrings()

Go

  beam.ParDo(s, ReadStrings)
  beam.ParDo(s, RemoveStringsStartingWithA)
  beam.ParDo(s, FormatStrings)

Even though you add a transform to filter out strings that begin with the letter "A", the next transform (FormatStrings) might still see buffered or in-flight strings that begin with "A" that were transferred over from the prior job.

Change windowing

You can change windowing and trigger strategies for the PCollections in your replacement pipeline, but use caution. Changing the windowing or trigger strategies does not affect data that is already buffered or otherwise in-flight.

We recommend that you attempt only smaller changes to your pipeline's windowing, such as changing the duration of fixed- or sliding-time windows. Making major changes to windowing or triggers, like changing the windowing algorithm, might have unpredictable results on your pipeline output.
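
As an illustration, the following Python sketch (with hypothetical step names) shows the kind of small change that is usually safe: the replacement pipeline keeps the same windowing strategy and only widens the fixed-window duration.

  import apache_beam as beam
  from apache_beam.transforms.window import FixedWindows

  def window_and_count(events):
      # The prior pipeline used FixedWindows(60). The replacement keeps the
      # same windowing strategy and only changes the duration to 120 seconds.
      return (events
              | 'Window' >> beam.WindowInto(FixedWindows(120))
              | 'Count' >> beam.combiners.Count.PerElement())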

Launch your replacement job

To update your job, launch a new job to replace the ongoing job. When you launch the replacement job, set the following pipeline options to perform the update process, in addition to the job's regular options. An example launch using the Python SDK follows these lists:

Java

  • Pass the --update option.
  • Set the --jobName option in PipelineOptions to the same name as the job that you want to update.
  • Set the --region option to the same region as the region of the job that you want to update.
  • If any transform names in your pipeline have changed, you must supply a transform mapping and pass it using the --transformNameMapping option.

Python

  • Pass the --update option.
  • Set the --job_name option in PipelineOptions to the same name as the job that you want to update.
  • Set the --region option to the same region as the region of the job that you want to update.
  • If any transform names in your pipeline have changed, you must supply a transform mapping and pass it using the --transform_name_mapping option.

Go

  • Pass the --update option.
  • Set the --job_name option to the same name as the job that you want to update.
  • Set the --region option to the same region as the region of the job that you want to update.
  • If any transform names in your pipeline have changed, you must supply a transform mapping and pass it using the --transform_name_mapping option.

gcloud CLI

To update a job using the gcloud CLI, use the gcloud dataflow flex-template run command.

  • Pass the --update option.
  • Set the JOB_NAME to the same name as the job that you want to update.
  • Set the --region option to the same region as the region of the job that you want to update.
  • If any transform names in your pipeline have changed, you must supply a transform mapping and pass it using the --transform-name-mappings option.
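
For example, a replacement job launched with the Python SDK might set these options as in the following sketch, assuming a Beam SDK version that supports them. The project, region, job name, and transform mapping values are placeholders; substitute the values for your own job.

  from apache_beam.options.pipeline_options import PipelineOptions

  # Placeholder values: --job_name and --region must match the running job,
  # --update requests the in-place update, and --transform_name_mapping is
  # needed only if any transform names changed.
  options = PipelineOptions([
      '--runner=DataflowRunner',
      '--project=my-project',
      '--region=us-central1',
      '--job_name=ps-topic',
      '--update',
      '--transform_name_mapping={"OldTransform":"NewTransform"}',
  ])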

Specify your replacement job name

Java

When you launch your replacement job, the value you pass for the --jobName option must match exactly the name of the job you want to replace.

Python

When you launch your replacement job, the value you pass for the --job_name option must match exactly the name of the job you want to replace.

Go

When you launch your replacement job, the value you pass for the --job_name option must match exactly the name of the job you want to replace.

gcloud CLI

When you launch your replacement job, the JOB_NAME must match exactly the name of the job you want to replace.

To find the correct job name value, select your prior job in the Dataflow Monitoring Interface. Then, find the Job name field in the Job info side panel:

Figure 1: The Job info side panel for a running Dataflow job with the Job name field.

Alternatively, you can query a list of existing jobs by using the Dataflow Command-line Interface. Enter the command gcloud dataflow jobs list into your shell or terminal window to obtain a list of Dataflow jobs in your Google Cloud project, and find the NAME field for the job you want to replace:

JOB_ID                                    NAME                        TYPE       CREATION_TIME        STATE    REGION
2020-12-28_12_01_09-yourdataflowjobid     ps-topic                    Streaming  2020-12-28 20:01:10  Running  us-central1

Create the transform mapping

Java

If your replacement pipeline has changed any transform names from the names in your prior pipeline, the Dataflow service requires a transform mapping. The transform mapping maps the named transforms in your prior pipeline code to names in your replacement pipeline code. You can pass the mapping by using the --transformNameMapping command-line option, using the following general format:

--transformNameMapping={"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

You only need to provide mapping entries in --transformNameMapping for transform names that have changed between your prior pipeline and your replacement pipeline.

Python

If your replacement pipeline has changed any transform names from the names in your prior pipeline, the Dataflow service requires a transform mapping. The transform mapping maps the named transforms in your prior pipeline code to names in your replacement pipeline code. You can pass the mapping by using the --transform_name_mapping command-line option, using the following general format:

--transform_name_mapping={"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

You only need to provide mapping entries in --transform_name_mapping for transform names that have changed between your prior pipeline and your replacement pipeline.

Go

If your replacement pipeline has changed any transform names from the names in your prior pipeline, the Dataflow service requires a transform mapping. The transform mapping maps the named transforms in your prior pipeline code to names in your replacement pipeline code. You can pass the mapping by using the --transform_name_mapping command-line option, using the following general format:

--transform_name_mapping={"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

You only need to provide mapping entries in --transform_name_mapping for transform names that have changed between your prior pipeline and your replacement pipeline.

gcloud CLI

If your replacement pipeline has changed any transform names from the names in your prior pipeline, the Dataflow service requires a transform mapping. The transform mapping maps the named transforms in your prior pipeline code to names in your replacement pipeline code. You can pass the mapping by using the --transform-name-mappings option, using the following general format:

--transform-name-mappings={"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

You only need to provide mapping entries in --transform-name-mappings for transform names that have changed between your prior pipeline and your replacement pipeline.
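
For example, in Python you can build the mapping as a dictionary and serialize it into the single JSON string that the option expects. The transform names below are hypothetical.

  import json

  # Hypothetical rename: the prior step 'FormatResults' is now 'FormatWords'.
  # Only changed names need an entry.
  mapping = {'FormatResults': 'FormatWords'}
  option = '--transform_name_mapping=' + json.dumps(mapping)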

Determine transform names

The transform name in each map entry is the name that you supplied when you applied the transform in your pipeline. For example:

Java

.apply("FormatResults", ParDo
  .of(new DoFn<KV<String, Long>>, String>() {
    ...
   }
}))

Python

| 'FormatResults' >> beam.ParDo(MyDoFn())

Go

// In Go, this is always the package-qualified name of the DoFn itself.
// For example, if the FormatResults DoFn is in the main package, its name
// is "main.FormatResults".
beam.ParDo(s, FormatResults, results)

You can also obtain the transform names for your prior job by examining the execution graph of the job in the Dataflow Monitoring Interface:

Figure 2: The execution graph for a WordCount pipeline as shown in the Dataflow Monitoring Interface.

Naming in composite transforms

Transform names are hierarchical, based on the transform hierarchy in your pipeline. If your pipeline has a composite transform, the nested transforms are named in terms of their containing transform. For example, suppose that your pipeline contains a composite transform named CountWidgets, which contains an inner transform named Parse. The full name of your transform is CountWidgets/Parse, and you must specify that full name in your transform mapping.

If your new pipeline gives a composite transform a different name, all of its nested transforms are automatically renamed as well. You must specify the changed names for the inner transforms in your transform mapping.
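
For example, a composite transform in Python might look like the following sketch (hypothetical names). When it is applied with the label CountWidgets, the inner steps have the full names CountWidgets/Parse and CountWidgets/Count, and those full names are what you specify in a transform mapping.

  import apache_beam as beam

  class CountWidgets(beam.PTransform):
      def expand(self, pcoll):
          return (pcoll
                  # Full name when applied as 'CountWidgets': CountWidgets/Parse
                  | 'Parse' >> beam.Map(lambda line: line.split(',')[0])
                  # Full name when applied as 'CountWidgets': CountWidgets/Count
                  | 'Count' >> beam.combiners.Count.PerElement())

  # Applied in the pipeline:
  # counts = lines | 'CountWidgets' >> CountWidgets()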

Refactor the transform hierarchy

If your replacement pipeline uses a different transform hierarchy than your prior pipeline, you must explicitly declare the mapping. You might have a different transform hierarchy because you refactored your composite transforms, or your pipeline depends on a composite transform from a library that changed.

For example, your prior pipeline applied a composite transform, CountWidgets, which contained an inner transform named Parse. The replacement pipeline refactors CountWidgets, and nests Parse inside another transform named Scan. For your update to succeed, you must explicitly map the complete transform name in the prior pipeline (CountWidgets/Parse) to the transform name in the new pipeline (CountWidgets/Scan/Parse):

Java

--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

If you delete a transform entirely in your replacement pipeline, you must provide a null mapping. Suppose that your replacement pipeline removes the CountWidgets/Parse transform entirely:

--transformNameMapping={"CountWidgets/Parse":""}

Python

--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

If you delete a transform entirely in your replacement pipeline, you must provide a null mapping. Suppose that your replacement pipeline removes the CountWidgets/Parse transform entirely:

--transform_name_mapping={"CountWidgets/Parse":""}

Go

--transform_name_mapping={"CountWidgets/main.Parse":"CountWidgets/Scan/main.Parse"}

If you delete a transform entirely in your replacement pipeline, you must provide a null mapping. Suppose that your replacement pipeline removes the CountWidgets/Parse transform entirely:

--transform_name_mapping={"CountWidgets/main.Parse":""}

gcloud CLI

--transform-name-mappings={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

If you delete a transform entirely in your replacement pipeline, you must provide a null mapping. Suppose that your replacement pipeline removes the CountWidgets/Parse transform entirely:

--transform-name-mappings={"CountWidgets/main.Parse":""}

Job compatibility check

When you launch your replacement job, the Dataflow service performs a compatibility check between your replacement job and your prior job. If the compatibility check passes, your prior job is stopped. Your replacement job then launches on the Dataflow service while retaining the same job name. If the compatibility check fails, your prior job continues running on the Dataflow service and your replacement job returns an error.

The compatibility check uses the provided transform mapping to ensure that Dataflow can transfer intermediate state data from the steps in your prior job to your replacement job. The compatibility check also ensures that the PCollections in your pipeline are using the same Coders. Changing a Coder can cause the compatibility check to fail because any in-flight data or buffered records might not be correctly serialized in the replacement pipeline.

Prevent compatibility breaks

Certain differences between your prior pipeline and your replacement pipeline can cause the compatibility check to fail. These differences include:

  • Changing the pipeline graph without providing a mapping. When you update a job, Dataflow attempts to match the transforms in your prior job to the transforms in the replacement job. This matching process helps Dataflow transfer intermediate state data for each step. If you rename or remove any steps, you must provide a transform mapping so that Dataflow can match state data accordingly.
  • Changing the side inputs for a step. Adding side inputs to or removing them from a transform in your replacement pipeline causes the compatibility check to fail.
  • Changing the Coder for a step. When you update a job, Dataflow preserves any currently buffered data records and handles them in the replacement job. For example, data might be buffered while a window is resolving. If the replacement job uses different or incompatible data encoding, Dataflow is not able to serialize or deserialize these records.
  • Removing a "stateful" operation from your pipeline. If you remove stateful operations from your pipeline, your replacement job might fail the compatibility check. Dataflow can fuse multiple steps together for efficiency. If you remove a state-dependent operation from within a fused step, the check fails. Stateful operations include:

    • Transforms that produce or consume side inputs.
    • I/O reads.
    • Transforms that use keyed state.
    • Transforms that have window merging.
  • Changing stateful DoFn variables. For ongoing streaming jobs, if your pipeline includes stateful DoFns, changing the stateful DoFn variables might cause the pipeline to fail (see the sketch after this list).

  • Attempting to run your replacement job in a different geographic zone. Run your replacement job in the same zone in which you ran your prior job.
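
The following Python sketch (with hypothetical names) shows the kind of stateful DoFn that the stateful operations and stateful DoFn points above refer to. Dataflow must carry the declared state across the update, so removing the operation or changing its state specs between the prior and replacement job can break compatibility.

  import apache_beam as beam
  from apache_beam.coders import VarIntCoder
  from apache_beam.transforms.userstate import BagStateSpec

  class BufferPerKey(beam.DoFn):
      # Changing this state spec (its name or coder) between the prior job and
      # the replacement job can cause the update to fail, because Dataflow has
      # to carry the stored values across.
      BUFFER = BagStateSpec('buffer', VarIntCoder())

      def process(self, element, buffer=beam.DoFn.StateParam(BUFFER)):
          key, value = element
          buffer.add(value)
          yield key, list(buffer.read())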

Updating schemas

Apache Beam allows PCollections to have schemas with named fields, in which case explicit Coders are not needed. If the field names and types for a given schema are unchanged (including nested fields), then that schema does not cause the update check to fail. However, the update might still be blocked if other segments of the new pipeline are incompatible.

Evolve schemas

You often need to evolve a PCollection's schema as business requirements change. The Dataflow service allows the following changes to a schema when you update a pipeline:

  • Adding one or more new fields to a schema, including nested fields.
  • Making a required (non-nullable) field type optional (nullable).

Removing fields, renaming fields, or changing field types is not currently permitted during an update.
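
As an illustration, the following Python sketch (with a hypothetical schema) shows an allowed evolution: the replacement pipeline adds one new nullable field to the schema that the prior pipeline used.

  import typing

  import apache_beam as beam

  class Widget(typing.NamedTuple):
      # Schema used by the prior pipeline.
      id: str
      count: int

  class WidgetV2(typing.NamedTuple):
      # Schema used by the replacement pipeline: same fields plus one new
      # nullable field, which the update check allows.
      id: str
      count: int
      color: typing.Optional[str]

  beam.coders.registry.register_coder(WidgetV2, beam.coders.RowCoder)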

Pass additional data into an existing ParDo operation

You can pass additional (out-of-band) data into an existing ParDo operation by using one of the following methods, depending on your use case:

  • Serialize information as fields in your DoFn subclass.
  • Reference variables from the methods of an anonymous DoFn; any referenced variables are automatically serialized.
  • Compute data inside DoFn.startBundle().
  • Pass in data using ParDo.withSideInputs (see the side-input sketch after this list).
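
The following Python sketch (with hypothetical values) illustrates the side-input option from the list above: the threshold is computed in the pipeline and passed into the filtering step out of band.

  import apache_beam as beam

  with beam.Pipeline() as pipeline:
      threshold = pipeline | 'Threshold' >> beam.Create([10])
      (pipeline
       | 'Values' >> beam.Create([3, 12, 7, 25])
       | 'KeepLarge' >> beam.FlatMap(
           lambda value, limit: [value] if value > limit else [],
           limit=beam.pvalue.AsSingleton(threshold)))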

For more information, see the following pages: