/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.direct;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.Materializations.MultimapView;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;

/**
 * An in-process container for {@link PCollectionView PCollectionViews}, which provides methods for
 * constructing {@link SideInputReader SideInputReaders} that can be asked whether a side input is
 * ready before reading it, and for writing to a {@link PCollectionView}.
 */
class SideInputContainer {
  /** The views held by this container; readers can only be created over views found here. */
  private final Collection<PCollectionView<?>> containedViews;
  /**
   * Maps each (view, window) pair to a holder of the values currently written for that pair. A
   * holder whose reference is still {@code null} has not had any contents written to it yet.
   */
  private final LoadingCache<
          PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
      viewByWindows;

  /**
   * Builds a {@link SideInputContainer} that holds the given views.
   *
   * <p>Each view is validated first: its {@link ViewFn} must declare the multimap materialization
   * URN. The per-window contents cache is backed by a loader that uses the supplied context to
   * schedule a callback for every (view, window) pair that gets looked up.
   *
   * @param context the evaluation context handed to the cache loader
   * @param containedViews the views this container will hold
   * @return a new container over {@code containedViews}
   * @throws IllegalArgumentException if any view uses a materialization other than multimap
   */
  public static SideInputContainer create(
      final EvaluationContext context, Collection<PCollectionView<?>> containedViews) {
    for (PCollectionView<?> candidate : containedViews) {
      String materializationUrn = candidate.getViewFn().getMaterialization().getUrn();
      checkArgument(
          Materializations.MULTIMAP_MATERIALIZATION_URN.equals(materializationUrn),
          "This handler is only capable of dealing with %s materializations "
              + "but was asked to handle %s for PCollectionView with tag %s.",
          Materializations.MULTIMAP_MATERIALIZATION_URN,
          materializationUrn,
          candidate.getTagInternal().getId());
    }

    CallbackSchedulingLoader loader = new CallbackSchedulingLoader(context);
    LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
        contentsCache = CacheBuilder.newBuilder().build(loader);
    return new SideInputContainer(containedViews, contentsCache);
  }

  /**
   * Creates a container over an immutable copy of {@code containedViews}, storing per-window
   * contents in {@code viewByWindows}.
   */
  private SideInputContainer(
      Collection<PCollectionView<?>> containedViews,
      LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
          viewByWindows) {
    this.viewByWindows = viewByWindows;
    // Copy so that later changes to the caller's collection do not affect this container.
    this.containedViews = ImmutableSet.copyOf(containedViews);
  }

  /**
   * Returns a {@link ReadyCheckingSideInputReader} restricted to the provided views. The reader
   * obtains its contents from this {@link SideInputContainer}.
   *
   * @param newContainedViews the views the reader should expose; each must be held by this
   *     container
   * @return a reader over exactly {@code newContainedViews}
   * @throws IllegalArgumentException if any requested view is not held by this container
   */
  public ReadyCheckingSideInputReader createReaderForViews(
      Collection<PCollectionView<?>> newContainedViews) {
    if (containedViews.containsAll(newContainedViews)) {
      return new SideInputContainerSideInputReader(newContainedViews);
    }

    // Report exactly which of the requested views this container does not know about.
    Set<PCollectionView<?>> known = ImmutableSet.copyOf(containedViews);
    Set<PCollectionView<?>> requested = ImmutableSet.copyOf(newContainedViews);
    Set<PCollectionView<?>> unknown = Sets.difference(requested, known);
    throw new IllegalArgumentException(
        "Can't create a SideInputReader with unknown views " + unknown);
  }

  /**
   * Writes the provided values to the provided view.
   *
   * <p>The values are grouped by every window they appear in. Each group is then offered to the
   * container as the new contents of the {@link PCollectionView} in that window; it is stored if
   * nothing has been written for the window yet, or if it belongs to a later pane than the
   * contents currently stored.
   *
   * <p>The provided iterable is expected to contain only a single window and pane.
   */
  public void write(PCollectionView<?> view, Iterable<? extends WindowedValue<?>> values) {
    indexValuesByWindow(values)
        .forEach(
            (window, valuesInWindow) ->
                updatePCollectionViewWindowValues(view, window, valuesInWindow));
  }

