| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.dataflow.worker; |
| |
| import static org.apache.beam.runners.dataflow.util.Structs.addString; |
| import static org.apache.beam.runners.dataflow.util.Structs.getString; |
| import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; |
| |
| import com.google.api.services.dataflow.model.SideInputInfo; |
| import com.google.api.services.dataflow.model.Source; |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| import java.io.IOException; |
| import java.util.AbstractList; |
| import java.util.AbstractMap; |
| import java.util.AbstractSet; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.ListIterator; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import javax.annotation.Nonnull; |
| import org.apache.beam.runners.core.InMemoryMultimapSideInputView; |
| import org.apache.beam.runners.core.SideInputReader; |
| import org.apache.beam.runners.dataflow.internal.IsmFormat; |
| import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord; |
| import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder; |
| import org.apache.beam.runners.dataflow.util.CloudObject; |
| import org.apache.beam.runners.dataflow.util.CloudObjects; |
| import org.apache.beam.runners.dataflow.util.PropertyNames; |
| import org.apache.beam.runners.dataflow.util.RandomAccessData; |
| import org.apache.beam.runners.dataflow.worker.util.WorkerPropertyNames; |
| import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.VarLongCoder; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.transforms.Materializations.MultimapView; |
| import org.apache.beam.sdk.transforms.ViewFn; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.transforms.windowing.GlobalWindow; |
| import org.apache.beam.sdk.util.CoderUtils; |
| import org.apache.beam.sdk.util.WindowedValue; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollectionView; |
| import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn; |
| import org.apache.beam.sdk.values.PCollectionViews.ListViewFn; |
| import org.apache.beam.sdk.values.PCollectionViews.MapViewFn; |
| import org.apache.beam.sdk.values.PCollectionViews.MultimapViewFn; |
| import org.apache.beam.sdk.values.PCollectionViews.SingletonViewFn; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints; |
| import org.checkerframework.checker.nullness.qual.Nullable; |
| |
| /** |
| * A side input reader over a set of {@link IsmFormat} files constructed by Dataflow. This reader |
| * expects a very specific key and metadata structure within the files for each side input type. See |
| * {@link #getSingletonForWindow} for singleton views, {@link #getListForWindow} for iterable and |
| * list views, and {@link #getMapForWindow} for map and multimap views. |
| */ |
| @SuppressWarnings({ |
| "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556) |
| "keyfor", |
| "nullness" |
| }) // TODO(https://issues.apache.org/jira/browse/BEAM-10402) |
| public class IsmSideInputReader implements SideInputReader { |
| private static final String SINGLETON_KIND = "singleton"; |
| private static final String COLLECTION_KIND = "collection"; |
| private static final Object NULL_PLACE_HOLDER = new Object(); |
| |
| private static final ImmutableList<Class<? extends ViewFn>> KNOWN_SINGLETON_VIEW_TYPES = |
| ImmutableList.of(SingletonViewFn.class, MapViewFn.class, MultimapViewFn.class); |
| |
| /** |
| * Limit the number of concurrent initializations. |
| * |
| * <p>TODO: Move the bounded executor out to an accessible place such as on PipelineOptions. |
| */ |
| private static final ExecutorService SHARED_EXECUTOR_SERVICE = |
| Executors.newFixedThreadPool(64 /* concurrency limit */); |
| |
| private final ExecutorService executorService; |
| private final Set<TupleTag<?>> singletonMaterializedTags; |
| /** A map from tuple tag to non-empty IsmReaders. */ |
| @VisibleForTesting final Map<TupleTag<?>, List<IsmReader<?>>> tagToIsmReaderMap; |
| /** |
| * A map from tuple tag to empty IsmReaders. Although this field is never read, holding a strong |
| * reference here keeps the empty readers retained in memory, and therefore in the logical |
| * reference cache, so that other IsmSideInputReader instances do not need to repeatedly |
| * initialize empty IsmReaders. |
| */ |
| @SuppressWarnings("unused") |
| @VisibleForTesting |
| final Map<TupleTag<?>, List<IsmReader<?>>> tagToEmptyIsmReaderMap; |
| |
| private final ReaderFactory readerFactory; |
| private final BatchModeExecutionContext executionContext; |
| private final DataflowOperationContext operationContext; |
| |
| private IsmSideInputReader( |
| Iterable<? extends SideInputInfo> sideInputInfos, |
| PipelineOptions options, |
| BatchModeExecutionContext executionContext, |
| ReaderFactory readerFactory, |
| DataflowOperationContext operationContext, |
| ExecutorService executorService) |
| throws Exception { |
| this.readerFactory = readerFactory; |
| this.executionContext = executionContext; |
| this.operationContext = operationContext; |
| this.executorService = executorService; |
| |
| Map<TupleTag<?>, List<IsmReader<?>>> tagToAllIsmReaders = new HashMap<>(); |
| int sideInputIndex = 1; |
| for (SideInputInfo sideInputInfo : sideInputInfos) { |
| TupleTag<?> tag = new TupleTag<>(sideInputInfo.getTag()); |
| List<IsmReader<?>> readers = |
| createReadersFromSources(options, sideInputInfo, executionContext, sideInputIndex); |
| tagToAllIsmReaders.put(tag, readers); |
| sideInputIndex++; |
| } |
| |
| // Collect all the IsmReaders that need to be initialized |
| List<Callable<Void>> ismReadersToInitialize = new ArrayList<>(); |
| for (final IsmReader<?> ismReader : Iterables.concat(tagToAllIsmReaders.values())) { |
| if (!ismReader.isInitialized()) { |
| ismReadersToInitialize.add( |
| () -> { |
| // Checking whether the file is empty ensures that the footer is loaded. |
| ismReader.isEmpty(); |
| return null; |
| }); |
| } |
| } |
| // Initialize all the IsmReaders in parallel and wait. |
| List<Future<Void>> initializations = executorService.invokeAll(ismReadersToInitialize); |
| for (Future<Void> initialization : initializations) { |
| initialization.get(); |
| } |
| |
| ImmutableMap.Builder<TupleTag<?>, List<IsmReader<?>>> tagToIsmReaderMapBuilder = |
| ImmutableMap.builder(); |
| ImmutableMap.Builder<TupleTag<?>, List<IsmReader<?>>> tagToEmptyIsmReaderMapBuilder = |
| ImmutableMap.builder(); |
| ImmutableSet.Builder<TupleTag<?>> singletonMaterializationBuilder = ImmutableSet.builder(); |
| for (Map.Entry<TupleTag<?>, List<IsmReader<?>>> entry : tagToAllIsmReaders.entrySet()) { |
| ImmutableList.Builder<IsmReader<?>> nonEmptyIsmReaders = ImmutableList.builder(); |
| ImmutableList.Builder<IsmReader<?>> emptyIsmReaders = ImmutableList.builder(); |
| for (IsmReader<?> ismReader : entry.getValue()) { |
| if (ismReader.isEmpty()) { |
| emptyIsmReaders.add(ismReader); |
| } else { |
| if (ismReader.getCoder().getKeyComponentCoders().size() == 1) { |
| // The coder for a singleton-materialized Side Input will contain only the window in the |
| // key. Iterable/List materializations have Window + Index, and Multimap has |
| // Window + Key + Index |
| singletonMaterializationBuilder.add(entry.getKey()); |
| } |
| nonEmptyIsmReaders.add(ismReader); |
| } |
| } |
| tagToIsmReaderMapBuilder.put(entry.getKey(), nonEmptyIsmReaders.build()); |
| tagToEmptyIsmReaderMapBuilder.put(entry.getKey(), emptyIsmReaders.build()); |
| } |
| this.tagToIsmReaderMap = tagToIsmReaderMapBuilder.build(); |
| this.tagToEmptyIsmReaderMap = tagToEmptyIsmReaderMapBuilder.build(); |
| this.singletonMaterializedTags = singletonMaterializationBuilder.build(); |
| } |
| |
| private List<IsmReader<?>> createReadersFromSources( |
| PipelineOptions options, |
| SideInputInfo sideInputInfo, |
| DataflowExecutionContext executionContext, |
| int sideInputIndex) |
| throws Exception { |
| String sideInputKind = getString(sideInputInfo.getKind(), PropertyNames.OBJECT_TYPE_NAME); |
| if (SINGLETON_KIND.equals(sideInputKind)) { |
| checkState( |
| sideInputInfo.getSources().size() == 1, |
| "expecting a singleton side input kind to have a single source"); |
| } else if (!COLLECTION_KIND.equals(sideInputKind)) { |
| throw new Exception("unexpected kind of side input: " + sideInputKind); |
| } |
| |
| SideInputReadCounter sideInputReadCounter = |
| new DataflowSideInputReadCounter(executionContext, operationContext, sideInputIndex); |
| |
| ImmutableList.Builder<IsmReader<?>> builder = ImmutableList.builder(); |
| for (Source source : sideInputInfo.getSources()) { |
| Coder<?> coder = null; |
| if (source.getCodec() != null) { |
| coder = CloudObjects.coderFromCloudObject(CloudObject.fromSpec(source.getCodec())); |
| } |
| |
| CloudObject spec = CloudObject.fromSpec(source.getSpec()); |
| final String filepattern = getString(spec, WorkerPropertyNames.FILENAME); |
| |
| for (String file : Filepatterns.expandAtNFilepattern(filepattern)) { |
| CloudObject fileSpec = spec.clone(); // Deep clone. |
| addString(fileSpec, WorkerPropertyNames.FILENAME, file); |
| |
| @SuppressWarnings("unchecked") |
| NativeReader<?> reader = |
| readerFactory.create(fileSpec, coder, options, executionContext, operationContext); |
| |
| checkState( |
| reader instanceof IsmReader, |
| "%s only supports %s as a reader but was %s.", |
| IsmSideInputReader.class.getSimpleName(), |
| IsmReader.class.getSimpleName(), |
| reader.getClass().getSimpleName()); |
| |
| IsmReader ismReader = (IsmReader) reader; |
| builder.add(new SideInputTrackingIsmReader<>(ismReader, sideInputReadCounter)); |
| } |
| } |
| return builder.build(); |
| } |
| |
| /** |
| * Creates a new {@link SideInputReader} that will provide side inputs according to the provided |
| * {@link SideInputInfo} descriptors. |
| */ |
| static IsmSideInputReader of( |
| Iterable<? extends SideInputInfo> sideInputInfos, |
| PipelineOptions options, |
| BatchModeExecutionContext context, |
| ReaderFactory readerFactory, |
| DataflowOperationContext operationContext) |
| throws Exception { |
| return new IsmSideInputReader( |
| sideInputInfos, options, context, readerFactory, operationContext, SHARED_EXECUTOR_SERVICE); |
| } |
| |
| static IsmSideInputReader forTest( |
| Iterable<? extends SideInputInfo> sideInputInfos, |
| PipelineOptions options, |
| BatchModeExecutionContext context, |
| ReaderFactory readerFactory, |
| DataflowOperationContext operationContext, |
| ExecutorService testExecutorService) |
| throws Exception { |
| return new IsmSideInputReader( |
| sideInputInfos, options, context, readerFactory, operationContext, testExecutorService); |
| } |
| |
| @Override |
| public <T> boolean contains(PCollectionView<T> view) { |
| return tagToIsmReaderMap.containsKey(view.getTagInternal()); |
| } |
| |
| @Override |
| public boolean isEmpty() { |
| return tagToIsmReaderMap.isEmpty(); |
| } |
| |
| @Override |
| public <ViewT> ViewT get(final PCollectionView<ViewT> view, final BoundedWindow window) { |
| @SuppressWarnings("rawtypes") |
| final TupleTag tag = view.getTagInternal(); |
| checkArgument(tagToIsmReaderMap.containsKey(tag), "calling getSideInput() with unknown view"); |
| |
| // We specialize each individual view with a specific data structure tailored |
| // for its use. |
| try { |
| ViewFn<?, ?> viewFn = view.getViewFn(); |
| // We handle the singleton case separately since a null value may be returned. |
| // We use a null place holder to represent this, and when we detect it, we translate |
| // back to null for the user. |
| if (viewFn instanceof SingletonViewFn) { |
| ViewT rval = |
| executionContext |
| .<PCollectionViewWindow<ViewT>, ViewT>getLogicalReferenceCache() |
| .get( |
| PCollectionViewWindow.of(view, window), |
| () -> { |
| @SuppressWarnings("unchecked") |
| ViewT viewT = |
| getSingletonForWindow(tag, (SingletonViewFn<ViewT>) viewFn, window); |
| @SuppressWarnings("unchecked") |
| ViewT nullPlaceHolder = (ViewT) NULL_PLACE_HOLDER; |
| return viewT == null ? nullPlaceHolder : viewT; |
| }); |
| return rval == NULL_PLACE_HOLDER ? null : rval; |
| } else if (singletonMaterializedTags.contains(tag)) { |
| checkArgument( |
| viewFn instanceof MapViewFn || viewFn instanceof MultimapViewFn, |
| "Unknown view type stored as singleton. Expected one of %s, got %s", |
| KNOWN_SINGLETON_VIEW_TYPES, |
| viewFn.getClass().getName()); |
| return executionContext |
| .<PCollectionViewWindow<ViewT>, ViewT>getLogicalReferenceCache() |
| .get( |
| PCollectionViewWindow.of(view, window), |
| () -> { |
| return getMapSingletonForViewAndWindow(tag, window); |
| }); |
| } else { |
| return executionContext |
| .<PCollectionViewWindow<ViewT>, ViewT>getLogicalReferenceCache() |
| .get( |
| PCollectionViewWindow.of(view, window), |
| () -> { |
| if (viewFn instanceof IterableViewFn || viewFn instanceof ListViewFn) { |
| @SuppressWarnings("unchecked") |
| ViewT viewT = (ViewT) getListForWindow(tag, window); |
| return viewT; |
| } else if (viewFn instanceof MapViewFn) { |
| @SuppressWarnings("unchecked") |
| ViewT viewT = (ViewT) getMapForWindow(tag, window); |
| return viewT; |
| } else if (viewFn instanceof MultimapViewFn) { |
| @SuppressWarnings("unchecked") |
| ViewT viewT = (ViewT) getMultimapForWindow(tag, window); |
| return viewT; |
| } else if (viewFn |
| instanceof DataflowPortabilityPCollectionView.PortabilityViewFn) { |
| @SuppressWarnings("unchecked") |
| ViewT viewT = (ViewT) getPortabilityMultimapForWindow(tag, window); |
| return viewT; |
| } |
| throw new IllegalArgumentException("Unknown type of view requested: " + view); |
| }); |
| } |
| } catch (ExecutionException e) { |
| throw new IllegalStateException( |
| String.format("Failed to materialize view %s for window %s.", view, window), |
| e.getCause()); |
| } |
| } |
| |
| /** |
| * Returns the singleton for the provided window if available. Otherwise returns the default |
| * specified within the view. This function expects at most one Ism file to contain a given window |
| * and expects the Ism records to have been written out as: |
| * |
| * <ul> |
| * <li>Key 1: Window |
| * <li>Value: WindowedValue |
| * </ul> |
| */ |
| private <T, W extends BoundedWindow> T getSingletonForWindow( |
| TupleTag<?> viewTag, SingletonViewFn<T> viewFn, W window) throws IOException { |
| @SuppressWarnings({ |
| "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556) |
| "unchecked" |
| }) |
| List<IsmReader<WindowedValue<T>>> readers = (List) tagToIsmReaderMap.get(viewTag); |
| List<IsmReader<WindowedValue<T>>.IsmPrefixReaderIterator> readerIterators = |
| findAndStartReaders(readers, ImmutableList.of(window)); |
| |
| // If no reader iterator contains the value, return the default value stored |
| // within the view. |
| if (readerIterators.isEmpty()) { |
| return viewFn.getDefaultValue(); |
| } |
| |
| checkState( |
| readerIterators.size() == 1, |
| "Expected only one Ism file to contain the singleton record for window %s", |
| window); |
| |
| return readerIterators.get(0).getCurrent().getValue().getValue().getValue(); |
| } |
| |
| /** Get a map written as a singleton view due to a nondeterminstic key {@link Coder}. */ |
| @SuppressWarnings("TypeParameterUnusedInFormals") |
| private <T, W extends BoundedWindow> T getMapSingletonForViewAndWindow( |
| TupleTag<?> viewTag, W window) throws IOException { |
| @SuppressWarnings({ |
| "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556) |
| "unchecked" |
| }) |
| List<IsmReader<WindowedValue<T>>> readers = (List) tagToIsmReaderMap.get(viewTag); |
| List<IsmReader<WindowedValue<T>>.IsmPrefixReaderIterator> readerIterators = |
| findAndStartReaders(readers, ImmutableList.of(window)); |
| |
| // If no reader iterator contains the value, return the default value stored |
| // within the view. |
| if (readerIterators.isEmpty()) { |
| return (T) ImmutableMap.of(); |
| } |
| |
| checkState( |
| readerIterators.size() == 1, |
| "Expected only one Ism file to contain the singleton record for window %s", |
| window); |
| |
| return readerIterators.get(0).getCurrent().getValue().getValue().getValue(); |
| } |
| |
| /** |
| * Returns the list for the provided window if available. Otherwise returns an empty list. For the |
| * non-global window, this function expects at most one Ism file to contain a given window and |
| * expects the Ism records to have been written out as: |
| * |
| * <ul> |
| * <li>Key 1: Window |
| * <li>Key 2: Index offset within window |
| * <li>Value: WindowedValue |
| * </ul> |
| * |
| * <p>For the global window, this expects that each file contains a segment of the list and that |
| * the Ism records have been written out as: |
| * |
| * <ul> |
| * <li>Key 1: Window |
| * <li>Key 2: Index offset within file |
| * <li>Value: WindowedValue |
| * </ul> |
| * |
| * The global offset is computed by summing the number of elements in each file. |
| */ |
| private <T, W extends BoundedWindow> List<T> getListForWindow(TupleTag<?> tag, W window) |
| throws IOException { |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| List<IsmReader<WindowedValue<T>>> readers = (List) tagToIsmReaderMap.get(tag); |
| List<IsmReader<WindowedValue<T>>.IsmPrefixReaderIterator> readerIterators = |
| findAndStartReaders(readers, ImmutableList.of(window)); |
| if (readerIterators.isEmpty()) { |
| return Collections.emptyList(); |
| } |
| checkState( |
| GlobalWindow.INSTANCE.equals(window) || readerIterators.size() == 1, |
| "Expected to have had PCollectionView %s for window %s contained within one file.", |
| tag, |
| window); |
| return Collections.unmodifiableList( |
| new ListOverReaderIterators<>( |
| readerIterators, (WindowedValue<T> value) -> value.getValue())); |
| } |
| |
| /** |
| * Returns a map over a set of readers or an empty map if the window is not within the side input. |
| * The {@code Ism} files are expected to be sharded by the hash of the key's byte representation. |
| * Each record is required to be structured as follows: |
| * |
| * <ul> |
| * <li>Key 1: User key K |
| * <li>Key 2: Window |
| * <li>Key 3: 0L (constant) |
| * <li>Value: Windowed value |
| * </ul> |
| * |
| * <p>Alongside the data records, there are the following metadata records: |
| * |
| * <ul> |
| * <li>Key 1: Metadata Key |
| * <li>Key 2: Window |
| * <li>Key 3: Index [0, size of map] |
| * <li>Value: variable length long byte representation of size of map if index is 0, otherwise |
| * the byte representation of a key |
| * </ul> |
| * |
| * The {@code [META, Window, 0]} record stores the number of unique keys per window, while {@code |
| * [META, Window, i]} for {@code i} in {@code [1, size of map]} stores a the users key. This |
| * allows for one to access the size of the map by looking at {@code [META, Window, 0]} and |
| * iterate over all the keys by accessing {@code [META, Window, i]} for {@code i} in {@code [1, |
| * size of map]}. |
| */ |
| private <K, V, W extends BoundedWindow> Map<K, V> getMapForWindow(TupleTag<?> tag, W window) |
| throws IOException { |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| List<IsmReader<WindowedValue<V>>> readers = (List) tagToIsmReaderMap.get(tag); |
| if (readers.isEmpty()) { |
| return Collections.emptyMap(); |
| } |
| |
| @SuppressWarnings("unchecked") |
| Coder<K> keyCoder = |
| ((MetadataKeyCoder<K>) readers.get(0).getCoder().getKeyComponentCoder(0)).getKeyCoder(); |
| |
| Long size = |
| findMetadata( |
| readers, ImmutableList.of(IsmFormat.getMetadataKey(), window, 0L), VarLongCoder.of()); |
| |
| // If there is no metadata record for the window, we can return an empty map. |
| if (size == null) { |
| return Collections.emptyMap(); |
| } |
| |
| return Collections.unmodifiableMap( |
| new MapOverReaders<K, V, V, W>(window, new MapToValue<K, V>(), readers, keyCoder, size)); |
| } |
| |
| /** |
| * Returns a map over a set of readers or an empty map if the window is not within the side input. |
| * The {@code Ism} files are expected to be sharded by the hash of the key's byte representation. |
| * Each record is required to be structured as follows: |
| * |
| * <ul> |
| * <li>Key 1: User key K |
| * <li>Key 2: Window |
| * <li>Key 3: Index offset for a given key and window. |
| * <li>Value: Windowed value |
| * </ul> |
| * |
| * <p>Alongside the data records, there are the following metadata records: |
| * |
| * <ul> |
| * <li>Key 1: Metadata Key |
| * <li>Key 2: Window |
| * <li>Key 3: Index [0, size of map] |
| * <li>Value: variable length long byte representation of size of map if index is 0, otherwise |
| * the byte representation of a key |
| * </ul> |
| * |
| * The {@code [META, Window, 0]} record stores the number of unique keys per window, while {@code |
| * [META, Window, i]} for {@code i} in {@code [1, size of map]} stores a the users key. This |
| * allows for one to access the size of the map by looking at {@code [META, Window, 0]} and |
| * iterate over all the keys by accessing {@code [META, Window, i]} for {@code i} in {@code [1, |
| * size of map]}. |
| */ |
| private <K, V, W extends BoundedWindow> Map<K, Iterable<V>> getMultimapForWindow( |
| TupleTag<?> tag, W window) throws IOException { |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| List<IsmReader<WindowedValue<V>>> readers = (List) tagToIsmReaderMap.get(tag); |
| if (readers.isEmpty()) { |
| return Collections.emptyMap(); |
| } |
| |
| @SuppressWarnings("unchecked") |
| Coder<K> keyCoder = |
| ((MetadataKeyCoder<K>) readers.get(0).getCoder().getKeyComponentCoder(0)).getKeyCoder(); |
| |
| Long size = |
| findMetadata( |
| readers, ImmutableList.of(IsmFormat.getMetadataKey(), window, 0L), VarLongCoder.of()); |
| |
| // If there is no metadata record for the window, we can return an empty map. |
| if (size == null) { |
| return Collections.emptyMap(); |
| } |
| |
| return Collections.unmodifiableMap( |
| new MapOverReaders<K, V, Iterable<V>, W>( |
| window, new MapToIterable<K, V>(), readers, keyCoder, size)); |
| } |
| |
| /** |
| * Returns a {@link MultimapView} over a set of readers. The {@code Ism} files are expected to be |
| * sharded by the hash of the key's byte representation. Each record is required to be structured |
| * as follows: |
| * |
| * <ul> |
| * <li>Key 1: User key K |
| * <li>Key 2: Window |
| * <li>Key 3: Index offset for a given key and window. |
| * <li>Value: Windowed value |
| * </ul> |
| * |
| * <p>There are no metadata records associated with any records. |
| */ |
| private <K, V, W extends BoundedWindow> MultimapView<K, V> getPortabilityMultimapForWindow( |
| TupleTag<?> tag, W window) throws IOException { |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| List<IsmReader<V>> readers = (List) tagToIsmReaderMap.get(tag); |
| |
| if (readers.isEmpty()) { |
| return InMemoryMultimapSideInputView.empty(); |
| } |
| |
| return new IsmMultimapView<>(window, readers); |
| } |
| |
| /** |
| * A {@link MultimapView} that fronts a set of readers. This view assumes that the readers have |
| * records using the user key followed by window. |
| */ |
| private class IsmMultimapView<K, V> implements MultimapView<K, V> { |
| |
| private final BoundedWindow window; |
| private final List<IsmReader<V>> readers; |
| |
| private IsmMultimapView(BoundedWindow window, List<IsmReader<V>> readers) { |
| this.window = window; |
| this.readers = readers; |
| } |
| |
| @Override |
| public Iterable<K> get() { |
| throw new UnsupportedOperationException("TODO: Support enumerating the keys."); |
| } |
| |
| @Override |
| public Iterable<V> get(K k) { |
| k = checkArgumentNotNull(k); |
| try { |
| return new ListOverReaderIterators<>( |
| findAndStartReaders(readers, ImmutableList.of(k, window)), (V value) -> value); |
| } catch (IOException e) { |
| throw new RuntimeException( |
| String.format("Unable to create view for window %s and key %s.", window, k), e); |
| } |
| } |
| } |
| |
| /** |
| * Returns the last key from each reader iterator. This function assumes that the last key |
| * component is a long. |
| */ |
| private <V> Collection<Long> getListIndexFromReaderIterators( |
| List<IsmReader<V>.IsmPrefixReaderIterator> readerIterators) throws IOException { |
| List<Callable<Long>> callables = new ArrayList<>(); |
| |
| // Build a list of callables that will return the last key component assuming its a long |
| for (final IsmReader<V>.IsmPrefixReaderIterator readerIterator : readerIterators) { |
| callables.add( |
| () -> { |
| WindowedValue<IsmRecord<V>> last = readerIterator.getLast(); |
| if (last == null) { |
| return 0L; |
| } |
| return ((long) |
| last.getValue().getKeyComponent(last.getValue().getKeyComponents().size() - 1)) |
| + 1L; |
| }); |
| } |
| |
| try { |
| List<Future<Long>> results = executorService.invokeAll(callables); |
| List<Long> lastKeyComponents = new ArrayList<>(results.size()); |
| for (Future<Long> result : results) { |
| lastKeyComponents.add(result.get()); |
| } |
| return lastKeyComponents; |
| } catch (InterruptedException | ExecutionException e) { |
| if (e instanceof InterruptedException) { |
| Thread.currentThread().interrupt(); |
| } |
| // Attempt to propagate the cause if possible. |
| Throwables.propagateIfPossible(e.getCause(), IOException.class); |
| throw new IOException(e); |
| } |
| } |
| |
| /** |
| * Returns a list of reader iterators over the provided key components. Each reader iterator |
| * within the returned list is guaranteed to have at least one element and will be in a state |
| * where {@link NativeReader.NativeReaderIterator#start} has already been called. |
| */ |
| private <V> List<IsmReader<V>.IsmPrefixReaderIterator> findAndStartReaders( |
| List<IsmReader<V>> readers, final List<?> keyComponents) throws IOException { |
| if (readers.isEmpty()) { |
| return Collections.emptyList(); |
| } |
| |
| RandomAccessData keyBytes = new RandomAccessData(); |
| int shardId = readers.get(0).getCoder().encodeAndHash(keyComponents, keyBytes); |
| List<IsmReader<V>.IsmPrefixReaderIterator> readerIterators = new ArrayList<>(); |
| for (final IsmReader<V> reader : readers) { |
| IsmReader<V>.IsmPrefixReaderIterator readerIterator = |
| reader.overKeyComponents(keyComponents, shardId, keyBytes); |
| if (readerIterator.start()) { |
| readerIterators.add(readerIterator); |
| } |
| } |
| return readerIterators; |
| } |
| |
| /** |
| * Finds the metadata associated with the specific key components. Returns null if the metadata |
| * does not exist. |
| */ |
| private <V, T> T findMetadata( |
| List<IsmReader<WindowedValue<V>>> readers, List<?> keyComponents, Coder<T> metadataCoder) |
| throws IOException { |
| |
| // Find a set of reader iterators that have the requested key components. |
| List<IsmReader<WindowedValue<V>>.IsmPrefixReaderIterator> readerIterators = |
| findAndStartReaders(readers, keyComponents); |
| |
| if (readerIterators.isEmpty()) { |
| return null; |
| } |
| |
| // We expect at most one such reader iterator to have been returned. |
| IsmReader<WindowedValue<V>>.IsmPrefixReaderIterator readerIterator = |
| Iterables.getOnlyElement(readerIterators); |
| |
| // Decode the metadata |
| return CoderUtils.decodeFromByteArray( |
| metadataCoder, readerIterator.getCurrent().getValue().getMetadata()); |
| } |
| |
| /** |
| * A list that fronts a set of reader iterators. This list assumes that that reader iterators are |
| * already configured for the proper prefix. For an iterable/list view, this would be the window. |
| * For a multimap, this would be the user key and window. |
| */ |
| private class ListOverReaderIterators<T, V> extends AbstractList<V> { |
| private final List<IsmReader<T>.IsmPrefixReaderIterator> readerIterators; |
| private final ArrayList<Long> numberOfRecordsPerReaderIterator; |
| private final Function<T, V> valueTypeToOutputType; |
| |
| private ListOverReaderIterators( |
| List<IsmReader<T>.IsmPrefixReaderIterator> readerIterators, |
| Function<T, V> valueTypeToOutputType) |
| throws IOException { |
| this.readerIterators = readerIterators; |
| this.numberOfRecordsPerReaderIterator = |
| new ArrayList<>(getListIndexFromReaderIterators(readerIterators)); |
| this.valueTypeToOutputType = valueTypeToOutputType; |
| } |
| |
| // TODO: Support greater than Integer.MAX_VALUE values for iteration/lookup and size. |
| @Override |
| public V get(int index) { |
| return getUsingLong(index); |
| } |
| |
| @Override |
| public int size() { |
| return Ints.checkedCast(longSize()); |
| } |
| |
| @Override |
| public Iterator<V> iterator() { |
| return listIterator(); |
| } |
| |
| @Override |
| public ListIterator<V> listIterator() { |
| return new ListIteratorOverReaderIterators(); |
| } |
| |
| /** Returns the value at the specified position. */ |
| private V getUsingLong(long index) { |
| try { |
| if (index < 0) { |
| throw new IndexOutOfBoundsException("Index out of range: " + index); |
| } |
| |
| // We locate which reader iterator contains the requested index |
| // by using the number of records contained within each reader iterator |
| // as a way to compute the local offset. Once we find a local offset |
| // which is within the bounds of the reader iterator, we use that local |
| // offset to find the requested record. |
| long localOffset = index; |
| int readerIteratorIndex; |
| for (readerIteratorIndex = 0; |
| readerIteratorIndex < numberOfRecordsPerReaderIterator.size(); |
| ++readerIteratorIndex) { |
| |
| if (localOffset < numberOfRecordsPerReaderIterator.get(readerIteratorIndex)) { |
| WindowedValue<IsmRecord<T>> rval = |
| readerIterators.get(readerIteratorIndex).get(ImmutableList.of(localOffset)); |
| |
| checkState( |
| rval != null, |
| "Expected to have found index %s, local offset %s within file.", |
| index, |
| localOffset); |
| |
| return valueTypeToOutputType.apply(rval.getValue().getValue()); |
| } |
| localOffset -= numberOfRecordsPerReaderIterator.get(readerIteratorIndex); |
| } |
| |
| // If we went past the last file then we seeked past the end and are out of bounds. |
| throw new IndexOutOfBoundsException("Index out of range: " + index); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| private long longSize() { |
| long total = 0; |
| for (Long numRecords : numberOfRecordsPerReaderIterator) { |
| total += numRecords; |
| } |
| return total; |
| } |
| |
| /** An immutable list iterator that uses a long as its position. */ |
| private class ListIteratorOverReaderIterators implements ListIterator<V> { |
| private long position; |
| |
| @Override |
| public boolean hasNext() { |
| return position < longSize(); |
| } |
| |
| @Override |
| public V next() { |
| if (!hasNext()) { |
| throw new NoSuchElementException(); |
| } |
| V rval = getUsingLong(position); |
| position += 1; |
| return rval; |
| } |
| |
| @Override |
| public boolean hasPrevious() { |
| return position > 0; |
| } |
| |
| @Override |
| public V previous() { |
| if (!hasPrevious()) { |
| throw new NoSuchElementException(); |
| } |
| position -= 1; |
| return getUsingLong(position); |
| } |
| |
| @Override |
| public int nextIndex() { |
| return Ints.checkedCast(position); |
| } |
| |
| @Override |
| public int previousIndex() { |
| return Ints.checkedCast(position - 1); |
| } |
| |
| @Override |
| public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void set(V e) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void add(V e) { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| } |
| |
| /** |
| * A map that fronts a set of readers. This map assumes that the readers have records using the |
| * user key followed by window. |
| */ |
| private class MapOverReaders<K, V1, V2, W extends BoundedWindow> extends AbstractMap<K, V2> { |
| private final W window; |
| private final Function<KV<K, IsmReader<WindowedValue<V1>>.IsmPrefixReaderIterator>, V2> |
| transform; |
| private final List<IsmReader<WindowedValue<V1>>> readers; |
| private final Coder<K> keyCoder; |
| private final long size; |
| |
| private MapOverReaders( |
| W window, |
| Function<KV<K, IsmReader<WindowedValue<V1>>.IsmPrefixReaderIterator>, V2> transform, |
| List<IsmReader<WindowedValue<V1>>> readers, |
| Coder<K> keyCoder, |
| long size) { |
| |
| this.window = window; |
| this.transform = transform; |
| this.readers = readers; |
| this.keyCoder = keyCoder; |
| this.size = size; |
| } |
| |
| @Override |
| public boolean containsKey(Object key) { |
| try { |
| return !findAndStartReaders(readers, ImmutableList.of(key, window)).isEmpty(); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| @Override |
| public V2 get(Object objectKey) { |
| @SuppressWarnings("unchecked") |
| K key = (K) objectKey; |
| try { |
| // We find the reader iterator which contains the key/window prefix. |
| // For maps, this yields only one record. For multimaps, this is a valid |
| // prefix reader iterator. |
| List<IsmReader<WindowedValue<V1>>.IsmPrefixReaderIterator> readerIterators = |
| findAndStartReaders(readers, ImmutableList.of(key, window)); |
| if (readerIterators.isEmpty()) { |
| return null; |
| } |
| |
| // Only one such reader iterator is expected. |
| IsmReader<WindowedValue<V1>>.IsmPrefixReaderIterator readerIterator = |
| Iterables.getOnlyElement(readerIterators); |
| |
| return transform.apply(KV.of(key, readerIterator)); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| @Override |
| public int size() { |
| return Ints.checkedCast(size); |
| } |
| |
| @Override |
| public Set<Entry<K, V2>> entrySet() { |
| return new EntrySet(); |
| } |
| |
| /** An entry set that is fronted by this map. */ |
| private class EntrySet extends AbstractSet<Entry<K, V2>> { |
| @Override |
| public boolean contains(Object o) { |
| if (!(o instanceof Entry)) { |
| return false; |
| } |
| @SuppressWarnings("unchecked") |
| Entry<K, ?> entry = (Entry<K, ?>) o; |
| try { |
| // We find the reader iterator which contains the key/window prefix. |
| // For maps, this yields only one record. For multimaps, this is a valid |
| // prefix reader iterator. |
| List<IsmReader<WindowedValue<V1>>.IsmPrefixReaderIterator> readerIterators = |
| findAndStartReaders(readers, ImmutableList.of(entry.getKey(), window)); |
| if (readerIterators.isEmpty()) { |
| return false; |
| } |
| |
| // Only one such reader iterator is expected. |
| IsmReader<WindowedValue<V1>>.IsmPrefixReaderIterator readerIterator = |
| Iterables.getOnlyElement(readerIterators); |
| |
| return Objects.equal( |
| entry.getValue(), transform.apply(KV.of(entry.getKey(), readerIterator))); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| @Override |
| public Iterator<Entry<K, V2>> iterator() { |
| return new EntrySetIterator(); |
| } |
| |
| @Override |
| public int size() { |
| return Ints.checkedCast(size); |
| } |
| } |
| |
| /** |
| * An entry set iterator that backs this map which utilizes the [META, Window, Index] records to |
| * locate subsequent keys. |
| */ |
| private class EntrySetIterator implements Iterator<Entry<K, V2>> { |
| long position = 0; |
| |
| @Override |
| public boolean hasNext() { |
| return position < size; |
| } |
| |
| @Override |
| public Entry<K, V2> next() { |
| if (!hasNext()) { |
| throw new NoSuchElementException(); |
| } |
| |
| final K key; |
| final V2 value; |
| try { |
| key = |
| findMetadata( |
| readers, |
| ImmutableList.of(IsmFormat.getMetadataKey(), window, position + 1), |
| keyCoder); |
| value = get(key); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| // Once we have fetched the key and value we can increment the position knowing that |
| // an exception won't be thrown, thus allowing retries. |
| position += 1; |
| return new StructuralMapEntry<>(keyCoder, key, value); |
| } |
| |
| @Override |
| public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| } |
| |
| /** |
| * A function which is able to unwrap windowed values to just their values using a reader iterator |
| * as input. |
| */ |
| private static class MapToValue<K, V> |
| implements Function<KV<K, IsmReader<WindowedValue<V>>.IsmPrefixReaderIterator>, V> { |
| |
| @SuppressFBWarnings( |
| value = "NP_METHOD_PARAMETER_TIGHTENS_ANNOTATION", |
| justification = "https://github.com/google/guava/issues/920") |
| @Override |
| public V apply(@Nonnull KV<K, IsmReader<WindowedValue<V>>.IsmPrefixReaderIterator> input) { |
| IsmReader<WindowedValue<V>>.IsmPrefixReaderIterator startedReader = input.getValue(); |
| WindowedValue<IsmRecord<WindowedValue<V>>> value = startedReader.getCurrent(); |
| return value.getValue().getValue().getValue(); |
| } |
| } |
| |
| /** |
| * A function which is able to create a list over reader iterators using the reader iterator to |
| * back it. |
| */ |
| private class MapToIterable<K, V> |
| implements Function<KV<K, IsmReader<WindowedValue<V>>.IsmPrefixReaderIterator>, Iterable<V>> { |
| |
| @SuppressFBWarnings( |
| value = "NP_METHOD_PARAMETER_TIGHTENS_ANNOTATION", |
| justification = "https://github.com/google/guava/issues/920") |
| @Override |
| public Iterable<V> apply( |
| @Nonnull KV<K, IsmReader<WindowedValue<V>>.IsmPrefixReaderIterator> input) { |
| try { |
| return Iterables.unmodifiableIterable( |
| new ListOverReaderIterators<>( |
| ImmutableList.of(input.getValue()), (WindowedValue<V> value) -> value.getValue())); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| } |
| |
| /** A map entry which utilizes the structural value of the key for comparison. */ |
| private static class StructuralMapEntry<K, V> extends AbstractMap.SimpleImmutableEntry<K, V> { |
| private final Coder<K> keyCoder; |
| |
| public StructuralMapEntry(Coder<K> keyCoder, K key, V value) { |
| super(key, value); |
| checkNotNull(keyCoder); |
| this.keyCoder = keyCoder; |
| } |
| |
| @Override |
| public boolean equals(@Nullable Object o) { |
| if (!(o instanceof Map.Entry)) { |
| return false; |
| } |
| try { |
| @SuppressWarnings("unchecked") |
| Map.Entry<K, V> other = (Map.Entry<K, V>) o; |
| return Objects.equal( |
| keyCoder.structuralValue(getKey()), keyCoder.structuralValue(other.getKey())) |
| && Objects.equal(getValue(), other.getValue()); |
| } catch (Exception e) { |
| throw new IllegalArgumentException(e); |
| } |
| } |
| |
| @Override |
| public int hashCode() { |
| try { |
| return Objects.hashCode(keyCoder.structuralValue(getKey()), getValue()); |
| } catch (Exception e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(StructuralMapEntry.class) |
| .add("key", getKey()) |
| .add("value", getValue()) |
| .add("keyCoder", keyCoder) |
| .toString(); |
| } |
| } |
| } |