blob: 714693cc68709e7c2274fca6f4728e79ae995ac1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.samza.runtime;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.util.DoFnUtils;
import org.apache.beam.runners.samza.util.FutureUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Samza operator for {@link DoFn}.
 *
 * <p>The operator wraps a {@link DoFnRunner} (portable or non-portable, chosen by {@code
 * isPortable} in {@code open}) in a {@link PushbackSideInputDoFnRunner}. Input elements that the
 * pushback runner rejects are buffered in {@code pushbackValues} and replayed when side input
 * data arrives. Bundle start/finish and watermark handling are routed through a {@link
 * BundleManager}.
 *
 * @param <InT> type of the input elements
 * @param <FnOutT> output type of the wrapped {@link DoFn}
 * @param <OutT> type of the elements emitted through the {@link OpEmitter}
 */
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
private static final Logger LOG = LoggerFactory.getLogger(DoFnOp.class);
// ---- Configuration captured by the constructor ----
private final TupleTag<FnOutT> mainOutputTag;
private final DoFn<InT, FnOutT> doFn;
// Handed to the timer internals factory and to the non-portable runner in open().
private final Coder<?> keyCoder;
private final Collection<PCollectionView<?>> sideInputs;
private final List<TupleTag<?>> sideOutputTags;
private final WindowingStrategy windowingStrategy;
// Creates the OutputManager the runner writes to; invoked in open().
private final OutputManagerFactory<OutT> outputManagerFactory;
// NOTE: we use HashMap here to guarantee Serializability
// Mapping from view id to a view
private final HashMap<String, PCollectionView<?>> idToViewMap;
private final String transformFullName;
private final String transformId;
private final Coder<InT> inputCoder;
private final Coder<WindowedValue<InT>> windowedValueCoder;
private final HashMap<TupleTag<?>, Coder<?>> outputCoders;
private final PCollection.IsBounded isBounded;
// "_samza_bundle_check_" + transformId; processTimer() routes timers with this id to the
// bundle manager.
private final String bundleCheckTimerId;
// "_samza_bundle_" + transformId; passed to the portable runner in open().
private final String bundleStateId;
// portable api related
private final boolean isPortable;
private final RunnerApi.ExecutableStagePayload stagePayload;
private final JobInfo jobInfo;
private final HashMap<String, TupleTag<?>> idToTupleTagMap;
// ---- Runtime state; all transient fields below are (re)created in open() ----
private transient SamzaTimerInternalsFactory<?> timerInternalsFactory;
private transient DoFnRunner<InT, FnOutT> fnRunner;
// fnRunner wrapped so that elements whose side inputs are not ready are handed back.
private transient PushbackSideInputDoFnRunner<InT, FnOutT> pushbackFnRunner;
private transient SideInputHandler sideInputHandler;
// Used in this class for setup (open) and teardown (close) of the DoFn.
private transient DoFnInvoker<InT, FnOutT> doFnInvoker;
private transient SamzaPipelineOptions samzaPipelineOptions;
// Earliest timestamp among the currently pushed-back values; TIMESTAMP_MAX_VALUE when the
// buffer has been reset.
// This is derivable from pushbackValues which is persisted to a store.
// NOTE(review): in this file pushbackValues is a plain ArrayList created in open(); the
// persistence mentioned above is not visible here -- confirm.
// TODO: eagerly initialize the hold in init
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
justification = "No bug",
value = "SE_TRANSIENT_FIELD_NOT_RESTORED")
private transient Instant pushbackWatermarkHold;
// Last watermark seen by doProcessWatermark().
// TODO: add this to checkpointable state
private transient Instant inputWatermark;
private transient BundleManager<OutT> bundleManager;
// Last watermark received through processSideInputWatermark().
private transient Instant sideInputWatermark;
// Elements rejected by the pushback runner, waiting to be replayed.
private transient List<WindowedValue<InT>> pushbackValues;
// Only assigned in open() when isPortable is true.
private transient ExecutableStageContext stageContext;
// Only assigned in open() when isPortable is true.
private transient StageBundleFactory stageBundleFactory;
// True when the configured max bundle size is <= 1; side inputs require this.
private transient boolean bundleDisabled;
private final DoFnSchemaInformation doFnSchemaInformation;
private final Map<?, PCollectionView<?>> sideInputMapping;
/**
 * Captures the translation-time configuration of the operator. No runtime state is created
 * here; that happens in {@code open}.
 *
 * <p>{@code outputCoders}, {@code idToViewMap} and {@code idToTupleTagMap} are copied into
 * {@link HashMap}s; every other argument is stored by reference. The bundle timer id and bundle
 * state id are derived from {@code transformId}.
 */
