| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.dataflow.worker; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import javax.annotation.Nullable; |
| import org.apache.beam.runners.core.DoFnRunner; |
| import org.apache.beam.runners.core.DoFnRunners; |
| import org.apache.beam.runners.core.DoFnRunners.OutputManager; |
| import org.apache.beam.runners.core.KeyedWorkItem; |
| import org.apache.beam.runners.core.KeyedWorkItems; |
| import org.apache.beam.runners.core.LateDataDroppingDoFnRunner; |
| import org.apache.beam.runners.core.SideInputReader; |
| import org.apache.beam.runners.core.TimerInternals; |
| import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn; |
| import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.options.StreamingOptions; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.util.WindowedValue; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollectionView; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.beam.sdk.values.WindowingStrategy; |
| |
| /** A specialized {@link ParDoFn} for GroupAlsoByWindow related {@link GroupAlsoByWindowFn}'s. */ |
| public class GroupAlsoByWindowsParDoFn<InputT, K, V, W extends BoundedWindow> implements ParDoFn { |
| private final PipelineOptions options; |
| |
| private final SideInputReader sideInputReader; |
| private final TupleTag<KV<K, Iterable<V>>> mainOutputTag; |
| private final DataflowExecutionContext.DataflowStepContext stepContext; |
| private final GroupAlsoByWindowFn<InputT, KV<K, Iterable<V>>> doFn; |
| private final WindowingStrategy<?, W> windowingStrategy; |
| private final Iterable<PCollectionView<?>> sideInputViews; |
| private final Coder<InputT> inputCoder; |
| |
| // TODO: do not share this class, or refactor, in a way such that the guts need to do |
| // different things based on whether it is streaming/batch, or what kind of GABW |
| // function it is invoking |
| private final boolean acceptsKeyedWorkItems; |
| |
| // Various DoFn helpers, null between bundles |
| @Nullable private DoFnRunner<InputT, KV<K, Iterable<V>>> fnRunner; |
| @Nullable private Receiver receiver; |
| |
| /** |
| * Creates a {@link GroupAlsoByWindowsParDoFn} using basic information about the {@link |
| * GroupAlsoByWindowFn} and the step being executed. |
| */ |
| GroupAlsoByWindowsParDoFn( |
| PipelineOptions options, |
| GroupAlsoByWindowFn<InputT, KV<K, Iterable<V>>> doFn, |
| WindowingStrategy<?, W> windowingStrategy, |
| Iterable<PCollectionView<?>> sideInputViews, |
| Coder<InputT> inputCoder, |
| SideInputReader sideInputReader, |
| TupleTag<KV<K, Iterable<V>>> mainOutputTag, |
| DataflowExecutionContext.DataflowStepContext stepContext) { |
| this.options = options; |
| |
| this.sideInputReader = sideInputReader; |
| this.mainOutputTag = mainOutputTag; |
| this.stepContext = stepContext; |
| this.doFn = doFn; |
| this.windowingStrategy = windowingStrategy; |
| this.sideInputViews = sideInputViews; |
| this.inputCoder = inputCoder; |
| |
| this.acceptsKeyedWorkItems = inputCoder instanceof WindmillKeyedWorkItem.FakeKeyedWorkItemCoder; |
| } |
| |
| @Override |
| public void startBundle(Receiver... receivers) throws Exception { |
| checkState(fnRunner == null, "bundle already started (or not properly finished)"); |
| checkState( |
| receivers.length == 1, |
| "%s.startBundle() called with %s receivers, expected exactly 1. " |
| + "This is a bug in the Dataflow service", |
| getClass().getSimpleName(), |
| receivers.length); |
| receiver = receivers[0]; |
| fnRunner = createRunner(); |
| fnRunner.startBundle(); |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public void processElement(Object untypedElem) throws Exception { |
| checkState(fnRunner != null); |
| |
| // TODO: do not do this with an "if" |
| if (!acceptsKeyedWorkItems) { |
| // The straightforward case: we don't need to mess with timers |
| fnRunner.processElement((WindowedValue<InputT>) untypedElem); |
| } else { |
| // Gather timers just for this step |
| Coder<W> windowCoder = windowingStrategy.getWindowFn().windowCoder(); |
| TimerInternals.TimerData timer; |
| List<TimerInternals.TimerData> firedTimers = new ArrayList<>(); |
| while ((timer = stepContext.getNextFiredTimer(windowCoder)) != null) { |
| firedTimers.add(timer); |
| } |
| |
| // Build a new input with just the timers we should process |
| WindowedValue<KeyedWorkItem<K, V>> typedElem = |
| (WindowedValue<KeyedWorkItem<K, V>>) untypedElem; |
| KeyedWorkItem<K, V> keyedWorkItemWithAllTheTimers = typedElem.getValue(); |
| KeyedWorkItem<K, V> keyedWorkItemWithJustMyTimers = |
| KeyedWorkItems.workItem( |
| keyedWorkItemWithAllTheTimers.key(), |
| firedTimers, |
| keyedWorkItemWithAllTheTimers.elementsIterable()); |
| |
| fnRunner.processElement( |
| (WindowedValue<InputT>) typedElem.withValue(keyedWorkItemWithJustMyTimers)); |
| } |
| } |
| |
| @Override |
| public void processTimers() throws Exception { |
| // TODO: call ReduceFnRunner.onTimers here, without all the intervening |
| // layers, or else process timers one key at a time and have access to |
| // it here to build a KeyedWorkItem |
| } |
| |
| @Override |
| public void finishBundle() throws Exception { |
| checkState(fnRunner != null); |
| fnRunner.finishBundle(); |
| fnRunner = null; |
| } |
| |
| @Override |
| public void abort() throws Exception { |
| fnRunner = null; |
| } |
| |
| /** |
| * Composes and returns a {@link DoFnRunner} based on the parameters. |
| * |
| * <p>A {@code SimpleOldDoFnRunner} executes the {@link GroupAlsoByWindowFn}. |
| * |
| * <p>A {@link LateDataDroppingDoFnRunner} handles late data dropping for a {@link |
| * StreamingGroupAlsoByWindowViaWindowSetFn}. |
| * |
| * <p>A {@link StreamingSideInputDoFnRunner} handles streaming side inputs. |
| * |
| * <p>A {@link StreamingKeyedWorkItemSideInputDoFnRunner} handles streaming side inputs for a |
| * {@link StreamingGroupAlsoByWindowViaWindowSetFn}. |
| */ |
| private DoFnRunner<InputT, KV<K, Iterable<V>>> createRunner() { |
| OutputManager outputManager = |
| new OutputManager() { |
| @Override |
| public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { |
| checkState( |
| tag.equals(mainOutputTag), |
| "Must only output to main output tag (%s), but was %s", |
| tag, |
| mainOutputTag); |
| try { |
| receiver.process(output); |
| } catch (Throwable t) { |
| throw new RuntimeException(t); |
| } |
| } |
| }; |
| |
| boolean hasStreamingSideInput = |
| options.as(StreamingOptions.class).isStreaming() && !sideInputReader.isEmpty(); |
| |
| DoFnRunner<InputT, KV<K, Iterable<V>>> basicRunner = |
| new GroupAlsoByWindowFnRunner<>( |
| options, doFn, sideInputReader, outputManager, mainOutputTag, stepContext); |
| |
| if (doFn instanceof StreamingGroupAlsoByWindowViaWindowSetFn) { |
| DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> streamingGABWRunner = |
| (DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>) basicRunner; |
| |
| if (hasStreamingSideInput) { |
| @SuppressWarnings("unchecked") |
| WindmillKeyedWorkItem.FakeKeyedWorkItemCoder<K, V> keyedWorkItemCoder = |
| (WindmillKeyedWorkItem.FakeKeyedWorkItemCoder<K, V>) inputCoder; |
| StreamingSideInputFetcher<V, W> sideInputFetcher = |
| new StreamingSideInputFetcher<>( |
| sideInputViews, |
| keyedWorkItemCoder.getElementCoder(), |
| windowingStrategy, |
| (StreamingModeExecutionContext.StreamingModeStepContext) stepContext); |
| |
| streamingGABWRunner = |
| new StreamingKeyedWorkItemSideInputDoFnRunner<>( |
| streamingGABWRunner, |
| keyedWorkItemCoder.getKeyCoder(), |
| sideInputFetcher, |
| stepContext); |
| } |
| return (DoFnRunner<InputT, KV<K, Iterable<V>>>) |
| DoFnRunners.<K, V, Iterable<V>, W>lateDataDroppingRunner( |
| streamingGABWRunner, stepContext.timerInternals(), windowingStrategy); |
| } else { |
| if (hasStreamingSideInput) { |
| return new StreamingSideInputDoFnRunner<>( |
| basicRunner, |
| new StreamingSideInputFetcher<>( |
| sideInputViews, |
| inputCoder, |
| windowingStrategy, |
| (StreamingModeExecutionContext.StreamingModeStepContext) stepContext)); |
| } else { |
| return basicRunner; |
| } |
| } |
| } |
| } |