blob: 1a256956bfc98318319a01808af55c1d292f1cad [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hudi.common.table.log;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
* Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged list of records which will
* be used as a lookup table when merging the base columnar file with the redo log file.
* <p>
* NOTE: If readBlockLazily is turned on, does not merge, instead keeps reading log blocks and merges everything at once
* This is an optimization to avoid seek() back and forth to read new block (forward seek()) and lazily read content of
* seen block (reverse and forward seek()) during merge | | Read Block 1 Metadata | | Read Block 1 Data | | | Read Block
* 2 Metadata | | Read Block 2 Data | | I/O Pass 1 | ..................... | I/O Pass 2 | ................. | | | Read
* Block N Metadata | | Read Block N Data |
* <p>
* This results in two I/O passes over the log file.
public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
implements Iterable<HoodieRecord>, Closeable {
private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class);
// A timer for calculating elapsed time in millis
public final HoodieTimer timer = new HoodieTimer();
// Map of compacted/merged records
private final ExternalSpillableMap<String, HoodieRecord> records;
// Set of already scanned prefixes allowing us to avoid scanning same prefixes again
private final Set<String> scannedPrefixes;
// count of merged records in log
private long numMergedRecordsInLog;
private final long maxMemorySizeInBytes;
// Stores the total time taken to perform reading and merging of log blocks
private long totalTimeTakenToReadAndMergeBlocks;
private HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
boolean reverseReader, int bufferSize, String spillableMapBasePath,
Option<InstantRange> instantRange,
ExternalSpillableMap.DiskMapType diskMapType,
boolean isBitCaskDiskMapCompressionEnabled,
boolean withOperationField, boolean forceFullScan,
Option<String> partitionName,
InternalSchema internalSchema,
Option<String> keyFieldOverride,
boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger);
try {
this.maxMemorySizeInBytes = maxMemorySizeInBytes;
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
this.scannedPrefixes = new HashSet<>();
} catch (IOException e) {
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
if (forceFullScan) {
* Scans delta-log files processing blocks
public final void scan() {
public final void scan(boolean skipProcessingBlocks) {
if (forceFullScan) {
// NOTE: When full-scan is enforced, scanning is invoked upfront (during initialization)
scanInternal(Option.empty(), skipProcessingBlocks);
* Provides incremental scanning capability where only provided keys will be looked
* up in the delta-log files, scanned and subsequently materialized into the internal
* cache
* @param keys to be looked up
public void scanByFullKeys(List<String> keys) {
// We can skip scanning in case reader is in full-scan mode, in which case all blocks
// are processed upfront (no additional scanning is necessary)
if (forceFullScan) {
return; // no-op
List<String> missingKeys =
.filter(key -> !records.containsKey(key))
if (missingKeys.isEmpty()) {
// All the required records are already fetched, no-op
scanInternal(Option.of(KeySpec.fullKeySpec(missingKeys)), false);
* Provides incremental scanning capability where only keys matching provided key-prefixes
* will be looked up in the delta-log files, scanned and subsequently materialized into
* the internal cache
* @param keyPrefixes to be looked up
public void scanByKeyPrefixes(List<String> keyPrefixes) {
// We can skip scanning in case reader is in full-scan mode, in which case all blocks
// are processed upfront (no additional scanning is necessary)
if (forceFullScan) {
List<String> missingKeyPrefixes =
.filter(keyPrefix ->
// NOTE: We can skip scanning the prefixes that have already
// been covered by the previous scans
if (missingKeyPrefixes.isEmpty()) {
// All the required records are already fetched, no-op
// NOTE: When looking up by key-prefixes unfortunately we can't short-circuit
// and will have to scan every time as we can't know (based on just
// the records cached) whether particular prefix was scanned or just records
// matching the prefix looked up (by [[scanByFullKeys]] API)
scanInternal(Option.of(KeySpec.prefixKeySpec(missingKeyPrefixes)), false);
private void performScan() {
// Do the scan and merge
scanInternal(Option.empty(), false);
this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
this.numMergedRecordsInLog = records.size();"Number of log files scanned => " + logFilePaths.size());"MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes);"Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries());"Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize());"Number of entries in BitCaskDiskMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries());"Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes());
public Iterator<HoodieRecord> iterator() {
return records.iterator();
public Map<String, HoodieRecord> getRecords() {
return records;
public HoodieRecordType getRecordType() {
return recordMerger.getRecordType();
public long getNumMergedRecordsInLog() {
return numMergedRecordsInLog;
* Returns the builder for {@code HoodieMergedLogRecordScanner}.
public static HoodieMergedLogRecordScanner.Builder newBuilder() {
return new Builder();
protected <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
String key = newRecord.getRecordKey();
HoodieRecord<T> prevRecord = records.get(key);
if (prevRecord != null) {
// Merge and store the combined record
HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(prevRecord, readerSchema,
newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
// If pre-combine returns existing record, no need to update it
if (combinedRecord.getData() != prevRecord.getData()) {
HoodieRecord latestHoodieRecord =
combinedRecord.newInstance(new HoodieKey(key, newRecord.getPartitionPath()), newRecord.getOperation());
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be put into records(Map).
records.put(key, latestHoodieRecord.copy());
} else {
// Put the record as is
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be put into records(Map).
records.put(key, newRecord.copy());
protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
String key = deleteRecord.getRecordKey();
HoodieRecord oldRecord = records.get(key);
if (oldRecord != null) {
// Merge and store the merged record. The ordering val is taken to decide whether the same key record
// should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
// For same ordering values, uses the natural order(arrival time semantics).
Comparable curOrderingVal = oldRecord.getOrderingValue(this.readerSchema, this.hoodieTableMetaClient.getTableConfig().getProps());
Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
// Checks the ordering value does not equal to 0
// because we use 0 as the default value which means natural order
boolean choosePrev = !deleteOrderingVal.equals(0)
&& ReflectionUtils.isSameClass(curOrderingVal, deleteOrderingVal)
&& curOrderingVal.compareTo(deleteOrderingVal) > 0;
if (choosePrev) {
// The DELETE message is obsolete if the old message has greater orderingVal.
// Put the DELETE record
if (recordType == HoodieRecordType.AVRO) {
records.put(key, SpillableMapUtils.generateEmptyPayload(key,
deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue(), getPayloadClassFQN()));
} else {
HoodieEmptyRecord record = new HoodieEmptyRecord<>(new HoodieKey(key, deleteRecord.getPartitionPath()), null, deleteRecord.getOrderingValue(), recordType);
records.put(key, record);
public long getTotalTimeTakenToReadAndMergeBlocks() {
return totalTimeTakenToReadAndMergeBlocks;
public void close() {
if (records != null) {
* Builder used to build {@code HoodieUnMergedLogRecordScanner}.
public static class Builder extends AbstractHoodieLogRecordReader.Builder {
private FileSystem fs;
private String basePath;
private List<String> logFilePaths;
private Schema readerSchema;
private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema();
private String latestInstantTime;
private boolean readBlocksLazily;
private boolean reverseReader;
private int bufferSize;
// specific configurations
private Long maxMemorySizeInBytes;
private String spillableMapBasePath;
private ExternalSpillableMap.DiskMapType diskMapType = HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue();
private boolean isBitCaskDiskMapCompressionEnabled = HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
// incremental filtering
private Option<InstantRange> instantRange = Option.empty();
private String partitionName;
// operation field default false
private boolean withOperationField = false;
private String keyFieldOverride;
// By default, we're doing a full-scan
private boolean forceFullScan = true;
// Use scanV2 method.
private boolean enableOptimizedLogBlocksScan = false;
private HoodieRecordMerger recordMerger;
public Builder withFileSystem(FileSystem fs) {
this.fs = fs;
return this;
public Builder withBasePath(String basePath) {
this.basePath = basePath;
return this;
public Builder withLogFilePaths(List<String> logFilePaths) {
this.logFilePaths =
.filter(p -> !p.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
return this;
public Builder withReaderSchema(Schema schema) {
this.readerSchema = schema;
return this;
public Builder withLatestInstantTime(String latestInstantTime) {
this.latestInstantTime = latestInstantTime;
return this;
public Builder withReadBlocksLazily(boolean readBlocksLazily) {
this.readBlocksLazily = readBlocksLazily;
return this;
public Builder withReverseReader(boolean reverseReader) {
this.reverseReader = reverseReader;
return this;
public Builder withBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
public Builder withInstantRange(Option<InstantRange> instantRange) {
this.instantRange = instantRange;
return this;
public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
this.maxMemorySizeInBytes = maxMemorySizeInBytes;
return this;
public Builder withSpillableMapBasePath(String spillableMapBasePath) {
this.spillableMapBasePath = spillableMapBasePath;
return this;
public Builder withDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
this.diskMapType = diskMapType;
return this;
public Builder withBitCaskDiskMapCompressionEnabled(boolean isBitCaskDiskMapCompressionEnabled) {
this.isBitCaskDiskMapCompressionEnabled = isBitCaskDiskMapCompressionEnabled;
return this;
public Builder withInternalSchema(InternalSchema internalSchema) {
this.internalSchema = internalSchema;
return this;
public Builder withOperationField(boolean withOperationField) {
this.withOperationField = withOperationField;
return this;
public Builder withPartition(String partitionName) {
this.partitionName = partitionName;
return this;
public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
return this;
public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
this.recordMerger = recordMerger;
return this;
public Builder withKeyFiledOverride(String keyFieldOverride) {
this.keyFieldOverride = Objects.requireNonNull(keyFieldOverride);
return this;
public Builder withForceFullScan(boolean forceFullScan) {
this.forceFullScan = forceFullScan;
return this;
public HoodieMergedLogRecordScanner build() {
if (this.partitionName == null && CollectionUtils.nonEmpty(this.logFilePaths)) {
this.partitionName = getRelativePartitionPath(new Path(basePath), new Path(this.logFilePaths.get(0)).getParent());
ValidationUtils.checkArgument(recordMerger != null);
return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
bufferSize, spillableMapBasePath, instantRange,
diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger);