/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import java.io.File
import java.util.concurrent.atomic.AtomicLong

import org.apache.commons.lang.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession.Builder
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.mutation.merge.MergeDataSetBuilder
import org.apache.spark.sql.internal.{SessionState, SharedState, StaticSQLConf}
import org.apache.spark.sql.profiler.{Profiler, SQLStart}
import org.apache.spark.util.{CarbonReflectionUtils, Utils}

import org.apache.carbondata.common.annotations.InterfaceAudience
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.streaming.CarbonStreamingQueryListener

/**
 * Session implementation for [[org.apache.spark.sql.SparkSession]].
 * This class exists only so that Carbon's own SQL DDL commands can be plugged in.
 * Users need to call `getOrCreateCarbonSession` (added to [[SparkSession.Builder]]
 * by the implicit [[CarbonSession.CarbonBuilder]]) to create a Carbon session.
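 *
 * A minimal usage sketch (the master, app name and store path below are example
 * values, not defaults):
 * {{{
 *   import org.apache.spark.sql.SparkSession
 *   import org.apache.spark.sql.CarbonSession._
 *
 *   val carbon = SparkSession.builder()
 *     .master("local")
 *     .appName("CarbonExample")
 *     .getOrCreateCarbonSession("/tmp/carbon.store")
 * }}}
 */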
class CarbonSession(@transient val sc: SparkContext,
@transient private val existingSharedState: Option[SharedState],
@transient private val useHiveMetaStore: Boolean = true
) extends SparkSession(sc) { self =>
def this(sc: SparkContext) {
this(sc, None)
}
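  // Build the session state reflectively so that either the Hive-backed or the
  // in-memory implementation is chosen, depending on `useHiveMetaStore`.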
@transient
override lazy val sessionState: SessionState = {
CarbonReflectionUtils.getSessionState(sparkContext, this, useHiveMetaStore)
.asInstanceOf[SessionState]
}
/**
* State shared across sessions, including the `SparkContext`, cached data, listener,
* and a catalog that interacts with external systems.
*/
@transient
  override lazy val sharedState: SharedState = {
    existingSharedState match {
      case Some(ss) if ss != null => ss
      case _ => new SharedState(sparkContext)
    }
  }
override def newSession(): SparkSession = {
new CarbonSession(sparkContext, Some(sharedState), useHiveMetaStore)
}
  /**
   * Run the given SQL statement through SparkSQL, with profiler instrumentation.
   */
override def sql(sqlText: String): DataFrame = {
withProfiler(
sqlText,
(qe, sse) => {
new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
}
)
}
  /**
   * Returns true if the specified SQL statement will hit the DataMap.
   * This API is for testing purposes only.
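   *
   * A usage sketch (the table and DataMap names here are hypothetical):
   * {{{
   *   val hit = carbonSession.isDataMapHit("SELECT a FROM t WHERE b = 1", "datamap_on_b")
   * }}}
   */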
@InterfaceAudience.Developer(Array("DataMap"))
def isDataMapHit(sqlStatement: String, dataMapName: String): Boolean = {
    // the EXPLAIN command outputs DataMap information only if enable.query.statistics = true
val message = sql(s"EXPLAIN $sqlStatement").collect()
message(0).getString(0).contains(dataMapName)
}
/**
* Run SparkSQL directly
*/
def sparkSql(sqlText: String): DataFrame = {
withProfiler(
sqlText,
(qe, sse) => new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
)
}
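  /**
   * Parses and analyzes the statement, timing the parse and analysis phases, and
   * reports the timings to the [[Profiler]] when profiling is enabled.
   */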
private def withProfiler(
sqlText: String,
generateDF: (QueryExecution, SQLStart) => DataFrame): DataFrame = {
val sse = SQLStart(sqlText, CarbonSession.statementId.getAndIncrement())
CarbonSession.threadStatementId.set(sse.statementId)
sse.startTime = System.currentTimeMillis()
try {
val logicalPlan = sessionState.sqlParser.parsePlan(sqlText)
sse.parseEnd = System.currentTimeMillis()
val qe = sessionState.executePlan(logicalPlan)
qe.assertAnalyzed()
      sse.isCommand = qe.analyzed match {
        case _: Command => true
        case Union(children) if children.forall(_.isInstanceOf[Command]) => true
        case _ => false
      }
sse.analyzerEnd = System.currentTimeMillis()
generateDF(qe, sse)
} finally {
Profiler.invokeIfEnable {
if (sse.isCommand) {
sse.endTime = System.currentTimeMillis()
Profiler.send(sse)
} else {
Profiler.addStatementMessage(sse.statementId, sse)
}
}
}
}
}

object CarbonSession {
private val statementId = new AtomicLong(0)
  private var enableInMemCatalog: Boolean = false
private[sql] val threadStatementId = new ThreadLocal[Long]()
implicit class CarbonBuilder(builder: Builder) {
def enableInMemoryCatalog(): Builder = {
      enableInMemCatalog = true
builder
}
def getOrCreateCarbonSession(): SparkSession = {
getOrCreateCarbonSession(null, null)
}
def getOrCreateCarbonSession(storePath: String): SparkSession = {
getOrCreateCarbonSession(
storePath,
new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
}
def getOrCreateCarbonSession(storePath: String,
metaStorePath: String): SparkSession = synchronized {
      if (!enableInMemCatalog) {
builder.enableHiveSupport()
}
val options =
getValue("options", builder).asInstanceOf[scala.collection.mutable.HashMap[String, String]]
val userSuppliedContext: Option[SparkContext] =
getValue("userSuppliedContext", builder).asInstanceOf[Option[SparkContext]]
CarbonReflectionUtils.updateCarbonSerdeInfo()
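      // If a metastore path was supplied and no Hive connection URL is configured
      // (neither in the builder options nor in hive-site.xml), point the Hive
      // metastore at an embedded Derby database under that path.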
if (StringUtils.isNotBlank(metaStorePath)) {
val hadoopConf = new Configuration()
val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
if (configFile != null) {
hadoopConf.addResource(configFile)
}
if (options.get(CarbonCommonConstants.HIVE_CONNECTION_URL).isEmpty &&
hadoopConf.get(CarbonCommonConstants.HIVE_CONNECTION_URL) == null) {
val metaStorePathAbsolute = new File(metaStorePath).getCanonicalPath
val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
options ++= Map[String, String]((CarbonCommonConstants.HIVE_CONNECTION_URL,
s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true"))
}
}
      // Get the session from the current thread's active session.
      var session: SparkSession = SparkSession.getActiveSession match {
        case Some(sparkSession: CarbonSession) if !sparkSession.sparkContext.isStopped =>
          options.foreach { case (k, v) => sparkSession.sessionState.conf.setConfString(k, v) }
          sparkSession
        case _ => null
      }
if (session ne null) {
return session
}
// Global synchronization so we will only set the default session once.
SparkSession.synchronized {
// If the current thread does not have an active session, get it from the global session.
        session = SparkSession.getDefaultSession match {
          case Some(sparkSession: CarbonSession) if !sparkSession.sparkContext.isStopped =>
            options.foreach { case (k, v) => sparkSession.sessionState.conf.setConfString(k, v) }
            sparkSession
          case _ => null
        }
if (session ne null) {
return session
}
        // Neither an active nor a global default session exists. Create a new one.
val sparkContext = userSuppliedContext.getOrElse {
// set app name if not given
val randomAppName = java.util.UUID.randomUUID().toString
val sparkConf = new SparkConf()
options.foreach { case (k, v) => sparkConf.set(k, v) }
if (!sparkConf.contains("spark.app.name")) {
sparkConf.setAppName(randomAppName)
}
sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
val sc = SparkContext.getOrCreate(sparkConf)
          // this may be an existing SparkContext, so update its SparkConf, which may
          // later be used by the SparkSession
options.foreach { case (k, v) => sc.conf.set(k, v) }
if (!sc.conf.contains("spark.app.name")) {
sc.conf.setAppName(randomAppName)
}
sc
}
// Initialize extensions if the user has defined a configurator class.
val extensionConfOption = sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
        val extensionInstance: SparkSessionExtensions = new SparkSessionExtensions
if (extensionConfOption.isDefined) {
val extensionConfClassName = extensionConfOption.get
try {
val extensionConfClass = Utils.classForName(extensionConfClassName)
val ex = extensionConfClass.newInstance()
.asInstanceOf[(SparkSessionExtensions) => Unit]
ex(extensionInstance)
} catch {
// Ignore the error if we cannot find the class or when the class has the wrong type.
case e @ (_: ClassCastException |
_: ClassNotFoundException |
_: NoClassDefFoundError) =>
// Ignore extensions
}
}
        session = new CarbonSession(sparkContext, None, !enableInMemCatalog)
CarbonReflectionUtils.setSuperFieldToClass(session, "extensions", extensionInstance)
val carbonProperties = CarbonProperties.getInstance()
if (StringUtils.isNotBlank(storePath)) {
carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
          // the store location may already be set in carbon.properties for backward compatibility
} else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) {
carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION,
session.sessionState.conf.warehousePath)
}
options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
SparkSession.setDefaultSession(session)
        // Register the successfully instantiated context with the singleton. This should be
        // at the end of the method so that the singleton is updated only if there was no
        // exception during construction of the instance.
CarbonToSparkAdapter.addSparkListener(sparkContext)
session.streams.addListener(new CarbonStreamingQueryListener(session))
}
session
}
    /**
     * A reflection hack to read a private field of the [[Builder]] class.
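     *
     * For example, `getValue("options", builder)` returns the builder's private
     * `options` map, as used in `getOrCreateCarbonSession` above.
     */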
def getValue(name: String, builder: Builder): Any = {
val currentMirror = scala.reflect.runtime.currentMirror
val instanceMirror = currentMirror.reflect(builder)
val m = currentMirror.classSymbol(builder.getClass).
toType.members.find { p =>
p.name.toString.equals(name)
}.get.asTerm
instanceMirror.reflectField(m).get
}
}
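  /**
   * Adds a `merge` API to [[Dataset]] for building Carbon merge operations via
   * [[MergeDataSetBuilder]].
   *
   * A hedged usage sketch (assumes both Datasets are aliased so the join condition
   * can reference them; the table and column names are hypothetical):
   * {{{
   *   import org.apache.spark.sql.CarbonSession._
   *
   *   val target = spark.table("target_table").as("A")
   *   val source = spark.table("source_table").as("B")
   *   val builder = target.merge(source, "A.id = B.id")
   *   // the builder is then used to configure the matched/not-matched actions
   * }}}
   */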
implicit class DataSetMerge(val ds: Dataset[Row]) {
def merge(srcDS: Dataset[Row], expr: String): MergeDataSetBuilder = {
new MergeDataSetBuilder(ds, srcDS, expr, ds.sparkSession)
}
def merge(srcDS: Dataset[Row], expr: Column): MergeDataSetBuilder = {
new MergeDataSetBuilder(ds, srcDS, expr, ds.sparkSession)
}
}
}