Concurrently Train Multiple Time Series Models Over Spark with XGBoost

Take advantage of the distributive power of Apache Spark and concurrently train thousands of auto-regressive time-series models on big data

Alon Agmon
Towards Data Science

--

Photo by Ricardo Gomez Angel on Unsplash

1. Intro

Suppose you have a large dataset consisting of your customers’ hourly transactions, and you were tasked with helping your company forecast and identify anomalies in their transaction patterns. “If the transaction rate of some customers is suddenly declining then we want to know about it”, the product manager explains, “the problem is that we have to automate this because we just have too many customers to keep track”. You have enough data to train a decent time-series model, but because transaction patterns differ quite a bit between customers, then you need to train a model for each customer in order to accurately forecast and detect anomalies in their specific usage patterns.

I believe that this is quite a common task for many data scientists and machine learning engineers working with SaaS or retail customer data. From a machine learning perspective, this doesn’t seem like such a complicated task, but it can quickly turn into an engineering nightmare. What if we have thousands or even hundreds of thousands of customers? How should we train and manage thousands of models? What if we need to create this forecast relatively frequently? or even in real time? When data volume is constantly growing then even naive requirements can quickly get demanding and we need to ensure we have an infrastructure that can scale reliably as our data grows.

Concurrently training multiple models on a huge dataset is actually one of the few cases that justifies training on a distributed cluster, such as Spark. I know that this is a controversial claim, but when it comes to structured tabular data then training on a distributed cluster (rather than on sampled data, for example) is often not justified. However, when the data we need to process is genuinely “big”, and we need to break it to many datasets and train a ML model on each, then Spark seems like the right direction.

Using Spark for model training provides a lot of capabilities but it also poses quite a few challenges, mostly around how data should be organized and formatted. The purpose of this post is to demonstrate at least one way in which this task can be accomplished, end to end, using Spark (and Scala)— from data formatting to model training and prediction.

Specifically, in what follows we are going to train an autoregressive (“AR”) time-series model using XGBoost over each of our customers time-series data. AR models, in short, take the value to be predicted as a linear function of its previous values. In other words, it models the number of transactions that a given customer will have at hour h as a function of the number of transactions they had on hours h -1, h -2, h -3, h -n. Such models are usually fairly reliable in giving a decent forecast for such tasks, and can be also implemented using boosting trees models which are widely available and easy to use. Indeed we will naively implement this using XGBoost regression.

The trickiest part in time-series training and prediction is to correctly “engineer” the features. Section 2 shortly explains how auto-regression works in the context of time-series, and shows how time-series data can be modeled for AR tasks using pure SQL. Section 3 focuses on how such dataset should be loaded to Spark, and shows how it can be “broken” into multiple training tasks and datasets. At last some of the complexity involved in training ML models over Spark is the use of Spark’s MLlib which some find tedious and counter-intuitive. I will demonstrate how this task can be accomplished without using MLlib API. Section 4 is dedicated to the prediction or forecast stage. Section 5 concludes.

2. Basic feature engineering for AR time-series models

The trickiest part in modelling time-series data as an auto-regressive problem is to properly organize and format it. A simplified and practical example might make the idea (and challenge) clearer.

Suppose we have hourly transaction data collected over 6 hours — from 8AM to 1PM, and we want to forecast the number of transactions each customer will have at 2PM.

Dataset A — The data format we start with

We decide to use 4 parameters in our regression, which means that we are going to use the number of transactions that the customer had on 10AM to 1PM in order to forecast the number of transactions they will have at 2PM. This reflects a more general intuition about our data —that 4 hours of data is enough to accurately forecast or explain the 5th hour (note that this is a very naive and simplified example, this is obviously not the case in the real world ). This means that if we have enough data samples then a well trained model should be able to learn patterns in the customer’s data that will enable it to accurately forecast the number of transaction in any hour given the 4 hours that preceded (I’m intentionally ignoring the idea of seasonality, which is an important concept in time-series analysis).

To train such model we need to create a training set with 4 features. Each row in our training set will consist of a target variable that represents the number of transactions at a given hour, and 4 parameters that capture the number of transactions in the 4 hours that preceded. By “pivoting” the table above, and creating a sliding window of the given shift ( 4 hours) we can create a dataset that will look something like this (per customer):

Dataset B (the first 2 lines represent the actual data) and the lines below explain what this data represents)

Ideally we will have more data, but the idea is the same, our model is supposed to “see” enough samples of 4 hours in order to learn how to correctly predict the 5th hour, which is our y or target variable. Because we want our model to detect patterns in our data we need to make sure it learns enough — sometimes 6 hours will be enough to accurately forecast the 7th hour, and sometimes we will need at least a week.

One simple way to create such a training set for this task is with an SQL query (that can be run using SparkSQL or some other query engine) that looks something like this:

WITH top_customers as (
--- select the customter ids you want to track
),