public DoFnOp(
    TupleTag<FnOutT> mainOutputTag,
    DoFn<InT, FnOutT> doFn,
    Coder<?> keyCoder,
    Coder<InT> inputCoder,
    Coder<WindowedValue<InT>> windowedValueCoder,
    Map<TupleTag<?>, Coder<?>> outputCoders,
    Collection<PCollectionView<?>> sideInputs,
    List<TupleTag<?>> sideOutputTags,
    WindowingStrategy windowingStrategy,
    Map<String, PCollectionView<?>> idToViewMap,
    OutputManagerFactory<OutT> outputManagerFactory,
    String transformFullName,
    String transformId,
    PCollection.IsBounded isBounded,
    boolean isPortable,
    RunnerApi.ExecutableStagePayload stagePayload,
    JobInfo jobInfo,
    Map<String, TupleTag<?>> idToTupleTagMap,
    DoFnSchemaInformation doFnSchemaInformation,
    Map<?, PCollectionView<?>> sideInputMapping) {
  // The user function and its schema information.
  this.doFn = doFn;
  this.doFnSchemaInformation = doFnSchemaInformation;

  // Coders.
  this.keyCoder = keyCoder;
  this.inputCoder = inputCoder;
  this.windowedValueCoder = windowedValueCoder;
  this.outputCoders = new HashMap<>(outputCoders);

  // Side inputs.
  this.sideInputs = sideInputs;
  this.idToViewMap = new HashMap<>(idToViewMap);
  this.sideInputMapping = sideInputMapping;

  // Outputs.
  this.mainOutputTag = mainOutputTag;
  this.sideOutputTags = sideOutputTags;
  this.outputManagerFactory = outputManagerFactory;

  // Windowing and boundedness of the input.
  this.windowingStrategy = windowingStrategy;
  this.isBounded = isBounded;

  // Portable execution.
  this.isPortable = isPortable;
  this.stagePayload = stagePayload;
  this.jobInfo = jobInfo;
  this.idToTupleTagMap = new HashMap<>(idToTupleTagMap);

  // Transform identity and the ids derived from it.
  this.transformFullName = transformFullName;
  this.transformId = transformId;
  this.bundleCheckTimerId = "_samza_bundle_check_" + transformId;
  this.bundleStateId = "_samza_bundle_" + transformId;
}
/**
 * Initializes the transient runtime state of this operator.
 *
 * <p>In order: resets the watermarks, reads the pipeline options from the {@link
 * SamzaExecutionContext}, creates the state internals factory, the future collector, the {@link
 * BundleManager}, the timer internals factory and the {@link SideInputHandler}, builds the
 * portable or non-portable {@link DoFnRunner}, wraps it in a pushback runner, and finally runs
 * DoFn setup through a {@link DoFnInvoker}.
 *
 * @param config Samza config; not read by this method
 * @param context Samza context used to obtain the execution context and the task context
 * @param timerRegistry scheduler handed to the bundle manager and the timer internals factory
 * @param emitter emitter the created output manager writes to
 */
