Data Validation for PySpark Applications using Pandera

New features and concepts.




If you’re a data practitioner, you’ll appreciate that data validation holds utmost importance in ensuring accuracy and consistency. This becomes particularly crucial when dealing with large datasets or data originating from diverse sources. Fortunately, the Pandera Python library can help to streamline and automate the data validation process. Pandera is an open-source library meticulously crafted to simplify the tasks of schema and data validation. It builds upon the robustness and versatility of pandas and introduces an intuitive and expressive API specifically designed for data validation purposes.

This article briefly introduces the key features of Pandera, before moving on to explain how Pandera data validation can be integrated with data processing workflows that use native PySpark SQL since the latest release (Pandera 0.16.0). 

Pandera is designed to work with other popular Python libraries such as pandas, pyspark.pandas, Dask, etc. This makes it easy to incorporate data validation into your existing data processing workflows. Until recently, Pandera lacked native support for PySpark SQL. To bridge this gap, a team at QuantumBlack, AI by McKinsey, comprising Ismail Negm-PARI, Neeraj Malhotra, Jaskaran Singh Sidana, Kasper Janehag and Oleksandr Lazarchuk, along with the Pandera founder, Niels Bantilan, developed native PySpark SQL support and contributed it to Pandera. The text of this article was also prepared by the team, and is written in their words below.

 

The Key Features of Pandera

 

If you are unfamiliar with using Pandera to validate your data, we recommend reviewing Khuyen Tran’s “Validate Your pandas DataFrame with Pandera”, which describes the basics. In summary, here we briefly explain the key features and benefits: a simple and intuitive API, in-built validation functions, and customisation.

 

Simple and Intuitive API

 

One of the standout features of Pandera is its simple and intuitive API. You can define your data schema using a declarative syntax that is easy to read and understand. This makes it easy to write data validation code that is both efficient and effective.

Here’s an example of schema definition in Pandera:

import pandera as pa
from pandera.typing import Series

class InputSchema(pa.DataFrameModel):
    year: Series[int] = pa.Field()
    month: Series[int] = pa.Field()
    day: Series[int] = pa.Field()

 

Inbuilt Validation Functions

 

Pandera provides a set of in-built functions (more commonly called checks) to perform data validations. When we invoke validate() on a Pandera schema, it performs both schema and data validations. The data validations invoke check functions behind the scenes.

Here’s a simple example of how to run a data check on a dataframe object using Pandera.

class InputSchema(pa.DataFrameModel):
    year: Series[int] = pa.Field(gt=2000, coerce=True)
    month: Series[int] = pa.Field(ge=1, le=12, coerce=True)
    day: Series[int] = pa.Field(ge=0, le=365, coerce=True)

InputSchema.validate(df)

 

As seen above, for the year field we have defined a check gt=2000, enforcing that all values in this field must be greater than 2000; otherwise, Pandera will raise a validation failure.
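
For example, here is a minimal sketch (with hypothetical data) of what a validation failure looks like: calling validate() on a dataframe that breaks the day check above raises a SchemaError describing the failing check.

import pandas as pd
import pandera as pa

# Hypothetical data: the second row violates the day <= 365 check.
df = pd.DataFrame({"year": [2001, 2002], "month": [3, 4], "day": [200, 400]})

try:
    InputSchema.validate(df)  # InputSchema as defined above
except pa.errors.SchemaError as err:
    print(err)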

Here’s a list of all built-in checks available on Pandera by default:

eq: checks if a value is equal to a given literal
ne: checks if a value is not equal to a given literal
gt: checks if a value is greater than a given literal
ge: checks if a value is greater than or equal to a given literal
lt: checks if a value is less than a given literal
le: checks if a value is less than or equal to a given literal
in_range: checks if a value is within a given range
isin: checks if a value is in a given list of literals
notin: checks if a value is not in a given list of literals
str_contains: checks if a value contains a string literal
str_endswith: checks if a value ends with a string literal
str_length: checks if a value’s length matches a given length
str_matches: checks if a value matches a string pattern
str_startswith: checks if a value starts with a string literal
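
Many of these checks can also be applied through the Check class when defining a DataFrameSchema. The column names and constraints in the following sketch are purely illustrative:

import pandera as pa

# Illustrative schema combining a few of the built-in checks listed above.
schema = pa.DataFrameSchema({
    "category": pa.Column(str, pa.Check.isin(["bakery", "dairy"])),
    "price": pa.Column(float, pa.Check.in_range(0, 100)),
    "product_name": pa.Column(str, pa.Check.str_startswith("B")),
})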

 

Custom Validation Functions

 

In addition to the built-in validation checks, Pandera allows you to define your own custom validation functions. This gives you the flexibility to define validation rules based on your use case.

For instance, you can define a lambda function for data validation as shown here:

schema = pa.DataFrameSchema({
    "column2": pa.Column(str, [
        pa.Check(lambda s: s.str.startswith("value")),
        pa.Check(lambda s: s.str.split("_", expand=True).shape[1] == 2),
    ]),
})
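
Custom checks can also be registered on a DataFrameModel class with the pa.check decorator. The sketch below re-expresses the second lambda rule above as a named, reusable check (the class name and column are illustrative):

import pandera as pa
from pandera.typing import Series


class CustomSchema(pa.DataFrameModel):
    column2: Series[str] = pa.Field()

    @pa.check("column2")
    def has_two_parts(cls, column2: Series[str]) -> Series[bool]:
        # True where the value consists of exactly two "_"-separated parts.
        return column2.str.count("_") == 1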

 

Adding Support for PySpark SQL DataFrames to Pandera

 

During the process of adding support for PySpark SQL, we adhered to two fundamental principles: 

  • consistency of interface and user experience
  • performance optimization for PySpark. 

First, let’s delve into the topic of consistency: it is important that users have a consistent set of APIs and interfaces irrespective of the chosen framework. As Pandera provides multiple frameworks to choose from, it was even more critical to offer a consistent user experience in the PySpark SQL APIs.

With this in mind, we can define the Pandera schema using PySpark SQL as follows:

from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pandera.pyspark as pa
from pandera.pyspark import DataFrameModel

spark = SparkSession.builder.getOrCreate()


class PanderaSchema(DataFrameModel):
    """Test schema"""
    id: T.IntegerType() = pa.Field(gt=5)
    product_name: T.StringType() = pa.Field(str_startswith="B")
    price: T.DecimalType(20, 5) = pa.Field()
    description: T.ArrayType(T.StringType()) = pa.Field()
    meta: T.MapType(T.StringType(), T.StringType()) = pa.Field()


data_fail = [
    (5, "Bread", 44.4, ["description of product"], {"product_category": "dairy"}),
    (15, "Butter", 99.0, ["more details here"], {"product_category": "bakery"}),
]

spark_schema = T.StructType(
    [
        T.StructField("id", T.IntegerType(), False),
        T.StructField("product", T.StringType(), False),
        T.StructField("price", T.DecimalType(20, 5), False),
        T.StructField("description", T.ArrayType(T.StringType(), False), False),
        T.StructField(
            "meta", T.MapType(T.StringType(), T.StringType(), False), False
        ),
    ],
)

# spark_df is a small helper that builds a Spark dataframe from the data and
# the schema (for example, via spark.createDataFrame).
df_fail = spark_df(spark, data_fail, spark_schema)

 

In the code above, PanderaSchema defines the schema for the incoming PySpark dataframe. It has five fields with varying dtypes and enforces data checks on the id and product_name fields.


Next, we crafted some dummy data in data_fail and created the df_fail dataframe by applying the native PySpark SQL schema defined in spark_schema.


This is done to simulate schema and data validation failures.

Here are the contents of the df_fail dataframe:

df_fail.show()

   +---+-------+--------+--------------------+--------------------+
   | id|product|   price|         description|                meta|
   +---+-------+--------+--------------------+--------------------+
   |  5|  Bread|44.40000|[description of p...|{product_category...|
   | 15| Butter|99.00000| [more details here]|{product_category...|
   +---+-------+--------+--------------------+--------------------+

 

Next, we can invoke Pandera’s validate function to perform schema- and data-level validations as follows:

df_out = PanderaSchema.validate(check_obj=df_fail)

 

We will explore the contents of df_out shortly.

 

Performance Optimization for PySpark

 

Our contribution was specifically designed for optimal performance with PySpark dataframes, which is crucial when working with large datasets and handling the unique challenges of PySpark’s distributed computing environment.

Pandera uses PySpark’s distributed computing architecture to efficiently process large datasets while maintaining data consistency and accuracy. We rewrote Pandera’s custom validation functions for PySpark performance to enable faster and more efficient validation of large datasets, while reducing the risk of data errors and inconsistencies at high volume.

 

Comprehensive Error Reports

 

We made another addition to Pandera: the capability to generate detailed error reports in the form of a Python dictionary object. These reports are accessible via the dataframe returned from the validate function. They provide a comprehensive summary of all schema- and data-level validations, as per the user’s configuration.

This feature proves to be valuable for developers to swiftly identify and address any data-related issues. By using the generated error report, teams can compile a comprehensive list of schema and data issues within their application. This enables them to prioritize and resolve issues with efficiency and precision.

It is important to note that this feature is currently available exclusively for PySpark SQL, offering users an enhanced experience when working with error reports in Pandera.

In the code example above, remember that we invoked validate() on the Spark dataframe:

df_out = PanderaSchema.validate(check_obj=df_fail)

 

It returned a dataframe object. Using accessors, we can extract the error report from it as follows:

print(df_out.pandera.errors)

 

{
  "SCHEMA":{
     "COLUMN_NOT_IN_DATAFRAME":[
        {
           "schema":"PanderaSchema",
           "column":"PanderaSchema",
           "check":"column_in_dataframe",
           "error":"column 'product_name' not in dataframe Row(id=5, product='Bread', price=None, description=['description of product'], meta={'product_category': 'dairy'})"
        }
     ],
     "WRONG_DATATYPE":[
        {
           "schema":"PanderaSchema",
           "column":"description",
           "check":"dtype('ArrayType(StringType(), True)')",
           "error":"expected column 'description' to have type ArrayType(StringType(), True), got ArrayType(StringType(), False)"
        },
        {
           "schema":"PanderaSchema",
           "column":"meta",
           "check":"dtype('MapType(StringType(), StringType(), True)')",
           "error":"expected column 'meta' to have type MapType(StringType(), StringType(), True), got MapType(StringType(), StringType(), False)"
        }
     ]
  },
  "DATA":{
     "DATAFRAME_CHECK":[
        {
           "schema":"PanderaSchema",
           "column":"id",
           "check":"greater_than(5)",
           "error":"column 'id' with type IntegerType() failed validation greater_than(5)"
        }
     ]
  }
}

 

As seen above, the error report is aggregated at two levels in a Python dictionary object, so it can easily be consumed by downstream applications, such as time-series visualization of errors over time using tools like Grafana:

  1. type of validation: SCHEMA or DATA
  2. category of error: DATAFRAME_CHECK, WRONG_DATATYPE, etc.

This new format for structuring the error report was introduced in Pandera 0.16.0 as part of our contribution.
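
As a minimal sketch of how such a report might be consumed downstream (assuming the df_out returned by the validate() call above), the two-level dictionary can be flattened into per-category error counts before being pushed to a monitoring tool:

import json

# Flatten the report into "<validation type>/<error category>" counts.
errors = dict(df_out.pandera.errors)
counts = {
    f"{validation_type}/{category}": len(issues)
    for validation_type, categories in errors.items()
    for category, issues in categories.items()
}
print(json.dumps(counts, indent=2))  # e.g. {"SCHEMA/WRONG_DATATYPE": 2, ...}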

 

ON/OFF Switch

 

For applications that rely on PySpark, having an On/Off switch is an important feature that can make a significant difference in terms of flexibility and risk management. Specifically, the On/Off switch allows teams to disable data validations in production without requiring code changes.

This is especially important for big data pipelines where performance is critical. In many cases, data validation can take up a significant amount of processing time, which can impact the overall performance of the pipeline. With the On/Off switch, teams can quickly and easily disable data validation if necessary, without having to go through the time-consuming process of modifying code.

Our team introduced the On/Off switch to Pandera so users can easily turn off data validation in production by simply changing a configuration setting. This provides the flexibility needed to prioritize performance, when necessary, without sacrificing data quality or accuracy in development.

To disable validation, set the following in your environment variables:

export PANDERA_VALIDATION_ENABLED=False

 

This will be picked up by Pandera to disable all validations in the application. By default, validation is enabled.
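
If you prefer to control this from Python rather than the shell, one option is to set the variable before Pandera is imported (a sketch that assumes the configuration is read from the environment at import time):

import os

# Disable Pandera validations for this run; set before pandera is imported.
os.environ["PANDERA_VALIDATION_ENABLED"] = "False"

import pandera.pyspark as pa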

Currently, this feature is only available for PySpark SQL from version 0.16.0 as it is a new concept introduced by our contribution.

 

Granular Control of Pandera’s Execution

 

In addition to the On/Off switch feature, we also introduced a more granular control over the execution of Pandera’s validation flow. This is achieved by introducing configurable settings that allow users to control execution at three different levels:

  1. SCHEMA_ONLY: This setting performs schema validations only. It checks that the data conforms to the schema definition but does not perform any additional data-level validations.
  2. DATA_ONLY: This setting performs data-level validations only. It checks the data against the defined constraints and rules but does not validate the schema.
  3. SCHEMA_AND_DATA: This setting performs both schema and data-level validations. It checks the data against both the schema definition and the defined constraints and rules.

By providing this granular control, users can choose the level of validation that best fits their specific use case. For example, if the main concern is to ensure that the data conforms to the defined schema, the SCHEMA_ONLY setting can be used to reduce the overall processing time. Alternatively, if the data is known to conform to the schema and the focus is on ensuring data quality, the DATA_ONLY setting can be used to prioritize data-level validations.

The enhanced control over Pandera’s execution allows users to strike a fine-tuned balance between precision and efficiency, enabling a more targeted and optimized validation experience. For example, to run schema validations only, set the following environment variable:

export PANDERA_VALIDATION_DEPTH=SCHEMA_ONLY

 

By default, validations are enabled and the depth is set to SCHEMA_AND_DATA, which can be changed to SCHEMA_ONLY or DATA_ONLY as required by the use case.

Currently, this feature is only available for PySpark SQL from version 0.16.0 as it is a new concept introduced by our contribution.

 

Metadata at Column and Dataframe levels

 

Our team added a new feature to Pandera that allows users to store additional metadata at Field and Schema / Model levels. This feature is designed to allow users to embed contextual information in their schema definitions which can be leveraged by other applications.

For example, by storing details about a specific column, such as data type, format, or units, developers can ensure that downstream applications are able to interpret and use the data correctly. Similarly, by storing information about which columns of a schema are needed for a specific use case, developers can optimize data processing pipelines, reduce storage costs, and improve query performance.

At the schema level, users can store information to help categorize different schema across the entire application. This metadata can include details such as the purpose of the schema, the source of the data, or the date range of the data. This can be particularly useful for managing complex data processing workflows, where multiple schemas are used for different purposes and need to be tracked and managed efficiently.

class PanderaSchema(DataFrameModel):
    """Pandera Schema Class"""
    id: T.IntegerType() = pa.Field(
        gt=5,
        metadata={
            "usecase": ["RetailPricing", "ConsumerBehavior"],
            "category": "product_pricing",
        },
    )
    product_name: T.StringType() = pa.Field(str_startswith="B")
    price: T.DecimalType(20, 5) = pa.Field()

    class Config:
        """Config of pandera class"""
        name = "product_info"
        strict = True
        coerce = True
        metadata = {"category": "product-details"}

 

In the above example, we have introduced additional information on the schema object itself. This is allowed at two levels: field and schema.

To extract the metadata at schema level (including all fields in it), we provide a helper function:

PanderaSchema.get_metadata()

The output will be a dictionary object as follows:

{
    "product_info": {
        "columns": {
            "id": {
                "usecase": ["RetailPricing", "ConsumerBehavior"],
                "category": "product_pricing",
            },
            "product_name": None,
            "price": None,
        },
        "dataframe": {"category": "product-details"},
    }
}
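
As a sketch of how this metadata might be used downstream (assuming the PanderaSchema and df_fail defined earlier), an application could select only the columns tagged for a particular use case:

# Collect the columns whose metadata lists the "RetailPricing" use case.
columns_meta = PanderaSchema.get_metadata()["product_info"]["columns"]
pricing_cols = [
    col for col, meta in columns_meta.items()
    if meta and "RetailPricing" in meta.get("usecase", [])
]

df_pricing = df_fail.select(*pricing_cols)  # keep only the columns needed for pricing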

 

Currently, this feature is a new concept in 0.16.0 and has been added for both PySpark SQL and pandas.

 

Summary

 

We have introduced several new features and concepts, including an On/Off switch that allows teams to disable validations in production without code changes, granular control over Pandera’s validation flow, and the ability to store additional metadata on column and dataframe levels. You can find even more detail in the updated Pandera documentation for version 0.16.0.

As the Pandera Founder, Niels Bantilan, explained in a recent blog post about the release of Pandera 0.16.0:

 

To prove out the extensibility of Pandera with the new schema specification and backend API, we collaborated with the QuantumBlack team to implement a schema and backend for Pyspark SQL … and we completed an MVP in a matter of a few months!

 

This recent contribution to Pandera’s open-source codebase will benefit teams working with PySpark and other big data technologies.

The following team members at QuantumBlack, AI by McKinsey are responsible for this recent contribution: Ismail Negm-PARI, Neeraj Malhotra, Jaskaran Singh Sidana, Kasper Janehag, Oleksandr Lazarchuk. I’d like to thank Neeraj in particular for his assistance in preparing this article for publication.
 
 
Jo Stichbury is a technical writer at QuantumBlack, AI by McKinsey. Technical content creator writing about data science and software. Old-school Symbian C++ developer, now accidental cat herder and goose chaser.