| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.sql.execution.command.management |
| |
| import java.io.{InputStreamReader, IOException} |
| import java.util |
| import java.util.Collections |
| import java.util.concurrent.{Callable, Executors, ExecutorService} |
| |
| import scala.collection.JavaConverters._ |
| |
| import com.google.gson.Gson |
| import org.apache.hadoop.conf.Configuration |
| import org.apache.hadoop.mapreduce.InputSplit |
| import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} |
| import org.apache.spark.sql.execution.command.{Checker, DataCommand} |
| import org.apache.spark.sql.util.SparkSQLUtil |
| |
| import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException |
| import org.apache.carbondata.common.logging.LogServiceFactory |
| import org.apache.carbondata.core.constants.CarbonCommonConstants |
| import org.apache.carbondata.core.datastore.filesystem.CarbonFile |
| import org.apache.carbondata.core.datastore.impl.FileFactory |
| import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} |
| import org.apache.carbondata.core.metadata.{ColumnarFormatVersion, SegmentFileStore} |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable |
| import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo |
| import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, StageInput} |
| import org.apache.carbondata.core.util.CarbonProperties |
| import org.apache.carbondata.core.util.path.CarbonTablePath |
| import org.apache.carbondata.events.{OperationContext, OperationListenerBus} |
| import org.apache.carbondata.hadoop.CarbonInputSplit |
| import org.apache.carbondata.processing.loading.FailureCauses |
| import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent} |
| import org.apache.carbondata.processing.loading.model.CarbonLoadModel |
| import org.apache.carbondata.processing.util.CarbonLoaderUtil |
| import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark |
| import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory |
| |
/**
 * Collect stage input files and trigger a loading into carbon table.
 *
 * @param databaseNameOp database name
 * @param tableName table name
 * @param options command options; the keys read here are
 *                [[CarbonInsertFromStageCommand.BATCH_FILE_COUNT_KEY]] and
 *                [[CarbonInsertFromStageCommand.BATCH_FILE_ORDER_KEY]]
 */