  /**
   * Groups the provided values by {@link BoundedWindow window}. A value that is in several windows
   * is added, unexploded, to the group of each of those windows.
   */
  private Map<BoundedWindow, Collection<WindowedValue<?>>> indexValuesByWindow(
      Iterable<? extends WindowedValue<?>> values) {
    Map<BoundedWindow, Collection<WindowedValue<?>>> byWindow = new HashMap<>();
    for (WindowedValue<?> element : values) {
      for (BoundedWindow containingWindow : element.getWindows()) {
        // Lazily create the bucket for a window the first time a value lands in it.
        byWindow.computeIfAbsent(containingWindow, unused -> new ArrayList<>()).add(element);
      }
    }
    return byWindow;
  }

  /**
   * Offers {@code windowValues} as the contents of the {@link PCollectionView} in the given {@link
   * BoundedWindow}.
   *
   * <p>If nothing has been stored for the {@link PCollectionViewWindow} yet, the values are stored
   * unconditionally. Otherwise they replace the stored contents only when their pane index is
   * strictly greater than that of the stored contents. The pane of a group of values is taken from
   * its first element, and empty stored contents are treated as having pane index {@code -1}.
   */
  private void updatePCollectionViewWindowValues(
      PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> windowValues) {
    AtomicReference<Iterable<? extends WindowedValue<?>>> holder =
        viewByWindows.getUnchecked(PCollectionViewWindow.of(view, window));
    if (holder.compareAndSet(null, windowValues)) {
      // First write for this view and window; nothing to compare against.
      return;
    }

    PaneInfo incomingPane = windowValues.iterator().next().getPane();
    while (true) {
      Iterable<? extends WindowedValue<?>> stored = holder.get();
      long storedPaneIndex = -1L;
      if (!Iterables.isEmpty(stored)) {
        storedPaneIndex = stored.iterator().next().getPane().getIndex();
      }

      if (incomingPane.getIndex() <= storedPaneIndex) {
        // The stored contents are from the same or a later pane; keep them.
        return;
      }
      if (holder.compareAndSet(stored, windowValues)) {
        return;
      }
      // Another writer replaced the contents in the meantime; re-read and compare again.
    }
  }

  /**
   * A {@link CacheLoader} that creates the initially unset contents holder for a {@link
   * PCollectionViewWindow} and registers a {@link WriteEmptyViewContents} callback for it with the
   * {@link EvaluationContext}.
   */
  private static class CallbackSchedulingLoader
      extends CacheLoader<
          PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> {
    private final EvaluationContext context;

    public CallbackSchedulingLoader(EvaluationContext context) {
      this.context = context;
    }

    /**
     * Returns a fresh, unset holder for {@code view} after scheduling the callback that will mark
     * the holder as empty if it is still unset when the callback runs.
     */
    @Override
    public AtomicReference<Iterable<? extends WindowedValue<?>>> load(
        PCollectionViewWindow<?> view) {
      AtomicReference<Iterable<? extends WindowedValue<?>>> holder = new AtomicReference<>();

      PCollectionView<?> targetView = view.getView();
      BoundedWindow targetWindow = view.getWindow();
      WindowingStrategy<?, ?> strategy = targetView.getWindowingStrategyInternal();
      WriteEmptyViewContents markEmpty =
          new WriteEmptyViewContents(targetView, targetWindow, holder);

      context.scheduleAfterOutputWouldBeProduced(targetView, targetWindow, strategy, markEmpty);
      return holder;
    }
  }

  /**
   * A callback that stores an empty list as the contents of a view in a window, unless contents
   * have already been stored.
   */
  private static class WriteEmptyViewContents implements Runnable {
    private final PCollectionView<?> view;
    private final BoundedWindow window;
    private final AtomicReference<Iterable<? extends WindowedValue<?>>> contents;

    private WriteEmptyViewContents(
        PCollectionView<?> view,
        BoundedWindow window,
        AtomicReference<Iterable<? extends WindowedValue<?>>> contents) {
      this.view = view;
      this.window = window;
      this.contents = contents;
    }

