/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io;

import static org.apache.beam.sdk.io.FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamException;
import java.io.PushbackInputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
import org.apache.commons.compress.utils.CountingInputStream;
import org.apache.commons.compress.utils.IOUtils;

// CHECKSTYLE.OFF: JavadocStyle
/**
 * Do not use in pipelines directly: most users should use {@link AvroIO.Read}.
 *
 * <p>A {@link FileBasedSource} for reading Avro files.
 *
 * <p>To read a {@link PCollection} of objects from one or more Avro files, use {@link
 * AvroSource#from} to specify the path(s) of the files to read. The {@link AvroSource} that is
 * returned will read objects of type {@link GenericRecord} with the schema(s) that were written at
 * file creation. To further configure the {@link AvroSource} to read with a user-defined schema, or
 * to return records of a type other than {@link GenericRecord}, use {@link
 * AvroSource#withSchema(Schema)} (using an Avro {@link Schema}), {@link
 * AvroSource#withSchema(String)} (using a JSON schema), or {@link AvroSource#withSchema(Class)} (to
 * return objects of the Avro-generated class specified).
 *
 * <p>An {@link AvroSource} can be read from using the {@link Read} transform. For example:
 *
 * <pre>{@code
 * AvroSource<MyType> source = AvroSource.from(file.toPath()).withSchema(MyType.class);
 * PCollection<MyType> records = Read.from(mySource);
 * }</pre>
 *
 * <p>This class's implementation is based on the <a
 * href="https://avro.apache.org/docs/1.7.7/spec.html">Avro 1.7.7</a> specification and implements
 * parsing of some parts of Avro Object Container Files. The rationale for doing so is that the Avro
 * API does not provide efficient ways of computing the precise offsets of blocks within a file,
 * which is necessary to support dynamic work rebalancing. However, whenever it is possible to use
 * the Avro API in a way that supports maintaining precise offsets, this class uses the Avro API.
 *
 * <p>Avro Object Container files store records in blocks. Each block contains a collection of
 * records. Blocks may be encoded (e.g., with bzip2, deflate, snappy, etc.). Blocks are delineated
 * from one another by a 16-byte sync marker.
 *
 * <p>An {@link AvroSource} for a subrange of a single file contains records in the blocks such that
 * the start offset of the block is greater than or equal to the start offset of the source and less
 * than the end offset of the source.
 *
 * <p>To use XZ-encoded Avro files, please include an explicit dependency on {@code xz-1.8.jar},
 * which has been marked as optional in the Maven {@code sdk/pom.xml}.
 *
 * <pre>{@code
 * <dependency>
 *   <groupId>org.tukaani</groupId>
 *   <artifactId>xz</artifactId>
 *   <version>1.8</version>
 * </dependency>
 * }</pre>
 *
 * <h3>Permissions</h3>
 *
 * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
 * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for more
 * details.
 *
 * @param <T> The type of records to be read from the source.
 */
// CHECKSTYLE.ON: JavadocStyle
@Experimental(Experimental.Kind.SOURCE_SINK)
public class AvroSource<T> extends BlockBasedSource<T> {
  // Default minimum bundle size (chosen as two default-size Avro blocks to attempt to
  // ensure that every source has at least one block of records).
  // The default sync interval is 64k.
  private static final long DEFAULT_MIN_BUNDLE_SIZE = 2L * DataFileConstants.DEFAULT_SYNC_INTERVAL;

  // Use cases of AvroSource are:
  // 1) AvroSource<GenericRecord> Reading GenericRecord records with a specified schema.
  // 2) AvroSource<Foo> Reading records of a generated Avro class Foo.
  // 3) AvroSource<T> Reading GenericRecord records with an unspecified schema
  //    and converting them to type T.
  //                     |    Case 1     |    Case 2   |     Case 3    |
  // type                | GenericRecord |     Foo     | GenericRecord |
  // readerSchemaString  |    non-null   |   non-null  |     null      |
  // parseFn             |      null     |     null    |   non-null    |
  // outputCoder         |      null     |     null    |   non-null    |
  private static class Mode<T> implements Serializable {
    private final Class<?> type;

    // The JSON schema used to decode records.
    @Nullable private String readerSchemaString;