case class CarbonInsertFromStageCommand(
    databaseNameOp: Option[String],
    tableName: String,
    options: Map[String, String]
) extends DataCommand {

  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  /**
   * Load a batch of committed stage files into the table.
   *
   * Phase one runs under the table's ingest lock: recover from a previously failed ingest,
   * choose the stage files to load and tag each of them with a '.loading' file. Phase two runs
   * after the ingest lock is released (the '.loading' tags keep concurrent INSERT STAGE commands
   * away from the chosen stages): read the stage inputs, load them, then delete the stage files
   * and the snapshot file.
   *
   * @return always an empty result
   */
  override def processData(spark: SparkSession): Seq[Row] = {
    Checker.validateTableExists(databaseNameOp, tableName, spark)
    val table = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(spark)
    val hadoopConf = spark.sessionState.newHadoopConf()
    FileFactory.getConfiguration.addResource(hadoopConf)
    setAuditTable(table)

    if (!table.getTableInfo.isTransactionalTable) {
      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
    }
    if (table.isMV) {
      throw new MalformedCarbonCommandException("Unsupported operation on MV table")
    }

    val tablePath = table.getTablePath
    val stagePath = CarbonTablePath.getStageDir(tablePath)
    val snapshotFilePath = CarbonTablePath.getStageSnapshotFile(tablePath)
    var stageFiles: Array[(CarbonFile, CarbonFile)] = Array.empty
    var executorService: ExecutorService = null
    val lock = acquireIngestLock(table)

    try {
      try {
        // 1. Check whether we need to recover from a previous failure.
        // A snapshot file indicates whether a previous ingest operation failed. It is created
        // when an ingest operation starts and is deleted only after the whole ingest operation
        // has finished, which includes two actions:
        // 1) action1: changing the segment status to SUCCESS, and
        // 2) action2: deleting all involved stage files.
        //
        // If either of these actions failed, the snapshot file still exists and recovery
        // is needed.
        //
        // Recovery does the following:
        // 1) If the corresponding segment in table status is SUCCESS, deleting the stage
        //    files failed, so read the stage file list from the snapshot file and delete
        //    them again.
        // 2) If the corresponding segment in table status is INSERT_IN_PROGRESS, data loading
        //    failed, so remove that segment entry so the stages can be loaded again.
        recoverIfRequired(snapshotFilePath, table, hadoopConf)

        // 2. Start ingesting, steps:
        // 1) read all existing stage files
        // 2) read all stage files to collect input files for data loading
        // 3) add a new segment entry in table status as INSERT_IN_PROGRESS
        // 4) write all existing stage file names into a new snapshot file
        // 5) do the actual loading
        // 6) write the segment file and update the segment state to SUCCESS in table status
        // 7) delete the stage files used for loading
        // 8) delete the snapshot file

        // 1) read all existing stage files
        val batchSize = parseBatchSize()
        val orderType = parseBatchOrder()
        LOGGER.info("Option [" + CarbonInsertFromStageCommand.BATCH_FILE_ORDER_KEY +
          "] value is " + orderType)
        stageFiles = listStageFiles(stagePath, hadoopConf, batchSize,
          orderType.equalsIgnoreCase(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_ASC))

        if (stageFiles.isEmpty) {
          // no stage files, so do nothing
          LOGGER.warn("files not found under stage metadata folder")
        } else {
          // Tag the stages in process with a 'loading' file (an empty file whose name is the
          // stage name plus the '.loading' suffix). Concurrent INSERT STAGE processes only
          // choose stages without a fresh 'loading' tag, so they do not load the same data.
          val numThreads = Math.min(Math.max(stageFiles.length, 1), 10)
          executorService = Executors.newFixedThreadPool(numThreads)
          createStageLoadingFilesWithRetry(executorService, stageFiles)
        }
      } catch {
        case ex: Throwable =>
          LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex)
          throw ex
      } finally {
        lock.unlock()
      }

      if (stageFiles.nonEmpty) {
        try {
          // 2) read all stage files to collect input files for data loading
          val stageInputs = collectStageInputs(executorService, stageFiles)

          // 3) perform data loading
          if (table.isHivePartitionTable) {
            startLoadingWithPartition(spark, table, stageInputs, stageFiles, snapshotFilePath)
          } else {
            startLoading(spark, table, stageInputs, stageFiles, snapshotFilePath)
          }

          // 4) delete stage files
          deleteStageFilesWithRetry(executorService, stageFiles)

          // 5) delete the snapshot file
          deleteSnapShotFileWithRetry(table, snapshotFilePath)
        } catch {
          case ex: Throwable =>
            LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex)
            throw ex
        }
      }
    } finally {
      // The pool is created per command; release its threads on success and failure alike.
      if (executorService != null) {
        executorService.shutdown()
      }
    }
    Seq.empty
  }

  /**
   * Parse the [[CarbonInsertFromStageCommand.BATCH_FILE_COUNT_KEY]] option.
   *
   * @return the maximum number of stage files to load in this command
   * @throws MalformedCarbonCommandException if the value is not an integer or is less than 1
   */
  private def parseBatchSize(): Int = {
    val batchSize = try {
      Integer.parseInt(
        options.getOrElse(CarbonInsertFromStageCommand.BATCH_FILE_COUNT_KEY,
          CarbonInsertFromStageCommand.BATCH_FILE_COUNT_DEFAULT))
    } catch {
      case _: NumberFormatException =>
        throw new MalformedCarbonCommandException("Option [" +
          CarbonInsertFromStageCommand.BATCH_FILE_COUNT_KEY + "] is not a number.")
    }
    if (batchSize < 1) {
      throw new MalformedCarbonCommandException("Option [" +
        CarbonInsertFromStageCommand.BATCH_FILE_COUNT_KEY + "] is less than 1.")
    }
    batchSize
  }

  /**
   * Parse the [[CarbonInsertFromStageCommand.BATCH_FILE_ORDER_KEY]] option.
   *
   * @return the raw option value, which equals ASC or DESC ignoring case
   * @throws MalformedCarbonCommandException if the value is neither ASC nor DESC
   */
  private def parseBatchOrder(): String = {
    val orderType = options.getOrElse(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_KEY,
      CarbonInsertFromStageCommand.BATCH_FILE_ORDER_DEFAULT)
    if (!orderType.equalsIgnoreCase(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_ASC) &&
        !orderType.equalsIgnoreCase(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_DESC)) {
      throw new MalformedCarbonCommandException("Option [" +
        CarbonInsertFromStageCommand.BATCH_FILE_ORDER_KEY + "] is invalid, should be " +
        CarbonInsertFromStageCommand.BATCH_FILE_ORDER_ASC + " or " +
        CarbonInsertFromStageCommand.BATCH_FILE_ORDER_DESC + ".")
    }
    orderType
  }

  /**
   * Check whether there was a failure in a previous ingest process and try to recover.
   *
   * Recovery only applies to non-partition tables, because only those write a snapshot file.
   * The snapshot's first line is the segment id. Every following line is the absolute path of
   * one stage file, as written by `startLoading`.
   */
  private def recoverIfRequired(
      snapshotFilePath: String,
      table: CarbonTable,
      conf: Configuration): Unit = {
    if (FileFactory.isFileExist(snapshotFilePath) && !table.isHivePartitionTable) {
      recover(snapshotFilePath, table, conf)
    }
  }

  /**
   * Perform the recovery steps for an existing snapshot file:
   * 1. check the segment state in the table status file;
   * 2. if it is SUCCESS, delete all stage files listed in the snapshot file;
   * 3. otherwise, delete the segment entry from the table status so the stages load again;
   * 4. finally, delete the snapshot file.
   */
  private def recover(
      snapshotFilePath: String,
      table: CarbonTable,
      conf: Configuration): Unit = {
    LOGGER.info(s"snapshot file found ($snapshotFilePath), start recovery process")
    val lines = FileFactory.readLinesInFile(snapshotFilePath, conf)
    if (lines.size() < 2) {
      throw new RuntimeException("Invalid snapshot file, " + lines.size() + " lines")
    }

    // Take the stage paths from the remaining lines. java.util.List.remove(0) would return the
    // removed segment id, not the rest of the list.
    val segmentId = lines.get(0)
    val stageFilePaths = lines.asScala.drop(1).toList
    LOGGER.info(s"Segment $segmentId need recovery, ${stageFilePaths.length} stage files")

    // lock the table status
    val lock = CarbonLockFactory.getCarbonLockObj(
      table.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
    if (!lock.lockWithRetries()) {
      throw new RuntimeException(s"Failed to lock table status for " +
        s"${table.getDatabaseName}.${table.getTableName}")
    }
    val segmentLoaded = try {
      val tableStatusPath = CarbonTablePath.getTableStatusFilePath(table.getTablePath)
      val segments = SegmentStatusManager.readTableStatusFile(tableStatusPath)
      val matchedSegment = segments.filter(_.getLoadName.equals(segmentId))
      if (matchedSegment.length != 1) {
        throw new RuntimeException("unexpected " + matchedSegment.length + " segment found")
      }
      matchedSegment(0).getSegmentStatus match {
        case SegmentStatus.SUCCESS =>
          true
        case other =>
          // delete the entry in table status and load again
          LOGGER.warn(s"Segment $segmentId is in $other state, about to delete the " +
            s"segment entry and load again")
          val segmentToWrite = segments.filterNot(_.getLoadName.equals(segmentId))
          SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, segmentToWrite)
          false
      }
    } finally {
      lock.unlock()
    }

    // Stage file deletion does not need the table status lock, so it runs after unlocking.
    if (segmentLoaded) {
      LOGGER.info(s"Segment $segmentId is in SUCCESS state, about to delete " +
        s"${stageFilePaths.length} stage files")
      deleteRecoveredStageFiles(stageFilePaths)
    }
    LOGGER.info(s"Finish recovery, delete snapshot file: $snapshotFilePath")
    FileFactory.getCarbonFile(snapshotFilePath).delete()
  }

  /**
   * Delete the stage files recorded in a snapshot, in parallel, on a pool that is shut down
   * afterwards. The companion '.success' and '.loading' files are deleted too.
   *
   * @param stageFilePaths absolute stage file paths, as stored in the snapshot file
   */
  private def deleteRecoveredStageFiles(stageFilePaths: List[String]): Unit = {
    val numThreads = Math.min(Math.max(stageFilePaths.length, 1), 10)
    val executorService = Executors.newFixedThreadPool(numThreads)
    try {
      stageFilePaths.map { stageFilePath =>
        executorService.submit(new Runnable {
          override def run(): Unit = {
            // Paths in the snapshot are already absolute; do not prefix the stage dir again.
            FileFactory.getCarbonFile(stageFilePath).delete()
            FileFactory.getCarbonFile(
              stageFilePath + CarbonTablePath.SUCCESS_FILE_SUFFIX).delete()
            FileFactory.getCarbonFile(
              stageFilePath + CarbonTablePath.LOADING_FILE_SUFFIX).delete()
          }
        })
      }.foreach(_.get())
    } finally {
      executorService.shutdown()
    }
  }

  /**
   * Start global sort loading of a non-partition table.
   *
   * Steps:
   * 1. record a new INSERT_IN_PROGRESS segment;
   * 2. write the snapshot file (segment id followed by the stage file paths);
   * 3. load the data, using global sort when the table has no bucketing info and
   *    `CarbonDataRDDFactory.loadDataFrame` otherwise;
   * 4. write the segment file, fire the load events and mark the segment SUCCESS.
   *
   * On any failure the segment is marked as failed via
   * `CarbonLoaderUtil.updateTableStatusForFailure` and the exception is rethrown.
   */
  private def startLoading(
      spark: SparkSession,
      table: CarbonTable,
      stageInput: Seq[StageInput],
      stageFiles: Array[(CarbonFile, CarbonFile)],
      snapshotFilePath: String
  ): Unit = {
    var loadModel: CarbonLoadModel = null
    try {
      // 1) add new segment with INSERT_IN_PROGRESS into table status
      loadModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(spark, table)
      CarbonLoaderUtil.recordNewLoadMetadata(loadModel)

      // 2) write all existing stage file names and segmentId into a new snapshot file.
      // The first line of the snapshot file is the segmentId; each following line is the
      // absolute path of one stage file.
      val content =
        (Seq(loadModel.getSegmentId) ++ stageFiles.map(_._1.getAbsolutePath)).mkString("\n")
      FileFactory.writeFile(content, snapshotFilePath)

      // 3) do loading.
      val splits = stageInput.flatMap(_.createSplits().asScala)
      LOGGER.info(s"start to load ${splits.size} files into " +
        s"${table.getDatabaseName}.${table.getTableName}")
      val start = System.currentTimeMillis()
      val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
      // accumulator to collect segment metadata info such as columnId and its minMax values
      val segmentMetaDataAccumulator = spark.sqlContext
        .sparkContext
        .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
      if (table.getBucketingInfo == null) {
        // NOTE(review): the mapped (id, success) pairs are discarded, so per-task failure
        // causes are not checked here. Confirm whether this is intended.
        DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
          spark,
          Option(dataFrame),
          loadModel,
          SparkSQLUtil.sessionState(spark).newHadoopConf(),
          segmentMetaDataAccumulator
        ).map { row =>
          (row._1, FailureCauses.NONE == row._2._2.failureCauses)
        }
      } else {
        CarbonDataRDDFactory.loadDataFrame(
          spark.sqlContext,
          Option(dataFrame),
          None,
          loadModel,
          segmentMetaDataAccumulator)
      }
      LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")

      // 4) write segment file and update the segment entry to SUCCESS
      val segmentFileName = SegmentFileStore.writeSegmentFile(
        table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString)
      // create operationContext to fire load events
      val operationContext: OperationContext = new OperationContext
      val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(
        sparkSession = spark,
        carbonLoadModel = loadModel,
        uuid = "",
        factPath = "",
        optionsFinal = options.asJava,
        options = options.asJava,
        isOverwriteTable = false,
        isDataFrame = true,
        updateModel = None,
        operationContext = operationContext)
      // For insert stage, set this property to avoid merging index files and to fire the
      // event that loads data into secondary indexes.
      operationContext.setProperty(CarbonCommonConstants.IS_INSERT_STAGE, "true")
      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
        new LoadTablePreStatusUpdateEvent(
          table.getCarbonTableIdentifier,
          loadModel)
      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)

      val status = SegmentFileStore.updateTableStatusFile(
        table, loadModel.getSegmentId, segmentFileName,
        table.getCarbonTableIdentifier.getTableId,
        new SegmentFileStore(table.getTablePath, segmentFileName),
        SegmentStatus.SUCCESS)

      // trigger load post events
      if (status) {
        val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
          new LoadTablePostStatusUpdateEvent(loadModel)
        try {
          OperationListenerBus.getInstance()
            .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
        } catch {
          case ex: Exception =>
            LOGGER.error("Problem while committing indexes", ex)
        }
      }
      // fire event to load data to materialized views and merge bloom index files
      CommonLoadUtils.firePostLoadEvents(spark,
        loadModel,
        tableIndexes,
        indexOperationContext,
        table,
        operationContext)
    } catch {
      case ex: Throwable =>
        LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex)
        if (loadModel != null) {
          CarbonLoaderUtil.updateTableStatusForFailure(loadModel)
        }
        throw ex
    }
  }

  /**
   * Start global sort loading of a partition table.
   *
   * Each distinct partition found in the stage inputs is loaded by its own
   * [[CarbonInsertIntoCommand]]. No snapshot file is written for partition tables, so
   * `stageFiles` and `snapshotFilePath` are unused here.
   */
  private def startLoadingWithPartition(
      spark: SparkSession,
      table: CarbonTable,
      stageInput: Seq[StageInput],
      stageFiles: Array[(CarbonFile, CarbonFile)],
      snapshotFilePath: String
  ): Unit = {
    val partitionDataList = listPartitionFiles(stageInput)

    val start = System.currentTimeMillis()
    partitionDataList.foreach {
      case (partition, splits) =>
        LOGGER.info(s"start to load ${splits.size} files into " +
          s"${table.getDatabaseName}.${table.getTableName}. " +
          s"Partition information: ${partition.mkString(",")}")
        val dataFrame =
          DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
        val columns = dataFrame.columns
        val header = columns.mkString(",")
        // partition values come from the stage metadata, so drop them from the selected data
        val selectColumns = columns.filter(!partition.contains(_))
        val selectedDataFrame = dataFrame.select(selectColumns.head, selectColumns.tail: _*)
        CarbonInsertIntoCommand(
          databaseNameOp = Option(table.getDatabaseName),
          tableName = table.getTableName,
          options = scala.collection.immutable.Map("fileheader" -> header,
            "binary_decoder" -> "base64"),
          isOverwriteTable = false,
          logicalPlan = selectedDataFrame.queryExecution.analyzed,
          tableInfo = table.getTableInfo,
          partition = partition
        ).run(spark)
    }
    LOGGER.info(s"finish data loading, time taken ${ System.currentTimeMillis() - start }ms")
  }

  /**
   * Group the fact files of all stage inputs by partition.
   *
   * @return a list of (partitionMap, InputSplits) pairs, in first-seen partition order.
   *         The partitionMap holds every partition column name and its value; the InputSplits
   *         are the fact files of that partition.
   */
  private def listPartitionFiles(
      stageInputs : Seq[StageInput]
  ): Seq[(Map[String, Option[String]], Seq[InputSplit])] = {
    val partitionMap =
      new util.LinkedHashMap[Map[String, Option[String]], util.List[InputSplit]]()
    for (stageInput <- stageInputs; location <- stageInput.getLocations.asScala) {
      val partition = location.getPartitions.asScala.map(t => (t._1, Option(t._2))).toMap
      var splits = partitionMap.get(partition)
      if (splits == null) {
        splits = new util.ArrayList[InputSplit]()
        partitionMap.put(partition, splits)
      }
      location.getFiles.asScala
        .filter(_._1.endsWith(CarbonCommonConstants.FACT_FILE_EXT))
        .foreach { file =>
          splits.add(
            CarbonInputSplit.from(
              "-1", "0",
              stageInput.getBase + CarbonCommonConstants.FILE_SEPARATOR + file._1, 0,
              file._2, ColumnarFormatVersion.V3, null
            )
          )
        }
    }
    partitionMap.asScala.toSeq.map { case (partition, splits) => (partition, splits.asScala) }
  }

  /**
   * Read the stage files in parallel and deserialize each one into a [[StageInput]].
   *
   * @return one StageInput per stage file, in the same order as `stageFiles`
   */
  private def collectStageInputs(
      executorService: ExecutorService,
      stageFiles: Array[(CarbonFile, CarbonFile)]
  ): Seq[StageInput] = {
    val startTime = System.currentTimeMillis()
    val gson = new Gson()
    val futures = stageFiles.map { stage =>
      executorService.submit(new Callable[StageInput] {
        override def call(): StageInput = {
          val filePath = stage._1.getAbsolutePath
          val stream = FileFactory.getDataInputStream(filePath)
          try {
            // Stage metadata is JSON: decode it as UTF-8 rather than the JVM default charset.
            gson.fromJson(
              new InputStreamReader(stream, java.nio.charset.StandardCharsets.UTF_8),
              classOf[StageInput])
          } finally {
            stream.close()
          }
        }
      })
    }
    val output = futures.map(_.get()).toSeq
    LOGGER.info(s"read stage files taken ${System.currentTimeMillis() - startTime}ms")
    output
  }

  /**
   * Create a '.loading' file to tag each stage in process.
   *
   * @return the (stage, success) pairs whose loading file could not be created or refreshed
   */
  private def createStageLoadingFiles(
      executorService: ExecutorService,
      stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, CarbonFile)] = {
    stageFiles.map { files =>
      executorService.submit(new Callable[(CarbonFile, CarbonFile, Boolean)] {
        override def call(): (CarbonFile, CarbonFile, Boolean) = {
          val stageLoadingFile = FileFactory.getCarbonFile(
            files._1.getAbsolutePath + CarbonTablePath.LOADING_FILE_SUFFIX)
          // createNewFile() returns false if the file exists or creation failed. In that case,
          // refresh the modification time so the tag is not treated as timed out; it counts as
          // a failure only if that also fails.
          val isFailed = !stageLoadingFile.createNewFile() &&
            !stageLoadingFile.setLastModifiedTime(System.currentTimeMillis())
          (files._1, files._2, isFailed)
        }
      })
    }.map(_.get()).collect {
      // keep the failed ones so the caller can retry them
      case (stageFile, successFile, true) => (stageFile, successFile)
    }
  }

  /**
   * Create '.loading' files with retry; logs a warning for tags that still could not be created.
   */
  private def createStageLoadingFilesWithRetry(
      executorService: ExecutorService,
      stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
    val startTime = System.currentTimeMillis()
    var retry = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES
    var needToCreateStageLoadingFiles = stageFiles
    while (retry > 0 && needToCreateStageLoadingFiles.nonEmpty) {
      needToCreateStageLoadingFiles =
        createStageLoadingFiles(executorService, needToCreateStageLoadingFiles)
      retry -= 1
    }
    LOGGER.info(s"finished to create stage loading files, time taken: " +
      s"${System.currentTimeMillis() - startTime}ms")
    if (needToCreateStageLoadingFiles.nonEmpty) {
      LOGGER.warn(s"failed to create loading files:" +
        needToCreateStageLoadingFiles.map(_._1.getName).mkString(","))
    }
  }

  /**
   * Delete the stage files, their success files and their loading files.
   *
   * @return the (stage, success) pairs for which at least one of the three files still exists
   */
  private def deleteStageFiles(
      executorService: ExecutorService,
      stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, CarbonFile)] = {
    stageFiles.map { files =>
      executorService.submit(new Callable[(CarbonFile, CarbonFile, Boolean)] {
        override def call(): (CarbonFile, CarbonFile, Boolean) = {
          // Delete three types of file: stage|.success|.loading
          val stageLoadingFile = FileFactory.getCarbonFile(
            files._1.getAbsolutePath + CarbonTablePath.LOADING_FILE_SUFFIX)
          // Attempt every delete. A short-circuiting && chain would skip the remaining files
          // as soon as one delete() returned false, e.g. when the stage file was already
          // removed by a previous attempt.
          val stageDeleted = files._1.delete()
          val successDeleted = files._2.delete()
          val loadingDeleted = stageLoadingFile.delete()
          // delete() also returns false when the file does not exist, which counts as cleaned,
          // so only report a failure if one of the files is still present.
          val isFailed = !(stageDeleted && successDeleted && loadingDeleted) &&
            (files._1.exists() || files._2.exists() || stageLoadingFile.exists())
          (files._1, files._2, isFailed)
        }
      })
    }.map(_.get()).collect {
      // keep the failed ones so the caller can retry them
      case (stageFile, successFile, true) => (stageFile, successFile)
    }
  }

  /**
   * Delete stage, success and loading files with retry; logs a warning for files left behind.
   */
  private def deleteStageFilesWithRetry(
      executorService: ExecutorService,
      stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
    val startTime = System.currentTimeMillis()
    var retry = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES
    var needToDeleteStageFiles = stageFiles
    while (retry > 0 && needToDeleteStageFiles.nonEmpty) {
      needToDeleteStageFiles =
        deleteStageFiles(executorService, needToDeleteStageFiles)
      retry -= 1
    }
    LOGGER.info(s"finished to delete stage files, time taken: " +
      s"${System.currentTimeMillis() - startTime}ms")
    // if there are still stage files failed to clean, print log.
    if (needToDeleteStageFiles.nonEmpty) {
      LOGGER.warn(s"failed to clean up stage files:" +
        needToDeleteStageFiles.map(_._1.getName).mkString(","))
    }
  }

  /**
   * Try once to delete the snapshot file.
   *
   * @return false if the snapshot file is gone (deleted now or already absent),
   *         true if it still exists and deletion should be retried
   */
  private def deleteSnapShotFile(
      snapshotFilePath: String): Boolean = {
    val snapshotFile = FileFactory.getCarbonFile(snapshotFilePath)
    // delete() also returns false when the file does not exist, which counts as cleaned,
    // so double check existence before reporting a failure.
    !snapshotFile.delete() && snapshotFile.exists()
  }

  /**
   * Delete the snapshot file with retry. Partition tables never write a snapshot file,
   * so nothing is done for them.
   */
  private def deleteSnapShotFileWithRetry(
      table: CarbonTable,
      snapshotFilePath: String): Unit = {
    if (!table.isHivePartitionTable) {
      var retries = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES
      var failed = deleteSnapShotFile(snapshotFilePath)
      while (failed && retries > 0) {
        retries -= 1
        failed = deleteSnapShotFile(snapshotFilePath)
      }
      if (failed) {
        LOGGER.warn(s"failed to delete snapshot file: $snapshotFilePath")
      }
    }
  }

  /**
   * Collect the stage files to load, paired with their matching success files.
   *
   * A stage file is chosen only if:
   * 1) it has a success file (stage name + success suffix), and
   * 2) it has no '.loading' tag, or its tag is older than
   *    [[CarbonInsertFromStageCommand.INSERT_STAGE_TIMEOUT]] (a timed-out load).
   * This lets concurrent INSERT STAGE processes load different data without overlapping.
   *
   * @return at most `batchSize` pairs, sorted by stage modification time
   *         (ascending when `ascendingSort`, descending otherwise)
   */
  private def listStageFiles(
      loadDetailsDir: String,
      hadoopConf: Configuration,
      batchSize: Int,
      ascendingSort: Boolean
  ): Array[(CarbonFile, CarbonFile)] = {
    val dir = FileFactory.getCarbonFile(loadDetailsDir, hadoopConf)
    if (dir.exists()) {
      val allFiles = dir.listFiles()
      // Key tag files by the stage name they belong to. Strip the known suffix rather than
      // cutting at the first '.', so that stage names containing dots still match.
      val successFiles = allFiles.filter { file =>
        file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUFFIX)
      }.map { file =>
        (file.getName.stripSuffix(CarbonTablePath.SUCCESS_FILE_SUFFIX), file)
      }.toMap

      val now = System.currentTimeMillis()
      val loadingFiles = allFiles.filter { file =>
        file.getName.endsWith(CarbonTablePath.LOADING_FILE_SUFFIX)
      }.filter { file =>
        (now - file.getLastModifiedTime) < CarbonInsertFromStageCommand.INSERT_STAGE_TIMEOUT
      }.map { file =>
        (file.getName.stripSuffix(CarbonTablePath.LOADING_FILE_SUFFIX), file)
      }.toMap

      val stageFiles = allFiles.filter { file =>
        !file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUFFIX) &&
          !file.getName.endsWith(CarbonTablePath.LOADING_FILE_SUFFIX)
      }.filter { file =>
        successFiles.contains(file.getName) && !loadingFiles.contains(file.getName)
      }.sortWith { (file1, file2) =>
        if (ascendingSort) {
          file1.getLastModifiedTime < file2.getLastModifiedTime
        } else {
          file1.getLastModifiedTime > file2.getLastModifiedTime
        }
      }.map { file =>
        (file, successFiles(file.getName))
      }
      stageFiles.take(batchSize)
    } else {
      Array.empty
    }
  }

  /**
   * INGEST operation does not support concurrency, so there is one ingest lock per table.
   *
   * @throws IOException if the lock cannot be acquired within the configured retries/timeout
   */
  private def acquireIngestLock(table: CarbonTable): ICarbonLock = {
    val tableIdentifier = table.getAbsoluteTableIdentifier
    val lock = CarbonLockFactory.getCarbonLockObj(tableIdentifier, LockUsage.INGEST_LOCK)
    val retryCount = CarbonLockUtil.getLockProperty(
      CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK,
      CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT
    )
    val maxTimeout = CarbonLockUtil.getLockProperty(
      CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
      CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT
    )
    if (lock.lockWithRetries(retryCount, maxTimeout)) {
      lock
    } else {
      throw new IOException(
        s"Not able to acquire the lock for table status file for $tableIdentifier")
    }
  }

  override protected def opName: String = "INSERT STAGE"
}
| |
/**
 * Constants shared by [[CarbonInsertFromStageCommand]]: option keys, their accepted values
 * and defaults, plus retry/timeout settings.
 */
