Make Snapshot creation resilient to corrupt checkpoint in OSS Delta
When a checkpoint is corrupted, OSS Delta fails to create the Snapshot. This PR makes Snapshot loading resilient to corrupt checkpoints: when reading a checkpoint fails, we search for an alternative checkpoint and use it to construct the Snapshot. By default we retry at most two times, i.e., we attempt to create the Snapshot at most three times.
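
A minimal, self-contained sketch of the retry shape (a hypothetical generic helper, not Delta's API; the actual implementation is `createSnapshotFromGivenOrEquivalentLogSegment` in the diff below): build a result from a segment, and when that fails ask for an alternative segment and try again, rethrowing the first error once retries are exhausted.

```scala
// Simplified illustration only; names and signature are hypothetical, not Delta APIs.
def buildWithRetries[Segment, Result](
    initSegment: Segment,
    maxRetries: Int)(
    build: Segment => Result)(
    alternative: Segment => Option[Segment]): Result = {
  var segment = initSegment
  var attempt = 0
  var firstError: Throwable = null
  while (true) {
    try {
      return build(segment)
    } catch {
      case e: Exception if attempt < maxRetries =>
        if (firstError == null) firstError = e  // remember the root cause
        // Ask for an alternative segment; if none exists, give up with the first error.
        segment = alternative(segment).getOrElse(throw firstError)
        attempt += 1
      case _: Exception if firstError != null =>
        throw firstError  // retries exhausted; surface the original failure
    }
  }
  throw new IllegalStateException("unreachable")
}
```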

Alternatively, we could make `Snapshot.logSegment` mutable and re-create it when a checkpoint file fails to load. But that is risky because `Snapshot` would become mutable: for example, a caller could read `Snapshot.logSegment` before the corrupt checkpoint file is touched and again afterwards, and the two calls would return different `LogSegment`s.

Adds new unit tests.
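
A sketch of the kind of scenario the new tests exercise (hypothetical code, not the actual test added here; assumes a `spark` session with Delta configured and a scratch path): corrupt the only checkpoint and verify the table can still be read, because the Snapshot falls back to the JSON commits.

```scala
import java.nio.file.Files
import org.apache.spark.sql.delta.DeltaLog

val path = "/tmp/delta-corrupt-checkpoint-demo"  // assumed scratch location
spark.range(0, 100).write.format("delta").save(path)

val deltaLog = DeltaLog.forTable(spark, path)
deltaLog.checkpoint()  // write a checkpoint for version 0

// Overwrite the checkpoint with garbage so that reading it fails on the executors.
val checkpointFile = new java.io.File(path, "_delta_log")
  .listFiles()
  .filter(_.getName.endsWith(".checkpoint.parquet"))
  .head
Files.write(checkpointFile.toPath, "not a parquet file".getBytes)

DeltaLog.clearCache()  // force a fresh Snapshot to be constructed
// Previously this failed; now the corrupt checkpoint is skipped and the Snapshot
// is rebuilt from 00000000000000000000.json.
assert(spark.read.format("delta").load(path).count() == 100)
```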

GitOrigin-RevId: 1013c7831360a02e70e3713ee0ff664869b29188
zsxwing authored and allisonport-db committed Jan 11, 2022
1 parent 1369ea0 commit 00a3abe
Showing 4 changed files with 524 additions and 56 deletions.
core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -220,7 +220,9 @@ trait Checkpoints extends DeltaLogging {
val checkpoints = store.listFrom(
checkpointPrefix(logPath, math.max(0, cur - 1000)),
hadoopConf)
.filter { file => isCheckpointFile(file.getPath) }
// Checkpoint files of 0 size are invalid but Spark will ignore them silently when reading
// such files, hence we drop them so that we never pick up such checkpoints.
.filter { file => isCheckpointFile(file.getPath) && file.getLen != 0 }
.map{ file => CheckpointInstance(file.getPath) }
.takeWhile(tv => (cur == 0 || tv.version <= cur) && tv.isEarlierThan(cv))
.toArray
271 changes: 216 additions & 55 deletions core/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
import java.io.FileNotFoundException

import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

// scalastyle:off import.ordering.noEmptyLine

@@ -27,7 +28,8 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.SparkContext
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.util.ThreadUtils

@@ -38,6 +40,7 @@ import org.apache.spark.util.ThreadUtils
* - Updating and exposing the latest snapshot of the Delta table in a thread-safe manner
*/
trait SnapshotManagement { self: DeltaLog =>
import SnapshotManagement.verifyDeltaVersions

@volatile private[delta] var asyncUpdateTask: Future[Unit] = _

@@ -71,6 +74,28 @@ trait SnapshotManagement { self: DeltaLog =>
isCheckpointFile(path) || isDeltaFile(path)
}

/**
* Returns the delta files and checkpoint files starting from the given `startVersion`.
* `versionToLoad` is an optional parameter to set the max bound. It's usually used to load a
* table snapshot for a specific version.
*
* @param startVersion the version to start. Inclusive.
* @param versionToLoad the optional parameter to set the max version we should return. Inclusive.
*/
private final def listDeltaAndCheckpointFiles(
startVersion: Long,
versionToLoad: Option[Long]): Array[FileStatus] = {
listFrom(startVersion)
// Pick up all checkpoint and delta files
.filter { file => isDeltaCommitOrCheckpointFile(file.getPath) }
// Checkpoint files of 0 size are invalid but Spark will ignore them silently when reading
// such files, hence we drop them so that we never pick up such checkpoints.
.filterNot { file => isCheckpointFile(file.getPath) && file.getLen == 0 }
// take files until the version we want to load
.takeWhile(f => versionToLoad.forall(v => getFileVersion(f.getPath) <= v))
.toArray
}

/**
* Get a list of files that can be used to compute a Snapshot at version `versionToLoad`. If
* `versionToLoad` is not provided, will generate the list of files that are needed to load the
@@ -92,15 +117,7 @@ trait SnapshotManagement { self: DeltaLog =>

// List from the starting checkpoint. If a checkpoint doesn't exist, this will still return
// deltaVersion=0.
val newFiles = listFrom(startCheckpoint.getOrElse(0L))
// Pick up all checkpoint and delta files
.filter { file => isDeltaCommitOrCheckpointFile(file.getPath) }
// filter out files that aren't atomically visible. Checkpoint files of 0 size are invalid
.filterNot { file => isCheckpointFile(file.getPath) && file.getLen == 0 }
// take files until the version we want to load
.takeWhile(f => versionToLoad.forall(v => getFileVersion(f.getPath) <= v))
.toArray

val newFiles = listDeltaAndCheckpointFiles(startCheckpoint.getOrElse(0L), versionToLoad)
if (newFiles.isEmpty && startCheckpoint.isEmpty) {
throw DeltaErrors.emptyDirectoryException(logPath.toString)
} else if (newFiles.isEmpty) {
@@ -120,11 +137,17 @@ trait SnapshotManagement { self: DeltaLog =>
// delta version can be 0.
if (startCheckpoint.isDefined) {
// `startCheckpoint` was given but no checkpoint found on delta log. This means that the
// last checkpoint we thought should exist (the `_last_checkpoint` file) no longer exists
// last checkpoint we thought should exist (the `_last_checkpoint` file) no longer exists.
// Try to look up another valid checkpoint and create `LogSegment` from it.
recordDeltaEvent(this, "delta.checkpoint.error.partial")
throw DeltaErrors.missingPartFilesException(
startCheckpoint.get, new FileNotFoundException(
s"Checkpoint file to load version: ${startCheckpoint.get} is missing."))
val alternativeLogSegment = getLogSegmentWithMaxExclusiveCheckpointVersion(
snapshotVersion = versionToLoad.getOrElse(deltaVersion(deltas.last.getPath)),
startCheckpoint.get)
return alternativeLogSegment.getOrElse {
throw DeltaErrors.missingPartFilesException(
startCheckpoint.get, new FileNotFoundException(
s"Checkpoint file to load version: ${startCheckpoint.get} is missing."))
}
}
-1L
}
@@ -138,15 +161,11 @@ trait SnapshotManagement { self: DeltaLog =>
val deltaVersions = deltasAfterCheckpoint.map(f => deltaVersion(f.getPath))
// We may just be getting a checkpoint file after the filtering
if (deltaVersions.nonEmpty) {
verifyDeltaVersions(deltaVersions)
if (deltaVersions.head != newCheckpointVersion + 1) {
throw DeltaErrors.logFileNotFoundException(
deltaFile(logPath, newCheckpointVersion + 1), deltaVersions.last, metadata)
}
versionToLoad.foreach { version =>
require(deltaVersions.last == version,
s"Did not get the last delta file version: $version to compute Snapshot")
}
verifyDeltaVersions(spark, deltaVersions, Some(newCheckpointVersion + 1), versionToLoad)
}

val newVersion = deltaVersions.lastOption.getOrElse(newCheckpoint.get.version)
@@ -162,6 +181,12 @@ trait SnapshotManagement { self: DeltaLog =>

// In the case where `deltasAfterCheckpoint` is empty, `deltas` should still not be empty,
// they may just be before the checkpoint version unless we have a bug in log cleanup.
if (deltas.isEmpty) {
throw new IllegalStateException(s"Could not find any delta files for version $newVersion")
}
if (versionToLoad.exists(_ != newVersion)) {
throw new IllegalStateException(s"Trying to load a non-existent version ${versionToLoad.get}")
}
val lastCommitTimestamp = deltas.last.getModificationTime

LogSegment(
@@ -184,10 +209,7 @@ trait SnapshotManagement { self: DeltaLog =>
val startCheckpoint = segment.checkpointVersion
.map(v => s" starting from checkpoint $v.").getOrElse(".")
logInfo(s"Loading version ${segment.version}$startCheckpoint")
val snapshot = createSnapshot(
segment,
minFileRetentionTimestamp,
segment.lastCommitTimestamp)
val snapshot = createSnapshot(segment, minFileRetentionTimestamp)

lastUpdateTimestamp = clock.getTimeMillis()
logInfo(s"Returning initial snapshot $snapshot")
@@ -204,18 +226,148 @@ trait SnapshotManagement { self: DeltaLog =>
def snapshot: Snapshot = currentSnapshot

protected def createSnapshot(
segment: LogSegment,
minFileRetentionTimestamp: Long,
timestamp: Long): Snapshot = {
val checksumOpt = readChecksum(segment.version)
new Snapshot(
logPath,
segment.version,
segment,
minFileRetentionTimestamp,
this,
timestamp,
checksumOpt)
initSegment: LogSegment,
minFileRetentionTimestamp: Long): Snapshot = {
val checksumOpt = readChecksum(initSegment.version)
createSnapshotFromGivenOrEquivalentLogSegment(initSegment) { segment =>
new Snapshot(
logPath,
segment.version,
segment,
minFileRetentionTimestamp,
this,
segment.lastCommitTimestamp,
checksumOpt)
}
}

/**
* Returns a [[LogSegment]] for reading `snapshotVersion` such that the segment's checkpoint
* version (if a checkpoint is present) is LESS THAN `maxExclusiveCheckpointVersion`.
* This is useful when trying to skip a bad checkpoint. Returns `None` when we are not able to
* construct such a [[LogSegment]], for example, when no checkpoint can be used but we don't
* have the entire history from version 0 to version `snapshotVersion`.
*/
private def getLogSegmentWithMaxExclusiveCheckpointVersion(
snapshotVersion: Long,
maxExclusiveCheckpointVersion: Long): Option[LogSegment] = {
assert(
snapshotVersion >= maxExclusiveCheckpointVersion,
s"snapshotVersion($snapshotVersion) is less than " +
s"maxExclusiveCheckpointVersion($maxExclusiveCheckpointVersion)")
val largestCheckpointVersionToSearch = snapshotVersion.min(maxExclusiveCheckpointVersion - 1)
val previousCp = if (largestCheckpointVersionToSearch < 0) {
None
} else {
findLastCompleteCheckpoint(
// The largest possible `CheckpointInstance` at version `largestCheckpointVersionToSearch`
CheckpointInstance(largestCheckpointVersionToSearch, numParts = Some(Int.MaxValue)))
}
previousCp match {
case Some(cp) =>
val filesSinceCheckpointVersion = listDeltaAndCheckpointFiles(
startVersion = cp.version,
versionToLoad = Some(snapshotVersion))
val (checkpoints, deltas) =
filesSinceCheckpointVersion.partition(f => isCheckpointFile(f.getPath))
if (deltas.isEmpty) {
// We cannot find any delta files. Returns None as we cannot construct a `LogSegment` only
// from checkpoint files. This is because in order to create a `LogSegment`, we need to
// set `LogSegment.lastCommitTimestamp`, and it must be read from the file modification
// time of the delta file for `snapshotVersion`. It cannot be the file modification time
// of a checkpoint file because it should be deterministic regardless of how we construct the
// Snapshot, and only delta json log files can ensure that.
return None
}
// `checkpoints` may contain multiple checkpoints for different part sizes, so we need to
// search `FileStatus`s of the checkpoint files for `cp`.
val checkpointFileNames = cp.getCorrespondingFiles(logPath).map(_.getName).toSet
val newCheckpointFiles =
checkpoints.filter(f => checkpointFileNames.contains(f.getPath.getName))
assert(newCheckpointFiles.length == checkpointFileNames.size,
"Failed in getting the file information for:\n" +
checkpointFileNames.mkString(" -", "\n -", "") + "\n" +
"among\n" + checkpoints.map(_.getPath).mkString(" -", "\n -", ""))
// Create the list of `FileStatus`s for delta files after `cp.version`.
val deltasAfterCheckpoint = deltas.filter { file =>
deltaVersion(file.getPath) > cp.version
}
val deltaVersions = deltasAfterCheckpoint.map(f => deltaVersion(f.getPath))
// `deltaVersions` should not be empty and `verifyDeltaVersions` will verify it
try {
verifyDeltaVersions(spark, deltaVersions, Some(cp.version + 1), Some(snapshotVersion))
} catch {
case NonFatal(e) =>
logWarning(s"Failed to find a valid LogSegment for $snapshotVersion", e)
return None
}
Some(LogSegment(
logPath,
snapshotVersion,
deltas,
newCheckpointFiles,
Some(cp.version),
deltas.last.getModificationTime))
case None =>
val deltas =
listDeltaAndCheckpointFiles(startVersion = 0, versionToLoad = Some(snapshotVersion))
.filter(file => isDeltaFile(file.getPath))
val deltaVersions = deltas.map(f => deltaVersion(f.getPath))
try {
verifyDeltaVersions(spark, deltaVersions, Some(0), Some(snapshotVersion))
} catch {
case NonFatal(e) =>
logWarning(s"Failed to find a valid LogSegment for $snapshotVersion", e)
return None
}
Some(LogSegment(
logPath = logPath,
version = snapshotVersion,
deltas = deltas,
checkpoint = Nil,
checkpointVersion = None,
lastCommitTimestamp = deltas.last.getModificationTime))
}
}

/**
* Create a [[Snapshot]] from the given [[LogSegment]]. If we fail to create the snapshot, we will
* search for an equivalent [[LogSegment]] using a different checkpoint and retry up to
* [[DeltaSQLConf.DELTA_SNAPSHOT_LOADING_MAX_RETRIES]] times.
*/
protected def createSnapshotFromGivenOrEquivalentLogSegment(
initSegment: LogSegment)(snapshotCreator: LogSegment => Snapshot): Snapshot = {
val numRetries =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_SNAPSHOT_LOADING_MAX_RETRIES)
var attempt = 0
var segment = initSegment
// Remember the first error we hit. If all retries fail, we will throw the first error to
// provide the root cause. We catch `SparkException` because corrupt checkpoint files are
// detected on the executor side when a task is trying to read them.
var firstError: SparkException = null
while (true) {
try {
return snapshotCreator(segment)
} catch {
case e: SparkException if attempt < numRetries && segment.checkpointVersion.nonEmpty =>
if (firstError == null) {
firstError = e
}
logWarning(s"Failed to create a snapshot from log segment: $segment. " +
s"Trying a different checkpoint.", e)
segment = getLogSegmentWithMaxExclusiveCheckpointVersion(
segment.version,
segment.checkpointVersion.get).getOrElse {
// Throw the first error if we cannot find an equivalent `LogSegment`.
throw firstError
}
attempt += 1
case e: SparkException if firstError != null =>
logWarning(s"Failed to create a snapshot from log segment: $segment", e)
throw firstError
}
}
throw new IllegalStateException("should not happen")
}

/** Checks if the snapshot of the table has surpassed our allowed staleness. */
@@ -292,10 +444,7 @@ trait SnapshotManagement { self: DeltaLog =>
.map(v => s" starting from checkpoint version $v.").getOrElse(".")
logInfo(s"Loading version ${segment.version}$startingFrom")

val newSnapshot = createSnapshot(
segment,
minFileRetentionTimestamp,
segment.lastCommitTimestamp)
val newSnapshot = createSnapshot(segment, minFileRetentionTimestamp)

if (currentSnapshot.version > -1 &&
currentSnapshot.metadata.id != newSnapshot.metadata.id) {
@@ -348,22 +497,7 @@ trait SnapshotManagement { self: DeltaLog =>
.orElse(findLastCompleteCheckpoint(CheckpointInstance(version, None)))
val segment = getLogSegmentForVersion(startingCheckpoint.map(_.version), Some(version))

createSnapshot(
segment,
minFileRetentionTimestamp,
segment.lastCommitTimestamp)
}

/**
* Verify the versions are contiguous.
*/
protected def verifyDeltaVersions(versions: Array[Long]): Unit = {
// Turn this to a vector so that we can compare it with a range.
val deltaVersions = versions.toVector
if (deltaVersions.nonEmpty &&
(deltaVersions.head to deltaVersions.last) != deltaVersions) {
throw DeltaErrors.deltaVersionsNotContiguousException(self.spark, deltaVersions)
}
createSnapshot(segment, minFileRetentionTimestamp)
}
}

Expand All @@ -372,6 +506,33 @@ object SnapshotManagement {
val tpe = ThreadUtils.newDaemonCachedThreadPool("delta-state-update", 8)
ExecutionContext.fromExecutorService(tpe)
}

/**
* - Verify the versions are contiguous.
* - Verify the versions start with `expectedStartVersion` if it's specified.
* - Verify the versions end with `expectedEndVersion` if it's specified.
*/
def verifyDeltaVersions(
spark: SparkSession,
versions: Array[Long],
expectedStartVersion: Option[Long],
expectedEndVersion: Option[Long]): Unit = {
if (versions.nonEmpty) {
// Turn this to a vector so that we can compare it with a range.
val deltaVersions = versions.toVector
if ((deltaVersions.head to deltaVersions.last) != deltaVersions) {
throw DeltaErrors.deltaVersionsNotContiguousException(spark, deltaVersions)
}
}
expectedStartVersion.foreach { v =>
require(versions.nonEmpty && versions.head == v, "Did not get the first delta " +
s"file version: $v to compute Snapshot")
}
expectedEndVersion.foreach { v =>
require(versions.nonEmpty && versions.last == v, "Did not get the last delta " +
s"file version: $v to compute Snapshot")
}
}
}

/**
core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -93,6 +93,15 @@ trait DeltaSQLConfBase {
.checkValue(n => n > 0, "Delta snapshot partition number must be positive.")
.createOptional

val DELTA_SNAPSHOT_LOADING_MAX_RETRIES =
buildConf("snapshotLoading.maxRetries")
.internal()
.doc("How many times to retry when failing to load a snapshot. Each retry will try to use " +
"a different checkpoint in order to skip potential corrupt checkpoints.")
.intConf
.checkValue(n => n >= 0, "must not be negative.")
.createWithDefault(2)

val DELTA_PARTITION_COLUMN_CHECK_ENABLED =
buildConf("partitionColumnValidity.enabled")
.internal()
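As a usage note, the new knob is an internal SQL conf. Assuming the standard `spark.databricks.delta.` prefix that `DeltaSQLConf.buildConf` applies, it could be tuned per session like this (hypothetical values):

```scala
// "0" disables the retry behavior; higher values allow skipping more corrupt checkpoints.
spark.conf.set("spark.databricks.delta.snapshotLoading.maxRetries", "0")
spark.conf.set("spark.databricks.delta.snapshotLoading.maxRetries", "5")
```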
