/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.events

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.CarbonMergeFilesRDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.MergeIndexUtil

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil

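/**
 * Listener that merges the per-block carbonindex files of a segment into a single
 * merged index file. It handles two triggers: the pre status update event fired at
 * the end of a data load (covering any segments produced by an auto-compaction),
 * and the explicit merge index event, typically raised by an
 * ALTER TABLE ... COMPACT 'SEGMENT_INDEX' style DDL command.
 */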
class MergeIndexEventListener extends OperationEventListener with Logging {
  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

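  /**
   * Dispatches on the event type: after a load, merges the index files of the newly
   * loaded segment (or of the segments produced by an auto-compaction); for a merge
   * index DDL request, merges the index files of the requested or all valid segments.
   */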
  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    event match {
      case preStatusUpdateEvent: LoadTablePreStatusUpdateEvent =>
        LOGGER.info("Load pre status update event listener called for merge index")
        val loadModel = preStatusUpdateEvent.getCarbonLoadModel
        val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
        val compactedSegments = loadModel.getMergedSegmentIds
        val sparkSession = SparkSession.getActiveSession.get
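        // streaming segments are written in row format and carry no standard
        // carbonindex files, so index merge applies only to non-streaming tables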
        if (!carbonTable.isStreamingSink) {
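          // a load can trigger an auto-compaction; in that case merge the index
          // files of the compacted segments instead of the just-loaded segment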
          if (null != compactedSegments && !compactedSegments.isEmpty) {
            MergeIndexUtil.mergeIndexFilesForCompactedSegments(sparkSession,
              carbonTable,
              compactedSegments)
          } else {
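            // map the loaded segment id to its fact timestamp, the value used in
            // naming the segment file that backs this segment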
            val segmentFileNameMap: java.util.Map[String, String] =
              new util.HashMap[String, String]()
            segmentFileNameMap.put(loadModel.getSegmentId,
              String.valueOf(loadModel.getFactTimeStamp))
            CarbonMergeFilesRDD.mergeIndexFiles(sparkSession,
              Seq(loadModel.getSegmentId),
              segmentFileNameMap,
              carbonTable.getTablePath,
              carbonTable,
              false)
            // clear the Block dataMap cache for the newly loaded segment
            MergeIndexUtil.clearBlockDataMapCache(carbonTable, Seq(loadModel.getSegmentId))
          }
        }
      case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
        val carbonMainTable = alterTableMergeIndexEvent.carbonTable
        val sparkSession = alterTableMergeIndexEvent.sparkSession
        if (!carbonMainTable.isStreamingSink) {
          LOGGER.info("Merge index request received for table " +
            s"${carbonMainTable.getDatabaseName}.${carbonMainTable.getTableName}")
          val lock = CarbonLockFactory.getCarbonLockObj(
            carbonMainTable.getAbsoluteTableIdentifier,
            LockUsage.COMPACTION_LOCK)

          try {
            if (lock.lockWithRetries()) {
              LOGGER.info("Acquired the compaction lock for table " +
                s"${carbonMainTable.getDatabaseName}.${carbonMainTable.getTableName}")
              val segmentsToMerge =
                if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
                  CarbonDataMergerUtil.getValidSegmentList(
                    carbonMainTable.getAbsoluteTableIdentifier,
                    carbonMainTable.isChildTable).asScala.map(_.getSegmentNo)
                } else {
                  alterTableMergeIndexEvent.alterTableModel.customSegmentIds.get
                }

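              // map every segment id to its load start time, read from the table
              // status file; the timestamp identifies each segment file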
              val loadFolderDetailsArray = SegmentStatusManager
                .readLoadMetadata(carbonMainTable.getMetadataPath)
              val segmentFileNameMap: java.util.Map[String, String] =
                new util.HashMap[String, String]()
              loadFolderDetailsArray.foreach { loadMetadataDetails =>
                segmentFileNameMap.put(loadMetadataDetails.getLoadName,
                  String.valueOf(loadMetadataDetails.getLoadStartTime))
              }
              // when the merge index file is created through the Alter DDL command,
              // readFileFooterFromCarbonDataFile must be true: for legacy stores
              // (version <= 1.1) the footer is read from the carbondata files and the
              // merge index file is written in the new store format, so the old store
              // is upgraded as well
              CarbonMergeFilesRDD.mergeIndexFiles(
                sparkSession = sparkSession,
                segmentIds = segmentsToMerge,
                segmentFileNameToSegmentIdMap = segmentFileNameMap,
                tablePath = carbonMainTable.getTablePath,
                carbonTable = carbonMainTable,
                mergeIndexProperty = true,
                readFileFooterFromCarbonDataFile = true)
              // clear the Block dataMap cache for the merged segments
              MergeIndexUtil.clearBlockDataMapCache(carbonMainTable, segmentsToMerge)
              LOGGER.info("Merge index request completed for table " +
                s"${carbonMainTable.getDatabaseName}.${carbonMainTable.getTableName}")
            } else {
              val lockMessage = "Unable to acquire the compaction lock for table " +
                s"${carbonMainTable.getDatabaseName}.${carbonMainTable.getTableName}"
              LOGGER.error(lockMessage)
              CarbonException.analysisException(
                "Table is already locked for compaction. Please try after some time.")
            }
          } finally {
            lock.unlock()
          }
        }
    }
  }

}