/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.dataflow;

import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.BatchViewOverrides.TransformedMap;
import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link BatchViewOverrides}. */
@RunWith(JUnit4.class)
public class BatchViewOverridesTest {
  @Rule public ExpectedException thrown = ExpectedException.none();

  @Test
  public void testBatchViewAsSingletonToIsmRecord() throws Exception {
    DoFnTester<
            KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>,
            IsmRecord<WindowedValue<String>>>
        doFnTester =
            DoFnTester.of(
                new BatchViewOverrides.BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn<
                    String, GlobalWindow>(GlobalWindow.Coder.INSTANCE));

    assertThat(
        doFnTester.processBundle(
            ImmutableList.of(
                KV.of(
                    0, ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")))))),
        contains(IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE), valueInGlobalWindow("a"))));
  }

  @Test
  public void testBatchViewAsSingletonToIsmRecordWithMultipleValuesThrowsException()
      throws Exception {
    DoFnTester<
            KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>,
            IsmRecord<WindowedValue<String>>>
        doFnTester =
            DoFnTester.of(
                new BatchViewOverrides.BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn<
                    String, GlobalWindow>(GlobalWindow.Coder.INSTANCE));

    thrown.expect(IllegalStateException.class);
    thrown.expectMessage("found for singleton within window");
    doFnTester.processBundle(
        ImmutableList.of(
            KV.of(
                0,
                ImmutableList.of(
                    KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")),
                    KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("b"))))));
  }

