/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.util

import java.lang.reflect.Method

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.EmptyRule
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils}
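
/**
* Utility methods that adapt to version-specific Spark SQL internals (Spark 2.1, 2.2 and 2.3+),
* mostly via reflection, so that the same code can run against different Spark releases.
*/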
object SparkSQLUtil {
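/** Shortcut to the session's SessionState (analyzer, catalog, configuration). */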
def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
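
/** Create a DataFrame for the given logical plan using the given session. */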
def execute(logicalPlan: LogicalPlan, sparkSession: SparkSession): DataFrame = {
Dataset.ofRows(sparkSession, logicalPlan)
}
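
/** Create a DataFrame from an RDD of InternalRow rows with the given schema. */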
def execute(rdd: RDD[InternalRow], schema: StructType, sparkSession: SparkSession): DataFrame = {
execute(LogicalRDD(schema.toAttributes, rdd)(sparkSession), sparkSession)
}
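
/** Return the default SparkSession; fails if none has been created yet. */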
def getSparkSession: SparkSession = {
SparkSession.getDefaultSession.get
}
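
/**
* Invoke LogicalPlan.stats via reflection: Spark 2.2 expects stats(conf: SQLConf), while
* Spark 2.3 and above expose a no-argument stats method.
*/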
def invokeStatsMethod(logicalPlanObj: LogicalPlan, conf: SQLConf): Statistics = {
if (SparkUtil.isSparkVersionEqualTo("2.2")) {
val method: Method = logicalPlanObj.getClass.getMethod("stats", classOf[SQLConf])
method.invoke(logicalPlanObj, conf).asInstanceOf[Statistics]
} else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
val method: Method = logicalPlanObj.getClass.getMethod("stats")
method.invoke(logicalPlanObj).asInstanceOf[Statistics]
} else {
throw new UnsupportedOperationException("Spark version not supported")
}
}
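
/**
* Call QueryPlan.normalizeExprId reflectively on Spark 2.2 and above; on older versions the
* expression is returned unchanged.
*/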
def invokeQueryPlannormalizeExprId(r: NamedExpression, input: AttributeSeq)
: NamedExpression = {
if (SparkUtil.isSparkVersionXandAbove("2.2")) {
val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.QueryPlan")
clazz.getDeclaredMethod("normalizeExprId", classOf[Any], classOf[AttributeSeq]).
invoke(null, r, input).asInstanceOf[NamedExpression]
} else {
r
}
}
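
/**
* Rebuild a Statistics object whose column stats are re-keyed from the plan's leaf output to
* the given output list (optionally re-mapped through aliasMap), using reflection to cope with
* the different Statistics constructors across Spark versions.
*/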
def getStatisticsObj(outputList: Seq[NamedExpression],
plan: LogicalPlan, stats: Statistics,
aliasMap: Option[AttributeMap[Attribute]] = None)
: Statistics = {
val className = "org.apache.spark.sql.catalyst.plans.logical.Statistics"
if (SparkUtil.isSparkVersionXandAbove("2.2")) {
val output = outputList.map(_.toAttribute)
val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
table => AttributeMap(table.output.zip(output))
}
val rewrites = mapSeq.head
val attributes : AttributeMap[ColumnStat] = CarbonReflectionUtils.
getField("attributeStats", stats).asInstanceOf[AttributeMap[ColumnStat]]
var attributeStats = AttributeMap(attributes.iterator
.map { pair => (rewrites(pair._1), pair._2) }.toSeq)
if (aliasMap.isDefined) {
attributeStats = AttributeMap(
attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
}
val hints = CarbonReflectionUtils.getField("hints", stats).asInstanceOf[Object]
CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
stats.rowCount, attributeStats, hints).asInstanceOf[Statistics]
} else {
val output = outputList.map(_.name)
val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
table => table.output.map(_.name).zip(output).toMap
}
val rewrites = mapSeq.head
val colStats = CarbonReflectionUtils.getField("colStats", stats)
.asInstanceOf[Map[String, ColumnStat]]
var attributeStats = colStats.iterator
.map { pair => (rewrites(pair._1), pair._2) }.toMap
if (aliasMap.isDefined) {
val aliasMapName = aliasMap.get.map(x => (x._1.name, x._2.name))
attributeStats =
attributeStats.map(pair => (aliasMapName.getOrElse(pair._1, pair._1)
, pair._2))
}
CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
stats.rowCount, attributeStats).asInstanceOf[Statistics]
}
}
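
/** Return the EliminateView analysis rule on Spark 2.2 and above, or the no-op EmptyRule otherwise. */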
def getEliminateViewObj(): Rule[LogicalPlan] = {
if (SparkUtil.isSparkVersionXandAbove("2.2")) {
val className = "org.apache.spark.sql.catalyst.analysis.EliminateView"
CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
} else {
EmptyRule
}
}
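
/** Return the PullupCorrelatedPredicates optimizer rule on Spark 2.2 and above, or EmptyRule otherwise. */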
def getPullupCorrelatedPredicatesObj(): Rule[LogicalPlan] = {
if (SparkUtil.isSparkVersionXandAbove("2.2")) {
val className = "org.apache.spark.sql.catalyst.optimizer.PullupCorrelatedPredicates"
CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
} else {
EmptyRule
}
}
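
/** Return the RemoveRedundantAliases optimizer rule on Spark 2.2 and above, or EmptyRule otherwise. */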
def getRemoveRedundantAliasesObj(): Rule[LogicalPlan] = {
if (SparkUtil.isSparkVersionXandAbove("2.2")) {
val className = "org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases"
CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
} else {
EmptyRule
}
}
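
/**
* Instantiate the ReorderJoin optimizer rule for the running Spark version: constructed with a
* SQLConf on 2.2, obtained as the ReorderJoin$ singleton on 2.1 and 2.3+.
*/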
def getReorderJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
if (SparkUtil.isSparkVersionEqualTo("2.2")) {
val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin";
CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
} else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
.asInstanceOf[Rule[LogicalPlan]]
} else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
.asInstanceOf[Rule[LogicalPlan]]
}
else {
throw new UnsupportedOperationException("Spark version not supported")
}
}
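
/**
* Instantiate the EliminateOuterJoin optimizer rule for the running Spark version: constructed
* with a SQLConf on 2.2, obtained as the EliminateOuterJoin$ singleton on 2.1 and 2.3+.
*/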
def getEliminateOuterJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
if (SparkUtil.isSparkVersionEqualTo("2.2")) {
val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin";
CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
} else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
.asInstanceOf[Rule[LogicalPlan]]
} else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
.asInstanceOf[Rule[LogicalPlan]]
}
else {
throw new UnsupportedOperationException("Spark version not supported")
}
}
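
/**
* Instantiate the NullPropagation optimizer rule for the running Spark version: constructed
* with a SQLConf on 2.2, obtained as the NullPropagation$ singleton on 2.1 and 2.3+.
*/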
def getNullPropagationObj(conf: SQLConf): Rule[LogicalPlan] = {
if (SparkUtil.isSparkVersionEqualTo("2.2")) {
val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation";
CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
} else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
.asInstanceOf[Rule[LogicalPlan]]
} else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
.asInstanceOf[Rule[LogicalPlan]]
} else {
throw new UnsupportedOperationException("Spark version not supported")
}
}
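
/**
* Instantiate the CheckCartesianProducts optimizer rule for the running Spark version:
* constructed with a SQLConf on 2.1 and 2.2, obtained as the singleton on 2.3 and above.
*/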
def getCheckCartesianProductsObj(conf: SQLConf): Rule[LogicalPlan] = {
if (SparkUtil.isSparkVersionEqualTo("2.2")) {
val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts";
CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
} else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$";
CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
.asInstanceOf[Rule[LogicalPlan]]
} else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts";
CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
}
else {
throw new UnsupportedOperationException("Spark version not supported")
}
}

/**
* Broadcast the given Hadoop configuration to the executors, wrapped in Spark's
* SerializableConfiguration so that it survives Java serialization.
*
* @param sparkContext the SparkContext used to create the broadcast variable
* @param hadoopConf the Hadoop configuration to broadcast
* @return a broadcast handle to the wrapped configuration
*/
def broadCastHadoopConf(sparkContext: SparkContext,
hadoopConf: Configuration): Broadcast[SerializableConfiguration] = {
sparkContext.broadcast(getSerializableConfigurableInstance(hadoopConf))
}
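
/** Wrap a Hadoop Configuration in Spark's SerializableConfiguration. */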
def getSerializableConfigurableInstance(hadoopConf: Configuration): SerializableConfiguration = {
new SerializableConfiguration(hadoopConf)
}

/**
* Get the task group id, i.e. the "spark.jobGroup.id" local property of the session's
* SparkContext, or an empty string if it is not set.
*
* @param sparkSession the session whose SparkContext is queried
* @return the job group id, or "" when unset
*/
def getTaskGroupId(sparkSession: SparkSession): String = {
sparkSession.sparkContext.getLocalProperty("spark.jobGroup.id") match {
case null => ""
case taskGroupId => taskGroupId
}
}

/**
* Get the task group description, i.e. the "spark.job.description" local property of the
* session's SparkContext, or an empty string if it is not set.
*
* @param sparkSession the session whose SparkContext is queried
* @return the job description, or "" when unset
*/
def getTaskGroupDesc(sparkSession: SparkSession): String = {
sparkSession.sparkContext.getLocalProperty("spark.job.description") match {
case null => ""
case taskGroupDesc => taskGroupDesc
}
}
}