/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.dataflow.worker;

import static org.apache.beam.runners.dataflow.util.Structs.getString;
import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.concat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.api.services.dataflow.model.CounterMetadata;
import com.google.api.services.dataflow.model.CounterStructuredName;
import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.SideInputInfo;
import com.google.api.services.dataflow.model.Source;
import com.google.api.services.dataflow.model.SplitInt64;
import java.io.Closeable;
import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.util.RandomAccessData;
import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope;
import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
import org.apache.beam.runners.dataflow.worker.util.WorkerPropertyNames;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink.SinkWriter;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Materializations.MultimapView;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Ordering;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.TreeMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.io.Closer;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
import org.hamcrest.Matcher;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link IsmSideInputReader}.
 *
 * <p>Note that we use a {@link BigEndianLongCoder} because the byte representation of its encoded
 * values compares equivalently to their numeric representation for non-negative values.
 */
@RunWith(JUnit4.class)
public class IsmSideInputReaderTest {
  private static final Logger LOG = LoggerFactory.getLogger(IsmSideInputReaderTest.class);
  private static final long BLOOM_FILTER_SIZE_LIMIT = 10_000;
  // Number of identical read tasks each test submits to the executor service at once.
  private static final int NUM_THREADS = 16;
  // Shared by every test in this class; setUp() overwrites the experiments list before each test.
  private static final DataflowPipelineOptions pipelineOptions =
      PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  private static final Pipeline pipeline = Pipeline.create(pipelineOptions);
  private static final CounterSet counterFactory = new CounterSet();
  // Batch execution context built for testing from the shared options and counter set.
  private static final BatchModeExecutionContext executionContext =
      BatchModeExecutionContext.forTesting(
          pipelineOptions, counterFactory, NameContextsForTests.nameContextForTest().stageName());
  // Operation context created from the execution context above; setUp() enters its process state.
  private static final DataflowOperationContext operationContext =
      executionContext.createOperationContext(NameContextsForTests.nameContextForTest());
  // Collects the handles opened in setUp() so that tearDown() can close them.
  private Closer setupCloser;

  // GlobalWindow's coder viewed as a Coder<BoundedWindow>; the raw cast is why warnings are
  // suppressed.
  @SuppressWarnings({"unchecked", "rawtypes"})
  private static final Coder<BoundedWindow> GLOBAL_WINDOW_CODER =
      (Coder) GlobalWindow.Coder.INSTANCE;

  // IntervalWindow's coder viewed as a Coder<BoundedWindow>; the raw cast is why warnings are
  // suppressed.
  @SuppressWarnings({"unchecked", "rawtypes"})
  private static final Coder<BoundedWindow> INTERVAL_WINDOW_CODER =
      (Coder) IntervalWindow.getCoder();

  // JUnit rule providing a temporary folder for the test.
  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();

  /**
   * Turns on the side input IO metrics experiment on the shared pipeline options, then activates
   * the execution state tracker and enters the operation's process state. Both handles are
   * registered with a fresh {@link Closer} so that {@link #tearDown()} can close them.
   */
  @Before
  public void setUp() {
    DataflowPipelineDebugOptions debugOptions =
        pipelineOptions.as(DataflowPipelineDebugOptions.class);
    debugOptions.setExperiments(Lists.newArrayList(Experiment.SideInputIOMetrics.getName()));

    setupCloser = Closer.create();
    setupCloser.register(executionContext.getExecutionStateTracker().activate());
    setupCloser.register(operationContext.enterProcess());
  }

  /**
   * Closes everything that {@link #setUp()} registered with the closer.
   *
   * @throws IOException if closing the closer fails
   */
  @After
  public void tearDown() throws IOException {
    final Closer closer = setupCloser;
    closer.close();
  }

  /**
   * Reads a singleton view in the global window from {@code NUM_THREADS} concurrent tasks and
   * checks that every read, on every task, hands back the same cached {@link Long} instance.
   */
  @Test
  public void testSingleton() throws Exception {
    final WindowedValue<Long> expected = valueInGlobalWindow(42L);
    Coder<WindowedValue<Long>> windowedLongCoder =
        WindowedValue.getFullCoder(VarLongCoder.of(), GLOBAL_WINDOW_CODER);
    IsmRecordCoder<WindowedValue<Long>> recordCoder =
        IsmRecordCoder.of(
            1, 0, ImmutableList.<Coder<?>>of(GLOBAL_WINDOW_CODER), windowedLongCoder);

    final PCollectionView<Long> view =
        Pipeline.create().apply(Create.empty(VarLongCoder.of())).apply(View.asSingleton());
    final Source source = initInputFile(fromValues(Arrays.asList(expected)), recordCoder);
    final IsmSideInputReader reader = sideInputReader(view.getTagInternal().getId(), source);

    Callable<Long> readTwice =
        () -> {
          // Hold a strong reference to what was read so the logical reference cache
          // cannot be cleared while this test is running.
          Long first = reader.get(view, GlobalWindow.INSTANCE);
          assertEquals(expected.getValue(), first);
          // A second read must yield the identical reference, proving it came from the cache.
          assertSame(first, reader.get(view, GlobalWindow.INSTANCE));
          return first;
        };
    List<Callable<Long>> tasks = new ArrayList<>();
    while (tasks.size() < NUM_THREADS) {
      tasks.add(readTwice);
    }

    List<Future<Long>> futures = pipelineOptions.getExecutorService().invokeAll(tasks);
    // Every task must have observed the very same reference.
    Long reference = futures.get(0).get();
    for (int i = 0; i < futures.size(); i++) {
      assertSame(reference, futures.get(i).get());
    }
  }

  /**
   * Reads a windowed singleton view backed by two source files from {@code NUM_THREADS} concurrent
   * tasks. Each task checks the value of every populated window, that repeated reads return the
   * cached reference, and that a window without data yields the view's default value.
   */
  @Test
  public void testSingletonInWindow() throws Exception {
    final Long fallback = 42L;
    final List<WindowedValue<Long>> windowedValues =
        Arrays.asList(
            valueInIntervalWindow(12, 0),
            valueInIntervalWindow(17, 10),
            valueInIntervalWindow(28, 20));

    Coder<WindowedValue<Long>> windowedLongCoder =
        WindowedValue.getFullCoder(VarLongCoder.of(), INTERVAL_WINDOW_CODER);
    IsmRecordCoder<WindowedValue<Long>> recordCoder =
        IsmRecordCoder.of(
            1, 0, ImmutableList.<Coder<?>>of(INTERVAL_WINDOW_CODER), windowedLongCoder);

    final PCollectionView<Long> view =
        Pipeline.create()
            .apply(Create.empty(VarLongCoder.of()))
            .apply(Window.into(FixedWindows.of(Duration.millis(1))))
            .apply(View.<Long>asSingleton().withDefaultValue(fallback));

    // The first element goes into one file, the remaining two into another.
    Source firstFile = initInputFile(fromValues(windowedValues).subList(0, 1), recordCoder);
    Source secondFile = initInputFile(fromValues(windowedValues).subList(1, 3), recordCoder);

    final IsmSideInputReader reader =
        sideInputReader(view.getTagInternal().getId(), firstFile, secondFile);

    Callable<Map<BoundedWindow, Long>> readEveryWindow =
        () -> {
          Map<BoundedWindow, Long> byWindow = new HashMap<>();
          for (WindowedValue<Long> expected : windowedValues) {
            // Hold a strong reference to what was read so the logical reference cache
            // cannot be cleared while this test is running.
            Long observed = reader.get(view, windowOf(expected));
            assertEquals(expected.getValue(), observed);
            // A second read must yield the identical reference, proving it came from the cache.
            assertSame(observed, reader.get(view, windowOf(expected)));
            byWindow.put(windowOf(expected), observed);
          }
          // A window that has no value must fall back to the default.
          assertEquals(fallback, reader.get(view, intervalWindow(30)));
          return byWindow;
        };
    List<Callable<Map<BoundedWindow, Long>>> tasks = new ArrayList<>();
    while (tasks.size() < NUM_THREADS) {
      tasks.add(readEveryWindow);
    }

    List<Future<Map<BoundedWindow, Long>>> futures =
        pipelineOptions.getExecutorService().invokeAll(tasks);
    Map<BoundedWindow, Long> reference = futures.get(0).get();
    // Every task must have observed the very same references, window by window.
    for (Future<Map<BoundedWindow, Long>> future : futures) {
      Map<BoundedWindow, Long> observed = future.get();
      assertEquals(reference, observed);
      for (BoundedWindow window : observed.keySet()) {
        assertSame(reference.get(window), observed.get(window));
      }
    }
  }

  /**
   * Reads a map view stored as a single record in the global window from {@code NUM_THREADS}
   * concurrent tasks and checks that all reads return the same cached map instance.
   */
  @Test
  public void testSingletonMap() throws Exception {
    final WindowedValue<Map<String, Long>> expected =
        valueInGlobalWindow(
            ImmutableMap.<String, Long>builder().put("foo", 0L).put("bar", -1L).build());

    final PCollectionView<Map<String, Long>> view =
        Pipeline.create()
            .apply(Create.empty(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())))
            .apply(View.asMap());

    Coder<Map<String, Long>> mapCoder = MapCoder.of(StringUtf8Coder.of(), VarLongCoder.of());
    Coder<WindowedValue<Map<String, Long>>> windowedMapCoder =
        WindowedValue.getFullCoder(mapCoder, GLOBAL_WINDOW_CODER);
    IsmRecordCoder<WindowedValue<Map<String, Long>>> recordCoder =
        IsmRecordCoder.of(
            1, 0, ImmutableList.<Coder<?>>of(GLOBAL_WINDOW_CODER), windowedMapCoder);

    final Source source = initInputFile(fromValues(Arrays.asList(expected)), recordCoder);
    final IsmSideInputReader reader = sideInputReader(view.getTagInternal().getId(), source);

    Callable<Map<String, Long>> readTwice =
        () -> {
          // Hold a strong reference to what was read so the logical reference cache
          // cannot be cleared while this test is running.
          Map<String, Long> first = reader.get(view, GlobalWindow.INSTANCE);
          assertEquals(expected.getValue(), first);
          // A second read must yield the identical reference, proving it came from the cache.
          assertSame(first, reader.get(view, GlobalWindow.INSTANCE));
          return first;
        };
    List<Callable<Map<String, Long>>> tasks = new ArrayList<>();
    while (tasks.size() < NUM_THREADS) {
      tasks.add(readTwice);
    }

