[SPARK-20680][SQL] Spark-sql do not support for creating table with void column datatype

### What changes were proposed in this pull request?

This is the new PR which to address the close one #17953

1. support "void" primitive data type in the `AstBuilder`, point it to `NullType`
2. forbid creating tables with VOID/NULL column type

### Why are the changes needed?

1. Spark is incompatible with hive void type. When Hive table schema contains void type, DESC table will throw an exception in Spark.

>hive> create table bad as select 1 x, null z from dual;
>hive> describe bad;
OK
x	int
z	void

In Spark2.0.x, the behaviour to read this view is normal:
>spark-sql> describe bad;
x       int     NULL
z       void    NULL
Time taken: 4.431 seconds, Fetched 2 row(s)

But in lastest Spark version, it failed with SparkException: Cannot recognize hive type string: void

>spark-sql> describe bad;
17/05/09 03:12:08 ERROR thriftserver.SparkSQLDriver: Failed in [describe bad]
org.apache.spark.SparkException: Cannot recognize hive type string: void
Caused by: org.apache.spark.sql.catalyst.parser.ParseException:
DataType void() is not supported.(line 1, pos 0)
== SQL ==
void
^^^
        ... 61 more
org.apache.spark.SparkException: Cannot recognize hive type string: void

2. Hive CTAS statements throws error when select clause has NULL/VOID type column since HIVE-11217
In Spark, creating table with a VOID/NULL column should throw readable exception message, include

- create data source table (using parquet, json, ...)
- create hive table (with or without stored as)
- CTAS

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Add unit tests

Closes #28833 from LantaoJin/SPARK-20680_COPY.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 320a68d..ddd13ca 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -116,6 +116,9 @@
 
     __metaclass__ = DataTypeSingleton
 