transactions as (
SELECT
cust_id,
dt,
date_trunc('hour', cast(event_time as timestamp)) as event_hour,
count(*) as transactions
FROM ourTable
WHERE
dt between cast(date_add('day', -7, current_date) as varchar)
and cast(current_date as varchar)
GROUP BY 1,2,3 Order By event_hour asc
)

SELECT transactions.cust_id,
transactions.event_hour,
day_of_week(transactions.event_hour) day_of_week,
hour(transactions.event_hour) hour_of_day,
transactions.transactions as transactions,
LAG(transactions,1) OVER
(PARTITION BY transactions.cust_id ORDER BY event_hour) AS lag1,
LAG(transactions,2) OVER
(PARTITION BY transactions.cust_id ORDER BY event_hour) AS lag2,
LAG(transactions,3) OVER
(PARTITION BY transactions.cust_id ORDER BY event_hour) AS lag3,
LAG(transactions,4) OVER
(PARTITION BY transactions.cust_id ORDER BY event_hour) AS lag4
FROM transactions
join top_customers
on transactions.cust_id = top_customers.cust_id

The query starts with 2 WITH clauses: the first just extracts a list of customers we are interested in. Here you can add any condition that is supposed to filter in or out specific customers (perhaps you want to filter new customers or only include customers with sufficient traffic). The second WITH clause simply creates the first data set — Dataset A, which pulls a week of data for these customers and selects the customer id, date, hour, and number of transactions.

Finally, the last and most important SELECT clause generates Dataset B, by using SQL lag() function on each row in order to capture the number of transactions in each of the hours that preceded the hour in the row. Our result should look something like this:

"cust_id", "event_hour", "day_of_week", "hour_of_day", "transactions", "lag1", "lag2", "lag3", "lag4"
"Customer-123","2023-01-14 00:00:00.000","6","0","4093",,,,,,
"Customer-123","2023-01-14 01:00:00.000","6","1","4628","4093",,,,,
"Customer-123","2023-01-14 02:00:00.000","6","2","5138","4628","4093",,,,
"Customer-123","2023-01-14 03:00:00.000","6","3","5412","5138","4628","4093",,,
"Customer-123","2023-01-14 04:00:00.000","6","4","5645","5412","5138","4628","4093",
"Customer-123","2023-01-14 05:00:00.000","6","5","5676","5645","5412","5138","4628",
"Customer-123","2023-01-14 06:00:00.000","6","6","6045","5676","5645","5412","5138",
"Customer-123","2023-01-14 07:00:00.000","6","7","6558","6045","5676","5645","5412",

As you can see, each row (that contains all the lagged values), has a customer id, the hour (as truncated date), the hour (represented as integer), the number of transactions that the customer had in that hour (this will be the target variable in our training set), and then 4 fields that capture the lagged number of transactions in the 4 hours that preceded the target variable (these will be features or parameters in which our autoregression model will learn to identify patterns).

Now that we have our dataset ready, we can move to training with Spark

3. Data loading and model training over Spark

3.1 Data Loading
At this point we have our dataset almost ready for model training and prediction. After much of the heavy lifting involved in organizing the data in sliding windows was done using SQL. The next stage is to read the results using Spark and create a typed Spark Dataset that is ready for model training. This process or transformation will be implemented using the function below (explanation immediately follows)

The typed dataset will be based on a case class named FeaturesRecord (line 1–2) that will represent a data sample. Each feature record has 4 properties: key is the customer id, ts is the time that this record captures, label is the target variable — the number of transactions in that specific time or ts, and features is a sequence of values that represent of the number of transactions the customer had in the preceding hours.

The extraction of all the variables in the function above is pretty straightforward. The functional trick (that Scala makes possible) is the way we build the features vector (lines 11–12). Also, as you can see, other features can be added to the feature vector, such as the day_of_week, that will allow our model to learn some seasonality in our data.

3.2 Model Training

The training of the model will proceed in 3 stages. Beforehand, recall that one requirement that complicates our task is that we need to train a model per customer, in order to capture the patterns and seasonality that characterize each. However, what we currently have is a dataset that contains data from all customers pooled together. So the first thing we need to do is “break” it in smaller datasets and then train our model on each.

Luckily, that can be done relatively easily using Spark’s (very useful) function flatMapGroups() which does exactly that.

def predict(customerID:String, records:Iterator[FeaturesRecord]) = ???

featuresDS.groupByKey(_.key).flatMapGroups(predict)

Recall that our dataset is typed and based on the class FeaturesRecord, which has a key property that represents the customer ID. Using Spark’s groupByKey() followed by flatMapGroups() , we are able to “break” our huge dataset, and call the function predict() on each customer’s data (the function actually does both training and prediction). Depending on Spark’s configuration, Spark will also be able to parallelize this call, which means that we would also be able to do so concurrently in a manner that will allow us to scale and do it fast.