    List<Future<Map<String, Long>>> futures = pipelineOptions.getExecutorService().invokeAll(tasks);
    // Every task must have observed the very same reference.
    Map<String, Long> reference = futures.get(0).get();
    for (int i = 0; i < futures.size(); i++) {
      assertSame(reference, futures.get(i).get());
    }
  }

  /**
   * Reads a windowed map view (one record per window) from {@code NUM_THREADS} concurrent tasks.
   * Each task checks both populated windows, that repeated reads return the cached reference, and
   * that a window without data yields a map with no keys.
   */
  @Test
  public void testSingletonMapInWindow() throws Exception {
    IntervalWindow firstWindow = new IntervalWindow(new Instant(0L), new Instant(100L));
    IntervalWindow secondWindow = new IntervalWindow(new Instant(50L), new Instant(150L));
    IntervalWindow emptyWindow = new IntervalWindow(new Instant(75L), new Instant(175L));

    WindowedValue<Map<String, Long>> firstElement =
        WindowedValue.of(
            ImmutableMap.<String, Long>builder().put("foo", 0L).put("bar", -1L).build(),
            new Instant(7),
            firstWindow,
            PaneInfo.NO_FIRING);
    WindowedValue<Map<String, Long>> secondElement =
        WindowedValue.of(
            ImmutableMap.<String, Long>builder().put("bar", -1L).put("baz", 1L).build(),
            new Instant(53L),
            secondWindow,
            PaneInfo.NO_FIRING);
    // Insertion order (first window, then second) is the order the records are written in.
    final Map<IntervalWindow, WindowedValue<Map<String, Long>>> elements =
        ImmutableMap.<IntervalWindow, WindowedValue<Map<String, Long>>>of(
            firstWindow, firstElement, secondWindow, secondElement);

    Coder<Map<String, Long>> mapCoder = MapCoder.of(StringUtf8Coder.of(), VarLongCoder.of());
    final PCollectionView<Map<String, Long>> view =
        Pipeline.create()
            .apply(Create.empty(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())))
            .apply(
                Window.into(SlidingWindows.of(Duration.millis(100L)).every(Duration.millis(50L))))
            .apply(View.asMap());

    Coder<WindowedValue<Map<String, Long>>> windowedMapCoder =
        WindowedValue.getFullCoder(mapCoder, INTERVAL_WINDOW_CODER);
    IsmRecordCoder<WindowedValue<Map<String, Long>>> recordCoder =
        IsmRecordCoder.of(
            1, 0, ImmutableList.<Coder<?>>of(INTERVAL_WINDOW_CODER), windowedMapCoder);
    final Source source = initInputFile(fromValues(elements.values()), recordCoder);

    final IsmSideInputReader reader = sideInputReader(view.getTagInternal().getId(), source);

    Callable<Map<BoundedWindow, Map<String, Long>>> readEveryWindow =
        () -> {
          // Hold strong references to what was read so the logical reference cache
          // cannot be cleared while this test is running.
          Map<String, Long> firstMap = reader.get(view, firstWindow);
          assertEquals(elements.get(firstWindow).getValue(), firstMap);
          // A second read must yield the identical reference, proving it came from the cache.
          assertSame(firstMap, reader.get(view, firstWindow));

          Map<String, Long> secondMap = reader.get(view, secondWindow);
          assertEquals(elements.get(secondWindow).getValue(), secondMap);
          // A second read must yield the identical reference, proving it came from the cache.
          assertSame(secondMap, reader.get(view, secondWindow));

          Map<String, Long> emptyMap = reader.get(view, emptyWindow);
          assertThat(emptyMap.keySet(), empty());

          return ImmutableMap.<BoundedWindow, Map<String, Long>>of(
              firstWindow, firstMap,
              secondWindow, secondMap,
              emptyWindow, emptyMap);
        };
    List<Callable<Map<BoundedWindow, Map<String, Long>>>> tasks = new ArrayList<>();
    while (tasks.size() < NUM_THREADS) {
      tasks.add(readEveryWindow);
    }

    List<Future<Map<BoundedWindow, Map<String, Long>>>> futures =
        pipelineOptions.getExecutorService().invokeAll(tasks);
    // Every task must have observed the very same references, window by window.
    Map<BoundedWindow, Map<String, Long>> reference = futures.get(0).get();
    for (Future<Map<BoundedWindow, Map<String, Long>>> future : futures) {
      Map<BoundedWindow, Map<String, Long>> observed = future.get();
      assertEquals(reference, observed);
      for (BoundedWindow window : observed.keySet()) {
        assertSame(reference.get(window), observed.get(window));
      }
    }
  }

  /**
   * Reads a windowed multimap view (one record per window) from {@code NUM_THREADS} concurrent
   * tasks. Each task checks both populated windows, that repeated reads return the cached
   * reference, and that a window without data yields a map with no keys.
   *
   * <p>The populated windows are 100ms long and start 50ms apart, so the view is declared with
   * sliding windows of that size and period, as in {@code testSingletonMapInWindow}.
   */
  @Test
  public void testSingletonMultimapInWindow() throws Exception {
    IntervalWindow firstWindow = new IntervalWindow(new Instant(0L), new Instant(100L));
    IntervalWindow secondWindow = new IntervalWindow(new Instant(50L), new Instant(150L));
    IntervalWindow emptyWindow = new IntervalWindow(new Instant(75L), new Instant(175L));
    @SuppressWarnings({"unchecked", "rawtypes"}) // Collection is iterable, and this is immutable
    final Map<IntervalWindow, WindowedValue<Map<String, Iterable<Long>>>> elements =
        ImmutableMap.<IntervalWindow, WindowedValue<Map<String, Iterable<Long>>>>builder()
            .put(
                firstWindow,
                WindowedValue.of(
                    (Map)
                        ImmutableListMultimap.<String, Long>builder()
                            .put("foo", 0L)
                            .put("foo", 2L)
                            .put("bar", -1L)
                            .build()
                            .asMap(),
                    new Instant(7),
                    firstWindow,
                    PaneInfo.NO_FIRING))
            .put(
                secondWindow,
                WindowedValue.of(
                    (Map)
                        ImmutableListMultimap.<String, Long>builder()
                            .put("bar", -1L)
                            .put("baz", 1L)
                            .put("baz", 3L)
                            .build()
                            .asMap(),
                    new Instant(53L),
                    secondWindow,
                    PaneInfo.NO_FIRING))
            .build();
    StringUtf8Coder strCoder = StringUtf8Coder.of();
    Coder<Map<String, Iterable<Long>>> mapCoder =
        MapCoder.of(strCoder, IterableCoder.of(VarLongCoder.of()));
    // Sliding windows (size 100ms, period 50ms) match the overlapping windows used above; fixed
    // 100ms windows could never contain both [0, 100) and [50, 150).
    final PCollectionView<Map<String, Iterable<Long>>> view =
        Pipeline.create()
            .apply(Create.empty(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())))
            .apply(
                Window.into(SlidingWindows.of(Duration.millis(100L)).every(Duration.millis(50L))))
            .apply(View.asMultimap());

    IsmRecordCoder<WindowedValue<Map<String, Iterable<Long>>>> recordCoder =
        IsmRecordCoder.of(
            1,
            0,
            ImmutableList.<Coder<?>>of(INTERVAL_WINDOW_CODER),
            WindowedValue.getFullCoder(mapCoder, INTERVAL_WINDOW_CODER));
    final Source source = initInputFile(fromValues(elements.values()), recordCoder);

    final IsmSideInputReader reader = sideInputReader(view.getTagInternal().getId(), source);

    List<Callable<Map<BoundedWindow, Map<String, Iterable<Long>>>>> tasks = new ArrayList<>();
    for (int i = 0; i < NUM_THREADS; ++i) {
      tasks.add(
          () -> {
            // Store a strong reference to the returned value so that the logical reference
            // cache is not cleared for this test.
            Map<String, Iterable<Long>> value = reader.get(view, firstWindow);
            assertEquals(elements.get(firstWindow).getValue(), value);
            // Assert that the same value reference was returned showing that it was cached.
            assertSame(value, reader.get(view, firstWindow));
            Map<String, Iterable<Long>> secondValue = reader.get(view, secondWindow);
            assertEquals(elements.get(secondWindow).getValue(), secondValue);
            // Assert that the same value reference was returned showing that it was cached.
            assertSame(secondValue, reader.get(view, secondWindow));
            // A window with no record must produce a map without keys.
            Map<String, Iterable<Long>> emptyValue = reader.get(view, emptyWindow);
            assertThat(emptyValue.keySet(), empty());
            Map<BoundedWindow, Map<String, Iterable<Long>>> result =
                ImmutableMap.<BoundedWindow, Map<String, Iterable<Long>>>builder()
                    .put(firstWindow, value)
                    .put(secondWindow, secondValue)
                    .put(emptyWindow, emptyValue)
                    .build();
            return result;
          });
    }

    List<Future<Map<BoundedWindow, Map<String, Iterable<Long>>>>> results =
        pipelineOptions.getExecutorService().invokeAll(tasks);
    // Assert that all threads got back the same reference
    Map<BoundedWindow, Map<String, Iterable<Long>>> value = results.get(0).get();
    for (Future<Map<BoundedWindow, Map<String, Iterable<Long>>>> result : results) {
      assertEquals(value, result.get());
      for (Map.Entry<BoundedWindow, Map<String, Iterable<Long>>> entry : result.get().entrySet()) {
        assertSame(value.get(entry.getKey()), entry.getValue());
      }
    }
  }

  /**
   * Reads an iterable view in the global window, backed by two source files, from {@code
   * NUM_THREADS} concurrent tasks. Each task verifies the contents against both files' elements
   * and that repeated reads return the same cached iterable.
   */
  @Test
  public void testIterable() throws Exception {
    final List<KV<Long, WindowedValue<Long>>> firstFileElements =
        Arrays.asList(
            KV.of(0L, valueInGlobalWindow(12L)),
            KV.of(1L, valueInGlobalWindow(22L)),
            KV.of(2L, valueInGlobalWindow(32L)));
    final List<KV<Long, WindowedValue<Long>>> secondFileElements =
        Arrays.asList(
            KV.of(0L, valueInGlobalWindow(42L)),
            KV.of(1L, valueInGlobalWindow(52L)),
            KV.of(2L, valueInGlobalWindow(62L)));

    Coder<WindowedValue<Long>> windowedLongCoder =
        WindowedValue.getFullCoder(VarLongCoder.of(), GLOBAL_WINDOW_CODER);
    IsmRecordCoder<WindowedValue<Long>> recordCoder =
        IsmRecordCoder.of(
            1,
            0,
            ImmutableList.of(GLOBAL_WINDOW_CODER, BigEndianLongCoder.of()),
            windowedLongCoder);

    final PCollectionView<Iterable<Long>> view =
        Pipeline.create().apply(Create.empty(VarLongCoder.of())).apply(View.asIterable());

    Source firstFile = initInputFile(fromKvsForList(firstFileElements), recordCoder);
    Source secondFile = initInputFile(fromKvsForList(secondFileElements), recordCoder);

    final IsmSideInputReader reader =
        sideInputReader(view.getTagInternal().getId(), firstFile, secondFile);

    Callable<Iterable<Long>> readAndVerify =
        () -> {
          // Hold a strong reference to what was read so the logical reference cache
          // cannot be cleared while this test is running.
          Iterable<Long> observed = reader.get(view, GlobalWindow.INSTANCE);
          verifyIterable(toValueList(concat(firstFileElements, secondFileElements)), observed);
          // A second read must yield the identical reference, proving it came from the cache.
          assertSame(reader.get(view, GlobalWindow.INSTANCE), observed);
          return observed;
        };
    List<Callable<Iterable<Long>>> tasks = new ArrayList<>();
    while (tasks.size() < NUM_THREADS) {
      tasks.add(readAndVerify);
    }

    List<Future<Iterable<Long>>> futures = pipelineOptions.getExecutorService().invokeAll(tasks);
    Iterable<Long> reference = futures.get(0).get();
    // Every task must have observed the very same reference.
    for (int i = 0; i < futures.size(); i++) {
      assertSame(reference, futures.get(i).get());
    }
  }

  /**
   * Same scenario as the plain iterable test, except that the two files are written under
   * explicit {@code -0000x-of-00002.ism} names and read through a single {@code @2.ism} source
   * pattern.
   */
  @Test
  public void testIterableAtN() throws Exception {
    final List<KV<Long, WindowedValue<Long>>> firstFileElements =
        Arrays.asList(
            KV.of(0L, valueInGlobalWindow(12L)),
            KV.of(1L, valueInGlobalWindow(22L)),
            KV.of(2L, valueInGlobalWindow(32L)));
    final List<KV<Long, WindowedValue<Long>>> secondFileElements =
        Arrays.asList(
            KV.of(0L, valueInGlobalWindow(42L)),
            KV.of(1L, valueInGlobalWindow(52L)),
            KV.of(2L, valueInGlobalWindow(62L)));

    Coder<WindowedValue<Long>> windowedLongCoder =
        WindowedValue.getFullCoder(VarLongCoder.of(), GLOBAL_WINDOW_CODER);
    IsmRecordCoder<WindowedValue<Long>> recordCoder =
        IsmRecordCoder.of(
            1,
            0,
            ImmutableList.of(GLOBAL_WINDOW_CODER, BigEndianLongCoder.of()),
            windowedLongCoder);

    final PCollectionView<Iterable<Long>> view =
        Pipeline.create().apply(Create.empty(VarLongCoder.of())).apply(View.asIterable());

    // Both files share one prefix so that a single "@2" pattern refers to the pair.
    String prefix = tmpFolder.newFile().getPath();
    String firstFileName = prefix + "-00000-of-00002.ism";
    String secondFileName = prefix + "-00001-of-00002.ism";
    initInputFile(fromKvsForList(firstFileElements), recordCoder, firstFileName);
    initInputFile(fromKvsForList(secondFileElements), recordCoder, secondFileName);

    Source source = newIsmSource(recordCoder, prefix + "@2.ism");

    final IsmSideInputReader reader = sideInputReader(view.getTagInternal().getId(), source);

    Callable<Iterable<Long>> readAndVerify =
        () -> {
          // Hold a strong reference to what was read so the logical reference cache
          // cannot be cleared while this test is running.
          Iterable<Long> observed = reader.get(view, GlobalWindow.INSTANCE);
          verifyIterable(toValueList(concat(firstFileElements, secondFileElements)), observed);
          // A second read must yield the identical reference, proving it came from the cache.
          assertSame(reader.get(view, GlobalWindow.INSTANCE), observed);
          return observed;
        };
    List<Callable<Iterable<Long>>> tasks = new ArrayList<>();
    while (tasks.size() < NUM_THREADS) {
      tasks.add(readAndVerify);
    }

    List<Future<Iterable<Long>>> futures = pipelineOptions.getExecutorService().invokeAll(tasks);
    Iterable<Long> reference = futures.get(0).get();
    // Every task must have observed the very same reference.
    for (int i = 0; i < futures.size(); i++) {
      assertSame(reference, futures.get(i).get());
    }
  }

  /**
   * Reads a windowed iterable view from {@code NUM_THREADS} concurrent tasks. Elements for the
   * first two windows are written to one file and the third window's to another; each task
   * verifies the contents of all three windows and that repeated reads return cached references.
   */
  @Test
  public void testIterableInWindow() throws Exception {
    final List<KV<Long, WindowedValue<Long>>> windowTenElements =
        Arrays.asList(
            KV.of(0L, valueInIntervalWindow(12, 10)),
            KV.of(1L, valueInIntervalWindow(22, 10)),
            KV.of(2L, valueInIntervalWindow(32, 10)));
    final List<KV<Long, WindowedValue<Long>>> windowTwentyElements =
        Arrays.asList(
            KV.of(0L, valueInIntervalWindow(42, 20)),
            KV.of(1L, valueInIntervalWindow(52, 20)),
            KV.of(2L, valueInIntervalWindow(62, 20)));
    final List<KV<Long, WindowedValue<Long>>> windowThirtyElements =
        Arrays.asList(
            KV.of(0L, valueInIntervalWindow(42L, 30)),
            KV.of(1L, valueInIntervalWindow(52L, 30)),
            KV.of(2L, valueInIntervalWindow(62L, 30)));

    Coder<WindowedValue<Long>> windowedLongCoder =
        WindowedValue.getFullCoder(VarLongCoder.of(), INTERVAL_WINDOW_CODER);
    IsmRecordCoder<WindowedValue<Long>> recordCoder =
        IsmRecordCoder.of(
            1,
            0,
            ImmutableList.of(INTERVAL_WINDOW_CODER, BigEndianLongCoder.of()),
            windowedLongCoder);

    final PCollectionView<Iterable<Long>> view =
        Pipeline.create()
            .apply(Create.empty(VarLongCoder.of()))
            .apply(Window.into(FixedWindows.of(Duration.millis(10))))
            .apply(View.asIterable());

    Source firstFile =
        initInputFile(
            fromKvsForList(concat(windowTenElements, windowTwentyElements)), recordCoder);
    Source secondFile = initInputFile(fromKvsForList(windowThirtyElements), recordCoder);

    final IsmSideInputReader reader =
        sideInputReader(view.getTagInternal().getId(), firstFile, secondFile);

    Callable<Map<BoundedWindow, Iterable<Long>>> readEveryWindow =
        () -> {
          // Hold strong references to what was read so the logical reference cache
          // cannot be cleared while this test is running.
          Iterable<Long> tenValues = reader.get(view, intervalWindow(10));
          Iterable<Long> twentyValues = reader.get(view, intervalWindow(20));
          Iterable<Long> thirtyValues = reader.get(view, intervalWindow(30));

          verifyIterable(toValueList(windowTenElements), tenValues);
          verifyIterable(toValueList(windowTwentyElements), twentyValues);
          verifyIterable(toValueList(windowThirtyElements), thirtyValues);

          // Second reads must yield the identical references, proving they came from the cache.
          assertSame(tenValues, reader.get(view, intervalWindow(10)));
          assertSame(twentyValues, reader.get(view, intervalWindow(20)));
          assertSame(thirtyValues, reader.get(view, intervalWindow(30)));

          return ImmutableMap.<BoundedWindow, Iterable<Long>>builder()
              .put(intervalWindow(10), tenValues)
              .put(intervalWindow(20), twentyValues)
              .put(intervalWindow(30), thirtyValues)
              .build();
        };
    List<Callable<Map<BoundedWindow, Iterable<Long>>>> tasks = new ArrayList<>();
    while (tasks.size() < NUM_THREADS) {
      tasks.add(readEveryWindow);
    }

    List<Future<Map<BoundedWindow, Iterable<Long>>>> futures =
        pipelineOptions.getExecutorService().invokeAll(tasks);
    Map<BoundedWindow, Iterable<Long>> reference = futures.get(0).get();
    // Every task must have observed the very same references, window by window.
    for (Future<Map<BoundedWindow, Iterable<Long>>> future : futures) {
      Map<BoundedWindow, Iterable<Long>> observed = future.get();
      assertEquals(reference, observed);
      for (BoundedWindow window : observed.keySet()) {
        assertSame(reference.get(window), observed.get(window));
      }
    }
  }

  /**
   * Reads a list view in the global window, backed by two source files, from {@code NUM_THREADS}
   * concurrent tasks. Each task verifies the list against both files' elements and that repeated
   * reads return the same cached list.
   */
  @Test
  public void testList() throws Exception {
    final List<KV<Long, WindowedValue<Long>>> firstFileElements =
        Arrays.asList(
            KV.of(0L, valueInGlobalWindow(12L)),
            KV.of(1L, valueInGlobalWindow(22L)),
            KV.of(2L, valueInGlobalWindow(32L)));
    final List<KV<Long, WindowedValue<Long>>> secondFileElements =
        Arrays.asList(
            KV.of(0L, valueInGlobalWindow(42L)),
            KV.of(1L, valueInGlobalWindow(52L)),
            KV.of(2L, valueInGlobalWindow(62L)));

    Coder<WindowedValue<Long>> windowedLongCoder =
        WindowedValue.getFullCoder(VarLongCoder.of(), GLOBAL_WINDOW_CODER);
    IsmRecordCoder<WindowedValue<Long>> recordCoder =
        IsmRecordCoder.of(
            1,
            0,
            ImmutableList.of(GLOBAL_WINDOW_CODER, BigEndianLongCoder.of()),
            windowedLongCoder);

    final PCollectionView<List<Long>> view =
        Pipeline.create().apply(Create.empty(VarLongCoder.of())).apply(View.asList());

    Source firstFile = initInputFile(fromKvsForList(firstFileElements), recordCoder);
    Source secondFile = initInputFile(fromKvsForList(secondFileElements), recordCoder);

    final IsmSideInputReader reader =
        sideInputReader(view.getTagInternal().getId(), firstFile, secondFile);

    Callable<List<Long>> readAndVerify =
        () -> {
          // Hold a strong reference to what was read so the logical reference cache
          // cannot be cleared while this test is running.
          List<Long> observed = reader.get(view, GlobalWindow.INSTANCE);
          verifyList(toValueList(concat(firstFileElements, secondFileElements)), observed);
          // A second read must yield the identical reference, proving it came from the cache.
          assertSame(reader.get(view, GlobalWindow.INSTANCE), observed);
          return observed;
        };
    List<Callable<List<Long>>> tasks = new ArrayList<>();
    while (tasks.size() < NUM_THREADS) {
      tasks.add(readAndVerify);
    }

    List<Future<List<Long>>> futures = pipelineOptions.getExecutorService().invokeAll(tasks);
    List<Long> reference = futures.get(0).get();
    // Every task must have observed the very same reference.
    for (int i = 0; i < futures.size(); i++) {
      assertSame(reference, futures.get(i).get());
    }
  }

  /**
   * Reads a list view that is windowed into fixed windows of 10 milliseconds from {@code
   * NUM_THREADS} concurrent tasks.
   *
   * <p>The first source holds the records for {@code intervalWindow(10)} and {@code
   * intervalWindow(20)}, the second source holds the records for {@code intervalWindow(30)}. Each
   * task checks the contents per window, checks that repeated reads return the same instance, and
   * checks that a window without any records ({@code intervalWindow(40)}) yields an empty list.
   * Finally all tasks are required to have observed the same instances.
   */
  @Test
  public void testListInWindow() throws Exception {
    Coder<WindowedValue<Long>> valueCoder =
        WindowedValue.getFullCoder(VarLongCoder.of(), INTERVAL_WINDOW_CODER);
    IsmRecordCoder<WindowedValue<Long>> ismCoder =
        IsmRecordCoder.of(
            1, 0, ImmutableList.of(INTERVAL_WINDOW_CODER, BigEndianLongCoder.of()), valueCoder);

    // Every window carries its own distinct values. If two windows shared the same values, a read
    // that returned the records of the wrong window would go unnoticed by the checks below.
    final List<KV<Long, WindowedValue<Long>>> firstElements =
        Arrays.asList(
            KV.of(0L, valueInIntervalWindow(12, 10)),
            KV.of(1L, valueInIntervalWindow(22, 10)),
            KV.of(2L, valueInIntervalWindow(32, 10)));
    final List<KV<Long, WindowedValue<Long>>> secondElements =
        Arrays.asList(
            KV.of(0L, valueInIntervalWindow(42, 20)),
            KV.of(1L, valueInIntervalWindow(52, 20)),
            KV.of(2L, valueInIntervalWindow(62, 20)));
    final List<KV<Long, WindowedValue<Long>>> thirdElements =
        Arrays.asList(
            KV.of(0L, valueInIntervalWindow(72L, 30)),
            KV.of(1L, valueInIntervalWindow(82L, 30)),
            KV.of(2L, valueInIntervalWindow(92L, 30)));

    final PCollectionView<List<Long>> view =
        Pipeline.create()
            .apply(Create.empty(VarLongCoder.of()))
            .apply(Window.into(FixedWindows.of(Duration.millis(10))))
            .apply(View.asList());

    // The first two windows share one source; the third window lives in a source of its own.
    Source sourceA = initInputFile(fromKvsForList(concat(firstElements, secondElements)), ismCoder);
    Source sourceB = initInputFile(fromKvsForList(thirdElements), ismCoder);

    final IsmSideInputReader reader =
        sideInputReader(view.getTagInternal().getId(), sourceA, sourceB);

    List<Callable<Map<BoundedWindow, List<Long>>>> tasks = new ArrayList<>();
    for (int i = 0; i < NUM_THREADS; ++i) {
      tasks.add(
          () -> {
            // Store a strong reference to the returned value so that the logical reference
            // cache is not cleared for this test.
            List<Long> firstValues = reader.get(view, intervalWindow(10));
            List<Long> secondValues = reader.get(view, intervalWindow(20));
            List<Long> thirdValues = reader.get(view, intervalWindow(30));

            verifyList(toValueList(firstElements), firstValues);
            verifyList(toValueList(secondElements), secondValues);
            verifyList(toValueList(thirdElements), thirdValues);

            // Assert that the same value reference was returned showing that it was cached.
            assertSame(firstValues, reader.get(view, intervalWindow(10)));
            assertSame(secondValues, reader.get(view, intervalWindow(20)));
            assertSame(thirdValues, reader.get(view, intervalWindow(30)));

            // Also verify when requesting a window that is not part of the side input
            assertEquals(Collections.emptyList(), reader.get(view, intervalWindow(40)));

            return ImmutableMap.<BoundedWindow, List<Long>>of(
                intervalWindow(10), firstValues,
                intervalWindow(20), secondValues,
                intervalWindow(30), thirdValues);
          });
    }

    List<Future<Map<BoundedWindow, List<Long>>>> results =
        pipelineOptions.getExecutorService().invokeAll(tasks);
    Map<BoundedWindow, List<Long>> value = results.get(0).get();
    // Assert that all threads got back the same reference
    for (Future<Map<BoundedWindow, List<Long>>> result : results) {
      assertEquals(value, result.get());
      for (Map.Entry<BoundedWindow, List<Long>> entry : result.get().entrySet()) {
        assertSame(value.get(entry.getKey()), entry.getValue());
      }
    }
  }

  /**
   * Reads a globally windowed map view from {@code NUM_THREADS} concurrent tasks. The records are
   * split over two sources by shard, with the map metadata in a third source. Each task checks the
   * map contents and that a repeated read returns the very same instance. Finally all tasks are
   * required to have observed the same instance.
   */
  @Test
  public void testMap() throws Exception {
    final PCollectionView<Map<byte[], Long>> view =
        Pipeline.create()
            .apply(Create.empty(KvCoder.of(ByteArrayCoder.of(), VarLongCoder.of())))
            .apply(View.asMap());

    Coder<WindowedValue<Long>> windowedLongCoder =
        WindowedValue.getFullCoder(VarLongCoder.of(), GLOBAL_WINDOW_CODER);
    IsmRecordCoder<WindowedValue<Long>> recordCoder =
        IsmRecordCoder.of(
            1,
            2,
            ImmutableList.of(
                MetadataKeyCoder.of(ByteArrayCoder.of()),
                GLOBAL_WINDOW_CODER,
                BigEndianLongCoder.of()),
            windowedLongCoder);

    // The keys are byte[]s on purpose: this forces structural equality testing rather than
    // relying on java equality testing.
    final ListMultimap<byte[], WindowedValue<Long>> elements =
        ImmutableListMultimap.<byte[], WindowedValue<Long>>builder()
            .put(new byte[] {0x00}, valueInGlobalWindow(12L))
            .put(new byte[] {0x01}, valueInGlobalWindow(22L))
            .put(new byte[] {0x02}, valueInGlobalWindow(32L))
            .put(new byte[] {0x03}, valueInGlobalWindow(42L))
            .put(new byte[] {0x04}, valueInGlobalWindow(52L))
            .put(new byte[] {0x05}, valueInGlobalWindow(62L))
            .build();

    // Route the records of even-keyed shards to one list and the remaining shards to the other.
    Multimap<Integer, IsmRecord<WindowedValue<Long>>> recordsByShard =
        forMap(recordCoder, elements);
    List<IsmRecord<WindowedValue<Long>>> evenShardRecords = new ArrayList<>();
    List<IsmRecord<WindowedValue<Long>>> oddShardRecords = new ArrayList<>();
    for (Map.Entry<Integer, Collection<IsmRecord<WindowedValue<Long>>>> shard :
        recordsByShard.asMap().entrySet()) {
      List<IsmRecord<WindowedValue<Long>>> destination =
          (shard.getKey() % 2 == 0) ? evenShardRecords : oddShardRecords;
      destination.addAll(shard.getValue());
    }
    // Both files are expected to receive at least one record.
    checkState(!evenShardRecords.isEmpty());
    checkState(!oddShardRecords.isEmpty());

    Source evenSource = initInputFile(evenShardRecords, recordCoder);
    Source oddSource = initInputFile(oddShardRecords, recordCoder);

    List<IsmRecord<WindowedValue<Long>>> metadataRecords =
        forMapMetadata(ByteArrayCoder.of(), elements.keySet(), GlobalWindow.INSTANCE);
    Source metadataSource = initInputFile(metadataRecords, recordCoder);

    final IsmSideInputReader reader =
        sideInputReader(view.getTagInternal().getId(), evenSource, oddSource, metadataSource);

    final Callable<Map<byte[], Long>> readTask =
        () -> {
          // Holding on to the result keeps a strong reference alive, so the logical reference
          // cache is not cleared while the test runs.
          Map<byte[], Long> observed = reader.get(view, GlobalWindow.INSTANCE);
          verifyMap(
              Maps.transformValues(elements.asMap(), new TransformForMap<Long>()),
              observed,
              new ComparatorForMap<Long>());
          // A second lookup must hand back the identical instance, which shows it was cached.
          assertSame(reader.get(view, GlobalWindow.INSTANCE), observed);
          return observed;
        };

    List<Callable<Map<byte[], Long>>> tasks = new ArrayList<>();
    for (int t = 0; t < NUM_THREADS; t++) {
      tasks.add(readTask);
    }

    List<Future<Map<byte[], Long>>> futures = pipelineOptions.getExecutorService().invokeAll(tasks);
    Map<byte[], Long> reference = futures.get(0).get();
    // Every task must have received the same instance as the first one.
    for (int f = 0; f < futures.size(); f++) {
      assertSame(reference, futures.get(f).get());
    }
  }

  /**
   * Reads a map view that is windowed into fixed windows of 10 milliseconds from {@code
   * NUM_THREADS} concurrent tasks.
   *
   * <p>The records of three windows are split over two sources by shard, and the map metadata is
   * split over two further sources. Each task checks the map contents per window, checks that
   * repeated reads return the same instance, and checks that a window without any records ({@code
   * intervalWindow(40)}) yields an empty map. Finally all tasks are required to have observed the
   * same instances.
   */
  @Test
  public void testMapInWindow() throws Exception {
    final PCollectionView<Map<byte[], Long>> view =
        Pipeline.create()
            .apply(Create.empty(KvCoder.of(ByteArrayCoder.of(), VarLongCoder.of())))
            .apply(Window.into(FixedWindows.of(Duration.millis(10))))
            .apply(View.asMap());

    Coder<WindowedValue<Long>> windowedLongCoder =
        WindowedValue.getFullCoder(VarLongCoder.of(), INTERVAL_WINDOW_CODER);
    IsmRecordCoder<WindowedValue<Long>> recordCoder =
        IsmRecordCoder.of(
            1,
            2,
            ImmutableList.of(
                MetadataKeyCoder.of(ByteArrayCoder.of()),
                INTERVAL_WINDOW_CODER,
                BigEndianLongCoder.of()),
            windowedLongCoder);

    // The keys are byte[]s on purpose: this forces structural equality testing rather than
    // relying on java equality testing.
    final ListMultimap<byte[], WindowedValue<Long>> entriesAt10 =
        ImmutableListMultimap.<byte[], WindowedValue<Long>>builder()
            .put(new byte[] {0x00}, valueInIntervalWindow(12L, 10))
            .put(new byte[] {0x01}, valueInIntervalWindow(22L, 10))
            .put(new byte[] {0x02}, valueInIntervalWindow(32L, 10))
            .build();
    final ListMultimap<byte[], WindowedValue<Long>> entriesAt20 =
        ImmutableListMultimap.<byte[], WindowedValue<Long>>builder()
            .put(new byte[] {0x00}, valueInIntervalWindow(42L, 20))
            .put(new byte[] {0x03}, valueInIntervalWindow(52L, 20))
            .put(new byte[] {0x02}, valueInIntervalWindow(62L, 20))
            .build();
    final ListMultimap<byte[], WindowedValue<Long>> entriesAt30 =
        ImmutableListMultimap.<byte[], WindowedValue<Long>>builder()
            .put(new byte[] {0x02}, valueInIntervalWindow(72L, 30))
            .put(new byte[] {0x04}, valueInIntervalWindow(82L, 30))
            .put(new byte[] {0x05}, valueInIntervalWindow(92L, 30))
            .build();

    // Collect the records of all three windows per shard, then route even-keyed shards to one
    // list and the remaining shards to the other.
    Multimap<Integer, IsmRecord<WindowedValue<Long>>> recordsByShard =
        forMap(recordCoder, entriesAt10);
    recordsByShard.putAll(forMap(recordCoder, entriesAt20));
    recordsByShard.putAll(forMap(recordCoder, entriesAt30));

    List<IsmRecord<WindowedValue<Long>>> evenShardRecords = new ArrayList<>();
    List<IsmRecord<WindowedValue<Long>>> oddShardRecords = new ArrayList<>();
    for (Map.Entry<Integer, Collection<IsmRecord<WindowedValue<Long>>>> shard :
        recordsByShard.asMap().entrySet()) {
      List<IsmRecord<WindowedValue<Long>>> destination =
          (shard.getKey() % 2 == 0) ? evenShardRecords : oddShardRecords;
      destination.addAll(shard.getValue());
    }
    // Both files are expected to receive at least one record.
    checkState(!evenShardRecords.isEmpty());
    checkState(!oddShardRecords.isEmpty());

    Source evenSource = initInputFile(evenShardRecords, recordCoder);
    Source oddSource = initInputFile(oddShardRecords, recordCoder);

    // Metadata for the first window goes into one source, the other two windows share a source.
    List<IsmRecord<WindowedValue<Long>>> metadataAt10 =
        forMapMetadata(ByteArrayCoder.of(), entriesAt10.keySet(), intervalWindow(10));
    List<IsmRecord<WindowedValue<Long>>> metadataAt20 =
        forMapMetadata(ByteArrayCoder.of(), entriesAt20.keySet(), intervalWindow(20));
    List<IsmRecord<WindowedValue<Long>>> metadataAt30 =
        forMapMetadata(ByteArrayCoder.of(), entriesAt30.keySet(), intervalWindow(30));
    Source metadataSourceA = initInputFile(metadataAt10, recordCoder);
    Source metadataSourceB = initInputFile(concat(metadataAt20, metadataAt30), recordCoder);

    final IsmSideInputReader reader =
        sideInputReader(
            view.getTagInternal().getId(),
            evenSource,
            oddSource,
            metadataSourceA,
            metadataSourceB);

    final Callable<Map<BoundedWindow, Map<byte[], Long>>> readTask =
        () -> {
          // Holding on to the results keeps strong references alive, so the logical reference
          // cache is not cleared while the test runs.
          Map<byte[], Long> observedAt10 = reader.get(view, intervalWindow(10));
          Map<byte[], Long> observedAt20 = reader.get(view, intervalWindow(20));
          Map<byte[], Long> observedAt30 = reader.get(view, intervalWindow(30));

          verifyMap(
              Maps.transformValues(entriesAt10.asMap(), new TransformForMap<Long>()),
              observedAt10,
              new ComparatorForMap<Long>());
          verifyMap(
              Maps.transformValues(entriesAt20.asMap(), new TransformForMap<Long>()),
              observedAt20,
              new ComparatorForMap<Long>());
          verifyMap(
              Maps.transformValues(entriesAt30.asMap(), new TransformForMap<Long>()),
              observedAt30,
              new ComparatorForMap<Long>());

          // Repeated lookups must hand back the identical instances, which shows they were
          // cached.
          assertSame(observedAt10, reader.get(view, intervalWindow(10)));
          assertSame(observedAt20, reader.get(view, intervalWindow(20)));
          assertSame(observedAt30, reader.get(view, intervalWindow(30)));

          // A window that is not part of the side input yields an empty map.
          assertEquals(Collections.emptyMap(), reader.get(view, intervalWindow(40)));

          return ImmutableMap.<BoundedWindow, Map<byte[], Long>>builder()
              .put(intervalWindow(10), observedAt10)
              .put(intervalWindow(20), observedAt20)
              .put(intervalWindow(30), observedAt30)
              .build();
        };

    List<Callable<Map<BoundedWindow, Map<byte[], Long>>>> tasks = new ArrayList<>();
    for (int t = 0; t < NUM_THREADS; t++) {
      tasks.add(readTask);
    }

    List<Future<Map<BoundedWindow, Map<byte[], Long>>>> futures =
        pipelineOptions.getExecutorService().invokeAll(tasks);
    Map<BoundedWindow, Map<byte[], Long>> reference = futures.get(0).get();
    // Every task must have received the same instances as the first one.
    for (Future<Map<BoundedWindow, Map<byte[], Long>>> future : futures) {
      Map<BoundedWindow, Map<byte[], Long>> perTask = future.get();
      assertEquals(reference, perTask);
      for (Map.Entry<BoundedWindow, Map<byte[], Long>> windowed : perTask.entrySet()) {
        assertSame(reference.get(windowed.getKey()), windowed.getValue());
      }
    }
  }

  /**
   * Reads a globally windowed multimap view from {@code NUM_THREADS} concurrent tasks. The records
   * are split over two sources by shard, with the map metadata in a third source. Each task checks
   * the multimap contents and that a repeated read returns the very same instance. Finally all
   * tasks are required to have observed the same instance.
   */
  @Test
  public void testMultimap() throws Exception {
    final PCollectionView<Map<byte[], Iterable<Long>>> view =
        Pipeline.create()
            .apply(Create.empty(KvCoder.of(ByteArrayCoder.of(), VarLongCoder.of())))
            .apply(View.asMultimap());

    Coder<WindowedValue<Long>> windowedLongCoder =
        WindowedValue.getFullCoder(VarLongCoder.of(), GLOBAL_WINDOW_CODER);
    IsmRecordCoder<WindowedValue<Long>> recordCoder =
        IsmRecordCoder.of(
            1,
            2,
            ImmutableList.of(
                MetadataKeyCoder.of(ByteArrayCoder.of()),
                GLOBAL_WINDOW_CODER,
                BigEndianLongCoder.of()),
            windowedLongCoder);

    // The keys are byte[]s on purpose: this forces structural equality testing rather than
    // relying on java equality testing.
    final ListMultimap<byte[], WindowedValue<Long>> elements =
        ImmutableListMultimap.<byte[], WindowedValue<Long>>builder()
            .put(new byte[] {0x00}, valueInGlobalWindow(12L))
            .put(new byte[] {0x01}, valueInGlobalWindow(22L))
            .put(new byte[] {0x02}, valueInGlobalWindow(32L))
            .put(new byte[] {0x03}, valueInGlobalWindow(42L))
            .put(new byte[] {0x04}, valueInGlobalWindow(52L))
            .put(new byte[] {0x05}, valueInGlobalWindow(62L))
            .build();

    // Route the records of even-keyed shards to one list and the remaining shards to the other.
    Multimap<Integer, IsmRecord<WindowedValue<Long>>> recordsByShard =
        forMap(recordCoder, elements);
    List<IsmRecord<WindowedValue<Long>>> evenShardRecords = new ArrayList<>();
    List<IsmRecord<WindowedValue<Long>>> oddShardRecords = new ArrayList<>();
    for (Map.Entry<Integer, Collection<IsmRecord<WindowedValue<Long>>>> shard :
        recordsByShard.asMap().entrySet()) {
      List<IsmRecord<WindowedValue<Long>>> destination =
          (shard.getKey() % 2 == 0) ? evenShardRecords : oddShardRecords;
      destination.addAll(shard.getValue());
    }
    // Both files are expected to receive at least one record.
    checkState(!evenShardRecords.isEmpty());
    checkState(!oddShardRecords.isEmpty());

    Source evenSource = initInputFile(evenShardRecords, recordCoder);
    Source oddSource = initInputFile(oddShardRecords, recordCoder);

    List<IsmRecord<WindowedValue<Long>>> metadataRecords =
        forMapMetadata(ByteArrayCoder.of(), elements.keySet(), GlobalWindow.INSTANCE);
    Source metadataSource = initInputFile(metadataRecords, recordCoder);

    final IsmSideInputReader reader =
        sideInputReader(view.getTagInternal().getId(), evenSource, oddSource, metadataSource);

    final Callable<Map<byte[], Iterable<Long>>> readTask =
        () -> {
          // Holding on to the result keeps a strong reference alive, so the logical reference
          // cache is not cleared while the test runs.
          Map<byte[], Iterable<Long>> observed = reader.get(view, GlobalWindow.INSTANCE);
          verifyMap(
              Maps.transformValues(elements.asMap(), new TransformForMultimap<Long>()),
              observed,
              new ComparatorForMultimap<Long>());
          // A second lookup must hand back the identical instance, which shows it was cached.
          assertSame(reader.get(view, GlobalWindow.INSTANCE), observed);
          return observed;
        };

    List<Callable<Map<byte[], Iterable<Long>>>> tasks = new ArrayList<>();
    for (int t = 0; t < NUM_THREADS; t++) {
      tasks.add(readTask);
    }

    List<Future<Map<byte[], Iterable<Long>>>> futures =
        pipelineOptions.getExecutorService().invokeAll(tasks);
    Map<byte[], Iterable<Long>> reference = futures.get(0).get();
    // Every task must have received the same instance as the first one.
    for (int f = 0; f < futures.size(); f++) {
      assertSame(reference, futures.get(f).get());
    }
  }

  /**
   * Reads a multimap view that is windowed into fixed windows of 10 milliseconds from {@code
   * NUM_THREADS} concurrent tasks.
   *
   * <p>The records of three windows are split over two sources by shard, and the map metadata is
   * split over two further sources. Each task checks the multimap contents per window, checks that
   * repeated reads return the same instance, and checks that a window without any records ({@code
   * intervalWindow(40)}) yields an empty map. Finally all tasks are required to have observed the
   * same instances.
   */
  @Test
  public void testMultimapInWindow() throws Exception {
    final PCollectionView<Map<byte[], Iterable<Long>>> view =
        Pipeline.create()
            .apply(Create.empty(KvCoder.of(ByteArrayCoder.of(), VarLongCoder.of())))
            .apply(Window.into(FixedWindows.of(Duration.millis(10))))
            .apply(View.asMultimap());

    Coder<WindowedValue<Long>> windowedLongCoder =
        WindowedValue.getFullCoder(VarLongCoder.of(), INTERVAL_WINDOW_CODER);
    IsmRecordCoder<WindowedValue<Long>> recordCoder =
        IsmRecordCoder.of(
            1,
            2,
            ImmutableList.of(
                MetadataKeyCoder.of(ByteArrayCoder.of()),
                INTERVAL_WINDOW_CODER,
                BigEndianLongCoder.of()),
            windowedLongCoder);

    // The keys are byte[]s on purpose: this forces structural equality testing rather than
    // relying on java equality testing.
    final ListMultimap<byte[], WindowedValue<Long>> entriesAt10 =
        ImmutableListMultimap.<byte[], WindowedValue<Long>>builder()
            .put(new byte[] {0x00}, valueInIntervalWindow(12L, 10))
            .put(new byte[] {0x01}, valueInIntervalWindow(22L, 10))
            .put(new byte[] {0x02}, valueInIntervalWindow(32L, 10))
            .build();
    final ListMultimap<byte[], WindowedValue<Long>> entriesAt20 =
        ImmutableListMultimap.<byte[], WindowedValue<Long>>builder()
            .put(new byte[] {0x00}, valueInIntervalWindow(42L, 20))
            .put(new byte[] {0x03}, valueInIntervalWindow(52L, 20))
            .put(new byte[] {0x02}, valueInIntervalWindow(62L, 20))
            .build();
    final ListMultimap<byte[], WindowedValue<Long>> entriesAt30 =
        ImmutableListMultimap.<byte[], WindowedValue<Long>>builder()
            .put(new byte[] {0x02}, valueInIntervalWindow(72L, 30))
            .put(new byte[] {0x04}, valueInIntervalWindow(82L, 30))
            .put(new byte[] {0x05}, valueInIntervalWindow(92L, 30))
            .build();

    // Collect the records of all three windows per shard, then route even-keyed shards to one
    // list and the remaining shards to the other.
    Multimap<Integer, IsmRecord<WindowedValue<Long>>> recordsByShard =
        forMap(recordCoder, entriesAt10);
    recordsByShard.putAll(forMap(recordCoder, entriesAt20));
    recordsByShard.putAll(forMap(recordCoder, entriesAt30));

    List<IsmRecord<WindowedValue<Long>>> evenShardRecords = new ArrayList<>();
    List<IsmRecord<WindowedValue<Long>>> oddShardRecords = new ArrayList<>();
    for (Map.Entry<Integer, Collection<IsmRecord<WindowedValue<Long>>>> shard :
        recordsByShard.asMap().entrySet()) {
      List<IsmRecord<WindowedValue<Long>>> destination =
          (shard.getKey() % 2 == 0) ? evenShardRecords : oddShardRecords;
      destination.addAll(shard.getValue());
    }
    // Both files are expected to receive at least one record.
    checkState(!evenShardRecords.isEmpty());
    checkState(!oddShardRecords.isEmpty());

    Source evenSource = initInputFile(evenShardRecords, recordCoder);
    Source oddSource = initInputFile(oddShardRecords, recordCoder);

    // Metadata for the first window goes into one source, the other two windows share a source.
    List<IsmRecord<WindowedValue<Long>>> metadataAt10 =
        forMapMetadata(ByteArrayCoder.of(), entriesAt10.keySet(), intervalWindow(10));
    List<IsmRecord<WindowedValue<Long>>> metadataAt20 =
        forMapMetadata(ByteArrayCoder.of(), entriesAt20.keySet(), intervalWindow(20));
    List<IsmRecord<WindowedValue<Long>>> metadataAt30 =
        forMapMetadata(ByteArrayCoder.of(), entriesAt30.keySet(), intervalWindow(30));
    Source metadataSourceA = initInputFile(metadataAt10, recordCoder);
    Source metadataSourceB = initInputFile(concat(metadataAt20, metadataAt30), recordCoder);

    final IsmSideInputReader reader =
        sideInputReader(
            view.getTagInternal().getId(),
            evenSource,
            oddSource,
            metadataSourceA,
            metadataSourceB);

    final Callable<Map<BoundedWindow, Map<byte[], Iterable<Long>>>> readTask =
        () -> {
          // Holding on to the results keeps strong references alive, so the logical reference
          // cache is not cleared while the test runs.
          Map<byte[], Iterable<Long>> observedAt10 = reader.get(view, intervalWindow(10));
          Map<byte[], Iterable<Long>> observedAt20 = reader.get(view, intervalWindow(20));
          Map<byte[], Iterable<Long>> observedAt30 = reader.get(view, intervalWindow(30));

          verifyMap(
              Maps.transformValues(entriesAt10.asMap(), new TransformForMultimap<Long>()),
              observedAt10,
              new ComparatorForMultimap<Long>());
          verifyMap(
              Maps.transformValues(entriesAt20.asMap(), new TransformForMultimap<Long>()),
              observedAt20,
              new ComparatorForMultimap<Long>());
          verifyMap(
              Maps.transformValues(entriesAt30.asMap(), new TransformForMultimap<Long>()),
              observedAt30,
              new ComparatorForMultimap<Long>());

          // Repeated lookups must hand back the identical instances, which shows they were
          // cached.
          assertSame(observedAt10, reader.get(view, intervalWindow(10)));
          assertSame(observedAt20, reader.get(view, intervalWindow(20)));
          assertSame(observedAt30, reader.get(view, intervalWindow(30)));

          // A window that is not part of the side input yields an empty map.
          assertEquals(Collections.emptyMap(), reader.get(view, intervalWindow(40)));

          return ImmutableMap.<BoundedWindow, Map<byte[], Iterable<Long>>>builder()
              .put(intervalWindow(10), observedAt10)
              .put(intervalWindow(20), observedAt20)
              .put(intervalWindow(30), observedAt30)
              .build();
        };

    List<Callable<Map<BoundedWindow, Map<byte[], Iterable<Long>>>>> tasks = new ArrayList<>();
    for (int t = 0; t < NUM_THREADS; t++) {
      tasks.add(readTask);
    }

    List<Future<Map<BoundedWindow, Map<byte[], Iterable<Long>>>>> futures =
        pipelineOptions.getExecutorService().invokeAll(tasks);
    Map<BoundedWindow, Map<byte[], Iterable<Long>>> reference = futures.get(0).get();
    // Every task must have received the same instances as the first one.
    for (Future<Map<BoundedWindow, Map<byte[], Iterable<Long>>>> future : futures) {
      Map<BoundedWindow, Map<byte[], Iterable<Long>>> perTask = future.get();
      assertEquals(reference, perTask);
      for (Map.Entry<BoundedWindow, Map<byte[], Iterable<Long>>> windowed : perTask.entrySet()) {
        assertSame(reference.get(windowed.getKey()), windowed.getValue());
      }
    }
  }

  /**
   * Reads a {@code MultimapView} side input holding the records of three interval windows from
   * three concurrent tasks.
   *
   * <p>The records are split over two sources by shard. Each task checks, per window and per key,
   * that the view returns the expected windowed values, and that repeated reads return the same
   * view instance. Finally all tasks are required to have observed the same instances.
   */
  @Test
  public void testMultimapViewInWindow() throws Exception {
    Coder<WindowedValue<Long>> windowedLongCoder =
        WindowedValue.getFullCoder(VarLongCoder.of(), INTERVAL_WINDOW_CODER);

    final PCollectionView<MultimapView<byte[], WindowedValue<Long>>> view =
        DataflowPortabilityPCollectionView.with(
            new TupleTag<>(),
            FullWindowedValueCoder.of(
                KvCoder.of(ByteArrayCoder.of(), windowedLongCoder), INTERVAL_WINDOW_CODER));

    IsmRecordCoder<WindowedValue<Long>> recordCoder =
        IsmRecordCoder.of(
            1,
            0,
            ImmutableList.of(ByteArrayCoder.of(), INTERVAL_WINDOW_CODER, BigEndianLongCoder.of()),
            windowedLongCoder);

    // The keys are byte[]s on purpose: this forces structural equality testing rather than
    // relying on java equality testing. To get a key with more than one value in the multimap,
    // the very same byte[] instance is put twice.
    byte[] repeatedKey = new byte[] {0x01};
    final ListMultimap<byte[], WindowedValue<Long>> entriesAt10 =
        ImmutableListMultimap.<byte[], WindowedValue<Long>>builder()
            .put(new byte[] {0x00}, valueInIntervalWindow(12L, 10))
            .put(repeatedKey, valueInIntervalWindow(22L, 10))
            .put(repeatedKey, valueInIntervalWindow(23L, 10))
            .put(new byte[] {0x02}, valueInIntervalWindow(32L, 10))
            .build();
    final ListMultimap<byte[], WindowedValue<Long>> entriesAt20 =
        ImmutableListMultimap.<byte[], WindowedValue<Long>>builder()
            .put(new byte[] {0x00}, valueInIntervalWindow(42L, 20))
            .put(new byte[] {0x03}, valueInIntervalWindow(52L, 20))
            .put(new byte[] {0x02}, valueInIntervalWindow(62L, 20))
            .build();
    final ListMultimap<byte[], WindowedValue<Long>> entriesAt30 =
        ImmutableListMultimap.<byte[], WindowedValue<Long>>builder()
            .put(new byte[] {0x02}, valueInIntervalWindow(73L, 30))
            .put(new byte[] {0x04}, valueInIntervalWindow(82L, 30))
            .put(new byte[] {0x05}, valueInIntervalWindow(92L, 30))
            .build();

    // Collect the records of all three windows per shard, then route even-keyed shards to one
    // list and the remaining shards to the other.
    Multimap<Integer, IsmRecord<WindowedValue<Long>>> recordsByShard =
        forMap(recordCoder, entriesAt10);
    recordsByShard.putAll(forMap(recordCoder, entriesAt20));
    recordsByShard.putAll(forMap(recordCoder, entriesAt30));

    List<IsmRecord<WindowedValue<Long>>> evenShardRecords = new ArrayList<>();
    List<IsmRecord<WindowedValue<Long>>> oddShardRecords = new ArrayList<>();
    for (Map.Entry<Integer, Collection<IsmRecord<WindowedValue<Long>>>> shard :
        recordsByShard.asMap().entrySet()) {
      List<IsmRecord<WindowedValue<Long>>> destination =
          (shard.getKey() % 2 == 0) ? evenShardRecords : oddShardRecords;
      destination.addAll(shard.getValue());
    }
    // Both files are expected to receive at least one record.
    checkState(!evenShardRecords.isEmpty());
    checkState(!oddShardRecords.isEmpty());

    Source evenSource = initInputFile(evenShardRecords, recordCoder);
    Source oddSource = initInputFile(oddShardRecords, recordCoder);

    final IsmSideInputReader reader =
        sideInputReader(view.getTagInternal().getId(), evenSource, oddSource);

    final Callable<Map<BoundedWindow, MultimapView<byte[], WindowedValue<Long>>>> readTask =
        () -> {
          // Holding on to the results keeps strong references alive, so the logical reference
          // cache is not cleared while the test runs.
          MultimapView<byte[], WindowedValue<Long>> observedAt10 =
              reader.get(view, intervalWindow(10));
          MultimapView<byte[], WindowedValue<Long>> observedAt20 =
              reader.get(view, intervalWindow(20));
          MultimapView<byte[], WindowedValue<Long>> observedAt30 =
              reader.get(view, intervalWindow(30));

          // Look up every expected key in the view of its window and compare the values.
          for (Map.Entry<byte[], Collection<WindowedValue<Long>>> expected :
              entriesAt10.asMap().entrySet()) {
            verifyIterable(expected.getValue(), observedAt10.get(expected.getKey()));
          }
          for (Map.Entry<byte[], Collection<WindowedValue<Long>>> expected :
              entriesAt20.asMap().entrySet()) {
            verifyIterable(expected.getValue(), observedAt20.get(expected.getKey()));
          }
          for (Map.Entry<byte[], Collection<WindowedValue<Long>>> expected :
              entriesAt30.asMap().entrySet()) {
            verifyIterable(expected.getValue(), observedAt30.get(expected.getKey()));
          }

          // Repeated lookups must hand back the identical instances, which shows they were
          // cached.
          assertSame(observedAt10, reader.get(view, intervalWindow(10)));
          assertSame(observedAt20, reader.get(view, intervalWindow(20)));
          assertSame(observedAt30, reader.get(view, intervalWindow(30)));

          return ImmutableMap.<BoundedWindow, MultimapView<byte[], WindowedValue<Long>>>builder()
              .put(intervalWindow(10), observedAt10)
              .put(intervalWindow(20), observedAt20)
              .put(intervalWindow(30), observedAt30)
              .build();
        };

    // This test submits a fixed number of three tasks.
    List<Callable<Map<BoundedWindow, MultimapView<byte[], WindowedValue<Long>>>>> tasks =
        new ArrayList<>();
    for (int t = 0; t < 3; t++) {
      tasks.add(readTask);
    }

    List<Future<Map<BoundedWindow, MultimapView<byte[], WindowedValue<Long>>>>> futures =
        pipelineOptions.getExecutorService().invokeAll(tasks);
    Map<BoundedWindow, MultimapView<byte[], WindowedValue<Long>>> reference =
        futures.get(0).get();
    // Every task must have received the same instances as the first one.
    for (Future<Map<BoundedWindow, MultimapView<byte[], WindowedValue<Long>>>> future : futures) {
      Map<BoundedWindow, MultimapView<byte[], WindowedValue<Long>>> perTask = future.get();
      assertEquals(reference, perTask);
      for (Map.Entry<BoundedWindow, MultimapView<byte[], WindowedValue<Long>>> windowed :
          perTask.entrySet()) {
        assertSame(reference.get(windowed.getKey()), windowed.getValue());
      }
    }
  }

  /**
   * Reads an iterable side input backed by two ISM files while the execution state of a second
   * step ("originalName2") is active, then checks that the side-input read counters exist.
   *
   * <p>Two things are asserted about the counters: the extracted counter updates contain the
   * expected {@code read-sideinput-msecs} update, and the counter factory holds a counter named
   * {@code read-sideinput-byte-count} for the requesting step.
   */
  @Test
  public void testIterableSideInputReadCounter() throws Exception {
    // These are the expected msec and byte counters:
    CounterUpdate expectedSideInputMsecUpdate =
        new CounterUpdate()
            .setStructuredNameAndMetadata(
                new CounterStructuredNameAndMetadata()
                    .setMetadata(new CounterMetadata().setKind("SUM"))
                    .setName(
                        new CounterStructuredName()
                            .setOrigin("SYSTEM")
                            .setName("read-sideinput-msecs")
                            .setOriginalStepName("originalName")
                            .setExecutionStepName("stageName")
                            .setOriginalRequestingStepName("originalName2")
                            .setInputIndex(1)))
            .setCumulative(true)
            .setInteger(new SplitInt64().setHighBits(0).setLowBits(0L));
    CounterName expectedCounterName =
        CounterName.named("read-sideinput-byte-count")
            .withOriginalName(operationContext.nameContext())
            .withOrigin("SYSTEM")
            .withOriginalRequestingStepName("originalName2")
            .withInputIndex(1);

    // Test startup:
    Coder<WindowedValue<Long>> valueCoder =
        WindowedValue.getFullCoder(VarLongCoder.of(), GLOBAL_WINDOW_CODER);
    IsmRecordCoder<WindowedValue<Long>> ismCoder =
        IsmRecordCoder.of(
            1, 0, ImmutableList.of(GLOBAL_WINDOW_CODER, BigEndianLongCoder.of()), valueCoder);

    // Create a new state, which represents a step that receives the side input.
    DataflowExecutionState state2 =
        executionContext
            .getExecutionStateRegistry()
            .getState(
                NameContext.create("stageName", "originalName2", "systemName2", "userName2"),
                "process",
                null,
                NoopProfileScope.NOOP);

    // One record in the first file and one hundred in the second.
    final List<KV<Long, WindowedValue<Long>>> firstElements =
        Arrays.asList(KV.of(0L, valueInGlobalWindow(0L)));
    final List<KV<Long, WindowedValue<Long>>> secondElements = new ArrayList<>();
    for (long i = 0; i < 100; i++) {
      secondElements.add(KV.of(i, valueInGlobalWindow(i * 10)));
    }

    final PCollectionView<Iterable<Long>> view =
        Pipeline.create().apply(Create.empty(VarLongCoder.of())).apply(View.asIterable());

    Source sourceA = initInputFile(fromKvsForList(firstElements), ismCoder);
    Source sourceB = initInputFile(fromKvsForList(secondElements), ismCoder);
    // The read happens while state2 is the active execution state; the state is exited when the
    // try block closes.
    try (Closeable ignored = executionContext.getExecutionStateTracker().enterState(state2)) {
      final IsmSideInputReader reader =
          serialSideInputReader(view.getTagInternal().getId(), sourceA, sourceB);
      // Store a strong reference to the returned value so that the logical reference
      // cache is not cleared for this test.
      Iterable<Long> value = reader.get(view, GlobalWindow.INSTANCE);
      verifyIterable(toValueList(concat(firstElements, secondElements)), value);
      // Assert that the same value reference was returned showing that it was cached.
      // The first read is the expected reference, the second read is the value under test.
      assertSame(value, reader.get(view, GlobalWindow.INSTANCE));

      Iterable<CounterUpdate> counterUpdates =
          executionContext.getExecutionStateRegistry().extractUpdates(true);
      assertThat(counterUpdates, hasItem(expectedSideInputMsecUpdate));
      Counter<?, ?> expectedCounter = counterFactory.getExistingCounter(expectedCounterName);
      assertNotNull(expectedCounter);
    }
  }

  /**
   * Builds a reader over one non-empty and one empty ISM file and checks how the reader tracks
   * them: each of {@code tagToIsmReaderMap} and {@code tagToEmptyIsmReaderMap} must hold exactly
   * one reader for the view's tag, pointing at the non-empty and the empty file respectively.
   */
  @Test
  public void testIsmReaderReferenceCaching() throws Exception {
    Coder<WindowedValue<Long>> valueCoder =
        WindowedValue.getFullCoder(VarLongCoder.of(), GLOBAL_WINDOW_CODER);
    final WindowedValue<Long> element = valueInGlobalWindow(42L);
    final PCollectionView<Long> view =
        Pipeline.create().apply(Create.empty(VarLongCoder.of())).apply(View.asSingleton());

    // Both files use the same record layout, so a single coder serves both.
    final IsmRecordCoder<WindowedValue<Long>> ismCoder =
        IsmRecordCoder.of(1, 0, ImmutableList.<Coder<?>>of(GLOBAL_WINDOW_CODER), valueCoder);
    final Source source = initInputFile(fromValues(Arrays.asList(element)), ismCoder);
    final Source emptySource = initInputFile(fromValues(Arrays.asList()), ismCoder);

    final String tagId = view.getTagInternal().getId();
    final IsmSideInputReader reader = sideInputReader(tagId, source, emptySource);

    // The file with data must be tracked as the single regular reader for the tag.
    assertTrue(reader.tagToIsmReaderMap.containsKey(view.getTagInternal()));
    assertEquals(1, reader.tagToIsmReaderMap.get(view.getTagInternal()).size());
    String sourceFileName = getString(source.getSpec(), WorkerPropertyNames.FILENAME);
    assertEquals(
        FileSystems.matchSingleFileSpec(sourceFileName).resourceId(),
        reader.tagToIsmReaderMap.get(view.getTagInternal()).get(0).getResourceId());

    // The file without data must be tracked as the single empty reader for the tag.
    assertTrue(reader.tagToEmptyIsmReaderMap.containsKey(view.getTagInternal()));
    assertEquals(1, reader.tagToEmptyIsmReaderMap.get(view.getTagInternal()).size());
    String emptyFileName = getString(emptySource.getSpec(), WorkerPropertyNames.FILENAME);
    assertEquals(
        FileSystems.matchSingleFileSpec(emptyFileName).resourceId(),
        reader.tagToEmptyIsmReaderMap.get(view.getTagInternal()).get(0).getResourceId());
  }

  /** Unwraps a collection that must hold exactly one windowed value into that value's payload. */
  private static class TransformForMap<T> implements Function<Collection<WindowedValue<T>>, T> {
    @Override
    public T apply(Collection<WindowedValue<T>> input) {
      // getOnlyElement fails unless the collection contains exactly one element.
      WindowedValue<T> onlyValue = Iterables.getOnlyElement(input);
      return onlyValue.getValue();
    }
  }

  /** Unwraps every windowed value of a collection into its payload, keeping the input order. */
  private static class TransformForMultimap<T>
      implements Function<Collection<WindowedValue<T>>, Iterable<T>> {
    @Override
    public Iterable<T> apply(Collection<WindowedValue<T>> input) {
      return Iterables.transform(input, windowedValue -> windowedValue.getValue());
    }
  }

  /**
   * Comparator used purely as an assertion hook: it fails the test unless both values are equal
   * and otherwise always reports them as equal.
   */
  private static class ComparatorForMap<T> implements Comparator<T> {
    @Override
    public int compare(T o1, T o2) {
      // Any mismatch surfaces as an assertion failure, so reaching the return means "equal".
      assertEquals(o1, o2);
      return 0;
    }
  }

  /**
   * Comparator used purely as an assertion hook: it fails the test unless {@code o1} contains
   * exactly the elements of {@code o2} in any order, and otherwise always returns 0.
   */
  private static class ComparatorForMultimap<T> implements Comparator<Iterable<T>> {
    @Override
    public int compare(Iterable<T> o1, Iterable<T> o2) {
      // Materialize the second iterable so it can be handed to the matcher as an array.
      List<T> wanted = new ArrayList<>();
      for (T element : o2) {
        wanted.add(element);
      }
      @SuppressWarnings({"unchecked", "rawtypes"})
      Matcher<Iterable<T>> anyOrderMatcher = (Matcher) containsInAnyOrder(wanted.toArray());
      assertThat(o1, anyOrderMatcher);
      return 0;
    }
  }

  /**
   * Asserts that {@code actual} yields exactly {@code expectedElements}, in order, on two
   * independent iterations: one that consults {@code hasNext()} before each element and one that
   * only calls {@code next()}. In both cases a further {@code next()} must throw {@link
   * NoSuchElementException}.
   */
  private <T> void verifyIterable(Collection<T> expectedElements, Iterable<T> actual) {
    // First pass: drive the iterator through hasNext().
    Iterator<T> guarded = actual.iterator();
    for (T wanted : expectedElements) {
      assertTrue(guarded.hasNext());
      assertEquals(wanted, guarded.next());
    }
    assertFalse(guarded.hasNext());
    assertNextThrowsNoSuchElement(guarded);

    // Second pass: a fresh iterator consumed with next() only.
    Iterator<T> unguarded = actual.iterator();
    for (T wanted : expectedElements) {
      assertEquals(wanted, unguarded.next());
    }
    assertNextThrowsNoSuchElement(unguarded);
  }

  /** Fails the test unless calling {@code next()} on the iterator throws NoSuchElementException. */
  private static void assertNextThrowsNoSuchElement(Iterator<?> iterator) {
    try {
      iterator.next();
      fail("NoSuchElementException was expected");
    } catch (NoSuchElementException expected) {
      // This is the outcome the caller requires.
    }
  }

  /**
   * Asserts that {@code actual} behaves like {@code expected} in three ways: indexed access in a
   * shuffled order, plain iteration (via {@code verifyIterable}), and {@link ListIterator}
   * traversal started at every index in that same shuffled order.
   *
   * <p>For each list iterator the cursor state is compared initially and after each step of a
   * forward-then-back move (when a next element exists) and a back-then-forward move (when a
   * previous element exists).
   */
  private <T> void verifyList(List<T> expected, List<T> actual) {
    assertEquals(expected.size(), actual.size());

    List<Integer> iterationOrder = new ArrayList<>();
    // Fixed seed so the shuffled visiting order is the same on every run.
    Random random = new Random(1892389023490L);
    for (int i = 0; i < expected.size(); ++i) {
      iterationOrder.add(i);
    }
    Collections.shuffle(iterationOrder, random);

    // Test random iteration order
    for (int index : iterationOrder) {
      assertEquals(expected.get(index), actual.get(index));
    }

    // Test the iterator
    verifyIterable(expected, actual);

    // Test the list iterator
    for (int index : iterationOrder) {
      ListIterator<T> expectedListIterator = expected.listIterator(index);
      ListIterator<T> actualListIterator = actual.listIterator(index);
      assertSameCursorState(expectedListIterator, actualListIterator);

      if (expectedListIterator.hasNext()) {
        // Move to next element and compare
        assertEquals(expectedListIterator.next(), actualListIterator.next());
        assertSameCursorState(expectedListIterator, actualListIterator);

        // Move to previous element and compare
        assertEquals(expectedListIterator.previous(), actualListIterator.previous());
        assertSameCursorState(expectedListIterator, actualListIterator);
      }

      if (expectedListIterator.hasPrevious()) {
        // Move to previous element and compare
        assertEquals(expectedListIterator.previous(), actualListIterator.previous());
        assertSameCursorState(expectedListIterator, actualListIterator);

        // Move to next element and compare
        assertEquals(expectedListIterator.next(), actualListIterator.next());
        assertSameCursorState(expectedListIterator, actualListIterator);
      }
    }
  }

  /** Asserts that both list iterators report the same hasNext/hasPrevious flags and indices. */
  private static void assertSameCursorState(ListIterator<?> expected, ListIterator<?> actual) {
    assertEquals(expected.hasNext(), actual.hasNext());
    assertEquals(expected.hasPrevious(), actual.hasPrevious());
    assertEquals(expected.nextIndex(), actual.nextIndex());
    assertEquals(expected.previousIndex(), actual.previousIndex());
  }

  /**
   * Compares {@code mapView} against {@code expectedMap}: size, value look-ups (through {@code
   * valueComparator}, which must return 0), {@code containsKey}, and the sizes of the key set,
   * entry set and values iterators. Each of those iterators must also throw {@link
   * NoSuchElementException} once exhausted.
   *
   * <p>The expected entries are reshuffled with a fixed-seed {@link Random} before each group of
   * probes so the probing order is reproducible.
   */
  private static <T> void verifyMap(
      Map<byte[], T> expectedMap, Map<byte[], T> mapView, Comparator<T> valueComparator) {
    List<Entry<byte[], T>> expectedEntries = new ArrayList<>(expectedMap.entrySet());

    Random shuffleSource = new Random(1237812387L);

    // Verify the size
    assertEquals(expectedMap.size(), mapView.size());

    // Verify random look ups
    Collections.shuffle(expectedEntries, shuffleSource);
    for (Entry<byte[], T> wanted : expectedEntries) {
      int comparison = valueComparator.compare(wanted.getValue(), mapView.get(wanted.getKey()));
      assertTrue(comparison == 0);
    }

    // Verify random contains
    Collections.shuffle(expectedEntries, shuffleSource);
    for (Entry<byte[], T> wanted : expectedEntries) {
      assertTrue(mapView.containsKey(wanted.getKey()));
    }

    // Exercise random key set contains.
    // NOTE(review): the boolean result is discarded, so this only checks that the call completes.
    Collections.shuffle(expectedEntries, shuffleSource);
    Set<byte[]> keys = mapView.keySet();
    for (Entry<byte[], T> wanted : expectedEntries) {
      keys.contains(wanted.getKey());
    }

    // Verify key set iterator
    assertSizeThenExhausted(expectedEntries.size(), mapView.keySet().iterator());

    // Exercise random entry set contains.
    // NOTE(review): the boolean result is discarded here as well.
    Collections.shuffle(expectedEntries, shuffleSource);
    Set<Map.Entry<byte[], T>> entries = mapView.entrySet();
    for (Entry<byte[], T> wanted : expectedEntries) {
      entries.contains(new SimpleImmutableEntry<>(wanted.getKey(), wanted.getValue()));
    }

    // Verify entry set iterator
    assertSizeThenExhausted(expectedEntries.size(), mapView.entrySet().iterator());

    // Exercise random value collection contains.
    // NOTE(review): the boolean result is discarded here as well.
    Collections.shuffle(expectedEntries, shuffleSource);
    Collection<T> values = mapView.values();
    for (Entry<byte[], T> wanted : expectedEntries) {
      values.contains(wanted.getValue());
    }

    // Verify values iterator
    assertSizeThenExhausted(expectedEntries.size(), mapView.values().iterator());
  }

  /**
   * Consumes the iterator, asserts it produced {@code expectedSize} elements, and then asserts
   * that one more {@code next()} call throws {@link NoSuchElementException}.
   */
  private static void assertSizeThenExhausted(int expectedSize, Iterator<?> iterator) {
    assertEquals(expectedSize, Iterators.size(iterator));
    try {
      iterator.next();
      fail("Expected to have thrown NoSuchElementException");
    } catch (NoSuchElementException expected) {
      // This is the outcome the caller requires.
    }
  }

  /**
   * Wraps {@code value} in a windowed value whose timestamp is {@code startOfWindow} and whose
   * single window is the interval window starting there, with {@code PaneInfo.NO_FIRING}.
   */
  WindowedValue<Long> valueInIntervalWindow(long value, long startOfWindow) {
    Instant timestamp = new Instant(startOfWindow);
    IntervalWindow window = intervalWindow(startOfWindow);
    return WindowedValue.of(value, timestamp, window, PaneInfo.NO_FIRING);
  }

  /** Returns the interval window spanning {@code start} to {@code start + 1}. */
  private static IntervalWindow intervalWindow(long start) {
    Instant windowStart = new Instant(start);
    Instant windowEnd = new Instant(start + 1);
    return new IntervalWindow(windowStart, windowEnd);
  }

  /** Returns the first window reported by the windowed value's window collection. */
  private static BoundedWindow windowOf(WindowedValue<?> windowedValue) {
    Iterator<? extends BoundedWindow> windows = windowedValue.getWindows().iterator();
    return windows.next();
  }

  /**
   * Converts each windowed value into an ISM record whose only key component is the value's
   * window, preserving the input order.
   */
  <T> List<IsmRecord<WindowedValue<T>>> fromValues(Iterable<WindowedValue<T>> elements) {
    List<IsmRecord<WindowedValue<T>>> records = new ArrayList<>();
    elements.forEach(
        windowedValue ->
            records.add(IsmRecord.of(ImmutableList.of(windowOf(windowedValue)), windowedValue)));
    return records;
  }

  /**
   * Converts each key/value pair into an ISM record keyed by (window of the value, key) and
   * carrying the windowed value, preserving the input order.
   */
  <K, V> List<IsmRecord<WindowedValue<V>>> fromKvsForList(
      Iterable<KV<K, WindowedValue<V>>> elements) {
    List<IsmRecord<WindowedValue<V>>> records = new ArrayList<>();
    for (KV<K, WindowedValue<V>> pair : elements) {
      WindowedValue<V> windowedValue = pair.getValue();
      K key = pair.getKey();
      records.add(IsmRecord.of(ImmutableList.of(windowOf(windowedValue), key), windowedValue));
    }
    return records;
  }

  /**
   * Builds the ISM records for a map side input, grouped by the shard id that {@code coder}
   * computes from each record's key components.
   *
   * <p>Every value stored under a key becomes one record keyed by (key, window of the value,
   * zero-based position of the value within that key).
   *
   * <p>Note that if the returned multimap is split across several sources, it must only be split
   * on shard boundaries, because each shard id is expected to appear in only one source.
   *
   * <p>Each windowed value is expected to be within the same window.
   */
  <K, V> Multimap<Integer, IsmRecord<WindowedValue<V>>> forMap(
      IsmRecordCoder<WindowedValue<V>> coder, ListMultimap<K, WindowedValue<V>> elements)
      throws Exception {

    // Shards are ordered naturally; records within a shard are ordered by their encoded key.
    Multimap<Integer, IsmRecord<WindowedValue<V>>> recordsByShard =
        TreeMultimap.create(Ordering.natural(), new IsmReaderTest.IsmRecordKeyComparator<>(coder));

    for (K key : elements.keySet()) {
      List<WindowedValue<V>> valuesForKey = elements.get(key);
      for (int position = 0; position < valuesForKey.size(); position++) {
        WindowedValue<V> value = valuesForKey.get(position);
        IsmRecord<WindowedValue<V>> record =
            IsmRecord.of(ImmutableList.of(key, windowOf(value), (long) position), value);
        recordsByShard.put(coder.hash(record.getKeyComponents()), record);
      }
    }

    return recordsByShard;
  }

  /**
   * Builds the metadata records for a map side input in {@code window}: a record at index 0
   * holding the var-long encoded number of keys, followed by one record per key at indices 1..n
   * holding that key encoded with {@code keyCoder}.
   *
   * <p>Each windowed value is expected to be within the same window.
   */
  <K, V> List<IsmRecord<WindowedValue<V>>> forMapMetadata(
      Coder<K> keyCoder, Collection<K> keys, BoundedWindow window) throws Exception {

    List<IsmRecord<WindowedValue<V>>> metadataRecords = new ArrayList<>();

    // Index 0 carries the size metadata record.
    byte[] encodedKeyCount = CoderUtils.encodeToByteArray(VarLongCoder.of(), (long) keys.size());
    metadataRecords.add(
        IsmRecord.<WindowedValue<V>>meta(
            ImmutableList.of(IsmFormat.getMetadataKey(), window, 0L), encodedKeyCount));

    // Indices 1..n carry the positional entry for each key, in iteration order.
    long position = 0L;
    for (K key : keys) {
      position++;
      byte[] encodedKey = CoderUtils.encodeToByteArray(keyCoder, key);
      metadataRecords.add(
          IsmRecord.<WindowedValue<V>>meta(
              ImmutableList.of(IsmFormat.getMetadataKey(), window, position), encodedKey));
    }
    return metadataRecords;
  }

  /** Extracts the bare payload of every key/windowed-value pair, preserving the input order. */
  <K, V> ArrayList<V> toValueList(Iterable<KV<K, WindowedValue<V>>> windowedValues) {
    ArrayList<V> payloads = new ArrayList<>();
    windowedValues.forEach(pair -> payloads.add(pair.getValue().getValue()));
    return payloads;
  }

  /**
   * Encodes every key component of {@code record} with the matching key component coder of
   * {@code coder}, in component order, into a single {@link RandomAccessData} buffer.
   */
  @SuppressWarnings("unchecked")
  private static RandomAccessData encodeKeyPortion(IsmRecordCoder<?> coder, IsmRecord<?> record)
      throws IOException {
    RandomAccessData encodedKey = new RandomAccessData();
    final int componentCount = coder.getKeyComponentCoders().size();
    for (int component = 0; component < componentCount; ++component) {
      coder
          .getKeyComponentCoder(component)
          .encode(record.getKeyComponent(component), encodedKey.asOutputStream());
    }
    return encodedKey;
  }

  /**
   * Writes the given records to a freshly created file in the test's temporary folder and returns
   * the ISM source describing that file.
   */
  private <K, V> Source initInputFile(
      Iterable<IsmRecord<WindowedValue<V>>> elements, IsmRecordCoder<WindowedValue<V>> coder)
      throws Exception {
    String newFilePath = tmpFolder.newFile().getPath();
    return initInputFile(elements, coder, newFilePath);
  }

  /**
   * Writes the given records to {@code tmpFilePath} through an {@link IsmSink} and returns the ISM
   * source describing that file.
   *
   * <p>Records are bucketed by the shard id that {@code coder} computes from their key
   * components, and within a shard they are written in unsigned lexicographical order of their
   * encoded key. Shards are written in the iteration order of a {@link HashMap}. A record whose
   * encoded key equals an earlier record's key in the same shard replaces it.
   */
  private <K, V> Source initInputFile(
      Iterable<IsmRecord<WindowedValue<V>>> elements,
      IsmRecordCoder<WindowedValue<V>> coder,
      String tmpFilePath)
      throws Exception {
    // Bucket the records per shard, sorted by encoded composite key inside each bucket.
    Map<Integer, SortedMap<RandomAccessData, IsmRecord<WindowedValue<V>>>> recordsByShard =
        new HashMap<>();
    for (IsmRecord<WindowedValue<V>> record : elements) {
      SortedMap<RandomAccessData, IsmRecord<WindowedValue<V>>> shardRecords =
          recordsByShard.computeIfAbsent(
              coder.hash(record.getKeyComponents()),
              unusedShardId ->
                  new TreeMap<RandomAccessData, IsmRecord<WindowedValue<V>>>(
                      RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR));
      shardRecords.put(encodeKeyPortion(coder, record), record);
    }

    IsmSink<WindowedValue<V>> sink =
        new IsmSink<>(
            FileSystems.matchNewResource(tmpFilePath, false), coder, BLOOM_FILTER_SIZE_LIMIT);
    // The writer is closed by try-with-resources before the source is handed back.
    try (SinkWriter<WindowedValue<IsmRecord<WindowedValue<V>>>> writer = sink.writer()) {
      for (SortedMap<RandomAccessData, IsmRecord<WindowedValue<V>>> shardRecords :
          recordsByShard.values()) {
        for (IsmRecord<WindowedValue<V>> record : shardRecords.values()) {
          writer.add(new ValueInEmptyWindows<>(record));
        }
      }
    }
    return newIsmSource(coder, tmpFilePath);
  }

  /**
   * Returns a new {@link Source} of object type "IsmSource" that points at {@code tmpFilePath}
   * and whose codec is the cloud representation of {@code coder} wrapped in a full windowed value
   * coder over the global window coder.
   */
  private <K, V> Source newIsmSource(IsmRecordCoder<WindowedValue<V>> coder, String tmpFilePath) {
    Map<String, Object> spec = new HashMap<>();
    spec.put(PropertyNames.OBJECT_TYPE_NAME, "IsmSource");
    spec.put(WorkerPropertyNames.FILENAME, tmpFilePath);

    Source ismSource = new Source();
    ismSource.setCodec(
        CloudObjects.asCloudObject(
            WindowedValue.getFullCoder(coder, GLOBAL_WINDOW_CODER), /*sdkComponents=*/ null));
    ismSource.setSpec(spec);
    return ismSource;
  }

  /**
   * Builds the {@link SideInputInfo} for {@code tagId} over the given sources. The kind is
   * "singleton" when exactly one source is supplied and "collection" otherwise.
   */
  private SideInputInfo toSideInputInfo(String tagId, Source... sources) {
    String kindName = sources.length == 1 ? "singleton" : "collection";
    Map<String, Object> kind = new HashMap<>();
    kind.put(PropertyNames.OBJECT_TYPE_NAME, kindName);

    SideInputInfo info = new SideInputInfo();
    info.setTag(tagId);
    info.setKind(kind);
    // Copy into an ArrayList so the info does not hold the fixed-size view over the varargs array.
    info.setSources(new ArrayList<>(Arrays.asList(sources)));
    return info;
  }

  /**
   * Creates a reader for the single side input {@code tagId} through {@code
   * IsmSideInputReader.forTest}, supplying a direct executor service.
   */
  private IsmSideInputReader serialSideInputReader(String tagId, Source... sources)
      throws Exception {
    List<SideInputInfo> sideInputInfos = Arrays.asList(toSideInputInfo(tagId, sources));
    return IsmSideInputReader.forTest(
        sideInputInfos,
        pipelineOptions,
        executionContext,
        ReaderRegistry.defaultRegistry(),
        operationContext,
        MoreExecutors.newDirectExecutorService());
  }

  /**
   * Creates a reader for the single side input {@code tagId} through the regular {@code
   * IsmSideInputReader.of} factory.
   */
  private IsmSideInputReader sideInputReader(String tagId, Source... sources) throws Exception {
    List<SideInputInfo> sideInputInfos = Arrays.asList(toSideInputInfo(tagId, sources));
    return IsmSideInputReader.of(
        sideInputInfos,
        pipelineOptions,
        executionContext,
        ReaderRegistry.defaultRegistry(),
        operationContext);
  }
}
