/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.io.FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.WeakHashMap;
import javax.annotation.concurrent.GuardedBy;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
// CHECKSTYLE.OFF: JavadocStyle
/**
* Do not use in pipelines directly: most users should use {@link AvroIO.Read}.
*
* <p>A {@link FileBasedSource} for reading Avro files.
*
* <p>To read a {@link PCollection} of objects from one or more Avro files, use {@link
* AvroSource#from} to specify the path(s) of the files to read. The {@link AvroSource} that is
* returned will read objects of type {@link GenericRecord} with the schema(s) that were written at
* file creation. To further configure the {@link AvroSource} to read with a user-defined schema, or
* to return records of a type other than {@link GenericRecord}, use {@link
* AvroSource#withSchema(Schema)} (using an Avro {@link Schema}), {@link
* AvroSource#withSchema(String)} (using a JSON schema), or {@link AvroSource#withSchema(Class)} (to
* return objects of the Avro-generated class specified).
*
* <p>An {@link AvroSource} can be read from using the {@link Read} transform. For example:
*
* <pre>{@code
* AvroSource<MyType> source = AvroSource.from(file.toPath().toString()).withSchema(MyType.class);
* PCollection<MyType> records = Read.from(source);
* }</pre>
*
* <p>This class's implementation is based on the <a
* href="https://avro.apache.org/docs/1.7.7/spec.html">Avro 1.7.7</a> specification and implements
* parsing of some parts of Avro Object Container Files. The rationale for doing so is that the Avro
* API does not provide efficient ways of computing the precise offsets of blocks within a file,
* which is necessary to support dynamic work rebalancing. However, whenever it is possible to use
* the Avro API in a way that supports maintaining precise offsets, this class uses the Avro API.
*
* <p>Avro Object Container files store records in blocks. Each block contains a collection of
* records. Blocks may be compressed (e.g., with bzip2, deflate, or snappy). Blocks are delineated
* from one another by a 16-byte sync marker.
*
* <p>An {@link AvroSource} for a subrange of a single file contains the records in those blocks
* whose start offset is greater than or equal to the start offset of the source and less than the
* end offset of the source.
*
* <p>To use XZ-encoded Avro files, please include an explicit dependency on {@code xz-1.8.jar},
* which has been marked as optional in the Maven {@code sdk/pom.xml}.
*
* <pre>{@code
* <dependency>
* <groupId>org.tukaani</groupId>
* <artifactId>xz</artifactId>
* <version>1.8</version>
* </dependency>
* }</pre>
*
* <h3>Permissions</h3>
*
* <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
* pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for more
* details.
*
* @param <T> The type of records to be read from the source.
*/
// CHECKSTYLE.ON: JavadocStyle
@Experimental(Kind.SOURCE_SINK)
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class AvroSource<T> extends BlockBasedSource<T> {
// Default minimum bundle size (chosen as two default-size Avro blocks to attempt to
// ensure that every source has at least one block of records).
// The default sync interval is 64k.
private static final long DEFAULT_MIN_BUNDLE_SIZE = 2L * DataFileConstants.DEFAULT_SYNC_INTERVAL;
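/**
* A {@link Serializable} factory that creates a {@link DatumReader} for the given writer and
* reader {@link Schema}s.
*/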
@FunctionalInterface
public interface DatumReaderFactory<T> extends Serializable {
DatumReader<T> apply(Schema writer, Schema reader);
}
private static final DatumReaderFactory<?> GENERIC_DATUM_READER_FACTORY = GenericDatumReader::new;
private static final DatumReaderFactory<?> REFLECT_DATUM_READER_FACTORY = ReflectDatumReader::new;
// Use cases of AvroSource are:
// 1) AvroSource<GenericRecord> Reading GenericRecord records with a specified schema.
// 2) AvroSource<Foo> Reading records of a generated Avro class Foo.
// 3) AvroSource<T> Reading GenericRecord records with an unspecified schema
//    and converting them to type T.
//
//                      | Case 1        | Case 2   | Case 3        |
// type                 | GenericRecord | Foo      | GenericRecord |
// readerSchemaString   | non-null      | non-null | null          |
// parseFn              | null          | null     | non-null      |
// outputCoder          | null          | null     | non-null      |
// readerFactory        | either        | either   | either        |
private static class Mode<T> implements Serializable {
private final Class<?> type;
// The JSON schema used to decode records.
private @Nullable String readerSchemaString;
private final @Nullable SerializableFunction<GenericRecord, T> parseFn;
private final @Nullable Coder<T> outputCoder;
private final @Nullable DatumReaderFactory<?> readerFactory;
private Mode(
Class<?> type,
@Nullable String readerSchemaString,
@Nullable SerializableFunction<GenericRecord, T> parseFn,
@Nullable Coder<T> outputCoder,
@Nullable DatumReaderFactory<?> readerFactory) {
this.type = type;
this.readerSchemaString = internSchemaString(readerSchemaString);
this.parseFn = parseFn;
this.outputCoder = outputCoder;
this.readerFactory = readerFactory;
}
private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
is.defaultReadObject();
readerSchemaString = internSchemaString(readerSchemaString);
}
private Coder<T> getOutputCoder() {
if (parseFn == null) {
return AvroCoder.of((Class<T>) type, internOrParseSchemaString(readerSchemaString));
} else {
return outputCoder;
}
}
private void validate() {
if (parseFn == null) {
checkArgument(
readerSchemaString != null,
"schema must be specified using withSchema() when not using a parse fn");
}
}
private Mode<T> withReaderFactory(DatumReaderFactory<?> factory) {
return new Mode<>(type, readerSchemaString, parseFn, outputCoder, factory);
}
private DatumReader<?> createReader(Schema writerSchema, Schema readerSchema) {
DatumReaderFactory<?> factory = this.readerFactory;
if (factory == null) {
factory =
(type == GenericRecord.class)
? GENERIC_DATUM_READER_FACTORY
: REFLECT_DATUM_READER_FACTORY;
}
return factory.apply(writerSchema, readerSchema);
}
}
private static Mode<GenericRecord> readGenericRecordsWithSchema(
String schema, @Nullable DatumReaderFactory<?> factory) {
return new Mode<>(GenericRecord.class, schema, null, null, factory);
}
private static <T> Mode<T> readGeneratedClasses(
Class<T> clazz, @Nullable DatumReaderFactory<?> factory) {
return new Mode<>(clazz, ReflectData.get().getSchema(clazz).toString(), null, null, factory);
}
private static <T> Mode<T> parseGenericRecords(
SerializableFunction<GenericRecord, T> parseFn,
Coder<T> outputCoder,
@Nullable DatumReaderFactory<?> factory) {
return new Mode<>(GenericRecord.class, null, parseFn, outputCoder, factory);
}
private final Mode<T> mode;
/**
* Reads from the given file name or pattern ("glob"). The returned source can be further
* configured by calling {@link #withSchema}, e.g. to return a type other than {@link
* GenericRecord}.
*/
public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) {
return new AvroSource<>(
fileNameOrPattern,
EmptyMatchTreatment.DISALLOW,
DEFAULT_MIN_BUNDLE_SIZE,
readGenericRecordsWithSchema(null /* will need to be specified in withSchema */, null));
}
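/** Like {@link #from(ValueProvider)}, but reads a single file described by the given {@link Metadata}. */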
public static AvroSource<GenericRecord> from(Metadata metadata) {
return new AvroSource<>(
metadata,
DEFAULT_MIN_BUNDLE_SIZE,
0,
metadata.sizeBytes(),
readGenericRecordsWithSchema(null /* will need to be specified in withSchema */, null));
}
/** Like {@link #from(ValueProvider)}. */
public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));
}
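/**
* Sets the {@link EmptyMatchTreatment}, which controls the behavior when the file pattern matches
* no files.
*/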
public AvroSource<T> withEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment) {
return new AvroSource<>(
getFileOrPatternSpecProvider(), emptyMatchTreatment, getMinBundleSize(), mode);
}
/**
* Reads files containing records that conform to the given reader schema, provided as a
* JSON-encoded schema string.
*
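* <p>For example (the schema literal and path below are illustrative assumptions):
*
* <pre>{@code
* String schema =
*     "{\"type\": \"record\", \"name\": \"User\", \"fields\": "
*         + "[{\"name\": \"name\", \"type\": \"string\"}]}";
* AvroSource<GenericRecord> users =
*     AvroSource.from("gs://my_bucket/users-*.avro").withSchema(schema);
* }</pre>
*/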
public AvroSource<GenericRecord> withSchema(String schema) {
checkArgument(schema != null, "schema can not be null");
return new AvroSource<>(
getFileOrPatternSpecProvider(),
getEmptyMatchTreatment(),
getMinBundleSize(),
readGenericRecordsWithSchema(schema, mode.readerFactory));
}
/** Like {@link #withSchema(String)}. */
public AvroSource<GenericRecord> withSchema(Schema schema) {
checkArgument(schema != null, "schema can not be null");
return withSchema(schema.toString());
}
/**
* Reads files containing records of the given class, using a schema obtained from the class via
* Avro reflection.
*
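* <p>For example, with a hypothetical Avro-generated class {@code User} (the path is
* illustrative):
*
* <pre>{@code
* AvroSource<User> source =
*     AvroSource.from("gs://my_bucket/users-*.avro").withSchema(User.class);
* }</pre>
*/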
public <X> AvroSource<X> withSchema(Class<X> clazz) {
checkArgument(clazz != null, "clazz can not be null");
if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
return new AvroSource<>(
getSingleFileMetadata(),
getMinBundleSize(),
getStartOffset(),
getEndOffset(),
readGeneratedClasses(clazz, mode.readerFactory));
}
return new AvroSource<>(
getFileOrPatternSpecProvider(),
getEmptyMatchTreatment(),
getMinBundleSize(),
readGeneratedClasses(clazz, mode.readerFactory));
}
/**
* Reads {@link GenericRecord}s of unspecified schema and maps them to instances of a custom type
* using the given {@code parseFn}, encoding them with the given coder.
*
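* <p>A minimal sketch, assuming a hypothetical {@code Event} class that implements {@code
* java.io.Serializable} and has a single-string constructor (the coder choice is likewise an
* assumption; any {@link Coder} for the output type works):
*
* <pre>{@code
* AvroSource<Event> events =
*     AvroSource.from("gs://my_bucket/events-*.avro")
*         .withParseFn(
*             record -> new Event(record.get("id").toString()),
*             SerializableCoder.of(Event.class));
* }</pre>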
*/
public <X> AvroSource<X> withParseFn(
SerializableFunction<GenericRecord, X> parseFn, Coder<X> coder) {
checkArgument(parseFn != null, "parseFn can not be null");
checkArgument(coder != null, "coder can not be null");
if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
return new AvroSource<>(
getSingleFileMetadata(),
getMinBundleSize(),
getStartOffset(),
getEndOffset(),
parseGenericRecords(parseFn, coder, mode.readerFactory));
}
return new AvroSource<>(
getFileOrPatternSpecProvider(),
getEmptyMatchTreatment(),
getMinBundleSize(),
parseGenericRecords(parseFn, coder, mode.readerFactory));
}
/**
* Sets the minimum bundle size. Refer to {@link OffsetBasedSource} for a description of {@code
* minBundleSize} and its use.
*/
public AvroSource<T> withMinBundleSize(long minBundleSize) {
if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
return new AvroSource<>(
getSingleFileMetadata(), minBundleSize, getStartOffset(), getEndOffset(), mode);
}
return new AvroSource<>(
getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), minBundleSize, mode);
}
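/**
* Sets a {@link DatumReaderFactory} used to create {@link DatumReader} instances from the writer
* and reader schemas.
*/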
public AvroSource<T> withDatumReaderFactory(DatumReaderFactory<?> factory) {
Mode<T> newMode = mode.withReaderFactory(factory);
if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
return new AvroSource<>(
getSingleFileMetadata(), getMinBundleSize(), getStartOffset(), getEndOffset(), newMode);
}
return new AvroSource<>(
getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), getMinBundleSize(), newMode);
}
/** Constructor for FILEPATTERN mode. */
private AvroSource(
ValueProvider<String> fileNameOrPattern,
EmptyMatchTreatment emptyMatchTreatment,
long minBundleSize,
Mode<T> mode) {
super(fileNameOrPattern, emptyMatchTreatment, minBundleSize);
this.mode = mode;
}
/** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */
private AvroSource(
Metadata metadata, long minBundleSize, long startOffset, long endOffset, Mode<T> mode) {
super(metadata, minBundleSize, startOffset, endOffset);
this.mode = mode;
}
@Override
public void validate() {
super.validate();
mode.validate();
}
/**
* Used by the Dataflow worker. Do not introduce new usages. Do not delete without confirming that
* Dataflow ValidatesRunner tests pass.
*
* @deprecated Used by Dataflow worker
*/
@Deprecated
public BlockBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end)
throws IOException {
return createForSubrangeOfFile(FileSystems.matchSingleFileSpec(fileName), start, end);
}
@Override
public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {
return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, mode);
}
@Override
protected BlockBasedReader<T> createSingleFileReader(PipelineOptions options) {
return new AvroReader<>(this);
}
@Override
public Coder<T> getOutputCoder() {
return mode.getOutputCoder();
}
@VisibleForTesting
@Nullable
String getReaderSchemaString() {
return mode.readerSchemaString;
}
/** Avro file metadata. */
@VisibleForTesting
static class AvroMetadata {
private final byte[] syncMarker;
private final String codec;
private final String schemaString;
AvroMetadata(byte[] syncMarker, String codec, String schemaString) {
this.syncMarker = checkNotNull(syncMarker, "syncMarker");
this.codec = checkNotNull(codec, "codec");
this.schemaString = internSchemaString(checkNotNull(schemaString, "schemaString"));
}
/**
* The JSON-encoded <a href="https://avro.apache.org/docs/1.7.7/spec.html#schemas">schema</a>
* string for the file.
*/
public String getSchemaString() {
return schemaString;
}
/**
* The <a href="https://avro.apache.org/docs/1.7.7/spec.html#Required+Codecs">codec</a> of the
* file.
*/
public String getCodec() {
return codec;
}
/**
* The 16-byte sync marker for the file. See the documentation for <a
* href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">Object Container
* File</a> for more information.
*/
public byte[] getSyncMarker() {
return syncMarker;
}
}
/**
* Reads the {@link AvroMetadata} from the header of an Avro file.
*
* <p>This method parses the header of an Avro <a
* href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">Object Container
* File</a>.
*
* @throws IOException if the file is in an invalid format.
*/
@VisibleForTesting
static AvroMetadata readMetadataFromFile(ResourceId fileResource) throws IOException {
String codec = null;
String schemaString = null;
byte[] syncMarker;
try (InputStream stream = Channels.newInputStream(FileSystems.open(fileResource))) {
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
// The header of an object container file begins with a four-byte magic number, followed
// by the file metadata (including the schema and codec), encoded as a map. Finally, the
// header ends with the file's 16-byte sync marker.
// See https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files for details on
// the encoding of container files.
// Read the magic number.
byte[] magic = new byte[DataFileConstants.MAGIC.length];
decoder.readFixed(magic);
if (!Arrays.equals(magic, DataFileConstants.MAGIC)) {
throw new IOException("Missing Avro file signature: " + fileResource);
}
// Read the metadata to find the codec and schema.
ByteBuffer valueBuffer = ByteBuffer.allocate(512);
long numRecords = decoder.readMapStart();
while (numRecords > 0) {
for (long recordIndex = 0; recordIndex < numRecords; recordIndex++) {
String key = decoder.readString();
// readBytes() clears the buffer and returns a buffer where:
// - position is the start of the bytes read
// - limit is the end of the bytes read
valueBuffer = decoder.readBytes(valueBuffer);
byte[] bytes = new byte[valueBuffer.remaining()];
valueBuffer.get(bytes);
if (key.equals(DataFileConstants.CODEC)) {
codec = new String(bytes, StandardCharsets.UTF_8);
} else if (key.equals(DataFileConstants.SCHEMA)) {
schemaString = new String(bytes, StandardCharsets.UTF_8);
}
}
numRecords = decoder.mapNext();
}
if (codec == null) {
codec = DataFileConstants.NULL_CODEC;
}
// Finally, read the sync marker.
syncMarker = new byte[DataFileConstants.SYNC_SIZE];
decoder.readFixed(syncMarker);
}
checkState(schemaString != null, "No schema present in Avro file metadata %s", fileResource);
return new AvroMetadata(syncMarker, codec, schemaString);
}
// A logical reference cache used to store schemas and schema strings to allow us to
// "intern" values and reduce the number of copies of equivalent objects.
private static final Map<String, Schema> schemaLogicalReferenceCache = new WeakHashMap<>();
private static final Map<String, String> schemaStringLogicalReferenceCache = new WeakHashMap<>();
// We avoid String.intern() because, depending on the JVM, interned strings may be stored in
// PermGen space, which could be exhausted if many distinct schema strings were interned.
private static synchronized String internSchemaString(String schema) {
String internSchema = schemaStringLogicalReferenceCache.get(schema);
if (internSchema != null) {
return internSchema;
}
schemaStringLogicalReferenceCache.put(schema, schema);
return schema;
}
static synchronized Schema internOrParseSchemaString(String schemaString) {
Schema schema = schemaLogicalReferenceCache.get(schemaString);
if (schema != null) {
return schema;
}
Schema.Parser parser = new Schema.Parser();
schema = parser.parse(schemaString);
schemaLogicalReferenceCache.put(schemaString, schema);
return schema;
}
// Java deserialization typically does not go through the constructor, so we use readResolve
// to replace the deserialized instance with one built via the constructor, allowing us to
// intern any schemas.
@SuppressWarnings("unused")
private Object readResolve() throws ObjectStreamException {
switch (getMode()) {
case SINGLE_FILE_OR_SUBRANGE:
return new AvroSource<>(
getSingleFileMetadata(), getMinBundleSize(), getStartOffset(), getEndOffset(), mode);
case FILEPATTERN:
return new AvroSource<>(
getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), getMinBundleSize(), mode);
default:
throw new InvalidObjectException(
String.format("Unknown mode %s for AvroSource %s", getMode(), this));
}
}
/**
* A {@link BlockBasedSource.Block} of Avro records.
*
* @param <T> The type of records stored in the block.
*/
@Experimental(Kind.SOURCE_SINK)
static class AvroBlock<T> extends Block<T> {
// The current record in the block. Initialized in readNextRecord.
private @Nullable T currentRecord;
// The index of the current record in the block.
private long currentRecordIndex = 0;
private final Iterator<?> iterator;
private final SerializableFunction<GenericRecord, T> parseFn;
private final long numRecordsInBlock;
AvroBlock(
Iterator<?> iter, SerializableFunction<GenericRecord, T> parseFn, long numRecordsInBlock) {
this.iterator = iter;
this.parseFn = parseFn;
this.numRecordsInBlock = numRecordsInBlock;
}
@Override
public T getCurrentRecord() {
return currentRecord;
}
@Override
public boolean readNextRecord() {
if (currentRecordIndex >= numRecordsInBlock) {
return false;
}
Object record = iterator.next();
currentRecord = (parseFn == null) ? ((T) record) : parseFn.apply((GenericRecord) record);
currentRecordIndex++;
return true;
}
@Override
public double getFractionOfBlockConsumed() {
return ((double) currentRecordIndex) / numRecordsInBlock;
}
}
/**
* A {@link BlockBasedSource.BlockBasedReader} for reading blocks from Avro files.
*
* <p>An Avro Object Container File consists of a header (terminated by a 16-byte sync marker)
* followed by a sequence of blocks, where each block begins with two variable-length encoded longs
* representing the total number of records in the block and the block's size in bytes, followed by
* the block's (optionally compressed) records. Each block is terminated by a 16-byte sync marker.
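*
* <p>Schematically (a sketch of the layout per the Avro spec; {@code count} and {@code size} are
* variable-length encoded longs):
*
* <pre>
* [magic][metadata map][sync] [count][size][records...][sync] [count][size][records...][sync] ...
* </pre>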
*
* @param <T> The type of records contained in the block.
*/
@Experimental(Kind.SOURCE_SINK)
public static class AvroReader<T> extends BlockBasedReader<T> {
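/** Adapts a {@link SeekableByteChannel} to the Avro {@link SeekableInput} interface. */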
private static class SeekableChannelInput implements SeekableInput {
private final SeekableByteChannel channel;
private final InputStream input;
SeekableChannelInput(SeekableByteChannel channel) {
this.channel = channel;
this.input = Channels.newInputStream(channel);
}
@Override
public void seek(long p) throws IOException {
channel.position(p);
}
@Override
public long tell() throws IOException {
return channel.position();
}
@Override
public long length() throws IOException {
return channel.size();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return input.read(b, off, len);
}
@Override
public void close() throws IOException {
channel.close();
}
}
// The current block.
// Initialized in readNextRecord.
private @Nullable AvroBlock<T> currentBlock;
private @Nullable DataFileReader<?> dataFileReader;
// A lock used to synchronize block offsets for getSplitPointsRemaining
private final Object progressLock = new Object();
// Offset of the current block.
@GuardedBy("progressLock")
private long currentBlockOffset = 0;
// Size of the current block.
@GuardedBy("progressLock")
private long currentBlockSizeBytes = 0;
/** Reads Avro records of type {@code T} from the specified source. */
public AvroReader(AvroSource<T> source) {
super(source);
}
@Override
public synchronized AvroSource<T> getCurrentSource() {
return (AvroSource<T>) super.getCurrentSource();
}
// Precondition: the stream is positioned after the sync marker in the current (about to be
// previous) block. currentBlockSizeBytes equals the size of the current block, or zero if this
// reader was just started.
//
// Postcondition: same as above, but for the new current (formerly next) block.
@Override
public boolean readNextBlock() {
if (!dataFileReader.hasNext()) {
return false;
}
long headerLength =
(long) VarInt.getLength(dataFileReader.getBlockCount())
+ VarInt.getLength(dataFileReader.getBlockSize())
+ DataFileConstants.SYNC_SIZE;
currentBlock =
new AvroBlock<>(
dataFileReader, getCurrentSource().mode.parseFn, dataFileReader.getBlockCount());
// Atomically update both the offset and size of the new block.
synchronized (progressLock) {
currentBlockOffset = dataFileReader.previousSync();
// Total block size includes the header, block content, and trailing sync marker.
currentBlockSizeBytes = dataFileReader.getBlockSize() + headerLength;
}
return true;
}
@Override
public AvroBlock<T> getCurrentBlock() {
return currentBlock;
}
@Override
public long getCurrentBlockOffset() {
synchronized (progressLock) {
return currentBlockOffset;
}
}
@Override
public long getCurrentBlockSize() {
synchronized (progressLock) {
return currentBlockSizeBytes;
}
}
@Override
public long getSplitPointsRemaining() {
if (isDone()) {
return 0;
}
synchronized (progressLock) {
if (currentBlockOffset + currentBlockSizeBytes >= getCurrentSource().getEndOffset()) {
// This block is known to be the last block in the range.
return 1;
}
}
return super.getSplitPointsRemaining();
}
// Postcondition: the stream is positioned at the beginning of the first block after the start
// of the current source, and currentBlockOffset is that position. Additionally,
// currentBlockSizeBytes will be set to 0 indicating that the previous block was empty.
@Override
protected void startReading(ReadableByteChannel channel) throws IOException {
SeekableChannelInput seekableChannelInput =
new SeekableChannelInput((SeekableByteChannel) channel);
// The channel needs to be at the beginning of the file so the DataFileReader can read the
// header; we seek back to the target offset after creating the reader.
seekableChannelInput.seek(0);
Schema readerSchema = null;
String readerSchemaString = this.getCurrentSource().getReaderSchemaString();
if (readerSchemaString != null) {
readerSchema = AvroSource.internOrParseSchemaString(readerSchemaString);
}
// The DataFileReader will call setSchema with the writer schema when it is created.
DatumReader<?> reader = this.getCurrentSource().mode.createReader(readerSchema, readerSchema);
dataFileReader = new DataFileReader<>(seekableChannelInput, reader);
long startOffset = getCurrentSource().getStartOffset();
if (startOffset != 0) {
// The start offset may fall in the middle of a sync marker; by rewinding SYNC_SIZE bytes we
// ensure that we will not miss the first block in that case.
dataFileReader.sync(Math.max(0, startOffset - DataFileConstants.SYNC_SIZE));
}
synchronized (progressLock) {
currentBlockOffset = dataFileReader.previousSync();
currentBlockSizeBytes = 0;
}
}
}
}