/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command.management

import scala.collection.JavaConverters._

import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, DataProcessCommand, RunnableCommand}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.util.CarbonException

import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
import org.apache.carbondata.spark.util.CommonUtil

/**
 * Command to trigger compaction of a carbon table as part of an ALTER TABLE
 * statement.
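 *
 * Usage sketch (CarbonData compaction DDL; `db.tbl` is a placeholder, and the
 * type strings map onto the branches in `alterTableForCompaction` below):
 * {{{
 *   ALTER TABLE db.tbl COMPACT 'MINOR'
 *   ALTER TABLE db.tbl COMPACT 'MAJOR'
 *   ALTER TABLE db.tbl COMPACT 'SEGMENT_INDEX'
 * }}}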
*/
case class AlterTableCompactionCommand(
    alterTableModel: AlterTableModel,
    tableInfoOp: Option[TableInfo] = None)
  extends RunnableCommand with DataProcessCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    processData(sparkSession)
  }

  override def processData(sparkSession: SparkSession): Seq[Row] = {
    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
    val tableName = alterTableModel.tableName.toLowerCase
    val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
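    // Resolve the target CarbonTable: prefer the TableInfo handed in by the
    // caller; otherwise look the table up in the Carbon metastore.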
    val table = if (tableInfoOp.isDefined) {
      val tableInfo = tableInfoOp.get
      // TODO: CarbonEnv.updateStorePath
      CarbonTable.buildFromTableInfo(tableInfo)
    } else {
      val relation =
        CarbonEnv.getInstance(sparkSession).carbonMetastore
          .lookupRelation(Option(databaseName), tableName)(sparkSession)
          .asInstanceOf[CarbonRelation]
      if (relation == null) {
        throw new NoSuchTableException(databaseName, tableName)
      }
      if (null == relation.carbonTable) {
        LOGGER.error(s"Alter table failed: table not found: $databaseName.$tableName")
        throw new NoSuchTableException(databaseName, tableName)
      }
      relation.carbonTable
    }
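    // Assemble the load model that carries the schema and location information
    // the compaction flow needs.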
    val carbonLoadModel = new CarbonLoadModel()
    // The load schema carries the carbon table and its dimension relations.
    val dataLoadSchema = new CarbonDataLoadSchema(table)
    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
    carbonLoadModel.setTableName(table.getTableName)
    carbonLoadModel.setDatabaseName(table.getDatabaseName)
    carbonLoadModel.setTablePath(table.getTablePath)
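    // Temporary working directory for intermediate compaction data; falls back
    // to java.io.tmpdir when the carbon property is not configured.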
    var storeLocation = CarbonProperties.getInstance
      .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
        System.getProperty("java.io.tmpdir"))
    storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
    try {
      alterTableForCompaction(sparkSession.sqlContext,
        alterTableModel,
        carbonLoadModel,
        storeLocation)
    } catch {
      case e: Exception =>
        if (null != e.getMessage) {
          CarbonException.analysisException(
            s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
        } else {
          CarbonException.analysisException(
            "Exception in compaction. Please check logs for more info.")
        }
    }
    Seq.empty
  }
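
  /**
   * Resolves the requested compaction type and dispatches the request: when
   * concurrent compaction is disabled it goes through the system-level
   * compaction lock flow, otherwise it runs under a per-table compaction lock.
   */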
  private def alterTableForCompaction(sqlContext: SQLContext,
      alterTableModel: AlterTableModel,
      carbonLoadModel: CarbonLoadModel,
      storeLocation: String): Unit = {
    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
    var compactionSize: Long = 0
    var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
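    // Map the user-supplied compaction type string onto the internal enum;
    // anything unrecognised falls back to a minor compaction.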
    if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
      compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION)
      compactionType = CompactionType.MAJOR_COMPACTION
    } else if (alterTableModel.compactionType.equalsIgnoreCase(
      CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString)) {
      compactionType = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
      if (alterTableModel.segmentUpdateStatusManager.isDefined) {
        carbonLoadModel.setSegmentUpdateStatusManager(
          alterTableModel.segmentUpdateStatusManager.get)
        carbonLoadModel.setLoadMetadataDetails(
          alterTableModel.segmentUpdateStatusManager.get.getLoadMetadataDetails.toList.asJava)
      }
    } else if (alterTableModel.compactionType.equalsIgnoreCase("segment_index")) {
      compactionType = CompactionType.SEGMENT_INDEX_COMPACTION
    } else {
      compactionType = CompactionType.MINOR_COMPACTION
    }
    LOGGER.audit(s"Compaction request received for table " +
      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    if (null == carbonLoadModel.getLoadMetadataDetails) {
      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
    }
    if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
      // Segment-index compaction only merges the index files of the valid
      // segments: launch the merge job and return without touching data files.
      CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
        CarbonDataMergerUtil.getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala,
        carbonLoadModel.getTablePath,
        carbonTable, true)
      return
    }
    // Use the fact timestamp supplied with the request if present; otherwise
    // take the current time as the start time of this compaction load.
    val loadStartTime: Long =
      if (alterTableModel.factTimeStamp.isEmpty) {
        CarbonUpdateUtil.readCurrentTime
      } else {
        alterTableModel.factTimeStamp.get
      }
    carbonLoadModel.setFactTimeStamp(loadStartTime)
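    // Package everything the merger needs; this request comes from DDL rather
    // than from the automatic compaction path.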
    val isCompactionTriggeredByDdl = true
    val compactionModel = CompactionModel(compactionSize,
      compactionType,
      carbonTable,
      isCompactionTriggeredByDdl)
    val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
      .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
        CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION)
      .equalsIgnoreCase("true")
    // When system-level compaction locking is enabled, only one compaction can
    // run in the system at a time. Any request that arrives while a compaction
    // is executing creates a compaction request file, which the running
    // compaction process picks up later.
    if (!isConcurrentCompactionAllowed) {
      LOGGER.info("System level compaction lock is enabled.")
      CarbonDataRDDFactory.handleCompactionForSystemLocking(sqlContext,
        carbonLoadModel,
        storeLocation,
        compactionType,
        carbonTable,
        compactionModel)
    } else {
      // Normal flow: take a table-level compaction lock so that only one
      // compaction runs against this table at a time.
      val lock = CarbonLockFactory
        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
          LockUsage.COMPACTION_LOCK)
      if (lock.lockWithRetries()) {
        LOGGER.info("Acquired the compaction lock for table " +
          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
        try {
          CarbonDataRDDFactory.startCompactionThreads(sqlContext,
            carbonLoadModel,
            storeLocation,
            compactionModel,
            lock)
        } catch {
          case e: Exception =>
            LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
            lock.unlock()
            throw e
        }
      } else {
        LOGGER.audit("Not able to acquire the compaction lock for table " +
          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
        LOGGER.error("Not able to acquire the compaction lock for table " +
          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
        CarbonException.analysisException(
          "Table is already locked for compaction. Please try after some time.")
      }
    }
  }
}