Streamline Data Pipelines: How to Use WhyLogs with PySpark for Effective Data Profiling and Validation

Sarthak Sarbahi
Towards Data Science
9 min readJan 7, 2024

--

Photo by Evan Dennis on Unsplash

Data pipelines, made by data engineers or machine learning engineers, do more than just prepare data for reports or training models. It’s crucial to not only process the data but also ensure its quality. If the data changes over time, you might end up with results you didn’t expect, which is not good.

To avoid this, we often use data profiling and data validation techniques. Data profiling gives us statistics about different columns in our dataset. Data validation checks for errors, comparing what we have with what we expect.

A great tool for this is whylogs. It lets you log all sorts of data. After logging, you can create whylogs profiles. These profiles help you track changes in your data, set rules to make sure the data is correct, and show you summary statistics in an easy way.

In this blog, you’ll learn how to use whylogs with PySpark. We’ll go through a practical guide on how to do data profiling and validation. So let’s dive in!

Table of contents

  1. Components of whylogs
  2. Environment setup
  3. Understanding the dataset
  4. Getting started with PySpark
  5. Data profiling with whylogs
  6. Data validation with whylogs

Components of whylogs

Let’s begin by understanding the important characteristics of whylogs.

  • Logging data: The core of whylogs is its ability to log data. Think of it like keeping a detailed diary of your data’s characteristics. It records various aspects of your data, such as how many rows you have, the range of values in each column, and other statistical details.
  • Whylogs profiles: Once data is logged, whylogs creates “profiles”. These profiles are like snapshots that summarize your data. They include statistics like averages, counts, and distributions. This is handy for understanding your data at a glance and tracking how it changes over time.
  • Data tracking: With whylogs, you can track changes in your data over time. This is important because data often evolves, and what was true last month might not be true today. Tracking helps you catch these changes and understand their impact.
  • Data validation: Whylogs allows you to set up rules or constraints to ensure your data is as expected. For example, if you know a certain column should only have positive numbers, you can set a rule for that. If something doesn’t match your rules, you’ll know there might be an issue.
  • Visualization: It’s easier to understand data through visuals. Whylogs can create graphs and charts to help you see what’s going on in your data, making it more accessible, especially for those who are not data experts.
  • Integrations: Whylogs supports integrations with a variety of tools, frameworks and languages — Spark, Kafka, Pandas, MLFlow, GitHub actions, RAPIDS, Java, Docker, AWS S3 and more.

This is all we need to know about whylogs. If you’re curious to know more, I encourage you to check the documentation. Next, let’s work to set things up for the tutorial.

Environment setup

We’ll use a Jupyter notebook for this tutorial. To make our code work anywhere, we’ll use JupyterLab in Docker. This setup installs all needed libraries and gets the sample data ready. If you’re new to Docker and want to learn how to set it up, check out this link.

Start by downloading the sample data (CSV) from here. This data is what we’ll use for profiling and validation. Create a data folder in your project root directory and save the CSV file there. Next, create a Dockerfile in the same root directory.

Dockerfile for this tutorial (Image by author)

This Dockerfile is a set of instructions to create a specific environment for the tutorial. Let’s break it down:

  • The first line FROM quay.io/jupyter/pyspark-notebook tells Docker to use an existing image as the starting point. This image is a Jupyter notebook that already has PySpark set up.
  • The RUN pip install whylogs whylogs[viz] whylogs[spark] line is about adding the necessary libraries to this environment. It uses pip to add whylogs and its additional features for visualization (viz) and for working with Spark (spark).
  • The last line, COPY data/patient_data.csv /home/patient_data.csv, is about moving your data file into this environment. It takes the CSV file patient_data.csv from the data folder on your project directory and puts it in the /home/ directory inside the Docker environment.

By now your project directory should look something like this.

Project directory in VS Code (Image by author)

Awesome! Now, let’s build a Docker image. To do this, type the following command in your terminal, making sure you’re in your project’s root folder.

docker build -t pyspark-whylogs .

This command creates a Docker image named pyspark-whylogs. You can see it in the ‘Images’ tab of your Docker Desktop app.

Docker image built (Image by author)

