blob: 894b41b51c6bf551ef060859533cfd442bd8c35d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.index.bloom;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkMemoryUtils;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.MetadataNotFoundException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.index.SparkHoodieIndex;
import org.apache.hudi.io.HoodieRangeInfoHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import scala.Tuple2;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
/**
* Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata.
*/
@SuppressWarnings("checkstyle:LineLength")
public class SparkHoodieBloomIndex<T extends HoodieRecordPayload> extends SparkHoodieIndex<T> {
private static final Logger LOG = LogManager.getLogger(SparkHoodieBloomIndex.class);
public SparkHoodieBloomIndex(HoodieWriteConfig config) {
super(config);
}
@Override
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, HoodieEngineContext context,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
// Step 0: cache the input record RDD
if (config.getBloomIndexUseCaching()) {
recordRDD.persist(SparkMemoryUtils.getBloomIndexInputStorageLevel(config.getProps()));
}
// Step 1: Extract out thinner JavaPairRDD of (partitionPath, recordKey)
JavaPairRDD<String, String> partitionRecordKeyPairRDD =
recordRDD.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()));
// Lookup indexes for all the partition/recordkey pair
JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD =
lookupIndex(partitionRecordKeyPairRDD, context, hoodieTable);
// Cache the result, for subsequent stages.
if (config.getBloomIndexUseCaching()) {
keyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
}
if (LOG.isDebugEnabled()) {
long totalTaggedRecords = keyFilenamePairRDD.count();
LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
}
// Step 4: Tag the incoming records, as inserts or updates, by joining with existing record keys
// Cost: 4 sec.
JavaRDD<HoodieRecord<T>> taggedRecordRDD = tagLocationBacktoRecords(keyFilenamePairRDD, recordRDD);
if (config.getBloomIndexUseCaching()) {
recordRDD.unpersist(); // unpersist the input Record RDD
keyFilenamePairRDD.unpersist();
}
return taggedRecordRDD;
}
/**
* Lookup the location for each record key and return the pair<record_key,location> for all record keys already
* present and drop the record keys if not present.
*/
private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
JavaPairRDD<String, String> partitionRecordKeyPairRDD, final HoodieEngineContext context,
final HoodieTable hoodieTable) {
// Obtain records per partition, in the incoming records
Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
// Step 2: Load all involved files as <Partition, filename> pairs
List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())));
// Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id,
// that contains it.
Map<String, Long> comparisonsPerFileGroup =
computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD);
int inputParallelism = partitionRecordKeyPairRDD.partitions().size();
int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
+ config.getBloomIndexParallelism() + "}");
return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable,
comparisonsPerFileGroup);
}
/**
* Compute the estimated number of bloom filter comparisons to be performed on each file group.
*/
private Map<String, Long> computeComparisonsPerFileGroup(final Map<String, Long> recordsPerPartition,
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
Map<String, Long> fileToComparisons;
if (config.getBloomIndexPruneByRanges()) {
// we will just try exploding the input and then count to determine comparisons
// FIX(vc): Only do sampling here and extrapolate?
fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD)
.mapToPair(t -> t).countByKey();
} else {
fileToComparisons = new HashMap<>();
partitionToFileInfo.forEach((key, value) -> {
for (BloomIndexFileInfo fileInfo : value) {
// each file needs to be compared against all the records coming into the partition
fileToComparisons.put(fileInfo.getFileId(), recordsPerPartition.get(key));
}
});
}
return fileToComparisons;
}
/**
* Load all involved files as <Partition, filename> pair RDD.
*/
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
final HoodieTable hoodieTable) {
// Obtain the latest data files from all the partitions.
List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
.collect(toList());
if (config.getBloomIndexPruneByRanges()) {
// also obtain file ranges, if range pruning is enabled
context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
return context.map(partitionPathFileIDList, pf -> {
try {
HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
} catch (MetadataNotFoundException me) {
LOG.warn("Unable to find range metadata in file :" + pf);
return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
}
}, Math.max(partitionPathFileIDList.size(), 1));
} else {
return partitionPathFileIDList.stream()
.map(pf -> new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
}
}
@Override
public boolean rollbackCommit(String instantTime) {
// Nope, don't need to do anything.
return true;
}
/**
* This is not global, since we depend on the partitionPath to do the lookup.
*/
@Override
public boolean isGlobal() {
return false;
}
/**
* No indexes into log files yet.
*/
@Override
public boolean canIndexLogFiles() {
return false;
}
/**
* Bloom filters are stored, into the same data files.
*/
@Override
public boolean isImplicitWithStorage() {
return true;
}
/**
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
* checked. For tables, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
* to be compared gets cut down a lot from range pruning.
* <p>
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
* recordKey ranges in the index info.
*/
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
IndexFileFilter indexFileFilter =
config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
: new ListBasedIndexFileFilter(partitionToFileIndexInfo);
return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
String recordKey = partitionRecordKeyPair._2();
String partitionPath = partitionRecordKeyPair._1();
return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
.map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(),
new HoodieKey(recordKey, partitionPath)))
.collect(Collectors.toList());
}).flatMap(List::iterator);
}
/**
* Find out <RowKey, filename> pair. All workload grouped by file-level.
* <p>
* Join PairRDD(PartitionPath, RecordKey) and PairRDD(PartitionPath, File) & then repartition such that each RDD
* partition is a file, then for each file, we do (1) load bloom filter, (2) load rowKeys, (3) Tag rowKey
* <p>
* Make sure the parallelism is atleast the groupby parallelism for tagging location
*/
JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable,
Map<String, Long> fileGroupToComparisons) {
JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);
if (config.useBloomIndexBucketizedChecking()) {
Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons,
config.getBloomIndexKeysPerBucket());
fileComparisonsRDD = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t))
.repartitionAndSortWithinPartitions(partitioner).map(Tuple2::_2);
} else {
fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism);
}
return fileComparisonsRDD.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true)
.flatMap(List::iterator).filter(lr -> lr.getMatchingRecordKeys().size() > 0)
.flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
.map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()),
new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())))
.collect(Collectors.toList()).iterator());
}
/**
* Tag the <rowKey, filename> back to the original HoodieRecord RDD.
*/
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
JavaPairRDD<HoodieKey, HoodieRecord<T>> keyRecordPairRDD =
recordRDD.mapToPair(record -> new Tuple2<>(record.getKey(), record));
// Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null),
// so we do left outer join.
return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values()
.map(v1 -> HoodieIndexUtils.getTaggedRecord(v1._1, Option.ofNullable(v1._2.orNull())));
}
@Override
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, HoodieEngineContext context,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
return writeStatusRDD;
}
}