blob: ccb1e2a964b0ad7508d9bbc5c2ca8d93f6841033 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.secondaryindex.load;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.util.CarbonLoaderUtil;
import org.apache.log4j.Logger;
import org.apache.spark.sql.index.CarbonIndexUtil;
/**
 * Helper methods for maintaining the table status (load metadata) of secondary index tables
 * during load and compaction.
 */
public class CarbonInternalLoaderUtil {

  private static final Logger LOGGER =
      LogServiceFactory.getLogService(CarbonInternalLoaderUtil.class.getName());

  /**
   * Returns the load names of all segments whose status is SUCCESS, LOAD_PARTIAL_SUCCESS or
   * MARKED_FOR_UPDATE, in the order they appear in {@code details}.
   *
   * @param details table status entries to scan
   * @return mutable list of valid segment load names (possibly empty)
   */
  public static List<String> getListOfValidSlices(LoadMetadataDetails[] details) {
    List<String> activeSlices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    for (LoadMetadataDetails oneLoad : details) {
      SegmentStatus segmentStatus = oneLoad.getSegmentStatus();
      if (SegmentStatus.SUCCESS.equals(segmentStatus)
          || SegmentStatus.LOAD_PARTIAL_SUCCESS.equals(segmentStatus)
          || SegmentStatus.MARKED_FOR_UPDATE.equals(segmentStatus)) {
        activeSlices.add(oneLoad.getLoadName());
      }
    }
    return activeSlices;
  }

  /**
   * Returns the mapping of valid segments to their segment load start time.
   *
   * <p>Only segments with status SUCCESS or LOAD_PARTIAL_SUCCESS are included; unlike
   * {@link #getListOfValidSlices}, MARKED_FOR_UPDATE segments are not part of the result.
   *
   * @param details table status entries to scan
   * @return map from segment load name to its load start time
   */
  public static Map<String, Long> getSegmentToLoadStartTimeMapping(LoadMetadataDetails[] details) {
    Map<String, Long> segmentToLoadStartTimeMap = new HashMap<>(details.length);
    for (LoadMetadataDetails oneLoad : details) {
      SegmentStatus segmentStatus = oneLoad.getSegmentStatus();
      if (SegmentStatus.SUCCESS.equals(segmentStatus)
          || SegmentStatus.LOAD_PARTIAL_SUCCESS.equals(segmentStatus)) {
        segmentToLoadStartTimeMap.put(oneLoad.getLoadName(), oneLoad.getLoadStartTime());
      }
    }
    return segmentToLoadStartTimeMap;
  }

