Data Engineering: Fast Spatial Joins Across ~2 Billion Rows on a Single Old GPU

Comparing the performance of ORC and Parquet on spatial joins across 2 Billion rows on an old Nvidia GeForce GTX 1060 GPU on a local machine

Daniel Voyce
Towards Data Science

--

Photo by Clay Banks on Unsplash

Over the past few weeks I have been digging a bit deeper into the advances that GPU data processing libraries have made since I last focused on it in 2019.

In 4 years I have found that many of the libraries that were in early alpha in 2019 have matured into solid projects that are being used in real world situations.

I have spent many years in Data Engineering on Big Data solutions, and one of the tasks that we had do regularly was to perform spatial joins of human movement data through multiple polygons. It was a great use case that had numerous layers of potential optimisation, but the simple act of a “point in polygon” test was at the core of everything.

https://en.wikipedia.org/wiki/Point_in_polygon#/media/File:RecursiveEvenPolygon.svg

Previously, we explored various methods to accomplish this, including PostGIS, Redshift, and BigQuery. We ultimately established pipelines to process 200 billion rows daily through approximately 140 million polygons on BigQuery.

With recent advancements in GPU processing, I was intrigued to re-evaluate this task and assess the processing time feasible on a GPU using my laboratory machine. This article details the process of conducting this task on a consumer GPU (GeForce GTX 1060) and the duration it took to execute it.

The setup

You can read about the setup, data-preprocessing, and the rest here :

The dataset used in this experiment consists of CSV latitude and longitude points and a single polygon representing Las Vegas. The dataset was converted to Parquet and ORC to evaluate the performance differences between these formats.

CuSpatial, a library built on rapids.ai specialising in geospatial and GIS applications, was used for the actual performance test.

I intentionally avoided utilising any spatial indexing or other methods to enhance performance. This experiment should be considered as a baseline performance, which can be significantly improved through various techniques.

The dataset is approximately 400GB uncompressed and contains 2.8 billion rows of point data.

The Code

The code provided below relies on Rapids.ai and Dask (specifically dask_cudf) to process the data on a GPU and divide the data to fit the limited GPU memory. This process involved a significant amount of tweaking and format-specific error messages (e.g., issues working on Parquet but not on ORC, and vice versa).

ChatGPT and the NVIDIA team were instrumental in completing this task, I love that everyone is so willing to help in the data community! ChatGPT was useful for understanding some of the features around Dask and chunking the data by providing some workarounds to various errors I unearthed.

The basic setup for each of the experiments is as follows, the only difference is how the ddf is created (read_orc vs read_parquet):

rmm.reinitialize(managed_memory=True)
cluster = LocalCUDACluster()
client = Client(cluster)

polygon_wkt = "POLYGON((-115.074444 36.289153, -115.208314 36.325569, -115.208688 36.325646, -115.259397 36.335589, -115.260628 36.335652, -115.260845 36.335658, -115.276407 36.335967, -115.320842 36.33641, -115.333587 36.30627, -115.368573 36.170149, -115.3682 36.168344, -115.36794 36.16714, -115.353159 36.109493, -115.315922 36.023474, -115.298126 35.998029, -115.102856 35.918199, -115.100108 35.918038, -115.095546 35.917781, -115.084946 35.918409, -115.072079 35.923316, -114.918841 35.983522, -114.919047 36.0226, -114.919067 36.022821, -114.919108 36.022997, -114.93976 36.080677, -115.006009 36.219129, -115.007607 36.222047, -115.008162 36.222806, -115.010791 36.225883, -115.06011 36.27717, -115.068572 36.285772, -115.069297 36.286474, -115.069637 36.286803, -115.070046 36.287197, -115.071477 36.288191, -115.071736 36.288332, -115.074214 36.289087, -115.074444 36.289153))"

#build polygon geoseries
polygon_shapely = wkt.loads(polygon_wkt)
polygon = geopandas.GeoSeries.from_wkt([polygon_wkt])
polygon[0]

def spatial_join(df_partition, polygon):

