/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.client

import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
import java.lang.reflect.{InvocationTargetException, Method, Modifier}
import java.net.URI
import java.util.{ArrayList => JArrayList, List => JList, Locale, Map => JMap, Set => JSet}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.IMetaStoreClient
import org.apache.hadoop.hive.metastore.TableType
import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.io.AcidUtils
import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{AtomicType, IntegralType, StringType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

/**
* A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used
 * to talk to the metastore. Each Hive version has its own implementation of this class, providing
 * version-specific implementations of the functions that differ across Hive versions.
*
 * The guidelines for writing shims are:
 * - always extend from the previous version unless it is really not possible
 * - resolve methods in lazy vals, both for quicker access across multiple invocations and to
 *   avoid runtime errors from methods that do not exist in the Hive version in use (a
 *   consequence of the guideline above).
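 *
 * For instance, a shim typically binds a version-specific method like this (an illustrative
 * sketch; `getFoo` is a hypothetical Hive method, not a real API):
 * {{{
 *   private lazy val getFooMethod = findMethod(classOf[Hive], "getFoo", classOf[String])
 * }}}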
*/
private[client] sealed abstract class Shim {
/**
   * Set the current SessionState to the given SessionState. Also, set the context classloader
   * of the current thread to the one set in the HiveConf of the given `state`.
*/
def setCurrentSessionState(state: SessionState): Unit
/**
   * This shim is necessary because the return type differs across Hive versions.
   * All parameters are the same, though.
*/
def getDataLocation(table: Table): Option[String]
def setDataLocation(table: Table, loc: String): Unit
def getAllPartitions(hive: Hive, table: Table): Seq[Partition]
def getPartitionsByFilter(hive: Hive, table: Table, predicates: Seq[Expression]): Seq[Partition]
def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor
def getDriverResults(driver: Driver): Seq[String]
def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long
def alterTable(hive: Hive, tableName: String, table: Table): Unit
def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit
def getTablesByType(
hive: Hive,
dbName: String,
pattern: String,
tableType: TableType): Seq[String]
def createPartitions(
hive: Hive,
db: String,
table: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit
def loadPartition(
hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit
def loadTable(
hive: Hive,
loadPath: Path,
tableName: String,
replace: Boolean,
isSrcLocal: Boolean): Unit
def loadDynamicPartitions(
hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
listBucketingEnabled: Boolean): Unit
def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit
def dropFunction(hive: Hive, db: String, name: String): Unit
def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit
def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit
def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction]
def listFunctions(hive: Hive, db: String, pattern: String): Seq[String]
def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit
def dropTable(
hive: Hive,
dbName: String,
tableName: String,
deleteData: Boolean,
ignoreIfNotExists: Boolean,
purge: Boolean): Unit
def dropPartition(
hive: Hive,
dbName: String,
tableName: String,
part: JList[String],
deleteData: Boolean,
purge: Boolean): Unit
def getDatabaseOwnerName(db: Database): String
def setDatabaseOwnerName(db: Database, owner: String): Unit
protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
val method = findMethod(klass, name, args: _*)
require(Modifier.isStatic(method.getModifiers()),
s"Method $name of class $klass is not static.")
method
}
def getMSC(hive: Hive): IMetaStoreClient
protected def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
klass.getMethod(name, args: _*)
}
}

private[client] class Shim_v0_12 extends Shim with Logging {
// See HIVE-12224, HOLD_DDLTIME was broken as soon as it landed
protected lazy val holdDDLTime = JBoolean.FALSE
// deletes the underlying data along with metadata
protected lazy val deleteDataInDropIndex = JBoolean.TRUE
protected lazy val getMSCMethod = {
    // getMSC() in Hive 0.12 is private, so findMethod() (which uses getMethod and can only
    // see public members) does not work here; use getDeclaredMethod and make it accessible.
val msc = classOf[Hive].getDeclaredMethod("getMSC")
msc.setAccessible(true)
msc
}
override def getMSC(hive: Hive): IMetaStoreClient = {
getMSCMethod.invoke(hive).asInstanceOf[IMetaStoreClient]
}
private lazy val startMethod =
findStaticMethod(
classOf[SessionState],
"start",
classOf[SessionState])
private lazy val getDataLocationMethod = findMethod(classOf[Table], "getDataLocation")
private lazy val setDataLocationMethod =
findMethod(
classOf[Table],
"setDataLocation",
classOf[URI])
private lazy val getAllPartitionsMethod =
findMethod(
classOf[Hive],
"getAllPartitionsForPruner",
classOf[Table])
private lazy val getCommandProcessorMethod =
findStaticMethod(
classOf[CommandProcessorFactory],
"get",
classOf[String],
classOf[HiveConf])
private lazy val getDriverResultsMethod =
findMethod(
classOf[Driver],
"getResults",
classOf[JArrayList[String]])
private lazy val createPartitionMethod =
findMethod(
classOf[Hive],
"createPartition",
classOf[Table],
classOf[JMap[String, String]],
classOf[Path],
classOf[JMap[String, String]],
classOf[String],
classOf[String],
JInteger.TYPE,
classOf[JList[Object]],
classOf[String],
classOf[JMap[String, String]],
classOf[JList[Object]],
classOf[JList[Object]])
private lazy val loadPartitionMethod =
findMethod(
classOf[Hive],
"loadPartition",
classOf[Path],
classOf[String],
classOf[JMap[String, String]],
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
private lazy val loadTableMethod =
findMethod(
classOf[Hive],
"loadTable",
classOf[Path],
classOf[String],
JBoolean.TYPE,
JBoolean.TYPE)
private lazy val loadDynamicPartitionsMethod =
findMethod(
classOf[Hive],
"loadDynamicPartitions",
classOf[Path],
classOf[String],
classOf[JMap[String, String]],
JBoolean.TYPE,
JInteger.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
private lazy val dropIndexMethod =
findMethod(
classOf[Hive],
"dropIndex",
classOf[String],
classOf[String],
classOf[String],
JBoolean.TYPE)
private lazy val alterTableMethod =
findMethod(
classOf[Hive],
"alterTable",
classOf[String],
classOf[Table])
private lazy val alterPartitionsMethod =
findMethod(
classOf[Hive],
"alterPartitions",
classOf[String],
classOf[JList[Partition]])
override def setCurrentSessionState(state: SessionState): Unit = {
    // Starting from Hive 0.13, setCurrentSessionState will internally override
    // the context class loader of the current thread with the class loader set in the
    // HiveConf of the given `state`. For this Hive 0.12 shim, we replicate that behavior
    // so that shim.setCurrentSessionState behaves consistently across all Hive versions.
Thread.currentThread().setContextClassLoader(state.getConf.getClassLoader)
startMethod.invoke(null, state)
}
override def getDataLocation(table: Table): Option[String] =
Option(getDataLocationMethod.invoke(table)).map(_.toString())
override def setDataLocation(table: Table, loc: String): Unit =
setDataLocationMethod.invoke(table, new URI(loc))
  // Follows exactly the same logic as DDLTask.createPartitions in Hive 0.12
override def createPartitions(
hive: Hive,
database: String,
tableName: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
val table = hive.getTable(database, tableName)
parts.foreach { s =>
val location = s.storage.locationUri.map(
uri => new Path(table.getPath, new Path(uri))).orNull
val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
val spec = s.spec.asJava
if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
// Ignore this partition since it already exists and ignoreIfExists == true
} else {
if (location == null && table.isView()) {
throw new HiveException("LOCATION clause illegal for view partition");
}
createPartitionMethod.invoke(
hive,
table,
spec,
location,
params, // partParams
null, // inputFormat
null, // outputFormat
-1: JInteger, // numBuckets
null, // cols
null, // serializationLib
null, // serdeParams
null, // bucketCols
null) // sortCols
}
}
}
override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
override def getPartitionsByFilter(
hive: Hive,
table: Table,
predicates: Seq[Expression]): Seq[Partition] = {
// getPartitionsByFilter() doesn't support binary comparison ops in Hive 0.12.
// See HIVE-4888.
logDebug("Hive 0.12 doesn't support predicate pushdown to metastore. " +
"Please use Hive 0.13 or higher.")
getAllPartitions(hive, table)
}
override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor]
override def getDriverResults(driver: Driver): Seq[String] = {
val res = new JArrayList[String]()
getDriverResultsMethod.invoke(driver, res)
res.asScala
}
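  // In Hive 0.12 this configuration value is an integer number of seconds, so convert it to
  // milliseconds here.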
override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000L
}
override def getTablesByType(
hive: Hive,
dbName: String,
pattern: String,
tableType: TableType): Seq[String] = {
    throw new UnsupportedOperationException("Hive 2.2 and earlier versions don't support " +
      "getTablesByType. Please use Hive 2.3 or higher.")
}
override def loadPartition(
hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit = {
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
JBoolean.FALSE, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean)
}
override def loadTable(
hive: Hive,
loadPath: Path,
tableName: String,
replace: Boolean,
isSrcLocal: Boolean): Unit = {
loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime)
}
override def loadDynamicPartitions(
hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
numDP: JInteger, holdDDLTime, listBucketingEnabled: JBoolean)
}
override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
dropIndexMethod.invoke(hive, dbName, tableName, indexName, deleteDataInDropIndex)
}
override def dropTable(
hive: Hive,
dbName: String,
tableName: String,
deleteData: Boolean,
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = {
if (purge) {
throw new UnsupportedOperationException("DROP TABLE ... PURGE")
}
hive.dropTable(dbName, tableName, deleteData, ignoreIfNotExists)
}
override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
alterTableMethod.invoke(hive, tableName, table)
}
override def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit = {
alterPartitionsMethod.invoke(hive, tableName, newParts)
}
override def dropPartition(
hive: Hive,
dbName: String,
tableName: String,
part: JList[String],
deleteData: Boolean,
purge: Boolean): Unit = {
if (purge) {
throw new UnsupportedOperationException("ALTER TABLE ... DROP PARTITION ... PURGE")
}
hive.dropPartition(dbName, tableName, part, deleteData)
}
override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
throw new AnalysisException("Hive 0.12 doesn't support creating permanent functions. " +
"Please use Hive 0.13 or higher.")
}
def dropFunction(hive: Hive, db: String, name: String): Unit = {
throw new NoSuchPermanentFunctionException(db, name)
}
def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit = {
throw new NoSuchPermanentFunctionException(db, oldName)
}
def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
throw new NoSuchPermanentFunctionException(db, func.identifier.funcName)
}
def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] = {
None
}
def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] = {
Seq.empty[String]
}
override def getDatabaseOwnerName(db: Database): String = ""
override def setDatabaseOwnerName(db: Database, owner: String): Unit = {}
}

private[client] class Shim_v0_13 extends Shim_v0_12 {
private lazy val setCurrentSessionStateMethod =
findStaticMethod(
classOf[SessionState],
"setCurrentSessionState",
classOf[SessionState])
private lazy val setDataLocationMethod =
findMethod(
classOf[Table],
"setDataLocation",
classOf[Path])
private lazy val getAllPartitionsMethod =
findMethod(
classOf[Hive],
"getAllPartitionsOf",
classOf[Table])
private lazy val getPartitionsByFilterMethod =
findMethod(
classOf[Hive],
"getPartitionsByFilter",
classOf[Table],
classOf[String])
private lazy val getCommandProcessorMethod =
findStaticMethod(
classOf[CommandProcessorFactory],
"get",
classOf[Array[String]],
classOf[HiveConf])
private lazy val getDriverResultsMethod =
findMethod(
classOf[Driver],
"getResults",
classOf[JList[Object]])
private lazy val getDatabaseOwnerNameMethod =
findMethod(
classOf[Database],
"getOwnerName")
private lazy val setDatabaseOwnerNameMethod =
findMethod(
classOf[Database],
"setOwnerName",
classOf[String])
override def setCurrentSessionState(state: SessionState): Unit =
setCurrentSessionStateMethod.invoke(null, state)
override def setDataLocation(table: Table, loc: String): Unit =
setDataLocationMethod.invoke(table, new Path(loc))
override def createPartitions(
hive: Hive,
db: String,
table: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
parts.zipWithIndex.foreach { case (s, i) =>
addPartitionDesc.addPartition(
s.spec.asJava, s.storage.locationUri.map(CatalogUtils.URIToString(_)).orNull)
if (s.parameters.nonEmpty) {
addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
}
}
hive.createPartitions(addPartitionDesc)
}
override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
val resourceUris = f.resources.map { resource =>
new ResourceUri(ResourceType.valueOf(
resource.resourceType.resourceType.toUpperCase(Locale.ROOT)), resource.uri)
}
new HiveFunction(
f.identifier.funcName,
db,
f.className,
null,
PrincipalType.USER,
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis).toInt,
FunctionType.JAVA,
resourceUris.asJava)
}
override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
hive.createFunction(toHiveFunction(func, db))
}
override def dropFunction(hive: Hive, db: String, name: String): Unit = {
hive.dropFunction(db, name)
}
override def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit = {
val catalogFunc = getFunctionOption(hive, db, oldName)
.getOrElse(throw new NoSuchPermanentFunctionException(db, oldName))
.copy(identifier = FunctionIdentifier(newName, Some(db)))
val hiveFunc = toHiveFunction(catalogFunc, db)
hive.alterFunction(db, oldName, hiveFunc)
}
override def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
hive.alterFunction(db, func.identifier.funcName, toHiveFunction(func, db))
}
private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
val resources = hf.getResourceUris.asScala.map { uri =>
val resourceType = uri.getResourceType() match {
case ResourceType.ARCHIVE => "archive"
case ResourceType.FILE => "file"
case ResourceType.JAR => "jar"
case r => throw new AnalysisException(s"Unknown resource type: $r")
}
FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri())
}
CatalogFunction(name, hf.getClassName, resources)
}
override def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] = {
try {
Option(hive.getFunction(db, name)).map(fromHiveFunction)
} catch {
case NonFatal(e) if isCausedBy(e, s"$name does not exist") =>
None
}
}
  private def isCausedBy(e: Throwable, matchMessage: String): Boolean = {
    if (e.getMessage.contains(matchMessage)) {
      true
    } else if (e.getCause != null) {
      isCausedBy(e.getCause, matchMessage)
    } else {
      false
    }
  }
override def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] = {
hive.getFunctions(db, pattern).asScala
}
/**
   * Converts Catalyst expressions to the format that Hive's getPartitionsByFilter() expects,
   * i.e. a string that represents partition predicates like "str_key=\"value\" and int_key=1 ...".
*
* Unsupported predicates are skipped.
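   *
   * For example, pushing down the predicates `intCol = 1` and `strCol IN ("a", "b")` yields
   * the filter string "intCol = 1 and (strCol = \"a\" or strCol = \"b\")" (the IN-to-OR
   * rewrite assumes advanced partition predicate pushdown is enabled).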
*/
def convertFilters(table: Table, filters: Seq[Expression]): String = {
/**
* An extractor that matches all binary comparison operators except null-safe equality.
*
     * Null-safe equality is not supported by Hive metastore partition predicate pushdown.
*/
object SpecialBinaryComparison {
def unapply(e: BinaryComparison): Option[(Expression, Expression)] = e match {
case _: EqualNullSafe => None
case _ => Some((e.left, e.right))
}
}
object ExtractableLiteral {
def unapply(expr: Expression): Option[String] = expr match {
        case Literal(null, _) => None // `null`s can be cast to other types; we want to avoid NPEs.
case Literal(value, _: IntegralType) => Some(value.toString)
case Literal(value, _: StringType) => Some(quoteStringLiteral(value.toString))
case _ => None
}
}
object ExtractableLiterals {
def unapply(exprs: Seq[Expression]): Option[Seq[String]] = {
// SPARK-24879: The Hive metastore filter parser does not support "null", but we still want
// to push down as many predicates as we can while still maintaining correctness.
// In SQL, the `IN` expression evaluates as follows:
// > `1 in (2, NULL)` -> NULL
// > `1 in (1, NULL)` -> true
// > `1 in (2)` -> false
// Since Hive metastore filters are NULL-intolerant binary operations joined only by
// `AND` and `OR`, we can treat `NULL` as `false` and thus rewrite `1 in (2, NULL)` as
// `1 in (2)`.
// If the Hive metastore begins supporting NULL-tolerant predicates and Spark starts
// pushing down these predicates, then this optimization will become incorrect and need
// to be changed.
val extractables = exprs
.filter {
case Literal(null, _) => false
case _ => true
}.map(ExtractableLiteral.unapply)
if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
Some(extractables.map(_.get))
} else {
None
}
}
}
object ExtractableValues {
private lazy val valueToLiteralString: PartialFunction[Any, String] = {
case value: Byte => value.toString
case value: Short => value.toString
case value: Int => value.toString
case value: Long => value.toString
case value: UTF8String => quoteStringLiteral(value.toString)
}
def unapply(values: Set[Any]): Option[Seq[String]] = {
val extractables = values.toSeq.map(valueToLiteralString.lift)
if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
Some(extractables.map(_.get))
} else {
None
}
}
}
object SupportedAttribute {
      // Hive varchar and char are treated as Catalyst strings, but they cannot be pushed down.
private val varcharKeys = table.getPartitionKeys.asScala
.filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
.map(col => col.getName).toSet
def unapply(attr: Attribute): Option[String] = {
val resolver = SQLConf.get.resolver
if (varcharKeys.exists(c => resolver(c, attr.name))) {
None
} else if (attr.dataType.isInstanceOf[IntegralType] || attr.dataType == StringType) {
Some(attr.name)
} else {
None
}
}
}
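    // Rewrites an IN predicate over extracted literal values as a disjunction of equalities,
    // e.g. convertInToOr("k", Seq("1", "2")) yields "(k = 1 or k = 2)".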
def convertInToOr(name: String, values: Seq[String]): String = {
values.map(value => s"$name = $value").mkString("(", " or ", ")")
}
val useAdvanced = SQLConf.get.advancedPartitionPredicatePushdownEnabled
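    // Strips casts that cannot affect a comparison's outcome (safe up-casts between atomic
    // types) so that the underlying attribute can still be pushed down.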
object ExtractAttribute {
def unapply(expr: Expression): Option[Attribute] = {
expr match {
case attr: Attribute => Some(attr)
case Cast(child @ AtomicType(), dt: AtomicType, _)
if Cast.canUpCast(child.dataType.asInstanceOf[AtomicType], dt) => unapply(child)
case _ => None
}
}
}
def convert(expr: Expression): Option[String] = expr match {
case In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values))
if useAdvanced =>
Some(convertInToOr(name, values))
case InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values))
if useAdvanced =>
Some(convertInToOr(name, values))
case op @ SpecialBinaryComparison(
ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
Some(s"$name ${op.symbol} $value")
case op @ SpecialBinaryComparison(
ExtractableLiteral(value), ExtractAttribute(SupportedAttribute(name))) =>
Some(s"$value ${op.symbol} $name")
case And(expr1, expr2) if useAdvanced =>
val converted = convert(expr1) ++ convert(expr2)
if (converted.isEmpty) {
None
} else {
Some(converted.mkString("(", " and ", ")"))
}
case Or(expr1, expr2) if useAdvanced =>
for {
left <- convert(expr1)
right <- convert(expr2)
} yield s"($left or $right)"
case _ => None
}
filters.flatMap(convert).mkString(" and ")
}
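  // Quotes a string literal for use in a metastore filter: double quotes by default, single
  // quotes if the value contains a double quote; values containing both are not supported.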
private def quoteStringLiteral(str: String): String = {
if (!str.contains("\"")) {
s""""$str""""
} else if (!str.contains("'")) {
s"""'$str'"""
} else {
throw new UnsupportedOperationException(
"""Partition filter cannot have both `"` and `'` characters""")
}
}
override def getPartitionsByFilter(
hive: Hive,
table: Table,
predicates: Seq[Expression]): Seq[Partition] = {
// Hive getPartitionsByFilter() takes a string that represents partition
// predicates like "str_key=\"value\" and int_key=1 ..."
val filter = convertFilters(table, predicates)
val partitions =
if (filter.isEmpty) {
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
} else {
logDebug(s"Hive metastore filter is '$filter'.")
val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
        // We should get this config value from the metastore; otherwise we hit SPARK-18681.
        // To stay compatible with hive-0.12 and hive-0.13 we go through the metastore client;
        // in the future we can achieve this with:
        // val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean
val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname,
tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean
try {
          // Hive may throw an exception when calling this method in some circumstances, such as
          // when filtering on a non-string partition column while the Hive config key
          // hive.metastore.try.direct.sql is false
getPartitionsByFilterMethod.invoke(hive, table, filter)
.asInstanceOf[JArrayList[Partition]]
} catch {
case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
!tryDirectSql =>
logWarning("Caught Hive MetaException attempting to get partition metadata by " +
"filter from Hive. Falling back to fetching all partition metadata, which will " +
"degrade performance. Modifying your Hive metastore configuration to set " +
s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
// HiveShim clients are expected to handle a superset of the requested partitions
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
tryDirectSql =>
throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
"metadata by filter from Hive. You can set the Spark configuration setting " +
s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to false to work around this " +
"problem, however this will result in degraded performance. Please report a bug: " +
"https://issues.apache.org/jira/browse/SPARK", ex)
}
}
partitions.asScala.toSeq
}
override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor]
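  // Each result row may come back either as a plain String or as an Array[Object] whose first
  // element is the value, so both shapes are handled below.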
override def getDriverResults(driver: Driver): Seq[String] = {
val res = new JArrayList[Object]()
getDriverResultsMethod.invoke(driver, res)
res.asScala.map { r =>
r match {
case s: String => s
case a: Array[Object] => a(0).asInstanceOf[String]
}
}
}
override def getDatabaseOwnerName(db: Database): String = {
Option(getDatabaseOwnerNameMethod.invoke(db)).map(_.asInstanceOf[String]).getOrElse("")
}
override def setDatabaseOwnerName(db: Database, owner: String): Unit = {
setDatabaseOwnerNameMethod.invoke(db, owner)
}
}

private[client] class Shim_v0_14 extends Shim_v0_13 {
// true if this is an ACID operation
protected lazy val isAcid = JBoolean.FALSE
// true if list bucketing enabled
protected lazy val isSkewedStoreAsSubdir = JBoolean.FALSE
private lazy val loadPartitionMethod =
findMethod(
classOf[Hive],
"loadPartition",
classOf[Path],
classOf[String],
classOf[JMap[String, String]],
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
private lazy val loadTableMethod =
findMethod(
classOf[Hive],
"loadTable",
classOf[Path],
classOf[String],
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
private lazy val loadDynamicPartitionsMethod =
findMethod(
classOf[Hive],
"loadDynamicPartitions",
classOf[Path],
classOf[String],
classOf[JMap[String, String]],
JBoolean.TYPE,
JInteger.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
private lazy val dropTableMethod =
findMethod(
classOf[Hive],
"dropTable",
classOf[String],
classOf[String],
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
private lazy val getTimeVarMethod =
findMethod(
classOf[HiveConf],
"getTimeVar",
classOf[HiveConf.ConfVars],
classOf[TimeUnit])
override def loadPartition(
hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit = {
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
holdDDLTime, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
isSrcLocal: JBoolean, isAcid)
}
override def loadTable(
hive: Hive,
loadPath: Path,
tableName: String,
replace: Boolean,
isSrcLocal: Boolean): Unit = {
loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime,
isSrcLocal: JBoolean, isSkewedStoreAsSubdir, isAcid)
}
override def loadDynamicPartitions(
hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
numDP: JInteger, holdDDLTime, listBucketingEnabled: JBoolean, isAcid)
}
override def dropTable(
hive: Hive,
dbName: String,
tableName: String,
deleteData: Boolean,
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = {
dropTableMethod.invoke(hive, dbName, tableName, deleteData: JBoolean,
ignoreIfNotExists: JBoolean, purge: JBoolean)
}
override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
getTimeVarMethod.invoke(
conf,
HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY,
TimeUnit.MILLISECONDS).asInstanceOf[Long]
}
}

private[client] class Shim_v1_0 extends Shim_v0_14

private[client] class Shim_v1_1 extends Shim_v1_0 {
// throws an exception if the index does not exist
protected lazy val throwExceptionInDropIndex = JBoolean.TRUE
private lazy val dropIndexMethod =
findMethod(
classOf[Hive],
"dropIndex",
classOf[String],
classOf[String],
classOf[String],
JBoolean.TYPE,
JBoolean.TYPE)
override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
dropIndexMethod.invoke(hive, dbName, tableName, indexName, throwExceptionInDropIndex,
deleteDataInDropIndex)
}
}

private[client] class Shim_v1_2 extends Shim_v1_1 {
// txnId can be 0 unless isAcid == true
protected lazy val txnIdInLoadDynamicPartitions: JLong = 0L
private lazy val loadDynamicPartitionsMethod =
findMethod(
classOf[Hive],
"loadDynamicPartitions",
classOf[Path],
classOf[String],
classOf[JMap[String, String]],
JBoolean.TYPE,
JInteger.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JLong.TYPE)
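  // PartitionDropOptions is not available in every supported Hive version, so the class is
  // loaded by name and its fields are populated reflectively.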
private lazy val dropOptionsClass =
Utils.classForName("org.apache.hadoop.hive.metastore.PartitionDropOptions")
private lazy val dropOptionsDeleteData = dropOptionsClass.getField("deleteData")
private lazy val dropOptionsPurge = dropOptionsClass.getField("purgeData")
private lazy val dropPartitionMethod =
findMethod(
classOf[Hive],
"dropPartition",
classOf[String],
classOf[String],
classOf[JList[String]],
dropOptionsClass)
override def loadDynamicPartitions(
hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
numDP: JInteger, holdDDLTime, listBucketingEnabled: JBoolean, isAcid,
txnIdInLoadDynamicPartitions)
}
override def dropPartition(
hive: Hive,
dbName: String,
tableName: String,
part: JList[String],
deleteData: Boolean,
purge: Boolean): Unit = {
val dropOptions = dropOptionsClass.getConstructor().newInstance().asInstanceOf[Object]
dropOptionsDeleteData.setBoolean(dropOptions, deleteData)
dropOptionsPurge.setBoolean(dropOptions, purge)
dropPartitionMethod.invoke(hive, dbName, tableName, part, dropOptions)
}
}

private[client] class Shim_v2_0 extends Shim_v1_2 {
private lazy val loadPartitionMethod =
findMethod(
classOf[Hive],
"loadPartition",
classOf[Path],
classOf[String],
classOf[JMap[String, String]],
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
private lazy val loadTableMethod =
findMethod(
classOf[Hive],
"loadTable",
classOf[Path],
classOf[String],
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
private lazy val loadDynamicPartitionsMethod =
findMethod(
classOf[Hive],
"loadDynamicPartitions",
classOf[Path],
classOf[String],
classOf[JMap[String, String]],
JBoolean.TYPE,
JInteger.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JLong.TYPE)
override def loadPartition(
hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit = {
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
isSrcLocal: JBoolean, isAcid)
}
override def loadTable(
hive: Hive,
loadPath: Path,
tableName: String,
replace: Boolean,
isSrcLocal: Boolean): Unit = {
loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, isSrcLocal: JBoolean,
isSkewedStoreAsSubdir, isAcid)
}
override def loadDynamicPartitions(
hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions)
}
}

private[client] class Shim_v2_1 extends Shim_v2_0 {
// true if there is any following stats task
protected lazy val hasFollowingStatsTask = JBoolean.FALSE
  // TODO: For now, environmentContext is always set to null. In the future, we should use it
  // to avoid setting hive-generated stats to -1 when altering tables. See HIVE-12730.
protected lazy val environmentContextInAlterTable = null
private lazy val loadPartitionMethod =
findMethod(
classOf[Hive],
"loadPartition",
classOf[Path],
classOf[String],
classOf[JMap[String, String]],
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
private lazy val loadTableMethod =
findMethod(
classOf[Hive],
"loadTable",
classOf[Path],
classOf[String],
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
private lazy val loadDynamicPartitionsMethod =
findMethod(
classOf[Hive],
"loadDynamicPartitions",
classOf[Path],
classOf[String],
classOf[JMap[String, String]],
JBoolean.TYPE,
JInteger.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JLong.TYPE,
JBoolean.TYPE,
classOf[AcidUtils.Operation])
private lazy val alterTableMethod =
findMethod(
classOf[Hive],
"alterTable",
classOf[String],
classOf[Table],
classOf[EnvironmentContext])
private lazy val alterPartitionsMethod =
findMethod(
classOf[Hive],
"alterPartitions",
classOf[String],
classOf[JList[Partition]],
classOf[EnvironmentContext])
override def loadPartition(
hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit = {
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
isSrcLocal: JBoolean, isAcid, hasFollowingStatsTask)
}
override def loadTable(
hive: Hive,
loadPath: Path,
tableName: String,
replace: Boolean,
isSrcLocal: Boolean): Unit = {
loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, isSrcLocal: JBoolean,
isSkewedStoreAsSubdir, isAcid, hasFollowingStatsTask)
}
override def loadDynamicPartitions(
hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions,
hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID)
}
override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
alterTableMethod.invoke(hive, tableName, table, environmentContextInAlterTable)
}
override def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit = {
alterPartitionsMethod.invoke(hive, tableName, newParts, environmentContextInAlterTable)
}
}

private[client] class Shim_v2_2 extends Shim_v2_1

private[client] class Shim_v2_3 extends Shim_v2_1 {
private lazy val getTablesByTypeMethod =
findMethod(
classOf[Hive],
"getTablesByType",
classOf[String],
classOf[String],
classOf[TableType])
override def getTablesByType(
hive: Hive,
dbName: String,
pattern: String,
tableType: TableType): Seq[String] = {
getTablesByTypeMethod.invoke(hive, dbName, pattern, tableType)
.asInstanceOf[JList[String]].asScala
}
}

private[client] class Shim_v3_0 extends Shim_v2_3 {
// Spark supports only non-ACID operations
protected lazy val isAcidIUDoperation = JBoolean.FALSE
// Writer ID can be 0 for non-ACID operations
protected lazy val writeIdInLoadTableOrPartition: JLong = 0L
// Statement ID
protected lazy val stmtIdInLoadTableOrPartition: JInteger = 0
protected lazy val listBucketingLevel: JInteger = 0
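  // LoadTableDesc.LoadFileType is not available in earlier Hive releases, so the enum class
  // is loaded reflectively rather than referenced at compile time.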
private lazy val clazzLoadFileType = getClass.getClassLoader.loadClass(
"org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType")
private lazy val loadPartitionMethod =
findMethod(
classOf[Hive],
"loadPartition",
classOf[Path],
classOf[Table],
classOf[JMap[String, String]],
clazzLoadFileType,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
classOf[JLong],
JInteger.TYPE,
JBoolean.TYPE)
private lazy val loadTableMethod =
findMethod(
classOf[Hive],
"loadTable",
classOf[Path],
classOf[String],
clazzLoadFileType,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
classOf[JLong],
JInteger.TYPE,
JBoolean.TYPE)
private lazy val loadDynamicPartitionsMethod =
findMethod(
classOf[Hive],
"loadDynamicPartitions",
classOf[Path],
classOf[String],
classOf[JMap[String, String]],
clazzLoadFileType,
JInteger.TYPE,
JInteger.TYPE,
JBoolean.TYPE,
JLong.TYPE,
JInteger.TYPE,
JBoolean.TYPE,
classOf[AcidUtils.Operation],
JBoolean.TYPE)
override def loadPartition(
hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit = {
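    // Hive 3's loadPartition takes a Table rather than a table name, so resolve the Table
    // through the current database of the active Spark session.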
val session = SparkSession.getActiveSession
assert(session.nonEmpty)
val database = session.get.sessionState.catalog.getCurrentDatabase
val table = hive.getTable(database, tableName)
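    // REPLACE_ALL overwrites existing data in the target while KEEP_EXISTING preserves it;
    // the constants are looked up by name because the enum class is only loaded reflectively.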
val loadFileType = if (replace) {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
} else {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
}
assert(loadFileType.isDefined)
loadPartitionMethod.invoke(hive, loadPath, table, partSpec, loadFileType.get,
inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
isSrcLocal: JBoolean, isAcid, hasFollowingStatsTask,
writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition, replace: JBoolean)
}
override def loadTable(
hive: Hive,
loadPath: Path,
tableName: String,
replace: Boolean,
isSrcLocal: Boolean): Unit = {
val loadFileType = if (replace) {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
} else {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
}
assert(loadFileType.isDefined)
loadTableMethod.invoke(hive, loadPath, tableName, loadFileType.get, isSrcLocal: JBoolean,
isSkewedStoreAsSubdir, isAcidIUDoperation, hasFollowingStatsTask,
writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition: JInteger, replace: JBoolean)
}
override def loadDynamicPartitions(
hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
val loadFileType = if (replace) {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
} else {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
}
assert(loadFileType.isDefined)
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, loadFileType.get,
numDP: JInteger, listBucketingLevel, isAcid, writeIdInLoadTableOrPartition,
stmtIdInLoadTableOrPartition, hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID,
replace: JBoolean)
}
}

private[client] class Shim_v3_1 extends Shim_v3_0