/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command.index

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.hive.{CarbonHiveIndexMetadataUtil, CarbonRelation}
import org.apache.spark.sql.index.{CarbonIndexUtil, IndexTableUtil}
import org.apache.spark.sql.secondaryindex.command.IndexModel

import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedIndexCommandException}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.index.status.IndexStatus
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.ColumnarFormatVersion
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.index.IndexType
import org.apache.carbondata.core.metadata.schema.index.IndexProperty
import org.apache.carbondata.core.metadata.schema.indextable.{IndexMetadata, IndexTableInfo}
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, IndexSchema}
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.index.IndexProvider

/**
 * Command to create an FG (fine grained) or CG (coarse grained) index on a table
 * and to update the parent table metadata with the new index information.
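 *
 * Example usage (the table, column and property values below are illustrative):
 *   CREATE INDEX city_bloom_idx
 *   ON TABLE source_table (city)
 *   AS 'bloomfilter'
 *   PROPERTIES ('BLOOM_SIZE'='640000')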
*/
case class CarbonCreateIndexCommand(
indexModel: IndexModel,
indexProviderName: String,
properties: Map[String, String],
ifNotExistsSet: Boolean = false,
var deferredRebuild: Boolean = false)
extends AtomicRunnableCommand {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
private var provider: IndexProvider = _
private var parentTable: CarbonTable = _
private var indexSchema: IndexSchema = _
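
  /**
   * Metadata phase: validates the index request and registers the index schema in the
   * parent table's metadata and Hive serde properties, under the table metadata lock.
   */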
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val indexName = indexModel.indexName
val parentTableName = indexModel.tableName
// get parent table
parentTable = CarbonEnv.getCarbonTable(indexModel.dbName, parentTableName)(sparkSession)
    val errMsg = s"Parent table `$parentTableName` is not found. " +
      s"The main table must exist before an index can be created on it"
if (parentTable == null) {
throw new MalformedIndexCommandException(errMsg)
}
val dbName = parentTable.getDatabaseName
indexSchema = new IndexSchema(indexName, indexProviderName)
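    // normalize the user supplied properties and add the internal entries
    // (deferred rebuild flag, index columns, provider name) consumed by the index provider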
val property = properties.map(x => (x._1.trim, x._2.trim)).asJava
val indexProperties = new java.util.LinkedHashMap[String, String](property)
indexProperties.put(IndexProperty.DEFERRED_REBUILD, deferredRebuild.toString)
indexProperties.put(CarbonCommonConstants.INDEX_COLUMNS, indexModel.columnNames.mkString(","))
indexProperties.put(CarbonCommonConstants.INDEX_PROVIDER, indexProviderName)
setAuditTable(parentTable)
setAuditInfo(Map("provider" -> indexProviderName, "indexName" -> indexName) ++ properties)
if (!parentTable.getTableInfo.isTransactionalTable) {
      throw new MalformedCarbonCommandException(
        "Unsupported operation on non-transactional table")
}
    if (parentTable.isMV || parentTable.isIndexTable) {
      throw new MalformedIndexCommandException(
        "Cannot create index on child table `" + parentTableName + "`")
    }
if (CarbonUtil.getFormatVersion(parentTable) != ColumnarFormatVersion.V3) {
throw new MalformedCarbonCommandException(
s"Unsupported operation on table with V1 or V2 format data")
}
// get metadata lock to avoid concurrent create index operations
val metadataLock = CarbonLockFactory.getCarbonLockObj(
parentTable.getAbsoluteTableIdentifier,
LockUsage.METADATA_LOCK)
try {
if (metadataLock.lockWithRetries()) {
LOGGER.info(s"Acquired the metadata lock for table $dbName.$parentTableName")
// get carbon table again to reflect any changes during lock acquire.
parentTable =
CarbonEnv.getInstance(sparkSession).carbonMetaStore
.lookupRelation(Some(dbName), parentTableName)(sparkSession)
.asInstanceOf[CarbonRelation].carbonTable
if (parentTable == null) {
throw new MalformedIndexCommandException(errMsg)
}
val oldIndexMetaData = parentTable.getIndexMetadata
        // check whether an index with this name already exists on the parent table
if (null != oldIndexMetaData) {
val indexExistsInCarbon = oldIndexMetaData.getIndexTables.asScala.contains(indexName)
if (indexExistsInCarbon) {
throw new MalformedIndexCommandException(
"Index with name `" + indexName + "` already exists on table `" + parentTableName +
"`")
}
}
val existingIndexColumnsForThisProvider =
if (null != oldIndexMetaData &&
null != oldIndexMetaData.getIndexesMap.get(indexProviderName)) {
oldIndexMetaData.getIndexesMap.get(indexProviderName).values().asScala.flatMap {
key => key.get(CarbonCommonConstants.INDEX_COLUMNS).split(",").toList
}.toList
} else {
Seq.empty
}
val indexColumns = validateAndGetIndexColumns(parentTable,
indexModel.columnNames.toArray,
indexProviderName)
indexColumns.asScala.foreach { column =>
if (existingIndexColumnsForThisProvider.contains(column.getColName)) {
throw new MalformedIndexCommandException(String.format(
"column '%s' already has %s index created",
column.getColName, indexProviderName))
}
val isBloomFilter = IndexType.BLOOMFILTER.getIndexProviderName
.equalsIgnoreCase(indexProviderName)
if (isBloomFilter) {
if (column.getDataType == DataTypes.BINARY) {
throw new MalformedIndexCommandException(
s"BloomFilter does not support Binary datatype column: ${
column.getColName
}")
}
// For bloom filter, the index column datatype cannot be complex type
if (column.isComplex) {
throw new MalformedIndexCommandException(
s"BloomFilter does not support complex datatype column: ${
column.getColName
}")
}
}
}
// set properties
indexSchema.setProperties(indexProperties)
provider = new IndexProvider(parentTable, indexSchema, sparkSession)
if (deferredRebuild && !provider.supportRebuild()) {
throw new MalformedIndexCommandException(
"DEFERRED REFRESH is not supported on this index " + indexModel.indexName +
" with provider " + indexProviderName)
} else if (deferredRebuild && provider.supportRebuild()) {
indexProperties.put(CarbonCommonConstants.INDEX_STATUS, IndexStatus.DISABLED.name())
}
var oldIndexInfo = parentTable.getIndexInfo
if (null == oldIndexInfo) {
oldIndexInfo = ""
}
val indexInfo = IndexTableUtil.checkAndAddIndexTable(
oldIndexInfo,
new IndexTableInfo(dbName, indexName,
indexProperties),
false)
// set index information in parent table
val parentIndexMetadata = if (
parentTable.getTableInfo.getFactTable.getTableProperties
.get(parentTable.getCarbonTableIdentifier.getTableId) != null) {
parentTable.getIndexMetadata
} else {
new IndexMetadata(false)
}
parentIndexMetadata.addIndexTableInfo(indexProviderName,
indexName,
indexProperties)
parentTable.getTableInfo.getFactTable.getTableProperties
.put(parentTable.getCarbonTableIdentifier.getTableId, parentIndexMetadata.serialize)
sparkSession.sql(
s"""ALTER TABLE $dbName.$parentTableName SET SERDEPROPERTIES ('indexInfo' =
|'$indexInfo')""".stripMargin).collect()
CarbonHiveIndexMetadataUtil.refreshTable(dbName, parentTableName, sparkSession)
} else {
LOGGER.error(s"Not able to acquire the metadata lock for table" +
s" $dbName.$parentTableName")
}
} finally {
metadataLock.unlock()
}
Seq.empty
}
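
  /**
   * Data phase: loads the index data (unless DEFERRED REBUILD was requested), marks the
   * index as enabled in the parent table metadata and refreshes the table in the metastore.
   */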
override def processData(sparkSession: SparkSession): Seq[Row] = {
if (provider != null) {
provider.setMainTable(parentTable)
if (!deferredRebuild) {
provider.rebuild()
// enable bloom or lucene index
// get metadata lock to avoid concurrent create index operations
val metadataLock = CarbonLockFactory.getCarbonLockObj(
parentTable.getAbsoluteTableIdentifier,
LockUsage.METADATA_LOCK)
try {
if (metadataLock.lockWithRetries()) {
          LOGGER.info("Acquired the metadata lock for table " +
            s"${parentTable.getDatabaseName}.${parentTable.getTableName}")
// get carbon table again to reflect any changes during lock acquire.
parentTable =
CarbonEnv.getInstance(sparkSession).carbonMetaStore
.lookupRelation(indexModel.dbName, indexModel.tableName)(sparkSession)
.asInstanceOf[CarbonRelation].carbonTable
val oldIndexInfo = parentTable.getIndexInfo
val indexInfo = IndexTableUtil.checkAndAddIndexTable(
oldIndexInfo,
new IndexTableInfo(parentTable.getDatabaseName, indexModel.indexName,
indexSchema.getProperties),
false)
val enabledIndexInfo = IndexTableInfo.enableIndex(indexInfo, indexModel.indexName)
// set index information in parent table
val parentIndexMetadata = parentTable.getIndexMetadata
parentIndexMetadata.updateIndexStatus(indexProviderName,
indexModel.indexName,
IndexStatus.ENABLED.name())
parentTable.getTableInfo.getFactTable.getTableProperties
.put(parentTable.getCarbonTableIdentifier.getTableId, parentIndexMetadata.serialize)
sparkSession.sql(
s"""ALTER TABLE ${ parentTable.getDatabaseName }.${ parentTable.getTableName } SET
|SERDEPROPERTIES ('indexInfo' = '$enabledIndexInfo')""".stripMargin).collect()
}
} finally {
metadataLock.unlock()
}
}
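      // record on the parent table that it now has at least one index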
CarbonIndexUtil
.addOrModifyTableProperty(
parentTable,
Map("indexExists" -> "true"), needLock = false)(sparkSession)
CarbonHiveIndexMetadataUtil.refreshTable(parentTable.getDatabaseName,
parentTable.getTableName,
sparkSession)
}
Seq.empty
}
/**
   * Validates the index columns for the given index provider.
   * The following checks are performed:
   * 1. the INDEX_COLUMNS property must be present
   * 2. INDEX_COLUMNS must not contain illegal arguments (empty or blank column names)
   * 3. INDEX_COLUMNS must not contain duplicate columns
   * 4. every column in INDEX_COLUMNS must exist in the parent table
   *
   * @param parentTable table from which the carbon columns are derived
   * @param indexColumns column names to be validated
   * @param indexProvider index provider name
   * @return validated list of CarbonColumns
*/
def validateAndGetIndexColumns(
parentTable: CarbonTable,
indexColumns: Array[String],
indexProvider: String): java.util.List[CarbonColumn] = {
val indexCarbonColumns = parentTable.getIndexedColumns(indexColumns)
val unique: util.Set[String] = new util.HashSet[String]
val properties = parentTable.getTableInfo.getFactTable.getTableProperties.asScala
val spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
for (indexColumn <- indexCarbonColumns.asScala) {
if (spatialProperty.isDefined &&
indexColumn.getColName.equalsIgnoreCase(spatialProperty.get.trim)) {
throw new MalformedIndexCommandException(String.format(
"Spatial Index column is not supported, column '%s' is spatial column",
indexColumn.getColName))
}
      if (indexProvider.equalsIgnoreCase(IndexType.LUCENE.getIndexProviderName)) {
        // check the DATE (dictionary encoded) case first, otherwise it would be
        // swallowed by the generic non-string check below and never reached
        if (indexColumn.getDataType == DataTypes.DATE) {
          throw new MalformedIndexCommandException(String.format(
            "Dictionary column is not supported, column '%s' is dictionary column",
            indexColumn.getColName))
        } else if (indexColumn.getDataType != DataTypes.STRING) {
          // only string columns can be indexed by lucene
          throw new MalformedIndexCommandException(String.format(
            "Only String column is supported, column '%s' is %s type",
            indexColumn.getColName,
            indexColumn.getDataType))
        }
      }
unique.add(indexColumn.getColName)
}
if (unique.size != indexColumns.size) {
throw new MalformedIndexCommandException("index column list has duplicate column")
}
indexCarbonColumns
}
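
  /**
   * Rolls back the metadata changes made in processMetadata by dropping the
   * partially created index when a later step fails.
   */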
override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
    // roll back by dropping the partially created index, if any
    DropIndexCommand(ifExistsSet = true,
      indexModel.dbName,
      indexModel.tableName,
      indexModel.indexName).run(sparkSession)
Seq.empty
}
override protected def opName: String = "CREATE INDEX"
}