blob: 36710dc02bb9beed6f467a96d3999056257de220 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.index.bloom;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Partitions bloom filter checks by spreading out comparisons across buckets of work.
 *
 * <p>Each bucket incurs the following cost
 *
 * <pre>
 * 1) Read bloom filter from file footer
 * 2) Check keys against bloom filter
 * 3) [Conditional] If any key had a hit, open file and check
 * </pre>
 *
 * <p>The partitioner performs a two phase bin packing algorithm, to pack enough work into each bucket such that cost
 * of (1) and (3) is amortized. Also, avoids any skews in the sort based approach, by directly partitioning by the file
 * to be checked against and ensuring each partition has similar number of buckets. Performance tests show that this
 * approach could bound the amount of skew to std_dev(numberOfBucketsPerPartition) * cost of (3), lower than sort
 * partitioning.
 *
 * <p>Approach has two goals :
 *
 * <pre>
 * 1) Pack as many buckets from same file group into same partition, to amortize cost of (1) and (2) further
 * 2) Spread buckets across partitions evenly to achieve skew reduction
 * </pre>
 */
public class BucketizedBloomCheckPartitioner extends Partitioner {

  private static final Logger LOG = LogManager.getLogger(BucketizedBloomCheckPartitioner.class);

  /**
   * Number of partitions this partitioner distributes keys over; never more than the total number of buckets.
   */
  private final int partitions;

  /**
   * Stores the final mapping of a file group to a list of partitions for its keys. A partition appears in the list
   * once per bucket of that file group assigned to it.
   */
  private final Map<String, List<Integer>> fileGroupToPartitions;

