[CARBONDATA-2142] [CARBONDATA-1763] Fixed issues with concurrent datamap creation

Analysis:
1. generateTableSchemaString in CarbonMetastore had no hive-metastore-specific implementation, due to which carbon tables were being
cached in the MetaData cache. Since there is no way to refresh a table in the hive metastore, this is wrong: all queries should get the
latest carbon table from the metastore, not from the cache.
2. If updating the main table status fails, the revertMainTableChanges method is called to revert the changes. Its count-based revert
logic was wrong and could delete the wrong entry from the schema (see the sketch below); it is removed in favour of force-dropping the
failing command's own datamap by name.
3. Moved the force remove logic before taking locks, as deletion from the metastore should happen even if the lock is not present: the
table is in a stale state (the entry is no longer in the parent table but is still available in the metastore).

This closes #1975
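
The revert bug in point 2 can be shown in isolation. Below is a minimal, self-contained Scala sketch -- all names are
hypothetical, not CarbonData APIs -- of why reverting by a snapshotted count can delete a concurrent command's entry,
while reverting by name removes only the failing command's own datamap:

object RevertSketch extends App {
  import scala.collection.mutable.ListBuffer

  // Datamap entries already registered in the parent schema.
  val schemas = ListBuffer("dm1", "dm2")

  // Old approach: snapshot the count before registering, truncate back to it on failure.
  val numberOfCurrentChild = schemas.size   // snapshot = 2
  schemas += "dm3"                          // this command's entry (its status update fails)
  schemas += "dm4"                          // a concurrent command's entry (it succeeded)
  val revertedByCount = schemas.take(numberOfCurrentChild)

  // Fixed approach: the failing command force-drops only its own entry, by name.
  val revertedByName = ListBuffer("dm1", "dm2", "dm3", "dm4") -= "dm3"

  println(revertedByCount)  // ListBuffer(dm1, dm2)      -- dm4 is wrongly lost as well
  println(revertedByName)   // ListBuffer(dm1, dm2, dm4) -- only dm3 is removed
}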
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
index 1fcccfb..c399ef4 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
@@ -74,9 +74,11 @@
   public static ICarbonLock getLockObject(AbsoluteTableIdentifier absoluteTableIdentifier,
       String lockType, String errorMsg) {
     ICarbonLock carbonLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, lockType);
-    LOGGER.info("Trying to acquire lock: " + carbonLock);
+    LOGGER.info("Trying to acquire lock: " + lockType + "for table: " +
+        absoluteTableIdentifier.toString());
     if (carbonLock.lockWithRetries()) {
-      LOGGER.info("Successfully acquired the lock " + carbonLock);
+      LOGGER.info("Successfully acquired the lock " + lockType + "for table: " +
+          absoluteTableIdentifier.toString());
     } else {
       LOGGER.error(errorMsg);
       throw new RuntimeException(errorMsg);
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
index 6ef2671..d3250aa 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
@@ -143,4 +143,9 @@
   public String getTableName() {
     return carbonTableIdentifier.getTableName();
   }
+
+  public String toString() {
+    return carbonTableIdentifier.toString();
+  }
+
 }
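
For context, here is how the improved messages surface in a caller. This is only a usage sketch: it assumes a CarbonData
classpath and the identifiers shown in the hunks above, and the error message text is invented for illustration.

import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier

def withMetadataLock(identifier: AbsoluteTableIdentifier)(body: => Unit): Unit = {
  // Both log lines now print the lock type and, via the new
  // AbsoluteTableIdentifier.toString, the table it guards.
  val lock: ICarbonLock = CarbonLockUtil.getLockObject(
    identifier, LockUsage.METADATA_LOCK, "Table is locked, please retry later")
  try body finally lock.unlock()
}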
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 8ef394c..2675036 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -51,10 +51,10 @@
     forceDrop: Boolean = false)
   extends AtomicRunnableCommand {
 
+  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
   var commandToRun: CarbonDropTableCommand = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
     val carbonEnv = CarbonEnv.getInstance(sparkSession)
@@ -65,6 +65,7 @@
     catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
+      forceDropTableFromMetaStore(sparkSession)
       locksToBeAcquired foreach {
         lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
       }
@@ -115,22 +116,6 @@
         } else if (!ifExistsSet) {
           throw new NoSuchDataMapException(dataMapName, tableName)
         }
-      } else if (forceDrop) {
-        val childCarbonTable: Option[CarbonTable] = try {
-          val childTableName = tableName + "_" + dataMapName
-          Some(CarbonEnv.getCarbonTable(databaseNameOp, childTableName)(sparkSession))
-        } catch {
-          case _: Exception =>
-            None
-        }
-        if (childCarbonTable.isDefined) {
-          commandToRun = CarbonDropTableCommand(
-            ifExistsSet = true,
-            Some(childCarbonTable.get.getDatabaseName),
-            childCarbonTable.get.getTableName,
-            dropChildTable = true)
-          commandToRun.processMetadata(sparkSession)
-        }
       } else if (carbonTable.isDefined &&
         carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0) {
         if (!ifExistsSet) {
@@ -156,6 +141,33 @@
     Seq.empty
   }
 
+  /**
+   * Drops the child datamap table from the hive metastore if it exists.
+   * forceDrop will be true only when an exception occurs while updating the main table status
+   * for the parent table, or in processData of CreatePreAggregateTableCommand.
+   */
+  private def forceDropTableFromMetaStore(sparkSession: SparkSession): Unit = {
+    if (forceDrop) {
+      val childTableName = tableName + "_" + dataMapName
+      LOGGER.info(s"Trying to force drop $childTableName from metastore")
+      val childCarbonTable: Option[CarbonTable] = try {
+        Some(CarbonEnv.getCarbonTable(databaseNameOp, childTableName)(sparkSession))
+      } catch {
+        case _: Exception =>
+          LOGGER.warn(s"Child table $childTableName not found in metastore")
+          None
+      }
+      if (childCarbonTable.isDefined) {
+        commandToRun = CarbonDropTableCommand(
+          ifExistsSet = true,
+          Some(childCarbonTable.get.getDatabaseName),
+          childCarbonTable.get.getTableName,
+          dropChildTable = true)
+        commandToRun.processMetadata(sparkSession)
+      }
+    }
+  }
+
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
     if (commandToRun != null) {
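
The reordering above reduces to the following self-contained sketch (hypothetical stubs, not the real command
internals): force removal runs before lock acquisition, because a stale entry that exists in the metastore but not in
the parent schema may have no lock to take at all.

object DropOrderingSketch extends App {
  def forceDropFromMetastore(): Unit = println("force-dropped stale child from metastore")
  def acquireLocks(): Seq[String] = { println("acquired METADATA_LOCK"); Seq("METADATA_LOCK") }
  def runNormalDrop(): Unit = println("normal drop path")
  def release(locks: Seq[String]): Unit = locks.foreach(l => println(s"released $l"))

  def processMetadata(forceDrop: Boolean): Unit = {
    if (forceDrop) forceDropFromMetastore()  // step 1: needs no lock
    val locks = acquireLocks()               // step 2: lock only the normal drop path
    try runNormalDrop() finally release(locks)
  }

  processMetadata(forceDrop = true)
}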
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index c861b00..d2acb00 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -131,11 +131,17 @@
     dmProperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
 
     // updating the parent table about child table
-    PreAggregateUtil.updateMainTable(
-      CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession),
-      parentTableIdentifier.table,
-      childSchema,
-      sparkSession)
+    try {
+      PreAggregateUtil.updateMainTable(
+        CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession),
+        parentTableIdentifier.table,
+        childSchema,
+        sparkSession)
+    } catch {
+      case ex: Exception =>
+        undoMetadata(sparkSession, ex)
+        throw ex
+    }
     val updatedLoadQuery = if (timeSeriesFunction.isDefined) {
       PreAggregateUtil.createTimeSeriesSelectQueryFromMain(childSchema.getChildSchema,
         parentTable.getTableName,
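
The try/catch added above is a generic undo-on-failure pattern; a minimal sketch with hypothetical signatures (not the
actual command API):

// Attempt the parent-table update; on any failure, undo this command's own
// metadata changes (the already-created child table) before rethrowing, so a
// failed CREATE DATAMAP leaves no half-registered child behind.
def updateWithUndo(update: () => Unit, undo: Exception => Unit): Unit = {
  try update() catch {
    case ex: Exception =>
      undo(ex)
      throw ex
  }
}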
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index ae9bc9b..0bee383 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -419,7 +419,6 @@
       LockUsage.DROP_TABLE_LOCK)
     var locks = List.empty[ICarbonLock]
     var carbonTable: CarbonTable = null
-    var numberOfCurrentChild: Int = 0
     try {
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
       carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
@@ -435,7 +434,6 @@
         dbName,
         tableName,
         carbonTable.getTablePath)
-      numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size
       if (wrapperTableInfo.getDataMapSchemaList.asScala.
         exists(f => f.getDataMapName.equalsIgnoreCase(childSchema.getDataMapName))) {
         throw new Exception("Duplicate datamap")
@@ -445,11 +443,11 @@
         .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
       updateSchemaInfo(carbonTable,
         thriftTable)(sparkSession)
-      LOGGER.info(s"Parent table updated is successful for table $dbName.$tableName")
+      LOGGER.info(s"Parent table updated is successful for table" +
+                  s" $dbName.${childSchema.getRelationIdentifier.toString}")
     } catch {
       case e: Exception =>
         LOGGER.error(e, "Pre Aggregate Parent table update failed reverting changes")
-        revertMainTableChanges(dbName, tableName, numberOfCurrentChild)(sparkSession)
         throw e
     } finally {
       // release lock after command execution completion
@@ -518,27 +516,6 @@
     }
   }
 
-  /**
-   * This method reverts the changes to the schema if add column command fails.
-   *
-   * @param dbName
-   * @param tableName
-   * @param numberOfChildSchema
-   * @param sparkSession
-   */
-  def revertMainTableChanges(dbName: String, tableName: String, numberOfChildSchema: Int)
-    (sparkSession: SparkSession): Unit = {
-    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    carbonTable.getTableLastUpdatedTime
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
-    if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
-      metastore.revertTableSchemaForPreAggCreationFailure(
-        carbonTable.getAbsoluteTableIdentifier, thriftTable)(sparkSession)
-    }
-  }
-
   def getChildCarbonTable(databaseName: String, tableName: String)
     (sparkSession: SparkSession): Option[CarbonTable] = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 2aa3c34..16ef38d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -24,7 +24,7 @@
 
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{schema, AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
@@ -183,6 +183,18 @@
   }
 
   /**
+   * Generates schema string from TableInfo
+   */
+  override def generateTableSchemaString(
+      tableInfo: schema.table.TableInfo,
+      absoluteTableIdentifier: AbsoluteTableIdentifier): String = {
+    val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
+    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+    tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+    CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
+  }
+
+  /**
   * This method is used to remove the evolution entry in case of failure.
    *
    * @param carbonTableIdentifier
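
Point 1 of the analysis comes down to this override: with an external hive metastore there is no cache invalidation, so
the schema string must be rebuilt from the latest TableInfo on every call. A hypothetical miniature (stand-in types,
not CarbonData classes) of the same stamp-then-serialize shape:

final case class MiniTableInfo(name: String, lastUpdatedTime: Long, evolution: List[Long])

def generateSchemaString(info: MiniTableInfo): String = {
  // Mirror of the override above: record a fresh evolution entry stamped with the
  // table's last-updated time, then serialize the result. Nothing is served from a cache.
  val stamped = info.copy(evolution = info.evolution :+ info.lastUpdatedTime)
  stamped.toString  // stand-in for CarbonUtil.convertToMultiGsonStrings
}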