blob: b28c89a5364691909233c7e5bebfd27801403b9b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import scala.Tuple2;
/**
* Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition).
*/
public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partitioner {
private static final Logger LOG = LogManager.getLogger(UpsertPartitioner.class);
/**
* List of all small files to be corrected.
*/
protected List<SmallFile> smallFiles = new ArrayList<>();
/**
* Total number of RDD partitions, is determined by total buckets we want to pack the incoming workload into.
*/
private int totalBuckets = 0;
/**
* Stat for the current workload. Helps in determining inserts, upserts etc.
*/
private WorkloadProfile profile;
/**
* Helps decide which bucket an incoming update should go to.
*/
private HashMap<String, Integer> updateLocationToBucket;
/**
* Helps us pack inserts into 1 or more buckets depending on number of incoming records.
*/
private HashMap<String, List<InsertBucketCumulativeWeightPair>> partitionPathToInsertBucketInfos;
/**
* Remembers what type each bucket is for later.
*/
private HashMap<Integer, BucketInfo> bucketInfoMap;
protected final HoodieTable table;
protected final HoodieWriteConfig config;
public UpsertPartitioner(WorkloadProfile profile, HoodieEngineContext context, HoodieTable table,
HoodieWriteConfig config) {
updateLocationToBucket = new HashMap<>();
partitionPathToInsertBucketInfos = new HashMap<>();
bucketInfoMap = new HashMap<>();
this.profile = profile;
this.table = table;
this.config = config;
assignUpdates(profile);
assignInserts(profile, context);
LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
+ "Partition to insert buckets => " + partitionPathToInsertBucketInfos + ", \n"
+ "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
}
private void assignUpdates(WorkloadProfile profile) {
// each update location gets a partition
Set<Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap().entrySet();
for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
}
}
}
private int addUpdateBucket(String partitionPath, String fileIdHint) {
int bucket = totalBuckets;
updateLocationToBucket.put(fileIdHint, bucket);
BucketInfo bucketInfo = new BucketInfo();
bucketInfo.bucketType = BucketType.UPDATE;
bucketInfo.fileIdPrefix = fileIdHint;
bucketInfo.partitionPath = partitionPath;
bucketInfoMap.put(totalBuckets, bucketInfo);
totalBuckets++;
return bucket;
}
private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) {
// for new inserts, compute buckets depending on how many records we have for each partition
Set<String> partitionPaths = profile.getPartitionPaths();
long averageRecordSize =
averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
config);
LOG.info("AvgRecordSize => " + averageRecordSize);
Map<String, List<SmallFile>> partitionSmallFilesMap =
getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), context);
for (String partitionPath : partitionPaths) {
WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
if (pStat.getNumInserts() > 0) {
List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
this.smallFiles.addAll(smallFiles);
LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
long totalUnassignedInserts = pStat.getNumInserts();
List<Integer> bucketNumbers = new ArrayList<>();
List<Long> recordsPerBucket = new ArrayList<>();
// first try packing this into one of the smallFiles
for (SmallFile smallFile : smallFiles) {
long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize,
totalUnassignedInserts);
if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
// create a new bucket or re-use an existing bucket
int bucket;
if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
bucket = updateLocationToBucket.get(smallFile.location.getFileId());
LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
} else {
bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
}
bucketNumbers.add(bucket);
recordsPerBucket.add(recordsToAppend);
totalUnassignedInserts -= recordsToAppend;
}
}
// if we have anything more, create new insert buckets, like normal
if (totalUnassignedInserts > 0) {
long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
if (config.shouldAutoTuneInsertSplits()) {
insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
}
int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts
+ ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
for (int b = 0; b < insertBuckets; b++) {
bucketNumbers.add(totalBuckets);
recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
BucketInfo bucketInfo = new BucketInfo();
bucketInfo.bucketType = BucketType.INSERT;
bucketInfo.partitionPath = partitionPath;
bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
bucketInfoMap.put(totalBuckets, bucketInfo);
totalBuckets++;
}
}
// Go over all such buckets, and assign weights as per amount of incoming inserts.
List<InsertBucketCumulativeWeightPair> insertBuckets = new ArrayList<>();
double curentCumulativeWeight = 0;
for (int i = 0; i < bucketNumbers.size(); i++) {
InsertBucket bkt = new InsertBucket();
bkt.bucketNumber = bucketNumbers.get(i);
bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
curentCumulativeWeight += bkt.weight;
insertBuckets.add(new InsertBucketCumulativeWeightPair(bkt, curentCumulativeWeight));
}
LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets);
}
}
}
private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, HoodieEngineContext context) {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
if (partitionPaths != null && partitionPaths.size() > 0) {
context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions");
JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction<String, String, List<SmallFile>>)
partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();
}
return partitionSmallFilesMap;
}
/**
* Returns a list of small files in the given partition path.
*/
protected List<SmallFile> getSmallFiles(String partitionPath) {
// smallFiles only for partitionPath
List<SmallFile> smallFileLocations = new ArrayList<>();
HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
if (!commitTimeline.empty()) { // if we have some commits
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
for (HoodieBaseFile file : allFiles) {
if (file.getFileSize() < config.getParquetSmallFileLimit()) {
String filename = file.getFileName();
SmallFile sf = new SmallFile();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = file.getFileSize();
smallFileLocations.add(sf);
}
}
}
return smallFileLocations;
}
public BucketInfo getBucketInfo(int bucketNumber) {
return bucketInfoMap.get(bucketNumber);
}
public List<InsertBucketCumulativeWeightPair> getInsertBuckets(String partitionPath) {
return partitionPathToInsertBucketInfos.get(partitionPath);
}
@Override
public int numPartitions() {
return totalBuckets;
}
@Override
public int getPartition(Object key) {
Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation =
(Tuple2<HoodieKey, Option<HoodieRecordLocation>>) key;
if (keyLocation._2().isPresent()) {
HoodieRecordLocation location = keyLocation._2().get();
return updateLocationToBucket.get(location.getFileId());
} else {
String partitionPath = keyLocation._1().getPartitionPath();
List<InsertBucketCumulativeWeightPair> targetBuckets = partitionPathToInsertBucketInfos.get(partitionPath);
// pick the target bucket to use based on the weights.
final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
int index = Collections.binarySearch(targetBuckets, new InsertBucketCumulativeWeightPair(new InsertBucket(), r));
if (index >= 0) {
return targetBuckets.get(index).getKey().bucketNumber;
}
if ((-1 * index - 1) < targetBuckets.size()) {
return targetBuckets.get((-1 * index - 1)).getKey().bucketNumber;
}
// return first one, by default
return targetBuckets.get(0).getKey().bucketNumber;
}
}
/**
* Obtains the average record size based on records written during previous commits. Used for estimating how many
* records pack into one file.
*/
protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
try {
if (!commitTimeline.empty()) {
// Go over the reverse ordered commits to get a more recent estimate of average record size.
Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
while (instants.hasNext()) {
HoodieInstant instant = instants.next();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
break;
}
}
}
} catch (Throwable t) {
// make this fail safe.
LOG.error("Error trying to compute average bytes/record ", t);
}
return avgSize;
}
}