/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.values;

import java.io.Serializable;
import java.util.Objects;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.joda.time.Duration;

/**
 * A {@code WindowingStrategy} describes the windowing behavior for a specific collection of values.
 * It has both a {@link WindowFn} describing how elements are assigned to windows and a {@link
 * Trigger} that controls when output is produced for each window.
 *
 * <p>All fields are {@code final}; every {@code with*} method returns a new instance rather than
 * modifying the receiver. Alongside the trigger, accumulation mode, allowed lateness and timestamp
 * combiner, the strategy records a boolean "specified" flag that is set to {@code true} once the
 * corresponding {@code with*} method (or {@link #fixDefaults()}) has been called.
 *
 * @param <T> type of elements being windowed
 * @param <W> {@link BoundedWindow} subclass used to represent the windows used by this {@code
 *     WindowingStrategy}
 */
public class WindowingStrategy<T, W extends BoundedWindow> implements Serializable {

  /** The accumulation modes that can be used with windowing. */
  public enum AccumulationMode {
    DISCARDING_FIRED_PANES,
    ACCUMULATING_FIRED_PANES
  }

  // NOTE: declaration order matters. DEFAULT is built through of(...), which reads
  // DEFAULT_ALLOWED_LATENESS, and static initializers run in textual order.
  private static final Duration DEFAULT_ALLOWED_LATENESS = Duration.ZERO;
  private static final WindowingStrategy<Object, GlobalWindow> DEFAULT = of(new GlobalWindows());

  private final WindowFn<T, W> windowFn;
  private final Trigger trigger;
  private final AccumulationMode mode;
  private final Duration allowedLateness;
  private final ClosingBehavior closingBehavior;
  private final OnTimeBehavior onTimeBehavior;
  private final TimestampCombiner timestampCombiner;
  private final boolean triggerSpecified;
  private final boolean modeSpecified;
  private final boolean allowedLatenessSpecified;
  private final boolean timestampCombinerSpecified;

  /**
   * Stores every component verbatim; no validation or copying is performed. Instances are created
   * through {@link #of(WindowFn)} and the {@code with*} methods.
   */
  private WindowingStrategy(
      WindowFn<T, W> windowFn,
      Trigger trigger,
      boolean triggerSpecified,
      AccumulationMode mode,
      boolean modeSpecified,
      Duration allowedLateness,
      boolean allowedLatenessSpecified,
      TimestampCombiner timestampCombiner,
      boolean timestampCombinerSpecified,
      ClosingBehavior closingBehavior,
      OnTimeBehavior onTimeBehavior) {
    this.windowFn = windowFn;
    this.trigger = trigger;
    this.triggerSpecified = triggerSpecified;
    this.mode = mode;
    this.modeSpecified = modeSpecified;
    this.allowedLateness = allowedLateness;
    this.allowedLatenessSpecified = allowedLatenessSpecified;
    this.closingBehavior = closingBehavior;
    this.onTimeBehavior = onTimeBehavior;
    this.timestampCombiner = timestampCombiner;
    this.timestampCombinerSpecified = timestampCombinerSpecified;
  }

  /**
   * Returns the shared default windowing strategy: {@link GlobalWindows} combined with the
   * defaults applied by {@link #of(WindowFn)}. The same instance is returned on every call, and
   * none of its "specified" flags are set.
   */
  public static WindowingStrategy<Object, GlobalWindow> globalDefault() {
    return DEFAULT;
  }

  /**
   * Creates a {@link WindowingStrategy} for the given window function with default values for
   * everything else: {@link DefaultTrigger}, {@link AccumulationMode#DISCARDING_FIRED_PANES}, an
   * allowed lateness of {@link Duration#ZERO}, {@link TimestampCombiner#END_OF_WINDOW}, {@link
   * ClosingBehavior#FIRE_IF_NON_EMPTY} and {@link OnTimeBehavior#FIRE_ALWAYS}. All "specified"
   * flags are {@code false} on the returned strategy.
   *
   * @param windowFn the function that assigns elements to windows
   * @return a new strategy using {@code windowFn} and the defaults listed above
   */
  public static <T, W extends BoundedWindow> WindowingStrategy<T, W> of(WindowFn<T, W> windowFn) {
    return new WindowingStrategy<>(
        windowFn,
        DefaultTrigger.of(),
        false,
        AccumulationMode.DISCARDING_FIRED_PANES,
        false,
        DEFAULT_ALLOWED_LATENESS,
        false,
        TimestampCombiner.END_OF_WINDOW,
        false,
        ClosingBehavior.FIRE_IF_NON_EMPTY,
        OnTimeBehavior.FIRE_ALWAYS);
  }

  /** Returns the {@link WindowFn} of this strategy. */
  public WindowFn<T, W> getWindowFn() {
    return windowFn;
  }

  /** Returns the {@link Trigger} of this strategy. */
  public Trigger getTrigger() {
    return trigger;
  }

  /**
   * Returns {@code true} if the trigger was set through {@link #withTrigger(Trigger)} or {@link
   * #fixDefaults()}.
   */
  public boolean isTriggerSpecified() {
    return triggerSpecified;
  }

  /** Returns the allowed lateness of this strategy. */
  public Duration getAllowedLateness() {
    return allowedLateness;
  }

  /**
   * Returns {@code true} if the allowed lateness was set through {@link
   * #withAllowedLateness(Duration)} or {@link #fixDefaults()}.
   */
  public boolean isAllowedLatenessSpecified() {
    return allowedLatenessSpecified;
  }

  /** Returns the {@link AccumulationMode} of this strategy. */
  public AccumulationMode getMode() {
    return mode;
  }

  /**
   * Returns {@code true} if the accumulation mode was set through {@link
   * #withMode(AccumulationMode)} or {@link #fixDefaults()}.
   */
  public boolean isModeSpecified() {
    return modeSpecified;
  }

  /** Returns the {@link ClosingBehavior} of this strategy. */
  public ClosingBehavior getClosingBehavior() {
    return closingBehavior;
  }

  /** Returns the {@link OnTimeBehavior} of this strategy. */
  public OnTimeBehavior getOnTimeBehavior() {
    return onTimeBehavior;
  }

  /** Returns the {@link TimestampCombiner} of this strategy. */
  public TimestampCombiner getTimestampCombiner() {
    return timestampCombiner;
  }

  /**
   * Returns {@code true} if the timestamp combiner was set through {@link
   * #withTimestampCombiner(TimestampCombiner)} or {@link #fixDefaults()}.
   */
  public boolean isTimestampCombinerSpecified() {
    return timestampCombinerSpecified;
  }

  /**
   * Returns a {@link WindowingStrategy} identical to {@code this} but with the trigger set to
   * {@code trigger} and the trigger marked as specified.
   */
  public WindowingStrategy<T, W> withTrigger(Trigger trigger) {
    return new WindowingStrategy<>(
        windowFn,
        trigger,
        true,
        mode,
        modeSpecified,
        allowedLateness,
        allowedLatenessSpecified,
        timestampCombiner,
        timestampCombinerSpecified,
        closingBehavior,
        onTimeBehavior);
  }

  /**
   * Returns a {@link WindowingStrategy} identical to {@code this} but with the accumulation mode
   * set to {@code mode} and the mode marked as specified.
   */
  public WindowingStrategy<T, W> withMode(AccumulationMode mode) {
    return new WindowingStrategy<>(
        windowFn,
        trigger,
        triggerSpecified,
        mode,
        true,
        allowedLateness,
        allowedLatenessSpecified,
        timestampCombiner,
        timestampCombinerSpecified,
        closingBehavior,
        onTimeBehavior);
  }

  /**
   * Returns a {@link WindowingStrategy} identical to {@code this} but with the window function set
   * to {@code wildcardWindowFn}.
   *
   * <p>The argument is cast to {@code WindowFn<T, W>} without any runtime check, so the caller is
   * responsible for passing a window function compatible with this strategy's type parameters.
   */
  public WindowingStrategy<T, W> withWindowFn(WindowFn<?, ?> wildcardWindowFn) {
    // Unchecked by design: generics are erased, so compatibility cannot be verified here.
    @SuppressWarnings("unchecked")
    WindowFn<T, W> typedWindowFn = (WindowFn<T, W>) wildcardWindowFn;

    return new WindowingStrategy<>(
        typedWindowFn,
        trigger,
        triggerSpecified,
        mode,
        modeSpecified,
        allowedLateness,
        allowedLatenessSpecified,
        timestampCombiner,
        timestampCombinerSpecified,
        closingBehavior,
        onTimeBehavior);
  }

  /**
   * Returns a {@link WindowingStrategy} identical to {@code this} but with the allowed lateness set
   * to {@code allowedLateness} and the allowed lateness marked as specified.
   */
  public WindowingStrategy<T, W> withAllowedLateness(Duration allowedLateness) {
    return new WindowingStrategy<>(
        windowFn,
        trigger,
        triggerSpecified,
        mode,
        modeSpecified,
        allowedLateness,
        true,
        timestampCombiner,
        timestampCombinerSpecified,
        closingBehavior,
        onTimeBehavior);
  }

