Simplified Delta Lake operations with Mack

I like writing code, and I'm happy every time there is a data processing job with some business logic to implement. However, over time I've learned to appreciate the Open Source contributions that enhance my daily work. The Mack library, the topic of this blog post, is one of the projects I discovered recently.

To put it simply, Mack is a Python library with helper methods for Delta Lake, a little bit like what my other recent discovery, Apache DataFu, is for Apache Spark. It's designed around an interesting concept of pure functions that favors code reusability without importing the whole library every time:

...
  • ...
  • We avoid classes whenever possible. Classes make it harder to copy / paste little chunks of code into notebooks. It's good to Stop Writing Classes.

Even though the project contains a nice README with plenty of examples, I'll try to complement it in this blog post with some extra explanation of the implementation details of the Slowly Changing Dimension update and duplicates removal.

Slowly Changing Dimension Type 2 update

As a reminder, an SCD Type 2 update consists of closing the validity period of the current row and inserting a new active row alongside it. Mack helps manage this requirement with the following API:

def type_2_scd_generic_upsert(
    path, updates_df,
    primary_key, attr_col_names,
    is_current_col_name,
    effective_time_col_name, end_time_col_name,
):
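
For example, a call could look like the snippet below. The table path and column names are made up, and attr_col_names is assumed to accept a list of the tracked attribute columns:

import mack

# Hypothetical call matching the signature above; path and column names are examples.
mack.type_2_scd_generic_upsert(
    path="/tmp/customers",
    updates_df=updates_df,  # DataFrame with the incoming changes
    primary_key="pkey",
    attr_col_names=["attr1", "attr2"],
    is_current_col_name="is_current",
    effective_time_col_name="effective_time",
    end_time_col_name="end_time",
)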

The implementation consists of 2 main steps: preparing a staged updates DataFrame that combines the new row versions with the markers needed to close the currently active rows, and applying it to the table with a single MERGE operation.
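
To make these steps more concrete, here is a minimal sketch of the classic Delta Lake SCD Type 2 merge pattern that Mack builds on. It assumes an existing SparkSession called spark, an updates_df with the columns pkey, attr and effective_time, and a single tracked attribute column; the names are illustrative and Mack's actual code handles a list of attribute columns:

from delta.tables import DeltaTable

# Hypothetical target layout: pkey | attr | is_current | effective_time | end_time
base_table = DeltaTable.forPath(spark, "/tmp/customers")

# Step 1: stage the updates. A row whose attribute changed appears twice:
# once with a NULL merge key (it will be inserted as the new current version)
# and once with its real key (it will close the existing current row).
rows_to_close = (
    updates_df.alias("updates")
    .join(base_table.toDF().alias("base"), "pkey")
    .where("base.is_current = true AND updates.attr <> base.attr")
    .selectExpr("NULL AS merge_key", "updates.*")
)
staged_updates = rows_to_close.unionByName(
    updates_df.selectExpr("pkey AS merge_key", "*")
)

# Step 2: a single MERGE closes the matched current rows and inserts the new versions.
(
    base_table.alias("base")
    .merge(staged_updates.alias("staged"), "base.pkey = staged.merge_key")
    .whenMatchedUpdate(
        condition="base.is_current = true AND staged.attr <> base.attr",
        set={"is_current": "false", "end_time": "staged.effective_time"},
    )
    .whenNotMatchedInsert(
        values={
            "pkey": "staged.pkey",
            "attr": "staged.attr",
            "is_current": "true",
            "effective_time": "staged.effective_time",
            "end_time": "null",
        }
    )
    .execute()
)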

Duplicates removal

Besides the SCD support, Mack has several helper methods to remove either all duplicates or only a part of them. You may be thinking, "PySpark does that already with drop_duplicates". Indeed, it does, and that function is used for the scenario of removing all duplicates from a DataFrame. But there are 2 other deduplication functions that rely on the MERGE operation.
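
For reference, the plain PySpark approach mentioned above is a one-liner (the column names are made up):

# Keeps one arbitrary row per (col1, col2) combination.
deduplicated_df = data_frame.drop_duplicates(["col1", "col2"])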

The first of these deduplication methods is kill_duplicates. It removes all duplicated rows from the table, so in the end the table contains only rows that were never duplicated. The logic first builds a DataFrame with the duplicated rows only. That dataset is created with a window function and later matched against the current table in a MERGE, so that any match on the defined columns triggers the deletion of the row:

duplicate_records = (
    data_frame.withColumn(
        "amount_of_records",
        	count("*").over(Window.partitionBy(duplication_columns)),
    )
    .filter(col("amount_of_records") > 1)
    .drop("amount_of_records")
    .distinct()
)

# ...
# Remove all the duplicate records
delta_table.alias("old").merge(
    duplicate_records.alias("new"), q
).whenMatchedDelete().execute()
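
The q variable hidden behind the elided lines is the MERGE condition, essentially an equality predicate over the duplication columns. A minimal sketch of how such a condition can be assembled (not necessarily Mack's exact code):

# Builds a condition like "old.col1 = new.col1 AND old.col2 = new.col2"
# from the columns used to detect duplicates.
q = " AND ".join(
    f"old.{col_name} = new.{col_name}" for col_name in duplication_columns
)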

The second method, drop_duplicates_pkey, relies on the same principle but builds the duplicate_records DataFrame differently. It completes the partitionBy with an orderBy call on a unique primary key column passed as an argument of the function:

duplicate_records = (
    data_frame.withColumn(
        "row_number",
        row_number().over(
            Window().partitionBy(duplication_columns).orderBy(primary_key)
        ),
    )
    .filter(col("row_number") > 1)
    .drop("row_number")
    .distinct()
)
# ...
# Remove all the duplicate records
delta_table.alias("old").merge(
    duplicate_records.alias("new"), q
).whenMatchedDelete().execute()

That way, only the first row of each group of duplicates, i.e. the one with the lowest primary key value (typically the oldest), is kept in the table.
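
In practice, the calls to the two MERGE-based helpers could look like the snippet below. The table path and column names are made up, and the argument order simply follows the snippets above; the exact signatures may differ between Mack versions, so check the README:

import mack
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/tmp/my_table")

# Option 1: delete every row that has at least one duplicate on (col2, col3).
mack.kill_duplicates(delta_table, ["col2", "col3"])

# Option 2: keep one row per (col2, col3) group - the one with the lowest
# col1 value - and delete the remaining duplicates.
mack.drop_duplicates_pkey(delta_table, primary_key="col1", duplication_columns=["col2", "col3"])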

Even though I only discussed 2 features, Mack does more. It can print table information in a more human-friendly format, copy a table, or even add schema validation to the append action. But I'll let you discover those by yourself!

