/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
import org.apache.beam.sdk.io.kafka.KafkaIO.Read;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An unbounded reader to read from Kafka. Each reader consumes messages from one or more Kafka
 * partitions. See {@link KafkaIO} for user visible documentation and example usage.
 */
class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {

  ///////////////////// Reader API ////////////////////////////////////////////////////////////
  @SuppressWarnings("FutureReturnValueIgnored")
  @Override
  public boolean start() throws IOException {
    final int defaultPartitionInitTimeout = 60 * 1000;
    final int kafkaRequestTimeoutMultiple = 2;

    Read<K, V> spec = source.getSpec();
    consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
    consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());

    try {
      keyDeserializerInstance = spec.getKeyDeserializer().getDeclaredConstructor().newInstance();
      valueDeserializerInstance =
          spec.getValueDeserializer().getDeclaredConstructor().newInstance();
    } catch (InstantiationException
        | IllegalAccessException
        | InvocationTargetException
        | NoSuchMethodException e) {
      throw new IOException("Could not instantiate deserializers", e);
    }

    keyDeserializerInstance.configure(spec.getConsumerConfig(), true);
    valueDeserializerInstance.configure(spec.getConsumerConfig(), false);

    // Seek to start offset for each partition. This is the first interaction with the server.
    // Unfortunately it can block forever in case of network issues like incorrect ACLs.
    // Initialize partition in a separate thread and cancel it if takes longer than a minute.
    for (final PartitionState pState : partitionStates) {
      Future<?> future = consumerPollThread.submit(() -> setupInitialOffset(pState));

      try {
        // Timeout : 1 minute OR 2 * Kafka consumer request timeout if it is set.
        Integer reqTimeout =
            (Integer) spec.getConsumerConfig().get(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        future.get(
            reqTimeout != null
                ? kafkaRequestTimeoutMultiple * reqTimeout
                : defaultPartitionInitTimeout,
            TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        consumer.wakeup(); // This unblocks consumer stuck on network I/O.
        // Likely reason : Kafka servers are configured to advertise internal ips, but
        // those ips are not accessible from workers outside.
        String msg =
            String.format(
                "%s: Timeout while initializing partition '%s'. "
                    + "Kafka client may not be able to connect to servers.",
                this, pState.topicPartition);
        LOG.error("{}", msg);
        throw new IOException(msg);
      } catch (Exception e) {
        throw new IOException(e);
      }
      LOG.info(
          "{}: reading from {} starting at offset {}",
          name,
          pState.topicPartition,
          pState.nextOffset);
    }

    // Start consumer read loop.
    // Note that consumer is not thread safe, should not be accessed out side consumerPollLoop().
    consumerPollThread.submit(this::consumerPollLoop);

    // offsetConsumer setup :
    Map<String, Object> offsetConsumerConfig = getOffsetConsumerConfig();

    offsetConsumer = spec.getConsumerFactoryFn().apply(offsetConsumerConfig);
    consumerSpEL.evaluateAssign(offsetConsumer, spec.getTopicPartitions());

    // Fetch offsets once before running periodically.
    updateLatestOffsets();

    offsetFetcherThread.scheduleAtFixedRate(
        this::updateLatestOffsets, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);

    return advance();
  }

  @Override
  public boolean advance() throws IOException {
    /* Read first record (if any). we need to loop here because :
     *  - (a) some records initially need to be skipped if they are before consumedOffset
     *  - (b) if curBatch is empty, we want to fetch next batch and then advance.
     *  - (c) curBatch is an iterator of iterators. we interleave the records from each.
     *        curBatch.next() might return an empty iterator.
     */
    while (true) {
      if (curBatch.hasNext()) {
        PartitionState<K, V> pState = curBatch.next();

        if (!pState.recordIter.hasNext()) { // -- (c)
          pState.recordIter = Collections.emptyIterator(); // drop ref
          curBatch.remove();
          continue;
        }

        elementsRead.inc();
        elementsReadBySplit.inc();

        ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
        long expected = pState.nextOffset;
        long offset = rawRecord.offset();

        if (offset < expected) { // -- (a)
          // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
          // should we check if the offset is way off from consumedOffset (say > 1M)?
          LOG.warn(
              "{}: ignoring already consumed offset {} for {}",
              this,
              offset,
              pState.topicPartition);
          continue;
        }

        long offsetGap = offset - expected; // could be > 0 when Kafka log compaction is enabled.

        if (curRecord == null) {
          LOG.info("{}: first record offset {}", name, offset);
          offsetGap = 0;
        }

        // Apply user deserializers. User deserializers might throw, which will be propagated up
        // and 'curRecord' remains unchanged. The runner should close this reader.
        // TODO: write records that can't be deserialized to a "dead-letter" additional output.
        KafkaRecord<K, V> record =
            new KafkaRecord<>(
                rawRecord.topic(),
                rawRecord.partition(),
                rawRecord.offset(),
                consumerSpEL.getRecordTimestamp(rawRecord),
                consumerSpEL.getRecordTimestampType(rawRecord),
                ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null,
                keyDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.key()),
                valueDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.value()));

        curTimestamp =
            pState.timestampPolicy.getTimestampForRecord(pState.mkTimestampPolicyContext(), record);
        curRecord = record;

        int recordSize =
            (rawRecord.key() == null ? 0 : rawRecord.key().length)
                + (rawRecord.value() == null ? 0 : rawRecord.value().length);
        pState.recordConsumed(offset, recordSize, offsetGap);
        bytesRead.inc(recordSize);
        bytesReadBySplit.inc(recordSize);
        return true;

      } else { // -- (b)
        nextBatch();

        if (!curBatch.hasNext()) {
          return false;
        }
      }
    }
  }

  @Override
  public Instant getWatermark() {

    if (source.getSpec().getWatermarkFn() != null) {
      // Support old API which requires a KafkaRecord to invoke watermarkFn.
      if (curRecord == null) {
        LOG.debug("{}: getWatermark() : no records have been read yet.", name);
        return initialWatermark;
      }
      return source.getSpec().getWatermarkFn().apply(curRecord);
    }

    // Return minimum watermark among partitions.
    return partitionStates.stream()
        .map(PartitionState::updateAndGetWatermark)
        .min(Comparator.naturalOrder())
        .get();
  }

  @Override
  public CheckpointMark getCheckpointMark() {
    reportBacklog();
    return new KafkaCheckpointMark(
        partitionStates.stream()
            .map(
                p ->
                    new PartitionMark(
                        p.topicPartition.topic(),
                        p.topicPartition.partition(),
                        p.nextOffset,
                        p.lastWatermark.getMillis()))
            .collect(Collectors.toList()),
        source.getSpec().isCommitOffsetsInFinalizeEnabled() ? Optional.of(this) : Optional.empty());
  }

  @Override
  public UnboundedSource<KafkaRecord<K, V>, ?> getCurrentSource() {
    return source;
  }

  @Override
  public KafkaRecord<K, V> getCurrent() throws NoSuchElementException {
    // should we delay updating consumed offset till this point? Mostly not required.
    return curRecord;
  }

  @Override
  public Instant getCurrentTimestamp() throws NoSuchElementException {
    return curTimestamp;
  }

  @Override
  public long getSplitBacklogBytes() {
    long backlogBytes = 0;

    for (PartitionState p : partitionStates) {
      long pBacklog = p.approxBacklogInBytes();
      if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
        return UnboundedReader.BACKLOG_UNKNOWN;
      }
      backlogBytes += pBacklog;
    }

    return backlogBytes;
  }

  ////////////////////////////////////////////////////////////////////////////////////////////////

  private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedSource.class);

  @VisibleForTesting static final String METRIC_NAMESPACE = "KafkaIOReader";

  @VisibleForTesting
  static final String CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC = "checkpointMarkCommitsEnqueued";

  private static final String CHECKPOINT_MARK_COMMITS_SKIPPED_METRIC =
      "checkpointMarkCommitsSkipped";

  private final KafkaUnboundedSource<K, V> source;
  private final String name;
  private Consumer<byte[], byte[]> consumer;
  private final List<PartitionState<K, V>> partitionStates;
  private KafkaRecord<K, V> curRecord;
  private Instant curTimestamp;
  private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();

  private Deserializer<K> keyDeserializerInstance = null;
  private Deserializer<V> valueDeserializerInstance = null;

  private final Counter elementsRead = SourceMetrics.elementsRead();
  private final Counter bytesRead = SourceMetrics.bytesRead();
  private final Counter elementsReadBySplit;
  private final Counter bytesReadBySplit;
  private final Gauge backlogBytesOfSplit;
  private final Gauge backlogElementsOfSplit;
  private final Counter checkpointMarkCommitsEnqueued =
      Metrics.counter(METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC);
  // Checkpoint marks skipped in favor of newer mark (only the latest needs to be committed).
  private final Counter checkpointMarkCommitsSkipped =
      Metrics.counter(METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_SKIPPED_METRIC);

  /**
   * The poll timeout while reading records from Kafka. If option to commit reader offsets in to
   * Kafka in {@link KafkaCheckpointMark#finalizeCheckpoint()} is enabled, it would be delayed until
   * this poll returns. It should be reasonably low as a result. At the same time it probably can't
   * be very low like 10 millis, I am not sure how it affects when the latency is high. Probably
   * good to experiment. Often multiple marks would be finalized in a batch, it it reduce
   * finalization overhead to wait a short while and finalize only the last checkpoint mark.
   */
  private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);

  private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT = Duration.millis(10);
  private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = Duration.millis(100);

  // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
  // network I/O inside poll(). Polling only inside #advance(), especially with a small timeout
  // like 100 milliseconds does not work well. This along with large receive buffer for
  // consumer achieved best throughput in tests (see `defaultConsumerProperties`).
  private final ExecutorService consumerPollThread = Executors.newSingleThreadExecutor();
  private AtomicReference<Exception> consumerPollException = new AtomicReference<>();
  private final SynchronousQueue<ConsumerRecords<byte[], byte[]>> availableRecordsQueue =
      new SynchronousQueue<>();
  private AtomicReference<KafkaCheckpointMark> finalizedCheckpointMark = new AtomicReference<>();
  private AtomicBoolean closed = new AtomicBoolean(false);

  // Backlog support :
  // Kafka consumer does not have an API to fetch latest offset for topic. We need to seekToEnd()
  // then look at position(). Use another consumer to do this so that the primary consumer does
  // not need to be interrupted. The latest offsets are fetched periodically on a thread. This is
  // still a bit of a hack, but so far there haven't been any issues reported by the users.
  private Consumer<byte[], byte[]> offsetConsumer;
  private final ScheduledExecutorService offsetFetcherThread =
      Executors.newSingleThreadScheduledExecutor();
  private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 1;

  private static final long UNINITIALIZED_OFFSET = -1;

  // Add SpEL instance to cover the interface difference of Kafka client
  private transient ConsumerSpEL consumerSpEL;

  /** watermark before any records have been read. */
  private static Instant initialWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

  @Override
  public String toString() {
    return name;
  }

  // Maintains approximate average over last 1000 elements
  private static class MovingAvg {
    private static final int MOVING_AVG_WINDOW = 1000;
    private double avg = 0;
    private long numUpdates = 0;

    void update(double quantity) {
      numUpdates++;
      avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates);
    }

    double get() {
      return avg;
    }
  }

  private static class TimestampPolicyContext extends TimestampPolicy.PartitionContext {

    private final long messageBacklog;
    private final Instant backlogCheckTime;

    TimestampPolicyContext(long messageBacklog, Instant backlogCheckTime) {
      this.messageBacklog = messageBacklog;
      this.backlogCheckTime = backlogCheckTime;
    }

    @Override
    public long getMessageBacklog() {
      return messageBacklog;
    }

    @Override
    public Instant getBacklogCheckTime() {
      return backlogCheckTime;
    }
  }

  // maintains state of each assigned partition (buffered records, consumed offset, etc)
  private static class PartitionState<K, V> {
    private final TopicPartition topicPartition;
    private long nextOffset;
    private long latestOffset;
    private Instant latestOffsetFetchTime;
    private Instant lastWatermark; // As returned by timestampPolicy
    private final TimestampPolicy<K, V> timestampPolicy;

    private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();

    private MovingAvg avgRecordSize = new MovingAvg();
    private MovingAvg avgOffsetGap = new MovingAvg(); // > 0 only when log compaction is enabled.

    PartitionState(
        TopicPartition partition, long nextOffset, TimestampPolicy<K, V> timestampPolicy) {
      this.topicPartition = partition;
      this.nextOffset = nextOffset;
      this.latestOffset = UNINITIALIZED_OFFSET;
      this.latestOffsetFetchTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
      this.lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
      this.timestampPolicy = timestampPolicy;
    }

    // Update consumedOffset, avgRecordSize, and avgOffsetGap
    void recordConsumed(long offset, int size, long offsetGap) {
      nextOffset = offset + 1;

      // This is always updated from single thread. Probably not worth making atomic.
      avgRecordSize.update(size);
      avgOffsetGap.update(offsetGap);
    }

    synchronized void setLatestOffset(long latestOffset, Instant fetchTime) {
      this.latestOffset = latestOffset;
      this.latestOffsetFetchTime = fetchTime;
      LOG.debug(
          "{}: latest offset update for {} : {} (consumer offset {}, avg record size {})",
          this,
          topicPartition,
          latestOffset,
          nextOffset,
          avgRecordSize);
    }

    synchronized long approxBacklogInBytes() {
      // Note that is an an estimate of uncompressed backlog.
      long backlogMessageCount = backlogMessageCount();
      if (backlogMessageCount == UnboundedReader.BACKLOG_UNKNOWN) {
        return UnboundedReader.BACKLOG_UNKNOWN;
      }
      return (long) (backlogMessageCount * avgRecordSize.get());
    }

    synchronized long backlogMessageCount() {
      if (latestOffset < 0 || nextOffset < 0) {
        return UnboundedReader.BACKLOG_UNKNOWN;
      }
      double remaining = (latestOffset - nextOffset) / (1 + avgOffsetGap.get());
      return Math.max(0, (long) Math.ceil(remaining));
    }

    synchronized TimestampPolicyContext mkTimestampPolicyContext() {
      return new TimestampPolicyContext(backlogMessageCount(), latestOffsetFetchTime);
    }

    Instant updateAndGetWatermark() {
      lastWatermark = timestampPolicy.getWatermark(mkTimestampPolicyContext());
      return lastWatermark;
    }
  }

  KafkaUnboundedReader(
      KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
    this.consumerSpEL = new ConsumerSpEL();
    this.source = source;
    this.name = "Reader-" + source.getId();

    List<TopicPartition> partitions = source.getSpec().getTopicPartitions();
    List<PartitionState<K, V>> states = new ArrayList<>(partitions.size());

    if (checkpointMark != null) {
      checkState(
          checkpointMark.getPartitions().size() == partitions.size(),
          "checkPointMark and assignedPartitions should match");
    }

    for (int i = 0; i < partitions.size(); i++) {
      TopicPartition tp = partitions.get(i);
      long nextOffset = UNINITIALIZED_OFFSET;
      Optional<Instant> prevWatermark = Optional.empty();

      if (checkpointMark != null) {
        // Verify that assigned and check-pointed partitions match exactly and set next offset.

        PartitionMark ckptMark = checkpointMark.getPartitions().get(i);

        TopicPartition partition = new TopicPartition(ckptMark.getTopic(), ckptMark.getPartition());
        checkState(
            partition.equals(tp),
            "checkpointed partition %s and assigned partition %s don't match",
            partition,
            tp);
        nextOffset = ckptMark.getNextOffset();
        prevWatermark = Optional.of(new Instant(ckptMark.getWatermarkMillis()));
      }

      states.add(
          new PartitionState<>(
              tp,
              nextOffset,
              source
                  .getSpec()
                  .getTimestampPolicyFactory()
                  .createTimestampPolicy(tp, prevWatermark)));
    }

    partitionStates = ImmutableList.copyOf(states);

    String splitId = String.valueOf(source.getId());
    elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId);
    bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId);
    backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
    backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId);
  }

  private void consumerPollLoop() {
    // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue.

    try {
      ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
      while (!closed.get()) {
        try {
          if (records.isEmpty()) {
            records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
          } else if (availableRecordsQueue.offer(
              records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
            records = ConsumerRecords.empty();
          }
          KafkaCheckpointMark checkpointMark = finalizedCheckpointMark.getAndSet(null);
          if (checkpointMark != null) {
            commitCheckpointMark(checkpointMark);
          }
        } catch (InterruptedException e) {
          LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
          break;
        } catch (WakeupException e) {
          break;
        }
      }
      LOG.info("{}: Returning from consumer pool loop", this);
    } catch (Exception e) { // mostly an unrecoverable KafkaException.
      LOG.error("{}: Exception while reading from Kafka", this, e);
      consumerPollException.set(e);
      throw e;
    }
  }

  private void commitCheckpointMark(KafkaCheckpointMark checkpointMark) {
    LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);

    consumer.commitSync(
        checkpointMark.getPartitions().stream()
            .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
            .collect(
                Collectors.toMap(
                    p -> new TopicPartition(p.getTopic(), p.getPartition()),
                    p -> new OffsetAndMetadata(p.getNextOffset()))));
  }

  /**
   * Enqueue checkpoint mark to be committed to Kafka. This does not block until it is committed.
   * There could be a delay of up to KAFKA_POLL_TIMEOUT (1 second). Any checkpoint mark enqueued
   * earlier is dropped in favor of this checkpoint mark. Documentation for {@link
   * CheckpointMark#finalizeCheckpoint()} says these are finalized in order. Only the latest offsets
   * need to be committed.
   */
  void finalizeCheckpointMarkAsync(KafkaCheckpointMark checkpointMark) {
    if (finalizedCheckpointMark.getAndSet(checkpointMark) != null) {
      checkpointMarkCommitsSkipped.inc();
    }
    checkpointMarkCommitsEnqueued.inc();
  }

  private void nextBatch() throws IOException {
    curBatch = Collections.emptyIterator();

    ConsumerRecords<byte[], byte[]> records;
    try {
      // poll available records, wait (if necessary) up to the specified timeout.
      records =
          availableRecordsQueue.poll(
              RECORDS_DEQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.warn("{}: Unexpected", this, e);
      return;
    }

    if (records == null) {
      // Check if the poll thread failed with an exception.
      if (consumerPollException.get() != null) {
        throw new IOException("Exception while reading from Kafka", consumerPollException.get());
      }
      return;
    }

    partitionStates.forEach(p -> p.recordIter = records.records(p.topicPartition).iterator());

    // cycle through the partitions in order to interleave records from each.
    curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
  }

  private void setupInitialOffset(PartitionState pState) {
    Read<K, V> spec = source.getSpec();

    if (pState.nextOffset != UNINITIALIZED_OFFSET) {
      consumer.seek(pState.topicPartition, pState.nextOffset);
    } else {
      // nextOffset is uninitialized here, meaning start reading from latest record as of now
      // ('latest' is the default, and is configurable) or 'look up offset by startReadTime.
      // Remember the current position without waiting until the first record is read. This
      // ensures checkpoint is accurate even if the reader is closed before reading any records.
      Instant startReadTime = spec.getStartReadTime();
      if (startReadTime != null) {
        pState.nextOffset =
            consumerSpEL.offsetForTime(consumer, pState.topicPartition, spec.getStartReadTime());
        consumer.seek(pState.topicPartition, pState.nextOffset);
      } else {
        pState.nextOffset = consumer.position(pState.topicPartition);
      }
    }
  }

  // Update latest offset for each partition.
  // Called from setupInitialOffset() at the start and then periodically from offsetFetcher thread.
  private void updateLatestOffsets() {
    for (PartitionState p : partitionStates) {
      try {
        Instant fetchTime = Instant.now();
        consumerSpEL.evaluateSeek2End(offsetConsumer, p.topicPartition);
        long offset = offsetConsumer.position(p.topicPartition);
        p.setLatestOffset(offset, fetchTime);
      } catch (Exception e) {
        if (closed.get()) { // Ignore the exception if the reader is closed.
          break;
        }
        LOG.warn(
            "{}: exception while fetching latest offset for partition {}. will be retried.",
            this,
            p.topicPartition,
            e);
        // Don't update the latest offset.
      }
    }

    LOG.debug("{}:  backlog {}", this, getSplitBacklogBytes());
  }

  private void reportBacklog() {
    long splitBacklogBytes = getSplitBacklogBytes();
    if (splitBacklogBytes < 0) {
      splitBacklogBytes = UnboundedReader.BACKLOG_UNKNOWN;
    }
    backlogBytesOfSplit.set(splitBacklogBytes);
    long splitBacklogMessages = getSplitBacklogMessageCount();
    if (splitBacklogMessages < 0) {
      splitBacklogMessages = UnboundedReader.BACKLOG_UNKNOWN;
    }
    backlogElementsOfSplit.set(splitBacklogMessages);
  }

  private long getSplitBacklogMessageCount() {
    long backlogCount = 0;

    for (PartitionState p : partitionStates) {
      long pBacklog = p.backlogMessageCount();
      if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
        return UnboundedReader.BACKLOG_UNKNOWN;
      }
      backlogCount += pBacklog;
    }

    return backlogCount;
  }

  @VisibleForTesting
  Map<String, Object> getOffsetConsumerConfig() {
    Map<String, Object> offsetConsumerConfig = new HashMap<>(source.getSpec().getConsumerConfig());
    offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    Object groupId = source.getSpec().getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG);
    // override group_id and disable auto_commit so that it does not interfere with main consumer
    String offsetGroupId =
        String.format(
            "%s_offset_consumer_%d_%s",
            name, (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
    offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);

    if (source.getSpec().getOffsetConsumerConfig() != null) {
      offsetConsumerConfig.putAll(source.getSpec().getOffsetConsumerConfig());
    }

    // Force read isolation level to 'read_uncommitted' for offset consumer. This consumer
    // fetches latest offset for two reasons : (a) to calculate backlog (number of records
    // yet to be consumed) (b) to advance watermark if the backlog is zero. The right thing to do
    // for (a) is to leave this config unchanged from the main config (i.e. if there are records
    // that can't be read because of uncommitted records before them, they shouldn't
    // ideally count towards backlog when "read_committed" is enabled. But (b)
    // requires finding out if there are any records left to be read (committed or uncommitted).
    // Rather than using two separate consumers we will go with better support for (b). If we do
    // hit a case where a lot of records are not readable (due to some stuck transactions), the
    // pipeline would report more backlog, but would not be able to consume it. It might be ok
    // since CPU consumed on the workers would be low and will likely avoid unnecessary upscale.
    offsetConsumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");

    return offsetConsumerConfig;
  }

  @Override
  public void close() throws IOException {
    closed.set(true);
    consumerPollThread.shutdown();
    offsetFetcherThread.shutdown();

    boolean isShutdown = false;

    // Wait for threads to shutdown. Trying this as a loop to handle a tiny race where poll thread
    // might block to enqueue right after availableRecordsQueue.poll() below.
    while (!isShutdown) {

      if (consumer != null) {
        consumer.wakeup();
      }
      if (offsetConsumer != null) {
        offsetConsumer.wakeup();
      }
      availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread.
      try {
        isShutdown =
            consumerPollThread.awaitTermination(10, TimeUnit.SECONDS)
                && offsetFetcherThread.awaitTermination(10, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e); // not expected
      }

      if (!isShutdown) {
        LOG.warn("An internal thread is taking a long time to shutdown. will retry.");
      }
    }

    Closeables.close(keyDeserializerInstance, true);
    Closeables.close(valueDeserializerInstance, true);

    Closeables.close(offsetConsumer, true);
    Closeables.close(consumer, true);
  }
}
