/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.mutate;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.data.BlockMappingVO;
import org.apache.carbondata.core.mutate.data.RowCountDetailsVO;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
/**
* This class contains all update utility methods
*/
public class CarbonUpdateUtil {
private static final Logger LOGGER =
LogServiceFactory.getLogService(CarbonUpdateUtil.class.getName());
/**
* Returns the required field from a tuple id.
*
* @param tid the tuple id
* @param index index of the field to extract
* @return the field at the given index
*/
public static String getRequiredFieldFromTID(String tid, int index) {
return tid.split(CarbonCommonConstants.FILE_SEPARATOR)[index];
}
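// Illustrative example (hypothetical tuple id; the actual field layout is defined
// by TupleIdEnum and the calling code): splitting "2/0-100-1-0/0/0/0" on "/" and
// taking index 0 returns "2".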
/**
* Returns the required field from a tuple id.
*
* @param tid the tuple id
* @param field the tuple id field to extract
* @return the requested field
*/
public static String getRequiredFieldFromTID(String tid, TupleIdEnum field) {
return tid.split(CarbonCommonConstants.FILE_SEPARATOR)[field.getTupleIdIndex()];
}
/**
* Returns the segment id along with the block id.
* @param tid the tuple id
* @param isPartitionTable whether the table is a partition table
* @return segment id and block id separated by "/"
*/
public static String getSegmentWithBlockFromTID(String tid, boolean isPartitionTable) {
if (isPartitionTable) {
return getRequiredFieldFromTID(tid, TupleIdEnum.PARTITION_SEGMENT_ID);
}
// This branch handles the add-segment case: normally the segment id is at the
// first index, but for an added segment it is at the second index because the
// blocklet id is generated from the complete external path.
// This applies to external segments, whose tuple ids carry the external path
// with a "#". There is no need to check for any path in the metadata details
// here: "#" can also appear in the tuple id when there are multiple
// partitions, and the partition check is already done above.
if (tid.contains("#/")) {
return getRequiredFieldFromTID(tid, TupleIdEnum.EXTERNAL_SEGMENT_ID)
+ CarbonCommonConstants.FILE_SEPARATOR + getRequiredFieldFromTID(tid,
TupleIdEnum.EXTERNAL_BLOCK_ID);
}
return getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID)
+ CarbonCommonConstants.FILE_SEPARATOR + getRequiredFieldFromTID(tid, TupleIdEnum.BLOCK_ID);
}
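// Illustrative sketch of the cases above, with hypothetical tuple ids:
//   partition table:  only the PARTITION_SEGMENT_ID field is returned.
//   external segment: the tuple id contains "#/", so EXTERNAL_SEGMENT_ID and
//                     EXTERNAL_BLOCK_ID are joined with "/".
//   normal case:      assuming segment id and block id are the first two fields,
//                     "2/0-100-1-0/0/0/0" yields "2/0-100-1-0".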
/**
* Returns block path from tuple id
*/
public static String getTableBlockPath(String tid, String tablePath, boolean isStandardTable,
boolean isPartitionTable) {
String partField = "0";
// If it has segment file then part field can be appended directly to table path
if (!isStandardTable) {
if (isPartitionTable) {
partField = getRequiredFieldFromTID(tid, TupleIdEnum.PARTITION_PART_ID);
return tablePath + CarbonCommonConstants.FILE_SEPARATOR + partField.replace("#", "/");
} else {
return tablePath;
}
}
String part = CarbonTablePath.addPartPrefix(partField);
String segment =
CarbonTablePath.addSegmentPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID));
return CarbonTablePath.getFactDir(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + part
+ CarbonCommonConstants.FILE_SEPARATOR + segment;
}
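// Illustrative result for a standard table, assuming CarbonTablePath.getFactDir
// returns tablePath + "/Fact", addPartPrefix produces "Part0" and
// addSegmentPrefix produces "Segment_<id>": a tuple id whose segment id field is
// "2" resolves to "<tablePath>/Fact/Part0/Segment_2".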
/**
* Returns the delete delta file path.
*
* @param blockPath path of the block
* @param blockName name of the block
* @param timestamp timestamp of the delete delta file
* @return the delete delta file path
*/
public static String getDeleteDeltaFilePath(String blockPath, String blockName,
String timestamp) {
return blockPath + CarbonCommonConstants.FILE_SEPARATOR + blockName
+ CarbonCommonConstants.HYPHEN + timestamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
}
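// Illustrative result, assuming HYPHEN is "-" and DELETE_DELTA_FILE_EXT is
// ".deletedelta": getDeleteDeltaFilePath("/store/tbl/Fact/Part0/Segment_2",
// "part-0-0_batchno0-0-1", "1597409791503") returns
// "/store/tbl/Fact/Part0/Segment_2/part-0-0_batchno0-0-1-1597409791503.deletedelta".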
/**
* Writes the given update details into the segment update status file.
*
* @param updateDetailsList update details to be merged and written
* @param table the carbon table
* @param updateStatusFileIdentifier timestamp identifier of the update status file
* @param isCompaction whether this call originates from compaction
* @param isForceWrite if true, write the details without validating the segments
* @return true if the status file was written successfully
*/
public static boolean updateSegmentStatus(List<SegmentUpdateDetails> updateDetailsList,
CarbonTable table, String updateStatusFileIdentifier, boolean isCompaction,
boolean isForceWrite) {
boolean status = false;
SegmentUpdateStatusManager segmentUpdateStatusManager = new SegmentUpdateStatusManager(table);
if (isForceWrite && !CollectionUtils.isEmpty(updateDetailsList)) {
String segId = String.valueOf(SegmentStatusManager
.createNewSegmentId(segmentUpdateStatusManager.getLoadMetadataDetails()));
for (SegmentUpdateDetails detail : updateDetailsList) {
detail.setSegmentName(segId);
}
}
ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
boolean lockStatus = false;
try {
lockStatus = updateLock.lockWithRetries();
if (lockStatus) {
// read the existing file if present and update the same.
SegmentUpdateDetails[] oldDetails = segmentUpdateStatusManager
.getUpdateStatusDetails();
List<SegmentUpdateDetails> oldList = new ArrayList<>(Arrays.asList(oldDetails));
for (SegmentUpdateDetails newBlockEntry : updateDetailsList) {
mergeSegmentUpdate(isCompaction, oldList, newBlockEntry);
}
List<SegmentUpdateDetails> updateDetailsValidSeg = new ArrayList<>();
Set<String> loadDetailsSet = new HashSet<>();
for (LoadMetadataDetails details : segmentUpdateStatusManager.getLoadMetadataDetails()) {
loadDetailsSet.add(details.getLoadName());
}
for (SegmentUpdateDetails updateDetails : oldList) {
if (loadDetailsSet.contains(updateDetails.getSegmentName())) {
// Keep only the update info of segments present in the table status; after
// compaction and clean files, compacted segments are removed. This keeps the
// tableupdatestatus file small, which is good for performance.
updateDetailsValidSeg.add(updateDetails);
}
}
// In case of ForceWrite, write the segmentUpdateDetails to the tableUpdateStatus file
// without any validation of segments.
if (isForceWrite) {
segmentUpdateStatusManager.writeLoadDetailsIntoFile(oldList, updateStatusFileIdentifier);
} else {
segmentUpdateStatusManager
.writeLoadDetailsIntoFile(updateDetailsValidSeg, updateStatusFileIdentifier);
}
status = true;
} else {
LOGGER.error("Not able to acquire the segment update lock.");
status = false;
}
} catch (IOException e) {
LOGGER.error("Failed to write the segment update status file.", e);
status = false;
} finally {
if (lockStatus) {
if (updateLock.unlock()) {
LOGGER.info("Unlock the segment update lock successful.");
} else {
LOGGER.error("Not able to unlock the segment update lock.");
}
}
}
return status;
}
public static void mergeSegmentUpdate(boolean isCompaction, List<SegmentUpdateDetails> oldList,
SegmentUpdateDetails newBlockEntry) {
int index = oldList.indexOf(newBlockEntry);
if (index != -1) {
// update the element in existing list.
SegmentUpdateDetails blockDetail = oldList.get(index);
if (blockDetail.getDeleteDeltaStartTimestamp().isEmpty() || isCompaction) {
blockDetail
.setDeleteDeltaStartTimestamp(newBlockEntry.getDeleteDeltaStartTimestamp());
}
blockDetail.setDeleteDeltaEndTimestamp(newBlockEntry.getDeleteDeltaEndTimestamp());
blockDetail.setSegmentStatus(newBlockEntry.getSegmentStatus());
blockDetail.setDeletedRowsInBlock(newBlockEntry.getDeletedRowsInBlock());
// If the start and end timestamps differ, the delta exists in multiple files,
// so add them to the list to locate the delta files easily without listing.
if (!blockDetail.getDeleteDeltaStartTimestamp()
.equals(blockDetail.getDeleteDeltaEndTimestamp())) {
blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaStartTimestamp());
blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaEndTimestamp());
} else {
blockDetail.setDeltaFileStamps(null);
}
} else {
// add the new details to the list.
oldList.add(newBlockEntry);
}
}
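// Illustrative merge behaviour in the non-compaction flow (hypothetical
// timestamps): if the old list already holds an entry for a block with start =
// end = "t1" and a new entry for the same block arrives with end timestamp "t2",
// the merged entry keeps start "t1", takes end "t2", and records both "t1" and
// "t2" as delta file stamps so the delta files can be found without listing.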
/**
* Updates the table status file for the given segments.
* @param updatedSegmentsList segments that were updated
* @param table the carbon table
* @param updatedTimeStamp timestamp of the update operation
* @param isTimestampUpdateRequired whether the update delta timestamps must be written
* @param isUpdateStatusFileUpdateRequired whether the update status file name must be recorded
* @param segmentsToBeDeleted segments to be marked for delete
* @param tblStatusWriteVersion version to use when writing the table status file
* @return map containing the write status and, if generated, the table status write version
*/
public static Map<String, String> updateTableMetadataStatus(Set<Segment> updatedSegmentsList,
CarbonTable table, String updatedTimeStamp, boolean isTimestampUpdateRequired,
boolean isUpdateStatusFileUpdateRequired, List<Segment> segmentsToBeDeleted,
String tblStatusWriteVersion) {
return updateTableMetadataStatus(updatedSegmentsList, table, updatedTimeStamp,
isTimestampUpdateRequired, isUpdateStatusFileUpdateRequired,
segmentsToBeDeleted, new ArrayList<Segment>(), tblStatusWriteVersion);
}
/**
* Updates the table status file for the given segments, additionally rewriting
* the segment file names of the segments in segmentFilesTobeUpdated.
*
* @param updatedSegmentsList segments that were updated
* @param table the carbon table
* @param updatedTimeStamp timestamp of the update operation
* @param isTimestampUpdateRequired whether the update delta timestamps must be written
* @param isUpdateStatusFileUpdateRequired whether the update status file name must be recorded
* @param segmentsToBeDeleted segments to be marked for delete
* @param segmentFilesTobeUpdated segments whose segment file names must be rewritten
* @param uuid version used to resolve the table status file path
* @return map containing the write status and, if generated, the table status write version
*/
public static Map<String, String> updateTableMetadataStatus(Set<Segment> updatedSegmentsList,
CarbonTable table, String updatedTimeStamp, boolean isTimestampUpdateRequired,
boolean isUpdateStatusFileUpdateRequired, List<Segment> segmentsToBeDeleted,
List<Segment> segmentFilesTobeUpdated, String uuid) {
boolean status = false;
Map<String, String> tuple = new HashMap<>();
String metaDataFilepath = table.getMetadataPath();
AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
String tableStatusPath =
CarbonTablePath.getTableStatusFilePath(identifier.getTablePath(), uuid);
SegmentStatusManager segmentStatusManager =
new SegmentStatusManager(identifier, table.getTableStatusVersion());
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
boolean lockStatus = false;
try {
lockStatus = carbonLock.lockWithRetries();
if (lockStatus) {
LOGGER.info("Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
+ " for table status update");
if (uuid.isEmpty() && CarbonProperties.isTableStatusMultiVersionEnabled()) {
String tblStatusWriteVersion = String.valueOf(System.currentTimeMillis());
tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath(),
tblStatusWriteVersion);
tuple.put("tblStatusWriteVersion", tblStatusWriteVersion);
}
LoadMetadataDetails[] listOfLoadFolderDetailsArray =
SegmentStatusManager.readLoadMetadata(metaDataFilepath, table.getTableStatusVersion());
// to update table status only when required.
boolean isUpdateRequired = false;
for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
// the link between the two status files is stored in the segment 0 entry only.
if (isUpdateStatusFileUpdateRequired &&
loadMetadata.getLoadName().equalsIgnoreCase("0")) {
loadMetadata.setUpdateStatusFileName(
CarbonUpdateUtil.getUpdateStatusFileName(updatedTimeStamp));
}
if (isTimestampUpdateRequired) {
// if the segment is in the marked-for-delete list, update its status.
if (segmentsToBeDeleted.contains(new Segment(loadMetadata.getLoadName()))) {
loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
loadMetadata.setModificationOrDeletionTimestamp(Long.parseLong(updatedTimeStamp));
isUpdateRequired = true;
}
}
for (Segment segName : updatedSegmentsList) {
if (loadMetadata.getLoadName().equalsIgnoreCase(segName.getSegmentNo())) {
// if this call comes from the delete delta flow, the timestamp string is
// empty and there is no need to write into the table status file.
if (isTimestampUpdateRequired) {
// update flow.
if (loadMetadata.getUpdateDeltaStartTimestamp().isEmpty()) {
// this means the segment is getting updated for the first time.
loadMetadata.setUpdateDeltaStartTimestamp(updatedTimeStamp);
}
// update end timestamp for each time.
loadMetadata.setUpdateDeltaEndTimestamp(updatedTimeStamp);
isUpdateRequired = true;
}
if (segmentFilesTobeUpdated
.contains(Segment.toSegment(loadMetadata.getLoadName(), null))) {
loadMetadata.setSegmentFile(loadMetadata.getLoadName() + "_" + updatedTimeStamp
+ CarbonTablePath.SEGMENT_EXT);
isUpdateRequired = true;
}
}
}
}
try {
if (isUpdateRequired || isUpdateStatusFileUpdateRequired) {
SegmentStatusManager
.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
}
} catch (IOException e) {
tuple.put("status", "false");
return tuple;
}
status = true;
} else {
LOGGER.error("Not able to acquire the lock for Table status update for table " + table
.getDatabaseName() + "." + table.getTableName());
}
} finally {
if (lockStatus) {
if (carbonLock.unlock()) {
LOGGER.info(
"Table unlocked successfully after table status update " + table.getDatabaseName()
+ "." + table.getTableName());
} else {
LOGGER.error(
"Unable to unlock table lock for table " + table.getDatabaseName() + "." + table
.getTableName() + " during table status update");
}
}
}
tuple.put("status", String.valueOf(status));
return tuple;
}
/**
* Gets the file name of the update status file by appending the latest timestamp to it.
*
* @param updatedTimeStamp timestamp to append
* @return the update status file name
*/
public static String getUpdateStatusFileName(String updatedTimeStamp) {
return CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME + CarbonCommonConstants.HYPHEN
+ updatedTimeStamp;
}
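// Illustrative result, assuming TABLEUPDATESTATUS_FILENAME is "tableupdatestatus"
// and HYPHEN is "-": getUpdateStatusFileName("1597409791503") returns
// "tableupdatestatus-1597409791503".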
/**
* This will handle the clean up cases if the update fails.
*
* @param table
* @param timeStamp
*/
public static void cleanStaleDeltaFiles(CarbonTable table, final String timeStamp) {
AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
String partitionDir = CarbonTablePath.getPartitionDir(identifier.getTablePath());
CarbonFile file =
FileFactory.getCarbonFile(partitionDir);
if (!file.exists()) {
return;
}
for (CarbonFile eachDir : file.listFiles()) {
// for each dir check if the file with the delta timestamp is present or not.
CarbonFile[] toBeDeleted = eachDir.listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile file) {
String fileName = file.getName();
return fileName.endsWith(timeStamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT);
}
});
// deleting the files of a segment.
try {
CarbonUtil.deleteFoldersAndFilesSilent(toBeDeleted);
} catch (IOException | InterruptedException e) {
LOGGER.error("Exception in deleting the delta files." + e);
}
}
}
/**
* returns timestamp as long value
*
* @param timestamp
* @return
*/
public static Long getTimeStampAsLong(String timestamp) {
try {
return Long.parseLong(timestamp);
} catch (NumberFormatException nfe) {
String errorMsg = "Invalid timestamp : " + timestamp;
LOGGER.error(errorMsg);
return null;
}
}
/**
* returns integer value from given string
*
* @param value
* @return
* @throws Exception
*/
public static Integer getIntegerValue(String value) throws Exception {
try {
return Integer.parseInt(value);
} catch (NumberFormatException nfe) {
LOGGER.error("Invalid row : " + value + nfe.getLocalizedMessage());
throw new Exception("Invalid row : " + nfe.getLocalizedMessage());
}
}
/**
* return only block name from completeBlockName
*
* @param completeBlockName
* @return
*/
public static String getBlockName(String completeBlockName) {
return completeBlockName
.substring(0, completeBlockName.lastIndexOf(CarbonCommonConstants.HYPHEN));
}
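// Illustrative example (hypothetical block name): everything after the last
// hyphen, typically the timestamp, is stripped, so
// getBlockName("part-0-0_batchno0-0-1-1597409791503") returns
// "part-0-0_batchno0-0-1".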
/**
* returns segment id from segment name
*
* @param segmentName
* @return
*/
public static String getSegmentId(String segmentName) {
return segmentName.split(CarbonCommonConstants.UNDERSCORE)[1];
}
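// Illustrative example, assuming segment names of the form "Segment_<id>":
// getSegmentId("Segment_2") returns "2".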
public static long getLatestTaskIdForSegment(Segment segment, String tablePath)
throws IOException {
long max = 0;
List<String> dataFiles = new ArrayList<>();
if (segment.getSegmentFileName() != null) {
SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
fileStore.readIndexFiles(FileFactory.getConfiguration());
Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
List<String> dataFilePaths = new ArrayList<>();
for (List<String> paths : indexFilesMap.values()) {
dataFilePaths.addAll(paths);
}
for (String dataFilePath : dataFilePaths) {
dataFiles.add(new Path(dataFilePath).getName());
}
} else {
String segmentDirPath = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo());
// scan all the carbondata files and get the latest task ID.
CarbonFile segmentDir =
FileFactory.getCarbonFile(segmentDirPath);
CarbonFile[] carbonDataFiles = segmentDir.listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile file) {
return file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
}
});
for (CarbonFile carbonDataFile : carbonDataFiles) {
dataFiles.add(carbonDataFile.getName());
}
}
for (String name : dataFiles) {
long taskNumber =
Long.parseLong(CarbonTablePath.DataFileUtil.getTaskNo(name).split("_")[0]);
if (taskNumber > max) {
max = taskNumber;
}
}
// return max task No
return max;
}
/**
* This will tell whether the max query timeout has expired or not.
* @param fileTimestamp
* @return
*/
public static boolean isMaxQueryTimeoutExceeded(long fileTimestamp) {
// record current time.
long currentTime = CarbonUpdateUtil.readCurrentTime();
int maxTime;
try {
maxTime = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME,
Integer.toString(CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME)));
} catch (NumberFormatException e) {
maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
}
long difference = currentTime - fileTimestamp;
long minutesElapsed = (difference / (1000 * 60));
return minutesElapsed >= maxTime;
}
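// Illustrative check, assuming the configured (or default) max query execution
// time is 60 minutes: a file stamped 61 minutes ago is treated as expired and
// the method returns true, while one stamped 59 minutes ago returns false.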
private static boolean deleteInvalidFiles(CarbonFile invalidFile) {
boolean isDeleted;
try {
LOGGER.info("deleting the invalid file : " + invalidFile.getName());
CarbonUtil.deleteFoldersAndFiles(invalidFile);
isDeleted = true;
} catch (IOException | InterruptedException e) {
LOGGER.error("error in clean up of invalid files." + e.getMessage(), e);
isDeleted = false;
}
return isDeleted;
}
public static boolean isBlockInvalid(SegmentStatus blockStatus) {
return blockStatus == SegmentStatus.COMPACTED || blockStatus == SegmentStatus.MARKED_FOR_DELETE;
}
/**
* This will return the current time in millis.
* @return
*/
public static long readCurrentTime() {
return System.currentTimeMillis();
}
/**
*
* @param details
* @param segmentBlockCount
*/
public static void decrementDeletedBlockCount(SegmentUpdateDetails details,
Map<String, Long> segmentBlockCount) {
String segId = details.getSegmentName();
segmentBlockCount.put(details.getSegmentName(), segmentBlockCount.get(segId) - 1);
}
/**
*
* @param segmentBlockCount
* @return
*/
public static List<Segment> getListOfSegmentsToMarkDeleted(Map<String, Long> segmentBlockCount) {
List<Segment> segmentsToBeDeleted =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
for (Map.Entry<String, Long> eachSeg : segmentBlockCount.entrySet()) {
if (eachSeg.getValue() == 0) {
segmentsToBeDeleted.add(new Segment(eachSeg.getKey(), (String) null));
}
}
return segmentsToBeDeleted;
}
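// Illustrative example: for a block count map of {"0" -> 0L, "1" -> 3L}, only
// segment "0" has no remaining blocks, so the returned list contains just the
// segment with id "0".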
/**
* Return row count of input block
*/
public static long getRowCount(BlockMappingVO blockMappingVO, CarbonTable carbonTable) {
if (blockMappingVO.getBlockRowCountMapping().size() == 1
&& blockMappingVO.getBlockRowCountMapping().get(CarbonCommonConstantsInternal.ROW_COUNT)
!= null) {
return blockMappingVO.getBlockRowCountMapping().get(CarbonCommonConstantsInternal.ROW_COUNT);
}
SegmentUpdateStatusManager updateStatusManager =
new SegmentUpdateStatusManager(carbonTable);
long rowCount = 0;
Map<String, Long> blockRowCountMap = blockMappingVO.getBlockRowCountMapping();
for (Map.Entry<String, Long> blockRowEntry : blockRowCountMap.entrySet()) {
String key = blockRowEntry.getKey();
long alreadyDeletedCount = 0;
SegmentUpdateDetails detail = updateStatusManager.getDetailsForABlock(key);
if (detail != null) {
alreadyDeletedCount = Long.parseLong(detail.getDeletedRowsInBlock());
}
rowCount += (blockRowEntry.getValue() - alreadyDeletedCount);
}
return rowCount;
}
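// Illustrative arithmetic: a block with 1000 rows of which 200 are already
// recorded as deleted in its SegmentUpdateDetails contributes 800 rows to the
// returned total.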
/**
*
* @param blockMappingVO
* @param segmentUpdateStatusManager
*/
public static void createBlockDetailsMap(BlockMappingVO blockMappingVO,
SegmentUpdateStatusManager segmentUpdateStatusManager) {
Map<String, Long> blockRowCountMap = blockMappingVO.getBlockRowCountMapping();
Map<String, RowCountDetailsVO> outputMap =
new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
for (Map.Entry<String, Long> blockRowEntry : blockRowCountMap.entrySet()) {
String key = blockRowEntry.getKey();
long alreadyDeletedCount = 0;
SegmentUpdateDetails detail = segmentUpdateStatusManager.getDetailsForABlock(key);
if (null != detail) {
alreadyDeletedCount = Long.parseLong(detail.getDeletedRowsInBlock());
}
RowCountDetailsVO rowCountDetailsVO =
new RowCountDetailsVO(blockRowEntry.getValue(), alreadyDeletedCount);
outputMap.put(key, rowCountDetailsVO);
}
blockMappingVO.setCompleteBlockRowDetailVO(outputMap);
}
/**
*
* @param segID
* @param blockName
* @return
*/
public static String getSegmentBlockNameKey(String segID, String blockName,
boolean isPartitionTable) {
String blockNameWithOutPartAndBatchNo = blockName
.substring(blockName.indexOf(CarbonCommonConstants.HYPHEN) + 1,
blockName.lastIndexOf(CarbonTablePath.getCarbonDataExtension()))
.replace(CarbonTablePath.BATCH_PREFIX, CarbonCommonConstants.UNDERSCORE);
// to remove compressor name
int index = blockNameWithOutPartAndBatchNo.lastIndexOf(CarbonCommonConstants.POINT);
if (index != -1) {
blockNameWithOutPartAndBatchNo = blockNameWithOutPartAndBatchNo
.replace(blockNameWithOutPartAndBatchNo.substring(index), "");
}
if (isPartitionTable) {
return blockNameWithOutPartAndBatchNo;
} else {
return segID + CarbonCommonConstants.FILE_SEPARATOR + blockNameWithOutPartAndBatchNo;
}
}
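// Illustrative key construction for a hypothetical file name, assuming
// BATCH_PREFIX is "_batchno" and the data file extension is ".carbondata":
// getSegmentBlockNameKey("2", "part-0-0_batchno0-0-1597409791503.snappy.carbondata", false)
// strips the part prefix, batch prefix and compressor name and returns
// "2/0-0_0-0-1597409791503".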
/**
* Below method will be used to get the latest delete delta file timestamp
* @param deleteDeltaFiles
* @return latest delete delta file time stamp
*/
public static long getLatestDeleteDeltaTimestamp(String[] deleteDeltaFiles) {
long latestTimestamp = 0;
for (int i = 0; i < deleteDeltaFiles.length; i++) {
long convertTimeStampToLong = Long.parseLong(
CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(deleteDeltaFiles[i]));
if (latestTimestamp < convertTimeStampToLong) {
latestTimestamp = convertTimeStampToLong;
}
}
return latestTimestamp;
}
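// Illustrative example (hypothetical file names, assuming the timestamp is the
// suffix after the last hyphen): for delta files "block-100.deletedelta" and
// "block-200.deletedelta", the method returns 200.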
/**
* Handling of the clean up of old carbondata files, index files , delete delta,
* update status files.
*
* @param table clean up will be handled on this table.
* @param isDryRun if the clean files dry run is selected, only the size to be freed
* is computed; no files are deleted.
*/
public static long cleanUpDeltaFiles(CarbonTable table, boolean isDryRun) throws IOException {
SegmentStatusManager ssm =
new SegmentStatusManager(table.getAbsoluteTableIdentifier(), table.getTableStatusVersion());
LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(table.getMetadataPath(),
table.getTableStatusVersion());
long totalSizeDeleted = 0;
ArrayList<CarbonFile> filesToBeDeleted = new ArrayList<>();
SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table);
SegmentUpdateDetails[] segmentUpdateDetails = updateStatusManager.getUpdateStatusDetails();
// hold all the updated segments so that we can check the delta files in them;
// no need to check the others.
Set<String> updatedSegments = new HashSet<>();
for (SegmentUpdateDetails updateDetails : segmentUpdateDetails) {
updatedSegments.add(updateDetails.getSegmentName());
}
boolean isInvalidFile = false;
// take the update status file name from 0th segment.
String validUpdateStatusFile = ssm.getUpdateStatusFileName(details);
// scan through each segment.
if (table.isHivePartitionTable()) {
List<CarbonFile> partitionList = Arrays.stream(FileFactory.getCarbonFile(table
.getTablePath()).listFiles()).filter(partitionName -> partitionName.getName()
.contains("=")).collect(Collectors.toList());
List<CarbonFile> totalDeltaFiles = new ArrayList<>();
for (CarbonFile carbonFile : partitionList) {
totalDeltaFiles.addAll(carbonFile.listFiles(true).stream().filter(fileName -> fileName
.getName().endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
.collect(Collectors.toList()));
}
SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
// Case 1: deleteDeltaStartTimestamp == deleteDeltaEndTimestamp. Only one delta
// file is present and deltaFileStamps is null.
// Case 2: deleteDeltaStartTimestamp != deleteDeltaEndTimestamp. More than one
// delta file is present, so the deltaFileStamps variable can be read directly.
Arrays.stream(updateDetails).forEach(block -> {
if (block.getDeleteDeltaStartTimestamp().equals(block.getDeleteDeltaEndTimestamp())) {
totalDeltaFiles.removeIf(deltaFile -> deltaFile.getName().endsWith(block
.getDeleteDeltaEndTimestamp() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT));
} else {
block.getDeltaFileStamps().stream().forEach(fileName -> totalDeltaFiles
.removeIf(deltaFile -> deltaFile.getName().endsWith(fileName +
CarbonCommonConstants.DELETE_DELTA_FILE_EXT)));
}
});
for (CarbonFile invalidFile: totalDeltaFiles) {
totalSizeDeleted += invalidFile.getSize();
filesToBeDeleted.add(invalidFile);
}
} else {
for (LoadMetadataDetails segment : details) {
// go for delta file deletion only if this segment is valid.
// if the segment is marked for delete or compacted, it will get deleted anyway.
if (segment.getSegmentStatus() == SegmentStatus.SUCCESS
|| segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
// when no update operation has been done on the table there is no need to go
// ahead, so check the update delta start timestamp and proceed only if not empty
if (!segment.getUpdateDeltaStartTimestamp().isEmpty()
|| updatedSegments.contains(segment.getLoadName())) {
// take the list of files from this segment.
String segmentPath = CarbonTablePath.getSegmentPath(
table.getAbsoluteTableIdentifier().getTablePath(), segment.getLoadName());
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath);
CarbonFile[] allSegmentFiles = segDir.listFiles();
SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
for (SegmentUpdateDetails block : updateDetails) {
CarbonFile[] invalidDeleteDeltaFiles;
if (!block.getSegmentName().equalsIgnoreCase(segment.getLoadName())) {
continue;
}
invalidDeleteDeltaFiles = updateStatusManager.getDeleteDeltaInvalidFilesList(block,
false, allSegmentFiles, isInvalidFile);
for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
totalSizeDeleted += invalidFile.getSize();
filesToBeDeleted.add(invalidFile);
}
}
}
}
}
}
// delete the update table status files which are old.
if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) {
final String updateStatusTimestamp = validUpdateStatusFile
.substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1);
String tablePath = table.getAbsoluteTableIdentifier().getTablePath();
CarbonFile metaFolder = FileFactory.getCarbonFile(
CarbonTablePath.getMetadataPath(tablePath));
CarbonFile[] invalidUpdateStatusFiles = metaFolder.listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile file) {
if (file.getName().startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)) {
// CHECK if this is valid or not.
// we only send invalid ones to delete.
return !file.getName().endsWith(updateStatusTimestamp);
}
return false;
}
});
for (CarbonFile invalidFile : invalidUpdateStatusFiles) {
totalSizeDeleted += invalidFile.getSize();
filesToBeDeleted.add(invalidFile);
}
}
if (!isDryRun) {
for (CarbonFile invalidFile : filesToBeDeleted) {
invalidFile.deleteFile();
}
}
return totalSizeDeleted;
}
}