Improve row metrics for UPDATE
GitOrigin-RevId: 26ec189bbdc45d795511c6f282e17f911b9a6621
alexoss68 authored and allisonport-db committed Mar 12, 2022
1 parent 2d9968f commit 59b4c8e
Showing 3 changed files with 75 additions and 75 deletions.
@@ -185,14 +185,6 @@ object DeltaOperations {
override val parameters: Map[String, Any] = predicate.map("predicate" -> _).toMap
override val operationMetrics: Set[String] = DeltaOperationMetrics.UPDATE

override def transformMetrics(metrics: Map[String, SQLMetric]): Map[String, String] = {
val numOutputRows = metrics("numOutputRows").value
val numUpdatedRows = metrics("numUpdatedRows").value
var strMetrics = super.transformMetrics(metrics)
val numCopiedRows = numOutputRows - numUpdatedRows
strMetrics += "numCopiedRows" -> numCopiedRows.toString
strMetrics
}
override def changesData: Boolean = true
}
/** Recorded when the table is created. */
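For reference, the block removed above derived the copied-row count only when the operation was recorded in the Delta history, rather than measuring it inside the command. A minimal sketch of that arithmetic, using hypothetical values that match the single-file-rewrite scenario in the test further down (illustration only, not part of the diff):

// Old derivation (sketch): numCopiedRows was computed as the difference of two
// SQL metrics at logging time instead of being tracked by UpdateCommand itself.
val numOutputRows  = 3L  // every row written back out during the rewrite
val numUpdatedRows = 1L  // rows that matched the UPDATE predicate
val numCopiedRows  = numOutputRows - numUpdatedRows  // 2 rows carried over unchanged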
@@ -58,6 +58,7 @@ case class UpdateCommand(
"numAddedFiles" -> createMetric(sc, "number of files added."),
"numRemovedFiles" -> createMetric(sc, "number of files removed."),
"numUpdatedRows" -> createMetric(sc, "number of rows updated."),
"numCopiedRows" -> createMetric(sc, "number of rows copied."),
"executionTimeMs" -> createMetric(sc, "time taken to execute the entire operation"),
"scanTimeMs" -> createMetric(sc, "time taken to scan the files for matches"),
"rewriteTimeMs" -> createMetric(sc, "time taken to rewrite the matched files")
@@ -180,6 +181,7 @@ case class UpdateCommand(
if (metrics("numUpdatedRows").value == 0 && outputRows != 0) {
metrics("numUpdatedRows").set(outputRows)
}
metrics("numCopiedRows").set(outputRows - metrics("numUpdatedRows").value)
txn.registerSQLMetrics(sparkSession, metrics)
txn.commit(actions, DeltaOperations.Update(condition.map(_.toString)))
// This is needed to make the SQL metrics visible in the Spark UI
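With the two hunks above, UpdateCommand owns the copied-row metric: it registers numCopiedRows next to the existing SQL metrics and sets it from the rows it actually rewrote, so the history layer no longer has to derive the value. A hedged sketch of that wiring, assuming a SparkContext named sc (it mirrors the diff; it is not the Delta source itself):

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.SQLMetrics

def copiedRowsSketch(sc: SparkContext, outputRows: Long, updatedRows: Long): Long = {
  // Declare metrics in the same spirit as the ones created in the diff above.
  val numUpdatedRows = SQLMetrics.createMetric(sc, "number of rows updated.")
  val numCopiedRows  = SQLMetrics.createMetric(sc, "number of rows copied.")
  numUpdatedRows.set(updatedRows)
  // Copied rows are whatever was rewritten but not touched by the predicate.
  numCopiedRows.set(outputRows - numUpdatedRows.value)
  numCopiedRows.value
}

Note that the guard in the second hunk (promoting outputRows into numUpdatedRows when the latter is still zero) is untouched; the new line simply subtracts after that adjustment.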
@@ -713,81 +713,87 @@ trait DescribeDeltaHistorySuiteBase
}
}

def metricsUpdateTest: Unit = withTempDir { tempDir =>
// Create the initial table as a single file
Seq(1, 2, 5, 11, 21, 3, 4, 6, 9, 7, 8, 0).toDF("key")
.withColumn("value", 'key % 2)
.write
.format("delta")
.save(tempDir.getAbsolutePath)

// append additional data with the same number range to the table.
// This data is saved as a separate file as well
Seq(15, 16, 17).toDF("key")
.withColumn("value", 'key % 2)
.repartition(1)
.write
.format("delta")
.mode("append")
.save(tempDir.getAbsolutePath)
val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tempDir.getAbsolutePath)
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
deltaLog.snapshot.numOfFiles

// update the table
deltaTable.update(col("key") === lit("16"), Map("value" -> lit("1")))
// The file from the append gets updated, but the file from the initial table gets scanned
// as well. We want to make sure numCopiedRows is calculated from written files and not
// scanned files [SC-33980].

// get operation metrics
val operationMetrics = getOperationMetrics(deltaTable.history(1))
val expectedMetrics = Map(
"numAddedFiles" -> "1",
"numRemovedFiles" -> "1",
"numUpdatedRows" -> "1",
"numCopiedRows" -> "2" // There should be only three rows in total(updated + copied)
)
checkOperationMetrics(expectedMetrics, operationMetrics, DeltaOperationMetrics.UPDATE)
val expectedTimeMetrics = Set("executionTimeMs", "scanTimeMs", "rewriteTimeMs")
checkOperationTimeMetricsInvariant(expectedTimeMetrics, operationMetrics)
}

test("operation metrics - update") {
withSQLConf(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true") {
withTempDir { tempDir =>
// Create the initial table as a single file
Seq(1, 2, 5, 11, 21, 3, 4, 6, 9, 7, 8, 0).toDF("key")
.withColumn("value", 'key % 2)
.write
.format("delta")
.save(tempDir.getAbsolutePath)
withSQLConf(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true") {
metricsUpdateTest
}
}

// append additional data with the same number range to the table.
// This data is saved as a separate file as well
Seq(15, 16, 17).toDF("key")
.withColumn("value", 'key % 2)
.repartition(1)
.write
.format("delta")
.mode("append")
.save(tempDir.getAbsolutePath)
val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tempDir.getAbsolutePath)
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
deltaLog.snapshot.numOfFiles
def metricsUpdatePartitionedColumnTest: Unit = {
val numRows = 100
val numPartitions = 5
withTempDir { tempDir =>
spark.range(numRows)
.withColumn("c1", 'id + 1)
.withColumn("c2", 'id % numPartitions)
.write
.partitionBy("c2")
.format("delta")
.save(tempDir.getAbsolutePath)

// update the table
deltaTable.update(col("key") === lit("16"), Map("value" -> lit("1")))
// The file from the append gets updated, but the file from the initial table gets scanned
// as well. We want to make sure numCopiedRows is calculated from written files and not
// scanned files [SC-33980].
val deltaTable = io.delta.tables.DeltaTable.forPath(tempDir.getAbsolutePath)
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
val numFilesBeforeUpdate = deltaLog.snapshot.numOfFiles
deltaTable.update(col("c2") < 1, Map("c2" -> lit("1")))
val numFilesAfterUpdate = deltaLog.snapshot.numOfFiles

// get operation metrics
val operationMetrics = getOperationMetrics(deltaTable.history(1))
val expectedMetrics = Map(
"numAddedFiles" -> "1",
"numRemovedFiles" -> "1",
"numUpdatedRows" -> "1",
"numCopiedRows" -> "2" // There should be only three rows in total(updated + copied)
)
checkOperationMetrics(expectedMetrics, operationMetrics, DeltaOperationMetrics.UPDATE)
val expectedTimeMetrics = Set("executionTimeMs", "scanTimeMs", "rewriteTimeMs")
checkOperationTimeMetricsInvariant(expectedTimeMetrics, operationMetrics)
}
val operationMetrics = getOperationMetrics(deltaTable.history(1))
val newFiles = numFilesAfterUpdate - numFilesBeforeUpdate
val oldFiles = numFilesBeforeUpdate / numPartitions
val addedFiles = newFiles + oldFiles
val expectedMetrics = Map(
"numUpdatedRows" -> (numRows / numPartitions).toString,
"numCopiedRows" -> "0",
"numAddedFiles" -> addedFiles.toString,
"numRemovedFiles" -> (numFilesBeforeUpdate / numPartitions).toString
)
checkOperationMetrics(expectedMetrics, operationMetrics, DeltaOperationMetrics.UPDATE)
}
}

test("operation metrics - update - partitioned column") {
withSQLConf(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true") {
val numRows = 100
val numPartitions = 5
withTempDir { tempDir =>
spark.range(numRows)
.withColumn("c1", 'id + 1)
.withColumn("c2", 'id % numPartitions)
.write
.partitionBy("c2")
.format("delta")
.save(tempDir.getAbsolutePath)

val deltaTable = io.delta.tables.DeltaTable.forPath(tempDir.getAbsolutePath)
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
val numFilesBeforeUpdate = deltaLog.snapshot.numOfFiles
deltaTable.update(col("c2") < 1, Map("c2" -> lit("1")))
val numFilesAfterUpdate = deltaLog.snapshot.numOfFiles

val operationMetrics = getOperationMetrics(deltaTable.history(1))
val newFiles = numFilesAfterUpdate - numFilesBeforeUpdate
val oldFiles = numFilesBeforeUpdate / numPartitions
val addedFiles = newFiles + oldFiles
val expectedMetrics = Map(
"numUpdatedRows" -> (numRows / numPartitions).toString,
"numCopiedRows" -> "0",
"numAddedFiles" -> addedFiles.toString,
"numRemovedFiles" -> (numFilesBeforeUpdate / numPartitions).toString
)
checkOperationMetrics(expectedMetrics, operationMetrics, DeltaOperationMetrics.UPDATE)
}
withSQLConf(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true") {
metricsUpdatePartitionedColumnTest
}
}
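As a sanity check of the expectations in metricsUpdatePartitionedColumnTest: with 100 rows spread over 5 partitions and the predicate c2 < 1, every row of the single matching partition is updated, so nothing is merely copied (sketch only, not part of the diff):

// Worked check of the expected metrics in the partitioned test above.
val numRows = 100
val numPartitions = 5
val expectedUpdatedRows = numRows / numPartitions  // 20: all rows in the c2 = 0 partition
val expectedCopiedRows  = 0                        // the rewritten files contain no untouched rows

The recorded values can be read back through the history API as a string map, along the lines of this hedged usage sketch (assumes a DeltaTable handle named deltaTable):

val operationMetrics = deltaTable.history(1)
  .select("operationMetrics")
  .head
  .getMap[String, String](0)
// e.g. operationMetrics("numCopiedRows") == "0" for the partitioned scenario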
