/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.fn.harness;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;

/**
 * Merges windows using a {@link org.apache.beam.sdk.transforms.windowing.WindowFn}.
 *
 * <p>Window merging function:
 *
 * <ul>
 *   <li>Input: {@code KV<nonce, iterable<OriginalWindow>>}
 *   <li>Output: {@code KV<nonce, KV<iterable<UnmergedOriginalWindow>, iterable<KV<MergedWindow,
 *       iterable<ConsumedOriginalWindow>>>>}
 * </ul>
 *
 * <p>For each set of original windows, a list of all unmerged windows is output alongside a map of
 * merged window to set of consumed windows. All original windows must be contained in either the
 * unmerged original window set or one of the consumed original window sets. Each original window
 * can only be part of one output set. The nonce is used by a runner to associate each input with
 * its output. The nonce is represented as an opaque set of bytes.
 */
public abstract class WindowMergingFnRunner<T, W extends BoundedWindow> {
  static final String URN = BeamUrns.getUrn(StandardPTransforms.Primitives.MERGE_WINDOWS);

  /**
   * A registrar which provides a factory to handle merging windows based upon the {@link WindowFn}.
   */
  @AutoService(PTransformRunnerFactory.Registrar.class)
  public static class Registrar implements PTransformRunnerFactory.Registrar {

    /** Returns a single-entry map which associates {@link #URN} with the window merging factory. */
    @Override
    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
      return ImmutableMap.of(
          URN,
          MapFnRunners.forValueMapFnFactory(WindowMergingFnRunner::createMapFunctionForPTransform));
    }
  }

  /**
   * Builds the window merging function for the given {@link PTransform}.
   *
   * <p>The payload of the transform's spec is parsed as a {@link RunnerApi.SdkFunctionSpec} and
   * translated into a {@link WindowFn}. The returned function is bound to one runner instance, so
   * every invocation of the returned function goes through that same instance.
   *
   * @param ptransformId the id of the transform; not used by this method
   * @param ptransform the transform whose spec payload describes the {@link WindowFn}
   * @return a function mapping {@code KV<nonce, windows>} to the merge result for those windows
   * @throws IOException if the payload cannot be decoded into a {@link WindowFn}
   */
  static <T, W extends BoundedWindow>
      ThrowingFunction<KV<T, Iterable<W>>, KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>>>
          createMapFunctionForPTransform(String ptransformId, PTransform ptransform)
              throws IOException {
    RunnerApi.SdkFunctionSpec payload =
        RunnerApi.SdkFunctionSpec.parseFrom(ptransform.getSpec().getPayload());

    WindowFn<?, W> windowFn =
        (WindowFn<?, W>) WindowingStrategyTranslation.windowFnFromProto(payload);
    return WindowMergingFnRunner.<T, W>create(windowFn)::mergeWindows;
  }

  /**
   * Creates the runner matching the given {@link WindowFn}: the pass-through implementation when
   * {@link WindowFn#isNonMerging()} returns true, otherwise the implementation which delegates to
   * {@link WindowFn#mergeWindows}.
   */
  static <T, W extends BoundedWindow> WindowMergingFnRunner<T, W> create(WindowFn<?, W> windowFn) {
    if (windowFn.isNonMerging()) {
      return new NonMergingWindowFnRunner();
    } else {
      return new MergingViaWindowFnRunner(windowFn);
    }
  }

  /**
   * Returns the set of unmerged windows and a mapping from merged windows to sets of original
   * windows.
   *
   * @param windowsToMerge the nonce paired with the original windows to consider for merging
   * @return the same nonce paired with the unmerged windows and the performed merges
   */
  abstract KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(
      KV<T, Iterable<W>> windowsToMerge) throws Exception;

  /////////////////////////////////////////////////////////////////////////////////////////////////

  /**
   * An optimized version of window merging where the {@link WindowFn} does not do any window
   * merging.
   *
   * <p>Note that this is likely to never be invoked and the identity mapping will be handled
   * directly by runners. We have this here because runners may not perform this optimization.
   */
  private static class NonMergingWindowFnRunner<T, W extends BoundedWindow>
      extends WindowMergingFnRunner<T, W> {
    /** Reports every input window as unmerged together with an empty list of merges. */
    @Override
    KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(
        KV<T, Iterable<W>> windowsToMerge) {
      return KV.of(
          windowsToMerge.getKey(), KV.of(windowsToMerge.getValue(), Collections.emptyList()));
    }
  }

  /**
   * An implementation which uses a {@link WindowFn} to merge windows.
   *
   * <p>The per-invocation state lives in instance fields which are reassigned without any
   * synchronization at the start of each {@link #mergeWindows} call, so concurrent calls on one
   * instance would interfere with each other.
   */
  private static class MergingViaWindowFnRunner<T, W extends BoundedWindow>
      extends WindowMergingFnRunner<T, W> {
    private final WindowFn<T, W> windowFn;
    /** Callback handed to the {@link WindowFn}; it reads and writes the two fields below. */
    private final WindowFn<?, W>.MergeContext mergeContext;
    /** The windows of the invocation in progress; pruned down to the unmerged windows. */
    private Collection<W> currentWindows;
    /** The merges reported by the {@link WindowFn} during the invocation in progress. */
    private List<KV<W, Collection<W>>> mergedWindows;

    private MergingViaWindowFnRunner(WindowFn<T, W> windowFn) {
      this.windowFn = windowFn;
      this.mergedWindows = new ArrayList<>();
      this.currentWindows = new ArrayList<>();
      this.mergeContext =
          windowFn.new MergeContext() {

            /** Exposes the windows of the invocation in progress to the {@link WindowFn}. */
            @Override
            public Collection<W> windows() {
              return currentWindows;
            }

            /** Records one merge decision; the inputs are removed from the unmerged set later. */
            @Override
            public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
              mergedWindows.add(KV.of(mergeResult, toBeMerged));
            }
          };
    }

    /**
     * Asks the {@link WindowFn} to merge the given windows and reports which windows were left
     * untouched and which were consumed by each merge result.
     *
     * @param windowsToMerge the nonce paired with the original windows to consider for merging
     * @return the same nonce paired with the unmerged windows and the merges of this invocation
     * @throws Exception if the {@link WindowFn} fails while merging
     */
    @Override
    KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(
        KV<T, Iterable<W>> windowsToMerge) throws Exception {
      currentWindows = Sets.newHashSet(windowsToMerge.getValue());
      // This instance is reused for every input, so the merges recorded for earlier inputs must
      // not leak into this result. A fresh list is allocated instead of clearing the old one
      // because the old list was handed out as part of a previously returned result.
      mergedWindows = new ArrayList<>();
      windowFn.mergeWindows((MergeContext) mergeContext);
      // Whatever was consumed by a merge is no longer an unmerged window.
      for (KV<W, Collection<W>> mergedWindow : mergedWindows) {
        currentWindows.removeAll(mergedWindow.getValue());
      }
      return KV.of(windowsToMerge.getKey(), KV.of(currentWindows, (Iterable) mergedWindows));
    }
  }
}
