Table file formats - vacuum: Delta Lake

Versions: Delta Lake 2.4.0

If you have some experience with an RDBMS (and who doesn't, btw?), you have probably run a VACUUM command to reclaim the storage space occupied by deleted or obsolete rows. If you're now working with Delta Lake, you can do the same!

VACUUM 101

The purpose of the command is to remove all untracked files and folders. Although this definition sounds simple, it hides some complexity because an untracked file is not only a file that no longer belongs to the Delta table. The definition also covers all other files and folders you might store in the Delta table location, unless they follow a special naming convention... You should understand the distinction better after reading the next sections.

Technically, to execute a VACUUM you can invoke either an API or a SQL command:

DeltaTable.vacuum(retentionHours: Double)
DeltaTable.vacuum()

VACUUM ('path') [RETAIN number HOURS] [DRY RUN]

Internally, both are handled by the same operation, namely VacuumCommand.gc, where the physical cleaning takes place.
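To give you a better picture, here is how both invocations could look in practice. It's only a sketch: the /tmp/orders location and the retention values are made up for the example, and spark stands for a SparkSession with the Delta Lake extensions configured:

import io.delta.tables.DeltaTable

// A minimal sketch of both entry points; the table location and the retention values
// are invented for the example.
val deltaTable = DeltaTable.forPath(spark, "/tmp/orders")

// Programmatic API: keep the files deleted during the last 20 days (480 hours).
deltaTable.vacuum(480.0)

// Same operation through SQL; DRY RUN only lists the candidates without deleting them.
spark.sql("VACUUM '/tmp/orders' RETAIN 480 HOURS DRY RUN").show(truncate = false)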

Time travel safety

The code starts by calling this function:

  val snapshotTombstoneRetentionMillis = DeltaLog.tombstoneRetentionMillis(snapshot.metadata)
  val retentionMillis = retentionHours.map(h => TimeUnit.HOURS.toMillis(math.round(h)))
  checkRetentionPeriodSafety(spark, retentionMillis, snapshotTombstoneRetentionMillis)

VACUUM is an irreversible operation on the file system (unless you have some versioning enabled on the object store) that physically deletes the files. Naturally, it has some implications. First, on the time travel capabilities: since the files are no longer there, the VACUUM retention defines how far in the past a time travel can go. Second, it can also break active writers and even lead to a corrupted Delta table. How? If you specify too low a retention period and an active writer is still working on those files, its transaction will try to commit against files already deleted by the VACUUM and the table state will be corrupted. The table might still work on the most recent data, but you won't be able to restore a past version properly.

That's why the VACUUM retention period should always be at least equal to the delta.deletedFileRetentionDuration property of the table, which defaults to 1 week. If you are absolutely sure that a shorter VACUUM retention won't trouble your writers, you can disable this safety check by turning the spark.databricks.delta.retentionDurationCheck.enabled flag off.
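A short sketch of these two knobs in action might help here. The orders table, the 14-day retention and the 1-hour VACUUM are only example values:

// 1. Align the tombstone retention of the table with your time travel expectations.
spark.sql(
  """ALTER TABLE orders
    |SET TBLPROPERTIES ('delta.deletedFileRetentionDuration' = 'interval 14 days')""".stripMargin)

// 2. Only if you're absolutely sure no reader or writer still needs the old files,
//    relax the safety check and accept a retention shorter than the table property.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
spark.sql("VACUUM orders RETAIN 1 HOURS")

Keep in mind that the second part is the dangerous one; disable the check only when you know the consequences for the concurrent readers and writers.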

Files to clean

The most important part of the VACUUM algorithm is selecting the files to clean. Surprisingly, the first action for this part consists of collecting the files that cannot be removed! The list contains all files still referenced by the table's FileActions, i.e. the added files plus the RemoveFile tombstones whose deletion timestamp is more recent than the retention boundary, together with their parent subdirectories.

Below is the code snippet responsible for building it:

  def gc(
      spark: SparkSession,
      deltaLog: DeltaLog,
      dryRun: Boolean = true,
      retentionHours: Option[Double] = None,
      clock: Clock = new SystemClock): DataFrame = {
    // ...
    actions.flatMap {
      _.unwrap match {
        case tombstone: RemoveFile if tombstone.delTimestamp < deleteBeforeTimestamp =>
          Nil
        case fa: FileAction =>
          getValidRelativePathsAndSubdirs(
            fa,
            fs,
            reservoirBase,
            relativizeIgnoreError,
            isBloomFiltered)
        // ...

This list is materialized as a DataFrame with a path column. Next, the VACUUM command recursively analyzes the content of the cleaned Delta table location in the DeltaFileOperations.recursiveListDirs method. The invocation passes 2 filters, one for hidden directories and one for hidden files detection. Both are instances of this function:

  def isHiddenDirectory(partitionColumnNames: Seq[String], pathName: String): Boolean = {
    // Names of the form partitionCol=[value] are partition directories, and should be
    // GCed even if they'd normally be hidden. The _db_index directory contains (bloom filter)
    // indexes and these must be GCed when the data they are tied to is GCed.
    (pathName.startsWith(".") || pathName.startsWith("_")) &&
      !pathName.startsWith("_delta_index") && !pathName.startsWith("_change_data") &&
      !partitionColumnNames.exists(c => pathName.startsWith(c ++ "="))
  }

I'm explicitly mentioning it on purpose because it's the function that determines which files will not be impacted by the VACUUM. That's why the Delta Lake documentation recommends keeping any extra metadata stored in the table location, such as Structured Streaming checkpoints, in a directory whose name starts with an underscore (e.g. _checkpoints), so that VACUUM skips it.
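To visualize that naming convention, below is a standalone copy of the predicate, written only to show which names are considered as hidden and hence never cleaned; it's not the production entry point and the sample names are invented:

// A copy of the predicate quoted above, only to visualize which names VACUUM treats as
// hidden (and therefore never deletes).
def isHidden(partitionColumnNames: Seq[String], pathName: String): Boolean =
  (pathName.startsWith(".") || pathName.startsWith("_")) &&
    !pathName.startsWith("_delta_index") && !pathName.startsWith("_change_data") &&
    !partitionColumnNames.exists(c => pathName.startsWith(c + "="))

val partitionColumns = Seq("event_date")
Seq("_delta_log", "_checkpoints", ".streaming_metadata", "_change_data",
    "event_date=2023-06-01", "notes.txt").foreach { name =>
  println(s"$name -> hidden = ${isHidden(partitionColumns, name)}")
}
// _delta_log, _checkpoints and .streaming_metadata are protected, while _change_data,
// the partition directory and notes.txt remain visible to the cleaning logic.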

The generated list contains both the valid Delta files and the untracked ones. In the next step the algorithm applies the first filter and keeps only the expired files and all the directories:

val diff = allFilesAndDirs
  .where(col("modificationTime") < deleteBeforeTimestamp || col("isDir"))
// ...

In the next step the operation maps the files and directories into a list of FileNameAndSize entries. The directories differ from the files since they always have a size of 0 and are relativized with the isDir flag set to true:

if (fileStatus.isDir) {
  Iterator.single(FileNameAndSize(
    relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true), 0L))
} else {
  val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
  val dirsWithSlash = dirs.map { p =>
    val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
    FileNameAndSize(relativizedPath, 0L)
  }
  dirsWithSlash ++ Iterator(
    FileNameAndSize(relativize(
      fileStatus.getHadoopPath, fs, reservoirBase, isDir = false), fileStatus.length))
}

Later, the operation groups all paths and counts the number of occurrences:

.groupBy(col("path")).agg(count(new Column("*")).as("count"), sum("length").as("length"))

It's only in the next step that the VACUUM combines all these paths with the valid files' paths. The join is a left-anti join, meaning that the result contains only the entries from the left dataset that are not present in the right one. Therefore, we get the list of untracked files here:

.join(validFiles, Seq("path"), "leftanti")

At the end, the algorithm keeps only the files and the empty directories; a non-empty directory appears more than once in the grouped dataset because each of its files also emits the parent directory path, so its count is greater than 1:

.where(col("count") === 1)

Dry run

As you saw in the SQL snippet, the VACUUM can be run in dry run mode. In that case the operation only returns the list of impacted files and directories:

val numFiles = diffFiles.count()
if (dryRun) {
  val stats = DeltaVacuumStats(
    isDryRun = true,
    specifiedRetentionMillis = retentionMillis,
    defaultRetentionMillis = snapshotTombstoneRetentionMillis,
    minRetainedTimestamp = deleteBeforeTimestamp,
    dirsPresentBeforeDelete = dirCounts,
    objectsDeleted = numFiles,
    sizeOfDataToDelete = sizeOfDataToDelete,
    timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
    timeTakenForDelete = 0L)

  recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
  logConsole(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " +
    s"a total of $dirCounts directories that are safe to delete.")

  return diffFiles.map(f => stringToPath(f).toString).toDF("path")
}

It can be useful when you're not sure about the content of the Delta table location or simply want to estimate the impact of the operation on your infrastructure. You can also use the returned list as a part of a manually validated CI/CD pipeline and let the users decide whether to delete or not, for example when they see untracked files that shouldn't be removed but somehow landed in the Delta table location.
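Below is a sketch of such a manual validation step. The table location, the retention and the deliberately naive .parquet naming check are invented for the example; spark is again an existing SparkSession:

import spark.implicits._ // assumes an existing SparkSession called spark

// Collect the dry run output: a single "path" column with the deletion candidates.
val candidates = spark.sql("VACUUM '/tmp/orders' RETAIN 336 HOURS DRY RUN")
  .as[String]
  .collect()

// A naive review rule, just for illustration: surface anything that doesn't look like
// an old Parquet data file so that a human can double-check it before the real VACUUM.
val toReview = candidates.filterNot(_.endsWith(".parquet"))
if (toReview.nonEmpty) {
  println(s"Paths to review before the real VACUUM:\n${toReview.mkString("\n")}")
}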

Cleaning

The dry run doesn't have any physical impact on the files but the cleaning does. It starts by recording a VACUUM START event in the commit log with metrics such as the number of files to delete (numFilesToDelete) or the size of the data to delete (sizeOfDataToDelete). This logging activity can be disabled with the vacuum.logging.enabled flag.
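For the record, a minimal sketch around that flag; the full configuration key is assumed to be spark.databricks.delta.vacuum.logging.enabled and /tmp/orders is, again, an example location:

import io.delta.tables.DeltaTable

// Assumed full key of the flag mentioned above; set it before triggering the VACUUM.
spark.conf.set("spark.databricks.delta.vacuum.logging.enabled", "true")
DeltaTable.forPath(spark, "/tmp/orders").vacuum()

// With the flag on, the VACUUM START and VACUUM END entries should show up in the history.
spark.sql("DESCRIBE HISTORY '/tmp/orders'")
  .select("version", "operation", "operationMetrics")
  .show(truncate = false)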

Later, the cleaning operation begins:

val filesDeleted = try {
  delete(diffFiles, spark, basePath,
    hadoopConf, parallelDeleteEnabled, parallelDeletePartitions)
} catch {
  case t: Throwable =>
    logVacuumEnd(deltaLog, spark, path)
    throw t
}

The delete action is either executed in parallel (delta.vacuum.parallelDelete.enabled) or sequentially:

if (parallel) {
  diff.repartition(parallelPartitions).mapPartitions { files =>
    val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
    val filesDeletedPerPartition =
      files.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
    Iterator(filesDeletedPerPartition)
  }.collect().sum
} else {
  val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
  val fileResultSet = diff.toLocalIterator().asScala
  fileResultSet.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
}
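If you want to try the parallel branch, the configuration could look like below; the keys are assumed to be the full Spark properties behind the short names mentioned above and the values are arbitrary:

// Assumed full keys for the parallel clean-up branch; the parallelism value is arbitrary.
spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")
spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.parallelism", "32")

io.delta.tables.DeltaTable.forPath(spark, "/tmp/orders").vacuum()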

Modern data systems, not only table file formats but increasingly also streaming brokers with archive storage (KIP-405: Kafka Tiered Storage), rely on cheap object stores for long-term storage. They're cheap indeed, but they're flat-structured: even if they give you a file system-like experience, there are no real directories. Hence, operations like listing may take an incredibly long time if many objects are stored. That's why VACUUM is a great tool to keep that part manageable and efficient. After all, it removes everything that is not needed by the table, so what would be the reason to store it anyway?