[CARBONDATA-3933] Fix DDL/DML failures after a table is created
with column names containing special characters like #, \, %

Why is this PR needed?
When operations like insert, describe, or select are fired after a
table is created with column names containing special characters,
the operations fail. This is because, after table creation, Spark has
already stored the column names with special characters correctly,
but for further operations we tried to update the metastore/refresh it
with new schema parts, which is unnecessary and causes this issue.
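
A minimal sketch of the failing scenario, mirroring the test added in
this PR (table and column names are illustrative, and `sql` is assumed
to be a SparkSession-backed helper as in the test suite):

```scala
// create a carbon table whose column names contain special characters
sql("create table special_char(`i#d` string, `nam(e` string) stored as carbondata")
// before this fix, the following operations failed because the schema
// written back to the metastore as serde property parts mangled the names
sql("insert into special_char values('1','joey')")
sql("describe formatted special_char").show()
sql("select * from special_char").show()
```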

What changes were proposed in this PR?
We were using an unnecessary API to update the table's serde
properties by running a Hive SQL statement. This was required when we
supported Spark 2.1 and 2.2, where Spark did not support many alter
table operations. Now that we use other APIs to alter the table, these
calls are no longer necessary. Removing them means we no longer update
the serde properties with the modified schema parts, which solves this
issue.
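
For reference, a simplified before/after sketch of the call path around
AlterTableUtil.updateSchemaInfo (argument names follow the diff below;
this is a sketch of the change, not a standalone runnable snippet):

```scala
// Before: updateSchemaInfo also returned the schema JSON split into
// serde property parts (spark.sql.sources.schema.numParts / .part.N style),
// which were written back via a Hive "ALTER TABLE ... SET TBLPROPERTIES".
// val (tableIdentifier, schemaParts) =
//   AlterTableUtil.updateSchemaInfo(carbonTable, schemaEvolutionEntry, thriftTable)(sparkSession)
// CarbonSessionCatalogUtil.alterTable(tableIdentifier, schemaParts, None, sparkSession)

// After: only the table identifier is returned, and the external catalog is
// updated through alterExternalCatalogForTableWithUpdatedSchema without
// rewriting the serde properties, so special characters stay intact.
val tableIdentifier =
  AlterTableUtil.updateSchemaInfo(carbonTable, schemaEvolutionEntry, thriftTable)(sparkSession)
sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
```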

This closes #3862
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 8b79b70..2e86d14 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -93,7 +93,7 @@
         .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
       // sort the new columns based on schema order
       val sortedColsBasedActualSchemaOrder = newCols.sortBy(a => a.getSchemaOrdinal)
-      val (tableIdentifier, schemaParts) = AlterTableUtil.updateSchemaInfo(
+      val tableIdentifier = AlterTableUtil.updateSchemaInfo(
           carbonTable,
           schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
           thriftTable)(sparkSession)
@@ -110,7 +110,7 @@
       } else {
         Some(carbonColumns ++ sortedColsBasedActualSchemaOrder)
       }
-      CarbonSessionCatalogUtil.alterAddColumns(tableIdentifier, schemaParts, cols, sparkSession)
+      CarbonSessionCatalogUtil.alterAddColumns(tableIdentifier, cols, sparkSession)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       val alterTablePostExecutionEvent: AlterTableAddColumnPostEvent =
         AlterTableAddColumnPostEvent(sparkSession, carbonTable, alterTableAddColumnsModel)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index 1c27dd7..8ce774a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -313,10 +313,10 @@
     } else {
       Some(carbonColumns)
     }
-    val (tableIdentifier, schemaParts) = AlterTableUtil.updateSchemaInfo(
+    val tableIdentifier = AlterTableUtil.updateSchemaInfo(
       carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession)
     CarbonSessionCatalogUtil.alterColumnChangeDataTypeOrRename(
-      tableIdentifier, schemaParts, columns, sparkSession)
+      tableIdentifier, columns, sparkSession)
     sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 2289aad..55f4c03 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -160,12 +160,12 @@
       val delCols = deletedColumnSchema.map { deleteCols =>
         schemaConverter.fromExternalToWrapperColumnSchema(deleteCols)
       }
-      val (tableIdentifier, schemaParts) = AlterTableUtil.updateSchemaInfo(
+      val tableIdentifier = AlterTableUtil.updateSchemaInfo(
         carbonTable,
         schemaEvolutionEntry,
         tableInfo)(sparkSession)
       // get the columns in schema order and filter the dropped column in the column set
-      val cols = carbonTable.getCreateOrderColumn().asScala
+      val cols = carbonTable.getCreateOrderColumn.asScala
         .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
         .filterNot(column => delCols.contains(column))
       // When we call
@@ -181,7 +181,7 @@
         Some(cols)
       }
       CarbonSessionCatalogUtil.alterDropColumns(
-        tableIdentifier, schemaParts, columns, sparkSession)
+        tableIdentifier, columns, sparkSession)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       // TODO: 1. add check for deletion of index tables
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 919ce91..05052aa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -21,7 +21,7 @@
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalogUtil}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalogUtil, MockClassForAlterRevertTests}
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -140,6 +140,7 @@
         tableInfo,
         schemaEvolutionEntry,
         carbonTable.getTablePath)(sparkSession)
+      new MockClassForAlterRevertTests().mockForAlterRevertTest()
 
       val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
         carbonTable,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
index 316d329..99ceebf 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
@@ -69,34 +69,4 @@
       storage: CatalogStorageFormat,
       newTableName: String,
       dbName: String): CatalogStorageFormat
-
-  /**
-   * Below method will be used to add new column
-   * @param tableIdentifier table identifier
-   * @param schemaParts schema parts
-   * @param cols cols
-   */
-  def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit
-
-  /**
-   * Below method will be used to drop column
-   * @param tableIdentifier table identifier
-   * @param schemaParts schema parts
-   * @param cols cols
-   */
-  def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit
-
-  /**
-   * Below method will be used to alter data type of column in schema
-   * @param tableIdentifier table identifier
-   * @param schemaParts schema parts
-   * @param cols cols
-   */
-  def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
index 7288239..beaebed 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
@@ -61,21 +61,6 @@
       s"'dbName'='${ newTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
   }
 
-  /**
-   * Below method will be used to update serde properties
-   * @param tableIdentifier table identifier
-   * @param schemaParts schema parts
-   * @param cols cols
-   */
-  def alterTable(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]],
-      sparkSession: SparkSession): Unit = {
-    getClient(sparkSession)
-      .runSqlHive(s"ALTER TABLE `${tableIdentifier.database.get}`.`${tableIdentifier.table}` " +
-                  s"SET TBLPROPERTIES($schemaParts)")
-  }
-
   def alterTableProperties(
       sparkSession: SparkSession,
       tableIdentifier: TableIdentifier,
@@ -85,11 +70,11 @@
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableIdentifier)
     var newProperties = table.storage.properties
-    if (!propKeys.isEmpty) {
+    if (propKeys.nonEmpty) {
       val updatedPropKeys = propKeys.map(_.toLowerCase)
       newProperties = newProperties.filter { case (k, _) => !updatedPropKeys.contains(k) }
     }
-    if (!properties.isEmpty) {
+    if (properties.nonEmpty) {
       newProperties = newProperties ++ CarbonSparkSqlParserUtil.normalizeProperties(properties)
     }
     val newTable = table.copy(
@@ -113,21 +98,18 @@
   }
 
   def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
       cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, sparkSession)
+    updateCatalogTableForAlter(tableIdentifier, cols, sparkSession)
   }
 
   def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
       cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, sparkSession)
+    updateCatalogTableForAlter(tableIdentifier, cols, sparkSession)
   }
 
   def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
       cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, sparkSession)
+    updateCatalogTableForAlter(tableIdentifier, cols, sparkSession)
   }
 
   /**
@@ -135,16 +117,14 @@
    * schema for all the alter operations like add column, drop column, change datatype or rename
    * column
    * @param tableIdentifier
-   * @param schemaParts
    * @param cols
    */
   private def updateCatalogTableForAlter(tableIdentifier: TableIdentifier,
-      schemaParts: String,
       cols: Option[Seq[ColumnSchema]],
       sparkSession: SparkSession): Unit = {
-    alterTable(tableIdentifier, schemaParts, cols, sparkSession)
+    new MockClassForAlterRevertTests().mockForAlterRevertTest()
     CarbonSessionUtil.alterExternalCatalogForTableWithUpdatedSchema(
-      tableIdentifier, cols, schemaParts, sparkSession)
+      tableIdentifier, cols, sparkSession)
   }
 
   /**
@@ -173,3 +153,9 @@
     storage.copy(locationUri = Some(path.toUri))
   }
 }
+
+// This class is a dummy class to test the alter table scenarios in failure cases.
+class MockClassForAlterRevertTests {
+  def mockForAlterRevertTest(): Unit = {
+  }
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index 1b6d8f0..a6d3626 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -125,12 +125,10 @@
    * @param tableIdentifier tableIdentifier for table
    * @param cols            all the column of table, which are updated with datatype change of
    *                        new column name
-   * @param schemaParts     schemaParts
    * @param sparkSession    sparkSession
    */
   def alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier: TableIdentifier,
       cols: Option[Seq[ColumnSchema]],
-      schemaParts: String,
       sparkSession: SparkSession): Unit = {
     val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
     val colArray: scala.collection.mutable.ArrayBuffer[StructField] = ArrayBuffer()
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index ea2c653..9bb7eb6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -358,10 +358,9 @@
           tblPropertiesMap.put(property._1, property._2)
         }
       }
-      val (tableIdentifier, schemaParts) = AlterTableUtil.updateSchemaInfo(
+      val tableIdentifier = AlterTableUtil.updateSchemaInfo(
         carbonTable = carbonTable,
         thriftTable = thriftTable)(sparkSession)
-      CarbonSessionCatalogUtil.alterTable(tableIdentifier, schemaParts, None, sparkSession)
       // remove from the cache so that the table will be loaded again with the new table properties
       CarbonInternalMetastore
         .removeTableFromMetadataCache(carbonTable.getDatabaseName, tableName)(sparkSession)
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index afab626..9f29382 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -196,7 +196,7 @@
   def updateSchemaInfo(carbonTable: CarbonTable,
       schemaEvolutionEntry: SchemaEvolutionEntry = null,
       thriftTable: TableInfo)
-    (sparkSession: SparkSession): (TableIdentifier, String) = {
+    (sparkSession: SparkSession): TableIdentifier = {
     val dbName = carbonTable.getDatabaseName
     val tableName = carbonTable.getTableName
     CarbonEnv.getInstance(sparkSession).carbonMetaStore
@@ -206,35 +206,9 @@
         schemaEvolutionEntry,
         carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     val tableIdentifier = TableIdentifier(tableName, Some(dbName))
-    sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
-    val schema = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-      .lookupRelation(tableIdentifier)(sparkSession).schema.json
-    val schemaParts = prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
-    (tableIdentifier, schemaParts)
-  }
-
-  /**
-   * This method will split schema string into multiple parts of configured size and
-   * registers the parts as keys in tableProperties which will be read by spark to prepare
-   * Carbon Table fields
-   *
-   * @param sparkConf
-   * @param schemaJsonString
-   * @return
-   */
-  def prepareSchemaJsonForAlterTable(sparkConf: SparkConf,
-      schemaJsonString: String): String = {
-    val threshold = sparkConf
-      .getInt(CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD,
-        CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT)
-    // Split the JSON string.
-    val parts = schemaJsonString.grouped(threshold).toSeq
-    var schemaParts: Seq[String] = Seq.empty
-    schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_NUMPARTS'='${ parts.size }'"
-    parts.zipWithIndex.foreach { case (part, index) =>
-      schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_PART_PREFIX$index'='$part'"
-    }
-    schemaParts.mkString(",")
+    CarbonEnv.getInstance(sparkSession).carbonMetaStore
+      .lookupRelation(tableIdentifier)(sparkSession)
+    tableIdentifier
   }
 
   /**
@@ -522,11 +496,10 @@
         // check if duplicate columns are present in both local dictionary include and exclude
         CarbonScalaUtil.validateDuplicateColumnsForLocalDict(tblPropertiesMap)
       }
-      val (tableIdentifier, schemaParts) = updateSchemaInfo(
+      val tableIdentifier = updateSchemaInfo(
         carbonTable = carbonTable,
         schemaEvolutionEntry,
         thriftTable = thriftTable)(sparkSession)
-      CarbonSessionCatalogUtil.alterTable(tableIdentifier, schemaParts, None, sparkSession)
       CarbonSessionCatalogUtil.alterTableProperties(
         sparkSession, tableIdentifier, lowerCasePropertiesMap.toMap, propKeys)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
diff --git a/integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala b/integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
index 832596e..8a9f8f3 100644
--- a/integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
+++ b/integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
@@ -112,25 +112,6 @@
     CarbonSessionCatalogUtil.getClient(sparkSession)
   }
 
-  override def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterAddColumns(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  override def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterDropColumns(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  override def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterColumnChangeDataTypeOrRename(
-      tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
   /**
    * This is alternate way of getting partition information. It first fetches all partitions from
    * hive and then apply filter instead of querying hive along with filters.
diff --git a/integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala b/integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
index 33438e4..b3666d1 100644
--- a/integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
+++ b/integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
@@ -112,25 +112,6 @@
     CarbonSessionCatalogUtil.getClient(sparkSession)
   }
 
-  override def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterAddColumns(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  override def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterDropColumns(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  override def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterColumnChangeDataTypeOrRename(
-      tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
   /**
    * This is alternate way of getting partition information. It first fetches all partitions from
    * hive and then apply filter instead of querying hive along with filters.
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 1e81554..6ae9e6b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -347,6 +347,20 @@
       CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)
   }
 
+  test("test table creation with special char and other commands") {
+    sql("drop table if exists special_char")
+    sql("create table special_char(`i#d` string, `nam(e` string,`ci)&#@!ty` string,`a\be` int, `ag!e` float, `na^me1` Decimal(8,4)) stored as carbondata")
+    sql("insert into special_char values('1','joey','hud', 2, 2.2, 2.3456)")
+    checkAnswer(sql("select * from special_char"), Seq(Row("1","joey","hud", 2, 2.2, 2.3456)))
+    val df = sql("describe formatted special_char").collect()
+    assert(df.exists(_.get(0).toString.contains("i#d")))
+    assert(df.exists(_.get(0).toString.contains("nam(e")))
+    assert(df.exists(_.get(0).toString.contains("ci)&#@!ty")))
+    assert(df.exists(_.get(0).toString.contains("a\be")))
+    assert(df.exists(_.get(0).toString.contains("ag!e")))
+    assert(df.exists(_.get(0).toString.contains("na^me1")))
+  }
+
   override def afterEach {
     sql("DROP TABLE if exists loadtest")
     sql("drop table if exists invalidMeasures")
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 4e61916..5d2f510 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -19,7 +19,9 @@
 
 import java.io.File
 
+import mockit.{Mock, MockUp}
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.hive.MockClassForAlterRevertTests
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -30,6 +32,13 @@
 class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll() {
+    new MockUp[MockClassForAlterRevertTests]() {
+      @Mock
+      @throws[ProcessMetaDataException]
+      def mockForAlterRevertTest(): Unit = {
+        throw new ProcessMetaDataException("default", "reverttest", "thrown in mock")
+      }
+    }
     sql("drop table if exists reverttest")
     sql(
       "CREATE TABLE reverttest(intField int,stringField string,timestampField timestamp," +
@@ -40,11 +49,9 @@
 
   test("test to revert new added columns on failure") {
     intercept[ProcessMetaDataException] {
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
       sql(
         "Alter table reverttest add columns(newField string) TBLPROPERTIES" +
         "('DEFAULT.VALUE.newField'='def')")
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
       intercept[AnalysisException] {
         sql("select newField from reverttest")
       }
@@ -64,18 +71,14 @@
 
   test("test to revert drop columns on failure") {
     intercept[ProcessMetaDataException] {
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
       sql("Alter table reverttest drop columns(decimalField)")
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
     }
     assert(sql("select decimalField from reverttest").count().equals(1L))
   }
 
   test("test to revert changed datatype on failure") {
     intercept[ProcessMetaDataException] {
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
       sql("Alter table reverttest change intField intfield bigint")
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
     }
     assert(
       sql("select intfield from reverttest").schema.fields.apply(0).dataType.simpleString == "int")
@@ -83,11 +86,9 @@
 
   test("test to check if dictionary files are deleted for new column if query fails") {
     intercept[ProcessMetaDataException] {
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
       sql(
         "Alter table reverttest add columns(newField string) TBLPROPERTIES" +
         "('DEFAULT.VALUE.newField'='def')")
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
       intercept[AnalysisException] {
         sql("select newField from reverttest")
       }
@@ -98,7 +99,11 @@
   }
 
   override def afterAll() {
-    hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
+    new MockUp[MockClassForAlterRevertTests]() {
+      @Mock
+      def mockForAlterRevertTest(): Unit = {
+      }
+    }
     sql("drop table if exists reverttest")
     sql("drop table if exists reverttest_fail")
   }