blob: 795d232f77aa2e031e276b87aace24c0451be22a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms;
import com.google.auto.value.AutoValue;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
/**
* A collection of utilities for writing transforms that can handle exceptions raised during
* processing of elements.
*
* <p>Consuming transforms such as {@link MapElements.MapWithFailures} follow the general pattern of
* taking in a user-defined exception handler of type {@code
* ProcessFunction<ExceptionElement<InputT>, FailureOutputT>} where the input {@link
* ExceptionElement} contains an exception along with the input element that was being processed
* when the exception was raised. This handler is responsible for producing some output element that
* captures relevant details of the failure and can be encoded as part of a failure output {@link
* PCollection}. Transforms can then package together their output and failure collections in a
* {@link WithFailures.Result} that avoids users needing to interact with {@code TupleTag}s and
* indexing into a {@link PCollectionTuple}.
*
* <p>Exception handlers can narrow their scope by rethrowing the passed {@link
* ExceptionElement#exception()} and catching only specific subclasses of {@code Exception}.
* Unhandled exceptions will generally bubble up to a top-level {@link
* org.apache.beam.sdk.Pipeline.PipelineExecutionException} that halts progress.
*
* <p>Users can take advantage of {@link Result#failuresTo(List)} for fluent chaining of transforms
* that handle exceptions:
*
* <pre>{@code
* PCollection<Integer> input = ...
* List<PCollection<Map<String, String>>> failureCollections = new ArrayList<>();
* input.apply(MapElements.via(...).exceptionsVia(...))
* .failuresTo(failureCollections)
* .apply(MapElements.via(...).exceptionsVia(...))
* .failuresTo(failureCollections);
* PCollection<Map<String, String>> failures = PCollectionList.of(failureCollections)
* .apply("FlattenFailureCollections", Flatten.pCollections());
* }</pre>
*/
@Experimental(Experimental.Kind.WITH_EXCEPTIONS)
public class WithFailures {

  /**
   * The value type passed as input to exception handlers. It wraps an exception together with the
   * input element that was being processed at the time the exception was raised.
   *
   * <p>Exception handlers may want to re-raise the exception and catch only specific subclasses in
   * order to limit the scope of handled exceptions or access subclass-specific data.
   *
   * @param <T> type of the input element that was being processed
   */
  @AutoValue
  public abstract static class ExceptionElement<T> {
    // NOTE: AutoValue derives the generated constructor's parameter order from the declaration
    // order of these abstract properties; keep element() before exception().

    /** Returns the input element that was being processed when the exception was raised. */
    public abstract T element();

    /** Returns the exception that was raised while processing {@link #element()}. */
    public abstract Exception exception();

    /**
     * Creates an {@link ExceptionElement} pairing an input element with the exception raised while
     * processing it.
     *
     * @param element the input element that was being processed
     * @param exception the exception raised while processing {@code element}
     * @return a new {@link ExceptionElement}
     */
    public static <T> ExceptionElement<T> of(T element, Exception exception) {
      return new AutoValue_WithFailures_ExceptionElement<>(element, exception);
    }
  }

  /**
   * A simple handler that extracts information from an exception to a {@code Map<String, String>}
   * and returns a {@link KV} where the key is the input element that failed processing, and the
   * value is the map of exception attributes.
   *
   * <p>Extends {@link SimpleFunction} so that full type information is captured. Map and {@link KV}
   * coders are well supported by Beam, so coder inference can be successfully applied if the
   * consuming transform passes type information to the failure collection's {@link TupleTag}.
   *
   * <p>The keys populated in the map are "className", "message", and "stackTrace" of the exception.
   * If the exception carries no message ({@link Throwable#getMessage()} returns {@code null}), the
   * "message" entry is the empty string.
   *
   * @param <T> type of the input element that failed processing
   */
  public static class ExceptionAsMapHandler<T>
      extends SimpleFunction<ExceptionElement<T>, KV<T, Map<String, String>>> {
    @Override
    public KV<T, Map<String, String>> apply(ExceptionElement<T> f) {
      Exception exception = f.exception();
      // Throwable#getMessage() may legitimately be null (e.g. `new NullPointerException()`), and
      // ImmutableMap rejects null values; without this substitution the handler itself would throw
      // and the failure being reported would be lost.
      String message = exception.getMessage();
      return KV.of(
          f.element(),
          ImmutableMap.of(
              "className", exception.getClass().getName(),
              "message", message == null ? "" : message,
              "stackTrace", Arrays.toString(exception.getStackTrace())));
    }
  }