  @Test
  public void testBatchViewAsListToIsmRecordForGlobalWindow() throws Exception {
    DoFnTester<String, IsmRecord<WindowedValue<String>>> doFnTester =
        DoFnTester.of(
            new BatchViewOverrides.BatchViewAsList.ToIsmRecordForGlobalWindowDoFn<String>());

    // The order of the output elements is important relative to processing order
    assertThat(
        doFnTester.processBundle(ImmutableList.of("a", "b", "c")),
        contains(
            IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 0L), valueInGlobalWindow("a")),
            IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 1L), valueInGlobalWindow("b")),
            IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 2L), valueInGlobalWindow("c"))));
  }

  @Test
  public void testBatchViewAsListToIsmRecordForNonGlobalWindow() throws Exception {
    DoFnTester<
            KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<Long>>>>,
            IsmRecord<WindowedValue<Long>>>
        doFnTester =
            DoFnTester.of(
                new BatchViewOverrides.BatchViewAsList.ToIsmRecordForNonGlobalWindowDoFn<
                    Long, IntervalWindow>(IntervalWindow.getCoder()));

    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));

    Iterable<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<Long>>>>> inputElements =
        ImmutableList.of(
            KV.of(
                1,
                (Iterable<KV<IntervalWindow, WindowedValue<Long>>>)
                    ImmutableList.of(
                        KV.of(
                            windowA,
                            WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
                        KV.of(
                            windowA,
                            WindowedValue.of(111L, new Instant(3), windowA, PaneInfo.NO_FIRING)),
                        KV.of(
                            windowA,
                            WindowedValue.of(112L, new Instant(4), windowA, PaneInfo.NO_FIRING)),
                        KV.of(
                            windowB,
                            WindowedValue.of(120L, new Instant(12), windowB, PaneInfo.NO_FIRING)),
                        KV.of(
                            windowB,
                            WindowedValue.of(121L, new Instant(14), windowB, PaneInfo.NO_FIRING)))),
            KV.of(
                2,
                (Iterable<KV<IntervalWindow, WindowedValue<Long>>>)
                    ImmutableList.of(
                        KV.of(
                            windowC,
                            WindowedValue.of(
                                210L, new Instant(25), windowC, PaneInfo.NO_FIRING)))));

    // The order of the output elements is important relative to processing order
    assertThat(
        doFnTester.processBundle(inputElements),
        contains(
            IsmRecord.of(
                ImmutableList.of(windowA, 0L),
                WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
            IsmRecord.of(
                ImmutableList.of(windowA, 1L),
                WindowedValue.of(111L, new Instant(3), windowA, PaneInfo.NO_FIRING)),
            IsmRecord.of(
                ImmutableList.of(windowA, 2L),
                WindowedValue.of(112L, new Instant(4), windowA, PaneInfo.NO_FIRING)),
            IsmRecord.of(
                ImmutableList.of(windowB, 0L),
                WindowedValue.of(120L, new Instant(12), windowB, PaneInfo.NO_FIRING)),
            IsmRecord.of(
                ImmutableList.of(windowB, 1L),
                WindowedValue.of(121L, new Instant(14), windowB, PaneInfo.NO_FIRING)),
            IsmRecord.of(
                ImmutableList.of(windowC, 0L),
                WindowedValue.of(210L, new Instant(25), windowC, PaneInfo.NO_FIRING))));
  }

  @Test
  public void testToIsmRecordForMapLikeDoFn() throws Exception {
    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>();

    Coder<Long> keyCoder = VarLongCoder.of();
    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();

    IsmRecordCoder<WindowedValue<Long>> ismCoder =
        IsmRecordCoder.of(
            1,
            2,
            ImmutableList.of(
                MetadataKeyCoder.of(keyCoder), IntervalWindow.getCoder(), BigEndianLongCoder.of()),
            FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));

    DoFnTester<
            KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>,
            IsmRecord<WindowedValue<Long>>>
        doFnTester =
            DoFnTester.of(
                new BatchViewOverrides.BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<>(
                    outputForSizeTag,
                    outputForEntrySetTag,
                    windowCoder,
                    keyCoder,
                    ismCoder,
                    false /* unique keys */));

    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));

    Iterable<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>>
        inputElements =
            ImmutableList.of(
                KV.of(
                    1,
                    (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>)
                        ImmutableList.of(
                            KV.of(
                                KV.of(1L, windowA),
                                WindowedValue.of(
                                    110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
                            // same window same key as to previous
                            KV.of(
                                KV.of(1L, windowA),
                                WindowedValue.of(
                                    111L, new Instant(2), windowA, PaneInfo.NO_FIRING)),
                            // same window different key as to previous
                            KV.of(
                                KV.of(2L, windowA),
                                WindowedValue.of(
                                    120L, new Instant(3), windowA, PaneInfo.NO_FIRING)),
                            // different window same key as to previous
                            KV.of(
                                KV.of(2L, windowB),
                                WindowedValue.of(
                                    210L, new Instant(11), windowB, PaneInfo.NO_FIRING)),
                            // different window and different key as to previous
                            KV.of(
                                KV.of(3L, windowB),
                                WindowedValue.of(
                                    220L, new Instant(12), windowB, PaneInfo.NO_FIRING)))),
                KV.of(
                    2,
                    (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>)
                        ImmutableList.of(
                            // different shard
                            KV.of(
                                KV.of(4L, windowC),
                                WindowedValue.of(
                                    330L, new Instant(21), windowC, PaneInfo.NO_FIRING)))));

    // The order of the output elements is important relative to processing order
    assertThat(
        doFnTester.processBundle(inputElements),
        contains(
            IsmRecord.of(
                ImmutableList.of(1L, windowA, 0L),
                WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
            IsmRecord.of(
                ImmutableList.of(1L, windowA, 1L),
                WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)),
            IsmRecord.of(
                ImmutableList.of(2L, windowA, 0L),
                WindowedValue.of(120L, new Instant(3), windowA, PaneInfo.NO_FIRING)),
            IsmRecord.of(
                ImmutableList.of(2L, windowB, 0L),
                WindowedValue.of(210L, new Instant(11), windowB, PaneInfo.NO_FIRING)),
            IsmRecord.of(
                ImmutableList.of(3L, windowB, 0L),
                WindowedValue.of(220L, new Instant(12), windowB, PaneInfo.NO_FIRING)),
            IsmRecord.of(
                ImmutableList.of(4L, windowC, 0L),
                WindowedValue.of(330L, new Instant(21), windowC, PaneInfo.NO_FIRING))));

    // Verify the number of unique keys per window.
    assertThat(
        doFnTester.takeOutputElements(outputForSizeTag),
        contains(
            KV.of(
                ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)),
                KV.of(windowA, 2L)),
            KV.of(
                ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
                KV.of(windowB, 2L)),
            KV.of(
                ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowC)),
                KV.of(windowC, 1L))));

    // Verify the output for the unique keys.
    assertThat(
        doFnTester.takeOutputElements(outputForEntrySetTag),
        contains(
            KV.of(
                ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)),
                KV.of(windowA, 1L)),
            KV.of(
                ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)),
                KV.of(windowA, 2L)),
            KV.of(
                ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
                KV.of(windowB, 2L)),
            KV.of(
                ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
                KV.of(windowB, 3L)),
            KV.of(
                ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowC)),
                KV.of(windowC, 4L))));
  }

  @Test
  public void testToIsmRecordForMapLikeDoFnWithoutUniqueKeysThrowsException() throws Exception {
    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>();

    Coder<Long> keyCoder = VarLongCoder.of();
    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();

    IsmRecordCoder<WindowedValue<Long>> ismCoder =
        IsmRecordCoder.of(
            1,
            2,
            ImmutableList.of(
                MetadataKeyCoder.of(keyCoder), IntervalWindow.getCoder(), BigEndianLongCoder.of()),
            FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));

    DoFnTester<
            KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>,
            IsmRecord<WindowedValue<Long>>>
        doFnTester =
            DoFnTester.of(
                new BatchViewOverrides.BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<>(
                    outputForSizeTag,
                    outputForEntrySetTag,
                    windowCoder,
                    keyCoder,
                    ismCoder,
                    true /* unique keys */));

    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));

    Iterable<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>>
        inputElements =
            ImmutableList.of(
                KV.of(
                    1,
                    (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>)
                        ImmutableList.of(
                            KV.of(
                                KV.of(1L, windowA),
                                WindowedValue.of(
                                    110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
                            // same window same key as to previous
                            KV.of(
                                KV.of(1L, windowA),
                                WindowedValue.of(
                                    111L, new Instant(2), windowA, PaneInfo.NO_FIRING)))));

    thrown.expect(IllegalStateException.class);
    thrown.expectMessage("Unique keys are expected but found key");
    doFnTester.processBundle(inputElements);
  }

  @Test
  public void testToIsmMetadataRecordForSizeDoFn() throws Exception {

    Coder<Long> keyCoder = VarLongCoder.of();
    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();

    IsmRecordCoder<WindowedValue<Long>> ismCoder =
        IsmRecordCoder.of(
            1,
            2,
            ImmutableList.of(
                MetadataKeyCoder.of(keyCoder), IntervalWindow.getCoder(), BigEndianLongCoder.of()),
            FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));

    DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>, IsmRecord<WindowedValue<Long>>>
        doFnTester =
            DoFnTester.of(
                new BatchViewOverrides.BatchViewAsMultimap.ToIsmMetadataRecordForSizeDoFn<
                    Long, Long, IntervalWindow>(windowCoder));

    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));

    Iterable<KV<Integer, Iterable<KV<IntervalWindow, Long>>>> inputElements =
        ImmutableList.of(
            KV.of(
                1,
                (Iterable<KV<IntervalWindow, Long>>)
                    ImmutableList.of(KV.of(windowA, 2L), KV.of(windowA, 3L), KV.of(windowB, 7L))),
            KV.of(
                ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
                (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(KV.of(windowC, 9L))));

    // The order of the output elements is important relative to processing order
    assertThat(
        doFnTester.processBundle(inputElements),
        contains(
            IsmRecord.<WindowedValue<Long>>meta(
                ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 0L),
                CoderUtils.encodeToByteArray(VarLongCoder.of(), 5L)),
            IsmRecord.<WindowedValue<Long>>meta(
                ImmutableList.of(IsmFormat.getMetadataKey(), windowB, 0L),
                CoderUtils.encodeToByteArray(VarLongCoder.of(), 7L)),
            IsmRecord.<WindowedValue<Long>>meta(
                ImmutableList.of(IsmFormat.getMetadataKey(), windowC, 0L),
                CoderUtils.encodeToByteArray(VarLongCoder.of(), 9L))));
  }

  @Test
  public void testToIsmMetadataRecordForKeyDoFn() throws Exception {

    Coder<Long> keyCoder = VarLongCoder.of();
    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();

    IsmRecordCoder<WindowedValue<Long>> ismCoder =
        IsmRecordCoder.of(
            1,
            2,
            ImmutableList.of(
                MetadataKeyCoder.of(keyCoder), IntervalWindow.getCoder(), BigEndianLongCoder.of()),
            FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));

    DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>, IsmRecord<WindowedValue<Long>>>
        doFnTester =
            DoFnTester.of(
                new BatchViewOverrides.BatchViewAsMultimap.ToIsmMetadataRecordForKeyDoFn<
                    Long, Long, IntervalWindow>(keyCoder, windowCoder));

    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));

    Iterable<KV<Integer, Iterable<KV<IntervalWindow, Long>>>> inputElements =
        ImmutableList.of(
            KV.of(
                1,
                (Iterable<KV<IntervalWindow, Long>>)
                    ImmutableList.of(
                        KV.of(windowA, 2L),
                        // same window as previous
                        KV.of(windowA, 3L),
                        // different window as previous
                        KV.of(windowB, 3L))),
            KV.of(
                ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
                (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(KV.of(windowC, 3L))));

    // The order of the output elements is important relative to processing order
    assertThat(
        doFnTester.processBundle(inputElements),
        contains(
            IsmRecord.<WindowedValue<Long>>meta(
                ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 1L),
                CoderUtils.encodeToByteArray(VarLongCoder.of(), 2L)),
            IsmRecord.<WindowedValue<Long>>meta(
                ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 2L),
                CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)),
            IsmRecord.<WindowedValue<Long>>meta(
                ImmutableList.of(IsmFormat.getMetadataKey(), windowB, 1L),
                CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)),
            IsmRecord.<WindowedValue<Long>>meta(
                ImmutableList.of(IsmFormat.getMetadataKey(), windowC, 1L),
                CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L))));
  }

  @Test
  public void testToMapDoFn() throws Exception {
    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();

    DoFnTester<
            KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>,
            IsmRecord<WindowedValue<TransformedMap<Long, WindowedValue<Long>, Long>>>>
        doFnTester =
            DoFnTester.of(
                new BatchViewOverrides.BatchViewAsMap.ToMapDoFn<Long, Long, IntervalWindow>(
                    windowCoder));

    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));

    Iterable<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>>
        inputElements =
            ImmutableList.of(
                KV.of(
                    1,
                    (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>)
                        ImmutableList.of(
                            KV.of(
                                windowA,
                                WindowedValue.of(
                                    KV.of(1L, 11L), new Instant(3), windowA, PaneInfo.NO_FIRING)),
                            KV.of(
                                windowA,
                                WindowedValue.of(
                                    KV.of(2L, 21L), new Instant(7), windowA, PaneInfo.NO_FIRING)),
                            KV.of(
                                windowB,
                                WindowedValue.of(
                                    KV.of(2L, 21L), new Instant(13), windowB, PaneInfo.NO_FIRING)),
                            KV.of(
                                windowB,
                                WindowedValue.of(
                                    KV.of(3L, 31L),
                                    new Instant(15),
                                    windowB,
                                    PaneInfo.NO_FIRING)))),
                KV.of(
                    2,
                    (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>)
                        ImmutableList.of(
                            KV.of(
                                windowC,
                                WindowedValue.of(
                                    KV.of(4L, 41L),
                                    new Instant(25),
                                    windowC,
                                    PaneInfo.NO_FIRING)))));

    // The order of the output elements is important relative to processing order
    List<IsmRecord<WindowedValue<TransformedMap<Long, WindowedValue<Long>, Long>>>> output =
        doFnTester.processBundle(inputElements);
    assertEquals(3, output.size());
    Map<Long, Long> outputMap;

    outputMap = output.get(0).getValue().getValue();
    assertEquals(2, outputMap.size());
    assertEquals(ImmutableMap.of(1L, 11L, 2L, 21L), outputMap);

    outputMap = output.get(1).getValue().getValue();
    assertEquals(2, outputMap.size());
    assertEquals(ImmutableMap.of(2L, 21L, 3L, 31L), outputMap);

    outputMap = output.get(2).getValue().getValue();
    assertEquals(1, outputMap.size());
    assertEquals(ImmutableMap.of(4L, 41L), outputMap);
  }

  @Test
  public void testToMultimapDoFn() throws Exception {
    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();

    DoFnTester<
            KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>,
            IsmRecord<
                WindowedValue<TransformedMap<Long, Iterable<WindowedValue<Long>>, Iterable<Long>>>>>
        doFnTester =
            DoFnTester.of(
                new BatchViewOverrides.BatchViewAsMultimap.ToMultimapDoFn<
                    Long, Long, IntervalWindow>(windowCoder));

    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));

    Iterable<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>>
        inputElements =
            ImmutableList.of(
                KV.of(
                    1,
                    (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>)
                        ImmutableList.of(
                            KV.of(
                                windowA,
                                WindowedValue.of(
                                    KV.of(1L, 11L), new Instant(3), windowA, PaneInfo.NO_FIRING)),
                            // [BEAM-5184] Specifically test with a duplicate value to ensure that
                            // duplicate key/values are not lost.
                            KV.of(
                                windowA,
                                WindowedValue.of(
                                    KV.of(1L, 11L), new Instant(3), windowA, PaneInfo.NO_FIRING)),
                            KV.of(
                                windowA,
                                WindowedValue.of(
                                    KV.of(1L, 12L), new Instant(5), windowA, PaneInfo.NO_FIRING)),
                            KV.of(
                                windowA,
                                WindowedValue.of(
                                    KV.of(2L, 21L), new Instant(7), windowA, PaneInfo.NO_FIRING)),
                            KV.of(
                                windowB,
                                WindowedValue.of(
                                    KV.of(2L, 21L), new Instant(13), windowB, PaneInfo.NO_FIRING)),
                            KV.of(
                                windowB,
                                WindowedValue.of(
                                    KV.of(3L, 31L),
                                    new Instant(15),
                                    windowB,
                                    PaneInfo.NO_FIRING)))),
                KV.of(
                    2,
                    (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>)
                        ImmutableList.of(
                            KV.of(
                                windowC,
                                WindowedValue.of(
                                    KV.of(4L, 41L),
                                    new Instant(25),
                                    windowC,
                                    PaneInfo.NO_FIRING)))));

    // The order of the output elements is important relative to processing order
    List<
            IsmRecord<
                WindowedValue<TransformedMap<Long, Iterable<WindowedValue<Long>>, Iterable<Long>>>>>
        output = doFnTester.processBundle(inputElements);
    assertEquals(3, output.size());
    Map<Long, Iterable<Long>> outputMap;

    outputMap = output.get(0).getValue().getValue();
    assertEquals(2, outputMap.size());
    assertThat(outputMap.get(1L), containsInAnyOrder(11L, 11L, 12L));
    assertThat(outputMap.get(2L), containsInAnyOrder(21L));

    outputMap = output.get(1).getValue().getValue();
    assertEquals(2, outputMap.size());
    assertThat(outputMap.get(2L), containsInAnyOrder(21L));
    assertThat(outputMap.get(3L), containsInAnyOrder(31L));

    outputMap = output.get(2).getValue().getValue();
    assertEquals(1, outputMap.size());
    assertThat(outputMap.get(4L), containsInAnyOrder(41L));
  }
}
