blob: 45f16caba60210f9b3800a45bda609050f521d19 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.util.path;
import java.io.File;
import java.util.Objects;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.DASH;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.POINT;
import org.apache.hadoop.conf.Configuration;
/**
* Helps to get Table content paths.
*/
public class CarbonTablePath {
// Folder and file names that make up the on-disk layout below a table path.
private static final String METADATA_DIR = "Metadata";
private static final String DICTIONARY_EXT = ".dict";
// Used as the name prefix when looking for an existing schema file and as the default file name.
public static final String SCHEMA_FILE = "schema";
private static final String FACT_DIR = "Fact";
private static final String SEGMENT_PREFIX = "Segment_";
private static final String PARTITION_PREFIX = "Part";
// Building blocks of a data file name, see getCarbonDataFileName.
private static final String DATA_PART_PREFIX = "part-";
private static final String BATCH_PREFIX = "_batchno";
private static final String LOCK_DIR = "LockFiles";
public static final String TABLE_STATUS_FILE = "tablestatus";
public static final String TABLE_STATUS_HISTORY_FILE = "tablestatus.history";
// File extensions. MERGE_INDEX_FILE_EXT starts with INDEX_FILE_EXT, so a prefix match on
// INDEX_FILE_EXT (as done in isCarbonIndexFile) also matches merge index files.
public static final String CARBON_DATA_EXT = ".carbondata";
public static final String INDEX_FILE_EXT = ".carbonindex";
public static final String MERGE_INDEX_FILE_EXT = ".carbonindexmerge";
// Not referenced elsewhere in this class; presumably consumed by callers (TODO confirm).
public static final String SEGMENT_EXT = ".segment";
// Streaming folders: the log and checkpoint folders are both resolved below STREAMING_DIR.
private static final String STREAMING_DIR = ".streaming";
private static final String STREAMING_LOG_DIR = "log";
private static final String STREAMING_CHECKPOINT_DIR = "checkpoint";
// STAGE_DIR is resolved below the Metadata folder, STAGE_DATA_DIR directly below the table path.
private static final String STAGE_DIR = "stage";
private static final String STAGE_DATA_DIR = "stage_data";
// Not referenced elsewhere in this class; presumably suffixes of stage marker files (TODO confirm).
public static final String SUCCESS_FILE_SUFFIX = ".success";
public static final String LOADING_FILE_SUFFIX = ".loading";
// File name of the stage snapshot, see getStageSnapshotFile.
private static final String SNAPSHOT_FILE_NAME = "snapshot";
// Not referenced elsewhere in this class.
public static final String SYSTEM_FOLDER_DIR = "_system";
/**
 * This class provides static utility only.
 * The constructor is private so that the class cannot be instantiated from outside.
 */
private CarbonTablePath() {
}
/**
 * Return the stage directory, which is located inside the metadata folder of the table.
 *
 * @param tablePath path to table files
 * @return stage directory path
 */
public static String getStageDir(String tablePath) {
  String metadataDir = getMetadataPath(tablePath);
  return metadataDir + CarbonCommonConstants.FILE_SEPARATOR + STAGE_DIR;
}
/**
 * Return the stage data directory, which is located directly below the table path.
 *
 * @param tablePath path to table files
 * @return stage data directory path
 */
public static String getStageDataDir(String tablePath) {
  return new StringBuilder()
      .append(tablePath)
      .append(CarbonCommonConstants.FILE_SEPARATOR)
      .append(STAGE_DATA_DIR)
      .toString();
}
/**
 * Return the path of the snapshot file kept inside the stage directory.
 *
 * @param tablePath path to table files
 * @return snapshot file path
 */
public static String getStageSnapshotFile(String tablePath) {
  String stageDir = getStageDir(tablePath);
  return stageDir + CarbonCommonConstants.FILE_SEPARATOR + SNAPSHOT_FILE_NAME;
}
/**
 * The method returns the folder path containing the carbon file, i.e. everything before
 * the last path separator.
 * The last '/' is used when present; otherwise the platform separator
 * ({@link File#separator}) is tried, which covers Windows style paths.
 *
 * @param carbonFilePath path of the carbon file
 * @return the folder part of the path; when the path contains no separator at all the
 *         input is returned unchanged, in line with getParentPath and
 *         DataFileUtil.getFileName
 */
public static String getFolderContainingFile(String carbonFilePath) {
  int lastIndex = carbonFilePath.lastIndexOf('/');
  // below code for handling windows environment
  if (-1 == lastIndex) {
    lastIndex = carbonFilePath.lastIndexOf(File.separator);
  }
  if (-1 == lastIndex) {
    // A bare file name has no folder component. Previously substring(0, -1) was called here
    // and failed with a StringIndexOutOfBoundsException.
    return carbonFilePath;
  }
  return carbonFilePath.substring(0, lastIndex);
}
/**
 * Build the dictionary file name of a column by appending the dictionary extension.
 *
 * @param columnId unique column identifier
 * @return name of dictionary file
 */
public static String getDictionaryFileName(String columnId) {
  return new StringBuilder()
      .append(columnId)
      .append(DICTIONARY_EXT)
      .toString();
}
/**
 * Check whether carbonFile is a dictionary file: it must not be a directory and its
 * name must end with the dictionary extension.
 *
 * @param carbonFile file to check
 * @return true if the file is a dictionary file, false otherwise
 */
public static Boolean isDictionaryFile(CarbonFile carbonFile) {
  if (carbonFile.isDirectory()) {
    return false;
  }
  return carbonFile.getName().endsWith(DICTIONARY_EXT);
}
/**
 * Check whether the given file is a carbon data file. The text starting at the last '.'
 * of the path has to begin with the carbon data extension.
 *
 * @param fileNameWithPath file name, optionally with its path
 * @return true if the extension matches, false otherwise (also when there is no '.')
 */
public static boolean isCarbonDataFile(String fileNameWithPath) {
  int extensionStart = fileNameWithPath.lastIndexOf('.');
  return extensionStart != -1 && fileNameWithPath.startsWith(CARBON_DATA_EXT, extensionStart);
}
/**
 * Check whether the given file is a carbon index file. The text starting at the last '.'
 * of the path has to begin with the index file extension. As this is a prefix match,
 * a name ending with the merge index extension is accepted as well.
 *
 * @param fileNameWithPath file name, optionally with its path
 * @return true if the extension matches, false otherwise (also when there is no '.')
 */
public static boolean isCarbonIndexFile(String fileNameWithPath) {
  int extensionStart = fileNameWithPath.lastIndexOf('.');
  if (extensionStart == -1) {
    return false;
  }
  String extension = fileNameWithPath.substring(extensionStart);
  return extension.startsWith(INDEX_FILE_EXT);
}
/**
 * Return the path of the dictionary file of a column, located in the metadata folder.
 *
 * @param tablePath path to table files
 * @param columnId unique column identifier
 * @return dictionary file path
 */
public static String getDictionaryFilePath(String tablePath, String columnId) {
  String metadataDir = getMetadataPath(tablePath);
  String dictionaryFileName = getDictionaryFileName(columnId);
  return metadataDir + CarbonCommonConstants.FILE_SEPARATOR + dictionaryFileName;
}
/**
 * Return the metadata folder path of the table.
 *
 * @param tablePath path to table files
 * @return metadata folder path
 */
public static String getMetadataPath(String tablePath) {
  return new StringBuilder()
      .append(tablePath)
      .append(CarbonCommonConstants.FILE_SEPARATOR)
      .append(METADATA_DIR)
      .toString();
}
/**
 * Return the schema file path, resolved without an explicit hadoop configuration.
 *
 * @param tablePath path to table files
 * @return schema file path
 */
public static String getSchemaFilePath(String tablePath) {
  final Configuration noConfiguration = null;
  return getActualSchemaFilePath(tablePath, noConfiguration);
}
/**
 * Return the schema file path, using the given hadoop configuration for file access.
 *
 * @param tablePath path to table files
 * @param hadoopConf hadoop configuration instance
 * @return schema file path
 */
public static String getSchemaFilePath(String tablePath, Configuration hadoopConf) {
  String schemaFilePath = getActualSchemaFilePath(tablePath, hadoopConf);
  return schemaFilePath;
}
/**
 * Resolve the schema file path. The metadata folder is searched for files whose name
 * starts with the schema file name. The first match is returned, unless nothing was found
 * or the table path is of file type ALLUXIO; in those cases the default schema file
 * location inside the metadata folder is returned.
 *
 * @param tablePath path to table files
 * @param hadoopConf hadoop configuration used to access the metadata folder, may be null
 * @return schema file path
 */
private static String getActualSchemaFilePath(String tablePath, Configuration hadoopConf) {
  final String metadataDir = tablePath + CarbonCommonConstants.FILE_SEPARATOR + METADATA_DIR;
  final CarbonFile metadataFolder = (hadoopConf == null)
      ? FileFactory.getCarbonFile(metadataDir)
      : FileFactory.getCarbonFile(metadataDir, hadoopConf);
  final CarbonFile[] candidates = metadataFolder.listFiles(new CarbonFileFilter() {
    @Override
    public boolean accept(CarbonFile candidate) {
      return candidate.getName().startsWith(SCHEMA_FILE);
    }
  });
  final boolean schemaFileFound = candidates != null && candidates.length > 0;
  // the file type is only looked up when a candidate exists
  if (schemaFileFound && FileFactory.getFileType(tablePath) != FileFactory.FileType.ALLUXIO) {
    return candidates[0].getAbsolutePath();
  }
  return metadataDir + CarbonCommonConstants.FILE_SEPARATOR + SCHEMA_FILE;
}
/**
 * Return the path of the table status file, located in the metadata folder.
 *
 * @param tablePath path to table files
 * @return table status file path
 */
public static String getTableStatusFilePath(String tablePath) {
  String metadataDir = getMetadataPath(tablePath);
  return metadataDir + CarbonCommonConstants.FILE_SEPARATOR + TABLE_STATUS_FILE;
}
/**
 * Return the path of the table status file with the uuid appended after an underscore.
 * When the uuid is empty the plain table status file path is returned.
 *
 * @param tablePath path to table files
 * @param uuid suffix to append, must not be null
 * @return table status file path
 */
public static String getTableStatusFilePathWithUUID(String tablePath, String uuid) {
  final boolean hasUuid = !uuid.isEmpty();
  final String statusFilePath = getTableStatusFilePath(tablePath);
  if (hasUuid) {
    return statusFilePath + CarbonCommonConstants.UNDERSCORE + uuid;
  }
  return statusFilePath;
}
/**
 * Below method will be used to get the index file present in the segment folder
 * based on task id. When several files match, the first one of the listing is returned.
 *
 * @param tablePath path to table files
 * @param taskId task id of the file
 * @param segmentId segment number
 * @param bucketNumber bucket number; "-1" means the bucket number is not part of the
 *                     file name and only the task id is matched
 * @return full qualified carbon index path
 * @throws RuntimeException if no matching index file is found in the segment folder
 */
private static String getCarbonIndexFilePath(final String tablePath, final String taskId,
    final String segmentId, final String bucketNumber) {
  String segmentDir = getSegmentPath(tablePath, segmentId);
  CarbonFile carbonFile =
      FileFactory.getCarbonFile(segmentDir);
  // the expected name prefix does not depend on the listed file, so build it once
  final String namePrefix =
      bucketNumber.equals("-1") ? taskId : taskId + DASH + bucketNumber;
  CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
    @Override
    public boolean accept(CarbonFile file) {
      return file.getName().startsWith(namePrefix) && file.getName().endsWith(INDEX_FILE_EXT);
    }
  });
  // The listing is treated as nullable elsewhere in this class (see getActualSchemaFilePath),
  // so guard against null here too and report it as a missing index file instead of an NPE.
  if (files != null && files.length > 0) {
    return files[0].getAbsolutePath();
  } else {
    throw new RuntimeException("Missing Carbon index file for Segment[" + segmentId + "], "
        + "taskId[" + taskId + "]");
  }
}
/**
 * Below method will be used to get the carbon index file path inside the segment folder.
 * Only format version V3 is supported.
 *
 * @param tablePath path to table files
 * @param taskId task id
 * @param segmentId segment id
 * @param bucketNumber bucket number, has to be parsable as an integer
 * @param timeStamp timestamp
 * @param columnarFormatVersion format version of the file
 * @return carbon index file path
 * @throws UnsupportedOperationException for any format version other than V3
 */
