/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.datamap

import scala.collection.JavaConverters._

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._

import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.metadata.ColumnarFormatVersion
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty}
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider}
import org.apache.carbondata.events._

/**
* Command to create a datamap on a table and register the datamap
* information with the parent table.
*
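* A minimal sketch of the SQL this command serves (the datamap name, table
* name and property values below are illustrative, not taken from this file):
* {{{
*   CREATE DATAMAP dm ON TABLE maintable
*   USING 'bloomfilter'
*   WITH DEFERRED REBUILD
*   DMPROPERTIES ('INDEX_COLUMNS'='name', 'BLOOM_SIZE'='640000')
* }}}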
*/
case class CarbonCreateDataMapCommand(
dataMapName: String,
tableIdentifier: Option[TableIdentifier],
dmProviderName: String,
dmProperties: Map[String, String],
queryString: Option[String],
ifNotExistsSet: Boolean = false,
var deferredRebuild: Boolean = false)
extends AtomicRunnableCommand {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
private var dataMapProvider: DataMapProvider = _
private var mainTable: CarbonTable = _
private var dataMapSchema: DataMapSchema = _
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
// Streaming segments do not yet support building indexes or pre-aggregates,
// so creating a datamap on a streaming table is not supported.
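// Resolve the main table when `ON TABLE` was specified; some datamaps
// (e.g. MV) can be created without one.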
mainTable = tableIdentifier match {
case Some(table) =>
CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession)
case _ => null
}
if (mainTable != null) {
setAuditTable(mainTable)
}
setAuditInfo(Map("provider" -> dmProviderName, "dmName" -> dataMapName) ++ dmProperties)
if (mainTable != null && !mainTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
if (mainTable != null && mainTable.getDataMapSchema(dataMapName) != null) {
if (!ifNotExistsSet) {
throw new MalformedDataMapCommandException(s"DataMap name '$dataMapName' already exist")
} else {
return Seq.empty
}
}
if (mainTable != null && CarbonUtil.getFormatVersion(mainTable) != ColumnarFormatVersion.V3) {
throw new MalformedCarbonCommandException(s"Unsupported operation on table with " +
s"V1 or V2 format data")
}
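// Build the schema from the trimmed user properties and record the
// deferred-rebuild choice so it is persisted along with the schema.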
dataMapSchema = new DataMapSchema(dataMapName, dmProviderName)
val property = dmProperties.map(x => (x._1.trim, x._2.trim)).asJava
val javaMap = new java.util.HashMap[String, String](property)
javaMap.put(DataMapProperty.DEFERRED_REBUILD, deferredRebuild.toString)
dataMapSchema.setProperties(javaMap)
if (dataMapSchema.isIndexDataMap && mainTable == null) {
throw new MalformedDataMapCommandException(
"For this datamap, main table is required. Use `CREATE DATAMAP ... ON TABLE ...` ")
}
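// Resolve the provider implementation (index datamap, MV, ...) that backs this schema.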
dataMapProvider = DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession)
if (deferredRebuild && !dataMapProvider.supportRebuild()) {
throw new MalformedDataMapCommandException(
s"DEFERRED REBUILD is not supported on this datamap $dataMapName" +
s" with provider ${dataMapSchema.getProviderName}")
}
if (null != mainTable) {
if (mainTable.isChildTableForMV) {
throw new MalformedDataMapCommandException(
"Cannot create DataMap on child table " + mainTable.getTableUniqueName)
}
}
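// Non-index datamap schemas are registered centrally, so their names must be
// unique across the whole store, not just within one table.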
if (!dataMapSchema.isIndexDataMap) {
if (DataMapStoreManager.getInstance().getAllDataMapSchemas.asScala
.exists(_.getDataMapName.equalsIgnoreCase(dataMapSchema.getDataMapName))) {
if (!ifNotExistsSet) {
throw new MalformedDataMapCommandException(
"DataMap with name " + dataMapSchema.getDataMapName + " already exists in storage")
} else {
return Seq.empty
}
}
}
val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
val operationContext: OperationContext = new OperationContext()
// If it is an index datamap, check whether any chosen column already has an
// index datamap of the same provider.
dataMapProvider match {
case provider: IndexDataMapProvider =>
val isBloomFilter = DataMapClassProvider.BLOOMFILTER.getShortName
.equalsIgnoreCase(dmProviderName)
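// Collect the columns already indexed by a datamap of this same provider.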
val datamaps = DataMapStoreManager.getInstance.getAllDataMap(mainTable).asScala
val thisDmProviderName = provider.getDataMapSchema.getProviderName
val existingIndexColumn4ThisProvider = datamaps.filter { datamap =>
thisDmProviderName.equalsIgnoreCase(datamap.getDataMapSchema.getProviderName)
}.flatMap { datamap =>
datamap.getDataMapSchema.getIndexColumns
}.distinct
provider.getIndexedColumns.asScala.foreach { column =>
if (existingIndexColumn4ThisProvider.contains(column.getColName)) {
throw new MalformedDataMapCommandException(String.format(
"column '%s' already has %s index datamap created",
column.getColName, thisDmProviderName))
} else if (isBloomFilter) {
if (column.getDataType == DataTypes.BINARY) {
throw new MalformedDataMapCommandException(
s"BloomFilter datamap does not support Binary datatype column: ${
column.getColName
}")
}
// A BloomFilter datamap cannot index complex-type columns.
if (column.isComplex) {
throw new MalformedDataMapCommandException(
s"BloomFilter datamap does not support complex datatype column: ${
column.getColName
}")
}
}
}
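// Fire the pre-execution event, persist the schema, and keep the index
// datamap disabled until its first build completes (see processData below).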
val createDataMapPreExecutionEvent: CreateDataMapPreExecutionEvent =
new CreateDataMapPreExecutionEvent(sparkSession,
systemFolderLocation, tableIdentifier.get)
OperationListenerBus.getInstance().fireEvent(createDataMapPreExecutionEvent,
operationContext)
dataMapProvider.initMeta(queryString.orNull)
DataMapStatusManager.disableDataMap(dataMapName)
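// Non-index providers (e.g. MV) need no per-column validation; just fire
// the events and persist the metadata.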
case _ =>
val createDataMapPreExecutionEvent: CreateDataMapPreExecutionEvent =
CreateDataMapPreExecutionEvent(sparkSession,
systemFolderLocation, tableIdentifier.orNull)
OperationListenerBus.getInstance().fireEvent(createDataMapPreExecutionEvent,
operationContext)
dataMapProvider.initMeta(queryString.orNull)
}
val createDataMapPostExecutionEvent: CreateDataMapPostExecutionEvent =
CreateDataMapPostExecutionEvent(sparkSession,
systemFolderLocation, tableIdentifier, dmProviderName)
OperationListenerBus.getInstance().fireEvent(createDataMapPostExecutionEvent,
operationContext)
Seq.empty
}
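
// Builds the datamap contents. For an index datamap created without DEFERRED
// REBUILD, the index is built immediately and the datamap is then enabled.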
override def processData(sparkSession: SparkSession): Seq[Row] = {
if (dataMapProvider != null) {
dataMapProvider.initData()
// TODO: remove these checks once the preaggregate and preaggregate timeseries are deprecated
if (mainTable != null && !deferredRebuild && dataMapSchema.isIndexDataMap) {
dataMapProvider.rebuild()
val operationContext: OperationContext = new OperationContext()
val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
val updateDataMapPreExecutionEvent: UpdateDataMapPreExecutionEvent =
new UpdateDataMapPreExecutionEvent(sparkSession,
systemFolderLocation, tableIdentifier.get)
OperationListenerBus.getInstance().fireEvent(updateDataMapPreExecutionEvent,
operationContext)
DataMapStatusManager.enableDataMap(dataMapName)
val updateDataMapPostExecutionEvent: UpdateDataMapPostExecutionEvent =
new UpdateDataMapPostExecutionEvent(sparkSession,
systemFolderLocation, tableIdentifier.get)
OperationListenerBus.getInstance().fireEvent(updateDataMapPostExecutionEvent,
operationContext)
}
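// Non-index, non-lazy datamaps backed by a table (e.g. MV) are usable
// right after initData, so enable them now.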
if (null != dataMapSchema.getRelationIdentifier && !dataMapSchema.isIndexDataMap &&
!dataMapSchema.isLazy) {
DataMapStatusManager.enableDataMap(dataMapName)
}
}
Seq.empty
}
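
// If a later step fails after the metadata was created, roll back by
// dropping the partially created datamap.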
override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
if (dataMapProvider != null) {
val table =
if (mainTable != null) {
Some(TableIdentifier(mainTable.getTableName, Some(mainTable.getDatabaseName)))
} else {
None
}
CarbonDropDataMapCommand(
dataMapName,
true,
table,
forceDrop = false).run(sparkSession)
}
Seq.empty
}
override protected def opName: String = "CREATE DATAMAP"
}