Streaming Big Data Files from Cloud Storage

Methods for efficient consumption of large files

Chaim Rand
Towards Data Science

--

Photo by Aron Visuals on Unsplash

Working with very large files can pose challenges to application developers related to efficient resource management and runtime performance. Text file editors, for example, can be divided into those that can handle large files, and those that make your CPU choke, make your PC freeze, and make you want to scream. These challenges are exacerbated when the large files reside in a remote storage location. In such cases one must consider the manner in which the files will be pulled to the application while taking into account: bandwidth capacity, network latency, and the application’s file access pattern. In this post we consider the case in which our data application requires access to one or more large files that reside in cloud object storage. This continues a series of posts on the topic of efficient ingestion of data from the cloud (e.g., here, here, and here).

Before we get started, let’s be clear…when using cloud storage, it is usually not recommended to work with files that are particularly large. If you are working with multiple data files, it is also preferred not to choose a file size that is particularly small (due to the overhead incurred by numerous requests to the storage service). The sweet spot varies across platforms but is usually somewhere between a few MBs to a few hundred MBs. (If you rely heavily on cloud storage, you may want to design a simple experiment to test this out.) Unfortunately, we don’t always have control over the data design process and sometimes have no choice but to work with the cards we’ve been dealt.

Another good practice, especially when working with large files, is to choose a format that supports partial file reads — that is, a format that does not require ingesting the entire file in order to process any part of it. A few examples of such files are:

  • a simple text file,
  • a sequential dataset — a dataset containing individual records that are grouped into a single file in a sequential manner,
  • a dataset that is stored in a columnar format, such as Apache Parquet, that is specifically designed to enable loading only selected columns of the data, and
  • a video file that is stored in a format that allows playback from any time offset (as most formats do).

In this post, we assume that our large files allow for partial reads. We will consider several options for ingesting the file contents in Python and measure how they perform in different application use cases. Although our demonstrations will be based on AWS’s object storage service, Amazon S3, most of what we write pertains equally to any other object storage service. Please note that our choice of specific services, APIs, libraries, etc., should not be viewed as an endorsement of these over any other option. The best solution for streaming data from the cloud is very much dependent on the details of your project and your environment, and it is highly recommended that you conduct your own in-depth analysis before drawing any conclusions.

Direct Download from Amazon S3

In this post, we will assume that we are downloading files directly from Amazon S3. However, it should be noted that there a number of services and/or solutions that include an intermediate step between the object storage and the application. AWS, for example, offers services such as Amazon FSx and Amazon EFS for mirroring your data in a high-performance file system in the cloud. AI Store offers a kubernetes-based solution for a lightweight storage stack adjacent to the data consuming applications. These kinds of solutions might alleviate some of the challenges associated with using large files, e.g., they may reduce latency and support higher bandwidth. On the other hand, they often introduce a bunch of new challenges related to deployment, maintenance, scalability, etc. Plus, they cost extra money.

Metrics for Comparative Performance Measurement

In the next few sections, we will describe different ways of pulling large files from Amazon S3 in Python. To compare the behavior of these methods, we will use the following metrics:

  • Time to first sample — how much time does it take until the first sample in the file is read.
  • Average sequential read time — what is the average read time per sample when iterating sequentially over all of the samples.
  • Total processing time — what is the total processing time of the entire data file.
  • Average random read time— what is the average read time when reading samples at arbitrary offsets.

Different applications will have different preferences as to which of these metrics to prioritize. For example, a video streaming application might prioritize a low time-to-first-sample in order to improve viewer experience. It will also require efficient reads at arbitrary offsets to support features such as fast-forward. On the other hand, as long as the average time-per-sample passes a certain threshold (e.g., 30 frames per second) optimizing this metric is not important. In contrast, a deep learning training application might prioritize reducing the average sequential read and total processing time in order to minimize the potential for performance bottlenecks in the training flow.

Toy Example

To facilitate our discussion, we create a 2 GB binary file and assume that the file contains 2,048 data samples, each one being 1 MB in size. The code block below includes snippets for: creating a file with random data and uploading it to Amazon S3 (using boto3), iterating over all of the samples sequentially, and sampling the data at non-sequential file offsets. For this and all subsequent code snippets, we assume that your AWS account and local environment have been appropriately configured to access Amazon S3.

