Remove Spark's internal metadata stored unintentionally in Delta
[SPARK-43123](apache/spark#40776) fixed an issue where Spark could leak internal metadata, causing Delta to store Spark's internal metadata in its table schemas.

Spark's internal metadata may trigger special behaviors. For example, if a column's metadata contains `__metadata_col`, that column cannot be selected by a star expansion. So if `__metadata_col` leaks into any column of a Delta table, the column can no longer be queried with `SELECT *`.

Although [SPARK-43123](apache/spark#40776) fixes the issue in newer Spark versions, internal metadata may already have been persisted in some Delta tables. To make these tables readable again, this PR adds an extra step that cleans up internal metadata before returning the table schema to Spark.
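To illustrate (a minimal sketch, not part of this change; the column name, key, and value type are examples only), this is what a leaked key looks like on a column and how it can be stripped with Spark's public `MetadataBuilder` API:

```scala
import org.apache.spark.sql.types._

// A field whose metadata carries a leaked Spark-internal key such as "__metadata_col".
val leaked = StructField("id", LongType,
  metadata = new MetadataBuilder().putBoolean("__metadata_col", true).build())

// Stripping the key restores a plain field that `SELECT *` can resolve again.
val cleaned = leaked.copy(
  metadata = new MetadataBuilder()
    .withMetadata(leaked.metadata)
    .remove("__metadata_col")
    .build())
```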

GitOrigin-RevId: 60eb4046d55e955379c98e409993b33e753c5256
zsxwing authored and vkorukanti committed Apr 20, 2023
1 parent 30fa2c5 commit 81c7a58
Showing 7 changed files with 96 additions and 8 deletions.
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -489,12 +489,13 @@ class DeltaLog private(

     val relation = HadoopFsRelation(
       fileIndex,
-      partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(partitionSchema),
+      partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(
+        DeltaTableUtils.removeInternalMetadata(spark, partitionSchema)),
       // We pass all table columns as `dataSchema` so that Spark will preserve the partition column
       // locations. Otherwise, for any partition columns not in `dataSchema`, Spark would just
       // append them to the end of `dataSchema`.
       dataSchema = DeltaColumnMapping.dropColumnMappingMetadata(
-        ColumnWithDefaultExprUtils.removeDefaultExpressions(metadata.schema)),
+        DeltaTableUtils.removeInternalMetadata(spark, metadata.schema)),
       bucketSpec = None,
       fileFormat(snapshot.protocol, metadata),
       hadoopOptions)(spark)
@@ -542,7 +543,7 @@ class DeltaLog private(
       // locations. Otherwise, for any partition columns not in `dataSchema`, Spark would just
       // append them to the end of `dataSchema`
       dataSchema = DeltaColumnMapping.dropColumnMappingMetadata(
-        ColumnWithDefaultExprUtils.removeDefaultExpressions(
+        DeltaTableUtils.removeInternalMetadata(spark,
           SchemaUtils.dropNullTypeColumns(snapshotToUse.metadata.schema))),
       bucketSpec = bucketSpec,
       fileFormat(snapshotToUse.protocol, snapshotToUse.metadata),
51 changes: 50 additions & 1 deletion core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
@@ -21,7 +21,7 @@ import scala.util.{Failure, Success, Try}

 import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
 import org.apache.spark.sql.delta.metering.DeltaLogging
-import org.apache.spark.sql.delta.sources.DeltaSourceUtils
+import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.sql.SparkSession
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
 import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._

/**
* Extractor Object for pulling out the table scan of a Delta table. It could be a full scan
@@ -418,4 +419,52 @@ object DeltaTableUtils extends PredicateHelper
       new Path(basePath, relativeChildPath)
     }
   }

+  /**
+   * A list of Spark internal metadata keys that we may have saved in a Delta table schema
+   * unintentionally due to the issue fixed by SPARK-43123. We need to remove them before handing
+   * the schema over to Spark to avoid Spark interpreting table columns incorrectly.
+   *
+   * Hard-coded strings are used intentionally, as we want to capture possible keys used before
+   * SPARK-43123 regardless of the Spark version. For example, if Spark changes any key string in
+   * the future after SPARK-43123, the new string won't be leaked, but we still want to clean up
+   * the old key.
+   */
+  val SPARK_INTERNAL_METADATA_KEYS = Seq(
+    "__autoGeneratedAlias",
+    "__metadata_col",
+    "__supports_qualified_star", // A key used by an old version. Doesn't exist in the latest code
+    "__qualified_access_only",
+    "__file_source_metadata_col",
+    "__file_source_constant_metadata_col",
+    "__file_source_generated_metadata_col"
+  )
+
+  /**
+   * Remove leaked metadata keys from the persisted table schema. Old versions might leak metadata
+   * unintentionally. This method removes all possible metadata keys to avoid Spark interpreting
+   * table columns incorrectly.
+   */
+  def removeInternalMetadata(spark: SparkSession, persistedSchema: StructType): StructType = {
+    val schema = ColumnWithDefaultExprUtils.removeDefaultExpressions(persistedSchema)
+    if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_SCHEMA_REMOVE_SPARK_INTERNAL_METADATA)) {
+      var updated = false
+      val updatedSchema = schema.map { field =>
+        if (SPARK_INTERNAL_METADATA_KEYS.exists(field.metadata.contains)) {
+          updated = true
+          val newMetadata = new MetadataBuilder().withMetadata(field.metadata)
+          SPARK_INTERNAL_METADATA_KEYS.foreach(newMetadata.remove)
+          field.copy(metadata = newMetadata.build())
+        } else {
+          field
+        }
+      }
+      if (updated) {
+        StructType(updatedSchema)
+      } else {
+        schema
+      }
+    } else {
+      schema
+    }
+  }
}
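As a usage sketch (the table path is a placeholder and an active `SparkSession` named `spark` is assumed), the new helper and key list could be used to check whether an existing table's persisted schema already carries leaked metadata, and what Spark now receives instead:

```scala
// Sketch: inspect a Delta table's persisted schema for leaked Spark-internal keys.
val snapshot = DeltaLog.forTable(spark, "/path/to/delta/table").update()
val affected = snapshot.schema.filter { field =>
  DeltaTableUtils.SPARK_INTERNAL_METADATA_KEYS.exists(field.metadata.contains)
}
affected.foreach(f => println(s"Column ${f.name} carries leaked internal metadata"))

// With this change, the schema handed back to Spark is cleaned first.
val cleaned = DeltaTableUtils.removeInternalMetadata(spark, snapshot.schema)
```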
core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
@@ -128,7 +128,7 @@ case class DeltaTableV2(

   private lazy val tableSchema: StructType =
     DeltaColumnMapping.dropColumnMappingMetadata(
-      ColumnWithDefaultExprUtils.removeDefaultExpressions(snapshot.schema))
+      DeltaTableUtils.removeInternalMetadata(spark, snapshot.schema))
 
   override def schema(): StructType = tableSchema

core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala
@@ -95,7 +95,7 @@ class DeltaDataSource
         .getOrElse(snapshot.schema)
     }
 
-    val schemaToUse = ColumnWithDefaultExprUtils.removeDefaultExpressions(readSchema)
+    val schemaToUse = DeltaTableUtils.removeInternalMetadata(sqlContext.sparkSession, readSchema)
     if (schemaToUse.isEmpty) {
       throw DeltaErrors.schemaNotSetException
     }
core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -336,6 +336,16 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(true)
 
+  val DELTA_SCHEMA_REMOVE_SPARK_INTERNAL_METADATA =
+    buildConf("schema.removeSparkInternalMetadata")
+      .doc(
+        """Whether to remove leaked Spark internal metadata from the table schema before returning
+          |it to Spark. This internal metadata might have been stored unintentionally in tables
+          |created by old Spark versions""".stripMargin)
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
   val DELTA_ASSUMES_DROP_CONSTRAINT_IF_EXISTS =
     buildConf("constraints.assumesDropIfExists.enabled")
       .doc("""If true, DROP CONSTRAINT quietly drops nonexistent constraints even without
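The cleanup is controlled by the internal flag above and defaults to on. Assuming the standard `spark.databricks.delta.` prefix that `DeltaSQLConf.buildConf` prepends, it could be disabled for a session like this (shown only as a sketch, e.g. to inspect a table's raw persisted metadata):

```scala
// Sketch: turn off the internal-metadata cleanup for the current session (default is true).
spark.conf.set("spark.databricks.delta.schema.removeSparkInternalMetadata", "false")
```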
core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -166,8 +166,7 @@ trait DeltaSourceBase extends Source
   protected var hasCheckedReadIncompatibleSchemaChangesOnStreamStart: Boolean = false
 
   override val schema: StructType = {
-    val schemaWithoutCDC =
-      ColumnWithDefaultExprUtils.removeDefaultExpressions(readSchemaAtSourceInit)
+    val schemaWithoutCDC = DeltaTableUtils.removeInternalMetadata(spark, readSchemaAtSourceInit)
     if (options.readChangeFeed) {
       CDCReader.cdcReadSchema(schemaWithoutCDC)
     } else {
core/src/test/scala/org/apache/spark/sql/delta/DeltaTableUtilsSuite.scala
@@ -18,11 +18,14 @@ package org.apache.spark.sql.delta

 import java.net.URI
 
+import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._

class DeltaTableUtilsSuite extends SharedSparkSession with DeltaSQLCommandTest {

@@ -49,6 +52,32 @@ class DeltaTableUtilsSuite extends SharedSparkSession with DeltaSQLCommandTest {
     assert(DeltaTableUtils.safeConcatPaths(basePathEmpty, "_delta_log") ==
       new Path("s3://my-bucket/_delta_log"))
   }
+
+  test("removeInternalMetadata") {
+    for (flag <- BOOLEAN_DOMAIN) {
+      withSQLConf(DeltaSQLConf.DELTA_SCHEMA_REMOVE_SPARK_INTERNAL_METADATA.key -> flag.toString) {
+        for (internalMetadataKey <- DeltaTableUtils.SPARK_INTERNAL_METADATA_KEYS) {
+          val metadata = new MetadataBuilder()
+            .putString(internalMetadataKey, "foo")
+            .putString("other", "bar")
+            .build()
+          val schema = StructType(Seq(StructField("foo", StringType, metadata = metadata)))
+          val newSchema = DeltaTableUtils.removeInternalMetadata(spark, schema)
+          newSchema.foreach { f =>
+            if (flag) {
+              // Flag on: should remove internal metadata
+              assert(!f.metadata.contains(internalMetadataKey))
+              // Should preserve non-internal metadata
+              assert(f.metadata.contains("other"))
+            } else {
+              // Flag off: no-op
+              assert(f.metadata == metadata)
+            }
+          }
+        }
+      }
+    }
+  }
 }

private class MockS3FileSystem extends RawLocalFileSystem {
