/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.io.FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamException;
import java.io.PushbackInputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
import org.apache.commons.compress.utils.CountingInputStream;
import org.apache.commons.compress.utils.IOUtils;
// CHECKSTYLE.OFF: JavadocStyle
/**
* Do not use in pipelines directly: most users should use {@link AvroIO.Read}.
*
* <p>A {@link FileBasedSource} for reading Avro files.
*
* <p>To read a {@link PCollection} of objects from one or more Avro files, use {@link
* AvroSource#from} to specify the path(s) of the files to read. The {@link AvroSource} that is
* returned will read objects of type {@link GenericRecord} with the schema(s) that were written at
* file creation. To further configure the {@link AvroSource} to read with a user-defined schema, or
* to return records of a type other than {@link GenericRecord}, use {@link
* AvroSource#withSchema(Schema)} (using an Avro {@link Schema}), {@link
* AvroSource#withSchema(String)} (using a JSON schema), or {@link AvroSource#withSchema(Class)} (to
* return objects of the Avro-generated class specified).
*
* <p>An {@link AvroSource} can be read using the {@link Read} transform. For example:
*
* <pre>{@code
* AvroSource<MyType> source = AvroSource.from(file.toPath().toString()).withSchema(MyType.class);
* PCollection<MyType> records = p.apply(Read.from(source));
* }</pre>
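*
* <p>To instead read {@link GenericRecord GenericRecords} with a JSON reader schema (a minimal
* sketch; the path and schema here are hypothetical):
*
* <pre>{@code
* AvroSource<GenericRecord> source =
*     AvroSource.from("gs://my-bucket/data/*.avro")
*         .withSchema("{\"type\": \"record\", \"name\": \"R\", "
*             + "\"fields\": [{\"name\": \"x\", \"type\": \"long\"}]}");
* }</pre>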
*
* <p>This class's implementation is based on the <a
* href="https://avro.apache.org/docs/1.7.7/spec.html">Avro 1.7.7</a> specification and implements
* parsing of some parts of Avro Object Container Files. The rationale for doing so is that the Avro
* API does not provide efficient ways of computing the precise offsets of blocks within a file,
* which is necessary to support dynamic work rebalancing. However, whenever it is possible to use
* the Avro API in a way that supports maintaining precise offsets, this class uses the Avro API.
*
* <p>Avro Object Container files store records in blocks. Each block contains a collection of
* records. Blocks may be encoded (e.g., with bzip2, deflate, or snappy). Blocks are delineated
* from one another by a 16-byte sync marker.
*
* <p>An {@link AvroSource} for a subrange of a single file contains records in the blocks such that
* the start offset of the block is greater than or equal to the start offset of the source and less
* than the end offset of the source.
*
* <p>To use XZ-encoded Avro files, please include an explicit dependency on {@code xz-1.8.jar},
* which has been marked as optional in the Maven {@code sdk/pom.xml}.
*
* <pre>{@code
* <dependency>
* <groupId>org.tukaani</groupId>
* <artifactId>xz</artifactId>
* <version>1.8</version>
* </dependency>
* }</pre>
*
* <h3>Permissions</h3>
*
* <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
* pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for more
* details.
*
* @param <T> The type of records to be read from the source.
*/
// CHECKSTYLE.ON: JavadocStyle
@Experimental(Experimental.Kind.SOURCE_SINK)
public class AvroSource<T> extends BlockBasedSource<T> {
// Default minimum bundle size (chosen as two default-size Avro blocks to attempt to
// ensure that every source has at least one block of records).
// The default sync interval is 64k.
private static final long DEFAULT_MIN_BUNDLE_SIZE = 2L * DataFileConstants.DEFAULT_SYNC_INTERVAL;
// Use cases of AvroSource are:
// 1) AvroSource<GenericRecord> Reading GenericRecord records with a specified schema.
// 2) AvroSource<Foo> Reading records of a generated Avro class Foo.
// 3) AvroSource<T> Reading GenericRecord records with an unspecified schema
// and converting them to type T.
//                    | Case 1        | Case 2   | Case 3        |
// type               | GenericRecord | Foo      | GenericRecord |
// readerSchemaString | non-null      | non-null | null          |
// parseFn            | null          | null     | non-null      |
// outputCoder        | null          | null     | non-null      |
private static class Mode<T> implements Serializable {
private final Class<?> type;
// The JSON schema used to decode records.
@Nullable private String readerSchemaString;
@Nullable private final SerializableFunction<GenericRecord, T> parseFn;
@Nullable private final Coder<T> outputCoder;
private Mode(
Class<?> type,
@Nullable String readerSchemaString,
@Nullable SerializableFunction<GenericRecord, T> parseFn,
@Nullable Coder<T> outputCoder) {
this.type = type;
this.readerSchemaString = internSchemaString(readerSchemaString);
this.parseFn = parseFn;
this.outputCoder = outputCoder;
}
private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
is.defaultReadObject();
readerSchemaString = internSchemaString(readerSchemaString);
}
private Coder<T> getOutputCoder() {
if (parseFn == null) {
return AvroCoder.of((Class<T>) type, internOrParseSchemaString(readerSchemaString));
} else {
return outputCoder;
}
}
private void validate() {
if (parseFn == null) {
checkArgument(
readerSchemaString != null,
"schema must be specified using withSchema() when not using a parse fn");
}
}
}
private static Mode<GenericRecord> readGenericRecordsWithSchema(String schema) {
return new Mode<>(GenericRecord.class, schema, null, null);
}
private static <T> Mode<T> readGeneratedClasses(Class<T> clazz) {
return new Mode<>(clazz, ReflectData.get().getSchema(clazz).toString(), null, null);
}
private static <T> Mode<T> parseGenericRecords(
SerializableFunction<GenericRecord, T> parseFn, Coder<T> outputCoder) {
return new Mode<>(GenericRecord.class, null, parseFn, outputCoder);
}
private final Mode<T> mode;
/**
* Reads from the given file name or pattern ("glob"). The returned source must be further
* configured by calling {@link #withSchema} to supply the reader schema (or {@link #withParseFn}
* to parse records into a type other than {@link GenericRecord}).
*/
public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) {
return new AvroSource<>(
fileNameOrPattern,
EmptyMatchTreatment.DISALLOW,
DEFAULT_MIN_BUNDLE_SIZE,
readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
}
public static AvroSource<GenericRecord> from(Metadata metadata) {
return new AvroSource<>(
metadata,
DEFAULT_MIN_BUNDLE_SIZE,
0,
metadata.sizeBytes(),
readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
}
/** Like {@link #from(ValueProvider)}. */
public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));
}
public AvroSource<T> withEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment) {
return new AvroSource<>(
getFileOrPatternSpecProvider(), emptyMatchTreatment, getMinBundleSize(), mode);
}
/** Reads files containing records that conform to the given schema. */
public AvroSource<GenericRecord> withSchema(String schema) {
checkArgument(schema != null, "schema can not be null");
return new AvroSource<>(
getFileOrPatternSpecProvider(),
getEmptyMatchTreatment(),
getMinBundleSize(),
readGenericRecordsWithSchema(schema));
}
/** Like {@link #withSchema(String)}. */
public AvroSource<GenericRecord> withSchema(Schema schema) {
checkArgument(schema != null, "schema can not be null");
return withSchema(schema.toString());
}
/** Reads files containing records of the given class. */
public <X> AvroSource<X> withSchema(Class<X> clazz) {
checkArgument(clazz != null, "clazz can not be null");
if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
return new AvroSource<>(
getSingleFileMetadata(),
getMinBundleSize(),
getStartOffset(),
getEndOffset(),
readGeneratedClasses(clazz));
}
return new AvroSource<>(
getFileOrPatternSpecProvider(),
getEmptyMatchTreatment(),
getMinBundleSize(),
readGeneratedClasses(clazz));
}
/**
* Reads {@link GenericRecord GenericRecords} of unspecified schema and maps them to instances of
* a custom type using the given {@code parseFn}; the resulting elements are encoded using the
* given coder.
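*
* <p>A minimal sketch, assuming a hypothetical serializable {@code Foo} class constructed from a
* {@code name} field:
*
* <pre>{@code
* AvroSource<Foo> source =
*     AvroSource.from("gs://my-bucket/data/*.avro")
*         .withParseFn(
*             record -> new Foo(record.get("name").toString()),
*             SerializableCoder.of(Foo.class));
* }</pre>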
*/
public <X> AvroSource<X> withParseFn(
SerializableFunction<GenericRecord, X> parseFn, Coder<X> coder) {
checkArgument(parseFn != null, "parseFn can not be null");
checkArgument(coder != null, "coder can not be null");
if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
return new AvroSource<>(
getSingleFileMetadata(),
getMinBundleSize(),
getStartOffset(),
getEndOffset(),
parseGenericRecords(parseFn, coder));
}
return new AvroSource<>(
getFileOrPatternSpecProvider(),
getEmptyMatchTreatment(),
getMinBundleSize(),
parseGenericRecords(parseFn, coder));
}
/**
* Sets the minimum bundle size. Refer to {@link OffsetBasedSource} for a description of {@code
* minBundleSize} and its use.
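*
* <p>For example, {@code source.withMinBundleSize(64 * 1024 * 1024)} asks splitting to produce
* bundles of at least 64 MB.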
*/
public AvroSource<T> withMinBundleSize(long minBundleSize) {
if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
return new AvroSource<>(
getSingleFileMetadata(), minBundleSize, getStartOffset(), getEndOffset(), mode);
}
return new AvroSource<>(
getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), minBundleSize, mode);
}
/** Constructor for FILEPATTERN mode. */
private AvroSource(
ValueProvider<String> fileNameOrPattern,
EmptyMatchTreatment emptyMatchTreatment,
long minBundleSize,
Mode<T> mode) {
super(fileNameOrPattern, emptyMatchTreatment, minBundleSize);
this.mode = mode;
}
/** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */
private AvroSource(
Metadata metadata, long minBundleSize, long startOffset, long endOffset, Mode<T> mode) {
super(metadata, minBundleSize, startOffset, endOffset);
this.mode = mode;
}
@Override
public void validate() {
super.validate();
mode.validate();
}
/**
* Used by the Dataflow worker. Do not introduce new usages. Do not delete without confirming that
* Dataflow ValidatesRunner tests pass.
*
* @deprecated Used by Dataflow worker
*/
@Deprecated
public BlockBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end)
throws IOException {
return createForSubrangeOfFile(FileSystems.matchSingleFileSpec(fileName), start, end);
}
@Override
public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {
return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, mode);
}
@Override
protected BlockBasedReader<T> createSingleFileReader(PipelineOptions options) {
return new AvroReader<>(this);
}
@Override
public Coder<T> getOutputCoder() {
return mode.getOutputCoder();
}
@VisibleForTesting
@Nullable
String getReaderSchemaString() {
return mode.readerSchemaString;
}
/** Avro file metadata. */
@VisibleForTesting
static class AvroMetadata {
private final byte[] syncMarker;
private final String codec;
private final String schemaString;
AvroMetadata(byte[] syncMarker, String codec, String schemaString) {
this.syncMarker = checkNotNull(syncMarker, "syncMarker");
this.codec = checkNotNull(codec, "codec");
this.schemaString = internSchemaString(checkNotNull(schemaString, "schemaString"));
}
/**
* The JSON-encoded <a href="https://avro.apache.org/docs/1.7.7/spec.html#schemas">schema</a>
* string for the file.
*/
public String getSchemaString() {
return schemaString;
}
/**
* The <a href="https://avro.apache.org/docs/1.7.7/spec.html#Required+Codecs">codec</a> of the
* file.
*/
public String getCodec() {
return codec;
}
/**
* The 16-byte sync marker for the file. See the documentation for <a
* href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">Object Container
* File</a> for more information.
*/
public byte[] getSyncMarker() {
return syncMarker;
}
}
/**
* Reads the {@link AvroMetadata} from the header of an Avro file.
*
* <p>This method parses the header of an Avro <a
* href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">Object Container
* File</a>.
*
* @throws IOException if the file is in an invalid format.
*/
@VisibleForTesting
static AvroMetadata readMetadataFromFile(ResourceId fileResource) throws IOException {
String codec = null;
String schemaString = null;
byte[] syncMarker;
try (InputStream stream = Channels.newInputStream(FileSystems.open(fileResource))) {
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
// The header of an object container file begins with a four-byte magic number, followed
// by the file metadata (including the schema and codec), encoded as a map. Finally, the
// header ends with the file's 16-byte sync marker.
// See https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files for details on
// the encoding of container files.
// Read the magic number.
byte[] magic = new byte[DataFileConstants.MAGIC.length];
decoder.readFixed(magic);
if (!Arrays.equals(magic, DataFileConstants.MAGIC)) {
throw new IOException("Missing Avro file signature: " + fileResource);
}
// Read the metadata to find the codec and schema.
ByteBuffer valueBuffer = ByteBuffer.allocate(512);
long numRecords = decoder.readMapStart();
while (numRecords > 0) {
for (long recordIndex = 0; recordIndex < numRecords; recordIndex++) {
String key = decoder.readString();
// readBytes() clears the buffer and returns a buffer where:
// - position is the start of the bytes read
// - limit is the end of the bytes read
valueBuffer = decoder.readBytes(valueBuffer);
byte[] bytes = new byte[valueBuffer.remaining()];
valueBuffer.get(bytes);
if (key.equals(DataFileConstants.CODEC)) {
codec = new String(bytes, StandardCharsets.UTF_8);
} else if (key.equals(DataFileConstants.SCHEMA)) {
schemaString = new String(bytes, StandardCharsets.UTF_8);
}
}
numRecords = decoder.mapNext();
}
if (codec == null) {
codec = DataFileConstants.NULL_CODEC;
}
// Finally, read the sync marker.
syncMarker = new byte[DataFileConstants.SYNC_SIZE];
decoder.readFixed(syncMarker);
}
checkState(schemaString != null, "No schema present in Avro file metadata %s", fileResource);
return new AvroMetadata(syncMarker, codec, schemaString);
}
// A logical reference cache used to store schemas and schema strings to allow us to
// "intern" values and reduce the number of copies of equivalent objects.
private static final Map<String, Schema> schemaLogicalReferenceCache = new WeakHashMap<>();
private static final Map<String, String> schemaStringLogicalReferenceCache = new WeakHashMap<>();
// We avoid String.intern() because, depending on the JVM, interned strings may be stored in
// PermGen space, which we could otherwise exhaust.
private static synchronized String internSchemaString(String schema) {
String internSchema = schemaStringLogicalReferenceCache.get(schema);
if (internSchema != null) {
return internSchema;
}
schemaStringLogicalReferenceCache.put(schema, schema);
return schema;
}
private static synchronized Schema internOrParseSchemaString(String schemaString) {
Schema schema = schemaLogicalReferenceCache.get(schemaString);
if (schema != null) {
return schema;
}
Schema.Parser parser = new Schema.Parser();
schema = parser.parse(schemaString);
schemaLogicalReferenceCache.put(schemaString, schema);
return schema;
}
// Java deserialization does not go through the constructor, so we use readResolve to replace
// the deserialized instance with one built via the constructor, allowing us to intern any
// schemas.
@SuppressWarnings("unused")
private Object readResolve() throws ObjectStreamException {
switch (getMode()) {
case SINGLE_FILE_OR_SUBRANGE:
return new AvroSource<>(
getSingleFileMetadata(), getMinBundleSize(), getStartOffset(), getEndOffset(), mode);
case FILEPATTERN:
return new AvroSource<>(
getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), getMinBundleSize(), mode);
default:
throw new InvalidObjectException(
String.format("Unknown mode %s for AvroSource %s", getMode(), this));
}
}
/**
* A {@link BlockBasedSource.Block} of Avro records.
*
* @param <T> The type of records stored in the block.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
static class AvroBlock<T> extends Block<T> {
private final Mode<T> mode;
// The number of records in the block.
private final long numRecords;
// The current record in the block. Initialized in readNextRecord.
@Nullable private T currentRecord;
// The index of the current record in the block.
private long currentRecordIndex = 0;
// A DatumReader to read records from the block.
private final DatumReader<?> reader;
// A BinaryDecoder used by the reader to decode records.
private final BinaryDecoder decoder;
/**
* Decodes a byte array as an InputStream. The byte array may be compressed using some codec.
* Reads from the returned stream will result in decompressed bytes.
*
* <p>This supports the same codecs as Avro's {@link CodecFactory}, namely those defined in
* {@link DataFileConstants}.
*
* <ul>
* <li>"snappy" : Google's Snappy compression
* <li>"deflate" : deflate compression
* <li>"bzip2" : Bzip2 compression
* <li>"xz" : xz compression
* <li>"null" (the string, not the value): Uncompressed data
* </ul>
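*
* <p>For example, {@code decodeAsInputStream(blockBytes, DataFileConstants.DEFLATE_CODEC)}
* returns a stream that inflates {@code blockBytes} on the fly.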
*/
private static InputStream decodeAsInputStream(byte[] data, String codec) throws IOException {
ByteArrayInputStream byteStream = new ByteArrayInputStream(data);
switch (codec) {
case DataFileConstants.SNAPPY_CODEC:
return new SnappyCompressorInputStream(byteStream, 1 << 16 /* Avro uses 64KB blocks */);
case DataFileConstants.DEFLATE_CODEC:
// nowrap == true: Do not expect ZLIB header or checksum, as Avro does not write them.
Inflater inflater = new Inflater(true);
return new InflaterInputStream(byteStream, inflater);
case DataFileConstants.XZ_CODEC:
return new XZCompressorInputStream(byteStream);
case DataFileConstants.BZIP2_CODEC:
return new BZip2CompressorInputStream(byteStream);
case DataFileConstants.NULL_CODEC:
return byteStream;
default:
throw new IllegalArgumentException("Unsupported codec: " + codec);
}
}
AvroBlock(byte[] data, long numRecords, Mode<T> mode, String writerSchemaString, String codec)
throws IOException {
this.mode = mode;
this.numRecords = numRecords;
checkNotNull(writerSchemaString, "writerSchemaString");
Schema writerSchema = internOrParseSchemaString(writerSchemaString);
Schema readerSchema =
internOrParseSchemaString(
MoreObjects.firstNonNull(mode.readerSchemaString, writerSchemaString));
this.reader =
(mode.type == GenericRecord.class)
? new GenericDatumReader<T>(writerSchema, readerSchema)
: new ReflectDatumReader<T>(writerSchema, readerSchema);
this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null);
}
@Override
public T getCurrentRecord() {
return currentRecord;
}
@Override
public boolean readNextRecord() throws IOException {
if (currentRecordIndex >= numRecords) {
return false;
}
Object record = reader.read(null, decoder);
currentRecord =
(mode.parseFn == null) ? ((T) record) : mode.parseFn.apply((GenericRecord) record);
currentRecordIndex++;
return true;
}
@Override
public double getFractionOfBlockConsumed() {
return ((double) currentRecordIndex) / numRecords;
}
}
/**
* A {@link BlockBasedSource.BlockBasedReader} for reading blocks from Avro files.
*
* <p>An Avro Object Container File consists of a header followed by a 16-byte sync marker and then
* a sequence of blocks, where each block begins with two encoded longs representing the total
* number of records in the block and the block's size in bytes, followed by the block's
* (optionally-encoded) records. Each block is terminated by a 16-byte sync marker.
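*
* <p>Schematically:
*
* <pre>
* [magic][file metadata][sync]  ([numRecords][blockSize][records ...][sync])*
* </pre>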
*
* @param <T> The type of records contained in the block.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public static class AvroReader<T> extends BlockBasedReader<T> {
// Initialized in startReading.
@Nullable private AvroMetadata metadata;
// The current block.
// Initialized in readNextRecord.
@Nullable private AvroBlock<T> currentBlock;
// A lock used to synchronize block offsets for getRemainingParallelism
private final Object progressLock = new Object();
// Offset of the current block.
@GuardedBy("progressLock")
private long currentBlockOffset = 0;
// Size of the current block.
@GuardedBy("progressLock")
private long currentBlockSizeBytes = 0;
// Stream used to read from the underlying file.
// A pushback stream is used to restore bytes buffered during seeking.
// Initialized in startReading.
@Nullable private PushbackInputStream stream;
// Counts the number of bytes read. Used only to tell how many bytes are taken up in
// a block's variable-length header.
// Initialized in startReading.
@Nullable private CountingInputStream countStream;
// Caches the Avro DirectBinaryDecoder used to decode binary-encoded values from the buffer.
// Initialized in readNextBlock.
@Nullable private BinaryDecoder decoder;
/** Reads Avro records of type {@code T} from the specified source. */
public AvroReader(AvroSource<T> source) {
super(source);
}
@Override
public synchronized AvroSource<T> getCurrentSource() {
return (AvroSource<T>) super.getCurrentSource();
}
// Precondition: the stream is positioned after the sync marker in the current (about to be
// previous) block. currentBlockSizeBytes equals the size of the current block, or zero if this
// reader was just started.
//
// Postcondition: same as above, but for the new current (formerly next) block.
@Override
public boolean readNextBlock() throws IOException {
long startOfNextBlock;
synchronized (progressLock) {
startOfNextBlock = currentBlockOffset + currentBlockSizeBytes;
}
// Before reading the variable-sized block header, record the current number of bytes read.
long preHeaderCount = countStream.getBytesRead();
decoder = DecoderFactory.get().directBinaryDecoder(countStream, decoder);
long numRecords;
try {
numRecords = decoder.readLong();
} catch (EOFException e) {
// Expected when the stream is positioned at EOF, i.e., past the last block. The only way to
// detect the end of the stream is to try reading from it.
return false;
}
long blockSize = decoder.readLong();
// Mark header size as the change in the number of bytes read.
long headerSize = countStream.getBytesRead() - preHeaderCount;
// Create the current block by reading blockSize bytes. Block sizes permitted by the Avro
// specification are [32, 2^30], so the cast is safe.
byte[] data = new byte[(int) blockSize];
int bytesRead = IOUtils.readFully(stream, data);
checkState(
blockSize == bytesRead,
"Only able to read %s/%s bytes in the block before EOF reached.",
bytesRead,
blockSize);
currentBlock =
new AvroBlock<>(
data,
numRecords,
getCurrentSource().mode,
metadata.getSchemaString(),
metadata.getCodec());
// Read the end of this block, which MUST be a sync marker for correctness.
byte[] syncMarker = metadata.getSyncMarker();
byte[] readSyncMarker = new byte[syncMarker.length];
long syncMarkerOffset = startOfNextBlock + headerSize + blockSize;
bytesRead = IOUtils.readFully(stream, readSyncMarker);
checkState(
bytesRead == syncMarker.length,
"Only able to read %s/%s bytes of Avro sync marker at position %s before EOF reached.",
bytesRead,
syncMarker.length,
syncMarkerOffset);
if (!Arrays.equals(syncMarker, readSyncMarker)) {
throw new IllegalStateException(
String.format(
"Expected the bytes [%d,%d) in file %s to be a sync marker, but found %s",
syncMarkerOffset,
syncMarkerOffset + syncMarker.length,
getCurrentSource().getFileOrPatternSpec(),
Arrays.toString(readSyncMarker)));
}
// Atomically update both the position and offset of the new block.
synchronized (progressLock) {
currentBlockOffset = startOfNextBlock;
// Total block size includes the header, block content, and trailing sync marker.
currentBlockSizeBytes = headerSize + blockSize + syncMarker.length;
}
return true;
}
@Override
public AvroBlock<T> getCurrentBlock() {
return currentBlock;
}
@Override
public long getCurrentBlockOffset() {
synchronized (progressLock) {
return currentBlockOffset;
}
}
@Override
public long getCurrentBlockSize() {
synchronized (progressLock) {
return currentBlockSizeBytes;
}
}
@Override
public long getSplitPointsRemaining() {
if (isDone()) {
return 0;
}
synchronized (progressLock) {
if (currentBlockOffset + currentBlockSizeBytes >= getCurrentSource().getEndOffset()) {
// This block is known to be the last block in the range.
return 1;
}
}
return super.getSplitPointsRemaining();
}
/**
* Creates a {@link PushbackInputStream} with a pushback buffer large enough to push back a full
* sync marker's worth of bytes.
*/
private PushbackInputStream createStream(ReadableByteChannel channel) {
return new PushbackInputStream(
Channels.newInputStream(channel), metadata.getSyncMarker().length);
}
// Postcondition: the stream is positioned at the beginning of the first block after the start
// of the current source, and currentBlockOffset is that position. Additionally,
// currentBlockSizeBytes will be set to 0 indicating that the previous block was empty.
@Override
protected void startReading(ReadableByteChannel channel) throws IOException {
try {
metadata = readMetadataFromFile(getCurrentSource().getSingleFileMetadata().resourceId());
} catch (IOException e) {
throw new RuntimeException(
"Error reading metadata from file " + getCurrentSource().getSingleFileMetadata(), e);
}
long startOffset = getCurrentSource().getStartOffset();
byte[] syncMarker = metadata.getSyncMarker();
long syncMarkerLength = syncMarker.length;
if (startOffset != 0) {
// Rewind in order to find the sync marker ending the previous block.
long position = Math.max(0, startOffset - syncMarkerLength);
((SeekableByteChannel) channel).position(position);
startOffset = position;
}
// Satisfy the post condition.
stream = createStream(channel);
countStream = new CountingInputStream(stream);
synchronized (progressLock) {
currentBlockOffset = startOffset + advancePastNextSyncMarker(stream, syncMarker);
currentBlockSizeBytes = 0;
}
}
/**
* Advances to the first byte after the next occurrence of the sync marker in the stream when
* reading from the current offset. Returns the number of bytes consumed from the stream. Note
* that this method requires a PushbackInputStream with a buffer at least as big as the marker
* it is seeking for.
*/
static long advancePastNextSyncMarker(PushbackInputStream stream, byte[] syncMarker)
throws IOException {
Seeker seeker = new Seeker(syncMarker);
byte[] syncBuffer = new byte[syncMarker.length];
long totalBytesConsumed = 0;
// Seek until either a sync marker is found or we reach the end of the file.
int mark = -1; // Position of the last byte in the sync marker.
int read; // Number of bytes read.
do {
read = stream.read(syncBuffer);
if (read >= 0) {
mark = seeker.find(syncBuffer, read);
// Update the currentOffset with the number of bytes read.
totalBytesConsumed += read;
}
} while (mark < 0 && read > 0);
// If the sync marker was found, unread block data and update the current offsets.
if (mark >= 0) {
// The current offset after this call should be just past the sync marker, so we should
// unread the remaining buffer contents and update the currentOffset accordingly.
stream.unread(syncBuffer, mark + 1, read - (mark + 1));
totalBytesConsumed = totalBytesConsumed - (read - (mark + 1));
}
return totalBytesConsumed;
}
/**
* A {@link Seeker} looks for a given marker within a byte buffer. Uses naive string matching
* with a sliding window, as sync markers are small and random.
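*
* <p>A minimal usage sketch ({@code stream} and {@code syncMarker} are assumed from context):
*
* <pre>{@code
* Seeker seeker = new Seeker(syncMarker);
* byte[] buffer = new byte[syncMarker.length];
* int read = stream.read(buffer);
* int end = seeker.find(buffer, read); // index of the marker's last byte, or -1
* }</pre>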
*/
static class Seeker {
// The marker to search for.
private byte[] marker;
// Buffer used for the sliding window.
private byte[] searchBuffer;
// Number of bytes available to be matched in the buffer.
private int available = 0;
/** Create a {@link Seeker} that looks for the given marker. */
public Seeker(byte[] marker) {
this.marker = marker;
this.searchBuffer = new byte[marker.length];
}
/**
* Find the marker in the byte buffer. Returns the index of the end of the marker in the
* buffer. If the marker is not found, returns -1.
*
* <p>State is maintained between calls. If the marker was partially matched, a subsequent
* call to find will resume matching the marker.
*
* @param buffer the buffer to search
* @param length the number of valid bytes in {@code buffer}
* @return the index of the end of the marker within the buffer, or -1 if the marker was not
* found.
*/
public int find(byte[] buffer, int length) {
for (int i = 0; i < length; i++) {
System.arraycopy(searchBuffer, 1, searchBuffer, 0, searchBuffer.length - 1);
searchBuffer[searchBuffer.length - 1] = buffer[i];
available = Math.min(available + 1, searchBuffer.length);
if (ByteBuffer.wrap(searchBuffer, searchBuffer.length - available, available)
.equals(ByteBuffer.wrap(marker))) {
available = 0;
return i;
}
}
return -1;
}
}
}
}