  /**
   * Writes the load level metadata (table status) so that load and query execution can be
   * managed consistently.
   *
   * <p>While holding the table status lock of {@code carbonTable}, the current status entries are
   * merged with {@code newLoadMetadataDetails}: an existing entry is replaced by the new entry with
   * the same load name, and new entries whose load name does not exist yet are appended. If
   * {@code CarbonIndexUtil.getSecondaryIndexes(carbonTable)} is non-empty, the status file of
   * every table in {@code indexCarbonTables} is rewritten from the merged list; otherwise, if
   * {@code carbonTable} is itself an index table, its own status file is rewritten.
   *
   * @param newLoadMetadataDetails load details to merge into the current table status
   * @param validSegments segment ids used to build the index table status entries
   * @param carbonTable table whose status file is read and whose status lock is taken
   * @param indexCarbonTables index tables whose status files may be rewritten
   * @param databaseName database name, used for logging
   * @param tableName table name, used for logging and to find the index table whose segments
   *     may already be compacted
   * @return true if the status update completed; false if the lock could not be acquired, the
   *     new segments are already compacted in the index table, or an I/O error occurred
   */
  public static boolean recordLoadMetadata(List<LoadMetadataDetails> newLoadMetadataDetails,
      List<String> validSegments, CarbonTable carbonTable, List<CarbonTable> indexCarbonTables,
      String databaseName, String tableName) {
    boolean status = false;
    String metaDataFilepath = carbonTable.getMetadataPath();
    AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier();
    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
    try {
      if (carbonLock.lockWithRetries()) {
        LOGGER.info("Acquired lock for table " + databaseName + "." + tableName
            + " for table status updation");
        if (isSegmentsAlreadyCompactedForNewMetaDataDetails(indexCarbonTables, tableName,
            newLoadMetadataDetails)) {
          return false;
        }
        LoadMetadataDetails[] currentLoadMetadataDetails =
            SegmentStatusManager.readLoadMetadata(metaDataFilepath);
        List<LoadMetadataDetails> updatedLoadMetadataDetails =
            mergeLoadMetadataDetails(currentLoadMetadataDetails, newLoadMetadataDetails);
        List<String> indexTables = CarbonIndexUtil.getSecondaryIndexes(carbonTable);
        if (!indexTables.isEmpty()) {
          List<LoadMetadataDetails> newSegmentDetailsListForIndexTable =
              new ArrayList<>(validSegments.size());
          for (String segmentId : validSegments) {
            LoadMetadataDetails newSegmentDetailsObject = new LoadMetadataDetails();
            newSegmentDetailsObject.setLoadName(segmentId);
            newSegmentDetailsListForIndexTable.add(newSegmentDetailsObject);
          }
          for (CarbonTable indexTable : indexCarbonTables) {
            List<LoadMetadataDetails> indexTableDetailsList = CarbonIndexUtil
                .getTableStatusDetailsForIndexTable(updatedLoadMetadataDetails, indexTable,
                    newSegmentDetailsListForIndexTable);
            SegmentStatusManager.writeLoadDetailsIntoFile(
                CarbonTablePath.getTableStatusFilePath(indexTable.getTablePath()),
                indexTableDetailsList.toArray(new LoadMetadataDetails[0]));
          }
        } else if (carbonTable.isIndexTable()) {
          SegmentStatusManager.writeLoadDetailsIntoFile(
              metaDataFilepath + CarbonCommonConstants.FILE_SEPARATOR
                  + CarbonTablePath.TABLE_STATUS_FILE,
              updatedLoadMetadataDetails.toArray(new LoadMetadataDetails[0]));
        }
        status = true;
      } else {
        LOGGER.error(
            "Not able to acquire the lock for Table status updation for table " + databaseName
                + "." + tableName);
      }
    } catch (IOException e) {
      // The lock was acquired at this point; the failure comes from reading/writing table status.
      LOGGER.error("Failed to update table status for table " + databaseName + "." + tableName,
          e);
    } finally {
      // unlock() is called unconditionally (as before) so that any resources held by a failed
      // lock attempt are released as well.
      if (carbonLock.unlock()) {
        LOGGER.info("Table unlocked successfully after table status updation " + databaseName
            + "." + tableName);
      } else {
        LOGGER.error("Unable to unlock Table lock for table " + databaseName + "." + tableName
            + " during table status updation");
      }
    }
    return status;
  }

  /**
   * Merges {@code newDetails} into {@code currentDetails}. Every current entry whose load name
   * matches a new entry is replaced, in place, by the first new entry with that name. New entries
   * whose load name is not present in {@code currentDetails} are appended in their original order.
   */
  private static List<LoadMetadataDetails> mergeLoadMetadataDetails(
      LoadMetadataDetails[] currentDetails, List<LoadMetadataDetails> newDetails) {
    Map<String, LoadMetadataDetails> newDetailsByLoadName = new HashMap<>();
    for (LoadMetadataDetails newDetail : newDetails) {
      // keep the first match, as a linear search over newDetails would
      newDetailsByLoadName.putIfAbsent(newDetail.getLoadName(), newDetail);
    }
    Set<String> currentLoadNames = new HashSet<>();
    List<LoadMetadataDetails> merged =
        new ArrayList<>(currentDetails.length + newDetails.size());
    for (LoadMetadataDetails currentDetail : currentDetails) {
      currentLoadNames.add(currentDetail.getLoadName());
      LoadMetadataDetails replacement = newDetailsByLoadName.get(currentDetail.getLoadName());
      merged.add(replacement != null ? replacement : currentDetail);
    }
    for (LoadMetadataDetails newDetail : newDetails) {
      if (!currentLoadNames.contains(newDetail.getLoadName())) {
        merged.add(newDetail);
      }
    }
    return merged;
  }

