[HOTFIX] fix dataloading/insertInto complex data type issue for partition table

for partition table, complex data type of target table should be converted to binary data type.

This closes #3555
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionComplexDataTypeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionComplexDataTypeTestCase.scala
new file mode 100644
index 0000000..c3923e5
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionComplexDataTypeTestCase.scala
@@ -0,0 +1,103 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.standardpartition
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+ * StandardPartitionComplexDataTypeTestCase
+ */
+class StandardPartitionComplexDataTypeTestCase extends QueryTest with BeforeAndAfterAll {
+  override protected def beforeAll(): Unit = {
+    dropTable
+  }
+  override protected def afterAll(): Unit = {
+    dropTable
+  }
+  def dropTable = {
+    sql("drop table if exists tbl_complex_p")
+    sql("drop table if exists tbl_complex_p_carbondata")
+  }
+  test("test complex datatype for partition table") {
+    val tableName1 = "tbl_complex_p"
+    sql(s"drop table if exists $tableName1")
+    sql(
+      s"""
+         | create table $tableName1 (
+         | col1 int,
+         | col2 string,
+         | col3 float,
+         | col4 struct<level: string, ratio: float, sub: struct<level: string, ratio: float>>,
+         | col5 array<struct<ratio: float>>,
+         | col6 map<string, struct<ratio: float>>,
+         | col7 date
+         | ) """.stripMargin)
+    sql(s"""
+           | insert into table $tableName1
+           | select
+           |   1,
+           |   'a',
+           |   1.1,
+           |   struct('b', 1.2, struct('bc', 1.21)),
+           |   array(struct(1.3), struct(1.4)),
+           |   map('l1', struct(1.5), 'l2', struct(1.6)),
+           |   to_date('2019-01-01')
+           | """.stripMargin)
+    val tableName2 = "tbl_complex_p_carbondata"
+    sql(s"drop table if exists $tableName2")
+    sql(
+      s"""
+         | create table $tableName2 (
+         | col1 int,
+         | col2 string,
+         | col3 float,
+         | col4 struct<level: string, ratio: float, sub: struct<level: string, ratio: float>>,
+         | col5 array<struct<ratio: float>>,
+         | col6 map<string, struct<ratio: float>>
+         | )
+         | stored as carbondata
+         | partitioned by (col7 date)
+         | """.stripMargin)
+    sql(s"insert into table $tableName2 select * from $tableName1")
+    checkAnswer(
+      sql(
+        s"""
+           |select
+           |  cast(round(col4.ratio, 1) as float),
+           |  cast(round(col4.sub.ratio, 2) as float),
+           |  cast(round(col5[1].ratio, 1) as float),
+           |  cast(round(col6['l1'].ratio, 1) as float)
+           | from $tableName2
+           |""".stripMargin),
+      sql(
+        s"""
+           |select
+           |  col4.ratio, col4.sub.ratio,
+           |  col5[1].ratio,
+           |  col6['l1'].ratio
+           | from $tableName1
+           |""".stripMargin)
+    )
+  }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 4e560b8..7a853b9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -872,13 +872,19 @@
     val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
     val metastoreSchema = StructType(catalogTable.schema.fields.map{f =>
       val column = table.getColumnByName(f.name)
-      if (column.hasEncoding(Encoding.DICTIONARY)) {
-        f.copy(dataType = IntegerType)
-      } else if (f.dataType == TimestampType || f.dataType == DateType) {
-        f.copy(dataType = LongType)
+      val updatedDataType = if (column.hasEncoding(Encoding.DICTIONARY)) {
+        IntegerType
       } else {
-        f
+        f.dataType match {
+          case TimestampType | DateType =>
+            LongType
+          case _: StructType | _: ArrayType | _: MapType =>
+            BinaryType
+          case _ =>
+            f.dataType
+        }
+      f.copy(dataType = updatedDataType)
     val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
     val catalog = new CatalogFileIndex(