/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.dataflow.worker;

import static com.google.api.client.util.Preconditions.checkNotNull;
import static org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.cloudPositionToReaderPosition;
import static org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.cloudProgressToReaderProgress;
import static org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.splitRequestToApproximateSplitRequest;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.services.dataflow.model.ApproximateReportedProgress;
import com.google.api.services.dataflow.model.ApproximateSplitRequest;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ByteArrayShufflePosition;
import org.apache.beam.runners.dataflow.worker.util.common.worker.GroupingShuffleEntryIterator;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleEntry;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleEntryReader;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleReadCounter;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleReadCounterFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable;
import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterator;
import org.apache.beam.sdk.util.common.Reiterable;
import org.apache.beam.sdk.util.common.Reiterator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A source that reads from a shuffled dataset and yields key-grouped data.
 *
 * @param <K> the type of the keys read from the shuffle
 * @param <V> the type of the values read from the shuffle
 */
public class GroupingShuffleReader<K, V> extends NativeReader<WindowedValue<KV<K, Reiterable<V>>>> {
  private static final Logger LOG = LoggerFactory.getLogger(GroupingShuffleReader.class);

  private final PipelineOptions options;
  final byte[] shuffleReaderConfig;
  @Nullable final String startShufflePosition;
  @Nullable final String stopShufflePosition;
  final BatchModeExecutionContext executionContext;
  private final DataflowOperationContext operationContext;
  private final ShuffleReadCounterFactory shuffleReadCounterFactory;

  // Counts how many bytes were from by a given operation from a given shuffle session.
  @Nullable Counter<Long, Long> perOperationPerDatasetBytesCounter;
  Coder<K> keyCoder;
  Coder<?> valueCoder;
  @Nullable Coder<?> secondaryKeyCoder;

  public GroupingShuffleReader(
      PipelineOptions options,
      byte[] shuffleReaderConfig,
      @Nullable String startShufflePosition,
      @Nullable String stopShufflePosition,
      Coder<WindowedValue<KV<K, Iterable<V>>>> coder,
      BatchModeExecutionContext executionContext,
      DataflowOperationContext operationContext,
      ShuffleReadCounterFactory shuffleReadCounterFactory,
      boolean valuesAreSorted)
      throws Exception {
    this.options = options;
    this.shuffleReaderConfig = shuffleReaderConfig;
    this.startShufflePosition = startShufflePosition;
    this.stopShufflePosition = stopShufflePosition;
    this.executionContext = executionContext;
    this.operationContext = operationContext;
    this.shuffleReadCounterFactory = shuffleReadCounterFactory;
    initCoder(coder, valuesAreSorted);
    // We cannot initialize perOperationPerDatasetBytesCounter here, as it
    // depends on shuffleReaderConfig, which isn't populated yet.
  }

  private synchronized void initCounter(String datasetId) {
    if (perOperationPerDatasetBytesCounter == null) {
      perOperationPerDatasetBytesCounter =
          operationContext
              .counterFactory()
              .longSum(
                  CounterName.named(
                      String.format(
                          "dax-shuffle-%s-wf-%s-read-bytes",
                          datasetId, operationContext.nameContext().systemName())));
    }
  }

  @Override
  public GroupingShuffleReaderIterator<K, V> iterator() throws IOException {
    ApplianceShuffleEntryReader entryReader =
        new ApplianceShuffleEntryReader(
            shuffleReaderConfig, executionContext, operationContext, true);

    initCounter(entryReader.getDatasetId());

    return iterator(entryReader);
  }

  private void initCoder(Coder<WindowedValue<KV<K, Iterable<V>>>> coder, boolean valuesAreSorted)
      throws Exception {
    if (!(coder instanceof WindowedValueCoder)) {
      throw new Exception("unexpected kind of coder for WindowedValue: " + coder);
    }
    Coder<KV<K, Iterable<V>>> elemCoder =
        ((WindowedValueCoder<KV<K, Iterable<V>>>) coder).getValueCoder();
    if (!(elemCoder instanceof KvCoder)) {
      throw new Exception(
          "unexpected kind of coder for elements read from "
              + "a key-grouping shuffle: "
              + elemCoder);
    }

    @SuppressWarnings("unchecked")
    KvCoder<K, Iterable<V>> kvCoder = (KvCoder<K, Iterable<V>>) elemCoder;
    this.keyCoder = kvCoder.getKeyCoder();
    Coder<Iterable<V>> kvValueCoder = kvCoder.getValueCoder();
    if (!(kvValueCoder instanceof IterableCoder)) {
      throw new Exception(
          "unexpected kind of coder for values of KVs read from " + "a key-grouping shuffle");
    }
    IterableCoder<V> iterCoder = (IterableCoder<V>) kvValueCoder;
    if (valuesAreSorted) {
      checkState(
          iterCoder.getElemCoder() instanceof KvCoder,
          "unexpected kind of coder for elements read from a "
              + "key-grouping value sorting shuffle: %s",
          iterCoder.getElemCoder());
      @SuppressWarnings("rawtypes")
      KvCoder<?, ?> valueKvCoder = (KvCoder) iterCoder.getElemCoder();
      this.secondaryKeyCoder = valueKvCoder.getKeyCoder();
      this.valueCoder = valueKvCoder.getValueCoder();
    } else {
      this.valueCoder = iterCoder.getElemCoder();
    }
  }

