[CARBONDATA-3835] Fix global sort issues

Why is this PR needed?
For global sort without partition, string comes as byte[], if we use string comparator (StringSerializableComparator) it will convert byte[] to toString which gives address and comparison goes wrong.
For global sort with partition, when sort column is partition column, it was sorting on first column instead of partition columns.

What changes were proposed in this PR?
change data type to byte before choosing a comparator.
get the sorted column based on index, don't just take from first

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes

This closes #3779
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 55eee11..e7e1baf 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -32,6 +32,7 @@
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.types.{ByteType, DateType, LongType, StringType, TimestampType}
 import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
@@ -237,10 +238,15 @@
         CarbonProperties.getInstance().getGlobalSortRddStorageLevel()))
     }
     val sortColumnsLength = model.getCarbonDataLoadSchema.getCarbonTable.getSortColumns.size()
-    val sortColumnDataTypes = dataTypes.take(sortColumnsLength)
-    val rowComparator = GlobalSortHelper.generateRowComparator(sortColumnDataTypes)
+    var sortColumnDataTypes = dataTypes.take(sortColumnsLength)
+    sortColumnDataTypes = sortColumnDataTypes.map {
+      case StringType => ByteType
+      case TimestampType | DateType => LongType
+      case datatype => datatype
+    }
+    val rowComparator = GlobalSortHelper.generateRowComparator(sortColumnDataTypes.zipWithIndex)
     val sortRDD = rdd.sortBy(row =>
-      getKey(row, sortColumnsLength, sortColumnDataTypes),
+      getKey(row, sortColumnsLength),
       true,
       numPartitions)(
       rowComparator, classTag[Array[AnyRef]])