import os, boto3

KB = 1024
MB = KB * KB

def write_and_upload():
# write 2 GB file
with open('2GB.bin', 'wb') as f:
for i in range(2*KB):
f.write(os.urandom(MB))

# upload to S3
bucket = '<s3 bucket name>'
key = '<s3 key>'
s3 = boto3.client('s3')
s3.upload_file('2GB.bin', bucket, key)

def read_sequential(f, t0):
t1 = time.time()
x = f.read(MB)
print(f'time of first sample: {time.time() - t1}')
print(f'total to first sample: {time.time() - t0}')
t1 = time.time()
count = 0
while True:
x = f.read(MB)
if len(x) == 0:
break
count += 1
print(f'time of avg read: {(time.time() - t1)/count}')

def fast_forward(f):
t1 = time.time()
total = 10
for i in range(total):
f.seek(i * 100 * MB)
t1 = time.time()
x = f.read(MB)
print(f'time of avg random read: {(time.time() - t1)/total}')

Downloading from S3 to Local Disk

The first option we consider is to download the large file to a local disk and then read it from there in the same manner that we would read any other local file. There a number of methods for downloading a file to a local disk. The three we will evaluate here are: Python boto3 API, AWS CLI, and S5cmd.

Boto3 File Download

The most straightforward way to pull files from Amazon S3 in Python is to use the dedicated Boto3 Python library. In the code block below, we show how to define an S3 client and use the download file API to pull our file from S3 to a local path. The API takes a TransferConfig object which includes controls for tuning the download behavior. In our example, we have left the settings with their default values.

import boto3, time

bucket = '<s3 bucket name>'
key = '<s3 key>'
local_path = '<local path>'

s3 = boto3.client('s3')

config = boto3.s3.transfer.TransferConfig(
multipart_threshold=8 * MB,
max_concurrency=10,
multipart_chunksize=8 * MB,
num_download_attempts=5,
max_io_queue=100,
io_chunksize=256 * KB,
use_threads=True,
max_bandwidth=None)

t0 = time.time()
s3.download_file(bucket, key, local_path, Config=config)

with open(local_path, 'rb') as f:
read_sequential(f,t0)

print(f'total time: {time.time()-t0}')

We ran this script (and all subsequent scripts) in our local environment and averaged the results over 10 trials. Not surprisingly, the average time-to-first-sample was relatively high, ~21.3 seconds. This is due to the fact that we needed to wait until the entire file was downloaded before opening it. Once the download was completed, the average read time for both sequential and arbitrary samples was miniscule, as we would expect from any other local file.

Boto3 includes a similar API, download_fileobj, for downloading the file directly into memory (e.g., using an io.BytesIO object). However, this would generally not be recommended when working with large files.

AWS CLI

The AWS CLI utility offers similar functionality for command line use. AWS CLI is written in Python and uses the same underlying APIs as Boto3. Some developers feel more comfortable with this mode of use. The download configuration settings are controlled via the AWS config file.

import shlex, time
from subprocess import Popen

bucket = '<s3 bucket name>'
key = '<s3 key>'
local_path = '<local path>'

cmd = f'aws s3 cp s3://{bucket}/{key} {local_path}'
p = Popen(shlex.split(cmd)).wait()

with open(local_path, 'rb') as f:
read_sequential(f,t0)

print(f'total time: {time.time()-t0}')

Not surprisingly, the results of running this script (with the default configuration settings) were nearly identical to the previous Boto3 results.

S5cmd

We covered the S5cmd command line utility at length in a previous post in which we showed its value in downloading hundreds of small files from cloud storage in parallel. Contrary to the previous method, S5cmd is written in the Go Programming Language and is thus able to better utilize the underlying resources (e.g., CPU cores and TCP connections). Check out this informative blog for more details on how S5cmd works and its significant performance advantages. The S5cmd concurrency flag allows for controlling the download speed. The code block below demonstrates the use of S5cmd with the concurrency set to 10.

