| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.transforms; |
| |
| import com.google.auto.value.AutoValue; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.Pipeline; |
| import org.apache.beam.sdk.annotations.Experimental; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.PCollectionTuple; |
| import org.apache.beam.sdk.values.PInput; |
| import org.apache.beam.sdk.values.POutput; |
| import org.apache.beam.sdk.values.PValue; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; |
| |
| /** |
| * A collection of utilities for writing transforms that can handle exceptions raised during |
| * processing of elements. |
| * |
| * <p>Consuming transforms such as {@link MapElements.MapWithFailures} follow the general pattern of |
| * taking in a user-defined exception handler of type {@code |
| * ProcessFunction<ExceptionElement<InputT>, FailureOutputT>} where the input {@link |
| * ExceptionElement} contains an exception along with the input element that was being processed |
| * when the exception was raised. This handler is responsible for producing some output element that |
| * captures relevant details of the failure and can be encoded as part of a failure output {@link |
| * PCollection}. Transforms can then package together their output and failure collections in a |
| * {@link WithFailures.Result} that avoids users needing to interact with {@code TupleTag}s and |
| * indexing into a {@link PCollectionTuple}. |
| * |
| * <p>Exception handlers can narrow their scope by rethrowing the passed {@link |
| * ExceptionElement#exception()} and catching only specific subclasses of {@code Exception}. |
| * Unhandled exceptions will generally bubble up to a top-level {@link |
| * org.apache.beam.sdk.Pipeline.PipelineExecutionException} that halts progress. |
| * |
| * <p>Users can take advantage of {@link Result#failuresTo(List)} for fluent chaining of transforms |
| * that handle exceptions: |
| * |
| * <pre>{@code |
| * PCollection<Integer> input = ... |
| * List<PCollection<Map<String, String>> failureCollections = new ArrayList<>(); |
| * input.apply(MapElements.via(...).exceptionsVia(...)) |
| * .failuresTo(failureCollections) |
| * .apply(MapElements.via(...).exceptionsVia(...)) |
| * .failuresTo(failureCollections); |
| * PCollection<Map<String, String>> failures = PCollectionList.of(failureCollections) |
| * .apply("FlattenFailureCollections", Flatten.pCollections()); |
| * }</pre> |
| */ |
| @Experimental(Experimental.Kind.WITH_EXCEPTIONS) |
| public class WithFailures { |
| |
| /** |
| * The value type passed as input to exception handlers. It wraps an exception together with the |
| * input element that was being processed at the time the exception was raised. |
| * |
| * <p>Exception handlers may want to re-raise the exception and catch only specific subclasses in |
| * order to limit the scope of handled exceptions or access subclass-specific data. |
| */ |
| @AutoValue |
| public abstract static class ExceptionElement<T> { |
| public abstract T element(); |
| |
| public abstract Exception exception(); |
| |
| public static <T> ExceptionElement<T> of(T element, Exception exception) { |
| return new AutoValue_WithFailures_ExceptionElement<>(element, exception); |
| } |
| } |
| |
| /** |
| * A simple handler that extracts information from an exception to a {@code Map<String, String>} |
| * and returns a {@link KV} where the key is the input element that failed processing, and the |
| * value is the map of exception attributes. |
| * |
| * <p>Extends {@link SimpleFunction} so that full type information is captured. Map and {@link KV} |
| * coders are well supported by Beam, so coder inference can be successfully applied if the |
| * consuming transform passes type information to the failure collection's {@link TupleTag}. |
| * |
| * <p>The keys populated in the map are "className", "message", and "stackTrace" of the exception. |
| */ |
| public static class ExceptionAsMapHandler<T> |
| extends SimpleFunction<ExceptionElement<T>, KV<T, Map<String, String>>> { |
| @Override |
| public KV<T, Map<String, String>> apply(ExceptionElement<T> f) { |
| return KV.of( |
| f.element(), |
| ImmutableMap.of( |
| "className", f.exception().getClass().getName(), |
| "message", f.exception().getMessage(), |
| "stackTrace", Arrays.toString(f.exception().getStackTrace()))); |
| } |
| } |
| |
| /** |
| * An intermediate output type for PTransforms that allows an output collection to live alongside |
| * a collection of elements that failed the transform. |
| * |
| * @param <OutputT> Output type |
| * @param <FailureElementT> Element type for the failure {@code PCollection} |
| */ |
| @AutoValue |
| public abstract static class Result<OutputT extends POutput, FailureElementT> |
| implements PInput, POutput { |
| |
| public abstract OutputT output(); |
| |
| @Nullable |
| abstract TupleTag<?> outputTag(); |
| |
| public abstract PCollection<FailureElementT> failures(); |
| |
| abstract TupleTag<FailureElementT> failuresTag(); |
| |
| public static <OutputT extends POutput, FailureElementT> Result<OutputT, FailureElementT> of( |
| OutputT output, PCollection<FailureElementT> failures) { |
| return new AutoValue_WithFailures_Result<>( |
| output, null, failures, new TupleTag<FailureElementT>()); |
| } |
| |
| public static <OutputElementT, FailureElementT> |
| Result<PCollection<OutputElementT>, FailureElementT> of( |
| PCollection<OutputElementT> output, PCollection<FailureElementT> failures) { |
| return new AutoValue_WithFailures_Result<>( |
| output, new TupleTag<OutputElementT>(), failures, new TupleTag<FailureElementT>()); |
| } |
| |
| public static <OutputElementT, FailureElementT> |
| Result<PCollection<OutputElementT>, FailureElementT> of( |
| PCollectionTuple tuple, |
| TupleTag<OutputElementT> outputTag, |
| TupleTag<FailureElementT> failureTag) { |
| return new AutoValue_WithFailures_Result<>( |
| tuple.get(outputTag), outputTag, tuple.get(failureTag), failureTag); |
| } |
| |
| /** Adds the failure collection to the passed list and returns just the output collection. */ |
| public OutputT failuresTo(List<PCollection<FailureElementT>> failureCollections) { |
| failureCollections.add(failures()); |
| return output(); |
| } |
| |
| @Override |
| public Pipeline getPipeline() { |
| return output().getPipeline(); |
| } |
| |
| @Override |
| public Map<TupleTag<?>, PValue> expand() { |
| Map<TupleTag<?>, PValue> values = new HashMap<>(); |
| values.put(failuresTag(), failures()); |
| if (outputTag() != null && output() instanceof PValue) { |
| values.put(outputTag(), (PValue) output()); |
| } |
| return values; |
| } |
| |
| @Override |
| public void finishSpecifyingOutput( |
| String transformName, PInput input, PTransform<?, ?> transform) {} |
| } |
| } |