/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.preaaggregate

import java.util.UUID

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.AlterTableModel
import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand}
import org.apache.spark.sql.execution.command.partition.CarbonAlterTableDropHivePartitionCommand
import org.apache.spark.sql.parser.CarbonSpark2SqlParser

import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable}
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}

object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override protected def onEvent(event: Event,
operationContext: OperationContext): Unit = {
val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
val carbonTable = preStatusListener.carbonTable
val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
val childCommands =
childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
}
}
}

trait CommitHelper {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
operationContext: OperationContext,
carbonTable: CarbonTable): Unit = {
val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
val segmentBeingLoaded =
operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
val newDetails = loadMetaDataDetails.collect {
case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
detail
case others => others
}
SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
}
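// Illustrative effect of markInProgressSegmentAsDeleted (assumed segment ids):
// if segment "2" is the one being loaded and the status file lists segments
// "0", "1" and "2", only the entry for "2" is flipped to MARKED_FOR_DELETE;
// the other entries are written back unchanged.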
/**
* Used to rename table status files for commit operation.
*/
protected def renameDataMapTableStatusFiles(sourceFileName: String,
destinationFileName: String, uuid: String): Boolean = {
val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
if (oldCarbonFile.exists() && newCarbonFile.exists()) {
val backUpPostFix = if (uuid.nonEmpty) {
"_backup_" + uuid
} else {
""
}
LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
oldCarbonFile.renameForce(destinationFileName)
} else {
LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
false
}
} else {
/**
* The tablestatus_uuid file will not exist when the pre-aggregate table is not
* valid for compaction, so there is nothing to commit for this child table.
* Hence this should return true.
*/
true
}
}
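// Commit sketch for renameDataMapTableStatusFiles, with an illustrative
// uuid = "abc":
//   1. tablestatus     -> tablestatus_backup_abc  (back up the live file)
//   2. tablestatus_abc -> tablestatus             (promote the new status file)
// If step 1 fails, false is returned and the live tablestatus stays untouched.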
/**
* Used to remove stale table status files, i.e. the files carrying the UUID or
* backup suffix that are left behind after a commit attempt.
*/
protected def cleanUpStaleTableStatusFiles(
childTables: Seq[CarbonTable],
operationContext: OperationContext,
uuid: String): Unit = {
childTables.foreach { childTable =>
val metaDataDir = FileFactory.getCarbonFile(
CarbonTablePath.getMetadataPath(childTable.getTablePath))
val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.contains(uuid) || file.getName.contains("backup")
}
})
tableStatusFiles.foreach(_.delete())
}
}
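// For example (illustrative uuid = "abc"): both "tablestatus_abc" and
// "tablestatus_backup_abc" in a child table's Metadata directory match the
// filter in cleanUpStaleTableStatusFiles and are deleted after the commit
// attempt.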
}

object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override protected def onEvent(event: Event,
operationContext: OperationContext): Unit = {
val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
val carbonTable = postStatusListener.carbonTable
val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
val childCommands =
childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
val updateFailed = try {
val renamedDataMaps = childCommands.takeWhile {
childCommand =>
val childCarbonTable = childCommand.table
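// Generate table status file name with UUID, for example: tablestatus_<uuid>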
val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
childCarbonTable.getTablePath, uuid)
// Generate table status file name without UUID, for example: tablestatus
val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
childCarbonTable.getTablePath)
renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
}
// if true then the commit for one of the child tables has failed
val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
if (commitFailed) {
LOGGER.info("Reverting table status file to original state")
renamedDataMaps.foreach {
command =>
val carbonTable = command.table
// rename the backup tablestatus, i.e. tablestatus_backup_UUID, to tablestatus
val backupTableSchemaPath =
CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
}
}
commitFailed
} finally {
// after success/failure of commit delete all tablestatus files with UUID in their names.
// if commit failed then remove the segment directory
// TODO Need to handle deletion on tablestatus files with UUID in cleanup command.
cleanUpStaleTableStatusFiles(childCommands.map(_.table),
operationContext,
uuid)
}
if (updateFailed) {
sys.error("Failed to update table status for pre-aggregate table")
}
}
}
}

object AlterTableDropPartitionMetaListener extends OperationEventListener {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override protected def onEvent(event: Event,
operationContext: OperationContext): Unit = {
val dropPartitionEvent = event.asInstanceOf[AlterTableDropPartitionMetaEvent]
val parentCarbonTable = dropPartitionEvent.parentCarbonTable
val partitionsToBeDropped = dropPartitionEvent.specs.flatMap(_.keys)
val sparkSession = SparkSession.getActiveSession.get
if (parentCarbonTable.hasAggregationDataMap) {
// used as a flag to block direct drop partition on aggregate tables fired by the user
operationContext.setProperty("isInternalDropCall", "true")
// Collect the child tables that are missing one or more of the partition columns being dropped.
val childTablesWithoutPartitionColumns =
parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.filter { dataMapSchema =>
val childColumns = dataMapSchema.getChildSchema.getListOfColumns.asScala
val partitionColExists = partitionsToBeDropped.forall {
partition =>
childColumns.exists { childColumn =>
childColumn.getAggFunction.isEmpty &&
childColumn.getParentColumnTableRelations != null &&
childColumn.getParentColumnTableRelations.asScala.head.getColumnName.
equals(partition)
}
}
!partitionColExists
}
if (childTablesWithoutPartitionColumns.nonEmpty) {
throw new MetadataProcessException(s"Cannot drop partition as one of the partition is not" +
s" participating in the following datamaps ${
childTablesWithoutPartitionColumns.toList.map(_.getChildSchema.getTableName)
}. Please drop the specified aggregate tables to continue")
} else {
val childDropPartitionCommands =
parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.map { dataMapSchema =>
val tableIdentifier = TableIdentifier(dataMapSchema.getChildSchema.getTableName,
Some(parentCarbonTable.getDatabaseName))
// As the aggregate table columns are prefixed with the parent table name, the
// partition column also has to be prefixed with the parent table name to
// generate partitionSpecs for the child table.
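// Illustrative: for a parent table named "sales" (assumed name), the user's
// spec Map("country" -> "usa") becomes Map("sales_country" -> "usa") here.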
val childSpecs = dropPartitionEvent.specs.map {
spec => spec.map {
case (key, value) => (s"${parentCarbonTable.getTableName}_$key", value)
}
}
CarbonAlterTableDropHivePartitionCommand(
tableIdentifier,
childSpecs,
dropPartitionEvent.ifExists,
dropPartitionEvent.purge,
dropPartitionEvent.retainData,
operationContext)
}
operationContext.setProperty("dropPartitionCommands", childDropPartitionCommands)
childDropPartitionCommands.foreach(_.processMetadata(sparkSession))
}
} else if (parentCarbonTable.isChildDataMap) {
if (operationContext.getProperty("isInternalDropCall") == null) {
throw new UnsupportedOperationException("Cannot drop partition directly on aggregate table")
}
}
}
}

/**
* Below listener will be used to create the load command for compaction
* of all the pre-aggregate child data maps.
*/
object CompactionProcessMetaListener extends OperationEventListener {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override protected def onEvent(event: Event,
operationContext: OperationContext): Unit = {
val sparkSession = SparkSession.getActiveSession.get
val tableEvent = event.asInstanceOf[LoadMetadataEvent]
val table = tableEvent.getCarbonTable
if (!table.isChildDataMap && CarbonUtil.hasAggregationDataMap(table)) {
val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala
.filter(_.isInstanceOf[AggregationDataMapSchema])
.asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]]
for (dataMapSchema: AggregationDataMapSchema <- aggregationDataMapList) {
val childTableName = dataMapSchema.getRelationIdentifier.getTableName
val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
// Creating a new query string to insert data into pre-aggregate table from that same table.
// For example: To compact preaggtable1 we can fire a query like insert into preaggtable1
// select * from preaggtable1
// The following code will generate the select query with a load UDF that will be used to
// apply DataLoadingRules
val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
// adding the aggregation load UDF
.addPreAggLoadFunction(
// creating the select query on the basis of the table schema
PreAggregateUtil.createChildSelectQuery(
dataMapSchema.getChildSchema, table.getDatabaseName))).drop("preAggLoad")
val loadCommand = PreAggregateUtil.createLoadCommandForChild(
dataMapSchema.getChildSchema.getListOfColumns,
TableIdentifier(childTableName, Some(childDatabaseName)),
childDataFrame,
false,
sparkSession)
val uuid = Option(operationContext.getProperty("uuid")).
getOrElse(UUID.randomUUID()).toString
operationContext.setProperty("uuid", uuid)
loadCommand.processMetadata(sparkSession)
operationContext
.setProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction", loadCommand)
loadCommand.operationContext = operationContext
}
} else if (table.isChildDataMap) {
val childTableName = table.getTableName
val childDatabaseName = table.getDatabaseName
// Creating a new query string to insert data into pre-aggregate table from that same table.
// For example: To compact preaggtable1 we can fire a query like insert into preaggtable1
// select * from preaggtable1
// The following code will generate the select query with a load UDF that will be used to
// apply DataLoadingRules
val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
// adding the aggregation load UDF
.addPreAggLoadFunction(
// creating the select query on the basis of the table schema
PreAggregateUtil.createChildSelectQuery(
table.getTableInfo.getFactTable, table.getDatabaseName))).drop("preAggLoad")
val loadCommand = PreAggregateUtil.createLoadCommandForChild(
table.getTableInfo.getFactTable.getListOfColumns,
TableIdentifier(childTableName, Some(childDatabaseName)),
childDataFrame,
false,
sparkSession)
val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
loadCommand.processMetadata(sparkSession)
operationContext.setProperty(table.getTableName + "_Compaction", loadCommand)
operationContext.setProperty("uuid", uuid)
loadCommand.operationContext = operationContext
}
}
}

/**
* Below listener is used to create the LoadCommand for loading
* the data of the pre-aggregate data maps.
*/
object LoadProcessMetaListener extends OperationEventListener {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override protected def onEvent(event: Event,
operationContext: OperationContext): Unit = {
val sparkSession = SparkSession.getActiveSession.get
val tableEvent = event.asInstanceOf[LoadMetadataEvent]
if (!tableEvent.isCompaction) {
val table = tableEvent.getCarbonTable
if (CarbonUtil.hasAggregationDataMap(table)) {
// getting all the aggregate datamap schemas
val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala
.filter(_.isInstanceOf[AggregationDataMapSchema])
.asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]]
// sorting the datamap for timeseries rollup
val sortedList = aggregationDataMapList.sortBy(_.getOrdinal)
val parentTableName = table.getTableName
val databaseName = table.getDatabaseName
// if the operation context already carries a UUID, this is a child table load
// and the parent has already generated it; if this is the parent table's load,
// generate a new UUID.
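// Illustrative: all children of one load thus share a single UUID, so each
// child writes its status to "tablestatus_<uuid>" and CommitPreAggregateListener
// can later promote or revert every child of the transaction together.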
val uuid =
Option(operationContext.getProperty("uuid")).getOrElse(UUID.randomUUID()).toString
val list = scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema]
for (dataMapSchema: AggregationDataMapSchema <- sortedList) {
val childTableName = dataMapSchema.getRelationIdentifier.getTableName
val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
val childSelectQuery = if (!dataMapSchema.isTimeseriesDataMap) {
(PreAggregateUtil.getChildQuery(dataMapSchema), "")
} else {
// for timeseries rollup policy
val tableSelectedForRollup = PreAggregateUtil.getRollupDataMapNameForTimeSeries(list,
dataMapSchema)
list += dataMapSchema
// if none of the rollup datamaps is selected, hit the main table and prepare the query
if (tableSelectedForRollup.isEmpty) {
(PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema,
parentTableName,
databaseName), "")
} else {
// otherwise hit the selected rollup datamap schema
(PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema,
tableSelectedForRollup.get,
databaseName),
s"$databaseName.${tableSelectedForRollup.get.getChildSchema.getTableName}")
}
}
val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
childSelectQuery._1)).drop("preAggLoad")
val isOverwrite =
operationContext.getProperty("isOverwrite").asInstanceOf[Boolean]
val loadCommand = PreAggregateUtil.createLoadCommandForChild(
dataMapSchema.getChildSchema.getListOfColumns,
TableIdentifier(childTableName, Some(childDatabaseName)),
childDataFrame,
isOverwrite,
sparkSession,
timeseriesParentTableName = childSelectQuery._2)
operationContext.setProperty("uuid", uuid)
loadCommand.operationContext.setProperty("uuid", uuid)
loadCommand.processMetadata(sparkSession)
operationContext.setProperty(dataMapSchema.getChildSchema.getTableName, loadCommand)
}
}
}
}
}

object LoadPostAggregateListener extends OperationEventListener {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
val carbonLoadModelOption =
event match {
case e: LoadTablePreStatusUpdateEvent => Some(e.getCarbonLoadModel)
case e: LoadTablePostExecutionEvent => Some(e.getCarbonLoadModel)
case _ => None
}
val sparkSession = SparkSession.getActiveSession.get
if (carbonLoadModelOption.isDefined) {
val carbonLoadModel = carbonLoadModelOption.get
val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
if (CarbonUtil.hasAggregationDataMap(table)) {
// getting all the aggregate datamap schemas
val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala
.filter(_.isInstanceOf[AggregationDataMapSchema])
.asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]]
// sorting the datamap for timeseries rollup
val sortedList = aggregationDataMapList.sortBy(_.getOrdinal)
for (dataMapSchema: AggregationDataMapSchema <- sortedList) {
val childLoadCommand = operationContext
.getProperty(dataMapSchema.getChildSchema.getTableName)
.asInstanceOf[CarbonLoadDataCommand]
childLoadCommand.dataFrame = Some(PreAggregateUtil
.getDataFrame(sparkSession, childLoadCommand.logicalPlan.get))
val isOverwrite =
operationContext.getProperty("isOverwrite").asInstanceOf[Boolean]
childLoadCommand.operationContext = operationContext
val timeseriesParent = childLoadCommand.internalOptions.get("timeseriesParent")
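// "timeseriesParent" carries "<database>.<table>" of the rollup parent when
// this child is loaded from another timeseries datamap; it is empty otherwise.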
val (parentTableIdentifier, segmentToLoad) =
if (timeseriesParent.isDefined && timeseriesParent.get.nonEmpty) {
val (parentTableDatabase, parentTableName) =
(timeseriesParent.get.split('.')(0), timeseriesParent.get.split('.')(1))
(TableIdentifier(parentTableName, Some(parentTableDatabase)),
operationContext.getProperty(
s"${ parentTableDatabase }_${ parentTableName }_Segment").toString)
} else {
val currentSegmentFile = operationContext.getProperty("current.segmentfile")
val segment = if (currentSegmentFile != null) {
new Segment(carbonLoadModel.getSegmentId, currentSegmentFile.toString)
} else {
Segment.toSegment(carbonLoadModel.getSegmentId, null)
}
(TableIdentifier(table.getTableName, Some(table.getDatabaseName)), segment.toString)
}
PreAggregateUtil.startDataLoadForDataMap(
parentTableIdentifier,
segmentToLoad,
validateSegments = false,
childLoadCommand,
isOverwrite,
sparkSession)
}
}
}
}
}

/**
* This listener is used to commit all the child pre-aggregate tables in one
* transaction. If one fails, all are reverted to the original state.
*/
object CommitPreAggregateListener extends OperationEventListener with CommitHelper {
override protected def onEvent(event: Event,
operationContext: OperationContext): Unit = {
// The same listener is called for both compaction and load therefore getting the
// carbonLoadModel from the appropriate event.
val carbonLoadModel = event match {
case loadEvent: LoadTablePostStatusUpdateEvent =>
loadEvent.getCarbonLoadModel
case compactionEvent: AlterTableCompactionPostStatusUpdateEvent =>
compactionEvent.carbonLoadModel
}
val isCompactionFlow = Option(
operationContext.getProperty("isCompaction")).getOrElse("false").toString.toBoolean
val dataMapSchemas =
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getDataMapSchemaList
.asScala.filter(_.getChildSchema != null)
// extract all child LoadCommands
val childLoadCommands = if (!isCompactionFlow) {
// If not compaction flow then the key for load commands will be tableName
dataMapSchemas.map { dataMapSchema =>
operationContext.getProperty(dataMapSchema.getChildSchema.getTableName)
.asInstanceOf[CarbonLoadDataCommand]
}
} else {
// If compaction flow then the key for load commands will be tableName_Compaction
dataMapSchemas.map { dataMapSchema =>
operationContext.getProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction")
.asInstanceOf[CarbonLoadDataCommand]
}
}
if (dataMapSchemas.nonEmpty) {
val uuid = operationContext.getProperty("uuid").toString
// keep committing until one fails
val renamedDataMaps = childLoadCommands.takeWhile { childLoadCommand =>
val childCarbonTable = childLoadCommand.table
// Generate table status file name with UUID, for example: tablestatus_1
val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
childCarbonTable.getTablePath, uuid)
// Generate table status file name without UUID, for example: tablestatus
val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
childCarbonTable.getTablePath)
renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
}
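// Illustrative: with three children dm1, dm2, dm3 (assumed names), if dm2's
// rename fails, takeWhile stops and renamedDataMaps holds only dm1's command,
// so lengthCompare flags the failed commit and only dm1 is rolled back below.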
// if true then the commit for one of the child tables has failed
val commitFailed = renamedDataMaps.lengthCompare(dataMapSchemas.length) != 0
if (commitFailed) {
LOGGER.warn("Reverting table status file to original state")
renamedDataMaps.foreach {
loadCommand =>
val carbonTable = loadCommand.table
// rename the backup tablestatus, i.e. tablestatus_backup_UUID, to tablestatus
val backupTableSchemaPath =
CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
markInProgressSegmentAsDeleted(backupTableSchemaPath, operationContext, carbonTable)
renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
}
}
// after success/failure of commit delete all tablestatus files with UUID in their names.
// if commit failed then remove the segment directory
cleanUpStaleTableStatusFiles(childLoadCommands.map(_.table),
operationContext,
uuid)
if (commitFailed) {
sys.error("Failed to update table status for pre-aggregate table")
}
}
}
}

/**
* Listener to handle the operations that have to be done after compaction of a
* table has finished.
*/
object AlterPreAggregateTableCompactionPostListener extends OperationEventListener {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
val compactionEvent = event.asInstanceOf[AlterTableCompactionPreStatusUpdateEvent]
val carbonTable = compactionEvent.carbonTable
val compactionType = compactionEvent.carbonMergerMapping.campactionType
val carbonLoadModel = compactionEvent.carbonLoadModel
val sparkSession = compactionEvent.sparkSession
if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
carbonTable.getTableInfo.getDataMapSchemaList.asScala.foreach { dataMapSchema =>
val childRelationIdentifier = dataMapSchema.getRelationIdentifier
val alterTableModel = AlterTableModel(Some(childRelationIdentifier.getDatabaseName),
childRelationIdentifier.getTableName,
None,
compactionType.toString,
Some(System.currentTimeMillis()),
"")
operationContext.setProperty(
dataMapSchema.getRelationIdentifier.getDatabaseName + "_" +
dataMapSchema.getRelationIdentifier.getTableName + "_Segment",
carbonLoadModel.getSegmentId)
CarbonAlterTableCompactionCommand(alterTableModel, operationContext = operationContext)
.run(sparkSession)
}
}
}
}

object LoadPreAggregateTablePreListener extends OperationEventListener {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
val loadEvent = event.asInstanceOf[LoadTablePreExecutionEvent]
val carbonLoadModel = loadEvent.getCarbonLoadModel
val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val isInternalLoadCall = carbonLoadModel.isAggLoadRequest
if (table.isChildDataMap && !isInternalLoadCall) {
throw new UnsupportedOperationException(
"Cannot insert/load data directly into pre-aggregate table")
}
}
}

object PreAggregateDataTypeChangePreListener extends OperationEventListener {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
val dataTypeChangePreListener = event.asInstanceOf[AlterTableDataTypeChangePreEvent]
val carbonTable = dataTypeChangePreListener.carbonTable
val alterTableDataTypeChangeModel = dataTypeChangePreListener.alterTableDataTypeChangeModel
val columnToBeAltered: String = alterTableDataTypeChangeModel.columnName
if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
val dataMapSchemas = carbonTable.getTableInfo.getDataMapSchemaList
dataMapSchemas.asScala.foreach { dataMapSchema =>
val childColumns = dataMapSchema.getChildSchema.getListOfColumns
val parentColumnNames = childColumns.asScala
.flatMap(_.getParentColumnTableRelations.asScala.map(_.getColumnName))
if (parentColumnNames.contains(columnToBeAltered)) {
throw new UnsupportedOperationException(
s"Column $columnToBeAltered exists in a pre-aggregate table. Drop pre-aggregate table" +
"to continue")
}
}
}
if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
s"Cannot change data type for columns in pre-aggregate table ${ carbonTable.getDatabaseName
}.${ carbonTable.getTableName }")
}
}
}

object PreAggregateAddColumnsPreListener extends OperationEventListener {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
val dataTypeChangePreListener = event.asInstanceOf[AlterTableAddColumnPreEvent]
val carbonTable = dataTypeChangePreListener.carbonTable
if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
s"Cannot add columns in pre-aggreagate table ${ carbonTable.getDatabaseName
}.${ carbonTable.getTableName }")
}
}
}

object PreAggregateDeleteSegmentByDatePreListener extends OperationEventListener {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
val deleteSegmentByDatePreEvent = event.asInstanceOf[DeleteSegmentByDatePreEvent]
val carbonTable = deleteSegmentByDatePreEvent.carbonTable
if (carbonTable != null) {
if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
throw new UnsupportedOperationException(
"Delete segment operation is not supported on tables which have a pre-aggregate table. " +
"Drop pre-aggregation table to continue")
}
if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
"Delete segment operation is not supported on pre-aggregate table")
}
}
}
}

object PreAggregateDeleteSegmentByIdPreListener extends OperationEventListener {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
val tableEvent = event.asInstanceOf[DeleteSegmentByIdPreEvent]
val carbonTable = tableEvent.carbonTable
if (carbonTable != null) {
if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
throw new UnsupportedOperationException(
"Delete segment operation is not supported on tables which have a pre-aggregate table")
}
if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
"Delete segment operation is not supported on pre-aggregate table")
}
}
}
}

object PreAggregateDropColumnPreListener extends OperationEventListener {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
val dataTypeChangePreListener = event.asInstanceOf[AlterTableDropColumnPreEvent]
val carbonTable = dataTypeChangePreListener.carbonTable
val alterTableDropColumnModel = dataTypeChangePreListener.alterTableDropColumnModel
val columnsToBeDropped = alterTableDropColumnModel.columns
if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
val dataMapSchemas = carbonTable.getTableInfo.getDataMapSchemaList
dataMapSchemas.asScala.foreach { dataMapSchema =>
val parentColumnNames = dataMapSchema.getChildSchema.getListOfColumns.asScala
.flatMap(_.getParentColumnTableRelations.asScala.map(_.getColumnName))
val columnExistsInChild = parentColumnNames.collectFirst {
case parentColumnName if columnsToBeDropped.contains(parentColumnName) =>
parentColumnName
}
if (columnExistsInChild.isDefined) {
throw new UnsupportedOperationException(
s"Column ${ columnExistsInChild.head } cannot be dropped because it exists in a " +
s"pre-aggregate table ${ dataMapSchema.getRelationIdentifier.toString }")
}
}
}
if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(s"Cannot drop columns in pre-aggreagate table ${
carbonTable.getDatabaseName}.${ carbonTable.getTableName }")
}
}
}

object PreAggregateRenameTablePreListener extends OperationEventListener {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override def onEvent(event: Event,
operationContext: OperationContext): Unit = {
val renameTablePostListener = event.asInstanceOf[AlterTableRenamePreEvent]
val carbonTable = renameTablePostListener.carbonTable
if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
"Rename operation for pre-aggregate table is not supported.")
}
if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
throw new UnsupportedOperationException(
"Rename operation is not supported for table with pre-aggregate tables")
}
}
}

object UpdatePreAggregatePreListener extends OperationEventListener {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
val tableEvent = event.asInstanceOf[UpdateTablePreEvent]
val carbonTable = tableEvent.carbonTable
if (carbonTable != null) {
if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
throw new UnsupportedOperationException(
"Update operation is not supported for tables which have a pre-aggregate table. Drop " +
"pre-aggregate tables to continue.")
}
if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
"Update operation is not supported for pre-aggregate table")
}
}
}
}

object DeletePreAggregatePreListener extends OperationEventListener {
/**
* Called on a specified event occurrence
*
* @param event
* @param operationContext
*/
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
val tableEvent = event.asInstanceOf[DeleteFromTablePreEvent]
val carbonTable = tableEvent.carbonTable
if (carbonTable != null) {
if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
throw new UnsupportedOperationException(
"Delete operation is not supported for tables which have a pre-aggregate table. Drop " +
"pre-aggregate tables to continue.")
}
if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
"Delete operation is not supported for pre-aggregate table")
}
}
}
}