/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.view

import java._
import java.io.IOException
import scala.collection.JavaConverters._
import com.google.gson.Gson
import org.apache.log4j.Logger
import org.apache.spark.sql.{CarbonUtils, SparkSession}
import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
import org.apache.spark.sql.parser.MVQueryParser
import org.apache.carbondata.common.exceptions.sql.NoSuchMVException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.ICarbonLock
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.view.{MVSchema, MVStatus}
import org.apache.carbondata.processing.util.CarbonLoaderUtil

// Placeholder class, used only to derive the logger name via classOf[MVRefresher]
class MVRefresher {
}

object MVRefresher {
private val LOGGER: Logger = LogServiceFactory.getLogService(
classOf[MVRefresher].getCanonicalName)
/**
* Refresh the mv by loading all existing data from the related tables.
* This is called to refresh the mv when:
* 1. the mv is created without "WITH DEFERRED REBUILD" defined, or
* 2. the user manually triggers the REFRESH MATERIALIZED VIEW command
*/
@throws[IOException]
@throws[NoSuchMVException]
def refresh(viewSchema: MVSchema, session: SparkSession): Boolean = {
var newLoadName: String = ""
var segmentMap: String = ""
val viewTable: CarbonTable = CarbonTable.buildFromTablePath(
viewSchema.getIdentifier.getTableName,
viewSchema.getIdentifier.getDatabaseName,
viewSchema.getIdentifier.getTablePath,
viewSchema.getIdentifier.getTableId)
val viewIdentifier = viewSchema.getIdentifier
val viewTableIdentifier = viewTable.getAbsoluteTableIdentifier
// Clean up the old invalid segment data before creating a new entry for new load.
SegmentStatusManager.deleteLoadsAndUpdateMetadata(viewTable, false, null)
val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(viewTableIdentifier)
// Acquire table status lock to handle concurrent data loading
val lock: ICarbonLock = segmentStatusManager.getTableStatusLock
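// Maps each related main table to the list of its segments covered by this mv load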
val segmentMapping: util.Map[String, util.List[String]] =
new util.HashMap[String, util.List[String]]
val viewManager = MVManagerInSpark.get(session)
try if (lock.lockWithRetries) {
LOGGER.info("Acquired lock for mv " + viewIdentifier + " for table status update")
val viewTableMetadataPath: String =
CarbonTablePath.getMetadataPath(viewIdentifier.getTablePath)
val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(viewTableMetadataPath)
val loadMetadataDetailList: util.List[LoadMetadataDetails] =
new util.ArrayList[LoadMetadataDetails](CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)
// Mark all stale loadMetadataDetails for delete
for (loadMetadataDetail <- loadMetadataDetails) {
if (((loadMetadataDetail.getSegmentStatus eq SegmentStatus.INSERT_IN_PROGRESS) ||
(loadMetadataDetail.getSegmentStatus eq SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) &&
loadMetadataDetail.getVisibility.equalsIgnoreCase("false")) {
loadMetadataDetail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
}
loadMetadataDetailList.add(loadMetadataDetail)
}
if (viewSchema.isRefreshOnManual) {
// check if rebuild to mv is already in progress and throw exception
if (loadMetadataDetails.nonEmpty) {
for (loadMetaDetail <- loadMetadataDetails) {
if (((loadMetaDetail.getSegmentStatus eq SegmentStatus.INSERT_IN_PROGRESS) ||
(loadMetaDetail.getSegmentStatus eq SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) &&
SegmentStatusManager.isLoadInProgress(viewTableIdentifier,
loadMetaDetail.getLoadName)) {
throw new RuntimeException(
"Rebuild to materialized view " + viewSchema.getIdentifier.getTableName +
" is already in progress")
}
}
}
}
if (viewSchema.isRefreshIncremental) {
if (!getSpecificSegmentsTobeLoaded(viewSchema, segmentMapping, loadMetadataDetailList)) {
return false
}
} else {
// set segment mapping only for carbondata tables
val relatedTableIds =
viewSchema.getRelatedTables.asScala.filter(_.isCarbonDataTable)
for (relatedTableId <- relatedTableIds) {
val relatedTableSegmentList: util.List[String] =
SegmentStatusManager.getValidSegmentList(relatedTableId)
if (relatedTableSegmentList.isEmpty) {
return false
}
segmentMapping.put(relatedTableId.toString, relatedTableSegmentList)
}
}
segmentMap = new Gson().toJson(segmentMapping)
// To handle concurrent data loading to mv, create new loadMetaEntry and
// set segmentMap to new loadMetaEntry and pass new segmentId with load command
val loadMetadataDetail: LoadMetadataDetails = new LoadMetadataDetails
val segmentId: String = String.valueOf(
SegmentStatusManager.createNewSegmentId(loadMetadataDetails))
loadMetadataDetail.setLoadName(segmentId)
loadMetadataDetail.setSegmentStatus(SegmentStatus.INSERT_IN_PROGRESS)
loadMetadataDetail.setExtraInfo(segmentMap)
loadMetadataDetailList.add(loadMetadataDetail)
newLoadName = segmentId
SegmentStatusManager.writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(
viewSchema.getIdentifier.getTablePath),
loadMetadataDetailList.toArray(new Array[LoadMetadataDetails](loadMetadataDetailList
.size)))
} else {
LOGGER.error("Not able to acquire the lock for table status update for table " +
viewSchema.getIdentifier.getDatabaseName + "." +
viewSchema.getIdentifier.getTableName)
viewManager.setStatus(viewSchema.getIdentifier, MVStatus.DISABLED)
return false
} finally {
if (lock.unlock) {
LOGGER.info("Table unlocked successfully after table status update" +
viewSchema.getIdentifier.getDatabaseName + "." +
viewSchema.getIdentifier.getTableName)
} else {
LOGGER.error("Unable to unlock Table lock for table" +
viewSchema.getIdentifier.getDatabaseName + "." +
viewSchema.getIdentifier.getTableName +
" during table status update")
}
}
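// Perform the actual data load into the mv table for the newly created segment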
refreshInternal(viewManager, viewSchema, viewTable, newLoadName, segmentMapping, session)
}
@throws[IOException]
private def refreshInternal(
viewManager: MVManagerInSpark,
viewSchema: MVSchema,
viewTable: CarbonTable,
newLoadName: String,
segmentMap: java.util.Map[String, java.util.List[String]],
session: SparkSession): Boolean = {
val query = viewSchema.getQuery
if (query != null) {
val viewIdentifier = viewSchema.getIdentifier
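// Re-parse the mv query against the current session; its analyzed plan is inserted below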
val updatedQuery = MVQueryParser.getQuery(query, session)
val isFullRefresh = !viewSchema.isRefreshIncremental
// Set specified segments for incremental load
val segmentMapIterator = segmentMap.entrySet().iterator()
while (segmentMapIterator.hasNext) {
val entry = segmentMapIterator.next()
setInputSegments(entry.getKey, entry.getValue)
}
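// Build the comma-separated file header from the mv table columns, excluding the
// invisible dummy measure and ordered by schema ordinal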
val header = viewTable.getTableInfo.getFactTable.getListOfColumns.asScala
.filter { column =>
!column.getColumnName
.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
}.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
val insertIntoCommand = CarbonInsertIntoCommand(
databaseNameOp = Some(viewIdentifier.getDatabaseName),
tableName = viewIdentifier.getTableName,
options = scala.collection.immutable.Map("fileheader" -> header),
isOverwriteTable = isFullRefresh,
logicalPlan = updatedQuery.queryExecution.analyzed,
tableInfo = viewTable.getTableInfo,
internalOptions = Map("mergedSegmentName" -> newLoadName,
CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"),
partition = Map.empty)
try {
insertIntoCommand.run(session)
} catch {
case exception: Exception =>
// If the load to the mv table fails, disable the mv; if the new load is still in
// INSERT_IN_PROGRESS state, mark it for delete and update the table status file
viewManager.setStatus(viewSchema.getIdentifier, MVStatus.DISABLED)
LOGGER.error("Data Load failed for mv: ", exception)
CarbonLoaderUtil.updateTableStatusInCaseOfFailure(
newLoadName, viewTable, SegmentStatus.INSERT_IN_PROGRESS)
throw exception
} finally {
unsetInputSegments(viewSchema)
}
}
true
}
/**
* This method compares the main table and mv table segment lists and loads only the newly
* added segments from the main table to the mv table.
* If the main table is compacted, the mv is loaded based on the mv-to-main-table
* segment mapping.
* Eg:
* case 1: Consider mainTableSegmentList: {0, 1, 2} and mvToMainTable segmentMap:
* {0 -> 0, 1 -> 1,2}. If segments (1, 2) of the main table are compacted to 1.1 and a new
* segment (3) is loaded to the main table, then mainTableSegmentList becomes {0, 1.1, 3}.
* In this case, segment (1) of the mv table is marked for delete, and a new mv segment
* {2 -> 1.1, 3} is loaded to the mv table.
* case 2: Consider mainTableSegmentList: {0, 1, 2, 3} and mvToMainTable segmentMap:
* {0 -> 0,1,2, 1 -> 3}. If segments (1, 2) of the main table are compacted to 1.1 and a new
* segment (4) is loaded to the main table, then mainTableSegmentList becomes {0, 1.1, 3, 4}.
* In this case, segment (0) of the mv table is marked for delete and segment (0) of the
* main table is added to validSegmentList, which needs to be loaded again. Then a new mv
* table segment (2) with main table segmentList {2 -> 1.1, 4, 0} is loaded to the mv table,
* and the mvToMainTable segmentMap becomes {1 -> 3, 2 -> 1.1, 4, 0} after rebuild.
*/
@throws[IOException]
private def getSpecificSegmentsTobeLoaded(schema: MVSchema,
segmentMapping: util.Map[String, util.List[String]],
listOfLoadFolderDetails: util.List[LoadMetadataDetails]): Boolean = {
val relationIdentifiers: util.List[RelationIdentifier] = schema.getRelatedTables
// invalidSegmentList holds segment list which needs to be marked for delete
val invalidSegmentList: util.HashSet[String] = new util.HashSet[String]
if (listOfLoadFolderDetails.isEmpty) {
// If segment Map is empty, load all valid segments from main tables to mv
for (relationIdentifier <- relationIdentifiers.asScala) {
val mainTableSegmentList: util.List[String] = SegmentStatusManager.getValidSegmentList(
relationIdentifier)
// If mainTableSegmentList is empty, no need to trigger load command
// TODO: handle in case of multiple tables load to mv table
if (mainTableSegmentList.isEmpty) return false
segmentMapping.put(relationIdentifier.getDatabaseName + CarbonCommonConstants.POINT +
relationIdentifier.getTableName, mainTableSegmentList)
}
}
else {
for (relationIdentifier <- relationIdentifiers.asScala) {
val segmentList: util.List[String] = new util.ArrayList[String]
// Get all segments for parent relationIdentifier
val mainTableSegmentList: util.List[String] = SegmentStatusManager.getValidSegmentList(
relationIdentifier)
var ifTableStatusUpdateRequired: Boolean = false
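// extraInfo of each mv load holds the mv-to-main-table segment map recorded at load time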
for (loadMetaDetail <- listOfLoadFolderDetails.asScala) {
if ((loadMetaDetail.getSegmentStatus eq SegmentStatus.SUCCESS) ||
(loadMetaDetail.getSegmentStatus eq SegmentStatus.INSERT_IN_PROGRESS)) {
val segmentMaps: util.Map[String, util.List[String]] =
new Gson().fromJson(loadMetaDetail.getExtraInfo, classOf[util.Map[_, _]])
val mainTableMetaDataPath: String = CarbonTablePath.getMetadataPath(relationIdentifier
.getTablePath)
val parentTableLoadMetaDataDetails: Array[LoadMetadataDetails] = SegmentStatusManager
.readLoadMetadata(
mainTableMetaDataPath)
val table: String = relationIdentifier.getDatabaseName + CarbonCommonConstants.POINT +
relationIdentifier.getTableName
for (segmentId <- mainTableSegmentList.asScala) {
// If mv segment (0) is mapped to mainTable segments {0,1,2} and those segments of
// mainTable are compacted to 0.1, then on the next rebuild/load to the mv there is no
// need to load segment (0.1) again; just update the segmentMapping of the mv segment
// from {0,1,2} to {0.1}
if (!checkIfSegmentsToBeReloaded(parentTableLoadMetaDataDetails,
segmentMaps.get(table),
segmentId)) {
ifTableStatusUpdateRequired = true
// Update loadMetaDetail with updated segment info and clear old segmentMap
val updatedSegmentMap: util.Map[String, util.List[String]] =
new util.HashMap[String, util.List[String]]
val segmentIdList: util.List[String] = new util.ArrayList[String]
segmentIdList.add(segmentId)
updatedSegmentMap.put(table, segmentIdList)
segmentList.add(segmentId)
loadMetaDetail.setExtraInfo(new Gson().toJson(updatedSegmentMap))
segmentMaps.get(table).clear()
}
}
segmentList.addAll(segmentMaps.get(table))
}
}
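// After the two removeAll calls below, segmentList holds mv-mapped segments that are no
// longer valid in the main table (e.g. compacted away), while mainTableSegmentList holds
// the main table segments that still need to be loaded to the mv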
val originSegmentList: util.List[String] = new util.ArrayList[String](segmentList)
segmentList.removeAll(mainTableSegmentList)
mainTableSegmentList.removeAll(originSegmentList)
if (ifTableStatusUpdateRequired && mainTableSegmentList.isEmpty) {
SegmentStatusManager.writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(
schema.getIdentifier.getTablePath),
listOfLoadFolderDetails.toArray(new Array[LoadMetadataDetails](listOfLoadFolderDetails
.size)))
return false
} else if (mainTableSegmentList.isEmpty) {
return false
}
if (!segmentList.isEmpty) {
val invalidMainTableSegmentList: util.List[String] = new util.ArrayList[String]
// validMainTableSegmentList holds segment list which needs to be loaded again
val validMainTableSegmentList: util.HashSet[String] = new util.HashSet[String]
// For mv segments which are not in the main table segment list (if the main table
// is compacted), iterate over those segments and get the mv segments which need to
// be marked for delete and the main table segments which need to be loaded again
for (segmentId <- segmentList.asScala) {
for (loadMetaDetail <- listOfLoadFolderDetails.asScala) {
if ((loadMetaDetail.getSegmentStatus eq SegmentStatus.SUCCESS) ||
(loadMetaDetail.getSegmentStatus eq SegmentStatus.INSERT_IN_PROGRESS)) {
val segmentMaps: util.Map[String, util.List[String]] =
new Gson().fromJson(loadMetaDetail.getExtraInfo, classOf[util.Map[_, _]])
val segmentIds: util.List[String] = segmentMaps.get(
relationIdentifier.getDatabaseName + CarbonCommonConstants.POINT +
relationIdentifier.getTableName)
if (segmentIds.contains(segmentId)) {
segmentIds.remove(segmentId)
validMainTableSegmentList.addAll(segmentIds)
invalidMainTableSegmentList.add(segmentId)
invalidSegmentList.add(loadMetaDetail.getLoadName)
}
}
}
}
// remove invalid segment from validMainTableSegmentList if present
validMainTableSegmentList.removeAll(invalidMainTableSegmentList)
// Add all valid segments of main table which needs to be loaded again
mainTableSegmentList.addAll(validMainTableSegmentList)
segmentMapping.put(relationIdentifier.getDatabaseName + CarbonCommonConstants.POINT +
relationIdentifier.getTableName, mainTableSegmentList)
}
else segmentMapping.put(relationIdentifier.getDatabaseName + CarbonCommonConstants.POINT +
relationIdentifier.getTableName, mainTableSegmentList)
}
}
// Remove invalid mv segments
if (!invalidSegmentList.isEmpty) {
for (loadMetadataDetail <- listOfLoadFolderDetails.asScala) {
if (invalidSegmentList.contains(loadMetadataDetail.getLoadName)) {
loadMetadataDetail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
}
}
}
true
}
/**
* This method checks whether the mv table segment has to be reloaded
*/
private def checkIfSegmentsToBeReloaded(loadMetaDataDetails: Array[LoadMetadataDetails],
segmentIds: util.List[String],
segmentId: String): Boolean = {
var isToBeLoadedAgain: Boolean = true
val mergedSegments: util.List[String] = new util.ArrayList[String]
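// Collect the main table segments that were compacted (merged) into the given segmentId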
for (loadMetadataDetail <- loadMetaDataDetails) {
if (null != loadMetadataDetail.getMergedLoadName &&
loadMetadataDetail.getMergedLoadName.equalsIgnoreCase(segmentId)) {
mergedSegments.add(loadMetadataDetail.getLoadName)
}
}
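// If the mv segment mapping already covers every segment merged into segmentId, the
// compacted segment does not need to be reloaded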
if (!mergedSegments.isEmpty && segmentIds.containsAll(mergedSegments)) {
isToBeLoadedAgain = false
}
isToBeLoadedAgain
}
/**
* This method will set the main table segments which need to be loaded to the mv
*/
private def setInputSegments(tableUniqueName: String,
mainTableSegmentList: java.util.List[String]): Unit = {
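// Set the thread-local carbon input segments property so that the insert into the mv
// reads only these segments of the main table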
CarbonUtils
.threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
tableUniqueName, mainTableSegmentList.asScala.mkString(","))
}
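// Clear the thread-local input segments property for all related tables once the load is done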
private def unsetInputSegments(schema: MVSchema): Unit = {
val relatedTableIdentifiers = schema.getRelatedTables
for (relationIdentifier <- relatedTableIdentifiers.asScala) {
CarbonUtils
.threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
relationIdentifier.getDatabaseName + "." +
relationIdentifier.getTableName)
}
}
}