    @Override
    public void run() {
      // Reaching this point with an unset holder means the window produced no elements, so the
      // view is recorded as empty. If a value was already set, the swap fails and nothing changes.
      Iterable<? extends WindowedValue<?>> noElements = Collections.emptyList();
      contents.compareAndSet(null, noElements);
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(this).add("view", view).add("window", window).toString();
    }
  }

  /**
   * A {@link ReadyCheckingSideInputReader} over a fixed subset of the views held by the enclosing
   * {@link SideInputContainer}.
   *
   * <p>NOTE(review): lookups go through a {@link LoadingCache} with no eviction configured, so the
   * state first observed for a view and window (including "not ready") appears to be retained for
   * the lifetime of this reader. Confirm against callers before relying on it.
   */
  private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader {
    private final Collection<PCollectionView<?>> readerViews;
    private final LoadingCache<
            PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>>
        viewContents;

    private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) {
      this.readerViews = ImmutableSet.copyOf(readerViews);
      this.viewContents = CacheBuilder.newBuilder().build(new CurrentViewContentsLoader());
    }

    /**
     * Returns whether contents are present for {@code view} in {@code window}.
     *
     * @throws IllegalArgumentException if this reader does not contain {@code view}
     */
    @Override
    public boolean isReady(final PCollectionView<?> view, final BoundedWindow window) {
      checkArgument(
          readerViews.contains(view),
          "Tried to check if view %s was ready in a SideInputReader that does not contain it. "
              + "Contained views; %s",
          view,
          readerViews);
      PCollectionViewWindow<?> key = PCollectionViewWindow.of(view, window);
      return viewContents.getUnchecked(key).isPresent();
    }

    /**
     * Returns the value of {@code view} in {@code window}, produced by applying the view's {@link
     * ViewFn} to an in-memory multimap built from the stored elements.
     *
     * @throws IllegalArgumentException if this reader does not contain {@code view}, or if the
     *     view is not ready in {@code window}
     */
    @Override
    @Nullable
    public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
      checkArgument(
          readerViews.contains(view), "call to get(PCollectionView) with unknown view: %s", view);
      checkArgument(
          isReady(view, window),
          "calling get() on PCollectionView %s that is not ready in window %s",
          view,
          window);

      // Safe covariant cast since we know that the view only contains KVs.
      @SuppressWarnings("unchecked")
      Iterable<WindowedValue<KV<?, ?>>> windowedElements =
          (Iterable<WindowedValue<KV<?, ?>>>)
              viewContents.getUnchecked(PCollectionViewWindow.of(view, window)).get();
      // Strip the windowing metadata; only the KV payloads feed the multimap.
      Iterable<KV<?, ?>> elements =
          Iterables.transform(windowedElements, WindowedValue::getValue);

      ViewFn<MultimapView, T> multimapViewFn = (ViewFn<MultimapView, T>) view.getViewFn();
      Coder<?> keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
      return (T)
          multimapViewFn.apply(
              InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements));
    }

    /** Returns whether {@code view} is one of the views this reader was created over. */
    @Override
    public <T> boolean contains(PCollectionView<T> view) {
      return readerViews.contains(view);
    }

    /** Returns whether this reader was created over no views at all. */
    @Override
    public boolean isEmpty() {
      return readerViews.isEmpty();
    }
  }

  /**
   * A {@link CacheLoader} that reads the contents currently stored in the enclosing container for
   * a {@link PCollectionViewWindow} and wraps them in an optional, which is absent when nothing
   * has been stored yet.
   */
  private class CurrentViewContentsLoader
      extends CacheLoader<
          PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>> {

    @Override
    public Optional<? extends Iterable<? extends WindowedValue<?>>> load(
        PCollectionViewWindow<?> key) {
      AtomicReference<Iterable<? extends WindowedValue<?>>> holder =
          viewByWindows.getUnchecked(key);
      // A null reference means the view has not been written for this window.
      return Optional.fromNullable(holder.get());
    }
  }
}