+    def simpleString(self):
+        return 'unknown'
+
 
 class AtomicType(DataType):
     """An internal type used to represent everything that is not
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 2a0a944..a406040 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -34,6 +34,7 @@
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case AlterTableAddColumnsStatement(
          nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
+      cols.foreach(c => failNullType(c.dataType))
       cols.foreach(c => failCharType(c.dataType))
       val changes = cols.map { col =>
         TableChange.addColumn(
@@ -47,6 +48,7 @@
 
     case AlterTableReplaceColumnsStatement(
         nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
+      cols.foreach(c => failNullType(c.dataType))
       cols.foreach(c => failCharType(c.dataType))
       val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
         case Some(table) =>
@@ -69,6 +71,7 @@
 
     case a @ AlterTableAlterColumnStatement(
          nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
+      a.dataType.foreach(failNullType)
       a.dataType.foreach(failCharType)
       val colName = a.column.toArray
       val typeChange = a.dataType.map { newDataType =>
@@ -145,6 +148,7 @@
 
     case c @ CreateTableStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+      assertNoNullTypeInSchema(c.tableSchema)
       assertNoCharTypeInSchema(c.tableSchema)
       CreateV2Table(
         catalog.asTableCatalog,
@@ -157,6 +161,9 @@
 
     case c @ CreateTableAsSelectStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+      if (c.asSelect.resolved) {
+        assertNoNullTypeInSchema(c.asSelect.schema)
+      }
       CreateTableAsSelect(
         catalog.asTableCatalog,
         tbl.asIdentifier,
@@ -172,6 +179,7 @@
 
     case c @ ReplaceTableStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+      assertNoNullTypeInSchema(c.tableSchema)
       assertNoCharTypeInSchema(c.tableSchema)
       ReplaceTable(
         catalog.asTableCatalog,
@@ -184,6 +192,9 @@
 
     case c @ ReplaceTableAsSelectStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+      if (c.asSelect.resolved) {
+        assertNoNullTypeInSchema(c.asSelect.schema)
+      }
       ReplaceTableAsSelect(
         catalog.asTableCatalog,
         tbl.asIdentifier,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index d08bcb1..6b41a8b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2203,6 +2203,7 @@
         DecimalType(precision.getText.toInt, 0)
       case ("decimal" | "dec" | "numeric", precision :: scale :: Nil) =>
         DecimalType(precision.getText.toInt, scale.getText.toInt)
+      case ("void", Nil) => NullType
       case ("interval", Nil) => CalendarIntervalType
       case (dt, params) =>
         val dtStr = if (params.nonEmpty) s"$dt(${params.mkString(",")})" else dt
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index e1f3293..d130a13 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -28,7 +28,7 @@
 import org.apache.spark.sql.catalyst.plans.logical.AlterTable
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, NullType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.Utils
 
@@ -346,4 +346,23 @@
       }
     }
   }
+
+  def failNullType(dt: DataType): Unit = {
+    def containsNullType(dt: DataType): Boolean = dt match {
+      case ArrayType(et, _) => containsNullType(et)
+      case MapType(kt, vt, _) => containsNullType(kt) || containsNullType(vt)
+      case StructType(fields) => fields.exists(f => containsNullType(f.dataType))
+      case _ => dt.isInstanceOf[NullType]
+    }
+    if (containsNullType(dt)) {
+      throw new AnalysisException(
+        "Cannot create tables with unknown type.")
+    }
+  }
+
+  def assertNoNullTypeInSchema(schema: StructType): Unit = {
+    schema.foreach { f =>
+      failNullType(f.dataType)
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
index 14097a5..6c9a1d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
@@ -32,6 +32,10 @@
   override def defaultSize: Int = 1
 
   private[spark] override def asNullable: NullType = this
+
+  // "null" is mainly used to represent a literal in Spark,
+  // it's better to avoid using it for data types.
+  override def simpleString: String = "unknown"
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
index d519fdf..655b1d2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
@@ -61,6 +61,7 @@
   checkDataType("varchAr(20)", StringType)
   checkDataType("cHaR(27)", StringType)
   checkDataType("BINARY", BinaryType)
+  checkDataType("void", NullType)
   checkDataType("interval", CalendarIntervalType)
 
   checkDataType("array<doublE>", ArrayType(DoubleType, true))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index bf90875..bc3f38a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -48,6 +48,7 @@
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
     case AlterTableAddColumnsStatement(
          nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
+      cols.foreach(c => failNullType(c.dataType))
       loadTable(catalog, tbl.asIdentifier).collect {
         case v1Table: V1Table =>
           if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
@@ -76,6 +77,7 @@
 
     case AlterTableReplaceColumnsStatement(
         nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
+      cols.foreach(c => failNullType(c.dataType))
       val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
         case Some(_: V1Table) =>
           throw new AnalysisException("REPLACE COLUMNS is only supported with v2 tables.")
@@ -100,6 +102,7 @@
 
     case a @ AlterTableAlterColumnStatement(
          nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
+      a.dataType.foreach(failNullType)
       loadTable(catalog, tbl.asIdentifier).collect {
         case v1Table: V1Table =>
           if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
@@ -268,6 +271,7 @@
     // session catalog and the table provider is not v2.
     case c @ CreateTableStatement(
          SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+      assertNoNullTypeInSchema(c.tableSchema)
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         if (!DDLUtils.isHiveTable(Some(provider))) {
@@ -292,6 +296,9 @@
 
     case c @ CreateTableAsSelectStatement(
          SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+      if (c.asSelect.resolved) {
+        assertNoNullTypeInSchema(c.asSelect.schema)
+      }
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
@@ -319,6 +326,7 @@
     // session catalog and the table provider is not v2.
     case c @ ReplaceTableStatement(
          SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+      assertNoNullTypeInSchema(c.tableSchema)
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         throw new AnalysisException("REPLACE TABLE is only supported with v2 tables.")
@@ -336,6 +344,9 @@
 
     case c @ ReplaceTableAsSelectStatement(
          SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+      if (c.asSelect.resolved) {
+        assertNoNullTypeInSchema(c.asSelect.schema)
+      }
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         throw new AnalysisException("REPLACE TABLE AS SELECT is only supported with v2 tables.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 95343e2..60cacda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -25,6 +25,7 @@
 import org.apache.spark.sql.catalyst.expressions.{Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
 import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
@@ -292,6 +293,8 @@
       "in the table definition of " + table.identifier,
       sparkSession.sessionState.conf.caseSensitiveAnalysis)
 
+    assertNoNullTypeInSchema(schema)
+
     val normalizedPartCols = normalizePartitionColumns(schema, table)
     val normalizedBucketSpec = normalizeBucketSpec(schema, table)
 
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 8898a11..c39adac 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -34,7 +34,7 @@
 | org.apache.spark.sql.catalyst.expressions.Ascii | ascii | SELECT ascii('222') | struct<ascii(222):int> |
 | org.apache.spark.sql.catalyst.expressions.Asin | asin | SELECT asin(0) | struct<ASIN(CAST(0 AS DOUBLE)):double> |
 | org.apache.spark.sql.catalyst.expressions.Asinh | asinh | SELECT asinh(0) | struct<ASINH(CAST(0 AS DOUBLE)):double> |
-| org.apache.spark.sql.catalyst.expressions.AssertTrue | assert_true | SELECT assert_true(0 < 1) | struct<assert_true((0 < 1)):null> |
+| org.apache.spark.sql.catalyst.expressions.AssertTrue | assert_true | SELECT assert_true(0 < 1) | struct<assert_true((0 < 1)):unknown> |
 | org.apache.spark.sql.catalyst.expressions.Atan | atan | SELECT atan(0) | struct<ATAN(CAST(0 AS DOUBLE)):double> |
 | org.apache.spark.sql.catalyst.expressions.Atan2 | atan2 | SELECT atan2(0, 0) | struct<ATAN2(CAST(0 AS DOUBLE), CAST(0 AS DOUBLE)):double> |
 | org.apache.spark.sql.catalyst.expressions.Atanh | atanh | SELECT atanh(0) | struct<ATANH(CAST(0 AS DOUBLE)):double> |
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out
index f6720f6..0274771 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out
Binary files differ
diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
index 9943b93..2dd6960 100644
--- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
@@ -49,7 +49,7 @@
 -- !query
 select * from values ("one", null), ("two", null) as data(a, b)
 -- !query schema
-struct<a:string,b:null>
+struct<a:string,b:unknown>
 -- !query output
 one	NULL
 two	NULL
diff --git a/sql/core/src/test/resources/sql-tests/results/literals.sql.out b/sql/core/src/test/resources/sql-tests/results/literals.sql.out
index f6720f6..0274771 100644
--- a/sql/core/src/test/resources/sql-tests/results/literals.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/literals.sql.out
Binary files differ
diff --git a/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out
index bd8ffb8..8d34bf2 100644
--- a/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out
@@ -7,7 +7,7 @@
 -- !query schema
 struct<typeof(NULL):string>
 -- !query output
-null
+unknown
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out
index 1e59036..8b32bd6 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out
@@ -308,7 +308,7 @@
 -- !query
 select foo.* from (select null) as foo
 -- !query schema
-struct<NULL:null>
+struct<NULL:unknown>
 -- !query output
 NULL
 
@@ -316,7 +316,7 @@
 -- !query
 select foo.* from (select 'xyzzy',1,null) as foo
 -- !query schema
-struct<xyzzy:string,1:int,NULL:null>
+struct<xyzzy:string,1:int,NULL:unknown>
 -- !query output
 xyzzy	1	NULL
 
diff --git a/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out
index 26a44a8..b905f9e 100644
--- a/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out
@@ -5,7 +5,7 @@
 -- !query
 SELECT ifnull(null, 'x'), ifnull('y', 'x'), ifnull(null, null)
 -- !query schema
-struct<ifnull(NULL, x):string,ifnull(y, x):string,ifnull(NULL, NULL):null>
+struct<ifnull(NULL, x):string,ifnull(y, x):string,ifnull(NULL, NULL):unknown>
 -- !query output
 x	y	NULL
 
@@ -21,7 +21,7 @@
 -- !query
 SELECT nvl(null, 'x'), nvl('y', 'x'), nvl(null, null)
 -- !query schema
-struct<nvl(NULL, x):string,nvl(y, x):string,nvl(NULL, NULL):null>
+struct<nvl(NULL, x):string,nvl(y, x):string,nvl(NULL, NULL):unknown>
 -- !query output
 x	y	NULL
 
@@ -29,7 +29,7 @@
 -- !query
 SELECT nvl2(null, 'x', 'y'), nvl2('n', 'x', 'y'), nvl2(null, null, null)
 -- !query schema
-struct<nvl2(NULL, x, y):string,nvl2(n, x, y):string,nvl2(NULL, NULL, NULL):null>
+struct<nvl2(NULL, x, y):string,nvl2(n, x, y):string,nvl2(NULL, NULL, NULL):unknown>
 -- !query output
 y	x	NULL
 
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out
index d78d347..0680a87 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out
@@ -49,7 +49,7 @@
 -- !query
 select udf(a), b from values ("one", null), ("two", null) as data(a, b)
 -- !query schema
-struct<CAST(udf(cast(a as string)) AS STRING):string,b:null>
+struct<CAST(udf(cast(a as string)) AS STRING):string,b:unknown>
 -- !query output
 one	NULL
 two	NULL
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 231a8f2..daa262d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -405,7 +405,7 @@
         ""
       }
       def errorMessage(format: String): String = {
-        s"$format data source does not support null data type."
+        s"$format data source does not support unknown data type."
       }
       withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
         withTempDir { dir =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index b9c98f4..2b1eb05 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -28,6 +28,7 @@
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CreateTable
@@ -225,6 +226,8 @@
             isConvertible(tableDesc) && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
         // validation is required to be done here before relation conversion.
         DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
+        // This is for CREATE TABLE .. STORED AS PARQUET/ORC AS SELECT null
+        assertNoNullTypeInSchema(query.schema)
         OptimizedCreateHiveTableAsSelectCommand(
           tableDesc, query, query.output.map(_.name), mode)
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index e8cf4ad..774fb5b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -31,6 +31,7 @@
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.connector.FakeV2Provider
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
@@ -2309,6 +2310,126 @@
     }
   }
 
+  test("SPARK-20680: Spark-sql do not support for unknown column datatype") {
+    withTable("t") {
+      withView("tabUnknownType") {
+        hiveClient.runSqlHive("CREATE TABLE t (t1 int)")
+        hiveClient.runSqlHive("INSERT INTO t VALUES (3)")
+        hiveClient.runSqlHive("CREATE VIEW tabUnknownType AS SELECT NULL AS col FROM t")
+        checkAnswer(spark.table("tabUnknownType"), Row(null))
+        // No exception shows
+        val desc = spark.sql("DESC tabUnknownType").collect().toSeq
+        assert(desc.contains(Row("col", NullType.simpleString, null)))
+      }
+    }
+
+    // Forbid CTAS with unknown type
+    withTable("t1", "t2", "t3") {
+      val e1 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t1 USING PARQUET AS SELECT null as null_col")
+      }.getMessage
+      assert(e1.contains("Cannot create tables with unknown type"))
+
+      val e2 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t2 AS SELECT null as null_col")
+      }.getMessage
+      assert(e2.contains("Cannot create tables with unknown type"))
+
+      val e3 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t3 STORED AS PARQUET AS SELECT null as null_col")
+      }.getMessage
+      assert(e3.contains("Cannot create tables with unknown type"))
+    }
+
+    // Forbid Replace table AS SELECT with unknown type
+    withTable("t") {
+      val v2Source = classOf[FakeV2Provider].getName
+      val e = intercept[AnalysisException] {
+        spark.sql(s"CREATE OR REPLACE TABLE t USING $v2Source AS SELECT null as null_col")
+      }.getMessage
+      assert(e.contains("Cannot create tables with unknown type"))
+    }
+
+    // Forbid creating table with VOID type in Spark
+    withTable("t1", "t2", "t3", "t4") {
+      val e1 = intercept[AnalysisException] {
+        spark.sql(s"CREATE TABLE t1 (v VOID) USING PARQUET")
+      }.getMessage
+      assert(e1.contains("Cannot create tables with unknown type"))
+      val e2 = intercept[AnalysisException] {
+        spark.sql(s"CREATE TABLE t2 (v VOID) USING hive")
+      }.getMessage
+      assert(e2.contains("Cannot create tables with unknown type"))
+      val e3 = intercept[AnalysisException] {
+        spark.sql(s"CREATE TABLE t3 (v VOID)")
+      }.getMessage
+      assert(e3.contains("Cannot create tables with unknown type"))
+      val e4 = intercept[AnalysisException] {
+        spark.sql(s"CREATE TABLE t4 (v VOID) STORED AS PARQUET")
+      }.getMessage
+      assert(e4.contains("Cannot create tables with unknown type"))
+    }
+
+    // Forbid Replace table with VOID type
+    withTable("t") {
+      val v2Source = classOf[FakeV2Provider].getName
+      val e = intercept[AnalysisException] {
+        spark.sql(s"CREATE OR REPLACE TABLE t (v VOID) USING $v2Source")
+      }.getMessage
+      assert(e.contains("Cannot create tables with unknown type"))
+    }
+
+    // Make sure spark.catalog.createTable with null type will fail
+    val schema1 = new StructType().add("c", NullType)
+    assertHiveTableNullType(schema1)
+    assertDSTableNullType(schema1)
+
+    val schema2 = new StructType()
+      .add("c", StructType(Seq(StructField.apply("c1", NullType))))
+    assertHiveTableNullType(schema2)
+    assertDSTableNullType(schema2)
+
+    val schema3 = new StructType().add("c", ArrayType(NullType))
+    assertHiveTableNullType(schema3)
+    assertDSTableNullType(schema3)
+
+    val schema4 = new StructType()
+      .add("c", MapType(StringType, NullType))
+    assertHiveTableNullType(schema4)
+    assertDSTableNullType(schema4)
+
+    val schema5 = new StructType()
+      .add("c", MapType(NullType, StringType))
+    assertHiveTableNullType(schema5)
+    assertDSTableNullType(schema5)
+  }
+
+  private def assertHiveTableNullType(schema: StructType): Unit = {
+    withTable("t") {
+      val e = intercept[AnalysisException] {
+        spark.catalog.createTable(
+          tableName = "t",
+          source = "hive",
+          schema = schema,
+          options = Map("fileFormat" -> "parquet"))
+      }.getMessage
+      assert(e.contains("Cannot create tables with unknown type"))
+    }
+  }
+
+  private def assertDSTableNullType(schema: StructType): Unit = {
+    withTable("t") {
+      val e = intercept[AnalysisException] {
+        spark.catalog.createTable(
+          tableName = "t",
+          source = "json",
+          schema = schema,
+          options = Map.empty[String, String])
+      }.getMessage
+      assert(e.contains("Cannot create tables with unknown type"))
+    }
+  }
+
   test("SPARK-21216: join with a streaming DataFrame") {
     import org.apache.spark.sql.execution.streaming.MemoryStream
     import testImplicits._
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
index 91fd8a4..61c48c6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
@@ -121,7 +121,7 @@
       msg = intercept[AnalysisException] {
         sql("select null").write.mode("overwrite").orc(orcDir)
       }.getMessage
-      assert(msg.contains("ORC data source does not support null data type."))
+      assert(msg.contains("ORC data source does not support unknown data type."))
 
       msg = intercept[AnalysisException] {
         spark.udf.register("testType", () => new IntervalData())