Next step: let’s run this image to start JupyterLab. Type another command in your terminal.

docker run -p 8888:8888 pyspark-whylogs

This command launches a container from the pyspark-whylogs image. It makes sure you can access JupyterLab through port 8888 on your computer.

After running this command, you’ll see a URL in the logs that looks like this: http://127.0.0.1:8888/lab?token=your_token. Click on it to open the JupyterLab web interface.

Docker container logs (Image by author)

Great! Everything’s set up for using whylogs. Now, let’s get to know the dataset we’ll be working with.

Understanding the dataset

We’ll use a dataset about hospital patients. The file, named patient_data.csv, includes 100k rows with these columns:

  • patient_id: Each patient’s unique ID. Remember, you might see the same patient ID more than once in the dataset.
  • patient_name: The name of the patient. Different patients can have the same name.
  • height: The patient’s height in centimeters. Each patient has the same height listed for every hospital visit.
  • weight: The patient’s weight in kilograms. It’s always more than zero.
  • visit_date: The date the patient visited the hospital, in the format YYYY-MM-DD.

As for where this dataset came from, don’t worry. It was created by ChatGPT. Next, let’s start writing some code.

Getting started with PySpark

First, open a new notebook in JupyterLab. Remember to save it before you start working.

We’ll begin by importing the needed libraries.

# Import libraries
from typing import Any
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from whylogs.api.pyspark.experimental import collect_column_profile_views
from whylogs.api.pyspark.experimental import collect_dataset_profile_view
from whylogs.core.metrics.condition_count_metric import Condition
from whylogs.core.relations import Predicate
from whylogs.core.schema import DeclarativeSchema
from whylogs.core.resolvers import STANDARD_RESOLVER
from whylogs.core.specialized_resolvers import ConditionCountMetricSpec
from whylogs.core.constraints.factories import condition_meets
from whylogs.core.constraints import ConstraintsBuilder
from whylogs.core.constraints.factories import no_missing_values
from whylogs.core.constraints.factories import greater_than_number
from whylogs.viz import NotebookProfileVisualizer
import pandas as pd
import datetime

Then, we’ll set up a SparkSession. This lets us run PySpark code.

# Initialize a SparkSession
spark = SparkSession.builder.appName('whylogs').getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled","true")

After that, we’ll make a Spark dataframe by reading the CSV file. We’ll also check out its schema.

# Create a dataframe from CSV file
df = spark.read.option("header",True).option("inferSchema",True).csv("/home/patient_data.csv")
df.printSchema()

Next, let’s peek at the data. We’ll view the first row in the dataframe.

# First row from dataframe
df.show(n=1, vertical=True)

Now that we’ve seen the data, it’s time to start data profiling with whylogs.

Data profiling with whylogs

To profile our data, we will use two functions. First, there’s collect_column_profile_views. This function collects detailed profiles for each column in the dataframe. These profiles give us stats like counts, distributions, and more, depending on how we set up whylogs.

# Profile the data with whylogs
df_profile = collect_column_profile_views(df)
print(df_profile)

Each column in the dataset gets its own ColumnProfileView object in a dictionary. We can examine various metrics for each column, like their mean values.

whylogs will look at every data point and statistically decide wether or not that data point is relevant to the final calculation

For example, let’s look at the average height.

df_profile["height"].get_metric("distribution").mean.value

Next, we’ll also calculate the mean directly from the dataframe for comparison.

# Compare with mean from dataframe
df.select(F.mean(F.col("height"))).show()

But, profiling columns one by one isn’t always enough. So, we use another function, collect_dataset_profile_view. This function profiles the whole dataset, not just single columns. We can combine it with Pandas to analyze all the metrics from the profile.

# Putting everything together
df_profile_view = collect_dataset_profile_view(input_df=df)
df_profile_view.to_pandas().head()

We can also save this profile as a CSV file for later use.

# Persist profile as a file
df_profile_view.to_pandas().reset_index().to_csv("/home/jovyan/patint_profile.csv",header = True,index = False)