  /**
   * An intermediate output type for PTransforms that allows an output collection to live alongside
   * a collection of elements that failed the transform.
   *
   * @param <OutputT> Output type
   * @param <FailureElementT> Element type for the failure {@code PCollection}
   */
  @AutoValue
  public abstract static class Result<OutputT extends POutput, FailureElementT>
      implements PInput, POutput {
    // NOTE: AutoValue derives the generated constructor's parameter order from the declaration
    // order of these abstract properties: output, outputTag, failures, failuresTag.

    /** Returns the main (successful) output of the transform. */
    public abstract OutputT output();

    /**
     * Tag under which {@link #output()} is reported by {@link #expand()}, or {@code null} when this
     * result was created without one (in which case the output is not part of the expansion).
     */
    @Nullable
    abstract TupleTag<?> outputTag();

    /** Returns the collection of failure elements produced by the transform. */
    public abstract PCollection<FailureElementT> failures();

    /** Tag under which {@link #failures()} is reported by {@link #expand()}. */
    abstract TupleTag<FailureElementT> failuresTag();

    /**
     * Creates a result from an arbitrary {@link POutput} and a failure collection.
     *
     * <p>No output tag is assigned, so {@link #expand()} on the returned result contains only the
     * failure collection.
     *
     * @param output the main output of the transform
     * @param failures the failure collection
     * @return a new {@link Result} with a fresh failure tag
     */
    public static <OutputT extends POutput, FailureElementT> Result<OutputT, FailureElementT> of(
        OutputT output, PCollection<FailureElementT> failures) {
      return new AutoValue_WithFailures_Result<>(
          output, null, failures, new TupleTag<FailureElementT>());
    }

    /**
     * Creates a result from an output {@link PCollection} and a failure collection, assigning
     * fresh tags to both so that both appear in {@link #expand()}.
     *
     * @param output the main output collection
     * @param failures the failure collection
     * @return a new {@link Result}
     */
    public static <OutputElementT, FailureElementT>
        Result<PCollection<OutputElementT>, FailureElementT> of(
            PCollection<OutputElementT> output, PCollection<FailureElementT> failures) {
      return new AutoValue_WithFailures_Result<>(
          output, new TupleTag<OutputElementT>(), failures, new TupleTag<FailureElementT>());
    }

    /**
     * Creates a result by extracting the output and failure collections from a {@link
     * PCollectionTuple}, reusing the given tags.
     *
     * @param tuple the tuple holding both collections
     * @param outputTag tag of the main output collection within {@code tuple}
     * @param failureTag tag of the failure collection within {@code tuple}
     * @return a new {@link Result}
     */
    public static <OutputElementT, FailureElementT>
        Result<PCollection<OutputElementT>, FailureElementT> of(
            PCollectionTuple tuple,
            TupleTag<OutputElementT> outputTag,
            TupleTag<FailureElementT> failureTag) {
      return new AutoValue_WithFailures_Result<>(
          tuple.get(outputTag), outputTag, tuple.get(failureTag), failureTag);
    }

    /**
     * Adds the failure collection to the passed list and returns just the output collection.
     *
     * <p>The list is only written to, so any list whose element type is a supertype of {@code
     * PCollection<FailureElementT>} is accepted.
     *
     * @param failureCollections list that receives {@link #failures()}
     * @return {@link #output()}, enabling fluent chaining of further transforms
     */
    public OutputT failuresTo(List<? super PCollection<FailureElementT>> failureCollections) {
      failureCollections.add(failures());
      return output();
    }

    @Override
    public Pipeline getPipeline() {
      return output().getPipeline();
    }

    /**
     * Returns the tagged values making up this result: always the failure collection, plus the
     * output when an output tag is present and the output is itself a {@link PValue}.
     */
    @Override
    public Map<TupleTag<?>, PValue> expand() {
      Map<TupleTag<?>, PValue> values = new HashMap<>();
      values.put(failuresTag(), failures());
      if (outputTag() != null && output() instanceof PValue) {
        values.put(outputTag(), (PValue) output());
      }
      return values;
    }

    /**
     * Intentionally a no-op.
     *
     * <p>NOTE(review): this call is not forwarded to {@link #output()} or {@link #failures()};
     * presumably the transform that produced them finalizes them — confirm.
     */
    @Override
    public void finishSpecifyingOutput(
        String transformName, PInput input, PTransform<?, ?> transform) {}
  }
}