/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.preaaggregate

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.util.{DataMapUtil, PartitionUtils}

import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}

/**
 * Helper class used to create a pre-aggregate table and to update the parent
 * table with the child table information. The operation either succeeds
 * completely, or nothing happens in case of failure:
 * 1. failed to create the pre-aggregate table, or
 * 2. failed to update the main table.
 */
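// A minimal usage sketch (hypothetical table, datamap name and query; the real
// call sites are the CREATE DATAMAP command processors):
//   val helper = PreAggregateTableHelper(mainTable, "agg_sales", "preaggregate",
//     dmProperties, "SELECT country, SUM(amount) FROM sales GROUP BY country")
//   helper.initMeta(sparkSession) // create child table, register schema on parent
//   helper.initData(sparkSession) // load existing parent segments into the child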
case class PreAggregateTableHelper(
var parentTable: CarbonTable,
dataMapName: String,
dataMapClassName: String,
dataMapProperties: java.util.Map[String, String],
queryString: String,
timeSeriesFunction: Option[String] = None,
ifNotExistsSet: Boolean = false) {
var loadCommand: CarbonLoadDataCommand = _
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
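
  /**
   * Creates the pre-aggregate child table from the datamap query, registers the
   * child schema on the parent table, and prepares the load command that is
   * later used to populate the child table.
   */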
def initMeta(sparkSession: SparkSession): Seq[Row] = {
val dmProperties = dataMapProperties.asScala
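    // Rewrite the user query with the pre-agg marker function added, then run it
    // through the session so the resulting logical plan can be validated below.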
val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
val df = sparkSession.sql(updatedQuery)
val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
df.logicalPlan, queryString)
val partitionInfo = parentTable.getPartitionInfo
val fields = fieldRelationMap.keySet.toSeq
val tableProperties = mutable.Map[String, String]()
val usePartitioning = dataMapProperties.getOrDefault("partitioning", "true").toBoolean
    val parentPartitionColumns = if (usePartitioning && parentTable.isHivePartitionTable) {
      partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    } else {
      Seq.empty
    }
// Generate child table partition columns in the same order as the parent table.
val partitionerFields =
PartitionUtils.getPartitionerFields(parentPartitionColumns, fieldRelationMap)
dmProperties.foreach(t => tableProperties.put(t._1, t._2))
val selectTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
if (!parentTable.getTableName.equalsIgnoreCase(selectTable.getTableName)) {
throw new MalformedDataMapCommandException(
"Parent table name is different in select and create")
}
DataMapUtil
.inheritTablePropertiesFromMainTable(parentTable, fields, fieldRelationMap, tableProperties)
val tableIdentifier =
TableIdentifier(parentTable.getTableName + "_" + dataMapName,
Some(parentTable.getDatabaseName))
    // prepare the table model from the parsed fields, partition columns and
    // table properties collected above
val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
ifNotExistPresent = ifNotExistsSet,
new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
tableIdentifier.table.toLowerCase,
fields,
partitionerFields,
tableProperties,
None,
isAlterFlow = false,
true,
None)
    if (timeSeriesFunction.isDefined) {
      // For timeseries datamaps, validate the configured event time column and
      // rewrite the selected time column with the timeseries function applied.
      TimeSeriesUtil.validateTimeSeriesEventTime(dmProperties.toMap, parentTable)
      TimeSeriesUtil.validateEventTimeColumnExitsInSelect(
        fieldRelationMap,
        dmProperties(TimeSeriesUtil.TIMESERIES_EVENTTIME))
      TimeSeriesUtil.updateTimeColumnSelect(
        fieldRelationMap,
        dmProperties(TimeSeriesUtil.TIMESERIES_EVENTTIME),
        timeSeriesFunction.get)
    }
    // Update the relation identifier. It is stored in the child table schema so
    // that the pre-aggregate table can also be dropped when the parent table is
    // dropped, as the parent table will get updated as well.
tableModel.parentTable = Some(parentTable)
tableModel.dataMapRelation = Some(fieldRelationMap)
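    // The child table is created at the user supplied 'path' DMPROPERTY when
    // present, otherwise at the default location for the database and table name.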
val tablePath = if (dmProperties.contains("path")) {
dmProperties("path")
} else {
CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
}
CarbonCreateTableCommand(TableNewProcessor(tableModel),
tableModel.ifNotExistsSet, Some(tablePath), isVisible = false).run(sparkSession)
val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
val tableInfo = table.getTableInfo
    // the child schema object will be saved in the parent table schema
val childSchema = tableInfo.getFactTable.buildChildSchema(
dataMapName,
dataMapClassName,
tableInfo.getDatabaseName,
queryString,
"AGGREGATION")
dmProperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
try {
      // update the parent table schema with the child table information
PreAggregateUtil.updateMainTable(parentTable, childSchema, sparkSession)
} catch {
case e: MetadataProcessException =>
throw e
case ex: Exception =>
        // If the update fails, forcibly remove the datamap from the metastore.
val dropTableCommand = CarbonDropDataMapCommand(childSchema.getDataMapName,
ifExistsSet = true,
Some(TableIdentifier
.apply(parentTable.getTableName, Some(parentTable.getDatabaseName))),
forceDrop = true)
dropTableCommand.processMetadata(sparkSession)
throw ex
}
    // After updating the parent carbon table with the datamap entry, fetch the
    // latest table object to be used in the rest of the create process.
parentTable = CarbonEnv.getCarbonTable(Some(parentTable.getDatabaseName),
parentTable.getTableName)(sparkSession)
    val updatedLoadQuery = if (timeSeriesFunction.isDefined) {
PreAggregateUtil.createTimeSeriesSelectQueryFromMain(
childSchema.getChildSchema,
parentTable.getTableName,
parentTable.getDatabaseName)
} else {
queryString
}
val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
updatedLoadQuery)).drop("preAggLoad")
loadCommand = PreAggregateUtil.createLoadCommandForChild(
childSchema.getChildSchema.getListOfColumns,
tableIdentifier,
dataFrame,
isOverwrite = false,
sparkSession = sparkSession,
mutable.Map.empty[String, String])
loadCommand.processMetadata(sparkSession)
Seq.empty
}
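
  /**
   * Loads the data of all valid segments of the parent table into the
   * pre-aggregate table. Must be called after initMeta; the load is skipped
   * when the parent table has no loadable segments.
   */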
def initData(sparkSession: SparkSession): Seq[Row] = {
    // Load the child table only if the parent table has existing segments. The
    // load details of the PARENT table are read below to decide whether any
    // load needs to be fired for the pre-aggregate table at all.
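    // First clean up any expired or invalid loads on the parent table and
    // refresh its metadata so the segment status read below is up to date.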
SegmentStatusManager.deleteLoadsAndUpdateMetadata(
parentTable,
false,
CarbonFilters.getCurrentPartitions(
sparkSession,
TableIdentifier(parentTable.getTableName,
Some(parentTable.getDatabaseName))
).map(_.asJava).orNull)
if (SegmentStatusManager.isLoadInProgressInTable(parentTable)) {
throw new UnsupportedOperationException(
"Cannot create pre-aggregate table when insert is in progress on parent table")
}
    // check if any segment is available for load in the parent table
val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
.collect {
case segment if segment.getSegmentStatus == SegmentStatus.SUCCESS ||
segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS =>
new Segment(segment.getLoadName, segment.getSegmentFile).toString
}
if (loadAvailable.nonEmpty) {
      // Pass every available parent segment in segmentToLoad so that all existing
      // segments are loaded into the pre-aggregate table, even if the user has set
      // a segment filter on the parent table.
loadCommand.dataFrame = Some(PreAggregateUtil
.getDataFrame(sparkSession, loadCommand.logicalPlan.get))
PreAggregateUtil.startDataLoadForDataMap(
TableIdentifier(parentTable.getTableName, Some(parentTable.getDatabaseName)),
segmentToLoad = loadAvailable.mkString(","),
validateSegments = false,
loadCommand,
isOverwrite = false,
sparkSession)
} else {
LOGGER.info(s"No segment available for load in table ${parentTable.getTableUniqueName}")
}
Seq.empty
}
}