| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.core; |
| |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; |
| |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.IterableCoder; |
| import org.apache.beam.sdk.coders.KvCoder; |
| import org.apache.beam.sdk.coders.SetCoder; |
| import org.apache.beam.sdk.state.CombiningState; |
| import org.apache.beam.sdk.state.ValueState; |
| import org.apache.beam.sdk.transforms.Combine; |
| import org.apache.beam.sdk.transforms.Materializations; |
| import org.apache.beam.sdk.transforms.Materializations.MultimapView; |
| import org.apache.beam.sdk.transforms.ViewFn; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.util.WindowedValue; |
| import org.apache.beam.sdk.values.PCollectionView; |
| |
| /** |
| * Generic side input handler that uses {@link StateInternals} to store all data. Both the actual |
| * side-input data and data about the windows for which we have side inputs available are stored |
| * using {@code StateInternals}. |
| * |
| * <p>The given {@code StateInternals} must not be scoped to an element key. The state must instead |
| * be scoped to one key group for which the side input is being managed. |
| * |
| * <p>This is useful for runners that transmit the side-input elements in band, as opposed to how |
| * Dataflow has an external service for managing side inputs. |
| * |
| * <p>Note: storing the available windows in an extra state is redundant for now but in the future |
| * we might want to know which windows we have available so that we can garbage collect side input |
| * data. For now, this will never clean up side-input data because we have no way of knowing when we |
| * reach the GC horizon. |
| */ |
| public class SideInputHandler implements ReadyCheckingSideInputReader { |
| |
| /** The list of side inputs that we're handling. */ |
| protected final Collection<PCollectionView<?>> sideInputs; |
| |
| /** |
| * State internals that are scoped not to the key of a value but are global. The state can still |
| * be kept locally but if side inputs are broadcast to all parallel operators then all will have |
| * the same view of the state. |
| */ |
| private final StateInternals stateInternals; |
| |
| /** |
| * A state tag for each side input that we handle. The state is used to track for which windows we |
| * have input available. |
| */ |
| private final Map< |
| PCollectionView<?>, |
| StateTag<CombiningState<BoundedWindow, Set<BoundedWindow>, Set<BoundedWindow>>>> |
| availableWindowsTags; |
| |
| /** State tag for the actual contents of each side input per window. */ |
| private final Map<PCollectionView<?>, StateTag<ValueState<Iterable<?>>>> sideInputContentsTags; |
| |
| /** |
| * Creates a new {@code SideInputHandler} for the given side inputs that uses the given {@code |
| * StateInternals} to store side input data and side-input meta data. |
| */ |
| public SideInputHandler( |
| Collection<PCollectionView<?>> sideInputs, StateInternals stateInternals) { |
| this.sideInputs = sideInputs; |
| this.stateInternals = stateInternals; |
| this.availableWindowsTags = new HashMap<>(); |
| this.sideInputContentsTags = new HashMap<>(); |
| |
| for (PCollectionView<?> sideInput : sideInputs) { |
| checkArgument( |
| Materializations.MULTIMAP_MATERIALIZATION_URN.equals( |
| sideInput.getViewFn().getMaterialization().getUrn()), |
| "This handler is only capable of dealing with %s materializations " |
| + "but was asked to handle %s for PCollectionView with tag %s.", |
| Materializations.MULTIMAP_MATERIALIZATION_URN, |
| sideInput.getViewFn().getMaterialization().getUrn(), |
| sideInput.getTagInternal().getId()); |
| |
| @SuppressWarnings("unchecked") |
| Coder<BoundedWindow> windowCoder = |
| (Coder<BoundedWindow>) |
| sideInput.getWindowingStrategyInternal().getWindowFn().windowCoder(); |
| |
| StateTag<CombiningState<BoundedWindow, Set<BoundedWindow>, Set<BoundedWindow>>> availableTag = |
| StateTags.combiningValue( |
| "side-input-available-windows-" + sideInput.getTagInternal().getId(), |
| SetCoder.of(windowCoder), |
| new WindowSetCombineFn()); |
| |
| availableWindowsTags.put(sideInput, availableTag); |
| |
| StateTag<ValueState<Iterable<?>>> stateTag = |
| StateTags.value( |
| "side-input-data-" + sideInput.getTagInternal().getId(), |
| (Coder) IterableCoder.of(sideInput.getCoderInternal())); |
| sideInputContentsTags.put(sideInput, stateTag); |
| } |
| } |
| |
| /** |
| * Add the given value to the internal side-input store of the given side input. This might change |
| * the result of {@link #isReady(PCollectionView, BoundedWindow)} for that side input. |
| */ |
| public void addSideInputValue(PCollectionView<?> sideInput, WindowedValue<Iterable<?>> value) { |
| @SuppressWarnings("unchecked") |
| Coder<BoundedWindow> windowCoder = |
| (Coder<BoundedWindow>) sideInput.getWindowingStrategyInternal().getWindowFn().windowCoder(); |
| |
| StateTag<ValueState<Iterable<?>>> stateTag = sideInputContentsTags.get(sideInput); |
| |
| for (BoundedWindow window : value.getWindows()) { |
| stateInternals |
| .state(StateNamespaces.window(windowCoder, window), stateTag) |
| .write(value.getValue()); |
| |
| stateInternals |
| .state(StateNamespaces.global(), availableWindowsTags.get(sideInput)) |
| .add(window); |
| } |
| } |
| |
| @Nullable |
| @Override |
| public <T> T get(PCollectionView<T> view, BoundedWindow window) { |
| |
| Iterable<?> elements = getIterable(view, window); |
| // TODO: Add support for choosing which representation is contained based upon the |
| // side input materialization. We currently can assume that we always have a multimap |
| // materialization as that is the only supported type within the Java SDK. |
| ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn(); |
| Coder<?> keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder(); |
| return (T) |
| viewFn.apply(InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements)); |
| } |
| |
| /** |
| * Retrieve the value as written by {@link #addSideInputValue(PCollectionView, WindowedValue)}, |
| * without applying the SDK specific {@link ViewFn}. |
| * |
| * @param view |
| * @param window |
| * @param <T> |
| * @return |
| */ |
| public <T> Iterable<?> getIterable(PCollectionView<T> view, BoundedWindow window) { |
| @SuppressWarnings("unchecked") |
| Coder<BoundedWindow> windowCoder = |
| (Coder<BoundedWindow>) view.getWindowingStrategyInternal().getWindowFn().windowCoder(); |
| |
| StateTag<ValueState<Iterable<?>>> stateTag = sideInputContentsTags.get(view); |
| |
| ValueState<Iterable<?>> state = |
| stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag); |
| |
| Iterable<?> elements = state.read(); |
| // return empty collection when no side input was received for ready window |
| return (elements != null) ? elements : Collections.emptyList(); |
| } |
| |
| @Override |
| public boolean isReady(PCollectionView<?> sideInput, BoundedWindow window) { |
| Set<BoundedWindow> readyWindows = |
| stateInternals.state(StateNamespaces.global(), availableWindowsTags.get(sideInput)).read(); |
| |
| return readyWindows != null && readyWindows.contains(window); |
| } |
| |
| @Override |
| public <T> boolean contains(PCollectionView<T> view) { |
| return sideInputs.contains(view); |
| } |
| |
| @Override |
| public boolean isEmpty() { |
| return sideInputs.isEmpty(); |
| } |
| |
| /** For keeping track of the windows for which we have available side input. */ |
| private static class WindowSetCombineFn |
| extends Combine.CombineFn<BoundedWindow, Set<BoundedWindow>, Set<BoundedWindow>> { |
| |
| @Override |
| public Set<BoundedWindow> createAccumulator() { |
| return new HashSet<>(); |
| } |
| |
| @Override |
| public Set<BoundedWindow> addInput(Set<BoundedWindow> accumulator, BoundedWindow input) { |
| accumulator.add(input); |
| return accumulator; |
| } |
| |
| @Override |
| public Set<BoundedWindow> mergeAccumulators(Iterable<Set<BoundedWindow>> accumulators) { |
| Set<BoundedWindow> result = new HashSet<>(); |
| for (Set<BoundedWindow> acc : accumulators) { |
| result.addAll(acc); |
| } |
| return result; |
| } |
| |
| @Override |
| public Set<BoundedWindow> extractOutput(Set<BoundedWindow> accumulator) { |
| return accumulator; |
| } |
| } |
| } |