| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.carbondata.mv.datamap |
| |
| import java.io.IOException |
| |
| import scala.collection.JavaConverters._ |
| |
| import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession} |
| import org.apache.spark.sql.catalyst.TableIdentifier |
| import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias |
| import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand |
| import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand |
| import org.apache.spark.sql.execution.datasources.FindDataSourceTable |
| import org.apache.spark.sql.parser.CarbonSpark2SqlParser |
| import org.apache.spark.sql.util.SparkSQLUtil |
| |
| import org.apache.carbondata.common.annotations.InterfaceAudience |
| import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException |
| import org.apache.carbondata.common.logging.LogServiceFactory |
| import org.apache.carbondata.core.constants.CarbonCommonConstants |
| import org.apache.carbondata.core.datamap.{DataMapCatalog, DataMapProvider, DataMapStoreManager} |
| import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory} |
| import org.apache.carbondata.core.datamap.status.DataMapStatusManager |
| import org.apache.carbondata.core.indexstore.Blocklet |
| import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} |
| import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog} |
| import org.apache.carbondata.processing.util.CarbonLoaderUtil |
| |
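/**
 * DataMap provider for Materialized View (MV) datamaps. It drives the MV
 * lifecycle: creating the backing table from the CTAS statement, loading
 * (rebuilding) it from the main table, and dropping it again.
 */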
| @InterfaceAudience.Internal |
| class MVDataMapProvider( |
| mainTable: CarbonTable, |
| sparkSession: SparkSession, |
| dataMapSchema: DataMapSchema) |
| extends DataMapProvider(mainTable, dataMapSchema) { |
  // Created by initMeta()/cleanMeta() and consumed later by cleanData()
  protected var dropTableCommand: CarbonDropTableCommand = null
| |
| private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) |
| |
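  /**
   * Creates the MV datamap from the given CTAS statement and registers its
   * catalog. If registration fails, the partially created backing table and
   * the datamap schema are dropped before the exception is rethrown.
   */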
| @throws[MalformedDataMapCommandException] |
| @throws[IOException] |
| override def initMeta(ctasSqlStatement: String): Unit = { |
| if (ctasSqlStatement == null) { |
| throw new MalformedDataMapCommandException( |
| "select statement is mandatory") |
| } |
| MVHelper.createMVDataMap(sparkSession, |
| dataMapSchema, |
| ctasSqlStatement, |
| true, |
| mainTable) |
| try { |
| DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema) |
| if (dataMapSchema.isLazy) { |
| DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName) |
| } |
| } catch { |
| case exception: Exception => |
        // Roll back: drop the partially created datamap table and its schema
        dropTableCommand = new CarbonDropTableCommand(true,
          Some(dataMapSchema.getRelationIdentifier.getDatabaseName),
          dataMapSchema.getRelationIdentifier.getTableName,
          true)
| dropTableCommand.run(sparkSession) |
| DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName) |
| throw exception |
| } |
| } |
| |
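  /**
   * Performs the first load of a non-lazy datamap and enables it on success.
   * Lazy datamaps are skipped; they remain disabled until explicitly rebuilt.
   */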
| override def initData(): Unit = { |
| if (!dataMapSchema.isLazy) { |
| if (rebuild()) { |
| DataMapStatusManager.enableDataMap(dataMapSchema.getDataMapName) |
| } |
| } |
| } |
| |
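  /**
   * Drops the datamap metadata: removes the backing table's metadata, deletes
   * the datamap schema and unregisters the datamap from the catalog.
   */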
| @throws[IOException] |
| override def cleanMeta(): Unit = { |
    dropTableCommand = new CarbonDropTableCommand(true,
      Some(dataMapSchema.getRelationIdentifier.getDatabaseName),
      dataMapSchema.getRelationIdentifier.getTableName,
      true)
| dropTableCommand.processMetadata(sparkSession) |
    // Drop the datamap schema first and unregister the datamap from the catalog
    // afterwards; if unregistering failed first, the schema would remain in the
    // system and the datamap could never be created again.
    try {
      DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName)
    } finally {
      DataMapStoreManager.getInstance.unRegisterDataMapCatalog(dataMapSchema)
    }
| } |
| |
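  /**
   * Drops the physical data of the backing table; relies on the drop command
   * prepared by cleanMeta().
   */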
| override def cleanData(): Unit = { |
| if (dropTableCommand != null) { |
| dropTableCommand.processData(sparkSession) |
| } |
| } |
| |
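  /**
   * Loads the MV table by re-executing the CTAS query. With the "full_refresh"
   * property set this is an insert-overwrite load; otherwise it is an
   * incremental load restricted to the main table segments in segmentMap.
   * On failure, the datamap is disabled and the new load is marked for delete.
   */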
| @throws[IOException] |
| override def rebuildInternal(newLoadName: String, |
| segmentMap: java.util.Map[String, java.util.List[String]], |
| dataMapTable: CarbonTable): Boolean = { |
| val ctasQuery = dataMapSchema.getCtasQuery |
| if (ctasQuery != null) { |
| val identifier = dataMapSchema.getRelationIdentifier |
      // Re-run the CTAS query wrapped with the MV skip function so that the MV
      // rewrite does not fire on the datamap's own load query; the helper "mv"
      // column added by the wrapper is dropped from the result.
      val updatedQuery = new CarbonSpark2SqlParser().addMVSkipFunction(ctasQuery)
      val queryPlan = SparkSQLUtil.execute(
        sparkSession.sql(updatedQuery).queryExecution.analyzed,
        sparkSession).drop("mv")
      // The "full_refresh" datamap property forces an insert-overwrite (full)
      // load instead of an incremental one
      val isOverwriteTable =
        if (null != dataMapSchema.getProperties.get("full_refresh")) {
          dataMapSchema.getProperties.get("full_refresh").toBoolean
        } else {
          false
        }
      // Restrict each main table to the specified segments so that only new
      // data is read for the incremental load
      segmentMap.asScala.foreach { case (tableUniqueName, segmentList) =>
        setSegmentsToLoadDataMap(tableUniqueName, segmentList)
      }
      // Build the load file header from the datamap table schema, skipping the
      // invisible dummy measure column
      val header = dataMapTable.getTableInfo.getFactTable.getListOfColumns.asScala
        .filter { column =>
          !column.getColumnName
            .equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
        }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
| val loadCommand = CarbonLoadDataCommand( |
| databaseNameOp = Some(identifier.getDatabaseName), |
| tableName = identifier.getTableName, |
| factPathFromUser = null, |
| dimFilesPath = Seq(), |
| options = scala.collection.immutable.Map("fileheader" -> header), |
        isOverwriteTable = isOverwriteTable,
| inputSqlString = null, |
| dataFrame = Some(queryPlan), |
| updateModel = None, |
| tableInfoOp = None, |
| internalOptions = Map("mergedSegmentName" -> newLoadName, |
| CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"), |
| partition = Map.empty) |
| |
| try { |
| SparkSQLUtil.execute(loadCommand, sparkSession) |
| } catch { |
| case ex: Exception => |
          // If the load to the dataMap table fails, disable the dataMap; if the
          // new load is still in INSERT_IN_PROGRESS state, mark it for delete
          // and update the table status file
          DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
          LOGGER.error(s"Data Load failed for DataMap: ${dataMapSchema.getDataMapName}", ex)
| CarbonLoaderUtil.updateTableStatusInCaseOfFailure( |
| newLoadName, |
| dataMapTable.getAbsoluteTableIdentifier, |
| dataMapTable.getTableName, |
| dataMapTable.getDatabaseName, |
| dataMapTable.getTablePath, |
| dataMapTable.getMetadataPath) |
| throw ex |
| } finally { |
| unsetMainTableSegments() |
| } |
| } |
| true |
| } |
| |
| /** |
| * This method will set main table segments which needs to be loaded to mv dataMap |
| */ |
| private def setSegmentsToLoadDataMap(tableUniqueName: String, |
| mainTableSegmentList: java.util.List[String]): Unit = { |
| CarbonSession |
| .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + |
| tableUniqueName, mainTableSegmentList.asScala.mkString(",")) |
| } |
| |
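  /**
   * Clears the thread-local input-segment settings of all parent tables once
   * the datamap load has finished.
   */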
| private def unsetMainTableSegments(): Unit = { |
| val relationIdentifiers = dataMapSchema.getParentTables.asScala |
| for (relationIdentifier <- relationIdentifiers) { |
| CarbonSession |
| .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + |
| relationIdentifier.getDatabaseName + "." + |
| relationIdentifier.getTableName) |
| } |
| } |
| |
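  /**
   * Creates the catalog of summary datasets that the MV query rewrite uses.
   */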
  override def createDataMapCatalog: DataMapCatalog[SummaryDataset] =
| new SummaryDatasetCatalog(sparkSession) |
| |
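  /**
   * MV datamaps are backed by a table rather than an index datamap, so no
   * DataMapFactory is available.
   */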
| override def getDataMapFactory: DataMapFactory[_ <: DataMap[_ <: Blocklet]] = { |
| throw new UnsupportedOperationException |
| } |
| |
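  /** MV datamaps can always be rebuilt from their CTAS query. */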
| override def supportRebuild(): Boolean = true |
| } |