/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util

import scala.reflect.runtime._
import scala.reflect.runtime.universe._

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.parser.AstBuilder
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructField

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.hive.{CarbonFileHiveSerDe, CarbonHiveSerDe, MapredCarbonInputFormat, MapredCarbonOutputFormat}
/**
 * Reflection utilities used to construct Spark classes and read fields whose
 * signatures differ across Spark versions.
 */
object CarbonReflectionUtils {
private val rm = universe.runtimeMirror(getClass.getClassLoader)

  /**
   * Returns the field value from an object through reflection.
   * @param name name of the field to retrieve
   * @param obj object from which the field is retrieved
   * @tparam T runtime type of the target object
   * @return the field value, or null if no field with the given name exists
   */
def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
val im = rm.reflect(obj)
im.symbol.typeSignature.members.find(_.name.toString.equals(name))
.map(l => im.reflectField(l.asTerm).get).orNull
}
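
  /**
   * Builds an UnresolvedRelation for the given table identifier. The alias
   * parameter is accepted for call-site compatibility across Spark versions
   * but is not applied on this one.
   */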
def getUnresolvedRelation(
tableIdentifier: TableIdentifier,
tableAlias: Option[String] = None): UnresolvedRelation = {
UnresolvedRelation(tableIdentifier)
}
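
  /**
   * Wraps the given logical plan in a SubqueryAlias. The sparkSession and
   * view parameters are kept for call-site compatibility and are unused here.
   */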
def getSubqueryAlias(sparkSession: SparkSession, alias: Option[String],
relation: LogicalPlan,
view: Option[TableIdentifier]): SubqueryAlias = {
SubqueryAlias(alias.getOrElse(""), relation)
}
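
  /**
   * Constructs an InsertIntoTable logical plan for the given target table,
   * static partition spec and source query.
   */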
def getInsertIntoCommand(table: LogicalPlan,
partition: Map[String, Option[String]],
query: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean): InsertIntoTable = {
InsertIntoTable(
table,
partition,
query,
overwrite,
ifPartitionNotExists)
}
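
  /**
   * Constructs a LogicalRelation over the given BaseRelation, casting the
   * expected output attributes to AttributeReference as this Spark version
   * requires.
   */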
def getLogicalRelation(relation: BaseRelation,
expectedOutputAttributes: Seq[Attribute],
catalogTable: Option[CatalogTable],
isStreaming: Boolean): LogicalRelation = {
new LogicalRelation(
relation,
expectedOutputAttributes.asInstanceOf[Seq[AttributeReference]],
catalogTable,
isStreaming)
}
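
  /**
   * Reads a Boolean overwrite flag from a plan node whose field layout
   * differs across Spark versions, matching the field whose name contains
   * the given fragment. A minimal usage sketch (insertPlan stands for an
   * illustrative InsertIntoTable node):
   * {{{
   *   val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", insertPlan)
   * }}}
   */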
  def getOverWriteOption[T: TypeTag : reflect.ClassTag](name: String, obj: T): Boolean = {
    var overwrite: Boolean = false
    val im = rm.reflect(obj)
    for (m <- typeOf[T].members.filter(!_.isMethod)) {
      if (m.toString.contains(name)) {
        val typ = m.typeSignature
        if (typ.toString.contains("Boolean")) {
          // Spark 2.2 and later: overwrite is a plain Boolean field
          overwrite = im.reflectField(m.asTerm).get.asInstanceOf[Boolean]
        } else {
          // Older Spark versions wrap the flag in an options object with an "enabled" field
          overwrite = getOverWrite("enabled", im.reflectField(m.asTerm).get)
        }
      }
    }
    overwrite
  }
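
  /**
   * Reads the named Boolean flag from the given overwrite-options object
   * through reflection.
   */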
  private def getOverWrite[T: TypeTag : reflect.ClassTag](name: String, obj: T): Boolean = {
    var overwrite: Boolean = false
    val im = rm.reflect(obj)
    for (l <- im.symbol.typeSignature.members.filter(_.name.toString.contains(name))) {
      overwrite = im.reflectField(l.asTerm).get.asInstanceOf[Boolean]
    }
    overwrite
  }
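
  /**
   * Invokes the no-argument accessor with the given name on the object and
   * returns its result, e.g. the tableMeta of a relation.
   */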
  def getFieldOfCatalogTable[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
    val im = rm.reflect(obj)
    val sym = im.symbol.typeSignature.member(TermName(name))
    im.reflectMethod(sym.asMethod).apply()
  }
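
  /**
   * Instantiates the AstBuilder implementation configured under
   * CarbonCommonConstants.CARBON_SQLASTBUILDER_CLASSNAME.
   */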
def getAstBuilder(conf: Object,
sqlParser: Object,
sparkSession: SparkSession): AstBuilder = {
val className = sparkSession.sparkContext.conf.get(
CarbonCommonConstants.CARBON_SQLASTBUILDER_CLASSNAME,
CarbonCommonConstants.CARBON_SQLASTBUILDER_CLASSNAME_DEFAULT)
createObject(className,
conf,
sqlParser, sparkSession)._1.asInstanceOf[AstBuilder]
}
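
  /**
   * Builds a session state for the Carbon session, choosing the Hive-backed
   * or in-memory session state builder depending on useHiveMetaStore.
   */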
  def getSessionState(sparkContext: SparkContext,
      carbonSession: Object,
      useHiveMetaStore: Boolean): Any = {
    val defaultBuilder = if (useHiveMetaStore) {
      "org.apache.spark.sql.hive.CarbonSessionStateBuilder"
    } else {
      "org.apache.spark.sql.hive.CarbonInMemorySessionStateBuilder"
    }
    val className = sparkContext.conf.get(
      CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME, defaultBuilder)
    val (builder, clazz) = createObject(className, carbonSession, None)
    clazz.getMethod("build").invoke(builder)
  }
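
  /**
   * Checks whether the filter expression contains an IN or EXISTS subquery,
   * calling SubqueryExpression.hasInOrExistsSubquery reflectively to avoid a
   * hard compile-time dependency on its signature.
   */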
  def hasPredicateSubquery(filterExp: Expression): Boolean = {
    val clazz = Class.forName("org.apache.spark.sql.catalyst.expressions.SubqueryExpression")
    val method = clazz.getMethod("hasInOrExistsSubquery", classOf[Expression])
    // hasInOrExistsSubquery is a static forwarder, so no receiver instance is needed
    method.invoke(null, filterExp).asInstanceOf[Boolean]
  }
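
  /**
   * Returns the isFormatted flag of a describe-table command, defaulting to
   * false when the field is absent on the current Spark version.
   */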
  def getDescribeTableFormattedField[T: TypeTag : reflect.ClassTag](obj: T): Boolean = {
    val im = rm.reflect(obj)
    im.symbol.typeSignature.members
      .find(_.name.toString.equalsIgnoreCase("isFormatted"))
      .map(l => im.reflectField(l.asTerm).get)
      // Default must be a Boolean, not the string "false", or the cast below throws
      .getOrElse(false)
      .asInstanceOf[Boolean]
  }
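
  /**
   * Constructs a RowDataSourceScanExec for the given relation. The partition
   * and metadata parameters are accepted for call-site compatibility but are
   * not part of the constructor on this Spark version.
   */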
def getRowDataSourceScanExecObj(relation: LogicalRelation,
output: Seq[Attribute],
pushedFilters: Seq[Filter],
handledFilters: Seq[Filter],
rdd: RDD[InternalRow],
partition: Partitioning,
metadata: Map[String, String]): RowDataSourceScanExec = {
RowDataSourceScanExec(output, output.map(output.indexOf),
pushedFilters.toSet, handledFilters.toSet, rdd,
relation.relation,
relation.catalogTable.map(_.identifier))
}
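
  /**
   * Invokes DataSource.writeAndRead. The dataFrame, data and session
   * parameters are kept for call-site compatibility across Spark versions
   * and are unused here.
   */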
def invokewriteAndReadMethod(dataSourceObj: DataSource,
dataFrame: DataFrame,
data: LogicalPlan,
session: SparkSession,
mode: SaveMode,
query: LogicalPlan,
physicalPlan: SparkPlan): BaseRelation = {
dataSourceObj.writeAndRead(mode, query, query.output.map(_.name), physicalPlan)
}
  /**
   * Invokes ALTER TABLE ADD COLUMNS for a Hive table from the Carbon session.
   * @param table identifier of the table to alter
   * @param colsToAdd columns to append to the table schema
   * @return the constructed AlterTableAddColumnsCommand
   */
def invokeAlterTableAddColumn(table: TableIdentifier,
colsToAdd: Seq[StructField]): Object = {
AlterTableAddColumnsCommand(table, colsToAdd)
}
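
  /**
   * Returns the singleton instance of the Scala object with the given fully
   * qualified name. A minimal usage sketch (the object name is illustrative):
   * {{{
   *   val module = CarbonReflectionUtils.createSingleObject("com.example.MyModule")
   * }}}
   */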
  def createSingleObject(className: String): Any = {
    val classMirror = universe.runtimeMirror(getClass.getClassLoader)
    val module = classMirror.staticModule(className)
    classMirror.reflectModule(module).instance
  }
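
  /**
   * Instantiates the named class through its first public constructor and
   * returns the instance together with its Class. Only safe for classes with
   * a single public constructor, since getConstructors imposes no order.
   * A minimal usage sketch (class name and arguments are illustrative):
   * {{{
   *   val (instance, clazz) =
   *     CarbonReflectionUtils.createObject("com.example.MyBuilder", arg1, arg2)
   * }}}
   */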
def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
val clazz = Utils.classForName(className)
val ctor = clazz.getConstructors.head
ctor.setAccessible(true)
(ctor.newInstance(conArgs: _*), clazz)
}
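
  /**
   * Same as createObject, but uses getDeclaredConstructors so that classes
   * with only private constructors can be instantiated as well.
   */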
def createObjectOfPrivateConstructor(className: String, conArgs: Object*): (Any, Class[_]) = {
val clazz = Utils.classForName(className)
val ctor = clazz.getDeclaredConstructors.head
ctor.setAccessible(true)
(ctor.newInstance(conArgs: _*), clazz)
}
  /**
   * A workaround that registers the Carbon input/output formats and SerDe
   * classes into Spark's HiveSerDe.serdeMap through reflection, since the
   * map is not otherwise extensible.
   */
def updateCarbonSerdeInfo(): Unit = {
val currentMirror = scala.reflect.runtime.currentMirror
val instanceMirror = currentMirror.reflect(HiveSerDe)
currentMirror.staticClass(HiveSerDe.getClass.getName).
toType.members.find { p =>
!p.isMethod && p.name.toString.equals("serdeMap")
} match {
case Some(field) =>
val serdeMap =
instanceMirror.reflectField(field.asTerm).get.asInstanceOf[Map[String, HiveSerDe]]
val updatedSerdeMap =
serdeMap ++ Map[String, HiveSerDe](
("org.apache.spark.sql.carbonsource", HiveSerDe(Some(
classOf[MapredCarbonInputFormat].getName),
Some(classOf[MapredCarbonOutputFormat[_]].getName),
Some(classOf[CarbonHiveSerDe].getName))),
("carbon", HiveSerDe(Some(
classOf[MapredCarbonInputFormat].getName),
Some(classOf[MapredCarbonOutputFormat[_]].getName),
Some(classOf[CarbonFileHiveSerDe].getName))),
("carbondata", HiveSerDe(Some(
classOf[MapredCarbonInputFormat].getName),
Some(classOf[MapredCarbonOutputFormat[_]].getName),
Some(classOf[CarbonHiveSerDe].getName))))
instanceMirror.reflectField(field.asTerm).set(updatedSerdeMap)
case _ =>
}
}
  /**
   * Updates a field of a case class instance through reflection.
   */
def setFieldToCaseClass(caseObj: Object, fieldName: String, objToSet: Object): Unit = {
val nameField = caseObj.getClass.getDeclaredField(fieldName)
nameField.setAccessible(true)
nameField.set(caseObj, objToSet)
}
  /**
   * Updates a field declared in the superclass of the given object through
   * reflection.
   */
def setSuperFieldToClass(caseObj: Object, fieldName: String, objToSet: Object): Unit = {
val nameField = caseObj.getClass.getSuperclass.getDeclaredField(fieldName)
nameField.setAccessible(true)
nameField.set(caseObj, objToSet)
}
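
  /**
   * Runs the analyzer on the given plan via executeAndCheck, which also
   * validates the resolved plan.
   */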
def invokeAnalyzerExecute(analyzer: Analyzer,
plan: LogicalPlan): LogicalPlan = {
analyzer.executeAndCheck(plan)
}
}