public static String getCarbonIndexFilePath(String tablePath, String taskId, String segmentId,
    String bucketNumber, String timeStamp, ColumnarFormatVersion columnarFormatVersion) {
  switch (columnarFormatVersion) {
    case V3:
      break;
    default:
      throw new UnsupportedOperationException(
          "Unsupported file version: " + columnarFormatVersion);
  }
  String segmentFolder = getSegmentPath(tablePath, segmentId);
  String indexFileName =
      getCarbonIndexFileName(taskId, Integer.parseInt(bucketNumber), timeStamp, segmentId);
  return segmentFolder + CarbonCommonConstants.FILE_SEPARATOR + indexFileName;
}
/**
 * Build the index file name from its dash separated parts:
 * taskNo, bucketNumber (left out when it is -1), segmentNo and timestamp,
 * followed by the index file extension.
 *
 * @param taskNo task number
 * @param bucketNumber bucket number, -1 to omit it from the name
 * @param factUpdatedTimestamp time stamp
 * @param segmentNo segment number
 * @return index file name
 */
private static String getCarbonIndexFileName(String taskNo, int bucketNumber,
    String factUpdatedTimestamp, String segmentNo) {
  StringBuilder indexFileName = new StringBuilder();
  indexFileName.append(taskNo).append(DASH);
  if (bucketNumber != -1) {
    indexFileName.append(bucketNumber).append(DASH);
  }
  indexFileName.append(segmentNo).append(DASH);
  indexFileName.append(factUpdatedTimestamp);
  indexFileName.append(INDEX_FILE_EXT);
  return indexFileName.toString();
}
/**
 * Return the segment folder path: the partition directory followed by the segment prefix
 * and the segment id.
 *
 * @param tablePath path to table files
 * @param segmentId segment id
 * @return segment folder path
 */
