Creating a Data Pipeline with Spark, Google Cloud Storage and BigQuery

On-premise and cloud working together to deliver a data product

João Pedro
Towards Data Science


Developing a data pipeline is somewhat similar to playing with Lego: you picture what needs to be achieved (the data requirements), choose the pieces (software, tools, platforms), and fit them together. And, as with Lego, the complexity of the building process is determined by the complexity of the final goal.

It’s possible to go from simple ETL pipelines built with Python to move data between two databases, all the way to very complex architectures that use Kafka to stream real-time messages between all sorts of cloud services and serve multiple end applications.

But the reality is that the current data landscape looks more like one of those fancy, expensive professional Lego sets, with all sorts of pieces built to solve specific needs and new ones popping up around every corner. You have probably already seen Matt Turck’s 2021 Machine Learning, AI and Data (MAD) Landscape. The bad part: the instruction manual is not included.

Many open-source data tools have been developed in the last decade, like Spark, Hadoop, and Kafka, not to mention all the tooling available as Python libraries. And those are the tools I like to cover in my posts: they’re free, they have well-maintained Docker images, and I can develop self-contained apps with Docker that anyone can run anywhere.

But, as the data landscape matures, all the arrows seem to point in the same direction: cloud computing. And this is by no means a surprise. Companies specifically targeting data applications, like Databricks, dbt, and Snowflake, are exploding in popularity, while the classic players (AWS, Azure, and GCP) are also investing heavily in their data products.

And that’s the goal of today’s post: we’ll be developing a data pipeline using Apache Spark, Google Cloud Storage, and Google BigQuery (using the free tier).

not sponsored.

The tools

Spark is a general-purpose, distributed, memory-based data processing framework geared towards processing extremely large amounts of data. I have covered Spark in many other posts.

Google Cloud Storage (GCS) is Google’s blob storage service. The idea is simple: create a bucket and store files in it, then read them later using their “path”. Folders are a lie and the objects are immutable.

Google BigQuery (GBQ) is Google’s cloud data warehouse solution: an OLAP-focused database with serverless SQL query execution, capable of processing large amounts of data.
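Just to give a taste of both services (this is not part of the pipeline we’ll build), here is a minimal sketch using Google’s Python client libraries; it assumes the google-cloud-storage and google-cloud-bigquery packages are installed, that GOOGLE_APPLICATION_CREDENTIALS points to a valid key, and that the bucket and file names are made up.

# Illustration only, not part of the pipeline built in this post.
# Assumes the google-cloud-storage and google-cloud-bigquery packages are installed
# and GOOGLE_APPLICATION_CREDENTIALS points to a service-account key.
from google.cloud import storage, bigquery

# GCS: store a file in a bucket and read it back by its "path" (blob name)
storage_client = storage.Client()
bucket = storage_client.bucket("my-example-bucket")    # hypothetical bucket
blob = bucket.blob("raw/example.csv")                  # "folders" are just name prefixes
blob.upload_from_filename("./example.csv")             # hypothetical local file
print(blob.download_as_text()[:100])

# BigQuery: run a serverless SQL query
bq_client = bigquery.Client()
for row in bq_client.query("SELECT 1 AS answer").result():
    print(row.answer)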

The data

We’re going to build a data pipeline to process and store data from the Brazilian “higher education” (literal translation) census. Every year, this census collects statistics about Brazilian higher education institutions (basically, the universities) from many perspectives: institutional, social and demographic, geographic, and so on.

We’ll be dealing with the courses report, which contains statistics about every Brazilian higher education course (undergraduate, graduate, doctoral, etc.). This data is publicly available [CC BY-ND 3.0] in CSV files (one per year).

The implementation

The pipeline idea is simple: download the CSV files to the local machine, convert them into a Delta Lake table stored in a GCS bucket, apply the needed transformations over this Delta table, and save the results in a BigQuery table that can be easily consumed by downstream tasks.

The proposed pipeline. Image by the author.

The bucket will function as a raw file storage that aggregates all the reports in a single place. The BigQuery table will store our service-ready data, already filtered, aggregated, and containing only the useful columns.

As mentioned before, the census collects a lot of statistics from all higher education institutions, which include, but are not limited to, universities. To simulate a “real situation”, let’s suppose we need to create a table to answer various social/demographic questions about the new students who enter universities every year.

0. Setting up the environment

All the code is available on this GitHub repository.

You will need Docker installed on your local machine to create the Spark cluster, and Python to download the files.

The docker-compose file:

version: '3'

services:
  spark:
    build: .
    environment:
      - SPARK_MODE=master
    ports:
      - '8080:8080'
      - '4040:4040'
    volumes:
      - ./data:/data
      - ./src:/src
  spark-worker:
    build: .
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=4G
      - SPARK_EXECUTOR_MEMORY=4G
      - SPARK_WORKER_CORES=4
    volumes:
      - ./data:/data
      - ./src:/src

The Spark Dockerfile:

FROM docker.io/bitnami/spark:3.3.1

COPY *.jar $SPARK_HOME/jars

RUN mkdir -p $SPARK_HOME/secrets
COPY ./src/credentials/gcp-credentials.json $SPARK_HOME/secrets/gcp-credentials.json
ENV GOOGLE_APPLICATION_CREDENTIALS=$SPARK_HOME/secrets/gcp-credentials.json

RUN pip install delta-spark

The Docker images are already configured to automatically create a new environment from scratch, so we can focus on the implementation rather than on the configuration.

Of course, you’ll need to create a Google Cloud Platform account. Even though we are going to use only the free quota, your credit card information is needed. GCP states that it will not charge you unless you explicitly upgrade to a paid account, but be careful.

Once you’ve created the account, follow these steps:

1. Access the GCP console and create a new project. I’ve named mine “BigQueryFirstSteps”

2. Enable the APIs for Google Cloud Storage and BigQuery in the APIs & Services tab.

3. Create a new bucket in Google Cloud Storage named censo-ensino-superior (bucket names are globally unique, so you may need a variation of this name)

4. Create a new dataset in Google BigQuery named censo_ensino_superior (BigQuery dataset names can’t contain hyphens; this is the name the code uses later). A programmatic alternative to steps 3 and 4 is sketched right after this list.

5. Create a new service account under the Service Accounts item in the IAM & Admin tab, with the appropriate roles to read, write, and create GCS buckets and GBQ tables (I’ve used BigQuery Admin and Storage Admin)

6. Still on this page, generate a new access key (JSON) for the newly created account. The key will be downloaded to your local machine.
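If you prefer code over console clicks, steps 3 and 4 can also be done programmatically. Here is a rough sketch with Google’s Python client libraries, assuming valid credentials and using a placeholder project ID; you still need the service account and key from steps 5 and 6 for the Spark jobs.

# Rough sketch of steps 3 and 4 done with the Python client libraries.
# Assumes valid credentials; the project ID and location are placeholders.
from google.cloud import storage, bigquery

PROJECT_ID = "your-project-id"  # replace with your own project ID

# Step 3: create the GCS bucket (bucket names are globally unique)
storage_client = storage.Client(project=PROJECT_ID)
storage_client.create_bucket("censo-ensino-superior", location="US")

# Step 4: create the BigQuery dataset (no hyphens allowed in dataset names)
bq_client = bigquery.Client(project=PROJECT_ID)
bq_client.create_dataset("censo_ensino_superior", exists_ok=True)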

Back to the local environment, execute the prepare_env.sh script.

mkdir -p ./data/
mkdir -p ./src/credentials
chmod -R 777 ./src
chmod -R 777 ./data

wget https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar

It creates a few folders with specific permissions (so the Spark containers can read from and write to them) and downloads the GCS connector for Spark.

Now, rename your JSON credentials file to gcp-credentials.json and put it inside the ./src/credentials folder (along with the bucket_name.txt file).

Finally, start the containers with

docker compose up --build

1. Downloading data

Just run the script:

python download_files.py

And the CSV files will be downloaded to the ./data folder.
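The script itself lives in the repository; as a rough idea, it boils down to something like the sketch below, where the URL pattern is only a placeholder for the actual INEP microdata links and the year range is an assumption.

# Minimal sketch of what download_files.py might look like.
# BASE_URL is a placeholder, the real INEP links are in the repository's script.
import os
import urllib.request

YEARS = range(2017, 2021)  # assumption: adjust to the census years you need
BASE_URL = "https://<INEP-MICRODATA-HOST>/MICRODADOS_CADASTRO_CURSOS_{year}.CSV"  # placeholder

os.makedirs("./data", exist_ok=True)

for year in YEARS:
    dest = f"./data/MICRODADOS_CADASTRO_CURSOS_{year}.CSV"
    if not os.path.exists(dest):
        print(f"Downloading year {year}...")
        urllib.request.urlretrieve(BASE_URL.format(year=year), dest)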

2. Converting CSV to Delta Lake in GCS

The first thing to do is instantiate a Spark Session and configure it with the Delta-Lake dependencies.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, BooleanType, StringType  # used in the casts below

from delta import configure_spark_with_delta_pip

MASTER_URI = "spark://spark:7077"


if __name__ == "__main__":
    # spark-submit --packages io.delta:delta-core_2.12:2.1.0 --master spark://spark:7077 insert_csv_into_delta_table_gcs.py

    builder = SparkSession.builder\
        .master(MASTER_URI)\
        .appName("Insert CSV Censo into Delta Table")\
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

    spark = configure_spark_with_delta_pip(builder).getOrCreate()

Reading the downloaded CSV files is quite easy: just specify the options and give the path.

# Read the CSV files
df_cursos = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("delimiter", ";")
    .option("encoding", "ISO-8859-1")
    .option("inferSchema", "true")
    .load("/data/MICRODADOS_CADASTRO_CURSOS_*.CSV")  # read all CSV files at once
)

To write less data to the GCS bucket, I’ve also selected just the columns that are useful for the final table: year of the census; course identification; area of knowledge; location and type; and the number of new students by gender, age, and skin color.

# Select columns
df_cursos = df_cursos.select(
    [
        # Year
        "NU_ANO_CENSO",
        # Course AND Institution
        "CO_IES",
        "NO_CURSO",
        "CO_CURSO",
        # Total of new students
        "QT_ING",
        # Age
        "QT_ING_0_17",
        # ...
        # Skin Color
        "QT_ING_BRANCA",
        "QT_ING_PRETA",
        # ...
        # Gender COLUMNS
        # Place COLUMNS
        # Area of Knowledge (CINE) COLUMNS
        # FIELDS OMITTED TO MAKE THIS CODE BLOCK SMALLER
    ]
)

# Cast columns to the correct types
for col in df_cursos.columns:
    if col in ["NU_ANO_CENSO"] or col.startswith("QT_"):
        df_cursos = df_cursos.withColumn(col, df_cursos[col].cast(IntegerType()))
    elif col.startswith("IN_"):
        df_cursos = df_cursos.withColumn(col, df_cursos[col].cast(BooleanType()))
    else:
        df_cursos = df_cursos.withColumn(col, df_cursos[col].cast(StringType()))

Writing data to a GCS bucket is just like writing to the file system, but instead of passing a local path, we specify the bucket path with its own syntax: “gs://<bucket_name>/<filepath_to_be_created>”.

df_cursos.write\
.format("delta")\
.partitionBy(["NU_ANO_CENSO"])\
.mode("overwrite")\
.save("gs://censo-ensino-superior/cens_cursos")

The code above creates a new Delta table named cens_cursos inside the censo-ensino-superior bucket.

We don’t need to handle authentication in the code because the credentials were properly configured during the docker build phase.

To execute this script, access the Spark container’s terminal (master or worker) and run:

# cd into the folder where the script is stored
spark-submit --packages io.delta:delta-core_2.12:2.1.0 --master spark://spark:7077 insert_csv_into_delta_table_gcs.py

After a minute or so, the script will finish and the data will be available in your GCS bucket.
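As an optional sanity check (my addition, not part of the original script), you could append a couple of lines to insert_csv_into_delta_table_gcs.py to read the freshly written Delta table back and count the records per census year.

# Optional sanity check: read the Delta table back from the bucket and
# count how many course records each census year contains.
df_check = spark.read.format("delta").load("gs://censo-ensino-superior/cens_cursos")
df_check.groupBy("NU_ANO_CENSO").count().orderBy("NU_ANO_CENSO").show()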

3. Processing Delta Table from GCS to GBQ

Again, the first thing to do is instantiate a Spark session, the same way we did before.
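For reference, the session setup in this second script could look like the sketch below (the app name is just a guess); it mirrors the previous one, since the BigQuery connector is only supplied later via spark-submit’s --packages option.

# Session setup for the second script; a sketch, the app name is a guess.
# The BigQuery connector is added at submit time via --packages.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # used by the transformations below
from delta import configure_spark_with_delta_pip

MASTER_URI = "spark://spark:7077"

builder = SparkSession.builder\
    .master(MASTER_URI)\
    .appName("Aggregate Delta Table into BigQuery")\
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()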

Reading data from a bucket also follows the same logic as writing.

df_censo = (
    spark.read
    .format("delta")
    .load("gs://censo-ensino-superior/cens_cursos")
)

With the data in hand, we can make some transformations as usual.

df_censo = (
    df_censo
    # Keep only bachelor's and licenciatura courses (TP_GRAU_ACADEMICO codes 1 and 4)
    .filter(
        (F.col('TP_GRAU_ACADEMICO') == "1")
        | (F.col('TP_GRAU_ACADEMICO') == "4")
    )
    # Group by CO_CINE_AREA_DETALHADA (detailed area), CO_UF (state) and NU_ANO_CENSO (year)
    .groupBy(
        'CO_CINE_AREA_DETALHADA', 'CO_UF', 'NU_ANO_CENSO'
    )
    .agg(
        F.max('NO_CINE_AREA_DETALHADA').alias('NO_CINE_AREA_DETALHADA'),

        F.max('NO_CINE_AREA_ESPECIFICA').alias('NO_CINE_AREA_ESPECIFICA'),
        F.max('CO_CINE_AREA_ESPECIFICA').alias('CO_CINE_AREA_ESPECIFICA'),
        F.max('NO_CINE_AREA_GERAL').alias('NO_CINE_AREA_GERAL'),
        F.max('CO_CINE_AREA_GERAL').alias('CO_CINE_AREA_GERAL'),

        F.max('SG_UF').alias('SG_UF'),
        F.max('NO_REGIAO').alias('NO_REGIAO'),
        F.max('CO_REGIAO').alias('CO_REGIAO'),

        F.count('CO_CURSO').alias('QT_CO_CURSO'),
        F.sum('QT_CURSO').alias('QT_CURSO'),
        F.sum('QT_VG_TOTAL').alias('QT_VG_TOTAL'),
        F.sum('QT_ING').alias('QT_ING'),

        F.sum('QT_ING_0_17').alias('QT_ING_0_17'),
        F.sum('QT_ING_18_24').alias('QT_ING_18_24'),
        F.sum('QT_ING_25_29').alias('QT_ING_25_29'),
        F.sum('QT_ING_30_34').alias('QT_ING_30_34'),
        F.sum('QT_ING_35_39').alias('QT_ING_35_39'),
        F.sum('QT_ING_40_49').alias('QT_ING_40_49'),
        F.sum('QT_ING_50_59').alias('QT_ING_50_59'),
        F.sum('QT_ING_60_MAIS').alias('QT_ING_60_MAIS'),

        F.sum('QT_ING_BRANCA').alias('QT_ING_BRANCA'),
        F.sum('QT_ING_PRETA').alias('QT_ING_PRETA'),
        F.sum('QT_ING_PARDA').alias('QT_ING_PARDA'),
        F.sum('QT_ING_AMARELA').alias('QT_ING_AMARELA'),
        F.sum('QT_ING_INDIGENA').alias('QT_ING_INDIGENA'),
        F.sum('QT_ING_CORND').alias('QT_ING_CORND'),

        F.sum('QT_ING_FEM').alias('QT_ING_FEM'),
        F.sum('QT_ING_MASC').alias('QT_ING_MASC'),
    )
)

The query above first filters only the bachelor’s and licenciatura (teaching) degree courses (using their codes) and then groups the result by year, detailed area of knowledge, and state, summing up the number of new students (the “QT_ING_*” columns) in each category.

To write data to a BigQuery table we need to use the format “bigquery” along with a few options. Obviously, we need to pass the table and dataset that we’re writing to. As this table does not exist yet, the option “createDisposition” needs to be set to “CREATE_IF_NEEDED”.

The standard GBQ-Spark connector uses a GCS bucket as an intermediate buffer to move the data from/to GBQ. So, we need to pass a “temporaryGcsBucket” option with a bucket name. For simplicity, I’ve used the same bucket created previously.

df_censo.write\
.format("bigquery")\
.mode("overwrite")\
.option("temporaryGcsBucket", "censo-ensino-superior")\
.option("database", "censo_ensino_superior")\
.option("table", "censo_ensino_superior.cursos_graduacao_e_licenciatura")\
.option("createDisposition", "CREATE_IF_NEEDED")\
.save()

Also, be aware that this write runs in “overwrite” mode: it will erase any previous data if the table already exists. If you only want to add new rows, use the “append” mode, as sketched below.
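Here is a minimal sketch of that append variant, with the same options and only the mode changed.

# Append-only variant: keeps existing rows and only adds the new ones.
df_censo.write\
    .format("bigquery")\
    .mode("append")\
    .option("temporaryGcsBucket", "censo-ensino-superior")\
    .option("table", "censo_ensino_superior.cursos_graduacao_e_licenciatura")\
    .save()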

And that’s all.

To run this job, just type:

# cd into the folder where the script is stored
spark-submit --packages io.delta:delta-core_2.12:2.1.0,com.google.cloud.spark:spark-3.1-bigquery:0.28.0-preview aggregate_delta_gcs_to_gbq_table.py

And the table will be created and populated in BigQuery.

Just to exemplify, let’s run a query.

The query below calculates the percentages of men and women in each area of knowledge.

SELECT
  CO_CINE_AREA_GERAL,
  NO_CINE_AREA_GERAL,
  SUM(QT_ING_MASC) / SUM(QT_ING_FEM + QT_ING_MASC) AS PERCENT_MASC,
  SUM(QT_ING_FEM) / SUM(QT_ING_FEM + QT_ING_MASC) AS PERCENT_FEM
FROM
  `censo_ensino_superior.cursos_graduacao_e_licenciatura`
GROUP BY
  NO_CINE_AREA_GERAL,
  CO_CINE_AREA_GERAL
ORDER BY
  PERCENT_MASC

The results:

Query results with commented translations. Image by the author.

Conclusion

In this post, we learned how to develop a data pipeline using Apache Spark (on-premises) as the data processing tool, with Google Cloud Storage (for raw file storage) and Google BigQuery (to serve the processed data to analytical queries) as the storage solution.

Interacting with data in the cloud using Spark is not so special. The harder part is configuring the environment: finding the right connectors, putting them in the correct place, and using the correct formats. Confession: I struggled a lot to learn how to properly configure the Docker images with the credentials. But once this process is mastered, querying and manipulating data works just as usual.

And that’s one of the things I like most about Apache Spark: it separates the processing logic from the connection logic. If, for example, we changed our blob storage solution from GCS to Amazon’s S3, all we would need to do is reconfigure the environment with the AWS credentials and connector and change the read and write paths. All the query/transformation logic remains the same.
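To make that concrete, here is a rough, untested sketch of what the swap could look like, assuming the hadoop-aws connector is supplied via --packages and using a hypothetical bucket name; only the configuration and the path change, the transformations stay untouched.

# Hypothetical GCS -> S3 swap: only credentials, connector, and paths change.
# Assumes spark-submit also receives --packages org.apache.hadoop:hadoop-aws:<version>
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder\
    .appName("Read Delta from S3 instead of GCS")\
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
    .config("spark.hadoop.fs.s3a.access.key", "<AWS_ACCESS_KEY_ID>")\
    .config("spark.hadoop.fs.s3a.secret.key", "<AWS_SECRET_ACCESS_KEY>")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Exactly the same read as before, only the path scheme and bucket change
df_censo = spark.read.format("delta").load("s3a://my-hypothetical-bucket/cens_cursos")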

But, even though it is not “so special”, learning how to interact with cloud storage components is an extremely important skill, and I hope this post helped you gain a better understanding of the process.

As always, I’m not an expert in any of the subjects addressed in this post, and I strongly recommend further reading; see the references below.

Thank you for reading! ;)

References

All the code is available in this GitHub repository.
Data used: Microdados do Censo da Educação Superior [CC BY-ND 3.0], INEP, Brazilian Government.

[1] Chambers, B., & Zaharia, M. (2018). Spark: The Definitive Guide: Big Data Processing Made Simple. O’Reilly Media, Inc.
[2] What is BigQuery? (n.d.). Google Cloud. Link.
[3] Delta Lake Official Page. (n.d.). Delta Lake. https://delta.io/
[4] Databricks. (2020, September 15). Making Apache Spark™ Better with Delta Lake [Video]. YouTube.
[5] Use the BigQuery connector with Spark. (n.d.). Google Cloud. Link.
[6] Sohail, K. (2021, December 15). Read files from Google Cloud Storage Bucket using local PySpark and Jupyter Notebooks. Medium. Link.

