/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.core;

import java.util.Collection;
import java.util.Objects;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.joda.time.Instant;

/** Matchers that are useful for working with Windowing, Timestamps, etc. */
public class WindowMatchers {

  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
      T value, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo paneInfo) {

    Collection<Matcher<? super BoundedWindow>> windowMatchers =
        Lists.newArrayListWithCapacity(windows.size());
    for (BoundedWindow window : windows) {
      windowMatchers.add(Matchers.equalTo(window));
    }

    return isWindowedValue(
        Matchers.equalTo(value),
        Matchers.equalTo(timestamp),
        Matchers.containsInAnyOrder(windowMatchers),
        Matchers.equalTo(paneInfo));
  }

  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
      Matcher<? super T> valueMatcher,
      Matcher<? super Instant> timestampMatcher,
      Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher,
      Matcher<? super PaneInfo> paneInfoMatcher) {
    return new WindowedValueMatcher<>(
        valueMatcher, timestampMatcher, windowsMatcher, paneInfoMatcher);
  }

  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
      Matcher<? super T> valueMatcher,
      Matcher<? super Instant> timestampMatcher,
      Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher) {
    return new WindowedValueMatcher<>(
        valueMatcher, timestampMatcher, windowsMatcher, Matchers.anything());
  }

  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
      Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher) {
    return new WindowedValueMatcher<>(
        valueMatcher, timestampMatcher, Matchers.anything(), Matchers.anything());
  }

  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
      Matcher<? super T> valueMatcher) {
    return new WindowedValueMatcher<>(
        valueMatcher, Matchers.anything(), Matchers.anything(), Matchers.anything());
  }

  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
      T value, long timestamp, long windowStart, long windowEnd) {
    return WindowMatchers.isSingleWindowedValue(
        Matchers.equalTo(value), timestamp, windowStart, windowEnd);
  }

  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
      T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
    return WindowMatchers.isSingleWindowedValue(
        Matchers.equalTo(value),
        Matchers.equalTo(timestamp),
        Matchers.equalTo(window),
        Matchers.equalTo(paneInfo));
  }

  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
      T value, Instant timestamp, BoundedWindow window) {
    return WindowMatchers.isSingleWindowedValue(
        Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window));
  }

  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
      Matcher<T> valueMatcher, long timestamp, long windowStart, long windowEnd) {
    IntervalWindow intervalWindow =
        new IntervalWindow(new Instant(windowStart), new Instant(windowEnd));
    return WindowMatchers.isSingleWindowedValue(
        valueMatcher,
        Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)), timestamp),
        Matchers.equalTo(intervalWindow),
        Matchers.anything());
  }

  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
      Matcher<T> valueMatcher,
      long timestamp,
      long windowStart,
      long windowEnd,
      PaneInfo paneInfo) {
    IntervalWindow intervalWindow =
        new IntervalWindow(new Instant(windowStart), new Instant(windowEnd));
    return WindowMatchers.isSingleWindowedValue(
        valueMatcher,
        Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)), timestamp),
        Matchers.equalTo(intervalWindow),
        Matchers.equalTo(paneInfo));
  }

  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
      Matcher<? super T> valueMatcher,
      Matcher<? super Instant> timestampMatcher,
      Matcher<? super BoundedWindow> windowMatcher) {
    return new WindowedValueMatcher<>(
        valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), Matchers.anything());
  }

  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
      Matcher<? super T> valueMatcher,
      Matcher<? super Instant> timestampMatcher,
      Matcher<? super BoundedWindow> windowMatcher,
      Matcher<? super PaneInfo> paneInfoMatcher) {
    return new WindowedValueMatcher<>(
        valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), paneInfoMatcher);
  }

  public static Matcher<IntervalWindow> intervalWindow(long start, long end) {
    return Matchers.equalTo(new IntervalWindow(new Instant(start), new Instant(end)));
  }

  public static <T> Matcher<WindowedValue<? extends T>> valueWithPaneInfo(final PaneInfo paneInfo) {
    return new TypeSafeMatcher<WindowedValue<? extends T>>() {
      @Override
      public void describeTo(Description description) {
        description.appendText("WindowedValue(paneInfo = ").appendValue(paneInfo).appendText(")");
      }

      @Override
      protected boolean matchesSafely(WindowedValue<? extends T> item) {
        return Objects.equals(item.getPane(), paneInfo);
      }

      @Override
      protected void describeMismatchSafely(
          WindowedValue<? extends T> item, Description mismatchDescription) {
        mismatchDescription.appendValue(item.getPane());
      }
    };
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  @SafeVarargs
  public static final <W extends BoundedWindow> Matcher<Iterable<W>> ofWindows(
      Matcher<W>... windows) {
    return (Matcher) Matchers.containsInAnyOrder(windows);
  }

  private WindowMatchers() {}

  private static class WindowedValueMatcher<T> extends TypeSafeMatcher<WindowedValue<? extends T>> {

    private Matcher<? super T> valueMatcher;
    private Matcher<? super Instant> timestampMatcher;
    private Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher;
    private Matcher<? super PaneInfo> paneInfoMatcher;

    private WindowedValueMatcher(
        Matcher<? super T> valueMatcher,
        Matcher<? super Instant> timestampMatcher,
        Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher,
        Matcher<? super PaneInfo> paneInfoMatcher) {
      this.valueMatcher = valueMatcher;
      this.timestampMatcher = timestampMatcher;
      this.windowsMatcher = windowsMatcher;
      this.paneInfoMatcher = paneInfoMatcher;
    }

    @Override
    public void describeTo(Description description) {
      description
          .appendText("a WindowedValue(")
          .appendValue(valueMatcher)
          .appendText(", ")
          .appendValue(timestampMatcher)
          .appendText(", ")
          .appendValue(windowsMatcher)
          .appendText(", ")
          .appendValue(paneInfoMatcher)
          .appendText(")");
    }

    @Override
    protected boolean matchesSafely(WindowedValue<? extends T> windowedValue) {
      return valueMatcher.matches(windowedValue.getValue())
          && timestampMatcher.matches(windowedValue.getTimestamp())
          && windowsMatcher.matches(windowedValue.getWindows())
          && paneInfoMatcher.matches(windowedValue.getPane());
    }
  }
}