@Override
@SuppressWarnings("unchecked")
public void open(
Config config,
Context context,
Scheduler<KeyedTimerData<Void>> timerRegistry,
OpEmitter<OutT> emitter) {
// Start with no watermark progress and no pushback hold.
this.inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
this.sideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
this.pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
final SamzaExecutionContext samzaExecutionContext =
(SamzaExecutionContext) context.getApplicationContainerContext();
this.samzaPipelineOptions = samzaExecutionContext.getPipelineOptions();
// A max bundle size of 1 (or less) is treated as "bundling disabled".
this.bundleDisabled = samzaPipelineOptions.getMaxBundleSize() <= 1;
final String stateId = "pardo-" + transformId;
final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
SamzaStoreStateInternals.createNonKeyedStateInternalsFactory(
stateId, context.getTaskContext(), samzaPipelineOptions);
// The same collector instance is shared by the bundle manager and the output manager.
final FutureCollector<OutT> outputFutureCollector = createFutureCollector();
this.bundleManager =
new BundleManager<>(
createBundleProgressListener(),
outputFutureCollector,
samzaPipelineOptions.getMaxBundleSize(),
samzaPipelineOptions.getMaxBundleTimeMs(),
timerRegistry,
bundleCheckTimerId);
this.timerInternalsFactory =
SamzaTimerInternalsFactory.createTimerInternalFactory(
keyCoder,
// Raw cast: the registry is declared for Void keys.
(Scheduler) timerRegistry,
getTimerStateId(signature),
nonKeyedStateInternalsFactory,
windowingStrategy,
isBounded,
samzaPipelineOptions);
this.sideInputHandler =
new SideInputHandler(sideInputs, nonKeyedStateInternalsFactory.stateInternalsForKey(null));
if (isPortable) {
// Portable path: the stage payload is executed through a stage bundle factory.
final ExecutableStage executableStage = ExecutableStage.fromPayload(stagePayload);
stageContext = SamzaExecutableStageContextFactory.getInstance().get(jobInfo);
stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
this.fnRunner =
SamzaDoFnRunners.createPortable(
transformId,
DoFnUtils.toStepName(executableStage),
bundleStateId,
windowedValueCoder,
executableStage,
sideInputMapping,
sideInputHandler,
nonKeyedStateInternalsFactory,
timerInternalsFactory,
samzaPipelineOptions,
outputManagerFactory.create(emitter, outputFutureCollector),
stageBundleFactory,
samzaExecutionContext,
mainOutputTag,
idToTupleTagMap,
context,
transformFullName);
} else {
// Non-portable path: run the DoFn directly.
this.fnRunner =
SamzaDoFnRunners.create(
samzaPipelineOptions,
doFn,
windowingStrategy,
transformFullName,
stateId,
context,
mainOutputTag,
sideInputHandler,
timerInternalsFactory,
keyCoder,
outputManagerFactory.create(emitter, outputFutureCollector),
inputCoder,
sideOutputTags,
outputCoders,
doFnSchemaInformation,
(Map<String, PCollectionView<?>>) sideInputMapping);
}
this.pushbackFnRunner =
SimplePushbackSideInputDoFnRunner.create(fnRunner, sideInputs, sideInputHandler);
this.pushbackValues = new ArrayList<>();
// A SamzaDoFnInvokerRegistrar found through ServiceLoader overrides the default invoker.
final Iterator<SamzaDoFnInvokerRegistrar> invokerReg =
ServiceLoader.load(SamzaDoFnInvokerRegistrar.class).iterator();
if (!invokerReg.hasNext()) {
// use the default invoker here
doFnInvoker = DoFnInvokers.tryInvokeSetupFor(doFn, samzaPipelineOptions);
} else {
// getOnlyElement expects exactly one registrar on the classpath.
doFnInvoker =
Iterators.getOnlyElement(invokerReg).invokerSetupFor(doFn, samzaPipelineOptions, context);
}
}
/**
 * Creates the collector that gathers future-valued outputs; {@code open} shares the returned
 * instance between the bundle manager and the output manager.
 */
FutureCollector<OutT> createFutureCollector() {
  final FutureCollector<OutT> collector = new FutureCollectorImpl<>();
  return collector;
}
/**
 * Builds the state id used for timers: the literal {@code "timer"} followed by the id of every
 * timer declared in the signature, in the iteration order of the declaration map's key set.
 *
 * @param signature signature of the wrapped DoFn
 * @return the concatenated timer state id
 */
private String getTimerStateId(DoFnSignature signature) {
  final StringBuilder stateId = new StringBuilder("timer");
  if (!signature.usesTimers()) {
    return stateId.toString();
  }
  for (String timerId : signature.timerDeclarations().keySet()) {
    stateId.append(timerId);
  }
  return stateId.toString();
}
/**
 * Processes one input element inside a bundle.
 *
 * <p>Elements the pushback runner hands back are appended to the pushback buffer, and the
 * pushback watermark hold is lowered to the earliest of their timestamps. Any failure is logged,
 * signalled to the bundle manager and rethrown.
 *
 * @param inputElement element to process
 * @param emitter emitter passed to the bundle manager when the bundle is finished
 */