  /**
   * Creates an iterator on top of the given entry reader.
   *
   * <p>Takes "ownership" of the reader: closes the reader once the iterator is closed.
   */
  final GroupingShuffleReaderIterator<K, V> iterator(ShuffleEntryReader reader) {
    return new GroupingShuffleReaderIterator<K, V>(this, reader);
  }

  /**
   * A ReaderIterator that reads from a ShuffleEntryReader and groups all the values with the same
   * key.
   *
   * <p>A key limitation of this implementation is that all iterator accesses must by externally
   * synchronized (the iterator objects are not individually thread-safe, and the iterators derived
   * from a single original iterator access shared state that is not thread-safe).
   *
   * <p>To access the current position, the iterator must advance on-demand and cache the next batch
   * of key grouped shuffle entries. The iterator does not advance a second time in @next() to avoid
   * asking the underlying iterator to advance to the next key before the caller/user iterates over
   * the values corresponding to the current key, which would introduce a performance penalty.
   */
  @VisibleForTesting
  static final class GroupingShuffleReaderIterator<K, V>
      extends NativeReaderIterator<WindowedValue<KV<K, Reiterable<V>>>> {
    // The enclosing GroupingShuffleReader.
    private final GroupingShuffleReader<K, V> parentReader;
    private final ShuffleEntryReader entryReader;

    /** The iterator over shuffle entries, grouped by common key. */
    private final GroupingShuffleEntryIterator groups;

    private final AtomicLong currentGroupSize = new AtomicLong(0L);

    private final ShuffleReadCounter shuffleReadCounter;

    private ExecutionStateTracker tracker;
    private ExecutionState readState;
    private WindowedValue<KV<K, Reiterable<V>>> current;

    public GroupingShuffleReaderIterator(
        final GroupingShuffleReader<K, V> parentReader, ShuffleEntryReader entryReader) {
      this.parentReader = parentReader;
      this.entryReader = entryReader;
      this.readState =
          parentReader.operationContext.newExecutionState(ExecutionStateTracker.PROCESS_STATE_NAME);
      this.tracker = parentReader.executionContext.getExecutionStateTracker();
      String baseShuffleOriginalStepName =
          parentReader.operationContext.nameContext().originalName();

      // Note: Some usage of the GroupingShuffleReader may not set the
      // shuffleReadCounterFactory.
      ExperimentContext ec = ExperimentContext.parseFrom(parentReader.options);
      if (parentReader.shuffleReadCounterFactory == null) {
        this.shuffleReadCounter = null;
      } else {
        this.shuffleReadCounter =
            parentReader.shuffleReadCounterFactory.create(
                baseShuffleOriginalStepName,
                ec.isEnabled(Experiment.IntertransformIO),
                parentReader.perOperationPerDatasetBytesCounter);
      }

      try (Closeable read = tracker.enterState(readState)) {
        this.groups =
            new GroupingShuffleEntryIterator(
                entryReader,
                parentReader.startShufflePosition == null
                    ? null
                    : ByteArrayShufflePosition.fromBase64(parentReader.startShufflePosition),
                parentReader.stopShufflePosition == null
                    ? null
                    : ByteArrayShufflePosition.fromBase64(parentReader.stopShufflePosition)) {
              @Override
              protected void notifyElementRead(long byteSize) {
                // We accumulate the sum of bytes read in a local variable. This sum will be counted
                // when the values are actually read by the consumer of the shuffle reader.
                currentGroupSize.addAndGet(byteSize);
                parentReader.notifyElementRead(byteSize);
              }

              @Override
              protected void commitBytesRead(long bytes) {
                if (shuffleReadCounter != null) {
                  shuffleReadCounter.addBytesRead(bytes);
                }
              }
            };
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    public boolean start() throws IOException {
      return advance();
    }

    @Override
    public boolean advance() throws IOException {
      try (Closeable read = tracker.enterState(readState)) {
        if (!groups.advance()) {
          current = null;
          return false;
        }
      }

      K key = CoderUtils.decodeFromByteArray(parentReader.keyCoder, groups.getCurrent().key);
      parentReader.executionContext.setKey(key);
      current =
          new ValueInEmptyWindows<>(
              KV.<K, Reiterable<V>>of(key, new ValuesIterable(groups.getCurrent().values)));
      return true;
    }

    @Override
    public WindowedValue<KV<K, Reiterable<V>>> getCurrent() throws NoSuchElementException {
      if (current == null) {
        throw new NoSuchElementException();
      }
      return current;
    }

    /**
     * Returns the position before the next {@code KV<K, Reiterable<V>>} to be returned by the
     * {@link GroupingShuffleReaderIterator}. Returns null if the {@link
     * GroupingShuffleReaderIterator} is finished.
     */
    @Override
    public Progress getProgress() {
      com.google.api.services.dataflow.model.Position position =
          new com.google.api.services.dataflow.model.Position();
      ApproximateReportedProgress progress = new ApproximateReportedProgress();
      ByteArrayShufflePosition groupStart =
          (ByteArrayShufflePosition) groups.getRangeTracker().getLastGroupStart();
      if (groupStart != null) {
        position.setShufflePosition(groupStart.encodeBase64());
        progress.setPosition(position);
      }
      com.google.api.services.dataflow.model.ReportedParallelism consumedParallelism =
          new com.google.api.services.dataflow.model.ReportedParallelism();
      consumedParallelism.setValue((double) groups.getRangeTracker().getSplitPointsProcessed());
      progress.setConsumedParallelism(consumedParallelism);
      return cloudProgressToReaderProgress(progress);
    }

    /**
     * Updates the stop position of the shuffle source to the position proposed. Ignores the
     * proposed stop position if it is smaller than or equal to the position before the next {@code
     * KV<K, Reiterable<V>>} to be returned by the {@link GroupingShuffleReaderIterator}.
     */
    @Override
    public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest splitRequest) {
      checkNotNull(splitRequest);
      ApproximateSplitRequest splitProgress = splitRequestToApproximateSplitRequest(splitRequest);
      com.google.api.services.dataflow.model.Position splitPosition = splitProgress.getPosition();
      if (splitPosition == null) {
        LOG.warn(
            "GroupingShuffleReader only supports split at a Position. Requested: {}", splitRequest);
        return null;
      }
      String splitShufflePosition = splitPosition.getShufflePosition();
      if (splitShufflePosition == null) {
        LOG.warn(
            "GroupingShuffleReader only supports split at a shuffle position. Requested: {}",
            splitPosition);
        return null;
      }
      ByteArrayShufflePosition newStopPosition =
          ByteArrayShufflePosition.fromBase64(splitShufflePosition);
      if (groups.trySplitAtPosition(newStopPosition)) {
        return new DynamicSplitResultWithPosition(cloudPositionToReaderPosition(splitPosition));
      } else {
        return null;
      }
    }

    @Override
    public void close() throws IOException {
      entryReader.close();
    }

    /**
     * Provides the {@link Reiterable} used to iterate through the values part of a {@code KV<K,
     * Reiterable<V>>} entry produced by a {@link GroupingShuffleReader}.
     */
    private final class ValuesIterable extends ElementByteSizeObservableIterable<V, ValuesIterator>
        implements Reiterable<V> {
      // N.B. This class is *not* static; it uses the valueCoder from
      // its enclosing GroupingShuffleReader.

      private final Reiterable<ShuffleEntry> base;

      public ValuesIterable(Reiterable<ShuffleEntry> base) {
        this.base = checkNotNull(base);
      }

      @Override
      public ValuesIterator iterator() {
        return new ValuesIterator(base.iterator());
      }

      @Override
      protected ValuesIterator createIterator() {
        return iterator();
      }
    }

    /**
     * Provides the {@link Reiterator} used to iterate through the values part of a {@code KV<K,
     * Reiterable<V>>} entry produced by a {@link GroupingShuffleReader}.
     */
    private final class ValuesIterator extends ElementByteSizeObservableIterator<V>
        implements Reiterator<V> {
      // N.B. This class is *not* static; it uses the valueCoder from
      // its enclosing GroupingShuffleReader.

      private final Reiterator<ShuffleEntry> base;

      public ValuesIterator(Reiterator<ShuffleEntry> base) {
        this.base = checkNotNull(base);
      }

      @Override
      public boolean hasNext() {
        try (Closeable read = tracker.enterState(readState)) {
          return base.hasNext();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public V next() {
        try (Closeable read = tracker.enterState(readState)) {
          ShuffleEntry entry = base.next();
          checkNotNull(entry);

          // The shuffle entries are handed over to the consumer of this iterator. Therefore, we can
          // notify the bytes that have been read so far.
          notifyValueReturned(currentGroupSize.getAndSet(0L));
          try {
            if (parentReader.secondaryKeyCoder != null) {
              ByteArrayInputStream bais = new ByteArrayInputStream(entry.getSecondaryKey());
              @SuppressWarnings("unchecked")
              V value =
                  (V)
                      KV.of(
                          // We ignore decoding the timestamp.
                          parentReader.secondaryKeyCoder.decode(bais),
                          CoderUtils.decodeFromByteArray(
                              parentReader.valueCoder, entry.getValue()));
              return value;
            } else {
              @SuppressWarnings("unchecked")
              V value =
                  (V) CoderUtils.decodeFromByteArray(parentReader.valueCoder, entry.getValue());
              return value;
            }
          } catch (IOException exn) {
            throw new RuntimeException(exn);
          }
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void remove() {
        base.remove();
      }

      @Override
      public ValuesIterator copy() {
        return new ValuesIterator(base.copy());
      }
    }
  }
}
