/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.management
import java.io.{InputStreamReader, IOException}
import java.util
import java.util.Collections
import java.util.concurrent.{Executors, ExecutorService}
import scala.collection.JavaConverters._
import com.google.gson.Gson
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.log4j.Logger
import org.apache.spark.CarbonInputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{ColumnarFormatVersion, SegmentFileStore}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, StageInput}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
import org.apache.carbondata.spark.rdd.CarbonScanRDD
import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
/**
 * Collect stage input files and trigger data loading into the carbon table.
*
* @param databaseNameOp database name
* @param tableName table name
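 *
 * This command is typically invoked via SQL like `INSERT INTO <table> STAGE`
 * (assumed entry point; the parser rule is not part of this file).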
*/
case class CarbonInsertFromStageCommand(
databaseNameOp: Option[String],
tableName: String
) extends DataCommand {
@transient var LOGGER: Logger = _
override def processData(spark: SparkSession): Seq[Row] = {
LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
Checker.validateTableExists(databaseNameOp, tableName, spark)
val table = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(spark)
val hadoopConf = spark.sessionState.newHadoopConf()
FileFactory.getConfiguration.addResource(hadoopConf)
setAuditTable(table)
if (!table.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
if (table.isChildTableForMV) {
throw new MalformedCarbonCommandException("Unsupported operation on MV table")
}
val tablePath = table.getTablePath
val stagePath = CarbonTablePath.getStageDir(tablePath)
val snapshotFilePath = CarbonTablePath.getStageSnapshotFile(tablePath)
val lock = acquireIngestLock(table)
try {
// 1. Check whether we need to recover from previous failure
// We use a snapshot file to indicate whether there was failure in previous
// ingest operation. A Snapshot file will be created when an ingest operation
// starts and will be deleted only after the whole ingest operation is finished,
// which includes two actions:
// 1) action1: changing segment status to SUCCESS and
// 2) action2: deleting all involved stage files.
//
      // If either of these two actions fails, the snapshot file will still
      // exist, so recovery is needed.
//
      // To recover, do the following steps:
      //   1) Check if the corresponding segment in table status is SUCCESS,
      //      which means deleting the stage files had failed. So read the stage
      //      file list from the snapshot file and delete them again.
      //   2) Check if the corresponding segment in table status is INSERT_IN_PROGRESS,
      //      which means data loading had failed. So read the stage file list
      //      from the snapshot file and load again.
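      // (the snapshot file layout, segmentId on the first line followed by one
      // stage file path per line, is described in startLoading step 2 below)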
recoverIfRequired(snapshotFilePath, table, hadoopConf)
// 2. Start ingesting, steps:
// 1) read all existing stage files
// 2) read all stage files to collect input files for data loading
// 3) add a new segment entry in table status as INSERT_IN_PROGRESS,
// 4) write all existing stage file names into a new snapshot file
// 5) do actual loading
// 6) write segment file and update segment state to SUCCESS in table status
// 7) delete stage files used for loading
// 8) delete the snapshot file
// 1) read all existing stage files
val stageFiles = listStageFiles(stagePath, hadoopConf)
if (stageFiles.isEmpty) {
        // no stage files, so do nothing
        LOGGER.warn("no stage files found under the stage metadata folder")
return Seq.empty
}
// 2) read all stage files to collect input files for data loading
// create a thread pool to read them
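      // cap the pool at 10 threads so that a large batch of stage files does
      // not spawn an unbounded number of reader threads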
val numThreads = Math.min(Math.max(stageFiles.length, 1), 10)
val executorService = Executors.newFixedThreadPool(numThreads)
val stageInputs = collectStageInputs(executorService, stageFiles)
      // 3) perform data loading (covers steps 3 to 6 of the overview above)
if (table.isHivePartitionTable) {
startLoadingWithPartition(spark, table, stageInputs, stageFiles, snapshotFilePath)
} else {
startLoading(spark, table, stageInputs, stageFiles, snapshotFilePath)
}
// 4) delete stage files
deleteStageFiles(executorService, stageFiles)
// 5) delete the snapshot file
FileFactory.getCarbonFile(snapshotFilePath).delete()
} catch {
case ex: Throwable =>
LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex)
throw ex
} finally {
lock.unlock()
}
Seq.empty
}
/**
* Check whether there was failure in previous ingest process and try to recover
*/
private def recoverIfRequired(
snapshotFilePath: String,
table: CarbonTable,
conf: Configuration): Unit = {
if (!FileFactory.isFileExist(snapshotFilePath)) {
// everything is fine
return
}
    // something wrong, read the snapshot file and do recovery steps
    //   1. check segment state in the table status file
    //   2. If in SUCCESS state, delete all stage files recorded in the snapshot file
    //   3. If in IN_PROGRESS state, delete the entry in table status and load again
LOGGER.info(s"snapshot file found ($snapshotFilePath), start recovery process")
val lines = FileFactory.readLinesInFile(snapshotFilePath, conf)
if (lines.size() < 2) {
throw new RuntimeException("Invalid snapshot file, " + lines.size() + " lines")
}
    val segmentId = lines.get(0)
    // the first line is the segmentId; the remaining lines are stage file names
    lines.remove(0)
    val stageFileNames = lines.asScala
    LOGGER.info(s"Segment $segmentId needs recovery, ${stageFileNames.length} stage files")
// lock the table status
var lock = CarbonLockFactory.getCarbonLockObj(
table.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
if (!lock.lockWithRetries()) {
throw new RuntimeException(s"Failed to lock table status for " +
s"${table.getDatabaseName}.${table.getTableName}")
}
try {
val segments = SegmentStatusManager.readTableStatusFile(
CarbonTablePath.getTableStatusFilePath(table.getTablePath)
)
val matchedSegment = segments.filter(_.getLoadName.equals(segmentId))
if (matchedSegment.length != 1) {
throw new RuntimeException("unexpected " + matchedSegment.length + " segment found")
}
matchedSegment(0).getSegmentStatus match {
case SegmentStatus.SUCCESS =>
// delete all stage files
lock.unlock()
lock = null
LOGGER.info(s"Segment $segmentId is in SUCCESS state, about to delete " +
s"${stageFileNames.length} stage files")
val numThreads = Math.min(Math.max(stageFileNames.length, 1), 10)
val executorService = Executors.newFixedThreadPool(numThreads)
stageFileNames.map { fileName =>
executorService.submit(new Runnable {
override def run(): Unit = {
FileFactory.getCarbonFile(
CarbonTablePath.getStageDir(table.getTablePath) +
CarbonCommonConstants.FILE_SEPARATOR + fileName
).delete()
}
})
}.map { future =>
future.get()
}
case SegmentStatus.INSERT_IN_PROGRESS =>
// delete entry in table status and load again
LOGGER.info(s"Segment $segmentId is in INSERT_IN_PROGRESS state, about to delete the " +
s"segment entry and load again")
val segmentToWrite = segments.filterNot(_.getLoadName.equals(segmentId))
SegmentStatusManager.writeLoadDetailsIntoFile(
CarbonTablePath.getTableStatusFilePath(table.getTablePath),
segmentToWrite)
case other =>
throw new RuntimeException(s"Segment $segmentId is in unexpected state: $other")
}
} finally {
if (lock != null) {
lock.unlock()
}
}
LOGGER.info(s"Finish recovery, delete snapshot file: $snapshotFilePath")
FileFactory.getCarbonFile(snapshotFilePath).delete()
}
/**
* Start global sort loading
*/
private def startLoading(
spark: SparkSession,
table: CarbonTable,
stageInput: Seq[StageInput],
stageFiles: Array[(CarbonFile, CarbonFile)],
snapshotFilePath: String
): Unit = {
var loadModel: CarbonLoadModel = null
try {
// 1) add new segment with INSERT_IN_PROGRESS into table status
loadModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(spark, table)
CarbonLoaderUtil.recordNewLoadMetadata(loadModel)
// 2) write all existing stage file names and segmentId into a new snapshot file
      // The content of the snapshot file is: the first line is the segmentId,
      // followed by one stage file path per line
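      // For example, with hypothetical paths, a snapshot for segment "2" is:
      //   2
      //   /store/db/t/stage/stage-file-1
      //   /store/db/t/stage/stage-file-2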
val content =
(Seq(loadModel.getSegmentId) ++ stageFiles.map(_._1.getAbsolutePath)).mkString("\n")
FileFactory.writeFile(content, snapshotFilePath)
// 3) do loading.
val splits = stageInput.flatMap(_.createSplits().asScala)
LOGGER.info(s"start to load ${splits.size} files into " +
s"${table.getDatabaseName}.${table.getTableName}")
val start = System.currentTimeMillis()
val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
spark,
Option(dataFrame),
loadModel,
SparkSQLUtil.sessionState(spark).newHadoopConf()
).map { row =>
(row._1, FailureCauses.NONE == row._2._2.failureCauses)
}
LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
// 4) write segment file and update the segment entry to SUCCESS
val segmentFileName = SegmentFileStore.writeSegmentFile(
table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString)
SegmentFileStore.updateTableStatusFile(
table, loadModel.getSegmentId, segmentFileName,
table.getCarbonTableIdentifier.getTableId,
new SegmentFileStore(table.getTablePath, segmentFileName),
SegmentStatus.SUCCESS)
} catch {
case ex: Throwable =>
LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex)
if (loadModel != null) {
CarbonLoaderUtil.updateTableStatusForFailure(loadModel)
}
throw ex
}
}
/**
* Start global sort loading of partition table
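 *
 * Each partition's data is loaded separately through CarbonLoadDataCommand
 * with the matching partition specification.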
*/
private def startLoadingWithPartition(
spark: SparkSession,
table: CarbonTable,
stageInput: Seq[StageInput],
stageFiles: Array[(CarbonFile, CarbonFile)],
snapshotFilePath: String
): Unit = {
val partitionDataList = listPartitionFiles(stageInput)
val content = stageFiles.map(_._1.getAbsolutePath).mkString("\n")
FileFactory.writeFile(content, snapshotFilePath)
val start = System.currentTimeMillis()
partitionDataList.map {
case (partition, splits) =>
LOGGER.info(s"start to load ${splits.size} files into " +
s"${table.getDatabaseName}.${table.getTableName}. " +
s"Partition information: ${partition.mkString(",")}")
val dataFrame = createInputDataFrameOfInternalRow(spark, table, splits)
val columns = dataFrame.columns
val header = columns.mkString(",")
val selectColumns = columns.filter(!partition.contains(_))
val selectedDataFrame = dataFrame.select(selectColumns.head, selectColumns.tail: _*)
val loadCommand = CarbonLoadDataCommand(
databaseNameOp = Option(table.getDatabaseName),
tableName = table.getTableName,
factPathFromUser = null,
dimFilesPath = Seq(),
options = scala.collection.immutable.Map("fileheader" -> header,
"binary_decoder" -> "base64"),
isOverwriteTable = false,
inputSqlString = null,
dataFrame = Some(selectedDataFrame),
updateModel = None,
tableInfoOp = None,
internalOptions = Map.empty,
partition = partition
)
loadCommand.run(spark)
}
LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
}
/**
   * @return a list of (partitionMap, inputSplits) pairs, where
   *         partitionMap contains all partition column names and values, and
   *         inputSplits holds all data file information of that partition.
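   *
   * For example, with hypothetical values, a table partitioned by "dt" may
   * yield an entry like (Map("dt" -> Some("2020-01-01")), Seq(split1, split2)).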
*/
private def listPartitionFiles(
stageInputs : Seq[StageInput]
): Seq[(Map[String, Option[String]], Seq[InputSplit])] = {
val partitionMap = new util.HashMap[Map[String, Option[String]], util.List[InputSplit]]()
stageInputs.foreach (
stageInput => {
val locations = stageInput.getLocations.asScala
locations.foreach (
location => {
val partition = location.getPartitions.asScala.map(t => (t._1, Option(t._2))).toMap
var splits = partitionMap.get(partition)
if (splits == null) {
partitionMap.put(partition, new util.ArrayList[InputSplit]())
splits = partitionMap.get(partition)
}
splits.addAll (
location.getFiles.asScala
.filter(_._1.endsWith(CarbonCommonConstants.FACT_FILE_EXT))
.map(
file => {
CarbonInputSplit.from(
"-1", "0",
stageInput.getBase + CarbonCommonConstants.FILE_SEPARATOR + file._1, 0,
file._2, ColumnarFormatVersion.V3, null
)
}
).toList.asJava
)
}
)
}
)
partitionMap.asScala.map(entry => (entry._1, entry._2.asScala)).toSeq
}
/**
* Read stage files and return input files
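   *
   * Each stage file stores a JSON serialization of [[StageInput]]. A minimal,
   * hypothetical example consistent with the accessors used in this class:
   * {{{
   *   {"base": "/store/db/t/stage_data", "files": {"part-0": 1024}}
   * }}}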
*/
private def collectStageInputs(
executorService: ExecutorService,
stageFiles: Array[(CarbonFile, CarbonFile)]
): Seq[StageInput] = {
val startTime = System.currentTimeMillis()
val output = Collections.synchronizedList(new util.ArrayList[StageInput]())
val gson = new Gson()
stageFiles.map { stage =>
executorService.submit(new Runnable {
override def run(): Unit = {
val filePath = stage._1.getAbsolutePath
val stream = FileFactory.getDataInputStream(filePath)
try {
val stageInput = gson.fromJson(new InputStreamReader(stream), classOf[StageInput])
output.add(stageInput)
} finally {
stream.close()
}
}
})
}.map { future =>
future.get()
}
LOGGER.info(s"read stage files taken ${System.currentTimeMillis() - startTime}ms")
output.asScala
}
/**
   * Delete stage files and the matching success files
*/
private def deleteStageFiles(
executorService: ExecutorService,
stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
val startTime = System.currentTimeMillis()
stageFiles.map { files =>
executorService.submit(new Runnable {
override def run(): Unit = {
files._1.delete()
files._2.delete()
}
})
}.map { future =>
future.get()
}
LOGGER.info(s"finished to delete stage files, time taken: " +
s"${System.currentTimeMillis() - startTime}ms")
}
  /**
   * Collect all stage files and their matched success files.
   * A stage file without a matching success file will not be collected.
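   *
   * For example, with hypothetical names, if the stage folder contains the
   * files "abc", "abc.success" and "def", only the pair ("abc", "abc.success")
   * is returned, because "def" has no matching success file.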
*/
private def listStageFiles(
loadDetailsDir: String,
hadoopConf: Configuration
): Array[(CarbonFile, CarbonFile)] = {
val dir = FileFactory.getCarbonFile(loadDetailsDir, hadoopConf)
if (dir.exists()) {
val allFiles = dir.listFiles()
val successFiles = allFiles.filter { file =>
file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
}.map { file =>
(file.getName.substring(0, file.getName.indexOf(".")), file)
}.toMap
allFiles.filter { file =>
!file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
}.filter { file =>
successFiles.contains(file.getName)
}.map { file =>
(file, successFiles(file.getName))
}
} else {
Array.empty
}
}
/**
   * The INGEST operation does not support concurrent execution,
   * so one lock is used per table.
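   * Retry count and timeout come from the carbon lock properties read below.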
*/
private def acquireIngestLock(table: CarbonTable): ICarbonLock = {
val tableIdentifier = table.getAbsoluteTableIdentifier
val lock = CarbonLockFactory.getCarbonLockObj(tableIdentifier, LockUsage.INGEST_LOCK)
val retryCount = CarbonLockUtil.getLockProperty(
CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK,
CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT
)
val maxTimeout = CarbonLockUtil.getLockProperty(
CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT
)
if (lock.lockWithRetries(retryCount, maxTimeout)) {
lock
} else {
      throw new IOException(
        s"Not able to acquire the ingest lock for table $tableIdentifier")
}
}
/**
   * Create a DataFrame based on the specified splits.
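   *
   * It scans the given splits with a CarbonScanRDD of InternalRow and wraps
   * the result into a DataFrame using the table's Spark schema.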
*/
private def createInputDataFrameOfInternalRow(
sparkSession: SparkSession,
carbonTable: CarbonTable,
splits: Seq[InputSplit]
): DataFrame = {
val columns = carbonTable
.getCreateOrderColumn
.asScala
.map(_.getColName)
.toArray
val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
val rdd: RDD[InternalRow] = new CarbonScanRDD[InternalRow](
sparkSession,
columnProjection = new CarbonProjection(columns),
null,
carbonTable.getAbsoluteTableIdentifier,
carbonTable.getTableInfo.serialize,
carbonTable.getTableInfo,
new CarbonInputMetrics,
null,
classOf[SparkDataTypeConverterImpl],
classOf[SparkRowReadSupportImpl],
splits.asJava
)
SparkSQLUtil.execute(rdd, schema, sparkSession)
}
override protected def opName: String = "INSERT STAGE"
}