Introduction to Historical Loads – for Data Engineers.

There are probably few things in life that will strike more fear and tumult in the heart of the Data Engineer than historical loads. You know, on the surface it seems like such an innocent thing. How hard could it possibly be? Just take a bunch of data stored somewhere and shove it into a table. If only. Life never works that way, and neither does the historical load. You would think after decades we would all have figured it out by now. Is it because we don’t do it often enough? Maybe it’s like regex: you just figure it out as you go every single time, telling yourself you’ll do it right the next time.

What is a historical load anyways?

I mean it does beg the question, what exactly is a historical data load anyways? When does a data load cross some invisible line and become a historical load, as opposed to a “normal” load? This is a strange question, but generally, I think you know a historical data load when you see one, probably because it falls into some box that is “outside” normal. Maybe that’s a bad way to approach historical loads, though; should one process handle it all? What about the ever-infamous “idempotent” data load; shouldn’t your ETL be a Kraken raised from the depths, able to gobble up everything in its path?

One thing is clear though: since the days of yore when the Database Developer reigned supreme, the historical data load has always been a sticky issue for data folk.

What makes historical loads suck so much?

I think there are many and varying reasons that historical loads are such a pain.

  • Historical data is often in a different format than the current data.
  • Historical data has a high likelihood of having data quality issues (that cause loads to break).
  • Historical data loads choke the normal “production” pipeline.
  • It’s not always clear “how” the historical load should be done.

It seems to me that the general challenges for historical data loads fall into two categories.

  • Data size, quality, and format issues.
  • “How do I do this?” issues.

I think probably one of the most common stumbling blocks for the historical data load is, “how do I/should I do this?” This is an important question that has people taking sides and lobbing grenades. Should your normal daily data transformation scripts be so idempotent and architecturally scalable that they simply handle it, historical or normal load, like it’s just another day?

I think this is what most people hope for, as the true north star. But is it realistic?

Probably not. The real world is often much different than what we wish for.
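For context, an “idempotent” daily load usually means a job you can re-run for any day without creating duplicates. Here is a minimal PySpark sketch of that idea; the started_at column name and the file paths are just placeholders for illustration:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# A minimal sketch only -- the started_at column and the paths are placeholders.
spark = SparkSession.builder.appName("IdempotentDailyLoad") \
    .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
    .getOrCreate()

df = spark.read.csv('normal_data_files/', header=True)

# With dynamic partition overwrite, re-running a day replaces that day's
# partition instead of appending duplicate rows, which is what makes the
# load safe to repeat.
df.withColumn('trip_date', F.to_date(F.col('started_at'))) \
    .write \
    .partitionBy('trip_date') \
    .mode('overwrite') \
    .parquet('data_warehouse/trips')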

3 Practical Approaches to Historical Data Loads.

This is a strange topic, hard to grasp, but let’s talk about three approaches one could take when designing historical data loads. I would be interested to hear if there are other approaches folks use, but at a high level, these are the ones I see the most.

  • Historical load integrated directly into normal load/transform scripts.
  • Historical load completely separate from normal load/transform scripts.
  • Quasi-integration of historical load and normal load/transform scripts.

Maybe over-simplified? Maybe not. I think it for the most part boils down to these three areas. You can either NOT incorporate historical loads into your normal transformation scripts, you CAN, or you can take the sorta-in-between route, where you reuse many of the same functions, with some rewritten to specifically support historical loads.

Before you even start talking about code though, one of the main pieces most people forget is architecture and scalability. Can the environment you normally work in even handle this load? Will it crash the cluster, or starve other pipelines by hogging all the resources? The timing of when you run historical loads, and the compute available to run those scripts, are very important decisions.
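To make that concrete, one simple lever is to give the historical run its own session sizing (or its own cluster entirely) instead of letting it inherit the daily job’s defaults and fight the normal pipeline for resources. A rough sketch; the numbers here are placeholders, not recommendations:

from pyspark.sql import SparkSession

# Sketch only: executor memory/cores and shuffle partitions are placeholders.
# The point is that a historical backfill often deserves its own sizing rather
# than borrowing whatever the daily pipeline happens to use.
spark = SparkSession.builder.appName("HistoricalBackfill") \
    .config('spark.executor.memory', '8g') \
    .config('spark.executor.cores', '4') \
    .config('spark.sql.shuffle.partitions', '200') \
    .getOrCreate()

Running it off-hours or on a throwaway cluster accomplishes the same thing at the infrastructure level.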

Let’s take a look at a rubber-meets-the-road example for each of the three scenarios listed above, direct integration, no integration, and partial integration of historical loads into normal transform pipelines.

Historical Data Load Use Case – Examples

Ok, I think it will be easier to examine the pros and cons of each style by doing an example project. We’re going to use the open source Divvy bike trips data set. One nice feature of this data set is that it’s been produced for such a long period of time that … you guessed it … the formats have changed over time. I mean, isn’t that the classic historical data load? That’s how it works in real life. Let’s see how each of our three approaches solves these common challenges.

Let’s write a simple PySpark data transformation script that reads the basic CSV files discussed above, casts columns to the correct data types, does some other basic transforms, and then writes out parquet files.

Here is the basic script. Contrived and oversimplified, but should suffice for our purposes.

from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F


def read_flat_files(spark: SparkSession, input_uri: str) -> DataFrame:
    # read the raw CSV trip files; every column comes in as a string
    df = spark.read.csv(input_uri, header=True)
    return df


def transform_data_types(input_df: DataFrame) -> DataFrame:
    # cast the string columns from the CSV read to their proper types
    output_df = input_df.withColumn('ride_id', F.col('ride_id').cast("integer"))
    return output_df


def transform_data(input_df: DataFrame) -> DataFrame:
    # collapse member_casual down to a simple yes/no member flag
    output_df = input_df.withColumn('member',
                                    F.when(F.col('member_casual') == 'member', F.lit('yes')).otherwise(F.lit('no'))
                                    )
    return output_df


if __name__ == '__main__':
    spark = SparkSession.builder.appName("DataLoader") \
        .config('spark.sql.shuffle.partitions', '10') \
        .enableHiveSupport() \
        .getOrCreate()
    data_warehouse = 'data_warehouse/trips'

    df = read_flat_files(spark, 'normal_data_files/')
    df_types = transform_data_types(df)
    final = transform_data(df_types)
    final.write.parquet(data_warehouse, mode='append')
    spark.read.parquet(data_warehouse).show()

You can see our output shows the yes and no values for the transformed member column.

Where it all goes wrong.

Ok, so what if your boss tells you that someone supplied some historic data in the same CSV format, dropped it in such-and-such a location, and that you should load those records into the warehouse. Easy, right? We just re-run our script pointing at the historical location with the following code … df = read_flat_files(spark, 'historic_data_files/').

What happens? It blows up of course.

Traceback (most recent call last):
  File "/Users/danielbeach/PycharmProjects/historicalLoads/main.py", line 30, in <module>
    df_types = transform_data_types(df)
  File "/Users/danielbeach/PycharmProjects/historicalLoads/main.py", line 11, in transform_data_types
    output_df = input_df.withColumn('ride_id', F.col('ride_id').cast("integer"))
  File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 2455, in withColumn
  File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
pyspark.sql.utils.AnalysisException: cannot resolve '`ride_id`' given input columns: [bikeid, birthyear, from_station_id, from_station_name, gender, starttime, stoptime, to_station_id, to_station_name, trip_id, tripduration, usertype];
'Project [trip_id#16, starttime#17, stoptime#18, bikeid#19, tripduration#20, from_station_id#21, from_station_name#22, to_station_id#23, to_station_name#24, usertype#25, gender#26, birthyear#27, cast('ride_id as int) AS ride_id#40]
+- Relation[trip_id#16,starttime#17,stoptime#18,bikeid#19,tripduration#20,from_station_id#21,from_station_name#22,to_station_id#23,to_station_name#24,usertype#25,gender#26,birthyear#27] csv

Now comes historical load decision time.

And so it happens, and now comes the moment every Data Engineer dreads: which direction to turn from here. I mean, it’s one thing to simply fix the problems as they surface; it’s another thing to do so in an elegant fashion. Do we start modifying the current code to meet the demands of the new-but-old data? What if it muddies the water? Is it worth it for code that gets run only once, or a few times at most?

Is it silly to write an entirely new script that duplicates a lot of the functionality of the code that already exists? Do we do something in between? What oh what do we do!?


Option 1 – Historical load integrated directly into normal load/transform scripts.

Our first option is to just decide that we should take our normal load and transform code, and just make adjustments in that codebase to support historic data loads. This has some advantages.

  • We minimize the duplication of code.
  • We can find everything in a single spot.
  • It’s easier to reason about when everything is in one place.
  • The codebase becomes more flexible and extensible when it’s refactored to handle other situations.

How might this look in our case? Well, we know right off the bat that the column names are different between our historical files and current files, and since we are using Spark for our data transformations … we could just check the column names upon read, and that will tell us which files we are working with. Simple enough. Of course this is contrived and there are holes in this logic, but just play along, will you?

def check_for_historical_file_read(input_df: DataFrame) -> bool:
    if 'ride_id' in input_df.columns:
        return False  # ride_id only exists in the current file format, so this is a normal load
    else:
        return True  # no ride_id column means we are looking at a historical file

And now we have a flag we can use as a switch.

is_historical = check_for_historical_file_read(df)

Using this sort of switch logic we can make changes to the logic of some of our functions to handle our historic data … for example.

def transform_data_types(input_df: DataFrame, is_historical: bool) -> DataFrame:
    if is_historical:
        # historical files call the ride key trip_id, so rename it before casting
        input_df = input_df.withColumnRenamed('trip_id', 'ride_id')
    output_df = input_df.withColumn('ride_id', F.col('ride_id').cast("integer"))
    return output_df

and

def transform_data(input_df: DataFrame, is_historical: bool) -> DataFrame:
    check_column = 'member_casual'
    check_against = 'member'
    if is_historical:
        # historical files express membership as usertype == 'Subscriber'
        check_column = 'usertype'
        check_against = 'Subscriber'
    output_df = input_df.withColumn('member',
                                    F.when(F.col(check_column) == check_against, F.lit('yes')).otherwise(F.lit('no'))
                                    )
    return output_df

And now we have a transform/load script that can handle both the normal data load and our historical files. Of course, in this simple example it was quite easy, but it still required changes to several functions. You can well imagine that with a larger, more complicated real-life codebase, such changes could easily touch many, many more functions, with much more complex logic.
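Wired together, the entry point for this fully integrated version might look something like the following sketch (re-using the same paths from earlier):

if __name__ == '__main__':
    spark = SparkSession.builder.appName("DataLoader") \
        .config('spark.sql.shuffle.partitions', '10') \
        .enableHiveSupport() \
        .getOrCreate()
    data_warehouse = 'data_warehouse/trips'

    # point at either the normal or the historical drop location
    df = read_flat_files(spark, 'historic_data_files/')

    # one flag drives the branching inside the shared transform functions
    is_historical = check_for_historical_file_read(df)
    df_types = transform_data_types(df, is_historical)
    final = transform_data(df_types, is_historical)

    final.write.parquet(data_warehouse, mode='append')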

One could easily envision a case where stuffing everything into a single codebase greatly muddies the waters, making already-complex logic even more complex and harder to debug. Is this the right decision?

Option 2 – Historical load completely separate from normal load/transform scripts.


But what happens if we don’t like the previous option of total integration and we swing all the way to the other end? Total separation of the historical load logic from the normal codebase can seem like a good idea on the surface.

  • Separating the codebases keeps historical-load bugs out of the normal production code.
  • Separate codebases are easier to debug and reason about.
  • Reduces the complexity of functions and methods.

But, on the other side of the coin there are plenty of downsides to this type of total separation approach.

  • Duplication of logic and code between two codebases.
  • Historical code written separately, as a one-off, tends to be held to lower standards.

I mean, suppose in our case we decide to just write the functions over again, specific to the historical load process; now we have pretty much duplicated logic, in different places, for different code. This seems counter-intuitive.

def transform_data(input_df: DataFrame) -> DataFrame:
    output_df = input_df.withColumn('member',
                                    F.when(F.col('usertype') == 'Subscriber', F.lit('yes')).otherwise(F.lit('no'))
                                    )
    return output_df

and

def transform_data(input_df: DataFrame) -> DataFrame:
    output_df = input_df.withColumn('member',
                                    F.when(F.col('member_casual') == 'member', F.lit('yes')).otherwise(F.lit('no'))
                                    )
    return output_df

This just seems like not the best idea, although it would work perfectly fine. Sure, we are breaking a few rules by duplicating code, but we are also keeping our normal, most-often-used codebase clean and simple, which probably means less complex and less error prone. But is it worth it? Who knows; it probably depends on the data team.
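For the sake of completeness, here is a rough sketch of what that standalone historical script might look like (hypothetical, but it makes the duplication obvious; the read and write plumbing is copied nearly verbatim from the normal script):

# historical_load.py -- a hypothetical standalone script for Option 2
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F


def read_flat_files(spark: SparkSession, input_uri: str) -> DataFrame:
    # identical to the normal script's reader
    return spark.read.csv(input_uri, header=True)


def transform_data_types(input_df: DataFrame) -> DataFrame:
    # historical files call the key trip_id instead of ride_id
    return input_df.withColumn('ride_id', F.col('trip_id').cast("integer"))


def transform_data(input_df: DataFrame) -> DataFrame:
    # historical files express membership as usertype == 'Subscriber'
    return input_df.withColumn('member',
                               F.when(F.col('usertype') == 'Subscriber', F.lit('yes')).otherwise(F.lit('no')))


if __name__ == '__main__':
    spark = SparkSession.builder.appName("HistoricalDataLoader") \
        .enableHiveSupport() \
        .getOrCreate()

    df = read_flat_files(spark, 'historic_data_files/')
    df_types = transform_data_types(df)
    final = transform_data(df_types)
    final.write.parquet('data_warehouse/trips', mode='append')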

Do we have to pick one of these two options, both on the opposite end of the spectrum? No, there is a third option.

Option 3 – Quasi-integration of historical load and normal load/transform scripts.

I think this third option is the one least explored by Data Engineers, which is a shame. Finding options that are outside the box is how the best data solutions are built. Not fully integrating historical data loads into the normal transformation scripts is a great option because you keep complexity to a minimum and reduce the chance of bugs and over-engineering. But total separation is not the best choice either; you end up with complete code duplication in many spots. That can lead to headaches and bugs down the road when business logic changes and you suddenly have two codebases to maintain.

There is yet a more excellent way.

I would recommend that the next time you run into a historical load, you choose this third option if you can. Identify the minimum set of transformations that need to be updated to handle the historical load. There is a very good chance that this is not the whole codebase, but only a smaller subset of the functionality. If you have a function or two that would be overcomplicated by handling both use cases, split only those functions out, writing new ones for the historical load.

  • Update existing functions to support the historical loads in as few places as possible.
  • Write new historical load functions only where the complexity of the logic requires it.
  • Write a single master historical load script to pull this all together.

Update the minimum set of functions, re-use as many of the other methods as possible, and write one or two new functions where necessary to reduce complexity. Then write a single overarching historical load script that combines all of these existing, updated, and new functions into a single load.
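As a concrete sketch of what that could look like here, assuming the normal pipeline lives in a module we can import from (the normal_load module name is hypothetical), the single overarching historical script might be roughly this:

# historical_load.py -- a hypothetical Option 3 "master" script
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F

# re-used as-is from the normal pipeline (module name assumed for illustration)
from normal_load import read_flat_files, transform_data_types


def transform_historical_member(input_df: DataFrame) -> DataFrame:
    # new function only because the historical files use usertype/Subscriber
    # instead of member_casual/member
    return input_df.withColumn('member',
                               F.when(F.col('usertype') == 'Subscriber', F.lit('yes')).otherwise(F.lit('no')))


if __name__ == '__main__':
    spark = SparkSession.builder.appName("HistoricalDataLoader") \
        .enableHiveSupport() \
        .getOrCreate()

    df = read_flat_files(spark, 'historic_data_files/')

    # one small rename lets the existing type-casting function work untouched
    df = df.withColumnRenamed('trip_id', 'ride_id')
    df_types = transform_data_types(df)

    final = transform_historical_member(df_types)
    final.write.parquet('data_warehouse/trips', mode='append')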

Now you have the best of both worlds.