blob: f9a3ee7dc5da4cea7df3b6e82202967727a647bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.statusmanager;
import java.io.*;
import java.util.*;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
/**
* Manages Segment & block status of carbon table for Delete operation
*/
public class SegmentUpdateStatusManager {
/**
* logger
*/
private static final Logger LOG =
LogServiceFactory.getLogService(SegmentUpdateStatusManager.class.getName());
private final AbsoluteTableIdentifier identifier;
private LoadMetadataDetails[] segmentDetails;
private SegmentUpdateDetails[] updateDetails;
private Map<String, SegmentUpdateDetails> blockAndDetailsMap;
private boolean isPartitionTable;
/**
* It contains the mapping of segment path and corresponding delete delta file paths,
* avoiding listing these files for every query
*/
private Map<String, List<String>> segmentDeleteDeltaListMap = new HashMap<>();
public SegmentUpdateStatusManager(CarbonTable table,
LoadMetadataDetails[] segmentDetails) {
this(table, segmentDetails, null);
}
/**
* It takes the updateVersion as one of the parameter. Basically user can give on which
* updateVersion user can retrieve the data.It is useful to get the history changed data
* of a particular version.
*/
public SegmentUpdateStatusManager(CarbonTable table,
LoadMetadataDetails[] segmentDetails, String updateVersion) {
this.identifier = table.getAbsoluteTableIdentifier();
this.isPartitionTable = table.isHivePartitionTable();
// current it is used only for read function scenarios, as file update always requires to work
// on latest file status.
this.segmentDetails = segmentDetails;
updateDetails = readLoadMetadata();
updateUpdateDetails(updateVersion);
populateMap();
}
public SegmentUpdateStatusManager(CarbonTable table) {
this(table, (String) null);
}
public SegmentUpdateStatusManager(CarbonTable table, String updateVersion) {
this.identifier = table.getAbsoluteTableIdentifier();
// current it is used only for read function scenarios, as file update always requires to work
// on latest file status.
if (!table.getTableInfo().isTransactionalTable()) {
// fileExist is costly operation, so check based on table Type
segmentDetails = new LoadMetadataDetails[0];
} else {
segmentDetails = SegmentStatusManager.readLoadMetadata(
CarbonTablePath.getMetadataPath(identifier.getTablePath()));
}
this.isPartitionTable = table.isHivePartitionTable();
if (segmentDetails.length != 0) {
updateDetails = readLoadMetadata();
} else {
updateDetails = new SegmentUpdateDetails[0];
}
updateUpdateDetails(updateVersion);
populateMap();
}
/**
* It adds only the SegmentUpdateDetails of given updateVersion, it is used to get the history
* data of updated/deleted data.
*/
private void updateUpdateDetails(String updateVersion) {
if (updateVersion != null) {
List<SegmentUpdateDetails> newUpdateDetails = new ArrayList<>();
for (SegmentUpdateDetails updateDetail : updateDetails) {
if (updateDetail.getDeltaFileStamps() != null) {
if (updateDetail.getDeltaFileStamps().contains(updateVersion)) {
HashSet<String> set = new HashSet<>();
set.add(updateVersion);
updateDetail.setDeltaFileStamps(set);
updateDetail.setSegmentStatus(SegmentStatus.SUCCESS);
newUpdateDetails.add(updateDetail);
}
} else if (updateDetail.getDeleteDeltaStartTimestamp().equalsIgnoreCase(updateVersion)) {
updateDetail.setSegmentStatus(SegmentStatus.SUCCESS);
newUpdateDetails.add(updateDetail);
}
}
updateDetails = newUpdateDetails.toArray(new SegmentUpdateDetails[0]);
}
}
/**
* populate the block and its details in a map.
*/
private void populateMap() {
blockAndDetailsMap = new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
for (SegmentUpdateDetails blockDetails : updateDetails) {
String blockIdentifier = CarbonUpdateUtil
.getSegmentBlockNameKey(blockDetails.getSegmentName(), blockDetails.getActualBlockName(),
isPartitionTable);
blockAndDetailsMap.put(blockIdentifier, blockDetails);
}
}
/**
*
* @param segID
* @param actualBlockName
* @return null if block is not present in segment update status.
*/
private SegmentUpdateDetails getDetailsForABlock(String segID, String actualBlockName) {
String blockIdentifier = CarbonUpdateUtil
.getSegmentBlockNameKey(segID, actualBlockName, isPartitionTable);
return blockAndDetailsMap.get(blockIdentifier);
}
/**
*
* @param key will be like (segmentId/blockName) 0/0-0-5464654654654
* @return
*/
public SegmentUpdateDetails getDetailsForABlock(String key) {
return blockAndDetailsMap.get(key);
}
/**
* Returns the LoadMetadata Details
* @return
*/
public LoadMetadataDetails[] getLoadMetadataDetails() {
return segmentDetails;
}
/**
* Returns the UpdateStatus Details.
* @return
*/
public SegmentUpdateDetails[] getUpdateStatusDetails() {
return updateDetails;
}
/**
*
* @param segmentUpdateDetails
*/
public void setUpdateStatusDetails(SegmentUpdateDetails[] segmentUpdateDetails) {
this.updateDetails = segmentUpdateDetails;
}
/**
* This will return the lock object used to lock the table update status file before updating.
*
* @return
*/
public ICarbonLock getTableUpdateStatusLock() {
return CarbonLockFactory.getCarbonLockObj(identifier,
LockUsage.TABLE_UPDATE_STATUS_LOCK);
}
/**
* Returns all update delta files of specified Segment.
*
* @param segmentId
* @return
* @throws Exception
*/
public List<String> getUpdateDeltaFiles(final String segmentId) {
List<String> updatedDeltaFilesList =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
String endTimeStamp = "";
String startTimeStamp = "";
String segmentPath = CarbonTablePath.getSegmentPath(
identifier.getTablePath(), segmentId);
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath);
for (LoadMetadataDetails eachSeg : segmentDetails) {
if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) {
// if the segment is found then take the start and end time stamp.
startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp();
endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp();
}
}
// if start timestamp is empty then no update delta is found. so return empty list.
if (startTimeStamp.isEmpty()) {
return updatedDeltaFilesList;
}
final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp);
final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp);
// else scan the segment for the delta files with the respective timestamp.
CarbonFile[] files = segDir.listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile pathName) {
String fileName = pathName.getName();
if (fileName.endsWith(CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)) {
String firstPart = fileName.substring(0, fileName.indexOf('.'));
long timestamp = Long.parseLong(firstPart
.substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
firstPart.length()));
if (Long.compare(timestamp, endTimeStampFinal) <= 0
&& Long.compare(timestamp, startTimeStampFinal) >= 0) {
// if marked for delete then it is invalid.
if (!isBlockValid(segmentId, fileName)) {
return false;
}
return true;
}
}
return false;
}
});
for (CarbonFile file : files) {
updatedDeltaFilesList.add(file.getCanonicalPath());
}
return updatedDeltaFilesList;
}
/**
* Below method will be used to get all the delete delta files based on block name
*
* @param blockFilePath actual block filePath
* @return all delete delta files
*/
public String[] getDeleteDeltaFilePath(String blockFilePath, String segmentId) {
return getDeltaFiles(blockFilePath, segmentId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
.toArray(new String[0]);
}
/**
* Returns all delta file paths of specified block
*/
private List<String> getDeltaFiles(String blockPath, String segment, String extension) {
Path path = new Path(blockPath);
String completeBlockName = path.getName();
String blockNameWithoutExtension =
completeBlockName.substring(0, completeBlockName.lastIndexOf('.'));
//blockName without timestamp
final String blockNameFromTuple =
blockNameWithoutExtension.substring(0, blockNameWithoutExtension.lastIndexOf("-"));
return getDeltaFiles(path.getParent().toString(), blockNameFromTuple, extension, segment);
}
/**
* This method returns the list of Blocks associated with the segment
* from the SegmentUpdateDetails List.
* @param segmentName
* @return
*/
public List<String> getBlockNameFromSegment(String segmentName) {
List<String> blockNames = new ArrayList<String>();
for (SegmentUpdateDetails block : updateDetails) {
if (block.getSegmentName().equalsIgnoreCase(segmentName) && !CarbonUpdateUtil
.isBlockInvalid(block.getSegmentStatus())) {
blockNames.add(block.getBlockName());
}
}
return blockNames;
}
/**
* check the block whether is valid
*
* @param segName segment name
* @param blockName block name
* @return the status of block whether is valid
*/
public boolean isBlockValid(String segName, String blockName) {
SegmentUpdateDetails details = getDetailsForABlock(segName, blockName);
return details == null || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus());
}
/**
* Returns all delta file paths of specified block
*
* @param blockDir block directory with CarbonFile format
* @param blockNameFromTuple block name from tuple
* @param extension the file extension name
* @param segment the segment name
* @return the list of delete file
*/
private List<String> getDeltaFiles(String blockDir, final String blockNameFromTuple,
final String extension, String segment) {
List<String> deleteFileList = new ArrayList<>();
for (SegmentUpdateDetails block : updateDetails) {
if (block.getBlockName().equalsIgnoreCase(blockNameFromTuple) && block.getSegmentName()
.equalsIgnoreCase(segment) && !CarbonUpdateUtil
.isBlockInvalid(block.getSegmentStatus())) {
final long deltaStartTimestamp = getStartTimeOfDeltaFile(extension, block);
// If there is no delete delete file , then return null
if (deltaStartTimestamp == 0) {
return deleteFileList;
}
final long deltaEndTimeStamp = getEndTimeOfDeltaFile(extension, block);
// If start and end time is same then it has only one delta file so construct the file
// directly with available information with out listing
if (block.getDeleteDeltaStartTimestamp().equals(block.getDeleteDeltaEndTimestamp())) {
deleteFileList.add(
new StringBuilder(blockDir).append(CarbonCommonConstants.FILE_SEPARATOR)
.append(block.getBlockName()).append("-")
.append(block.getDeleteDeltaStartTimestamp()).append(extension).toString());
// If delta timestamp list has data then it has multiple delta file so construct the file
// directly with list of deltas with out listing
} else if (block.getDeltaFileStamps() != null && block.getDeltaFileStamps().size() > 0) {
for (String delta : block.getDeltaFileStamps()) {
deleteFileList.add(
new StringBuilder(blockDir).append(CarbonCommonConstants.FILE_SEPARATOR)
.append(block.getBlockName()).append("-").append(delta).append(extension)
.toString());
}
} else {
// It is for backward compatibility.It lists the files.
return getFilePaths(blockDir, blockNameFromTuple, extension, deleteFileList,
deltaStartTimestamp, deltaEndTimeStamp);
}
}
}
return deleteFileList;
}
private List<String> getFilePaths(String blockDir, final String blockNameFromTuple,
final String extension, List<String> deleteFileList, final long deltaStartTimestamp,
final long deltaEndTimeStamp) {
List<String> deltaList = segmentDeleteDeltaListMap.get(blockDir);
if (deltaList == null) {
CarbonFile[] files = FileFactory.getCarbonFile(blockDir).listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile pathName) {
String fileName = pathName.getName();
if (fileName.endsWith(extension) && pathName.getSize() > 0) {
return true;
}
return false;
}
});
deltaList = new ArrayList<>(files.length);
for (CarbonFile file : files) {
deltaList.add(file.getCanonicalPath());
}
segmentDeleteDeltaListMap.put(blockDir, deltaList);
}
for (String deltaFile : deltaList) {
String deltaFilePathName = new Path(deltaFile).getName();
String firstPart = deltaFilePathName.substring(0, deltaFilePathName.lastIndexOf('.'));
String blockName =
firstPart.substring(0, firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN));
long timestamp =
Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(deltaFilePathName));
// It compares whether this delta file belongs to this block or not. And also checks that
// corresponding delta file is valid or not by considering its load start and end time with
// the file timestamp.
if (blockNameFromTuple.equals(blockName) && ((Long.compare(timestamp, deltaEndTimeStamp) <= 0)
&& (Long.compare(timestamp, deltaStartTimestamp) >= 0))) {
if (null == deleteFileList) {
deleteFileList = new ArrayList<String>();
}
deleteFileList.add(deltaFile);
}
}
return deleteFileList;
}
/**
* Return all delta file for a block.
* @param segmentId
* @param blockName
* @return
*/
public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) {
String segmentPath = CarbonTablePath.getSegmentPath(
identifier.getTablePath(), segmentId.getSegmentNo());
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath);
for (SegmentUpdateDetails block : updateDetails) {
if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
(block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo()))
&& !CarbonUpdateUtil.isBlockInvalid((block.getSegmentStatus()))) {
final long deltaStartTimestamp =
getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
final long deltaEndTimeStamp =
getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
return segDir.listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile pathName) {
String fileName = pathName.getName();
if (pathName.getSize() > 0
&& fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) {
String blkName = fileName.substring(0, fileName.lastIndexOf("-"));
long timestamp =
Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileName));
if (blockName.equals(blkName) && (Long.compare(timestamp, deltaEndTimeStamp) <= 0)
&& (Long.compare(timestamp, deltaStartTimestamp) >= 0)) {
return true;
}
}
return false;
}
});
}
}
return null;
}
/**
* Returns all update delta files of specified Segment.
*
* @param loadMetadataDetail metadata details of segment
* @param validUpdateFiles if true then only the valid range files will be returned.
* @return
*/
public CarbonFile[] getUpdateDeltaFilesList(LoadMetadataDetails loadMetadataDetail,
final boolean validUpdateFiles, final String fileExtension, final boolean excludeOriginalFact,
CarbonFile[] allFilesOfSegment, boolean isAbortedFile) {
String endTimeStamp = "";
String startTimeStamp = "";
long factTimeStamp = 0;
// if the segment is found then take the start and end time stamp.
startTimeStamp = loadMetadataDetail.getUpdateDeltaStartTimestamp();
endTimeStamp = loadMetadataDetail.getUpdateDeltaEndTimestamp();
factTimeStamp = loadMetadataDetail.getLoadStartTime();
// if start timestamp is empty then no update delta is found. so return empty list.
if (startTimeStamp.isEmpty()) {
return new CarbonFile[0];
}
final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp);
final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp);
final long factTimeStampFinal = factTimeStamp;
List<CarbonFile> listOfCarbonFiles =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
// else scan the segment for the delta files with the respective timestamp.
for (CarbonFile eachFile : allFilesOfSegment) {
String fileName = eachFile.getName();
if (fileName.endsWith(fileExtension)) {
long timestamp =
Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileName));
if (excludeOriginalFact) {
if (Long.compare(factTimeStampFinal, timestamp) == 0) {
continue;
}
}
if (validUpdateFiles) {
if (Long.compare(timestamp, endTimeStampFinal) <= 0
&& Long.compare(timestamp, startTimeStampFinal) >= 0) {
listOfCarbonFiles.add(eachFile);
}
} else {
// invalid cases.
if (isAbortedFile) {
if (Long.compare(timestamp, endTimeStampFinal) > 0) {
listOfCarbonFiles.add(eachFile);
}
} else if (Long.compare(timestamp, startTimeStampFinal) < 0
|| Long.compare(timestamp, endTimeStampFinal) > 0) {
listOfCarbonFiles.add(eachFile);
}
}
}
}
return listOfCarbonFiles.toArray(new CarbonFile[listOfCarbonFiles.size()]);
}
/**
* Returns all update delta files of specified Segment.
*
* @param segmentId
* @param validUpdateFiles
* @param fileExtension
* @param excludeOriginalFact
* @param allFilesOfSegment
* @return
*/
public CarbonFile[] getUpdateDeltaFilesForSegment(String segmentId,
final boolean validUpdateFiles, final String fileExtension, final boolean excludeOriginalFact,
CarbonFile[] allFilesOfSegment) {
String endTimeStamp = "";
String startTimeStamp = "";
long factTimeStamp = 0;
for (LoadMetadataDetails eachSeg : segmentDetails) {
if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) {
// if the segment is found then take the start and end time stamp.
startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp();
endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp();
factTimeStamp = eachSeg.getLoadStartTime();
}
}
// if start timestamp is empty then no update delta is found. so return empty list.
if (startTimeStamp.isEmpty()) {
return new CarbonFile[0];
}
final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp);
final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp);
final long factTimeStampFinal = factTimeStamp;
List<CarbonFile> listOfCarbonFiles =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
// else scan the segment for the delta files with the respective timestamp.
for (CarbonFile eachFile : allFilesOfSegment) {
String fileName = eachFile.getName();
if (fileName.endsWith(fileExtension)) {
String firstPart = fileName.substring(0, fileName.indexOf('.'));
long timestamp = Long.parseLong(firstPart
.substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
firstPart.length()));
if (excludeOriginalFact) {
if (Long.compare(factTimeStampFinal, timestamp) == 0) {
continue;
}
}
if (validUpdateFiles) {
if (Long.compare(timestamp, endTimeStampFinal) <= 0
&& Long.compare(timestamp, startTimeStampFinal) >= 0) {
boolean validBlock = true;
for (SegmentUpdateDetails blockDetails : getUpdateStatusDetails()) {
if (blockDetails.getActualBlockName().equalsIgnoreCase(eachFile.getName())
&& CarbonUpdateUtil.isBlockInvalid(blockDetails.getSegmentStatus())) {
validBlock = false;
}
}
if (validBlock) {
listOfCarbonFiles.add(eachFile);
}
}
} else {
// invalid cases.
if (Long.compare(timestamp, startTimeStampFinal) < 0) {
listOfCarbonFiles.add(eachFile);
}
}
}
}
return listOfCarbonFiles.toArray(new CarbonFile[listOfCarbonFiles.size()]);
}
/**
*
* @param extension
* @param block
* @return
*/
private long getStartTimeOfDeltaFile(String extension, SegmentUpdateDetails block) {
long startTimestamp;
switch (extension) {
case CarbonCommonConstants.DELETE_DELTA_FILE_EXT:
startTimestamp = block.getDeleteDeltaStartTimeAsLong();
break;
default:
startTimestamp = 0;
}
return startTimestamp;
}
/**
*
* @param extension
* @param block
* @return
*/
private long getEndTimeOfDeltaFile(String extension, SegmentUpdateDetails block) {
long endTimestamp;
switch (extension) {
case CarbonCommonConstants.DELETE_DELTA_FILE_EXT:
endTimestamp = block.getDeleteDeltaEndTimeAsLong();
break;
default: endTimestamp = 0;
}
return endTimestamp;
}
/**
* This method loads segment update details
*
* @return
*/
public SegmentUpdateDetails[] readLoadMetadata() {
// get the updated status file identifier from the table status.
String tableUpdateStatusIdentifier = getUpdatedStatusIdentifier();
return readLoadMetadata(tableUpdateStatusIdentifier, identifier.getTablePath());
}
/**
* This method loads segment update details
*
* @return
*/
public static SegmentUpdateDetails[] readLoadMetadata(String tableUpdateStatusIdentifier,
String tablePath) {
Gson gsonObjectToRead = new Gson();
DataInputStream dataInputStream = null;
BufferedReader buffReader = null;
InputStreamReader inStream = null;
SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray;
if (StringUtils.isEmpty(tableUpdateStatusIdentifier)) {
return new SegmentUpdateDetails[0];
}
String tableUpdateStatusPath =
CarbonTablePath.getMetadataPath(tablePath) +
CarbonCommonConstants.FILE_SEPARATOR + tableUpdateStatusIdentifier;
AtomicFileOperations fileOperation =
AtomicFileOperationFactory.getAtomicFileOperations(tableUpdateStatusPath);
try {
if (!FileFactory
.isFileExist(tableUpdateStatusPath)) {
return new SegmentUpdateDetails[0];
}
dataInputStream = fileOperation.openForRead();
inStream = new InputStreamReader(dataInputStream,
CarbonCommonConstants.DEFAULT_CHARSET);
buffReader = new BufferedReader(inStream);
listOfSegmentUpdateDetailsArray =
gsonObjectToRead.fromJson(buffReader, SegmentUpdateDetails[].class);
} catch (IOException e) {
return new SegmentUpdateDetails[0];
} finally {
closeStreams(buffReader, inStream, dataInputStream);
}
return listOfSegmentUpdateDetailsArray;
}
/**
* @return updateStatusFileName
*/
private String getUpdatedStatusIdentifier() {
if (segmentDetails.length == 0) {
return null;
}
return segmentDetails[0].getUpdateStatusFileName();
}
/**
* writes segment update details into a given file at @param dataLoadLocation
*
* @param listOfSegmentUpdateDetailsArray
* @throws IOException
*/
public void writeLoadDetailsIntoFile(List<SegmentUpdateDetails> listOfSegmentUpdateDetailsArray,
String updateStatusFileIdentifier) throws IOException {
String fileLocation =
CarbonTablePath.getMetadataPath(identifier.getTablePath())
+ CarbonCommonConstants.FILE_SEPARATOR
+ CarbonUpdateUtil.getUpdateStatusFileName(updateStatusFileIdentifier);
AtomicFileOperations fileWrite =
AtomicFileOperationFactory.getAtomicFileOperations(fileLocation);
BufferedWriter brWriter = null;
DataOutputStream dataOutputStream = null;
Gson gsonObjectToWrite = new Gson();
// write the updated data into the metadata file.
try {
dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
CarbonCommonConstants.DEFAULT_CHARSET));
String metadataInstance = gsonObjectToWrite.toJson(listOfSegmentUpdateDetailsArray);
brWriter.write(metadataInstance);
} catch (IOException ioe) {
LOG.error("Error message: " + ioe.getLocalizedMessage());
fileWrite.setFailed();
throw ioe;
} finally {
if (null != brWriter) {
brWriter.flush();
}
CarbonUtil.closeStreams(brWriter);
fileWrite.close();
}
}
/**
* This method closes the streams
*
* @param streams - streams to close.
*/
private static void closeStreams(Closeable... streams) {
// Added if to avoid NullPointerException in case one stream is being passed as null
if (null != streams) {
for (Closeable stream : streams) {
if (null != stream) {
try {
stream.close();
} catch (IOException e) {
LOG.error("Error while closing stream" + stream);
}
}
}
}
}
/**
* Returns the invalid timestamp range of a segment.
*/
public static UpdateVO getInvalidTimestampRange(LoadMetadataDetails loadMetadataDetails) {
UpdateVO range = new UpdateVO();
if (loadMetadataDetails != null) {
range.setSegmentId(loadMetadataDetails.getLoadName());
range.setFactTimestamp(loadMetadataDetails.getLoadStartTime());
if (!loadMetadataDetails.getUpdateDeltaStartTimestamp().isEmpty() && !loadMetadataDetails
.getUpdateDeltaEndTimestamp().isEmpty()) {
range.setUpdateDeltaStartTimestamp(CarbonUpdateUtil
.getTimeStampAsLong(loadMetadataDetails.getUpdateDeltaStartTimestamp()));
range.setLatestUpdateTimestamp(
CarbonUpdateUtil.getTimeStampAsLong(loadMetadataDetails.getUpdateDeltaEndTimestamp()));
}
}
return range;
}
/**
*
* @param block
* @param needCompleteList
* @return
*/
public CarbonFile[] getDeleteDeltaInvalidFilesList(
final SegmentUpdateDetails block, final boolean needCompleteList,
CarbonFile[] allSegmentFiles, boolean isAbortedFile) {
final long deltaStartTimestamp =
getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
final long deltaEndTimestamp =
getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
Set<CarbonFile> files =
new HashSet<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
for (CarbonFile eachFile : allSegmentFiles) {
String fileName = eachFile.getName();
if (fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) {
String blkName = CarbonTablePath.DataFileUtil.getBlockNameFromDeleteDeltaFile(fileName);
// complete list of delta files of that block is returned.
if (needCompleteList && block.getBlockName().equalsIgnoreCase(blkName)) {
files.add(eachFile);
}
// invalid delete delta files only will be returned.
long timestamp = CarbonUpdateUtil.getTimeStampAsLong(
CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(fileName));
if (block.getBlockName().equalsIgnoreCase(blkName)) {
if (isAbortedFile) {
if (Long.compare(timestamp, deltaEndTimestamp) > 0) {
files.add(eachFile);
}
} else if (Long.compare(timestamp, deltaStartTimestamp) < 0
|| Long.compare(timestamp, deltaEndTimestamp) > 0) {
files.add(eachFile);
}
}
}
}
return files.toArray(new CarbonFile[files.size()]);
}
}