blob: 0327ad85cba46a03f5e8add4a9a9c1d073a0b717 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
import org.apache.spark.sql.profiler.{Profiler, SQLStart}
import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
/**
 * Utilities for profiling SQL statements and for managing the
 * [[CarbonSessionInfo]] that is bound to the current thread through
 * [[ThreadLocalSessionInfo]].
 *
 * Every mutator below works on a clone of the session info currently bound to
 * the thread and then re-binds the clone, so the previously bound instance is
 * never modified in place.
 */
object CarbonUtils {

  /** Source of statement ids; advanced once for every statement passed to `withProfiler`. */
  private val statementId = new AtomicLong(0)

  /** Id of the statement most recently started by `withProfiler` on the current thread. */
  private[sql] val threadStatementId = new ThreadLocal[Long]

  /**
   * Parses and analyzes `sqlText`, records timing information in a [[SQLStart]]
   * message and delegates the creation of the resulting DataFrame to `generateDF`.
   *
   * @param sparkSession session whose parser and analyzer are used
   * @param sqlText      SQL statement to run
   * @param generateDF   builds the DataFrame from the analyzed query execution and
   *                     the profiling message collected so far
   * @return whatever `generateDF` returns
   */
  private def withProfiler(
      sparkSession: SparkSession,
      sqlText: String,
      generateDF: (QueryExecution, SQLStart) => DataFrame): DataFrame = {
    val sse = SQLStart(sqlText, statementId.getAndIncrement())
    threadStatementId.set(sse.statementId)
    sse.startTime = System.currentTimeMillis()
    try {
      val logicalPlan = sparkSession.sessionState.sqlParser.parsePlan(sqlText)
      sse.parseEnd = System.currentTimeMillis()

      val qe = sparkSession.sessionState.executePlan(logicalPlan)
      qe.assertAnalyzed()
      // A statement counts as a command when the analyzed plan is a Command, or a
      // Union whose children are all Commands.
      sse.isCommand = qe.analyzed match {
        case _: Command => true
        case Union(children) if children.forall(_.isInstanceOf[Command]) => true
        case _ => false
      }
      sse.analyzerEnd = System.currentTimeMillis()
      generateDF(qe, sse)
    } finally {
      // Runs on failure as well; if parsing or analysis threw, sse.isCommand still
      // holds whatever value SQLStart initialised it with.
      // NOTE(review): invokeIfEnable presumably runs the block only when profiling
      // is enabled - confirm against Profiler.
      Profiler.invokeIfEnable {
        if (sse.isCommand) {
          // Commands are reported right away with their end time.
          sse.endTime = System.currentTimeMillis()
          Profiler.send(sse)
        } else {
          // Other statements are only registered under their statement id here.
          Profiler.addStatementMessage(sse.statementId, sse)
        }
      }
    }
  }

  /**
   * Returns a clone of the session info bound to the current thread, or a fresh
   * [[CarbonSessionInfo]] when none is bound.
   */
  private def copyOfThreadSessionInfo(): CarbonSessionInfo =
    Option(ThreadLocalSessionInfo.getCarbonSessionInfo)
      .map(_.clone())
      .getOrElse(new CarbonSessionInfo())

  /**
   * Validates `value` and stores it under `key` in the thread parameters of the
   * current thread's session info.
   *
   * Validation and storage are delegated to `CarbonSetCommand.validateAndSetValue`;
   * if that call throws, the session info bound to the thread is left as it was.
   *
   * @param key   property name
   * @param value property value
   */
  def threadSet(key: String, value: String): Unit = {
    val sessionInfo = copyOfThreadSessionInfo()
    CarbonSetCommand.validateAndSetValue(sessionInfo.getThreadParams, key, value)
    ThreadLocalSessionInfo.setCarbonSessionInfo(sessionInfo)
  }

  /**
   * Stores an arbitrary object under `key` as extra info in the thread parameters
   * of the current thread's session info. No validation is performed here.
   *
   * @param key   name of the extra info entry
   * @param value object to store
   */
  def threadSet(key: String, value: Object): Unit = {
    val sessionInfo = copyOfThreadSessionInfo()
    sessionInfo.getThreadParams.setExtraInfo(key, value)
    ThreadLocalSessionInfo.setCarbonSessionInfo(sessionInfo)
  }

  /**
   * Removes `key` from the current thread's parameters, both as a property (through
   * `CarbonSetCommand.unsetValue`) and as an extra info entry. Does nothing when no
   * session info is bound to the current thread.
   *
   * @param key name of the entry to remove
   */
  def threadUnset(key: String): Unit = {
    Option(ThreadLocalSessionInfo.getCarbonSessionInfo).foreach { current =>
      val updated = current.clone()
      val threadParams = updated.getThreadParams
      CarbonSetCommand.unsetValue(threadParams, key)
      threadParams.removeExtraInfo(key)
      ThreadLocalSessionInfo.setCarbonSessionInfo(updated)
    }
  }

  /**
   * Binds a clone of the session info held by the [[CarbonEnv]] of `sparkSession`
   * to the current thread, carrying over any thread parameters that were already
   * set on this thread, and binds a new Hadoop configuration created from the
   * session state to the current thread as well.
   *
   * @param sparkSession session that supplies the session info and the Hadoop configuration
   */
  def updateSessionInfoToCurrentThread(sparkSession: SparkSession): Unit = {
    val carbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.clone()
    Option(ThreadLocalSessionInfo.getCarbonSessionInfo).foreach { current =>
      val threadParams = current.clone().getThreadParams
      // copy all the thread parameters to apply to session parameters
      threadParams.getAll.asScala.foreach { case (name, value) =>
        carbonSessionInfo.getSessionParams.addProperty(name, value)
      }
      // preserve thread parameters across call
      carbonSessionInfo.setThreadParams(threadParams)
    }
    ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
    ThreadLocalSessionInfo
      .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
  }
}