/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.api
import java.lang.Long
import scala.collection.JavaConverters._
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.unsafe.types.UTF8String
import org.apache.carbondata.common.Strings
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.streaming.segment.StreamSegment
object CarbonStore {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
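  /**
   * Reads the table status file (and, when showHistory is set, the status history file)
   * and returns one Row per segment, sorted newest first.
   *
   * @param limit       optional maximum number of segments to return
   * @param tablePath   path of the table whose metadata folder is read
   * @param showHistory if true, also include invisible/historical segments
   */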
def showSegments(
limit: Option[String],
tablePath: String,
showHistory: Boolean): Seq[Row] = {
val metaFolder = CarbonTablePath.getMetadataPath(tablePath)
val loadMetadataDetailsArray = if (showHistory) {
SegmentStatusManager.readLoadMetadata(metaFolder) ++
SegmentStatusManager.readLoadHistoryMetadata(metaFolder)
} else {
SegmentStatusManager.readLoadMetadata(metaFolder)
}
if (loadMetadataDetailsArray.nonEmpty) {
var loadMetadataDetailsSortedArray = loadMetadataDetailsArray.sortWith { (l1, l2) =>
java.lang.Double.parseDouble(l1.getLoadName) > java.lang.Double.parseDouble(l2.getLoadName)
}
if (!showHistory) {
loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray
.filter(_.getVisibility.equalsIgnoreCase("true"))
}
if (limit.isDefined) {
val limitLoads = limit.get
try {
val lim = Integer.parseInt(limitLoads)
loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray.slice(0, lim)
} catch {
case _: NumberFormatException =>
          CarbonException.analysisException(s"Entered limit is not a valid number: $limitLoads")
}
}
loadMetadataDetailsSortedArray
.map { load =>
val mergedTo =
if (load.getMergedLoadName != null) {
load.getMergedLoadName
} else {
"NA"
}
val path =
if (StringUtils.isNotEmpty(load.getPath)) {
load.getPath
} else {
"NA"
}
val startTime =
if (load.getLoadStartTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
"NA"
} else {
new java.sql.Timestamp(load.getLoadStartTime).toString
}
val endTime =
if (load.getLoadEndTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
"NA"
} else {
new java.sql.Timestamp(load.getLoadEndTime).toString
}
val (dataSize, indexSize) = if (load.getFileFormat.equals(FileFormat.ROW_V1)) {
          // for a streaming segment, get the actual size from the index file,
          // since data is continuously written into it
val segmentDir = CarbonTablePath.getSegmentPath(tablePath, load.getLoadName)
val indexPath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir)
val indexFile = FileFactory.getCarbonFile(indexPath)
if (indexFile.exists()) {
val indices =
StreamSegment.readIndexFile(indexPath)
(indices.asScala.map(_.getFile_size).sum, indexFile.getSize)
} else {
(-1L, -1L)
}
} else {
          // If the added segment is not in carbon format, we can only display the data
          // size and not the index size; the data size is read from the table status file
          if (!load.getFileFormat.isCarbonFormat) {
            (if (load.getDataSize == null) -1L else load.getDataSize.toLong, -1L)
} else {
(if (load.getDataSize == null) -1L else load.getDataSize.toLong,
if (load.getIndexSize == null) -1L else load.getIndexSize.toLong)
}
}
if (showHistory) {
Row(
load.getLoadName,
load.getSegmentStatus.getMessage,
startTime,
endTime,
mergedTo,
load.getFileFormat.toString.toUpperCase,
load.getVisibility,
Strings.formatSize(dataSize.toFloat),
Strings.formatSize(indexSize.toFloat),
path)
} else {
Row(
load.getLoadName,
load.getSegmentStatus.getMessage,
startTime,
endTime,
mergedTo,
load.getFileFormat.toString.toUpperCase,
Strings.formatSize(dataSize.toFloat),
Strings.formatSize(indexSize.toFloat),
path)
}
}.toSeq
} else {
Seq.empty
}
}
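  // A minimal usage sketch (hypothetical table path; assumes a table status file exists there):
  //   val rows = CarbonStore.showSegments(Some("10"), "/tmp/warehouse/db1/t1", showHistory = false)
  //   rows.foreach(println)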
  /**
   * Deletes all table data if forceTableClean is <true>, and cleans garbage segments
   * (in MARKED_FOR_DELETE state) if forceTableClean is <false>.
   *
   * @param dbName : Database name
   * @param tableName : Table name
   * @param tablePath : Table path
   * @param carbonTable : CarbonTable object, <null> in case of force clean
   * @param forceTableClean : <true> for force clean, deletes all data;
   *                        <false> cleans garbage segments (MARKED_FOR_DELETE state)
   * @param currentTablePartitions : Hive partition details
   * @param truncateTable : <true> additionally truncates the table status before cleaning
   */
def cleanFiles(
dbName: String,
tableName: String,
tablePath: String,
carbonTable: CarbonTable,
forceTableClean: Boolean,
currentTablePartitions: Option[Seq[PartitionSpec]] = None,
truncateTable: Boolean = false): Unit = {
var carbonCleanFilesLock: ICarbonLock = null
val absoluteTableIdentifier = if (forceTableClean) {
AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
} else {
carbonTable.getAbsoluteTableIdentifier
}
try {
      val errorMsg = "Clean files request failed for " +
        s"$dbName.$tableName" +
        ". Unable to acquire the clean files lock because another clean files " +
        "operation is running in the background."
// in case of force clean the lock is not required
if (forceTableClean) {
FileFactory.deleteAllCarbonFilesOfDir(
FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
} else {
carbonCleanFilesLock =
CarbonLockUtil
.getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
if (truncateTable) {
SegmentStatusManager.truncateTable(carbonTable)
}
SegmentStatusManager.deleteLoadsAndUpdateMetadata(
carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
        currentTablePartitions match {
          case Some(partitions) =>
            SegmentFileStore.cleanSegments(
              carbonTable,
              partitions.asJava,
              true)
case _ =>
}
}
} finally {
      if (currentTablePartitions.isEmpty) {
cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
} else {
cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
}
if (carbonCleanFilesLock != null) {
CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
}
}
}
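  // A minimal usage sketch (hypothetical names; assumes `table` is a resolved CarbonTable and
  // no other clean files operation currently holds the lock):
  //   CarbonStore.cleanFiles("default", "t1", table.getTablePath, table, forceTableClean = false)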
  /**
   * Deletes partition folders recursively for segments marked for delete.
   *
   * @param carbonTable table whose partition folders are cleaned
   * @param partitionSpecList partition specs, including locations outside the table path
   */
def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
partitionSpecList: List[PartitionSpec]): Unit = {
if (carbonTable != null && carbonTable.isHivePartitionTable) {
val loadMetadataDetails = SegmentStatusManager
.readLoadMetadata(carbonTable.getMetadataPath)
val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
// list all files from table path
val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
loadMetadataDetails.foreach { metadataDetail =>
if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
metadataDetail.getSegmentFile == null) {
val loadStartTime: Long = metadataDetail.getLoadStartTime
          // delete all files of @loadStartTime from the table path
cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
partitionSpecList.foreach {
partitionSpec =>
val partitionLocation = partitionSpec.getLocation
// For partition folder outside the tablePath
if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
val partitionCarbonFile = FileFactory
.getCarbonFile(partitionLocation.toString)
                // list all files from partitionLocation
val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
// delete all files of @loadStartTime from externalPath
cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
}
}
}
}
}
}
  /**
   * Deletes every file in the given list whose file name carries the given timestamp.
   *
   * @param carbonFiles candidate files to check
   * @param timestamp load start time used to match files for deletion
   */
private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
timestamp: Long): Unit = {
carbonFiles.asScala.foreach {
carbonFile =>
val filePath = carbonFile.getPath
val fileName = carbonFile.getName
if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
// delete the file
FileFactory.deleteFile(filePath)
}
}
}
// validates load ids
private def validateLoadIds(loadids: Seq[String]): Unit = {
if (loadids.isEmpty) {
val errorMessage = "Error: Segment id(s) should not be empty."
throw new MalformedCarbonCommandException(errorMessage)
}
}
// TODO: move dbName and tableName to caller, caller should handle the log and error
def deleteLoadById(
loadids: Seq[String],
dbName: String,
tableName: String,
carbonTable: CarbonTable): Unit = {
validateLoadIds(loadids)
val path = carbonTable.getMetadataPath
try {
val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala
if (invalidLoadIds.isEmpty) {
LOGGER.info(s"Delete segment by Id is successfull for $dbName.$tableName.")
} else {
sys.error(s"Delete segment by Id is failed. Invalid ID is: ${invalidLoadIds.mkString(",")}")
}
} catch {
case ex: Exception =>
sys.error(ex.getMessage)
}
}
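  // Example (hypothetical segment ids and table):
  //   CarbonStore.deleteLoadById(Seq("0", "1"), "default", "t1", table)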
// TODO: move dbName and tableName to caller, caller should handle the log and error
def deleteLoadByDate(
timestamp: String,
dbName: String,
tableName: String,
carbonTable: CarbonTable): Unit = {
val time = validateTimeFormat(timestamp)
val path = carbonTable.getMetadataPath
try {
val invalidLoadTimestamps =
SegmentStatusManager.updateDeletionStatus(
carbonTable.getAbsoluteTableIdentifier,
timestamp,
path,
time).asScala
if (invalidLoadTimestamps.isEmpty) {
LOGGER.info(s"Delete segment by date is successful for $dbName.$tableName.")
} else {
sys.error("Delete segment by date is failed. No matching segment found.")
}
} catch {
case ex: Exception =>
sys.error(ex.getMessage)
}
}
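  // Example (hypothetical date and table; the string must parse as a timestamp, see below):
  //   CarbonStore.deleteLoadByDate("2020-01-01 00:00:00", "default", "t1", table)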
  // this function is for tests only
def isSegmentValid(
dbName: String,
tableName: String,
storePath: String,
segmentId: String): Boolean = {
val identifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName, tableName)
    val validAndInvalidSegments: SegmentStatusManager.ValidAndInvalidSegmentsInfo =
      new SegmentStatusManager(identifier).getValidAndInvalidSegments
validAndInvalidSegments.getValidSegments.contains(segmentId)
}
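  // Parses the timestamp string with Spark's DateTimeUtils.stringToTimestamp (e.g. a value
  // like "2020-01-01 00:00:00") and fails with MalformedCarbonCommandException otherwise.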
private def validateTimeFormat(timestamp: String): Long = {
try {
DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp)).get
} catch {
      case _: Exception =>
val errorMessage = "Error: Invalid load start time format: " + timestamp
throw new MalformedCarbonCommandException(errorMessage)
}
}
}