@Override
public void processElement(WindowedValue<InT> inputElement, OpEmitter<OutT> emitter) {
  try {
    bundleManager.tryStartBundle();
    for (WindowedValue<InT> pushedBack :
        pushbackFnRunner.processElementInReadyWindows(inputElement)) {
      final Instant pushedBackTimestamp = pushedBack.getTimestamp();
      if (pushedBackTimestamp.compareTo(pushbackWatermarkHold) < 0) {
        pushbackWatermarkHold = pushedBackTimestamp;
      }
      pushbackValues.add(pushedBack);
    }
    bundleManager.tryFinishBundle(emitter);
  } catch (Throwable failure) {
    LOG.error("Encountered error during process element", failure);
    bundleManager.signalFailure(failure);
    throw failure;
  }
}
/**
 * Applies a new input watermark: flushes pushed-back values if no more side input can arrive,
 * fires the timers that became ready, and advances and emits the output watermark when it moves
 * forward.
 *
 * @param watermark the new input watermark
 * @param emitter emitter used to publish the output watermark
 */
private void doProcessWatermark(Instant watermark, OpEmitter<OutT> emitter) {
  this.inputWatermark = watermark;

  // this means we will never see any more side input
  if (sideInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
    emitAllPushbackValues();
  }

  // Pushed-back values hold the watermark back to their earliest timestamp.
  final Instant effectiveWatermark;
  if (pushbackWatermarkHold.isBefore(inputWatermark)) {
    effectiveWatermark = pushbackWatermarkHold;
  } else {
    effectiveWatermark = inputWatermark;
  }
  timerInternalsFactory.setInputWatermark(effectiveWatermark);

  final Collection<? extends KeyedTimerData<?>> dueTimers =
      timerInternalsFactory.removeReadyTimers();
  if (!dueTimers.isEmpty()) {
    pushbackFnRunner.startBundle();
    for (KeyedTimerData<?> dueTimer : dueTimers) {
      fireTimer(dueTimer);
    }
    pushbackFnRunner.finishBundle();
  }

  // Only move the output watermark forward, never backwards.
  final Instant currentOutputWatermark = timerInternalsFactory.getOutputWatermark();
  if (currentOutputWatermark != null && !currentOutputWatermark.isBefore(effectiveWatermark)) {
    return;
  }
  timerInternalsFactory.setOutputWatermark(effectiveWatermark);
  emitter.emitWatermark(timerInternalsFactory.getOutputWatermark());
}
/**
 * Forwards the watermark to the bundle manager. The actual watermark handling lives in {@code
 * doProcessWatermark}, which the bundle progress listener invokes from {@code onWatermark}.
 *
 * @param watermark the new input watermark
 * @param emitter emitter to publish output watermarks with
 */
@Override
public void processWatermark(Instant watermark, OpEmitter<OutT> emitter) {
  this.bundleManager.processWatermark(watermark, emitter);
}
/**
 * Adds newly arrived side input data for a view and replays every pushed-back element, since
 * some of them may now be processable.
 *
 * @param id id of the side input view
 * @param elements the side input values
 * @param emitter emitter used while replaying elements and advancing the watermark
 * @throws IllegalStateException if bundling is enabled
 * @throws IllegalArgumentException if no view is registered under {@code id}
 */
@Override
public void processSideInput(
    String id, WindowedValue<? extends Iterable<?>> elements, OpEmitter<OutT> emitter) {
  checkState(
      bundleDisabled, "Side input not supported in bundling mode. Please disable bundling.");

  final PCollectionView<?> view = idToViewMap.get(id);
  if (view == null) {
    throw new IllegalArgumentException("No mapping of id " + id + " to view.");
  }

  @SuppressWarnings("unchecked")
  final WindowedValue<Iterable<?>> sideInputValue = (WindowedValue<Iterable<?>>) elements;
  sideInputHandler.addSideInputValue(view, sideInputValue);

  // Snapshot the buffer and reset it; processElement re-adds whatever is still not ready.
  final List<WindowedValue<InT>> toReplay = new ArrayList<>(pushbackValues);
  pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
  pushbackValues.clear();
  toReplay.forEach(replayed -> processElement(replayed, emitter));

  // We may be able to advance the output watermark since we may have played some pushed back
  // events.
  processWatermark(this.inputWatermark, emitter);
}
/**
 * Records the side input watermark. When it reaches {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
 * the current input watermark is re-processed so that pushed-back values get flushed.
 *
 * @param watermark the new side input watermark
 * @param emitter emitter used when the input watermark is re-processed
 * @throws IllegalStateException if bundling is enabled
 */
