/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.dataflow.worker;

import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;

/**
 * This class tracks time and bytes spent when consuming side inputs.
 *
 * <p>This class is designed to track consumption of side inputs across fused steps.
 *
 * <p>We represent a side input as a declaring step, and an input index. The declaring step is the
 * step that originally receives the side input for consumption, and the input index in which the
 * declaring step receives the side input that we want to identify. The declaring step originally
 * receives the side input, but it may not be the only step that spends time reading from this side
 * input. Therefore, to represent the actual consumption of a side input, it is necessary to use
 * three things: 1) the declaring step, 2) the input index, and 3) the currently executing step (as
 * it may be different from the declaring step).
 *
 * <p>The following table summarizes the two different steps tracked when it comes to side inputs:
 *
 * <table>
 *   <tr>
 *     <th>Side Input Read Counter tracks two steps:</th>
 *     <th>Declaring Step Name</th>
 *     <th>Requesting Step Name</th>
 *   </tr>
 *   <tr>
 *     <td>The value of these can vary because:</td>
 *     <td>IsmReader instances are shared between multiple threads for the same PCollectionView</td>
 *     <td>The same Lazy Iterable can be passed to and consumed by downstream steps as an output
 *     element.</td>
 *   </tr>
 *   <tr>
 *     <td>The current value is tracked by:</td>
 *     <td>DelegatingIsmReader and DataflowSideInputReadCounter</td>
 *     <td>The ExecutionStateTracker.</td>
 *   </tr>
 *   <tr>
 *     <td>The current value is read by:</td>
 *     <td>IsmReader and IsmPrefixReaderIterator</td>
 *     <td>DataflowSideInputReadCounter, in the checkStep function.</td>
 *   </tr>
 * </table>
 *
 * <p>This is the case for both batch pipelines, and streaming pipelines, although the underlying
 * storage layers are different (GCS for Batch, Windmill state for Streaming).
 *
 * <p>As an example of a pipeline where the declaring step and the consuming step of a side input
 * are not the same, suppose a pipeline of the following form:
 *
 * <p>SideInputPCollection -> View.AsIterable ------------------ | v MainInputPCollection --->
 * FirstParDo(...).withSideInput() -> AnotherParDo(...)
 *
 * <p>In this pipeline, the FirstParDo transform may simply emit the Iterable provided by the side
 * input, and the AnotherParDo may be the one that actually manipulates that Iterable. This is
 * possible because both ParDos will be fused, so they will simply exchange objects in memory.
 */
public class DataflowSideInputReadCounter implements SideInputReadCounter {
  private final DataflowExecutionContext executionContext;

  /** These two attributes describe the side input via a declaring step name, and an input index. */
  private final DataflowOperationContext declaringOperationContext;

  private final int sideInputIndex;

  /** Maps containing the byte counters, and execution states associated to different steps. */
  private final Map<NameContext, Counter<Long, Long>> byteCounters;

  private final Map<NameContext, DataflowExecutionState> executionStates;

  /**
   * {@link Counter}, {@link DataflowExecutionState}, and {@link NameContext} associated to the
   * latest step to have consumed the side input.
   */
  private Counter<Long, Long> currentCounter;

  private DataflowExecutionState currentExecutionState;
  private NameContext latestConsumingStepName;

  public DataflowSideInputReadCounter(
      DataflowExecutionContext executionContext,
      DataflowOperationContext operationContext,
      int sideInputIndex) {
    this.executionContext = executionContext;
    this.sideInputIndex = sideInputIndex;
    this.declaringOperationContext = operationContext;
    byteCounters = new HashMap<>();
    executionStates = new HashMap<>();
    updateCurrentStateIfOutdated();
  }

  private void updateCurrentStateIfOutdated() {
    DataflowExecutionState currentState =
        (DataflowExecutionState) executionContext.getExecutionStateTracker().getCurrentState();
    if (currentState == null
        || currentState.getStepName().originalName() == null
        || Objects.equals(latestConsumingStepName, currentState.getStepName())) {
      // In this case, the step has not changed, and we'll just return.
      return;
    }
    if (!byteCounters.containsKey(currentState.getStepName())) {
      byteCounters.put(
          currentState.getStepName(),
          executionContext
              .getCounterFactory()
              .longSum(
                  CounterName.named("read-sideinput-byte-count")
                      .withOriginalName(declaringOperationContext.nameContext())
                      .withOrigin("SYSTEM")
                      .withOriginalRequestingStepName(currentState.getStepName().originalName())
                      .withInputIndex(sideInputIndex)));

      executionStates.put(
          currentState.getStepName(),
          executionContext
              .getExecutionStateRegistry()
              .getIOState(
                  declaringOperationContext.nameContext(),
                  "read-sideinput",
                  currentState.getStepName().originalName(),
                  sideInputIndex,
                  currentState.getMetricsContainer(),
                  currentState.getProfileScope()));
    }
    currentCounter = byteCounters.get(currentState.getStepName());
    currentExecutionState = executionStates.get(currentState.getStepName());
    latestConsumingStepName = currentState.getStepName();
  }

  @Override
  public void addBytesRead(long n) {
    if (currentCounter != null) {
      currentCounter.addValue(n);
    }
  }

  @Override
  public Closeable enter() {
    // Only update status from tracked thread to avoid race condition and inconsistent state updates
    if (executionContext.getExecutionStateTracker().getTrackedThread() != Thread.currentThread()) {
      return () -> {};
    }
    updateCurrentStateIfOutdated();
    return executionContext.getExecutionStateTracker().enterState(currentExecutionState);
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("sideInputIndex", sideInputIndex)
        .add("declaringStep", declaringOperationContext.nameContext())
        .toString();
  }
}
