[CARBONDATA-1954] [Pre-Aggregate] CarbonHiveMetastore updated while dropping the Pre-Aggregate table & code refactored

1. To update CarbonHiveMetastore similar function was already there . Removed duplicate function defination and updated the caller.
2. code refactored so that during droping a pre-aggregate table only metadata will be deleted if processMetadata() is called.

This closes #1743
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index d61971e..c38e6cf 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -20,7 +20,9 @@
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
 
 class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
 
@@ -76,6 +78,58 @@
     assert(dataMapSchemaList.get(2).getChildSchema.getTableName.equals("datamaptest_datamap3"))
   }
 
+  test("check hivemetastore after drop datamap") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          "true")
+      sql("drop datamap if exists datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
+      sql("drop table if exists hiveMetaStoreTable")
+      sql("create table hiveMetaStoreTable (a string, b string, c string) stored by 'carbondata'")
+
+      sql(
+        "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable using 'preaggregate' dmproperties('key'='value') as select count(a) from hiveMetaStoreTable")
+      checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, "datamap_hiveMetaStoreTable")
+
+      sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
+      checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable")
+
+    }
+    finally {
+      sql("drop table hiveMetaStoreTable")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    }
+  }
+
+  test("drop the table having pre-aggregate"){
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          "true")
+      sql("drop datamap if exists datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1")
+      sql("drop table if exists hiveMetaStoreTable_1")
+      sql("create table hiveMetaStoreTable_1 (a string, b string, c string) stored by 'carbondata'")
+
+      sql(
+        "create datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1 using 'preaggregate' dmproperties('key'='value') as select count(a) from hiveMetaStoreTable_1")
+
+      checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
+        true,
+        "datamap_hiveMetaStoreTable_1")
+
+      sql("drop table hiveMetaStoreTable_1")
+
+      checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
+    }
+    finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    }
+  }
+
   test("test datamap create with preagg with duplicate name") {
     intercept[Exception] {
       sql(
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 7ee3434..34e37c5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -24,7 +24,6 @@
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession.Builder
-import org.apache.spark.sql.execution.command.datamap.{DataMapDropTablePostListener, DropDataMapPostListener}
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener
 import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
@@ -250,7 +249,6 @@
 
   def initListeners(): Unit = {
     OperationListenerBus.getInstance()
-      .addListener(classOf[DropTablePostEvent], DataMapDropTablePostListener)
       .addListener(classOf[LoadTablePreStatusUpdateEvent], LoadPostAggregateListener)
       .addListener(classOf[DeleteSegmentByIdPreEvent], PreAggregateDeleteSegmentByIdPreListener)
       .addListener(classOf[DeleteSegmentByDatePreEvent], PreAggregateDeleteSegmentByDatePreListener)
@@ -261,7 +259,6 @@
       .addListener(classOf[AlterTableRenamePreEvent], PreAggregateRenameTablePreListener)
       .addListener(classOf[AlterTableDataTypeChangePreEvent], PreAggregateDataTypeChangePreListener)
       .addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener)
-      .addListener(classOf[DropDataMapPostEvent], DropDataMapPostListener)
       .addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener)
       .addListener(classOf[AlterTableCompactionPreStatusUpdateEvent],
         AlterPreAggregateTableCompactionPostListener)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 7f68b05..59aa322 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -25,16 +25,15 @@
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
+import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events._
 
 
@@ -103,6 +102,16 @@
               carbonTable.get.getTableInfo,
               dbName,
               tableName))(sparkSession)
+          if (dataMapSchema.isDefined) {
+            if (dataMapSchema.get._1.getRelationIdentifier != null) {
+              CarbonDropTableCommand(
+                ifExistsSet = true,
+                Some(dataMapSchema.get._1.getRelationIdentifier.getDatabaseName),
+                dataMapSchema.get._1.getRelationIdentifier.getTableName,
+                dropChildTable = true
+              ).processMetadata(sparkSession)
+            }
+          }
           // fires the event after dropping datamap from main table schema
           val dropDataMapPostEvent =
             DropDataMapPostEvent(
@@ -136,6 +145,12 @@
     // delete the table folder
     val tableIdentifier = CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession)
     DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName)
+    CarbonDropTableCommand(
+      ifExistsSet = true,
+      databaseNameOp,
+      dataMapName,
+      dropChildTable = true
+    ).processData(sparkSession)
     Seq.empty
   }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
