/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.dataflow.worker;

import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.dataflow.worker.StateFetcher.SideInputState;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

/** Tests for {@link StreamingSideInputFetcher}. */
@RunWith(JUnit4.class)
public class StreamingSideInputFetcherTest {

  private static final FixedWindows WINDOW_FN = FixedWindows.of(Duration.millis(10));

  static TupleTag<String> mainOutputTag = new TupleTag<>();
  @Mock StreamingModeExecutionContext execContext;
  @Mock StreamingModeExecutionContext.StepContext stepContext;
  @Mock SideInputReader mockSideInputReader;

  private final InMemoryStateInternals<String> state = InMemoryStateInternals.forKey("dummyKey");

  // Suppressing the rawtype cast to StateInternals. Because Mockito causes a covariant ?
  // to become a contravariant ?, it is not possible to cast state to an appropriate type
  // without rawtypes.
  @SuppressWarnings({"rawtypes", "unchecked"})
  @Before
  public void setUp() {
    MockitoAnnotations.initMocks(this);
    when(stepContext.stateInternals()).thenReturn((StateInternals) state);
  }

  @Test
  public void testStoreIfBlocked() throws Exception {
    PCollectionView<String> view = createView();

    IntervalWindow readyWindow = createWindow(0);
    Windmill.GlobalDataId id =
        Windmill.GlobalDataId.newBuilder()
            .setTag(view.getTagInternal().getId())
            .setVersion(
                ByteString.copyFrom(
                    CoderUtils.encodeToByteArray(IntervalWindow.getCoder(), readyWindow)))
            .build();

    when(stepContext.getSideInputNotifications())
        .thenReturn(Arrays.<Windmill.GlobalDataId>asList(id));
    when(stepContext.issueSideInputFetch(
            eq(view), any(BoundedWindow.class), eq(SideInputState.UNKNOWN)))
        .thenReturn(false);
    when(stepContext.issueSideInputFetch(
            eq(view), any(BoundedWindow.class), eq(SideInputState.KNOWN_READY)))
        .thenReturn(true);

    StreamingSideInputFetcher<String, IntervalWindow> fetcher = createFetcher(Arrays.asList(view));

    // Verify storeIfBlocked
    WindowedValue<String> datum1 = createDatum("e1", 0);
    assertTrue(fetcher.storeIfBlocked(datum1));
    assertThat(fetcher.getBlockedWindows(), Matchers.contains(createWindow(0)));

    WindowedValue<String> datum2 = createDatum("e2", 0);
    assertTrue(fetcher.storeIfBlocked(datum2));
    assertThat(fetcher.getBlockedWindows(), Matchers.contains(createWindow(0)));

    WindowedValue<String> datum3 = createDatum("e3", 10);
    assertTrue(fetcher.storeIfBlocked(datum3));
    assertThat(
        fetcher.getBlockedWindows(),
        Matchers.containsInAnyOrder(createWindow(0), createWindow(10)));

    TimerData timer1 = createTimer(0);
    assertTrue(fetcher.storeIfBlocked(timer1));

    TimerData timer2 = createTimer(15);
    assertTrue(fetcher.storeIfBlocked(timer2));

    // Verify ready windows
    assertThat(fetcher.getReadyWindows(), Matchers.contains(readyWindow));
    Set<WindowedValue<String>> actualElements = Sets.newHashSet();
    for (BagState<WindowedValue<String>> bag :
        fetcher.prefetchElements(ImmutableList.of(readyWindow))) {
      for (WindowedValue<String> elem : bag.read()) {
        actualElements.add(elem);
      }
      bag.clear();
    }
    assertThat(actualElements, Matchers.containsInAnyOrder(datum1, datum2));

    Set<TimerData> actualTimers = Sets.newHashSet();
    for (BagState<TimerData> bag : fetcher.prefetchTimers(ImmutableList.of(readyWindow))) {
      for (TimerData timer : bag.read()) {
        actualTimers.add(timer);
      }
      bag.clear();
    }
    assertThat(actualTimers, Matchers.contains(timer1));

    // Verify releaseBlockedWindows
    fetcher.releaseBlockedWindows(ImmutableList.of(readyWindow));
    assertThat(fetcher.getBlockedWindows(), Matchers.contains(createWindow(10)));

    // Verify rest elements and timers
    Set<WindowedValue<String>> restElements = Sets.newHashSet();
    for (BagState<WindowedValue<String>> bag :
        fetcher.prefetchElements(ImmutableList.of(createWindow(10), createWindow(15)))) {
      for (WindowedValue<String> elem : bag.read()) {
        restElements.add(elem);
      }
    }
    assertThat(restElements, Matchers.contains(datum3));

    Set<TimerData> restTimers = Sets.newHashSet();
    for (BagState<TimerData> bag :
        fetcher.prefetchTimers(ImmutableList.of(createWindow(10), createWindow(15)))) {
      for (TimerData timer : bag.read()) {
        restTimers.add(timer);
      }
    }
    assertThat(restTimers, Matchers.contains(timer2));
  }

  private <ReceiverT> StreamingSideInputFetcher<String, IntervalWindow> createFetcher(
      List<PCollectionView<String>> views) throws Exception {
    @SuppressWarnings({"unchecked", "rawtypes"})
    Iterable<PCollectionView<?>> typedViews = (Iterable) views;

    return new StreamingSideInputFetcher<String, IntervalWindow>(
        typedViews, StringUtf8Coder.of(), WindowingStrategy.of(WINDOW_FN), stepContext);
  }

  private PCollectionView<String> createView() {
    return TestPipeline.create()
        .apply(Create.empty(StringUtf8Coder.of()))
        .apply(Window.<String>into(WINDOW_FN))
        .apply(View.<String>asSingleton());
  }

  private WindowedValue<String> createDatum(String element, long timestamp) {
    return WindowedValue.of(
        element,
        new Instant(timestamp),
        Arrays.asList(createWindow(timestamp)),
        PaneInfo.NO_FIRING);
  }

  private TimerData createTimer(long timestamp) {
    return TimerData.of(
        StateNamespaces.window(IntervalWindow.getCoder(), createWindow(timestamp)),
        new Instant(timestamp),
        TimeDomain.EVENT_TIME);
  }

  private IntervalWindow createWindow(long timestamp) {
    return new IntervalWindow(
        new Instant(timestamp - timestamp % 10), new Instant(timestamp - timestamp % 10 + 10));
  }
}