The folder /home/jovyan in our Docker container is from Jupyter's Docker Stacks (ready-to-use Docker images containing Jupyter applications). In these Docker setups, 'jovyan' is the default user for running Jupyter. The /home/jovyan folder is where Jupyter notebooks usually start and where you should put files to access them in Jupyter.

And that’s how we profile data with whylogs. Next, we’ll explore data validation.

Data validation with whylogs

For our data validation, we’ll perform these checks:

  • patient_id: Make sure there are no missing values.
  • weight: Ensure every value is more than zero.
  • visit_date: Check if dates are in the YYYY-MM-DD format.

Now, let’s start. Data validation in whylogs starts from data profiling. We can use the collect_dataset_profile_view function to create a profile, like we saw before.

However, this function usually makes a profile with standard metrics like average and count. But what if we need to check individual values in a column as opposed to the other constraints, that can be checked against aggregate metrics? That’s where condition count metrics come in. It’s like adding a custom metric to our profile.

Let’s create one for the visit_date column to validate each row.

def check_date_format(date_value: Any) -> bool:
date_format = '%Y-%m-%d'
try:
datetime.datetime.strptime(date_value, date_format)
return True
except ValueError:
return False

visit_date_condition = {"is_date_format": Condition(Predicate().is_(check_date_format))}

Once we have our condition, we add it to the profile. We use a Standard Schema and add our custom check.

# Create condition count metric
schema = DeclarativeSchema(STANDARD_RESOLVER)
schema.add_resolver_spec(column_name="visit_date", metrics=[ConditionCountMetricSpec(visit_date_condition)])

Then we re-create the profile with both standard metrics and our new custom metric for the visit_date column.

# Use the schema to pass to logger with collect_dataset_profile_view
# This creates profile with standard metrics as well as condition count metrics
df_profile_view_v2 = collect_dataset_profile_view(input_df=df, schema=schema)

With our profile ready, we can now set up our validation checks for each column.

builder = ConstraintsBuilder(dataset_profile_view=df_profile_view_v2)
builder.add_constraint(no_missing_values(column_name="patient_id"))
builder.add_constraint(condition_meets(column_name="visit_date", condition_name="is_date_format"))
builder.add_constraint(greater_than_number(column_name="weight",number=0))

constraints = builder.build()
constraints.generate_constraints_report()

We can also use whylogs to show a report of these checks.

# Visualize constraints report using Notebook Profile Visualizer
visualization = NotebookProfileVisualizer()
visualization.constraints_report(constraints, cell_height=300)

It’ll be an HTML report showing which checks passed or failed.

whylogs constraints report (Image by author)

Here’s what we find:

  • The patient_id column has no missing values. Good!
  • Some visit_date values don’t match the YYYY-MM-DD format.
  • A few weight values are zero.

Let’s double-check these findings in our dataframe. First, we check the visit_date format with PySpark code.

# Validate visit_date column
df \
.withColumn("check_visit_date",F.to_date(F.col("visit_date"),"yyyy-MM-dd")) \
.withColumn("null_check",F.when(F.col("check_visit_date").isNull(),"null").otherwise("not_null")) \
.groupBy("null_check") \
.count() \
.show(truncate = False)

+----------+-----+
|null_check|count|
+----------+-----+
|not_null |98977|
|null |1023 |
+----------+-----+

It shows that 1023 out of 100,000 rows don’t match our date format. Next, the weight column.

# Validate weight column
df \
.select("weight") \
.groupBy("weight") \
.count() \
.orderBy(F.col("weight")) \
.limit(1) \
.show(truncate = False)

+------+-----+
|weight|count|
+------+-----+
|0 |2039 |
+------+-----+

Again, our findings match whylogs. Almost 2,000 rows have a weight of zero. And that wraps up our tutorial. You can find the notebook for this tutorial here.

Conclusion

In this tutorial, we’ve covered how to use whylogs with PySpark. We began by preparing our environment using Docker, and then we did data profiling and validation on our dataset. Remember, this is just the beginning. Whylogs offers a lot more, from tracking data changes (data drift) in machine learning to checking data quality in real-time streams.

I sincerely hope this guide was beneficial for you. Should you have any questions, please don’t hesitate to drop them in the comments below.

References

--

--