@Override
public void processSideInputWatermark(Instant watermark, OpEmitter<OutT> emitter) {
  checkState(
      bundleDisabled, "Side input not supported in bundling mode. Please disable bundling.");

  sideInputWatermark = watermark;
  // this means we will never see any more side input
  final boolean sideInputExhausted =
      sideInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MAX_VALUE);
  if (sideInputExhausted) {
    processWatermark(this.inputWatermark, emitter);
  }
}
/**
 * Handles a fired timer. The internal bundle-check timer is delegated to the bundle manager;
 * every other timer is fired on the DoFn inside its own bundle and then removed from the
 * processing timers.
 *
 * @param keyedTimerData the timer that fired
 * @param emitter emitter handed to the bundle manager for the bundle-check timer
 */
@Override
@SuppressWarnings("unchecked")
public void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter) {
  final String firedTimerId = keyedTimerData.getTimerData().getTimerId();
  if (bundleCheckTimerId.equals(firedTimerId)) {
    // this is internal timer in processing time to check whether a bundle should be closed
    bundleManager.processTimer(keyedTimerData, emitter);
  } else {
    pushbackFnRunner.startBundle();
    fireTimer(keyedTimerData);
    pushbackFnRunner.finishBundle();
    this.timerInternalsFactory.removeProcessingTimer((KeyedTimerData) keyedTimerData);
  }
}
/**
 * Tears down the DoFn and releases the portable-execution resources.
 *
 * <p>The stage bundle factory and stage context are closed in a {@code finally} block so that
 * they are released even when DoFn teardown throws; the teardown exception still propagates to
 * the caller. Failures while closing the resources are logged and not rethrown.
 */
@Override
public void close() {
  try {
    // doFnInvoker is the last thing open() assigns; it is still null if open() never ran or
    // failed part-way, in which case there is nothing to tear down.
    if (doFnInvoker != null) {
      doFnInvoker.invokeTeardown();
    }
  } finally {
    // try-with-resources skips null resources, so this is a no-op for non-portable pipelines
    // where neither field is assigned.
    try (AutoCloseable factory = stageBundleFactory;
        AutoCloseable context = stageContext) {
      // do nothing
    } catch (Exception e) {
      LOG.error("Failed to close stage bundle factory", e);
    }
  }
}
/**
 * Fires a single timer on the DoFn runner. The caller is responsible for starting and finishing
 * the surrounding bundle.
 *
 * @param keyedTimerData timer data together with the key it was set for
 */
private void fireTimer(KeyedTimerData<?> keyedTimerData) {
  final TimerInternals.TimerData timerData = keyedTimerData.getTimerData();
  LOG.debug("Firing timer {}", timerData);

  final StateNamespace timerNamespace = timerData.getNamespace();
  // NOTE: not sure why this is safe, but DoFnOperator makes this assumption
  final StateNamespaces.WindowNamespace windowNamespace =
      (StateNamespaces.WindowNamespace) timerNamespace;
  final BoundedWindow window = windowNamespace.getWindow();

  fnRunner.onTimer(
      timerData.getTimerId(),
      timerData.getTimerFamilyId(),
      keyedTimerData.getKey(),
      window,
      timerData.getTimestamp(),
      timerData.getOutputTimestamp(),
      timerData.getDomain());
}
/**
 * Flushes the pushback buffer: every buffered element is processed directly on the underlying
 * runner (bypassing the side-input readiness check of the pushback runner) inside one bundle,
 * and the pushback watermark hold is released.
 */