  /**
   * Reads the table status of the index table named {@code indexTableName} and checks whether any
   * of the new load metadata details refer to a segment that is already COMPACTED there. If so,
   * compaction for the SI has already completed and updating it with the new segment status is
   * useless. This can happen when the index status is updated while loading failed segments; the
   * caller should then not update anything and exit gracefully.
   *
   * @return true if some new load name (compared case-insensitively) is COMPACTED in the index
   *     table; false otherwise, including when no table in {@code indexTables} has that name
   */
  private static boolean isSegmentsAlreadyCompactedForNewMetaDataDetails(
      List<CarbonTable> indexTables, String indexTableName,
      List<LoadMetadataDetails> newLoadMetadataDetails) {
    CarbonTable currentIndexTable = null;
    for (CarbonTable indexTable : indexTables) {
      if (indexTable.getTableName().equalsIgnoreCase(indexTableName)) {
        currentIndexTable = indexTable;
        break;
      }
    }
    if (null == currentIndexTable) {
      return false;
    }
    LoadMetadataDetails[] existingLoadMetaDataDetails =
        SegmentStatusManager.readLoadMetadata(currentIndexTable.getMetadataPath());
    for (LoadMetadataDetails existingLoadMetaDataDetail : existingLoadMetaDataDetails) {
      if (existingLoadMetaDataDetail.getSegmentStatus() != SegmentStatus.COMPACTED) {
        continue;
      }
      for (LoadMetadataDetails newLoadMetadataDetail : newLoadMetadataDetails) {
        if (existingLoadMetaDataDetail.getLoadName()
            .equalsIgnoreCase(newLoadMetadataDetail.getLoadName())) {
          return true;
        }
      }
    }
    return false;
  }