  /**
   * Returns a {@link WindowingStrategy} identical to {@code this} but with the closing behavior set
   * to {@code closingBehavior}. No "specified" flag is tracked for the closing behavior.
   */
  public WindowingStrategy<T, W> withClosingBehavior(ClosingBehavior closingBehavior) {
    return new WindowingStrategy<>(
        windowFn,
        trigger,
        triggerSpecified,
        mode,
        modeSpecified,
        allowedLateness,
        allowedLatenessSpecified,
        timestampCombiner,
        timestampCombinerSpecified,
        closingBehavior,
        onTimeBehavior);
  }

  /**
   * Returns a {@link WindowingStrategy} identical to {@code this} but with the on-time behavior set
   * to {@code onTimeBehavior}. No "specified" flag is tracked for the on-time behavior.
   */
  public WindowingStrategy<T, W> withOnTimeBehavior(OnTimeBehavior onTimeBehavior) {
    return new WindowingStrategy<>(
        windowFn,
        trigger,
        triggerSpecified,
        mode,
        modeSpecified,
        allowedLateness,
        allowedLatenessSpecified,
        timestampCombiner,
        timestampCombinerSpecified,
        closingBehavior,
        onTimeBehavior);
  }

  /**
   * Returns a {@link WindowingStrategy} identical to {@code this} but with the timestamp combiner
   * set to {@code timestampCombiner} and the timestamp combiner marked as specified.
   */
  @Experimental(Experimental.Kind.OUTPUT_TIME)
  public WindowingStrategy<T, W> withTimestampCombiner(TimestampCombiner timestampCombiner) {

    return new WindowingStrategy<>(
        windowFn,
        trigger,
        triggerSpecified,
        mode,
        modeSpecified,
        allowedLateness,
        allowedLatenessSpecified,
        timestampCombiner,
        true,
        closingBehavior,
        onTimeBehavior);
  }

  /**
   * Returns a string listing the window function, allowed lateness, trigger, accumulation mode and
   * timestamp combiner. The closing behavior, on-time behavior and "specified" flags are not
   * included.
   */
  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("windowFn", windowFn)
        .add("allowedLateness", allowedLateness)
        .add("trigger", trigger)
        .add("accumulationMode", mode)
        .add("timestampCombiner", timestampCombiner)
        .toString();
  }

  /**
   * Compares this strategy with another object.
   *
   * <p>Two strategies are equal when their window function, trigger, accumulation mode, allowed
   * lateness, closing behavior, on-time behavior and timestamp combiner are equal and their
   * allowed-lateness, mode and timestamp-combiner "specified" flags match. The trigger "specified"
   * flag is not part of the comparison.
   *
   * @param object the object to compare against; anything that is not a {@code WindowingStrategy}
   *     (including {@code null}) yields {@code false}
   */
  @Override
  public boolean equals(Object object) {
    if (!(object instanceof WindowingStrategy)) {
      return false;
    }
    WindowingStrategy<?, ?> other = (WindowingStrategy<?, ?>) object;
    return isAllowedLatenessSpecified() == other.isAllowedLatenessSpecified()
        && isModeSpecified() == other.isModeSpecified()
        && isTimestampCombinerSpecified() == other.isTimestampCombinerSpecified()
        && getMode().equals(other.getMode())
        && getAllowedLateness().equals(other.getAllowedLateness())
        && getClosingBehavior().equals(other.getClosingBehavior())
        && getOnTimeBehavior().equals(other.getOnTimeBehavior())
        && getTrigger().equals(other.getTrigger())
        && getTimestampCombiner().equals(other.getTimestampCombiner())
        && getWindowFn().equals(other.getWindowFn());
  }

  /**
   * Returns a hash code computed from exactly the fields that {@link #equals(Object)} compares, so
   * that strategies differing in any compared field (including the on-time behavior) are not
   * forced into the same hash bucket.
   */
  @Override
  public int hashCode() {
    return Objects.hash(
        allowedLatenessSpecified,
        modeSpecified,
        timestampCombinerSpecified,
        mode,
        allowedLateness,
        closingBehavior,
        onTimeBehavior,
        trigger,
        timestampCombiner,
        windowFn);
  }

  /**
   * Fixes all the defaults so that equals can be used to check that two strategies are the same,
   * regardless of the state of "defaulted-ness".
   *
   * @return a copy of this strategy with every "specified" flag set to {@code true} and all other
   *     components unchanged
   */
  @VisibleForTesting
  public WindowingStrategy<T, W> fixDefaults() {
    return new WindowingStrategy<>(
        windowFn,
        trigger,
        true,
        mode,
        true,
        allowedLateness,
        true,
        timestampCombiner,
        true,
        closingBehavior,
        onTimeBehavior);
  }
}
