blob: e8c239f46f3ee39ebb23f931023f68198cbb6f61 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowFn;
import org.apache.beam.runners.dataflow.worker.util.StreamingGroupAlsoByWindowFn;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
/**
* A general {@link BatchGroupAlsoByWindowFn}. This delegates all of the logic to the {@link
* ReduceFnRunner}.
*/
public class StreamingGroupAlsoByWindowViaWindowSetFn<K, InputT, OutputT, W extends BoundedWindow>
extends StreamingGroupAlsoByWindowFn<K, InputT, OutputT> {
public static <K, InputT, OutputT, W extends BoundedWindow>
GroupAlsoByWindowFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
WindowingStrategy<?, W> strategy,
StateInternalsFactory<K> stateInternalsFactory,
SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
return new StreamingGroupAlsoByWindowViaWindowSetFn<>(
strategy, stateInternalsFactory, reduceFn);
}
private final WindowingStrategy<Object, W> windowingStrategy;
private final StateInternalsFactory<K> stateInternalsFactory;
private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
private StreamingGroupAlsoByWindowViaWindowSetFn(
WindowingStrategy<?, W> windowingStrategy,
StateInternalsFactory<K> stateInternalsFactory,
SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
@SuppressWarnings("unchecked")
WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy;
this.windowingStrategy = noWildcard;
this.reduceFn = reduceFn;
this.stateInternalsFactory = stateInternalsFactory;
}
@Override
public void processElement(
KeyedWorkItem<K, InputT> keyedWorkItem,
PipelineOptions options,
StepContext stepContext,
SideInputReader sideInputReader,
OutputWindowedValue<KV<K, OutputT>> output)
throws Exception {
K key = keyedWorkItem.key();
StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
new ReduceFnRunner<K, InputT, OutputT, W>(
key,
windowingStrategy,
ExecutableTriggerStateMachine.create(
TriggerStateMachines.stateMachineForTrigger(
TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
stateInternals,
stepContext.timerInternals(),
output,
sideInputReader,
reduceFn,
options);
reduceFnRunner.processElements(keyedWorkItem.elementsIterable());
reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
reduceFnRunner.persist();
}
public GroupAlsoByWindowFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
// Safe contravariant cast
@SuppressWarnings("unchecked")
GroupAlsoByWindowFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asFn =
(GroupAlsoByWindowFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this;
return asFn;
}
}