/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.mv.datamap

import scala.collection.JavaConverters._

import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, ScalaUDF}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Command, DeserializeToObject, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.LogicalRelation

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.datamap.DataMapManager
import org.apache.carbondata.mv.plans.modular.{ModularPlan, Select}
import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog}

/**
 * Analyzer rule to rewrite the query for MV datamap
 *
 * @param sparkSession spark session
 */
class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
  // TODO Find a better way to get the provider.
private val dataMapProvider =
DataMapManager.get().getDataMapProvider(null,
new DataMapSchema("", DataMapClassProvider.MV.getShortName), sparkSession)

  private val LOGGER = LogServiceFactory.getLogService(classOf[MVAnalyzerRule].getName)

override def apply(plan: LogicalPlan): LogicalPlan = {
var needAnalysis = true
plan.transformAllExpressions {
      // first check if any MV UDF is present in the plan; if so, the call is from
      // create MV, so there is no need to transform the query plan
// TODO Add different UDF name
case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase(CarbonEnv.MV_SKIP_RULE_UDF) =>
needAnalysis = false
al
      // in case of query, if any unresolved alias is present, then wait for the plan to be
      // resolved; return the same plan, as we can transform the plan only when everything
      // is resolved
      case unresolvedAlias@UnresolvedAlias(_, _) =>
        needAnalysis = false
        unresolvedAlias
case attr@UnresolvedAttribute(_) =>
needAnalysis = false
attr
}
plan.transform {
case aggregate@Aggregate(grp, aExp, child) =>
        // check if the plan is a data load for a pre-aggregate table; if so, skip applying MV
        val isPreAggLoad = aExp.exists {
          case _: UnresolvedAlias => false
          case p => p.name.equals(CarbonEnv.MV_SKIP_RULE_UDF)
        }
        if (isPreAggLoad) {
          needAnalysis = false
        }
        aggregate
}
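    // fetch the catalog of summary datasets registered for MV datamaps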
val catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
if (needAnalysis && catalog != null && isValidPlan(plan, catalog)) {
val modularPlan = catalog.mvSession.sessionState.rewritePlan(plan).withMVTable
      // if any subtree was rewritten to use the MV table, regenerate the SQL from the
      // modular plan and run it through the analyzer again
      if (modularPlan.find(_.rewritten).isDefined) {
        val compactSQL = reWriteTheUDFInSQLWithQualifierName(modularPlan, modularPlan.asCompactSQL)
        sparkSession.sql(compactSQL).queryExecution.analyzed
} else {
plan
}
} else {
plan
}
}

  /**
   * This method specially handles timeseries on MV, because the timeseries UDF is a Scala UDF.
   * After plan matching, the generated query looks like the following example:
   *
   * SELECT gen_subsumer_0.`UDF:timeseries(projectjoindate, hour)` AS `UDF:timeseries(projectjoi...
   * FROM
   *   (SELECT datamap1_table.`UDF:timeseries_projectjoindate_hour` AS `UDF:timeseries(projectjoin...
   *   FROM
   *     default.datamap1_table
   *   GROUP BY datamap1_table.`UDF:timeseries_projectjoindate_hour`) gen_subsumer_0
   * WHERE
   *   (UDF:timeseries(projectjoindate, hour) = TIMESTAMP('2016-02-23 09:00:00.0'))
   *
   * Here the WHERE filter expression is of type ScalaUDF, so when we call .sql() to prepare the
   * SQL we get it without the qualifier name 'gen_subsumer_0' (refer
   * org.apache.spark.sql.catalyst.expressions.NonSQLExpression). This function therefore rewrites
   * the filter with the qualifier name, so that parsing of the rewritten MV query does not fail
   * in Spark.
   *
   * @param plan modular plan
   * @param compactSQL compact SQL generated from the modular plan
   * @return rewritten SQL with qualifier names for the WHERE clause of the query
   */
private def reWriteTheUDFInSQLWithQualifierName(plan: ModularPlan, compactSQL: String): String = {
var outPutUDFColumn = ""
var reWrittenQuery = compactSQL
plan match {
case select: Select =>
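        // scan the output list for a projected timeseries UDF column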
select.outputList.collect {
case a: Alias if a.child.isInstanceOf[Attribute] =>
val childName = a.child.asInstanceOf[Attribute].name
if (childName.startsWith("UDF:timeseries")) {
outPutUDFColumn = childName
}
}
var queryArray: Array[String] = Array.empty
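        // when a timeseries UDF column is projected and the query has a WHERE clause,
        // qualify the UDF column on the line after "WHERE" with the subsumer alias,
        // since ScalaUDF filters are printed without the qualifier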
if (!outPutUDFColumn.equalsIgnoreCase("") && compactSQL.contains("WHERE")) {
queryArray = compactSQL.split("\n")
queryArray(queryArray.indexOf("WHERE") + 1) = queryArray(
queryArray.indexOf("WHERE") + 1).replace(outPutUDFColumn,
s"gen_subsumer_0.`$outPutUDFColumn`")
reWrittenQuery = queryArray.mkString("\n")
}
reWrittenQuery
case _ =>
compactSQL
}
}

  /**
   * Whether the plan is valid for doing modular plan matching and datamap replacing.
   */
def isValidPlan(plan: LogicalPlan, catalog: SummaryDatasetCatalog): Boolean = {
if (!plan.isInstanceOf[Command] && !plan.isInstanceOf[DeserializeToObject]) {
val catalogs = extractCatalogs(plan)
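      // rewrite only if no MV table is already present in the plan, a datamap exists for
      // the plan's tables, and no explicit segments are set on the main table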
!isDataMapReplaced(catalog.listAllValidSchema(), catalogs) &&
isDataMapExists(catalog.listAllValidSchema(), catalogs) &&
!isSegmentSetForMainTable(catalogs)
} else {
false
}
}

  /**
   * Check whether a datamap table has already been used in the query.
   *
   * @param mvdataSetArray array of available MV datasets, including their modular plans
   * @param catalogs catalog tables extracted from the query plan
   * @return whether a datamap has already replaced a table in the plan
   */
def isDataMapReplaced(
mvdataSetArray: Array[SummaryDataset],
catalogs: Seq[Option[CatalogTable]]): Boolean = {
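    // a datamap is already in use if any relation in the plan refers to the table that
    // backs a registered MV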
catalogs.exists { c =>
mvdataSetArray.exists { mv =>
val identifier = mv.dataMapSchema.getRelationIdentifier
identifier.getTableName.equals(c.get.identifier.table) &&
identifier.getDatabaseName.equals(c.get.database)
}
}
}

  /**
   * Check whether any suitable datamap (i.e. a datamap whose parent tables are present in the
   * plan) exists for this plan.
   *
   * @param mvs array of available MV datasets
   * @param catalogs catalog tables extracted from the query plan
   * @return whether a matching datamap exists
   */
def isDataMapExists(mvs: Array[SummaryDataset], catalogs: Seq[Option[CatalogTable]]): Boolean = {
catalogs.exists { c =>
mvs.exists { mv =>
mv.dataMapSchema.getParentTables.asScala.exists { identifier =>
identifier.getTableName.equals(c.get.identifier.table) &&
identifier.getDatabaseName.equals(c.get.database)
}
}
}
}

  private def extractCatalogs(plan: LogicalPlan): Seq[Option[CatalogTable]] = {
    plan collect {
      case l: LogicalRelation => l.catalogTable
    }
  }

  /**
   * Check if any segments are set for the main table of the query. If any segments are set,
   * then skip the MV datamap table for this query.
   */
def isSegmentSetForMainTable(catalogs: Seq[Option[CatalogTable]]): Boolean = {
catalogs.foreach { c =>
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
if (carbonSessionInfo != null) {
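        // segments to scan can be restricted per table through the CARBON_INPUT_SEGMENTS
        // session property; an empty value or "*" means all segments are queried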
val segmentsToQuery = carbonSessionInfo.getSessionParams
.getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
c.get.identifier.database.get + "." +
c.get.identifier.table, "")
if (segmentsToQuery.isEmpty || segmentsToQuery.equalsIgnoreCase("*")) {
return false
} else {
return true
}
} else {
return false
}
}
false
}
}