blob: 1caaa3454f49c18d62e1fc525e82257b76063e3e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.secondaryindex.command
import java.util
import scala.collection.JavaConverters._
import org.apache.log4j.Logger
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.DataCommand
import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.SchemaReader
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
/**
* Register index table with main table
* 1. check if the main and index table exist
* 2. call the create index command with isCreateSIndex = false
* (do not create the si table in store path & avoid data load for si)
*/
case class RegisterIndexTableCommand(dbName: Option[String], indexTableName: String,
parentTable: String)
extends DataCommand {
val LOGGER: Logger =
LogServiceFactory.getLogService(this.getClass.getName)
override def processData(sparkSession: SparkSession): Seq[Row] = {
val databaseName = CarbonEnv.getDatabaseName(dbName)(sparkSession)
val databaseLocation = CarbonEnv.getDatabaseLocation(databaseName, sparkSession)
val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + indexTableName
val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, databaseName,
indexTableName)
setAuditTable(databaseName, indexTableName)
setAuditInfo(Map("Parent TableName" -> parentTable))
// 1. check if the main and index table exist
val tables: Seq[TableIdentifier] = sparkSession.sessionState.catalog.listTables(databaseName)
if (!tables.exists(_.table.equalsIgnoreCase(parentTable))) {
val message: String = s"Secondary Index Table registration for table [$indexTableName] with" +
s" table" +
s" [$databaseName.$parentTable] failed." +
s"Table [$parentTable] does not exists under database [$databaseName]"
CarbonException.analysisException(message)
}
if (!tables.exists(_.table.equalsIgnoreCase(indexTableName))) {
val message: String = s"Secondary Index Table registration for table [$indexTableName] with" +
s" table" +
s" [$databaseName.$parentTable] failed." +
s"Secondary Index Table [$indexTableName] does not exists under database [$databaseName]"
CarbonException.analysisException(message)
}
// 2. Read TableInfo
val tableInfo = SchemaReader.getTableInfo(absoluteTableIdentifier)
val columns: List[String] = getIndexColumn(tableInfo)
val secondaryIndex = SecondaryIndex(dbName, parentTable.toLowerCase, columns,
indexTableName.toLowerCase)
// 3. Call the create index command with isCreateSIndex = false
// (do not create the si table in store path)
CreateIndexTable(indexModel = secondaryIndex,
tableProperties = tableInfo.getFactTable.getTableProperties.asScala,
isCreateSIndex = false).run(sparkSession)
LOGGER.info(s"Table [$indexTableName] registered as Secondary Index table with" +
s" table [$databaseName.$parentTable] successfully.")
Seq.empty
}
/**
* The method return's the List of dimension columns excluding the positionReference dimension
*
* @param tableInfo TableInfo object
* @return List[String] List of dimension column names
*/
def getIndexColumn(tableInfo: TableInfo) : List[String] = {
val columns: util.List[ColumnSchema] = tableInfo.getFactTable.getListOfColumns
columns.asScala.filter(f => f.isDimensionColumn &&
!f.getColumnName.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)
).map(column => column.getColumnName.toLowerCase()).toList
}
override protected def opName: String = "Register Index Table"
}