[CARBONDATA-3028][32k] Fix bugs in spark file format table with blanks in longstringcolumns

If we create a Spark file format table with multiple long string columns
and the long_string_columns option contains blank characters, queries
on that table will fail because CarbonData does not recognize the
correct varchar columns. The root cause is that CarbonData did not trim
the blanks in long_string_columns while identifying the varchar columns.

This closes #2834
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
index 755a7df..250e9a6 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -19,7 +19,6 @@
 
 import java.io.File
 import java.text.SimpleDateFormat
-import java.util
 import java.util.{Date, Random}
 
 import scala.collection.JavaConverters._
@@ -30,17 +29,14 @@
 import org.apache.spark.util.SparkUtil
 import org.apache.spark.sql.carbondata.datasource.TestUtil.{spark, _}
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonV3DataFormatConstants}
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverter}
 import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
-import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
@@ -433,14 +429,16 @@
       .append("[ \n")
       .append("   {\"name\":\"string\"},\n")
       .append("   {\"address\":\"varchar\"},\n")
-      .append("   {\"age\":\"int\"}\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"note\":\"varchar\"}\n")
       .append("]")
       .toString()
     val builder = CarbonWriter.builder()
     val writer = builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build()
-    for (i <- 0 until 3) {
+    val totalRecordsNum = 3
+    for (i <- 0 until totalRecordsNum) {
       // write a varchar with 75,000 length
-      writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString))
+      writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString, RandomStringUtils.randomAlphabetic(75000)))
     }
     writer.close()
 
@@ -449,19 +447,19 @@
     if (spark.sparkContext.version.startsWith("2.1")) {
       //data source file format
       spark.sql(
-        s"""CREATE TABLE sdkOutputTable (name string, address string, age int)
-           |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address") """
+        s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string)
+           |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address, note") """
           .stripMargin)
     } else {
       //data source file format
       spark.sql(
-        s"""CREATE TABLE sdkOutputTable (name string, address string, age int) USING carbon
-           |OPTIONS("long_String_columns"="address") LOCATION
+        s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string) USING carbon
+           |OPTIONS("long_String_columns"="address, note") LOCATION
            |'$writerPath' """.stripMargin)
     }
-    assert(spark.sql("select * from sdkOutputTable where age = 0").count() == 1)
-    val op = spark.sql("select address from sdkOutputTable limit 1").collectAsList()
-    assert(op.get(0).getString(0).length == 75000)
+    checkAnswer(spark.sql("select count(*) from sdkOutputTable where age = 0"), Seq(Row(1)))
+    checkAnswer(spark.sql("SELECT COUNT(*) FROM (select address,age,note from sdkOutputTable where length(address)=75000 and length(note)=75000)"),
+      Seq(Row(totalRecordsNum)))
     spark.sql("DROP TABLE sdkOutputTable")
 
     //--------------- data source external table without schema ---------------------------
@@ -471,16 +469,16 @@
       spark
         .sql(
           s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH
-             |'$writerPath', "long_String_columns" "address") """.stripMargin)
+             |'$writerPath', "long_String_columns" "address, note") """.stripMargin)
     } else {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS
-           |("long_String_columns"="address") LOCATION '$writerPath' """.stripMargin)
+           |("long_String_columns"="address, note") LOCATION '$writerPath' """.stripMargin)
     }
-    assert(spark.sql("select * from sdkOutputTableWithoutSchema where age = 0").count() == 1)
-    val op1 = spark.sql("select address from sdkOutputTableWithoutSchema limit 1").collectAsList()
-    assert(op1.get(0).getString(0).length == 75000)
+    checkAnswer(spark.sql("select count(*) from sdkOutputTableWithoutSchema where age = 0"), Seq(Row(1)))
+    checkAnswer(spark.sql("SELECT COUNT(*) FROM (select address,age,note from sdkOutputTableWithoutSchema where length(address)=75000 and length(note)=75000)"),
+      Seq(Row(totalRecordsNum)))
     spark.sql("DROP TABLE sdkOutputTableWithoutSchema")
     clearDataMapCache
     cleanTestData()
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 87930f6..ed2c956 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -32,6 +32,7 @@
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -412,12 +413,17 @@
   public CarbonLoadModel buildLoadModel(Schema carbonSchema)
       throws IOException, InvalidLoadOptionException {
     timestamp = System.nanoTime();
-    Set<String> longStringColumns = null;
-    if (options != null && options.get("long_string_columns") != null) {
-      longStringColumns =
-          new HashSet<>(Arrays.asList(options.get("long_string_columns").toLowerCase().split(",")));
+    // validate long_string_columns
+    Set<String> longStringColumns = new HashSet<>();
+    if (options != null && options.get(CarbonCommonConstants.LONG_STRING_COLUMNS) != null) {
+      String[] specifiedLongStrings =
+          options.get(CarbonCommonConstants.LONG_STRING_COLUMNS).toLowerCase().split(",");
+      for (String str : specifiedLongStrings) {
+        longStringColumns.add(str.trim());
+      }
       validateLongStringColumns(carbonSchema, longStringColumns);
     }
+    // for long string fields, change the data type from string to varchar
     this.schema = updateSchemaFields(carbonSchema, longStringColumns);
     // build CarbonTable using schema
     CarbonTable table = buildCarbonTable();
@@ -603,12 +609,11 @@
     for (int i = 0; i < fields.length; i++) {
       if (fields[i] != null) {
         fields[i].updateNameToLowerCase();
-      }
-
-      if (longStringColumns != null) {
-        /* Also update the string type to varchar */
-        if (longStringColumns.contains(fields[i].getFieldName())) {
-          fields[i].updateDataTypeToVarchar();
+        if (longStringColumns != null) {
+          /* Also update the string type to varchar */
+          if (longStringColumns.contains(fields[i].getFieldName())) {
+            fields[i].updateDataTypeToVarchar();
+          }
         }
       }
     }