[CARBONDATA-3833] Make geoID visible

Why is this PR needed?
To make geohash column visible to the user

What changes were proposed in this PR?
Generated geohash column is included in the schema. Validation added to avoid alter and drop the geohash column. Indexes, MV and other table properties not supported on this column.

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes

This closes #3774
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 80fb2e1..255fa6f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -207,7 +207,6 @@
     thriftColumnSchema.setInvisible(wrapperColumnSchema.isInvisible());
     thriftColumnSchema.setColumnReferenceId(wrapperColumnSchema.getColumnReferenceId());
     thriftColumnSchema.setSchemaOrdinal(wrapperColumnSchema.getSchemaOrdinal());
-    thriftColumnSchema.setSpatialColumn(wrapperColumnSchema.isSpatialColumn());
     if (wrapperColumnSchema.isSortColumn()) {
       Map<String, String> properties = wrapperColumnSchema.getColumnProperties();
       if (null == properties) {
@@ -506,7 +505,6 @@
     wrapperColumnSchema.setInvisible(externalColumnSchema.isInvisible());
     wrapperColumnSchema.setColumnReferenceId(externalColumnSchema.getColumnReferenceId());
     wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal());
-    wrapperColumnSchema.setSpatialColumn(externalColumnSchema.isSpatialColumn());
     wrapperColumnSchema.setSortColumn(false);
     Map<String, String> properties = externalColumnSchema.getColumnProperties();
     if (properties != null) {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 81ce39a..cbaf18a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -300,9 +300,7 @@
   private void fillCreateOrderColumn() {
     List<CarbonColumn> columns = new ArrayList<CarbonColumn>();
     for (CarbonDimension dimension : visibleDimensions) {
-      if (!dimension.getColumnSchema().isSpatialColumn()) {
-        columns.add(dimension);
-      }
+      columns.add(dimension);
     }
     columns.addAll(visibleMeasures);
     columns.sort(new Comparator<CarbonColumn>() {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
index ae2775a..40015a3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
@@ -167,14 +167,6 @@
   }
 
   /**
-   * Checks if it is spatial index column
-   * @return Returns True if the column is an spatial index column. Otherwise returns False.
-   */
-  public boolean isSpatialColumn() {
-    return columnSchema.isSpatialColumn();
-  }
-
-  /**
    * @return column property map
    */
   public Map<String, String> getColumnProperties() {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index dd4b4a0..88b55ce 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -122,11 +122,6 @@
   private boolean isSortColumn = false;
 
   /**
-   *  Whether it is a spatial index column
-   */
-  private boolean spatialColumn = false;
-
-  /**
    * aggregate function used in pre aggregate table
    */
   private String aggFunction = "";
@@ -538,7 +533,6 @@
       }
     }
     out.writeBoolean(isLocalDictColumn);
-    out.writeBoolean(spatialColumn);
   }
 
   @Override
@@ -588,7 +582,6 @@
       }
     }
     this.isLocalDictColumn = in.readBoolean();
-    this.spatialColumn = in.readBoolean();
   }
 
   /**
@@ -612,21 +605,4 @@
       throw new RuntimeException("Error occur while cloning ColumnSchema", e);
     }
   }
-
-  /**
-   * Checks whether it is a spatial index column.
-   * @return Returns True if the column is a spatial index column. Otherwise returns False.
-   */
-  public boolean isSpatialColumn() {
-    return spatialColumn;
-  }
-
-  /**
-   * Set the column spatial index property. True or False to indicate column is a spatial index
-   * column or not respectively.
-   * @param spatialColumn True or False
-   */
-  public void setSpatialColumn(boolean spatialColumn) {
-    this.spatialColumn = spatialColumn;
-  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index fa77fc4..12d96e9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1966,7 +1966,6 @@
     wrapperColumnSchema.setScale(externalColumnSchema.getScale());
     wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
     wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal());
-    wrapperColumnSchema.setSpatialColumn(externalColumnSchema.isSpatialColumn());
     Map<String, String> properties = externalColumnSchema.getColumnProperties();
     if (properties != null) {
       if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
index fadc2ad..236c3fc 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
@@ -158,11 +158,6 @@
         return thriftColumnSchema;
       }
 
-      @Mock public org.apache.carbondata.format.ColumnSchema setSpatialColumn(
-          boolean spatialColumn) {
-        return thriftColumnSchema;
-      }
-
       @Mock public String getColumn_id() {
         return "1";
       }
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index b7fabef..fc3a2ff 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -133,11 +133,6 @@
   **/
   /** Deprecated */
 	17: optional list<ParentColumnTableRelation> parentColumnTableRelations;
