/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.api
import java.io.InputStreamReader
import java.time.{Duration, Instant}
import java.util
import java.util.{Collections, Comparator}
import scala.collection.JavaConverters._
import com.google.gson.Gson
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager, StageInput}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.streaming.segment.StreamSegment
object CarbonStore {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
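  /**
   * Read the load metadata of the table at `tablePath`. When history is
   * excluded, only visible segments are returned, sorted newest first;
   * `limit`, when set, caps the number of returned entries.
   *
   * A minimal usage sketch (the table path is illustrative):
   * {{{
   *   val loads = CarbonStore.readSegments("/warehouse/db/t", showHistory = false, limit = Some(10))
   * }}}
   */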
def readSegments(
tablePath: String,
showHistory: Boolean,
limit: Option[Int]): Array[LoadMetadataDetails] = {
val metaFolder = CarbonTablePath.getMetadataPath(tablePath)
var segmentsMetadataDetails = if (showHistory) {
SegmentStatusManager.readLoadMetadata(metaFolder) ++
SegmentStatusManager.readLoadHistoryMetadata(metaFolder)
} else {
SegmentStatusManager.readLoadMetadata(metaFolder)
}
if (!showHistory) {
segmentsMetadataDetails = segmentsMetadataDetails
.filter(_.getVisibility.equalsIgnoreCase("true"))
segmentsMetadataDetails = segmentsMetadataDetails.sortWith { (l1, l2) =>
java.lang.Double.parseDouble(l1.getLoadName) >
java.lang.Double.parseDouble(l2.getLoadName)
}
}
if (limit.isDefined) {
segmentsMetadataDetails.slice(0, limit.get)
} else {
segmentsMetadataDetails
}
}
  /**
   * Read all stage files of the table, both unloaded and currently loading,
   * and return them as stage inputs sorted by create time (newest first).
   */
def readStages(tablePath: String): Seq[StageInput] = {
val stageFiles = listStageFiles(CarbonTablePath.getStageDir(tablePath))
    val output = Collections.synchronizedList(new util.ArrayList[StageInput]())
output.addAll(readStageInput(stageFiles._1,
StageInput.StageStatus.Unload).asJavaCollection)
output.addAll(readStageInput(stageFiles._2,
StageInput.StageStatus.Loading).asJavaCollection)
    // sort by create time, newest first; compare the longs directly so that
    // large timestamp differences cannot overflow Int
    Collections.sort(output, new Comparator[StageInput]() {
      def compare(stageInput1: StageInput, stageInput2: StageInput): Int = {
        java.lang.Long.compare(stageInput2.getCreateTime, stageInput1.getCreateTime)
      }
    })
output.asScala
}
  /**
   * Deserialize each stage file into a [[StageInput]], tagging it with the
   * given status and using the file's last-modified time as the create time.
   */
def readStageInput(
stageFiles: Seq[CarbonFile],
status: StageInput.StageStatus): Seq[StageInput] = {
val gson = new Gson()
val output = Collections.synchronizedList(new util.ArrayList[StageInput]())
    stageFiles.foreach { stage =>
val filePath = stage.getAbsolutePath
val stream = FileFactory.getDataInputStream(filePath)
try {
val stageInput = gson.fromJson(new InputStreamReader(stream), classOf[StageInput])
stageInput.setCreateTime(stage.getLastModifiedTime)
stageInput.setStatus(status)
output.add(stageInput)
} finally {
stream.close()
}
}
output.asScala
}
  /*
   * Collect all stage files in the stage directory and match them against
   * their success and loading marker files. Returns the unloaded stage files
   * and the loading stage files, in that order.
   */
def listStageFiles(
loadDetailsDir: String): (Array[CarbonFile], Array[CarbonFile]) = {
val dir = FileFactory.getCarbonFile(loadDetailsDir)
if (dir.exists()) {
      // 1. List all files in the stage directory.
      val allFiles = dir.listFiles()
      // Collect the file names once for cheap marker-file lookups.
      val allFileNames = allFiles.map(_.getName).toSet
      // 2. Build the stage file list:
      // first, exclude the success files and loading files themselves;
      // second, keep only the stage files that have a success tag.
      val stageFiles = allFiles.filterNot { file =>
        file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUFFIX)
      }.filterNot { file =>
        file.getName.endsWith(CarbonTablePath.LOADING_FILE_SUFFIX)
      }.filter { file =>
        allFileNames.contains(file.getName + CarbonTablePath.SUCCESS_FILE_SUFFIX)
      }.sortWith {
        (file1, file2) => file1.getLastModifiedTime > file2.getLastModifiedTime
      }
      // 3. Get the unloaded stage files, which don't have a loading tag.
      val unloadedFiles = stageFiles.filterNot { file =>
        allFileNames.contains(file.getName + CarbonTablePath.LOADING_FILE_SUFFIX)
      }
      // 4. Get the loading stage files, which have a loading tag.
      val loadingFiles = stageFiles.filter { file =>
        allFileNames.contains(file.getName + CarbonTablePath.LOADING_FILE_SUFFIX)
      }
(unloadedFiles, loadingFiles)
} else {
(Array.empty, Array.empty)
}
}
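  /**
   * Read the segment file of the given load and format its partition values,
   * one string per index file entry, e.g. "{country=US,year=2020}" for a
   * table partitioned by country and year (the values are illustrative).
   */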
def getPartitions(tablePath: String, load: LoadMetadataDetails): Seq[String] = {
val segmentFile = SegmentFileStore.readSegmentFile(
CarbonTablePath.getSegmentFilePath(tablePath, load.getSegmentFile))
if (segmentFile == null) {
return Seq.empty
}
val locationMap = segmentFile.getLocationMap
if (locationMap != null) {
locationMap.asScala.map {
case (_, detail) =>
s"{${ detail.getPartitions.asScala.mkString(",") }}"
}.toSeq
} else {
Seq.empty
}
}
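  /** Name of the segment this load was compacted into, or "NA" if it was never merged. */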
def getMergeTo(load: LoadMetadataDetails): String = {
if (load.getMergedLoadName != null) {
load.getMergedLoadName
} else {
"NA"
}
}
def getExternalSegmentPath(load: LoadMetadataDetails): String = {
if (StringUtils.isNotEmpty(load.getPath)) {
load.getPath
} else {
"NA"
}
}
def getLoadStartTime(load: LoadMetadataDetails): String = {
val startTime =
if (load.getLoadStartTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
"NA"
} else {
new java.sql.Timestamp(load.getLoadStartTime).toString
}
startTime
}
  def getLoadEndTime(load: LoadMetadataDetails): String = {
    val endTime =
      if (load.getLoadEndTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
        "NA"
      } else {
        new java.sql.Timestamp(load.getLoadEndTime).toString
      }
    endTime
  }
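  /**
   * Human-readable load duration. java.time.Duration.toString produces
   * ISO-8601 output, e.g. "PT2M35.5S" for 2 minutes 35.5 seconds, so
   * stripping the leading "PT" leaves "2M35.5S" for display.
   */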
def getLoadTimeTaken(load: LoadMetadataDetails): String = {
if (load.getLoadEndTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
"NA"
} else {
Duration.between(
Instant.ofEpochMilli(load.getLoadStartTime),
Instant.ofEpochMilli(load.getLoadEndTime)
).toString.replace("PT", "")
}
}
def getLoadTimeTakenAsMillis(load: LoadMetadataDetails): Long = {
if (load.getLoadEndTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
// loading in progress
-1L
} else {
load.getLoadEndTime - load.getLoadStartTime
}
}
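  /**
   * Data and index size of a load in bytes, as a (dataSize, indexSize) pair;
   * -1 means unknown or not applicable. Streaming (ROW_V1) segments are
   * measured from the stream index file because they are still growing.
   */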
def getDataAndIndexSize(
tablePath: String,
load: LoadMetadataDetails): (Long, Long) = {
val (dataSize, indexSize) = if (load.getFileFormat.equals(FileFormat.ROW_V1)) {
      // for a streaming segment, we should get the actual size from the index
      // file, since data is continuously being inserted
val segmentDir = CarbonTablePath.getSegmentPath(tablePath, load.getLoadName)
val indexPath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir)
val indexFile = FileFactory.getCarbonFile(indexPath)
if (indexFile.exists()) {
val indices =
StreamSegment.readIndexFile(indexPath)
(indices.asScala.map(_.getFile_size).sum, indexFile.getSize)
} else {
(-1L, -1L)
}
} else {
      // For a non-carbon segment we can only display the data size, not the
      // index size; the data size is read directly from the table status file.
if (!load.getFileFormat.isCarbonFormat) {
(if (load.getDataSize == null) -1L else load.getDataSize.toLong, -1L)
} else {
(if (load.getDataSize == null) -1L else load.getDataSize.toLong,
if (load.getIndexSize == null) -1L else load.getIndexSize.toLong)
}
}
(dataSize, indexSize)
}
  /**
   * Deletes all table data if forceTableClean is <true>, otherwise cleans
   * garbage segments (in MARKED_FOR_DELETE state).
   *
   * @param dbName                 : Database name
   * @param tableName              : Table name
   * @param tablePath              : Table path
   * @param carbonTable            : CarbonTable object, <null> in case of force clean
   * @param forceTableClean        : <true> deletes all table data,
   *                                 <false> cleans garbage segments (MARKED_FOR_DELETE state)
   * @param currentTablePartitions : Hive partition details
   * @param truncateTable          : <true> to also truncate the table
   */
def cleanFiles(
dbName: String,
tableName: String,
tablePath: String,
carbonTable: CarbonTable,
forceTableClean: Boolean,
currentTablePartitions: Option[Seq[PartitionSpec]] = None,
truncateTable: Boolean = false): Unit = {
var carbonCleanFilesLock: ICarbonLock = null
val absoluteTableIdentifier = if (forceTableClean) {
AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
} else {
carbonTable.getAbsoluteTableIdentifier
}
try {
      val errorMsg = s"Clean files request failed for $dbName.$tableName. " +
        "Unable to acquire the clean files lock because another clean files " +
        "operation is running in the background."
// in case of force clean the lock is not required
if (forceTableClean) {
FileFactory.deleteAllCarbonFilesOfDir(
FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
} else {
carbonCleanFilesLock =
CarbonLockUtil
.getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
if (truncateTable) {
SegmentStatusManager.truncateTable(carbonTable)
}
SegmentStatusManager.deleteLoadsAndUpdateMetadata(
carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
        currentTablePartitions match {
          case Some(partitions) =>
            SegmentFileStore.cleanSegments(
              carbonTable,
              partitions.asJava,
              true)
          case _ =>
        }
}
} finally {
      if (currentTablePartitions.isEmpty) {
cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
} else {
cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
}
if (carbonCleanFilesLock != null) {
CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
}
}
}
  /**
   * Delete stale partition folders recursively for loads that are marked
   * for delete but have no segment file.
   *
   * @param carbonTable       table whose partition folders are cleaned
   * @param partitionSpecList partitions to inspect, including those located
   *                          outside the table path
   */
def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
partitionSpecList: List[PartitionSpec]): Unit = {
if (carbonTable != null && carbonTable.isHivePartitionTable) {
val loadMetadataDetails = SegmentStatusManager
.readLoadMetadata(carbonTable.getMetadataPath)
val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
// list all files from table path
val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
loadMetadataDetails.foreach { metadataDetail =>
if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
metadataDetail.getSegmentFile == null) {
val loadStartTime: Long = metadataDetail.getLoadStartTime
// delete all files of @loadStartTime from tablepath
cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
partitionSpecList.foreach {
partitionSpec =>
val partitionLocation = partitionSpec.getLocation
// For partition folder outside the tablePath
if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
val partitionCarbonFile = FileFactory
.getCarbonFile(partitionLocation.toString)
                // list all files from partitionLocation
val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
// delete all files of @loadStartTime from externalPath
cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
}
}
}
}
}
}
  /**
   * Delete every carbon file whose name carries the given load timestamp.
   *
   * @param carbonFiles files to check, typically a recursive listing
   * @param timestamp   load start time used to match the file names
   */
private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
timestamp: Long): Unit = {
carbonFiles.asScala.foreach {
carbonFile =>
val filePath = carbonFile.getPath
val fileName = carbonFile.getName
if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
// delete the file
FileFactory.deleteFile(filePath)
}
}
}
// validates load ids
private def validateLoadIds(loadids: Seq[String]): Unit = {
if (loadids.isEmpty) {
val errorMessage = "Error: Segment id(s) should not be empty."
throw new MalformedCarbonCommandException(errorMessage)
}
}
// TODO: move dbName and tableName to caller, caller should handle the log and error
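  /**
   * Mark the given segments as deleted in the table status.
   *
   * A minimal usage sketch (the ids and names are illustrative):
   * {{{
   *   CarbonStore.deleteLoadById(Seq("0", "1"), "default", "my_table", carbonTable)
   * }}}
   */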
def deleteLoadById(
loadids: Seq[String],
dbName: String,
tableName: String,
carbonTable: CarbonTable): Unit = {
validateLoadIds(loadids)
val path = carbonTable.getMetadataPath
try {
val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala
if (invalidLoadIds.isEmpty) {
LOGGER.info(s"Delete segment by Id is successfull for $dbName.$tableName.")
} else {
sys.error(s"Delete segment by Id is failed. Invalid ID is: ${invalidLoadIds.mkString(",")}")
}
} catch {
case ex: Exception =>
sys.error(ex.getMessage)
}
}
// TODO: move dbName and tableName to caller, caller should handle the log and error
def deleteLoadByDate(
timestamp: String,
dbName: String,
tableName: String,
carbonTable: CarbonTable): Unit = {
val time = validateTimeFormat(timestamp)
val path = carbonTable.getMetadataPath
try {
val invalidLoadTimestamps =
SegmentStatusManager.updateDeletionStatus(
carbonTable.getAbsoluteTableIdentifier,
timestamp,
path,
time).asScala
if (invalidLoadTimestamps.isEmpty) {
LOGGER.info(s"Delete segment by date is successful for $dbName.$tableName.")
} else {
sys.error("Delete segment by date is failed. No matching segment found.")
}
} catch {
case ex: Exception =>
sys.error(ex.getMessage)
}
}
// this function is for test only
def isSegmentValid(
dbName: String,
tableName: String,
storePath: String,
segmentId: String): Boolean = {
val identifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName, tableName)
    val validAndInvalidSegments: SegmentStatusManager.ValidAndInvalidSegmentsInfo =
      new SegmentStatusManager(identifier).getValidAndInvalidSegments
    validAndInvalidSegments.getValidSegments.asScala.exists(_.getSegmentNo.equals(segmentId))
}
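  /**
   * Parse a timestamp string in Spark's timestamp format, e.g.
   * "2015-06-01 12:01:00" (the value is illustrative), throwing a
   * MalformedCarbonCommandException when the input cannot be parsed.
   */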
private def validateTimeFormat(timestamp: String): Long = {
try {
DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp)).get
} catch {
      case _: Exception =>
val errorMessage = "Error: Invalid load start time format: " + timestamp
throw new MalformedCarbonCommandException(errorMessage)
}
}
}