[CARBONDATA-3628] Support altering Hive tables to add array and map type columns
Support adding array and map data type columns to Hive-format tables via ALTER TABLE ADD COLUMNS
This closes #3529
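In practice, the change enables DDL like the following against a Hive-format table in a CarbonData-enabled SparkSession. This is a minimal sketch mirroring the tests below; the table name and data values are illustrative, not taken from the patch:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch only: `spark` stands in for a CarbonData-enabled session
// with Hive support; the table and values are examples, not from the patch.
val spark: SparkSession = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

spark.sql("create table alter_hive_demo(name string) stored as rcfile")
spark.sql("alter table alter_hive_demo add columns (var map<string, string>)")
spark.sql("alter table alter_hive_demo add columns (loves array<string>)")
spark.sql("insert into alter_hive_demo select 'abc', map('age', '10'), array('a', 'b')")
spark.sql("select * from alter_hive_demo").show()
```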
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
index 297ff04..cc34df5 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
@@ -1022,7 +1022,31 @@
assert(exception.getMessage.contains("Unsupported alter operation on hive table"))
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
sql("alter table alter_hive add columns(add string)")
- sql("insert into alter_hive select 'abc','banglore'")
+ sql("alter table alter_hive add columns (var map<string, string>)")
+ sql("insert into alter_hive select 'abc','banglore',map('age','10','birth','2020')")
+ checkAnswer(
+ sql("select * from alter_hive"),
+ Seq(Row("abc", "banglore", Map("age" -> "10", "birth" -> "2020")))
+ )
+ }
+ }
+
+ test("Alter table add column for hive partitioned table for spark version above 2.1") {
+ sql("drop table if exists alter_hive")
+ sql("create table alter_hive(name string) stored as rcfile partitioned by (dt string)")
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ sql("alter table alter_hive add columns(add string)")
+ sql("alter table alter_hive add columns (var map<string, string>)")
+ sql("alter table alter_hive add columns (loves array<string>)")
+ sql(
+ s"""
+ |insert into alter_hive partition(dt='par')
+ |select 'abc', 'banglore', map('age', '10', 'birth', '2020'), array('a', 'b', 'c')
+ """.stripMargin)
+ checkAnswer(
+ sql("select * from alter_hive where dt='par'"),
+ Seq(Row("abc", "banglore", Map("age" -> "10", "birth" -> "2020"), Seq("a", "b", "c"), "par"))
+ )
}
}
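One detail worth calling out in the partitioned test above: in a `select *`, Spark returns partition columns after the data columns, which is why the expected Row ends with the partition value 'par'. Restated on its own, with values copied from the test:

```scala
import org.apache.spark.sql.Row

// Partition columns follow data columns in `select *` on a partitioned Hive
// table, so the expected Row ends with the partition value.
val expected = Seq(
  Row("abc", "banglore", Map("age" -> "10", "birth" -> "2020"), Seq("a", "b", "c"), "par"))
```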
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
index d6a9413..82541b2 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
@@ -146,8 +146,7 @@
}
def deleteFile(path: String, extension: String): Unit = {
- val file: CarbonFile = FileFactory
- .getCarbonFile(path, FileFactory.getFileType(path))
+ val file: CarbonFile = FileFactory.getCarbonFile(path)
for (eachDir <- file.listFiles) {
if (!eachDir.isDirectory) {
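The FileFactory hunks in this patch all follow the same pattern: call sites migrate to overloads that infer the FileType from the path instead of requiring the caller to pass FileFactory.getFileType(path). A before/after sketch, assuming the one-argument overloads exist on the CarbonData side as the diff implies:

```scala
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory

// Before: the caller resolved the FileType explicitly.
//   FileFactory.getCarbonFile(path, FileFactory.getFileType(path))
// After: the overload infers the FileType (LOCAL, HDFS, S3, ...) from the path.
def carbonFileAt(path: String): CarbonFile = FileFactory.getCarbonFile(path)
```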
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 9d4fe79..eca20ed 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -88,14 +88,13 @@
protected def checkAnswer(carbon: String, hive: String, uniqueIdentifier: String): Unit = {
val path = TestQueryExecutor.hiveresultpath + "/" + uniqueIdentifier
- if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
- val objinp = new ObjectInputStream(FileFactory
- .getDataInputStream(path, FileFactory.getFileType(path)))
+ if (FileFactory.isFileExist(path)) {
+ val objinp = new ObjectInputStream(FileFactory.getDataInputStream(path))
val rows = objinp.readObject().asInstanceOf[Array[Row]]
objinp.close()
QueryTest.checkAnswer(sql(carbon), rows) match {
case Some(errorMessage) => {
- FileFactory.deleteFile(path, FileFactory.getFileType(path))
+ FileFactory.deleteFile(path)
writeAndCheckAnswer(carbon, hive, path)
}
case None =>
@@ -107,8 +106,7 @@
private def writeAndCheckAnswer(carbon: String, hive: String, path: String): Unit = {
val rows = sql(hive).collect()
- val obj = new ObjectOutputStream(FileFactory.getDataOutputStream(path, FileFactory
- .getFileType(path)))
+ val obj = new ObjectOutputStream(FileFactory.getDataOutputStream(path))
obj.writeObject(rows)
obj.close()
checkAnswer(sql(carbon), rows)
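The QueryTest hunks apply the same one-argument FileFactory calls; the caching mechanism itself is unchanged. Hive results are serialized once per uniqueIdentifier and replayed on later runs, falling back to writeAndCheckAnswer when the cached rows no longer match. A condensed sketch of that round trip under the same API assumption:

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.spark.sql.Row
import org.apache.carbondata.core.datastore.impl.FileFactory

// Serialize the hive result rows once so later runs can compare against them
// without re-running the hive query.
def cacheRows(path: String, rows: Array[Row]): Unit = {
  val out = new ObjectOutputStream(FileFactory.getDataOutputStream(path))
  try out.writeObject(rows) finally out.close()
}

// Read the cached rows back; FileFactory.isFileExist(path) guards this path.
def readCachedRows(path: String): Array[Row] = {
  val in = new ObjectInputStream(FileFactory.getDataInputStream(path))
  try in.readObject().asInstanceOf[Array[Row]] finally in.close()
}
```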
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 0f31471..71e91cc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -19,7 +19,7 @@
import java.io.IOException
import java.util
-import java.util.{Collections, List, Map}
+import java.util.{Collections, List}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
@@ -32,10 +32,11 @@
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
import org.apache.spark.sql.hive.DistributionUtil
-import org.apache.spark.sql.util.{CarbonException, SparkTypeConverter}
+import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
@@ -64,8 +65,8 @@
import org.apache.carbondata.processing.merger._
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.load.{ByteArrayOrdering, DataLoadProcessBuilderOnSpark, PrimtiveOrdering, StringOrdering}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
+import org.apache.carbondata.spark.load.{DataLoadProcessBuilderOnSpark, PrimtiveOrdering, StringOrdering}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
class CarbonMergerRDD[K, V](
@transient private val ss: SparkSession,
@@ -680,7 +681,7 @@
partitionNames = null,
splits = allSplits)
val objectOrdering: Ordering[Object] = createOrderingForColumn(rangeColumn)
- val sparkDataType = Util.convertCarbonToSparkDataType(dataType)
+ val sparkDataType = CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(dataType)
// Change string type to support all types
val sampleRdd = scanRdd
.map(row => (row.get(0, sparkDataType), null))
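Here the merger switches from the spark-common Util helper to CarbonSparkDataSourceUtil for the Carbon-to-Spark type conversion, presumably because the datasource-side converter also covers complex types such as array and map. Isolated for clarity, with the signature inferred from the call sites in this diff:

```scala
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.types.{DataType => SparkDataType}
import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType}

// The range column's Carbon type is mapped to a Spark type so the sampled
// InternalRow values are read with the matching accessor.
def sparkTypeFor(rangeColumnType: CarbonDataType): SparkDataType =
  CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(rangeColumnType)
```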
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 7c8993e..a851bc3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.strategy
import org.apache.spark.sql._
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -35,11 +36,9 @@
import org.apache.spark.sql.types.StructField
import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, FileUtils, SparkUtil}
-import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.util.Util
/**
* Carbon strategies for ddl commands
@@ -176,8 +175,8 @@
val structField = (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols)
.map {
a =>
- StructField(a.column,
- Util.convertCarbonToSparkDataType(DataTypeUtil.valueOf(a.dataType.get)))
+ StructField(a.column, CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(
+ DataTypeUtil.valueOf(a.dataType.get)))
}
val identifier = TableIdentifier(
alterTableAddColumnsModel.tableName,
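In DDLStrategy, each added column from the alter model is mapped to a Spark StructField through the same converter, which is what lets map<...> and array<...> definitions flow through to the Hive ALTER TABLE path. A minimal restatement of that mapping; the column/type-string shape is simplified from the model in the hunk above:

```scala
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.carbondata.core.util.DataTypeUtil

// The parsed data type name is resolved to a Carbon DataType, then converted
// into Spark's type system to build the StructField for ALTER TABLE.
def toStructField(column: String, carbonTypeName: String): StructField =
  StructField(column,
    CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(
      DataTypeUtil.valueOf(carbonTypeName)))
```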