| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.mv.rewrite |
| |
| import java.util.concurrent.locks.ReentrantReadWriteLock |
| |
| import org.apache.spark.sql.{DataFrame, SparkSession} |
| import org.apache.spark.sql.catalyst.TableIdentifier |
| import org.apache.spark.sql.catalyst.expressions.Attribute |
| import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan |
| import org.apache.spark.sql.execution.datasources.FindDataSourceTable |
| import org.apache.spark.sql.parser.CarbonSpark2SqlParser |
| import org.apache.spark.sql.util.SparkSQLUtil |
| |
| import org.apache.carbondata.core.datamap.DataMapCatalog |
| import org.apache.carbondata.core.datamap.status.DataMapStatusManager |
| import org.apache.carbondata.core.metadata.schema.table.DataMapSchema |
| import org.apache.carbondata.mv.datamap.MVHelper |
| import org.apache.carbondata.mv.plans.modular.{Flags, ModularPlan, ModularRelation, Select} |
| import org.apache.carbondata.mv.plans.util.Signature |
| import org.apache.carbondata.mv.session.MVSession |
| |
| |
/**
 * A summary (MV) dataset registered in the [[SummaryDatasetCatalog]].
 *
 * @param signature     signature of the modularized plan, when one could be derived
 * @param plan          analyzed logical plan the dataset was registered with
 * @param dataMapSchema schema of the owning datamap; `null` for datasets registered through the
 *                      test-only `registerSummaryDataset` API
 * @param relation      modular plan over the datamap table; `null` for test-only registrations
 */
private[mv] case class SummaryDataset(
    signature: Option[Signature],
    plan: LogicalPlan,
    dataMapSchema: DataMapSchema,
    relation: ModularPlan)
| |
/**
 * Wraps the modular plan of a datamap relation together with the datamap's schema.
 *
 * The wrapper is transparent for `output` and `children`: both are delegated to the wrapped
 * `plan`.
 *
 * @param plan          the wrapped modular plan
 * @param dataMapSchema schema of the datamap the plan belongs to
 */
case class MVPlanWrapper(
    plan: ModularPlan,
    dataMapSchema: DataMapSchema)
  extends ModularPlan {

  /** Output attributes, taken from the wrapped plan. */
  override def output: Seq[Attribute] = {
    plan.output
  }

  /** Children of the wrapped plan; the wrapped plan itself is not exposed as a child. */
  override def children: Seq[ModularPlan] = {
    plan.children
  }
}
| |
/**
 * In-memory catalog of the MV datamaps registered for a Spark session.
 *
 * Each registered datamap is kept as a [[SummaryDataset]]. Every access to the registered
 * datasets is guarded by a read/write lock; methods that return collections return snapshots,
 * never the internal buffer.
 */