public static String getSegmentPath(String tablePath, String segmentId) {
  String partitionDir = getPartitionDir(tablePath);
  String segmentFolderName = SEGMENT_PREFIX + segmentId;
  return partitionDir + CarbonCommonConstants.FILE_SEPARATOR + segmentFolderName;
}
/**
 * Gets data file name only, without parent path.
 *
 * Start from CarbonData 2.0, the data file name pattern is:
 * partNo-taskNo-batchNo-bucketNo-segmentNo-timestamp.compressor.carbondata
 * For example:
 * part-0-0_batchno0-0-0-1580982686749.zstd.carbondata
 *
 * If the compressor name is missing, the file is compressed by snappy, which is
 * the default compressor in CarbonData 1.x
 *
 * @throws NullPointerException if filePartNo, taskNo, factUpdateTimeStamp or compressor is null
 */
public static String getCarbonDataFileName(Integer filePartNo, String taskNo, int bucketNumber,
    int batchNo, String factUpdateTimeStamp, String segmentNo, String compressor) {
  Objects.requireNonNull(filePartNo);
  Objects.requireNonNull(taskNo);
  Objects.requireNonNull(factUpdateTimeStamp);
  Objects.requireNonNull(compressor);
  String taskAndBatch = taskNo + BATCH_PREFIX + batchNo;
  String baseName = DATA_PART_PREFIX + filePartNo + DASH + taskAndBatch + DASH + bucketNumber
      + DASH + segmentNo + DASH + factUpdateTimeStamp;
  return baseName + POINT + compressor + CARBON_DATA_EXT;
}
/**
 * Build the shard name: taskNo directly followed by the batch prefix and batchNo, then
 * bucketNumber, segmentNo and timestamp, each separated by a dash.
 *
 * @return shard name
 */
