Google Cloud Big Data and Machine Learning Blog

Innovation in data processing and machine learning technology

Moving Thumbtack’s data infrastructure to Google Cloud Platform

Tuesday, July 18, 2017

By Nate Kupp, Manager, Technical Infrastructure at Thumbtack

In early 2015, we began building data infrastructure at Thumbtack. At the time, the company’s data was spread across a PostgreSQL replica and a MongoDB cluster, neither of which was scaling adequately to handle the load of analytics and BI queries. Our largest table in PostgreSQL had more than a billion records and many queries took nearly an hour to complete.

Furthermore, behavioral event data stored in MongoDB was inaccessible to most of our internal data consumers, particularly those without a technical background. Thus, a critical part of Thumbtack’s data had become “dark data.”

Separately, Thumbtack had an increasingly pressing need for more scalable data processing infrastructure. This was driven by a need for better A/B testing and our nascent machine-learning efforts.

To address the need for easier data access, we formed a team focused on (1) making all of Thumbtack’s data easily and reliably accessible, and (2) providing scalable infrastructure for large-scale data processing. Our first efforts involved a monolithic Cloudera CDH cluster on AWS EC2 machines. (All of Thumbtack’s infrastructure was on AWS at the time.) We then adopted Apache Impala to provide distributed SQL support, and Apache Spark for distributed data processing.

Moving to GCP

This configuration worked well for us for about a year before we began hitting operational issues. In particular, running production Spark workloads alongside ad-hoc Impala SQL queries meant that we had to significantly over-provision the cluster hardware to ensure that we could reliably handle peak workloads. Our team uses Mode Analytics for dashboarding, and we found that most of our users would schedule reports to run daily at 8 or 9 a.m., before coming into the office. These reports often overwhelmed the cluster, which would still be finishing up nightly jobs started in the early morning hours. Resource contention also impacted our Spark jobs, so we began staggering daily job starts throughout the night (when ad-hoc query load was lightest) to ensure jobs had sufficient resources. Finally, as our cluster grew, operational issues like machine failures became a significant drain on our team’s time.

Towards the end of 2016, we started evaluating potential new infrastructure designs beyond a single monolithic cluster. At the time, we had dozens of Spark jobs and thousands of Impala SQL queries running daily against billions of records. Given our small team size, we prioritized two goals: (1) moving toward more managed services and away from VMs that we needed to manage ourselves; and (2) fully elastic resources to eliminate our need for a large excess of standing capacity.

For our next solution, we evaluated three options: continue expanding our monolithic cluster, move to AWS managed services or move to Google Cloud Platform (GCP). As we learned more, we found ourselves increasingly drawn to GCP — in particular, the combination of Google Cloud Storage + Google Cloud Dataproc (Spark) + Google BigQuery (SQL) was ideal for our use case. We began our migration in earnest in early 2017, and shut down the previous production cluster in May.

Getting to success on GCP

During the course of standing up our infrastructure on GCP, we learned a few tricks that really helped in making our new infrastructure successful. It was surprisingly easy to wire up GCP resources to our existing infrastructure, particularly with Apache Airflow, which we use for all of our workflow scheduling. A particular focus for us was getting to an MVP solution as quickly as possible, to (1) ensure that we were demonstrating clear value and (2) find and address roadblocks early on in the process.

Prioritizing datasets

We found it especially helpful to partner with our analytics team to identify the most critical datasets to make available. In our initial efforts, we focused on migrating those datasets to BigQuery. By targeting frequently used data, our team was able to start migrating Thumbtack’s reporting and dashboards to BigQuery in parallel with the engineering effort. This approach demonstrated clear value much faster.

Thanks to Standard SQL, converting our Mode dashboards from Impala to BigQuery was surprisingly easy. Our infrastructure and analytics teams worked together to convert some of the first few into BigQuery SQL; we then held a series of training workshops to educate our analysts about the new SQL query syntax.

Cross-copy

In most cases, our datasets are the end product of pipelined Spark jobs. To avoid needing to rewrite all of these pipelines at once, we wrote an Airflow operator and corresponding Spark job to handle copying data from our existing tables into corresponding BigQuery tables. This job does the following:

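// Example mapping from a day-partitioned HDFS path to a BigQuery partition:
// hdfs:///path/database/table/year=2017/month=01/day=01/*.parquet → dataset.table$20170101
def copy(inputPath: String, gcsPath: String, tableCoordinates: TableReference): Unit = {
  // Stage the table's data on Cloud Storage, where BigQuery can load it.
  spark.read.parquet(inputPath).saveToGcs(gcsPath)
  // Load the staged files into BigQuery, replacing any existing data in the
  // target (WRITE_TRUNCATE) and creating the table if it does not yet exist.
  BigQueryUtil.loadData(
    tableCoordinates,
    Seq(gcsPath),
    WriteDisposition.WRITE_TRUNCATE,
    CreateDisposition.CREATE_IF_NEEDED
  )
}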

We appended Spark jobs with this logic to many of our Airflow ETL pipeline DAGs, which gave us comprehensive coverage of our data in BigQuery right off the bat. Providing this data enabled our dashboard owners to begin their migration work right away, while permitting our engineering team to gradually rewrite our production jobs to emit data to BigQuery.
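The path-to-partition mapping noted in the comment on the copy job can be sketched in a few lines of Python. This is only an illustration: the function name and the regular expression are assumptions, not part of Thumbtack's operator. It assumes Hive-style `year=/month=/day=` directories and BigQuery's `table$YYYYMMDD` partition decorator.

```python
import re

# Matches Hive-style date partitions such as .../year=2017/month=01/day=01/...
_PARTITION_RE = re.compile(r"/year=(\d{4})/month=(\d{1,2})/day=(\d{1,2})(?:/|$)")


def partition_decorator(path: str, dataset: str, table: str) -> str:
    """Map a day-partitioned path to a BigQuery partition decorator.

    Hypothetical helper: the name and signature are illustrative only.
    """
    match = _PARTITION_RE.search(path)
    if match is None:
        raise ValueError(f"no year=/month=/day= partition found in {path!r}")
    year, month, day = (int(part) for part in match.groups())
    # Zero-pad so that month=1 and month=01 yield the same decorator.
    return f"{dataset}.{table}${year:04d}{month:02d}{day:02d}"


print(partition_decorator(
    "hdfs:///path/database/table/year=2017/month=01/day=01/part-0.parquet",
    "dataset",
    "table",
))
```

Deriving the decorator from the path keeps the copy job stateless: each daily run can compute its own target partition from its input location.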

Data storage and reads/writes

Although we began by writing solely to BigQuery from our Spark jobs, we now also save our datasets to Cloud Storage as Apache Avro files. We made this change so that Spark jobs can read from Cloud Storage instead of BigQuery. As we migrated more of our jobs, we found that this was much more cost-effective, particularly for very large tables.

For Cloud Storage writes, we just save to gs://… paths directly from Spark; for BigQuery we use a modified version of Spotify’s spark-bigquery. We encapsulate this behavior by providing Scala traits for each of our data sources like the following:

trait PostgresTableSources extends GCPTableSources {
  val spark: SparkSession
  import spark.implicits._
  …
  // User is an object/case class pair which knows where it is stored on GCS/BQ
  def getUsers(interval: Option[DateTimeInterval] = None): Dataset[User] = {
    val users = getGCSTable[User](User)
    filterInterval(users, _.signupTime, interval)
  }
}

// For use in REPL
object PostgresTableSources {
  def apply(s: SparkSession): PostgresTableSources = new PostgresTableSources {
    override val spark: SparkSession = s
  }
}
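The body of filterInterval is not shown above. As a rough Python sketch of the behavior its call site suggests, the function below returns every row when no interval is given and otherwise keeps rows whose timestamp falls inside the interval. The half-open `[start, end)` semantics and all names here are assumptions.

```python
from datetime import datetime
from typing import Callable, Iterable, List, Optional, Tuple, TypeVar

T = TypeVar("T")
Interval = Tuple[datetime, datetime]  # assumed half-open: [start, end)


def filter_interval(
    rows: Iterable[T],
    get_time: Callable[[T], datetime],
    interval: Optional[Interval] = None,
) -> List[T]:
    """Keep rows whose timestamp lies in the interval; keep all rows if it is None."""
    if interval is None:
        return list(rows)
    start, end = interval
    return [row for row in rows if start <= get_time(row) < end]


# Hypothetical (user_id, signup_time) rows for illustration.
signups = [
    ("a", datetime(2017, 1, 1)),
    ("b", datetime(2017, 6, 1)),
]
first_quarter = (datetime(2017, 1, 1), datetime(2017, 4, 1))
print(filter_interval(signups, lambda row: row[1], first_quarter))
```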

Job-scoped clusters

Given the fast spin-up time of Cloud Dataproc clusters, we now run job-scoped Spark clusters everywhere. We ended up writing a simple Airflow operator to automate bringing up a cluster, running a Spark job on that cluster, and then terminating that cluster. With this, all of our job definitions look something like:

event_etl = DataProcSparkOperator(
   project_id=GCP_PROJ_NAME,
   main_class='com.thumbtack.events.EventETL',
   dataproc_spark_jars=TT_JAR_PATH,
   task_id='events-EventETL',
   retries=2,
   dag=dag,

   # Properties to pass to Spark
   dataproc_spark_properties={
       'spark.speculation': 'true',
       'spark.speculation.interval': '1000ms',
       'spark.speculation.multiplier': '2.0',
       'spark.speculation.quantile': '0.8'
   },

   # Arguments to Spark job
   arguments=[
       '--date', '{{ tt_macros.format_date(execution_date, dag.schedule_interval, -1) }}',
   ],

   # Cloud Dataproc cluster definition
   dataproc_cluster_properties={
       'zone': GCP_ZONE,
       'num_workers': 6,
       'num_preemptible_workers': 4,
       'initialization_actions': [init_actions.init_script('init.sh')],
   },
)

This design let us get completely out of the business of managing standing clusters, and all of our jobs run on ephemeral clusters scoped to the size of the job. We originally started with separate operators for cluster bringup/teardown, but encapsulating the cluster manipulation along with the job definition in a single operator really simplified our Airflow DAG definitions. In addition to tearing down the cluster within the operator, we also run a periodic watchdog for leaked clusters.
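The control flow inside such an operator can be sketched as follows. This is a minimal illustration, not Thumbtack's operator: the client class is an in-memory stand-in with hypothetical method names, where a real implementation would call the Cloud Dataproc API.

```python
from typing import Callable, Iterable, List, Set


class InMemoryClusterClient:
    """Hypothetical stand-in for a Cloud Dataproc client; it only tracks names."""

    def __init__(self) -> None:
        self.clusters: Set[str] = set()

    def create_cluster(self, name: str, num_workers: int) -> None:
        self.clusters.add(name)

    def delete_cluster(self, name: str) -> None:
        self.clusters.discard(name)

    def list_clusters(self) -> List[str]:
        return sorted(self.clusters)


def run_on_ephemeral_cluster(
    client: InMemoryClusterClient,
    name: str,
    num_workers: int,
    job: Callable[[str], None],
) -> None:
    """Create a cluster, run the job on it, and always tear the cluster down."""
    client.create_cluster(name, num_workers)
    try:
        job(name)
    finally:
        # Runs on success and on failure, so a failed job does not leak a cluster.
        client.delete_cluster(name)


def reap_leaked_clusters(client: InMemoryClusterClient, in_use: Iterable[str]) -> List[str]:
    """Watchdog pass: delete every cluster that no running job claims."""
    keep = set(in_use)
    leaked = [name for name in client.list_clusters() if name not in keep]
    for name in leaked:
        client.delete_cluster(name)
    return leaked


client = InMemoryClusterClient()
run_on_ephemeral_cluster(client, "events-etl", 6, lambda cluster: None)
print(client.list_clusters())  # the cluster is gone once the job returns
```

The `finally` block covers ordinary job failures. The separate watchdog pass covers cases where the operator process itself dies before teardown can run.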

Cost auditing

Conveniently, with job-scoped clusters, we can directly attribute exact costs to every Spark job we run on Cloud Dataproc from our Airflow DAGs. We stream our Cloud Dataproc and BigQuery usage logs back into BigQuery and built out cost dashboards on top of this logging data. These dashboards let us easily keep track of our total spend across our GCP resources and focus our optimizations on the most expensive jobs.
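Because each cluster serves exactly one job, per-job cost reduces to simple arithmetic over the cluster's lifetime. The sketch below assumes a simplified pricing model: a flat hourly rate per worker and per preemptible worker, passed in as parameters, with master-node and storage costs ignored. The record type and function names are hypothetical.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class ClusterRun:
    """One job-scoped cluster lifetime, as might be derived from usage logs."""
    job_name: str
    runtime_hours: float
    num_workers: int
    num_preemptible_workers: int


def job_cost(run: ClusterRun, worker_rate: float, preemptible_rate: float) -> float:
    """Estimated cost of one run; rates are per worker-hour and supplied by the caller."""
    hourly = (run.num_workers * worker_rate
              + run.num_preemptible_workers * preemptible_rate)
    return run.runtime_hours * hourly


def most_expensive(
    runs: Iterable[ClusterRun],
    worker_rate: float,
    preemptible_rate: float,
    top_n: int = 10,
) -> List[ClusterRun]:
    """Return the top_n runs by estimated cost, most expensive first."""
    ranked = sorted(
        runs,
        key=lambda run: job_cost(run, worker_rate, preemptible_rate),
        reverse=True,
    )
    return ranked[:top_n]


# Placeholder rates, not real GCP prices.
run = ClusterRun("events-EventETL", runtime_hours=2.0, num_workers=6, num_preemptible_workers=4)
print(job_cost(run, worker_rate=0.5, preemptible_rate=0.125))
```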

The BigQuery usage logs also provide us with similar auditability and tracking. We periodically inspect the 10 to 20 most expensive queries from the past 30 days. These are usually un-optimized queries in which users neglected to restrict the scan to specific partitions. Usually, the fix is as easy as adding a _PARTITIONTIME predicate to a query against a partitioned table.
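To illustrate the fix, here is a small sketch that builds a Standard SQL query restricted to a single day's partition. It assumes an ingestion-time partitioned table. The table name and helper functions are hypothetical.

```python
from datetime import date, timedelta


def partition_filter(start: date, end_exclusive: date) -> str:
    """Build a _PARTITIONTIME predicate covering [start, end_exclusive)."""
    return (
        f"_PARTITIONTIME >= TIMESTAMP('{start.isoformat()}') "
        f"AND _PARTITIONTIME < TIMESTAMP('{end_exclusive.isoformat()}')"
    )


def daily_count_query(table: str, day: date) -> str:
    """Count the rows in one daily partition instead of scanning the whole table."""
    where = partition_filter(day, day + timedelta(days=1))
    return f"SELECT COUNT(*) AS n FROM `{table}` WHERE {where}"


# `my-project.events.pageviews` is a made-up table name.
print(daily_count_query("my-project.events.pageviews", date(2017, 7, 17)))
```

Without the predicate, the same query would be billed for every partition in the table rather than one day's worth of data.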

Security and data access

A critical feature of BigQuery that we found really useful was the variety of tools Google provides to manage data access. The most straightforward path when getting started is to deposit all data in a single BigQuery instance (project) and then provision access to datasets within that project using Google Groups. We started here.

Over time, we observed that certain datasets have a natural affinity with particular functions in our organization. So we created separate projects for those functions, again using Google Groups to provision access at both the project and dataset level. This had the side effect of simplifying our cost accounting, since we can track BigQuery costs for each function.
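Group-based dataset access can be expressed as a list of access entries on the dataset resource. The sketch below builds such a list in the shape used by the BigQuery API (`role` plus `groupByEmail`). The group addresses and the helper function are made up for illustration.

```python
from typing import Dict, Iterable, List


def dataset_access(readers: Iterable[str], writers: Iterable[str] = ()) -> List[Dict[str, str]]:
    """Build BigQuery dataset access entries that grant roles to Google Groups."""
    entries = [{"role": "READER", "groupByEmail": group} for group in sorted(readers)]
    entries += [{"role": "WRITER", "groupByEmail": group} for group in sorted(writers)]
    return entries


# Hypothetical groups: analysts may read, the data team may write.
for entry in dataset_access(
    readers=["analytics@example.com"],
    writers=["data-infra@example.com"],
):
    print(entry)
```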

Finally, we found that in many cases different teams wanted to share access to particular tables. One easy way to meet this requirement was to create views in a client project that query the primary table in the master project. We manage the entire process with Cloud Deployment Manager, and changes submitted to our Deployment Manager repository are automatically deployed via our CI tool, Jenkins.
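A minimal sketch of such a view definition, written as a Deployment Manager Python template. This is an assumption about the general shape, not Thumbtack's actual configuration: the property names, project, dataset, and table names are all hypothetical, and the `SimpleNamespace` object only imitates the context that Deployment Manager would pass in.

```python
from types import SimpleNamespace


def generate_config(context):
    """Declare a view in the client project that selects from a master-project table."""
    props = context.properties
    source = "{}.{}.{}".format(
        props["masterProject"], props["sourceDataset"], props["sourceTable"]
    )
    view = {
        "name": props["viewName"],
        "type": "bigquery.v2.table",
        "properties": {
            "datasetId": props["clientDataset"],
            "tableReference": {
                "projectId": context.env["project"],
                "datasetId": props["clientDataset"],
                "tableId": props["viewName"],
            },
            # A view is a table resource whose contents are defined by a query.
            "view": {
                "query": "SELECT * FROM `{}`".format(source),
                "useLegacySql": False,
            },
        },
    }
    return {"resources": [view]}


# Stand-in for the context object that Deployment Manager supplies.
context = SimpleNamespace(
    env={"project": "client-project"},
    properties={
        "masterProject": "master-project",
        "sourceDataset": "core",
        "sourceTable": "users",
        "clientDataset": "shared",
        "viewName": "users_view",
    },
)
print(generate_config(context)["resources"][0]["properties"]["view"]["query"])
```

Readers of the view also need access to the underlying data, for example by authorizing the view on the source dataset.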

Conclusion

Over the past six months, we’ve ramped up our GCP usage from a few BigQuery tables to include all of our data infrastructure. Our on-call rotation is quieter than it has ever been, enabling us to focus on our business problems instead of Hadoop cluster maintenance. Going forward, our infrastructure investments will be focused on further empowering our engineering, analytics and data science teams to leverage our large-scale data in new ways. (See our posts about this project here and here.)

We’ve seen tremendous productivity gains across the organization from our move to managed services on Google Cloud, and we’re excited to continue building on those foundations.