deleted file mode 100644
index d37ca0a..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.command.datamap
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.execution.command.AtomicRunnableCommand
-import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
-
-import org.apache.carbondata.events.{DropDataMapPostEvent, DropTablePostEvent, Event, OperationContext, OperationEventListener}
-
-object DataMapDropTablePostListener extends OperationEventListener {
-
-  /**
-   * Called on DropTablePostEvent occurrence
-   */
-  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
-    val dropPostEvent = event.asInstanceOf[DropTablePostEvent]
-    val carbonTable = dropPostEvent.carbonTable
-    val sparkSession = dropPostEvent.sparkSession
-    if (carbonTable.hasDataMapSchema) {
-      // drop all child tables
-      val childSchemas = carbonTable.getTableInfo.getDataMapSchemaList
-      childSchemas.asScala
-        .filter(_.getRelationIdentifier != null)
-        .foreach { childSchema =>
-          CarbonDropTableCommand(
-            ifExistsSet = true,
-            Some(childSchema.getRelationIdentifier.getDatabaseName),
-            childSchema.getRelationIdentifier.getTableName,
-            dropChildTable = true
-          ).run(sparkSession)
-        }
-    }
-  }
-}
-
-object DropDataMapPostListener extends OperationEventListener {
-
-  /**
-   * Called on DropDataMapPostEvent occurrence
-   */
-  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
-    val dropPostEvent = event.asInstanceOf[DropDataMapPostEvent]
-    val dataMapSchema = dropPostEvent.dataMapSchema
-    val sparkSession = dropPostEvent.sparkSession
-    if (dataMapSchema.isDefined) {
-      if (dataMapSchema.get.getRelationIdentifier != null) {
-        CarbonDropTableCommand(
-          ifExistsSet = true,
-          Some(dataMapSchema.get.getRelationIdentifier.getDatabaseName),
-          dataMapSchema.get.getRelationIdentifier.getTableName,
-          dropChildTable = true
-        ).run(sparkSession)
-      }
-    }
-  }
-}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index aaad207..312e8b0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.execution.command.table
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.util.CarbonException
@@ -32,7 +34,6 @@
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
-import org.apache.carbondata.spark.util.CommonUtil
 
 case class CarbonDropTableCommand(
     ifExistsSet: Boolean,
@@ -42,6 +43,7 @@
   extends AtomicRunnableCommand {
 
   var carbonTable: CarbonTable = _
+  var childTables : Seq[CarbonTable] = Seq.empty
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -83,6 +85,27 @@
       }
       CarbonEnv.getInstance(sparkSession).carbonMetastore.dropTable(identifier)(sparkSession)
 
+      if (carbonTable.hasDataMapSchema) {
+        // drop all child tables
+       val childSchemas = carbonTable.getTableInfo.getDataMapSchemaList
+
+        childTables = childSchemas.asScala
+          .filter(_.getRelationIdentifier != null)
+          .map { childSchema =>
+            val childTable =
+              CarbonEnv.getCarbonTable(
+                TableIdentifier(childSchema.getRelationIdentifier.getTableName,
+                  Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession)
+            CarbonDropTableCommand(
+              ifExistsSet = true,
+              Some(childSchema.getRelationIdentifier.getDatabaseName),
+              childSchema.getRelationIdentifier.getTableName,
+              dropChildTable = true
+            ).processMetadata(sparkSession)
+            childTable
+          }
+      }
+
       // fires the event after dropping main table
       val dropTablePostEvent: DropTablePostEvent =
         DropTablePostEvent(
@@ -123,6 +146,19 @@
         val file = FileFactory.getCarbonFile(tablePath, fileType)
         CarbonUtil.deleteFoldersAndFilesSilent(file)
       }
+      if (carbonTable.hasDataMapSchema && childTables.nonEmpty) {
+        // drop all child tables
+        childTables.foreach { childTable =>
+          val carbonDropCommand = CarbonDropTableCommand(
+            ifExistsSet = true,
+            Some(childTable.getDatabaseName),
+            childTable.getTableName,
+            dropChildTable = true
+          )
+          carbonDropCommand.carbonTable = childTable
+          carbonDropCommand.processData(sparkSession)
+        }
+      }
     }
     Seq.empty
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index d5ac5ae..a2d1064 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -146,7 +146,7 @@
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       carbonTablePath: String)(sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    updateHiveMetaStoreForDataMap(newTableIdentifier,
+    updateHiveMetaStoreForAlter(newTableIdentifier,
       oldTableIdentifier,
       thriftTableInfo,
       carbonTablePath,
@@ -180,27 +180,6 @@
     CarbonStorePath.getCarbonTablePath(oldTablePath, newTableIdentifier).getPath
   }
 
-  private def updateHiveMetaStoreForDataMap(newTableIdentifier: CarbonTableIdentifier,
-      oldTableIdentifier: CarbonTableIdentifier,
-      thriftTableInfo: format.TableInfo,
-      tablePath: String,
-      sparkSession: SparkSession,
-      schemaConverter: ThriftWrapperSchemaConverterImpl) = {
-    val newTablePath =
-      CarbonUtil.getNewTablePath(new Path(tablePath), newTableIdentifier.getTableName)
-    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
-      thriftTableInfo,
-      newTableIdentifier.getDatabaseName,
-      newTableIdentifier.getTableName,
-      newTablePath)
-    val dbName = oldTableIdentifier.getDatabaseName
-    val tableName = oldTableIdentifier.getTableName
-    sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
-    removeTableFromMetadata(dbName, tableName)
-    CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-    newTablePath
-  }
-
   /**
    * This method will is used to remove the evolution entry in case of failure.
    *