Block unsupported data type when updating a Delta table schema
Currently Delta does not perform a data type check, so any data type added by Spark is automatically supported by Delta. This caused Delta to support the following types unintentionally:
  - YearMonthIntervalType
  - DayTimeIntervalType
  - UserDefinedType

To prevent this from happening again, this PR:
- Adds a data type check that only allows the following data types in a Delta table:
  - the data types defined in the [Protocol](https://github.com/delta-io/delta/blob/6905ce757f67935960a9a13ecb6854d53c117d31/PROTOCOL.md#schema-serialization-format)
  - YearMonthIntervalType
  - DayTimeIntervalType
  - UserDefinedType
- Adds an internal flag `spark.databricks.delta.schema.typeCheck.enabled` that lets users disable the check when needed (see the sketch after this list).
- Blocks, by default, any new data type added in the future.
- Rejects `TimestampNTZType`, since a user cannot read or write a Delta table using `TimestampNTZType` today.
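
As a minimal sketch (not part of this change), disabling the check around an operation on a legacy table could look roughly like the following; the table path is hypothetical and an active SparkSession named `spark` is assumed:

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.types.TimestampNTZType

// The create below would fail with the type check enabled (the default);
// temporarily disabling the internal flag lets it through, e.g. for legacy tables.
spark.conf.set("spark.databricks.delta.schema.typeCheck.enabled", "false")
try {
  DeltaTable.create()
    .addColumn("t", TimestampNTZType, true)
    .location("/tmp/legacy_ntz_table") // hypothetical path
    .execute()
} finally {
  // Re-enable the check so new tables cannot silently pick up unsupported types.
  spark.conf.set("spark.databricks.delta.schema.typeCheck.enabled", "true")
}
```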

The change is covered by newly added tests.

This is not a user-facing change; we do not expect it to break any existing workflows.

Closes #1119

GitOrigin-RevId: 685cae655553e72a05f9e36a2c9a5d839cbab7b6
zsxwing authored and scottsand-db committed May 12, 2022
1 parent d90f90b commit 57207c8
Showing 8 changed files with 273 additions and 4 deletions.
4 changes: 4 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
@@ -330,6 +330,10 @@
"message" : [ "Creating a bloom filter index on a column with type %s is unsupported: %s" ],
"sqlState" : "0A000"
},
"DELTA_UNSUPPORTED_DATA_TYPES" : {
"message" : [ "Found columns using unsupported data types: %s. You can set '%s' to 'false' to disable the type check. Disabling this type check may allow users to create unsupported Delta tables and should only be used when trying to read/write legacy tables." ],
"sqlState" : "0A000"
},
"DELTA_UNSUPPORTED_EXPRESSION_GENERATED_COLUMN" : {
"message" : [ "%s cannot be used in a generated column" ],
"sqlState" : "0A000"
14 changes: 13 additions & 1 deletion core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.delta.catalog.DeltaCatalog
import org.apache.spark.sql.delta.constraints.Constraints
import org.apache.spark.sql.delta.hooks.PostCommitHook
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils}
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils, UnsupportedDataTypeInfo}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.JsonUtils
import io.delta.sql.DeltaSparkSessionExtension
@@ -1456,6 +1456,18 @@ object DeltaErrors
)
}

def unsupportedDataTypes(
unsupportedDataType: UnsupportedDataTypeInfo,
moreUnsupportedDataTypes: UnsupportedDataTypeInfo*): Throwable = {
val prettyMessage = (unsupportedDataType +: moreUnsupportedDataTypes)
.map(dt => s"${dt.column}: ${dt.dataType}")
.mkString("[", ", ", "]")
new DeltaAnalysisException(
errorClass = "DELTA_UNSUPPORTED_DATA_TYPES",
messageParameters = Array(prettyMessage, DeltaSQLConf.DELTA_SCHEMA_TYPE_CHECK.key)
)
}

def tableAlreadyExists(table: CatalogTable): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_TABLE_ALREADY_EXISTS",
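A minimal sketch (not part of the diff) of how the new builder might be invoked; the column names `a.b` and `c[]` are illustrative:

```scala
import org.apache.spark.sql.delta.schema.UnsupportedDataTypeInfo
import org.apache.spark.sql.types.CalendarIntervalType

// The first offending column is passed separately from the rest so the compiler
// guarantees at least one entry; the message lists every column as "[column: type, ...]".
val err = DeltaErrors.unsupportedDataTypes(
  UnsupportedDataTypeInfo("a.b", CalendarIntervalType),
  UnsupportedDataTypeInfo("c[]", CalendarIntervalType))
// err.getMessage begins with:
//   Found columns using unsupported data types: [a.b: CalendarIntervalType, c[]: CalendarIntervalType]. ...
```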
@@ -251,6 +251,10 @@ trait OptimisticTransactionImpl extends TransactionalWrite
/** Unique identifier for the transaction */
val txnId = UUID.randomUUID().toString

/** Whether to check unsupported data type when updating the table schema */
protected var checkUnsupportedDataType: Boolean =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_SCHEMA_TYPE_CHECK)

/** The end to end execution time of this transaction. */
def txnExecutionTimeMs: Option[Long] = if (commitEndNano == -1) {
None
@@ -425,6 +429,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite
recordDeltaEvent(deltaLog, "delta.generatedColumns.definition")
}

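// Reject the new metadata if its schema uses any data type that Delta does not
// support (e.g. TimestampNTZType); users can disable this check via
// spark.databricks.delta.schema.typeCheck.enabled if they must touch legacy tables.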
if (checkUnsupportedDataType) {
val unsupportedTypes = SchemaUtils.findUnsupportedDataTypes(metadata.schema)
if (unsupportedTypes.nonEmpty) {
throw DeltaErrors.unsupportedDataTypes(unsupportedTypes.head, unsupportedTypes.tail: _*)
}
}

val needsProtocolUpdate = Protocol.checkProtocolRequirements(spark, metadata, protocol)
if (needsProtocolUpdate.isDefined) {
newProtocol = needsProtocolUpdate
@@ -1004,6 +1004,78 @@ object SchemaUtils {
false
}

/**
* Find the unsupported data types in a table schema. Return all columns that are using
* unsupported data types. For example,
* `findUnsupportedDataTypes(struct<a: struct<b: unsupported_type>>)` will return
* `Seq(UnsupportedDataTypeInfo("a.b", unsupported_type))`.
*/
def findUnsupportedDataTypes(schema: StructType): Seq[UnsupportedDataTypeInfo] = {
val unsupportedDataTypes = mutable.ArrayBuffer[UnsupportedDataTypeInfo]()
findUnsupportedDataTypesRecursively(unsupportedDataTypes, schema)
unsupportedDataTypes.toSeq
}

/**
* Find the unsupported data types in a `DataType` recursively. Add the unsupported data types to
* the provided `unsupportedDataTypes` buffer.
*
* @param unsupportedDataTypes the buffer to store the found unsupported data types and the column
* paths.
* @param dataType the data type to search.
* @param columnPath the column path to access the given data type. The caller should make sure
* `columnPath` is not empty when `dataType` is not `StructType`.
*/
private def findUnsupportedDataTypesRecursively(
unsupportedDataTypes: mutable.ArrayBuffer[UnsupportedDataTypeInfo],
dataType: DataType,
columnPath: Seq[String] = Nil): Unit = dataType match {
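// Data types Delta accepts (the protocol types plus the interval types already allowed
// for compatibility) fall through without being recorded; only unsupported leaf types
// are collected, and UDTs are validated through their underlying sqlType.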
case NullType =>
case BooleanType =>
case ByteType =>
case ShortType =>
case IntegerType | _: YearMonthIntervalType =>
case LongType | _: DayTimeIntervalType =>
case FloatType =>
case DoubleType =>
case StringType =>
case DateType =>
case TimestampType =>
case TimestampNTZType =>
assert(columnPath.nonEmpty, "'columnPath' must not be empty")
unsupportedDataTypes += UnsupportedDataTypeInfo(prettyFieldName(columnPath), TimestampNTZType)
case BinaryType =>
case _: DecimalType =>
case a: ArrayType =>
assert(columnPath.nonEmpty, "'columnPath' must not be empty")
findUnsupportedDataTypesRecursively(
unsupportedDataTypes,
a.elementType,
columnPath.dropRight(1) :+ columnPath.last + "[]")
case m: MapType =>
assert(columnPath.nonEmpty, "'columnPath' must not be empty")
findUnsupportedDataTypesRecursively(
unsupportedDataTypes,
m.keyType,
columnPath.dropRight(1) :+ columnPath.last + "[key]")
findUnsupportedDataTypesRecursively(
unsupportedDataTypes,
m.valueType,
columnPath.dropRight(1) :+ columnPath.last + "[value]")
case s: StructType =>
s.fields.foreach { f =>
findUnsupportedDataTypesRecursively(
unsupportedDataTypes,
f.dataType,
columnPath :+ f.name)
}
case udt: UserDefinedType[_] =>
findUnsupportedDataTypesRecursively(unsupportedDataTypes, udt.sqlType, columnPath)
case dt: DataType =>
assert(columnPath.nonEmpty, "'columnPath' must not be empty")
unsupportedDataTypes += UnsupportedDataTypeInfo(prettyFieldName(columnPath), dt)
}

/**
* Find all the generated columns that depend on the given target column.
*/
@@ -1030,3 +1102,11 @@
}
}
}

/**
* The information of unsupported data type returned by [[SchemaUtils.findUnsupportedDataTypes]].
*
* @param column the column path to access the column using an unsupported data type, such as `a.b`.
* @param dataType the unsupported data type.
*/
case class UnsupportedDataTypeInfo(column: String, dataType: DataType)
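
A minimal usage sketch (field names are hypothetical) showing the column-path notation produced above, where array elements get a `[]` suffix and map entries `[key]`/`[value]`:

```scala
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.types._

// A nested schema where an array element uses CalendarIntervalType,
// which is not an accepted Delta data type.
val schema = StructType(Seq(
  StructField("a", StructType(Seq(
    StructField("b", ArrayType(CalendarIntervalType, containsNull = true)))))))

SchemaUtils.findUnsupportedDataTypes(schema)
// => Seq(UnsupportedDataTypeInfo("a.b[]", CalendarIntervalType))
```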
@@ -292,6 +292,16 @@ trait DeltaSQLConfBase
.booleanConf
.createWithDefault(false)

val DELTA_SCHEMA_TYPE_CHECK =
buildConf("schema.typeCheck.enabled")
.doc(
"""Enable the data type check when updating the table schema. Disabling this flag may
| allow users to create unsupported Delta tables and should only be used when trying to
| read/write legacy tables.""".stripMargin)
.internal()
.booleanConf
.createWithDefault(true)

val DELTA_ASSUMES_DROP_CONSTRAINT_IF_EXISTS =
buildConf("constraints.assumesDropIfExists.enabled")
.doc("""If true, DROP CONSTRAINT quietly drops nonexistent constraints even without
@@ -245,6 +245,7 @@ class DeltaColumnRenameSuite extends QueryTest
spark.sql("alter table t1 rename column arr to arr1")
}


// cannot rename b because its child is referenced
assertException("Cannot rename column b") {
spark.sql("alter table t1 rename column b to b1")
@@ -33,7 +33,7 @@ import org.apache.spark.sql.delta.constraints.Constraints.NotNull
import org.apache.spark.sql.delta.constraints.Invariants
import org.apache.spark.sql.delta.constraints.Invariants.PersistedRule
import org.apache.spark.sql.delta.hooks.PostCommitHook
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaMergingUtils, SchemaUtils, UnsupportedDataTypeInfo}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.fs.Path
@@ -51,7 +51,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.sql.types.{DataTypes, DateType, IntegerType, MetadataBuilder, NullType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{CalendarIntervalType, DataTypes, DateType, IntegerType, MetadataBuilder, NullType, StringType, StructField, StructType, TimestampNTZType}

trait DeltaErrorsSuiteBase
extends QueryTest
@@ -1136,6 +1136,20 @@ trait DeltaErrorsSuiteBase
assert(e.getMessage == "Creating a bloom filter index on a column with type date is " +
"unsupported: col1")
}
{
val e = intercept[DeltaAnalysisException] {
throw DeltaErrors.unsupportedDataTypes(
UnsupportedDataTypeInfo("foo", CalendarIntervalType),
UnsupportedDataTypeInfo("bar", TimestampNTZType))
}
assert(e.getErrorClass == "DELTA_UNSUPPORTED_DATA_TYPES")
assert(e.getSqlState == "0A000")
assert(e.getMessage == "Found columns using unsupported data types: " +
"[foo: CalendarIntervalType, bar: TimestampNTZType]. " +
"You can set 'spark.databricks.delta.schema.typeCheck.enabled' to 'false' " +
"to disable the type check. Disabling this type check may allow users to create " +
"unsupported Delta tables and should only be used when trying to read/write legacy tables.")
}
{
val e = intercept[DeltaIllegalStateException] {
throw DeltaErrors.failOnDataLossException(12, 10)
@@ -22,9 +22,13 @@ import java.util.regex.Pattern

import org.apache.spark.sql.delta.schema.SchemaMergingUtils._
import org.apache.spark.sql.delta.sources.DeltaSourceUtils.GENERATION_EXPRESSION_METADATA_KEY
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import io.delta.tables.DeltaTable
import org.scalatest.GivenWhenThen

import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
@@ -33,7 +37,8 @@ import org.apache.spark.sql.types._
class SchemaUtilsSuite extends QueryTest
with SharedSparkSession
with GivenWhenThen
with SQLTestUtils {
with SQLTestUtils
with DeltaSQLCommandTest {
import SchemaUtils._
import testImplicits._

@@ -1515,4 +1520,136 @@ class SchemaUtilsSuite extends QueryTest
}

}

test("findUnsupportedDataTypes") {
def assertUnsupportedDataType(
dataType: DataType,
expected: Seq[UnsupportedDataTypeInfo]): Unit = {
val schema = StructType(Seq(StructField("col", dataType)))
assert(findUnsupportedDataTypes(schema) == expected)
}

assertUnsupportedDataType(NullType, Nil)
assertUnsupportedDataType(BooleanType, Nil)
assertUnsupportedDataType(ByteType, Nil)
assertUnsupportedDataType(ShortType, Nil)
assertUnsupportedDataType(IntegerType, Nil)
assertUnsupportedDataType(YearMonthIntervalType.DEFAULT, Nil)
assertUnsupportedDataType(LongType, Nil)
assertUnsupportedDataType(DayTimeIntervalType.DEFAULT, Nil)
assertUnsupportedDataType(FloatType, Nil)
assertUnsupportedDataType(DoubleType, Nil)
assertUnsupportedDataType(StringType, Nil)
assertUnsupportedDataType(DateType, Nil)
assertUnsupportedDataType(TimestampType, Nil)
assertUnsupportedDataType(
TimestampNTZType,
Seq(UnsupportedDataTypeInfo("col", TimestampNTZType)))
assertUnsupportedDataType(
CalendarIntervalType,
Seq(UnsupportedDataTypeInfo("col", CalendarIntervalType)))
assertUnsupportedDataType(BinaryType, Nil)
assertUnsupportedDataType(DataTypes.createDecimalType(), Nil)
assertUnsupportedDataType(
UnsupportedDataType,
Seq(UnsupportedDataTypeInfo("col", UnsupportedDataType)))

// array
assertUnsupportedDataType(ArrayType(IntegerType, true), Nil)
assertUnsupportedDataType(
ArrayType(UnsupportedDataType, true),
Seq(UnsupportedDataTypeInfo("col[]", UnsupportedDataType)))

// map
assertUnsupportedDataType(MapType(IntegerType, IntegerType, true), Nil)
assertUnsupportedDataType(
MapType(UnsupportedDataType, IntegerType, true),
Seq(UnsupportedDataTypeInfo("col[key]", UnsupportedDataType)))
assertUnsupportedDataType(
MapType(IntegerType, UnsupportedDataType, true),
Seq(UnsupportedDataTypeInfo("col[value]", UnsupportedDataType)))
assertUnsupportedDataType(
MapType(UnsupportedDataType, UnsupportedDataType, true),
Seq(
UnsupportedDataTypeInfo("col[key]", UnsupportedDataType),
UnsupportedDataTypeInfo("col[value]", UnsupportedDataType)))

// struct
assertUnsupportedDataType(StructType(StructField("f", LongType) :: Nil), Nil)
assertUnsupportedDataType(
StructType(StructField("a", LongType) :: StructField("dot.name", UnsupportedDataType) :: Nil),
Seq(UnsupportedDataTypeInfo("col.`dot.name`", UnsupportedDataType)))
val nestedStructType = StructType(Seq(
StructField("a", LongType),
StructField("b", StructType(Seq(
StructField("c", LongType),
StructField("d", UnsupportedDataType)
))),
StructField("e", StructType(Seq(
StructField("f", LongType),
StructField("g", UnsupportedDataType)
)))
))
assertUnsupportedDataType(
nestedStructType,
Seq(
UnsupportedDataTypeInfo("col.b.d", UnsupportedDataType),
UnsupportedDataTypeInfo("col.e.g", UnsupportedDataType)))

// udt
assertUnsupportedDataType(new PointUDT, Nil)
assertUnsupportedDataType(
new UnsupportedUDT,
Seq(UnsupportedDataTypeInfo("col", UnsupportedDataType)))
}

test("unsupported data type check") {
withTempDir { tempDir =>
val path = tempDir.getCanonicalPath

def createTableUsingTimestampNTZType(): Unit = {
DeltaTable.create().addColumn("t", TimestampNTZType, true).location(path).execute()
}

val e = intercept[AnalysisException] {
createTableUsingTimestampNTZType()
}
assert(
e.getMessage.contains("Found columns using unsupported data types: [t: TimestampNTZType]"))
assert(e.getMessage.contains(DeltaSQLConf.DELTA_SCHEMA_TYPE_CHECK.key))

withSQLConf(DeltaSQLConf.DELTA_SCHEMA_TYPE_CHECK.key -> "false") {
createTableUsingTimestampNTZType()
}
}
}
}

object UnsupportedDataType extends DataType {
override def defaultSize: Int = throw new UnsupportedOperationException("defaultSize")
override def asNullable: DataType = throw new UnsupportedOperationException("asNullable")
override def toString: String = "UnsupportedDataType"
}

@SQLUserDefinedType(udt = classOf[PointUDT])
case class Point(x: Int, y: Int)

class PointUDT extends UserDefinedType[Point] {
override def sqlType: DataType = StructType(Array(
StructField("x", IntegerType, nullable = false),
StructField("y", IntegerType, nullable = false)))

override def serialize(obj: Point): Any = InternalRow(obj.x, obj.y)

override def deserialize(datum: Any): Point = datum match {
case row: InternalRow => Point(row.getInt(0), row.getInt(1))
}

override def userClass: Class[Point] = classOf[Point]

override def toString: String = "PointUDT"
}

class UnsupportedUDT extends PointUDT {
override def sqlType: DataType = UnsupportedDataType
}
