Part 1: Organizing Chaos
Over the past year, we’ve built out Thumbtack’s data infrastructure from the ground up. In this two-part series, I want to share where we came from, some of the lessons we’ve learned, and the key decisions we’ve made along the way.
When we started this project in early 2015, Thumbtack didn’t have a standalone data infrastructure; all analytics and data-oriented tasks ran directly against production databases. People across engineering and non-engineering teams alike used the pgAdmin desktop tool to run queries. These queries, along with other dashboard and analytics queries, hit a production PostgreSQL replica directly. Dashboarding and follow-up analysis were done mainly in Excel or Google Sheets.
Along with relational data, we were also capturing a very large amount of event data. In web companies, event data typically consists of timestamped logs of user interactions with the website and mobile apps. In many cases, the stream of events is also used to evaluate the performance of variants in online A/B tests. At the outset of our data infrastructure work, our event data collection and A/B testing infrastructure was not in good shape: the original design hadn’t kept up with our growth and was running into significant scaling issues. Our event tracking had grown to millions of events per day, and the system that processed this data was too slow to keep up with the website’s event volume, so it fell further behind over time. Since the project began, our event volume has skyrocketed: the number of event records we collect per day is now about 25x greater than in 2014.
Legacy Event Data Pipeline
Our pre-existing event-logging pipeline was designed to collect events from our front-end servers and deposit those events into MongoDB. All event data was batched on the production front-end servers and an hourly cron was executed to send these batches off to MongoDB for storage. Once in MongoDB, an aggregation process would summarize inbound events.
Our MongoDB instance was configured as an unsharded replica set, and a single Python process on one machine was responsible for aggregating event data to summarize experiment outcomes. This process read raw events and summarized them into aggregate objects, which were also stored in MongoDB. The aggregation code was built on a home-grown DSL that made heavy use of Python decorators. For example:
```python
# `events` is not imported: the wrapper that runs this script injects it into
# the execution context. on_view() is the shared handler that upserts aggregates.
@events.capture('type = "home page/view" AND browser_engine')
def on_homepage_view(event):
    on_view(event)

@events.capture(type='category group page/close lightbox')
@events.capture(type='landing page/close lightbox')
@events.capture(type='service/close lightbox')
@events.capture(type='request form/close lightbox')
def on_interaction_event(event):
    on_view(event)
```
The events “captured” by the decorators were handed off to the associated function, where on_view(event) would query the appropriate aggregate collection in MongoDB and upsert updated values into an aggregate record.
Each aggregation was expressed as a Python script containing a collection of these decorated handlers. A wrapper scanned a directory of these scripts and ran each one by calling exec() on it, injecting the events object into the execution context.
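The following is a minimal sketch of how a decorator registry and an exec()-based loader like the one described above could fit together. The `Events` class, its `handlers` and `dispatch` names, and `load_aggregations` are hypothetical reconstructions, not Thumbtack’s code, and only the `type=` form of `capture` is modeled (not the filter-expression string used by the first decorator).

```python
import os
import pathlib
import tempfile


class Events:
    """Hypothetical stand-in for the injected `events` object: maps event
    types to the handler functions registered for them."""

    def __init__(self):
        self.handlers = {}  # event type -> list of handler functions

    def capture(self, type):
        """Decorator factory: register the decorated function for `type`."""
        def register(func):
            self.handlers.setdefault(type, []).append(func)
            return func  # return the function unchanged so decorators can stack
        return register

    def dispatch(self, event):
        """Hand an event to every handler registered for its type."""
        for handler in self.handlers.get(event["type"], []):
            handler(event)


def load_aggregations(directory, events):
    """exec() every *.py script in `directory`, injecting `events` as a global."""
    for path in sorted(pathlib.Path(directory).glob("*.py")):
        code = compile(path.read_text(), str(path), "exec")
        exec(code, {"events": events})


# Usage: one aggregation script written to a temporary directory.
SCRIPT = """
@events.capture(type='service/close lightbox')
@events.capture(type='landing page/close lightbox')
def on_interaction_event(event):
    event.setdefault('handled_by', []).append('on_interaction_event')
"""

events = Events()
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "lightbox.py"), "w") as f:
        f.write(SCRIPT)
    load_aggregations(tmp, events)

event = {"type": "landing page/close lightbox"}
events.dispatch(event)
print(event["handled_by"])  # ['on_interaction_event']
```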
With our growth and ever-increasing event volume, this system was falling over completely and couldn’t keep up with inbound events.
Planning: Our Roadmap for Success
In planning our data infrastructure development project, we had three primary goals:
A scalable data infrastructure to accommodate our growth was top-of-mind, given how our existing infrastructure wasn’t able to support our data volume. We wanted to ensure our redesigned version could support significant, sustained growth.
Decoupling of analytics and production resources was also important to ensure that failures in our analytics infrastructure did not impact production resources and vice versa. Decoupling these systems also enables us to scale our production and analytics infrastructures independently.
Ease of access
In our revised data infrastructure, we wanted to ensure that everyone at Thumbtack had easy access to all of our data, even for teams outside of engineering. For us, this meant our planning included both (1) a SQL interface, and (2) dashboarding/analytics tools.
We began designing systems to support these objectives. Our very first architecture diagram looked something like this:
While the spirit of this design has remained the same (we have systems for ingesting data, batch processing, and distributed SQL queries), many of the specifics of our data infrastructure have ended up being somewhat different.
In the following sections, we articulate some of the design decisions we made, and the thought process behind those decisions.
One of the most pressing problems we needed to solve at Thumbtack was building a system for running distributed batch jobs over our data. As described above, our event analytics were running into serious scaling issues, and replacing this system with some kind of distributed event analytics was our highest priority. Our event data was essentially unusable, since it was locked up in an underpowered MongoDB cluster with no parallelism for aggregations or downstream processing. This was a key decision point for us, since quite a few different batch processing systems were available. In my previous work, I had used vanilla Hadoop MapReduce, but had seen data engineering teams elsewhere have success with Cascading/Scalding and/or Apache Spark.
Ultimately, our team chose to implement all of our batch jobs using Spark, a distributed computing framework designed to support a variety of data processing workloads at scale. A few things motivated this choice: we were very excited about how easy it is to get started with Spark (the same code runs locally and on the cluster), its versatility, the large number of associated first- and third-party libraries (Streaming, MLlib, GraphX), its close integration with the Parquet format, and its quickly growing community.
Once we decided to use Spark, we also had to decide how to deploy it in production. We considered several options for our Spark deployment, based on either Amazon Elastic MapReduce (EMR) or Cloudera CDH. These were:
- Amazon EMR: write data to S3, run Spark jobs on EMR and write to S3
- Amazon EMR: write data to EMR HDFS, run Spark jobs on EMR and write to EMR HDFS
- Cloudera CDH: write data to CDH HDFS on EC2 nodes, run Spark jobs on CDH and write to CDH HDFS
At the start of this project, we expected to end up with EMR, since it has the least operational overhead. However, a few considerations ultimately led to us moving forward with a Cloudera CDH cluster in production:
- Speed: in our prototyping, we found that S3 was significantly slower for batch reads in Spark than local-disk HDFS.
- Spark: at the time we began this project in early 2015, Spark was not yet available on EMR (Spark on EMR was introduced in June 2015), so we would have needed to manage our own Spark installation on EC2 nodes.
- Package versions: EMR tends to package and support somewhat old versions of tools in the Hadoop ecosystem, a point that became particularly important when we were considering which distributed SQL engine to use. For example, at the time of writing, EMR packages Hadoop 2.4.0, Hive 0.13.1, and Impala 1.2.4, whereas we currently have Hadoop 2.6.0, Hive 1.1.0, and Impala 2.3.0 deployed on our production CDH cluster.
- Availability: Amazon’s EMR releases differ from the Apache releases (see notes here). Most importantly, they do not include the high availability features of HDFS and YARN. We were particularly concerned about availability guarantees for our cluster, and this was one of the biggest factors in choosing not to use EMR.
- Experience: two members of our team had prior experience maintaining large production CDH clusters.
Given the performance issues, availability differences, and older versions of packaged software, we decided to deploy our own Cloudera CDH cluster on EC2 nodes. We started putting together a prototype cluster, along with a spreadsheet to summarize EC2 instance pricing.
Table: EC2 instance pricing comparison (columns: Instance Type, Monthly Statistic, EMR, EMR + gzip, EMR + snappy, CDH, CDH + gzip, CDH + snappy)
Conveniently, this was right around the time that AWS launched their new dense-storage d2 instances. These nodes were a great fit for us, as an economical route to getting a lot of storage for HDFS along with plenty of compute and memory. In the end, we launched our first cluster with two r3.4xlarge namenodes in HDFS high availability, and four d2.8xlarge data nodes. We’ve since scaled up the number of data nodes to meet our growing compute requirements.
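To put “a lot of storage” in rough numbers, here is a back-of-the-envelope sketch for the initial four data nodes. The per-node disk figures are an assumption taken from AWS’s published d2.8xlarge spec, not from this post; the replication factor of 3 is HDFS’s default `dfs.replication`, and the calculation ignores reserved non-DFS space and compression.

```python
# Assumed figures: AWS's published d2.8xlarge spec lists 24 x 2,000 GB HDDs
# per instance; 3 is HDFS's default block replication (dfs.replication).
DATA_NODES = 4
DISKS_PER_NODE = 24
DISK_TB = 2.0
REPLICATION = 3


def hdfs_capacity_tb(nodes, disks_per_node, disk_tb, replication):
    """Return (raw, usable) capacity in TB, ignoring reserved space and compression."""
    raw = nodes * disks_per_node * disk_tb
    return raw, raw / replication


raw_tb, usable_tb = hdfs_capacity_tb(DATA_NODES, DISKS_PER_NODE, DISK_TB, REPLICATION)
print(f"raw: {raw_tb:.0f} TB, usable at {REPLICATION}x replication: {usable_tb:.0f} TB")
# raw: 192 TB, usable at 3x replication: 64 TB
```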
Distributed SQL Infrastructure
With this infrastructure in place for batch jobs, we also needed to support SQL queries for offline analytics. This was critical to the business, since everyone from engineering to product management to finance to marketing needed rapid access to large amounts of relational data. Most of the organization was still using pgAdmin to run queries against a production PostgreSQL replica, which had a few major problems:
- Aggregations: there were no aggregations/rollups or summary tables. The SQL tables were just raw tables representing users, requests, and quotes. Every team had started writing its own queries to compute KPIs for the business, each with slightly different filters and summary methods. This led to a mess of slightly differing KPI tracking across the business.
- Coupling: since analysis queries were hitting a read-only replica of our production PostgreSQL database, offline analytics were tightly coupled to the state of production infrastructure.
- Performance: many of our offline analytics queries involved large batch reads (e.g., summarizing all actions taken on Thumbtack by each pro over the past X years) from a single database replica, so performance was terrible. Many queries took nearly an hour to complete, and queries against our largest tables frequently timed out.
- Authentication and authorization: all queries ran against the database as a single read-only user, which gave us very little visibility into or control over access to data.
We considered several alternatives for our distributed SQL engine. Motivated by the poor state of offline analytics, we wanted to address the issues above. At first, we hoped to just use Spark SQL to support SQL queries, since we were already planning on using Spark for our batch jobs. However, Spark SQL did not yet have the authentication/authorization support we were looking for. In particular, we wanted to ensure that we had fine-grained control over who had access to each table in our databases. Presto was another tool we considered, but Presto’s performance appeared to be lagging with respect to Impala or Spark SQL (see Cloudera benchmarks here).
Another major contender for us was Amazon Redshift, which is high-performing and easy to deploy and integrate. However, its cost, along with potential difficulties in keeping a separate Redshift copy of our data consistent, led us to keep looking.
Ultimately, we decided to move forward with Impala for all of our analytics SQL support. A few factors drove this decision: (1) its performance metrics were promising compared to other distributed SQL engines, (2) it integrated with our LDAP server for authentication and supported per-database/table authorization with Sentry, and (3) it worked directly with our Parquet files in HDFS and did not require building a separate ETL pipeline. This simplified the task of keeping our data consistent and reduced the engineering work required to get up and running.
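As an illustration of the per-database/table authorization with Sentry mentioned above, here is a sketch that generates the Impala statements for a role granting read-only access to specific tables. The role, group, and table names are hypothetical, and the generator does no identifier quoting, so it assumes trusted names; statements like these would be run by an administrator, for example through impala-shell.

```python
def sentry_grants(role, group, tables):
    """Build Impala statements that create a Sentry role, assign it to a
    group, and grant it read-only (SELECT) access on each listed table."""
    statements = [
        f"CREATE ROLE {role}",
        f"GRANT ROLE {role} TO GROUP {group}",
    ]
    statements += [f"GRANT SELECT ON TABLE {table} TO ROLE {role}" for table in tables]
    return [s + ";" for s in statements]


# Hypothetical role, group, and table names.
grants = sentry_grants("finance_analysts", "finance", ["kpi.daily_revenue", "kpi.quotes"])
print("\n".join(grants))
```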
Dashboarding & BI Tools
Widespread access to a PostgreSQL replica meant that a plethora of downstream tools had accumulated, and it was now time to painstakingly migrate them to Impala (fun!). At the outset, many fragile pipelines had popped up to move data to teams outside of engineering, and we had no idea of all the ways teams were querying this data.
On top of that, our dashboarding tools sorely needed an update. A wide variety of homegrown dashboarding tools had been developed—most of them just a thin wrapper on top of SQL queries—which produced tabular summaries or simple plots. We also had an existing Looker instance pointed at a PostgreSQL replica.
In re-imagining our dashboarding tools, we wanted to consider the needs of Thumbtack and the characteristics of our team. Importantly, we found that most of the team here in San Francisco has at least some familiarity with SQL. We evaluated Tableau, but ultimately found that the learning curve and cost made it significantly less attractive than other alternatives. We considered quite a few other dashboarding tools, but ruled out all of them for a variety of reasons (missing features, lack of Impala support, inconvenient for SQL authors).
In the end, we settled on using Mode Analytics for our primary dashboarding tool, retaining a Looker instance for some legacy dashboards and to support team members who were not familiar with SQL. At this point, the majority of team members in our SF office are active Mode users, on average running around 3,000 queries per day.
Data Ingest Pipelines
At Thumbtack, we have many sources of data that we wanted to ETL into a single data warehouse. However, at the beginning of this project, we had two data sources we were most interested in ingesting and using. First, we wanted to ingest all of our relational data to provide access in Impala. This data is frequently used for tracking key company metrics, financial reporting, and other critical business analytics. Second, we wanted to ingest our event data, and transform it (with Spark batch jobs) to make it accessible for downstream analytics, most importantly in support of A/B testing.
In both cases, we stored all of our production data in Snappy-compressed Parquet format. Parquet is very fast (particularly since most jobs touch only a small subset of the columns present in the data) and has been fantastic for making it easy to work with data from both code and SQL (Impala). There is a great discussion of compression options with Hadoop here, but, for us, Snappy’s CPU/disk tradeoffs met our needs well.
All of our ETL pipeline tasks were originally automated with cron jobs. We’ve since transitioned all of these jobs to Airbnb’s Airflow workflow manager, and we now have several dozen tasks with fairly complex dependencies automated throughout our relational and event ETL workflows.
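A key advantage of a workflow manager over independent cron entries is that a task runs only after its upstream tasks have succeeded. The sketch below illustrates that scheduling idea with Python’s standard-library `graphlib` (Python 3.9+) rather than Airflow itself; the task names are hypothetical. In Airflow, the same edges would be declared between operators, for example with the `>>` operator.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical task names; each task maps to the set of tasks it depends on.
dependencies = {
    "sqoop_import_users": set(),
    "sqoop_import_requests": set(),
    "sqoop_import_quotes": set(),
    "enrich_events": set(),
    "build_kpi_tables": {"sqoop_import_users", "sqoop_import_requests", "sqoop_import_quotes"},
    "experiment_summaries": {"enrich_events", "build_kpi_tables"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()  # raises CycleError if the dependencies contain a cycle
waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # tasks whose upstream tasks have all finished
    waves.append(ready)                 # tasks in one wave could run in parallel
    sorter.done(*ready)                 # mark them complete to unlock dependents

for i, wave in enumerate(waves, 1):
    print(i, wave)
# 1 ['enrich_events', 'sqoop_import_quotes', 'sqoop_import_requests', 'sqoop_import_users']
# 2 ['build_kpi_tables']
# 3 ['experiment_summaries']
```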
For our relational data, we needed a way to retrieve data from our PostgreSQL database and ingest it into HDFS. Our goal was to regularly import all of our relational data from Postgres into Parquet files on HDFS, with as little lag as possible behind the most recent records in Postgres. We use Parquet to reap the performance advantage of a columnar storage format.
We ended up deploying Apache Sqoop for these imports, as it was effectively the standard tool for importing relational data into HDFS. Our Sqoop imports are automated with Airflow to run a full import of our production PostgreSQL database into HDFS every three hours. Subsequent Airflow tasks generate aggregate relational tables from the raw Sqoop import data, and all of these are exposed through Impala / Hive for SQL queries.
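For a sense of what one of these scheduled imports looks like, here is a sketch of a helper that builds (but does not run) a Sqoop 1 import command for a single table, of the kind an Airflow task could execute. The function name, connection string, user, and paths are hypothetical, and flag support, particularly around Parquet output and compression codecs, varies across Sqoop versions, so check the user guide for the version you run.

```python
import shlex


def sqoop_import_cmd(jdbc_url, username, password_file, table, target_dir, mappers=4):
    """Build (without running) a Sqoop 1 command that imports one PostgreSQL
    table into Snappy-compressed Parquet files on HDFS."""
    return [
        "sqoop", "import",
        "--connect", jdbc_url,
        "--username", username,
        "--password-file", password_file,  # avoids a plaintext password on the command line
        "--table", table,
        "--target-dir", target_dir,
        "--delete-target-dir",  # full re-import: replace the previous snapshot
        "--as-parquetfile",
        "--compression-codec", "org.apache.hadoop.io.compress.SnappyCodec",
        "--num-mappers", str(mappers),  # parallel map tasks reading from Postgres
    ]


# Hypothetical connection details and paths.
cmd = sqoop_import_cmd(
    "jdbc:postgresql://pg-replica.example.internal:5432/appdb",
    "etl_reader",
    "/user/etl/pg.password",
    "users",
    "/data/raw/postgres/users",
)
print(shlex.join(cmd))
```

Returning an argument list rather than a single string keeps quoting unambiguous; `shlex.join` is only used to print a copy-pasteable version.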
When we began our effort to bring event data into Hadoop, we needed a fast, production-ready solution. The existing event ETL pipeline, which was loading events into MongoDB, batched events in hourly event logs local to each of the front-end servers. A cron ran hourly, picked up these logs, and pushed them into MongoDB.
We found that we could get up and running just by updating this hourly cron. Instead of pushing events into MongoDB, we ingested them into HDFS through the REST API exposed by Hadoop’s HttpFS gateway. We stood up a handful of EC2 nodes running HttpFS, put them behind a load balancer, and updated the hourly cron to post event logs to this endpoint.
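Here is a sketch of the kind of request the updated cron could send for each hourly log file, built with the standard library but not executed. It assumes HttpFS’s single-request upload form (`op=CREATE&data=true` with an `application/octet-stream` body) and simple (non-Kerberos) authentication via `user.name`; the load-balancer host, HDFS path, and user are hypothetical, and the HttpFS documentation for your Hadoop version is the authority on the exact parameters.

```python
import urllib.parse
import urllib.request


def httpfs_create_request(base_url, hdfs_path, data, user):
    """Build an HttpFS upload request: a PUT to the WebHDFS-compatible
    /webhdfs/v1 endpoint with op=CREATE and data=true, sending the file body
    as application/octet-stream."""
    query = urllib.parse.urlencode(
        {"op": "CREATE", "data": "true", "overwrite": "false", "user.name": user}
    )
    url = f"{base_url}/webhdfs/v1{urllib.parse.quote(hdfs_path)}?{query}"
    return urllib.request.Request(
        url,
        data=data,
        method="PUT",
        headers={"Content-Type": "application/octet-stream"},
    )


# Hypothetical load-balancer address (14000 is HttpFS's default port) and paths.
req = httpfs_create_request(
    "http://httpfs-lb.example.internal:14000",
    "/data/raw/events/2016-01-01/web01.log",
    b'{"type": "home page/view"}\n',
    "etl",
)
print(req.get_method(), req.full_url)
# urllib.request.urlopen(req) would send it; not executed here.
```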
Now, once the raw event logs are in HDFS, we batch process the events in periodic Airflow jobs which enrich events and convert them to Snappy-compressed Parquet files with a very wide schema. These Parquet files are then exposed via date-partitioned Impala tables for use in downstream batch jobs and ad-hoc analytics queries. Over time we’ve found that it is very useful to clean up event schemas in the enriching stage. This ensures that our data is consistent, even when ingested from legacy event types with inconsistent schemas. One of the most tedious and time-consuming efforts in bringing our data infrastructure deployment online was standardizing and reconciling a huge variety of legacy, irregular event logging. In the future, we plan to standardize our event schemas, using Thrift-JSON to define and enforce standardized schemas in associated client libraries.
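In production the enrichment runs as a Spark batch job that writes Parquet; the sketch below shows only the per-record idea in plain Python: mapping inconsistently named legacy fields onto one fixed schema, reconciling types, and deriving the date-partition key. The field aliases, the (deliberately narrow) schema, and the `enrich` function are hypothetical, and it assumes every event carries an epoch-seconds timestamp.

```python
from datetime import datetime, timezone

# Hypothetical aliases: legacy event types logged the same field under different keys.
FIELD_ALIASES = {"userId": "user_id", "uid": "user_id", "ts": "timestamp", "time": "timestamp"}
# Hypothetical (and much narrower than a real "very wide") output schema.
WIDE_SCHEMA = ("type", "user_id", "timestamp", "experiment", "variant")


def enrich(raw):
    """Map one raw event onto a fixed schema and derive its date partition."""
    event = {FIELD_ALIASES.get(key, key): value for key, value in raw.items()}
    row = {col: event.get(col) for col in WIDE_SCHEMA}  # absent fields become None (NULL)
    if row["user_id"] is not None:
        row["user_id"] = str(row["user_id"])  # reconcile int vs. string IDs
    ts = datetime.fromtimestamp(int(row["timestamp"]), tz=timezone.utc)
    row["timestamp"] = ts.isoformat()
    row["dt"] = ts.strftime("%Y-%m-%d")  # partition key, e.g. dt='2016-01-01'
    return row


legacy_events = [
    {"type": "home page/view", "uid": 42, "ts": 1451606400},
    {"type": "service/close lightbox", "userId": "42", "time": "1451692800", "variant": "B"},
]
rows = [enrich(e) for e in legacy_events]
for row in rows:
    print(row)
```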
This event ETL pipeline has served us well in getting our infrastructure off the ground, but we are beginning to feel its limits. In particular, the legacy event log / cron pairing assumes that our infrastructure is just a PHP monolith, and as we transition to SOA, we don’t have the ability to log events from our newly-minted backend services into HDFS.
The solution? A v2 event ETL pipeline. We considered a few different solutions for the replacement system, but ultimately decided to use Logstash, with Logstash output plugins forwarding event data from our servers to S3. When a user takes an action on the website, a simple logging library we’ve written generates an associated event and writes it to a local log file on EBS, and Logstash ships events from that file to S3. Because each event is persisted to the local file first, event submission is still handled properly even if one of the processes involved dies (the PHP process handling the HTTP request, or Logstash itself): Logstash picks the event up from the file once it is running again. This allowed us to simplify our application logic while maintaining high durability guarantees, since Logstash handles most of the batching and retry logic.
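Thumbtack’s logging library runs in the PHP application and its code is not shown here; the following Python sketch, with a hypothetical `log_event` function and a JSON-lines file format, only illustrates the “append to a local file, let Logstash ship it” pattern described above.

```python
import json
import os
import tempfile
import time


def log_event(path, event_type, **fields):
    """Append one event to a local log file as a single JSON line.

    flush() + fsync() push the line to disk before returning, so the event
    survives if the calling process dies right afterwards; a shipper such as
    Logstash can then tail the file and handle batching and retries."""
    record = {"type": event_type, "timestamp": time.time(), **fields}
    line = json.dumps(record, separators=(",", ":")) + "\n"
    with open(path, "a", encoding="utf-8") as f:
        f.write(line)
        f.flush()
        os.fsync(f.fileno())
    return record


with tempfile.TemporaryDirectory() as tmp:
    log_path = os.path.join(tmp, "events.log")
    log_event(log_path, "home page/view", user_id="42")
    log_event(log_path, "service/close lightbox", user_id="42", variant="B")
    with open(log_path, encoding="utf-8") as f:
        logged = [json.loads(line) for line in f]

print([e["type"] for e in logged])  # ['home page/view', 'service/close lightbox']
```

Calling fsync() on every event trades throughput for durability; a higher-volume logger might sync less often and accept a small window of loss on a machine crash.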
Logstash is now automatically provisioned on our production machines, and we schedule our event-enriching job to pick up raw event data from S3 and write the enriched output data out to HDFS for consumption by downstream jobs.
Time to Put Our Plan Into Play
Over the past year, we’ve spent a lot of time thoughtfully designing and building out our data infrastructure and made great progress towards our goals of scalability, decoupling, and ease of access. We’ve now scaled up to handle thousands of SQL queries per day across billions of records, our data infrastructure is fully decoupled, and the vast majority of the organization is designing and running reports and dashboards touching data from every corner of the organization.
But having a great plan is one thing; implementing it is another. In the next post, I’ll discuss some of the lessons we’ve learned putting these systems into production, and dive into some of the details for executing Spark and Sqoop jobs in our production systems. Stay tuned!
We’re hiring! Reach out if you’re looking to join a fast-growing team focused on building out a world-class data infrastructure.
Thumbtack connects consumers with skilled professionals to get things done. More than 200,000 professionals in almost 1,100 unique categories—from handymen and housekeepers, to tutors, photographers, wedding planners, and more—use Thumbtack to connect with millions of customers across all 50 states.