// todo: should this go through bundle manager to start and finish the bundle?
private void emitAllPushbackValues() {
  if (pushbackValues.isEmpty()) {
    return;
  }

  pushbackFnRunner.startBundle();

  // Drain into a copy so the buffer is already empty while the elements are processed.
  final List<WindowedValue<InT>> drained = new ArrayList<>(pushbackValues);
  pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
  pushbackValues.clear();
  for (final WindowedValue<InT> drainedValue : drained) {
    fnRunner.processElement(drainedValue);
  }

  pushbackFnRunner.finishBundle();
}
/**
 * Creates the listener through which the bundle manager drives this operator: bundle start and
 * finish are forwarded to the pushback runner, watermarks to {@code doProcessWatermark}.
 */
private BundleManager.BundleProgressListener<OutT> createBundleProgressListener() {
  final BundleManager.BundleProgressListener<OutT> listener =
      new BundleManager.BundleProgressListener<OutT>() {
        @Override
        public void onBundleStarted() {
          pushbackFnRunner.startBundle();
        }

        @Override
        public void onBundleFinished(OpEmitter<OutT> bundleEmitter) {
          pushbackFnRunner.finishBundle();
        }

        @Override
        public void onWatermark(Instant bundleWatermark, OpEmitter<OutT> bundleEmitter) {
          doProcessWatermark(bundleWatermark, bundleEmitter);
        }
      };
  return listener;
}
/**
 * Turns a future value into a future {@link WindowedValue}: once {@code valueFuture} completes,
 * its result is mapped with {@code valueMapper} and wrapped with the timestamp, windows and pane
 * of {@code windowedValue}.
 *
 * @param windowedValue supplies the timestamp, windows and pane of the result
 * @param valueFuture future producing the raw value
 * @param valueMapper converts the raw value into the output type
 * @return a stage completing with the windowed, mapped value
 */
static <T, OutT> CompletionStage<WindowedValue<OutT>> createOutputFuture(
    WindowedValue<T> windowedValue,
    CompletionStage<T> valueFuture,
    Function<T, OutT> valueMapper) {
  return valueFuture.thenApply(
      resolved -> {
        final OutT mapped = valueMapper.apply(resolved);
        return WindowedValue.of(
            mapped,
            windowedValue.getTimestamp(),
            windowedValue.getWindows(),
            windowedValue.getPane());
      });
}
/**
 * {@link FutureCollector} backed by a synchronized list and a "sealed" flag.
 *
 * <p>The collector starts out sealed. {@code prepare()} unseals it, {@code add()} is only
 * allowed while it is unsealed, and {@code finish()} / {@code discard()} seal it again and empty
 * the list.
 */
static class FutureCollectorImpl<OutT> implements FutureCollector<OutT> {
  /*
   * Choosing synchronized list here since the concurrency is low as the message dispatch thread is single threaded.
   * We need this guard against scenarios when watermark/finish bundle trigger outputs.
   */
  private final List<CompletionStage<WindowedValue<OutT>>> outputFutures =
      Collections.synchronizedList(new ArrayList<>());

  // true while the collector must not accept elements
  private final AtomicBoolean collectorSealed = new AtomicBoolean(true);

  /** Adds a pending output; fails if the collector has not been prepared. */
  @Override
  public void add(CompletionStage<WindowedValue<OutT>> element) {
    final boolean accepting = !collectorSealed.get();
    checkState(
        accepting,
        "Cannot add elements to an unprepared collector. Make sure prepare() is invoked before adding elements.");
    outputFutures.add(element);
  }

  /** Seals the collector and drops every pending output. */
  @Override
  public void discard() {
    collectorSealed.compareAndSet(false, true);
    outputFutures.clear();
  }

  /** Seals the collector and returns a stage for all outputs collected so far. */
  @Override
  public CompletionStage<Collection<WindowedValue<OutT>>> finish() {
    /*
     * We can ignore the results here because its okay to call finish without invoking prepare. It will be a no-op
     * and an empty collection will be returned.
     */
    collectorSealed.compareAndSet(false, true);

    final CompletionStage<Collection<WindowedValue<OutT>>> flattened =
        FutureUtils.flattenFutures(outputFutures);
    outputFutures.clear();
    return flattened;
  }

