/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.processing.util;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.block.Distributable;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.locks.CarbonLockUtil;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnIdentifier;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
import static org.apache.carbondata.core.enums.EscapeSequences.BACKSPACE;
import static org.apache.carbondata.core.enums.EscapeSequences.CARRIAGE_RETURN;
import static org.apache.carbondata.core.enums.EscapeSequences.NEW_LINE;
import static org.apache.carbondata.core.enums.EscapeSequences.TAB;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
public final class CarbonLoaderUtil {
private static final Logger LOGGER =
LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());
private CarbonLoaderUtil() {
}
/**
* strategy for assigning blocks to nodes/executors
*/
public enum BlockAssignmentStrategy {
BLOCK_NUM_FIRST("Assign blocks to node based on number of blocks"),
BLOCK_SIZE_FIRST("Assign blocks to node based on data size of blocks"),
NODE_MIN_SIZE_FIRST("Assign blocks to node based on minimum size of inputs");
private String name;
BlockAssignmentStrategy(String name) {
this.name = name;
}
@Override
public String toString() {
return this.getClass().getSimpleName() + ':' + this.name;
}
}
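/**
* Delete the segment folder of the given load (identified by its segment id) under the
* table path.
*/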
public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
String segmentPath = CarbonTablePath.getSegmentPath(
loadModel.getTablePath(), currentLoad + "");
deleteStorePath(segmentPath);
}
/**
* Returns true if the segment contains a carbondata or carbonindex file, false otherwise.
*
* @param loadModel load model
* @param currentLoad segment id of the current load
* @return true if the segment contains data or index files
*/
public static boolean isValidSegment(CarbonLoadModel loadModel,
int currentLoad) {
String segmentPath = CarbonTablePath.getSegmentPath(
loadModel.getTablePath(), currentLoad + "");
CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile file) {
return file.getName().endsWith(CarbonTablePath.getCarbonIndexExtension())
|| file.getName().endsWith(CarbonTablePath.getCarbonDataExtension());
}
});
return files.length > 0;
}
public static void deleteStorePath(String path) {
try {
if (FileFactory.isFileExist(path)) {
CarbonFile carbonFile = FileFactory.getCarbonFile(path);
CarbonUtil.deleteFoldersAndFiles(carbonFile);
}
} catch (IOException | InterruptedException e) {
LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e);
}
}
/**
* This API will write the load level metadata for the load management module in order to
* manage the load and query execution smoothly.
*
* @param newMetaEntry
* @param loadModel
* @return boolean which determines whether status update is done or not.
* @throws IOException
*/
public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
throws IOException {
return recordNewLoadMetadata(newMetaEntry, loadModel, loadStartEntry, insertOverwrite, "");
}
/**
* Append a new load metadata entry into the table status file
*
* @param loadModel load model
* @return boolean which determines whether status update is done or not
* @throws IOException
*/
public static boolean recordNewLoadMetadata(CarbonLoadModel loadModel) throws IOException {
LoadMetadataDetails newLoadMetaEntry = new LoadMetadataDetails();
loadModel.setFactTimeStamp(System.currentTimeMillis());
CarbonLoaderUtil.populateNewLoadMetaEntry(
newLoadMetaEntry,
SegmentStatus.INSERT_IN_PROGRESS,
loadModel.getFactTimeStamp(),
false);
return recordNewLoadMetadata(newLoadMetaEntry, loadModel, true, false);
}
/**
* This API deletes the content of a non-transactional table when insert overwrite is set to true.
*
* @param loadModel
* @throws IOException
*/
public static void deleteNonTransactionalTableForInsertOverwrite(final CarbonLoadModel loadModel)
throws IOException {
// We need to delete the content of the table path folder except the
// newly added files.
List<String> filesToBeDeleted = new ArrayList<>();
CarbonFile carbonFile = FileFactory.getCarbonFile(loadModel.getTablePath());
CarbonFile[] filteredList = carbonFile.listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile file) {
return !file.getName().contains(loadModel.getFactTimeStamp() + "");
}
});
for (CarbonFile file : filteredList) {
filesToBeDeleted.add(file.getAbsolutePath());
}
deleteFiles(filesToBeDeleted);
}
/**
* This API will write the load level metadata for the load management module in order to
* manage the load and query execution smoothly.
*
* @param newMetaEntry
* @param loadModel
* @param uuid
* @return boolean which determines whether status update is done or not.
* @throws IOException
*/
public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
final CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid)
throws IOException {
// For non-transactional tables there is no need to update the table status file.
if (!loadModel.isCarbonTransactionalTable()) {
return true;
}
return recordNewLoadMetadata(newMetaEntry, loadModel, loadStartEntry, insertOverwrite, uuid,
new ArrayList<Segment>(), new ArrayList<Segment>());
}
/**
* This API will write the load level metadata for the load management module in order to
* manage the load and query execution smoothly.
*
* @param newMetaEntry
* @param loadModel
* @param uuid
* @return boolean which determines whether status update is done or not.
* @throws IOException
*/
public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid,
List<Segment> segmentsToBeDeleted, List<Segment> segmentFilesTobeUpdated) throws IOException {
boolean status = false;
AbsoluteTableIdentifier identifier =
loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
if (loadModel.isCarbonTransactionalTable()) {
String metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath());
if (!FileFactory.isFileExist(metadataPath)) {
FileFactory.mkdirs(metadataPath);
}
}
String tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
int retryCount = CarbonLockUtil
.getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
int maxTimeout = CarbonLockUtil
.getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
try {
if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
LOGGER.info(
"Acquired lock for table " + loadModel.getDatabaseName() + "." + loadModel.getTableName()
+ " for table status update");
LoadMetadataDetails[] listOfLoadFolderDetailsArray =
SegmentStatusManager.readLoadMetadata(
CarbonTablePath.getMetadataPath(identifier.getTablePath()));
List<LoadMetadataDetails> listOfLoadFolderDetails =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<CarbonFile> staleFolders = new ArrayList<>();
Collections.addAll(listOfLoadFolderDetails, listOfLoadFolderDetailsArray);
// create a new segment Id if load has just begun else add the already generated Id
if (loadStartEntry) {
String segmentId =
String.valueOf(SegmentStatusManager.createNewSegmentId(listOfLoadFolderDetailsArray));
loadModel.setLoadMetadataDetails(listOfLoadFolderDetails);
LoadMetadataDetails entryTobeRemoved = null;
if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildTableForMV()
&& !loadModel.getSegmentId().isEmpty()) {
for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
if (entry.getLoadName().equalsIgnoreCase(loadModel.getSegmentId())) {
newMetaEntry.setLoadName(loadModel.getSegmentId());
newMetaEntry.setExtraInfo(entry.getExtraInfo());
entryTobeRemoved = entry;
}
}
} else {
newMetaEntry.setLoadName(segmentId);
loadModel.setSegmentId(segmentId);
}
listOfLoadFolderDetails.remove(entryTobeRemoved);
// Exception should be thrown if:
// 1. If insert overwrite is in progress and any other load or insert operation
// is triggered
// 2. If load or insert into operation is in progress and insert overwrite operation
// is triggered
for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
if (entry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
&& SegmentStatusManager.isLoadInProgress(
identifier, entry.getLoadName())) {
throw new RuntimeException("Already insert overwrite is in progress");
} else if (newMetaEntry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
&& entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
&& SegmentStatusManager.isLoadInProgress(
identifier, entry.getLoadName())) {
throw new RuntimeException("Already insert into or load is in progress");
}
}
listOfLoadFolderDetails.add(newMetaEntry);
} else {
newMetaEntry.setLoadName(String.valueOf(loadModel.getSegmentId()));
// existing entry needs to be overwritten as the entry will exist with some
// intermediate status
int indexToOverwriteNewMetaEntry = 0;
boolean found = false;
for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
if (entry.getLoadName().equals(newMetaEntry.getLoadName())
&& entry.getLoadStartTime() == newMetaEntry.getLoadStartTime()) {
newMetaEntry.setExtraInfo(entry.getExtraInfo());
found = true;
break;
}
indexToOverwriteNewMetaEntry++;
}
if (insertOverwrite) {
for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
if (entry.getSegmentStatus() != SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
// For insert overwrite, we will delete the old segment folder immediately
// So collect the old segments here
addToStaleFolders(identifier, staleFolders, entry);
}
}
}
if (!found) {
LOGGER.error("Entry not found to update " + newMetaEntry + " From list :: "
+ listOfLoadFolderDetails);
throw new IOException("Entry not found to update in the table status file");
}
listOfLoadFolderDetails.set(indexToOverwriteNewMetaEntry, newMetaEntry);
}
// when no records are inserted, the status of newMetaEntry will be
// SegmentStatus.MARKED_FOR_DELETE, so the empty segment folder should be deleted
if (newMetaEntry.getSegmentStatus() == SegmentStatus.MARKED_FOR_DELETE) {
addToStaleFolders(identifier, staleFolders, newMetaEntry);
}
for (LoadMetadataDetails detail: listOfLoadFolderDetails) {
// if the segment is in the marked-for-delete list then update its status.
if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName()))) {
detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
} else if (segmentFilesTobeUpdated
.contains(Segment.toSegment(detail.getLoadName(), null))) {
detail.setSegmentFile(
detail.getLoadName() + "_" + newMetaEntry.getUpdateStatusFileName()
+ CarbonTablePath.SEGMENT_EXT);
}
}
SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
.toArray(new LoadMetadataDetails[0]));
// Delete all old stale segment folders
for (CarbonFile staleFolder : staleFolders) {
// the try block is inside the for loop so that even if deletion of one stale folder
// fails, the remaining stale folders are still deleted
try {
CarbonUtil.deleteFoldersAndFiles(staleFolder);
} catch (IOException | InterruptedException e) {
LOGGER.error("Failed to delete stale folder: " + e.getMessage(), e);
}
}
status = true;
} else {
LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
.getDatabaseName() + "." + loadModel.getTableName());
}
} finally {
if (carbonLock.unlock()) {
LOGGER.info(
"Table unlocked successfully after table status update for table "
+ loadModel.getDatabaseName() + "." + loadModel.getTableName());
} else {
LOGGER.error(
"Unable to unlock table lock for table " + loadModel.getDatabaseName() + "." + loadModel
.getTableName() + " during table status update");
}
}
return status;
}
private static void addToStaleFolders(AbsoluteTableIdentifier identifier,
List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException {
String path = CarbonTablePath.getSegmentPath(
identifier.getTablePath(), entry.getLoadName());
// add to the deletion list only if the file exists, otherwise the HDFS file system
// will throw an exception while deleting a non-existent path
if (FileFactory.isFileExist(path)) {
staleFolders.add(FileFactory.getCarbonFile(path));
}
}
/**
* Method to create new entry for load in table status file
*
* @param loadMetadataDetails entry to be populated
* @param loadStatus segment status of the load
* @param loadStartTime start time of the load
* @param addLoadEndTime whether the load end time should also be set
*/
public static void populateNewLoadMetaEntry(LoadMetadataDetails loadMetadataDetails,
SegmentStatus loadStatus, long loadStartTime, boolean addLoadEndTime) {
if (addLoadEndTime) {
long loadEndDate = CarbonUpdateUtil.readCurrentTime();
loadMetadataDetails.setLoadEndTime(loadEndDate);
}
loadMetadataDetails.setSegmentStatus(loadStatus);
loadMetadataDetails.setLoadStartTime(loadStartTime);
}
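/**
* Returns true if the given escape character is one of the supported escape sequences
* (newline, carriage return, tab or backspace).
*/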
public static boolean isValidEscapeSequence(String escapeChar) {
return escapeChar.equalsIgnoreCase(NEW_LINE.getName()) ||
escapeChar.equalsIgnoreCase(CARRIAGE_RETURN.getName()) ||
escapeChar.equalsIgnoreCase(TAB.getName()) ||
escapeChar.equalsIgnoreCase(BACKSPACE.getName());
}
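/**
* Returns true if the given binary decoder option is supported, i.e. base64, hex or
* blank/unset.
*/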
public static boolean isValidBinaryDecoder(String binaryDecoderChar) {
return CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_BASE64.equalsIgnoreCase(
binaryDecoderChar) ||
CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_HEX.equalsIgnoreCase(
binaryDecoderChar) ||
StringUtils.isBlank(binaryDecoderChar);
}
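/**
* Translate a symbolic escape sequence name into the actual escape character; if the input
* does not match any known sequence it is returned unchanged.
*/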
public static String getEscapeChar(String escapeCharacter) {
if (escapeCharacter.equalsIgnoreCase(NEW_LINE.getName())) {
return NEW_LINE.getEscapeChar();
} else if (escapeCharacter.equalsIgnoreCase(BACKSPACE.getName())) {
return BACKSPACE.getEscapeChar();
} else if (escapeCharacter.equalsIgnoreCase(TAB.getName())) {
return TAB.getEscapeChar();
} else if (escapeCharacter.equalsIgnoreCase(CARRIAGE_RETURN.getName())) {
return CARRIAGE_RETURN.getEscapeChar();
}
return escapeCharacter;
}
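/**
* Record a new in-progress load entry (INSERT_IN_PROGRESS or INSERT_OVERWRITE_IN_PROGRESS)
* in the table status file, setting the fact timestamp on the model if not yet set.
*/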
public static void readAndUpdateLoadProgressInTableMeta(CarbonLoadModel model,
boolean insertOverwrite, String uuid) throws IOException {
LoadMetadataDetails newLoadMetaEntry = new LoadMetadataDetails();
SegmentStatus status = SegmentStatus.INSERT_IN_PROGRESS;
if (insertOverwrite) {
status = SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS;
}
// reading the start time of data load.
if (model.getFactTimeStamp() == 0) {
long loadStartTime = CarbonUpdateUtil.readCurrentTime();
model.setFactTimeStamp(loadStartTime);
}
CarbonLoaderUtil
.populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp(), false);
boolean entryAdded = CarbonLoaderUtil
.recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite, uuid);
if (!entryAdded) {
throw new IOException("Dataload failed due to failure in table status updation for "
+ model.getTableName());
}
}
public static void readAndUpdateLoadProgressInTableMeta(CarbonLoadModel model,
boolean insertOverwrite) throws IOException {
readAndUpdateLoadProgressInTableMeta(model, insertOverwrite, "");
}
/**
* This method will update the load failure entry in the table status file
*/
public static void updateTableStatusForFailure(CarbonLoadModel model, String uuid)
throws IOException {
// in case of failure the load status should be "Marked for delete" so that it will be
// taken care of during clean up
SegmentStatus loadStatus = SegmentStatus.MARKED_FOR_DELETE;
// the last entry in the load metadata details is always the current load entry
LoadMetadataDetails loadMetaEntry = model.getCurrentLoadMetadataDetail();
if (loadMetaEntry == null) {
return;
}
CarbonLoaderUtil
.populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp(), true);
boolean entryAdded = CarbonLoaderUtil.recordNewLoadMetadata(
loadMetaEntry, model, false, false, uuid);
if (!entryAdded) {
throw new IOException(
"Failed to update failure entry in table status for " + model.getTableName());
}
}
/**
* This method will update the load failure entry in the table status file with empty uuid.
*/
public static void updateTableStatusForFailure(CarbonLoadModel model)
throws IOException {
updateTableStatusForFailure(model, "");
}
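/**
* Fetch the dictionary for the given column from the reverse dictionary cache.
*/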
public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier)
throws IOException {
Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =
CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY);
return dictCache.get(columnIdentifier);
}
public static Dictionary getDictionary(AbsoluteTableIdentifier absoluteTableIdentifier,
ColumnIdentifier columnIdentifier, DataType dataType)
throws IOException {
return getDictionary(
new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, columnIdentifier, dataType));
}
/**
* This method will divide the blocks among the tasks of the nodes as per the data locality
*
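* <p>Illustrative example (hypothetical numbers): with 8 blocks located on 2 nodes and
* parallelism 4, each node gets 4 / 2 = 2 task lists, and the node's blocks are divided
* round-robin among those lists.
*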
* @param blockInfos blocks to be distributed
* @param noOfNodesInput -1 if number of nodes has to be decided
* based on block location information
* @param parallelism total no of tasks to execute in parallel
* @param activeNode list of active nodes in the cluster
* @return mapping of node to the block lists of its tasks
*/
public static Map<String, List<List<Distributable>>> nodeBlockTaskMapping(
List<Distributable> blockInfos, int noOfNodesInput, int parallelism,
List<String> activeNode) {
Map<String, List<Distributable>> mapOfNodes =
CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode,
BlockAssignmentStrategy.BLOCK_NUM_FIRST, null);
int taskPerNode = parallelism / mapOfNodes.size();
// assign a non-zero value to noOfTasksPerNode
int noOfTasksPerNode = taskPerNode == 0 ? 1 : taskPerNode;
// divide the blocks of a node among the tasks of the node.
return assignBlocksToTasksPerNode(mapOfNodes, noOfTasksPerNode);
}
/**
* This method will divide the blocks among the nodes as per the data locality
* @param activeNodes list of all active nodes in the cluster; the mapping is done based
* on these and the actual nodes where the blocks are present
* @param blockInfos blocks to be distributed
* @return node to blocks mapping
*/
public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
int noOfNodesInput, List<String> activeNodes) {
return nodeBlockMapping(blockInfos, noOfNodesInput, activeNodes,
BlockAssignmentStrategy.BLOCK_NUM_FIRST, null);
}
/**
* This method will divide the blocks among the nodes as per the data locality
*
* @param blockInfos blocks to be distributed
* @return node to blocks mapping
*/
public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos) {
// -1 if number of nodes has to be decided based on block location information
return nodeBlockMapping(blockInfos, -1, null,
BlockAssignmentStrategy.BLOCK_NUM_FIRST, null);
}
/**
* This method will divide the blocks among the nodes as per the data locality
*
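* <p>Illustrative sizing (hypothetical numbers): for BLOCK_NUM_FIRST with 10 blocks and
* 3 nodes, sizePerNode = 10 / 3 = 3 blocks per node; for BLOCK_SIZE_FIRST with 768 MB of
* unique block data and 3 nodes, sizePerNode = 256 MB per node.
*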
* @param blockInfos blocks to be distributed
* @param numOfNodesInput -1 if number of nodes has to be decided
* based on block location information
* @param activeNodes list of active nodes in the cluster
* @param blockAssignmentStrategy strategy used to assign blocks
* @param expectedMinSizePerNode the property load_min_size_inmb specified by the user
* @return a map that maps node to blocks
*/
public static Map<String, List<Distributable>> nodeBlockMapping(
List<Distributable> blockInfos, int numOfNodesInput, List<String> activeNodes,
BlockAssignmentStrategy blockAssignmentStrategy, String expectedMinSizePerNode) {
ArrayList<NodeMultiBlockRelation> rtnNode2Blocks = new ArrayList<>();
Set<Distributable> uniqueBlocks = new HashSet<>(blockInfos);
ArrayList<NodeMultiBlockRelation> originNode2Blocks = createNode2BlocksMapping(blockInfos);
Set<String> nodes = new HashSet<>(originNode2Blocks.size());
for (NodeMultiBlockRelation relation : originNode2Blocks) {
nodes.add(relation.getNode());
}
int numOfNodes = (-1 == numOfNodesInput) ? nodes.size() : numOfNodesInput;
if (null != activeNodes) {
numOfNodes = activeNodes.size();
}
// calculate the average expected size for each node
long sizePerNode = 0;
long totalFileSize = 0;
if (BlockAssignmentStrategy.BLOCK_NUM_FIRST == blockAssignmentStrategy) {
if (blockInfos.size() > 0) {
sizePerNode = blockInfos.size() / numOfNodes;
}
sizePerNode = sizePerNode <= 0 ? 1 : sizePerNode;
} else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy
|| BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST == blockAssignmentStrategy) {
for (Distributable blockInfo : uniqueBlocks) {
totalFileSize += ((TableBlockInfo) blockInfo).getBlockLength();
}
sizePerNode = totalFileSize / numOfNodes;
}
// if enabled, control the minimum amount of input data for each node
if (BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST == blockAssignmentStrategy) {
long expectedMinSizePerNodeInt = 0;
// validate the property load_min_size_inmb specified by the user
if (CarbonUtil.validateValidIntType(expectedMinSizePerNode)) {
expectedMinSizePerNodeInt = Integer.parseInt(expectedMinSizePerNode);
} else {
LOGGER.warn("Invalid load_min_size_inmb value found: " + expectedMinSizePerNode
+ ", only int value greater than 0 is supported.");
expectedMinSizePerNodeInt = Integer.parseInt(
CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT);
}
// If the average expected size for each node is greater than the load min size,
// then fall back to the default strategy
if (expectedMinSizePerNodeInt * 1024 * 1024 < sizePerNode) {
if (CarbonProperties.getInstance().isLoadSkewedDataOptimizationEnabled()) {
blockAssignmentStrategy = BlockAssignmentStrategy.BLOCK_SIZE_FIRST;
} else {
blockAssignmentStrategy = BlockAssignmentStrategy.BLOCK_NUM_FIRST;
// falling back to the BLOCK_NUM_FIRST strategy requires resetting
// the average expected size for each node
if (numOfNodes == 0) {
sizePerNode = 1;
} else {
sizePerNode = blockInfos.size() / numOfNodes;
sizePerNode = sizePerNode <= 0 ? 1 : sizePerNode;
}
}
LOGGER.info("Specified minimum data size to load is less than the average size "
+ "for each node, fallback to default strategy" + blockAssignmentStrategy);
} else {
sizePerNode = expectedMinSizePerNodeInt;
}
}
if (BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST == blockAssignmentStrategy) {
// assign blocks to each node ignoring data locality
assignBlocksIgnoreDataLocality(rtnNode2Blocks, sizePerNode, uniqueBlocks, activeNodes);
} else {
// assign blocks to each node
assignBlocksByDataLocality(rtnNode2Blocks, sizePerNode, uniqueBlocks, originNode2Blocks,
activeNodes, blockAssignmentStrategy);
}
// if any blocks remain then assign them to nodes in round robin.
assignLeftOverBlocks(rtnNode2Blocks, uniqueBlocks, sizePerNode, activeNodes,
blockAssignmentStrategy);
// convert the node-to-blocks relations into a map
Map<String, List<Distributable>> rtnNodeBlocksMap =
new HashMap<String, List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
for (NodeMultiBlockRelation relation : rtnNode2Blocks) {
rtnNodeBlocksMap.put(relation.getNode(), relation.getBlocks());
}
return rtnNodeBlocksMap;
}
/**
* Assigning the blocks of a node to tasks.
*
* @param nodeBlocksMap nodeName to list of blocks mapping
* @param noOfTasksPerNode number of tasks per node
* @return node to task-block lists mapping
*/
private static Map<String, List<List<Distributable>>> assignBlocksToTasksPerNode(
Map<String, List<Distributable>> nodeBlocksMap, int noOfTasksPerNode) {
Map<String, List<List<Distributable>>> outputMap =
new HashMap<String, List<List<Distributable>>>(
CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
// for each node
for (Map.Entry<String, List<Distributable>> eachNode : nodeBlocksMap.entrySet()) {
List<Distributable> blockOfEachNode = eachNode.getValue();
// sort the blocks so the same block will be given to the same executor
Collections.sort(blockOfEachNode);
// create the task list for each node.
createTaskListForNode(outputMap, noOfTasksPerNode, eachNode.getKey());
// take all the blocks of the node and divide them among the tasks of the node.
divideBlockToTasks(outputMap, eachNode.getKey(), blockOfEachNode);
}
return outputMap;
}
/**
* This will divide the blocks of a node to tasks of the node.
*
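* <p>For example (hypothetical numbers), 5 blocks over 2 task lists are distributed by
* index modulo: task 0 receives blocks 0, 2, 4 and task 1 receives blocks 1, 3.
*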
* @param outputMap node name to task lists mapping
* @param key node name
* @param blockOfEachNode blocks of the node
*/
private static void divideBlockToTasks(Map<String, List<List<Distributable>>> outputMap,
String key, List<Distributable> blockOfEachNode) {
List<List<Distributable>> taskLists = outputMap.get(key);
int tasksOfNode = taskLists.size();
int i = 0;
for (Distributable block : blockOfEachNode) {
taskLists.get(i % tasksOfNode).add(block);
i++;
}
}
/**
* This will create the empty list for each task of a node.
*
* @param outputMap node name to task lists mapping
* @param noOfTasksPerNode number of tasks per node
* @param key node name
*/
private static void createTaskListForNode(Map<String, List<List<Distributable>>> outputMap,
int noOfTasksPerNode, String key) {
List<List<Distributable>> nodeTaskList =
new ArrayList<List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
for (int i = 0; i < noOfTasksPerNode; i++) {
List<Distributable> eachTask =
new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
nodeTaskList.add(eachTask);
}
outputMap.put(key, nodeTaskList);
}
/**
* If any leftover data blocks are present then assign them to nodes in a round-robin way.
* This will not obey the data locality.
*/
private static void assignLeftOverBlocks(ArrayList<NodeMultiBlockRelation> outputMap,
Set<Distributable> leftOverBlocks, long expectedSizePerNode, List<String> activeNodes,
BlockAssignmentStrategy blockAssignmentStrategy) {
Map<String, Integer> node2Idx = new HashMap<>(outputMap.size());
for (int idx = 0; idx < outputMap.size(); idx++) {
node2Idx.put(outputMap.get(idx).getNode(), idx);
}
// iterate all the nodes and try to allocate blocks to the nodes
if (activeNodes != null) {
for (String activeNode : activeNodes) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Second assignment iteration: assign for executor: " + activeNode);
}
Integer idx;
List<Distributable> blockLst;
if (node2Idx.containsKey(activeNode)) {
idx = node2Idx.get(activeNode);
blockLst = outputMap.get(idx).getBlocks();
} else {
idx = node2Idx.size();
blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
}
populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, blockAssignmentStrategy);
if (!node2Idx.containsKey(activeNode) && blockLst.size() > 0) {
outputMap.add(idx, new NodeMultiBlockRelation(activeNode, blockLst));
node2Idx.put(activeNode, idx);
}
}
} else {
for (NodeMultiBlockRelation entry : outputMap) {
List<Distributable> blockLst = entry.getBlocks();
populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, blockAssignmentStrategy);
}
}
// if there are still blocks left, allocate them to the nodes in a round-robin manner
assignBlocksUseRoundRobin(outputMap, leftOverBlocks, blockAssignmentStrategy);
}
/**
* assign remaining blocks to nodes
*
* @param remainingBlocks blocks to be allocated
* @param expectedSizePerNode expected size for each node
* @param blockLst destination for the blocks to be allocated
* @param blockAssignmentStrategy block assignment strategy
*/
private static void populateBlocks(Set<Distributable> remainingBlocks,
long expectedSizePerNode, List<Distributable> blockLst,
BlockAssignmentStrategy blockAssignmentStrategy) {
switch (blockAssignmentStrategy) {
case BLOCK_NUM_FIRST:
populateBlocksByNum(remainingBlocks, expectedSizePerNode, blockLst);
break;
case BLOCK_SIZE_FIRST:
case NODE_MIN_SIZE_FIRST:
populateBlocksBySize(remainingBlocks, expectedSizePerNode, blockLst);
break;
default:
throw new IllegalArgumentException(
"Unsupported block assignment strategy: " + blockAssignmentStrategy);
}
}
/**
* Take distributable blocks from {@code remainingBlocks} and add them to the output
* {@code blockLst} until the number of blocks in {@code blockLst} reaches
* {@code expectedSizePerNode}.
*/
private static void populateBlocksByNum(Set<Distributable> remainingBlocks,
long expectedSizePerNode, List<Distributable> blockLst) {
Iterator<Distributable> blocks = remainingBlocks.iterator();
// if the node already has the expected number of blocks then avoid assigning extra blocks
if (blockLst.size() == expectedSizePerNode) {
return;
}
while (blocks.hasNext()) {
Distributable block = blocks.next();
blockLst.add(block);
blocks.remove();
if (blockLst.size() >= expectedSizePerNode) {
break;
}
}
}
/**
* Take distributable blocks from {@code remainingBlocks} and add them to the output
* {@code blockLst} until the accumulated block size of {@code blockLst} reaches
* {@code expectedSizePerNode}.
*/
private static void populateBlocksBySize(Set<Distributable> remainingBlocks,
long expectedSizePerNode, List<Distributable> blockLst) {
Iterator<Distributable> blocks = remainingBlocks.iterator();
// if the node already holds the average node size then avoid assigning extra blocks
long fileSize = 0;
for (Distributable block : blockLst) {
fileSize += ((TableBlockInfo) block).getBlockLength();
}
if (fileSize >= expectedSizePerNode) {
LOGGER.debug("Capacity is full, skip allocate blocks on this node");
return;
}
while (blocks.hasNext()) {
Distributable block = blocks.next();
long thisBlockSize = ((TableBlockInfo) block).getBlockLength();
if (fileSize < expectedSizePerNode) {
// `fileSize==0` means there are no blocks assigned to this node before
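// the 1.1 factor tolerates up to 10% overflow of the expected size, so a block that
// slightly exceeds the remaining capacity can still be placed on this node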
if (fileSize == 0 || fileSize + thisBlockSize <= expectedSizePerNode * 1.1D) {
blockLst.add(block);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Second Assignment iteration: "
+ ((TableBlockInfo) block).getFilePath() + "-"
+ ((TableBlockInfo) block).getBlockLength() + "-->currentNode");
}
fileSize += thisBlockSize;
blocks.remove();
}
} else {
break;
}
}
}
/**
* allocate the blocks in a round-robin manner
*/
private static void assignBlocksUseRoundRobin(ArrayList<NodeMultiBlockRelation> node2Blocks,
Set<Distributable> remainingBlocks, BlockAssignmentStrategy blockAssignmentStrategy) {
switch (blockAssignmentStrategy) {
case BLOCK_NUM_FIRST:
roundRobinAssignBlocksByNum(node2Blocks, remainingBlocks);
break;
case BLOCK_SIZE_FIRST:
case NODE_MIN_SIZE_FIRST:
roundRobinAssignBlocksBySize(node2Blocks, remainingBlocks);
break;
default:
throw new IllegalArgumentException("Unsupported block assignment strategy: "
+ blockAssignmentStrategy);
}
}
private static void roundRobinAssignBlocksByNum(ArrayList<NodeMultiBlockRelation> outputMap,
Set<Distributable> remainingBlocks) {
for (NodeMultiBlockRelation relation: outputMap) {
Iterator<Distributable> blocks = remainingBlocks.iterator();
if (blocks.hasNext()) {
Distributable block = blocks.next();
List<Distributable> blockLst = relation.getBlocks();
blockLst.add(block);
blocks.remove();
}
}
}
private static void roundRobinAssignBlocksBySize(ArrayList<NodeMultiBlockRelation> outputMap,
Set<Distributable> remainingBlocks) {
Iterator<Distributable> blocks = remainingBlocks.iterator();
while (blocks.hasNext()) {
// sort the allocated node-to-blocks relations in ascending order of total data size;
// the first one is the smallest, so we assign this block to it.
Collections.sort(outputMap, NodeMultiBlockRelation.DATA_SIZE_ASC_COMPARATOR);
Distributable block = blocks.next();
List<Distributable> blockLst = outputMap.get(0).getBlocks();
blockLst.add(block);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("RoundRobin assignment iteration: "
+ ((TableBlockInfo) block).getFilePath() + "-"
+ ((TableBlockInfo) block).getBlockLength() + "-->" + outputMap.get(0).getNode());
}
blocks.remove();
}
}
/**
* allocate distributable blocks to nodes based on data locality
*/
private static void assignBlocksByDataLocality(
ArrayList<NodeMultiBlockRelation> outputNode2Blocks,
long expectedSizePerNode, Set<Distributable> remainingBlocks,
List<NodeMultiBlockRelation> inputNode2Blocks, List<String> activeNodes,
BlockAssignmentStrategy blockAssignmentStrategy) {
if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy) {
// sort nodes based on data size of all blocks per node, so that nodes having bigger size
// are assigned first
Collections.sort(inputNode2Blocks, NodeMultiBlockRelation.DATA_SIZE_DESC_COMPARATOR);
} else {
// sort nodes based on number of blocks per node, so that nodes having lesser blocks
// are assigned first
Collections.sort(inputNode2Blocks);
}
Map<String, Integer> executor2Idx = new HashMap<>();
for (NodeMultiBlockRelation nodeMultiBlockRelation : inputNode2Blocks) {
String nodeName = nodeMultiBlockRelation.getNode();
// assign the block to the node only if the node is active
String activeExecutor = nodeName;
if (null != activeNodes) {
activeExecutor = getActiveExecutor(activeNodes, nodeName);
if (null == activeExecutor) {
continue;
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("First Assignment iteration: assign for executor: " + activeExecutor);
}
List<Distributable> blocksInThisNode = nodeMultiBlockRelation.getBlocks();
if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy) {
// sort blocks based on block size, so that bigger blocks will be assigned first
Collections.sort(blocksInThisNode, TableBlockInfo.DATA_SIZE_DESC_COMPARATOR);
}
long nodeCapacity = 0;
// loop thru blocks of each Node
for (Distributable block : nodeMultiBlockRelation.getBlocks()) {
if (!remainingBlocks.contains(block)) {
// this block has been added before
continue;
}
// this is the first time to add block to this node, initialize it
if (!executor2Idx.containsKey(activeExecutor)) {
Integer idx = executor2Idx.size();
outputNode2Blocks.add(idx, new NodeMultiBlockRelation(activeExecutor,
new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)));
executor2Idx.put(activeExecutor, idx);
}
// assign this block to this node if node has capacity left
if (BlockAssignmentStrategy.BLOCK_NUM_FIRST == blockAssignmentStrategy) {
if (nodeCapacity < expectedSizePerNode) {
Integer idx = executor2Idx.get(activeExecutor);
List<Distributable> infos = outputNode2Blocks.get(idx).getBlocks();
infos.add(block);
nodeCapacity++;
if (LOGGER.isDebugEnabled()) {
try {
LOGGER.debug("First Assignment iteration: block("
+ StringUtils.join(block.getLocations(), ", ")
+ ")-->" + activeExecutor);
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
}
remainingBlocks.remove(block);
} else {
// No need to continue loop as node is full
break;
}
} else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy) {
long thisBlockSize = ((TableBlockInfo) block).getBlockLength();
// `nodeCapacity == 0` means no block has been assigned to this node yet, so even a
// huge block that exceeds the `expectedSize` of the node is accepted here; otherwise
// it would only be assigned in the final round-robin iteration.
if (nodeCapacity == 0 || nodeCapacity < expectedSizePerNode) {
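// the 1.05 factor tolerates up to 5% overflow of the expected size, so a block that
// slightly exceeds the remaining capacity can still be placed on this node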
if (nodeCapacity == 0 || nodeCapacity + thisBlockSize <= expectedSizePerNode * 1.05D) {
Integer idx = executor2Idx.get(activeExecutor);
List<Distributable> blocks = outputNode2Blocks.get(idx).getBlocks();
blocks.add(block);
nodeCapacity += thisBlockSize;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"First Assignment iteration: " + ((TableBlockInfo) block).getFilePath() + '-'
+ ((TableBlockInfo) block).getBlockLength() + "-->" + activeExecutor);
}
remainingBlocks.remove(block);
}
// this block is too big for the current node but there is still capacity left
// for smaller blocks, so continue to allocate blocks on this node in the next iteration.
} else {
// No need to continue loop as node is full
break;
}
} else {
throw new IllegalArgumentException(
"Unsupported block assignment strategy: " + blockAssignmentStrategy);
}
}
}
}
/**
* allocate distributable blocks to nodes ignoring data locality
*/
private static void assignBlocksIgnoreDataLocality(
ArrayList<NodeMultiBlockRelation> outputNode2Blocks,
long expectedSizePerNode, Set<Distributable> remainingBlocks,
List<String> activeNodes) {
// get all blocks
Set<Distributable> uniqueBlocks = new HashSet<>(remainingBlocks);
// shuffle activeNodes to ignore data locality
List<String> shuffleNodes = new ArrayList<>(activeNodes);
Collections.shuffle(shuffleNodes);
for (String activeNode : shuffleNodes) {
long nodeCapacity = 0;
NodeMultiBlockRelation nodeBlock = new NodeMultiBlockRelation(activeNode,
new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE));
// loop thru blocks of each Node
for (Distributable block : uniqueBlocks) {
if (!remainingBlocks.contains(block)) {
// this block has been added before
continue;
}
long thisBlockSize = ((TableBlockInfo) block).getBlockLength();
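// expectedSizePerNode comes from load_min_size_inmb and is therefore in MB here,
// so it is converted to bytes before comparing against the block length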
if (nodeCapacity == 0
|| nodeCapacity + thisBlockSize <= expectedSizePerNode * 1024 * 1024) {
nodeBlock.getBlocks().add(block);
nodeCapacity += thisBlockSize;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"First Assignment iteration: " + ((TableBlockInfo) block).getFilePath() + '-'
+ ((TableBlockInfo) block).getBlockLength() + "-->" + activeNode);
}
remainingBlocks.remove(block);
// this block is too big for the current node but there is still capacity left
// for smaller blocks, so continue to allocate blocks on this node in the next iteration.
} else {
// No need to continue loop as node is full
break;
}
}
if (nodeBlock.getBlocks().size() != 0) {
outputNode2Blocks.add(nodeBlock);
}
}
}
/**
* Resolves the given node name against the list of active nodes.
*
* @param activeNode list of active nodes
* @param nodeName node name to resolve
* @return the matching active node name, host name or host address if the node is active,
* null otherwise
*/
private static String getActiveExecutor(List<String> activeNode, String nodeName) {
boolean isActiveNode = activeNode.contains(nodeName);
if (isActiveNode) {
return nodeName;
}
// if localhost then retrieve the localhost name and then do the check
else if (nodeName.equals("localhost")) {
try {
String hostName = InetAddress.getLocalHost().getHostName();
isActiveNode = activeNode.contains(hostName);
if (isActiveNode) {
return hostName;
}
} catch (UnknownHostException ue) {
isActiveNode = false;
}
} else {
try {
String hostAddress = InetAddress.getByName(nodeName).getHostAddress();
isActiveNode = activeNode.contains(hostAddress);
if (isActiveNode) {
return hostAddress;
}
} catch (UnknownHostException ue) {
isActiveNode = false;
}
}
return null;
}
/**
* Create node to blocks mapping
*
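* <p>Note: a block whose replicas reside on several nodes appears in the relation of each
* of those nodes; duplicates are resolved later during assignment via the unique block set.
*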
* @param blockInfos input block info
*/
private static ArrayList<NodeMultiBlockRelation> createNode2BlocksMapping(
List<Distributable> blockInfos) {
Map<String, Integer> node2Idx = new HashMap<>();
ArrayList<NodeMultiBlockRelation> node2Blocks = new ArrayList<>();
for (Distributable blockInfo : blockInfos) {
try {
for (final String eachNode : blockInfo.getLocations()) {
if (node2Idx.containsKey(eachNode)) {
Integer idx = node2Idx.get(eachNode);
List<Distributable> blocks = node2Blocks.get(idx).getBlocks();
blocks.add(blockInfo);
} else {
// add blocks to this node for the first time
Integer idx = node2Idx.size();
List<Distributable> blocks = new ArrayList<>();
blocks.add(blockInfo);
node2Blocks.add(idx, new NodeMultiBlockRelation(eachNode, blocks));
node2Idx.put(eachNode, idx);
}
}
} catch (IOException e) {
throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
}
}
return node2Blocks;
}
/**
* This method will create the segment folder for the given segment id if it does not exist
*/
public static void checkAndCreateCarbonDataLocation(String segmentId, CarbonTable carbonTable) {
String segmentFolder = CarbonTablePath.getSegmentPath(
carbonTable.getTablePath(), segmentId);
CarbonUtil.checkAndCreateFolder(segmentFolder);
}
/**
* This method will add data size and index size into tablestatus for each segment. It also
* returns the total size of the segment.
*/
public static Long addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails,
String segmentId, CarbonTable carbonTable) throws IOException {
Map<String, Long> dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(
carbonTable.getTablePath(),
new Segment(segmentId, loadMetadataDetails.getSegmentFile()));
Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
loadMetadataDetails.setDataSize(String.valueOf(dataSize));
Long indexSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE);
loadMetadataDetails.setIndexSize(String.valueOf(indexSize));
return dataSize + indexSize;
}
public static void addIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails,
String segmentId, CarbonTable carbonTable) throws IOException {
Segment segment = new Segment(segmentId, loadMetadataDetails.getSegmentFile());
if (segment.getSegmentFileName() != null) {
SegmentFileStore fileStore =
new SegmentFileStore(carbonTable.getTablePath(), segment.getSegmentFileName());
if (fileStore.getLocationMap() != null) {
fileStore.readIndexFiles(FileFactory.getConfiguration());
long carbonIndexSize = CarbonUtil.getCarbonIndexSize(fileStore, fileStore.getLocationMap());
loadMetadataDetails.setIndexSize(String.valueOf(carbonIndexSize));
}
}
}
/**
* Merge index files within the segment of a partitioned table
*
* @param table carbon table
* @param segmentId id of the segment whose index files are to be merged
* @param uuid uuid for the operation
* @param partitionPath partition path
* @return
* @throws IOException
*/
public static String mergeIndexFilesInPartitionedSegment(CarbonTable table, String segmentId,
String uuid, String partitionPath) throws IOException {
String tablePath = table.getTablePath();
return new CarbonIndexFileMergeWriter(table)
.mergeCarbonIndexFilesOfSegment(segmentId, uuid, tablePath, partitionPath);
}
private static void deleteFiles(List<String> filesToBeDeleted) throws IOException {
for (String filePath : filesToBeDeleted) {
FileFactory.deleteFile(filePath);
}
}
/**
* Update the status of the specified load segment to MARKED_FOR_DELETE in case of failure
*/
public static void updateTableStatusInCaseOfFailure(String loadName,
AbsoluteTableIdentifier absoluteTableIdentifier, String tableName, String databaseName,
String tablePath, String metaDataPath) throws IOException {
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
try {
if (carbonLock.lockWithRetries()) {
LOGGER.info("Acquired lock for table" + databaseName + "." + tableName
+ " for table status updation");
LoadMetadataDetails[] loadMetadataDetails =
SegmentStatusManager.readLoadMetadata(metaDataPath);
boolean ifTableStatusUpdateRequired = false;
for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
if (loadMetadataDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS && loadName
.equalsIgnoreCase(loadMetadataDetail.getLoadName())) {
loadMetadataDetail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
ifTableStatusUpdateRequired = true;
}
}
if (ifTableStatusUpdateRequired) {
SegmentStatusManager
.writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(tablePath),
loadMetadataDetails);
}
} else {
LOGGER.error(
"Unable to acquire the lock for table status update for table " + databaseName + "."
+ tableName);
}
} finally {
if (carbonLock.unlock()) {
LOGGER.info("Table unlocked successfully after table status updation" + databaseName + "."
+ tableName);
} else {
LOGGER.error("Unable to unlock Table lock for table" + databaseName + "." + tableName
+ " during table status updation");
}
}
}
}