blob: f101d859fdbf92472b6ce4c8283501e1b4c18afb [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.runners.dataflow.util.Structs.addString;
import static org.apache.beam.runners.dataflow.util.Structs.getString;
import static;
import static;
import static;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.AbstractList;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.util.RandomAccessData;
import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment;
import org.apache.beam.runners.dataflow.worker.util.WorkerPropertyNames;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Materializations.MultimapView;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn;
import org.apache.beam.sdk.values.PCollectionViews.ListViewFn;
import org.apache.beam.sdk.values.PCollectionViews.MapViewFn;
import org.apache.beam.sdk.values.PCollectionViews.MultimapViewFn;
import org.apache.beam.sdk.values.PCollectionViews.SingletonViewFn;
import org.apache.beam.sdk.values.TupleTag;
/**
* A side input reader over a set of {@link IsmFormat} files constructed by Dataflow. This reader
* expects a very specific key and metadata structure within the files for each side input type. See
* {@link #getSingletonForWindow} for singleton views, {@link #getListForWindow} for iterable and
* list views, and {@link #getMapForWindow} for map and multimap views.
*/
public class IsmSideInputReader implements SideInputReader {
// Values of the side input "kind" property that createReadersFromSources accepts.
private static final String SINGLETON_KIND = "singleton";
private static final String COLLECTION_KIND = "collection";
// Sentinel returned by the cache loader in get() in place of a null singleton value; get()
// translates it back to null before returning to the caller.
private static final Object NULL_PLACE_HOLDER = new Object();
// View function classes listed as singleton view types.
// NOTE(review): no use site is visible in this listing; presumably this feeds the
// "Expected one of %s" message in get() -- confirm against the full file.
private static final ImmutableList<Class<? extends ViewFn>> KNOWN_SINGLETON_VIEW_TYPES =
ImmutableList.of(SingletonViewFn.class, MapViewFn.class, MultimapViewFn.class);
/**
* Limit the number of concurrent initializations.
* <p>TODO: Move the bounded executor out to an accessible place such as on PipelineOptions.
*/
// Static fixed pool of 64 threads; of() passes it to the constructor.
private static final ExecutorService SHARED_EXECUTOR_SERVICE =
Executors.newFixedThreadPool(64 /* concurrency limit */);
// Executor used to initialize readers in the constructor and to compute per-iterator record
// counts in getListIndexFromReaderIterators.
private final ExecutorService executorService;
// Tags that get() routes to getMapSingletonForViewAndWindow when the view function is not a
// SingletonViewFn.
private final Set<TupleTag<?>> singletonMaterializedTags;
/** A map from tuple tag to non-empty IsmReaders. */
@VisibleForTesting final Map<TupleTag<?>, List<IsmReader<?>>> tagToIsmReaderMap;
/**
* A map from tuple tag to empty IsmReaders. Even though this is unused, we maintain a strong
* reference so that the empty readers are retained in memory and the logical reference cache
* keeps a reference to them; other IsmSideInputReader instances then do not need to
* continuously initialize empty IsmReaders.
*/
final Map<TupleTag<?>, List<IsmReader<?>>> tagToEmptyIsmReaderMap;
// Factory used by createReadersFromSources to build one reader per expanded file.
private final ReaderFactory readerFactory;
// Supplies the logical reference cache that get() uses for materialized views.
private final BatchModeExecutionContext executionContext;
// Passed to the reader factory and to DataflowSideInputReadCounter in createReadersFromSources.
private final DataflowOperationContext operationContext;
// Creates IsmReaders for every side input, initializes the uninitialized ones through the
// executor, and then splits the readers of each tag into empty and non-empty groups.
// NOTE(review): this listing appears to be missing several statements and all closing braces in
// this block (for example the right-hand sides of the final three assignments); verify every
// step against the full file before relying on these comments.
private IsmSideInputReader(
Iterable<? extends SideInputInfo> sideInputInfos,
PipelineOptions options,
BatchModeExecutionContext executionContext,
ReaderFactory readerFactory,
DataflowOperationContext operationContext,
ExecutorService executorService)
throws Exception {
this.readerFactory = readerFactory;
this.executionContext = executionContext;
this.operationContext = operationContext;
this.executorService = executorService;
// Every reader created for a tag, before the empty/non-empty split below.
Map<TupleTag<?>, List<IsmReader<?>>> tagToAllIsmReaders = new HashMap<>();
// Forwarded to createReadersFromSources, which hands it to the side input read counter.
int sideInputIndex = 1;
for (SideInputInfo sideInputInfo : sideInputInfos) {
TupleTag<?> tag = new TupleTag<>(sideInputInfo.getTag());
List<IsmReader<?>> readers =
createReadersFromSources(options, sideInputInfo, executionContext, sideInputIndex);
tagToAllIsmReaders.put(tag, readers);
// Collect all the IsmReaders that need to be initialized
List<Callable<Void>> ismReadersToInitialize = new ArrayList<>();
for (final IsmReader<?> ismReader : Iterables.concat(tagToAllIsmReaders.values())) {
if (!ismReader.isInitialized()) {
() -> {
// Checking whether the file is empty ensures that the footer is loaded.
return null;
// Initialize all the IsmReaders in parallel and wait.
List<Future<Void>> initializations = executorService.invokeAll(ismReadersToInitialize);
for (Future<Void> initialization : initializations) {
ImmutableMap.Builder<TupleTag<?>, List<IsmReader<?>>> tagToIsmReaderMapBuilder =
ImmutableMap.Builder<TupleTag<?>, List<IsmReader<?>>> tagToEmptyIsmReaderMapBuilder =
ImmutableSet.Builder<TupleTag<?>> singletonMaterializationBuilder = ImmutableSet.builder();
// Partition the readers of each tag by whether their file is empty.
for (Map.Entry<TupleTag<?>, List<IsmReader<?>>> entry : tagToAllIsmReaders.entrySet()) {
ImmutableList.Builder<IsmReader<?>> nonEmptyIsmReaders = ImmutableList.builder();
ImmutableList.Builder<IsmReader<?>> emptyIsmReaders = ImmutableList.builder();
for (IsmReader<?> ismReader : entry.getValue()) {
if (ismReader.isEmpty()) {
} else {
if (ismReader.getCoder().getKeyComponentCoders().size() == 1) {
// The coder for a singleton-materialized Side Input will contain only the window in the
// key. Iterable/List materializations have Window + Index, and Multimap has
// Window + Key + Index
this.tagToIsmReaderMap =;
this.tagToEmptyIsmReaderMap =;
this.singletonMaterializedTags =;
// Builds one SideInputTrackingIsmReader per file of every source of the given side input.
// Throws an Exception when the side input kind is neither "singleton" nor "collection".
// NOTE(review): closing braces and the openers of the precondition calls are not visible in this
// listing; the code lines are left exactly as found.
private List<IsmReader<?>> createReadersFromSources(
PipelineOptions options,
SideInputInfo sideInputInfo,
DataflowExecutionContext executionContext,
int sideInputIndex)
throws Exception {
String sideInputKind = getString(sideInputInfo.getKind(), PropertyNames.OBJECT_TYPE_NAME);
if (SINGLETON_KIND.equals(sideInputKind)) {
sideInputInfo.getSources().size() == 1,
"expecting a singleton side input kind to have a single source");
} else if (!COLLECTION_KIND.equals(sideInputKind)) {
throw new Exception("unexpected kind of side input: " + sideInputKind);
// Read metrics are only collected when the SideInputIOMetrics experiment is enabled; otherwise a
// no-op counter is shared by all readers of this side input.
SideInputReadCounter sideInputReadCounter;
ExperimentContext ec = ExperimentContext.parseFrom(options);
if (ec.isEnabled(Experiment.SideInputIOMetrics)) {
sideInputReadCounter =
new DataflowSideInputReadCounter(executionContext, operationContext, sideInputIndex);
} else {
sideInputReadCounter = new NoopSideInputReadCounter();
ImmutableList.Builder<IsmReader<?>> builder = ImmutableList.builder();
for (Source source : sideInputInfo.getSources()) {
// The coder stays null when the source carries no codec.
Coder<?> coder = null;
if (source.getCodec() != null) {
coder = CloudObjects.coderFromCloudObject(CloudObject.fromSpec(source.getCodec()));
CloudObject spec = CloudObject.fromSpec(source.getSpec());
final String filepattern = getString(spec, WorkerPropertyNames.FILENAME);
// Each expanded file gets its own copy of the spec with FILENAME set to that file.
for (String file : Filepatterns.expandAtNFilepattern(filepattern)) {
CloudObject fileSpec = spec.clone(); // Deep clone.
addString(fileSpec, WorkerPropertyNames.FILENAME, file);
NativeReader<?> reader =
readerFactory.create(fileSpec, coder, options, executionContext, operationContext);
reader instanceof IsmReader,
"%s only supports %s as a reader but was %s.",
IsmReader ismReader = (IsmReader) reader;
builder.add(new SideInputTrackingIsmReader<>(ismReader, sideInputReadCounter));
/**
* Creates a new {@link SideInputReader} that will provide side inputs according to the provided
* {@link SideInputInfo} descriptors.
*/
// Factory that constructs the reader with SHARED_EXECUTOR_SERVICE as its executor.
static IsmSideInputReader of(
Iterable<? extends SideInputInfo> sideInputInfos,
PipelineOptions options,
BatchModeExecutionContext context,
ReaderFactory readerFactory,
DataflowOperationContext operationContext)
throws Exception {
return new IsmSideInputReader(
sideInputInfos, options, context, readerFactory, operationContext, SHARED_EXECUTOR_SERVICE);
// Same as of(), except that the caller supplies the executor instead of the shared pool.
static IsmSideInputReader forTest(
Iterable<? extends SideInputInfo> sideInputInfos,
PipelineOptions options,
BatchModeExecutionContext context,
ReaderFactory readerFactory,
DataflowOperationContext operationContext,
ExecutorService testExecutorService)
throws Exception {
return new IsmSideInputReader(
sideInputInfos, options, context, readerFactory, operationContext, testExecutorService);
// Returns true when the view's internal tag is a key of tagToIsmReaderMap.
public <T> boolean contains(PCollectionView<T> view) {
return tagToIsmReaderMap.containsKey(view.getTagInternal());
// Returns true when tagToIsmReaderMap has no entries, i.e. no tag is registered at all.
public boolean isEmpty() {
return tagToIsmReaderMap.isEmpty();
// Returns the materialized value of the view for the given window, going through the execution
// context's logical reference cache keyed by (view, window).
// Rejects tags that are not in tagToIsmReaderMap with the checkArgument below, throws
// IllegalArgumentException for an unrecognized view function, and wraps an ExecutionException in
// an IllegalStateException.
// NOTE(review): the cache lookup call openers and closing braces are not visible in this listing.
public <ViewT> ViewT get(final PCollectionView<ViewT> view, final BoundedWindow window) {
final TupleTag tag = view.getTagInternal();
checkArgument(tagToIsmReaderMap.containsKey(tag), "calling getSideInput() with unknown view");
// We specialize each individual view with a specific data structure tailored
// for its use.
try {
ViewFn<?, ?> viewFn = view.getViewFn();
// We handle the singleton case separately since a null value may be returned.
// We use a null place holder to represent this, and when we detect it, we translate
// back to null for the user.
if (viewFn instanceof SingletonViewFn) {
ViewT rval =
.<PCollectionViewWindow<ViewT>, ViewT>getLogicalReferenceCache()
PCollectionViewWindow.of(view, window),
() -> {
ViewT viewT =
getSingletonForWindow(tag, (SingletonViewFn<ViewT>) viewFn, window);
ViewT nullPlaceHolder = (ViewT) NULL_PLACE_HOLDER;
return viewT == null ? nullPlaceHolder : viewT;
return rval == NULL_PLACE_HOLDER ? null : rval;
} else if (singletonMaterializedTags.contains(tag)) {
// Only map and multimap view functions are expected for a singleton-materialized tag.
viewFn instanceof MapViewFn || viewFn instanceof MultimapViewFn,
"Unknown view type stored as singleton. Expected one of %s, got %s",
return executionContext
.<PCollectionViewWindow<ViewT>, ViewT>getLogicalReferenceCache()
PCollectionViewWindow.of(view, window),
() -> {
return getMapSingletonForViewAndWindow(tag, window);
} else {
// Remaining view functions are dispatched by type to the matching lazy view.
return executionContext
.<PCollectionViewWindow<ViewT>, ViewT>getLogicalReferenceCache()
PCollectionViewWindow.of(view, window),
() -> {
if (viewFn instanceof IterableViewFn || viewFn instanceof ListViewFn) {
ViewT viewT = (ViewT) getListForWindow(tag, window);
return viewT;
} else if (viewFn instanceof MapViewFn) {
ViewT viewT = (ViewT) getMapForWindow(tag, window);
return viewT;
} else if (viewFn instanceof MultimapViewFn) {
ViewT viewT = (ViewT) getMultimapForWindow(tag, window);
return viewT;
} else if (viewFn
instanceof DataflowPortabilityPCollectionView.PortabilityViewFn) {
ViewT viewT = (ViewT) getPortabilityMultimapForWindow(tag, window);
return viewT;
throw new IllegalArgumentException("Unknown type of view requested: " + view);
} catch (ExecutionException e) {
throw new IllegalStateException(
String.format("Failed to materialize view %s for window %s.", view, window),
/**
* Returns the singleton for the provided window if available. Otherwise returns the default
* specified within the view. This function expects at most one Ism file to contain a given window
* and expects the Ism records to have been written out as:
* <ul>
* <li>Key 1: Window
* <li>Value: WindowedValue
* </ul>
*/
private <T, W extends BoundedWindow> T getSingletonForWindow(
TupleTag<?> viewTag, SingletonViewFn<T> viewFn, W window) throws IOException {
@SuppressWarnings({"rawtypes", "unchecked"})
List<IsmReader<WindowedValue<T>>> readers = (List) tagToIsmReaderMap.get(viewTag);
// The window is the only key component used for the lookup.
List<IsmReader<WindowedValue<T>>.IsmPrefixReaderIterator> readerIterators =
findAndStartReaders(readers, ImmutableList.of(window));
// If no reader iterator contains the value, return the default value stored
// within the view.
if (readerIterators.isEmpty()) {
return viewFn.getDefaultValue();
// NOTE(review): the opener of this precondition call is not visible in this listing.
readerIterators.size() == 1,
"Expected only one Ism file to contain the singleton record for window %s",
// Unwraps the current element: WindowedValue -> IsmRecord -> WindowedValue<T> -> T.
return readerIterators.get(0).getCurrent().getValue().getValue().getValue();
/** Get a map written as a singleton view due to a nondeterministic key {@link Coder}. */
private <T, W extends BoundedWindow> T getMapSingletonForViewAndWindow(
TupleTag<?> viewTag, W window) throws IOException {
@SuppressWarnings({"rawtypes", "unchecked"})
List<IsmReader<WindowedValue<T>>> readers = (List) tagToIsmReaderMap.get(viewTag);
// The window is the only key component used for the lookup.
List<IsmReader<WindowedValue<T>>.IsmPrefixReaderIterator> readerIterators =
findAndStartReaders(readers, ImmutableList.of(window));
// If no reader iterator contains the value, return the default value stored
// within the view.
// (Here the fallback actually returned is an empty immutable map.)
if (readerIterators.isEmpty()) {
return (T) ImmutableMap.of();
// NOTE(review): the opener of this precondition call is not visible in this listing.
readerIterators.size() == 1,
"Expected only one Ism file to contain the singleton record for window %s",
// Unwraps the current element: WindowedValue -> IsmRecord -> WindowedValue<T> -> T.
return readerIterators.get(0).getCurrent().getValue().getValue().getValue();
/**
* Returns the list for the provided window if available. Otherwise returns an empty list. For the
* non-global window, this function expects at most one Ism file to contain a given window and
* expects the Ism records to have been written out as:
* <ul>
* <li>Key 1: Window
* <li>Key 2: Index offset within window
* <li>Value: WindowedValue
* </ul>
* <p>For the global window, this expects that each file contains a segment of the list and that
* the Ism records have been written out as:
* <ul>
* <li>Key 1: Window
* <li>Key 2: Index offset within file
* <li>Value: WindowedValue
* </ul>
* The global offset is computed by summing the number of elements in each file.
*/
private <T, W extends BoundedWindow> List<T> getListForWindow(TupleTag<?> tag, W window)
throws IOException {
@SuppressWarnings({"unchecked", "rawtypes"})
List<IsmReader<WindowedValue<T>>> readers = (List) tagToIsmReaderMap.get(tag);
List<IsmReader<WindowedValue<T>>.IsmPrefixReaderIterator> readerIterators =
findAndStartReaders(readers, ImmutableList.of(window));
if (readerIterators.isEmpty()) {
return Collections.emptyList();
// Only the global window may be spread over several files.
// NOTE(review): the opener of this precondition call is not visible in this listing.
GlobalWindow.INSTANCE.equals(window) || readerIterators.size() == 1,
"Expected to have had PCollectionView %s for window %s contained within one file.",
// The returned list is a read-only view that fetches elements from the reader iterators and
// strips the WindowedValue wrapper from each one.
return Collections.unmodifiableList(
new ListOverReaderIterators<>(
readerIterators, (WindowedValue<T> value) -> value.getValue()));
/**
* Returns a map over a set of readers or an empty map if the window is not within the side input.
* The {@code Ism} files are expected to be sharded by the hash of the key's byte representation.
* Each record is required to be structured as follows:
* <ul>
* <li>Key 1: User key K
* <li>Key 2: Window
* <li>Key 3: 0L (constant)
* <li>Value: Windowed value
* </ul>
* <p>Alongside the data records, there are the following metadata records:
* <ul>
* <li>Key 1: Metadata Key
* <li>Key 2: Window
* <li>Key 3: Index [0, size of map]
* <li>Value: variable length long byte representation of size of map if index is 0, otherwise
* the byte representation of a key
* </ul>
* The {@code [META, Window, 0]} record stores the number of unique keys per window, while {@code
* [META, Window, i]} for {@code i} in {@code [1, size of map]} stores a user's key. This
* allows one to access the size of the map by looking at {@code [META, Window, 0]} and
* iterate over all the keys by accessing {@code [META, Window, i]} for {@code i} in {@code [1,
* size of map]}.
*/
private <K, V, W extends BoundedWindow> Map<K, V> getMapForWindow(TupleTag<?> tag, W window)
throws IOException {
@SuppressWarnings({"unchecked", "rawtypes"})
List<IsmReader<WindowedValue<V>>> readers = (List) tagToIsmReaderMap.get(tag);
if (readers.isEmpty()) {
return Collections.emptyMap();
// The user key coder is taken from the first reader: its first key component coder is cast to
// a MetadataKeyCoder, which wraps the user key coder.
Coder<K> keyCoder =
((MetadataKeyCoder<K>) readers.get(0).getCoder().getKeyComponentCoder(0)).getKeyCoder();
// Size of the map, keyed by [metadata key, window, 0L] and decoded as a var-long.
// NOTE(review): the call these arguments belong to is not visible in this listing; presumably
// findMetadata -- confirm.
Long size =
readers, ImmutableList.of(IsmFormat.getMetadataKey(), window, 0L), VarLongCoder.of());
// If there is no metadata record for the window, we can return an empty map.
if (size == null) {
return Collections.emptyMap();
// MapToValue makes each map value the unwrapped value of the single record found for the key.
return Collections.unmodifiableMap(
new MapOverReaders<K, V, V, W>(window, new MapToValue<K, V>(), readers, keyCoder, size));
/**
* Returns a map over a set of readers or an empty map if the window is not within the side input.
* The {@code Ism} files are expected to be sharded by the hash of the key's byte representation.
* Each record is required to be structured as follows:
* <ul>
* <li>Key 1: User key K
* <li>Key 2: Window
* <li>Key 3: Index offset for a given key and window.
* <li>Value: Windowed value
* </ul>
* <p>Alongside the data records, there are the following metadata records:
* <ul>
* <li>Key 1: Metadata Key
* <li>Key 2: Window
* <li>Key 3: Index [0, size of map]
* <li>Value: variable length long byte representation of size of map if index is 0, otherwise
* the byte representation of a key
* </ul>
* The {@code [META, Window, 0]} record stores the number of unique keys per window, while {@code
* [META, Window, i]} for {@code i} in {@code [1, size of map]} stores a user's key. This
* allows one to access the size of the map by looking at {@code [META, Window, 0]} and
* iterate over all the keys by accessing {@code [META, Window, i]} for {@code i} in {@code [1,
* size of map]}.
*/
private <K, V, W extends BoundedWindow> Map<K, Iterable<V>> getMultimapForWindow(
TupleTag<?> tag, W window) throws IOException {
@SuppressWarnings({"unchecked", "rawtypes"})
List<IsmReader<WindowedValue<V>>> readers = (List) tagToIsmReaderMap.get(tag);
if (readers.isEmpty()) {
return Collections.emptyMap();
// The user key coder is taken from the first reader: its first key component coder is cast to
// a MetadataKeyCoder, which wraps the user key coder.
Coder<K> keyCoder =
((MetadataKeyCoder<K>) readers.get(0).getCoder().getKeyComponentCoder(0)).getKeyCoder();
// Number of keys, keyed by [metadata key, window, 0L] and decoded as a var-long.
// NOTE(review): the call these arguments belong to is not visible in this listing; presumably
// findMetadata -- confirm.
Long size =
readers, ImmutableList.of(IsmFormat.getMetadataKey(), window, 0L), VarLongCoder.of());
// If there is no metadata record for the window, we can return an empty map.
if (size == null) {
return Collections.emptyMap();
// MapToIterable makes each map value a list view over all records found for the key.
return Collections.unmodifiableMap(
new MapOverReaders<K, V, Iterable<V>, W>(
window, new MapToIterable<K, V>(), readers, keyCoder, size));
/**
* Returns a {@link MultimapView} over a set of readers. The {@code Ism} files are expected to be
* sharded by the hash of the key's byte representation. Each record is required to be structured
* as follows:
* <ul>
* <li>Key 1: User key K
* <li>Key 2: Window
* <li>Key 3: Index offset for a given key and window.
* <li>Value: Windowed value
* </ul>
* <p>There are no metadata records associated with any records.
*/
private <K, V, W extends BoundedWindow> MultimapView<K, V> getPortabilityMultimapForWindow(
TupleTag<?> tag, W window) throws IOException {
@SuppressWarnings({"unchecked", "rawtypes"})
List<IsmReader<V>> readers = (List) tagToIsmReaderMap.get(tag);
// With no readers for the tag, an empty in-memory view is returned instead of an Ism-backed one.
if (readers.isEmpty()) {
return InMemoryMultimapSideInputView.empty();
return new IsmMultimapView<>(window, readers);
/**
* A {@link MultimapView} that fronts a set of readers. This view assumes that the readers have
* records using the user key followed by window.
*/
private class IsmMultimapView<K, V> implements MultimapView<K, V> {
// Window used as the second key component of every lookup.
private final BoundedWindow window;
// Readers searched on each get(K) call.
private final List<IsmReader<V>> readers;
private IsmMultimapView(BoundedWindow window, List<IsmReader<V>> readers) {
this.window = window;
this.readers = readers;
// Enumerating the keys is not supported; this always throws UnsupportedOperationException.
public Iterable<K> get() {
throw new UnsupportedOperationException("TODO: Support enumerating the keys.");
// Returns a list view over all records stored under (k, window); values are returned as-is.
// An IOException raised while locating the readers is rethrown as a RuntimeException.
public Iterable<V> get(K k) {
try {
return new ListOverReaderIterators<>(
findAndStartReaders(readers, ImmutableList.of(k, window)), (V value) -> value);
} catch (IOException e) {
throw new RuntimeException(
String.format("Unable to create view for window %s and key %s.", window, k), e);
/**
* Returns the last key from each reader iterator. This function assumes that the last key
* component is a long.
*/
// NOTE(review): several lines of this method (the statements that add each callable and collect
// each future's result, the body of the instanceof branch, and closing braces) are not visible
// in this listing.
private <V> Collection<Long> getListIndexFromReaderIterators(
List<IsmReader<V>.IsmPrefixReaderIterator> readerIterators) throws IOException {
List<Callable<Long>> callables = new ArrayList<>();
// Build a list of callables that will return the last key component assuming it's a long
for (final IsmReader<V>.IsmPrefixReaderIterator readerIterator : readerIterators) {
() -> {
WindowedValue<IsmRecord<V>> last = readerIterator.getLast();
// No last record yields 0; otherwise the result is the last key component plus one.
if (last == null) {
return 0L;
return ((long)
last.getValue().getKeyComponent(last.getValue().getKeyComponents().size() - 1))
+ 1L;
try {
// Runs every callable on the executor and waits for all of them to complete.
List<Future<Long>> results = executorService.invokeAll(callables);
List<Long> lastKeyComponents = new ArrayList<>(results.size());
for (Future<Long> result : results) {
return lastKeyComponents;
} catch (InterruptedException | ExecutionException e) {
if (e instanceof InterruptedException) {
// Attempt to propagate the cause if possible.
Throwables.propagateIfPossible(e.getCause(), IOException.class);
// Anything that could not be propagated directly is wrapped in an IOException.
throw new IOException(e);
/**
* Returns a list of reader iterators over the provided key components. Each reader iterator
* within the returned list is guaranteed to have at least one element and will be in a state
* where {@link NativeReader.NativeReaderIterator#start} has already been called.
*/
private <V> List<IsmReader<V>.IsmPrefixReaderIterator> findAndStartReaders(
List<IsmReader<V>> readers, final List<?> keyComponents) throws IOException {
if (readers.isEmpty()) {
return Collections.emptyList();
// The key is encoded and hashed once with the first reader's coder; the resulting shard id and
// key bytes are then reused for every reader.
RandomAccessData keyBytes = new RandomAccessData();
int shardId = readers.get(0).getCoder().encodeAndHash(keyComponents, keyBytes);
List<IsmReader<V>.IsmPrefixReaderIterator> readerIterators = new ArrayList<>();
for (final IsmReader<V> reader : readers) {
IsmReader<V>.IsmPrefixReaderIterator readerIterator =
reader.overKeyComponents(keyComponents, shardId, keyBytes);
// NOTE(review): the body of this branch is not visible in this listing; presumably an iterator
// that starts successfully is added to readerIterators -- confirm.
if (readerIterator.start()) {
return readerIterators;
/**
* Finds the metadata associated with the specific key components. Returns null if the metadata
* does not exist.
*/
private <V, T> T findMetadata(
List<IsmReader<WindowedValue<V>>> readers, List<?> keyComponents, Coder<T> metadataCoder)
throws IOException {
// Find a set of reader iterators that have the requested key components.
List<IsmReader<WindowedValue<V>>.IsmPrefixReaderIterator> readerIterators =
findAndStartReaders(readers, keyComponents);
if (readerIterators.isEmpty()) {
return null;
// We expect at most one such reader iterator to have been returned.
// NOTE(review): the initializer of the declaration below is not visible in this listing.
IsmReader<WindowedValue<V>>.IsmPrefixReaderIterator readerIterator =
// Decode the metadata
// The metadata bytes of the current record are decoded with the supplied coder.
return CoderUtils.decodeFromByteArray(
metadataCoder, readerIterator.getCurrent().getValue().getMetadata());
/**
* A list that fronts a set of reader iterators. This list assumes that the reader iterators are
* already configured for the proper prefix. For an iterable/list view, this would be the window.
* For a multimap, this would be the user key and window.
*/
// NOTE(review): annotations, closing braces and a few statements of this class are not visible in
// this listing; the code lines are left exactly as found.
private class ListOverReaderIterators<T, V> extends AbstractList<V> {
private final List<IsmReader<T>.IsmPrefixReaderIterator> readerIterators;
// Record count of each reader iterator, in the same order as readerIterators.
private final ArrayList<Long> numberOfRecordsPerReaderIterator;
// Converts a stored record value of type T into the element type V exposed by the list.
private final Function<T, V> valueTypeToOutputType;
// The record counts are computed once here, by getListIndexFromReaderIterators.
private ListOverReaderIterators(
List<IsmReader<T>.IsmPrefixReaderIterator> readerIterators,
Function<T, V> valueTypeToOutputType)
throws IOException {
this.readerIterators = readerIterators;
this.numberOfRecordsPerReaderIterator =
new ArrayList<>(getListIndexFromReaderIterators(readerIterators));
this.valueTypeToOutputType = valueTypeToOutputType;
// TODO: Support greater than Integer.MAX_VALUE values for iteration/lookup and size.
public V get(int index) {
return getUsingLong(index);
// Narrows the long size with Ints.checkedCast.
public int size() {
return Ints.checkedCast(longSize());
public Iterator<V> iterator() {
return listIterator();
public ListIterator<V> listIterator() {
return new ListIteratorOverReaderIterators();
/** Returns the value at the specified position. */
private V getUsingLong(long index) {
try {
if (index < 0) {
throw new IndexOutOfBoundsException("Index out of range: " + index);
// We locate which reader iterator contains the requested index
// by using the number of records contained within each reader iterator
// as a way to compute the local offset. Once we find a local offset
// which is within the bounds of the reader iterator, we use that local
// offset to find the requested record.
long localOffset = index;
int readerIteratorIndex;
for (readerIteratorIndex = 0;
readerIteratorIndex < numberOfRecordsPerReaderIterator.size();
++readerIteratorIndex) {
if (localOffset < numberOfRecordsPerReaderIterator.get(readerIteratorIndex)) {
// NOTE(review): the lookup expression assigned to rval is not visible in this listing.
WindowedValue<IsmRecord<T>> rval =
rval != null,
"Expected to have found index %s, local offset %s within file.",
return valueTypeToOutputType.apply(rval.getValue().getValue());
// Not in this reader iterator: skip past its records and try the next one.
localOffset -= numberOfRecordsPerReaderIterator.get(readerIteratorIndex);
// If we went past the last file then we seeked past the end and are out of bounds.
throw new IndexOutOfBoundsException("Index out of range: " + index);
} catch (IOException e) {
throw new IllegalStateException(e);
// Total number of records: the sum of the per-iterator counts.
private long longSize() {
long total = 0;
for (Long numRecords : numberOfRecordsPerReaderIterator) {
total += numRecords;
return total;
/** An immutable list iterator that uses a long as its position. */
private class ListIteratorOverReaderIterators implements ListIterator<V> {
// Index of the element that next() would return.
private long position;
public boolean hasNext() {
return position < longSize();
public V next() {
if (!hasNext()) {
throw new NoSuchElementException();
// The position only advances after the fetch has succeeded.
V rval = getUsingLong(position);
position += 1;
return rval;
public boolean hasPrevious() {
return position > 0;
public V previous() {
if (!hasPrevious()) {
throw new NoSuchElementException();
position -= 1;
return getUsingLong(position);
public int nextIndex() {
return Ints.checkedCast(position);
public int previousIndex() {
return Ints.checkedCast(position - 1);
// All mutating operations are unsupported.
public void remove() {
throw new UnsupportedOperationException();
public void set(V e) {
throw new UnsupportedOperationException();
public void add(V e) {
throw new UnsupportedOperationException();
/**
* A map that fronts a set of readers. This map assumes that the readers have records using the
* user key followed by window.
*/
// NOTE(review): annotations, closing braces and a few statements of this class (including the
// name of the transform field declaration) are not visible in this listing; the code lines are
// left exactly as found.
private class MapOverReaders<K, V1, V2, W extends BoundedWindow> extends AbstractMap<K, V2> {
// Window used as the second key component of every lookup.
private final W window;
private final Function<KV<K, IsmReader<WindowedValue<V1>>.IsmPrefixReaderIterator>, V2>
private final List<IsmReader<WindowedValue<V1>>> readers;
// Used by the entry set iterator when it builds StructuralMapEntry instances.
private final Coder<K> keyCoder;
// Number of keys in the map, as supplied by the caller.
private final long size;
private MapOverReaders(
W window,
Function<KV<K, IsmReader<WindowedValue<V1>>.IsmPrefixReaderIterator>, V2> transform,
List<IsmReader<WindowedValue<V1>>> readers,
Coder<K> keyCoder,
long size) {
this.window = window;
this.transform = transform;
this.readers = readers;
this.keyCoder = keyCoder;
this.size = size;
// A key is present when at least one reader has a record under the (key, window) prefix.
public boolean containsKey(Object key) {
try {
return !findAndStartReaders(readers, ImmutableList.of(key, window)).isEmpty();
} catch (IOException e) {
throw new IllegalStateException(e);
// Returns null when no reader has a record for (key, window); otherwise the transform is applied
// to the key and the started reader iterator.
public V2 get(Object objectKey) {
K key = (K) objectKey;
try {
// We find the reader iterator which contains the key/window prefix.
// For maps, this yields only one record. For multimaps, this is a valid
// prefix reader iterator.
List<IsmReader<WindowedValue<V1>>.IsmPrefixReaderIterator> readerIterators =
findAndStartReaders(readers, ImmutableList.of(key, window));
if (readerIterators.isEmpty()) {
return null;
// Only one such reader iterator is expected.
IsmReader<WindowedValue<V1>>.IsmPrefixReaderIterator readerIterator =
return transform.apply(KV.of(key, readerIterator));
} catch (IOException e) {
throw new IllegalStateException(e);
// Narrows the long size with Ints.checkedCast.
public int size() {
return Ints.checkedCast(size);
public Set<Entry<K, V2>> entrySet() {
return new EntrySet();
/** An entry set that is fronted by this map. */
private class EntrySet extends AbstractSet<Entry<K, V2>> {
// True when o is an Entry whose key is found for this window and whose value equals the
// transformed value read for that key.
public boolean contains(Object o) {
if (!(o instanceof Entry)) {
return false;
Entry<K, ?> entry = (Entry<K, ?>) o;
try {
// We find the reader iterator which contains the key/window prefix.
// For maps, this yields only one record. For multimaps, this is a valid
// prefix reader iterator.
List<IsmReader<WindowedValue<V1>>.IsmPrefixReaderIterator> readerIterators =
findAndStartReaders(readers, ImmutableList.of(entry.getKey(), window));
if (readerIterators.isEmpty()) {
return false;
// Only one such reader iterator is expected.
IsmReader<WindowedValue<V1>>.IsmPrefixReaderIterator readerIterator =
return Objects.equal(
entry.getValue(), transform.apply(KV.of(entry.getKey(), readerIterator)));
} catch (IOException e) {
throw new IllegalStateException(e);
public Iterator<Entry<K, V2>> iterator() {
return new EntrySetIterator();
public int size() {
return Ints.checkedCast(size);
* An entry set iterator that backs this map which utilizes the [META, Window, Index] records to
* locate subsequent keys.
private class EntrySetIterator implements Iterator<Entry<K, V2>> {
// Number of entries already returned.
long position = 0;
public boolean hasNext() {
return position < size;
public Entry<K, V2> next() {
if (!hasNext()) {
throw new NoSuchElementException();
final K key;
final V2 value;
try {
// The key is looked up under [metadata key, window, position + 1].
// NOTE(review): the call these arguments belong to is not visible in this listing.
key =
ImmutableList.of(IsmFormat.getMetadataKey(), window, position + 1),
value = get(key);
} catch (IOException e) {
throw new IllegalStateException(e);
// Once we have fetched the key and value we can increment the position knowing that
// an exception won't be thrown, thus allowing retries.
position += 1;
return new StructuralMapEntry<>(keyCoder, key, value);
public void remove() {
throw new UnsupportedOperationException();
/**
* A function which is able to unwrap windowed values to just their values using a reader iterator
* as input.
*/
// NOTE(review): the annotation that the "justification" fragment below belongs to is not fully
// visible in this listing.
private static class MapToValue<K, V>
implements Function<KV<K, IsmReader<WindowedValue<V>>.IsmPrefixReaderIterator>, V> {
justification = "")
public V apply(@Nonnull KV<K, IsmReader<WindowedValue<V>>.IsmPrefixReaderIterator> input) {
// Only the reader iterator half of the KV is used; the key is ignored.
IsmReader<WindowedValue<V>>.IsmPrefixReaderIterator startedReader = input.getValue();
WindowedValue<IsmRecord<WindowedValue<V>>> value = startedReader.getCurrent();
// Unwraps WindowedValue -> IsmRecord -> WindowedValue<V> -> V.
return value.getValue().getValue().getValue();
/**
* A function which is able to create a list over reader iterators using the reader iterator to
* back it.
*/
// NOTE(review): the annotation that the "justification" fragment below belongs to is not fully
// visible in this listing.
private class MapToIterable<K, V>
implements Function<KV<K, IsmReader<WindowedValue<V>>.IsmPrefixReaderIterator>, Iterable<V>> {
justification = "")
public Iterable<V> apply(
@Nonnull KV<K, IsmReader<WindowedValue<V>>.IsmPrefixReaderIterator> input) {
try {
// Wraps the single reader iterator in a list view that strips the WindowedValue wrapper from
// each element, and exposes it as an unmodifiable iterable. The key half of the KV is ignored.
return Iterables.unmodifiableIterable(
new ListOverReaderIterators<>(
ImmutableList.of(input.getValue()), (WindowedValue<V> value) -> value.getValue()));
} catch (IOException e) {
throw new IllegalStateException(e);
/** A map entry which utilizes the structural value of the key for comparison. */
private static class StructuralMapEntry<K, V> extends AbstractMap.SimpleImmutableEntry<K, V> {
private final Coder<K> keyCoder;
public StructuralMapEntry(Coder<K> keyCoder, K key, V value) {
super(key, value);
this.keyCoder = keyCoder;
public boolean equals(Object o) {
if (!(o instanceof Map.Entry)) {
return false;
try {
Map.Entry<K, V> other = (Map.Entry<K, V>) o;
return Objects.equal(
keyCoder.structuralValue(getKey()), keyCoder.structuralValue(other.getKey()))
&& Objects.equal(getValue(), other.getValue());
} catch (Exception e) {
throw new IllegalArgumentException(e);
public int hashCode() {
try {
return Objects.hashCode(keyCoder.structuralValue(getKey()), getValue());
} catch (Exception e) {
throw new IllegalStateException(e);
public String toString() {
return MoreObjects.toStringHelper(StructuralMapEntry.class)
.add("key", getKey())
.add("value", getValue())
.add("keyCoder", keyCoder)