Bring Your Own Algorithm to Anomaly Detection

Pinterest Engineering
Pinterest Engineering Blog
9 min readOct 17, 2023

--

Charles Wu | Software Engineer; Isabel Tallam | Software Engineer; Kapil Bajaj | Engineering Manager

Overview

In this blog, we present a pragmatic way of integrating analytics, written in Python, with our distributed anomaly detection platform, written in Java. The approach here could be generalized to integrate processing done in one language/paradigm into a platform in another language/paradigm.

Background

Warden is the distributed anomaly detection platform at Pinterest. It aims to be fast, scalable, and end-to-end: starting from fetching the data from various data sources to be analyzed, and ending with pushing result notifications to tools like Slack.

Warden started off as a Java Thrift service built around the EGADs open-source library, which contains Java implementations of various time-series anomaly detection algorithms.

The execution flow of one anomaly detection job, defined by one JSON job spec. Each job is load-balanced to a node in the Warden cluster.

Warden has played an important role at Pinterest; for example, it was used to catch spammers. Over time, we have built more features and optimizations into the Warden platform, such as interactive data visualizations, query pagination, and sending customized notification messages. We have also found it useful to have Warden as a separate Thrift service as it gives us more flexibility to scale it by adding or removing nodes in its clusters, to call it via a Thrift client from a variety of places, and to add instrumentations for better monitoring.

What’s the Problem?

Despite the many useful features of the Warden platform, a requirement emerged. As we expanded the use cases of Warden throughout Pinterest, we started to collaborate more and more with data scientists who would like to use Warden to analyze their data. They found the existing selection of anomaly detection algorithms in EGADs to be limiting. While Warden could be extended with more customized algorithms, they would have to be developed in Java. Many data scientists preferred to bring to Warden their own anomaly detection algorithms in Python instead, which has at its disposal a rich set of ML and data analysis libraries.

What’s the Goal?

Functionally, we want to expand Warden such that it can retain the Java algorithms in the EGADs library used by the existing use-cases like spam detection, but it can also support new algorithms developed in Python. The Python algorithms, like the EGADs Java algorithms, would be part of the end-to-end Warden platform, integrated with all of the existing Warden features.

With that in mind, we want to develop a framework to achieve two things:

  1. For our users (mainly Pinterest data scientists) to develop or migrate their own Python algorithms to the Warden platform
  2. For the Warden platform to deploy the Python algorithms and execute them as part of its workflow

In particular, this framework should satisfy all of the following:

  • Easy to get started: users can start implementing their algorithms very quickly
  • Easy to test deploy the Python algorithms being developed in relation to the Warden platform, while requiring no knowledge of Java, inner workings of Warden, or any deployment pipelines
  • Easy and safe to deploy the algorithms to all the Warden nodes in a production cluster
  • To optimize for the usability in production cases, as well as to minimize the feedback time for testing, the Python algorithms should be executed synchronously on the input data and ideally with minimum latency overhead

Options We Explored

We thought of experimenting with Jython. However, at the time of development, Jython did not have a stable release that supported Python 3+, and at the moment, all Python programs at Pinterest should generally conform to at least Python 3.8.

We have also thought of building a RESTful API endpoint in Python. However, having intensive data processing done through API endpoints is not a good use of the API infrastructure at Pinterest, which is generally designed around low-CPU, I/O-bound use-cases.

Additionally, we had thought about having a Python Thrift service that the Warden Java Thrift service could call to, but Thrift services in Python are not fully supported at Pinterest (compared to Java or C++) and have very few precedents. Setting up a separate Thrift service would also require us to address additional complexities (e.g. setting up additional load-balancers) that are not required by the approach we ended up going with.

High-Level Design

The main idea is to move the computation as close to the data as possible. In this case, we will package all the Python algorithms into one binary executable (we are using Pyinstaller to do this), and then distribute that executable to each Warden node, where the data will reside in memory after Warden has fetched them from the databases. (Note: instead of producing a single executable using Pyinstaller, you can also experiment with producing a folder instead in order to further optimize latency.)

Each Warden node, after fetching the data, will serialize the data using an agreed-upon protocol (like JSON or Thrift), and pass it to the executable along with the name of the Python algorithm being used. The executable contains the logic to deserialize the data and run it through the specified algorithm; it will then pass the algorithm output in a serialized format back to Warden, which will deserialize the result and continue processing it as usual.

This approach has the benefits of being efficient and reliable. Since all the Python algorithms are packaged and distributed to each node, each node can execute these algorithms locally instead of via a network call each time. This enables us to avoid network latency and network failures.

While the executable being distributed to each node contains all the Python algorithms, each node can apply an algorithm to only a subset of the data, if processing the entire data exceeds the memory or CPU resources of that node. Of course, there would then need to be additional logic that distributes the data processing to each node and assembles the results from each node.

Production Deployment

Warden production cluster

To deploy to production, we build an executable with all of the Python algorithms and put that executable into an access area within the company, like a Warden-specific S3 bucket. The Warden service instance on each node will contain the logic to pull the executable from S3 if it’s not found at a pre-specified local file path. (Note: instead of programming this, the build system for your service could also support something like this natively, e.g. Bazel’s http_file functionality.)

To make a new deployment to production, the operator will build and push the executable to S3, and then do a rolling-restart of all the Warden nodes in the production cluster. We have ideas to further automate this, so that the executables are continuously built and deployed as new algorithms are added.

Test Deployment

