When we built our first machine-learned model in Spark to predict pros’ interest in customer requests, we loaded pros’ historical engagement features directly into an online Golang application, which used the features to determine which pros were potentially interested in a given request. This approach had a few limitations:
- Other applications could not reuse the model.
- The feature loading process was not scalable due to the ever-increasing number of features.
- The lack of interoperability between Scala (for offline Spark jobs) and Golang (for online applications) led to a substantial amount of duplicated code for common data models, feature transformations and unit tests.
In light of these limitations and the expected addition of new machine-learned models, we decided to build a standalone modeling service in Scala that would be extensible, scalable and interoperable with offline Spark jobs.
To date, this model serving system has supported more than ten models across the engineering team. In the following, we share our experience in building this system within the Scala ecosystem.
The following diagram provides a 10,000-foot view of the components of our model serving system. In essence, we bring features and models stored offline in the Hadoop Distributed File System (HDFS) to DynamoDB and build a web service using the Play framework to serve the data stored in DynamoDB.
Periodic Feature/Model Update
We run Spark jobs daily to update our features and models and persist them in HDFS. For the online services to use them, we need to propagate these updates to an online store. Because all of our infrastructure resides in AWS, we chose DynamoDB as the key-value store to host feature data, which is normally keyed by IDs such as pro IDs. With the help of Airflow, we upload multiple gigabytes of feature data from HDFS to DynamoDB in a nightly Spark job. This job needs to operate within the constraints of DynamoDB’s provisioned throughput. If we are not mindful of the write throughput, DynamoDB throttles the traffic and the effective capacity drops well below the provisioned value, which introduces unpredictability that we would like to avoid. Fortunately, DynamoDB returns the consumed capacity in most of its APIs, and we use it to rate-limit our writes so that we use the full provisioned write capacity without being throttled.
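The rate limiting described above can be sketched as follows. This is a minimal illustration rather than our production code: the class name CapacityRateLimiter, its parameters, and the write function in the usage helper are all hypothetical, and the sketch assumes a single writer thread that is told how many capacity units each write consumed.

```scala
/** Paces writes so that reported consumption stays at or below a target rate.
  * Not thread-safe: assumes one writer per limiter (hypothetical design).
  * `nowNanos` and `sleep` are injectable so the pacing can be tested without waiting.
  */
final class CapacityRateLimiter(
    unitsPerSecond: Double,
    nowNanos: () => Long = () => System.nanoTime(),
    sleep: Long => Unit = nanos => Thread.sleep(nanos / 1000000L, (nanos % 1000000L).toInt)
) {
  require(unitsPerSecond > 0, "unitsPerSecond must be positive")

  // Earliest time at which the next write may start without exceeding the budget.
  private var nextAllowedNanos: Long = nowNanos()

  /** Blocks until the consumption recorded so far has been "paid off". */
  def acquire(): Unit = {
    val waitNanos = nextAllowedNanos - nowNanos()
    if (waitNanos > 0) sleep(waitNanos)
  }

  /** Records the capacity units reported for the last write. */
  def record(consumedUnits: Double): Unit = {
    val costNanos = (consumedUnits / unitsPerSecond * 1e9).toLong
    nextAllowedNanos = math.max(nextAllowedNanos, nowNanos()) + costNanos
  }
}

object RateLimitedWriter {
  /** `write` is a placeholder for a batch write that returns the consumed capacity units. */
  def writeAll[A](batches: Seq[A], limiter: CapacityRateLimiter)(write: A => Double): Unit =
    batches.foreach { batch =>
      limiter.acquire()
      limiter.record(write(batch))
    }
}
```

When several Spark tasks write in parallel, each task would need its own share of the provisioned capacity, for example the table's write capacity divided by the number of concurrent writers.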
We chose Scala (and the Play framework) to write the model serving online service for two reasons:
- All of our offline feature collection and model training infrastructure is written in Scala/Spark. Using Scala to build the online serving infrastructure would allow reusability of data models and feature transformation libraries.
- Scala is well suited for data manipulation (filter, map, reduce, etc.) due to its functional paradigm and its strong interoperability with the big data ecosystem.
We adopted Play as the web framework and it has worked well for us so far. In the following, we summarize a few aspects we found interesting when using Play (which may apply to other frameworks).
Dependency Injection (DI). Play 2.4 introduced dependency injection (via Guice) into controllers, which eliminates the need to construct all classes during startup and makes it easier to write tests. By default, each injection of an interface constructs a new instance of its implementation. If the implementation should be instantiated only once (e.g., for a cache), the @Singleton annotation can be used to share one instance across all injections. DI also helps supply test implementations as dependencies. In many cases, we can just call injector.instanceOf[SomeInterface] (where val injector = new GuiceInjectorBuilder().injector()) to get the default implementation of SomeInterface in tests. However, sometimes we may want to inject a mock implementation by overriding the default one.
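As an illustration of overriding a default binding in tests, here is a small sketch. The names FeatureCache, InMemoryFeatureCache and MockFeatureCache are made up for this example, and it assumes Play with its Guice module on the classpath and the javax.inject annotations (newer Play releases may use a different annotation package).

```scala
import com.google.inject.ImplementedBy
import javax.inject.Singleton
import play.api.inject.bind
import play.api.inject.guice.GuiceInjectorBuilder

// Hypothetical interface; @ImplementedBy names the default implementation.
@ImplementedBy(classOf[InMemoryFeatureCache])
trait FeatureCache {
  def get(proId: Long): Option[Double]
}

// @Singleton makes every injection point share one instance.
@Singleton
class InMemoryFeatureCache extends FeatureCache {
  def get(proId: Long): Option[Double] = None
}

// Test double that returns a fixed value.
class MockFeatureCache extends FeatureCache {
  def get(proId: Long): Option[Double] = Some(0.5)
}

object InjectionExample {
  // Resolves FeatureCache to the default implementation.
  val defaultInjector = new GuiceInjectorBuilder().injector()

  // The override replaces the default binding with the mock.
  val testInjector = new GuiceInjectorBuilder()
    .overrides(bind[FeatureCache].to[MockFeatureCache])
    .injector()
}
```

With these injectors, InjectionExample.testInjector.instanceOf[FeatureCache] yields the mock, while the default injector hands out the shared InMemoryFeatureCache.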
Custom error handling. Play allows us to override the default error handling. For example, we may return a 500 with an error message in the production environment but return a stack trace with the source code location in the development environment. We may also differentiate between client and server errors and keep a separate counter for each.
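A custom handler along these lines might look like the sketch below. It implements Play's HttpErrorHandler trait; the class name ServingErrorHandler and the AtomicLong counters, which stand in for a real metrics client, are assumptions made for illustration.

```scala
import java.util.concurrent.atomic.AtomicLong
import javax.inject.{Inject, Singleton}
import play.api.{Environment, Mode}
import play.api.http.HttpErrorHandler
import play.api.mvc.{RequestHeader, Result, Results}
import scala.concurrent.Future

@Singleton
class ServingErrorHandler @Inject() (env: Environment) extends HttpErrorHandler {
  // Placeholders for real metrics counters (hypothetical).
  val clientErrors = new AtomicLong(0)
  val serverErrors = new AtomicLong(0)

  // Called for 4xx errors such as bad requests or unknown routes.
  def onClientError(request: RequestHeader, statusCode: Int, message: String): Future[Result] = {
    clientErrors.incrementAndGet()
    Future.successful(Results.Status(statusCode)(s"Client error: $message"))
  }

  // Called when handling a request throws an exception.
  def onServerError(request: RequestHeader, exception: Throwable): Future[Result] = {
    serverErrors.incrementAndGet()
    val body =
      if (env.mode == Mode.Prod) "Internal server error" // hide details in production
      else exception.getStackTrace.mkString(s"$exception\n", "\n", "")
    Future.successful(Results.InternalServerError(body))
  }
}
```

Play picks up such a handler when it is registered through the play.http.errorHandler setting in application.conf.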
Integration tests when the server starts. There are situations where everything works in unit tests, manual tests and the staging environment, and yet something breaks in production because the production environment is different. The production configuration may include a different external URL, point to a different DynamoDB table, use a different authentication profile, etc. Our approach is to test all endpoints programmatically when the server starts, which has helped us fail fast and eliminated many bugs.
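The fail-fast idea can be sketched without any framework. In this hypothetical example, each Check wraps a call that exercises one endpoint or dependency and throws on failure; the names StartupChecks, Check and verifyOrFail are invented for illustration.

```scala
import scala.util.{Failure, Try}

object StartupChecks {
  /** A named check; `run` should throw if the endpoint or dependency is broken. */
  final case class Check(name: String, run: () => Unit)

  /** Runs every check and returns the name and error of each one that failed. */
  def failures(checks: Seq[Check]): Seq[(String, Throwable)] =
    checks
      .map(c => c.name -> Try(c.run()))
      .collect { case (name, Failure(e)) => name -> e }

  /** Throws if any check failed, so the process stops before taking traffic. */
  def verifyOrFail(checks: Seq[Check]): Unit = {
    val failed = failures(checks)
    if (failed.nonEmpty) {
      val summary = failed.map { case (n, e) => s"$n: ${e.getMessage}" }.mkString("; ")
      throw new IllegalStateException(s"Startup checks failed: $summary")
    }
  }
}
```

Calling StartupChecks.verifyOrFail from an eagerly constructed component would abort startup when, say, the configured DynamoDB table does not exist.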
Monitor all endpoints. We would like to measure the success rate (succeeded calls / total calls) and latency (call duration) for all endpoints. However, manually inserting monitoring calls (e.g., someCounter.increment()) all over the place is tedious and error-prone. We addressed this problem by using action composition to intercept requests and responses, which lets us encapsulate the monitoring calls in a single file.
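The core of that interception is a wrapper around an asynchronous handler. The sketch below is framework-agnostic and does not use Play's action API; in Play, the same logic would sit inside a custom action. EndpointMetrics is a hypothetical in-memory stand-in for a metrics client, and the code assumes Scala 2.12 or later.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.LongAdder
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

/** In-memory stand-in for a metrics client (hypothetical). */
final class EndpointMetrics {
  private val counters = new ConcurrentHashMap[String, LongAdder]()

  def increment(key: String, by: Long = 1L): Unit =
    counters.computeIfAbsent(key, _ => new LongAdder).add(by)

  def value(key: String): Long =
    Option(counters.get(key)).map(_.sum()).getOrElse(0L)
}

object Monitoring {
  /** Wraps a handler and records total, success and failure counts plus latency. */
  def monitored[A](endpoint: String, metrics: EndpointMetrics)(handler: => Future[A])(
      implicit ec: ExecutionContext): Future[A] = {
    val start = System.nanoTime()
    metrics.increment(s"$endpoint.total")
    // Treat a synchronous exception the same way as a failed Future.
    val result = try handler catch { case NonFatal(e) => Future.failed(e) }
    // andThen runs the side effect before the returned Future completes.
    result.andThen { case outcome =>
      metrics.increment(if (outcome.isSuccess) s"$endpoint.success" else s"$endpoint.failure")
      metrics.increment(s"$endpoint.latencyMicros", (System.nanoTime() - start) / 1000L)
    }
  }
}
```

The success rate for an endpoint is then the ratio of its success counter to its total counter, and no handler needs to contain monitoring code of its own.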
In addition to accessing DynamoDB, we also need to query PostgreSQL for the real-time transactional data behind our business logic. For this purpose, we adopted Slick, which supports both object relational mapping (ORM) and raw queries in Scala with compile-time safety and compositionality. To test database query logic in unit tests, we point Slick to H2 (an in-memory DB) in our test environment by adding a test configuration (e.g., application.test.conf), which build.sbt applies to tests (javaOptions in Test += "-Dconfig.file=conf/application.test.conf"). However, note that H2 and PostgreSQL may have discrepancies in their APIs and implementations. Therefore, we still need the integration tests mentioned above to cover all of the database query code.
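For builds on sbt 1.x, the same setting is usually written with the slash syntax. The fragment below is a sketch that assumes the test configuration lives at conf/application.test.conf; the explicit fork setting is included because javaOptions only take effect when tests run in a forked JVM.

```scala
// build.sbt
// Run tests in a separate JVM so that the JVM options below are applied.
Test / fork := true
// Load the test configuration (which points Slick to H2) instead of the default one.
Test / javaOptions += "-Dconfig.file=conf/application.test.conf"
```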
If you are excited about the aforementioned technologies and problems, we have plenty of them. Join us!