@@ -273,8 +279,7 @@
   }
 
   def getKey(row: Array[AnyRef],
-      sortColumnsLength: Int,
-      dataTypes: Seq[org.apache.spark.sql.types.DataType]): Array[AnyRef] = {
+      sortColumnsLength: Int): Array[AnyRef] = {
     val key: Array[AnyRef] = new Array[AnyRef](sortColumnsLength)
     System.arraycopy(row, 0, key, 0, sortColumnsLength)
     key
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
index 91ed27e..00891b9 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
@@ -50,7 +50,7 @@
 
   def sortBy(updatedRdd: RDD[InternalRow],
       numPartitions: Int,
-      dataTypes: Seq[DataType]
+      dataTypes: Seq[(DataType, Int)]
   ): RDD[InternalRow] = {
     val keyExtractors = generateKeyExtractor(dataTypes)
     val rowComparator = generateRowComparator(dataTypes)
@@ -68,9 +68,8 @@
     key
   }
 
-  def generateKeyExtractor(dataTypes: Seq[DataType]): Array[KeyExtractor] = {
+  def generateKeyExtractor(dataTypes: Seq[(DataType, Int)]): Array[KeyExtractor] = {
     dataTypes
-      .zipWithIndex
       .map { attr =>
         attr._1 match {
           case StringType => UTF8StringKeyExtractor(attr._2)
@@ -91,9 +90,8 @@
       .toArray
   }
 
-  def generateRowComparator(dataTypes: Seq[DataType]): InternalRowComparator = {
+  def generateRowComparator(dataTypes: Seq[(DataType, Int)]): InternalRowComparator = {
     val comparators = dataTypes
-      .zipWithIndex
       .map { attr =>
         val comparator = attr._1 match {
           case StringType => new StringSerializableComparator()
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index ce699d6..20d29d8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -518,7 +518,12 @@
         } else {
           attributes.take(table.getSortColumns.size())
         }
-      val dataTypes = sortColumns.map(_.dataType)
+      val attributesWithIndex = attributes.zipWithIndex
+      val dataTypes = sortColumns.map { column =>
+        val attributeWithIndex =
+          attributesWithIndex.find(x => x._1.name.equalsIgnoreCase(column.name))
+        (column.dataType, attributeWithIndex.get._2)
+      }
       val sortedRDD: RDD[InternalRow] =
         GlobalSortHelper.sortBy(updatedRdd, numPartitions, dataTypes)
       val outputOrdering = sortColumns.map(SortOrder(_, Ascending))
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
index ef9d1b2..c08370b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
@@ -90,6 +90,13 @@
     }
   }
 
+  protected def checkAnswerWithoutSort(df: DataFrame, expectedAnswer: Seq[Row]): Unit = {
+    QueryTest.checkAnswer(df, expectedAnswer, needSort = false) match {
+      case Some(errorMessage) => fail(errorMessage)
+      case None =>
+    }
+  }
+
   protected def checkAnswer(df: DataFrame, expectedAnswer: Row): Unit = {
     checkAnswer(df, Seq(expectedAnswer))
   }
@@ -219,8 +226,14 @@
    * @param df the [[DataFrame]] to be executed
    * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
    */
-  def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Option[String] = {
-    val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
+  def checkAnswer(df: DataFrame,
+      expectedAnswer: Seq[Row],
+      needSort: Boolean = true): Option[String] = {
+    val isSorted = if (needSort) {
+      df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
+    } else {
+      true
+    }
     def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
       // Converts data to types that we can do equality comparison using Scala collections.
       // For BigDecimal type, the Scala type has a better definition of equality test (similar to
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index 3deb8ba..a9c773d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -48,6 +48,66 @@
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
   }
 
+  test("test global sort column as partition column") {
+    sql("drop table if exists table1")
+    sql(s"""
+         | create table table1(
+         | timestampField TIMESTAMP
+         | )
+         | stored as carbondata
+         | tblproperties('sort_scope'='global_sort', 'sort_columns'='charField')
+         | partitioned by (charField String)
+       """.stripMargin)
+    sql(
+      s"""
+         | INSERT INTO TABLE table1
+         | SELECT
+         | '2015-07-01 00:00:00', 'aaa'
+       """.stripMargin)
+    checkAnswer(sql(s"SELECT charField FROM table1"), Seq(Row("aaa")))
+    sql("drop table if exists table1")
+  }
+
+  test("test global sort order") {
+    sql("DROP TABLE IF EXISTS gs_test")
+    sql("create table gs_test (id string) " +
+        "using carbondata options('sort_scope'='global_sort', 'local_dictionary_enable'='false', 'sort_columns'='id','global_sort_partitions'='1')")
+    sql("insert into gs_test select 'abc1' union all select 'abc5' union all select 'abc10' union all select 'abc20' ")
+    checkAnswerWithoutSort(sql("select * from gs_test"), Seq(Row("abc1"), Row("abc10"), Row("abc20"), Row("abc5")))
+    sql("DROP TABLE IF EXISTS gs_test")
+  }
+
+  test("test global sort order in old insert flow") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true");
+    sql("DROP TABLE IF EXISTS gs_test")
+    sql("create table gs_test (id string) " +
+        "using carbondata options('sort_scope'='global_sort', 'local_dictionary_enable'='false', 'sort_columns'='id','global_sort_partitions'='1')")
+    sql("insert into gs_test select 'abc1' union all select 'abc5' union all select 'abc10' union all select 'abc20' ")
+    checkAnswerWithoutSort(sql("select id from gs_test"), Seq(Row("abc1"), Row("abc10"), Row("abc20"), Row("abc5")))
+    sql("DROP TABLE IF EXISTS gs_test")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false");
+  }
+
+  test("test global sort partition order") {
+    sql("DROP TABLE IF EXISTS gs_test")
+    sql("create table gs_test (id string) partitioned by (val int)" +
+        "stored as carbondata tblproperties('sort_scope'='global_sort', 'local_dictionary_enable'='false', 'sort_columns'='id','global_sort_partitions'='1')")
+    sql("insert into gs_test select 'abc1', 1 union all select 'abc5', 1 union all select 'abc10', 1 union all select 'abc20', 1 ")
+    checkAnswerWithoutSort(sql("select id from gs_test"), Seq(Row("abc1"), Row("abc10"), Row("abc20"), Row("abc5")))
+    sql("DROP TABLE IF EXISTS gs_test")
+  }
+
+  test("test global sort order in old insert partition flow") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true");
+    sql("DROP TABLE IF EXISTS gs_test")
+    sql("create table gs_test (id string) " +
+        "using carbondata options('sort_scope'='global_sort', 'local_dictionary_enable'='false', 'sort_columns'='id','global_sort_partitions'='1')")
+    sql("insert into gs_test select 'abc1' union all select 'abc5' union all select 'abc10' union all select 'abc20' ")
+    checkAnswerWithoutSort(sql("select id from gs_test"), Seq(Row("abc1"), Row("abc10"), Row("abc20"), Row("abc5")))
+    sql("DROP TABLE IF EXISTS gs_test")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false");
+  }
+
   test("data loading for global sort partition table for one partition column") {
     sql(
       """