Infrastructure options for data pipelines in advertising (part 2)

This article focuses on the data pipelines and machine learning tasks that are common to the different advertising platforms. It complements Infrastructure options for serving advertising workloads (part 1). Both articles provide necessary context for the series.

This article is part of the following series:

For an overview of how these platforms work together and the ad-tech terminology used throughout this series, see Building advertising platforms (overview).

The datastores used in the data pipelines are the (unique) user profile store, the context store, and the reporting/dashboarding store, all described in part 1. These datastores are fed by two main sources: events and third-party data. This article focuses on event management. For more information about third-party data and its use in enriching user data, see enriching data (in part 4).

Event lifecycle

The data pipeline that turns raw events into useful data can be broken down into the following stages:

  • Collecting and storing (ingestion): Through a messaging system or recurrent file uploads to a storage system.
  • Processing: Either in batches or in streaming mode for real-time processing, when data freshness is important.
  • Exporting (or loading): Either directly from the data processing tool or through a custom workflow. Destinations are commonly the datastores mentioned above.

The most common events in ad tech are:

  • Ad and bid requests: Generally received from an external system. Requests contain details that form part of the input for ad selection.
  • Impressions: Creatives loaded on a web page but not always viewable.
  • Billable impressions: Rendered and/or viewable impressions.
  • Clicks: Actions that a user can take by clicking on a creative.
  • Conversions: Actions that a targeted user performs on the advertiser's website.

Events associated with real-time bidding are covered in Infrastructure options for RTB bidders (part 4).

Collecting

Events can be created by:

  • Ad or bid request instances: Instances that receive a request return either a URL for the creative or a bid response.
  • Collector instances: Instances that return an invisible pixel to log impressions and/or collect actions that a targeted user performed on an ad (actions such as clicks or video plays).
  • Stackdriver Logging: In some cases, this logging can replace collector instances and server log files.

Events can be collected by:

  • Custom code that publishes the event to a messaging system such as Cloud Pub/Sub, or that writes to a local log file.
  • Third-party tools or native logging functionality on your web server.
  • A GCP logging agent that supports selected third-party software and integrates with Stackdriver Logging.

Events can be ingested:

  • In near-real time, when log files are written locally, then periodically copied to shared storage such as Cloud Storage or BigQuery Capacitor for processing. BigQuery storage is typically used if that processing involves analytical queries.
  • In real time, when using Stackdriver Logging or when your collectors write directly to a low-latency datastore or messaging system such as Cloud Pub/Sub, Apache Kafka, or RabbitMQ. Real-time processing systems often use this option.
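
For example, a collector that takes the real-time path above might publish each event to Cloud Pub/Sub with the standard Python client library. The following is a minimal sketch; the project, topic, and event fields are hypothetical.

    import json

    from google.cloud import pubsub_v1

    # Hypothetical project and topic names.
    PROJECT_ID = "my-ads-project"
    TOPIC_ID = "ad-events"

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)

    def publish_event(event):
        """Publishes a single event (a dict) as a JSON-encoded Pub/Sub message."""
        data = json.dumps(event).encode("utf-8")
        future = publisher.publish(topic_path, data=data)
        return future.result()  # Blocks until the server returns a message ID.

    publish_event({"type": "impression", "campaign_id": "123", "device": "mobile"})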

Stackdriver Logging can facilitate many of these tasks because it can capture data directly from GCP products such as Compute Engine, Google Kubernetes Engine, and even from HTTP load balancing. Logging can also export data directly to BigQuery for ad hoc analytics, stream to Cloud Pub/Sub for alerting and real-time processing purposes, and export in batch to Cloud Storage for backup or federated analytics.

Here are a few examples that illustrate the previous points and that consider operations, cost, and data freshness:

  • Copy log files to Cloud Storage every X seconds, then load them into BigQuery with bq load.
      Cost: No ingress cost to Cloud Storage, no ingestion cost to BigQuery, BigQuery storage costs.
      Operational overhead: Requires management of files, failures, retries, and synchronization.
      Data freshness: Near real time.
  • Copy log files to Cloud Storage every X seconds, then load them into BigQuery with Cloud Functions.
      Cost: No ingress cost to Cloud Storage, no ingestion cost to BigQuery, extra Cloud Functions cost, BigQuery storage costs.
      Operational overhead: Cloud Functions facilitate the load management, but the logic still needs to be coded.
      Data freshness: Near real time.
  • Use Stackdriver Logging with an export to Cloud Pub/Sub.
      Cost: Cloud Pub/Sub costs.
      Operational overhead: Low.
      Data freshness: Real time.
  • Use a local daemon to stream logs to Kafka.
      Cost: Storage and compute costs required to run Kafka.
      Operational overhead: Setting up and managing Kafka, unless you use a GCP-managed option.
      Data freshness: Near real time or real time, depending on how Kafka is set up.

Tip: When using compute instances to collect the events, always consider using preemptible VMs, as explained in the compute platform section, to save on costs.

Storing data

Where you store your data is influenced by the data's format, how the data is accessed and used, and the cost of storing the data. If the data is unstructured or needs to be stored before processing, then, as recommended previously, consider using Cloud Storage. For structured data, you also need to consider the effort required to access individual records. The following diagram can help you evaluate the access pattern to minimize the number of operations and the cost.

Recommendations to help you export data

Heavy-read storing patterns (in part 1) addresses options used for storing and serving. The rest of this section covers analytical data stores used both with streaming and batch processing.

In streaming, you process raw data before storing it. If you also want to make the data immediately available for ad hoc querying, consider streaming into BigQuery. You can do this easily by using the Cloud Dataflow template that streams data from Cloud Pub/Sub to BigQuery.
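
If you prefer to stream rows from your own code rather than through the template, the BigQuery streaming API takes only a few lines of Python. The following is a minimal sketch; the table and field names are hypothetical.

    from google.cloud import bigquery

    client = bigquery.Client()
    table_id = "my-ads-project.ads.raw_events"  # Hypothetical table.

    rows = [
        {"event_type": "impression", "campaign_id": "123",
         "event_ts": "2019-01-01T00:00:00Z"},
    ]

    # Streamed rows become available for querying within a few seconds.
    errors = client.insert_rows_json(table_id, rows)
    if errors:
        raise RuntimeError("BigQuery streaming insert failed: %s" % errors)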

In recurrent batch processing, you consolidate data by storing it in a shared and scalable environment. A common pattern is moving log files every few minutes from their local location to object storage. Filenames are often prefixed and suffixed—for example: logs_serverABC_timestamp123.txt.

You can run your analytics workloads on the following storage systems:

  • Cloud Storage: Using its standard, Nearline, and Coldline storage classes, you can save data for quick access, backup, or archiving. Set up standard storage, the preferred option for analytics, as a regional bucket when possible. Set up the storage in the same region as the compute resources that process the data. Cloud Storage is accessible from most GCP products, including Cloud Dataproc, Cloud Dataflow, Cloud Dataprep by Trifacta, and BigQuery.
  • BigQuery: BigQuery is not only a powerful querying engine. It also has its own storage, called Capacitor. Capacitor lets you store exabytes of data, and BigQuery storage is accessible from Cloud Dataproc, Cloud Dataflow, and from the BigQuery querying engine. With BigQuery's long-term storage, your storage costs drop by approximately 50 percent for partitions that are not edited for 90 days.
  • Cloud Bigtable: With billions of events collected every day, Cloud Bigtable is a great choice if you need both heavy writes and heavy reads in single-digit milliseconds. It is accessible through the HBase API and other clients. Cloud Bigtable can also be used with the big data ecosystem for graphs, time series, and analytics.

We make the following general recommendations:

  • Store raw data in BigQuery in parallel to any other processing. It is easy to do rollups from there on an hourly, daily, weekly, or monthly basis. Loading options depend on your requirements. Read more in the BigQuery loading data documentation.
  • If you are cost conscious, data stored in Cloud Storage can be loaded to BigQuery for no charge, or at a lower price than streaming, by using bq load, Cloud Functions, the job API, or federated queries (see the load-job sketch after this list). The first two options are subject to quotas.
  • Use BigQuery's storage features such as partitions and clustering to minimize querying time and costs.
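
For example, a load job from Cloud Storage into a partitioned and clustered table might look like the following sketch, which uses a recent version of the google-cloud-bigquery Python client. The bucket, table, and column names are hypothetical.

    from google.cloud import bigquery

    client = bigquery.Client()
    table_id = "my-ads-project.ads.raw_events"  # Hypothetical table.

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        autodetect=True,
        # Partitioning and clustering reduce the data scanned by rollup queries.
        time_partitioning=bigquery.TimePartitioning(field="event_ts"),
        clustering_fields=["campaign_id"],
    )

    # Loading from Cloud Storage is free; you pay only for BigQuery storage.
    load_job = client.load_table_from_uri(
        "gs://my-ads-logs/logs_*.json", table_id, job_config=job_config
    )
    load_job.result()  # Waits for the load job to complete.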

Processing events

When choosing a technology to build your processing pipelines, consider the following:

  • Latency: Decide which data needs to be processed in real time. For example, you might need to calculate budget counters as quickly as possible.
  • Correctness: Some values must be calculated exactly, although perhaps not immediately—for example, billing amounts.
  • High availability: With billions of data inputs every day, a few minutes of downtime can result in a significant financial impact.
  • Operational overhead: "Keeping the lights on" might not be the best use of your technical resources.

Consider the following example:

  • HTTP load balancing logs are ingested in real time by using Stackdriver Logging.
  • Some logs must be processed immediately to calculate the remaining budget.
  • Impression counts are aggregated and required hourly; campaign frequency caps are calculated daily.

It is common for companies to employ the lambda architecture for their data processing pipelines to balance:

  • Fast approximations through a real-time processing pipeline.
  • Exactness through an additional offline batch-processing pipeline.

Lambda data processing pipeline

This section describes some of the GCP products that you can use to implement both lambda and kappa data processing architectures:

  • Cloud Dataproc (batch and stream): If you already have existing Hadoop or Spark jobs and scripts, you can migrate them as-is to Cloud Dataproc, which is GCP-managed Spark and Hadoop.
  • Cloud Dataflow (batch and stream): If you have new workloads, want advanced streaming features, or want a unified programming model for batch and streaming data alike, Cloud Dataflow provides a fully managed service that runs Apache Beam, which was open sourced by Google. Cloud Dataflow supports many inputs and outputs, such as Cloud Pub/Sub and Kafka, and supports exactly-once processing.
  • BigQuery (batch): When considering an ELT (extract, load, and transform) approach or when performing subsequent transformations after the data has been loaded into the data warehouse, you can use BigQuery for the SQL transformations. It is managed and also provides user-defined functions. For orchestration of these queries, consider Cloud Composer, which is managed Apache Airflow.
  • Third-party tools: You can install and manage tools from the Hadoop ecosystem or tools such as Storm on Compute Engine or Google Kubernetes Engine (GKE).

The following architecture depicts a recommendation based on these requirements:

  • Ingesting the events in real time.
  • Computing some counters every second.
  • Rolling up impression counts hourly.
  • Calculating the ads' click-through rate (CTR) daily.

A data processing pipeline that uses Cloud Pub/Sub

The data processing flow is as follows (steps 2 and 3 are sketched in code after the list):

  1. Events are written to Cloud Pub/Sub.
  2. Cloud Dataflow writes the event-level data directly to BigQuery.
  3. Cloud Dataflow also windows the data into 1-second intervals to perform the required aggregations and writes the counters out to regional Cloud Bigtable instances.
  4. BigQuery recurrently runs a query to roll up the data and materializes the results. This can be scheduled either through cron jobs or by using Apache Airflow scheduling options through Cloud Composer.
  5. User frontends can use BigQuery as an OLAP database. For more details, see reporting (in part 1).
  6. Regional ad servers query nearby Cloud Bigtable instances to quickly retrieve counters.
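
The following is a minimal Apache Beam (Python SDK) sketch of steps 2 and 3. The topic, table, and field names are hypothetical, and the per-second counters are written to BigQuery to keep the example short; as described above, a production pipeline would write them to Cloud Bigtable instead.

    import json

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.transforms.window import FixedWindows

    # Hypothetical resource names.
    TOPIC = "projects/my-ads-project/topics/ad-events"
    RAW_TABLE = "my-ads-project:ads.raw_events"
    COUNTER_TABLE = "my-ads-project:ads.counters_1s"

    def run():
        options = PipelineOptions(streaming=True)
        with beam.Pipeline(options=options) as p:
            events = (
                p
                | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic=TOPIC)
                | "Parse" >> beam.Map(json.loads)
            )

            # Step 2: write the event-level data directly to BigQuery.
            events | "RawToBigQuery" >> beam.io.WriteToBigQuery(
                RAW_TABLE,
                schema="event_type:STRING,campaign_id:STRING,event_ts:TIMESTAMP",
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )

            # Step 3: window into 1-second intervals and aggregate counters
            # per campaign.
            (
                events
                | "KeyByCampaign" >> beam.Map(lambda e: (e["campaign_id"], 1))
                | "Window1s" >> beam.WindowInto(FixedWindows(1))
                | "Count" >> beam.CombinePerKey(sum)
                | "Format" >> beam.Map(
                    lambda kv: {"campaign_id": kv[0], "impressions": kv[1]})
                | "CountersToBigQuery" >> beam.io.WriteToBigQuery(
                    COUNTER_TABLE,
                    schema="campaign_id:STRING,impressions:INTEGER",
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                )
            )

    if __name__ == "__main__":
        run()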

Follow these general recommendations for building a data processing pipeline:

  • Use Cloud Pub/Sub to minimize operational overhead, or consider running Apache Kafka as a managed service on GCP.
  • Consider writing your pipeline with Apache Beam to approach batch and stream through a unified programming model.
  • Run Apache Beam on Cloud Dataflow to benefit from a fully managed service that can autoscale the number of workers throughout the lifetime of the job and dynamically rebalance work to decrease your job's overall processing time.

If you have events or other data that you want to visually explore, clean, and prepare for analysis, consider using Cloud Dataprep. You don't need to write code, and Cloud Dataprep allows you to export Cloud Dataflow templates, which you can reuse and schedule.

Exports

When events have been ingested and processed, the results can be exported to:

  • Offline stores such as BigQuery or Cloud Storage for offline processing, including rollups and analytics.
  • Serving stores such as Cloud Bigtable, Cloud Datastore, Cloud Memorystore, and third-party stores. Frontend servers use these stores, for example, to retrieve information on user profiles and update counters when ads are selected.
  • Messaging systems such as Cloud Pub/Sub or Kafka, when the data requires further downstream processing or is being sent as an alert—for example, when managing budgets.

Data replication is another export use case. For example, when you need data to be close to your frontend servers or even possibly on the servers, there are two approaches:

  • In some cases, if your choice of technology supports it, you can use native replication features. Some technologies such as Redis and Aerospike support replication within regions. However, cross-region replication might prove more challenging.
  • Other technologies might not support replication, in which case you might implement it with a messaging system and processors running on Compute Engine or GKE.

The following diagram shows a few different approaches:

Structure of data replication with GCP stores

Data is processed in real time by using Cloud Dataflow and offline by using BigQuery, after which:

  • Cloud Dataflow writes data directly to a Redis cluster, using the Apache Beam Redis IO, which in turn replicates data to its local workers.
  • Cloud Dataflow publishes messages to Cloud Pub/Sub. The messages are then read by an autoscaled pool of subscribers, deployed on Compute Engine, which write them to an Aerospike cluster that's running on Compute Engine (a subscriber sketch follows this list).
  • Records from the BigQuery offline jobs, scheduled through Cloud Composer, are exported to the Redis and Aerospike clusters.
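
The subscriber pool in the second bullet could be sketched as follows in Python, with Redis standing in for the key/value store (Aerospike works similarly through its own client). The project, subscription, and record fields are hypothetical.

    import json

    import redis
    from google.cloud import pubsub_v1

    # Hypothetical resource names.
    PROJECT_ID = "my-ads-project"
    SUBSCRIPTION_ID = "profile-updates-sub"

    kv_store = redis.Redis(host="localhost", port=6379)
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

    def callback(message):
        """Writes each replicated record to the local key/value store."""
        record = json.loads(message.data.decode("utf-8"))
        kv_store.set(record["user_id"], json.dumps(record))
        message.ack()

    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    streaming_pull_future.result()  # Blocks and processes messages until interrupted.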

When exporting data, we recommend the following:

  • Make sure that the chosen datastore can handle both your read and write patterns. Otherwise, consider decoupling the read infrastructure, as detailed in heavy-read storing patterns (in part 1).
  • For analytics, use BigQuery with clustering and partitioning to minimize query costs and durations.
  • For single-digit-millisecond reads and writes, consider using Cloud Bigtable. Enable replication for high availability.
  • For real-time writes to BigQuery, use the default Streaming API from the Apache Beam BigQuery IO. With the Apache Beam Java SDK, you can write in micro-batches through Cloud Storage by using FILE_LOADS to reduce costs.
  • For heavy writes that require sub-millisecond latency, consider using a third-party datastore installed on Compute Engine or GKE.

Automation

Your data pipeline might have one of several offline flows to:

  • Copy BigQuery rollup data to another store for fast OLAP dashboards.
  • Copy serving data such as updated customer segments to Redis, Aerospike, or Cloud Bigtable.
  • Replicate data across datacenters.
  • Copy metadata from the user frontend database (in part 1) to stores that can handle heavy reads (in part 1).

For end-to-end automation and failure management, consider using Cloud Composer, a managed service for Apache Airflow. Airflow is the recommended open source technology for managing workflows on GCP. DAGs can be triggered manually, triggered by an event, or scheduled to run recurrently.
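
For example, the hourly rollup described earlier could be scheduled through Cloud Composer with an Airflow DAG similar to the following sketch (Airflow 1.x operator paths; the table and column names are hypothetical).

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import BigQueryOperator

    default_args = {
        "start_date": datetime(2019, 1, 1),
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    }

    with DAG(
        "hourly_impression_rollup",
        schedule_interval="@hourly",
        default_args=default_args,
        catchup=False,
    ) as dag:

        # Roll up the last hour of raw impression events into an hourly table.
        rollup = BigQueryOperator(
            task_id="rollup_impressions",
            use_legacy_sql=False,
            sql="""
                SELECT campaign_id,
                       TIMESTAMP_TRUNC(event_ts, HOUR) AS hour,
                       COUNT(*) AS impressions
                FROM `my-ads-project.ads.raw_events`
                WHERE event_ts >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
                GROUP BY campaign_id, hour
            """,
            destination_dataset_table="my-ads-project.ads.impressions_hourly",
            write_disposition="WRITE_APPEND",
        )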

If you require a simpler event-driven action, you can trigger Cloud Functions on new files created on Cloud Storage or on new events published to Cloud Pub/Sub. Cloud Functions is fully managed, which eliminates the operational overhead. For more customized serverless options, consider reading about Knative, a promising Kubernetes-based add-on to build, deploy, and manage serverless workloads.

Analytics

BigQuery is our recommended data warehouse for analytical processing and ad hoc querying because it:

  • Is fully managed.
  • Provides an optimized storage layer and a scalable querying engine.
  • Enables ad hoc querying over terabytes of data by using standard SQL, including window functions and UDFs.
  • Offers pricing options for your querying usage through on-demand or flat-rate pricing.
  • Offers discounted pricing for long-term storage.
  • Provides machine learning capabilities with BigQuery ML.
  • Has integrated monitoring and cost controls.

Tips:

Consider using BigQuery authorized views. An authorized view lets you share query results without giving access to the underlying tables, and lets you restrict which columns users can query.

If you are interested in migrating from Hive, consider loading Parquet files to BigQuery.

Although we recommend using BigQuery for your analytics and SQL-based offline processing, GCP also provides other options:

  • For Hadoop workloads including Apache Spark, Hive, and Pig, consider Cloud Dataproc. The Cloud Storage connector lets you run Apache Spark and Hadoop jobs directly over data in Cloud Storage and has a number of benefits, including high data availability, interoperability with other GCP services, and HDFS compatibility.
  • You can install and manage third-party tools on Compute Engine or GKE. Druid is commonly used in addition to BigQuery as a low-latency OLAP database for frontend users.

Build machine learning capabilities

Processing events is not only about cleaning and aggregating. By adding machine learning capabilities to your data pipeline, you can add intelligence such as recommending better ads or creating virtual user segments that can be used as model features. GCP offers a full range of AI and machine learning building blocks for these tasks.

With billions of daily events being collected and stored in your data lake or warehouse, whether it is Cloud Storage or BigQuery, you can use this data to train powerful models related to bidding—for example:

  • Decide whether to bid or not.
  • Estimate the CTR.
  • Optimize the bid price.
  • Segment customers.
  • Calculate customer lifetime values (LTVs).
  • Recommend an ad to select.

When choosing your machine learning platform, you must answer some questions:

  • How well do I know my data?
  • How much data do I have?
  • How much data will be used for training?
  • Is training going to be online or offline?
  • Are predictions going to be done online or offline?
  • Can serving happen independently of the prediction?

The following diagram shows a common machine learning flow, with the following steps:

  1. Clean/prepare the datasets with BigQuery.
  2. Export the training and evaluation datasets to Cloud Storage.
  3. Train the model using AI Platform.
  4. Export the model to Cloud Storage.
  5. When a worker is initialized, import the model from Cloud Storage.
  6. Use the TensorFlow model locally to perform predictions at low latency (sketched after the diagram).

A common machine learning flow
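
Steps 5 and 6 might look like the following sketch, assuming a TensorFlow 2 SavedModel; the model path and feature names are hypothetical.

    import tensorflow as tf

    # Hypothetical local path where the model was copied from Cloud Storage
    # when the worker started.
    MODEL_DIR = "/models/ctr_model/1"

    model = tf.saved_model.load(MODEL_DIR)
    predict_fn = model.signatures["serving_default"]

    # In-process prediction keeps latency low because no network call is made.
    features = {
        "continent": tf.constant(["europe"]),
        "device_type": tf.constant(["mobile"]),
    }
    prediction = predict_fn(**features)
    print(prediction)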

Preparing data and feature engineering

Before the data is ready to be fed to a machine learning model, do the following tasks:

  1. Explore the dataset in order to understand the data's suitability for the task at hand.
  2. Clean and prepare the dataset by joining data from multiple tables and filtering out non-applicable records.
  3. Extract, construct, and select features—creating informative, discriminative properties of the thing being observed.

BigQuery is well suited to these tasks for data stored in BigQuery and for external federated data sources. You can use BigQuery to query and explore the data before exporting your filtered, selected datasets to Cloud Storage for feature engineering.

Tip: In addition to using BigQuery for the exploration of your data, you can connect Cloud Dataprep to BigQuery to sample and visually profile your data.

The next task typically requires you to consider whether predictions will be made online or offline. For online predictions, it is especially important to consider how the features will be extracted from the prediction request data:

  • For online predictions, you need to perform the same feature creation steps during training and prediction to prevent skew. tf.Transform lets you define these preprocessing steps, leverage Apache Beam to carry out the work at scale during training and evaluation, and export the steps as part of a TensorFlow graph to serve the predictions (a minimal sketch follows this list). This blog post provides additional insight into how the process works.
  • For offline predictions, you can use the same approach during the training and prediction phases. You could use BigQuery to create the features as part of your batch preprocessing. For example, you could vectorize features by using a hash function, or look up an associative value through a join.
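
The following is a minimal tf.Transform sketch of such a preprocessing step, with hypothetical feature names. The same preprocessing_fn is applied by Apache Beam during training and embedded in the exported graph so that serving uses identical logic.

    import tensorflow_transform as tft

    def preprocessing_fn(inputs):
        """Preprocessing applied identically at training and serving time."""
        return {
            "bid_price_scaled": tft.scale_to_z_score(inputs["bid_price"]),
            "device_type_id": tft.compute_and_apply_vocabulary(inputs["device_type"]),
            "clicked": inputs["clicked"],
        }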

Training and evaluation

GCP offers a number of different options for training and evaluating a model, including:

  • Using AI Platform to run XGBoost, scikit-learn, and TensorFlow training and evaluation tasks in a fully managed environment. AI Platform also provides extra features such as automated hyperparameter tuning and distributed training.

  • Running the training and evaluation tasks on Compute Engine instances directly. You will have to install the desired software packages. You can also take advantage of GPUs, TPUs, or preemptible VMs when appropriate to reduce costs and processing time.

  • Using Kubeflow to install and run TensorFlow and many machine learning tools, such as Jupyter notebooks, in a containerized environment on Kubernetes.

  • Using BigQuery ML straight from the SQL interface to train offline on structured data.

Your choice of ML platform depends on your requirements:

  • To minimize costs, use Compute Engine with preemptible VMs.
  • If you require a fully managed environment for training, deploying, and serving, use AI Platform.
  • To create reproducible, extended ML environments that you can run on any Kubernetes cluster, use Kubeflow.
  • When you have structured data, will be predicting offline, and want to implement a linear or logistic regression, use BigQuery ML (see the sketch after this list).
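
As a sketch of the BigQuery ML option, a logistic regression model can be trained and used for batch prediction entirely in SQL, submitted here through the Python client. The dataset, table, and column names are hypothetical.

    from google.cloud import bigquery

    client = bigquery.Client()

    # Train a logistic regression model on structured event data.
    training_query = """
    CREATE OR REPLACE MODEL `my-ads-project.ads.bid_model`
    OPTIONS(model_type='logistic_reg', input_label_cols=['converted']) AS
    SELECT continent, device_type, hour_of_day, converted
    FROM `my-ads-project.ads.training_events`
    """
    client.query(training_query).result()

    # Run offline (batch) predictions with ML.PREDICT.
    prediction_query = """
    SELECT continent, device_type, predicted_converted
    FROM ML.PREDICT(MODEL `my-ads-project.ads.bid_model`,
                    (SELECT continent, device_type, hour_of_day
                     FROM `my-ads-project.ads.scoring_events`))
    """
    for row in client.query(prediction_query):
        print(row)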

Predicting

Predicting is done either offline or online using the same products mentioned in the training section. Compute Engine, Kubeflow, and AI Platform can all use TensorFlow Serving to make predictions. The differences between these options are the operational overhead, tuning options, and price.

If low latency is a critical requirement, you can also use the serialized or compiled model directly, which can also be useful in data pipelines. See What's next for additional links.

Serving

Predicting and serving are sometimes considered to be the same task, which is true for online predictions. However, if you make the predictions offline and then persist the results to a data store, you will need to serve the predictions from this store as and when they are requested.

Serving fast predictions is a trade-off between effectiveness and efficiency. You can use different approaches or a combination of some of them. If you decide to use TensorFlow Serving to predict in real time, consider using accelerators such as GPUs or TPUs, and using one of the following methods to optimize your model for serving:

  • Quantize the model with tf.quantize.
  • Freeze the graph variables into constants.
  • Structure your code so that the prediction code doesn't contain any of the overhead used in training or evaluation, such as surplus logging.
  • Use fused operations, such as fused batch normalization, when running on a GPU.

If you decide to use pre-made predictions from a fast key/value store, you need to create keys based on permutations of features. Suppose that you want to predict whether to bid or not based on continent and device type:

  1. Create all possible combinations of continent names and mobile/web.
  2. Store the result of the predictions for those combinations.

    Key Prediction
    antarctica_mobile 0
    antarctica_web 1
    asia_mobile 1
    asia_web 0
    europe_mobile 1
    europe_web 0
    northamerica_mobile 1
    northamerica_web 1
    southamerica_mobile 1
    southamerica_web 0
    oceania_mobile 1
    oceania_web 0

    You can then do a quick fetch at the correct key when you receive a request. Redis, Aerospike, and Cloud Bigtable are good candidates for this use case.
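
The following sketch shows how you might precompute and serve these keys in Python, with Redis as the example store (Aerospike or Cloud Bigtable work similarly). The predict() function is a placeholder for the offline model scoring step.

    import itertools

    import redis

    CONTINENTS = ["antarctica", "asia", "europe", "northamerica",
                  "southamerica", "oceania"]
    DEVICES = ["mobile", "web"]

    kv_store = redis.Redis(host="localhost", port=6379)

    def predict(continent, device):
        """Placeholder for the offline model that scores each combination."""
        return 1 if device == "mobile" else 0

    # Precompute and store a prediction for every feature combination.
    for continent, device in itertools.product(CONTINENTS, DEVICES):
        kv_store.set("%s_%s" % (continent, device), predict(continent, device))

    # At serving time, a single key lookup returns the bid/no-bid decision.
    decision = int(kv_store.get("europe_mobile"))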

Before implementing, keep the following in mind:

  • If you have many features, the size of the combination might be greater than the allowed maximum size for a key.
  • To support a large number of requests and a large number of keys, consider the key distribution and hash (part of) the key if necessary to avoid hotspots on specific rows. Cloud Bigtable has a key visualizer tool to help diagnose such issues.
  • If some features are continuous values rather than categorical data, you must bucketize them. Determining the bucket size for each feature is a task in itself.
  • Use embeddings to calculate the distance between keys. If a key doesn't exist, you can fall back to its closest neighbor. There are different techniques for creating locality-sensitive hashes, and computing those hashes is itself a machine learning task.

What's next
