/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.management
import java.io.{InputStreamReader, IOException}
import java.util
import java.util.Collections
import java.util.concurrent.{Executors, ExecutorService}
import scala.collection.JavaConverters._
import com.google.gson.Gson
import org.apache.hadoop.conf.Configuration
import org.apache.log4j.Logger
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, StageInput}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
/**
 * Collect stage input files and trigger loading into the carbon table.
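 *
 * Typically triggered through SQL (assuming the standard CarbonData stage input
 * syntax for this command): {{{ INSERT INTO <table_name> STAGE }}}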
*
* @param databaseNameOp database name
* @param tableName table name
*/
case class CarbonInsertFromStageCommand(
databaseNameOp: Option[String],
tableName: String
) extends DataCommand {
@transient var LOGGER: Logger = _
override def processData(spark: SparkSession): Seq[Row] = {
LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
Checker.validateTableExists(databaseNameOp, tableName, spark)
val table = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(spark)
val hadoopConf = spark.sessionState.newHadoopConf()
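    // add the session's Hadoop configuration as a resource of the configuration
    // used by FileFactory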
FileFactory.getConfiguration.addResource(hadoopConf)
setAuditTable(table)
if (!table.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
if (table.isChildTableForMV) {
throw new MalformedCarbonCommandException("Unsupported operation on MV table")
}
val tablePath = table.getTablePath
val stagePath = CarbonTablePath.getStageDir(tablePath)
val snapshotFilePath = CarbonTablePath.getStageSnapshotFile(tablePath)
var loadModel: CarbonLoadModel = null
val lock = acquireIngestLock(table)
try {
      // 1. Check whether we need to recover from a previous failure.
      // We use a snapshot file to indicate whether there was a failure in the previous
      // ingest operation. A snapshot file is created when an ingest operation starts
      // and is deleted only after the whole ingest operation is finished, which
      // includes two actions:
      //   1) action1: changing the segment status to SUCCESS and
      //   2) action2: deleting all involved stage files.
      //
      // If either of these two actions fails, the snapshot file will still exist,
      // so recovery is needed.
      //
      // To do the recovery, perform the following steps:
      //   1) Check if the corresponding segment in table status is SUCCESS.
      //      If so, deleting the stage files failed, so read the stage file list
      //      from the snapshot file and delete them again.
      //   2) Check if the corresponding segment in table status is INSERT_IN_PROGRESS.
      //      If so, data loading failed, so read the stage file list from the
      //      snapshot file and load again.
recoverIfRequired(snapshotFilePath, table, hadoopConf)
// 2. Start ingesting, steps:
// 1) read all existing stage files
// 2) read all stage files to collect input files for data loading
// 3) add a new segment entry in table status as INSERT_IN_PROGRESS,
// 4) write all existing stage file names into a new snapshot file
// 5) do actual loading
// 6) write segment file and update segment state to SUCCESS in table status
// 7) delete stage files used for loading
// 8) delete the snapshot file
// 1) read all existing stage files
val stageFiles = listStageFiles(stagePath, hadoopConf)
if (stageFiles.isEmpty) {
// no stage files, so do nothing
LOGGER.warn("files not found under stage metadata folder")
return Seq.empty
}
// 2) read all stage files to collect input files for data loading
// create a thread pool to read them
val numThreads = Math.min(Math.max(stageFiles.length, 1), 10)
val executorService = Executors.newFixedThreadPool(numThreads)
val stageInputs = collectStageInputs(executorService, stageFiles)
// 3) add new segment with INSERT_IN_PROGRESS into table status
loadModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(spark, table)
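      // recordNewLoadMetadata writes the new segment entry (INSERT_IN_PROGRESS)
      // into the table status file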
CarbonLoaderUtil.recordNewLoadMetadata(loadModel)
      // 4) write all existing stage file names and the segmentId into a new snapshot file.
      //    The first line of the snapshot file is the segmentId; each following line
      //    is one stage file name
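      // For example (hypothetical content), the snapshot file may look like:
      //   2
      //   <stage file 1>
      //   <stage file 2>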
val content =
(Seq(loadModel.getSegmentId) ++ stageFiles.map(_._1.getAbsolutePath)).mkString("\n")
FileFactory.writeFile(content, snapshotFilePath)
// 5) perform data loading
startLoading(spark, table, loadModel, stageInputs)
// 6) write segment file and update the segment entry to SUCCESS
val segmentFileName = SegmentFileStore.writeSegmentFile(
table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString)
SegmentFileStore.updateTableStatusFile(
table, loadModel.getSegmentId, segmentFileName,
table.getCarbonTableIdentifier.getTableId,
new SegmentFileStore(table.getTablePath, segmentFileName),
SegmentStatus.SUCCESS)
// 7) delete stage files
deleteStageFiles(executorService, stageFiles)
// 8) delete the snapshot file
FileFactory.getCarbonFile(snapshotFilePath).delete()
} catch {
case ex: Throwable =>
LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex)
if (loadModel != null) {
CarbonLoaderUtil.updateTableStatusForFailure(loadModel)
}
throw ex
} finally {
lock.unlock()
}
Seq.empty
}
/**
   * Check whether there was a failure in the previous ingest process and try to recover
*/
private def recoverIfRequired(
snapshotFilePath: String,
table: CarbonTable,
conf: Configuration): Unit = {
if (!FileFactory.isFileExist(snapshotFilePath)) {
// everything is fine
return
}
    // something went wrong; read the snapshot file and do the recovery steps:
    // 1. check the segment state in the table status file
    // 2. if it is in SUCCESS state, delete all stage files recorded in the snapshot file
    // 3. if it is in INSERT_IN_PROGRESS state, delete the entry in table status and load again
LOGGER.info(s"snapshot file found ($snapshotFilePath), start recovery process")
val lines = FileFactory.readLinesInFile(snapshotFilePath, conf)
if (lines.size() < 2) {
throw new RuntimeException("Invalid snapshot file, " + lines.size() + " lines")
}
    val segmentId = lines.get(0)
    // skip the first line (segmentId); the remaining lines are stage file names
    val stageFileNames = lines.asScala.drop(1)
    LOGGER.info(s"Segment $segmentId needs recovery, ${stageFileNames.length} stage files")
// lock the table status
var lock = CarbonLockFactory.getCarbonLockObj(
table.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
if (!lock.lockWithRetries()) {
throw new RuntimeException(s"Failed to lock table status for " +
s"${table.getDatabaseName}.${table.getTableName}")
}
try {
val segments = SegmentStatusManager.readTableStatusFile(
CarbonTablePath.getTableStatusFilePath(table.getTablePath)
)
val matchedSegment = segments.filter(_.getLoadName.equals(segmentId))
if (matchedSegment.length != 1) {
throw new RuntimeException("unexpected " + matchedSegment.length + " segment found")
}
matchedSegment(0).getSegmentStatus match {
case SegmentStatus.SUCCESS =>
// delete all stage files
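          // the table status file is already in its final state, so the table status
          // lock can be released before deleting the stage files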
lock.unlock()
lock = null
LOGGER.info(s"Segment $segmentId is in SUCCESS state, about to delete " +
s"${stageFileNames.length} stage files")
val numThreads = Math.min(Math.max(stageFileNames.length, 1), 10)
val executorService = Executors.newFixedThreadPool(numThreads)
stageFileNames.map { fileName =>
executorService.submit(new Runnable {
override def run(): Unit = {
FileFactory.getCarbonFile(
CarbonTablePath.getStageDir(table.getTablePath) +
CarbonCommonConstants.FILE_SEPARATOR + fileName
).delete()
}
})
}.map { future =>
future.get()
}
case SegmentStatus.INSERT_IN_PROGRESS =>
// delete entry in table status and load again
LOGGER.info(s"Segment $segmentId is in INSERT_IN_PROGRESS state, about to delete the " +
s"segment entry and load again")
val segmentToWrite = segments.filterNot(_.getLoadName.equals(segmentId))
SegmentStatusManager.writeLoadDetailsIntoFile(
CarbonTablePath.getTableStatusFilePath(table.getTablePath),
segmentToWrite)
case other =>
throw new RuntimeException(s"Segment $segmentId is in unexpected state: $other")
}
} finally {
if (lock != null) {
lock.unlock()
}
}
LOGGER.info(s"Finish recovery, delete snapshot file: $snapshotFilePath")
FileFactory.getCarbonFile(snapshotFilePath).delete()
}
/**
* Start global sort loading
*/
private def startLoading(
spark: SparkSession,
table: CarbonTable,
loadModel: CarbonLoadModel,
stageInput: Seq[StageInput]
): Unit = {
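    // convert the stage inputs into input splits, build a DataFrame over them,
    // and load the data into the table using global sort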
val splits = stageInput.flatMap(_.createSplits().asScala)
LOGGER.info(s"start to load ${splits.size} files into " +
s"${table.getDatabaseName}.${table.getTableName}")
val start = System.currentTimeMillis()
val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
spark,
Option(dataFrame),
loadModel,
SparkSQLUtil.sessionState(spark).newHadoopConf()
).map { row =>
(row._1, FailureCauses.NONE == row._2._2.failureCauses)
}
LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
}
/**
* Read stage files and return input files
*/
private def collectStageInputs(
executorService: ExecutorService,
stageFiles: Array[(CarbonFile, CarbonFile)]
): Seq[StageInput] = {
val startTime = System.currentTimeMillis()
val output = Collections.synchronizedList(new util.ArrayList[StageInput]())
val gson = new Gson()
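    // submit one task per stage file: each task parses the stage file content (JSON)
    // into a StageInput and adds it to the synchronized output list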
stageFiles.map { stage =>
executorService.submit(new Runnable {
override def run(): Unit = {
val filePath = stage._1.getAbsolutePath
val stream = FileFactory.getDataInputStream(filePath)
try {
val stageInput = gson.fromJson(new InputStreamReader(stream), classOf[StageInput])
output.add(stageInput)
} finally {
stream.close()
}
}
})
}.map { future =>
future.get()
}
LOGGER.info(s"read stage files taken ${System.currentTimeMillis() - startTime}ms")
output.asScala
}
  /**
   * Delete stage files and their corresponding success files
   */
private def deleteStageFiles(
executorService: ExecutorService,
stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
val startTime = System.currentTimeMillis()
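    // delete each stage file and its corresponding success file in parallel,
    // then wait for all deletions to complete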
stageFiles.map { files =>
executorService.submit(new Runnable {
override def run(): Unit = {
files._1.delete()
files._2.delete()
}
})
}.map { future =>
future.get()
}
LOGGER.info(s"finished to delete stage files, time taken: " +
s"${System.currentTimeMillis() - startTime}ms")
}
  /**
   * Collect all stage files and their matching success files.
   * A stage file without a matching success file will not be collected.
   */
private def listStageFiles(
loadDetailsDir: String,
hadoopConf: Configuration
): Array[(CarbonFile, CarbonFile)] = {
val dir = FileFactory.getCarbonFile(loadDetailsDir, hadoopConf)
if (dir.exists()) {
val allFiles = dir.listFiles()
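      // index success files by base name (file name without extension), then pair
      // every stage file with its matching success file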
val successFiles = allFiles.filter { file =>
file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
}.map { file =>
(file.getName.substring(0, file.getName.indexOf(".")), file)
}.toMap
allFiles.filter { file =>
!file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
}.filter { file =>
successFiles.contains(file.getName)
}.map { file =>
(file, successFiles(file.getName))
}
} else {
Array.empty
}
}
/**
   * The INGEST operation does not support concurrent execution, so one lock is used per table
*/
private def acquireIngestLock(table: CarbonTable): ICarbonLock = {
val tableIdentifier = table.getAbsoluteTableIdentifier
val lock = CarbonLockFactory.getCarbonLockObj(tableIdentifier, LockUsage.INGEST_LOCK)
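    // retry count and timeout for acquiring the lock are read from the carbon
    // lock properties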
val retryCount = CarbonLockUtil.getLockProperty(
CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK,
CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT
)
val maxTimeout = CarbonLockUtil.getLockProperty(
CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT
)
if (lock.lockWithRetries(retryCount, maxTimeout)) {
lock
} else {
      throw new IOException(
        s"Not able to acquire the ingest lock for table $tableIdentifier")
}
}
override protected def opName: String = "INSERT STAGE"
}