Automating training-serving skew detection in AI Platform Prediction

This document is the fourth in a series that shows you how to monitor machine learning (ML) models that are deployed to AI Platform Prediction in order to detect data skew. This guide introduces an end-to-end solution for automating the detection of training-serving data skew over time and for generating alerts if anomalies are found. It focuses on automating, scheduling, operationalizing, and scaling the serving-log analysis process that detects data skew and drift.

This guide is for ML engineers who want to maintain the predictive performance of their ML models in production by monitoring how the serving data changes over time and by automatically identifying data skew and anomalies.

The series consists of the following guides:

The GitHub repository for this series

The GitHub repository that's associated with this series contains sample code and instructions for deploying the system that's described in this document and for submitting and scheduling log analyzer pipeline runs. The repository also includes example Jupyter notebooks that demonstrate techniques for accessing and analyzing the statistics and anomaly summaries that the system generates.

Architecture

The following diagram shows the architecture of the system that's described in this document.

Architecture of the system, showing flow of information to detect anomalies.

The system consists of two components:

  • Log analyzer pipeline. The core of the system is a Dataflow Flex Template. The template encapsulates an Apache Beam pipeline (the log analyzer pipeline) that extracts and analyzes records from the AI Platform Prediction request-response log. Dataflow is well suited to this task because it scales automatically and can process large volumes of log data. The pipeline uses TensorFlow Data Validation (TFDV) to calculate descriptive statistics and to detect data anomalies in a time series of records extracted from the log. The artifacts that the pipeline generates are written to Cloud Storage. You can then analyze them in a Jupyter notebook by using the techniques described in a previous document in this series, Analyzing training-serving skew with TensorFlow Data Validation.
  • Pipeline scheduler. You can schedule the log analyzer pipeline to run at specific times in the future. You manage the scheduling and execution of future runs by using Cloud Tasks, a scalable, fully managed service for scheduling and dispatching distributed tasks on Google Cloud. In addition, you can run Dataflow Flex Template jobs directly (that is, immediately), as shown in the sketch that follows this list.
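
For example, the following sketch launches the log analyzer Flex Template immediately by calling the Dataflow flexTemplates.launch method through the Google API Python client. It assumes Application Default Credentials; the project, region, and template values are placeholders, and the pipeline parameters are abbreviated here and described later in this document.

```python
from googleapiclient.discovery import build

# Placeholders; replace with your project, region, and template location.
PROJECT_ID = 'my-project'
REGION = 'us-central1'
TEMPLATE_PATH = 'gs://my-bucket/templates/log_analyzer.json'

dataflow = build('dataflow', 'v1b3')
response = dataflow.projects().locations().flexTemplates().launch(
    projectId=PROJECT_ID,
    location=REGION,
    body={
        'launchParameter': {
            'jobName': 'log-analyzer-run',
            'containerSpecGcsPath': TEMPLATE_PATH,
            # Abbreviated; the full set of pipeline parameters is described
            # later in this document.
            'parameters': {'model': 'my_model', 'version': 'v1'},
        }
    },
).execute()
print('Launched Dataflow job:', response['job']['id'])
```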

Understanding the log analysis workflow

The log analyzer pipeline is a Dataflow pipeline that uses TFDV to identify data skew and anomalies in the serving request-response logs that are stored in BigQuery.

The following diagram shows the workflow that's implemented by the log analyzer pipeline:

Log analyzer flow.

The diagram shows the sequence of tasks that the pipeline performs. (The parameters are described later in this section, and a simplified Beam sketch of the sequence follows the list.) The tasks are as follows:

  1. Extract a time series of records from the BigQuery table that's specified by the request_response_log_table parameter, filtered by the model, version, start_time, and end_time parameters.
  2. Convert the records to the format that's required by TFDV.
  3. Compute descriptive statistics for the time series of records, optionally sliced into time windows whose length is specified in the time_window parameter.
  4. Detect data anomalies in the time series by validating the computed statistics against the reference schema that's loaded from the location specified in the schema_file parameter.
  5. If any anomalies are detected, generate alerts.
  6. Store the calculated statistics and anomalies protocol buffers in the Cloud Storage location that's specified in the output_location parameter.
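
The following code is a simplified sketch of how these steps could be composed as an Apache Beam pipeline. It is an outline under assumptions, not the implementation in the repository: the convert_record and alert_on_anomalies helpers, the log table column names (time, raw_data, model, model_version), and the dict-of-arrays example encoding are assumptions, and the exact input format that tfdv.GenerateStatistics accepts depends on your TFDV version.

```python
import json
import logging

import apache_beam as beam
import numpy as np
import tensorflow_data_validation as tfdv


def convert_record(row):
    """Hypothetical helper: converts one log row into TFDV input examples.

    Older TFDV releases accept a dict of feature name to numpy array;
    newer releases expect Arrow RecordBatches instead.
    """
    for instance in json.loads(row['raw_data']).get('instances', []):
        yield {name: np.asarray([value]) for name, value in instance.items()}


def alert_on_anomalies(anomalies):
    """Hypothetical helper: logs a warning for each detected anomaly."""
    for feature, info in anomalies.anomaly_info.items():
        logging.warning('Anomaly detected in feature %s: %s',
                        feature, info.description)
    return anomalies


def run(pipeline_options, params, schema):
    """Simplified outline of the log analyzer steps listed above."""
    query = f"""
        SELECT time, raw_data
        FROM `{params['request_response_log_table']}`
        WHERE model = '{params['model']}'
          AND model_version = '{params['version']}'
          AND time BETWEEN TIMESTAMP('{params['start_time']}')
                       AND TIMESTAMP('{params['end_time']}')
    """
    with beam.Pipeline(options=pipeline_options) as pipeline:
        stats = (
            pipeline
            # Steps 1-3: extract, convert, and compute statistics.
            | 'ReadLogRecords' >> beam.io.ReadFromBigQuery(
                query=query, use_standard_sql=True)
            | 'ConvertToTfdvFormat' >> beam.FlatMap(convert_record)
            | 'GenerateStatistics' >> tfdv.GenerateStatistics(
                tfdv.StatsOptions(schema=schema)))

        # Steps 4-5: validate against the reference schema and alert.
        _ = (
            stats
            | 'ValidateStatistics' >> beam.Map(
                tfdv.validate_statistics, schema=schema)
            | 'GenerateAnomalyAlerts' >> beam.Map(alert_on_anomalies))

        # Step 6: store the statistics protocol buffers in Cloud Storage.
        _ = (
            stats
            | 'WriteStats' >> tfdv.WriteStatisticsToTFRecord(
                params['output_location'] + '/stats'))
```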

The following table lists the key parameters that can be passed to the log analyzer pipeline.

| Parameter name | Type | Optional? | Description |
| --- | --- | --- | --- |
| request_response_log_table | String | No | The full name of the request-response log table in BigQuery, in the format <project_name>.<dataset_name>.<table_name> |
| model | String | No | The name of the AI Platform Prediction model |
| version | String | No | The version of the AI Platform Prediction model |
| start_time | String | No | The beginning of a time series of records in the log, in ISO date-time format (YYYY-MM-DDTHH:MM:SS) |
| end_time | String | No | The end of a time series of records in the log, in ISO date-time format (YYYY-MM-DDTHH:MM:SS) |
| output_location | String | No | A Cloud Storage URL for the output statistics and anomalies |
| schema_file | String | No | A Cloud Storage URL to the reference schema file that describes the model's input interface |
| baseline_stats_file | String | Yes | A Cloud Storage URL to a baseline statistics file |
| time_window | String | Yes | A time window for slice calculations. You must use the m or h suffix to designate minutes or hours. For example, 60m defines a 60-minute time window. |
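
For example, a log analyzer run over a 12-hour window might use parameter values like the following. The project, bucket, model, and version names are placeholders.

```python
# Hypothetical parameter values for one log analyzer run.
log_analyzer_params = {
    'request_response_log_table': 'my-project.logging.request_response_log',
    'model': 'my_model',
    'version': 'v1',
    'start_time': '2021-01-01T00:00:00',
    'end_time': '2021-01-01T12:00:00',
    'output_location': 'gs://my-bucket/monitoring/output',
    'schema_file': 'gs://my-bucket/monitoring/schema.pbtxt',
    'baseline_stats_file': 'gs://my-bucket/monitoring/baseline_stats',
    'time_window': '60m',
}
```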

For information about the pipeline implementation and about how you can trigger it, see the README file in the associated GitHub repository.

Extracting and encoding the request-response log records

The record of a serving request that's captured in the AI Platform Prediction request-response log includes a timestamp that indicates when the request was received. The record also includes a copy of the request body in JSON format.

The first step in the pipeline extracts a time series of records between two points in time; the start and end times are provided as parameters for that pipeline run. The request bodies are converted from JSON into the format required by TFDV (tensorflow_data_validation.types.BeamExample) and then forwarded to the downstream components for TFDV statistics generation and anomaly detection.
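
As an illustration, the following sketch shows what the conversion of a single logged request might look like. The raw_data column name and the example payload are assumptions based on the request-response logging format (a JSON body that contains an instances list), and the dict-of-numpy-arrays encoding shown here applies to older TFDV releases; newer releases work with Arrow RecordBatches.

```python
import json
import numpy as np

# Hypothetical log record, as returned by the BigQuery read
# (only the columns that the pipeline uses are shown).
row = {
    'time': '2021-01-01T10:15:00',
    'raw_data': '{"instances": [{"age": 39, "workclass": "Private"}]}',
}

# Convert each instance in the request body into the dict-of-arrays
# encoding that older TFDV releases expect.
examples = [
    {name: np.asarray([value]) for name, value in instance.items()}
    for instance in json.loads(row['raw_data'])['instances']
]
# examples[0] -> {'age': array([39]), 'workclass': array(['Private'], dtype='<U7')}
```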

Calculating descriptive statistics

The pipeline uses the tensorflow_data_validation.GenerateStatistics PTransform to calculate statistics for a time series of records. For more information about the types of statistics that are calculated, see TensorFlow Data Validation: Checking and analyzing your data.

By default, the pipeline calculates statistics by using all of the records in the time series. The pipeline can also divide the time series into a set of consecutive time slices and calculate additional statistics for each slice. At runtime, the pipeline takes parameters that determine whether to slice the time series and how wide each slice should be.
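
As a sketch of how per-window statistics could be produced, the following code tags each example with the time window that it falls into and asks TFDV to slice the statistics on that feature. The time_window feature name and the schema location are assumptions, and depending on your TFDV version the slicing option on tfdv.StatsOptions is named slice_functions or experimental_slice_functions.

```python
import datetime

import numpy as np
import tensorflow_data_validation as tfdv
from tensorflow_data_validation.utils import slicing_util

# Placeholder location of the reference schema.
schema = tfdv.load_schema_text('gs://my-bucket/monitoring/schema.pbtxt')


def assign_time_window(example, record_time, start_time,
                       window=datetime.timedelta(minutes=60)):
    """Hypothetical helper: tags an example with its time window."""
    window_start = start_time + ((record_time - start_time) // window) * window
    example['time_window'] = np.asarray([window_start.isoformat()])
    return example


# Compute statistics per value of the time_window feature, in addition
# to the statistics over the whole time series.
slice_fn = slicing_util.get_feature_value_slicer(features={'time_window': None})

stats_options = tfdv.StatsOptions(
    schema=schema,
    # In newer TFDV releases this option is named experimental_slice_functions.
    slice_functions=[slice_fn])
```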

Detecting data anomalies

After descriptive statistics are calculated, the next step is anomaly detection. The pipeline uses the tensorflow_data_validation.validate_statistics function to detect data anomalies. For more information about the types of anomalies detected by the function, see TensorFlow Data Validation: Checking and analyzing your data.

By default, the pipeline detects data anomalies by comparing the calculated statistics to the reference schema that the serving requests are expected to conform to. The reference schema is a required runtime parameter of the pipeline. The techniques for defining the schema are covered in Analyzing training-serving skew with TensorFlow Data Validation. By comparing the serving data statistics to the reference schema, you can detect schema type skews. These include inconsistent features, inconsistent feature types, and inconsistent feature domains.

To detect feature distribution skews, the pipeline requires additional information. You can provide the pipeline with baseline statistics. Because the baseline statistics capture the expected distributions of the serving request features, TFDV can detect the feature distribution skew anomalies in addition to detecting schema anomalies.
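
The following sketch shows how the validation step might call TFDV when baseline statistics are supplied. The Cloud Storage locations and the age feature are placeholders, and how the baseline statistics map to the tfdv.validate_statistics arguments depends on which comparators (skew_comparator or drift_comparator) you configure in the reference schema.

```python
import tensorflow_data_validation as tfdv

# Placeholder locations.
SCHEMA_FILE = 'gs://my-bucket/monitoring/schema.pbtxt'
BASELINE_STATS_FILE = 'gs://my-bucket/monitoring/baseline_stats'
SERVING_STATS_FILE = 'gs://my-bucket/monitoring/output/stats'

schema = tfdv.load_schema_text(SCHEMA_FILE)
baseline_stats = tfdv.load_statistics(BASELINE_STATS_FILE)
serving_stats = tfdv.load_statistics(SERVING_STATS_FILE)

# Example: flag a distribution skew in the hypothetical 'age' feature when
# its L-infinity distance from the baseline distribution exceeds 0.01.
tfdv.get_feature(schema, 'age').skew_comparator.infinity_norm.threshold = 0.01

# Validate the serving statistics against the schema and the baseline.
anomalies = tfdv.validate_statistics(
    statistics=baseline_stats,
    schema=schema,
    serving_statistics=serving_stats)

if anomalies.anomaly_info:
    print('Anomalies detected:', list(anomalies.anomaly_info.keys()))
```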

The following flow diagram shows a successful execution of the log analyzer Dataflow pipeline.

Flow diagram produced by Dataflow.

Alerting on data anomalies

If data anomalies are detected, the pipeline generates alerts by logging a warning message in the corresponding Dataflow job execution log.

You can include additional alerting capabilities by modifying the Generate anomaly alerts step of the pipeline.
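
For example, the step could be extended to publish detected anomalies to a Pub/Sub topic, which in turn could fan out to email or chat notifications. The following sketch is a hypothetical extension, not part of the reference implementation; the topic name is a placeholder.

```python
import json
import logging

import apache_beam as beam
from google.cloud import pubsub_v1


class GenerateAnomalyAlerts(beam.DoFn):
    """Hypothetical alerting step: log a warning and publish to Pub/Sub."""

    def __init__(self, topic='projects/my-project/topics/skew-alerts'):
        self.topic = topic

    def process(self, anomalies):
        alerts = [
            {'feature': name, 'description': info.description}
            for name, info in anomalies.anomaly_info.items()
        ]
        if alerts:
            logging.warning('Data anomalies detected: %s', alerts)
            publisher = pubsub_v1.PublisherClient()
            publisher.publish(
                self.topic, json.dumps(alerts).encode('utf-8')).result()
        yield anomalies
```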

Scheduling log analysis runs

As described earlier in the architecture overview, the log analyzer pipeline is encapsulated in a Dataflow Flex template. The pipeline runs are executed by configuring and submitting Dataflow Flex Template jobs.

In the system described in this document, Dataflow Flex Template jobs can be started immediately or scheduled to start in the future. Scheduling a series of jobs on a timetable lets you maintain proactive and automated monitoring of serving data.

If you start a job immediately, it runs as soon as you submit the job request by invoking the Dataflow Flex Template service. If you schedule a job, the job request is added to a Cloud Tasks queue. The queued request includes the runtime parameters that you pass to the pipeline and the scheduled execution time.
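
The following sketch shows one way that such a queued request could be created by using the Cloud Tasks Python client (google-cloud-tasks 2.x): an HTTP task that calls the Dataflow flexTemplates.launch endpoint at the scheduled time. The project, queue, service account, template, and parameter values are placeholders.

```python
import datetime
import json

from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2

PROJECT_ID = 'my-project'
REGION = 'us-central1'
QUEUE = 'log-analyzer-queue'
SERVICE_ACCOUNT = 'log-analyzer@my-project.iam.gserviceaccount.com'

LAUNCH_URL = (
    f'https://dataflow.googleapis.com/v1b3/projects/{PROJECT_ID}'
    f'/locations/{REGION}/flexTemplates:launch')

# Pipeline runtime parameters (see the parameter table earlier in this
# document); truncated here for brevity.
log_analyzer_params = {
    'model': 'my_model',
    'version': 'v1',
    'start_time': '2021-01-02T00:00:00',
    'end_time': '2021-01-02T12:00:00',
}

launch_body = {
    'launchParameter': {
        'jobName': 'log-analyzer-scheduled',
        'containerSpecGcsPath': 'gs://my-bucket/templates/log_analyzer.json',
        'parameters': log_analyzer_params,
    }
}

# Schedule the run 12 hours from now.
schedule_time = timestamp_pb2.Timestamp()
schedule_time.FromDatetime(
    datetime.datetime.utcnow() + datetime.timedelta(hours=12))

client = tasks_v2.CloudTasksClient()
task = {
    'http_request': {
        'http_method': tasks_v2.HttpMethod.POST,
        'url': LAUNCH_URL,
        'headers': {'Content-Type': 'application/json'},
        'body': json.dumps(launch_body).encode('utf-8'),
        'oauth_token': {'service_account_email': SERVICE_ACCOUNT},
    },
    'schedule_time': schedule_time,
}
client.create_task(
    parent=client.queue_path(PROJECT_ID, REGION, QUEUE), task=task)
```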

The following screenshot shows the queued log analyzer jobs in Cloud Tasks.

Listing from Cloud Tasks console showing queued jobs.

What's next