  /**
   * Create a partitioner that computes a plan based on provided workload characteristics.
   *
   * <p>Every file group present in {@code fileGroupToComparisons} is given at least one bucket, even when its expected
   * number of comparisons is zero, so that each file group always maps to at least one partition.
   *
   * @param targetPartitions maximum number of partitions to target
   * @param fileGroupToComparisons number of expected comparisons per file group
   * @param keysPerBucket maximum number of keys to pack in a single bucket
   * @throws IllegalArgumentException if {@code keysPerBucket} is not positive, or if there is work to distribute but
   *         {@code targetPartitions} is not positive
   * @throws ArithmeticException if the total number of buckets does not fit in an {@code int}
   */
  public BucketizedBloomCheckPartitioner(int targetPartitions, Map<String, Long> fileGroupToComparisons,
      int keysPerBucket) {
    if (keysPerBucket <= 0) {
      throw new IllegalArgumentException("keysPerBucket must be positive, but was " + keysPerBucket);
    }
    this.fileGroupToPartitions = new HashMap<>();

    // Compute the buckets needed per file group, using simple uniform distribution. Phase 1 below always places at
    // least one bucket per file group, so count at least one here to keep the totals consistent with the assignment.
    Map<String, Integer> bucketsPerFileGroup = new HashMap<>();
    fileGroupToComparisons.forEach((f, c) ->
        bucketsPerFileGroup.put(f, Math.max(1, (int) Math.ceil((c * 1.0) / keysPerBucket))));
    // Sum as long and narrow explicitly, so that an overflowing total fails loudly instead of wrapping around.
    int totalBuckets =
        Math.toIntExact(bucketsPerFileGroup.values().stream().mapToLong(Integer::longValue).sum());
    if (totalBuckets > 0 && targetPartitions <= 0) {
      throw new IllegalArgumentException("targetPartitions must be positive, but was " + targetPartitions);
    }

    // If totalBuckets < targetPartitions, no need to have extra partitions
    this.partitions = totalBuckets == 0 ? 0 : Math.min(targetPartitions, totalBuckets);

    // PHASE 1 : start filling upto minimum number of buckets into partitions, taking all but one bucket from each file
    // This tries to first optimize for goal 1 above, with knowledge that each partition needs a certain minimum number
    // of buckets and assigns buckets in the same order as file groups. If we were to simply round robin, then buckets
    // for a file group is more or less guaranteed to be placed on different partitions all the time.
    int minBucketsPerPartition = partitions == 0 ? 1 : Math.max(totalBuckets / partitions, 1);
    LOG.info(String.format("TotalBuckets %d, min_buckets/partition %d", totalBuckets, minBucketsPerPartition));
    int[] bucketsFilled = new int[partitions];
    Map<String, AtomicInteger> bucketsFilledPerFileGroup = new HashMap<>();
    int partitionIndex = 0;
    for (Map.Entry<String, Integer> e : bucketsPerFileGroup.entrySet()) {
      AtomicInteger filledForFileGroup =
          bucketsFilledPerFileGroup.computeIfAbsent(e.getKey(), k -> new AtomicInteger(0));
      List<Integer> partitionList = this.fileGroupToPartitions.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
      // withhold one bucket for phase 2, unless the file group only has a single bucket to begin with
      int bucketsToPlace = Math.max(1, e.getValue() - 1);
      for (int b = 0; b < bucketsToPlace; b++) {
        // keep filled counts upto date
        bucketsFilled[partitionIndex]++;
        filledForFileGroup.incrementAndGet();
        // mark this partition against the file group
        partitionList.add(partitionIndex);
        // switch to new partition if needed
        if (bucketsFilled[partitionIndex] >= minBucketsPerPartition) {
          partitionIndex = (partitionIndex + 1) % partitions;
        }
      }
    }

    // PHASE 2 : for remaining unassigned buckets, round robin over partitions once. Since we withheld 1 bucket from
    // each file group uniformly, this remaining is also an uniform mix across file groups. We just round robin to
    // optimize for goal 2.
    for (Map.Entry<String, Integer> e : bucketsPerFileGroup.entrySet()) {
      int remaining = e.getValue() - bucketsFilledPerFileGroup.get(e.getKey()).intValue();
      List<Integer> partitionList = this.fileGroupToPartitions.get(e.getKey());
      for (int r = 0; r < remaining; r++) {
        // mark this partition against the file group
        partitionList.add(partitionIndex);
        bucketsFilled[partitionIndex]++;
        partitionIndex = (partitionIndex + 1) % partitions;
      }
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Partitions assigned per file groups :" + fileGroupToPartitions);
      StringBuilder str = new StringBuilder();
      for (int i = 0; i < bucketsFilled.length; i++) {
        str.append("p").append(i).append(" : ").append(bucketsFilled[i]).append(",");
      }
      LOG.debug("Num buckets assigned per partition :" + str);
    }
  }

  /**
   * Returns the number of partitions in the computed plan.
   */
  @Override
  public int numPartitions() {
    return partitions;
  }

  /**
   * Picks a partition for the given key, among the partitions planned for the key's file group.
   *
   * @param key a {@code Pair<String, String>} whose left value is looked up in the file group plan and whose right
   *        value is hashed to choose one of that file group's partitions
   * @return the partition to route the key to
   * @throws IllegalArgumentException if no partitions were planned for the file group named by the key
   */
  @Override
  public int getPartition(Object key) {
    @SuppressWarnings("unchecked")
    final Pair<String, String> parts = (Pair<String, String>) key;
    final List<Integer> candidatePartitions = fileGroupToPartitions.get(parts.getLeft());
    if (candidatePartitions == null) {
      throw new IllegalArgumentException("No partitions planned for file group " + parts.getLeft());
    }
    final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", parts.getRight());
    // Only the low 32 bits of the hash are used; floorMod keeps the index non-negative even when they are negative.
    final int idx = Math.floorMod((int) hashOfKey, candidatePartitions.size());
    return candidatePartitions.get(idx);
  }

  /**
   * Exposes the computed file group to partitions plan (package-private). The returned map is the internal instance,
   * not a copy.
   */
  Map<String, List<Integer>> getFileGroupToPartitions() {
    return fileGroupToPartitions;
  }
}