import shlex, time
from subprocess import Popen

bucket = '<s3 bucket name>'
key = '<s3 key>'
local_path = '<local path>'

s5cmd = f's5cmd cp --concurrency 10 s3://{bucket}/{key} {local_path}'
p = Popen(shlex.split(cmd)).wait()

with open(local_path, 'rb') as f:
read_sequential(f,t0)

print(f'total time: {time.time()-t0}')

Sadly, we were not able to reproduce the overwhelming performance benefit of S5cmd that was previously demonstrated on a 50 GB file. The average time-to-first-sample was ~23.1 seconds, slightly higher than our previous results.

Multi-part vs. Single-threaded Download

Each of the previous methods employed multi-part downloading under the hood. In multi-part downloading, multiple threads are run in parallel, each of which is responsible for downloading a disjoint chunk of the file. Multi-part downloading is critical for pulling large files from the cloud in a timely fashion. To demonstrate its importance, we reran the Boto3 experiment with the use_threads flag set to False, effectively disabling multi-part downloading. This caused the resultant average time-to-first-sample to jump to a whopping 156 seconds.

The Art of Prefetching Data Files

Prefetching is a common technique that is employed when iterating over multiple large files. When prefetching, an application will start to download one or more subsequent files in parallel to its processing the current file. In this way the application can avoid the download delay for all but the very first file. Effective prefetching requires appropriate tuning to reach the most optimal results. Prefetching data from the cloud is employed by many frameworks for accelerating the speed of data ingestion. For example, both PyTorch and TensorFlow support prefetching training-data files for optimizing deep learning training.

Streaming Data from S3

Some applications may be willing to compromise on the average read-time-per-sample in exchange for a low time-to-first-sample. In this section we demonstrate a Boto3 option for streaming the file from S3 in such a way that allows us to begin processing it before completing the file download. The method we describe involves creating a Linux FIFO pipe and feeding it into the Boto3 download_fileobj API:

import os, boto3, time, multiprocessing as mp

bucket = '<s3 bucket name>'
key = '<s3 key>'

t0 = time.time()

os.mkfifo(local_path)

def stream_file():
s3 = boto3.client('s3')
with open(local_path, 'wb') as f:
s3.download_fileobj(bucket, key, f)


proc = mp.Process(target=stream_file)
proc.start()

with open(local_path, 'rb') as f:
read_sequential(f, t0)

print(f'total time: {time.time()-t0}')

Sure enough, the average time-to-first-sample dropped all the way down to ~2.31 seconds (from more than 20). On the other hand, both the average time-per-sample and the total file processing time increased to ~0.01 seconds and ~24.7 seconds, respectively.

Reading Data from Arbitrary Offsets

Some applications require the ability to read only select portions of the file at arbitrary offsets. For these use cases, downloading the entire file can be extremely wasteful. Here we show how to download specific byte-ranges of the file using the Boto3 get_object data streaming API. The code block below demonstrates the use of the API for both streaming the entire file and reading arbitrary data chunks.

import boto3, time

bucket = '<s3 bucket name>'
key = '<s3 key>'

s3 = boto3.client('s3')

# stream entire file
t0 = time.time()
response = s3.get_object(
Bucket=bucket,
Key=key
)
f = response['Body']
read_sequential(f,t0)

print(f'total time: {time.time()-t0}')

# fast forward
total = 10
t0 = time.time()

for i in range(total):
response = s3.get_object(
Bucket=bucket,
Key=key,
Range=f'bytes={i*100*MB}-{i*100*MB+MB-1}'
)
f = response['Body']
x = f.read()

print(f'time of avg random read: {(time.time() - t0)/total}')

While this solution results in a time-to-first-sample result of ~1.37 seconds, the total file processing time (~119 seconds) renders it unusable for sequential reading. Its value is demonstrated by the average time to read arbitrary samples — ~0.191 seconds.

Note that our example did not take advantage of the fact that the arbitrary offsets were predetermined. A real-world application would use this information to prefetch file segments and boost performance.