  /**
   * Updates the index table status after an IUD update delta compaction.
   *
   * <p>Every entry whose load name, or merged load name, is in {@code loadsToMerge} is marked
   * COMPACTED with {@code mergedLoadNumber} as its merged load name. Any existing entry named
   * {@code mergedLoadNumber} is then replaced by a new entry with {@code segmentStatus}. The
   * compaction is aborted (returns false, nothing written) if one of those entries is
   * MARKED_FOR_DELETE.
   *
   * @param indexCarbonTable index table whose status file is read and rewritten
   * @param loadsToMerge load names that took part in the compaction
   * @param mergedLoadNumber load name of the resulting merged segment
   * @param carbonLoadModel load model; its table's status lock is the one acquired
   * @param segmentToLoadStartTimeMap segment name to load start time, used for the segment file
   *     name of the merged segment
   * @param mergeLoadStartTime load start time for the merged entry if it was not rebuilt
   * @param segmentStatus status of the merged entry
   * @param newLoadStartTime load start time for the merged entry if it is in
   *     {@code rebuiltSegments}
   * @param rebuiltSegments segment names that were rebuilt
   * @return true if the status file was written, false otherwise
   * @throws IOException if reading or writing the table status fails
   */
  public static boolean updateLoadMetadataWithMergeStatus(CarbonTable indexCarbonTable,
      String[] loadsToMerge, String mergedLoadNumber, CarbonLoadModel carbonLoadModel,
      Map<String, String> segmentToLoadStartTimeMap, long mergeLoadStartTime,
      SegmentStatus segmentStatus, long newLoadStartTime, List<String> rebuiltSegments)
      throws IOException {
    boolean tableStatusUpdationStatus = false;
    Set<String> loadMergeSet = new HashSet<>(Arrays.asList(loadsToMerge));
    // NOTE(review): the lock is taken on the load model's table, while the status file written
    // below belongs to indexCarbonTable; confirm both always refer to the same table.
    AbsoluteTableIdentifier absoluteTableIdentifier =
        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
    try {
      if (carbonLock.lockWithRetries()) {
        LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
            + carbonLoadModel.getTableName() + " for table status updation ");
        LoadMetadataDetails[] loadDetails =
            SegmentStatusManager.readLoadMetadata(indexCarbonTable.getMetadataPath());
        long modificationOrDeletionTimeStamp = CarbonUpdateUtil.readCurrentTime();
        for (LoadMetadataDetails loadDetail : loadDetails) {
          // check if this segment is merged.
          if (loadMergeSet.contains(loadDetail.getLoadName())
              || loadMergeSet.contains(loadDetail.getMergedLoadName())) {
            // if the compacted load is deleted after the start of the compaction process,
            // then need to discard the compaction process and treat it as failed compaction.
            if (loadDetail.getSegmentStatus() == SegmentStatus.MARKED_FOR_DELETE) {
              LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName()
                  + " is deleted after the compaction is started.");
              return false;
            }
            loadDetail.setSegmentStatus(SegmentStatus.COMPACTED);
            loadDetail.setModificationOrDeletionTimestamp(modificationOrDeletionTimeStamp);
            loadDetail.setMergedLoadName(mergedLoadNumber);
          }
        }
        // create entry for merged one.
        LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
        loadMetadataDetails.setSegmentStatus(segmentStatus);
        loadMetadataDetails.setLoadEndTime(CarbonUpdateUtil.readCurrentTime());
        loadMetadataDetails.setLoadName(mergedLoadNumber);
        // NOTE(review): if mergedLoadNumber has no entry in segmentToLoadStartTimeMap, the
        // timestamp part of the segment file name becomes the literal "null".
        loadMetadataDetails.setSegmentFile(SegmentFileStore.genSegmentFileName(mergedLoadNumber,
            String.valueOf(segmentToLoadStartTimeMap.get(mergedLoadNumber)))
            + CarbonTablePath.SEGMENT_EXT);
        CarbonLoaderUtil
            .addDataIndexSizeIntoMetaEntry(loadMetadataDetails, mergedLoadNumber, indexCarbonTable);
        if (rebuiltSegments.contains(loadMetadataDetails.getLoadName())) {
          loadMetadataDetails.setLoadStartTime(newLoadStartTime);
        } else {
          loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
        }
        // Put the merged folder entry in place of the existing entry with the same name.
        // NOTE(review): if no such entry exists, the merged entry is not added to the file.
        for (int i = 0; i < loadDetails.length; i++) {
          if (loadDetails[i].getLoadName().equals(loadMetadataDetails.getLoadName())) {
            loadDetails[i] = loadMetadataDetails;
          }
        }
        SegmentStatusManager.writeLoadDetailsIntoFile(
            CarbonTablePath.getTableStatusFilePath(indexCarbonTable.getTablePath()), loadDetails);
        tableStatusUpdationStatus = true;
      } else {
        LOGGER.error(
            "Could not obtain lock for table " + carbonLoadModel.getDatabaseName() + "."
                + carbonLoadModel.getTableName() + " for table status updation");
      }
    } finally {
      if (carbonLock.unlock()) {
        LOGGER.info("Table unlocked successfully after table status updation "
            + carbonLoadModel.getDatabaseName() + "." + carbonLoadModel.getTableName());
      } else {
        LOGGER.error(
            "Unable to unlock Table lock for table " + carbonLoadModel.getDatabaseName() + "."
                + carbonLoadModel.getTableName() + " during table status updation");
      }
    }
    return tableStatusUpdationStatus;
  }

  /**
   * Checks whether every valid segment of the main table is also a valid segment of the SI table.
   *
   * <p>Valid segments are those returned by {@link #getListOfValidSlices}. Returns false when the
   * SI table has fewer valid segments than the main table, or when some valid main table segment
   * is missing from the SI table. Extra SI segments are allowed.
   */
  public static boolean checkMainTableSegEqualToSISeg(
      LoadMetadataDetails[] mainTableLoadMetadataDetails,
      LoadMetadataDetails[] siTableLoadMetadataDetails) {
    List<String> mainTableSegmentsList = getListOfValidSlices(mainTableLoadMetadataDetails);
    List<String> indexList = getListOfValidSlices(siTableLoadMetadataDetails);
    // Fewer SI segments than main table segments: SI cannot cover the main table.
    if (indexList.size() < mainTableSegmentsList.size()) {
      return false;
    }
    // SI may have more segments than the main table; only require that every main table
    // segment is present in SI.
    return new HashSet<>(indexList).containsAll(mainTableSegmentsList);
  }

  /**
   * Checks whether the main table has an in-progress load whose segment is not present in SI.
   *
   * @return false if some main table segment, absent from the SI table status, is reported as
   *     in progress by {@code SegmentStatusManager.isLoadInProgress}; true otherwise
   */
  public static boolean checkInProgLoadInMainTableAndSI(CarbonTable carbonTable,
      LoadMetadataDetails[] mainTableLoadMetadataDetails,
      LoadMetadataDetails[] siTableLoadMetadataDetails) {
    Set<String> allSiSlices = new HashSet<>();
    for (LoadMetadataDetails oneLoad : siTableLoadMetadataDetails) {
      allSiSlices.add(oneLoad.getLoadName());
    }
    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
    for (LoadMetadataDetails loadDetail : mainTableLoadMetadataDetails) {
      // Only segments missing from SI matter; checking membership first avoids probing the
      // in-progress state of segments SI already has.
      if (!allSiSlices.contains(loadDetail.getLoadName())
          && SegmentStatusManager.isLoadInProgress(identifier, loadDetail.getLoadName())) {
        return false;
      }
    }
    return true;
  }
}