Expand metrics for DeleteCommand
In this PR, we add 13 new metrics to usage logs for the `DELETE` command (a short sketch of how to surface them follows the list):
 * `numRemovedFiles`: how many files we removed. Alias for `numTouchedFiles`
 * `numAddedFiles`: how many files we added. Alias for `numRewrittenFiles`
 * `numFilesBeforeSkipping`: how many candidate files before data skipping
 * `numBytesBeforeSkipping`: how many candidate bytes before data skipping
 * `numFilesAfterSkipping`: how many candidate files after data skipping
 * `numBytesAfterSkipping`: how many candidate bytes after data skipping
 * `numPartitionsAfterSkipping`: how many candidate partitions after data skipping
 * `numPartitionsAddedTo`: how many new partitions were added
 * `numPartitionsRemovedFrom`: how many partitions were removed
 * `numCopiedRows`: how many rows were copied
 * `numDeletedRows`: how many rows were deleted
 * `numBytesAdded`: how many bytes were added
 * `numBytesRemoved`: how many bytes were removed
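
As a rough illustration of where these numbers end up (not part of the diff itself), metrics registered with the transaction typically surface in the table history's `operationMetrics` column. The sketch below is hedged: it assumes Delta's Scala API, a hypothetical table at `/tmp/delta/events`, and a build with history metrics enabled.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

object DeleteMetricsPeek {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("delete-metrics-peek")
      .getOrCreate()

    // Delete some rows; DeleteCommand records its SQL metrics with the commit.
    spark.sql("DELETE FROM delta.`/tmp/delta/events` WHERE eventDate < '2022-01-01'")

    // The latest history entry carries the operation metrics for that DELETE,
    // e.g. numRemovedFiles, numAddedFiles, numDeletedRows, numCopiedRows.
    DeltaTable.forPath(spark, "/tmp/delta/events")
      .history(1)
      .select("version", "operation", "operationMetrics")
      .show(truncate = false)

    spark.stop()
  }
}
```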

Unit tests.

GitOrigin-RevId: e6cad814b21521e16b377e2da52ea89479eb5439
c27kwan authored and scottsand-db committed Mar 3, 2022
1 parent fcdaf67 commit 1c60f88
Showing 1 changed file with 143 additions and 28 deletions.
@@ -18,7 +18,9 @@ package org.apache.spark.sql.delta.commands

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.commands.MergeIntoCommand.totalBytesAndDistinctPartitionValues
import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
import com.fasterxml.jackson.databind.annotation.JsonDeserialize

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
@@ -56,6 +58,16 @@ case class DeleteCommand(
"numRemovedFiles" -> createMetric(sc, "number of files removed."),
"numAddedFiles" -> createMetric(sc, "number of files added."),
"numDeletedRows" -> createMetric(sc, "number of rows deleted."),
"numFilesBeforeSkipping" -> createMetric(sc, "number of files before skipping"),
"numBytesBeforeSkipping" -> createMetric(sc, "number of bytes before skipping"),
"numFilesAfterSkipping" -> createMetric(sc, "number of files after skipping"),
"numBytesAfterSkipping" -> createMetric(sc, "number of bytes after skipping"),
"numPartitionsAfterSkipping" -> createMetric(sc, "number of partitions after skipping"),
"numPartitionsAddedTo" -> createMetric(sc, "number of partitions added"),
"numPartitionsRemovedFrom" -> createMetric(sc, "number of partitions removed"),
"numCopiedRows" -> createMetric(sc, "number of rows copied"),
"numBytesAdded" -> createMetric(sc, "number of bytes added"),
"numBytesRemoved" -> createMetric(sc, "number of bytes removed"),
"executionTimeMs" -> createMetric(sc, "time taken to execute the entire operation"),
"scanTimeMs" -> createMetric(sc, "time taken to scan the files for matches"),
"rewriteTimeMs" -> createMetric(sc, "time taken to rewrite the matched files")
@@ -84,10 +96,21 @@ case class DeleteCommand(
txn: OptimisticTransaction): Seq[Action] = {
import sparkSession.implicits._

var numTouchedFiles: Long = 0
var numRewrittenFiles: Long = 0
var numRemovedFiles: Long = 0
var numAddedFiles: Long = 0
var scanTimeMs: Long = 0
var rewriteTimeMs: Long = 0
var numBytesAdded: Long = 0
var numBytesRemoved: Long = 0
var numFilesBeforeSkipping: Long = 0
var numBytesBeforeSkipping: Long = 0
var numFilesAfterSkipping: Long = 0
var numBytesAfterSkipping: Long = 0
var numPartitionsAfterSkipping: Option[Long] = None
var numPartitionsRemovedFrom: Option[Long] = None
var numPartitionsAddedTo: Option[Long] = None
var numDeletedRows: Option[Long] = None
var numCopiedRows: Option[Long] = None

val startTime = System.nanoTime()
val numFilesTotal = deltaLog.snapshot.numOfFiles
@@ -97,9 +120,17 @@
// Case 1: Delete the whole table if the condition is true
val allFiles = txn.filterFiles(Nil)

numTouchedFiles = allFiles.size
numRemovedFiles = allFiles.size
scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000

val (numBytes, numPartitions) = totalBytesAndDistinctPartitionValues(allFiles)
numBytesRemoved = numBytes
numFilesBeforeSkipping = numRemovedFiles
numBytesBeforeSkipping = numBytes
numFilesAfterSkipping = numRemovedFiles
numBytesAfterSkipping = numBytes
if (txn.metadata.partitionColumns.nonEmpty) {
numPartitionsAfterSkipping = Some(numPartitions)
}
val operationTimestamp = System.currentTimeMillis()
metrics("numRemovedFiles").set(allFiles.size)
allFiles.map(_.removeWithTimestamp(operationTimestamp))
@@ -108,22 +139,40 @@
DeltaTableUtils.splitMetadataAndDataPredicates(
cond, txn.metadata.partitionColumns, sparkSession)

numFilesBeforeSkipping = txn.snapshot.numOfFiles
numBytesBeforeSkipping = txn.snapshot.sizeInBytes

if (otherPredicates.isEmpty) {
// Case 2: The condition can be evaluated using metadata only.
// Delete a set of files without the need of scanning any data files.
val operationTimestamp = System.currentTimeMillis()
val candidateFiles = txn.filterFiles(metadataPredicates)

scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
numTouchedFiles = candidateFiles.size

metrics("numRemovedFiles").set(numTouchedFiles)
numRemovedFiles = candidateFiles.size
numBytesRemoved = candidateFiles.map(_.size).sum
numFilesAfterSkipping = candidateFiles.size
val (numCandidateBytes, numCandidatePartitions) =
totalBytesAndDistinctPartitionValues(candidateFiles)
numBytesAfterSkipping = numCandidateBytes
if (txn.metadata.partitionColumns.nonEmpty) {
numPartitionsAfterSkipping = Some(numCandidatePartitions)
}
metrics("numRemovedFiles").set(numRemovedFiles)
candidateFiles.map(_.removeWithTimestamp(operationTimestamp))
} else {
// Case 3: Delete the rows based on the condition.
val candidateFiles = txn.filterFiles(metadataPredicates ++ otherPredicates)

numTouchedFiles = candidateFiles.size
numFilesAfterSkipping = candidateFiles.size
val (numCandidateBytes, numCandidatePartitions) =
totalBytesAndDistinctPartitionValues(candidateFiles)
numBytesAfterSkipping = numCandidateBytes
if (txn.metadata.partitionColumns.nonEmpty) {
numPartitionsAfterSkipping = Some(numCandidatePartitions)
}

numRemovedFiles = candidateFiles.size
val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles)

val fileIndex = new TahoeBatchFileIndex(
@@ -139,7 +188,7 @@
}.asNondeterministic()
val filesToRewrite =
withStatusCode("DELTA", s"Finding files to rewrite for DELETE operation") {
if (numTouchedFiles == 0) {
if (numRemovedFiles == 0) {
Array.empty[String]
} else {
data
@@ -163,47 +212,85 @@
// Keep everything from the resolved target except a new TahoeFileIndex
// that only involves the affected files instead of all files.
val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location)

val copiedRowCount = metrics("numCopiedRows")
val copiedRowUdf = udf { () =>
copiedRowCount += 1
true
}.asNondeterministic()
val targetDF = Dataset.ofRows(sparkSession, newTarget)
val filterCond = Not(EqualNullSafe(cond, Literal.TrueLiteral))
val updatedDF = targetDF.filter(new Column(filterCond))
.filter(copiedRowUdf())

val rewrittenFiles = withStatusCode(
"DELTA", s"Rewriting ${filesToRewrite.size} files for DELETE operation") {
txn.writeFiles(updatedDF)
}

numRewrittenFiles = rewrittenFiles.size
val removedFiles = filesToRewrite.map(f =>
getTouchedFile(deltaLog.dataPath, f, nameToAddFileMap))
val (removedBytes, removedPartitions) =
totalBytesAndDistinctPartitionValues(removedFiles)
numBytesRemoved = removedBytes
val (rewrittenBytes, rewrittenPartitions) =
totalBytesAndDistinctPartitionValues(rewrittenFiles)
numBytesAdded = rewrittenBytes
if (txn.metadata.partitionColumns.nonEmpty) {
numPartitionsRemovedFrom = Some(removedPartitions)
numPartitionsAddedTo = Some(rewrittenPartitions)
}
numAddedFiles = rewrittenFiles.size
rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs
numDeletedRows = Some(metrics("numDeletedRows").value)
numCopiedRows = Some(metrics("numCopiedRows").value)

val operationTimestamp = System.currentTimeMillis()
removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++
rewrittenFiles
}
}
}
if (deleteActions.nonEmpty) {
metrics("numAddedFiles").set(numRewrittenFiles)
val executionTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
metrics("executionTimeMs").set(executionTimeMs)
metrics("scanTimeMs").set(scanTimeMs)
metrics("rewriteTimeMs").set(rewriteTimeMs)
txn.registerSQLMetrics(sparkSession, metrics)
// This is needed to make the SQL metrics visible in the Spark UI
val executionId = sparkSession.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(
sparkSession.sparkContext, executionId, metrics.values.toSeq)
}
metrics("numAddedFiles").set(numAddedFiles)
val executionTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
metrics("executionTimeMs").set(executionTimeMs)
metrics("scanTimeMs").set(scanTimeMs)
metrics("rewriteTimeMs").set(rewriteTimeMs)
metrics("numBytesAdded").set(numBytesAdded)
metrics("numBytesRemoved").set(numBytesRemoved)
metrics("numFilesBeforeSkipping").set(numFilesBeforeSkipping)
metrics("numBytesBeforeSkipping").set(numBytesBeforeSkipping)
metrics("numFilesAfterSkipping").set(numFilesAfterSkipping)
metrics("numBytesAfterSkipping").set(numBytesAfterSkipping)
numPartitionsAfterSkipping.foreach(metrics("numPartitionsAfterSkipping").set)
numPartitionsAddedTo.foreach(metrics("numPartitionsAddedTo").set)
numPartitionsRemovedFrom.foreach(metrics("numPartitionsRemovedFrom").set)
txn.registerSQLMetrics(sparkSession, metrics)
// This is needed to make the SQL metrics visible in the Spark UI
val executionId = sparkSession.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(
sparkSession.sparkContext, executionId, metrics.values.toSeq)

recordDeltaEvent(
deltaLog,
"delta.dml.delete.stats",
data = DeleteMetric(
condition = condition.map(_.sql).getOrElse("true"),
numFilesTotal,
numTouchedFiles,
numRewrittenFiles,
numRemovedFiles,
numAddedFiles,
numRemovedFiles,
numAddedFiles,
numAddedChangeFiles = 0,
numFilesBeforeSkipping,
numBytesBeforeSkipping,
numFilesAfterSkipping,
numBytesAfterSkipping,
numPartitionsAfterSkipping,
numPartitionsAddedTo,
numPartitionsRemovedFrom,
numCopiedRows,
numDeletedRows,
numBytesAdded,
numBytesRemoved,
changeFileBytes = 0,
scanTimeMs,
rewriteTimeMs)
@@ -232,9 +319,22 @@ object DeleteCommand {
*
* @param condition: what was the delete condition
* @param numFilesTotal: how big is the table
* @param numTouchedFiles: how many files did we touch
* @param numRewrittenFiles: how many files had to be rewritten
* @param numTouchedFiles: how many files did we touch. Alias for `numRemovedFiles`
* @param numRewrittenFiles: how many files had to be rewritten. Alias for `numAddedFiles`
* @param numRemovedFiles: how many files we removed. Alias for `numTouchedFiles`
* @param numAddedFiles: how many files we added. Alias for `numRewrittenFiles`
* @param numAddedChangeFiles: how many change files were generated
* @param numFilesBeforeSkipping: how many candidate files before skipping
* @param numBytesBeforeSkipping: how many candidate bytes before skipping
* @param numFilesAfterSkipping: how many candidate files after skipping
* @param numBytesAfterSkipping: how many candidate bytes after skipping
* @param numPartitionsAfterSkipping: how many candidate partitions after skipping
* @param numPartitionsAddedTo: how many new partitions were added
* @param numPartitionsRemovedFrom: how many partitions were removed
* @param numCopiedRows: how many rows were copied
* @param numDeletedRows: how many rows were deleted
* @param numBytesAdded: how many bytes were added
* @param numBytesRemoved: how many bytes were removed
* @param changeFileBytes: total size of change files generated
* @param scanTimeMs: how long did finding take
* @param rewriteTimeMs: how long did rewriting take
@@ -246,7 +346,22 @@ case class DeleteMetric(
numFilesTotal: Long,
numTouchedFiles: Long,
numRewrittenFiles: Long,
numRemovedFiles: Long,
numAddedFiles: Long,
numAddedChangeFiles: Long,
numFilesBeforeSkipping: Long,
numBytesBeforeSkipping: Long,
numFilesAfterSkipping: Long,
numBytesAfterSkipping: Long,
numPartitionsAfterSkipping: Option[Long],
numPartitionsAddedTo: Option[Long],
numPartitionsRemovedFrom: Option[Long],
@JsonDeserialize(contentAs = classOf[java.lang.Long])
numCopiedRows: Option[Long],
@JsonDeserialize(contentAs = classOf[java.lang.Long])
numDeletedRows: Option[Long],
numBytesAdded: Long,
numBytesRemoved: Long,
changeFileBytes: Long,
scanTimeMs: Long,
rewriteTimeMs: Long
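One detail worth noting in the new `DeleteMetric` fields above is the `@JsonDeserialize(contentAs = classOf[java.lang.Long])` annotation on the `Option[Long]` row counters. The usual reason for this pattern is that, because of type erasure, Jackson's Scala module would otherwise put a boxed `Integer` inside the `Option` when the record is read back, which can fail later when the value is used as a `Long`. The sketch below is illustrative only; `RowCounts` and the sample JSON are made up for this note.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical record mirroring the annotated Option[Long] fields in DeleteMetric.
case class RowCounts(
    @JsonDeserialize(contentAs = classOf[java.lang.Long])
    numDeletedRows: Option[Long],
    @JsonDeserialize(contentAs = classOf[java.lang.Long])
    numCopiedRows: Option[Long])

object RowCountsRoundTrip {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val json = """{"numDeletedRows": 42, "numCopiedRows": null}"""
    val parsed = mapper.readValue(json, classOf[RowCounts])
    // With contentAs = java.lang.Long the value unboxes cleanly as a Scala Long.
    println(parsed.numDeletedRows.map(_ + 1)) // Some(43)
    println(parsed.numCopiedRows)             // None
  }
}
```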