As mentioned above, pulling large files from the cloud efficiently relies on parallel multi-part downloading. One way to implement this is by using the Boto3 get_object API to read disjoint file chunks in the manner just shown.

Filtering Data with Amazon S3 Select

Sometimes the partial data we are seeking is a small number of rows and/or columns from a large file that is stored in a CSV, JSON, or Apache Parquet file format. In such cases, we can simply apply an SQL filter using a dedicated service such as Amazon S3 Select. To run an SQL filter to multiple files, you may consider a service such as Amazon Athena. Both services allow you to limit your data retrieval to the specific information you need and avoid high overhead of having to pull one or more large files. Be sure to check out the documentation to learn more.

Mounting S3 Data Using Goofys

All of the solutions we have covered until now have involved explicitly pulling data directly from cloud storage. Other solutions expose the cloud storage access to the application as a (POSIX-like) file system. Goofys is a popular FUSE based library for reading from Amazon S3. The command below demonstrates how to mount an S3 bucket to a local file path.

goofys -o ro -f <s3 bucket name> <local path>

Once the goofys mount has been configured, the application can access the large files by pointing to them via the local path, as shown in the code block below:

bucket = '<s3 bucket name>'
key = '<s3 key>'
mount_dir = '<local goofys mount>'
sequential = True # toggle flag to run fast_forward

t0 = time.time()
with open(f'{mount_dir}/{key}', 'rb') as f:
if sequential:
read_sequential(f, t0)
print(f'total time: {time.time()-t0}')
else:
fast_forward(f)

The goofys-based solution resulted in a time-to-first-sample of ~1.36 seconds, total file processing time of ~27.6 seconds, and an average time of ~0.149 seconds for reading arbitrary samples.

It should be noted that under the hood, goofys attempts to optimize the response time even at the expense of additional calls to Amazon S3 (e.g., by prefetching data chunks even before they are requested). Depending on your setup and details of your application’s data access pattern, this could result in slightly higher Amazon S3 costs relative to other solutions. Goofys includes a number of settings for controlling its behavior, such as the “ — cheap” flag for trading performance for potentially lower costs. Note that these controls are applied when defining the mount. Contrary to the Boto3 based solutions, goofys does not enable you to tune the controls (e.g., chunk size, prefetching behavior, etc.) during runtime to support varying data ingestion patterns.

Another point to be aware of is that the portions of data read by goofys are cached. Thus, if you read the same portion of data a second time while it is still in the cache, the response will be immediate. Keep this in mind when running your tests and be sure to reset the goofys mount before each experiment.

Be sure to check out the documentation for more details on how goofys works, how to control its behavior, its limitations, and more.

Comparative Results

The following table summarizes the results of the experiments that we ran:

Comparative Results of Pulling 2 GB File from S3 (by Author)

The results indicate that the best method for ingesting a large data file is likely to depend on the specific needs of the application. Based on these results, an application that seeks to minimize the time-to-first-sample would benefit most from a goofys based solution, while an application that seeks to minimize the total processing time would opt for downloading the file to a local disk using Boto3.

We strongly caution against relying on this table to make any decisions for your own applications. The tests were run in a very specific setup with the versions of the tools available at the time of this writing. The comparative results are likely to vary greatly based on: the setup, the network load, the distance from the cloud storage facility, the versions of the utilities used, the resource requirements of the application, and more.

Our results did not account for differences in the utilization of CPU, memory, and other system resources between the different solutions. In practice, the load on the system resources should be taken into consideration as they may impact the application behavior.

Be sure to run your own in-depth, use-case driven experiments before making any design decisions.

Summary

In this post we have covered the topic of pulling big data files from cloud storage. We have seen that the best approach will likely vary based on the particular needs of the data application in question. Our discussion has highlighted a theme that is common across all of our posts covering cloud based services — while the cloud opens up a wide range of new opportunities for technological development and advancement, it also introduces quite a number of unique and exciting challenges, and compels us to rethink common application design principles.

As usual, comments, questions, and corrections are more than welcome.

--

--

I am a Machine Learning Algorithm Developer working on Autonomous Vehicle technologies at Mobileye. The views expressed in my posts are my own.