blob: 94b75fde0d350898f3074a608c96be2eb112fae1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution.command
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal, CarbonLoadOptionConstants}
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}
case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
extends RunnableCommand {
override val output: Seq[Attribute] = command.output
override def run(sparkSession: SparkSession): Seq[Row] = {
var rows: Seq[Row] = Seq()
val dbName = command.databaseName
var tablesInDB: Seq[TableIdentifier] = null
if (sparkSession.sessionState.catalog.listDatabases().exists(_.equalsIgnoreCase(dbName))) {
tablesInDB = sparkSession.sessionState.catalog.listTables(dbName)
}
var databaseLocation = ""
try {
databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
} catch {
case e: NoSuchDatabaseException =>
// if database not found and ifExists true return empty
if (command.ifExists) {
return rows
}
}
// DropHiveDB command will fail if cascade is false and one or more table exists in database
if (command.cascade && tablesInDB != null) {
tablesInDB.foreach { tableName =>
CarbonDropTableCommand(true, tableName.database, tableName.table).run(sparkSession)
}
}
rows = command.run(sparkSession)
CarbonUtil.dropDatabaseDirectory(databaseLocation)
rows
}
}
case class CarbonSetCommand(command: SetCommand)
extends MetadataCommand {
override val output: Seq[Attribute] = command.output
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val sessionParams = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams
command.kv match {
case Some((key, Some(value))) =>
CarbonSetCommand.validateAndSetValue(sessionParams, key, value)
case _ =>
}
command.run(sparkSession)
}
override protected def opName: String = "SET"
}
object CarbonSetCommand {
def validateAndSetValue(sessionParams: SessionParams, key: String, value: String): Unit = {
val isCarbonProperty: Boolean = CarbonProperties.getInstance().isCarbonProperty(key)
if (key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
if (key.split("\\.").length == 5) {
sessionParams.addProperty(key.toLowerCase(), value)
}
else {
throw new MalformedCarbonCommandException(
"property should be in \" carbon.input.segments.<database_name>" +
".<table_name>=<seg_id list> \" format.")
}
} else if (key.startsWith(CarbonCommonConstants.CARBON_DATAMAP_VISIBLE)) {
if (key.split("\\.").length == 6) {
val keyArray = key.split("\\.")
val dbName = keyArray(keyArray.length - 3)
val tableName = keyArray(keyArray.length - 2)
val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(SparkSQLUtil.getSparkSession)
val isValid = DataMapStoreManager.getInstance
.isDataMapExist(table.getTableId, keyArray(keyArray.length - 1))
if (!isValid) throw new InvalidConfigurationException(String.format(
"Invalid configuration of %s, datamap does not exist",
key))
sessionParams.addProperty(key.toLowerCase, value)
} else {
throw new MalformedCarbonCommandException("property should be in " +
"\" carbon.datamap.visible.<database_name>.<table_name>.<datamap_name>" +
" = <true/false> \" format")
}
} else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_DATAMAPS_PARALLEL)) {
if (key.split("\\.").length == 6) {
sessionParams.addProperty(key.toLowerCase(), value)
}
else {
throw new MalformedCarbonCommandException(
"property should be in \" carbon.load.datamaps.parallel.<database_name>" +
".<table_name>=<true/false> \" format.")
}
} else if (key.startsWith(CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE)) {
if (key.split("\\.").length == 7) {
sessionParams.addProperty(key.toLowerCase(), value)
}
else {
throw new MalformedCarbonCommandException(
"property should be in \" carbon.table.load.sort.scope.<database_name>" +
".<table_name>=<sort_sope> \" format.")
}
} else if (key.startsWith(CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER)) {
val keySplits = key.split("\\.")
if (keySplits.length == 6 || keySplits.length == 4) {
sessionParams.addProperty(key.toString, value)
}
}
else if (isCarbonProperty) {
sessionParams.addProperty(key, value)
}
}
def unsetValue(sessionParams: SessionParams, key: String): Unit = {
sessionParams.removeProperty(key)
}
}
case class CarbonResetCommand()
extends RunnableCommand {
override val output = ResetCommand.output
override def run(sparkSession: SparkSession): Seq[Row] = {
CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams.clear()
ResetCommand.run(sparkSession)
}
}