private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
  extends DataMapCatalog[SummaryDataset] {

  @transient
  private val summaryDatasets = new scala.collection.mutable.ArrayBuffer[SummaryDataset]

  // Declared before `mvSession`: `this` is handed to MVSession's constructor below, so the lock
  // must already be initialised in case any catalog method is reached during that construction.
  @transient
  private val registerLock = new ReentrantReadWriteLock

  val mvSession: MVSession = new MVSession(sparkSession, this)

  /** Parser used to add the MV skip function to a datamap's CTAS query before analysis. */
  lazy val parser: CarbonSpark2SqlParser = new CarbonSpark2SqlParser


  /** Acquires a read lock on the catalog for the duration of `f`. */
  private def readLock[A](f: => A): A = {
    val lock = registerLock.readLock()
    lock.lock()
    try f finally {
      lock.unlock()
    }
  }

  /** Acquires a write lock on the catalog for the duration of `f`. */
  private def writeLock[A](f: => A): A = {
    val lock = registerLock.writeLock()
    lock.lock()
    try f finally {
      lock.unlock()
    }
  }

  /** Removes every registered summary dataset. */
  private[mv] def refresh(): Unit = {
    writeLock {
      summaryDatasets.clear()
    }
  }

  /** Returns true when no summary dataset is registered. */
  private[mv] def isEmpty: Boolean = {
    readLock {
      summaryDatasets.isEmpty
    }
  }

  /**
   * Registers an MV datamap in the catalog.
   *
   * The datamap's CTAS query is rewritten by `parser.addMVSkipFunction`, analyzed, stripped of
   * the dummy function by `MVHelper.dropDummFuc`, then optimized and modularized to compute its
   * signature. The stored relation is a `Select` over the datamap table's own columns, wrapped in
   * an [[MVPlanWrapper]].
   *
   * @param dataMapSchema schema of the datamap to register
   */
  private[mv] def registerSchema(dataMapSchema: DataMapSchema): Unit = {
    writeLock {
      val updatedQuery = parser.addMVSkipFunction(dataMapSchema.getCtasQuery)
      val query = sparkSession.sql(updatedQuery)
      val planToRegister = MVHelper.dropDummFuc(query.queryExecution.analyzed)
      val modularPlan =
        mvSession.sessionState.modularizer.modularize(
          mvSession.sessionState.optimizer.execute(planToRegister)).next().semiHarmonized
      val signature = modularPlan.signature
      val identifier = dataMapSchema.getRelationIdentifier
      val tableIdentifier =
        TableIdentifier(identifier.getTableName, Some(identifier.getDatabaseName))
      val output = new FindDataSourceTable(sparkSession)
        .apply(SparkSQLUtil.sessionState(sparkSession).catalog.lookupRelation(tableIdentifier))
        .output
      val relation = ModularRelation(identifier.getDatabaseName,
        identifier.getTableName,
        output,
        Flags.NoFlags,
        Seq.empty)
      val select = Select(relation.outputList,
        relation.outputList,
        Seq.empty,
        Map(0 -> identifier.getTableName),
        Seq.empty,
        Seq(relation),
        Flags.NoFlags,
        Seq.empty,
        Seq.empty,
        None)

      summaryDatasets += SummaryDataset(
        signature,
        planToRegister,
        dataMapSchema,
        MVPlanWrapper(select, dataMapSchema))
    }
  }

  /**
   * Removes the datamap with the given name from the catalog.
   *
   * The name is compared case-sensitively. Datasets registered without a schema (through
   * `registerSummaryDataset`) never match.
   *
   * @throws IllegalArgumentException if no datamap with that name is registered
   */
  private[mv] def unregisterSchema(dataMapName: String): Unit = {
    writeLock {
      val dataIndex = summaryDatasets.indexWhere { sd =>
        sd.dataMapSchema != null && sd.dataMapSchema.getDataMapName.equals(dataMapName)
      }
      require(dataIndex >= 0, s"Datamap $dataMapName is not registered.")
      summaryDatasets.remove(dataIndex)
    }
  }


  /** Returns a snapshot of the registered datasets whose datamaps are currently enabled. */
  override def listAllValidSchema(): Array[SummaryDataset] = {
    enabledSummaryDatasets().toArray
  }

  /**
   * Returns a snapshot of the registered datasets whose datamap appears in
   * `DataMapStatusManager.getEnabledDataMapStatusDetails`, matching names case-insensitively.
   * Datasets registered without a schema are never considered enabled.
   */
  private def enabledSummaryDatasets(): Seq[SummaryDataset] = {
    // The status lookup does not read `summaryDatasets`, so it is done before taking the lock.
    val statusDetails = DataMapStatusManager.getEnabledDataMapStatusDetails
    readLock {
      summaryDatasets.filter { sd =>
        sd.dataMapSchema != null &&
          statusDetails.exists(_.getDataMapName.equalsIgnoreCase(sd.dataMapSchema.getDataMapName))
      }
    }
  }

  /**
   * API for test only.
   *
   * Registers the analyzed plan of the given [[DataFrame]] together with its modular-plan
   * signature. The dataset is stored without a datamap schema or relation.
   *
   * @param query     query whose analyzed plan is registered
   * @param tableName unused
   */
  private[mv] def registerSummaryDataset(
      query: DataFrame,
      tableName: Option[String] = None): Unit = {
    writeLock {
      val planToRegister = query.queryExecution.analyzed
      // The write lock is held here; ReentrantReadWriteLock lets the writer take the read lock
      // used by lookupSummaryDataset.
      if (lookupSummaryDataset(planToRegister).nonEmpty) {
        sys.error(s"Asked to register already registered.")
      } else {
        val modularPlan =
          mvSession.sessionState.modularizer.modularize(
            mvSession.sessionState.optimizer.execute(planToRegister)).next().semiHarmonized
        val signature = modularPlan.signature
        summaryDatasets +=
          SummaryDataset(signature, planToRegister, null, null)
      }
    }
  }

  /**
   * Removes the dataset whose plan yields the same result as the given [[DataFrame]].
   *
   * @throws IllegalArgumentException if no such dataset is registered
   */
  private[mv] def unregisterSummaryDataset(query: DataFrame): Unit = {
    writeLock {
      val planToRegister = query.queryExecution.analyzed
      val dataIndex = summaryDatasets.indexWhere(sd => planToRegister.sameResult(sd.plan))
      require(dataIndex >= 0, s"Table $query is not registered.")
      summaryDatasets.remove(dataIndex)
    }
  }

  /** Returns true if a dataset whose plan yields the same result as `query` is registered. */
  private[mv] def isMVWithSameQueryPresent(query: LogicalPlan) : Boolean = {
    lookupSummaryDataset(query).nonEmpty
  }

  /**
   * API for test only.
   *
   * Removes the dataset matching the given [[DataFrame]] if one is registered.
   *
   * @param blocking unused
   * @return true if a dataset was removed
   */
  private[mv] def tryUnregisterSummaryDataset(
      query: DataFrame,
      blocking: Boolean = true): Boolean = {
    writeLock {
      val planToRegister = query.queryExecution.analyzed
      val dataIndex = summaryDatasets.indexWhere(sd => planToRegister.sameResult(sd.plan))
      val found = dataIndex >= 0
      if (found) {
        summaryDatasets.remove(dataIndex)
      }
      found
    }
  }

  /** Optionally returns the registered dataset matching the given [[DataFrame]]. */
  private[mv] def lookupSummaryDataset(query: DataFrame): Option[SummaryDataset] = {
    // The LogicalPlan overload takes the read lock itself.
    lookupSummaryDataset(query.queryExecution.analyzed)
  }

  /** Optionally returns the first registered dataset whose plan yields the same result as `plan`. */
  private[mv] def lookupSummaryDataset(plan: LogicalPlan): Option[SummaryDataset] = {
    readLock {
      summaryDatasets.find(sd => plan.sameResult(sd.plan))
    }
  }


  /**
   * Returns the enabled datasets that may be usable for rewriting the given [[ModularPlan]].
   *
   * A dataset qualifies when both it and `plan` have a signature, both agree on whether they
   * group by, and the dataset's tables are a subset of the plan's tables. Results are ordered by
   * descending number of tables involved (heuristic: more tables => higher query reduction).
   */
  private[mv] def lookupFeasibleSummaryDatasets(plan: ModularPlan): Seq[SummaryDataset] = {
    val querySignature = plan.signature
    val feasible = enabledSummaryDatasets().filter { sd =>
      (sd.signature, querySignature) match {
        case (Some(mvSig), Some(querySig)) =>
          mvSig.groupby == querySig.groupby && mvSig.datasets.subsetOf(querySig.datasets)
        case _ => false
      }
    }
    // Stable sort, descending by table count; every retained dataset has a signature.
    feasible.sortBy(sd => -sd.signature.fold(0)(_.datasets.size))
  }
}