public static String getShardName(String taskNo, int bucketNumber, int batchNo,
    String factUpdateTimeStamp, String segmentNo) {
  return new StringBuilder()
      .append(taskNo).append(BATCH_PREFIX).append(batchNo).append(DASH)
      .append(bucketNumber).append(DASH)
      .append(segmentNo).append(DASH)
      .append(factUpdateTimeStamp)
      .toString();
}
/**
 * Below method will be used to get the carbon index filename, which is the shard name
 * followed by the index file extension.
 *
 * @param taskNo task number
 * @param bucketNumber bucket number
 * @param batchNo batch number
 * @param factUpdatedTimeStamp time stamp
 * @param segmentNo segment number
 * @return filename
 */
public static String getCarbonIndexFileName(String taskNo, int bucketNumber, int batchNo,
    String factUpdatedTimeStamp, String segmentNo) {
  String shardName =
      getShardName(taskNo, bucketNumber, batchNo, factUpdatedTimeStamp, segmentNo);
  return shardName + INDEX_FILE_EXT;
}
/**
 * Return the index file name used for streaming, built with task, bucket, batch,
 * timestamp and segment all set to zero.
 *
 * @return stream index file name
 */
public static String getCarbonStreamIndexFileName() {
  final String taskNo = "0";
  final int bucketNumber = 0;
  final int batchNo = 0;
  final String timeStamp = "0";
  final String segmentNo = "0";
  return getCarbonIndexFileName(taskNo, bucketNumber, batchNo, timeStamp, segmentNo);
}
/**
 * Return the path of the stream index file inside the given segment folder.
 *
 * @param segmentDir segment folder path
 * @return stream index file path
 */
public static String getCarbonStreamIndexFilePath(String segmentDir) {
  String indexFileName = getCarbonStreamIndexFileName();
  return segmentDir + CarbonCommonConstants.FILE_SEPARATOR + indexFileName;
}
// This partition is not used in any code logic, just keep backward compatibility
public static final String DEPRECATED_PARTITION_ID = "0";
/**
 * Return the partition directory below the fact folder. Its name is the partition prefix
 * followed by the deprecated partition id.
 *
 * @param tablePath path to table files
 * @return partition directory path
 */
public static String getPartitionDir(String tablePath) {
  String factDir = getFactDir(tablePath);
  String partitionFolderName = PARTITION_PREFIX + CarbonTablePath.DEPRECATED_PARTITION_ID;
  return factDir + CarbonCommonConstants.FILE_SEPARATOR + partitionFolderName;
}
/**
 * Return the fact folder path of the table.
 *
 * @param tablePath path to table files
 * @return fact folder path
 */
