/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command.table

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, Union}
import org.apache.spark.sql.execution.command.{ExplainCommand, MetadataCommand}
import org.apache.spark.sql.types.StringType

import org.apache.carbondata.core.profiler.ExplainCollector
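
/**
 * Carbon wrapper for Spark's EXPLAIN: the child plan is expected to be Spark's
 * [[ExplainCommand]]. For regular (non-command, non-streaming) queries the CarbonData
 * profiler output is prepended to Spark's own explain result.
 */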
case class CarbonExplainCommand(
    child: LogicalPlan,
    override val output: Seq[Attribute] =
      Seq(AttributeReference("plan", StringType, nullable = true)()))
  extends MetadataCommand {

  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    val explainCommand = child.asInstanceOf[ExplainCommand]
    setAuditInfo(Map("query" -> explainCommand.logicalPlan.simpleString))
    val isCommand = explainCommand.logicalPlan match {
      case _: Command => true
      case Union(children) if children.forall(_.isInstanceOf[Command]) => true
      case _ => false
    }

    if (explainCommand.logicalPlan.isStreaming || isCommand) {
      explainCommand.run(sparkSession)
    } else {
      CarbonExplainCommand.collectProfiler(explainCommand, sparkSession) ++
      explainCommand.run(sparkSession)
    }
  }

  override protected def opName: String = "EXPLAIN"
}
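
/**
 * Variant that takes Spark's [[ExplainCommand]] directly and always prepends the
 * CarbonData profiler output to Spark's explain result.
 */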
case class CarbonInternalExplainCommand(
    explainCommand: ExplainCommand,
    override val output: Seq[Attribute] =
      Seq(AttributeReference("plan", StringType, nullable = true)()))
  extends MetadataCommand {

  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    CarbonExplainCommand
      .collectProfiler(explainCommand, sparkSession) ++ explainCommand.run(sparkSession)
  }

  override protected def opName: String = "Carbon EXPLAIN"
}
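
/**
 * Companion helpers for [[CarbonExplainCommand]]. `collectProfiler` runs the explained
 * plan through query planning so that [[ExplainCollector]] can gather CarbonData
 * profiler information, returned as a single row (or nothing when the collector is
 * disabled or produced no output).
 */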
object CarbonExplainCommand {
  def collectProfiler(
      explain: ExplainCommand,
      sparkSession: SparkSession): Seq[Row] = {
    try {
      ExplainCollector.setup()
      if (ExplainCollector.enabled()) {
        val queryExecution =
          sparkSession.sessionState.executePlan(explain.logicalPlan)
        // Force partition computation so that CarbonData records pruning details
        // in the ExplainCollector while planning the query.
        queryExecution.toRdd.partitions
        // For count(*) queries the explain collector is disabled, so profiler
        // information is not collected in such scenarios.
        if (null == ExplainCollector.getFormattedOutput) {
          Seq.empty
        } else {
          Seq(Row("== CarbonData Profiler ==\n" + ExplainCollector.getFormattedOutput))
        }
      } else {
        Seq.empty
      }
    } finally {
      ExplainCollector.remove()
    }
  }
}