Updating an Existing Pipeline

IMPORTANT: This document applies only to the Dataflow SDK for Java. It will be updated in the future to cover the Dataflow SDK for Python.

The Dataflow SDKs provide a way to update an ongoing job on the Cloud Dataflow managed service with new pipeline code.

There are various reasons why you might want to update your existing Dataflow job:

  • You want to enhance or otherwise improve your pipeline code.
  • You want to fix bugs in your pipeline code.
  • You want to update your pipeline to handle changes in data format, or to account for version or other changes in your data source.

When you update your job, the Dataflow service performs a compatibility check between your currently-running job and your potential replacement job. The compatibility check ensures that things like intermediate state information and buffered data can be transferred from your prior job to your replacement job.

The Update Process and Its Effects

When you update a job on the Dataflow service, you replace the existing job with a new job that runs your updated pipeline code. The Dataflow service retains the job name, but runs the replacement job with an updated jobId.

The replacement job preserves any intermediate state data from the prior job, as well as any buffered data records or metadata currently "in-flight" from the prior job. For example, some records in your pipeline might be buffered while waiting for a window to resolve.

In-Flight Data

"In-flight" data will still be processed by the transforms in your new pipeline; however, additional transforms that you add in your replacement pipeline code may or may not take effect, depending on where the records are buffered. For example, let's say your existing pipeline has the following transforms:

  p.apply(ReadStrings())
   .apply(FormatStrings());

Now, suppose you want to replace your job with new pipeline code, as follows:

  p.apply(ReadStrings())
   .apply(RemoveStringsStartingWithA())
   .apply(FormatStrings());

Even though you've added a transform to filter out strings that begin with the letter "A", the next transform (FormatStrings) may still see buffered or in-flight strings that begin with "A" that were transferred over from the prior job.
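
For illustration, such a filtering step might look like the following sketch written with the Dataflow SDK for Java. The implementation shown here is an assumption; only the transform's name comes from the example above:

  import com.google.cloud.dataflow.sdk.transforms.DoFn;
  import com.google.cloud.dataflow.sdk.transforms.PTransform;
  import com.google.cloud.dataflow.sdk.transforms.ParDo;
  import com.google.cloud.dataflow.sdk.values.PCollection;

  // Drops every element that begins with the letter "A".
  class RemoveStringsStartingWithA
      extends PTransform<PCollection<String>, PCollection<String>> {
    @Override
    public PCollection<String> apply(PCollection<String> input) {
      return input.apply(ParDo
        .named("RemoveStringsStartingWithA")
        .of(new DoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) {
            if (!c.element().startsWith("A")) {
              c.output(c.element());
            }
          }
        }));
    }
  }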

Changing Windowing

You can change Windowing and Trigger strategies for the PCollections in your replacement pipeline, but use caution. Changing the windowing or trigger strategies will not affect data that is already buffered or otherwise in-flight.

It's recommended that you attempt only smaller changes to your pipeline's windowing, such as changing the duration of fixed- or sliding-time windows. Making major changes to windowing or triggers, such as changing the windowing algorithm, might have unpredictable effects on your pipeline output.
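
For example, a change of that smaller kind might look like the following; the PCollection named items and the specific durations are assumptions used only for illustration:

  import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
  import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
  import org.joda.time.Duration;

  // Prior pipeline: one-minute fixed windows.
  items.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

  // Replacement pipeline: the same windowing algorithm, with only the duration changed.
  items.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(2))));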

Launching Your Replacement Job

To update your job, you'll need to launch a new job to replace the ongoing job. When you launch your replacement job, you'll need to set the following pipeline options to perform the update process, in addition to the job's regular options (an example launch follows the list):

  • Pass the --update option.
  • Set the --jobName option (in DataflowPipelineOptions) to the same name as the job you want to update.
  • If any transform names in your pipeline have changed, you must supply a transform mapping and pass it using the --transformNameMapping option.
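
For example, a launch might look like the following sketch. The class name is hypothetical, and the setUpdate and setJobName setters are assumed to correspond to the --update and --jobName options described above:

  import com.google.cloud.dataflow.sdk.Pipeline;
  import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
  import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

  public class LaunchReplacementJob {
    public static void main(String[] args) {
      DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
          .withValidation()
          .as(DataflowPipelineOptions.class);

      // Equivalent to passing --update on the command line.
      options.setUpdate(true);
      // Must exactly match the name of the job you want to replace.
      options.setJobName("windowedwordcount-johndoe-0729000214");

      Pipeline p = Pipeline.create(options);
      // ... apply your updated transforms here ...
      p.run();
    }
  }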

Specifying the jobName

When you launch your replacement job, the value you pass for the --jobName option must match exactly the name of the job you want to replace. To find the correct job name value, select your prior job in the Dataflow Monitoring Interface and find the Job Name field in the Summary tab:

Figure 1: The Summary tab for a running Dataflow job, with the Job Name field highlighted.

Alternatively, you can query a list of existing jobs by using the Dataflow Command-line Interface. Enter the command gcloud alpha dataflow jobs list into your shell or terminal window to obtain a list of Dataflow jobs in your Cloud Platform project, and find the NAME field for the job you want to replace:

ID                                        NAME                                 TYPE       CREATION_TIME        STATUS
2015-07-28_17_02_27-7257409117866690674   windowedwordcount-johndoe-0729000214 Streaming  2015-07-28 17:02:28  Running

Creating the Transform Mapping

If your replacement pipeline has changed any transform names from those in your prior pipeline, the Dataflow service requires a transform mapping. The transform mapping maps the named transforms in your prior pipeline code to names in your replacement pipeline code. You can pass the mapping by using the --transformNameMapping command-line option, using the following format:

  --transformNameMapping={"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Note that you only need to provide mapping entries in --transformNameMapping for transform names that have changed between your prior pipeline and your replacement pipeline.

Determining Transform Names

The transform name in each mapping entry is the name you supplied when you applied the transform in your pipeline, usually by invoking .named() on your transform. For example, see the ParDo named "FormatResults" from the WordCount Example Pipeline:

  .apply(ParDo
    .named("FormatResults")
    .of(new DoFn<KV<String, Long>, String>() {
      ...
    }));

You can also obtain the transform names for your prior job by examining the job's execution graph in the Dataflow Monitoring Interface:

Figure 2: The execution graph for a WordCount pipeline as shown in the Dataflow Monitoring Interface.

Naming in Composite Transforms

Transform names are hierarchical, based on the transform hierarchy in your pipeline. If your pipeline has a composite transform, the nested transforms are named in terms of their containing transform. For example, suppose your pipeline contains a composite transform named CountWidgets, which contains an inner transform named Parse. The inner transform's full name will be CountWidgets/Parse, and you must specify that full name in your transform mapping.
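
For illustration, such a composite might be structured as in the sketch below; the body of CountWidgets is assumed, and only the CountWidgets and Parse names come from the example above. Because the composite's step name is derived from its class name by default, the inner ParDo's full name becomes CountWidgets/Parse:

  import com.google.cloud.dataflow.sdk.transforms.DoFn;
  import com.google.cloud.dataflow.sdk.transforms.PTransform;
  import com.google.cloud.dataflow.sdk.transforms.ParDo;
  import com.google.cloud.dataflow.sdk.values.PCollection;

  // A composite transform whose step name defaults to the class name, CountWidgets.
  class CountWidgets extends PTransform<PCollection<String>, PCollection<String>> {
    @Override
    public PCollection<String> apply(PCollection<String> input) {
      // Full name of this inner step: CountWidgets/Parse.
      PCollection<String> parsed = input.apply(ParDo
        .named("Parse")
        .of(new DoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) {
            c.output(c.element().trim()); // placeholder parsing logic
          }
        }));
      // ... the composite's remaining (counting) steps are omitted here ...
      return parsed;
    }
  }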

If your replacement pipeline renames a composite transform, all of the nested transforms are automatically renamed as well, and you'll need to specify the changed names for the inner transforms in your transform mapping.

Refactoring the Transform Hierarchy

If your replacement pipeline uses a different transform hierarchy than your prior pipeline (e.g. because you refactored your composite transforms, or your pipeline depends on a composite transform from a library that changed), you'll need to explicitly declare the mapping.

For example, let's suppose your prior pipeline applied a composite transform, CountWidgets, that contained an inner transform named Parse. Now, let's say your replacement pipeline refactors CountWidgets, and nests Parse inside another transform named Scan. For your update to succeed, you must explicitly map the prior pipeline's complete transform name (CountWidgets/Parse) to the new pipeline's transform name (CountWidgets/Scan/Parse):

  --transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

If you delete a transform entirely in your replacement pipeline, you must provide a null mapping. Suppose your replacement pipeline removes the CountWidgets/Parse transform entirely:

  --transformNameMapping={"CountWidgets/Parse":""}

Job Compatibility Check

When you launch your replacement job, the Dataflow service performs a compatibility check between your replacement job and your prior job. If the compatibility check passes, your prior job will be stopped, and your replacement job will launch on the Dataflow service while retaining the same job name. If the compatibility check fails, your prior job will continue running on the Dataflow service, while your replacement job will return an error.

The compatibility check ensures that the Dataflow service can transfer intermediate state data from the steps in your prior job to your replacement job, as specified by the transform mapping that you provide. The compatibility check also ensures that your pipeline's PCollections are using the same Coders. Changing a Coder can cause the compatibility check to fail because any in-flight data or buffered records may not be correctly serialized in the replacement pipeline.
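
One way to guard against an unintended Coder change between jobs (a general suggestion, not a requirement stated on this page) is to set the Coder explicitly on the affected PCollection rather than relying on coder inference. The sketch below reuses the ReadStrings and FormatStrings placeholders from the earlier pseudocode:

  import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
  import com.google.cloud.dataflow.sdk.values.PCollection;

  // Pin the encoding explicitly so the prior and replacement jobs serialize
  // buffered records the same way.
  PCollection<String> formatted = p
    .apply(ReadStrings())
    .apply(FormatStrings());
  formatted.setCoder(StringUtf8Coder.of());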

Preventing Compatibility Breaks

Certain differences between your prior pipeline and your replacement pipeline can cause the compatibility check to fail. These include:

  • Changing the pipeline graph without providing a mapping. When you update a job, the Dataflow service attempts to match the transforms in your prior job to the transforms in the replacement job in order to transfer intermediate state data for each step. If you've renamed or removed any steps, you'll need to provide a transform mapping so that Dataflow can match state data accordingly.
  • Changing the side inputs for a step. Adding side inputs to or removing them from a transform in your replacement pipeline will cause the compatibility check to fail.
  • Changing the Coder for a step. When you update a job, the Dataflow service preserves any data records currently buffered (for example, while windowing is resolving) and handles them in the replacement job. If the replacement job uses different or incompatible data encoding, the Dataflow service will not be able to serialize or deserialize these records.
  • Removing a "stateful" operation from your pipeline. Your replacement job might fail Dataflow's compatibility check if you remove certain stateful operations. The Dataflow service can fuse multiple steps together for efficiency; if you've removed a state-dependent operation from within a fused step, the check will fail. Stateful operations include:
    • Transforms that produce or consume side inputs
    • I/O Reads
    • Transforms that use keyed state
    • Transforms that have window merging
  • Attempting to run your replacement job in a different geographic zone. You must run your replacement job in the same zone in which you ran your prior job.
