This document discusses how to monitor and create alerts for your data pipelines. It's part of a series that helps you improve the production readiness of your data pipelines by using Dataflow. The series is intended for a technical audience whose responsibilities include the development, deployment, and monitoring of Dataflow pipelines, and who have a working understanding of Dataflow and Apache Beam.
The documents in the series include the following parts:
- Overview
- Planning data pipelines
- Developing and testing data pipelines
- Deploying data pipelines
- Monitoring data pipelines (this document)
Various SLOs relating to data freshness and correctness were discussed in an earlier document in this series, Planning data pipelines. This document explains how you can use different metrics as service level indicators (SLIs) to gauge different SLOs. For each scenario, this document includes an example that shows an approach to monitoring and an alerting policy, all based on Cloud Monitoring.
Introduction
Monitoring is an important part of productionizing data pipelines. It lets you combine different types of metrics and observe important indicators of pipeline performance, called service level indicators (SLIs). By comparing SLIs against acceptable thresholds, monitoring gives you critical insight for early detection of potential issues, and lets you define and measure service level objectives (SLOs).
Monitoring and alerting for data pipelines fall into the following areas:
- Infrastructure metrics such as CPU, memory, disks, networking, and worker autoscaling. Dataflow jobs use Compute Engine resources to execute your pipeline, so you should include those resources in your monitoring strategy. For example, infrastructure monitoring can reveal pressure on specific resources that affect pipeline performance. These insights are useful for day-to-day operations and for identifying potential optimizations to pipeline code, system integrations, and Dataflow job configurations.
- Pipeline metrics that are measured at key pipeline steps, such as system lag and data watermark. While infrastructure metrics measure computing-resource utilization, pipeline metrics are the basis of SLIs that can more directly measure business SLOs. For example, you can use a pipeline metric that tracks the count of dead letters to set an SLO target for data correctness.
- Pipeline logs and errors that are generated from your pipeline code, which can indicate code defects and other unanticipated behavior. Dataflow provides built-in infrastructure for writing pipeline logs to Cloud Logging.
- Service logs and errors that are reported by the Dataflow service during job submission and execution, and by any other Google Cloud services used by your pipeline. These include quota warnings, such as insufficient compute resources to autoscale your pipeline, which are captured in Cloud Logging. For more information about other types of errors you might encounter, see Common error guidance.
This document focuses on pipeline metrics that relate to the scenarios described in Planning data pipelines: Defining and measuring SLOs.
Monitoring tools
You can monitor Dataflow pipelines through the Dataflow monitoring interface and by using Cloud Monitoring.
Dataflow monitoring interface
Dataflow provides a web-based monitoring interface that you can use to view and manage jobs. In addition, the monitoring interface provides a range of important observability features that help you diagnose common problems, such as pipelines that are stalled or that are not performing efficiently.
Although Cloud Monitoring provides extensive flexibility for pipeline monitoring, the built-in Dataflow monitoring interface offers several observability features that don't require any additional configuration. Examples include the following:
- Time-series graphs that display critical metrics for running jobs, such as system lag and data freshness.
- Step-level metrics that can help identify which steps might be causing a stalled pipeline or an unacceptable system lag.
- I/O metrics that can help identify bottlenecks in sources and sinks.
- Other statistical information that can expose anomalous behavior.
The Dataflow monitoring interface also provides a range of enhanced usability features. These include the following:
- Integration with Cloud Monitoring to create threshold-based alerts.
- A way to inspect job logs without leaving the UI.
- Built-in controls to adjust the timespan of displayed metrics.
Cloud Monitoring
Dataflow's integration with Cloud Monitoring gives you access to many useful metrics for Dataflow, metrics for Compute Engine, and metrics for other Google Cloud services.
You can create customized charts and dashboards to visualize these metrics. You can also use alerting capabilities to notify you of various conditions, such as high system lag or failed jobs. Additionally, Cloud Monitoring provides a comprehensive suite of capabilities for logging, error reporting, incident management, diagnostics, and more.
For more information, see Using Monitoring for Dataflow pipelines.
Data freshness
In many situations, data freshness greatly influences the usability and usefulness of data. For example, in a time-sensitive use case like next-day demand forecasting, the data has to be processed within a strict timeframe, otherwise it can't be used to drive business decisions. This section discusses examples of how to monitor data freshness.
Example: Percentage of data processed in a given time period
Suppose you have a recurring batch job that should be at least 80% complete (as measured by the proportion of the dataset that has been processed) within 45 minutes from the start of the job. This type of SLO is mentioned in the Site Reliability Engineering book as a common data pipeline freshness SLO format, expressed as X% of data processed in Y [seconds, days, minutes].
Decide on pipeline metrics
In this example, the following metrics are relevant:
- The elapsed pipeline run time, which is provided by the `job/elapsed_time` metric for the pipeline.
- The percentage of job completion at a specified time, which is the ratio of elements that have completed processing to the number of all elements in the input dataset. You need to choose appropriate metrics to derive the measure of progress, which can differ according to how your pipeline works.
You can approach the problem in the following ways:
- Use `PCollection` element counts: You can use the element counts to calculate a completion ratio, but only if the following conditions are met. First, the input and output datasets must be contained in two separate `PCollection` objects. Second, each input element results in exactly one output element after element-wise processing, for example by using the `MapElements` transform. You can determine `PCollection` element counts using the `job/element_count` metric.
- Use custom metrics: For element-wise transformations that can yield zero or more output elements, such as `FlatMap`, a ratio based on the direct comparison of `PCollection` element counts is unlikely to be useful. If you cannot form a meaningful ratio using `PCollection` element counts, you can use custom metrics at different pipeline steps and then compare related metrics to determine job progress. For example, you can compare a `total_payments` counter in an early pipeline step with a `processed_payments` counter at a later step, where each counter is incremented by your pipeline's business logic (see the example that follows).
You can monitor and create alerts for related metrics, such as `job/elements_produced_count`, to understand processing rates at key pipeline steps.
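The following example shows one way to define such counters by using the Beam Metrics API. This is a minimal sketch: the `Payment` type and the `applyBusinessLogic` method are hypothetical placeholders for your own element type and processing logic.

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// Counts every payment that enters the pipeline.
final class CountTotalPaymentsFn extends DoFn<Payment, Payment> {
  private final Counter totalPayments =
      Metrics.counter(CountTotalPaymentsFn.class, "total_payments");

  @ProcessElement
  public void processElement(ProcessContext c) {
    totalPayments.inc();
    c.output(c.element());
  }
}

// Counts every payment that has completed processing.
final class ProcessPaymentFn extends DoFn<Payment, Payment> {
  private final Counter processedPayments =
      Metrics.counter(ProcessPaymentFn.class, "processed_payments");

  @ProcessElement
  public void processElement(ProcessContext c) {
    Payment result = applyBusinessLogic(c.element()); // hypothetical business logic
    processedPayments.inc();
    c.output(result);
  }
}

The ratio of `processed_payments` to `total_payments` then gives an approximate measure of job progress, which you can chart and alert on in Cloud Monitoring in a similar way to the `job/element_count` ratio shown later in this example.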
Example pipeline
In the following pipeline code, input is read from one or more files using the
Read input
transform, and each line of input is processed using the Process
element
transform.
final class MyFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) {
// Do some processing
String result = doMyProcessing(c.element());
// Output result
c.output(result);
}
}
Pipeline p = Pipeline.create(options);
p.apply("Read inputs", TextIO.read().from(...))
.apply("Unfuse", new UnfuseTransform())
.apply("Process element", ParDo.of(new MyFn()))
.apply("Write output", TextIO.write().to(...));
You can use the Dataflow `job/element_count` metric to instrument the count of elements in the output `PCollection` objects for the `Read input` and `Process element` transforms. After the `Read input` transform completes, you can use the ratio of the element counts between the two output `PCollection` objects to determine what proportion of the dataset has been processed.
The example pipeline includes an `Unfuse` step, which is an instance of a hypothetical `PTransform` transform (`UnfuseTransform`) that implements a `GroupByKey` and ungroup operation. Injecting the `Unfuse` step prevents Dataflow from fusing the `Read input` and `Process element` steps together. As a result, the `Read input` step is able to complete execution independently of the `Process element` step.
For more information about fusion, see Fusion Optimization in the Dataflow documentation. For a code example that shows how to use `GroupByKey` and an ungroup operation to unfuse steps, see Identify performance issues caused by inappropriately fused steps in this series.
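As an illustration only, the following minimal sketch shows one possible implementation of the hypothetical `UnfuseTransform` for `String` elements. It assigns an arbitrary key to each element, groups by key to force a fusion break, and then ungroups the values.

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

final class UnfuseTransform extends PTransform<PCollection<String>, PCollection<String>> {
  @Override
  public PCollection<String> expand(PCollection<String> input) {
    return input
        // Assign an arbitrary key so that elements can be grouped.
        .apply("Assign key",
            WithKeys.of((String s) -> ThreadLocalRandom.current().nextInt(256))
                .withKeyType(TypeDescriptors.integers()))
        // The GroupByKey forces a shuffle, which breaks fusion between the
        // producing and consuming steps.
        .apply("Group", GroupByKey.<Integer, String>create())
        // Ungroup: discard the keys and flatten the grouped values back into
        // individual elements.
        .apply("Drop keys", Values.<Iterable<String>>create())
        .apply("Ungroup", Flatten.<String>iterables());
  }
}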
When the `Read input` step has completed, you can get an accurate element count of the total input dataset size by reading the `job/element_count` value for the `Read input.out0` output `PCollection` object.
Create charts for monitoring
The following example charts two metrics:
- The `job/element_count` metric for the `Read input.out0` `PCollection` object (labeled Total input elements).
- The `job/element_count` metric for the `Process element.out0` `PCollection` object (labeled Processed elements).
By plotting values for both metrics, you can see the relationship between them, and you can visually inspect the trajectory of processed data. If the chart shows any issues, such as declining processing rates, you can correlate these observations with other metrics and with error reporting to aid your organization's response and remediation efforts.
The following chart plots `job/elapsed_time` on a time series. A threshold line provides a visual aid to indicate 45 minutes (2700 seconds). In this graph, the pipeline runtime has exceeded the threshold.
The `job/elements_produced_count` metric for the `Process element` transform is shown on a separate chart. In the following chart, the processing rate of the `Process element` transform drops below the threshold and trends towards zero.
This might be normal if the job is nearing completion. However, when a low processing rate is correlated with other metrics, it can also be an indication of abnormal pipeline behavior that warrants further investigation.
Create alerting policy
You can create ratio-based thresholds within Cloud Monitoring alerting policies, as shown in the following JSON example:
{
"displayName":"Slow job alert",
"combiner":"AND",
"conditions":[
{
"displayName":"Job duration exceeds 45 minutes",
"conditionThreshold":{
"aggregations":[
{
"alignmentPeriod":"60s",
"crossSeriesReducer":"REDUCE_MEAN",
"groupByFields":[
"resource.label.job_name"
],
"perSeriesAligner":"ALIGN_MEAN"
}
],
"filter":"metric.type=\"dataflow.googleapis.com/job/elapsed_time\" AND
resource.type=\"dataflow_job\"",
"comparison":"COMPARISON_GT",
"thresholdValue":2700,
"duration":"0s"
}
},
{
"displayName":"Ratio of processed to unprocessed is less than 80%",
"conditionThreshold":{
"aggregations":[
{
"alignmentPeriod":"60s",
"crossSeriesReducer":"REDUCE_MEAN",
"groupByFields":[
"resource.label.job_name"
],
"perSeriesAligner":"ALIGN_MEAN"
}
],
"comparison":"COMPARISON_LT",
"filter":"metric.type=\"dataflow.googleapis.com/job/element_count\" AND
metric.label.pcollection=\"Process elements.out0\" AND
resource.type = \"dataflow_job\"",
"denominatorFilter":"metric.type=\"dataflow.googleapis.com/job/element_count\" AND
metric.label.pcollection=\"Read input.out0\" AND
resource.type = \"dataflow_job\"",
"denominatorAggregations":[
{
"alignmentPeriod":"60s",
"crossSeriesReducer":"REDUCE_MEAN",
"groupByFields":[
"resource.label.job_name"
],
"perSeriesAligner":"ALIGN_MEAN"
}
],
"thresholdValue":0.8,
"duration":"0s"
}
}
]
}
The example creates a Cloud Monitoring alerting policy with the following threshold conditions, which are combined using a logical AND:
- Any running pipeline that exceeds a running time of 2700 seconds (or 45 minutes) and that has a matching job name. This condition uses the Dataflow `elapsed_time` metric.
- The ratio of processed elements to total input elements representing less than 80% completion. If `denominatorFilter` is specified in a `conditionThreshold` condition, the `denominatorFilter` metric becomes the denominator of the ratio, and `filter` becomes the numerator.
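If you prefer to manage alerting policies programmatically, one possible approach is to load the policy JSON and create it through the Cloud Monitoring client library. The following sketch assumes that the JSON shown earlier is saved as policy.json and that my-project is a placeholder project ID.

import com.google.cloud.monitoring.v3.AlertPolicyServiceClient;
import com.google.monitoring.v3.AlertPolicy;
import com.google.monitoring.v3.ProjectName;
import com.google.protobuf.util.JsonFormat;
import java.nio.file.Files;
import java.nio.file.Paths;

public final class CreateSlowJobAlert {
  public static void main(String[] args) throws Exception {
    // Parse the alerting policy JSON into an AlertPolicy object.
    String json = new String(Files.readAllBytes(Paths.get("policy.json")));
    AlertPolicy.Builder policy = AlertPolicy.newBuilder();
    JsonFormat.parser().ignoringUnknownFields().merge(json, policy);

    // Create the policy in the project ("my-project" is a placeholder).
    try (AlertPolicyServiceClient client = AlertPolicyServiceClient.create()) {
      client.createAlertPolicy(ProjectName.of("my-project"), policy.build());
    }
  }
}

You can use the same approach for the other alerting policies shown later in this document.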
Example: The oldest data is not older than Y [seconds, days, minutes]
Suppose you're using a streaming data pipeline to update a player leaderboard for a massively multiplayer online game. Multiple matches between groups of players are played simultaneously, and the pipeline updates each player's score in a central database at the end of each match. When each player's score is updated, the pipeline finds all other players who have higher scores in order to suggest future matches against more difficult opponents.
For this example SLO, the time between the end of a match and the update of a player's score should generally not exceed 120 seconds. If the delay does exceed 120 seconds, the period of this extended delay should not last more than 5 minutes.
Decide on pipeline metrics
Dataflow provides standard metrics to measure how quickly processing occurs as new data arrives in the pipeline. You can use the following metrics as SLIs:
- Data watermark age (or lag), which is the age of the most recent item that's been fully processed by the pipeline. In other words, it's the duration between the current time and the watermark at a pipeline step. This Dataflow metric is available as `job/data_watermark_age` in Cloud Monitoring. Relative to a target threshold, the data watermark age is the primary SLI that indicates whether the SLO is being met.
- System lag, which is the maximum duration that an item has been awaiting processing at a pipeline step. This Dataflow metric is available as `job/system_lag` in Cloud Monitoring. A system lag that's close to the acceptable threshold could indicate difficulty in meeting the SLO.
Individual steps in a running pipeline have their own system lag (calculated from the oldest element in the nearest input queue to that step) and data watermark age (calculated from the most recent element that's processed by the step). In Cloud Monitoring, `job/system_lag` is the maximum system lag across all steps, and `job/data_watermark_age` is the age of the most recent element that's processed from all outputs.
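For example, when you build a chart or an alerting condition for a single job, you can select these metrics with a Cloud Monitoring filter like the following, where leaderboard-scores is a hypothetical job name:

metric.type="dataflow.googleapis.com/job/data_watermark_age" AND
resource.type="dataflow_job" AND
resource.label.job_name="leaderboard-scores"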
Example pipeline
The following Java code snippet implements an example pipeline for the scenario.
final class ComputeScoresFn extends DoFn<ScoreEvent, KV<String, Player>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    ScoreEvent event = c.element();
    // Do some processing to compute and update score
    // ...
    // Find all higher scoring players to pit the player against!
    for (Player p : getHigherScoringPlayers(event.getScore())) {
      if (isPlayerIdealMatch(event.getPlayerId(), p.getPlayerId())) {
        c.output(KV.of(event.getPlayerId(), p));
      }
    }
  }
}
Pipeline p = Pipeline.create(options);
p.apply("Read score update events", PubsubIO.readStrings().fromTopic(...))
    .apply(
        "Parse event",
        MapElements.via(
            new SimpleFunction<String, ScoreEvent>() {
              @Override
              public ScoreEvent apply(String payload) {
                // ... returns ScoreEvent object parsed from String
              }
            }))
    .apply("Compute scores", ParDo.of(new ComputeScoresFn()))
    .apply(
        "Format suggestions",
        MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Player> matchup) ->
                matchup.getKey() + "," + matchup.getValue().getPlayerId()))
    .apply("Send rematch suggestions", PubsubIO.writeStrings().to(...));
p.run();
The pipeline reads score updates (parsed into `ScoreEvent` objects) from a Pub/Sub topic. Each `ScoreEvent` object is processed by a `DoFn` subclass (`ComputeScoresFn`) that updates the score database and calls other helper functions to determine future player match-ups. Each match-up is output as a key-value pair that includes the player with the updated score and another player. The key-value pair is serialized and written to a Pub/Sub topic for further processing by a matchmaking system.
You might notice that `ComputeScoresFn` could be a high fan-out transform that produces multiple output elements per input element. The following section discusses potential effects of the high fan-out transform on pipeline performance and behavior.
Create charts for monitoring
You can create a chart that plots both system lag and data watermark age for a pipeline. Combining both metrics into one chart makes it easier to see the correlation between the two metrics.
The following chart plots the system lag and data watermark age over a roughly 1-hour period.
The purple line represents the `job/system_lag` metric, and the orange line represents the `job/data_watermark_age` metric. A visual threshold line that's manually set at 2 minutes indicates a maximum peak for both metrics, which is expected under normal circumstances.
You can see the following from the chart:
- Until approximately 10:20 PM, both system lag and data watermark age are generally below 120 seconds, with regular peaks that push both metrics up to the generally acceptable threshold. However, the elevated metrics return to lower levels within the acceptable 5-minute period.
- Beginning at approximately 10:20 PM, there is significant elevation of both the system lag and data watermark metrics that persists beyond the acceptable 5-minute period. In this scenario, a Cloud Monitoring alert should be triggered to notify pipeline operators of an incident that requires attention and possible manual remediation.
- The age of the oldest item awaiting processing (the system lag) begins to decline significantly at approximately 10:45 PM, and the data watermark age follows the same trend at approximately 10:48 PM. Acceptable levels of latency are reached by approximately 10:54 PM.
You can use additional Cloud Monitoring charts to understand different aspects of pipeline performance, resource utilization, and scaling behavior.
The rest of this section shows charts that are produced by the sample pipeline, and provides an analysis to illustrate how these charts are useful for diagnosis and troubleshooting.
Visualize the rate of incoming data
The following chart shows the rate of messages that are read by the pipeline from the Pub/Sub topic, using the `job/elements_produced_count` metric.
An increase in incoming data volume can cause a pipeline to fall behind in processing, which results in a growing (and aging) backlog of items. As the volume of data that arrives in the pipeline goes up and down, you might see fluctuations in both system lag and the data watermark. Spikes are common in real-world conditions, but a prolonged elevation in system lag and watermark age can cause issues, such as late data being delivered by the pipeline. However, the Pub/Sub read rate chart indicates no direct correlation in the number of messages read and the elevation in pipeline lag shown in the System and watermark lag chart from earlier, despite some evident peaks up to approximately 2,000 elements per second.
You can also chart the publish rate to the Pub/Sub topic using the `topic/send_request_count` metric, which in this scenario has remained at a consistently low queries per second (QPS) rate over the period of elevated pipeline latency. You can see this in the following Publish request rate chart:
Visualize processing rates for key transforms
The following chart shows a rapid increase in the rate of elements produced from the `Compute scores` transform from approximately 10:27 PM, which is within the period of rising data watermark age and system lag. The pipeline appears to be processing more data, but it's unable to reduce the aging backlog of work.
From approximately 10:28 PM to 10:45 PM, the rate of processing fluctuates by 25% or more between readings. Different issues can cause this—for example, it can result from vCPU or I/O contention, from increasing volumes of incoming data, from changes to the shape of data, or from problems with the transform logic itself. You should investigate each of these possibilities to determine the exact cause.
An interesting observation is that the number of elements produced by the `Compute scores` transform is much higher than the number of incoming messages into the pipeline. If you refer to the example code, you can see that the `Compute scores` transform is potentially a high fan-out transform. This could be an indication of a performance bottleneck in the pipeline.
Visualize the number of vCPUs in use
The following chart shows the pipeline undergoing autoscaling up to a maximum of 80 vCPUs at approximately 10:25 PM.
The autoscaling value correlates roughly with the increase in processing rate to 500,000 elements per second, but the pipeline appears unable to sustain this rate of processing. However, the graph shows that the pipeline is responding correctly to a decrease in throughput by scaling up with additional workers.
Visualize vCPU utilization
The following chart shows vCPU utilization across all pipeline workers. Beginning at 10:20 PM, which is when the prolonged elevation in pipeline lag begins, you can observe that the pipeline is completely CPU-bound.
At approximately 10:25 PM, the pipeline is scaled up with additional workers, which also become consumed by CPU-bound work after the initial startup phase.
Analysis and suggested remediation
A correlation of all the factors suggests that the pipeline is becoming CPU-bound. In addition, there is high fan-out in the `Compute scores` transform that's correlated with the saturation of CPU resources. Based on the Score processing rate and Pub/Sub read rate charts, you also know that there is no direct correlation between incoming and outgoing data volumes. Although pipeline autoscaling appeared to help with reducing the incoming backlog, the longer-term resolution might require more than just increasing worker counts.
If you focus on the `Compute scores` transform, you see that the `getHigherScoringPlayers` method can return multiple players, which results in multiple calls to the `isPlayerIdealMatch` method. The significant increase in elements produced by `Compute scores` suggests that there are many matching players over the period of elevated pipeline latency, and the logic that is repeated for each matching player consumes significant CPU resources. This in turn causes a backlog of data awaiting processing, reflected by the rising data watermark and system lag metrics. Player score updates become significantly delayed, because the score update and matchmaking logic are tightly coupled in a single `DoFn` subclass.
You can try the following remediations:
- Optimize the graph: Decouple the score update and player matchmaking logic into separate `DoFn` subclasses, for example into an `UpdateScoreFn` class and a `FindMatchingPlayersFn` class (see the sketch after this list). Furthermore, you might need to inject a `GroupByKey` operation and an ungroup operation before the `FindMatchingPlayersFn` step to prevent fusing a high fan-out and CPU-intensive step with `UpdateScoreFn`. For more information about fusion optimization, see Identify performance issues caused by inappropriately fused steps in this series.
- Review and optimize other code: You might need to optimize the `getHigherScoringPlayers` and `isPlayerIdealMatch` methods. This is beyond the scope of the series.
- Optimize the Dataflow job: You might be able to improve the performance of the pipeline by increasing the minimum worker count, which can then absorb some workload peaks. However, increasing minimum worker counts might create idle workers, which can increase costs. Analyzing workload characteristics over time, such as the "burstiness" of data volumes and resource utilization, can help you make this decision.
  You can also improve performance by enabling Streaming Engine, which offloads streaming shuffle and state processing from the worker VMs to the Dataflow service and can make autoscaling more responsive. For more information, see Dataflow features for optimizing resource usage in this series.
- Review and optimize dependent systems: You might also need to review and optimize the systems that provide data for the `getHigherScoringPlayers` and `isPlayerIdealMatch` methods for performance and capacity issues.
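To illustrate the first remediation, the following minimal sketch shows one possible way to split the logic into two `DoFn` subclasses. It reuses the hypothetical `ScoreEvent` and `Player` types and helper methods from the earlier example; the class names and the `updateScore` helper are illustrative only.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Handles only the score update, so that this step stays lightweight.
final class UpdateScoreFn extends DoFn<ScoreEvent, ScoreEvent> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    ScoreEvent event = c.element();
    updateScore(event); // hypothetical score database update
    c.output(event);
  }
}

// Contains the CPU-intensive, high fan-out matchmaking logic.
final class FindMatchingPlayersFn extends DoFn<ScoreEvent, KV<String, Player>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    ScoreEvent event = c.element();
    for (Player p : getHigherScoringPlayers(event.getScore())) {
      if (isPlayerIdealMatch(event.getPlayerId(), p.getPlayerId())) {
        c.output(KV.of(event.getPlayerId(), p));
      }
    }
  }
}

In the pipeline graph, you can then apply `UpdateScoreFn`, a fusion-breaking step such as the `UnfuseTransform` described earlier, and `FindMatchingPlayersFn` as separate steps, which lets Dataflow execute and scale the matchmaking work independently of the score updates.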
Create an alerting policy
You can configure an alerting policy to notify you when the additional latency fails to resolve over an acceptable period of time, such as if you use pipeline autoscaling to automatically increase the number of workers.
The following JSON example creates a Cloud Monitoring alerting policy.
{
"combiner": "OR",
"conditions": [
{
"conditionThreshold": {
"aggregations": [
{
"alignmentPeriod": "60s",
"crossSeriesReducer": "REDUCE_MEAN",
"groupByFields": [
"resource.label.job_name"
],
"perSeriesAligner": "ALIGN_MEAN"
}
],
"comparison": "COMPARISON_GT",
"duration": "180s",
"filter": "metric.type=\"dataflow.googleapis.com/job/system_lag\" AND
resource.type=\"dataflow_job\"",
"thresholdValue": 30.0,
"trigger": {
"count": 1
}
},
"displayName": "System lag exceeds 30 seconds for 2 minutes"
},
{
"conditionThreshold": {
"aggregations": [
{
"alignmentPeriod": "60s",
"crossSeriesReducer": "REDUCE_MEAN",
"groupByFields": [
"resource.label.job_name"
],
"perSeriesAligner": "ALIGN_MEAN"
}
],
"comparison": "COMPARISON_GT",
"duration": "300s",
"filter": "metric.type=\"dataflow.googleapis.com/job/data_watermark_age\" AND
resource.type=\"dataflow_job\"",
"thresholdValue": 60.0,
"trigger": {
"count": 1
}
},
"displayName": "Data watermark lag exceeds 60 seconds for 5 minutes"
},
{
"conditionThreshold": {
"aggregations": [
{
"alignmentPeriod": "60s",
"crossSeriesReducer": "REDUCE_MEAN",
"groupByFields": [
"resource.label.job_name"
],
"perSeriesAligner": "ALIGN_MEAN"
},
{
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_PERCENT_CHANGE"
}
],
"comparison": "COMPARISON_GT",
"duration": "0s",
"filter": "metric.type=\"dataflow.googleapis.com/job/system_lag\" AND
resource.type=\"dataflow_job\"",
"thresholdValue": 70.0,
"trigger": {
"count": 1
}
},
"displayName": "System lag increases by 70% over a 1 minute period"
],
"displayName": "Elevated pipeline latency"
}
The example creates a Cloud Monitoring alerting policy that has the following threshold conditions, which are combined using a logical OR:
- As a leading indicator of increased pipeline latency, trigger an alert if system lag exceeds 30 seconds for a duration longer than 3 minutes.
- As another leading indicator of pipeline latency, trigger an alert if system lag increases by 70% over a 1-minute period.
- Trigger an alert if the data watermark age exceeds 60 seconds for at least 5 minutes.
Example: The pipeline job has completed successfully within a given period of time
Suppose that you run a daily batch job to update customer accounts. Each nightly batch job commences after the close of business at 18:00, and it needs to complete within a 3-hour window. This SLO is mentioned in the Site Reliability Engineering book as a common data pipeline freshness SLO format, expressed as the pipeline job has completed successfully within Y [seconds, days, minutes].
Decide on pipeline metrics
In effect, this SLO is a specialized case of X% of data processed in Y minutes, where the pipeline needs to complete data processing within a specified time duration. The following metrics are relevant:
- The elapsed pipeline run time, which is provided by the `job/elapsed_time` metric for the pipeline. If you want to be alerted on the violation of the SLO target, the threshold to use for this metric is the 3-hour window. You can create additional alerts before the 3-hour window is exceeded to allow early response and remediation if possible.
- You can monitor other secondary metrics, such as `job/elements_produced_count`, to understand the processing rates at key pipeline steps, or the percentage completion of a job at various times before the deadline.
If each nightly job is guaranteed to start successfully at 18:00, comparing `job/elapsed_time` against a threshold of 3 hours might be sufficient for monitoring and alerting. If the job cannot be guaranteed to begin at the required start time (18:00), you can use a custom metric to determine whether the job is still running after the required end time (21:00).
As one approach to devise a pipeline metric like this, you can use a Beam metric to propagate the current time to Cloud Monitoring in the form of a delta value from a reference point in time (for example, seconds elapsed since midnight). Because this type of metric can be updated only within a transform when the transform is processing data, this delta value is only approximate.
The following Java code snippet shows an example `DoFn` subclass that updates a `seconds_since_midnight` metric when each element is processed.
final class MyFn extends DoFn<String, MyObject> {
private final Distribution secondsSinceMidnight =
Metrics.distribution(MyFn.class, "seconds_since_midnight");
@ProcessElement
public void processElement(ProcessContext c) {
// Do some processing
MyObject result = doMyProcessing(c.element());
// Update seconds since midnight
secondsSinceMidnight.update(getSecondsSinceMidnight());
// Output result
c.output(result);
}
}
Create charts for monitoring
The following chart visualizes the `seconds_since_midnight_MAX` value on a line chart. A threshold line is set at 75,600 seconds, which is the number of seconds between 00:00 and 21:00.
For example charts that are useful for monitoring in this scenario, see X% of data processed in Y minutes earlier in this document.
Create an alerting policy
You can configure an alerting policy to notify you if the job is still running after 75,600 seconds past midnight (that is, after 21:00).
The following JSON example creates a Cloud Monitoring alerting policy for this scenario.
{
"combiner": "OR",
"conditions": [
{
"conditionThreshold": {
"aggregations": [
{
"alignmentPeriod": "60s",
"crossSeriesReducer": "REDUCE_MEAN",
"groupByFields": [
"resource.label.job_name"
],
"perSeriesAligner": "ALIGN_MEAN"
}
],
"comparison": "COMPARISON_GT",
"duration": "180s",
"filter": "metric.type=\"custom.googleapis.com/dataflow/seconds_since_midnight_MAX\" AND
resource.type=\"dataflow_job\""
,
"thresholdValue": 75600,
"trigger": {
"count": 1
}
},
"displayName": "Pipeline is running past 75600 seconds since midnight"
}
],
"displayName": "Slow job alert"
}
For example conditions that are useful for alerting in this scenario, see X% of data processed in Y minutes earlier in this document.
Data correctness
Maximizing data correctness involves tasks across the pipeline lifecycle, from development and testing, through operational monitoring and alerting. Outside of the pipeline itself, the principle of garbage-in-garbage-out also applies; the value of the results cannot be better than the quality of the data that you start with.
There are also different types of correctness, some of which are easier to detect than others. For example, otherwise correct data that's described using a malformed schema could be considered incorrect data; if it has to be discarded, it might result in incomplete and incorrect results. It's usually easier to use measurements to detect and handle a malformed schema than it is to detect incorrect data that's described using a valid schema.
During pipeline development, you use a variety of tests to verify your pipeline's functionality, including using unit tests and integration tests to compare results against reference data. However, from an operational perspective, there often aren't simple methods to verify output. Instead, you incorporate a range of primary metrics for data issues that are directly measurable, and secondary metrics that provide you with leading indicators of data issues.
Primary metrics that indicate data or logic errors
Primary metrics are high-confidence signals that indicate data correctness issues. Common examples of primary metrics include the following:
- Schema validation errors: examples include schema checks that fail, such as an invalid (malformed) or unrecognized JSON schema when elements are parsed, or data-type checking that fails.
- Data validation errors: examples include errors that are detected when values are checked against defined thresholds, when they're checked using regular expression patterns, or when they're checked against business rules.
- Pipeline errors: examples include code exceptions that are caused by data issues and that are exposed and measured through error reporting.
When incorrect data is detected (for example, through exception handling in pipeline code), a best practice is to write the incorrect data to a dead letter queue. The dead letter queue serves multiple purposes. For example, the size of the queue can be an SLI for a data correctness SLO ("less than x items failing processing over t period of time").
Beam Metrics can also keep count of specific types of data errors. For example, a pipeline transform can write a payload that has a missing required field to a dead letter queue and then increment a `missing_required_field` counter. If the data issue is not severe enough to treat as a dead letter, the `missing_required_field` counter can be incremented and the data retained for processing as normal. The following code example shows how to implement this.
final class MyFn extends DoFn<String, MyEvent> {
  // Output tags for valid events and for malformed payloads (dead letters).
  static final TupleTag<MyEvent> successTag = new TupleTag<MyEvent>() {};
  static final TupleTag<String> errorTag = new TupleTag<String>() {};

  private final Counter missingField =
      Metrics.counter(MyFn.class, "missing_required_field");

  private transient Gson gsonParser;

  @Setup
  public void setup() {
    gsonParser = new Gson();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      MyEvent event = gsonParser.fromJson(c.element(), MyEvent.class);
      // Check if data is valid but missing a desired field
      if (event.getDescription() == null) {
        // Increment missing field counter
        missingField.inc();
      }
      // Output element as normal
      c.output(successTag, event);
    } catch (JsonParseException e) {
      // Send malformed data to the dead letter queue
      c.output(errorTag, c.element());
    }
  }
}
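As a possible usage sketch, the following pipeline fragment applies `MyFn` with output tags and routes malformed payloads to a dead letter topic. The subscription and topic names are hypothetical placeholders, and the fragment assumes a `Pipeline` object named `p` as in the earlier examples.

PCollectionTuple results =
    p.apply("Read events",
            PubsubIO.readStrings()
                .fromSubscription("projects/my-project/subscriptions/events"))
        .apply("Parse and validate",
            ParDo.of(new MyFn())
                .withOutputTags(MyFn.successTag, TupleTagList.of(MyFn.errorTag)));

// Valid events continue through the rest of the pipeline.
PCollection<MyEvent> validEvents = results.get(MyFn.successTag);

// Malformed payloads are written to a dead letter topic. The size of this
// backlog can serve as an SLI for a data correctness SLO.
results.get(MyFn.errorTag)
    .apply("Write dead letters",
        PubsubIO.writeStrings().to("projects/my-project/topics/dead-letter"));

Depending on how your `MyEvent` type is defined, you might also need to register a coder for the `successTag` output.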
The following Cloud Monitoring chart shows how these metrics might appear on a line graph.
In the graph, Missing Required Field uses the `missing_required_field` custom Cloud Monitoring metric, while Malformed JSON uses the `element_count` metric of the output `PCollection` object (elements that have been output with the `errorTag` tag).
Secondary metrics for data correctness
Primary metrics for correctness should be hard indicators of data issues. In contrast, secondary metrics let you observe and measure other signals that might indicate an issue or that can lead to data incorrectness.
The secondary metrics that are useful differ case by case, and they frequently depend on a business context. For example, if a pipeline computes an unusually high average temperature from IoT sensors at nighttime in a relatively cool location, it can indicate different possibilities. One possibility is an issue with data correctness in the input data, perhaps due to faulty sensors. Another possibility is an issue with the pipeline's functionality. In this case, you might create a metric for average temperature over time and create alerts based on some appropriate thresholds.
For streaming pipelines, an increase in the number of dropped elements due to lateness can be a useful signal for detecting problems. While the number of dropped elements is not in itself an error condition, it could signal problems that can lead to data completeness and correctness issues. These problems might include incorrect assumptions about data timeliness or about the need to include or adjust late triggers, or even issues with data sources. For streaming jobs, Dataflow automatically creates two Beam counters to track the counts of dropped elements from lateness. These counters are `droppedDueToClosedWindow` for closed windows from trigger execution, and `droppedDueToLateness` for expired windows where the watermark has passed the end of a window.
What's next
- Learn how to use the Dataflow monitoring interface.
- Learn how to use Cloud Monitoring for Dataflow pipelines.
- Explore reference architectures, diagrams, tutorials, and best practices about Google Cloud. Take a look at our Cloud Architecture Center.