You can use Google Cloud Platform to build a scalable, efficient, and effective service for delivering relevant product recommendations to users in an online store.
Competition in online-selling sites has never been as fierce as it is now. Customers spend more money across all their providers, but they spend less per retailer. The average size of a single cart has decreased, partly because competition is just one click away. Offering relevant recommendations to potential customers can play a central role in converting shoppers to buyers and growing average order size.
After reading this solution, you should be able to set up an environment that supports a basic recommendation engine that you can grow and improve, based on the needs of your particular workload. Running a recommendation engine on Cloud Platform will give you flexibility and scalability in the solutions that you want to run.
In this solution, you will see how a real estate rental company can compute relevant recommendations and present them to customers who are browsing a website.
Samantha is looking for a house to rent for her holidays. She has a profile on a vacation rentals website and has previously rented and rated several holiday packages. Sam is looking for recommendations based on her preferences and tastes. The system should already know Sam’s tastes: based on her rating page, she seems to like accommodations of the type house. The system should recommend something similar.
To provide recommendations, whether in real time while customers browse or through email later on, several things need to happen. At first, while you know little about your users' tastes and preferences, you might base recommendations on item attributes alone. But your system needs to be able to learn from your users, collecting data about their tastes and preferences. Over time and with enough data, you can use machine learning algorithms to perform useful analysis and deliver meaningful recommendations. Other users’ inputs can also improve the results, enabling the system to be retrained periodically. This solution deals with a recommendations system that already has enough data to benefit from machine learning algorithms.
A recommendation engine typically processes data through four phases: collecting the data, storing it, analyzing it, and delivering the recommendations.
The architecture of such a system can be represented by the following diagram:
Each step can be customized to meet the requirements. The system consists of:
A scalable front end that records user interactions to collect data.
Permanent storage that can be accessed by a machine learning platform. Loading the data into this storage can include several steps, such as import, export, and transformation of the data.
A machine learning platform that can analyze the existing content to create relevant recommendations.
Storage that can be used by the front end, in real time or later, based on the timeliness requirements for recommendations.
Choosing the components
To achieve a good compromise between speed, simplicity, cost control, and accuracy, this solution uses Google App Engine, Google Cloud SQL, and Apache Spark running on Google Compute Engine using
App Engine can handle several tens of thousands of queries per second with little management required. Whether you're creating the website or saving the data to backend storage, App Engine lets you write your code and deploy it to production in a matter of seconds.
Cloud SQL offers simplicity of deployment, too. Cloud SQL can scale up to 32-core virtual machines with up to 208 GB of RAM, and can grow storage on demand to 10 TB with 30 IOPS per GB and thousands of concurrent connections. These specifications are more than sufficient for the example in this solution, and for a large number of real-world recommendation engines. Cloud SQL also offers the advantage that it can be accessed directly from Spark.
Spark offers much better performance than a typical Hadoop setup; Spark can be 10 to 100 times faster. With Spark MLlib, you can analyze several hundreds of millions of ratings in minutes, which increases the agility of the recommendations by enabling the administrator to run the algorithm more often. Spark also offers a simpler programming model, an easier-to-use API, and support for several languages. It keeps data in memory as much as possible to reduce round trips to disk and minimize I/O. This solution uses Compute Engine to host the analysis infrastructure. Compute Engine helps to keep the price of the analysis as low as possible through its per-second, on-demand pricing.
The following diagram maps to the previous architecture diagram, but shows the technology in use for each step:
Collecting the data
A recommendation engine can collect data about users based on their implicit behavior or their explicit input.
Behavior data is easy to collect because you can keep logs of user activities, and collecting it doesn't require any additional action from the user; they're already using the application. The downside of this approach is that the data is harder to analyze. For example, filtering the interesting logs from the less-interesting ones can be cumbersome. To see an example of implicit-data analysis that uses log entries, see Real-time Log Analysis using Fluentd and BigQuery.
Explicit input data can be harder to collect because users need to take additional actions, such as writing a review. Users might not want to provide this data for a variety of reasons. But when it comes to understanding user preferences, such data is quite precise.
Storing the data
The more data you can make available to your algorithms, the better the recommendations will be. This means that any recommendations project can quickly turn into a big data project.
The type of data that you use to create recommendations can help you decide the type of storage to use. You could choose to use a NoSQL database, a standard SQL database, or even some kind of object storage. Each of these options is viable depending on whether you're capturing user input or behavior and on factors such as ease of implementation, the amount of data that the storage can manage, integration with the rest of the environment, and portability.
When saving user ratings or events, a scalable and managed database minimizes the number of operational tasks needed and helps you focus on the recommendation. Cloud SQL fulfills both of these needs and also makes it easy to load the data directly from Spark.
The following example code shows the schemas of the Cloud SQL tables. The Accommodation table represents the rental property and the Rating table represents a user's rating for a particular property.
CREATE TABLE Accommodation (
  id varchar(255),
  title varchar(255),
  location varchar(255),
  price int,
  rooms int,
  rating float,
  type varchar(255),
  PRIMARY KEY (id)
);

CREATE TABLE Rating (
  userId varchar(255),
  accoId varchar(255),
  rating int,
  PRIMARY KEY (accoId, userId),
  FOREIGN KEY (accoId) REFERENCES Accommodation(id)
);
Spark can receive data from various sources, such as Hadoop HDFS or Cloud Storage. This solution receives the data directly from Cloud SQL by using the Spark Java Database Connectivity (JDBC) connector. Because Spark jobs run in parallel, the connector must be available on all cluster instances.
Analyzing the data
Designing the analysis phase requires an understanding of the application's requirements. These requirements include:
The timeliness of a recommendation. How quickly does the application need to present recommendations?
The filtering approach for the data. Will the application base its recommendation on the user's tastes alone, pivot the data based on what other users think, or pivot on which products logically fit together?
The first factor to consider in analyzing the data is how quickly you need to present the recommendations to the user. If you want to present recommendations immediately, such as when the user is viewing a product, you will need a more nimble type of analysis than if, for example, you want to send the customer an email that contains recommendations at a later date.
Real-time systems can process data as it's created. This type of system usually involves tools that can process and analyze streams of events. A real-time system would be required to give in-the-moment recommendations.
Batch analysis requires you to process the data periodically. This approach implies that enough data needs to be created to make the analysis relevant, such as daily sales volume. A batch system might work fine to send an email at a later date.
Near-real-time analysis lets you gather data quickly so you can refresh the analytics every few minutes or seconds. A near-real-time system might be good for providing recommendations during the same browsing session.
A recommendation could fall under any of these three timeliness categories but, for an online sales tool, you could consider something in between near-real-time and batch processing, depending on how much traffic and user input the application gets. The platform running the analysis could work directly from the database where the data is saved or from a dump saved periodically in persistent storage.
Filtering the data
A core component of building a recommendation engine is filtering. The most common approaches include:
Content-based: A popular, recommended product has similar attributes to what the user views or likes.
Cluster: Recommended products go well together, no matter what other users have done.
Collaborative: Other users, who like the same products the user views or likes, also liked a recommended product.
While Cloud Platform can support any of these approaches, this solution focuses on collaborative filtering, which is implemented by using Apache Spark. For more information about content-based filtering or cluster filtering, see the appendix.
Collaborative filtering lets you abstract away product attributes and make predictions based on user tastes. The output of this filtering is based on the assumption that two different users who liked the same products in the past will probably like the same ones now.
You can represent data about ratings or interactions as a set of matrices, with products and users as dimensions. Collaborative filtering tries to predict the missing cells in a matrix for a specific user-product pair. The following two matrices are similar, but the second is derived from the first by replacing existing ratings with the number one and missing ratings with the number zero. The resulting matrix is a truth table where a one represents an interaction between a user and a product.
Rating matrix and interaction matrix
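The conversion from a rating matrix to an interaction matrix can be sketched in plain Python. This is a minimal illustration, assuming a small hypothetical ratings matrix where rows are users, columns are products, and `None` marks a missing rating; the helper name `to_interaction_matrix` is made up for this example. It also lists the missing (user, product) cells, which are the ones collaborative filtering tries to predict.

```python
def to_interaction_matrix(ratings):
    """Replace every known rating with 1 and every missing rating (None) with 0."""
    return [[0 if r is None else 1 for r in row] for row in ratings]


# Hypothetical ratings: rows are users, columns are products, None = not rated.
ratings = [
    [5, None, 3],
    [None, 4, None],
    [1, 2, None],
]

interactions = to_interaction_matrix(ratings)

# The (user, product) cells with no rating are what the model has to predict.
missing = [
    (user, product)
    for user, row in enumerate(ratings)
    for product, rating in enumerate(row)
    if rating is None
]

for row in interactions:
    print(row)
print(missing)
```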
There are two distinct approaches to employing collaborative filtering:
Memory-based filtering calculates similarities between products or users.
Model-based filtering tries to learn the underlying pattern that dictates how users rate or interact with items.
This solution uses the model-based approach, where users have rated items.
All the analysis features this solution requires are available through PySpark, which provides a Python interface to Spark. Other options are available using Scala or Java; see the Spark documentation.
Training the models
Spark MLlib implements the Alternating Least Squares (ALS) algorithm to train the models. You will use various combinations of the following parameters to get the best compromise between variance and bias:
Rank: The number of unknown factors that led a user to give a rating. These could include factors such as age, gender, or location. The higher the rank, the better the recommendation will be, to some extent. Starting at 5 and increasing by 5 until the recommendation improvement rate slows down, memory and CPU permitting, is a good approach.
Lambda: A regularization parameter that prevents overfitting, which is characterized by high variance and low bias. Variance represents how much the predictions for a given point fluctuate over multiple runs. Bias represents how far the generated predictions are from the true value you're trying to predict. Overfitting happens when the model fits the training data well, including its noise, but doesn't perform well on the actual testing data. The higher the lambda, the lower the overfitting, but the greater the bias. Values of 0.01, 1, and 10 are good values to test.
The following diagram shows the relationship between variance and bias. The bullseye represents the value that the algorithm is trying to predict.
Variance versus Bias (best is on the top left)
Iteration: The number of iterations that the ALS algorithm runs during training. In this example, you will do 5, 10, and 20 iterations for various combinations of rank and lambda.
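A minimal sketch of the candidate parameter grid, assuming the values suggested above: ranks of 5, 10, and 15 (stopping at 15 is an arbitrary choice to keep the grid small), lambdas of 0.01, 1, and 10, and 5, 10, and 20 iterations. The names `ranks`, `reguls`, and `iters` match the ones the training loop in this solution iterates over.

```python
import itertools

# Candidate values for each ALS parameter (illustrative choices).
ranks = [5, 10, 15]
reguls = [0.01, 1.0, 10.0]
iters = [5, 10, 20]

# Every (rank, lambda, iterations) combination is one full training run.
combinations = list(itertools.product(ranks, reguls, iters))

print(len(combinations))
print(combinations[0])
```

Because every combination means training a full model, the grid grows multiplicatively: three values per parameter already means 27 training runs, which is why fast iteration in Spark matters here.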
The following example code shows how to start an ALS model training run in Spark.
from pyspark.mllib.recommendation import ALS

model = ALS.train(training, rank=10, iterations=5, lambda_=0.01)
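To show what `ALS.train` does conceptually, here is a small NumPy sketch of alternating least squares on a dense matrix with a mask of known ratings. It's a simplified, single-machine illustration under assumed inputs (a hypothetical 4x4 ratings matrix and plain L2 regularization); MLlib's implementation is distributed and differs in details, for example it scales the regularization term by the number of ratings each user or product has. The function name `als` is hypothetical.

```python
import numpy as np


def als(ratings, observed, rank=2, lam=0.1, iterations=50, seed=0):
    """Factor `ratings` into user factors U and item factors V so that U @ V.T
    approximates the known entries. `observed` is a boolean mask of known ratings."""
    rng = np.random.default_rng(seed)
    n_users, n_items = ratings.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    reg = lam * np.eye(rank)
    for _ in range(iterations):
        # Fix the item factors and solve a regularized least-squares problem per user.
        for u in range(n_users):
            known = observed[u]
            Vk = V[known]
            U[u] = np.linalg.solve(Vk.T @ Vk + reg, Vk.T @ ratings[u, known])
        # Fix the user factors and solve per item.
        for i in range(n_items):
            known = observed[:, i]
            Uk = U[known]
            V[i] = np.linalg.solve(Uk.T @ Uk + reg, Uk.T @ ratings[known, i])
    return U, V


# Hypothetical ratings (users x accommodations); 0 marks a missing rating.
ratings = np.array([
    [5, 0, 1, 1],
    [5, 5, 1, 1],
    [1, 1, 5, 5],
    [1, 1, 0, 5],
], dtype=float)
observed = ratings > 0

U, V = als(ratings, observed)
predicted = U @ V.T  # Fills in the missing cells as well as the known ones
print(np.round(predicted, 1))
```

Each half-step is an ordinary regularized least-squares solve, and every user (or item) can be solved independently of the others, which is what makes the alternation easy to parallelize.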
Finding the right model
Collaborative filtering with the ALS algorithm relies on three different sets of data:
Training set: Contains data with known output. This set is what a perfect result would look like. In this solution, it contains the user ratings.
Validation set: Contains data that helps tune the training by picking the right combination of parameters and choosing the best model.
Testing set: Contains data that will be used to evaluate the performance of the best trained model. This would be equivalent to running the analysis in a real-world example.
To find the best model, you need to calculate the root-mean-square error (RMSE) based on the model that was calculated, the validation set, and its size. The lower the RMSE, the better the model.
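RMSE is the square root of the mean of the squared differences between predicted and actual ratings. A minimal pure-Python sketch, assuming the input is a list of hypothetical (prediction, rating) pairs like the ones the validation step produces; the function name `rmse` is illustrative.

```python
from math import sqrt


def rmse(predictions_and_ratings):
    """Root-mean-square error: sqrt((1/n) * sum((prediction - rating) ** 2))."""
    pairs = list(predictions_and_ratings)
    if not pairs:
        raise ValueError("need at least one (prediction, rating) pair")
    squared_errors = sum((prediction - rating) ** 2 for prediction, rating in pairs)
    return sqrt(squared_errors / len(pairs))


# Hypothetical validation pairs: errors of 1, 0, and 1.
print(rmse([(4.0, 5), (3.0, 3), (2.0, 1)]))
```

This sketch divides by the number of pairs that actually received a prediction. In MLlib, `predictAll` returns no prediction for pairs whose user or product didn't appear in the training data, so dividing by the full size of the validation set instead would understate the error when some pairs are dropped.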
Delivering the recommendations
To make the results available to the user quickly and easily, you need to load them into a database that can be queried on demand. Again, Cloud SQL is a great option here. Starting with Spark 1.4, you can write the results of the prediction directly to the database from PySpark.
The schema of the Recommendation table looks like this:
CREATE TABLE Recommendation (
  userId varchar(255),
  accoId varchar(255),
  prediction float,
  PRIMARY KEY (userId, accoId),
  FOREIGN KEY (accoId) REFERENCES Accommodation(id)
);
This section walks through the code to train the models.
Get the data from Cloud SQL
The Spark SQL context lets you easily connect to a Cloud SQL instance through the JDBC connector. The loaded data is in DataFrame format.
jdbcUrl = 'jdbc:mysql://%s:3306/%s?user=%s&password=%s' % (CLOUDSQL_INSTANCE_IP, CLOUDSQL_DB_NAME, CLOUDSQL_USER, CLOUDSQL_PWD)
dfAccos = sqlContext.read.jdbc(url=jdbcUrl, table=TABLE_ITEMS)
dfRates = sqlContext.read.jdbc(url=jdbcUrl, table=TABLE_RATINGS)
Convert the DataFrame to RDD and create the various datasets
Spark uses a concept called a Resilient Distributed Dataset (RDD), which facilitates working on elements in parallel. RDDs are read-only collections created from data in persistent storage or by transforming other RDDs. They can be processed in memory, so they're well suited for iterative processing.
Recall that to get the best model to make your prediction, you need to split your data into three different sets. The following code uses the randomSplit method to divide the ratings randomly into three non-overlapping sets on a roughly 60/20/20 percentage basis:
rddTraining, rddValidating, rddTesting = dfRates.rdd.randomSplit([6,2,2])
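To make the weights concrete, here is a pure-Python sketch of the same idea, not Spark's implementation: the weights are normalized, so `[6, 2, 2]` means roughly 60/20/20, and each rating lands in exactly one split. The function name `random_split` and the integer "ratings" are hypothetical.

```python
import itertools
import random


def random_split(items, weights, seed=None):
    """Assign each item to exactly one split, with probability proportional to its weight."""
    total = float(sum(weights))
    # Cumulative upper bounds, for example [0.6, 0.8, 1.0] for weights [6, 2, 2].
    bounds = list(itertools.accumulate(w / total for w in weights))
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for item in items:
        x = rng.random()
        for i, upper in enumerate(bounds):
            if x < upper:
                splits[i].append(item)
                break
        else:
            # Guard against floating-point rounding in the last bound.
            splits[-1].append(item)
    return splits


training, validating, testing = random_split(range(1000), [6, 2, 2], seed=42)
print(len(training), len(validating), len(testing))
```

Because every item is assigned independently, the split sizes are only approximately 60/20/20 and vary with the seed; the same is true of the sets that randomSplit produces.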
Train models based on various parameters
Recall that, when using the ALS method, the system needs to work with the rank, regularization, and iteration parameters to find the best model. Because the actual ratings in the validation set are known, you can compare the predictions of each model produced by the train function against them. You want to make sure that the user’s tastes are also in the training set.
import itertools
from math import sqrt
from operator import add

from pyspark.mllib.recommendation import ALS


def howFarAreWe(model, against, sizeAgainst):
    # Ignore the rating column: keep only (user, product) pairs
    againstNoRatings = against.map(lambda x: (int(x[0]), int(x[1])))
    # Keep the rating to compare against: ((user, product), rating)
    againstWiRatings = against.map(lambda x: ((int(x[0]), int(x[1])), int(x[2])))
    # Make a prediction and map it for later comparison
    # The map has to be ((user,product), rating) not ((product,user), rating)
    predictions = model.predictAll(againstNoRatings).map(lambda p: ((p[0], p[1]), p[2]))
    # Returns the pairs (prediction, rating)
    predictionsAndRatings = predictions.join(againstWiRatings).values()
    # Returns the root-mean-square error (RMSE)
    return sqrt(predictionsAndRatings.map(lambda s: (s[0] - s[1]) ** 2).reduce(add) / float(sizeAgainst))


# ranks, reguls, and iters hold the candidate values for each parameter
nbValidating = rddValidating.count()
finalModel = None
finalDist = float("inf")
for cRank, cRegul, cIter in itertools.product(ranks, reguls, iters):
    model = ALS.train(rddTraining, cRank, cIter, float(cRegul))
    dist = howFarAreWe(model, rddValidating, nbValidating)
    if dist < finalDist:
        print("Best so far:%f" % dist)
        finalModel = model
        finalRank = cRank
        finalRegul = cRegul
        finalIter = cIter
        finalDist = dist
Calculating top predictions for the user
Now that you have a model that can give a reasonable prediction, you can use it to see what the user is most likely to be interested in, based on their tastes and on ratings by others with similar tastes. In this step, you can see the matrix mapping that was described previously.
from pyspark.mllib.recommendation import ALS

# Build the model with the best values found: rank, iterations, regularization
model = ALS.train(rddTraining, BEST_RANK, BEST_ITERATION, BEST_REGULATION)
# Calculate all predictions; pairsPotential is an RDD of (user, accommodation) pairs to score
predictions = model.predictAll(pairsPotential).map(lambda p: (str(p[0]), str(p[1]), float(p[2])))
# Take the top 5, ordered by predicted rating (highest first)
topPredictions = predictions.takeOrdered(5, key=lambda x: -x[2])
print(topPredictions)
Saving the top predictions
Now that you have a list of the top predictions, you can save them in Cloud SQL so the system can offer some recommendations to the user. For example, a good time to use these predictions might be when the user logs in to the site.
from pyspark.sql.types import StructType, StructField, StringType, FloatType

schema = StructType([StructField("userId", StringType(), True),
                     StructField("accoId", StringType(), True),
                     StructField("prediction", FloatType(), True)])
dfToSave = sqlContext.createDataFrame(topPredictions, schema)
dfToSave.write.jdbc(url=jdbcUrl, table=TABLE_RECOMMENDATIONS, mode='overwrite')
Running the solution
To run this solution, follow the step-by-step instructions on the GitHub page. By following the directions, you should be able to calculate and display recommendations for the user.
The final SQL code fetches the top recommendation from the database and displays it on Samantha’s welcome page.
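One way to fetch a user's top recommendation is to join the Recommendation and Accommodation tables and order by the predicted rating. The following sketch assumes Python's built-in `sqlite3` module as a local stand-in for Cloud SQL (MySQL), reuses this solution's table definitions, and inserts hypothetical sample rows; `top_recommendation` is a made-up helper name. With a MySQL driver, the placeholder syntax may differ (for example, `%s` instead of `?`).

```python
import sqlite3


def top_recommendation(conn, user_id):
    """Return (title, location, type, prediction) for the user's best-scored accommodation, or None."""
    return conn.execute(
        """
        SELECT a.title, a.location, a.type, r.prediction
        FROM Recommendation AS r
        JOIN Accommodation AS a ON a.id = r.accoId
        WHERE r.userId = ?
        ORDER BY r.prediction DESC
        LIMIT 1
        """,
        (user_id,),
    ).fetchone()


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE Accommodation (id varchar(255), title varchar(255), location varchar(255),
        price int, rooms int, rating float, type varchar(255), PRIMARY KEY (id));
    CREATE TABLE Recommendation (userId varchar(255), accoId varchar(255), prediction float,
        PRIMARY KEY (userId, accoId), FOREIGN KEY (accoId) REFERENCES Accommodation(id));
    """
)

# Hypothetical sample rows.
conn.executemany(
    "INSERT INTO Accommodation VALUES (?, ?, ?, ?, ?, ?, ?)",
    [
        ("1", "Cottage by the sea", "Brighton", 120, 3, 4.5, "house"),
        ("2", "City studio", "London", 90, 1, 3.9, "flat"),
    ],
)
conn.executemany(
    "INSERT INTO Recommendation VALUES (?, ?, ?)",
    [("0", "1", 4.7), ("0", "2", 3.1)],
)

best = top_recommendation(conn, "0")
print(best)
```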
The query, when run in the Cloud Platform Console or a MySQL client, returns a result similar to the following example:
In the website, the same query can enhance the welcome page and increase the likelihood of converting a visitor into a customer:
This result appears to match what Sam likes, based on what the system already knew about her, as discussed in the scenario description.
Monitoring the jobs
Monitoring with the Spark console
You've already used SSH to connect to the master instance. When you start to run jobs, it will be important to be able to monitor them. Spark offers a management console interface that you can use in the browser.
The console runs by default on port 8080, which you must open in the firewall for each instance by following
You can open the console by using the external IP of the instance in the URL: http://184.108.40.206:8080, for example. In the following screenshot, you can see two workers listed, along with the running and completed applications (the Spark shell, in this case).
The Spark console
Monitoring with Cloud Dataproc
You can get the complete contents of the tutorial, including setup instructions and source code, from GitHub.
- Try out other Google Cloud Platform features for yourself. Have a look at our tutorials.
- Learn how to use Google Cloud Platform products to build end-to-end solutions.
While you've seen how to build an effective and scalable collaborative filtering solution, combining the results with other types of filtering can improve the recommendation. Recall the other two main types of filtering: content-based and clustering. A combination of these approaches can produce a better recommendation for the user.
Content-based filtering works directly with item attributes and understands their similarities, which facilitates creating recommendations for items that have attributes but few user ratings. As the user base grows, this type of filtering remains manageable, even with a large number of users.
To add content-based filtering, you can use the previous ratings of other users for the items in the catalog. Based on these ratings, you can find the most-similar products to the current one.
One common way to calculate similarity between two products is to use cosine similarity and find the closest neighbors:
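Written out, with A and B as the two products' rating vectors over the same n users (the symbols A, B, and n are introduced here for illustration), cosine similarity is:

```latex
\operatorname{sim}(A, B) = \cos\theta
  = \frac{A \cdot B}{\lVert A \rVert \, \lVert B \rVert}
  = \frac{\sum_{i=1}^{n} A_i B_i}{\sqrt{\sum_{i=1}^{n} A_i^{2}} \, \sqrt{\sum_{i=1}^{n} B_i^{2}}}
```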
The similarity result will be between 0 and 1. The closer to 1, the more similar the products are.
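A minimal pure-Python sketch of cosine similarity between two products' rating vectors. It assumes hypothetical data where each position is one user's rating and 0 means "not rated"; with non-negative ratings like these, the result falls between 0 and 1. The function name `cosine_similarity` is illustrative.

```python
from math import sqrt


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length rating vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = sqrt(sum(x * x for x in a))
    norm_b = sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        # A product nobody has rated has no direction; treat it as dissimilar.
        return 0.0
    return dot / (norm_a * norm_b)


# Hypothetical ratings of two products by the same four users (0 = not rated).
p1 = [5, 3, 0, 1]
p2 = [4, 0, 0, 1]
print(cosine_similarity(p1, p2))
```

Treating a missing rating as 0 is a simplifying assumption; it makes unrated items look like disliked ones, so some implementations compare only the users who rated both products.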
Consider the following matrix:
In this matrix, the similarity between P1 and P2 can be calculated as follows:
You can achieve content-based filtering through various tools. If you want to learn more, have a look at:
Twitter all-pairs similarity. The CosineSimilarities Scala function added to MLlib can be run in the Spark environment.
Mahout. If you want access to more libraries to either complement or replace some MLlib algorithms, you can install Mahout on your bdutil setup. You can add Mahout to the current setup by cloning the GitHub project:
git clone https://github.com/apache/mahout.git mahout
export MAHOUT_HOME=/path/to/mahout
export MAHOUT_LOCAL=false  # For cluster operation
export SPARK_HOME=/path/to/spark
export MASTER=spark://hadoop-m:7077  # Found in Spark console
It's also important to understand the browsing context and what the user is currently looking at. The same person browsing at different times could be interested in two completely different products, or might even be buying a gift for someone else. Being able to understand which items are similar to the one currently displayed is essential. Using k-means clustering enables the system to put similar items into buckets, based on their core attributes.
For this solution, a person looking to rent a house in London, for example, probably isn't interested in renting something in Auckland at the moment, so the system should filter out those cases when doing a recommendation.
from pyspark.mllib.clustering import KMeans, KMeansModel

clusters = KMeans.train(parsedData, 2, maxIterations=10, runs=10, initializationMode="random")
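To illustrate what the clustering step does, here is a minimal pure-Python k-means (Lloyd's algorithm) that buckets hypothetical listings by approximate (latitude, longitude). The coordinates roughly correspond to London and Auckland, and the function name `kmeans` is illustrative. Plain Euclidean distance on latitude and longitude is a crude simplification that is good enough only for clearly separated locations like these.

```python
from math import dist  # Python 3.8+


def kmeans(points, initial_centers, iterations=10):
    """Lloyd's algorithm: assign points to the nearest center, then move centers to the mean."""
    centers = [tuple(c) for c in initial_centers]
    for _ in range(iterations):
        clusters = [[] for _ in centers]
        for p in points:
            nearest = min(range(len(centers)), key=lambda i: dist(p, centers[i]))
            clusters[nearest].append(p)
        new_centers = []
        for center, members in zip(centers, clusters):
            if members:
                new_centers.append(tuple(sum(coord) / len(members) for coord in zip(*members)))
            else:
                new_centers.append(center)  # Keep an empty cluster's center where it was
        if new_centers == centers:
            break  # Converged: no center moved
        centers = new_centers
    labels = [min(range(len(centers)), key=lambda i: dist(p, centers[i])) for p in points]
    return centers, labels


# Hypothetical listings: three near London, two near Auckland.
listings = [(51.51, -0.13), (51.50, -0.12), (51.53, -0.10), (-36.85, 174.76), (-36.84, 174.74)]
centers, labels = kmeans(listings, [listings[0], listings[3]])
print(labels)
```

A visitor browsing a London listing can then be shown only recommendations that share its cluster label.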
You can improve the recommendation further by taking other customer data into consideration, such as past orders, support history, and personal attributes such as age, location, or gender. These attributes, which are often already available in a customer relationship management (CRM) or enterprise resource planning (ERP) system, will help narrow down the choices.
Thinking further, it's not only internal system data that will have an impact on users' behavior and choices; external factors matter, too. In this solution's vacation rental use case, knowing the air quality might be important to a young family. So integrating a recommendation engine built on Cloud Platform with another API, such as Breezometer, might bring a competitive advantage.