-
-  /**
-   * To specify if it is a spatial index column. Its Default value is false
-	 */
-	18: optional bool spatialColumn;
 }
 
 /**
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java
index 0fce43a..30c2b5f 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java
@@ -111,7 +111,7 @@
     List<ColumnSchema> columns = table.getTableInfo().getFactTable().getListOfColumns();
     List<ColumnSchema> validColumnSchema = new ArrayList<>();
     for (ColumnSchema column : columns) {
-      if (!column.isInvisible() && !column.isSpatialColumn() && !column.isComplexColumn()) {
+      if (!column.isInvisible() && !column.isComplexColumn()) {
         validColumnSchema.add(column);
       }
     }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 05b9ac3..6851959 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -209,16 +209,6 @@
           row = new CarbonRow(rowParser.parseRow(rows.next()))
         }
         row = rowConverter.convert(row)
-        if (row != null) {
-          // In case of partition, after Input processor and converter steps, all the rows are given
-          // to hive to create partition folders. As hive is unaware of non-schema columns,
-          // should discard those columns from rows and return.
-          val schemaColumnValues = row.getData.zipWithIndex.collect {
-            case (data, index) if !conf.getDataFields()(index).getColumn.isSpatialColumn =>
-              data
-          }
-          row.setData(schemaColumnValues)
-        }
         rowCounter.add(1)
         row
       }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 540de75..98e8152 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -375,7 +375,7 @@
               .getFactTable
               .getListOfColumns
               .asScala
-              .filterNot(col => col.isInvisible || col.isSpatialColumn || col.isComplexColumn)
+              .filterNot(col => col.isInvisible || col.isComplexColumn)
             val convertedRdd = CommonLoadUtils.getConvertedInternalRow(
               colSchema,
               scanResultRdd.get,
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index b30f104..38f2f15 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -71,10 +71,10 @@
     val fields = new Array[String](
       carbonRelation.dimensionsAttr.size + carbonRelation.measureAttr.size)
     val carbonTable = carbonRelation.carbonTable
-    val columnSchemas: mutable.Buffer[ColumnSchema] = carbonTable.getTableInfo.getFactTable.
-      getListOfColumns.asScala
-      .filter(cSchema => !cSchema.isInvisible && cSchema.getSchemaOrdinal != -1 &&
-                         !cSchema.isSpatialColumn).sortWith(_.getSchemaOrdinal < _.getSchemaOrdinal)
+    val columnSchemas: mutable.Buffer[ColumnSchema] = carbonTable.getTableInfo
+      .getFactTable.getListOfColumns.asScala
+      .filter(cSchema => !cSchema.isInvisible && cSchema.getSchemaOrdinal != -1)
+      .sortWith(_.getSchemaOrdinal < _.getSchemaOrdinal)
     val columnList = columnSchemas.toList.asJava
     carbonRelation.dimensionsAttr.foreach(attr => {
       val carbonColumn = carbonTable.getColumnByName(attr.name)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index e55f205..e998491 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -654,6 +654,30 @@
   }
 
   /**
+   * This method will validate for spatial column
+   *
+   * @param properties
+   */
+  def validateForSpatialTypeColumn(properties: Map[String, String]): Unit = {
+    // Do not allow to set table properties on spatial column.
+    // Spatial column is only allowed in sort columns and spatial index property
+    val spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
+    if (spatialProperty.isDefined) {
+      val spatialColumn = spatialProperty.get.trim
+      properties.foreach { case (key, value) =>
+        if (!key.startsWith(CarbonCommonConstants.SPATIAL_INDEX) &&
+            !key.equalsIgnoreCase(CarbonCommonConstants.SORT_COLUMNS) &&
+            value.contains(spatialColumn)) {
+          val errorMessage =
+            s"$spatialColumn is a spatial index column and is not allowed for " +
+            s"the option(s): $key"
+          throw new MalformedCarbonCommandException(errorMessage)
+        }
+      }
+    }
+  }
+
+  /**
    * This method will validate the cache level
    *
    * @param cacheLevel
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
index 0c67aeb..55a6e9d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
@@ -217,6 +217,7 @@
 
     // do not allow below key words as column name
     validateColumnNames(allFields)
+    CommonUtil.validateForSpatialTypeColumn(tableProperties)
 
     fields.zipWithIndex.foreach { case (field, index) =>
       field.schemaOrdinal = index
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index e551301..1ecb004 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -648,7 +648,6 @@
     columnSchema.setScale(field.scale)
     columnSchema.setSchemaOrdinal(field.schemaOrdinal)
     columnSchema.setSortColumn(false)
-    columnSchema.setSpatialColumn(field.spatialIndex)
     if (isVarcharColumn(colName)) {
       columnSchema.setDataType(DataTypes.VARCHAR)
     }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonCreateIndexCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonCreateIndexCommand.scala
index 16df7bb..8ecf107 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonCreateIndexCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonCreateIndexCommand.scala
@@ -285,7 +285,15 @@
       indexProvider: String): java.util.List[CarbonColumn] = {
     val indexCarbonColumns = parentTable.getIndexedColumns(indexColumns)
     val unique: util.Set[String] = new util.HashSet[String]
+    val properties = parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+    val spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
     for (indexColumn <- indexCarbonColumns.asScala) {
+      if (spatialProperty.isDefined &&
+          indexColumn.getColName.equalsIgnoreCase(spatialProperty.get.trim)) {
+        throw new MalformedIndexCommandException(String.format(
+          "Spatial Index column is not supported, column '%s' is spatial column",
+          indexColumn.getColName))
+      }
       if (indexProvider.equalsIgnoreCase(IndexType.LUCENE.getIndexProviderName)) {
         // validate whether it is string column.
         if (indexColumn.getDataType != DataTypes.STRING) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 01afa5d..a87042a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -35,6 +35,7 @@
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
@@ -471,6 +472,8 @@
     var reArrangedIndex: Seq[Int] = Seq()
     var selectedColumnSchema: Seq[ColumnSchema] = Seq()
     var partitionIndex: Seq[Int] = Seq()
+    val properties = tableInfo.getFactTable.getTableProperties.asScala
+    val spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
     // internal order ColumnSchema (non-flat structure)
     val columnSchema = (table.getVisibleDimensions.asScala ++
                         table.getVisibleMeasures.asScala).map(_.getColumnSchema)
@@ -494,20 +497,20 @@
     }
     columnSchema.foreach {
       col =>
-        if (col.isSpatialColumn) {
+        if (spatialProperty.isDefined &&
+            col.getColumnName.equalsIgnoreCase(spatialProperty.get.trim)) {
           carbonLoadModel.setNonSchemaColumnsPresent(true)
+        }
+        var skipPartitionColumn = false
+        if (partitionColumnNames != null &&
+            partitionColumnNames.contains(col.getColumnName)) {
+          partitionIndex = partitionIndex :+ createOrderMap(col.getColumnName)
+          skipPartitionColumn = true
         } else {
-          var skipPartitionColumn = false
-          if (partitionColumnNames != null &&
-              partitionColumnNames.contains(col.getColumnName)) {
-            partitionIndex = partitionIndex :+ createOrderMap(col.getColumnName)
-            skipPartitionColumn = true
-          } else {
-            reArrangedIndex = reArrangedIndex :+ createOrderMap(col.getColumnName)
-          }
-          if (!skipPartitionColumn) {
-            selectedColumnSchema = selectedColumnSchema :+ col
-          }
+          reArrangedIndex = reArrangedIndex :+ createOrderMap(col.getColumnName)
+        }
+        if (!skipPartitionColumn) {
+          selectedColumnSchema = selectedColumnSchema :+ col
         }
     }
     if (partitionColumnSchema != null) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 15a0823..15428fa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -864,7 +864,7 @@
       // input data from csv files. Convert to logical plan
       val allCols = new ArrayBuffer[String]()
       // get only the visible dimensions from table
-      allCols ++= table.getVisibleDimensions.asScala.filterNot(_.isSpatialColumn).map(_.getColName)
+      allCols ++= table.getVisibleDimensions.asScala.map(_.getColName)
       allCols ++= table.getVisibleMeasures.asScala.map(_.getColName)
       StructType(
         allCols.filterNot(_.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)).map(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 7d3225d..16d90ef 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -27,6 +27,8 @@
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{ArrayType, LongType}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.AlterTableUtil
+import scala.collection.JavaConverters._
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -70,6 +72,8 @@
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     setAuditTable(carbonTable)
     setAuditInfo(Map("plan" -> plan.simpleString))
+    // Do not allow spatial index and its source columns to be updated.
+    AlterTableUtil.validateColumnsWithSpatialIndexProperties(carbonTable, columns)
     columns.foreach { col =>
       val dataType = carbonTable.getColumnByName(col).getColumnSchema.getDataType
       if (dataType.isComplexType) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 75c21ae..8b79b70 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -62,9 +62,6 @@
         throw new MalformedCarbonCommandException(
           "alter table add column is not supported for index indexSchema")
       }
-      val alterColumns =
-        (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols).map(_.column)
-      AlterTableUtil.validateSpatialIndexColumn(carbonTable, alterColumns)
       val operationContext = new OperationContext
       val alterTableAddColumnListener = AlterTableAddColumnPreEvent(sparkSession, carbonTable,
         alterTableAddColumnsModel)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index ae80a12..1c27dd7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -127,8 +127,8 @@
         throw new MalformedCarbonCommandException(
           "alter table column rename is not supported for index indexSchema")
       }
-      // Do not allow spatial index source columns to be changed.
-      AlterTableUtil.validateSpatialIndexSources(carbonTable,
+      // Do not allow spatial index column and its source columns to be changed.
+      AlterTableUtil.validateColumnsWithSpatialIndexProperties(carbonTable,
         List(alterTableColRenameAndDataTypeChangeModel.columnName))
       val operationContext = new OperationContext
       operationContext.setProperty("childTableColumnRename", childTableColumnRename)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 5c03871..2289aad 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -63,7 +63,8 @@
           "alter table drop column is not supported for index indexSchema")
       }
       // Do not allow spatial index source columns to be dropped.
-      AlterTableUtil.validateSpatialIndexSources(carbonTable, alterTableDropColumnModel.columns)
+      AlterTableUtil.validateColumnsWithSpatialIndexProperties(carbonTable,
+        alterTableDropColumnModel.columns)
       val partitionInfo = carbonTable.getPartitionInfo()
       val tableColumns = carbonTable.getCreateOrderColumn().asScala
       if (partitionInfo != null) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
index 5be4ef7..9de0833 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
@@ -172,10 +172,25 @@
     val viewSchema = getOutputSchema(logicalPlan)
     val relatedTables = getRelatedTables(logicalPlan)
     val relatedTableList = toCarbonTables(session, relatedTables)
+    val inputCols = logicalPlan.output.map(x =>
+      x.name
+    ).toList
     val relatedTableNames = new util.ArrayList[String](relatedTableList.size())
     // Check if load is in progress in any of the parent table mapped to the indexSchema
     relatedTableList.asScala.foreach {
       table =>
+        val tableProperties = table.getTableInfo.getFactTable.getTableProperties.asScala
+        // validate for spatial index column
+        val spatialProperty = tableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
+        if (spatialProperty.isDefined) {
+          val spatialColumn = spatialProperty.get.trim
+          if (inputCols.contains(spatialColumn)) {
+            val errorMessage =
+              s"$spatialColumn is a spatial index column and is not allowed for " +
+              s"the option(s): MATERIALIZED VIEW"
+            throw new MalformedCarbonCommandException(errorMessage)
+          }
+        }
         if (!table.getTableInfo.isTransactionalTable) {
           throw new MalformedCarbonCommandException(
             "Cannot create mv on non-transactional table")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 0b898eb..3caabb1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -31,9 +31,11 @@
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.fileoperations.FileWriteOperation
@@ -228,8 +230,17 @@
             c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
             c.getClass.getName.equals(
               "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
-        val catalogTable =
+        var catalogTable =
           CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable]
+        // Here, catalogTable will have spatial column in schema which is used to build carbon
+        // table. As spatial column is not supposed to be present in user-defined columns,
+        // removing it here. Later from tableproperties the column will be added in carbonTable.
+        val spatialProperty = catalogTable.properties.get(CarbonCommonConstants.SPATIAL_INDEX)
+        if (spatialProperty.isDefined) {
+          val originalSchema = StructType(catalogTable.schema.
+            filterNot(_.name.equalsIgnoreCase(spatialProperty.get.trim)))
+          catalogTable = catalogTable.copy(schema = originalSchema)
+        }
         val tableInfo = CarbonSparkSqlParserUtil.buildTableInfoFromCatalogTable(
           catalogTable, false, sparkSession)
         val carbonTable = CarbonTable.buildFromTableInfo(tableInfo)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 508e910..264d06b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -49,11 +49,7 @@
   }
 
   val dimensionsAttr: Seq[AttributeReference] = {
-    val sett = new LinkedHashSet(carbonTable
-      .getVisibleDimensions
-      .asScala
-      .filterNot(_.isSpatialColumn)
-      .asJava)
+    val sett = new LinkedHashSet(carbonTable.getVisibleDimensions.asScala.asJava)
     sett.asScala.toSeq.map(dim => {
       val dimension = carbonTable.getDimensionByName(dim.getColName)
       val output: DataType = dimension.getDataType.getName.toLowerCase match {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index a03345f..3b8a19b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -210,6 +210,14 @@
             .get
         }")
       }
+      val properties = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
+      val spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
+      if (spatialProperty.isDefined) {
+        if (indexModel.columnNames.exists(x => x.equalsIgnoreCase(spatialProperty.get.trim))) {
+          throw new ErrorMessage(s"Secondary Index is not supported for Spatial index column:" +
+                                 s" ${ spatialProperty.get.trim }")
+        }
+      }
       if (indexModel.columnNames.exists(x => !dimNames.contains(x))) {
         throw new ErrorMessage(
           s"one or more specified index cols either does not exist or not a key column or complex" +
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 8d12a7c..2d2e711 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -422,8 +422,6 @@
       properties.foreach { entry =>
         lowerCasePropertiesMap.put(entry._1.toLowerCase, entry._2)
       }
-      // validate the required cache level properties
-      validateColumnMetaCacheAndCacheLevel(carbonTable, lowerCasePropertiesMap)
       // get the latest carbon table
       // read the latest schema file
       val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)
@@ -438,6 +436,12 @@
       val tblPropertiesMap: mutable.Map[String, String] =
         thriftTable.fact_table.getTableProperties.asScala
 
+      // validate for spatial index column
+      CommonUtil.validateForSpatialTypeColumn(tblPropertiesMap ++ lowerCasePropertiesMap)
+
+      // validate the required cache level properties
+      validateColumnMetaCacheAndCacheLevel(carbonTable, lowerCasePropertiesMap)
+
       // validate the local dictionary properties
       validateLocalDictionaryProperties(lowerCasePropertiesMap, tblPropertiesMap, carbonTable)
 
@@ -625,7 +629,7 @@
     if (propertiesMap.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) {
       val schemaList: util.List[ColumnSchema] = CarbonUtil
         .getColumnSchemaList(carbonTable.getVisibleDimensions.asScala
-          .filterNot(_.getColumnSchema.isSpatialColumn).asJava, carbonTable.getVisibleMeasures)
+          .asJava, carbonTable.getVisibleMeasures)
       val tableColumns: Seq[String] = schemaList.asScala
         .map(columnSchema => columnSchema.getColumnName)
       CommonUtil
@@ -1060,21 +1064,9 @@
     }
   }
 
-  def validateSpatialIndexColumn(carbonTable: CarbonTable, alterColumns: Seq[String]): Unit = {
-    // Do not allow columns to be added with spatial index column name
-    val properties = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
-    val indexProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
-    if (indexProperty.isDefined) {
-      indexProperty.get.split(",").map(_.trim).foreach(element =>
-        if (alterColumns.contains(element)) {
-          throw new MalformedCarbonCommandException(s"Column: $element is not allowed. " +
-            s"This column is present in ${CarbonCommonConstants.SPATIAL_INDEX} table property.")
-        })
-      }
-  }
-
-  def validateSpatialIndexSources(carbonTable: CarbonTable, alterColumns: Seq[String]): Unit = {
-    // Do not allow spatial index source columns to be altered
+  def validateColumnsWithSpatialIndexProperties(carbonTable: CarbonTable, alterColumns: Seq[String])
+  : Unit = {
+    // Do not allow spatial index column and its source columns to be altered
     val properties = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
     val indexProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
     if (indexProperty.isDefined) {
@@ -1082,9 +1074,9 @@
         val srcColumns
         = properties.get(CarbonCommonConstants.SPATIAL_INDEX + s".$element.sourcecolumns")
         val common = alterColumns.intersect(srcColumns.get.split(",").map(_.trim))
-        if (common.nonEmpty) {
+        if (common.nonEmpty || alterColumns.contains(element)) {
           throw new MalformedCarbonCommandException(s"Columns present in " +
-            s"${CarbonCommonConstants.SPATIAL_INDEX} table property cannot be altered.")
+            s"${CarbonCommonConstants.SPATIAL_INDEX} table property cannot be altered/updated")
         }
       }
     }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index baf2f61..7235527 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -3,8 +3,7 @@
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
-
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedIndexCommandException}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 
 class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
@@ -75,6 +74,84 @@
       s"table."))
   }
 
+  test("test geo table with invalid table properties") {
+    var exception = intercept[MalformedCarbonCommandException](
+      createTable(table1, " 'RANGE_COLUMN'='timevalue', 'COLUMN_META_CACHE' = 'mygeohash', "))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): column_meta_cache"))
+
+    exception = intercept[MalformedCarbonCommandException](
+      createTable(table1, " 'NO_INVERTED_INDEX'='mygeohash', "))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): no_inverted_index"))
+
+    exception = intercept[MalformedCarbonCommandException](
+      createTable(table1,
+        " 'SORT_COLUMNS'='mygeohash, timevalue ', 'INVERTED_INDEX'='mygeohash', "))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): inverted_index"))
+
+    exception = intercept[MalformedCarbonCommandException](
+      createTable(table1, " 'RANGE_COLUMN'='mygeohash', "))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): range_column"))
+
+    exception = intercept[MalformedCarbonCommandException](
+      createTable(table1, " 'BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='mygeohash', "))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): bucket_columns"))
+  }
+
+  test("test alter table with invalid table properties") {
+    createTable()
+    var exception = intercept[RuntimeException](
+      sql(s"ALTER TABLE $table1 SET TBLPROPERTIES('SORT_COLUMNS'='mygeohash, timevalue ', " +
+          s"'INVERTED_INDEX'='mygeohash')"))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): inverted_index"))
+
+    exception = intercept[RuntimeException](
+      sql(s"ALTER TABLE $table1 SET TBLPROPERTIES('NO_INVERTED_INDEX'='mygeohash')"))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): no_inverted_index"))
+
+    exception = intercept[RuntimeException](
+      sql(s"ALTER TABLE $table1 SET TBLPROPERTIES('COLUMN_META_CACHE' = 'mygeohash')"))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): column_meta_cache"))
+  }
+
+  test("test materialized view with spatial column") {
+    createTable()
+    loadData()
+    val exception = intercept[MalformedCarbonCommandException](sql(
+      s"CREATE MATERIALIZED VIEW view1 AS SELECT longitude, mygeohash FROM $table1"))
+    assert(exception.getMessage.contains(
+      s"mygeohash is a spatial index column and is not allowed for " +
+      s"the option(s): MATERIALIZED VIEW"))
+  }
+
+  test("test geo table create index on spatial column") {
+    createTable()
+    loadData()
+    val exception = intercept[MalformedIndexCommandException](sql(
+      s"""
+         | CREATE INDEX bloom_index ON TABLE $table1 (mygeohash)
+         | AS 'bloomfilter'
+         | PROPERTIES('BLOOM_SIZE'='640000', 'BLOOM_FPP'='0.00001')
+      """.stripMargin))
+    assert(exception.getMessage.contains(
+      s"Spatial Index column is not supported, column 'mygeohash' is spatial column"))
+  }
+
   test("test geo table create and load and check describe formatted") {
     createTable()
     loadData()
@@ -90,6 +167,33 @@
     }
   }
 
+  test("test geo table drop spatial index column") {
+    createTable()
+    loadData()
+    val exception = intercept[MalformedCarbonCommandException](
+      sql(s"alter table $table1 drop columns(mygeohash)"))
+    assert(exception.getMessage.contains(
+      s"Columns present in ${ CarbonCommonConstants.SPATIAL_INDEX } " +
+      s"table property cannot be altered/updated"))
+  }
+
+  test("test geo table alter spatial index column") {
+    createTable()
+    loadData()
+    val exception = intercept[MalformedCarbonCommandException](
+      sql(s"update $table1 set (mygeohash)=(111111) where longitude=116285807 "))
+    assert(exception.getMessage.contains(
+      s"Columns present in ${ CarbonCommonConstants.SPATIAL_INDEX } " +
+      s"table property cannot be altered/updated"))
+  }
+
+  test("test geo table filter by geo spatial index column") {
+    createTable()
+    loadData()
+    checkAnswer(sql(s"select *from $table1 where mygeohash = '2196036'"),
+      Seq(Row(2196036, 1575428400000L, 116337069, 39951887)))
+  }
+
   test("test polygon query") {
     createTable()
     loadData()
@@ -112,6 +216,51 @@
       result)
   }
 
+  test("insert into table select from another geo table with different properties") {
+    val sourceTable = table1;
+    val targetTable = table2;
+    sql(
+      s"""
+         | CREATE TABLE $sourceTable(
+         | timevalue BIGINT,
+         | longitude LONG,
+         | latitude LONG) COMMENT "This is a GeoTable"
+         | STORED AS carbondata
+         | TBLPROPERTIES ('SPATIAL_INDEX'='spatial',
+         | 'SPATIAL_INDEX.spatial.type'='geohash',
+         | 'SPATIAL_INDEX.spatial.sourcecolumns'='longitude, latitude',
+         | 'SPATIAL_INDEX.spatial.originLatitude'='39.832277',
+         | 'SPATIAL_INDEX.spatial.gridSize'='60',
+         | 'SPATIAL_INDEX.spatial.minLongitude'='115.811865',
+         | 'SPATIAL_INDEX.spatial.maxLongitude'='116.782233',
+         | 'SPATIAL_INDEX.spatial.minLatitude'='39.832277',
+         | 'SPATIAL_INDEX.spatial.maxLatitude'='40.225281',
+         | 'SPATIAL_INDEX.spatial.conversionRatio'='1000000')
+       """.stripMargin)
+    loadData(sourceTable)
+    createTable(targetTable)
+    sql(s"insert into  $targetTable select * from $sourceTable")
+    checkAnswer(sql(s"select *from $targetTable where mygeohash = '2196036'"),
+      Seq(Row(2196036, 1575428400000L, 116337069, 39951887)))
+  }
+
+  test("test insert into non-geo table select from geo table") {
+    val sourceTable = table1;
+    val targetTable = table2;
+    createTable(sourceTable)
+    loadData(sourceTable)
+    sql(
+      s"""
+          CREATE TABLE IF NOT EXISTS $targetTable
+          (spatial Long, time Bigint, longitude Long, latitude Long)
+          STORED AS carbondata
+        """)
+    sql(s"insert into  $targetTable select * from $sourceTable")
+    checkAnswer(
+      sql(s"select * from $targetTable where spatial='2196036'"),
+      Seq(Row(2196036, 1575428400000L, 116337069, 39951887)))
+  }
+
   test("test insert into table select from another table with target table sort scope as global") {
     val sourceTable = table1;
     val targetTable = table2;
@@ -127,20 +276,49 @@
 
   test("test block pruning for polygon query") {
     createTable()
-    sql(s"insert into $table1 select 1575428400000,116285807,40084087")
-    sql(s"insert into $table1 select 1575428400000,116372142,40129503")
-    sql(s"insert into $table1 select 1575428400000,116187332,39979316")
-    sql(s"insert into $table1 select 1575428400000,116337069,39951887")
-    sql(s"insert into $table1 select 1575428400000,116359102,40154684")
-    sql(s"insert into $table1 select 1575428400000,116736367,39970323")
-    sql(s"insert into $table1 select 1575428400000,116362699,39942444")
-    sql(s"insert into $table1 select 1575428400000,116325378,39963129")
-    sql(s"insert into $table1 select 1575428400000,116302895,39930753")
-    sql(s"insert into $table1 select 1575428400000,116288955,39999101")
-    val df = sql(s"select longitude, latitude from $table1 where IN_POLYGON('116.321011 " +
+    sql(s"insert into $table1 select 0,1575428400000,116285807,40084087")
+    sql(s"insert into $table1 select 0,1575428400000,116372142,40129503")
+    sql(s"insert into $table1 select 0,1575428400000,116187332,39979316")
+    sql(s"insert into $table1 select 0,1575428400000,116337069,39951887")
+    sql(s"insert into $table1 select 0,1575428400000,116359102,40154684")
+    sql(s"insert into $table1 select 0,1575428400000,116736367,39970323")
+    sql(s"insert into $table1 select 0,1575428400000,116362699,39942444")
+    sql(s"insert into $table1 select 0,1575428400000,116325378,39963129")
+    sql(s"insert into $table1 select 0,1575428400000,116302895,39930753")
+    sql(s"insert into $table1 select 0,1575428400000,116288955,39999101")
+    val df = sql(s"select * from $table1 where IN_POLYGON('116.321011 " +
                  s"40.123503, 116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')")
     assert(df.rdd.getNumPartitions == 6)
-    checkAnswer(df, result)
+    checkAnswer(df, Seq(Row(733215, 1575428400000L, 116187332, 39979316),
+      Row(2160019, 1575428400000L, 116362699, 39942444),
+      Row(2170151, 1575428400000L, 116288955, 39999101),
+      Row(2174509, 1575428400000L, 116325378, 39963129),
+      Row(2196036, 1575428400000L, 116337069, 39951887),
+      Row(2361256, 1575428400000L, 116285807, 40084087)))
+  }
+
+  test("test insert into on table partitioned by timevalue column") {
+    sql(
+      s"""
+         | CREATE TABLE $table1(
+         | longitude LONG,
+         | latitude LONG) COMMENT "This is a GeoTable" PARTITIONED BY (timevalue BIGINT)
+         | STORED AS carbondata
+         | TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash',
+         | 'SPATIAL_INDEX.mygeohash.type'='geohash',
+         | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
+         | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
+         | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
+         | 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
+         | 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
+         | 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
+         | 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
+         | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
+       """.stripMargin)
+    sql(s"insert into $table1 select 0, 116337069, 39951887, 1575428400000")
+    checkAnswer(
+      sql(s"select * from $table1 where mygeohash = '2196036'"),
+      Seq(Row(2196036, 116337069, 39951887, 1575428400000L)))
   }
 
   test("test polygon query on table partitioned by timevalue column") {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 5a82e7a..4ad4cce 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -18,7 +18,9 @@
 package org.apache.carbondata.processing.loading.converter.impl;
 
 import java.util.List;
+import java.util.Map;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -70,9 +72,17 @@
   public FieldConverter createFieldEncoder(
       DataField dataField, int index, String nullFormat, boolean isEmptyBadRecord,
       boolean isConvertToBinary, String binaryDecoder, CarbonDataLoadConfiguration configuration) {
+    String spatialProperty = null;
+    if (configuration != null) {
+      Map<String, String> properties =
+          configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
+              .getTableProperties();
+      spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
+    }
     // Converters are only needed for dimensions and measures it return null.
     if (dataField.getColumn().isDimension()) {
-      if (dataField.getColumn().isSpatialColumn()) {
+      if (spatialProperty != null && dataField.getColumn().getColName()
+          .equalsIgnoreCase(spatialProperty.trim())) {
         return new SpatialIndexFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord,
             configuration);
       } else if (dataField.getColumn().getDataType() == DataTypes.DATE &&
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index 2b6657c..c89b932 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -19,8 +19,10 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
@@ -83,14 +85,18 @@
     List<FieldConverter> fieldConverterList = new ArrayList<>();
     List<FieldConverter> nonSchemaFieldConverterList = new ArrayList<>();
     long lruCacheStartTime = System.currentTimeMillis();
-
+    Map<String, String> properties =
+        configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
+            .getTableProperties();
+    String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
     for (int i = 0; i < fields.length; i++) {
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
           .createFieldEncoder(fields[i], i, nullFormat, isEmptyBadRecord, isConvertToBinary,
               (String) configuration.getDataLoadProperty(
                   CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER),
               configuration);
-      if (fields[i].getColumn().isSpatialColumn()) {
+      if (spatialProperty != null && fields[i].getColumn().getColName()
+          .equalsIgnoreCase(spatialProperty.trim())) {
         nonSchemaFieldConverterList.add(fieldConverter);
       } else {
         fieldConverterList.add(fieldConverter);
@@ -107,9 +113,17 @@
   public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
     logHolder.setLogged(false);
     logHolder.clear();
+    Map<String, String> properties =
+        configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
+            .getTableProperties();
+    String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
+    boolean isSpatialColumn = false;
     for (int i = 0; i < fieldConverters.length; i++) {
-      if (configuration.isNonSchemaColumnsPresent() && !fieldConverters[i].getDataField()
-          .getColumn().isSpatialColumn()) {
+      if (spatialProperty != null) {
+        isSpatialColumn = fieldConverters[i].getDataField().getColumn().getColName()
+            .equalsIgnoreCase(spatialProperty.trim());
+      }
+      if (configuration.isNonSchemaColumnsPresent() && !isSpatialColumn) {
         // Skip the conversion for schema columns if the conversion is required only for non-schema
         // columns
         continue;
@@ -156,13 +170,18 @@
     boolean isEmptyBadRecord = Boolean.parseBoolean(
         configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
             .toString());
+    Map<String, String> properties =
+        configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
+            .getTableProperties();
+    String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
     for (int i = 0; i < fields.length; i++) {
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
           .createFieldEncoder(fields[i], i, nullFormat, isEmptyBadRecord, isConvertToBinary,
               (String) configuration.getDataLoadProperty(
                   CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER),
               configuration);
-      if (fields[i].getColumn().isSpatialColumn()) {
+      if (spatialProperty != null && fields[i].getColumn().getColName()
+          .equalsIgnoreCase(spatialProperty.trim())) {
         nonSchemaFieldConverterList.add(fieldConverter);
       } else {
         fieldConverterList.add(fieldConverter);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
index 45d62c9..23caf31 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
@@ -18,7 +18,9 @@
 package org.apache.carbondata.processing.loading.parser.impl;
 
 import java.util.ArrayList;
+import java.util.Map;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
@@ -72,10 +74,15 @@
     numberOfColumns = header.length;
     DataField[] input = new DataField[fields.length];
     inputMapping = new int[input.length];
+    Map<String, String> properties =
+        configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
+            .getTableProperties();
+    String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
     int k = 0;
     for (int i = 0; i < fields.length; i++) {
-      if (fields[i].getColumn().isSpatialColumn()) {
-        // Index columns are non-schema fields. They are not present in the header. So set
+      if (spatialProperty != null && fields[i].getColumn().getColName()
+          .equalsIgnoreCase(spatialProperty.trim())) {
+        // Spatial index columns are not present in the header. So set
         // the input mapping as -1 for the field and continue
         input[k] = fields[i];
         inputMapping[k] = -1;
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index 21a4bcc..c8fa656 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -366,8 +366,13 @@
 
     private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFields) {
       Object[] newData = new Object[dataFields.length];
+      Map<String, String> properties =
+          configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
+              .getTableProperties();
+      String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
       for (int i = 0; i < dataFields.length; i++) {
-        if (dataFields[i].getColumn().isSpatialColumn()) {
+        if (spatialProperty != null && dataFields[i].getColumn().getColName()
+            .equalsIgnoreCase(spatialProperty.trim())) {
           continue;
         }
         if (i < noDictionaryMapping.length && noDictionaryMapping[i]) {
@@ -410,27 +415,31 @@
     private Object[] convertToNoDictionaryToBytesWithoutReArrange(Object[] data,
         DataField[] dataFields) {
       Object[] newData = new Object[dataFields.length];
+      Map<String, String> properties =
+          configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
+              .getTableProperties();
+      String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
       // now dictionary is removed, no need of no dictionary mapping
-      for (int i = 0, index = 0; i < dataFields.length; i++) {
-        if (dataFields[i].getColumn().isSpatialColumn()) {
+      for (int i = 0; i < dataFields.length; i++) {
+        if (spatialProperty != null && dataFields[i].getColumn().getColName()
+            .equalsIgnoreCase(spatialProperty.trim())) {
           continue;
         }
         if (DataTypeUtil.isPrimitiveColumn(dataTypes[i])) {
           // keep the no dictionary measure column as original data
-          newData[i] = data[index];
+          newData[i] = data[i];
         } else if (dataTypes[i].isComplexType()) {
-          getComplexTypeByteArray(newData, i, data, dataFields[i], index, true);
-        } else if (dataTypes[i] == DataTypes.DATE && data[index] instanceof Long) {
+          getComplexTypeByteArray(newData, i, data, dataFields[i], i, true);
+        } else if (dataTypes[i] == DataTypes.DATE && data[i] instanceof Long) {
           if (dateDictionaryGenerator == null) {
             dateDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
                 .getDirectDictionaryGenerator(dataTypes[i], dataFields[i].getDateFormat());
           }
-          newData[i] = dateDictionaryGenerator.generateKey((long) data[index]);
+          newData[i] = dateDictionaryGenerator.generateKey((long) data[i]);
         } else {
           newData[i] =
-              DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(data[index], dataTypes[i]);
+              DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(data[i], dataTypes[i]);
         }
-        index++;
       }
       return newData;
     }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index d8a22a7..474a54e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -333,8 +333,12 @@
     Set<String> columnNames = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     List<CarbonDimension> dimensions =
         schema.getCarbonTable().getVisibleDimensions();
+    Map<String, String> properties =
+        schema.getCarbonTable().getTableInfo().getFactTable().getTableProperties();
+    String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
     for (CarbonDimension dimension : dimensions) {
-      if (!dimension.isSpatialColumn()) {
+      if (spatialProperty != null && !dimension.getColName()
+          .equalsIgnoreCase(spatialProperty.trim())) {
         // skip the non-schema column
         columnNames.add(dimension.getColName());
       }