Building production-ready data pipelines using Dataflow: Monitoring data pipelines

This document discusses how to monitor and create alerts for your data pipelines. It's part of a series that helps you improve the production readiness of your data pipelines by using Dataflow. The series is intended for a technical audience whose responsibilities include the development, deployment, and monitoring of Dataflow pipelines, and who have a working understanding of Dataflow and Apache Beam.


Various SLOs relating to data freshness and correctness were discussed in an earlier document in this series, Planning data pipelines. This document explains how you can use different metrics as service level indicators (SLIs) to gauge different SLOs. For each scenario, this document includes an example that shows an approach to monitoring and an alerting policy, all based on Cloud Monitoring.

Introduction

Monitoring is an important part of productionizing data pipelines. It lets you combine different types of metrics and observe important indicators of pipeline performance, called service level indicators (SLIs). By comparing SLIs against acceptable thresholds, monitoring gives you critical insight for early detection of potential issues, and lets you define and measure service level objectives (SLOs).

Monitoring and alerting for data pipelines fall into the following areas:

  • Infrastructure metrics such as CPU, memory, disks, networking, and worker autoscaling. Dataflow jobs use Compute Engine resources to execute your pipeline, so you should include those resources in your monitoring strategy. For example, infrastructure monitoring can reveal pressure on specific resources that affect pipeline performance. These insights are useful for day-to-day operations and for identifying potential optimizations to pipeline code, system integrations, and Dataflow job configurations.
  • Pipeline metrics that are measured at key pipeline steps, such as system lag and data watermark. While infrastructure metrics measure computing-resource utilization, pipeline metrics are the basis of SLIs that can more directly measure business SLOs. For example, you can use a pipeline metric that tracks the count of dead letters to set an SLO target for data correctness.
  • Pipeline logs and errors that are generated from your pipeline code, which can indicate code defects and other unanticipated behavior. Dataflow provides built-in infrastructure for writing pipeline logs to Cloud Logging.
  • Service logs and errors that are reported by the Dataflow service during job submission and execution, and by any other Google Cloud services used by your pipeline. These include quota warnings, such as insufficient compute resources to autoscale your pipeline, which are captured in Cloud Logging. For more information about other types of errors you might encounter, see Common error guidance.

This document focuses on pipeline metrics that relate to the scenarios described in Planning data pipelines: Defining and measuring SLOs.

Monitoring tools

You can monitor Dataflow pipelines through the Dataflow monitoring interface and by using Cloud Monitoring.

Dataflow monitoring interface

Dataflow provides a web-based monitoring interface that you can use to view and manage jobs. In addition, the monitoring interface provides a range of important observability features that help you diagnose common problems, such as pipelines that are stalled or that are not performing efficiently.

Although Cloud Monitoring provides extensive flexibility for pipeline monitoring, the built-in Dataflow monitoring interface offers several observability features that don't require any additional configuration. Examples include the following:

  • Time-series graphs that display critical metrics for running jobs, such as system lag and data freshness.
  • Step-level metrics that can help identify which steps might be causing a stalled pipeline or an unacceptable system lag.
  • I/O metrics that can help identify bottlenecks in sources and sinks.
  • Other statistical information that can expose anomalous behavior.

The Dataflow monitoring interface also provides a range of enhanced usability features.

Cloud Monitoring

Dataflow's integration with Cloud Monitoring gives you access to many useful metrics for Dataflow, metrics for Compute Engine, and metrics for other Google Cloud services.

You can create customized charts and dashboards to visualize these metrics. You can also use alerting capabilities to notify you of various conditions, such as high system lag or failed jobs. Additionally, Cloud Monitoring provides a comprehensive suite of capabilities for logging, error reporting, incident management, diagnostics, and more.

For more information, see Using Monitoring for Dataflow pipelines.

Data freshness

In many situations, data freshness greatly influences the usability and usefulness of data. For example, in a time-sensitive use case like next-day demand forecasting, the data has to be processed within a strict timeframe, otherwise it can't be used to drive business decisions. This section discusses examples of how to monitor data freshness.

Example: Percentage of data processed in a given time period

Suppose you have a recurring batch job that should be at least 80% complete (as measured by the proportion of the dataset that has been processed) within 45 minutes from the start of the job. This type of SLO is mentioned in the Site Reliability Engineering book as a common data pipeline freshness SLO format, expressed as X% of data processed in Y [seconds, days, minutes].

Decide on pipeline metrics

In this example, the following metrics are relevant:

  • The elapsed pipeline run time, which is provided by the job/elapsed_time metric for the pipeline.
  • The percentage of job completion at a specified time, which is the ratio of elements that have completed processing to the total number of elements in the input dataset. You need to choose appropriate metrics to derive the measure of progress, which can differ according to how your pipeline works.

    You can approach the problem in the following ways:

    • Use PCollection element counts: You can use the element counts to calculate a completion ratio, but only if the following conditions are met. First, the input and output datasets must be contained in two separate PCollection objects. Second, each input element results in exactly one output element after element-wise processing—for example, by using the MapElements transform. You can determine PCollection element counts using the job/element_count metric.

    • Use custom metrics: For element-wise transformations that can yield zero or more output elements, such as FlatMap, a ratio based on the direct comparison of PCollection element counts is unlikely to be useful. If you cannot form a meaningful ratio using PCollection element counts, you can use custom metrics at different pipeline steps and then compare related metrics to determine job progress. For example, you can compare a total_payments counter in an early pipeline step with a processed_payments counter at a later step, where each counter is incremented by your pipeline's business logic (see the sketch after this list).

  • You can monitor and create alerts for related metrics, such as job/elements_produced_count, to understand processing rates at key pipeline steps.
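As an illustration of the custom-metrics approach, the following sketch shows one way that total_payments and processed_payments counters might be implemented by using Beam Metrics. The Payment and PaymentResult types and the processPayment helper are hypothetical placeholders for your own business logic.

final class CountPaymentsFn extends DoFn<Payment, Payment> {
  private final Counter totalPayments =
      Metrics.counter(CountPaymentsFn.class, "total_payments");

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Count every payment that enters the pipeline at this early step.
    totalPayments.inc();
    c.output(c.element());
  }
}

final class ProcessPaymentsFn extends DoFn<Payment, PaymentResult> {
  private final Counter processedPayments =
      Metrics.counter(ProcessPaymentsFn.class, "processed_payments");

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Apply the business logic (hypothetical helper), then count the element as
    // processed so that the two counters can be compared to gauge job progress.
    PaymentResult result = processPayment(c.element());
    processedPayments.inc();
    c.output(result);
  }
}

Both counters surface in Cloud Monitoring as custom metrics, so you can chart and alert on their ratio in much the same way as the job/element_count ratio shown later in this document.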

Example pipeline

In the following pipeline code, input is read from one or more files using the Read input transform, and each line of input is processed using the Process element transform.

final class MyFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Do some processing
    String result = doMyProcessing(c.element());
    // Output result
    c.output(result);
  }
}

Pipeline p = Pipeline.create(options);

p.apply("Read inputs", TextIO.read().from(...))
    .apply("Unfuse", new UnfuseTransform())
    .apply("Process element", ParDo.of(new MyFn()))
    .apply("Write output", TextIO.write().to(...));

You can use the Dataflow job/element_count metric to instrument the count of elements in the output PCollection objects for the Read input and Process element transforms. After the Read input transform completes, you can use the ratio of the element counts between the two output PCollection objects to determine what proportion of the dataset has been processed.

The example pipeline includes an Unfuse step, which is an instance of a hypothetical PTransform transform (UnfuseTransform) that implements a GroupByKey and ungroup operation. Injecting the Unfuse step prevents Dataflow from fusing the Read input and Process element steps together. As a result, the Read input step is able to complete execution independently of the Process element step.

For more information about fusion, see Fusion Optimization in the Dataflow documentation. For a code example that shows how to use GroupByKey and an ungroup operation to unfuse steps, see Identify performance issues caused by inappropriately fused steps in this series.
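The UnfuseTransform shown in the example pipeline is hypothetical, and the linked document contains the canonical code example. As a rough sketch, one way to implement it is to pair each element with an arbitrary key, group by that key, and then flatten the groups back into individual elements:

final class UnfuseTransform
    extends PTransform<PCollection<String>, PCollection<String>> {
  @Override
  public PCollection<String> expand(PCollection<String> input) {
    return input
        // Assign an arbitrary key so that elements can be grouped.
        .apply("Assign random key",
            WithKeys.of((String line) -> ThreadLocalRandom.current().nextInt(256))
                .withKeyType(TypeDescriptors.integers()))
        // The GroupByKey forces a fusion break between upstream and downstream steps.
        .apply("Group", GroupByKey.create())
        // Ungroup: drop the keys and flatten the grouped values back into elements.
        .apply("Drop keys", Values.create())
        .apply("Ungroup", Flatten.iterables());
  }
}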

When the Read input step has completed, you can get an accurate element count of the total input dataset size by reading the job/element_count value for the Read input.out0 output PCollection object.

Create charts for monitoring

The following example charts two metrics:

  • The job/element_count metric for the Read input.out0 PCollection object, Total input elements.
  • The job/element_count metric for the Process element.out0 PCollection object, Processed elements.

Line chart with total input elements and number of processed elements.

By plotting values for both metrics, you can see the relationship between them, and you can visually inspect the trajectory of processed data. If the chart shows any issues, such as declining processing rates, you can correlate these observations with other metrics and with error reporting to aid your organization's response and remediation efforts.

The following chart plots job/elapsed_time on a time series. A threshold line provides a visual aid to indicate 45 minutes (2700 seconds). In this graph, the pipeline runtime has exceeded the threshold.

Line chart of job elapsed time.

The job/elements_produced_count metric for the Process element transform is shown on a separate chart. In the following chart, the processing rate of the Process element transform drops below the threshold and trends towards zero.

Chart showing the processing rate for the MyFn transform.

This might be normal if the job is nearing completion. However, when a low processing rate is correlated with other metrics, it can also be an indication of abnormal pipeline behavior that warrants further investigation.

Create an alerting policy

You can create ratio-based thresholds within Cloud Monitoring alerting policies, as shown in the following JSON example:

{
   "displayName":"Slow job alert",
   "combiner":"AND",
   "conditions":[
      {
         "displayName":"Job duration exceeds 45 minutes",
         "conditionThreshold":{
            "aggregations":[
               {
                  "alignmentPeriod":"60s",
                  "crossSeriesReducer":"REDUCE_MEAN",
                  "groupByFields":[
                     "resource.label.job_name"
                  ],
                  "perSeriesAligner":"ALIGN_MEAN"
               }
            ],
            "filter":"metric.type=\"dataflow.googleapis.com/job/elapsed_time\" AND
                       resource.type=\"dataflow_job\"",
            "comparison":"COMPARISON_GT",
            "thresholdValue":2700,
            "duration":"0s"
         }
      },
      {
         "displayName":"Ratio of processed to unprocessed is less than 80%",
         "conditionThreshold":{
            "aggregations":[
               {
                  "alignmentPeriod":"60s",
                  "crossSeriesReducer":"REDUCE_MEAN",
                  "groupByFields":[
                     "resource.label.job_name"
                  ],
                  "perSeriesAligner":"ALIGN_MEAN"
               }
            ],
            "comparison":"COMPARISON_LT",
            "filter":"metric.type=\"dataflow.googleapis.com/job/element_count\" AND
                       metric.label.pcollection=\"Process elements.out0\" AND
                       resource.type = \"dataflow_job\"",
            "denominatorFilter":"metric.type=\"dataflow.googleapis.com/job/element_count\" AND
                                  metric.label.pcollection=\"Read input.out0\" AND
                                  resource.type = \"dataflow_job\"",
            "denominatorAggregations":[
               {
                  "alignmentPeriod":"60s",
                  "crossSeriesReducer":"REDUCE_MEAN",
                  "groupByFields":[
                     "resource.label.job_name"
                  ],
                  "perSeriesAligner":"ALIGN_MEAN"
               }
            ],
            "thresholdValue":0.8,
            "duration":"0s"
         }
      }
   ]
}

The example creates a Cloud Monitoring alerting policy with the following threshold conditions, which are combined using a logical AND:

  • Any running pipeline that exceeds a running time of 2700 seconds (or 45 minutes) and that has a matching job name. This condition uses the Dataflow elapsed_time metric.
  • The ratio of processed elements to total input elements represents less than 80% completion. If denominatorFilter is specified in a conditionThreshold condition, the metric selected by denominatorFilter becomes the denominator of the ratio, and the metric selected by filter becomes the numerator.

Example: The oldest data is not older than Y [seconds, days, minutes]

Suppose you're using a streaming data pipeline to update a player leaderboard for a massively multiplayer online game. Multiple matches between groups of players are played simultaneously, and the pipeline updates each player's score in a central database at the end of each match. When each player's score is updated, the pipeline finds all other players who have higher scores in order to suggest future matches against more difficult opponents.

For this example SLO, the time between the end of a match and the update of a player's score should generally not exceed 120 seconds. If the delay does exceed 120 seconds, the period of this extended delay should not last more than 5 minutes.

Decide on pipeline metrics

Dataflow provides standard metrics to measure how quickly processing occurs as new data arrives in the pipeline. You can use the following metrics as SLIs:

  • Data watermark age (or lag), which is the age of the most recent item that's been fully processed by the pipeline. In other words, it's the duration between the current time and the watermark at a pipeline step. This Dataflow metric is available as job/data_watermark_age in Cloud Monitoring. Relative to a target threshold, the data watermark age is the primary SLI that indicates whether the SLO is being met.
  • System lag, which is the maximum duration that an item has been awaiting processing at a pipeline step. This Dataflow metric is available as job/system_lag in Cloud Monitoring. A system lag that's close to the acceptable threshold could indicate difficulty in meeting the SLO.

Individual steps in a running pipeline have their own system lag (calculated from the oldest element in the nearest input queue to that step), and data watermark age (calculated from the most recent element that's processed by the step). In Cloud Monitoring, job/system_lag is the maximum system lag of all steps, and job/data_watermark_age is the age of the most recent element that's processed from all outputs.

Example pipeline

The following Java code snippet implements an example pipeline for the scenario.

final class ComputeScoresFn extends DoFn<ScoreEvent, KV<String, Player>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    ScoreEvent event = c.element();
    // Do some processing to compute and update score
    // ...
    // Find all higher scoring players to pit the player against!
    // (Player IDs are assumed to be Strings.)
    for (Player p : getHigherScoringPlayers(event.getScore())) {
      if (isPlayerIdealMatch(event.getPlayerId(), p.getPlayerId())) {
        c.output(KV.of(event.getPlayerId(), p));
      }
    }
  }
}

Pipeline p = Pipeline.create(options);

p.apply("Read score update events", PubsubIO.readStrings().fromTopic())
    .apply(
        "Parse event",
        MapElements.via(
            new SimpleFunction<String, ScoreEvent>() {
              ScoreEvent apply(String payload) {
                // ... returns ScoreEvent object parsed from String
              }
            }))
    .apply("Update scores", ParDo.of(new ComputeScoresFn()))
    .apply("Send rematch suggestions", ParDo.of(PubsubIO.writeStrings().to()));

p.run();

The pipeline reads score updates (parsed into ScoreEvent objects) from a Pub/Sub topic. Each ScoreEvent object is processed by a DoFn subclass (ComputeScoresFn) that updates the score database and calls other helper functions to determine future player match-ups. Each match-up is output as a key-value pair that includes the player with the updated score and another player. The key-value pair is serialized and written to a Pub/Sub topic for further processing by a matchmaking system.

You might notice that ComputeScoresFn could be a high fan-out transform that produces multiple output elements per input element. The following section discusses potential effects of the high fan-out transform on pipeline performance and behavior.

Create charts for monitoring

You can create a chart that plots both system lag and data watermark age for a pipeline. Combining both metrics into one chart makes it easier to see the correlation between the two metrics.

The following chart plots the system lag and data watermark over a roughly 1-hour period.

Chart showing system lag and data watermark rising and peaking around 10:45 PM.

The purple line represents the job/system_lag metric, and the orange line represents the job/data_watermark_age metric. A threshold line that's manually set at 2 minutes indicates the maximum peak that's expected for both metrics under normal circumstances.

You can see the following from the chart:

  • Until approximately 10:20 PM, both system lag and data watermark age are generally below 120 seconds, with regular peaks that push both metrics up to the acceptable threshold. However, the elevated metrics return to lower levels within the acceptable 5-minute period.
  • Beginning at approximately 10:20 PM, there is a significant elevation of both the system lag and data watermark metrics that persists beyond the acceptable 5-minute period. In the scenario, a Cloud Monitoring alert should be triggered to notify pipeline operators of an incident that requires attention and possible manual remediation.
  • The age of the oldest item awaiting processing (the system lag) begins to decline significantly at about 10:45 PM, and the data watermark age follows the same trend at approximately 10:48 PM. Acceptable levels of latency are reached by approximately 10:54 PM.

You can use additional Cloud Monitoring charts to understand different aspects of pipeline performance, resource utilization, and scaling behavior.

The rest of this section shows charts that are produced by the sample pipeline, and provides an analysis to illustrate how these charts are useful for diagnosis and troubleshooting.

Visualize the rate of incoming data

The following chart shows the rate of messages that are read by the pipeline from the Pub/Sub topic, using the job/elements_produced_count metric.

Chart showing the rate at which messages are read from Pub/Sub, with spiking starting around 10:30 PM.

An increase in incoming data volume can cause a pipeline to fall behind in processing, which results in a growing (and aging) backlog of items. As the volume of data that arrives in the pipeline goes up and down, you might see fluctuations in both system lag and the data watermark. Spikes are common in real-world conditions, but a prolonged elevation in system lag and watermark age can cause issues, such as late data being delivered by the pipeline. However, despite some evident peaks of up to approximately 2,000 elements per second, the Pub/Sub read rate chart indicates no direct correlation between the number of messages read and the elevation in pipeline lag shown in the earlier system and watermark lag chart.

You can also chart the publish rate to the Pub/Sub topic by using the topic/send_request_count metric, which in this scenario has remained at a consistently low queries-per-second (QPS) rate over the period of elevated pipeline latency. You can see this on the following Publish request rate chart:

Chart showing the publish request rate, with a big dip around 10:30 PM.

Visualize processing rates for key transforms

The following chart shows a rapid increase in the rate of elements produced from the Compute Scores transform from approximately 10:27 PM, which is within the period of the rising data watermark age and system lag. The pipeline appears to be processing more data, but it's unable to reduce the aging backlog of work.

Chart showing increase in the elements created by the transform, starting around 10:30 PM.

From approximately 10:28 PM to 10:45 PM, the rate of processing fluctuates by 25% or more between readings. Different issues can cause this—for example, it can result from vCPU or I/O contention, from increasing volumes of incoming data, from changes to the shape of data, or from problems with the transform logic itself. You should investigate each of these possibilities to determine the exact cause.

An interesting observation is that the number of elements produced by the Compute Scores transform is much higher than the number of incoming messages into the pipeline. If you refer to the example code, you can see that the Compute Scores transform is potentially a high fan-out transform. This could be an indication of a performance bottleneck in the pipeline.

Visualize the number of vCPUs in use

The following chart shows the pipeline undergoing autoscaling up to a maximum of 80 vCPUs at approximately 10:25 PM.

Chart showing vCPUs in use, stepping down until about 10:30 PM, when they go up to 80 and stay there.

The autoscaling value correlates roughly with the increase in processing rate to 500,000 elements per second, but the pipeline appears unable to sustain this rate of processing. However, the graph shows that the pipeline is responding correctly to a decrease in throughput by scaling up with additional workers.

Visualize vCPU utilization

The following chart shows vCPU utilization across all pipeline workers. Beginning at 10:20 PM, which is when the prolonged elevation in pipeline lag begins, you can observe that the pipeline is completely CPU-bound.

Chart showing vCPU utilization, hitting 100% for all vCPUs around 10:18 PM, and new vCPUs coming online and going to 100%.

At approximately 10:25 PM, the pipeline is scaled up with additional workers, which also become consumed by CPU-bound work after the initial startup phase.

Analysis and suggested remediation

A correlation of all the factors suggests that the pipeline is becoming CPU-bound. In addition, there is high fan-out in the Compute Scores transform that's correlated with the saturation of CPU resources. Based on the Score processing rate and Pub/Sub read rate charts, you also know that there is no direct correlation between incoming and outgoing data volumes. Although pipeline autoscaling appeared to help with reducing the incoming backlog, the longer-term resolution might require more than just increasing worker counts.

If you focus on the Compute Scores transform, you see that the getHigherScoringPlayers method can return multiple players, which results in multiple calls to the isPlayerIdealMatch method. The significant increase in elements produced by Compute Scores suggests that there are many matching players over the period of elevated pipeline latency, and the logic that is repeated for each matching player consumes significant CPU resources. This in turn causes a backlog of data awaiting processing, reflected by the rising data watermark and system lag metrics. Player score updates become significantly delayed, because the score update and matchmaking logic are tightly coupled in a single DoFn subclass.

You can try the following remediations:

  • Optimize the graph: Decouple the score update and player matchmaking logic into separate DoFn subclasses—for example, into an UpdateScoreFn class and a FindMatchingPlayersFn class (see the sketch after this list). Furthermore, you might need to inject a GroupByKey operation and an ungroup operation before the FindMatchingPlayersFn step to prevent fusing a high fan-out and CPU-intensive step with UpdateScoreFn. For more information about fusion optimization, see Identify performance issues caused by inappropriately fused steps in this series.

  • Review and optimize other code: You might need to optimize the getHigherScoringPlayers and isPlayerIdealMatch methods. This is beyond the scope of the series.

  • Optimize the Dataflow job: You might be able to improve the performance of the pipeline by increasing the minimum worker count, which can then absorb some workload peaks. However, increasing minimum worker counts might create idle workers, which can increase costs. Analyzing workload characteristics over time, such as the "burstiness" of data volumes and resource utilization, can help you make this decision.

    You can also improve performance by enabling Streaming Engine, which moves pipeline state storage and shuffle operations out of the worker VMs, allowing the pipeline to autoscale more responsively and new workers to begin performing work more quickly. For more information, see Dataflow features for optimizing resource usage in this series.

  • Review and optimize dependent systems: You might also need to review and optimize the systems that provide data for the getHigherScoringPlayers and isPlayerIdealMatch methods for performance and capacity issues.
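As a rough illustration of the first remediation, the following sketch splits the original ComputeScoresFn into two hypothetical DoFn subclasses. The updateScore helper, like getHigherScoringPlayers and isPlayerIdealMatch, is a placeholder for your own logic, and an unfuse step (as described earlier in this document) would sit between the two ParDo steps.

// Handles only the score update, so it isn't delayed by matchmaking work.
final class UpdateScoreFn extends DoFn<ScoreEvent, ScoreEvent> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    ScoreEvent event = c.element();
    // Write the new score to the database (placeholder helper).
    updateScore(event);
    c.output(event);
  }
}

// Handles the CPU-intensive, high fan-out matchmaking logic in a separate step.
final class FindMatchingPlayersFn extends DoFn<ScoreEvent, KV<String, Player>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    ScoreEvent event = c.element();
    for (Player p : getHigherScoringPlayers(event.getScore())) {
      if (isPlayerIdealMatch(event.getPlayerId(), p.getPlayerId())) {
        c.output(KV.of(event.getPlayerId(), p));
      }
    }
  }
}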

Create an alerting policy

You can configure an alerting policy to notify you when elevated latency fails to resolve within an acceptable period of time, even when pipeline autoscaling automatically increases the number of workers.

The following JSON example creates a Cloud Monitoring alerting policy.

{
    "combiner": "OR",
    "conditions": [
      {
        "conditionThreshold": {
          "aggregations": [
            {
              "alignmentPeriod": "60s",
              "crossSeriesReducer": "REDUCE_MEAN",
              "groupByFields": [
                "resource.label.job_name"
              ],
              "perSeriesAligner": "ALIGN_MEAN"
            }
          ],
          "comparison": "COMPARISON_GT",
          "duration": "180s",
          "filter": "metric.type=\"dataflow.googleapis.com/job/system_lag\" AND
                      resource.type=\"dataflow_job\"",
          "thresholdValue": 30.0,
          "trigger": {
            "count": 1
          }
        },
        "displayName": "System lag exceeds 30 seconds for 2 minutes"
      },
      {
        "conditionThreshold": {
          "aggregations": [
            {
              "alignmentPeriod": "60s",
              "crossSeriesReducer": "REDUCE_MEAN",
              "groupByFields": [
                "resource.label.job_name"
              ],
              "perSeriesAligner": "ALIGN_MEAN"
            }
          ],
          "comparison": "COMPARISON_GT",
          "duration": "300s",
          "filter": "metric.type=\"dataflow.googleapis.com/job/data_watermark_age\" AND
                      resource.type=\"dataflow_job\"",
          "thresholdValue": 60.0,
          "trigger": {
            "count": 1
          }
        },
        "displayName": "Data watermark lag exceeds 60 seconds for 5 minutes"
      },
      {
        "conditionThreshold": {
          "aggregations": [
            {
              "alignmentPeriod": "60s",
              "crossSeriesReducer": "REDUCE_MEAN",
              "groupByFields": [
                "resource.label.job_name"
              ],
              "perSeriesAligner": "ALIGN_MEAN"
            },
            {
              "alignmentPeriod": "60s",
              "perSeriesAligner": "ALIGN_PERCENT_CHANGE"
            }
          ],
          "comparison": "COMPARISON_GT",
          "duration": "0s",
          "filter": "metric.type=\"dataflow.googleapis.com/job/system_lag\" AND
                      resource.type=\"dataflow_job\"",
          "thresholdValue": 70.0,
          "trigger": {
            "count": 1
          }
        },
        "displayName": "System lag increases by 70% over a 1 minute period"
    ],
    "displayName": "Elevated pipeline latency"
}

The example creates a Cloud Monitoring alerting policy that has the following threshold conditions, which are combined using a logical OR:

  • As a leading indicator of increased pipeline latency, trigger an alert if system lag exceeds 30 seconds for a duration longer than 3 minutes.
  • As another leading indicator of pipeline latency, trigger an alert if system lag increases by 70% over a 1-minute period.
  • Trigger an alert if the data watermark age exceeds 60 seconds for at least 5 minutes.

Example: The pipeline job has completed successfully within a given period of time

Suppose that you run a daily batch job to update customer accounts. Each nightly batch job commences after the close of business at 18:00, and it needs to complete within a 3-hour window. This SLO is mentioned in the Site Reliability Engineering book as a common data pipeline freshness SLO format, expressed as the pipeline job has completed successfully within Y [seconds, days, minutes].

Decide on pipeline metrics

In effect, this SLO is a specialized case of X% of data processed in Y minutes, where the pipeline needs to complete data processing within a specified time duration. The following metrics are relevant:

  • The elapsed pipeline run time, which is provided by the job/elapsed_time metric for the pipeline. If you want to be alerted on the violation of the SLO target, the threshold to use for this metric is the 3-hour window. You can create additional alerts before the 3-hour window is exceeded to allow early response and remediation if possible.
  • You can monitor other secondary metrics, such as job/elements_produced_count, to understand the processing rates at key pipeline steps, or the percentage completion of a job at various times before the deadline.
  • If each nightly job is guaranteed to start successfully at 18:00, comparing job/elapsed_time against a threshold of 3 hours might be sufficient for monitoring and alerting.

    If the job cannot be guaranteed to begin at the required start time (18:00), you can use a custom metric to determine whether the job is still running after the required end time (21:00).

One approach to devising such a pipeline metric is to use a Beam metric to propagate the current time to Cloud Monitoring as a delta from a reference point in time (for example, seconds elapsed since midnight). Because this type of metric can be updated only within a transform while the transform is processing data, the delta value is only approximate.

The following Java code snippet shows an example DoFn subclass that updates a seconds_since_midnight metric when each element is processed.

final class MyFn extends DoFn<String, MyObject> {
  private final Distribution secondsSinceMidnight = 
      Metrics.distribution(MyFn.class, "seconds_since_midnight");

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Do some processing
    MyObject result = doMyProcessing(c.element());
    // Update seconds since midnight
    secondsSinceMidnight.update(getSecondsSinceMidnight());
    // Output result
    c.output(result);
  }
}

Create charts for monitoring

The following chart visualizes the seconds_since_midnight_MAX metric on a line chart. A threshold line is set at 75,600 seconds, which is the number of seconds between 00:00 and 21:00.

Chart showing currently running jobs, staying flat at just under 20,000.

For example charts that are useful for monitoring in this scenario, see Percentage of data processed in a given time period earlier in this document.

Create an alerting policy

You can configure an alerting policy to notify you if the job is still running after 75,600 seconds past midnight (that is, after 21:00).

The following JSON example creates a Cloud Monitoring alerting policy for this scenario.

{
    "combiner": "OR",
    "conditions": [
      {
        "conditionThreshold": {
          "aggregations": [
            {
              "alignmentPeriod": "60s",
              "crossSeriesReducer": "REDUCE_MEAN",
              "groupByFields": [
                "resource.label.job_name"
              ],
              "perSeriesAligner": "ALIGN_MEAN"
            }
          ],
          "comparison": "COMPARISON_GT",
          "duration": "180s",
          "filter": "metric.type=\"custom.googleapis.com/dataflow/seconds_since_midnight_MAX\" AND
                      resource.type=\"dataflow_job\""
                      ,
          "thresholdValue": 75600,
          "trigger": {
            "count": 1
          }
        },
        "displayName": "Pipeline is running past 75600 seconds since midnight"
      }
    ],
    "displayName": "Slow job alert"
}

For example conditions that are useful for alerting in this scenario, see Percentage of data processed in a given time period earlier in this document.

Data correctness

Maximizing data correctness involves tasks across the pipeline lifecycle, from development and testing, through operational monitoring and alerting. Outside of the pipeline itself, the principle of garbage-in-garbage-out also applies; the value of the results cannot be better than the quality of the data that you start with.

There are also different types of correctness, some of which are easier to detect than others. For example, otherwise correct data that's described using a malformed schema could be considered incorrect data; if it has to be discarded, it might result in incomplete and incorrect results. It's usually easier to use measurements to detect and handle a malformed schema than it is to detect incorrect data that's described using a valid schema.

During pipeline development, you use a variety of tests to verify your pipeline's functionality, including using unit tests and integration tests to compare results against reference data. However, from an operational perspective, there often aren't simple methods to verify output. Instead, you incorporate a range of primary metrics for data issues that are directly measurable, and secondary metrics that provide you with leading indicators of data issues.

Primary metrics that indicate data or logic errors

Primary metrics are high-confidence signals that indicate data correctness issues. Common examples of primary metrics include the following:

  • Schema validation errors: examples include schema checks that fail, such as an invalid (malformed) or unrecognized JSON schema when elements are parsed, or data-type checking that fails.

  • Data validation errors: examples include errors that are detected when values are checked against defined thresholds, when they're checked using regular expression patterns, or when they're checked against business rules.

  • Pipeline errors: examples include code exceptions that are caused by data issues and that are exposed and measured through error reporting.

When incorrect data is detected (for example, through exception handling in pipeline code), a best practice is to write the incorrect data to a dead letter queue. The dead letter queue serves multiple purposes. For example, the size of the queue can be an SLI for a data correctness SLO ("less than x items failing processing over t period of time").

Beam Metrics can also keep count of specific types of data errors. For example, a pipeline transform can write a payload that has a missing required field to a dead letter queue and then increment a missing_required_field counter. If the data issue is not severe enough to treat as a dead letter, the missing_required_field counter can be incremented and the data retained for processing as normal. The following code example shows how to implement this.

final class MyFn extends DoFn<String, MyEvent> {
  private final Counter missingField =
       Metrics.counter(MyFn.class, "missing_required_field");

  private Gson gsonParser;

  @Setup
  public void setup() {
    gsonParser = new Gson();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // successTag and errorTag are TupleTag constants that are shared with the
    // code that applies this DoFn (see the sketch that follows this example).
    try {
      MyEvent event = gsonParser.fromJson(c.element(), MyEvent.class);
      // Check if data is valid but missing a desired field
      if (event.getDescription() == null) {
        // Increment missing field counter
        missingField.inc();
      }
      // Output element as normal
      c.output(successTag, event);
    }
    catch (JsonParseException e) {
      // Send malformed data to dead letter queue
      c.output(errorTag, c.element());
    }
  }
}
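The successTag and errorTag objects in the previous example are TupleTag objects. The following sketch shows one way they might be declared and wired up by using ParDo.withOutputTags; the input collection, the ProcessEventsFn step, and the dead letter destination are assumptions for illustration.

static final TupleTag<MyEvent> successTag = new TupleTag<MyEvent>() {};
static final TupleTag<String> errorTag = new TupleTag<String>() {};

// "input" is assumed to be a PCollection<String> of JSON payloads.
PCollectionTuple results =
    input.apply(
        "Parse events",
        ParDo.of(new MyFn()).withOutputTags(successTag, TupleTagList.of(errorTag)));

// Continue normal processing with the successfully parsed events
// (ProcessEventsFn is a hypothetical downstream DoFn).
results
    .get(successTag)
    .apply("Process events", ParDo.of(new ProcessEventsFn()));

// Persist malformed payloads as a dead letter queue for later inspection. For a
// streaming pipeline, a streaming-friendly sink such as Pub/Sub or BigQuery is a
// better fit than TextIO.
results
    .get(errorTag)
    .apply("Write dead letters", TextIO.write().to(...));

The job/element_count metric for the errorTag output PCollection object can then serve as the SLI for a data correctness SLO, as described earlier in this section.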

The following Cloud Monitoring chart shows how these metrics might appear on a line graph.

Chart showing data errors and warnings. One line shows missing required fields (938 total), and another line shows malformed JSON (2 total).

In the graph, Missing Required Field uses the missing_required_field custom Cloud Monitoring metric, while Malformed JSON uses the element_count metric of the output PCollection object (elements that have been output with the errorTag tag).

Secondary metrics for data correctness

Primary metrics for correctness should be hard indicators of data issues. In contrast, secondary metrics let you observe and measure other signals that might indicate an issue or that can lead to data incorrectness.

The secondary metrics that are useful differ case by case, and they frequently depend on a business context. For example, if a pipeline computes an unusually high average temperature from IoT sensors at nighttime in a relatively cool location, it can indicate different possibilities. One possibility is an issue with data correctness in the input data, perhaps due to faulty sensors. Another possibility is an issue with the pipeline's functionality. In this case, you might create a metric for average temperature over time and create alerts based on some appropriate thresholds.
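As a sketch of this idea, you can record each observed temperature in a Beam Distribution metric; the SensorReading type and its getTemperatureCelsius method are hypothetical. Dataflow exports the distribution to Cloud Monitoring with suffixed values such as _MAX and _MEAN (as with the seconds_since_midnight_MAX metric shown earlier), which you can chart and alert on.

final class RecordTemperatureFn extends DoFn<SensorReading, SensorReading> {
  private final Distribution temperatureReadings =
      Metrics.distribution(RecordTemperatureFn.class, "temperature_readings");

  @ProcessElement
  public void processElement(ProcessContext c) {
    SensorReading reading = c.element();
    // Record the temperature (as a long) so that the derived mean can be
    // charted and alerted on as a secondary data-correctness signal.
    temperatureReadings.update(reading.getTemperatureCelsius());
    c.output(reading);
  }
}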

For streaming pipelines, an increase in the number of dropped elements due to lateness can be a useful signal for detecting problems. While the number of dropped elements is not in itself an error condition, it could signal problems that can lead to data completeness and correctness issues. These problems might include incorrect assumptions about data timeliness or about the need to include or adjust late triggers, or even issues with data sources. For streaming jobs, Dataflow automatically creates two Beam counters to track the counts of dropped elements from lateness. These counters are droppedDueToClosedWindow for closed windows from trigger execution, and droppedDueToLateness for expired windows where the watermark has passed the end of a window.
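The windowing configuration determines when elements are counted in these counters. The following sketch (reusing the ScoreEvent type from the earlier streaming example as an assumption) shows the allowed lateness and trigger settings that influence them.

// "events" is assumed to be an unbounded PCollection<ScoreEvent>.
PCollection<ScoreEvent> windowed =
    events.apply(
        "Fixed 1-minute windows",
        Window.<ScoreEvent>into(FixedWindows.of(Duration.standardMinutes(1)))
            // Fire when the watermark passes the end of the window, and again for
            // any late data that arrives within the allowed lateness.
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withLateFirings(AfterPane.elementCountAtLeast(1)))
            // Data that arrives later than this is dropped and counted in
            // droppedDueToLateness.
            .withAllowedLateness(Duration.standardMinutes(5))
            .accumulatingFiredPanes());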

What's next