Using the Cloud Dataflow Monitoring Interface

When you execute your pipeline using the Cloud Dataflow managed service, you can view that job and any others by using Cloud Dataflow's web-based monitoring user interface. The monitoring interface lets you see and interact with your Cloud Dataflow jobs.

You can access the Cloud Dataflow Monitoring Interface by using the Google Cloud Platform Console. The monitoring interface can show you:

  • A list of all Cloud Dataflow jobs currently running and previously run
  • A graphical representation of each pipeline
  • Details about your job's status and execution
  • Links to information about the Google Cloud Platform services running your pipeline (such as Compute Engine and Cloud Storage)
  • Any errors or warnings that occur during a job

Accessing the Cloud Dataflow Monitoring Interface

To access the Cloud Dataflow Monitoring Interface:

  1. Log in to the GCP Console.
  2. Select your Cloud Platform project.
  3. Click the menu in the upper left corner.
  4. Navigate to the Big Data section and click Dataflow.

A list of Cloud Dataflow jobs appears along with their status, which might include running, succeeded, or failed. Select an individual job to see more information about that pipeline.

A list of Cloud Dataflow jobs with jobs in the running, succeeded, and failed states.
Figure 1: A list of Cloud Dataflow jobs in the GCP Console with jobs in the running, succeeded, and failed states.

Viewing a Pipeline

When you select a specific Cloud Dataflow job, the monitoring interface shows detailed information about the pipeline in that job. This includes a graphical representation of your pipeline as it is executed on the Cloud Dataflow service, as well as a job summary, the job log, and detailed information on each step in the pipeline.

An individual pipeline job in the Cloud Dataflow monitoring interface.
Figure 2: An individual pipeline job shown in the Cloud Dataflow monitoring interface.

The Cloud Dataflow monitoring interface provides a graphical representation of your pipeline: the execution graph. A pipeline's execution graph represents each transform in the pipeline as a box that contains the transform name and some status information.

Basic Execution Graph

Pipeline Code:

Java: SDK 2.x

  // Read the lines of the input text.
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply("WriteCounts", TextIO.write().to(options.getOutput()))
         .to(options.getOutput()));

Python

(p
 # Read the lines of the input text.
 | 'ReadLines' >> beam.io.ReadFromText(options.input)
 # Count the words.
 | CountWords()
 # Write the formatted word counts to output.
 | 'WriteCounts' >> beam.io.WriteToText(options.output))

Java: SDK 1.x

  // Read the lines of the input text.
  p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply(TextIO.Write.named("WriteCounts")
         .to(options.getOutput())
         .withNumShards(options.getNumShards()));
Execution Graph:

The execution graph for a WordCount pipeline as shown in the Cloud Dataflow monitoring interface.

Figure 3: The pipeline code for a WordCount pipeline shown side-by-side with the resulting execution graph in the Cloud Dataflow monitoring interface.

Composite Transforms

In the execution graph, composite transforms (transforms that contain multiple nested sub-transforms) are expandable. Expandable composite transforms are marked with an arrow in the graph; click the arrow to expand the transform and view the sub-transforms within.

Pipeline Code:

Java: SDK 2.x

  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {

    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

Note: The FormatCounts step shown in the execution graph (Figure 4) is not relevant to this SDK.

Python

# The CountWords Composite Transform inside the WordCount pipeline.
class CountWords(beam.PTransform):

  def expand(self, pcoll):
    return (pcoll
            # Convert lines of text into individual words.
            | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
            # Count the number of times each word occurs.
            | beam.combiners.Count.PerElement()
            # Format each word and count into a printable string.
            | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

Java: SDK 1.x

  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      // Format each word and count into a printable string.
      PCollection<String> results = wordCounts.apply(
        ParDo.of(new FormatCountsFn()));

      return results;
    }
  }
Execution Graph:

The execution graph for a WordCount pipeline with the CountWords transform expanded to show its component transforms.

Figure 4: The pipeline code for the sub-steps of the CountWords transform shown side-by-side with the expanded execution graph for the entire pipeline.

Transform Names

Cloud Dataflow has a few different ways to obtain the transform name that's shown in the monitoring execution graph:

Java: SDK 2.x

  • Cloud Dataflow can use a name that you assign when you apply your transform. The first argument you supply to the apply method will be your transform name.
  • Cloud Dataflow can infer the transform name, either from the class name (if you've built a custom transform) or the name of your DoFn function object (if you're using a core transform such as ParDo).

Python

  • Cloud Dataflow can use a name that you assign when you apply your transform. You can set the transform name by specifying the transform's label argument.
  • Cloud Dataflow can infer the transform name, either from the class name (if you've built a custom transform) or the name of your DoFn function object (if you're using a core transform such as ParDo).
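
For example, the following minimal sketch (reusing names from the WordCount example above) shows both approaches in the Python SDK: the string on the left of >> sets the transform's label, while an unlabeled transform falls back to an inferred name.

import apache_beam as beam

# Explicit name: the label 'CountPerWord' appears as the step name
# in the execution graph.
counts = words | 'CountPerWord' >> beam.combiners.Count.PerElement()

# Inferred name: Cloud Dataflow derives the step name from the transform
# class, for example "CountWords" for the composite transform above.
counts = lines | CountWords()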

Java: SDK 1.x

  • Cloud Dataflow can use a name that you assign when you apply your transform. The first argument you supply to the apply method will be your transform name.
  • Cloud Dataflow can infer the transform name, either from the class name (if you've built a custom transform) or the name of your DoFn function object (if you're using a core transform such as ParDo).

Total Execution Time

The Cloud Dataflow monitoring interface provides a metric, called Total Execution Time, that tells you the amount of time each step of the pipeline is taking. This metric allows you to easily diagnose which part of your pipeline is taking more time than it should, so that you can make sure your pipeline is running as time-efficiently as possible.

You can find the Total Execution Time in the Step tab of the Monitoring Interface, as shown in the following image:

You can view the amount of time it takes a step in your pipeline to execute.
Figure 5: The Step tab that shows the total execution time for the ExtractUserScore pipeline step.

See Understanding timing in Cloud Dataflow pipelines for an example that uses the Total Execution Time metric to dig deeper into what is causing a pipeline to take more time than expected.

Side Input Metrics

Side input metrics show you how your side input access patterns and algorithms affect your pipeline's performance. When your pipeline uses a side input, Cloud Dataflow writes the collection to a persistent layer (such as a disk) and transforms read from this persistent collection. These reads and writes affect your job's execution time.

Side input metrics are available when you select a transform that creates or consumes a side input collection. You can view the metrics in the Side Input Metrics section of the Step tab.
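
As a reference point, the following minimal sketch (with hypothetical PCollection names) shows one step that creates a side input view and another that consumes it; these are the kinds of steps for which the Side Input Metrics section appears.

import apache_beam as beam

# Hypothetical collections: lookup_pcoll holds (key, value) pairs,
# main_pcoll holds the elements to enrich.
lookup_view = beam.pvalue.AsDict(lookup_pcoll)   # creates the side input

# This step is a side input consumer: it reads lookup_view for every
# element of main_pcoll.
enriched = main_pcoll | 'JoinWithLookup' >> beam.Map(
    lambda elem, lookup: (elem, lookup.get(elem)), lookup_view)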

Transforms that create a side input

If the selected transform creates a side input collection, the Side Input Metrics section displays the name of the collection, along with the following metrics:

  • Time spent writing: The execution time spent writing the side input collection.
  • Bytes written: The total number of bytes written to the side input collection.
  • Time & bytes read from side input: A table that contains additional metrics for all transforms that consume the side input collection (side input consumers).

The Time & bytes read from side input table displays the following information for each side input consumer:

  • Side input consumer: The transform name of the side input consumer.
  • Time spent reading: The time this consumer spent reading the side input collection.
  • Bytes read: The number of bytes this consumer read from the side input collection.

Figure 6 shows side input metrics for a transform that creates a side input collection. Expand the composite transform until the specific subtransform that creates the side input is visible. Then, select that subtransform to view the Side Input Metrics section.

You can select the subtransform and its side input metrics are visible in the Step tab.
Figure 6: The execution graph has an expanded composite transform (MakeMapView). The subtransform that creates the side input (CreateDataflowView) is selected, and the side input metrics are visible in the Step tab.

Transforms that consume one or more side inputs

If the selected transform consumes one or more side inputs, the Side Input Metrics section displays the Time & bytes read from side input table. This table displays the following information for each side input collection:

  • Side input collection: The name of the side input collection.
  • Time spent reading: The time the transform spent reading this side input collection.
  • Bytes read: The number of bytes the transform read from this side input collection.

Figure 7 shows side input metrics for a transform that reads from a side input collection.

You can select the transform and its side input metrics are visible in the Step tab.
Figure 7: The JoinBothCollections transform reads from a side input collection. JoinBothCollections is selected in the execution graph and the side input metrics are visible in the Step tab.

If your pipeline has a composite transform that reads a side input, expand the composite transform until the specific subtransform that reads the side input is visible. Then, select that subtransform to view the Side Input Metrics section.

Identifying side input performance issues

A common side input performance issue is reiteration. If your side input PCollection is too large, workers are unable to cache it completely in memory. As a result, the workers must repeatedly read from the persistent side input collection. In this scenario, side input metrics show that the total bytes read from the side input collection is much larger than the collection's size (total bytes written).
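
For reference, the access pattern that typically causes reiteration looks like the following sketch (hypothetical names): a per-element step that iterates the entire side input list for each element of the main collection.

import apache_beam as beam

side_view = beam.pvalue.AsList(side_pcoll)  # large side input collection

# Every element of main_pcoll triggers a full pass over the side input,
# so workers that cannot cache it in memory re-read the persisted
# collection many times.
cartesian = main_pcoll | 'CrossWithSideInput' >> beam.FlatMap(
    lambda main_elem, side: [(main_elem, s) for s in side], side_view)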

Figure 8 shows side input metrics for a pipeline with reiteration:

You can select the transform and its side input metrics are visible in the Step tab.
Figure 8: The side input collection is 563 MB, and the sum of the bytes read by consuming transforms is almost 12 GB.

To improve the performance of this pipeline, redesign your algorithm to avoid iterating or refetching the side input data. In this case, the pipeline creates the Cartesian product of two collections. The algorithm iterates through the entire side input collection for each element of the main collection. You can improve the access pattern of the pipeline by batching multiple elements of the main collection together. This change reduces the number of times workers must re-read the side input collection.
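
As a rough illustration, the following sketch batches main-collection elements (using Beam's BatchElements utility transform; the collection names are hypothetical) so that the side input is iterated once per batch rather than once per element.

import apache_beam as beam

side_view = beam.pvalue.AsList(side_pcoll)  # large side input collection

def cross_with_side(batch, side):
  # One pass over the side input serves an entire batch of main elements.
  for main_elem in batch:
    for side_elem in side:
      yield (main_elem, side_elem)

cartesian = (
    main_pcoll
    | 'BatchMainElements' >> beam.BatchElements(min_batch_size=100,
                                                max_batch_size=1000)
    | 'CrossWithSideInput' >> beam.FlatMap(cross_with_side, side_view))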

Another performance issue can occur if your pipeline performs a join by applying a ParDo with one or more large side inputs. In this case, workers can spend a large percentage of the join's execution time reading from the side input collections.

Figure 9 shows side input metrics for this scenario:

You can select the transform and its side input metrics are visible in the Step tab.
Figure 9: The JoinBothCollections transform has a total execution time of 18 min 31 sec. Workers spend the majority of this execution time (10 min 3 sec) reading from the 10-GB side input collection.

To improve the performance of this pipeline, use CoGroupByKey instead of side inputs.
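
For example, a side-input join like the one above can usually be rewritten with CoGroupByKey, as in this minimal sketch (hypothetical keyed PCollections of (key, value) pairs):

import apache_beam as beam

joined = (
    {'left': left_pcoll, 'right': right_pcoll}
    | 'JoinBothCollections' >> beam.CoGroupByKey()
    | 'EmitJoinedPairs' >> beam.FlatMap(
        lambda kv: [(kv[0], left, right)
                    for left in kv[1]['left']
                    for right in kv[1]['right']]))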

Autoscaling

The Cloud Dataflow service automatically chooses the number of worker instances required to run your autoscaling job. The number of worker instances can change over time according to your job's needs. You can view this number and other information about your autoscaling job in the Summary tab.

You can view the number of worker instances used by your autoscaling pipeline as well as other information in the 'Summary' tab.
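
The service chooses the number of workers automatically, but you can bound or tune that behavior with autoscaling-related pipeline options when you launch the job. A minimal Python SDK sketch (illustrative values):

from apache_beam.options.pipeline_options import PipelineOptions

# THROUGHPUT_BASED autoscaling with an upper bound of 10 workers.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--autoscaling_algorithm=THROUGHPUT_BASED',
    '--max_num_workers=10',
])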

To see the history of autoscaling changes, click the See More History link. A window with information about your pipeline's worker history appears.

To see the history of autoscaling changes, click on the 'See More History' link.

Note: You can view autoscaling details about streaming pipelines run or updated on or after December 12, 2016. If your pipeline was run or last updated prior to December 12, you will only be able to see autoscaling details after you update your pipeline.

Error Reporting

The Stackdriver Error Reporting Interface aggregates and displays errors produced in your pipelines.
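
Errors typically originate as exceptions logged by your pipeline code on the workers. A minimal sketch of a DoFn that produces such a logged error (the ParseRecordFn name is hypothetical) might look like this:

import logging
import apache_beam as beam

class ParseRecordFn(beam.DoFn):
  def process(self, element):
    try:
      yield int(element)
    except ValueError:
      # Logged exceptions from workers are sent to Stackdriver Logging,
      # where Error Reporting aggregates them.
      logging.exception('Failed to parse record: %r', element)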

The error report includes:

  • A list of errors with error messages.
  • The number of times each error occurred.
  • A histogram indicating when each error occurred.
  • The time that the error most recently occurred.

To view the error report for your pipeline, click the Logs menu above the pipeline graph, and then select the Stack Traces tab below the pipeline graph. The Cloud Dataflow Monitoring Interface shows a summary of each logged error and the number of times it occurred.

A summary of each logged error and the number of times it occurred.

To see more information about the errors, click on an error summary. You will be taken to the Stackdriver Error Reporting Interface.

More information about the errors on the Stackdriver Error Reporting Interface.

The Stackdriver Error Reporting Interface offers additional functionality. See Viewing Errors for more information about the errors produced by your pipelines.
