/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.table.log;
import org.apache.hudi.common.fs.BoundedFsDataInputStream;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.fs.TimedFSDataInputStream;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieCorruptBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.CorruptedLogFileException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* Scans a log file and provides a block-level iterator over it. Loads the entire block contents into memory and can emit
* either a DataBlock, CommandBlock, DeleteBlock or CorruptBlock (if one is found).
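*
* <p>A minimal forward-iteration sketch (illustrative only; assumes {@code fs}, {@code logFile} and
* {@code readerSchema} are already available):
* <pre>{@code
* HoodieLogFileReader reader =
*     new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false);
* try {
*   while (reader.hasNext()) {
*     HoodieLogBlock block = reader.next();
*     // inspect block.getBlockType() and process the block
*   }
* } finally {
*   reader.close();
* }
* }</pre>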
*/
public class HoodieLogFileReader implements HoodieLogFormat.Reader {
public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1024 * 1024; // 1 MB
private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
private final FileSystem fs;
private final Configuration hadoopConf;
private final FSDataInputStream inputStream;
private final HoodieLogFile logFile;
private final byte[] magicBuffer = new byte[6];
private final Schema readerSchema;
private InternalSchema internalSchema;
private final String keyField;
private boolean readBlockLazily;
private long reverseLogFilePosition;
private long lastReverseLogFilePosition;
private boolean reverseReader;
private boolean enableRecordLookups;
private boolean closed = false;
private transient Thread shutdownThread = null;
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily) throws IOException {
this(fs, logFile, readerSchema, bufferSize, readBlockLazily, false);
}
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException {
this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
String keyField) throws IOException {
this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema());
}
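/**
* Fully parameterized constructor; the other constructors delegate to this one with defaults.
*
* @param fs                  file system holding the log file.
* @param logFile             log file to read.
* @param readerSchema        schema to use when reading records from data blocks.
* @param bufferSize          buffer size for the underlying input stream.
* @param readBlockLazily     whether to defer reading block content until it is actually needed.
* @param reverseReader       whether to allow iterating the log file in reverse via hasPrev()/prev().
* @param enableRecordLookups whether to enable record-level lookups (used by HFile data blocks).
* @param keyField            name of the record key field.
* @param internalSchema      internal schema for schema evolution; null is treated as the empty schema.
*/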
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
String keyField, InternalSchema internalSchema) throws IOException {
this.fs = fs;
this.hadoopConf = fs.getConf();
// NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path
// is prefixed with an appropriate scheme given that we're not propagating the FS
// further
this.logFile = new HoodieLogFile(FSUtils.makeQualified(fs, logFile.getPath()), logFile.getFileSize());
this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize);
this.readerSchema = readerSchema;
this.readBlockLazily = readBlockLazily;
this.reverseReader = reverseReader;
this.enableRecordLookups = enableRecordLookups;
this.keyField = keyField;
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
if (this.reverseReader) {
this.reverseLogFilePosition = this.lastReverseLogFilePosition = this.logFile.getFileSize();
}
addShutDownHook();
}
@Override
public HoodieLogFile getLogFile() {
return logFile;
}
/**
* Closes the input stream, if not already closed, when the JVM exits.
*/
private void addShutDownHook() {
shutdownThread = new Thread(() -> {
try {
close();
} catch (Exception e) {
LOG.warn("unable to close input stream for log file " + logFile, e);
// fail silently for any sort of exception
}
});
Runtime.getRuntime().addShutdownHook(shutdownThread);
}
// TODO: convert content and block length to long by using ByteBuffer; a raw byte[] allows
// a max of Integer.MAX_VALUE bytes
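// On-disk layout of a healthy log block, mirroring the numbered read steps below:
// [MAGIC][block size][version][block type][header][content length][content][footer][total block length]
// (the MAGIC bytes are consumed by hasNext(); fields after the version are only present if that
// log format version defines them)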
private HoodieLogBlock readBlock() throws IOException {
int blockSize;
long blockStartPos = inputStream.getPos();
try {
// 1. Read the total size of the block
blockSize = (int) inputStream.readLong();
} catch (EOFException | CorruptedLogFileException e) {
// An exception reading any of the above indicates a corrupt block
// Create a corrupt block by finding the next MAGIC marker or EOF
return createCorruptBlock(blockStartPos);
}
// We may have had a crash that wrote this block only partially.
// Skip blockSize bytes in the stream; we should then find either a sync marker (start of the next
// block) or EOF. If we find neither, this block is corrupted.
boolean isCorrupted = isBlockCorrupted(blockSize);
if (isCorrupted) {
return createCorruptBlock(blockStartPos);
}
// 2. Read the version for this log format
HoodieLogFormat.LogFormatVersion nextBlockVersion = readVersion();
// 3. Read the block type for a log block
HoodieLogBlockType blockType = tryReadBlockType(nextBlockVersion);
// 4. Read the header for a log block, if present
Map<HeaderMetadataType, String> header =
nextBlockVersion.hasHeader() ? HoodieLogBlock.getLogMetadata(inputStream) : null;
// 5. Read the content length for the content
// Fall back to the full block size if there is no content length
// TODO replace w/ hasContentLength
int contentLength =
nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION ? (int) inputStream.readLong() : blockSize;
// 6. Read the content or skip content based on IO vs Memory trade-off by client
long contentPosition = inputStream.getPos();
boolean shouldReadLazily = readBlockLazily && nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION;
Option<byte[]> content = HoodieLogBlock.tryReadContent(inputStream, contentLength, shouldReadLazily);
// 7. Read footer if any
Map<HeaderMetadataType, String> footer =
nextBlockVersion.hasFooter() ? HoodieLogBlock.getLogMetadata(inputStream) : null;
// 8. Read log block length, if present. This acts as a reverse pointer when traversing a
// log file in reverse
if (nextBlockVersion.hasLogBlockLength()) {
inputStream.readLong();
}
// 9. Read the log block end position in the log file
long blockEndPos = inputStream.getPos();
HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
new HoodieLogBlock.HoodieLogBlockContentLocation(hadoopConf, logFile, contentPosition, contentLength, blockEndPos);
switch (Objects.requireNonNull(blockType)) {
case AVRO_DATA_BLOCK:
if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
return HoodieAvroDataBlock.getBlock(content.get(), readerSchema, internalSchema);
} else {
return new HoodieAvroDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc,
getTargetReaderSchemaForBlock(), header, footer, keyField);
}
case HFILE_DATA_BLOCK:
checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
String.format("HFile block cannot be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
return new HoodieHFileDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc,
Option.ofNullable(readerSchema), header, footer, enableRecordLookups, logFile.getPath());
case PARQUET_DATA_BLOCK:
checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
String.format("Parquet block cannot be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
return new HoodieParquetDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc,
getTargetReaderSchemaForBlock(), header, footer, keyField);
case DELETE_BLOCK:
return new HoodieDeleteBlock(content, inputStream, readBlockLazily, Option.of(logBlockContentLoc), header, footer);
case COMMAND_BLOCK:
return new HoodieCommandBlock(content, inputStream, readBlockLazily, Option.of(logBlockContentLoc), header, footer);
case CDC_DATA_BLOCK:
return new HoodieCDCDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc, readerSchema, header, keyField);
default:
throw new HoodieNotSupportedException("Unsupported Block " + blockType);
}
}
private Option<Schema> getTargetReaderSchemaForBlock() {
// We should use the writer schema to read the log file: after a DDL operation the readerSchema may differ
// from the writeSchema, and the Avro reader would throw an exception.
// E.g. the original writeSchema is "a string, b double"; after adding a new column the readerSchema becomes
// "a string, c int, b double", so it is wrong to use the readerSchema to read old log files.
// After reading records with the writeSchema, we rewrite them with the readerSchema in AbstractHoodieLogRecordReader.
if (internalSchema.isEmptySchema()) {
return Option.ofNullable(this.readerSchema);
} else {
return Option.empty();
}
}
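/**
* Reads the block type for the current block. Returns null for the legacy (default) log format version,
* which does not encode a block type.
*/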
@Nullable
private HoodieLogBlockType tryReadBlockType(HoodieLogFormat.LogFormatVersion blockVersion) throws IOException {
if (blockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
return null;
}
int type = inputStream.readInt();
checkArgument(type < HoodieLogBlockType.values().length, "Invalid block byte type found " + type);
return HoodieLogBlockType.values()[type];
}
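/**
* Creates a {@link HoodieCorruptBlock} spanning everything from the given start position up to the next
* detectable MAGIC marker (or EOF).
*/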
private HoodieLogBlock createCorruptBlock(long blockStartPos) throws IOException {
LOG.info("Log " + logFile + " has a corrupted block at " + blockStartPos);
inputStream.seek(blockStartPos);
long nextBlockOffset = scanForNextAvailableBlockOffset();
// Rewind to the initial start and read corrupted bytes till the nextBlockOffset
inputStream.seek(blockStartPos);
LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
int corruptedBlockSize = (int) (nextBlockOffset - blockStartPos);
long contentPosition = inputStream.getPos();
Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, readBlockLazily);
HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
new HoodieLogBlock.HoodieLogBlockContentLocation(hadoopConf, logFile, contentPosition, corruptedBlockSize, nextBlockOffset);
return new HoodieCorruptBlock(corruptedBytes, inputStream, readBlockLazily, Option.of(logBlockContentLoc), new HashMap<>(), new HashMap<>());
}
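/**
* Heuristic corruption check: verifies that the block size recorded in the footer matches the one from the
* header, and that the block is followed by either the next block's magic bytes or EOF. The check is skipped
* for write-transactional storage schemes (see HUDI-2118).
*/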
private boolean isBlockCorrupted(int blocksize) throws IOException {
if (StorageSchemes.isWriteTransactional(fs.getScheme())) {
// skip block corrupt check if writes are transactional. see https://issues.apache.org/jira/browse/HUDI-2118
return false;
}
long currentPos = inputStream.getPos();
long blockSizeFromFooter;
try {
// Check whether the block size recorded in the footer matches the one from the header, by seeking to the
// footer field (a long) and reading it. We do not seek to `currentPos + blocksize`, which can be the file
// size for the last block in the file, causing an EOFException for some FSDataInputStream implementations.
inputStream.seek(currentPos + blocksize - Long.BYTES);
// The block size in the footer includes the magic bytes, which the header's block size does not.
// So we have to shorten the footer block size by the length of the magic bytes.
blockSizeFromFooter = inputStream.readLong() - magicBuffer.length;
} catch (EOFException e) {
LOG.info("Found corrupted block in file " + logFile + " with block size(" + blocksize + ") running past EOF");
// this is corrupt
// This seek is required because contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream
// release-3.1.0-RC1/DFSInputStream.java#L1455
// release-3.1.0-RC1/BufferedFSInputStream.java#L73
inputStream.seek(currentPos);
return true;
}
if (blocksize != blockSizeFromFooter) {
LOG.info("Found corrupted block in file " + logFile + ". Header block size(" + blocksize
+ ") did not match the footer block size(" + blockSizeFromFooter + ")");
inputStream.seek(currentPos);
return true;
}
try {
readMagic();
// all good - either we found the sync marker or EOF. Reset position and continue
return false;
} catch (CorruptedLogFileException e) {
// This is a corrupted block
LOG.info("Found corrupted block in file " + logFile + ". No magic hash found right after footer block size entry");
return true;
} finally {
inputStream.seek(currentPos);
}
}
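/**
* Scans forward from the current stream position and returns the offset of the next MAGIC marker,
* or the end-of-file position if no further marker is found.
*/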
private long scanForNextAvailableBlockOffset() throws IOException {
// Make the buffer large enough to scan through the file as quickly as possible, especially if it is on S3/GCS.
byte[] dataBuf = new byte[BLOCK_SCAN_READ_BUFFER_SIZE];
boolean eof = false;
while (true) {
long currentPos = inputStream.getPos();
try {
Arrays.fill(dataBuf, (byte) 0);
inputStream.readFully(dataBuf, 0, dataBuf.length);
} catch (EOFException e) {
eof = true;
}
long pos = Bytes.indexOf(dataBuf, HoodieLogFormat.MAGIC);
if (pos >= 0) {
return currentPos + pos;
}
if (eof) {
return inputStream.getPos();
}
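// Rewind by MAGIC.length so a magic marker straddling two consecutive buffer reads is not missed.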
inputStream.seek(currentPos + dataBuf.length - HoodieLogFormat.MAGIC.length);
}
}
@Override
public void close() throws IOException {
if (!closed) {
this.inputStream.close();
if (null != shutdownThread) {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
}
closed = true;
}
}
/*
* hasNext is not idempotent. TODO - Fix this. It is okay for now - PR
*/
@Override
public boolean hasNext() {
try {
return readMagic();
} catch (IOException e) {
throw new HoodieIOException("IOException when reading logfile " + logFile, e);
}
}
/**
* Read log format version from log file.
*/
private HoodieLogFormat.LogFormatVersion readVersion() throws IOException {
return new HoodieLogFormatVersion(inputStream.readInt());
}
private boolean readMagic() throws IOException {
try {
boolean hasMagic = hasNextMagic();
if (!hasMagic) {
throw new CorruptedLogFileException(
logFile + " could not be read. Did not find the magic bytes at the start of the block");
}
return hasMagic;
} catch (EOFException e) {
// We have reached the EOF
return false;
}
}
private boolean hasNextMagic() throws IOException {
// 1. Read magic header from the start of the block
inputStream.readFully(magicBuffer, 0, 6);
return Arrays.equals(magicBuffer, HoodieLogFormat.MAGIC);
}
@Override
public HoodieLogBlock next() {
try {
// hasNext() must be called before next()
return readBlock();
} catch (IOException io) {
throw new HoodieIOException("IOException when reading logblock from log file " + logFile, io);
}
}
/**
* hasPrev is not idempotent.
*/
@Override
public boolean hasPrev() {
try {
if (!this.reverseReader) {
throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
}
reverseLogFilePosition = lastReverseLogFilePosition;
reverseLogFilePosition -= Long.BYTES;
lastReverseLogFilePosition = reverseLogFilePosition;
inputStream.seek(reverseLogFilePosition);
} catch (Exception e) {
// Either reached EOF while reading backwards or an exception
return false;
}
return true;
}
/**
* This is a reverse iterator. Note: at any point, an instance of HoodieLogFileReader should iterate either in reverse
* (prev) or forward (next); doing both on the same instance is not supported. WARNING: every call to prev() should be
* preceded by hasPrev().
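*
* <p>A minimal sketch of the expected reverse-iteration pattern (illustrative only; assumes the reader was
* constructed with reverseReader = true):
* <pre>{@code
* while (reader.hasPrev()) {
*   HoodieLogBlock block = reader.prev();
*   // process the block
* }
* }</pre>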
*/
@Override
public HoodieLogBlock prev() throws IOException {
if (!this.reverseReader) {
throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
}
long blockSize = inputStream.readLong();
long blockEndPos = inputStream.getPos();
// blockSize covers the entire block, including the trailing block length field
try {
inputStream.seek(reverseLogFilePosition - blockSize);
} catch (Exception e) {
// this could be a corrupt block
inputStream.seek(blockEndPos);
throw new CorruptedLogFileException("Found possible corrupted block, cannot read log file in reverse, "
+ "fallback to forward reading of logfile");
}
boolean hasNext = hasNext();
reverseLogFilePosition -= blockSize;
lastReverseLogFilePosition = reverseLogFilePosition;
return next();
}
/**
* Reverse pointer; does not read the block. Returns the current position of the log file (in reverse). If the pointer
* (input stream) is moved in any way, it is the job of the client of this class to seek/reset it back to the file
* position returned by this method in order to get correct results.
*/
public long moveToPrev() throws IOException {
if (!this.reverseReader) {
throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
}
inputStream.seek(lastReverseLogFilePosition);
long blockSize = inputStream.readLong();
// blockSize covers the entire block, including the trailing block length field
inputStream.seek(reverseLogFilePosition - blockSize);
reverseLogFilePosition -= blockSize;
lastReverseLogFilePosition = reverseLogFilePosition;
return reverseLogFilePosition;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove not supported for HoodieLogFileReader");
}
/**
* Fetches the right {@link FSDataInputStream} to be used, by wrapping it with the required input streams.
* @param fs instance of {@link FileSystem} in use.
* @param logFile log file to open.
* @param bufferSize buffer size to be used.
* @return the right {@link FSDataInputStream} as required.
*/
private static FSDataInputStream getFSDataInputStream(FileSystem fs,
HoodieLogFile logFile,
int bufferSize) throws IOException {
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
if (FSUtils.isGCSFileSystem(fs)) {
// in GCS FS, we might need to intercept seek offsets, as we might otherwise get an EOFException
return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, logFile, bufferSize), true);
}
if (FSUtils.isCHDFileSystem(fs)) {
return new BoundedFsDataInputStream(fs, logFile.getPath(), fsDataInputStream);
}
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
}
// fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream;
// need to wrap in another BufferedFSInputStream to make bufferSize work?
return fsDataInputStream;
}
/**
* GCS FileSystem needs special handling for seek, so this method fetches the right {@link FSDataInputStream} to be
* used, by wrapping it with the required input streams.
* @param fsDataInputStream original instance of {@link FSDataInputStream}.
* @param logFile log file to open.
* @param bufferSize buffer size to be used.
* @return the right {@link FSDataInputStream} as required.
*/
private static FSDataInputStream getFSDataInputStreamForGCS(FSDataInputStream fsDataInputStream,
HoodieLogFile logFile,
int bufferSize) {
// In case of GCS FS, there are two flows:
// a. fsDataInputStream.getWrappedStream() is an instance of FSInputStream
// b. fsDataInputStream.getWrappedStream() is not an instance of FSInputStream, but an instance of FSDataInputStream.
// (a) is handled in the first if block and (b) in the second; otherwise, we fall back to the original fsDataInputStream.
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
}
if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream
&& ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream() instanceof FSInputStream) {
FSInputStream inputStream = (FSInputStream)((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream();
return new TimedFSDataInputStream(logFile.getPath(),
new FSDataInputStream(new BufferedFSInputStream(inputStream, bufferSize)));
}
return fsDataInputStream;
}
}