When users want to test their algorithm, they would run a script that would build their algorithm into an executable and copy that executable into the running service container on each node of the Warden test cluster. Afterwards, from places like Jupyter notebook, users could send a job to the Warden test cluster (via a Thrift call) to use the test algorithm that they have just copied over.

We have invested time to make this process as simple as possible, and have made calling the script an essentially one-stop process for the user to deploy their algorithms to the test Warden cluster. No knowledge of Java, the inner workings of Warden, or any deployment pipelines is required.

Interfaces

On the note of simplicity, another way that we have tried to make adding algorithms easy for our users is by organizing algorithms through clearly defined and documented interfaces.

Each Python algorithm will implement an interface (or, more accurately in Python, extend an abstract base class) that defines a specific set of inputs and outputs for the algorithm. All the users have to do is to implement the interface, and the Warden platform will have the logic to connect this algorithm with the rest of the platform.

Below is a very simple example of an interface for anomaly detection:

@abstractmethod def detect( self, dimensions: List[str], timestamps: List[int], values: List[float] ) -> Tuple[List[int], List[float]]: “”” Detects anomalies in the provided time-series data. @param dimensions: list of dimensions for the time-series @param timestamps: list of timestamps @param values: list of metric values We expect the length of timestamps to equal to that of values, and that for any i, values[i] happens at timesstamps[i].

The typical workflow for the users to create an algorithm is to:

  1. Select and implement an interface
  2. Test deploy their algorithm through the one-stop process as described in Test Deployment
  3. Submit a PR for their algorithm code

Once the PR has been approved and merged, the algorithms will be deployed to production

In practice, we try to define interfaces broadly enough that users who wish to develop or migrate their algorithms to Warden can usually find an interface that their algorithm fits under; however, if none fit, then users would have to request to have a new interface supported by the Warden team.

Interfaces give us a way of organizing the algorithms as well as the serialization logic in the Warden platform. For each interface, we can implement the serialization logic in the Warden platform just once (to support the passing of data between the Java platform and the executable), and it would apply to all the algorithms under that interface.

Additionally, and perhaps more importantly, interfaces provide us a way of designing solutions: when we start thinking about what new functionalities the platform should support via its Python algorithms, we can start by specifying the set of inputs and outputs we need. From there, we can work backwards and see how we get those inputs and where we pass those outputs.

For example, when we want to have Python algorithms for root-cause analysis in the Warden platform, we can start by defining an interface similar to the following:

@abstractmethod def simple_rca( self, metric_of_interest: TimeSeries, related_metrics: List[TimeSeries], k: int, anomalies: List[int], params: Dict[str, str], ) -> Dict[str, float] “”” Performs RCA. @param metric_of_interest: metric that we are interested in @param related_metrics: metrics that are related/could explain changes in metric_of_interest @param k: top k related metrics we are interested in @param anomalies: known anomalies in metric_of_interest

Where TimeSeries could be defined as:

class TimeSeries: # For each dimension of the time-series, # maps dimension name to dimension value. dimensions: Dict[str, str] # The following four lists should all have the same length; in particular, # at Unix time {{times[i]}}, the value is {{values[i]}}, which is aggregated # from a sample with size {{sizes[i]}}, with sample variance {{variances[i]}}. times: List[int] values: List[float] sizes: List[int] variances: list[float]

For you, the reader, it would be a fun and useful exercise to think about whether the analytic problems you are working on could be abstracted down to broad categories of interfaces.

Impacts

We are currently expanding Bring Your Own Algorithm throughout Pinterest.

We are migrating the algorithms used in several existing Jupyter reports (used in metrics reviews) to the Warden platform through the Bring Your Own Algorithm framework. This enables better, more standardized code review and version control, since the algorithms will actually be checked into a Python repo instead of residing in the Jupyter notebooks. This also leads to easier collaboration on future enhancements, as once the users migrate their use-case to the Warden platform, they can easily switch within a library of Warden algorithms and take advantage of various Warden features (e.g. pagination, and customized notifications/alerts).

Bring Your Own Algorithm has also enabled Warden to support algorithms based on a variety of Python ML and data science libraries. For instance, we have added an algorithm using Prophet, an open-source, time-series forecasting library from Meta. This has enabled us to perform anomaly detection with more sophisticated analytics, including tunable uncertainty intervals, and take into account seasonalities and holiday effects. We are using this algorithm to capture meaningful anomalies in Pinner metrics that went unnoticed with simpler statistical methods.

Additionally, as alluded to in the Interfaces section above, Bring Your Own Algorithm is serving as the foundation for adding root-cause analysis capabilities to Warden, as we set up the workflow and Python interface that would enable data scientists to plug in their root-cause analysis algorithms. This separation of expertise — us focusing on developing the platform, and the data scientists focusing on the algorithms and statistics — will undoubtedly facilitate more collaborations on exciting problems into the future.

Conclusion

In summary, we have presented here an approach to embedding analytics done in one language within a platform done in another, as well as an interface-driven approach to algorithm and functionality development. We hope you can take the approach outlined here and tailor it to your own analytic needs.

Acknowledgements

We would like to extend our sincere gratitude to our data scientist partners, who have always been enthusiastic in using Warden to solve their problems, and who have always been eager to contribute their statistical expertise to Warden.

To learn more about engineering at Pinterest, check out the rest of our Engineering Blog and visit our Pinterest Labs site. To explore and apply to open roles, visit our Careers page.

--

--