public static String getFactDir(String tablePath) {
  return new StringBuilder()
      .append(tablePath)
      .append(CarbonCommonConstants.FILE_SEPARATOR)
      .append(FACT_DIR)
      .toString();
}
/**
 * Return the streaming log folder, located below the streaming folder of the table.
 *
 * @param tablePath path to table files
 * @return streaming log folder path
 */
public static String getStreamingLogDir(String tablePath) {
  String streamingDir = tablePath + CarbonCommonConstants.FILE_SEPARATOR + STREAMING_DIR;
  return streamingDir + CarbonCommonConstants.FILE_SEPARATOR + STREAMING_LOG_DIR;
}
/**
 * Return the streaming checkpoint folder, located below the streaming folder of the table.
 *
 * @param tablePath path to table files
 * @return streaming checkpoint folder path
 */
public static String getStreamingCheckpointDir(String tablePath) {
  String streamingDir = tablePath + CarbonCommonConstants.FILE_SEPARATOR + STREAMING_DIR;
  return streamingDir + CarbonCommonConstants.FILE_SEPARATOR + STREAMING_CHECKPOINT_DIR;
}
/**
 * Return store path for index based on the taskNo,if three tasks get launched during loading,
 * then three folders will be created based on the shard name and lucene index file will be
 * written into those folders
 *
 * @param tablePath path to table files
 * @param segmentId segment id
 * @param indexName name of the index
 * @param shardName shard name, appended as the last path element
 * @return store path based on index shard name
 */
public static String getIndexStorePathOnShardName(String tablePath, String segmentId,
    String indexName, String shardName) {
  String indexStorePath = getIndexesStorePath(tablePath, segmentId, indexName);
  return indexStorePath + CarbonCommonConstants.FILE_SEPARATOR + shardName;
}
/**
 * Return store path for index based on the indexName. The path consists of the table
 * path, the index name and the segment id, in that order.
 *
 * @param tablePath path to table files
 * @param segmentId segment id
 * @param indexName name of the index
 * @return store path based on index name
 */
public static String getIndexesStorePath(String tablePath, String segmentId,
    String indexName) {
  String indexDir = tablePath + CarbonCommonConstants.FILE_SEPARATOR + indexName;
  return indexDir + CarbonCommonConstants.FILE_SEPARATOR + segmentId;
}
/**
* To manage data file name and composition
*/
public static class DataFileUtil {
/**
 * Gets updated timestamp information from given carbon data file name.
 * The timestamp is the text between the last hyphen of the file name (the last
 * underscore for merge index files) and the next '.'.
 *
 * @param carbonDataFileName file name, optionally with its path
 * @return timestamp portion of the file name
 */
public static String getTimeStampFromFileName(String carbonDataFileName) {
  String fileName = getFileName(carbonDataFileName);
  boolean isMergeIndexFile =
      carbonDataFileName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT);
  int separatorPos = isMergeIndexFile
      ? fileName.lastIndexOf(CarbonCommonConstants.UNDERSCORE)
      : fileName.lastIndexOf(CarbonCommonConstants.HYPHEN);
  // + 1 to skip the separator itself
  int timestampStart = separatorPos + 1;
  int timestampEnd = fileName.indexOf(".", timestampStart);
  return fileName.substring(timestampStart, timestampEnd);
}
/**
 * Gets updated timestamp information from given carbon data file name
 * and compares with given timestamp. The timestamp of the file is the text between
 * the last dash and the following point.
 *
 * @param fileName carbon data file name
 * @param timestamp timestamp to compare with
 * @return true if the file name carries the given timestamp; false if it differs or
 *         if the dash or the point cannot be found at an index greater than zero
 */
public static Boolean compareCarbonFileTimeStamp(String fileName, Long timestamp) {
  int dashPos = fileName.lastIndexOf(DASH);
  int pointPos = fileName.indexOf(CarbonCommonConstants.POINT, dashPos);
  if (dashPos <= 0 || pointPos <= 0) {
    return false;
  }
  String fileTimestamp = fileName.substring(dashPos + 1, pointPos);
  return fileTimestamp.equals(timestamp.toString());
}
/**
 * Return the timestamp present in the delete delta file.
 * Delegates to getTimeStampFromFileName.
 *
 * @param fileName delete delta file name
 * @return timestamp portion of the file name
 */
public static String getTimeStampFromDeleteDeltaFile(String fileName) {
  String timestamp = getTimeStampFromFileName(fileName);
  return timestamp;
}
/**
 * Return the block name present in the delete delta file, which is everything
 * before the last hyphen of the given name.
 *
 * @param fileName delete delta file name, has to contain a hyphen
 * @return block name
 */
