/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metadata;
import org.apache.hudi.avro.ConvertingGenericData;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.util.Lazy;
import org.apache.avro.AvroTypeException;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
import static org.apache.hudi.avro.HoodieAvroUtils.convertValueForSpecificDataTypes;
import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
/**
* A utility to convert timeline information to metadata table records.
*/
public class HoodieTableMetadataUtil {
private static final Logger LOG = LogManager.getLogger(HoodieTableMetadataUtil.class);
public static final String PARTITION_NAME_FILES = "files";
public static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
public static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
/**
* Returns whether the files partition of the metadata table is ready for reads.
*
* @param metaClient {@link HoodieTableMetaClient} instance.
* @return true if the files partition of the metadata table is ready for reads,
* based on the table config; false otherwise.
*/
public static boolean isFilesPartitionAvailable(HoodieTableMetaClient metaClient) {
return metaClient.getTableConfig().getMetadataPartitions()
.contains(HoodieTableMetadataUtil.PARTITION_NAME_FILES);
}
/**
* Collects {@link HoodieColumnRangeMetadata} for the provided collection of records, as if the
* provided records had been persisted within the given {@code filePath}
*
* @param records target records to compute column range metadata for
* @param targetFields columns (fields) to be collected
* @param filePath file path value required for {@link HoodieColumnRangeMetadata}
*
* @return map of {@link HoodieColumnRangeMetadata} for each of the provided target fields for
* the collection of provided records
*/
public static Map<String, HoodieColumnRangeMetadata<Comparable>> collectColumnRangeMetadata(List<IndexedRecord> records,
List<Schema.Field> targetFields,
String filePath) {
// Helper class to calculate column stats
class ColumnStats {
Object minValue;
Object maxValue;
long nullCount;
long valueCount;
}
HashMap<String, ColumnStats> allColumnStats = new HashMap<>();
// Collect stats for all columns by iterating through the records and updating
// the corresponding column stats
records.forEach((record) -> {
// For each column (field) to be indexed, update the corresponding column stats
// with the values from this record
targetFields.forEach(field -> {
ColumnStats colStats = allColumnStats.computeIfAbsent(field.name(), (ignored) -> new ColumnStats());
GenericRecord genericRecord = (GenericRecord) record;
final Object fieldVal = convertValueForSpecificDataTypes(field.schema(), genericRecord.get(field.name()), false);
final Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(genericRecord.getSchema(), field.name());
colStats.valueCount++;
if (fieldVal != null && canCompare(fieldSchema)) {
// Set the min value of the field
if (colStats.minValue == null
|| ConvertingGenericData.INSTANCE.compare(fieldVal, colStats.minValue, fieldSchema) < 0) {
colStats.minValue = fieldVal;
}
// Set the max value of the field
if (colStats.maxValue == null || ConvertingGenericData.INSTANCE.compare(fieldVal, colStats.maxValue, fieldSchema) > 0) {
colStats.maxValue = fieldVal;
}
} else {
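// NOTE: Values that are null, as well as values of non-comparable types (e.g. maps),
// are accounted for in the null count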
colStats.nullCount++;
}
});
});
Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, HoodieColumnRangeMetadata<Comparable>>> collector =
Collectors.toMap(colRangeMetadata -> colRangeMetadata.getColumnName(), Function.identity());
return (Map<String, HoodieColumnRangeMetadata<Comparable>>) targetFields.stream()
.map(field -> {
ColumnStats colStats = allColumnStats.get(field.name());
return HoodieColumnRangeMetadata.<Comparable>create(
filePath,
field.name(),
colStats == null ? null : coerceToComparable(field.schema(), colStats.minValue),
colStats == null ? null : coerceToComparable(field.schema(), colStats.maxValue),
colStats == null ? 0 : colStats.nullCount,
colStats == null ? 0 : colStats.valueCount,
// NOTE: Size and compressed size statistics are set to 0 to make sure we're not
// mixing up those provided by Parquet with the ones from other encodings,
// since those are not directly comparable
0,
0
);
})
.collect(collector);
}
/**
* Converts instance of {@link HoodieMetadataColumnStats} to {@link HoodieColumnRangeMetadata}
*/
public static HoodieColumnRangeMetadata<Comparable> convertColumnStatsRecordToColumnRangeMetadata(HoodieMetadataColumnStats columnStats) {
return HoodieColumnRangeMetadata.<Comparable>create(
columnStats.getFileName(),
columnStats.getColumnName(),
unwrapStatisticValueWrapper(columnStats.getMinValue()),
unwrapStatisticValueWrapper(columnStats.getMaxValue()),
columnStats.getNullCount(),
columnStats.getValueCount(),
columnStats.getTotalSize(),
columnStats.getTotalUncompressedSize());
}
/**
* Delete the metadata table for the dataset. This is invoked during upgrade/downgrade operations,
* during which no other process should be running.
*
* @param basePath base path of the dataset
* @param context instance of {@link HoodieEngineContext}.
*/
public static void deleteMetadataTable(String basePath, HoodieEngineContext context) {
final String metadataTablePathStr = HoodieTableMetadata.getMetadataTableBasePath(basePath);
FileSystem fs = FSUtils.getFs(metadataTablePathStr, context.getHadoopConf().get());
try {
Path metadataTablePath = new Path(metadataTablePathStr);
if (fs.exists(metadataTablePath)) {
fs.delete(metadataTablePath, true);
}
} catch (Exception e) {
throw new HoodieMetadataException("Failed to remove metadata table from path " + metadataTablePathStr, e);
}
}
/**
* Deletes the metadata partition from the file system.
*
* @param basePath - base path of the dataset
* @param context - instance of {@link HoodieEngineContext}
* @param partitionType - {@link MetadataPartitionType} of the partition to delete
*/
public static void deleteMetadataPartition(String basePath, HoodieEngineContext context, MetadataPartitionType partitionType) {
final String metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
FileSystem fs = FSUtils.getFs(metadataTablePath, context.getHadoopConf().get());
try {
fs.delete(new Path(metadataTablePath, partitionType.getPartitionPath()), true);
} catch (Exception e) {
throw new HoodieMetadataException(String.format("Failed to remove metadata partition %s from path %s", partitionType, metadataTablePath), e);
}
}
/**
* Check if the given metadata partition exists.
*
* @param basePath base path of the dataset
* @param context instance of {@link HoodieEngineContext}.
*/
public static boolean metadataPartitionExists(String basePath, HoodieEngineContext context, MetadataPartitionType partitionType) {
final String metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
FileSystem fs = FSUtils.getFs(metadataTablePath, context.getHadoopConf().get());
try {
return fs.exists(new Path(metadataTablePath, partitionType.getPartitionPath()));
} catch (Exception e) {
throw new HoodieIOException(String.format("Failed to check if metadata partition %s exists.", partitionType.getPartitionPath()));
}
}
/**
* Convert commit action to metadata records for the enabled partition types.
*
* @param context - Engine context to use
* @param commitMetadata - Commit action metadata
* @param instantTime - Action instant time
* @param recordsGenerationParams - Parameters for the record generation
* @return Map of partition to metadata records for the commit action
*/
public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadataToRecords(
HoodieEngineContext context, HoodieCommitMetadata commitMetadata, String instantTime,
MetadataRecordsGenerationParams recordsGenerationParams) {
final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
final HoodieData<HoodieRecord> filesPartitionRecordsRDD = context.parallelize(
convertMetadataToFilesPartitionRecords(commitMetadata, instantTime), 1);
partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecordsRDD);
if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) {
final HoodieData<HoodieRecord> metadataBloomFilterRecords = convertMetadataToBloomFilterRecords(context, commitMetadata, instantTime, recordsGenerationParams);
partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecords);
}
if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) {
final HoodieData<HoodieRecord> metadataColumnStatsRDD = convertMetadataToColumnStatsRecords(commitMetadata, context, recordsGenerationParams);
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD);
}
return partitionToRecordsMap;
}
/**
* Finds all new files/partitions created as part of the commit and creates metadata table records for them.
*
* @param commitMetadata - Commit action metadata
* @param instantTime - Commit action instant time
* @return List of metadata table records
*/
public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCommitMetadata commitMetadata,
String instantTime) {
List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size());
// Add record bearing added partitions list
List<String> partitionsAdded = getPartitionsAdded(commitMetadata);
// Add record bearing deleted partitions list
List<String> partitionsDeleted = getPartitionsDeleted(commitMetadata);
records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded, partitionsDeleted));
// Update files listing records for each individual partition
List<HoodieRecord<HoodieMetadataPayload>> updatedPartitionFilesRecords =
commitMetadata.getPartitionToWriteStats().entrySet()
.stream()
.map(entry -> {
String partitionStatName = entry.getKey();
List<HoodieWriteStat> writeStats = entry.getValue();
String partition = getPartitionIdentifier(partitionStatName);
HashMap<String, Long> updatedFilesToSizesMapping =
writeStats.stream().reduce(new HashMap<>(writeStats.size()),
(map, stat) -> {
String pathWithPartition = stat.getPath();
if (pathWithPartition == null) {
// Empty partition
LOG.warn("Unable to find path in write stat to update metadata table " + stat);
return map;
}
String fileName = FSUtils.getFileName(pathWithPartition, partitionStatName);
// Since write-stats are coming in no particular order, if the same
// file has previously been appended to within the txn, we simply pick the max
// of the sizes as reported after every write, since file sizes are
// monotonically increasing (i.e., a file size never goes down, unless the file is deleted)
map.merge(fileName, stat.getFileSizeInBytes(), Math::max);
return map;
},
CollectionUtils::combine);
return HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(updatedFilesToSizesMapping),
Option.empty());
})
.collect(Collectors.toList());
records.addAll(updatedPartitionFilesRecords);
LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType()
+ ". #partitions_updated=" + records.size());
return records;
}
private static List<String> getPartitionsAdded(HoodieCommitMetadata commitMetadata) {
return commitMetadata.getPartitionToWriteStats().keySet().stream()
// We need to make sure we properly handle case of non-partitioned tables
.map(HoodieTableMetadataUtil::getPartitionIdentifier)
.collect(Collectors.toList());
}
private static List<String> getPartitionsDeleted(HoodieCommitMetadata commitMetadata) {
if (commitMetadata instanceof HoodieReplaceCommitMetadata
&& WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) {
Map<String, List<String>> partitionToReplaceFileIds =
((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds();
return partitionToReplaceFileIds.keySet().stream()
// We need to make sure we properly handle case of non-partitioned tables
.map(HoodieTableMetadataUtil::getPartitionIdentifier)
.collect(Collectors.toList());
}
return Collections.emptyList();
}
/**
* Convert commit action metadata to bloom filter records.
*
* @param context - Engine context to use
* @param commitMetadata - Commit action metadata
* @param instantTime - Action instant time
* @param recordsGenerationParams - Parameters for bloom filter record generation
* @return HoodieData of metadata table records
*/
public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(
HoodieEngineContext context, HoodieCommitMetadata commitMetadata,
String instantTime, MetadataRecordsGenerationParams recordsGenerationParams) {
final List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
.flatMap(entry -> entry.stream()).collect(Collectors.toList());
if (allWriteStats.isEmpty()) {
return context.emptyHoodieData();
}
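// Clamp the parallelism to [1, number of write stats] so that small commits don't spawn excess tasks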
final int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
HoodieData<HoodieWriteStat> allWriteStatsRDD = context.parallelize(allWriteStats, parallelism);
return allWriteStatsRDD.flatMap(hoodieWriteStat -> {
final String partition = hoodieWriteStat.getPartitionPath();
// For bloom filter index, delta writes do not change the base file bloom filter entries
if (hoodieWriteStat instanceof HoodieDeltaWriteStat) {
return Collections.emptyListIterator();
}
String pathWithPartition = hoodieWriteStat.getPath();
if (pathWithPartition == null) {
// Empty partition
LOG.error("Failed to find path in write stat to update metadata table " + hoodieWriteStat);
return Collections.emptyListIterator();
}
String fileName = FSUtils.getFileName(pathWithPartition, partition);
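// Bloom filters are only maintained for base files; skip log and other non-base files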
if (!FSUtils.isBaseFile(new Path(fileName))) {
return Collections.emptyListIterator();
}
final Path writeFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
try (HoodieFileReader<IndexedRecord> fileReader =
HoodieFileReaderFactory.getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), writeFilePath)) {
try {
final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
if (fileBloomFilter == null) {
LOG.error("Failed to read bloom filter for " + writeFilePath);
return Collections.emptyListIterator();
}
ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
partition, fileName, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false);
return Collections.singletonList(record).iterator();
} catch (Exception e) {
LOG.error("Failed to read bloom filter for " + writeFilePath);
return Collections.emptyListIterator();
}
} catch (IOException e) {
LOG.error("Failed to get bloom filter for file: " + writeFilePath + ", write stat: " + hoodieWriteStat);
}
return Collections.emptyListIterator();
});
}
/**
* Convert the clean action to metadata records.
*/
public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext engineContext,
HoodieCleanMetadata cleanMetadata,
MetadataRecordsGenerationParams recordsGenerationParams,
String instantTime) {
final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
final HoodieData<HoodieRecord> filesPartitionRecordsRDD = engineContext.parallelize(
convertMetadataToFilesPartitionRecords(cleanMetadata, instantTime), 1);
partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecordsRDD);
if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) {
final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD =
convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, recordsGenerationParams);
partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD);
}
if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) {
final HoodieData<HoodieRecord> metadataColumnStatsRDD =
convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, recordsGenerationParams);
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD);
}
return partitionToRecordsMap;
}
/**
* Finds all files that were deleted as part of a clean and creates metadata table records for them.
*
* @param cleanMetadata - Clean action metadata
* @param instantTime - Clean action instant time
* @return a list of metadata table records
*/
public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCleanMetadata cleanMetadata,
String instantTime) {
List<HoodieRecord> records = new LinkedList<>();
int[] fileDeleteCount = {0};
List<String> deletedPartitions = new ArrayList<>();
cleanMetadata.getPartitionMetadata().forEach((partitionName, partitionMetadata) -> {
final String partition = getPartitionIdentifier(partitionName);
// Files deleted from a partition
List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
Option.of(new ArrayList<>(deletedFiles)));
records.add(record);
fileDeleteCount[0] += deletedFiles.size();
boolean isPartitionDeleted = partitionMetadata.getIsPartitionDeleted();
if (isPartitionDeleted) {
deletedPartitions.add(partitionName);
}
});
if (!deletedPartitions.isEmpty()) {
// if there are partitions to be deleted, add them to delete list
records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, true));
}
LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size()
+ ", #files_deleted=" + fileDeleteCount[0] + ", #partitions_deleted=" + deletedPartitions.size());
return records;
}
/**
* Convert clean metadata to bloom filter index records.
*
* @param cleanMetadata - Clean action metadata
* @param engineContext - Engine context
* @param instantTime - Clean action instant time
* @param recordsGenerationParams - Parameters for bloom filter record generation
* @return HoodieData of bloom filter index records for the clean metadata
*/
public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,
HoodieEngineContext engineContext,
String instantTime,
MetadataRecordsGenerationParams recordsGenerationParams) {
List<Pair<String, String>> deleteFileList = new ArrayList<>();
cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
// Files deleted from a partition
List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
deletedFiles.forEach(entry -> {
final Path deletedFilePath = new Path(entry);
if (FSUtils.isBaseFile(deletedFilePath)) {
deleteFileList.add(Pair.of(partition, deletedFilePath.getName()));
}
});
});
final int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
HoodieData<Pair<String, String>> deleteFileListRDD = engineContext.parallelize(deleteFileList, parallelism);
return deleteFileListRDD.map(deleteFileInfoPair -> HoodieMetadataPayload.createBloomFilterMetadataRecord(
deleteFileInfoPair.getLeft(), deleteFileInfoPair.getRight(), instantTime, StringUtils.EMPTY_STRING,
ByteBuffer.allocate(0), true));
}
/**
* Convert clean metadata to column stats index records.
*
* @param cleanMetadata - Clean action metadata
* @param engineContext - Engine context
* @param recordsGenerationParams - Parameters for column stats record generation
* @return HoodieData of column stats index records for the clean metadata
*/
public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
HoodieEngineContext engineContext,
MetadataRecordsGenerationParams recordsGenerationParams) {
List<Pair<String, String>> deleteFileList = new ArrayList<>();
cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
// Files deleted from a partition
List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, entry)));
});
HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
List<String> columnsToIndex =
getColumnsToIndex(recordsGenerationParams,
Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
if (columnsToIndex.isEmpty()) {
// In case there are no columns to index, bail
return engineContext.emptyHoodieData();
}
int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
return engineContext.parallelize(deleteFileList, parallelism)
.flatMap(deleteFileInfoPair -> {
String partitionPath = deleteFileInfoPair.getLeft();
String filePath = deleteFileInfoPair.getRight();
if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
return getColumnStatsRecords(partitionPath, filePath, dataTableMetaClient, columnsToIndex, true).iterator();
}
return Collections.emptyListIterator();
});
}
/**
* Convert restore action metadata to metadata table records.
*/
public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadataToRecords(
HoodieEngineContext engineContext, HoodieActiveTimeline metadataTableTimeline, HoodieRestoreMetadata restoreMetadata,
MetadataRecordsGenerationParams recordsGenerationParams, String instantTime, Option<String> lastSyncTs) {
final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
final Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
final Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
processRestoreMetadata(metadataTableTimeline, restoreMetadata, partitionToAppendedFiles, partitionToDeletedFiles, lastSyncTs);
final HoodieData<HoodieRecord> filesPartitionRecordsRDD =
engineContext.parallelize(convertFilesToFilesPartitionRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore"), 1);
partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecordsRDD);
if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) {
final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD =
convertFilesToBloomFilterRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams, instantTime);
partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD);
}
if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) {
final HoodieData<HoodieRecord> metadataColumnStatsRDD =
convertFilesToColumnStatsRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams);
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD);
}
return partitionToRecordsMap;
}
/**
* Aggregates all files deleted and appended to across all rollbacks associated with a restore operation,
* filling the provided maps of appended and deleted files per partition.
*
* @param restoreMetadata - Restore action metadata
*/
private static void processRestoreMetadata(HoodieActiveTimeline metadataTableTimeline,
HoodieRestoreMetadata restoreMetadata,
Map<String, Map<String, Long>> partitionToAppendedFiles,
Map<String, List<String>> partitionToDeletedFiles,
Option<String> lastSyncTs) {
restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> rms.forEach(rm -> processRollbackMetadata(metadataTableTimeline, rm,
partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs)));
}
/**
* Convert rollback action metadata to metadata table records.
*/
public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadataToRecords(
HoodieEngineContext engineContext, HoodieActiveTimeline metadataTableTimeline,
HoodieRollbackMetadata rollbackMetadata, MetadataRecordsGenerationParams recordsGenerationParams,
String instantTime, Option<String> lastSyncTs, boolean wasSynced) {
final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
List<HoodieRecord> filesPartitionRecords =
convertMetadataToRollbackRecords(metadataTableTimeline, rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, instantTime, lastSyncTs, wasSynced);
final HoodieData<HoodieRecord> rollbackRecordsRDD = engineContext.parallelize(filesPartitionRecords, 1);
partitionToRecordsMap.put(MetadataPartitionType.FILES, rollbackRecordsRDD);
if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) {
final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD =
convertFilesToBloomFilterRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams, instantTime);
partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD);
}
if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) {
final HoodieData<HoodieRecord> metadataColumnStatsRDD =
convertFilesToColumnStatsRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams);
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD);
}
return partitionToRecordsMap;
}
/**
* Convert rollback action metadata to files partition records.
*/
private static List<HoodieRecord> convertMetadataToRollbackRecords(HoodieActiveTimeline metadataTableTimeline,
HoodieRollbackMetadata rollbackMetadata,
Map<String, List<String>> partitionToDeletedFiles,
Map<String, Map<String, Long>> partitionToAppendedFiles,
String instantTime,
Option<String> lastSyncTs, boolean wasSynced) {
processRollbackMetadata(metadataTableTimeline, rollbackMetadata, partitionToDeletedFiles,
partitionToAppendedFiles, lastSyncTs);
if (!wasSynced) {
// Since the instant-being-rolled-back was never committed to the metadata table, the files added there
// need not be deleted. For a MOR table, the rollback appends log blocks, so we need to keep the appended files.
partitionToDeletedFiles.clear();
}
return convertFilesToFilesPartitionRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback");
}
/**
* Extracts information about the deleted and appended files from the {@code HoodieRollbackMetadata}.
* <p>
* During a rollback, files may be deleted (COW, MOR) or rollback blocks may be appended to files (MOR only). This
* function extracts these changes for each partition.
*
* @param metadataTableTimeline Current timeline of the Metadata Table
* @param rollbackMetadata {@code HoodieRollbackMetadata}
* @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
* @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
*/
private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTimeline,
HoodieRollbackMetadata rollbackMetadata,
Map<String, List<String>> partitionToDeletedFiles,
Map<String, Map<String, Long>> partitionToAppendedFiles,
Option<String> lastSyncTs) {
rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
final String instantToRollback = rollbackMetadata.getCommitsRollback().get(0);
// Has this rollback produced new files?
boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null && !pm.getRollbackLogFiles().isEmpty();
boolean hasNonZeroRollbackLogFiles = hasRollbackLogFiles && pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
// If instant-to-rollback has not been synced to metadata table yet then there is no need to update metadata
// This can happen in two cases:
// Case 1: Metadata Table timeline is behind the instant-to-rollback.
boolean shouldSkip = lastSyncTs.isPresent()
&& HoodieTimeline.compareTimestamps(instantToRollback, HoodieTimeline.GREATER_THAN, lastSyncTs.get());
if (!hasNonZeroRollbackLogFiles && shouldSkip) {
LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, given metadata table is already synced upto to %s",
instantToRollback, lastSyncTs.get()));
return;
}
// Case 2: The instant-to-rollback was never committed to Metadata Table. This can happen if the instant-to-rollback
// was a failed commit (never completed).
//
// There are two cases of failed commits that we need to take care of:
// 1) The commit was synced to the metadata table successfully, but the dataset meta file's state switch
// (from INFLIGHT to COMPLETED) failed. The committed files should be rolled back, so the rollback metadata
// cannot be skipped. Usually a failover is triggered, and the metadata active timeline is expected
// to contain the commit; we can check whether the commit was synced to the metadata table
// through HoodieActiveTimeline#containsInstant.
//
// 2) The sync of the commit to the metadata table failed, or the commit was never synced at all;
// in this case, the rollback metadata should be skipped.
//
// And in which case does
// metadataTableTimeline.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())
// return true?
// Most probably because of a compaction rollback: a compaction plan is scheduled early in the timeline (say t1),
// executed after a long time (t3), and then rolled back (t4).
//
// scheduled execution rollback compaction actions
// ----- t1 ----- t3 ----- t4 ----- dataset timeline
//
// ---------- t2 (archive) ----------- metadata timeline
//
// When the compaction rollback is committed at time t4, the above check returns true.
HoodieInstant syncedInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantToRollback);
if (metadataTableTimeline.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())) {
throw new HoodieMetadataException(String.format("The instant %s required to sync rollback of %s has been archived",
syncedInstant, instantToRollback));
}
shouldSkip = !metadataTableTimeline.containsInstant(syncedInstant);
if (!hasNonZeroRollbackLogFiles && shouldSkip) {
LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, since this instant was never committed to Metadata Table",
instantToRollback));
return;
}
final String partition = pm.getPartitionPath();
if ((!pm.getSuccessDeleteFiles().isEmpty() || !pm.getFailedDeleteFiles().isEmpty()) && !shouldSkip) {
if (!partitionToDeletedFiles.containsKey(partition)) {
partitionToDeletedFiles.put(partition, new ArrayList<>());
}
// Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles()
List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName())
.collect(Collectors.toList());
if (!pm.getFailedDeleteFiles().isEmpty()) {
deletedFiles.addAll(pm.getFailedDeleteFiles().stream().map(p -> new Path(p).getName())
.collect(Collectors.toList()));
}
partitionToDeletedFiles.get(partition).addAll(deletedFiles);
}
BiFunction<Long, Long, Long> fileMergeFn = (oldSize, newSizeCopy) -> {
// If a file exists in both written log files and rollback log files, we want to pick the larger size,
// as the rollback log file could have been updated after the written log file sizes were computed.
return oldSize > newSizeCopy ? oldSize : newSizeCopy;
};
if (hasRollbackLogFiles) {
if (!partitionToAppendedFiles.containsKey(partition)) {
partitionToAppendedFiles.put(partition, new HashMap<>());
}
// Extract appended file name from the absolute paths saved in getAppendFiles()
pm.getRollbackLogFiles().forEach((path, size) -> {
partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, fileMergeFn);
});
}
});
}
/**
* Convert maps of deleted and appended files (from rollback/restore) into files partition records.
*/
private static List<HoodieRecord> convertFilesToFilesPartitionRecords(Map<String, List<String>> partitionToDeletedFiles,
Map<String, Map<String, Long>> partitionToAppendedFiles,
String instantTime, String operation) {
List<HoodieRecord> records = new LinkedList<>();
int[] fileChangeCount = {0, 0}; // deletes, appends
partitionToDeletedFiles.forEach((partitionName, deletedFiles) -> {
fileChangeCount[0] += deletedFiles.size();
final String partition = getPartitionIdentifier(partitionName);
Option<Map<String, Long>> filesAdded = Option.empty();
if (partitionToAppendedFiles.containsKey(partitionName)) {
filesAdded = Option.of(partitionToAppendedFiles.remove(partitionName));
}
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
Option.of(new ArrayList<>(deletedFiles)));
records.add(record);
});
partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
final String partition = getPartitionIdentifier(partitionName);
fileChangeCount[1] += appendedFileMap.size();
// Validate that no appended file has been deleted
checkState(
!appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList())),
"Rollback file cannot both be appended and deleted");
// New files added to a partition
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(appendedFileMap),
Option.empty());
records.add(record);
});
LOG.info("Found at " + instantTime + " from " + operation + ". #partitions_updated=" + records.size()
+ ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + fileChangeCount[1]);
return records;
}
/**
* Returns partition name for the given path.
*/
public static String getPartitionIdentifier(@Nonnull String relativePartitionPath) {
return EMPTY_PARTITION_NAME.equals(relativePartitionPath) ? NON_PARTITIONED_NAME : relativePartitionPath;
}
/**
* Convert added and deleted files metadata to bloom filter index records.
*/
public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
Map<String, List<String>> partitionToDeletedFiles,
Map<String, Map<String, Long>> partitionToAppendedFiles,
MetadataRecordsGenerationParams recordsGenerationParams,
String instantTime) {
HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
.stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, parallelism);
HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
final String partitionName = partitionToDeletedFilesPair.getLeft();
final List<String> deletedFileList = partitionToDeletedFilesPair.getRight();
return deletedFileList.stream().flatMap(deletedFile -> {
if (!FSUtils.isBaseFile(new Path(deletedFile))) {
return Stream.empty();
}
final String partition = getPartitionIdentifier(partitionName);
return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
}).iterator();
});
allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet()
.stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
parallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList, parallelism);
HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
final String partitionName = partitionToAppendedFilesPair.getLeft();
final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight();
final String partition = getPartitionIdentifier(partitionName);
return appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> {
final String appendedFile = appendedFileLengthPairEntry.getKey();
if (!FSUtils.isBaseFile(new Path(appendedFile))) {
return Stream.empty();
}
final String pathWithPartition = partitionName + "/" + appendedFile;
final Path appendedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
try (HoodieFileReader<IndexedRecord> fileReader =
HoodieFileReaderFactory.getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), appendedFilePath)) {
final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
if (fileBloomFilter == null) {
LOG.error("Failed to read bloom filter for " + appendedFilePath);
return Stream.empty();
}
ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
partition, appendedFile, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false);
return Stream.of(record);
} catch (IOException e) {
LOG.error("Failed to get bloom filter for file: " + appendedFilePath);
}
return Stream.empty();
}).iterator();
});
allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
return allRecordsRDD;
}
/**
* Convert added and deleted action metadata to column stats index records.
*/
public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
Map<String, List<String>> partitionToDeletedFiles,
Map<String, Map<String, Long>> partitionToAppendedFiles,
MetadataRecordsGenerationParams recordsGenerationParams) {
HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
final List<String> columnsToIndex =
getColumnsToIndex(recordsGenerationParams,
Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
if (columnsToIndex.isEmpty()) {
// In case there are no columns to index, bail
return engineContext.emptyHoodieData();
}
final List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream()
.map(e -> Pair.of(e.getKey(), e.getValue()))
.collect(Collectors.toList());
int deletedFilesTargetParallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
final HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD =
engineContext.parallelize(partitionToDeletedFilesList, deletedFilesTargetParallelism);
HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
final String partitionPath = partitionToDeletedFilesPair.getLeft();
final String partitionId = getPartitionIdentifier(partitionPath);
final List<String> deletedFileList = partitionToDeletedFilesPair.getRight();
return deletedFileList.stream().flatMap(deletedFile -> {
final String filePathWithPartition = partitionPath + "/" + deletedFile;
return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, true);
}).iterator();
});
allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
final List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream()
.map(entry -> Pair.of(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
int appendedFilesTargetParallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
final HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD =
engineContext.parallelize(partitionToAppendedFilesList, appendedFilesTargetParallelism);
HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
final String partitionPath = partitionToAppendedFilesPair.getLeft();
final String partitionId = getPartitionIdentifier(partitionPath);
final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight();
return appendedFileMap.entrySet().stream().flatMap(appendedFileNameLengthEntry -> {
if (!FSUtils.isBaseFile(new Path(appendedFileNameLengthEntry.getKey()))
|| !appendedFileNameLengthEntry.getKey().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
return Stream.empty();
}
final String filePathWithPartition = partitionPath + "/" + appendedFileNameLengthEntry.getKey();
return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, false);
}).iterator();
});
allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
return allRecordsRDD;
}
/**
* Map a record key to a file group in partition of interest.
* <p>
* Note: For hashing, the algorithm is the same as String.hashCode(), but it is re-implemented here because the
* hashCode() implementation is not guaranteed by the JVM to be consistent across JVM versions and implementations.
*
* @param recordKey record key for which the file group index is looked up.
* @param numFileGroups total number of file groups in the partition.
* @return the file group index (in the range [0, numFileGroups)) for the given record key
*/
public static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGroups) {
int h = 0;
for (int i = 0; i < recordKey.length(); ++i) {
h = 31 * h + recordKey.charAt(i);
}
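// Math.abs is applied twice to guard against Integer.MIN_VALUE, whose absolute value is itself negative,
// so that the resulting file group index is always non-negative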
return Math.abs(Math.abs(h) % numFileGroups);
}
/**
* Get the latest file slices for a Metadata Table partition. If a file slice exists
* because of a pending compaction instant, then it is merged with the file slice
* just before the compaction instant time. The list of file slices returned is
* sorted by file group name.
*
* @param metaClient Instance of {@link HoodieTableMetaClient}.
* @param fsView Metadata table filesystem view.
* @param partition The name of the partition whose file groups are to be loaded.
* @return List of latest file slices for all file groups in a given partition.
*/
public static List<FileSlice> getPartitionLatestMergedFileSlices(
HoodieTableMetaClient metaClient, HoodieTableFileSystemView fsView, String partition) {
LOG.info("Loading latest merged file slices for metadata table partition " + partition);
return getPartitionFileSlices(metaClient, Option.of(fsView), partition, true);
}
/**
* Get the latest file slices for a Metadata Table partition. The list of file slices
* returned is sorted in the correct order of file group name.
*
* @param metaClient - Instance of {@link HoodieTableMetaClient}.
* @param fsView - Metadata table filesystem view
* @param partition - The name of the partition whose file groups are to be loaded.
* @return List of latest file slices for all file groups in a given partition.
*/
public static List<FileSlice> getPartitionLatestFileSlices(HoodieTableMetaClient metaClient,
Option<HoodieTableFileSystemView> fsView, String partition) {
LOG.info("Loading latest file slices for metadata table partition " + partition);
return getPartitionFileSlices(metaClient, fsView, partition, false);
}
/**
* Get metadata table file system view.
*
* @param metaClient - Metadata table meta client
* @return Filesystem view for the metadata table
*/
public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient metaClient) {
// If there are no commits on the metadata table then the table's
// default FileSystemView will not return any file slices even
// though we may have initialized them.
HoodieTimeline timeline = metaClient.getActiveTimeline();
if (timeline.empty()) {
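// Use a placeholder (uncommitted) delta-commit instant so that the file system view can still
// resolve the file slices that were initialized without any commits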
final HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION,
HoodieActiveTimeline.createNewInstantTime());
timeline = new HoodieDefaultTimeline(Stream.of(instant), metaClient.getActiveTimeline()::getInstantDetails);
}
return new HoodieTableFileSystemView(metaClient, timeline);
}
/**
* Get the latest file slices for a given partition.
*
* @param metaClient - Instance of {@link HoodieTableMetaClient}.
* @param partition - The name of the partition whose file groups are to be loaded.
* @param mergeFileSlices - When enabled, merges the latest file slices with the last known
* completed instant; this is useful for readers when there are pending
* compactions. When disabled, returns the latest file slices without
* any merging, which is what writers need.
* @return List of latest file slices for all file groups in a given partition.
*/
private static List<FileSlice> getPartitionFileSlices(HoodieTableMetaClient metaClient,
Option<HoodieTableFileSystemView> fileSystemView,
String partition,
boolean mergeFileSlices) {
HoodieTableFileSystemView fsView = fileSystemView.orElse(getFileSystemView(metaClient));
Stream<FileSlice> fileSliceStream;
if (mergeFileSlices) {
if (metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent()) {
fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
partition, metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
} else {
return Collections.emptyList();
}
} else {
fileSliceStream = fsView.getLatestFileSlices(partition);
}
return fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(Collectors.toList());
}
/**
* Get the latest file slices for a given partition including the inflight ones.
*
* @param metaClient - instance of {@link HoodieTableMetaClient}
* @param fileSystemView - hoodie table file system view, which will be fetched from meta client if not already present
* @param partition - name of the partition whose file groups are to be loaded
* @return List of the latest file slices, including inflight ones, for all file groups in the given partition
*/
public static List<FileSlice> getPartitionLatestFileSlicesIncludingInflight(HoodieTableMetaClient metaClient,
Option<HoodieTableFileSystemView> fileSystemView,
String partition) {
HoodieTableFileSystemView fsView = fileSystemView.orElse(getFileSystemView(metaClient));
Stream<FileSlice> fileSliceStream = fsView.fetchLatestFileSlicesIncludingInflight(partition);
return fileSliceStream
.sorted(Comparator.comparing(FileSlice::getFileId))
.collect(Collectors.toList());
}
public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
HoodieEngineContext engineContext,
MetadataRecordsGenerationParams recordsGenerationParams) {
List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
.flatMap(Collection::stream).collect(Collectors.toList());
if (allWriteStats.isEmpty()) {
return engineContext.emptyHoodieData();
}
try {
Option<Schema> writerSchema =
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
.flatMap(writerSchemaStr ->
isNullOrEmpty(writerSchemaStr)
? Option.empty()
: Option.of(new Schema.Parser().parse(writerSchemaStr)));
HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
HoodieTableConfig tableConfig = dataTableMetaClient.getTableConfig();
// NOTE: Writer schema added to commit metadata will not contain Hudi's metadata fields
Option<Schema> tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
Lazy.eagerly(tableSchema));
if (columnsToIndex.isEmpty()) {
// In case there are no columns to index, bail
return engineContext.emptyHoodieData();
}
int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
return engineContext.parallelize(allWriteStats, parallelism)
.flatMap(writeStat ->
translateWriteStatToColumnStats(writeStat, dataTableMetaClient, columnsToIndex).iterator());
} catch (Exception e) {
throw new HoodieException("Failed to generate column stats records for metadata table", e);
}
}
/**
* Get the list of columns of the table to be indexed for column stats
*/
private static List<String> getColumnsToIndex(MetadataRecordsGenerationParams recordsGenParams,
Lazy<Option<Schema>> lazyWriterSchemaOpt) {
checkState(recordsGenParams.isColumnStatsIndexEnabled());
List<String> targetColumns = recordsGenParams.getTargetColumnsForColumnStatsIndex();
if (!targetColumns.isEmpty()) {
return targetColumns;
}
Option<Schema> writerSchemaOpt = lazyWriterSchemaOpt.get();
return writerSchemaOpt
.map(writerSchema ->
writerSchema.getFields().stream()
.map(Schema.Field::name)
.collect(Collectors.toList()))
.orElse(Collections.emptyList());
}
private static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
HoodieTableMetaClient datasetMetaClient,
List<String> columnsToIndex) {
if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getColumnStats().isPresent()) {
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = ((HoodieDeltaWriteStat) writeStat).getColumnStats().get();
Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = columnRangeMap.values();
return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false);
}
return getColumnStatsRecords(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
}
private static Stream<HoodieRecord> getColumnStatsRecords(String partitionPath,
String filePath,
HoodieTableMetaClient datasetMetaClient,
List<String> columnsToIndex,
boolean isDeleted) {
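// Strip any leading slash so that the path stays relative to the table base path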
String filePartitionPath = filePath.startsWith("/") ? filePath.substring(1) : filePath;
String fileName = FSUtils.getFileName(filePath, partitionPath);
if (isDeleted) {
// TODO we should delete records instead of stubbing them
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = columnsToIndex.stream()
.map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
.collect(Collectors.toList());
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, true);
}
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata =
readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient, columnsToIndex);
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadata, false);
}
private static List<HoodieColumnRangeMetadata<Comparable>> readColumnRangeMetadataFrom(String filePath,
HoodieTableMetaClient datasetMetaClient,
List<String> columnsToIndex) {
try {
if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
Path fullFilePath = new Path(datasetMetaClient.getBasePath(), filePath);
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList =
new ParquetUtils().readRangeFromParquetMetadata(datasetMetaClient.getHadoopConf(), fullFilePath, columnsToIndex);
return columnRangeMetadataList;
}
LOG.warn("Column range index not supported for: " + filePath);
return Collections.emptyList();
} catch (Exception e) {
// NOTE: In case reading column range metadata from an individual file fails,
// we simply fall back, instead of failing the whole task
LOG.error("Failed to fetch column range metadata for: " + filePath);
return Collections.emptyList();
}
}
/**
* Get file group count for a metadata table partition.
*
* @param partitionType - Metadata table partition type
* @param metaClient - Metadata table meta client
* @param fsView - Filesystem view
* @param metadataConfig - Metadata config
* @param isBootstrapCompleted - Is bootstrap completed for the metadata table
* @return File group count for the requested metadata partition type
*/
public static int getPartitionFileGroupCount(final MetadataPartitionType partitionType,
final Option<HoodieTableMetaClient> metaClient,
final Option<HoodieTableFileSystemView> fsView,
final HoodieMetadataConfig metadataConfig, boolean isBootstrapCompleted) {
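// Once bootstrap has completed, prefer the file group count implied by the existing file slices;
// otherwise fall back to the configured count for the partition type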
if (isBootstrapCompleted) {
final List<FileSlice> latestFileSlices = HoodieTableMetadataUtil
.getPartitionLatestFileSlices(metaClient.get(), fsView, partitionType.getPartitionPath());
if (latestFileSlices.size() == 0 && !partitionType.getPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath())) {
return getFileGroupCount(partitionType, metadataConfig);
}
return Math.max(latestFileSlices.size(), 1);
}
return getFileGroupCount(partitionType, metadataConfig);
}
private static int getFileGroupCount(MetadataPartitionType partitionType, final HoodieMetadataConfig metadataConfig) {
switch (partitionType) {
case BLOOM_FILTERS:
return metadataConfig.getBloomFilterIndexFileGroupCount();
case COLUMN_STATS:
return metadataConfig.getColumnStatsIndexFileGroupCount();
default:
return 1;
}
}
/**
* Upcasts a {@link BigDecimal} instance to align it with the scale/precision expected by
* the {@link org.apache.avro.LogicalTypes.Decimal} Avro logical type
*/
public static BigDecimal tryUpcastDecimal(BigDecimal value, final LogicalTypes.Decimal decimal) {
final int scale = decimal.getScale();
final int valueScale = value.scale();
boolean scaleAdjusted = false;
if (valueScale != scale) {
try {
value = value.setScale(scale, RoundingMode.UNNECESSARY);
scaleAdjusted = true;
} catch (ArithmeticException aex) {
throw new AvroTypeException(
"Cannot encode decimal with scale " + valueScale + " as scale " + scale + " without rounding");
}
}
int precision = decimal.getPrecision();
int valuePrecision = value.precision();
if (valuePrecision > precision) {
if (scaleAdjusted) {
throw new AvroTypeException("Cannot encode decimal with precision " + valuePrecision + " as max precision "
+ precision + ". This is after safely adjusting scale from " + valueScale + " to required " + scale);
} else {
throw new AvroTypeException(
"Cannot encode decimal with precision " + valuePrecision + " as max precision " + precision);
}
}
return value;
}
private static Option<Schema> tryResolveSchemaForTable(HoodieTableMetaClient dataTableMetaClient) {
if (dataTableMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
return Option.empty();
}
try {
TableSchemaResolver schemaResolver = new TableSchemaResolver(dataTableMetaClient);
return Option.of(schemaResolver.getTableAvroSchema());
} catch (Exception e) {
throw new HoodieException("Failed to get latest columns for " + dataTableMetaClient.getBasePath(), e);
}
}
/**
* Given a schema, coerces the provided value to an instance of {@link Comparable<?>} such that
* it can subsequently be used in column stats
*
* NOTE: This method has to stay compatible with the semantic of
* {@link ParquetUtils#readRangeFromParquetMetadata} as they are used in tandem
*/
private static Comparable<?> coerceToComparable(Schema schema, Object val) {
if (val == null) {
return null;
}
switch (schema.getType()) {
case UNION:
// TODO we need to handle unions in general case as well
return coerceToComparable(resolveNullableSchema(schema), val);
case FIXED:
case BYTES:
if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
return (Comparable<?>) val;
}
return (ByteBuffer) val;
case INT:
if (schema.getLogicalType() == LogicalTypes.date()
|| schema.getLogicalType() == LogicalTypes.timeMillis()) {
// NOTE: This type will be either {@code java.sql.Date} or {@code org.joda.LocalDate}
// depending on the Avro version. Hence, we simply cast it to {@code Comparable<?>}
return (Comparable<?>) val;
}
return (Integer) val;
case LONG:
if (schema.getLogicalType() == LogicalTypes.timeMicros()
|| schema.getLogicalType() == LogicalTypes.timestampMicros()
|| schema.getLogicalType() == LogicalTypes.timestampMillis()) {
// NOTE: The concrete representation of these logical timestamp types depends on the Avro
// version and the conversions in use. Hence, we simply cast the value to {@code Comparable<?>}
return (Comparable<?>) val;
}
return (Long) val;
case STRING:
// unpack the avro Utf8 if possible
return val.toString();
case FLOAT:
case DOUBLE:
case BOOLEAN:
return (Comparable<?>) val;
// TODO add support for those types
case ENUM:
case MAP:
case NULL:
case RECORD:
case ARRAY:
return null;
default:
throw new IllegalStateException("Unexpected type: " + schema.getType());
}
}
private static boolean canCompare(Schema schema) {
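// Avro maps have no defined ordering and hence cannot participate in min/max comparisons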
return schema.getType() != Schema.Type.MAP;
}
public static Set<String> getInflightMetadataPartitions(HoodieTableConfig tableConfig) {
return new HashSet<>(tableConfig.getMetadataPartitionsInflight());
}
public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableConfig tableConfig) {
Set<String> inflightAndCompletedPartitions = getInflightMetadataPartitions(tableConfig);
inflightAndCompletedPartitions.addAll(tableConfig.getMetadataPartitions());
return inflightAndCompletedPartitions;
}
}