Lets look at the predict() function line by line (though the next section will focus on the forecast and prediction staeg:

Main flow

The predict function unfolds in 3 stages and eventually returns a case class of type PredictionResult (I will talk about this in the next section). In the first stage, the function getForcastDatasets() simply divides the records into 2 sequences— one that is based on all records but the last 2 hours (that’s the dataset that our model is going to learn) and a sequence with just the last 2 hours (that’s the sequence that our model is going to predict).

In our case, I have to ignore the last datapoint as it might be incomplete, so training data will consist of all the records but the last 3, and the forecast data will be based on the records N-1 and N-2 (excluding N — the last one).

In the second stage, we train our model on the training set. The train function is rather simple. We basically call XGBoost’s train function using our training dataset and a map with the regression parameters. The function returns a Booster object or a trained model. The Booster object will next be used to run prediction on the prediction sequence. (It is important to note that model selection needs to be done carefully, as not all time-series problems behave in a way that tree models can well capture.)

There is, however, one trick that should be mentioned. As you can see, before we pass the sequence of FeatureRecords to the train method, we call a function named toDMatrix(). XGBoost’s train method requires a DMatrix object to represent its sample data. To do so, we need to transform each sequence of FeatureRecords to a DMatrix. In order to so, I have created the toDMatrix() function using an implicit scala class that take a Seq as a parameter and offers a toDMatrix function that transforms each FeatureRecord into XBoost’s LabeledPoint which is then fed into a DMatrix and returned by the function.

Now that we have the functions required to transform a dataset to training dataset for XGBoost, we can run training and use the trained model to predict the next few hours.

4 Prediction and Forecast

Ideally, during training our model has “seen” many samples of 4 hours, and learned how to forecast the 5th. This can be useful for 2 kinds of tasks: simple forecast and anomaly detection. The latter task is an interesting and useful one in this context. The common technique is pretty simple: if we take time series data of the last few hours and use our model to forecast the last one, then a well trained model should give us a prediction that is pretty close to the actual number. However, if the predicted number is too high or too low, then this could mean 2 things: either the model is not accurate or the real data of the last hour is unexpected or anomalous. Imagine, for example, that we learn to identify traffic pressure over 24 hours in a specific junction, and one day there is an unusual traffic jam due to an accident. If we predict this hour and compare it to the actual data, then we will see that the predicted pressure is significantly lower than the actual one. This might mean that the model is simply wrong or that it accurately captures the fact that there is an anomaly at the moment.

We will follow this logic and measure the difference between the predicted value and the actual value by dividing them. In this way, assuming that our model is accurate, a value of 1 will indicate that the actual data is indeed predicted and expected while a value of less than 1 will mean that actual data seems to be lower than expected and we might need to alert about this customer.

For the purpose of prediction we will use a case class named PredictionResult that will consist of a key (customer), ts (timestamp), label (actual data), prediction (forecast value), and ratio (which is the diff between them).

We generate predictions by calling the Booster’s predict() method on the feature vector. Next, we zip() the feature record with the relevant forecast result in order to construct, for each prediction, a PredictionResult object (line 11) that will also calculate the ratio between the actual and predicted value. Finally we construct a typed dataset using the list of PredictionResult which will look as follows:

As you can see, the ratio column can help us identify interesting anomalies. For most records, you can see that the value is pretty close to 1, which means that the actual value is almost identical to the predicted one, but we can certainly find some anomalies, mostly such that the actual value is 20%–40% higher than the predicted one.

5. Conclusion

Multiple model training in scale is an engineering challenge, especially where data is constantly growing and we want to benefit from more of it. ML pipelines that work great for one, five or ten models might start to choke when the number of concurrently trained models grows to hundreds or even thousands.

Apache Spark is a great tool for training in scale but using it for machine learning tasks is quite different than the common Python-based frameworks, and not suitable for every project. In this post, I demonstrated how we can create a training pipeline that will be able to scale. I picked time-series forecasting as an example, because it is considered a relatively complicated task from an engineering perspective, though it can be easily adapted to regression and classification problems using the same stack.

We saw that much of the heavy lifting involved in creating the training set can (and should) be done using SQL over a query engine that supports SQL, such as Spark for example, for better readability and speed. Next, we used Spark’s Dataset APIs in order to split a huge dataset to many small training tasks. Finally, we used XGBoost’s booster API to create train an auto-regressive TS model, and use it to detect anomalies in our customers’ data. As mentioned earlier, multiple model training is a task that can quickly get very complicated. In this post, I tried to show that Spark’s API give us enough tools to make it relatively straightforward, and keep it simple, as much as we can.

Happy engineering! Hope this was helpful

** You can find a link to the full sample code here
*** there are many resources on time series analysis and auto-regression. I think that the resource that at I have learnt the most from is: Forecasting: Principles and Practice (2nd ed) by Rob J Hyndman and George Athanasopoulos (available online here)

--

--