public static String getBlockNameFromDeleteDeltaFile(String fileName) {
  int lastHyphenPos = fileName.lastIndexOf(CarbonCommonConstants.HYPHEN);
  return fileName.substring(0, lastHyphenPos);
}
/**
 * Return the bucket number from given carbon file name, which is the text between
 * the third and the fourth dash of the file name.
 *
 * @param carbonFilePath file name, optionally with its path
 * @return bucket number, or "-1" if there is no dash after the start of that text
 */
public static String getBucketNo(String carbonFilePath) {
  // Get the file name from path
  String fileName = getFileName(carbonFilePath);
  // walk over the first three dashes
  int dashPos = -1;
  for (int skipped = 0; skipped < 3; skipped++) {
    dashPos = fileName.indexOf(DASH, dashPos + 1);
  }
  // + 1 for size of "-"
  int bucketStart = dashPos + 1;
  int bucketEnd = fileName.indexOf(DASH, bucketStart);
  // to support backward compatibility
  if (bucketEnd == -1) {
    return "-1";
  }
  return fileName.substring(bucketStart, bucketEnd);
}
/**
 * Return the file part number information from given carbon data file name, which is
 * the text between the first and the second dash of the file name.
 *
 * @param carbonDataFileName file name, optionally with its path
 * @return file part number
 */
public static String getPartNo(String carbonDataFileName) {
  // Get the file name from path
  String fileName = getFileName(carbonDataFileName);
  int firstDashPos = fileName.indexOf(DASH);
  // + 1 for size of "-"
  int partStart = firstDashPos + 1;
  int partEnd = fileName.indexOf(DASH, partStart);
  return fileName.substring(partStart, partEnd);
}
/**
 * Return the task number from given carbon data file name, which is the text between
 * the second and the third dash of the file name.
 *
 * @param carbonDataFileName file name, optionally with its path
 * @return task number
 */
public static String getTaskNo(String carbonDataFileName) {
  // Get the file name from path
  String fileName = getFileName(carbonDataFileName);
  // walk over the first two dashes
  int dashPos = -1;
  for (int skipped = 0; skipped < 2; skipped++) {
    dashPos = fileName.indexOf(DASH, dashPos + 1);
  }
  // + 1 for size of "-"
  int taskStart = dashPos + 1;
  int taskEnd = fileName.indexOf(DASH, taskStart);
  return fileName.substring(taskStart, taskEnd);
}
/**
 * Return task id in the carbon data file name: the part of the task number that
 * precedes the batch prefix, parsed as a long.
 *
 * @param carbonDataFileName file name, optionally with its path
 * @return task id
 */
public static long getTaskId(String carbonDataFileName) {
  String taskNo = getTaskNo(carbonDataFileName);
  String[] taskNoParts = taskNo.split(BATCH_PREFIX);
  return Long.parseLong(taskNoParts[0]);
}
/**
 * Return the segment number from given carbon data file name, which is the text between
 * the fourth and the fifth dash of the file name.
 *
 * @param carbonDataFileName file name, optionally with its path
 * @return segment number, or null if the fourth or the fifth dash is missing
 */
public static String getSegmentNo(String carbonDataFileName) {
  // Get the file name from path
  String fileName = getFileName(carbonDataFileName);
  // walk over the first four dashes
  int dashPos = -1;
  for (int skipped = 0; skipped < 4; skipped++) {
    dashPos = fileName.indexOf(DASH, dashPos + 1);
  }
  if (dashPos == -1) {
    return null;
  }
  // + 1 for size of "-"
  int segmentStart = dashPos + 1;
  int segmentEnd = fileName.indexOf(DASH, segmentStart);
  return segmentEnd == -1 ? null : fileName.substring(segmentStart, segmentEnd);
}
/**
 * Return the taskId part from taskNo(include taskId + batchNo), i.e. the part
 * in front of the batch prefix.
 *
 * @param taskNo task number
 * @return task id
 */
public static String getTaskIdFromTaskNo(String taskNo) {
  String[] taskNoParts = taskNo.split(BATCH_PREFIX);
  return taskNoParts[0];
}
/**
 * Return the batch number from taskNo string, i.e. the part that follows the batch
 * prefix, parsed as an integer.
 *
 * @param taskNo task number, has to contain the batch prefix
 * @return batch number
 */
public static int getBatchNoFromTaskNo(String taskNo) {
  String[] taskNoParts = taskNo.split(BATCH_PREFIX);
  String batchNo = taskNoParts[1];
  return Integer.parseInt(batchNo);
}
/**
 * Return the file name from file path, i.e. the text after the last file separator.
 *
 * @param dataFilePath file path
 * @return file name; the input itself if it contains no file separator
 */
