blob: 74f2f53b6cc567c26e419570942fa9e29e95dff8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.secondaryindex.events
import java.util
import scala.collection.JavaConverters._
import org.apache.log4j.Logger
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.index.CarbonIndexUtil
import org.apache.spark.sql.secondaryindex.command.IndexModel
import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.index.IndexType
import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener}
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostStatusUpdateEvent
/**
 * This listener loads data into the failed or missing segments of secondary index (SI) table(s).
 */
class SILoadEventListenerForFailedSegments extends OperationEventListener with Logging {

  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  /**
   * Called on a specified event occurrence.
   *
   * For a [[LoadTablePostStatusUpdateEvent]], every secondary index (SI) table of the loaded
   * main table is checked independently. When the SI table is disabled, or its segments do not
   * match the main table's, the following segments are loaded into the SI table:
   *  - SI segments that are MARKED_FOR_DELETE, or INSERT_IN_PROGRESS /
   *    INSERT_OVERWRITE_IN_PROGRESS with an acquirable segment lock, whose main table segment
   *    is still valid (see [[checkIfMainTableLoadIsValid]]);
   *  - valid main table segments that have no entry in the SI table status.
   * Afterwards the SI table is re-enabled if its segments are in sync with the main table.
   * A failure while loading into, or re-enabling, one SI table is logged and does not fail the
   * main table load nor stop the remaining SI tables from being processed.
   * Other event types are ignored.
   *
   * @param event            the fired event
   * @param operationContext operation context (not used by this listener)
   */
  override protected def onEvent(event: Event, operationContext: OperationContext): Unit = {
    event match {
      case postStatusUpdateEvent: LoadTablePostStatusUpdateEvent =>
        LOGGER.info("Load post status update event-listener called")
        val carbonLoadModel = postStatusUpdateEvent.getCarbonLoadModel
        val sparkSession = SparkSession.getActiveSession.get
        // when Si creation and load to main table are parallel, get the carbonTable from the
        // metastore which will have the latest index Info
        val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
        val carbonTable = metaStore
          .lookupRelation(Some(carbonLoadModel.getDatabaseName),
            carbonLoadModel.getTableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
        val indexMetadata = carbonTable.getIndexMetadata
        val secondaryIndexProvider = IndexType.SI.getIndexProviderName
        if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
            null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
          val indexTables = indexMetadata.getIndexesMap
            .get(secondaryIndexProvider).keySet().asScala
          // if there are no index tables for a given fact table do not perform any action
          if (indexTables.nonEmpty) {
            val mainTableDetails =
              SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
            indexTables.foreach { indexTableName =>
              val isLoadSIForFailedSegments = sparkSession.sessionState.catalog
                .getTableMetadata(TableIdentifier(indexTableName,
                  Some(carbonLoadModel.getDatabaseName))).storage.properties
                .getOrElse("isSITableEnabled", "true").toBoolean
              val indexTable = metaStore
                .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)(
                  sparkSession)
                .asInstanceOf[CarbonRelation]
                .carbonTable
              val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
                SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
              val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
                SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
              val needsRepair = !isLoadSIForFailedSegments ||
                !CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
                  mainTblLoadMetadataDetails,
                  siTblLoadMetadataDetails)
              // If the SI table status is empty, there is no need for further computation,
              // because the table status might not have been created yet and the next load
              // will take care of it. Only this SI table is skipped; the others still run.
              if (needsRepair && siTblLoadMetadataDetails.nonEmpty) {
                val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
                  indexTableName)
                val secondaryIndex = IndexModel(Some(carbonTable.getDatabaseName),
                  indexMetadata.getParentTableName,
                  indexColumns.split(",").toList,
                  indexTableName)
                val failedLoadMetadataDetails: util.List[LoadMetadataDetails] =
                  new util.ArrayList[LoadMetadataDetails]()
                // read the details of SI table and get all the failed segments during SI
                // creation which are MARKED_FOR_DELETE or invalid INSERT_IN_PROGRESS
                siTblLoadMetadataDetails.foreach { loadMetaDetail =>
                  val segmentStatus = loadMetaDetail.getSegmentStatus
                  if (segmentStatus == SegmentStatus.MARKED_FOR_DELETE &&
                      checkIfMainTableLoadIsValid(mainTableDetails, loadMetaDetail.getLoadName)) {
                    failedLoadMetadataDetails.add(loadMetaDetail)
                  } else if ((segmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
                      segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) &&
                      checkIfMainTableLoadIsValid(mainTableDetails, loadMetaDetail.getLoadName)) {
                    // an in-progress SI segment is treated as failed only when its segment lock
                    // can be acquired (presumably meaning no load is currently writing it)
                    val segmentLock = CarbonLockFactory
                      .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier,
                        CarbonTablePath.addSegmentPrefix(loadMetaDetail.getLoadName) +
                        LockUsage.LOCK)
                    try {
                      if (segmentLock.lockWithRetries(1, 0)) {
                        LOGGER.info("SIFailedLoadListener: Acquired segment lock on segment:" +
                          loadMetaDetail.getLoadName)
                        failedLoadMetadataDetails.add(loadMetaDetail)
                      }
                    } finally {
                      // NOTE(review): unlock() runs and the "Released" message is logged even
                      // when the lock was not acquired; confirm this is safe for the
                      // configured lock implementation.
                      segmentLock.unlock()
                      LOGGER.info("SIFailedLoadListener: Released segment lock on segment:" +
                        loadMetaDetail.getLoadName)
                    }
                  }
                }
                // check for the skipped segments: compare the main table and SI table status
                // files and add every valid main table segment that has no SI table entry
                val siLoadNames = siTblLoadMetadataDetails.map(_.getLoadName).toSet
                CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails).asScala
                  .filterNot(siLoadNames.contains)
                  .foreach { segmentName =>
                    val newDetails = new LoadMetadataDetails
                    newDetails.setLoadName(segmentName)
                    LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName)
                    failedLoadMetadataDetails.add(newDetails)
                  }
                try {
                  if (!failedLoadMetadataDetails.isEmpty) {
                    CarbonIndexUtil
                      .LoadToSITable(sparkSession,
                        carbonLoadModel,
                        indexTableName,
                        isLoadToFailedSISegments = true,
                        secondaryIndex,
                        carbonTable, indexTable, failedLoadMetadataDetails)
                  }
                  // get updated main table segments and si table segments
                  val updatedMainTableDetails: Array[LoadMetadataDetails] =
                    SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
                  val updatedSITableDetails: Array[LoadMetadataDetails] =
                    SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
                  // check if main table has load in progress and SI table has no load
                  // in progress entry, then no need to enable the SI table
                  // Only if the valid segments of maintable match the valid segments of SI
                  // table then we can enable the SI for query
                  if (CarbonInternalLoaderUtil
                      .checkMainTableSegEqualToSISeg(updatedMainTableDetails,
                        updatedSITableDetails)
                      && CarbonInternalLoaderUtil.checkInProgLoadInMainTableAndSI(carbonTable,
                        updatedMainTableDetails, updatedSITableDetails)) {
                    // enable the SI table if it was disabled earlier due to failure during SI
                    // creation time
                    sparkSession.sql(
                      s"""ALTER TABLE ${carbonLoadModel.getDatabaseName}.$indexTableName SET
                         |SERDEPROPERTIES ('isSITableEnabled' = 'true')""".stripMargin).collect()
                  }
                } catch {
                  case ex: Exception =>
                    // in case of SI load only for failed segments, catch the exception, but
                    // do not fail the main table load, as main table segments should be
                    // available for query; continue with the remaining SI tables
                    LOGGER.error(s"Load to SI table to $indexTableName is failed " +
                      s"or SI table ENABLE is failed. ", ex)
                }
              }
            }
          }
        }
      case _ =>
      // other events are not handled by this listener
    }
  }

  /**
   * Checks whether the main table segment with the given load name is still valid.
   *
   * @param mainTableDetails load metadata details of the main table
   * @param loadName         load (segment) name to look up
   * @return true if the first main table entry with this load name is neither
   *         MARKED_FOR_DELETE nor COMPACTED; false otherwise, including when the main table
   *         has no entry with this load name
   */
  def checkIfMainTableLoadIsValid(mainTableDetails: Array[LoadMetadataDetails],
      loadName: String): Boolean = {
    mainTableDetails
      .find(mainTableDetail => mainTableDetail.getLoadName == loadName)
      .exists { mainTableLoadDetail =>
        val segmentStatus = mainTableLoadDetail.getSegmentStatus
        segmentStatus != SegmentStatus.MARKED_FOR_DELETE &&
          segmentStatus != SegmentStatus.COMPACTED
      }
  }
}