Comparing Machine Learning Models for Predictions in Cloud Dataflow Pipelines

This solution describes and compares the different design approaches for calling a machine learning model during a Cloud Dataflow pipeline, and examines the tradeoffs involved in choosing one approach or another. We present the results of a series of experiments that we ran to explore different approaches and illustrate these tradeoffs, both in batch and stream processing pipelines. This solution is designed for people who integrate trained models into data processing pipelines, rather than for data scientists who want to build machine learning models.

Introduction

As the person responsible for integrating a trained ML model into a Cloud Dataflow pipeline, you might wonder what the various approaches are, and which one best suits your system requirements. Several considerations need your attention, such as:

  • Throughput
  • Latency
  • Cost
  • Implementation
  • Maintenance

It's not always easy to balance these considerations, but this solution can help you navigate the decision-making process based on your priorities. The solution compares three approaches for making predictions with a TensorFlow-trained machine learning (ML) model in batch and stream data pipelines:

  • Using a deployed model as a REST/HTTP API for streaming pipelines.
  • Using AI Platform batch prediction jobs for batch pipelines.
  • Using Cloud Dataflow direct-model prediction for both batch and streaming pipelines.

All the experiments use an existing model, trained on the Natality dataset, that predicts baby weights based on various inputs. Because the goal of this solution is not to build a model, it doesn't discuss how the model was built or trained. See the What's next section for more details about the Natality dataset.

Platform

There are various ways to run a data pipeline and call a trained ML model. But the functional requirements are always the same:

  1. Ingesting data from a bounded (batch) or unbounded (streaming) source. Examples of sources from which to ingest data include sensor data, website interactions, and financial transactions.
  2. Transforming and enriching the input data by calling ML models for predictions. An example is parsing a JSON file to extract relevant fields to predict a maintenance date, make a product recommendation, or detect fraud.
  3. Storing the transformed data and predictions for analytics or for backup, or to pass to a queuing system to trigger a new event or additional pipelines. Examples include detecting potential fraud in real time or storing maintenance schedule information in a store that's accessible from a dashboard.

When you transform and enrich data with predictions in a batch ETL process, you aim to maximize throughput in order to reduce the overall time needed to process the whole data batch. On the other hand, when you process streaming data for online prediction, you aim to minimize latency in order to receive each prediction in (near) real time. Thus, calling the model might become a bottleneck.
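
To make the three functional steps above concrete, the following minimal Apache Beam sketch shows the shape that all of the pipelines in this solution share: read, enrich with predictions, write. The predict_fn transform and the Cloud Storage paths are illustrative placeholders, not part of the solution code.

import json

import apache_beam as beam


def predict_fn(record):
    """Placeholder transform: call an ML model (local or remote) and
    return the input record enriched with a prediction."""
    record['prediction'] = 0.0  # replaced by a real model call later in this solution
    return record


with beam.Pipeline() as pipeline:
    (
        pipeline
        # 1. Ingest from a bounded source (batch); a streaming pipeline would
        #    instead read from an unbounded source such as Cloud Pub/Sub.
        | 'Read' >> beam.io.ReadFromText('gs://YOUR_BUCKET/input/*.json')
        | 'Parse' >> beam.Map(json.loads)
        # 2. Transform and enrich the data by calling the model for predictions.
        | 'Predict' >> beam.Map(predict_fn)
        # 3. Store the transformed data and predictions.
        | 'Serialize' >> beam.Map(json.dumps)
        | 'Write' >> beam.io.WriteToText('gs://YOUR_BUCKET/output/predictions')
    )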

Core components

The batch and streaming experiments in this solution use three main technologies:

  • Apache Beam running on Cloud Dataflow to process the data.
  • TensorFlow to implement and train the ML model.
  • For some experiments, AI Platform as a hosting platform for the trained ML models to perform batch and online predictions.

We chose Apache Beam running on Cloud Dataflow to run data pipelines in this solution because:

  • Apache Beam is an open-source unified programming model that runs both streaming and batch data processing jobs.
  • Cloud Dataflow is a Google Cloud Platform (GCP) product that runs Apache Beam jobs in a fully managed, serverless manner.

TensorFlow is an open-source mathematical library by Google that is used as a machine learning framework. TensorFlow enables building, training, and serving models on a single machine or in distributed environments. Models are portable to various devices and can also leverage available CPU, GPU, or TPU resources for training and serving.

AI Platform is a serverless platform that can train, tune (using the hyper-parameters tuning functionality), and serve TensorFlow models at scale with minimum management required by DevOps. AI Platform supports deploying trained models as REST APIs for online predictions, as well as submitting batch prediction jobs. AI Platform is one of several options that can serve your model as a microservice.

The approaches detailed in this solution use Cloud Dataflow for the data processing pipeline and AI Platform to host the models as HTTP endpoints. However, these approaches could be replaced with other technologies. The performance comparisons between HTTP and a direct TensorFlow model would not drastically change.

Processing batch and streaming data

The experiments in this solution include both batch and streaming use cases. Each experiment leverages different GCP products for input and output because unbounded and bounded sources have different operational requirements.

Batch-processing a bounded dataset

Figure 1 shows that in typical batch processing pipelines, raw input data is stored in object storage, such as Cloud Storage. Structured data storage formats include comma-separated values (CSV), optimized row columnar (ORC), Parquet, or Avro. These formats are often used when data comes from databases or logs.

Architecture of typical batch processing pipelines
Figure 1. Batch-processing architecture

Some analytical platforms such as BigQuery provide storage in addition to query capabilities. BigQuery uses Capacitor for storage. Apache Beam on Cloud Dataflow can read from and write to both BigQuery and Cloud Storage, in addition to other storage options in batch processing pipelines.

Stream-processing an unbounded datastream

For streaming, the inputs to a data processing pipeline are usually a messaging system, as shown in Figure 2. Technologies such as Cloud Pub/Sub or Kafka are typically used to ingest individual data points in JSON, CSV, or protobuf format.

Architecture of typical stream processing pipelines
Figure 2. Stream-processing architecture

Data points can be processed individually, or as groups in micro-batches by using windowing functions to perform temporal event processing. The processed data might go to several destinations, including:

  1. BigQuery for ad hoc analytics, through the streaming APIs.
  2. Cloud Bigtable for serving real-time information.
  3. Cloud Pub/Sub topic for triggering subsequent processes/pipelines.

You can find a complete list of source connectors (input) and sink connectors (output) for both bounded and unbounded data sources and sinks on the Apache Beam I/O page.

Invoking a TensorFlow model

A TensorFlow-trained model can be invoked in three ways:

  1. Through an HTTP endpoint for online prediction.
  2. Directly, by using the saved model file for batch and online predictions.
  3. Through an AI Platform batch prediction job for batch prediction.

HTTP endpoints for online prediction

TensorFlow models can be deployed as HTTP endpoints that are invoked to provide predictions in real time, either from a stream data processing pipeline or from client apps.

You can deploy a TensorFlow model as an HTTP endpoint for online predictions by using TensorFlow Serving or any other hosting service, such as Seldon. As shown in Figure 3, you can choose one of the following options:

  1. Deploy the model yourself on one or more Compute Engine instances.
  2. Use a Docker image on Compute Engine or Google Kubernetes Engine.
  3. Leverage Kubeflow to facilitate deployment on Kubernetes or Google Kubernetes Engine.
  4. Use App Engine with Endpoints to host the model in a web app.
  5. Use AI Platform, the fully managed ML training and serving service on GCP.
Options in Cloud Dataflow for serving a model as an HTTP endpoint
Figure 3. Different options in Cloud Dataflow for serving a model as an HTTP endpoint

AI Platform is a fully managed service, so it is easier to implement than the other options. Therefore, in our experiments we use it as the option for serving the model as an HTTP endpoint. We can then focus on the performance of a direct-model versus an HTTP endpoint in AI Platform, rather than comparing the different HTTP model-serving options.

Serving online predictions with AI Platform Prediction

Two tasks are required in order to serve online predictions:

  1. Deploying a model.
  2. Interacting with the deployed model for inference (that is, making predictions).

Deploying a model as an HTTP endpoint using AI Platform Prediction requires the following steps:

  1. Make sure that the trained model files are available on Cloud Storage.
  2. Create a model by using the gcloud ml-engine models create command.
  3. Deploy a model version by using the gcloud ml-engine versions create command, with the model files on Cloud Storage.

You can deploy a model by using commands like the following:

PROJECT="[PROJECT_ID]" # change to your project name
REGION="[REGION]"
BUCKET="[BUCKET]" # change to your bucket name
MODEL_NAME="babyweight_estimator" # change to your estimator name
MODEL_VERSION="v1" # change to your model version
MODEL_BINARIES=gs://${BUCKET}/models/${MODEL_NAME}

# upload the local SavedModel to GCS
gsutil -m cp -r model/trained/v1/* gs://${BUCKET}/models/${MODEL_NAME}

# set the current project
gcloud config set project ${PROJECT}

# list model files on GCS
gsutil ls ${MODEL_BINARIES}

# deploy model to GCP
gcloud ml-engine models create ${MODEL_NAME} --regions=${REGION}

# deploy model version
gcloud ml-engine versions create ${MODEL_VERSION} --model=${MODEL_NAME} --origin=${MODEL_BINARIES} --runtime-version=1.4

The code creates an AI Platform Prediction model called babyweight_estimator in the GCP project, with model version v1.

After the model is deployed, you can invoke it. The following Python code shows a way to invoke a model version in AI Platform Prediction as a REST API:

from googleapiclient import discovery

# PROJECT, CMLE_MODEL_NAME, and CMLE_MODEL_VERSION are defined elsewhere in the pipeline code.
cmle_api = None

def init_api():

    global cmle_api

    if cmle_api is None:
        cmle_api = discovery.build('ml', 'v1',
                              discoveryServiceUrl='https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json',
                              cache_discovery=True)


def estimate_cmle(instances):
    """
    Calls the babyweight estimator API on CMLE to get predictions

    Args:
       instances: list of json objects
    Returns:
        list of float - estimated baby weights
    """
    init_api()

    request_data = {'instances': instances}

    model_url = 'projects/{}/models/{}/versions/{}'.format(PROJECT, CMLE_MODEL_NAME, CMLE_MODEL_VERSION)
    response = cmle_api.projects().predict(body=request_data, name=model_url).execute()
    values = [item["predictions"][0] for item in response['predictions']]
    return values
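
As a usage illustration, and assuming that PROJECT, CMLE_MODEL_NAME, and CMLE_MODEL_VERSION are set to the values used in the deployment commands above, the function can be called with a small list of instances. The field names and values below are illustrative placeholders; the real instances must match the fields expected by the model's serving function.

# Illustrative instances only; the deployed model's serving_fn defines the
# exact input fields it expects.
sample_instances = [
    {'is_male': 'True', 'mother_age': 28.0, 'plurality': 'Single(1)', 'gestation_weeks': 39},
    {'is_male': 'False', 'mother_age': 33.0, 'plurality': 'Single(1)', 'gestation_weeks': 38},
]

estimated_weights = estimate_cmle(sample_instances)
print(estimated_weights)  # one estimated baby weight per instance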

If you have a large dataset available in BigQuery or Cloud Storage and you want to maximize the throughput of the overall process, serving your ML model as an HTTP endpoint is not recommended for batch prediction, because doing so generates one HTTP request for each data point and results in a huge volume of HTTP requests. The following sections present better options for batch prediction.

Direct-model for batch and online predictions

The direct-model prediction technique leverages a local TensorFlow SavedModel on the Cloud Dataflow instances. The saved model is a copy of the output files created after you have finished building and training the TensorFlow model. The TensorFlow SavedModel can be:

  • Part of the pipeline source code that is submitted as a Cloud Dataflow job.
  • Downloaded from Cloud Storage, as shown in Figure 4.
Direct-model prediction in Cloud Dataflow
Figure 4. Direct-model prediction in Cloud Dataflow

In this solution, we use a SavedModel that is part of the source code on GitHub. To load a model on the instances, you do the following:

  1. When you create the Cloud Dataflow job, specify the pipeline dependencies to be loaded, including the model files. The following Python code shows the setup.py file that includes the model files to be submitted with the Cloud Dataflow job. (A sketch of how the job is pointed at this setup.py file appears after this list.)

    import setuptools
    
    requirements = []
    
    setuptools.setup(
        name='TF-DATAFLOW-DEMO',
        version='v1',
        install_requires=requirements,
        packages=setuptools.find_packages(),
        package_data={'model': ['trained/*',
                                'trained/v1/*',
                                'trained/v1/variables/*']
                      },
    )
  2. Call the local model during the pipeline to produce predictions for the given instances. The following Python code shows how to do this.

    import logging
    import os

    import tensorflow as tf

    # SAVED_MODEL_DIR is the packaged SavedModel directory and is defined elsewhere in the pipeline code.
    predictor_fn = None
    
    
    def init_predictor():
        """ Loads the TensorFlow saved model to the predictor object
    
        Returns:
            predictor_fn
        """
    
        global predictor_fn
    
        if predictor_fn is None:
    
            logging.info("Initialising predictor...")
            dir_path = os.path.dirname(os.path.realpath(__file__))
            export_dir = os.path.join(dir_path, SAVED_MODEL_DIR)
    
            if os.path.exists(export_dir):
                predictor_fn = tf.contrib.predictor.from_saved_model(
                    export_dir=export_dir,
                    signature_def_key="predict"
                )
            else:
                logging.error("Model not found! - Invalid model path: {}".format(export_dir))
    
    
    def estimate_local(instances):
        """
        Calls the local babyweight estimator to get predictions
    
        Args:
           instances: list of json objects
        Returns:
            list of float - estimated baby weights
        """
    
        init_predictor()
    
        # Convert the list of instance dicts into a single columnar dict
        # (feature name -> list of values) so the model can predict the whole
        # micro-batch in one vectorized call.
        inputs = dict((k, [v]) for k, v in instances[0].items())
        for i in range(1, len(instances)):
            instance = instances[i]

            for k, v in instance.items():
                inputs[k] += [v]
    
        values = predictor_fn(inputs)['predictions']
        return [value.item() for value in values.reshape(-1)]

See the Apache Beam Multiple File Dependencies page for more details.
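
As noted in step 1, the following is a minimal sketch of how a Cloud Dataflow job can be pointed at the setup.py file above through Apache Beam's standard setup_file pipeline option, so that the packaged SavedModel ships with the workers. The project, bucket, and job name are placeholders.

import apache_beam as beam

# Placeholder values; replace with your project, bucket, and runner settings.
options = beam.pipeline.PipelineOptions(flags=[], **{
    'project': 'YOUR_PROJECT',
    'runner': 'DataflowRunner',
    'temp_location': 'gs://YOUR_BUCKET/tmp',
    'job_name': 'tf-dataflow-demo',
    'setup_file': './setup.py',  # packages the model files with the job
})

pipeline = beam.Pipeline(options=options)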

AI Platform batch prediction job

Besides deploying the model as an HTTP endpoint, AI Platform lets you run a batch prediction job by using a deployed model version or a TensorFlow SavedModel in Cloud Storage.

An AI Platform batch prediction job takes the Cloud Storage location of the input data files as a parameter. It uses the model to get predictions for that data, and then stores the prediction results in another Cloud Storage output location that is also given as a parameter. The following example shows gcloud commands that submit an AI Platform batch prediction job.

BUCKET='<BUCKET>'
DATA_FORMAT="TEXT"
INPUT_PATHS=gs://${BUCKET}/data/babyweight/experiments/outputs/data-prep-*
OUTPUT_PATH=gs://${BUCKET}/data/babyweight/experiments/outputs/cmle-estimates
MODEL_NAME='babyweight_estimator'
VERSION_NAME='v1'
REGION='<REGION>'
now=$(date +"%Y%m%d_%H%M%S")
JOB_NAME="batch_predict_$MODEL_NAME$now"
MAX_WORKER_COUNT="20"

gcloud ml-engine jobs submit prediction $JOB_NAME \
    --model=$MODEL_NAME \
    --input-paths=$INPUT_PATHS \
    --output-path=$OUTPUT_PATH \
    --region=$REGION \
    --data-format=$DATA_FORMAT \
    --max-worker-count=$MAX_WORKER_COUNT
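
If you orchestrate this workflow programmatically (for example, from Cloud Functions or Airflow, as discussed later in this solution), you can poll the job status through the same Google API client used for online prediction. The following is a hedged sketch that assumes the cmle_api client and init_api function shown earlier; the job name must match the one submitted above.

import time


def wait_for_batch_prediction_job(project, job_name, poll_interval_secs=30):
    """Polls an AI Platform (ML Engine) job until it reaches a terminal state."""
    init_api()  # reuses the cmle_api client from the online-prediction example

    job_id = 'projects/{}/jobs/{}'.format(project, job_name)

    while True:
        job = cmle_api.projects().jobs().get(name=job_id).execute()
        state = job.get('state')

        if state not in ('QUEUED', 'PREPARING', 'RUNNING'):
            return state  # for example SUCCEEDED, FAILED, or CANCELLED

        time.sleep(poll_interval_secs)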

Point-by-point versus micro-batching for online prediction

In real-time prediction pipelines, whether you are serving the model as an HTTP endpoint or using the model directly from the workers, you have two options to get predictions for incoming data points:

  • Individual point. The obvious option is to send each data point to the model individually and get a prediction.
  • Micro-batches. A more optimized option is to use a windowing function to create micro-batches, grouping data points within a specific time period, such as every 5 seconds. The micro-batch is then sent to the model to get predictions for all the instances at once.

The following Python code shows how to create time-based micro-batches using a windowing function in an Apache Beam pipeline.

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows


def run_pipeline_with_micro_batches(inference_type, project,
                                    pubsub_topic, pubsub_subscription,
                                    bq_dataset, bq_table,
                                    window_size, runner, args=None):

    prepare_steaming_source(project, pubsub_topic, pubsub_subscription)
    prepare_steaming_sink(project, bq_dataset, bq_table)
    pubsub_subscription_url = "projects/{}/subscriptions/{}".format(project, pubsub_subscription)
    options = beam.pipeline.PipelineOptions(flags=[], **args)

    pipeline = beam.Pipeline(runner, options=options)
    (
            pipeline
            | 'Read from PubSub' >> beam.io.ReadStringsFromPubSub(subscription=pubsub_subscription_url, id_label="source_id")
            | 'Micro-batch - Window Size: {} Seconds'.format(window_size) >> beam.WindowInto(FixedWindows(size=window_size))
            | 'Estimate Targets - {}'.format(inference_type) >> beam.FlatMap(lambda messages: estimate(messages, inference_type))
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(project=project,
                                                             dataset=bq_dataset,
                                                             table=bq_table
                                                             )
    )

    pipeline.run()

When models are deployed as HTTP endpoints, the micro-batching approach dramatically reduces the number of HTTP requests, which in turn reduces latency. Even when the micro-batching technique is used with the direct model, sending the model a tensor with N instances for prediction is more efficient than sending N tensors of length 1, because of the vectorized operations.
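
For reference, the estimate step in the pipeline above is a dispatcher that parses the incoming messages and routes them to either the HTTP endpoint or the local SavedModel. The following is a minimal sketch that assumes the estimate_cmle and estimate_local helpers shown earlier and an illustrative output field name; the implementation in the solution repository may differ.

import json


def estimate(messages, inference_type):
    """Gets predictions for a micro-batch of Cloud Pub/Sub messages."""

    # A window may deliver a single message or a list of messages;
    # normalize to a list so both cases are handled the same way.
    if not isinstance(messages, list):
        messages = [messages]

    instances = [json.loads(message) for message in messages]

    # Route the micro-batch to the chosen inference backend.
    if inference_type == 'cmle':
        estimates = estimate_cmle(instances)
    else:
        estimates = estimate_local(instances)

    # Attach each estimate to its source instance for the BigQuery sink.
    for instance, estimated_weight in zip(instances, estimates):
        instance['estimated_weight'] = estimated_weight  # illustrative field name

    return instances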

Batch experiments

In the batch experiments, we want to estimate baby weights in the Natality dataset in BigQuery by using a TensorFlow regression model. We then want to save the prediction results in Cloud Storage as CSV files by using a Cloud Dataflow batch pipeline. The following section describes different experiments we tried to accomplish this task.

Approach 1: Cloud Dataflow with direct-model prediction

In this approach, Cloud Dataflow workers host the TensorFlow SavedModel, which is called directly for prediction during the batch processing pipeline for each record. Figure 5 shows the high-level architecture of this approach.

Batch Approach 1: Cloud Dataflow with direct model prediction
Figure 5. Batch Approach 1: Cloud Dataflow with direct model prediction

The Cloud Dataflow pipeline performs the following steps:

  1. Read data from BigQuery.
  2. Prepare each BigQuery record for prediction.
  3. Call the local TensorFlow SavedModel to get a prediction for each record.
  4. Convert the result (input record and estimated baby weight) to a CSV file.
  5. Write the CSV file to Cloud Storage.

In this approach, there are no calls to remote services, such as a deployed model on AI Platform as an HTTP endpoint. The prediction is done locally within each Cloud Dataflow worker by using the TensorFlow SavedModel.
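
A minimal sketch of what this batch pipeline might look like, assuming the estimate_local function shown earlier; the query, field handling, and output path are illustrative placeholders rather than the exact values used in the solution code.

import apache_beam as beam


def run_batch_direct_pipeline(bq_query, output_prefix, runner, args=None):
    options = beam.pipeline.PipelineOptions(flags=[], **(args or {}))
    pipeline = beam.Pipeline(runner, options=options)

    (
        pipeline
        # 1. Read data from BigQuery.
        | 'Read from BigQuery' >> beam.io.Read(
            beam.io.BigQuerySource(query=bq_query, use_standard_sql=True))
        # 2. Prepare each record for prediction (drop the label column, for example).
        | 'Prepare instances' >> beam.Map(
            lambda row: {k: v for k, v in row.items() if k != 'weight_pounds'})
        # 3. Call the local TensorFlow SavedModel to estimate the baby weight.
        | 'Estimate targets' >> beam.Map(
            lambda instance: dict(instance,
                                  estimated_weight=estimate_local([instance])[0]))
        # 4. Convert each result to a CSV line.
        | 'To CSV' >> beam.Map(
            lambda result: ','.join(str(result[k]) for k in sorted(result)))
        # 5. Write the CSV files to Cloud Storage.
        | 'Write to Cloud Storage' >> beam.io.WriteToText(
            file_path_prefix=output_prefix, file_name_suffix='.csv')
    )

    pipeline.run()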

Approach 2: Cloud Dataflow with AI Platform batch prediction

In this approach, the TensorFlow SavedModel is stored in Cloud Storage and used by AI Platform for prediction. However, instead of making an API call to the deployed model for each record as with the previous approach, the data is prepared for prediction and submitted as a batch.

This approach has two phases:

  1. Cloud Dataflow prepares the data from BigQuery for prediction, and then stores the data in Cloud Storage.
  2. The AI Platform batch prediction job is submitted with the prepared data, and the prediction results are stored in Cloud Storage.

Figure 6 shows the overall architecture of this two-phased approach.

Batch Approach 2: Cloud Dataflow with AI Platform batch prediction
Figure 6. Batch Approach 2: Cloud Dataflow with AI Platform batch prediction

The workflow steps, including the Cloud Dataflow pipeline, are as follows:

  1. Read data from BigQuery.
  2. Prepare each BigQuery record for prediction.
  3. Write JSON data to Cloud Storage. The serving_fn function in the model expects JSON instances as input.
  4. Submit an AI Platform batch prediction job with the prepared data in Cloud Storage. This job writes the prediction results to Cloud Storage as well.

Note that the Cloud Dataflow job only prepares the data for prediction; it does not submit the AI Platform prediction job itself. In other words, the data preparation task and the batch prediction task are not tightly coupled. Cloud Functions, Airflow, or any scheduler can orchestrate the workflow by executing the Cloud Dataflow job and then submitting the AI Platform job for batch prediction.
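
As an illustration of step 3 in the list above, the preparation pipeline can serialize each prepared instance as one JSON object per line before writing it to Cloud Storage, which is the input format the AI Platform batch prediction job expects when the data format is TEXT (as in the gcloud example shown earlier). The transform below is a sketch with placeholder paths.

import json

import apache_beam as beam


def write_prepared_instances(prepared_instances, output_prefix):
    """Writes one JSON object per line to Cloud Storage for batch prediction."""
    return (
        prepared_instances
        | 'To JSON' >> beam.Map(json.dumps)
        | 'Write JSON to Cloud Storage' >> beam.io.WriteToText(
            file_path_prefix=output_prefix, file_name_suffix='.json')
    )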

AI Platform batch prediction on its own is recommended for both performance and ease of use if the following criteria apply:

  • Your data is available in Cloud Storage, in the format expected for prediction, from a previous data ingestion process.
  • You don't have control of the first phase of the workflow, such as the Cloud Dataflow pipeline that prepares the data in Cloud Storage for prediction.

Experiment configurations

We used the following configurations in three experiments:

  • Data sizes: 10K, 100K, 1M, and 10M rows
  • Cloud Storage class: Regional Storage
  • Cloud Storage location: europe-west1-b
  • Cloud Dataflow region: europe-west1-b
  • Cloud Dataflow worker machine type: n1-standard-1
  • Cloud Dataflow Autoscaling for batch data up to 1 million records
  • Cloud Dataflow num_worker: 20 for batch data up to 10 million records
  • AI Platform batch prediction max-worker-count setting: 20

The Cloud Storage location and the Cloud Dataflow region should be the same. This solution uses the europe-west1-b region as an arbitrary value.

Results

The following table summarizes the results (timings) of performing the batch predictions and direct-model predictions with different sizes of datasets.

Batch data size | Metric | Cloud Dataflow then AI Platform batch prediction | Cloud Dataflow with direct-model prediction
10K rows | Running time | 15 min 30 sec (Cloud Dataflow: 7 min 47 sec + AI Platform: 7 min 43 sec) | 8 min 24 sec
10K rows | Total vCPU time | 0.301 hr (Cloud Dataflow: 0.151 hr + AI Platform: 0.15 hr) | 0.173 hr
100K rows | Running time | 16 min 37 sec (Cloud Dataflow: 8 min 39 sec + AI Platform: 7 min 58 sec) | 10 min 31 sec
100K rows | Total vCPU time | 0.334 hr (Cloud Dataflow: 0.184 hr + AI Platform: 0.15 hr) | 0.243 hr
1M rows | Running time | 21 min 11 sec (Cloud Dataflow: 11 min 07 sec + AI Platform: 10 min 04 sec) | 17 min 12 sec
1M rows | Total vCPU time | 0.446 hr (Cloud Dataflow: 0.256 hr + AI Platform: 0.19 hr) | 1.115 hr
10M rows | Running time | 33 min 08 sec (Cloud Dataflow: 12 min 15 sec + AI Platform: 20 min 53 sec) | 25 min 02 sec
10M rows | Total vCPU time | 5.251 hr (Cloud Dataflow: 3.581 hr + AI Platform: 1.67 hr) | 7.878 hr

Figure 7 shows a graph of these results.

Graph showing timings for 3 approaches for 4 different dataset sizes
Figure 7. Graph showing timings for 3 approaches for 4 different dataset sizes

As the results show, the AI Platform batch prediction job on its own takes less time to produce predictions for the input data, given that the data is already in Cloud Storage in the format used for prediction. However, when the batch prediction job is combined with a preprocessing step (extracting and preparing the data from BigQuery to Cloud Storage for prediction) and with a post-processing step (storing the data back to BigQuery), the direct-model approach produces better end-to-end execution time. In addition, the performance of the direct-model prediction approach can be further optimized using micro-batching (which we discuss later for the streaming experiments).

Stream experiments

In the streaming experiments, the Cloud Dataflow pipeline reads data points from a Cloud Pub/Sub topic and writes the data to BigQuery by using the streaming APIs. The Cloud Dataflow streaming pipeline processes the data and gets predictions using the TensorFlow baby-weight estimation model.

The topic receives data from a stream simulator that generates data points, which are the instances to estimate the baby weight for, at a predefined rate of events per second. This simulates a real-world example of an unbounded data source. The following Python code simulates the data stream sent to a Cloud Pub/Sub topic.

import time

from google.cloud import pubsub

# PARAMS, send_message, and sleep_time_per_msg are defined elsewhere in the simulator code;
# sleep_time_per_msg sets the delay between messages to simulate the target rate.
client = pubsub.Client(project=PARAMS.project_id)
topic = client.topic(PARAMS.pubsub_topic)
if not topic.exists():
    print('Topic does not exist. Please run a stream pipeline first to create the topic.')
    print('Simulation aborted.')

    return

for index in range(PARAMS.stream_sample_size):

    message = send_message(topic, index)

    # for debugging
    if PARAMS.show_message:
        print "Message {} was sent: {}".format(index+1, message)
        print ""

    time.sleep(sleep_time_per_msg)

Approach 1: Cloud Dataflow with AI Platform online prediction

In this approach, the TensorFlow model is deployed and hosted in AI Platform as a REST API. The Cloud Dataflow streaming pipeline calls the API for each message consumed from Cloud Pub/Sub to get predictions. The high-level architecture of this approach is shown in Figure 8.

Stream Approach 1: Cloud Dataflow with AI Platform online prediction
Figure 8. Stream Approach 1: Cloud Dataflow with AI Platform online prediction. The HTTP request might include a single data point or a group of data points in a micro-batch.

In this approach, the Cloud Dataflow pipeline performs the following steps:

  1. Read messages from a Cloud Pub/Sub topic.
  2. Send an HTTP request to the AI Platform model's API to get predictions for each message.
  3. Write results to BigQuery by using streaming APIs.

A better approach is micro-batching: instead of sending an HTTP request to the model's REST API for each message that is read from Cloud Pub/Sub, Cloud Dataflow groups the messages received during a 1-second window. It then sends this group of messages as a micro-batch in a single HTTP request to the model's API. In this approach, the Cloud Dataflow pipeline performs the following steps:

  1. Read messages from Cloud Pub/Sub topic.
  2. Apply a 1-second windowing operation to create a micro-batch of messages.
  3. Send an HTTP request with the micro-batch to the AI Platform model's API to get predictions for the messages.
  4. Write results to BigQuery by using streaming APIs.

The rationale behind this approach is that it:

  1. Reduces the number of calls to the remote service, such as the AI Platform model.
  2. Reduces the average latency of serving each message.
  3. Reduces the overall processing time of the pipeline.

In this experiment, the time window was set to 1 second. However, the micro-batch size, which is the number of messages sent as a batch to the AI Platform model, varies. It depends on the message generation frequency, that is, the number of messages per second.

The following section describes experiments with three different frequencies: 50, 100, and 500 messages per second. That is, the micro-batch size is 50, 100, or 500 messages, respectively.

Approach 2: Cloud Dataflow with direct-model prediction

This approach is similar to the approach that was used in the batch experiments. The TensorFlow SavedModel is hosted on Cloud Dataflow workers and is called for prediction during the stream processing pipeline for each record. Figure 9 shows the high-level architecture of this approach.

Stream approach 2: Cloud Dataflow with direct-model prediction
Figure 9. Stream approach 2: Cloud Dataflow with direct-model prediction

In this approach, the Cloud Dataflow pipeline performs the following steps:

  1. Read messages from Cloud Pub/Sub topic.
  2. Call the local TensorFlow SavedModel to get predictions for each record.
  3. Write results to BigQuery by using streaming APIs.

The micro-batching technique can also be used in the stream pipeline with the direct-model prediction approach. Instead of sending the model a tensor of one data instance, we can send it a tensor of N data instances, where N is the number of messages received within the Cloud Dataflow window. This technique uses the vectorized operations of the TensorFlow model and gets several predictions in parallel.

Experiment configurations

We used the following configurations for these experiments:

  • Stream data size: 10K records (messages)
  • Simulated messages per second (MPS): 50, 100, and 500
  • Window size (in micro-batch experiments): 1 second
  • Cloud Dataflow region: europe-west1-b
  • Cloud Dataflow worker machine type: n1-standard-1
  • Cloud Dataflow num_worker: 5 (no auto-scaling)
  • AI Platform model API nodes: 3 (manualScale)

Results

The following table summarizes the results of performing the streaming experiments with different volumes of data (messages per second). Messages frequency refers to the number of messages sent per second, and simulation time refers to the time to send all the messages.

Stream messages frequency | Metric | AI Platform online prediction: single message | AI Platform online prediction: micro-batching | Direct-model prediction: single message | Direct-model prediction: micro-batching
50 msg per sec (simulation time: 3 min 20 sec) | Total time | 9 min 34 sec | 7 min 44 sec | 3 min 43 sec | 3 min 22 sec
100 msg per sec (simulation time: 1 min 40 sec) | Total time | 6 min 03 sec | 4 min 34 sec | 1 min 51 sec | 1 min 41 sec
500 msg per sec (simulation time: 20 sec) | Total time | NA (default AI Platform online prediction quota) | 2 min 47 sec | 1 min 23 sec | 48 sec

Figure 10 shows a graph of these results.

Graph showing timings for different approaches and frequencies
Figure 10. Graph showing timings for different approaches and frequencies

As shown in the results, the micro-batching technique improves the execution performance in both AI Platform online prediction and direct-model prediction. In addition, using the direct-model approach in the streaming pipeline is 2 to 4 times faster than calling an external REST/HTTP API for online prediction.

Conclusions

Based on the approaches described and the experiment results, we recommend the following.

Batch processing

  • If you are building your batch data processing pipeline, and you want prediction as part of the pipeline, use the direct-model approach for the best performance.
  • Improve the performance of the direct-model approach by creating micro-batches of the data points before calling the local model for prediction to make use of the parallelization of the vectorized operations.
  • If your data is already in Cloud Storage in the format expected for prediction, use AI Platform batch prediction for the best performance.
  • Use AI Platform if you want to use the power of GPUs for batch prediction.
  • Do not use AI Platform online prediction for batch prediction.

Stream processing

  • Use direct-model in the streaming pipeline for best performance and reduced average latency. Predictions are performed locally, with no HTTP calls to remote services.
  • Decouple your model from your data processing pipelines for better maintainability of models used in online predictions. The best approach is to serve your model as an independent microservice by using AI Platform or any other web hosting service.
  • Deploy your model as an independent web service to allow multiple data processing pipelines and online apps to consume the model service as an endpoint. In addition, changes to the model are transparent to the apps and pipelines that consume it.
  • Deploy multiple instances of the service with load balancing to improve the scalability and the availability of the model web service. With AI Platform, you only need to specify the number of nodes (manualScaling) or minNodes (autoScaling) in the yaml configuration file when you deploy a model version. (A sketch of this setting appears after this list.)
  • If you deploy your model in a separate microservice, there are extra costs, depending on the underlying serving infrastructure. See the pricing FAQ for AI Platform online prediction.
  • Use micro-batching in your streaming data processing pipeline for better performance with both the direct-model and HTTP-model service. Micro-batching reduces the number of HTTP requests to the model service, and uses the vectorized operations of the TensorFlow model to get predictions.
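
For reference, the following sketch shows how the scaling setting mentioned above can be supplied when a model version is created through the ML Engine API; the same autoScaling or manualScaling fields can be placed in the yaml file passed to gcloud ml-engine versions create --config. The client object, bucket path, and version name are assumptions based on the earlier examples, not values from the solution code.

# Assumes the cmle_api client from the online-prediction example (call init_api() first).
version_body = {
    'name': 'v2',  # placeholder version name
    'deploymentUri': 'gs://YOUR_BUCKET/models/babyweight_estimator',
    'runtimeVersion': '1.4',
    'autoScaling': {'minNodes': 3},  # or 'manualScaling': {'nodes': 3}
}

cmle_api.projects().models().versions().create(
    parent='projects/{}/models/{}'.format(PROJECT, CMLE_MODEL_NAME),
    body=version_body,
).execute()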

What's next
