The power of dbt incremental models for Big Data

An experiment on BigQuery

Suela Isaj
Towards Data Science

--

If you are processing a couple of MB or GB with your dbt model, this is not a post for you; you are doing just fine! This post is for those poor souls who need to scan terabytes of data in BigQuery to calculate counts, sums, or rolling totals over huge event data on a daily basis, or even more frequently. In this post, I will go over a technique for enabling cheap data ingestion and cheap data consumption for “big data”.


Let’s imagine we have raw data at timestamp granularity and we need to calculate totals per customer. Additionally, we would like analyses at the daily and monthly level, because we need aggregated sums at different granularities. I will take the example of Twitter likes, where user_id is a Twitter user, post_id is a Twitter post, and timestamp is the moment when user_id liked post_id. The raw data is stored as a table partitioned by day. If your raw data is not partitioned, that is already your first improvement, so consider migrating the raw data to a partitioned table, following this example. Make sure to set the partition filter as required https://cloud.google.com/bigquery/docs/managing-partitioned-tables#require-filter to avoid accidents where all the raw data is scanned.
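As a rough sketch, such a migration could look like the statement below (the project, dataset, and source table names are purely illustrative):

-- Illustrative migration: a day-partitioned copy of the raw log
-- that refuses any query without a partition filter
CREATE TABLE `my_project.twitter.twitter_likes`
PARTITION BY DATE(timestamp)
OPTIONS (require_partition_filter = TRUE) AS
SELECT
    user_id,
    post_id,
    timestamp
FROM `my_project.twitter.twitter_likes_unpartitioned`;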

Raw log of Twitter post likes

And this data is actually around 869.91 GB! So if I want the number of likes that each user has performed, my [costly] naive query would be:

SELECT
    user_id,
    COUNT(*) as nr_likes
FROM twitter_likes
GROUP BY user_id

However, likes on Twitter are immutable events; they never change once written. This allows me to load the data incrementally. I will go over the raw data to load the current day, over the daily aggregated data to load the month, and finally through the daily data to update the totals for the users that have changed.

For both incremental models, I will use the static partitioning technique, which has proven to be the most performant https://discourse.getdbt.com/t/benchmarking-incremental-strategies-on-bigquery/981. This means that I will operate on partitions instead of scanning data, so I will pick the partitions of the last two days (just in case I have late-arriving events), aggregate them, and add them to my dbt model twitter_likes_daily:

{% set partitions_to_replace = [
    'current_date',
    'date_sub(current_date, interval 1 day)'
] %}

{{
    config(
        materialized='incremental',
        partition_by = { 'field': 'date', 'data_type': 'date' },
        cluster_by = ["user_id"],
        incremental_strategy = 'insert_overwrite',
        partitions = partitions_to_replace
    )
}}

SELECT
    user_id,
    DATE(timestamp) as date,
    COUNT(*) as nr_likes
FROM {{ source('twitter', 'twitter_likes') }}
-- I have a fake condition here because
-- my raw table does not let me query without a partition filter
WHERE DATE(timestamp) >= "1990-01-01"

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
AND DATE(timestamp) in ({{ partitions_to_replace | join(',') }})
{% endif %}

GROUP BY 1, 2

twitter_likes_daily is only 55.52 GB and the daily load scans just 536.6 MB! Let’s create the monthly aggregates; for those, we can work directly on twitter_likes_daily instead of the raw twitter_likes:

{% set partitions_to_replace = [
    'date_trunc(current_date, month)',
    'date_trunc(date_sub(current_date, interval 1 month), month)'
] %}

{{
    config(
        materialized='incremental',
        partition_by = { 'field': 'month_year', 'data_type': 'date', 'granularity': 'month' },
        cluster_by = ["user_id"],
        incremental_strategy = 'insert_overwrite',
        partitions = partitions_to_replace
    )
}}

SELECT
    user_id,
    DATE_TRUNC(date, MONTH) as month_year,
    SUM(nr_likes) as nr_likes
FROM {{ ref('twitter_likes_daily') }}

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
WHERE DATE_TRUNC(date, MONTH) in ({{ partitions_to_replace | join(',') }})
{% endif %}

GROUP BY 1, 2

twitter_likes_monthly is 14.32 GB and one load scans only 2.5 GB!
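To give an idea of what cheap consumption looks like downstream, a question like “how many likes did this user give per month” can now be answered from the small pre-aggregated table instead of the raw log. A sketch of such an analyst query (the user id is made up):

-- Monthly like counts for a single user, served from twitter_likes_monthly
SELECT
    month_year,
    nr_likes
FROM twitter_likes_monthly
WHERE user_id = 123456  -- illustrative user id
ORDER BY month_year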

Now let’s go for the totals. Here, we can run a merge operation to update the totals of only those users that have new likes in the last 2 days; the rest of the users have no updates. For that, I pick the users with likes from today from twitter_likes_daily, calculate their totals in twitter_likes_monthly, and finally merge them into the table with the totals, twitter_likes_total. Note that the table is clustered on user_id, because a merge with a cluster on the key is more performant than a regular merge https://discourse.getdbt.com/t/benchmarking-incremental-strategies-on-bigquery/981.

{{
    config(
        materialized='incremental',
        unique_key = 'user_id',
        cluster_by = ["user_id"],
        incremental_strategy = 'merge'
    )
}}

{% if is_incremental() %}
WITH users_changed as (
    SELECT DISTINCT user_id
    FROM {{ ref('twitter_likes_daily') }}
    WHERE date = current_date()
)
{% endif %}

SELECT
    user_id,
    SUM(nr_likes) as total_likes
FROM {{ ref('twitter_likes_monthly') }}

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
WHERE user_id in (SELECT user_id FROM users_changed)
{% endif %}

GROUP BY 1

twitter_likes_total is 1.6 GB and one load scans 16 GB! So from 869.91 GB we ended up scanning only 16 GB, producing a bunch of light and cheap analytical tables.

However, twitter_likes_total could also have been a plain table materialization in dbt that only scans twitter_likes_monthly. That would look like this:

{{
    config(
        materialized='table',
        cluster_by = ["user_id"]
    )
}}

SELECT
    user_id,
    SUM(nr_likes) as total_likes
FROM {{ ref('twitter_likes_monthly') }}
GROUP BY 1

The above script scans 14.5 GB, so it is actually lighter than the merge procedure! What just happened here? It is important to understand that a merge can be less effective than a full table recreation. Given that the table we are scanning, twitter_likes_monthly, is not that large (around 14.5 GB), a full table recreation is more performant than a merge. So when the data ends up not being that large, a merge is simply over-engineering.
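If you want to verify numbers like these on your own project, one way (just a sketch; adjust the region, time window, and filter to your setup) is to compare the bytes billed for the two variants in BigQuery’s INFORMATION_SCHEMA jobs view:

-- Bytes billed by recent jobs that touched twitter_likes_total
SELECT
    job_id,
    creation_time,
    ROUND(total_bytes_billed / POW(1024, 3), 2) AS gb_billed
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    AND query LIKE '%twitter_likes_total%'
ORDER BY creation_time DESC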

Let’s wrap up

Scans of raw immutable events can be handled efficiently with incremental models in dbt, specifically with the static insert_overwrite strategy. The improvement can be huge, from TB down to a couple of GB per daily dbt load, but most importantly, it lets data analysts run their queries against light, pre-aggregated tables instead of what would have been a costly scan of the raw tables. Finally, while appending data to a table might look theoretically cheaper than re-creating it, depending on the use case a merge can be less performant than a full table recreation when the source data is only a couple of GB.

