Using the Dataflow monitoring interface

When you run your pipeline using the Dataflow-managed service, you can view that job and any others by using Dataflow's web-based monitoring user interface. The monitoring interface lets you see and interact with your Dataflow jobs.

You can access the Dataflow monitoring interface by using the Google Cloud Console. The monitoring interface can show you:

  • A list of all currently running Dataflow jobs and previously run jobs within the last 30 days.
  • A graphical representation of each pipeline.
  • Details about your job's status, type, and SDK version.
  • Links to information about the Google Cloud services running your pipeline, such as Compute Engine and Cloud Storage.
  • Any errors or warnings that occur during a job.
  • Additional diagnostics for a job.

You can view job monitoring charts within the Dataflow monitoring interface. These charts display metrics over the duration of a pipeline job and include the following information:

  • Step-level visibility to help identify which steps might be causing pipeline lag.
  • Statistical information that can surface anomalous behavior.
  • I/O metrics that can help identify bottlenecks in your sources and sinks.

Accessing the Dataflow monitoring interface

To access the Dataflow monitoring interface, follow these steps:

  1. Log in to the Cloud Console.
  2. Select your Google Cloud project.
  3. Click the menu in the upper left corner.
  4. Navigate to the Big Data section and click Dataflow.

A list of Dataflow jobs appears along with their status. If you don't see any jobs, you need to run a new job. To learn how to run a job, see the Dataflow quickstarts.
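
If you prefer to work outside the console, the same job list is available from the Dataflow REST API. The following minimal sketch uses the google-api-python-client library; the project ID and region are placeholders, and the snippet assumes Application Default Credentials are configured.

# Hypothetical sketch: list Dataflow jobs in one region with the Dataflow
# REST API (v1b3). Replace the project and region placeholders.
from googleapiclient.discovery import build

dataflow = build("dataflow", "v1b3")
response = (
    dataflow.projects()
    .locations()
    .jobs()
    .list(projectId="my-project", location="us-central1")
    .execute()
)
for job in response.get("jobs", []):
    print(job["id"], job["name"], job["currentState"])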

Figure 1: A list of Dataflow jobs in the Cloud Console with jobs in the Running, Failed, and Succeeded states.

A job can have the following statuses:

  • : the monitoring interface has not yet received a status from the Dataflow service.
  • Running: the job is running.
  • Starting...: the job is created, but the system needs some time to prepare before launching.
  • Queued: a FlexRS job is queued.
  • Canceling...: the job is being canceled.
  • Canceled: the job is canceled.
  • Draining...: the job is being drained.
  • Drained: the job is drained.
  • Updating...: the job is being updated.
  • Updated: the job is updated.
  • Succeeded: the job has finished successfully.
  • Failed: the job failed to complete.
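
These statuses correspond to the JOB_STATE_* values that the Dataflow API reports in a job's currentState field. If you want to follow a job's status outside the console, a minimal, hypothetical polling sketch might look like the following; the project, region, job ID, and the set of terminal states are illustrative assumptions.

# Hypothetical sketch: poll a job until it reaches a terminal state.
import time

from googleapiclient.discovery import build

TERMINAL_STATES = {
    "JOB_STATE_DONE",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_DRAINED",
}

dataflow = build("dataflow", "v1b3")
while True:
    job = (
        dataflow.projects()
        .locations()
        .jobs()
        .get(projectId="my-project", location="us-central1", jobId="my-job-id")
        .execute()
    )
    print(job["currentState"])
    if job["currentState"] in TERMINAL_STATES:
        break
    time.sleep(30)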

For more information about a pipeline, click that job's Name.

Accessing job monitoring charts

To access a job's monitoring charts, click the job Name within the Dataflow monitoring interface. The Job details page is displayed, which contains the following information:

  • Job graph: visual representation of your pipeline
  • Execution details: tool to optimize your pipeline performance
  • Job metrics: metrics about the running of your job
  • Job info panel: descriptive information about your pipeline
  • Job logs: logs generated by the Dataflow service at the job level
  • Worker logs: logs generated by the Dataflow service at the worker level
  • Diagnostics: table showing where errors occurred along the chosen timeline and possible recommendations for your pipeline
  • Time selector: tool that lets you adjust the timespan of your metrics

Within the Job details page, you can switch your job view by using the Job graph, Execution details, and Job metrics tabs.

View of the Dataflow monitoring interface with the Job graph tab selected. In this mode, you can view your pipeline graph, Job info, Job logs, Worker logs, Diagnostics, and the time selector tool.

View of the Dataflow monitoring interface with the Job metrics tab selected. In this mode, you can view Job metrics charts, Job info, Job logs, Worker logs, Diagnostics, and the time selector tool.

Creating Cloud Monitoring alerts

Dataflow is fully integrated with Cloud Monitoring, which lets you create alerts when a job metric exceeds a user-defined threshold. To create a Cloud Monitoring alert from a metric chart, click Create alerting policy.

The Create alerting policy link lets you create an alert from a metric chart.

For instructions on creating these alerts, read the Using Cloud Monitoring for Dataflow pipelines page. If you are unable to see the monitoring graphs or create alerts, you might need additional Monitoring permissions.
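
You can also define an equivalent policy programmatically with the Cloud Monitoring API. The following is a minimal, hypothetical sketch using the google-cloud-monitoring Python client; the metric type, threshold, duration, and display names are illustrative assumptions rather than values from this page.

# Hypothetical sketch: alert when a Dataflow job's system lag stays above
# five minutes for ten minutes. All names and thresholds are placeholders.
from google.cloud import monitoring_v3
from google.protobuf import duration_pb2

client = monitoring_v3.AlertPolicyServiceClient()

policy = monitoring_v3.AlertPolicy(
    display_name="Dataflow system lag too high",
    combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
    conditions=[
        monitoring_v3.AlertPolicy.Condition(
            display_name="System lag above 5 minutes",
            condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                filter=(
                    'metric.type = "dataflow.googleapis.com/job/system_lag" '
                    'AND resource.type = "dataflow_job"'
                ),
                comparison=monitoring_v3.ComparisonType.COMPARISON_GT,
                threshold_value=300,
                duration=duration_pb2.Duration(seconds=600),
            ),
        )
    ],
)

created = client.create_alert_policy(
    name="projects/my-project", alert_policy=policy
)
print(created.name)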

Full screen mode

To view a metric chart in full screen mode, click the full screen button.

Using the time selector tool

You can adjust the timespan of the metrics with the time selector tool. You can select a predefined duration or select a custom time interval to analyze your job.

The time selector tool lets you select a time range in increments of hours and days, or a custom range.

For streaming or in-flight batch jobs, the default display of the charts shows the previous six hours of metrics for that job. For stopped or completed streaming jobs, the default display of the charts shows the entire runtime of the job.

Step and worker metrics

You can view charts for the following metrics:

  • Data freshness (streaming pipelines only)
  • System latency (streaming pipelines only)
  • Autoscaling
  • Throughput
  • CPU utilization
  • Worker error log count
  • Input and Output Metrics

To access additional information in these charts, click the Expand chart legend toggle.

The legend toggle button is located near the Create alerting policy button.

Data freshness (streaming pipelines only)

Data freshness is the amount of time between real time and the output watermark. Each step of your pipeline has an output data watermark. An output data watermark of T indicates that all elements with an event time before T have been processed for computation. For example, if the output watermark is 9:00 AM and the current time is 9:04 AM, the data freshness is four minutes. The output data watermark is bounded by the earliest input data watermark of all upstream computations. If some input data has not yet been processed, the output watermark might be held back, which affects data freshness.

A data visualization showing data freshness in a streaming pipeline.

System latency (streaming pipelines only)

System latency is the current maximum duration, in seconds, that an item of data has been processing or awaiting processing. This metric indicates how long an element waits inside any one source in the pipeline. The maximum duration is adjusted after processing. The following cases are additional considerations:

  • For multiple sources and sinks, system latency is the maximum amount of time that an element waits inside a source before it is written to all sinks.
  • Sometimes, a source does not provide a value for the time period for which an element waits inside the source. In addition, the element might not have metadata to define its event time. In this scenario, system latency is calculated from the time the pipeline first receives the element.

A data visualization showing system latency in a streaming pipeline.

Autoscaling

The Dataflow service automatically chooses the number of worker instances required to run your autoscaling job. The number of worker instances can change over time according to the job requirements.

A data visualization showing number of workers in a pipeline.

To see the history of autoscaling changes, click the More History button. A table with information about your pipeline's worker history is shown.

Table showing the worker history of a pipeline.
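
The worker counts shown in this chart are governed by the autoscaling settings that the job was launched with. As a point of reference, the following is a minimal, hypothetical sketch of the relevant Beam pipeline options; the values are placeholders, not recommendations.

# Hypothetical sketch: pipeline options that influence the worker counts
# shown in the autoscaling chart. All values are placeholders.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="DataflowRunner",
    project="my-project",
    region="us-central1",
    temp_location="gs://my-bucket/temp",
    autoscaling_algorithm="THROUGHPUT_BASED",
    max_num_workers=50,
)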

Throughput

Throughput is the volume of data that is processed at any point in time. This per-step metric is displayed as the number of elements per second. To view this metric in bytes per second, click Throughput (elements/sec) > Throughput (bytes/sec).

A data visualization showing the throughput of four steps in a pipeline.

Worker error log count

The Worker error log count shows you the rate of errors observed across all workers at any point in time.

A summary of each logged error and the number of times it occurred.

CPU utilization

CPU utilization is the amount of CPU used divided by the amount of CPU available for processing. This per-worker metric is displayed as a percentage.

A data visualization showing CPU utilization in one Dataflow worker.

Input and Output Metrics

Input metrics and output metrics are displayed if your streaming Dataflow job reads or writes records using Pub/Sub.
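
For reference, a streaming pipeline that surfaces these metrics might look like the following minimal, hypothetical sketch; the subscription and topic names are placeholders.

# Hypothetical sketch: a streaming job whose Pub/Sub read and write steps
# produce input and output metrics in the monitoring interface.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | "ReadMessages" >> beam.io.ReadFromPubSub(
            subscription="projects/my-project/subscriptions/my-subscription")
        | "Uppercase" >> beam.Map(lambda message: message.upper())
        | "WriteMessages" >> beam.io.WriteToPubSub(
            topic="projects/my-project/topics/my-output-topic")
    )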

By default, all input metrics are combined, and all output metrics are also combined. To change which metrics are displayed, use the filter dropdown provided in each section. The following images show all the available filters.

The filter dropdown available to the input metrics for a Dataflow job. The filter dropdown available to the output metrics for a Dataflow job.

The following two charts are displayed in both the Input Metrics and Output Metrics sections.

A series of charts showing input and output metrics for a Dataflow job.

Requests per sec

Requests per sec is the rate of API requests to read or write data by the source or sink over time. If this rate drops to zero, or decreases significantly for an extended time period relative to expected behavior, then the pipeline might be blocked from performing certain operations. Additionally, there might be no data to read. In such a case, review the job steps that have a high system watermark. Also, examine the worker logs for errors or indications about slow processing.

A chart showing the number of API requests to read or write data by the source or sink over time.

Response errors per sec by error type

Response errors per sec by error type is the rate of failed API requests to read or write data by the source or sink over time. If such errors occur frequently, these API requests might slow down processing. Such failed API requests must be investigated. To help troubleshoot these issues, review the general I/O error code documentation and any specific error code documentation used by the source or sink, such as the Pub/Sub error codes.

A chart showing the rate of failed API requests to read or write data by the source or sink over time.

Using Metrics explorer

The following Dataflow I/O metrics can be viewed in Metrics explorer:

  • job/pubsub/write_count: Pub/Sub Publish requests from PubsubIO.Write in Dataflow jobs.
  • job/pubsub/read_count: Pub/Sub Pull requests from PubsubIO.Read in Dataflow jobs.
  • job/bigquery/write_count: BigQuery Publish requests from BigQueryIO.Write in Dataflow jobs. The job/bigquery/write_count metric is available in Python pipelines that use the WriteToBigQuery transform with method='STREAMING_INSERTS' on Apache Beam v2.28.0 or later.

For the complete list of Dataflow metrics, see the Google Cloud metrics documentation.
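
These metrics can also be read programmatically. The following is a minimal, hypothetical sketch that queries one of them with the google-cloud-monitoring client; the fully qualified metric type and the project ID are assumptions based on the short names listed above.

# Hypothetical sketch: query the last hour of Pub/Sub read counts for
# Dataflow jobs in a project. The project ID is a placeholder.
import time

from google.cloud import monitoring_v3

client = monitoring_v3.MetricServiceClient()
now = int(time.time())
interval = monitoring_v3.TimeInterval(
    {"end_time": {"seconds": now}, "start_time": {"seconds": now - 3600}}
)

results = client.list_time_series(
    request={
        "name": "projects/my-project",
        "filter": 'metric.type = "dataflow.googleapis.com/job/pubsub/read_count"',
        "interval": interval,
        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)
for series in results:
    print(dict(series.metric.labels), len(series.points))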

Viewing a pipeline

When you select a specific Dataflow job, the monitoring interface shows information about the pipeline in that job. This information includes a graphical representation of your pipeline as it runs on the Dataflow service, a job summary, a job log, and information about each step in the pipeline.

The Dataflow monitoring interface provides a graphical representation of your pipeline: the execution graph. A pipeline's execution graph represents each transform in the pipeline as a box. Each box contains the transform name and information about the job status, which includes the following:

  • Running: the step is running.
  • Queued: the step in a FlexRS job is queued.
  • Succeeded: the step finished successfully.
  • Stopped: the step stopped because the job stopped.
  • Unknown: the step failed to report status.
  • Failed: the step failed to complete.

Basic execution graph

Pipeline Code:

Java: SDK 2.x

  // Read the lines of the input text.
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

Python

(
    pipeline
    # Read the lines of the input text.
    | 'ReadLines' >> beam.io.ReadFromText(args.input_file)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
    | 'WriteCounts' >> beam.io.WriteToText(args.output_path))


Execution Graph:

Figure 2: The pipeline code for a WordCount pipeline shown with the resulting execution graph in the Dataflow monitoring interface.

Composite transforms

In the execution graph, composite transforms (transforms that contain multiple nested sub-transforms) are expandable. Expandable composite transforms are marked with an arrow in the graph. Click the arrow to expand the transform and view the sub-transforms within.

Pipeline Code:

Java: SDK 2.x

  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {

    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

Note: FormatCounts in the execution graph is not relevant to this SDK.

Python

# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
      # Format each word and count into a printable string.
      | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))


Execution Graph:

Figure 3: The pipeline code for the sub-steps of the CountWords transform shown with the expanded execution graph for the entire pipeline.

Transform names

Dataflow has a few different ways to obtain the transform name that's shown in the monitoring execution graph:

Java: SDK 2.x

  • Dataflow can use a name that you assign when you apply your transform. The first argument you supply to the apply method is your transform name.
  • Dataflow can infer the transform name, either from the class name (if you've built a custom transform) or the name of your DoFn function object (if you're using a core transform such as ParDo).

Python

  • Dataflow can use a name that you assign when you apply your transform. You can set the transform name by specifying the transform's label argument.
  • Dataflow can infer the transform name, either from the class name (if you've built a custom transform) or the name of your DoFn function object (if you're using a core transform such as ParDo).
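
A short, hypothetical Python sketch showing both behaviors; the labels and data are placeholders:

# Hypothetical sketch: 'FilterEmptyWords' appears in the execution graph with
# its explicit label, while the unlabeled Count step gets an inferred name.
import apache_beam as beam

with beam.Pipeline() as pipeline:
    (
        pipeline
        | "CreateWords" >> beam.Create(["a", "", "b", "b"])
        | "FilterEmptyWords" >> beam.Filter(lambda word: word)  # explicit name
        | beam.combiners.Count.PerElement()  # name inferred by Dataflow
        | "PrintCounts" >> beam.Map(print)
    )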


Understanding the metrics

Wall time

When you click a step, the Wall time metric is displayed in the Step info panel. Wall time provides the total approximate time spent across all threads in all workers on the following actions:

  • Initializing the step
  • Processing data
  • Shuffling data
  • Ending the step

For composite steps, wall time tells you the sum of time spent in the component steps. This estimate can help you identify slow steps and diagnose which part of your pipeline is taking more time than required.

You can view the amount of time it takes for a step to run in your pipeline.
Figure 4: The Wall time metric can help you ensure your pipeline is running efficiently.

Side input metrics

Side input metrics show you how your side input access patterns and algorithms affect your pipeline's performance. When your pipeline uses a side input, Dataflow writes the collection to a persistent layer, such as a disk, and your transforms read from this persistent collection. These reads and writes affect your job's run time.

The Dataflow monitoring interface displays side input metrics when you select a transform that creates or consumes a side input collection. You can view the metrics in the Side Input Metrics section of the Step info panel.
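
For orientation, the following minimal, hypothetical sketch creates a side input in one step and consumes it in another; selecting either step in the monitoring interface would show the corresponding side input metrics. The step names and data are placeholders.

# Hypothetical sketch: 'CountPerWord' creates a side input collection and
# 'AnnotateWithCounts' consumes it as a dict.
import apache_beam as beam

with beam.Pipeline() as pipeline:
    words = pipeline | "CreateWords" >> beam.Create(["cat", "dog", "cat"])

    # This step produces the collection that is used as a side input.
    counts = words | "CountPerWord" >> beam.combiners.Count.PerElement()

    # This step reads the side input.
    annotated = words | "AnnotateWithCounts" >> beam.Map(
        lambda word, counts_map: (word, counts_map[word]),
        counts_map=beam.pvalue.AsDict(counts),
    )
    annotated | "PrintResults" >> beam.Map(print)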

Transforms that create a side input

If the selected transform creates a side input collection, the Side Input Metrics section displays the name of the collection, along with the following metrics:

  • Time spent writing: The time spent writing the side input collection.
  • Bytes written: The total number of bytes written to the side input collection.
  • Time & bytes read from side input: A table that contains additional metrics for all transforms that consume the side input collection, called side input consumers.

The Time & bytes read from side input table contains the following information for each side input consumer:

  • Side input consumer: The transform name of the side input consumer.
  • Time spent reading: The time this consumer spent reading the side input collection.
  • Bytes read: The number of bytes this consumer read from the side input collection.

If your pipeline has a composite transform that creates a side input, expand the composite transform until you see the specific subtransform that creates the side input. Then, select that subtransform to view the Side Input Metrics section.

Figure 5 shows side input metrics for a transform that creates a side input collection.

Figure 5: The execution graph has an expanded composite transform (MakeMapView). The subtransform that creates the side input (CreateDataflowView) is selected, and the side input metrics are visible in the Step info side panel.

Transforms that consume one or more side inputs

If the selected transform consumes one or more side inputs, the Side Input Metrics section displays the Time & bytes read from side input table. This table contains the following information for each side input collection:

  • Side input collection: The name of the side input collection.
  • Time spent reading: The time the transform spent reading this side input collection.
  • Bytes read: The number of bytes the transform read from this side input collection.

If your pipeline has a composite transform that reads a side input, expand the composite transform until you see the specific subtransform that reads the side input. Then, select that subtransform to view the Side Input Metrics section.

Figure 6 shows side input metrics for a transform that reads from a side input collection.

Figure 6: The JoinBothCollections transform reads from a side input collection. JoinBothCollections is selected in the execution graph, and the side input metrics are visible in the Step info side panel.

Identifying side input performance issues

Reiteration is a common side input performance issue. If your side input PCollection is too large, workers can't cache the entire collection in memory. As a result, the workers must repeatedly read from the persistent side input collection.

In figure 7, the side input metrics show that the total bytes read from the side input collection are much larger than the collection's size (the total bytes written).

Figure 7: An example of reiteration. The side input collection is 563 MB, and the sum of the bytes read by consuming transforms is almost 12 GB.

To improve the performance of this pipeline, redesign your algorithm to avoid iterating or refetching the side input data. In this example, the pipeline creates the Cartesian product of two collections. The algorithm iterates through the entire side input collection for each element of the main collection. You can improve the access pattern of the pipeline by batching multiple elements of the main collection together. This change reduces the number of times workers must re-read the side input collection.
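
One way to express the batching idea in Beam's Python SDK is sketched below; BatchElements groups main-input elements so that the side input is traversed once per batch rather than once per element. The step names and batch sizes are illustrative assumptions.

# Hypothetical sketch: batch the main collection before crossing it with the
# side input, reducing repeated traversals of the side input.
import apache_beam as beam

def cross_batch_with_side(batch, side_list):
    # A single pass over the side input serves the whole batch.
    for side_element in side_list:
        for element in batch:
            yield (element, side_element)

with beam.Pipeline() as pipeline:
    main = pipeline | "CreateMain" >> beam.Create(range(1000))
    side = pipeline | "CreateSide" >> beam.Create(["x", "y", "z"])

    pairs = (
        main
        | "BatchMain" >> beam.BatchElements(min_batch_size=100, max_batch_size=500)
        | "CrossWithSide" >> beam.FlatMap(
            cross_batch_with_side, side_list=beam.pvalue.AsList(side))
    )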

Another common performance issue can occur if your pipeline performs a join by applying a ParDo with one or more large side inputs. In this case, workers spend a large percentage of the processing time for the join operation reading from the side input collections.

Figure 8 shows example side input metrics for this issue:

Figure 8: The JoinBothCollections transform has a total processing time of 18 min 31 sec. Workers spend the majority of the processing time (10 min 3 sec) reading from the 10 GB side input collection.

To improve the performance of this pipeline, use CoGroupByKey instead of side inputs.
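
A minimal, hypothetical sketch of that rewrite follows; the collection names, keys, and values are placeholders rather than the pipeline shown in figure 8.

# Hypothetical sketch: join two keyed collections with CoGroupByKey instead of
# passing one of them to a ParDo as a large side input.
import apache_beam as beam

with beam.Pipeline() as pipeline:
    emails = pipeline | "CreateEmails" >> beam.Create(
        [("amy", "amy@example.com"), ("carl", "carl@example.com")])
    phones = pipeline | "CreatePhones" >> beam.Create(
        [("amy", "111-222-3333"), ("carl", "444-555-6666")])

    joined = (
        {"emails": emails, "phones": phones}
        | "JoinBothCollections" >> beam.CoGroupByKey()
        | "PrintJoined" >> beam.Map(print)
    )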

Recommendations and Diagnostics

Diagnostics

The Diagnostics tab under Logs collects and displays notable log entries produced in your pipelines. These include messages that the Dataflow service identifies as indicating a probable issue with the pipeline (Service Errors), as well as error messages with stack traces. Collected log entries are deduplicated and combined into error groups.

The Diagnostics tab for a Dataflow job with a Service Error error group.

The error report includes the following information:

  • A list of errors with error messages.
  • The number of times each error occurred.
  • A histogram indicating when each error occurred.
  • The time that the error most recently occurred.
  • The time that the error first occurred.
  • The status of the error.

To view the error report for a specific error, click the description under the Errors column. The Error reporting page is displayed. If the error is a Service Error, a Troubleshooting guide link to documentation with further steps is also displayed.

The error group detail page for a Dataflow Service Error.

To learn more about the page, see Viewing errors.

Recommendations

The Recommendations tab displays insights from Dataflow regarding the pipeline. These insights identify situations in which cost and performance might be improved. An insight contains a detailed description of the observed situation, plus a link to the relevant documentation. The available insights are listed in the Dataflow documentation.

Enabling Recommendations

Recommendations can be enabled by setting the --experiments=enable_recommendations flag for the pipeline.
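
For example, with the Beam Python SDK the flag can be passed through the pipeline options; this minimal sketch omits the other options a real launch would need.

# Hypothetical sketch: enable recommendations through an experiments flag.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(["--experiments=enable_recommendations"])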

The Recommendations tab for a Dataflow job with sample recommendations.

What's next