  /** Unseals the collector; fails if it is already unsealed. */
  @Override
  public void prepare() {
    checkState(
        collectorSealed.compareAndSet(true, false),
        "Failed to prepare the collector. Collector needs to be sealed before prepare() is invoked.");
  }
}
/**
 * Factory for an {@link org.apache.beam.runners.core.DoFnRunners.OutputManager} that writes
 * every output to the main output only, i.e. to a single {@link
 * org.apache.beam.sdk.values.PCollection}.
 *
 * <p>Plain values are emitted directly. Values that are a {@link CompletionStage} are handed to
 * the {@link FutureCollector}; when the manager was created without a collector such values are
 * not forwarded anywhere.
 *
 * @param <OutT> type of the output element.
 */
public static class SingleOutputManagerFactory<OutT> implements OutputManagerFactory<OutT> {
  @Override
  public DoFnRunners.OutputManager create(OpEmitter<OutT> emitter) {
    // No collector: future-valued outputs are dropped by the created manager.
    return createOutputManager(emitter, null);
  }

  @Override
  public DoFnRunners.OutputManager create(
      OpEmitter<OutT> emitter, FutureCollector<OutT> collector) {
    return createOutputManager(emitter, collector);
  }

  private DoFnRunners.OutputManager createOutputManager(
      OpEmitter<OutT> emitter, FutureCollector<OutT> collector) {
    return new DoFnRunners.OutputManager() {
      @Override
      @SuppressWarnings("unchecked")
      public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
        // With only one input we know that T is of type OutT.
        final T rawValue = windowedValue.getValue();
        if (!(rawValue instanceof CompletionStage)) {
          emitter.emitElement((WindowedValue<OutT>) windowedValue);
          return;
        }
        if (collector == null) {
          return;
        }
        final CompletionStage<T> pendingValue = (CompletionStage<T>) rawValue;
        collector.add(createOutputFuture(windowedValue, pendingValue, value -> (OutT) value));
      }
    };
  }
}
/**
 * Factory class to create an {@link org.apache.beam.runners.core.DoFnRunners.OutputManager} that
 * emits values to the main output as well as the side outputs via union type {@link
 * RawUnionValue}.
 *
 * <p>Each output is wrapped in a {@link RawUnionValue} carrying the index registered for its
 * {@link TupleTag}. Plain values are emitted directly. Values that are a {@link CompletionStage}
 * are handed to the {@link FutureCollector}; when the manager was created without a collector
 * such values are not forwarded anywhere.
 */
public static class MultiOutputManagerFactory implements OutputManagerFactory<RawUnionValue> {
  private final Map<TupleTag<?>, Integer> tagToIndexMap;

  /**
   * @param tagToIndexMap union index for every output tag the DoFn may write to; stored by
   *     reference
   */
  public MultiOutputManagerFactory(Map<TupleTag<?>, Integer> tagToIndexMap) {
    this.tagToIndexMap = tagToIndexMap;
  }

  @Override
  public DoFnRunners.OutputManager create(OpEmitter<RawUnionValue> emitter) {
    return createOutputManager(emitter, null);
  }

  @Override
  public DoFnRunners.OutputManager create(
      OpEmitter<RawUnionValue> emitter, FutureCollector<RawUnionValue> collector) {
    return createOutputManager(emitter, collector);
  }

  private DoFnRunners.OutputManager createOutputManager(
      OpEmitter<RawUnionValue> emitter, FutureCollector<RawUnionValue> collector) {
    return new DoFnRunners.OutputManager() {
      /**
       * @throws IllegalArgumentException if {@code tupleTag} has no registered union index
       */
      @Override
      @SuppressWarnings("unchecked")
      public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
        // Look the index up as an Integer first: unboxing a missing entry directly would fail
        // with a NullPointerException that does not say which tag was unknown.
        final Integer registeredIndex = tagToIndexMap.get(tupleTag);
        if (registeredIndex == null) {
          throw new IllegalArgumentException(
              "No mapping of tag " + tupleTag + " to output index.");
        }
        final int index = registeredIndex;
        final T rawValue = windowedValue.getValue();
        if (rawValue instanceof CompletionStage) {
          CompletionStage<T> valueFuture = (CompletionStage<T>) rawValue;
          if (collector != null) {
            collector.add(
                createOutputFuture(
                    windowedValue, valueFuture, res -> new RawUnionValue(index, res)));
          }
        } else {
          final RawUnionValue rawUnionValue = new RawUnionValue(index, rawValue);
          emitter.emitElement(windowedValue.withValue(rawUnionValue));
        }
      }
    };
  }
}
}