/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util

import java.lang.reflect.Method

import scala.reflect.runtime._
import scala.reflect.runtime.universe._

import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.parser.AstBuilder
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructField

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.hive.{CarbonHiveSerDe, MapredCarbonInputFormat, MapredCarbonOutputFormat}

/**
 * Reflection utilities used to invoke Spark internals whose constructors
 * and method signatures differ across Spark 2.1, 2.2 and 2.3.
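 *
 * A minimal usage sketch (assuming the target class has a single public
 * constructor matching the given arguments):
 * {{{
 *   // instantiate scala.Some("value") by class name via its public constructor
 *   val (instance, clazz) = CarbonReflectionUtils.createObject("scala.Some", "value")
 * }}}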
*/
object CarbonReflectionUtils {
private val rm = universe.runtimeMirror(getClass.getClassLoader)

  /**
   * Returns the value of a field from an object through reflection.
   * @param name name of the field being retrieved
   * @param obj object from which the field has to be retrieved
   * @tparam T type of the object being reflected
   * @return the field value, or null if no field with the given name exists
   */
def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
val im = rm.reflect(obj)
im.symbol.typeSignature.members.find(_.name.toString.equals(name))
.map(l => im.reflectField(l.asTerm).get).getOrElse(null)
}
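
  /**
   * Creates an UnresolvedRelation through reflection. The Spark 2.1
   * constructor takes an optional table alias that was removed in Spark 2.2.
   */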
def getUnresolvedRelation(
tableIdentifier: TableIdentifier,
tableAlias: Option[String] = None): UnresolvedRelation = {
val className = "org.apache.spark.sql.catalyst.analysis.UnresolvedRelation"
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
createObject(
className,
tableIdentifier,
tableAlias)._1.asInstanceOf[UnresolvedRelation]
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
createObject(
className,
tableIdentifier)._1.asInstanceOf[UnresolvedRelation]
} else {
throw new UnsupportedOperationException(s"Unsupported Spark version $SPARK_VERSION")
}
}
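
  /**
   * Creates a SubqueryAlias through reflection. The Spark 2.1 constructor
   * takes an extra view identifier that was removed in Spark 2.2.
   */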
def getSubqueryAlias(sparkSession: SparkSession, alias: Option[String],
relation: LogicalPlan,
view: Option[TableIdentifier]): SubqueryAlias = {
val className = "org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias"
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
createObject(
className,
alias.getOrElse(""),
relation,
Option(view))._1.asInstanceOf[SubqueryAlias]
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
createObject(
className,
alias.getOrElse(""),
relation)._1.asInstanceOf[SubqueryAlias]
} else {
throw new UnsupportedOperationException("Unsupported Spark version")
}
}
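
  /**
   * Creates an InsertIntoTable plan through reflection. Spark 2.1 wraps the
   * overwrite flag in an OverwriteOptions argument, whereas Spark 2.2 and
   * above take a plain Boolean.
   */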
def getInsertIntoCommand(table: LogicalPlan,
partition: Map[String, Option[String]],
query: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean): InsertIntoTable = {
val className = "org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable"
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
val overwriteOptions = createObject(
"org.apache.spark.sql.catalyst.plans.logical.OverwriteOptions",
overwrite.asInstanceOf[Object], Map.empty.asInstanceOf[Object])._1.asInstanceOf[Object]
createObject(
className,
table,
partition,
query,
overwriteOptions,
ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
} else if (SparkUtil.isSparkVersionXandAbove("2.2") ) {
createObject(
className,
table,
partition,
query,
overwrite.asInstanceOf[Object],
ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
} else {
throw new UnsupportedOperationException("Unsupported Spark version")
}
}
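
  /**
   * Creates a LogicalRelation through reflection, matching the constructor
   * signature of the running Spark version (2.1, 2.2 or 2.3).
   */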
def getLogicalRelation(relation: BaseRelation,
expectedOutputAttributes: Seq[Attribute],
catalogTable: Option[CatalogTable],
isStreaming: Boolean): LogicalRelation = {
val className = "org.apache.spark.sql.execution.datasources.LogicalRelation"
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
createObject(
className,
relation,
Some(expectedOutputAttributes),
catalogTable)._1.asInstanceOf[LogicalRelation]
} else if (SparkUtil.isSparkVersionEqualTo("2.2")) {
createObject(
className,
relation,
expectedOutputAttributes,
catalogTable)._1.asInstanceOf[LogicalRelation]
} else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
createObject(
className,
relation,
expectedOutputAttributes,
catalogTable,
isStreaming.asInstanceOf[Object])._1.asInstanceOf[LogicalRelation]
} else {
throw new UnsupportedOperationException("Unsupported Spark version")
}
}
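
  /**
   * Reads the overwrite flag of an insert plan through reflection. On
   * Spark 2.2 and above the member is a plain Boolean; on Spark 2.1 the
   * flag is carried in the 'enabled' field of an OverwriteOptions member.
   */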
def getOverWriteOption[T: TypeTag : reflect.ClassTag](name: String, obj: T): Boolean = {
var overwriteboolean: Boolean = false
val im = rm.reflect(obj)
for (m <- typeOf[T].members.filter(!_.isMethod)) {
if (m.toString.contains("overwrite")) {
val typ = m.typeSignature
if (typ.toString.contains("Boolean")) {
          // Spark 2.2 and above: the member itself is a plain Boolean
          overwriteboolean = im.reflectField(m.asTerm).get.asInstanceOf[Boolean]
        } else {
          // Spark 2.1: the member is an OverwriteOptions whose 'enabled' field holds the flag
          overwriteboolean = getOverWrite("enabled", im.reflectField(m.asTerm).get)
}
}
}
overwriteboolean
}
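
  /**
   * Reads the Boolean 'enabled' field of the given object through
   * reflection (the OverwriteOptions of Spark 2.1).
   */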
private def getOverWrite[T: TypeTag : reflect.ClassTag](name: String, obj: T): Boolean = {
var overwriteboolean: Boolean = false
val im = rm.reflect(obj)
for (l <- im.symbol.typeSignature.members.filter(_.name.toString.contains("enabled"))) {
overwriteboolean = im.reflectField(l.asTerm).get.asInstanceOf[Boolean]
}
overwriteboolean
}
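
  /**
   * Invokes the no-argument accessor method with the given name on the
   * object through reflection and returns its result.
   */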
def getFieldOfCatalogTable[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
val im = rm.reflect(obj)
val sym = im.symbol.typeSignature.member(TermName(name))
val tableMeta = im.reflectMethod(sym.asMethod).apply()
tableMeta
}
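
  /**
   * Instantiates the AstBuilder implementation configured under
   * CarbonCommonConstants.CARBON_SQLASTBUILDER_CLASSNAME.
   */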
def getAstBuilder(conf: Object,
sqlParser: Object,
sparkSession: SparkSession): AstBuilder = {
val className = sparkSession.sparkContext.conf.get(
CarbonCommonConstants.CARBON_SQLASTBUILDER_CLASSNAME,
CarbonCommonConstants.CARBON_SQLASTBUILDER_CLASSNAME_DEFAULT)
createObject(className,
conf,
sqlParser, sparkSession)._1.asInstanceOf[AstBuilder]
}
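
  /**
   * Creates the carbon session state for the running Spark version. On
   * Spark 2.1 the session state class is instantiated directly; on
   * Spark 2.2 and above a session state builder is instantiated and its
   * build() method is invoked.
   */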
def getSessionState(sparkContext: SparkContext,
carbonSession: Object,
useHiveMetaStore: Boolean): Any = {
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
val className = sparkContext.conf.get(
CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
"org.apache.spark.sql.hive.CarbonSessionState")
createObject(className, carbonSession)._1
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
if (useHiveMetaStore) {
val className = sparkContext.conf.get(
CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
"org.apache.spark.sql.hive.CarbonSessionStateBuilder")
val tuple = createObject(className, carbonSession, None)
val method = tuple._2.getMethod("build")
method.invoke(tuple._1)
} else {
val className = sparkContext.conf.get(
CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
"org.apache.spark.sql.hive.CarbonInMemorySessionStateBuilder")
val tuple = createObject(className, carbonSession, None)
val method = tuple._2.getMethod("build")
method.invoke(tuple._1)
}
} else {
throw new UnsupportedOperationException("Spark version not supported")
}
}
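
  /**
   * Returns true if the filter expression contains a predicate subquery.
   * The helper moved from PredicateSubquery.hasPredicateSubquery in
   * Spark 2.1 to SubqueryExpression.hasInOrExistsSubquery in Spark 2.2.
   */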
  def hasPredicateSubquery(filterExp: Expression): Boolean = {
    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
      val clazz = Class.forName("org.apache.spark.sql.catalyst.expressions.PredicateSubquery")
      val method = clazz.getMethod("hasPredicateSubquery", classOf[Expression])
      // static method: the receiver is ignored, so null is passed
      method.invoke(null, filterExp).asInstanceOf[Boolean]
    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
      val clazz = Class.forName("org.apache.spark.sql.catalyst.expressions.SubqueryExpression")
      val method = clazz.getMethod("hasInOrExistsSubquery", classOf[Expression])
      method.invoke(null, filterExp).asInstanceOf[Boolean]
    } else {
      throw new UnsupportedOperationException(s"Unsupported Spark version $SPARK_VERSION")
    }
  }
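
  /**
   * Reads the 'isFormatted' flag of a describe table command through
   * reflection, defaulting to false when the field is absent.
   */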
  def getDescribeTableFormattedField[T: TypeTag : reflect.ClassTag](obj: T): Boolean = {
    val im = rm.reflect(obj)
    val isFormatted = im.symbol.typeSignature.members
      .find(_.name.toString.equalsIgnoreCase("isFormatted"))
      .map(l => im.reflectField(l.asTerm).get)
      .getOrElse(false)
      .asInstanceOf[Boolean]
    isFormatted
  }
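
  /**
   * Creates a RowDataSourceScanExec through reflection. Spark 2.3 changed
   * the constructor to take the required column indices and the pushed and
   * handled filter sets instead of the partitioning and metadata arguments.
   */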
def getRowDataSourceScanExecObj(relation: LogicalRelation,
output: Seq[Attribute],
pushedFilters: Seq[Filter],
handledFilters: Seq[Filter],
rdd: RDD[InternalRow],
partition: Partitioning,
metadata: Map[String, String]): RowDataSourceScanExec = {
val className = "org.apache.spark.sql.execution.RowDataSourceScanExec"
if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) {
createObject(className, output, rdd, relation.relation,
partition, metadata,
relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
} else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
createObject(className, output, output.map(output.indexOf),
pushedFilters.toSet, handledFilters.toSet, rdd,
relation.relation,
relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
} else {
throw new UnsupportedOperationException("Spark version not supported")
}
}
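
  /**
   * Invokes DataSource.writeAndRead through reflection. Spark 2.3 changed
   * its signature to take the query plan, output column names and physical
   * plan instead of a DataFrame.
   */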
def invokewriteAndReadMethod(dataSourceObj: DataSource,
dataFrame: DataFrame,
data: LogicalPlan,
session: SparkSession,
mode: SaveMode,
query: LogicalPlan,
physicalPlan: SparkPlan): BaseRelation = {
if (SparkUtil.isSparkVersionEqualTo("2.2")) {
val method: Method = dataSourceObj.getClass
.getMethod("writeAndRead", classOf[SaveMode], classOf[DataFrame])
method.invoke(dataSourceObj, mode, dataFrame)
.asInstanceOf[BaseRelation]
} else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
val method: Method = dataSourceObj.getClass
.getMethod("writeAndRead",
classOf[SaveMode],
classOf[LogicalPlan],
classOf[Seq[String]],
classOf[SparkPlan])
      // since Spark 2.3.2 (SPARK-PR#22346), 'query.output' was changed to
      // 'query.output.map(_.name)'
method.invoke(dataSourceObj, mode, query, query.output.map(_.name), physicalPlan)
.asInstanceOf[BaseRelation]
} else {
throw new UnsupportedOperationException("Spark version not supported")
}
}

  /**
   * Method to invoke the alter table add columns command for a hive table
   * from the carbon session.
   * @param table identifier of the hive table to alter
   * @param colsToAdd columns to be added to the table
   * @return the constructed AlterTableAddColumnsCommand
   */
def invokeAlterTableAddColumn(table: TableIdentifier,
colsToAdd: Seq[StructField]): Object = {
val caseClassName = "org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand"
CarbonReflectionUtils.createObject(caseClassName, table, colsToAdd)
._1.asInstanceOf[RunnableCommand]
}
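
  /**
   * Returns the singleton instance of the Scala object with the given
   * fully qualified name.
   */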
  def createSingleObject(className: String): Any = {
    val classMirror = universe.runtimeMirror(getClass.getClassLoader)
    val moduleSymbol = classMirror.staticModule(className)
    val moduleMirror = classMirror.reflectModule(moduleSymbol)
    moduleMirror.instance
  }
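
  /**
   * Instantiates the given class through its first public constructor and
   * returns the new instance together with its Class.
   */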
def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
val clazz = Utils.classForName(className)
val ctor = clazz.getConstructors.head
ctor.setAccessible(true)
(ctor.newInstance(conArgs: _*), clazz)
}
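
  /**
   * Like createObject, but uses the first declared constructor so that
   * classes with private constructors can also be instantiated.
   */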
def createObjectOfPrivateConstructor(className: String, conArgs: Object*): (Any, Class[_]) = {
val clazz = Utils.classForName(className)
val ctor = clazz.getDeclaredConstructors.head
ctor.setAccessible(true)
(ctor.newInstance(conArgs: _*), clazz)
}

  /**
   * It is a hack to register the carbon input format, output format and
   * serde information in the [[HiveSerDe]] serde map through reflection.
   */
def updateCarbonSerdeInfo(): Unit = {
val currentMirror = scala.reflect.runtime.currentMirror
val instanceMirror = currentMirror.reflect(HiveSerDe)
currentMirror.staticClass(HiveSerDe.getClass.getName).
toType.members.find { p =>
!p.isMethod && p.name.toString.equals("serdeMap")
} match {
case Some(field) =>
        val serdeMap =
          instanceMirror.reflectField(field.asTerm).get.asInstanceOf[Map[String, HiveSerDe]]
        // the same carbon serde entry is registered under all three provider aliases
        val carbonSerDe = HiveSerDe(
          Some(classOf[MapredCarbonInputFormat].getName),
          Some(classOf[MapredCarbonOutputFormat[_]].getName),
          Some(classOf[CarbonHiveSerDe].getName))
        val updatedSerdeMap = serdeMap ++ Map(
          "org.apache.spark.sql.carbonsource" -> carbonSerDe,
          "carbon" -> carbonSerDe,
          "carbondata" -> carbonSerDe)
        instanceMirror.reflectField(field.asTerm).set(updatedSerdeMap)
case _ =>
}
}
/**
 * This method updates a field of the given case class instance through reflection.
*/
def setFieldToCaseClass(caseObj: Object, fieldName: String, objToSet: Object): Unit = {
val nameField = caseObj.getClass.getDeclaredField(fieldName)
nameField.setAccessible(true)
nameField.set(caseObj, objToSet)
}
/**
 * This method updates a field declared in the superclass of the given object through reflection.
*/
def setSuperFieldToClass(caseObj: Object, fieldName: String, objToSet: Object): Unit = {
val nameField = caseObj.getClass.getSuperclass.getDeclaredField(fieldName)
nameField.setAccessible(true)
nameField.set(caseObj, objToSet)
}
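
  /**
   * Runs the analyzer on a logical plan through reflection, using
   * executeAndCheck on Spark 2.3 and execute on Spark 2.1 and 2.2.
   */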
def invokeAnalyzerExecute(analyzer: Analyzer,
plan: LogicalPlan): LogicalPlan = {
if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) {
val method: Method = analyzer.getClass
.getMethod("execute", classOf[LogicalPlan])
method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
} else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
val method: Method = analyzer.getClass
.getMethod("executeAndCheck", classOf[LogicalPlan])
method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
} else {
throw new UnsupportedOperationException("Spark version not supported")
}
}
}