[CARBONDATA-3596] Fix exception when execute load data command or select sql on a table which includes complex columns after execute 'add column' command
Problem:
After execute 'add column' command, it will throw exception when execute load data command or select sql on a table which includes complex columns
Solution:
Put complex type columns at the end of dimension columns after execute 'add column' command.
This closes #3485
diff --git a/.gitignore b/.gitignore
index 854ebff..d0d1505 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,3 +18,9 @@
metastore_db/
derby.log
python/.idea/
+*/.cache-main
+*/.cache-tests
+*/*/.cache-main
+*/*/.cache-tests
+*/*/*/.cache-main
+*/*/*/.cache-tests
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
index 3e39dca..d011da3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -149,7 +149,7 @@
fillComplexColumnDataBufferForThisRow();
for (int i = 0; i < queryDimensions.length; i++) {
fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray,
- comlexDimensionInfoMap, row, i);
+ comlexDimensionInfoMap, row, i, queryDimensions[i].getDimension().getOrdinal());
}
} else {
scannedResult.incrementCounter();
@@ -239,9 +239,23 @@
}
}
+ /**
+ * fill the data of dimension columns into row
+ *
+ * @param scannedResult
+ * @param surrogateResult
+ * @param noDictionaryKeys
+ * @param complexTypeKeyArray
+ * @param complexDimensionInfoMap
+ * @param row: row data
+ * @param i: dimension columns index
+ * @param actualOrdinal: the actual ordinal of dimension columns in segment
+ *
+ */
void fillDimensionData(BlockletScannedResult scannedResult, int[] surrogateResult,
byte[][] noDictionaryKeys, byte[][] complexTypeKeyArray,
- Map<Integer, GenericQueryType> complexDimensionInfoMap, Object[] row, int i) {
+ Map<Integer, GenericQueryType> complexDimensionInfoMap, Object[] row, int i,
+ int actualOrdinal) {
if (!dictionaryEncodingArray[i]) {
if (implicitColumnArray[i]) {
if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
@@ -261,9 +275,8 @@
ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
} else {
row[order[i]] =
- complexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
- .getDataBasedOnDataType(
- ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
+ complexDimensionInfoMap.get(actualOrdinal).getDataBasedOnDataType(
+ ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
}
} else {
if (queryDimensionToComplexParentOrdinal.get(i) != -1) {
@@ -283,7 +296,7 @@
surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
}
} else if (complexDataTypeArray[i]) {
- row[order[i]] = complexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
+ row[order[i]] = complexDimensionInfoMap.get(actualOrdinal)
.getDataBasedOnDataType(ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
dictionaryColumnIndex++;
} else {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
index 73b0d6d..3627e00 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
@@ -43,7 +43,6 @@
measureDefaultValues = new Object[queryMeasures.length];
fillMeasureDefaultValues();
initDimensionAndMeasureIndexesForFillingData();
- initDimensionAndMeasureIndexesForFillingData();
isDimensionExists = queryDimensions.length > 0;
}
@@ -83,6 +82,7 @@
dictionaryColumnIndex = 0;
noDictionaryColumnIndex = 0;
complexTypeColumnIndex = 0;
+ int segmentDimensionsIdx = 0;
for (int i = 0; i < queryDimensions.length; i++) {
// fill default value in case the dimension does not exist in the current block
if (!dimensionInfo.getDimensionExists()[i]) {
@@ -98,7 +98,8 @@
continue;
}
fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray,
- comlexDimensionInfoMap, row, i);
+ comlexDimensionInfoMap, row, i, executionInfo
+ .getProjectionDimensions()[segmentDimensionsIdx++].getDimension().getOrdinal());
}
} else {
scannedResult.incrementCounter();
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java
index 30ce616..2111b02 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java
@@ -57,7 +57,7 @@
complexTypeColumnIndex = 0;
for (int i = 0; i < queryDimensions.length; i++) {
fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray,
- comlexDimensionInfoMap, row, i);
+ comlexDimensionInfoMap, row, i, queryDimensions[i].getDimension().getOrdinal());
}
} else {
scannedResult.incrementCounter();
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
index 54dfb1b..d32afd9 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
@@ -181,4 +181,4 @@
// keep default, one record in one datamap
return 1;
}
-}
\ No newline at end of file
+}
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
new file mode 100644
index 0000000..a6d1d62
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.alterTable
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll(): Unit = {
+ }
+
+ override def afterAll(): Unit = {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
+ }
+
+ private def testAddColumnForComplexTable(): Unit = {
+ val tableName = "test_add_column_for_complex_table"
+ sql(s"""DROP TABLE IF EXISTS ${ tableName }""")
+ sql(
+ s"""
+ | CREATE TABLE IF NOT EXISTS ${ tableName }(id INT, name STRING, file array<array<float>>,
+ | city STRING, salary FLOAT, ls STRING, map_column map<short,int>, struct_column struct<s:short>)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('sort_columns'='name', 'SORT_SCOPE'='LOCAL_SORT', 'LONG_STRING_COLUMNS'='ls',
+ | 'LOCAL_DICTIONARY_ENABLE'='true', 'LOCAL_DICTIONARY_INCLUDE'='city')
+ """.stripMargin)
+ sql(
+ s"""
+ | insert into table ${tableName} values
+ | (1, 'name1', array(array(1.1, 2.1), array(1.1, 2.1)), 'city1', 40000.0, '${ ("123" * 12000) }', map(1,1), named_struct('s',1)),
+ | (2, 'name2', array(array(1.2, 2.2), array(1.2, 2.2)), 'city2', 50000.0, '${ ("456" * 12000) }', map(2,2), named_struct('s',2)),
+ | (3, 'name3', array(array(1.3, 2.3), array(1.3, 2.3)), 'city3', 60000.0, '${ ("789" * 12000) }', map(3,3), named_struct('s',3))
+ """.stripMargin)
+ checkAnswer(sql(s"select count(1) from ${ tableName }"), Seq(Row(3)))
+ checkAnswer(sql(s"select name, city from ${ tableName } where id = 3"), Seq(Row("name3", "city3")))
+
+ sql(s"""desc formatted ${tableName}""").show(100, false)
+ sql(s"""alter table ${tableName} add columns (add_column string) TBLPROPERTIES('LOCAL_DICTIONARY_INCLUDE'='add_column')""")
+ sql(s"""ALTER TABLE ${tableName} SET TBLPROPERTIES('SORT_COLUMNS'='id, add_column, city')""")
+ sql(s"""desc formatted ${tableName}""").show(100, false)
+
+ sql(
+ s"""
+ | insert into table ${tableName} values
+ | (4, 'name4', array(array(1.4, 2.4), array(1.4, 2.4)), 'city4', 70000.0, '${ ("123" * 12000) }', map(4,4), named_struct('s',4), 'add4'),
+ | (5, 'name5', array(array(1.5, 2.5), array(1.5, 2.5)), 'city5', 80000.0, '${ ("456" * 12000) }', map(5,5), named_struct('s',5), 'add5'),
+ | (6, 'name6', array(array(1.6, 2.6), array(1.6, 2.6)), 'city6', 90000.0, '${ ("789" * 12000) }', map(6,6), named_struct('s',6), 'add6')
+ """.stripMargin)
+ checkAnswer(sql(s"select count(1) from ${ tableName }"), Seq(Row(6)))
+ checkAnswer(sql(s"""select add_column, id, city, name from ${ tableName } where id = 6"""),
+ Seq(Row("add6", 6, "city6", "name6")))
+
+ sql(s"""desc formatted ${tableName}""").show(100, false)
+ sql(s"""ALTER TABLE ${ tableName } DROP COLUMNS (city)""")
+ sql(s"""desc formatted ${tableName}""").show(100, false)
+
+ sql(
+ s"""
+ | insert into table ${tableName} values
+ | (7, 'name7', array(array(1.4, 2.4), array(1.4, 2.4)), 70000.0, '${ ("123" * 12000) }', map(7,7), named_struct('s',7), 'add7'),
+ | (8, 'name8', array(array(1.5, 2.5), array(1.5, 2.5)), 80000.0, '${ ("456" * 12000) }', map(8,8), named_struct('s',8), 'add8'),
+ | (9, 'name9', array(array(1.6, 2.6), array(1.6, 2.6)), 90000.0, '${ ("789" * 12000) }', map(9,9), named_struct('s',9), 'add9')
+ """.stripMargin)
+ checkAnswer(sql(s"select count(1) from ${ tableName }"), Seq(Row(9)))
+ checkAnswer(sql(s"""select id, add_column, name from ${ tableName } where id = 9"""), Seq(Row(9, "add9", "name9")))
+
+ sql(s"""desc formatted ${tableName}""").show(100, false)
+ sql(s"""alter table ${tableName} add columns (add_column_ls string) TBLPROPERTIES('LONG_STRING_COLUMNS'='add_column_ls')""")
+ sql(s"""desc formatted ${tableName}""").show(100, false)
+
+ sql(
+ s"""
+ | insert into table ${tableName} values
+ | (10, 'name10', array(array(1.4, 2.4), array(1.4, 2.4)), 100000.0, '${ ("123" * 12000) }', map(4,4), named_struct('s',4), 'add4', '${ ("999" * 12000) }'),
+ | (11, 'name11', array(array(1.5, 2.5), array(1.5, 2.5)), 110000.0, '${ ("456" * 12000) }', map(5,5), named_struct('s',5), 'add5', '${ ("888" * 12000) }'),
+ | (12, 'name12', array(array(1.6, 2.6), array(1.6, 2.6)), 120000.0, '${ ("789" * 12000) }', map(6,6), named_struct('s',6), 'add6', '${ ("777" * 12000) }')
+ """.stripMargin)
+ checkAnswer(sql(s"select count(1) from ${ tableName }"), Seq(Row(12)))
+ checkAnswer(sql(s"""select id, name, file, add_column_ls, map_column, struct_column from ${ tableName } where id = 10"""),
+ Seq(Row(10, "name10",
+ mutable.WrappedArray.make(Array(mutable.WrappedArray.make(Array(1.4, 2.4)), mutable.WrappedArray.make(Array(1.4, 2.4)))),
+ ("999" * 12000), Map(4 -> 4), Row(4))))
+
+ sql(s"""DROP TABLE IF EXISTS ${ tableName }""")
+ }
+
+ test("[CARBONDATA-3596] Fix exception when execute load data command or select sql on a table which includes complex columns after execute 'add column' command") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
+ // test for not vector reader
+ testAddColumnForComplexTable()
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false")
+ // test for vector reader
+ testAddColumnForComplexTable()
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
+ }
+}
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 3db3ebe..b6441fb 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -241,7 +241,12 @@
val tableSchema = tableInfo.getFactTable
val tableCols = tableSchema.getListOfColumns.asScala
val existingColsSize = tableCols.size
- var allColumns = tableCols.filter(x => x.isDimensionColumn)
+ var longStringCols = Seq[ColumnSchema]()
+ // get all original dimension columns
+ // but exclude complex type columns and long string columns
+ var allColumns = tableCols.filter(x =>
+ (x.isDimensionColumn && !x.getDataType.isComplexType()
+ && x.getSchemaOrdinal != -1 && (x.getDataType != DataTypes.VARCHAR)))
var newCols = Seq[ColumnSchema]()
var invertedIndxCols: Array[String] = Array[String]()
if (alterTableModel.tableProperties.get(CarbonCommonConstants.INVERTED_INDEX).isDefined) {
@@ -249,6 +254,7 @@
.split(',').map(_.trim)
}
+ // add new dimension columns
alterTableModel.dimCols.foreach(field => {
val encoders = new java.util.ArrayList[Encoding]()
encoders.add(Encoding.DICTIONARY)
@@ -263,10 +269,26 @@
alterTableModel.databaseName.getOrElse(dbName),
isSortColumn(field.name.getOrElse(field.column)),
isVarcharColumn(field.name.getOrElse(field.column)))
- allColumns ++= Seq(columnSchema)
+ if (columnSchema.getDataType == DataTypes.VARCHAR) {
+ // put the new long string columns in 'longStringCols'
+ // and add them after old long string columns
+ longStringCols ++= Seq(columnSchema)
+ } else {
+ allColumns ++= Seq(columnSchema)
+ }
newCols ++= Seq(columnSchema)
})
+ // put the old long string columns
+ allColumns ++= tableCols.filter(x =>
+ (x.isDimensionColumn && (x.getDataType == DataTypes.VARCHAR)))
+ // put the new long string columns
+ allColumns ++= longStringCols
+ // put complex type columns at the end of dimension columns
+ allColumns ++= tableCols.filter(x =>
+ (x.isDimensionColumn && (x.getDataType.isComplexType() || x.getSchemaOrdinal == -1)))
+ // original measure columns
allColumns ++= tableCols.filter(x => !x.isDimensionColumn)
+ // add new measure columns
alterTableModel.msrCols.foreach(field => {
val encoders = new java.util.ArrayList[Encoding]()
val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
diff --git a/log b/log
deleted file mode 100644
index e69de29..0000000
--- a/log
+++ /dev/null