object CarbonInsertFromStageCommand {

  /** Retry budget used when creating '.loading' tags and deleting stage/snapshot files. */
  val DELETE_FILES_RETRY_TIMES: Int = 3

  /** Option key: the maximum number of stage files loaded by one command. */
  val BATCH_FILE_COUNT_KEY: String = "batch_file_count"

  /** Default batch size, Int.MaxValue rendered as text, i.e. effectively unbounded. */
  val BATCH_FILE_COUNT_DEFAULT: String = String.valueOf(Integer.MAX_VALUE)

  /** Option key: which end of the stage file list (by modification time) to load first. */
  val BATCH_FILE_ORDER_KEY: String = "batch_file_order"

  /** Order value: load the earliest stage files first. */
  val BATCH_FILE_ORDER_ASC: String = "ASC"

  /** Order value: load the latest stage files first. */
  val BATCH_FILE_ORDER_DESC: String = "DESC"

  // Must stay below BATCH_FILE_ORDER_ASC: object fields initialise top to bottom.
  /** Default order: ascending, i.e. earliest stage files first. */
  val BATCH_FILE_ORDER_DEFAULT: String = BATCH_FILE_ORDER_ASC

  /**
   * Age, compared against millisecond time differences, after which a '.loading' tag is
   * considered stale. Read once from [[CarbonProperties]] when this object is initialised.
   */
  val INSERT_STAGE_TIMEOUT = CarbonProperties.getInsertStageTimeout
}