[CARBONDATA-3943] Handling the addition of geo column to hive at the time of table creation.

Why is this PR needed?
PR #3774 adds geocolumn to hive when it is generated at the time of load.

What changes were proposed in this PR?
Handling the addition of column at create table itself. Added example class for the scenario
to check create geo table with carbon session.

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes

This closes #3879
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/GeoTableExampleWithCarbonSession.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/GeoTableExampleWithCarbonSession.scala
new file mode 100644
index 0000000..ee4a4a0
--- /dev/null
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/GeoTableExampleWithCarbonSession.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.log4j.PropertyConfigurator
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.util.ExampleUtils
+
+object GeoTableExampleWithCarbonSession {
+
+  def main(args: Array[String]) {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    System.setProperty("path.target", s"$rootPath/examples/spark/target")
+    // print profiler log to a separated file: target/profiler.log
+    PropertyConfigurator.configure(
+      s"$rootPath/examples/spark/src/main/resources/log4j.properties")
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false")
+    val spark = ExampleUtils.createCarbonSession("GeoTableExampleWithCarbonSession")
+    spark.sparkContext.setLogLevel("error")
+    Seq(
+      "stored as carbondata",
+      "using carbondata",
+      "stored by 'carbondata'",
+      "stored by 'org.apache.carbondata.format'"
+    ).foreach { formatSyntax =>
+      exampleBody(spark, formatSyntax)
+    }
+    spark.close()
+  }
+
+  def exampleBody(spark: SparkSession, formatSyntax: String = "stored as carbondata"): Unit = {
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val path = s"$rootPath/integration/spark/src/test/resources/geodata.csv"
+
+    spark.sql("DROP TABLE IF EXISTS geoTable")
+
+    // Create table
+    spark.sql(
+      s"""
+        CREATE TABLE geoTable(
+         | timevalue BIGINT,
+         | longitude LONG,
+         | latitude LONG)
+         | $formatSyntax
+         |  TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash',
+         | 'SPATIAL_INDEX.mygeohash.type'='geohash',
+         | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
+         | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
+         | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
+         | 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
+         | 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
+         | 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
+         | 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
+         | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
+       """.stripMargin)
+    spark.sql("select *from geoTable").show()
+    val descTable = spark.sql(s"describe formatted geoTable").collect
+    // Test if spatial index column is added to column schema
+    descTable.find(_.get(0).toString.contains("mygeohash")) match {
+      case Some(row) => assert(row.get(1).toString.contains("bigint"))
+      case None => assert(false)
+    }
+    spark.sql(s"""LOAD DATA local inpath '$path' INTO TABLE geoTable OPTIONS
+           |('DELIMITER'= ',')""".stripMargin)
+    spark.sql("select *from geoTable").show()
+    spark.sql("DROP TABLE IF EXISTS geoTable")
+  }
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 060de42..aab1a8c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -31,7 +31,7 @@
 import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{Metadata, StructField, StructType}
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -281,10 +281,22 @@
       isExternal)
     val updatedFormat = CarbonToSparkAdapter
       .getUpdatedStorageFormat(storageFormat, updatedTableProperties, tableInfo.getTablePath)
