[CARBONDATA-1954] [Pre-Aggregate] Update CarbonHiveMetastore while dropping a pre-aggregate table, and refactor code
1. A function to update the CarbonHiveMetastore already existed. Removed the duplicate function definition (updateHiveMetaStoreForDataMap) and updated the caller to use updateHiveMetaStoreForAlter instead.
2. Refactored the code so that, while dropping a pre-aggregate table, calling processMetadata() deletes only the metadata; the physical data is removed later by processData() (see the sketch below).
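
For reference, a minimal sketch of the two-phase contract that AtomicRunnableCommand provides and that this change relies on when dropping a child (pre-aggregate) table. The database name, child-table name, and the sparkSession value in scope are illustrative placeholders, not taken from this patch:

    // processMetadata() unregisters the child table from the metastore only;
    // processData() later deletes the table folder on disk.
    val dropChild = CarbonDropTableCommand(
      ifExistsSet = true,
      Some("default"),           // hypothetical database name
      "maintable_preagg1",       // hypothetical pre-aggregate child table
      dropChildTable = true)
    dropChild.processMetadata(sparkSession)  // metadata-only removal
    dropChild.processData(sparkSession)      // physical deletion afterwards

Splitting the drop this way lets CarbonDropDataMapCommand remove the child table's metadata in its own processMetadata() phase and defer file deletion to processData(), instead of running the full drop inside an event listener.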
This closes #1743
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index d61971e..c38e6cf 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -20,7 +20,9 @@
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
@@ -76,6 +78,58 @@
assert(dataMapSchemaList.get(2).getChildSchema.getTableName.equals("datamaptest_datamap3"))
}
+ test("check hivemetastore after drop datamap") {
+ try {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+ "true")
+ sql("drop datamap if exists datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
+ sql("drop table if exists hiveMetaStoreTable")
+ sql("create table hiveMetaStoreTable (a string, b string, c string) stored by 'carbondata'")
+
+ sql(
+ "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable using 'preaggregate' dmproperties('key'='value') as select count(a) from hiveMetaStoreTable")
+ checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, "datamap_hiveMetaStoreTable")
+
+ sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
+ checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable")
+
+ }
+ finally {
+ sql("drop table hiveMetaStoreTable")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+ CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+ }
+ }
+
+ test("drop the table having pre-aggregate"){
+ try {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+ "true")
+ sql("drop datamap if exists datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1")
+ sql("drop table if exists hiveMetaStoreTable_1")
+ sql("create table hiveMetaStoreTable_1 (a string, b string, c string) stored by 'carbondata'")
+
+ sql(
+ "create datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1 using 'preaggregate' dmproperties('key'='value') as select count(a) from hiveMetaStoreTable_1")
+
+ checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
+ true,
+ "datamap_hiveMetaStoreTable_1")
+
+ sql("drop table hiveMetaStoreTable_1")
+
+ checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
+ }
+ finally {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+ CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+ }
+ }
+
test("test datamap create with preagg with duplicate name") {
intercept[Exception] {
sql(
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 7ee3434..34e37c5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -24,7 +24,6 @@
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.SparkSession.Builder
-import org.apache.spark.sql.execution.command.datamap.{DataMapDropTablePostListener, DropDataMapPostListener}
import org.apache.spark.sql.execution.command.preaaggregate._
import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener
import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
@@ -250,7 +249,6 @@
def initListeners(): Unit = {
OperationListenerBus.getInstance()
- .addListener(classOf[DropTablePostEvent], DataMapDropTablePostListener)
.addListener(classOf[LoadTablePreStatusUpdateEvent], LoadPostAggregateListener)
.addListener(classOf[DeleteSegmentByIdPreEvent], PreAggregateDeleteSegmentByIdPreListener)
.addListener(classOf[DeleteSegmentByDatePreEvent], PreAggregateDeleteSegmentByDatePreListener)
@@ -261,7 +259,6 @@
.addListener(classOf[AlterTableRenamePreEvent], PreAggregateRenameTablePreListener)
.addListener(classOf[AlterTableDataTypeChangePreEvent], PreAggregateDataTypeChangePreListener)
.addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener)
- .addListener(classOf[DropDataMapPostEvent], DropDataMapPostListener)
.addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener)
.addListener(classOf[AlterTableCompactionPreStatusUpdateEvent],
AlterPreAggregateTableCompactionPostListener)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 7f68b05..59aa322 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -25,16 +25,15 @@
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
+import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events._
@@ -103,6 +102,16 @@
carbonTable.get.getTableInfo,
dbName,
tableName))(sparkSession)
+ if (dataMapSchema.isDefined) {
+ if (dataMapSchema.get._1.getRelationIdentifier != null) {
+ CarbonDropTableCommand(
+ ifExistsSet = true,
+ Some(dataMapSchema.get._1.getRelationIdentifier.getDatabaseName),
+ dataMapSchema.get._1.getRelationIdentifier.getTableName,
+ dropChildTable = true
+ ).processMetadata(sparkSession)
+ }
+ }
// fires the event after dropping datamap from main table schema
val dropDataMapPostEvent =
DropDataMapPostEvent(
@@ -136,6 +145,12 @@
// delete the table folder
val tableIdentifier = CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession)
DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName)
+ CarbonDropTableCommand(
+ ifExistsSet = true,
+ databaseNameOp,
+ dataMapName,
+ dropChildTable = true
+ ).processData(sparkSession)
Seq.empty
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
deleted file mode 100644
index d37ca0a..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.command.datamap
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.execution.command.AtomicRunnableCommand
-import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
-
-import org.apache.carbondata.events.{DropDataMapPostEvent, DropTablePostEvent, Event, OperationContext, OperationEventListener}
-
-object DataMapDropTablePostListener extends OperationEventListener {
-
- /**
- * Called on DropTablePostEvent occurrence
- */
- override def onEvent(event: Event, operationContext: OperationContext): Unit = {
- val dropPostEvent = event.asInstanceOf[DropTablePostEvent]
- val carbonTable = dropPostEvent.carbonTable
- val sparkSession = dropPostEvent.sparkSession
- if (carbonTable.hasDataMapSchema) {
- // drop all child tables
- val childSchemas = carbonTable.getTableInfo.getDataMapSchemaList
- childSchemas.asScala
- .filter(_.getRelationIdentifier != null)
- .foreach { childSchema =>
- CarbonDropTableCommand(
- ifExistsSet = true,
- Some(childSchema.getRelationIdentifier.getDatabaseName),
- childSchema.getRelationIdentifier.getTableName,
- dropChildTable = true
- ).run(sparkSession)
- }
- }
- }
-}
-
-object DropDataMapPostListener extends OperationEventListener {
-
- /**
- * Called on DropDataMapPostEvent occurrence
- */
- override def onEvent(event: Event, operationContext: OperationContext): Unit = {
- val dropPostEvent = event.asInstanceOf[DropDataMapPostEvent]
- val dataMapSchema = dropPostEvent.dataMapSchema
- val sparkSession = dropPostEvent.sparkSession
- if (dataMapSchema.isDefined) {
- if (dataMapSchema.get.getRelationIdentifier != null) {
- CarbonDropTableCommand(
- ifExistsSet = true,
- Some(dataMapSchema.get.getRelationIdentifier.getDatabaseName),
- dataMapSchema.get.getRelationIdentifier.getTableName,
- dropChildTable = true
- ).run(sparkSession)
- }
- }
- }
-}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index aaad207..312e8b0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -17,9 +17,11 @@
package org.apache.spark.sql.execution.command.table
+import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.spark.sql.util.CarbonException
@@ -32,7 +34,6 @@
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.events._
-import org.apache.carbondata.spark.util.CommonUtil
case class CarbonDropTableCommand(
ifExistsSet: Boolean,
@@ -42,6 +43,7 @@
extends AtomicRunnableCommand {
var carbonTable: CarbonTable = _
+ var childTables : Seq[CarbonTable] = Seq.empty
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -83,6 +85,27 @@
}
CarbonEnv.getInstance(sparkSession).carbonMetastore.dropTable(identifier)(sparkSession)
+ if (carbonTable.hasDataMapSchema) {
+ // drop all child tables
+ val childSchemas = carbonTable.getTableInfo.getDataMapSchemaList
+
+ childTables = childSchemas.asScala
+ .filter(_.getRelationIdentifier != null)
+ .map { childSchema =>
+ val childTable =
+ CarbonEnv.getCarbonTable(
+ TableIdentifier(childSchema.getRelationIdentifier.getTableName,
+ Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession)
+ CarbonDropTableCommand(
+ ifExistsSet = true,
+ Some(childSchema.getRelationIdentifier.getDatabaseName),
+ childSchema.getRelationIdentifier.getTableName,
+ dropChildTable = true
+ ).processMetadata(sparkSession)
+ childTable
+ }
+ }
+
// fires the event after dropping main table
val dropTablePostEvent: DropTablePostEvent =
DropTablePostEvent(
@@ -123,6 +146,19 @@
val file = FileFactory.getCarbonFile(tablePath, fileType)
CarbonUtil.deleteFoldersAndFilesSilent(file)
}
+ if (carbonTable.hasDataMapSchema && childTables.nonEmpty) {
+ // drop all child tables
+ childTables.foreach { childTable =>
+ val carbonDropCommand = CarbonDropTableCommand(
+ ifExistsSet = true,
+ Some(childTable.getDatabaseName),
+ childTable.getTableName,
+ dropChildTable = true
+ )
+ carbonDropCommand.carbonTable = childTable
+ carbonDropCommand.processData(sparkSession)
+ }
+ }
}
Seq.empty
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index d5ac5ae..a2d1064 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -146,7 +146,7 @@
thriftTableInfo: org.apache.carbondata.format.TableInfo,
carbonTablePath: String)(sparkSession: SparkSession): String = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
- updateHiveMetaStoreForDataMap(newTableIdentifier,
+ updateHiveMetaStoreForAlter(newTableIdentifier,
oldTableIdentifier,
thriftTableInfo,
carbonTablePath,
@@ -180,27 +180,6 @@
CarbonStorePath.getCarbonTablePath(oldTablePath, newTableIdentifier).getPath
}
- private def updateHiveMetaStoreForDataMap(newTableIdentifier: CarbonTableIdentifier,
- oldTableIdentifier: CarbonTableIdentifier,
- thriftTableInfo: format.TableInfo,
- tablePath: String,
- sparkSession: SparkSession,
- schemaConverter: ThriftWrapperSchemaConverterImpl) = {
- val newTablePath =
- CarbonUtil.getNewTablePath(new Path(tablePath), newTableIdentifier.getTableName)
- val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
- thriftTableInfo,
- newTableIdentifier.getDatabaseName,
- newTableIdentifier.getTableName,
- newTablePath)
- val dbName = oldTableIdentifier.getDatabaseName
- val tableName = oldTableIdentifier.getTableName
- sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
- removeTableFromMetadata(dbName, tableName)
- CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
- newTablePath
- }
-
/**
* This method will is used to remove the evolution entry in case of failure.
*