public static String getFileName(String dataFilePath) {
  int separatorPos = dataFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
  if (separatorPos < 0) {
    return dataFilePath;
  }
  return dataFilePath.substring(separatorPos + 1);
}
/**
 * Gets segment id from given absolute data file path. The name of the folder that
 * directly contains the file is split at the underscore (segment_<id>) and the
 * second part is taken as the segment id.
 *
 * @param dataFileAbsolutePath absolute path of the data file; windows separators are accepted
 * @return segment id, or CarbonCommonConstants.INVALID_SEGMENT_ID if the path has no
 *         parent folder or the folder name does not consist of exactly two parts
 */
public static String getSegmentIdFromPath(String dataFileAbsolutePath) {
  // normalize windows separators so that only one kind of separator has to be searched
  String normalizedPath = dataFileAbsolutePath.replace(
      CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR);
  // find segment id from last of data file path
  int endIndex = normalizedPath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
  if (endIndex < 0) {
    // no parent folder at all; previously this ran into substring(0, -1) and threw
    return CarbonCommonConstants.INVALID_SEGMENT_ID;
  }
  // + 1 for size of "/"
  int startIndex = normalizedPath.lastIndexOf(
      CarbonCommonConstants.FILE_SEPARATOR, endIndex - 1) + 1;
  // cut from the same string the indexes were computed on
  String segmentDirStr = normalizedPath.substring(startIndex, endIndex);
  //identify id in segment_<id>
  String[] segmentDirSplits = segmentDirStr.split(CarbonCommonConstants.UNDERSCORE);
  if (segmentDirSplits.length == 2) {
    return segmentDirSplits[1];
  }
  return CarbonCommonConstants.INVALID_SEGMENT_ID;
}
}
/**
 * Return the carbondata file name without extension name: the text between the last
 * file separator and the last occurrence of the carbon data extension.
 *
 * @param carbonDataFilePath path of a carbon data file, has to contain the extension
 * @return file name without parent path and without the carbon data extension
 */
public static String getCarbonDataFileName(String carbonDataFilePath) {
  int nameStart = carbonDataFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 1;
  int extensionStart = carbonDataFilePath.lastIndexOf(CARBON_DATA_EXT);
  return carbonDataFilePath.substring(nameStart, extensionStart);
}
/**
 * Return the prefix every carbon data file name starts with.
 *
 * @return carbon data file name prefix
 */
public static String getCarbonDataPrefix() {
  return CarbonTablePath.DATA_PART_PREFIX;
}
/**
 * Return the extension of carbon data files.
 *
 * @return carbon data extension
 */
public static String getCarbonDataExtension() {
  return CarbonTablePath.CARBON_DATA_EXT;
}
/**
 * Return the extension of carbon index files.
 *
 * @return carbon index extension
 */
public static String getCarbonIndexExtension() {
  return CarbonTablePath.INDEX_FILE_EXT;
}
/**
 * Return the extension of carbon merge index files.
 *
 * @return carbon index merge file extension
 */
public static String getCarbonMergeIndexExtension() {
  return CarbonTablePath.MERGE_INDEX_FILE_EXT;
}
/**
 * This method will remove strings in path and return short block id. Every occurrence
 * of the partition prefix, the segment prefix, the data part prefix and the carbon data
 * extension is removed, in that order.
 *
 * @param blockId block id
 * @return shortBlockId
 */
public static String getShortBlockId(String blockId) {
  String shortBlockId = blockId;
  String[] removableTokens =
      {PARTITION_PREFIX, SEGMENT_PREFIX, DATA_PART_PREFIX, CARBON_DATA_EXT};
  for (String token : removableTokens) {
    shortBlockId = shortBlockId.replace(token, "");
  }
  return shortBlockId;
}
/**
 * This method will remove strings in path and return short block id. Only the data
 * part prefix and the carbon data extension are removed.
 *
 * @param blockId block id
 * @return shortBlockId
 */
public static String getShortBlockIdForPartitionTable(String blockId) {
  String withoutPartPrefix = blockId.replace(DATA_PART_PREFIX, "");
  return withoutPartPrefix.replace(CARBON_DATA_EXT, "");
}
/**
 * Adds data part prefix to given value.
 *
 * @param value value to be prefixed
 * @return value with the data part prefix in front
 */
public static String addDataPartPrefix(String value) {
  return new StringBuilder(DATA_PART_PREFIX).append(value).toString();
}
/**
 * Adds partition prefix to given value.
 *
 * @param value value to be prefixed
 * @return value with the partition prefix in front
 */
public static String addPartPrefix(String value) {
  return new StringBuilder(PARTITION_PREFIX).append(value).toString();
}
/**
 * Adds segment prefix to given value.
 *
 * @param value value to be prefixed
 * @return value with the segment prefix in front
 */
