Skip to content

Commit

Permalink
Fix snapshot version used to log CommitStats
Browse files Browse the repository at this point in the history
I found an issue when we were reporting commit stats after a checkpoint for large commits. We were grabbing the current snapshot instead of the one resulting from our commit, which could result in incorrect stats being logged (since the current snapshot can easily change while the checkpoint is in progress).

Existing tests

GitOrigin-RevId: feb0af2101e630fbc0b9b748db5f4f39e386ca73
  • Loading branch information
husseinnagr-db authored and scottsand-db committed Apr 26, 2022
1 parent 47f50a2 commit 5a829a2
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS

import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{CommitStats, DeltaErrors, DeltaLog, DeltaOperations, DeltaTableIdentifier, OptimisticTransaction, Serializable}
import org.apache.spark.sql.delta.{CommitStats, DeltaErrors, DeltaLog, DeltaOperations, DeltaTableIdentifier, OptimisticTransaction, Serializable, Snapshot}
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
import org.apache.spark.sql.delta.metering.DeltaLogging
Expand Down Expand Up @@ -223,7 +223,7 @@ trait DeltaCommand extends DeltaLogging {
spark: SparkSession,
deltaLog: DeltaLog,
commitSize: Int,
attemptVersion: Long): Unit = {
attemptVersion: Long): Snapshot = {
val currentSnapshot = deltaLog.update()
if (currentSnapshot.version != attemptVersion) {
throw new IllegalStateException(
Expand All @@ -234,6 +234,7 @@ trait DeltaCommand extends DeltaLogging {
logInfo(s"Committed delta #$attemptVersion to ${deltaLog.logPath}. Wrote $commitSize actions.")

deltaLog.checkpoint(currentSnapshot)
currentSnapshot
}

/**
Expand Down Expand Up @@ -311,8 +312,7 @@ trait DeltaCommand extends DeltaLogging {
Some(attemptVersion))
val commitTime = System.nanoTime()

updateAndCheckpoint(spark, deltaLog, commitSize, attemptVersion)
val postCommitSnapshot = deltaLog.snapshot
val postCommitSnapshot = updateAndCheckpoint(spark, deltaLog, commitSize, attemptVersion)
val postCommitReconstructionTime = System.nanoTime()
var stats = CommitStats(
startVersion = txn.readVersion,
Expand Down

0 comments on commit 5a829a2

Please sign in to comment.