    @Nullable private final SerializableFunction<GenericRecord, T> parseFn;

    @Nullable private final Coder<T> outputCoder;

    private Mode(
        Class<?> type,
        @Nullable String readerSchemaString,
        @Nullable SerializableFunction<GenericRecord, T> parseFn,
        @Nullable Coder<T> outputCoder) {
      this.type = type;
      this.readerSchemaString = internSchemaString(readerSchemaString);
      this.parseFn = parseFn;
      this.outputCoder = outputCoder;
    }

    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
      is.defaultReadObject();
      readerSchemaString = internSchemaString(readerSchemaString);
    }

    private Coder<T> getOutputCoder() {
      if (parseFn == null) {
        return AvroCoder.of((Class<T>) type, internOrParseSchemaString(readerSchemaString));
      } else {
        return outputCoder;
      }
    }

    private void validate() {
      if (parseFn == null) {
        checkArgument(
            readerSchemaString != null,
            "schema must be specified using withSchema() when not using a parse fn");
      }
    }
  }

  private static Mode<GenericRecord> readGenericRecordsWithSchema(String schema) {
    return new Mode<>(GenericRecord.class, schema, null, null);
  }

  private static <T> Mode<T> readGeneratedClasses(Class<T> clazz) {
    return new Mode<>(clazz, ReflectData.get().getSchema(clazz).toString(), null, null);
  }

  private static <T> Mode<T> parseGenericRecords(
      SerializableFunction<GenericRecord, T> parseFn, Coder<T> outputCoder) {
    return new Mode<>(GenericRecord.class, null, parseFn, outputCoder);
  }

  private final Mode<T> mode;

  /**
   * Reads from the given file name or pattern ("glob"). The returned source needs to be further
   * configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}.
   */
  public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) {
    return new AvroSource<>(
        fileNameOrPattern,
        EmptyMatchTreatment.DISALLOW,
        DEFAULT_MIN_BUNDLE_SIZE,
        readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
  }

  public static AvroSource<GenericRecord> from(Metadata metadata) {
    return new AvroSource<>(
        metadata,
        DEFAULT_MIN_BUNDLE_SIZE,
        0,
        metadata.sizeBytes(),
        readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
  }

  /** Like {@link #from(ValueProvider)}. */
  public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
    return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));
  }

  public AvroSource<T> withEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment) {
    return new AvroSource<>(
        getFileOrPatternSpecProvider(), emptyMatchTreatment, getMinBundleSize(), mode);
  }

  /** Reads files containing records that conform to the given schema. */
  public AvroSource<GenericRecord> withSchema(String schema) {
    checkArgument(schema != null, "schema can not be null");
    return new AvroSource<>(
        getFileOrPatternSpecProvider(),
        getEmptyMatchTreatment(),
        getMinBundleSize(),
        readGenericRecordsWithSchema(schema));
  }

  /** Like {@link #withSchema(String)}. */
  public AvroSource<GenericRecord> withSchema(Schema schema) {
    checkArgument(schema != null, "schema can not be null");
    return withSchema(schema.toString());
  }

  /** Reads files containing records of the given class. */
  public <X> AvroSource<X> withSchema(Class<X> clazz) {
    checkArgument(clazz != null, "clazz can not be null");
    if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
      return new AvroSource<>(
          getSingleFileMetadata(),
          getMinBundleSize(),
          getStartOffset(),
          getEndOffset(),
          readGeneratedClasses(clazz));
    }
    return new AvroSource<>(
        getFileOrPatternSpecProvider(),
        getEmptyMatchTreatment(),
        getMinBundleSize(),
        readGeneratedClasses(clazz));
  }

  /**
   * Reads {@link GenericRecord} of unspecified schema and maps them to instances of a custom type
   * using the given {@code parseFn} and encoded using the given coder.
   */
  public <X> AvroSource<X> withParseFn(
      SerializableFunction<GenericRecord, X> parseFn, Coder<X> coder) {
    checkArgument(parseFn != null, "parseFn can not be null");
    checkArgument(coder != null, "coder can not be null");
    if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
      return new AvroSource<>(
          getSingleFileMetadata(),
          getMinBundleSize(),
          getStartOffset(),
          getEndOffset(),
          parseGenericRecords(parseFn, coder));
    }
    return new AvroSource<>(
        getFileOrPatternSpecProvider(),
        getEmptyMatchTreatment(),
        getMinBundleSize(),
        parseGenericRecords(parseFn, coder));
  }

  /**
   * Sets the minimum bundle size. Refer to {@link OffsetBasedSource} for a description of {@code
   * minBundleSize} and its use.
   */
  public AvroSource<T> withMinBundleSize(long minBundleSize) {
    if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
      return new AvroSource<>(
          getSingleFileMetadata(), minBundleSize, getStartOffset(), getEndOffset(), mode);
    }
    return new AvroSource<>(
        getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), minBundleSize, mode);
  }

  /** Constructor for FILEPATTERN mode. */
  private AvroSource(
      ValueProvider<String> fileNameOrPattern,
      EmptyMatchTreatment emptyMatchTreatment,
      long minBundleSize,
      Mode<T> mode) {
    super(fileNameOrPattern, emptyMatchTreatment, minBundleSize);
    this.mode = mode;
  }

  /** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */
  private AvroSource(
      Metadata metadata, long minBundleSize, long startOffset, long endOffset, Mode<T> mode) {
    super(metadata, minBundleSize, startOffset, endOffset);
    this.mode = mode;
  }

  @Override
  public void validate() {
    super.validate();
    mode.validate();
  }

  /**
   * Used by the Dataflow worker. Do not introduce new usages. Do not delete without confirming that
   * Dataflow ValidatesRunner tests pass.
   *
   * @deprecated Used by Dataflow worker
   */
  @Deprecated
  public BlockBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end)
      throws IOException {
    return createForSubrangeOfFile(FileSystems.matchSingleFileSpec(fileName), start, end);
  }

  @Override
  public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {
    return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, mode);
  }

  @Override
  protected BlockBasedReader<T> createSingleFileReader(PipelineOptions options) {
    return new AvroReader<>(this);
  }

  @Override
  public Coder<T> getOutputCoder() {
    return mode.getOutputCoder();
  }

  @VisibleForTesting
  @Nullable
  String getReaderSchemaString() {
    return mode.readerSchemaString;
  }

  /** Avro file metadata. */
  @VisibleForTesting
  static class AvroMetadata {
    private final byte[] syncMarker;
    private final String codec;
    private final String schemaString;

    AvroMetadata(byte[] syncMarker, String codec, String schemaString) {
      this.syncMarker = checkNotNull(syncMarker, "syncMarker");
      this.codec = checkNotNull(codec, "codec");
      this.schemaString = internSchemaString(checkNotNull(schemaString, "schemaString"));
    }

    /**
     * The JSON-encoded <a href="https://avro.apache.org/docs/1.7.7/spec.html#schemas">schema</a>
     * string for the file.
     */
    public String getSchemaString() {
      return schemaString;
    }

    /**
     * The <a href="https://avro.apache.org/docs/1.7.7/spec.html#Required+Codecs">codec</a> of the
     * file.
     */
    public String getCodec() {
      return codec;
    }

    /**
     * The 16-byte sync marker for the file. See the documentation for <a
     * href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">Object Container
     * File</a> for more information.
     */
    public byte[] getSyncMarker() {
      return syncMarker;
    }
  }

  /**
   * Reads the {@link AvroMetadata} from the header of an Avro file.
   *
   * <p>This method parses the header of an Avro <a
   * href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">Object Container
   * File</a>.
   *
   * @throws IOException if the file is an invalid format.
   */
  @VisibleForTesting
  static AvroMetadata readMetadataFromFile(ResourceId fileResource) throws IOException {
    String codec = null;
    String schemaString = null;
    byte[] syncMarker;
    try (InputStream stream = Channels.newInputStream(FileSystems.open(fileResource))) {
      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);

      // The header of an object container file begins with a four-byte magic number, followed
      // by the file metadata (including the schema and codec), encoded as a map. Finally, the
      // header ends with the file's 16-byte sync marker.
      // See https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files for details on
      // the encoding of container files.

      // Read the magic number.
      byte[] magic = new byte[DataFileConstants.MAGIC.length];
      decoder.readFixed(magic);
      if (!Arrays.equals(magic, DataFileConstants.MAGIC)) {
        throw new IOException("Missing Avro file signature: " + fileResource);
      }

      // Read the metadata to find the codec and schema.
      ByteBuffer valueBuffer = ByteBuffer.allocate(512);
      long numRecords = decoder.readMapStart();
      while (numRecords > 0) {
        for (long recordIndex = 0; recordIndex < numRecords; recordIndex++) {
          String key = decoder.readString();
          // readBytes() clears the buffer and returns a buffer where:
          // - position is the start of the bytes read
          // - limit is the end of the bytes read
          valueBuffer = decoder.readBytes(valueBuffer);
          byte[] bytes = new byte[valueBuffer.remaining()];
          valueBuffer.get(bytes);
          if (key.equals(DataFileConstants.CODEC)) {
            codec = new String(bytes, StandardCharsets.UTF_8);
          } else if (key.equals(DataFileConstants.SCHEMA)) {
            schemaString = new String(bytes, StandardCharsets.UTF_8);
          }
        }
        numRecords = decoder.mapNext();
      }
      if (codec == null) {
        codec = DataFileConstants.NULL_CODEC;
      }

      // Finally, read the sync marker.
      syncMarker = new byte[DataFileConstants.SYNC_SIZE];
      decoder.readFixed(syncMarker);
    }
    checkState(schemaString != null, "No schema present in Avro file metadata %s", fileResource);
    return new AvroMetadata(syncMarker, codec, schemaString);
  }

  // A logical reference cache used to store schemas and schema strings to allow us to
  // "intern" values and reduce the number of copies of equivalent objects.
  private static final Map<String, Schema> schemaLogicalReferenceCache = new WeakHashMap<>();
  private static final Map<String, String> schemaStringLogicalReferenceCache = new WeakHashMap<>();

  // We avoid String.intern() because depending on the JVM, these may be added to the PermGenSpace
  // which we want to avoid otherwise we could run out of PermGenSpace.
  private static synchronized String internSchemaString(String schema) {
    String internSchema = schemaStringLogicalReferenceCache.get(schema);
    if (internSchema != null) {
      return internSchema;
    }
    schemaStringLogicalReferenceCache.put(schema, schema);
    return schema;
  }

  private static synchronized Schema internOrParseSchemaString(String schemaString) {
    Schema schema = schemaLogicalReferenceCache.get(schemaString);
    if (schema != null) {
      return schema;
    }
    Schema.Parser parser = new Schema.Parser();
    schema = parser.parse(schemaString);
    schemaLogicalReferenceCache.put(schemaString, schema);
    return schema;
  }

  // Reading the object from Java serialization typically does not go through the constructor,
  // we use readResolve to replace the constructed instance with one which uses the constructor
  // allowing us to intern any schemas.
  @SuppressWarnings("unused")
  private Object readResolve() throws ObjectStreamException {
    switch (getMode()) {
      case SINGLE_FILE_OR_SUBRANGE:
        return new AvroSource<>(
            getSingleFileMetadata(), getMinBundleSize(), getStartOffset(), getEndOffset(), mode);
      case FILEPATTERN:
        return new AvroSource<>(
            getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), getMinBundleSize(), mode);
      default:
        throw new InvalidObjectException(
            String.format("Unknown mode %s for AvroSource %s", getMode(), this));
    }
  }

  /**
   * A {@link BlockBasedSource.Block} of Avro records.
   *
   * @param <T> The type of records stored in the block.
   */
  @Experimental(Experimental.Kind.SOURCE_SINK)
  static class AvroBlock<T> extends Block<T> {
    private final Mode<T> mode;

    // The number of records in the block.
    private final long numRecords;

    // The current record in the block. Initialized in readNextRecord.
    @Nullable private T currentRecord;

    // The index of the current record in the block.
    private long currentRecordIndex = 0;

    // A DatumReader to read records from the block.
    private final DatumReader<?> reader;

    // A BinaryDecoder used by the reader to decode records.
    private final BinaryDecoder decoder;

    /**
     * Decodes a byte array as an InputStream. The byte array may be compressed using some codec.
     * Reads from the returned stream will result in decompressed bytes.
     *
     * <p>This supports the same codecs as Avro's {@link CodecFactory}, namely those defined in
     * {@link DataFileConstants}.
     *
     * <ul>
     *   <li>"snappy" : Google's Snappy compression
     *   <li>"deflate" : deflate compression
     *   <li>"bzip2" : Bzip2 compression
     *   <li>"xz" : xz compression
     *   <li>"null" (the string, not the value): Uncompressed data
     * </ul>
     */
    private static InputStream decodeAsInputStream(byte[] data, String codec) throws IOException {
      ByteArrayInputStream byteStream = new ByteArrayInputStream(data);
      switch (codec) {
        case DataFileConstants.SNAPPY_CODEC:
          return new SnappyCompressorInputStream(byteStream, 1 << 16 /* Avro uses 64KB blocks */);
        case DataFileConstants.DEFLATE_CODEC:
          // nowrap == true: Do not expect ZLIB header or checksum, as Avro does not write them.
          Inflater inflater = new Inflater(true);
          return new InflaterInputStream(byteStream, inflater);
        case DataFileConstants.XZ_CODEC:
          return new XZCompressorInputStream(byteStream);
        case DataFileConstants.BZIP2_CODEC:
          return new BZip2CompressorInputStream(byteStream);
        case DataFileConstants.NULL_CODEC:
          return byteStream;
        default:
          throw new IllegalArgumentException("Unsupported codec: " + codec);
      }
    }

    AvroBlock(byte[] data, long numRecords, Mode<T> mode, String writerSchemaString, String codec)
        throws IOException {
      this.mode = mode;
      this.numRecords = numRecords;
      checkNotNull(writerSchemaString, "writerSchemaString");
      Schema writerSchema = internOrParseSchemaString(writerSchemaString);
      Schema readerSchema =
          internOrParseSchemaString(
              MoreObjects.firstNonNull(mode.readerSchemaString, writerSchemaString));
      this.reader =
          (mode.type == GenericRecord.class)
              ? new GenericDatumReader<T>(writerSchema, readerSchema)
              : new ReflectDatumReader<T>(writerSchema, readerSchema);
      this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null);
    }

    @Override
    public T getCurrentRecord() {
      return currentRecord;
    }

    @Override
    public boolean readNextRecord() throws IOException {
      if (currentRecordIndex >= numRecords) {
        return false;
      }
      Object record = reader.read(null, decoder);
      currentRecord =
          (mode.parseFn == null) ? ((T) record) : mode.parseFn.apply((GenericRecord) record);
      currentRecordIndex++;
      return true;
    }

    @Override
    public double getFractionOfBlockConsumed() {
      return ((double) currentRecordIndex) / numRecords;
    }
  }

  /**
   * A {@link BlockBasedSource.BlockBasedReader} for reading blocks from Avro files.
   *
   * <p>An Avro Object Container File consists of a header followed by a 16-bit sync marker and then
   * a sequence of blocks, where each block begins with two encoded longs representing the total
   * number of records in the block and the block's size in bytes, followed by the block's
   * (optionally-encoded) records. Each block is terminated by a 16-bit sync marker.
   *
   * @param <T> The type of records contained in the block.
   */
  @Experimental(Experimental.Kind.SOURCE_SINK)
  public static class AvroReader<T> extends BlockBasedReader<T> {
    // Initialized in startReading.
    @Nullable private AvroMetadata metadata;

    // The current block.
    // Initialized in readNextRecord.
    @Nullable private AvroBlock<T> currentBlock;

    // A lock used to synchronize block offsets for getRemainingParallelism
    private final Object progressLock = new Object();

    // Offset of the current block.
    @GuardedBy("progressLock")
    private long currentBlockOffset = 0;

    // Size of the current block.
    @GuardedBy("progressLock")
    private long currentBlockSizeBytes = 0;

    // Stream used to read from the underlying file.
    // A pushback stream is used to restore bytes buffered during seeking.
    // Initialized in startReading.
    @Nullable private PushbackInputStream stream;

    // Counts the number of bytes read. Used only to tell how many bytes are taken up in
    // a block's variable-length header.
    // Initialized in startReading.
    @Nullable private CountingInputStream countStream;

    // Caches the Avro DirectBinaryDecoder used to decode binary-encoded values from the buffer.
    // Initialized in readNextBlock.
    @Nullable private BinaryDecoder decoder;

    /** Reads Avro records of type {@code T} from the specified source. */
    public AvroReader(AvroSource<T> source) {
      super(source);
    }

    @Override
    public synchronized AvroSource<T> getCurrentSource() {
      return (AvroSource<T>) super.getCurrentSource();
    }

    // Precondition: the stream is positioned after the sync marker in the current (about to be
    // previous) block. currentBlockSize equals the size of the current block, or zero if this
    // reader was just started.
    //
    // Postcondition: same as above, but for the new current (formerly next) block.
    @Override
    public boolean readNextBlock() throws IOException {
      long startOfNextBlock;
      synchronized (progressLock) {
        startOfNextBlock = currentBlockOffset + currentBlockSizeBytes;
      }

      // Before reading the variable-sized block header, record the current number of bytes read.
      long preHeaderCount = countStream.getBytesRead();
      decoder = DecoderFactory.get().directBinaryDecoder(countStream, decoder);
      long numRecords;
      try {
        numRecords = decoder.readLong();
      } catch (EOFException e) {
        // Expected for the last block, at which the start position is the EOF. The way to detect
        // stream ending is to try reading from it.
        return false;
      }
      long blockSize = decoder.readLong();

      // Mark header size as the change in the number of bytes read.
      long headerSize = countStream.getBytesRead() - preHeaderCount;

      // Create the current block by reading blockSize bytes. Block sizes permitted by the Avro
      // specification are [32, 2^30], so the cast is safe.
      byte[] data = new byte[(int) blockSize];
      int bytesRead = IOUtils.readFully(stream, data);
      checkState(
          blockSize == bytesRead,
          "Only able to read %s/%s bytes in the block before EOF reached.",
          bytesRead,
          blockSize);
      currentBlock =
          new AvroBlock<>(
              data,
              numRecords,
              getCurrentSource().mode,
              metadata.getSchemaString(),
              metadata.getCodec());

      // Read the end of this block, which MUST be a sync marker for correctness.
      byte[] syncMarker = metadata.getSyncMarker();
      byte[] readSyncMarker = new byte[syncMarker.length];
      long syncMarkerOffset = startOfNextBlock + headerSize + blockSize;
      bytesRead = IOUtils.readFully(stream, readSyncMarker);
      checkState(
          bytesRead == syncMarker.length,
          "Only able to read %s/%s bytes of Avro sync marker at position %s before EOF reached.",
          bytesRead,
          syncMarker.length,
          syncMarkerOffset);
      if (!Arrays.equals(syncMarker, readSyncMarker)) {
        throw new IllegalStateException(
            String.format(
                "Expected the bytes [%d,%d) in file %s to be a sync marker, but found %s",
                syncMarkerOffset,
                syncMarkerOffset + syncMarker.length,
                getCurrentSource().getFileOrPatternSpec(),
                Arrays.toString(readSyncMarker)));
      }

      // Atomically update both the position and offset of the new block.
      synchronized (progressLock) {
        currentBlockOffset = startOfNextBlock;
        // Total block size includes the header, block content, and trailing sync marker.
        currentBlockSizeBytes = headerSize + blockSize + syncMarker.length;
      }

      return true;
    }

    @Override
    public AvroBlock<T> getCurrentBlock() {
      return currentBlock;
    }

    @Override
    public long getCurrentBlockOffset() {
      synchronized (progressLock) {
        return currentBlockOffset;
      }
    }

    @Override
    public long getCurrentBlockSize() {
      synchronized (progressLock) {
        return currentBlockSizeBytes;
      }
    }

    @Override
    public long getSplitPointsRemaining() {
      if (isDone()) {
        return 0;
      }
      synchronized (progressLock) {
        if (currentBlockOffset + currentBlockSizeBytes >= getCurrentSource().getEndOffset()) {
          // This block is known to be the last block in the range.
          return 1;
        }
      }
      return super.getSplitPointsRemaining();
    }

    /**
     * Creates a {@link PushbackInputStream} that has a large enough pushback buffer to be able to
     * push back the syncBuffer.
     */
    private PushbackInputStream createStream(ReadableByteChannel channel) {
      return new PushbackInputStream(
          Channels.newInputStream(channel), metadata.getSyncMarker().length);
    }

    // Postcondition: the stream is positioned at the beginning of the first block after the start
    // of the current source, and currentBlockOffset is that position. Additionally,
    // currentBlockSizeBytes will be set to 0 indicating that the previous block was empty.
    @Override
    protected void startReading(ReadableByteChannel channel) throws IOException {
      try {
        metadata = readMetadataFromFile(getCurrentSource().getSingleFileMetadata().resourceId());
      } catch (IOException e) {
        throw new RuntimeException(
            "Error reading metadata from file " + getCurrentSource().getSingleFileMetadata(), e);
      }

      long startOffset = getCurrentSource().getStartOffset();
      byte[] syncMarker = metadata.getSyncMarker();
      long syncMarkerLength = syncMarker.length;

      if (startOffset != 0) {
        // Rewind order to find the sync marker ending the previous block.
        long position = Math.max(0, startOffset - syncMarkerLength);
        ((SeekableByteChannel) channel).position(position);
        startOffset = position;
      }

      // Satisfy the post condition.
      stream = createStream(channel);
      countStream = new CountingInputStream(stream);
      synchronized (progressLock) {
        currentBlockOffset = startOffset + advancePastNextSyncMarker(stream, syncMarker);
        currentBlockSizeBytes = 0;
      }
    }

    /**
     * Advances to the first byte after the next occurrence of the sync marker in the stream when
     * reading from the current offset. Returns the number of bytes consumed from the stream. Note
     * that this method requires a PushbackInputStream with a buffer at least as big as the marker
     * it is seeking for.
     */
    static long advancePastNextSyncMarker(PushbackInputStream stream, byte[] syncMarker)
        throws IOException {
      Seeker seeker = new Seeker(syncMarker);
      byte[] syncBuffer = new byte[syncMarker.length];
      long totalBytesConsumed = 0;
      // Seek until either a sync marker is found or we reach the end of the file.
      int mark = -1; // Position of the last byte in the sync marker.
      int read; // Number of bytes read.
      do {
        read = stream.read(syncBuffer);
        if (read >= 0) {
          mark = seeker.find(syncBuffer, read);
          // Update the currentOffset with the number of bytes read.
          totalBytesConsumed += read;
        }
      } while (mark < 0 && read > 0);

      // If the sync marker was found, unread block data and update the current offsets.
      if (mark >= 0) {
        // The current offset after this call should be just past the sync marker, so we should
        // unread the remaining buffer contents and update the currentOffset accordingly.
        stream.unread(syncBuffer, mark + 1, read - (mark + 1));
        totalBytesConsumed = totalBytesConsumed - (read - (mark + 1));
      }
      return totalBytesConsumed;
    }

    /**
     * A {@link Seeker} looks for a given marker within a byte buffer. Uses naive string matching
     * with a sliding window, as sync markers are small and random.
     */
    static class Seeker {
      // The marker to search for.
      private byte[] marker;

      // Buffer used for the sliding window.
      private byte[] searchBuffer;

      // Number of bytes available to be matched in the buffer.
      private int available = 0;

      /** Create a {@link Seeker} that looks for the given marker. */
      public Seeker(byte[] marker) {
        this.marker = marker;
        this.searchBuffer = new byte[marker.length];
      }

      /**
       * Find the marker in the byte buffer. Returns the index of the end of the marker in the
       * buffer. If the marker is not found, returns -1.
       *
       * <p>State is maintained between calls. If the marker was partially matched, a subsequent
       * call to find will resume matching the marker.
       *
       * @param buffer
       * @return the index of the end of the marker within the buffer, or -1 if the buffer was not
       *     found.
       */
      public int find(byte[] buffer, int length) {
        for (int i = 0; i < length; i++) {
          System.arraycopy(searchBuffer, 1, searchBuffer, 0, searchBuffer.length - 1);
          searchBuffer[searchBuffer.length - 1] = buffer[i];
          available = Math.min(available + 1, searchBuffer.length);
          if (ByteBuffer.wrap(searchBuffer, searchBuffer.length - available, available)
              .equals(ByteBuffer.wrap(marker))) {
            available = 0;
            return i;
          }
        }
        return -1;
      }
    }
  }
}