+    var catalogSchema = table.schema
+    val columnSchemas = tableInfo.getFactTable.getListOfColumns
+    val spatialProperty = updatedTableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
+    // In case of geo table, an additional spatial column is generated.
+    // Update schema with new column.
+    if (spatialProperty.isDefined) {
+      val spatialColumn = columnSchemas.asScala
+        .find(schema => schema.getColumnName.equalsIgnoreCase(spatialProperty.get.trim)).get
+      val additionalSchema = StructType(Array(StructField(spatialColumn.getColumnName,
+        org.apache.spark.sql.types.DataTypes.LongType, true, Metadata.empty)))
+      catalogSchema = StructType(additionalSchema ++ catalogSchema)
+    }
     val updatedSchema = if (isExternal) {
-      table.schema
+      catalogSchema
     } else {
-      CarbonSparkUtil.updateStruct(table.schema)
+      CarbonSparkUtil.updateStruct(catalogSchema)
     }
     table.copy(
       storage = updatedFormat,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
index 55a6e9d..208befc 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
@@ -213,13 +213,13 @@
 
     // Process spatial index property
     val indexFields = processSpatialIndexProperty(tableProperties, fields)
-    val allFields = fields ++ indexFields
+    val allFields = indexFields ++ fields
 
     // do not allow below key words as column name
     validateColumnNames(allFields)
     CommonUtil.validateForSpatialTypeColumn(tableProperties)
 
-    fields.zipWithIndex.foreach { case (field, index) =>
+    allFields.zipWithIndex.foreach { case (field, index) =>
       field.schemaOrdinal = index
     }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index b16579e..932c175 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -238,7 +238,8 @@
         // Here, catalogTable will have spatial column in schema which is used to build carbon
         // table. As spatial column is not supposed to be present in user-defined columns,
         // removing it here. Later from tableproperties the column will be added in carbonTable.
-        val spatialProperty = catalogTable.properties.get(CarbonCommonConstants.SPATIAL_INDEX)
+        val spatialProperty = catalogTable.storage.properties
+          .get(CarbonCommonConstants.SPATIAL_INDEX)
         if (spatialProperty.isDefined) {
           val originalSchema = StructType(catalogTable.schema.
             filterNot(_.name.equalsIgnoreCase(spatialProperty.get.trim)))
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index 7235527..b7fcd37 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -131,7 +131,6 @@
 
   test("test materialized view with spatial column") {
     createTable()
-    loadData()
     val exception = intercept[MalformedCarbonCommandException](sql(
       s"CREATE MATERIALIZED VIEW view1 AS SELECT longitude, mygeohash FROM $table1"))
     assert(exception.getMessage.contains(
@@ -141,7 +140,6 @@
 
   test("test geo table create index on spatial column") {
     createTable()
-    loadData()
     val exception = intercept[MalformedIndexCommandException](sql(
       s"""
          | CREATE INDEX bloom_index ON TABLE $table1 (mygeohash)
@@ -152,9 +150,8 @@
       s"Spatial Index column is not supported, column 'mygeohash' is spatial column"))
   }
 
-  test("test geo table create and load and check describe formatted") {
+  test("test geo table create with spark session and check describe formatted") {
     createTable()
-    loadData()
     // Test if spatial index column is added as a sort column
     val descTable = sql(s"describe formatted $table1").collect
     descTable.find(_.get(0).toString.contains("Sort Scope")) match {
@@ -165,11 +162,42 @@
       case Some(row) => assert(row.get(1).toString.contains("mygeohash"))
       case None => assert(false)
     }
+    // Test if spatial index column is added to column schema
+    descTable.find(_.get(0).toString.contains("mygeohash")) match {
+      case Some(row) => assert(row.get(1).toString.contains("bigint"))
+      case None => assert(false)
+    }
+  }
+
+  test("test create geo table with spark session having syntax: using carbondata") {
+    sql(
+      s"""
+         | CREATE TABLE $table1(
+         | timevalue BIGINT,
+         | longitude LONG,
+         | latitude LONG)
+         | using carbondata
+         | options ('SPATIAL_INDEX'='mygeohash',
+         | 'SPATIAL_INDEX.mygeohash.type'='geohash',
+         | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
+         | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
+         | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
+         | 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
+         | 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
+         | 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
+         | 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
+         | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
+       """.stripMargin)
+    val descTable = sql(s"describe formatted $table1").collect
+    // Test if spatial index column is added to column schema
+    descTable.find(_.get(0).toString.contains("mygeohash")) match {
+      case Some(row) => assert(row.get(1).toString.contains("bigint"))
+      case None => assert(false)
+    }
   }
 
   test("test geo table drop spatial index column") {
     createTable()
-    loadData()
     val exception = intercept[MalformedCarbonCommandException](
       sql(s"alter table $table1 drop columns(mygeohash)"))
     assert(exception.getMessage.contains(
@@ -179,7 +207,6 @@
 
   test("test geo table alter spatial index column") {
     createTable()
-    loadData()
     val exception = intercept[MalformedCarbonCommandException](
       sql(s"update $table1 set (mygeohash)=(111111) where longitude=116285807 "))
     assert(exception.getMessage.contains(