/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import java.io.IOException
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
import org.apache.spark.sql.execution.command.cache._
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
import org.apache.spark.sql.hive._
import org.apache.spark.sql.listeners.{AlterDataMaptableCompactionPostListener,
DataMapAddColumnsPreListener, DataMapAlterTableDropPartitionMetaListener,
DataMapAlterTableDropPartitionPreStatusListener,
DataMapChangeDataTypeorRenameColumnPreListener, DataMapDeleteSegmentPreListener,
DataMapDropColumnPreListener, DropCacheBloomEventListener, DropCacheDataMapEventListener,
LoadMVTablePreListener, LoadPostDataMapListener, PrePrimingEventListener,
ShowCacheDataMapEventListener, ShowCachePreMVEventListener}
import org.apache.spark.sql.profiler.Profiler

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util._
import org.apache.carbondata.datamap.{TextMatchMaxDocUDF, TextMatchUDF}
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent,
LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent,
LoadTablePreStatusUpdateEvent}
import org.apache.carbondata.spark.rdd.SparkReadSupport
import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl

/**
 * Carbon Environment for unified context
 */
class CarbonEnv {
var carbonMetaStore: CarbonMetaStore = _
var sessionParams: SessionParams = _
var carbonSessionInfo: CarbonSessionInfo = _
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
// set the readSupport class globally so that executors can access it
SparkReadSupport.readSupportClass = classOf[SparkRowReadSupportImpl]
var initialized = false
def init(sparkSession: SparkSession): Unit = {
val properties = CarbonProperties.getInstance()
var storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
if (storePath == null) {
storePath = sparkSession.conf.get("spark.sql.warehouse.dir")
properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
}
LOGGER.info(s"Initializing CarbonEnv, store location: $storePath")
sparkSession.udf.register("getTupleId", () => "")
// added for handling MV table creation: when the user fires a CREATE TABLE DDL,
// this UDF is added so that the PreAggregate rules need not be applied
sparkSession.udf.register(CarbonEnv.MV_SKIP_RULE_UDF, () => "")
// register for lucene datamap
// TODO: move it to proper place, it should be registered by datamap implementation
sparkSession.udf.register("text_match", new TextMatchUDF)
sparkSession.udf.register("text_match_with_limit", new TextMatchMaxDocUDF)
// added for handling timeseries functions like hour, minute, day, month, year
sparkSession.udf.register("timeseries", new TimeSeriesFunction)
// acquire a global lock so that the global configuration is updated by only one thread
CarbonEnv.carbonEnvMap.synchronized {
if (!initialized) {
// update carbon session parameters, preserving thread parameters
val currentThreadSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
carbonSessionInfo = new CarbonSessionInfo()
// We should not corrupt the session-level information in the carbonSessionInfo
// object. Instead, create a new object and set the user-specified values in its
// thread/session params
val threadLevelCarbonSessionInfo = new CarbonSessionInfo()
if (currentThreadSessionInfo != null) {
threadLevelCarbonSessionInfo.setThreadParams(currentThreadSessionInfo.getThreadParams)
}
ThreadLocalSessionInfo.setCarbonSessionInfo(threadLevelCarbonSessionInfo)
ThreadLocalSessionInfo.setConfigurationToCurrentThread(sparkSession
.sessionState.newHadoopConf())
val config = new CarbonSQLConf(sparkSession)
if (sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT).isEmpty) {
config.addDefaultCarbonParams()
}
// add session params after adding DefaultCarbonParams
config.addDefaultCarbonSessionParams()
carbonMetaStore = {
// trigger event for CarbonEnv create
val operationContext = new OperationContext
val carbonEnvInitPreEvent: CarbonEnvInitPreEvent =
CarbonEnvInitPreEvent(sparkSession, carbonSessionInfo, storePath)
OperationListenerBus.getInstance.fireEvent(carbonEnvInitPreEvent, operationContext)
CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf)
}
CarbonProperties.getInstance
.addNonSerializableProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
initialized = true
cleanChildTablesNotRegisteredInHive(sparkSession)
}
}
Profiler.initialize(sparkSession.sparkContext)
LOGGER.info("Initialize CarbonEnv completed...")
}
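// Illustrative sketch (assumption, not part of the original source): once init has
// run, the UDFs registered above are usable from SQL, e.g. the timeseries UDF on a
// hypothetical table `t1` with a timestamp column `eventtime`:
//   sparkSession.sql("SELECT timeseries(eventtime, 'hour') FROM t1")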
private def cleanChildTablesNotRegisteredInHive(sparkSession: SparkSession): Unit = {
// If the JDBC application is killed/stopped while a CREATE DATAMAP was in progress,
// the datamap table may have been created and its datamap schema saved to the
// system without the table being registered in the Hive metastore. So, when the
// JDBC application restarts, clean up such stale tables and datamap schemas.
val dataMapSchemas = DataMapStoreManager.getInstance().getAllDataMapSchemas
dataMapSchemas.asScala.foreach {
dataMapSchema =>
if (null != dataMapSchema.getRelationIdentifier &&
!dataMapSchema.isIndexDataMap) {
if (!sparkSession.sessionState
.catalog
.tableExists(TableIdentifier(dataMapSchema.getRelationIdentifier.getTableName,
Some(dataMapSchema.getRelationIdentifier.getDatabaseName)))) {
try {
DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName)
} finally {
DataMapStoreManager.getInstance.unRegisterDataMapCatalog(dataMapSchema)
if (FileFactory.isFileExist(dataMapSchema.getRelationIdentifier.getTablePath)) {
CarbonUtil.deleteFoldersAndFilesSilent(FileFactory.getCarbonFile(dataMapSchema
.getRelationIdentifier
.getTablePath))
}
}
}
}
}
}
}

object CarbonEnv {
lazy val MV_SKIP_RULE_UDF = "mv"
val carbonEnvMap = new ConcurrentHashMap[SparkSession, CarbonEnv]
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
def getInstance(sparkSession: SparkSession): CarbonEnv = {
if (sparkSession.isInstanceOf[CarbonSession]) {
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getCarbonEnv
} else {
var carbonEnv: CarbonEnv = carbonEnvMap.get(sparkSession)
if (carbonEnv == null) {
carbonEnv = new CarbonEnv
carbonEnv.init(sparkSession)
carbonEnvMap.put(sparkSession, carbonEnv)
}
carbonEnv
}
}
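// Usage sketch (illustrative, not part of the original source): getInstance is the
// entry point for callers; the first call for a session creates and initializes its
// CarbonEnv. `sparkSession` is assumed to be an active session with CarbonData
// configured.
private def exampleGetInstance(sparkSession: SparkSession): CarbonMetaStore = {
// obtain (and lazily initialize) the CarbonEnv for this session
CarbonEnv.getInstance(sparkSession).carbonMetaStore
}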
/**
 * Initializes the common listeners and registers them to their respective events in
 * the OperationListenerBus. Declared as a `val` so that, even under concurrent
 * access, the listeners are initialized only once.
 */
val init = {
initListeners
}
/**
* Method to initialize Listeners to their respective events in the OperationListenerBus.
*/
def initListeners(): Unit = {
OperationListenerBus.getInstance()
.addListener(classOf[IndexServerLoadEvent], PrePrimingEventListener)
.addListener(classOf[LoadTablePreExecutionEvent], LoadMVTablePreListener)
.addListener(classOf[AlterTableCompactionPreStatusUpdateEvent],
AlterDataMaptableCompactionPostListener)
.addListener(classOf[LoadTablePreStatusUpdateEvent], new MergeIndexEventListener)
.addListener(classOf[LoadTablePostExecutionEvent], LoadPostDataMapListener)
.addListener(classOf[UpdateTablePostEvent], LoadPostDataMapListener)
.addListener(classOf[DeleteFromTablePostEvent], LoadPostDataMapListener)
.addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
.addListener(classOf[BuildDataMapPostExecutionEvent], new MergeBloomIndexEventListener)
.addListener(classOf[DropTableCacheEvent], DropCacheDataMapEventListener)
.addListener(classOf[DropTableCacheEvent], DropCacheBloomEventListener)
.addListener(classOf[ShowTableCacheEvent], ShowCachePreMVEventListener)
.addListener(classOf[ShowTableCacheEvent], ShowCacheDataMapEventListener)
.addListener(classOf[DeleteSegmentByIdPreEvent], DataMapDeleteSegmentPreListener)
.addListener(classOf[DeleteSegmentByDatePreEvent], DataMapDeleteSegmentPreListener)
.addListener(classOf[AlterTableDropColumnPreEvent], DataMapDropColumnPreListener)
.addListener(classOf[AlterTableColRenameAndDataTypeChangePreEvent],
DataMapChangeDataTypeorRenameColumnPreListener)
.addListener(classOf[AlterTableAddColumnPreEvent], DataMapAddColumnsPreListener)
.addListener(classOf[AlterTableDropPartitionMetaEvent],
DataMapAlterTableDropPartitionMetaListener)
.addListener(classOf[AlterTableDropPartitionPreStatusEvent],
DataMapAlterTableDropPartitionPreStatusListener)
}
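// Illustrative sketch (assumption, not part of the original source): a feature
// module can hook its own listener into the same bus, e.g.
//   OperationListenerBus.getInstance()
//     .addListener(classOf[LoadTablePostExecutionEvent], MyCustomListener)
// where MyCustomListener is a hypothetical OperationEventListener implementation.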
/**
* Return carbon table instance from cache or by looking up table in `sparkSession`
*/
def getCarbonTable(
databaseNameOp: Option[String],
tableName: String)
(sparkSession: SparkSession): CarbonTable = {
val catalog = getInstance(sparkSession).carbonMetaStore
// if the relation is not refreshed or the table does not exist in the cache,
// look up the relation through the metastore
if (isRefreshRequired(TableIdentifier(tableName, databaseNameOp))(sparkSession)) {
catalog
.lookupRelation(databaseNameOp, tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
.carbonTable
} else {
CarbonMetadata.getInstance().getCarbonTable(databaseNameOp.getOrElse(sparkSession
.catalog.currentDatabase), tableName)
}
}
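// Usage sketch (illustrative, not part of the original source): resolve a table
// either from the metadata cache or via the metastore; `db1`/`t1` are hypothetical.
private def exampleGetCarbonTable(sparkSession: SparkSession): CarbonTable = {
CarbonEnv.getCarbonTable(Some("db1"), "t1")(sparkSession)
}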
/**
 * @return true if the relation was changed and was removed from the cache,
 *         false if there is no change in the relation
 */
def isRefreshRequired(identifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
val carbonEnv = getInstance(sparkSession)
val databaseName = identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
val table = CarbonMetadata.getInstance().getCarbonTable(databaseName, identifier.table)
if (table == null) {
true
} else {
carbonEnv.carbonMetaStore.isSchemaRefreshed(AbsoluteTableIdentifier.from(table.getTablePath,
identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
identifier.table, table.getTableInfo.getFactTable.getTableId), sparkSession)
}
}
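// Illustrative sketch (not part of the original source): callers can check
// staleness explicitly before trusting the CarbonMetadata cache, mirroring
// getCarbonTable above:
//   if (isRefreshRequired(TableIdentifier("t1", Some("db1")))(sparkSession)) {
//     // re-lookup the relation through the metastore
//   }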
/**
* Return carbon table instance by looking up table in `sparkSession`
*/
def getCarbonTable(
tableIdentifier: TableIdentifier)
(sparkSession: SparkSession): CarbonTable = {
getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession)
}
/**
* Return database name or get default name from sparkSession
*/
def getDatabaseName(
databaseNameOp: Option[String]
)(sparkSession: SparkSession): String = {
databaseNameOp.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase)
}
/**
 * Returns true if the database folder exists in the file system, false otherwise.
 */
def databaseLocationExists(dbName: String,
sparkSession: SparkSession, ifExists: Boolean): Boolean = {
try {
FileFactory.getCarbonFile(getDatabaseLocation(dbName, sparkSession)).exists()
} catch {
case e: NoSuchDatabaseException =>
if (ifExists) {
false
} else {
throw e
}
}
}
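// Usage sketch (illustrative, not part of the original source): with
// ifExists = true a missing database yields false instead of propagating the
// NoSuchDatabaseException; `db1` is hypothetical.
private def exampleDatabaseLocationExists(sparkSession: SparkSession): Boolean = {
CarbonEnv.databaseLocationExists("db1", sparkSession, ifExists = true)
}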
/**
 * Returns the database location.
 * If carbon.storeLocation points to spark.sql.warehouse.dir, returns the database
 * locationUri as the database location; otherwise follows the old behaviour of
 * building the location from the fixed carbon store path and the database name.
 * @return database location
 */
def getDatabaseLocation(dbName: String, sparkSession: SparkSession): String = {
var databaseLocation =
sparkSession.sessionState.catalog.asInstanceOf[SessionCatalog].getDatabaseMetadata(dbName)
.locationUri.toString
// for the default database, or when the db location ends with .db, check whether
// the carbon store and the hive store are the same or different
if (dbName.equals("default") || databaseLocation.endsWith(".db")) {
val properties = CarbonProperties.getInstance()
val carbonStorePath =
FileFactory.getUpdatedFilePath(properties.getProperty(CarbonCommonConstants.STORE_LOCATION))
val hiveStorePath =
FileFactory.getUpdatedFilePath(sparkSession.conf.get("spark.sql.warehouse.dir"))
// if carbon.store does not point to spark.sql.warehouse.dir then follow the old table path
// format
if (!hiveStorePath.equals(carbonStorePath)) {
databaseLocation = CarbonProperties.getStorePath +
CarbonCommonConstants.FILE_SEPARATOR +
dbName
}
}
FileFactory.getUpdatedFilePath(databaseLocation)
}
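// Illustrative sketch (assumption, not part of the original source): when the
// carbon store and spark.sql.warehouse.dir coincide, a database `db1` resolves to
// its Hive locationUri (typically <warehouse>/db1.db); otherwise it resolves to the
// legacy layout <carbon store>/db1.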
/**
 * Return the table path from the carbon table. If the table does not exist,
 * construct it from the database location and the table name.
 */
def getTablePath(
databaseNameOp: Option[String],
tableName: String
)(sparkSession: SparkSession): String = {
try {
getCarbonTable(databaseNameOp, tableName)(sparkSession).getTablePath
} catch {
case _: NoSuchTableException =>
val dbName = getDatabaseName(databaseNameOp)(sparkSession)
val dbLocation = getDatabaseLocation(dbName, sparkSession)
dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
}
}
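// Usage sketch (illustrative, not part of the original source): resolves through
// the table when it exists, else falls back to <database location>/<table name>;
// `db1`/`t1` are hypothetical.
private def exampleGetTablePath(sparkSession: SparkSession): String = {
CarbonEnv.getTablePath(Some("db1"), "t1")(sparkSession)
}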
def getIdentifier(
databaseNameOp: Option[String],
tableName: String
)(sparkSession: SparkSession): AbsoluteTableIdentifier = {
AbsoluteTableIdentifier.from(
getTablePath(databaseNameOp, tableName)(sparkSession),
getDatabaseName(databaseNameOp)(sparkSession),
tableName)
}
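// Usage sketch (illustrative, not part of the original source): with None the
// session's current database is used; `t1` is hypothetical.
private def exampleGetIdentifier(sparkSession: SparkSession): AbsoluteTableIdentifier = {
CarbonEnv.getIdentifier(None, "t1")(sparkSession)
}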
def getThreadParam(key: String, defaultValue: String): String = {
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
if (null != carbonSessionInfo) {
carbonSessionInfo.getThreadParams.getProperty(key, defaultValue)
} else {
defaultValue
}
}
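// Usage sketch (illustrative, not part of the original source): read a thread-level
// override of a carbon property, falling back to the supplied default.
private def exampleGetThreadParam(): String = {
getThreadParam(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
}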
def getSessionParam(key: String, defaultValue: String): String = {
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
if (null != carbonSessionInfo) {
// session-level parameters, as opposed to the thread-level ones read above
carbonSessionInfo.getSessionParams.getProperty(key, defaultValue)
} else {
defaultValue
}
}
}