[CARBONDATA-3028][32k] Fix bugs in spark file format table with blanks in longstringcolumns
If we create a Spark file format table with multiple long string columns
and the option long_string_columns contains blank characters, a
query on that table will fail because it does not recognize the correct
varchar columns. The root cause is that CarbonData did not trim the blanks
in long_string_columns while recognizing the varchar columns.
This closes #2834
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
index 755a7df..250e9a6 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -19,7 +19,6 @@
import java.io.File
import java.text.SimpleDateFormat
-import java.util
import java.util.{Date, Random}
import scala.collection.JavaConverters._
@@ -30,17 +29,14 @@
import org.apache.spark.util.SparkUtil
import org.apache.spark.sql.carbondata.datasource.TestUtil.{spark, _}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonV3DataFormatConstants}
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverter}
import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
-import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.Row
-import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
@@ -433,14 +429,16 @@
.append("[ \n")
.append(" {\"name\":\"string\"},\n")
.append(" {\"address\":\"varchar\"},\n")
- .append(" {\"age\":\"int\"}\n")
+ .append(" {\"age\":\"int\"},\n")
+ .append(" {\"note\":\"varchar\"}\n")
.append("]")
.toString()
val builder = CarbonWriter.builder()
val writer = builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build()
- for (i <- 0 until 3) {
+ val totalRecordsNum = 3
+ for (i <- 0 until totalRecordsNum) {
// write a varchar with 75,000 length
- writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString))
+ writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString, RandomStringUtils.randomAlphabetic(75000)))
}
writer.close()
@@ -449,19 +447,19 @@
if (spark.sparkContext.version.startsWith("2.1")) {
//data source file format
spark.sql(
- s"""CREATE TABLE sdkOutputTable (name string, address string, age int)
- |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address") """
+ s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string)
+ |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address, note") """
.stripMargin)
} else {
//data source file format
spark.sql(
- s"""CREATE TABLE sdkOutputTable (name string, address string, age int) USING carbon
- |OPTIONS("long_String_columns"="address") LOCATION
+ s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string) USING carbon
+ |OPTIONS("long_String_columns"="address, note") LOCATION
|'$writerPath' """.stripMargin)
}
- assert(spark.sql("select * from sdkOutputTable where age = 0").count() == 1)
- val op = spark.sql("select address from sdkOutputTable limit 1").collectAsList()
- assert(op.get(0).getString(0).length == 75000)
+ checkAnswer(spark.sql("select count(*) from sdkOutputTable where age = 0"), Seq(Row(1)))
+ checkAnswer(spark.sql("SELECT COUNT(*) FROM (select address,age,note from sdkOutputTable where length(address)=75000 and length(note)=75000)"),
+ Seq(Row(totalRecordsNum)))
spark.sql("DROP TABLE sdkOutputTable")
//--------------- data source external table without schema ---------------------------
@@ -471,16 +469,16 @@
spark
.sql(
s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH
- |'$writerPath', "long_String_columns" "address") """.stripMargin)
+ |'$writerPath', "long_String_columns" "address, note") """.stripMargin)
} else {
//data source file format
spark.sql(
s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS
- |("long_String_columns"="address") LOCATION '$writerPath' """.stripMargin)
+ |("long_String_columns"="address, note") LOCATION '$writerPath' """.stripMargin)
}
- assert(spark.sql("select * from sdkOutputTableWithoutSchema where age = 0").count() == 1)
- val op1 = spark.sql("select address from sdkOutputTableWithoutSchema limit 1").collectAsList()
- assert(op1.get(0).getString(0).length == 75000)
+ checkAnswer(spark.sql("select count(*) from sdkOutputTableWithoutSchema where age = 0"), Seq(Row(1)))
+ checkAnswer(spark.sql("SELECT COUNT(*) FROM (select address,age,note from sdkOutputTableWithoutSchema where length(address)=75000 and length(note)=75000)"),
+ Seq(Row(totalRecordsNum)))
spark.sql("DROP TABLE sdkOutputTableWithoutSchema")
clearDataMapCache
cleanTestData()
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 87930f6..ed2c956 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -32,6 +32,7 @@
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -412,12 +413,17 @@
public CarbonLoadModel buildLoadModel(Schema carbonSchema)
throws IOException, InvalidLoadOptionException {
timestamp = System.nanoTime();
- Set<String> longStringColumns = null;
- if (options != null && options.get("long_string_columns") != null) {
- longStringColumns =
- new HashSet<>(Arrays.asList(options.get("long_string_columns").toLowerCase().split(",")));
+ // validate long_string_column
+ Set<String> longStringColumns = new HashSet<>();
+ if (options != null && options.get(CarbonCommonConstants.LONG_STRING_COLUMNS) != null) {
+ String[] specifiedLongStrings =
+ options.get(CarbonCommonConstants.LONG_STRING_COLUMNS).toLowerCase().split(",");
+ for (String str : specifiedLongStrings) {
+ longStringColumns.add(str.trim());
+ }
validateLongStringColumns(carbonSchema, longStringColumns);
}
+ // for the longstring field, change the datatype from string to varchar
this.schema = updateSchemaFields(carbonSchema, longStringColumns);
// build CarbonTable using schema
CarbonTable table = buildCarbonTable();
@@ -603,12 +609,11 @@
for (int i = 0; i < fields.length; i++) {
if (fields[i] != null) {
fields[i].updateNameToLowerCase();
- }
-
- if (longStringColumns != null) {
- /* Also update the string type to varchar */
- if (longStringColumns.contains(fields[i].getFieldName())) {
- fields[i].updateDataTypeToVarchar();
+ if (longStringColumns != null) {
+ /* Also update the string type to varchar */
+ if (longStringColumns.contains(fields[i].getFieldName())) {
+ fields[i].updateDataTypeToVarchar();
+ }
}
}
}