public static String addSegmentPrefix(String value) {
  return new StringBuilder(SEGMENT_PREFIX).append(value).toString();
}
/**
 * Unique task name
 *
 * A shard name is composed by `TaskNo-BucketNo-SegmentNo-Timestamp`
 * As for data before version 1.4, shard name was `TaskNo-BucketNo-Timestamp`
 *
 * @param actualBlockName carbon data file name the parts are extracted from
 * @return shard name; the segment number is left out when it cannot be found in the name
 */
public static String getShardName(String actualBlockName) {
  String segmentNo = DataFileUtil.getSegmentNo(actualBlockName);
  StringBuilder shardName = new StringBuilder()
      .append(DataFileUtil.getTaskNo(actualBlockName)).append(DASH)
      .append(DataFileUtil.getBucketNo(actualBlockName)).append(DASH);
  // data before version 1.4 does not have SegmentNo in carbondata filename
  if (segmentNo != null) {
    shardName.append(segmentNo).append(DASH);
  }
  return shardName
      .append(DataFileUtil.getTimeStampFromFileName(actualBlockName))
      .toString();
}
/**
 * Get the segment file locations of table, a folder inside the metadata folder.
 *
 * @param tablePath path to table files
 * @return segment files folder path
 */
public static String getSegmentFilesLocation(String tablePath) {
  String metadataDir = getMetadataPath(tablePath);
  return metadataDir + CarbonCommonConstants.FILE_SEPARATOR + "segments";
}
/**
 * Get the segment file path of table, located in the segment files folder.
 *
 * @param tablePath path to table files
 * @param segmentFileName name of the segment file
 * @return segment file path
 */
public static String getSegmentFilePath(String tablePath, String segmentFileName) {
  String segmentFilesDir = getSegmentFilesLocation(tablePath);
  return segmentFilesDir + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName;
}
/**
 * Get the lock files directory, located directly below the table path.
 *
 * @param tablePath path to table files
 * @return lock files directory path
 */
public static String getLockFilesDirPath(String tablePath) {
  return new StringBuilder()
      .append(tablePath)
      .append(CarbonCommonConstants.FILE_SEPARATOR)
      .append(LOCK_DIR)
      .toString();
}
/**
 * Get the lock file, located in the lock files directory and named after the lock type.
 *
 * @param tablePath path to table files
 * @param lockType lock type, used as file name
 * @return lock file path
 */
public static String getLockFilePath(String tablePath, String lockType) {
  String lockDir = getLockFilesDirPath(tablePath);
  return lockDir + CarbonCommonConstants.FILE_SEPARATOR + lockType;
}
/**
 * Return true if this lock file is a segment lock file otherwise false.
 * A segment lock file name starts with the segment prefix and ends with LockUsage.LOCK.
 *
 * @param lockFileName name of the lock file
 * @return true for a segment lock file
 */
public static boolean isSegmentLockFilePath(String lockFileName) {
  if (!lockFileName.startsWith(SEGMENT_PREFIX)) {
    return false;
  }
  return lockFileName.endsWith(LockUsage.LOCK);
}
/**
 * Return table status history file path based on `tablePath`. The file is located
 * in the metadata folder.
 *
 * @param tablePath path to table files
 * @return table status history file path
 */
public static String getTableStatusHistoryFilePath(String tablePath) {
  String metadataDir = getMetadataPath(tablePath);
  return metadataDir + CarbonCommonConstants.FILE_SEPARATOR + TABLE_STATUS_HISTORY_FILE;
}
/**
 * Build the bad records folder of a task. For a transactional table the task folder is
 * placed below a folder named after the segment id, otherwise below a folder with the
 * fixed name "SdkWriterBadRecords".
 *
 * @param badLogStoreLocation root location of the bad records
 * @param segmentId segment id, only used for transactional tables
 * @param taskNo task number, used as the last path element
 * @param isTransactionalTable whether the table is transactional
 * @return bad records path
 */
public static String generateBadRecordsPath(String badLogStoreLocation, String segmentId,
    String taskNo, boolean isTransactionalTable) {
  String parentFolderName = isTransactionalTable ? segmentId : "SdkWriterBadRecords";
  return badLogStoreLocation + CarbonCommonConstants.FILE_SEPARATOR + parentFolderName
      + CarbonCommonConstants.FILE_SEPARATOR + taskNo;
}
/**
 * Return the parent path of the input file.
 * For example, if input file path is /user/warehouse/t1/file.carbondata
 * then return will be /user/warehouse/t1
 *
 * @param dataFilePath file path
 * @return text before the last file separator; the input itself if it has no file separator
 */
public static String getParentPath(String dataFilePath) {
  int separatorPos = dataFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
  return separatorPos > -1 ? dataFilePath.substring(0, separatorPos) : dataFilePath;
}
}