#build points geoseries
i_ddf = df_partition[['lon', 'lat']].interleave_columns(
points_gseries = cuspatial.GeoSeries.from_points_xy(i_ddf)

#do spatial join
result = cuspatial.point_in_polygon(
points_gseries, polygon
)

#needed for large datasets as wont fit into size_type (https://github.com/rapidsai/cudf/issues/3958)
true_count = np.sum(result)
false_count = len(result) - true_count

return (true_count, false_count)

def wrapped_spatial_join(df_partition):
gpolygon = cuspatial.from_geopandas(polygon)
return spatial_join(df_partition, gpolygon)

sums = ddf.map_partitions(wrapped_spatial_join).compute()

total_true_count = sum(t[0] for t in sums)
total_false_count = sum(t[1] for t in sums)

result = {'True': total_true_count, 'False': total_false_count}
result

Results & File Types

For the results and file types, I chose ORC and Parquet as the export file types in the previous article since they are two of the most widely-used file formats in the big data ecosystem. ORC is often overlooked in favour of Parquet but offers features that can outperform Parquet on certain systems.

I ran the same code on both ORC and Parquet files and obtained the following results:

Parquet Results

3 minutes 9 seconds — Impressive!

sums = ddf.map_partitions(wrapped_spatial_join).compute()

CPU times: user 8.81 s, sys: 2.01 s, total: 10.8 s
Wall time: 3min 9s

total_true_count = sum(t[0] for t in sums)
total_false_count = sum(t[1] for t in sums)

result = {'True': total_true_count, 'False': total_false_count}
result

CPU times: user 1.3 s, sys: 79.2 ms, total: 1.38 s
Wall time: 1.34 s
{'True': 0 2179985812
dtype: int64,
'False': 0 66374320
dtype: int64}

Total: 2,246,360,132 Rows

Total Parquet time from CSV to Result = 23m 04s

Orc Results

6 minutes 18 seconds — Double the time of Parquet but still remarkable!

sums = ddf.map_partitions(wrapped_spatial_join).compute()

CPU times: user 23.8 s, sys: 4.37 s, total: 28.1 s
Wall time: 6min 18s

total_true_count = sum(t[0] for t in sums)
total_false_count = sum(t[1] for t in sums)

result = {'True': total_true_count, 'False': total_false_count}
result

CPU times: user 4.29 s, sys: 301 ms, total: 4.59 s
Wall time: 4.45 s
{'True': 0 2179985812
dtype: int64,
'False': 0 66374320
dtype: int64}

Total: 2,246,360,132 Rows

Total ORC time from CSV to Result = 17m 55s

Overall, Parquet took 23 minutes for the total time versus 18 minutes for ORC. However, the best file format will depend on your use case and the systems you are using.

Pain Points

During this process, I encountered some challenges, such as handling dataframe sizes, limited GPU memory, and dataset size and type issues.

Despite these obstacles, the conclusion drawn from this experiment is that Parquet, although taking longer to convert, was more performant once converted. ORC, on the other hand, was slower in calculations but faster in conversion.

Both formats demonstrated impressive performance on limited hardware, and with basic optimisations, both could be substantially improved.

Challenges in Dask Distributed Implementation

In order to utilise CuSpatial’s spatial join function “point_in_polygon”, the latitude and longitude points must be stored in an interleaved array format:

#interleaved

[
lon,
lat,
lon,
lat,
lon,
lat,
lon,
...
]

#instead of

[[lon,lat],[lon,lat],[lon,lat],[lon,lat],[lon,lat],...]

This specific arrangement is likely due to the efficiency of GPU stream processing for such data structures. The main challenge was handling a dataframe larger than the GPU memory and transforming it into the required interleaved array format.

Initially, I attempted to use dask_cudf to partition the points data I was reading, hoping that it would be sufficient to execute the point_in_polygon function. However, I soon realised that the need to interleave the points made it impossible unless the points (10GB) could fit into the GPU memory (6GB).

In retrospect, the solution was to use map_partitions to process the data in the dask_cudf frame. The interleaving needed to occur within the map_partitions function, rather than prior to passing it, so that only each partition was interleaved instead of the entire dataframe.
This was not as straightforward as it seemed due to issues with Pickle serialisation of the function, ultimately requiring the creation of the “wrapped_spatial_join” function.

In hindsight it was an obvious solution, I would have to use map_partitions to chunk through the data in the dask_cudf frame, the interleave needed to be done within the map_partitions function instead of prior to passing it to map_partitions so that only each chunk was interleaved instead of the whole dataframe.

This also wasn’t as simple as described because dask didn’t like having other functions inside the map_partitions function (it causes problems with Pickle serialisation of the function — hence the final “wrapped_spatial_join” function.

Data size and type challenges

Another issue encountered was the size limitation of the dataset for cuDF, which is restricted by the size of INT_32 (2,147,483,647). My dataset consisted of approximately 2.3 billion records, exceeding this limit.

This meant it was impossible to process the entire dataset at once, necessitating the completion of all operations on data partitions. One example of this was the final count of points within and outside polygons. A straightforward result.value_counts() could not be employed, and a separate calculation needed to be performed for each partition before aggregating the results.

This consideration becomes particularly important for larger or wider datasets that require complex calculations across them, as working on the full dataset might not be feasible. This was really the main focus of this experiment, as now this works on a small GPU, you can be sure that the same processes can be applied to larger GPU’s and much larger datasets!

Conclusions

In terms of performance, Parquet took longer to convert but was more efficient once the conversion was complete. Depending on the workflow, this may be a critical factor to consider. Generally, data only needs to be converted once but may be subjected to multiple analyses, making Parquet the more suitable option.

On the other hand, ORC conversion was faster but less efficient during calculations. For more ad-hoc computations, ORC may be preferable, especially when used with a tool like TrinoDB that has a highly efficient ORC engine.

Both formats exhibited strong performance on limited hardware. With some basic optimisations, it is likely that their performance could be further enhanced. It is valuable to observe the baseline performance of each format.

Future Experiments

In the future, I aim to run the same experiment on multiple polygons and explore methods for handling them. For instance, I will perform a spatial join on a dataset of zip codes in the Las Vegas area to examine the feasibility of processing multiple polygons. Additionally, I plan to utilise a spatial indexing solution such as Uber’s H3 to index the data and assess its impact on the final results.

About the author

Dan Voyce

--

--

CTO, General Nerd. I love new technology, Data geek, Blockchain Hero, Linux Buff and all round evangelist for Open Source