// Source artifact (from web view): blob 09fef37c425b75ec1f6dab14f68ca259db4bd5af
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.transforms.Contextful.Fn;
import org.apache.beam.sdk.transforms.WithFailures.ExceptionElement;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
/** {@code PTransform}s for mapping a simple function over the elements of a {@link PCollection}. */
public class MapElements<InputT, OutputT>
    extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {

  // Construction-time-only metadata: the type descriptors feed coder inference and the original
  // user function feeds display data. All three are transient, so they are dropped when this
  // transform is serialized for distributed execution; only {@code fn} survives serialization.
  @Nullable private final transient TypeDescriptor<InputT> inputType;
  @Nullable private final transient TypeDescriptor<OutputT> outputType;
  @Nullable private final transient Object originalFnForDisplayData;

  // The mapping function. Null for the partially-specified transform returned by
  // {@link #into(TypeDescriptor)} until one of the via(...) overloads is called;
  // {@link #expand} rejects a null fn with checkNotNull.
  @Nullable private final Contextful<Fn<InputT, OutputT>> fn;

  private MapElements(
      @Nullable Contextful<Fn<InputT, OutputT>> fn,
      @Nullable Object originalFnForDisplayData,
      @Nullable TypeDescriptor<InputT> inputType,
      TypeDescriptor<OutputT> outputType) {
    this.fn = fn;
    this.originalFnForDisplayData = originalFnForDisplayData;
    this.inputType = inputType;
    this.outputType = outputType;
  }

  /**
   * For {@code InferableFunction<InputT, OutputT>} {@code fn}, returns a {@code PTransform} that
   * takes an input {@code PCollection<InputT>} and returns a {@code PCollection<OutputT>}
   * containing {@code fn.apply(v)} for every element {@code v} in the input.
   *
   * <p>{@link InferableFunction} has the advantage of providing type descriptor information, but it
   * is generally more convenient to specify output type via {@link #into(TypeDescriptor)}, and
   * provide the mapping as a lambda expression to {@link #via(ProcessFunction)}.
   *
   * <p>Example usage:
   *
   * <pre>{@code
   * PCollection<String> words = ...;
   * PCollection<Integer> wordsPerLine = words.apply(MapElements.via(
   *     new InferableFunction<String, Integer>() {
   *       public Integer apply(String word) throws Exception {
   *         return word.length();
   *       }
   *     }));
   * }</pre>
   */
  public static <InputT, OutputT> MapElements<InputT, OutputT> via(
      final InferableFunction<InputT, OutputT> fn) {
    // Input/output type descriptors are taken from the function itself, so no into() is needed.
    return new MapElements<>(
        Contextful.fn(fn), fn, fn.getInputTypeDescriptor(), fn.getOutputTypeDescriptor());
  }

  /** Binary compatibility adapter for {@link #via(InferableFunction)}. */
  public static <InputT, OutputT> MapElements<InputT, OutputT> via(
      final SimpleFunction<InputT, OutputT> fn) {
    return via((InferableFunction<InputT, OutputT>) fn);
  }

  /**
   * Returns a new {@link MapElements} transform with the given type descriptor for the output type,
   * but the mapping function yet to be specified using {@link #via(ProcessFunction)}.
   */
  public static <OutputT> MapElements<?, OutputT> into(final TypeDescriptor<OutputT> outputType) {
    // Partially-specified transform: fn, display-data fn, and input type are all null here.
    return new MapElements<>(null, null, null, outputType);
  }

  /**
   * For a {@code ProcessFunction<InputT, OutputT>} {@code fn} and output type descriptor, returns a
   * {@code PTransform} that takes an input {@code PCollection<InputT>} and returns a {@code
   * PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in the input.
   *
   * <p>Example usage:
   *
   * <pre>{@code
   * PCollection<Integer> wordLengths = words.apply(
   *     MapElements.into(TypeDescriptors.integers())
   *                .via((String word) -> word.length()));
   * }</pre>
   */
  public <NewInputT> MapElements<NewInputT, OutputT> via(ProcessFunction<NewInputT, OutputT> fn) {
    // Keeps the output type from into(); infers the input type from the function.
    return new MapElements<>(Contextful.fn(fn), fn, TypeDescriptors.inputOf(fn), outputType);
  }

  /** Binary compatibility adapter for {@link #via(ProcessFunction)}. */
  public <NewInputT> MapElements<NewInputT, OutputT> via(
      SerializableFunction<NewInputT, OutputT> fn) {
    return via((ProcessFunction<NewInputT, OutputT>) fn);
  }

  /** Like {@link #via(ProcessFunction)}, but supports access to context, such as side inputs. */
  @Experimental(Kind.CONTEXTFUL)
  public <NewInputT> MapElements<NewInputT, OutputT> via(Contextful<Fn<NewInputT, OutputT>> fn) {
    return new MapElements<>(
        fn, fn.getClosure(), TypeDescriptors.inputOf(fn.getClosure()), outputType);
  }

  @Override
  public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
    // A mapping function must have been supplied by one of the via(...) overloads by now.
    checkNotNull(fn, "Must specify a function on MapElements using .via()");
    return input.apply(
        "Map",
        ParDo.of(
                new DoFn<InputT, OutputT>() {
                  @ProcessElement
                  public void processElement(
                      @Element InputT element, OutputReceiver<OutputT> receiver, ProcessContext c)
                      throws Exception {
                    // Delegate to the wrapped fn; wrapping the ProcessContext gives the fn
                    // access to the context features (e.g. side inputs) it declared.
                    receiver.output(
                        fn.getClosure().apply(element, Fn.Context.wrapProcessContext(c)));
                  }

                  @Override
                  public void populateDisplayData(DisplayData.Builder builder) {
                    // Surface the enclosing transform's display data on the DoFn itself.
                    builder.delegate(MapElements.this);
                  }

                  @Override
                  public TypeDescriptor<InputT> getInputTypeDescriptor() {
                    return inputType;
                  }

                  @Override
                  public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
                    // outputType is transient and therefore null after deserialization; coder
                    // inference must happen at pipeline-construction time only.
                    checkState(
                        outputType != null,
                        "%s output type descriptor was null; "
                            + "this probably means that getOutputTypeDescriptor() was called after "
                            + "serialization/deserialization, but it is only available prior to "
                            + "serialization, for constructing a pipeline and inferring coders",
                        MapElements.class.getSimpleName());
                    return outputType;
                  }
                })
            .withSideInputs(fn.getRequirements().getSideInputs()));
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
    // NOTE(review): originalFnForDisplayData is @Nullable (it is null for the
    // partially-specified transform returned by into()), so getClass() here would NPE if
    // display data were collected before via() was called — presumably display data is only
    // collected on fully-specified transforms; TODO confirm.
    builder.add(DisplayData.item("class", originalFnForDisplayData.getClass()));
    if (originalFnForDisplayData instanceof HasDisplayData) {
      builder.include("fn", (HasDisplayData) originalFnForDisplayData);
    }
  }

  /**
   * Returns a new {@link MapWithFailures} transform that catches exceptions raised while mapping
   * elements, with the given type descriptor used for the failure collection but the exception
   * handler yet to be specified using {@link MapWithFailures#exceptionsVia(ProcessFunction)}.
   *
   * <p>See {@link WithFailures} documentation for usage patterns of the returned {@link
   * WithFailures.Result}.
   */
  @Experimental(Experimental.Kind.WITH_EXCEPTIONS)
  public <NewFailureT> MapWithFailures<InputT, OutputT, NewFailureT> exceptionsInto(
      TypeDescriptor<NewFailureT> failureTypeDescriptor) {
    // Handler is left null here; MapWithFailures.expand() requires exceptionsVia() first.
    return new MapWithFailures<>(
        fn, originalFnForDisplayData, inputType, outputType, null, failureTypeDescriptor);
  }

  /**
   * Returns a new {@link MapWithFailures} transform that catches exceptions raised while mapping
   * elements, passing the raised exception instance and the input element being processed through
   * the given {@code exceptionHandler} and emitting the result to a failure collection.
   *
   * <p>This method takes advantage of the type information provided by {@link InferableFunction},
   * meaning that a call to {@link #exceptionsInto(TypeDescriptor)} may not be necessary.
   *
   * <p>See {@link WithFailures} documentation for usage patterns of the returned {@link
   * WithFailures.Result}.
   *
   * <p>Example usage:
   *
   * <pre>{@code
   * Result<PCollection<Integer>, String> result = words.apply(
   *     MapElements
   *         .into(TypeDescriptors.integers())
   *         .via((String word) -> 1 / word.length()) // Could throw ArithmeticException
   *         .exceptionsVia(new WithFailures.ExceptionAsMapHandler<String>() {}));
   * PCollection<Integer> output = result.output();
   * PCollection<String> failures = result.failures();
   * }</pre>
   */
  @Experimental(Experimental.Kind.WITH_EXCEPTIONS)
  public <FailureT> MapWithFailures<InputT, OutputT, FailureT> exceptionsVia(
      InferableFunction<ExceptionElement<InputT>, FailureT> exceptionHandler) {
    // The failure type descriptor comes from the handler itself, so exceptionsInto() is optional.
    return new MapWithFailures<>(
        fn,
        originalFnForDisplayData,
        inputType,
        outputType,
        exceptionHandler,
        exceptionHandler.getOutputTypeDescriptor());
  }

  /** A {@code PTransform} that adds exception handling to {@link MapElements}. */
  @Experimental(Kind.WITH_EXCEPTIONS)
  public static class MapWithFailures<InputT, OutputT, FailureT>
      extends PTransform<PCollection<InputT>, WithFailures.Result<PCollection<OutputT>, FailureT>> {

    // As in MapElements: type descriptors and the original user fn are construction-time-only
    // metadata, marked transient so they are not serialized with the transform.
    private final transient TypeDescriptor<InputT> inputType;
    private final transient TypeDescriptor<OutputT> outputType;
    @Nullable private final transient TypeDescriptor<FailureT> failureType;
    private final transient Object originalFnForDisplayData;
    private final Contextful<Fn<InputT, OutputT>> fn;

    // Null until exceptionsVia(...) is called; expand() enforces that it has been set.
    @Nullable private final ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler;

    MapWithFailures(
        Contextful<Fn<InputT, OutputT>> fn,
        Object originalFnForDisplayData,
        TypeDescriptor<InputT> inputType,
        TypeDescriptor<OutputT> outputType,
        @Nullable ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler,
        @Nullable TypeDescriptor<FailureT> failureType) {
      this.fn = fn;
      this.originalFnForDisplayData = originalFnForDisplayData;
      this.inputType = inputType;
      this.outputType = outputType;
      this.exceptionHandler = exceptionHandler;
      this.failureType = failureType;
    }

    /**
     * Returns a {@code PTransform} that catches exceptions raised while mapping elements, passing
     * the raised exception instance and the input element being processed through the given {@code
     * exceptionHandler} and emitting the result to a failure collection.
     *
     * <p>Example usage:
     *
     * <pre>{@code
     * Result<PCollection<Integer>, String> result = words.apply(
     *     MapElements
     *         .into(TypeDescriptors.integers())
     *         .via((String word) -> 1 / word.length()) // Could throw ArithmeticException
     *         .exceptionsInto(TypeDescriptors.strings())
     *         .exceptionsVia((ExceptionElement<String> ee) -> ee.exception().getMessage()));
     * PCollection<Integer> output = result.output();
     * PCollection<String> failures = result.failures();
     * }</pre>
     */
    public MapWithFailures<InputT, OutputT, FailureT> exceptionsVia(
        ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler) {
      return new MapWithFailures<>(
          fn, originalFnForDisplayData, inputType, outputType, exceptionHandler, failureType);
    }

    @Override
    public WithFailures.Result<PCollection<OutputT>, FailureT> expand(PCollection<InputT> input) {
      checkArgument(exceptionHandler != null, ".exceptionsVia() is required");
      MapFn doFn = new MapFn();
      // Run the mapping DoFn with two outputs: the main output plus a tagged failure output,
      // then repackage the resulting tuple as a WithFailures.Result.
      PCollectionTuple tuple =
          input.apply(
              MapWithFailures.class.getSimpleName(),
              ParDo.of(doFn)
                  .withOutputTags(doFn.outputTag, TupleTagList.of(doFn.failureTag))
                  .withSideInputs(this.fn.getRequirements().getSideInputs()));
      return WithFailures.Result.of(tuple, doFn.outputTag, doFn.failureTag);
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      super.populateDisplayData(builder);
      builder.add(DisplayData.item("class", originalFnForDisplayData.getClass()));
      if (originalFnForDisplayData instanceof HasDisplayData) {
        builder.include("fn", (HasDisplayData) originalFnForDisplayData);
      }
      // NOTE(review): exceptionHandler is @Nullable until exceptionsVia() is called, so
      // getClass() here would NPE if display data were collected on a transform produced by
      // exceptionsInto() alone — TODO confirm display data is only collected post-expand.
      builder.add(DisplayData.item("exceptionHandler.class", exceptionHandler.getClass()));
      if (exceptionHandler instanceof HasDisplayData) {
        builder.include("exceptionHandler", (HasDisplayData) exceptionHandler);
      }
    }

    /** A concrete TupleTag that allows coder inference based on failureType. */
    private class FailureTag extends TupleTag<FailureT> {
      @Override
      public TypeDescriptor<FailureT> getTypeDescriptor() {
        return failureType;
      }
    }

    /** A DoFn implementation that handles exceptions and outputs a secondary failure collection. */
    private class MapFn extends DoFn<InputT, OutputT> {

      // Anonymous subclass so the output tag captures the OutputT type argument for inference.
      final TupleTag<OutputT> outputTag = new TupleTag<OutputT>() {};
      final TupleTag<FailureT> failureTag = new FailureTag();

      @ProcessElement
      public void processElement(@Element InputT element, MultiOutputReceiver r, ProcessContext c)
          throws Exception {
        boolean exceptionWasThrown = false;
        OutputT result = null;
        try {
          // NOTE(review): reads the current element via c.element() here, while the catch
          // block below uses the @Element parameter; both refer to the element being
          // processed, but consider normalizing to one style.
          result = fn.getClosure().apply(c.element(), Fn.Context.wrapProcessContext(c));
        } catch (Exception e) {
          exceptionWasThrown = true;
          ExceptionElement<InputT> exceptionElement = ExceptionElement.of(element, e);
          r.get(failureTag).output(exceptionHandler.apply(exceptionElement));
        }
        // We make sure our output occurs outside the try block, since runners may implement
        // fusion by having output() directly call the body of another DoFn, potentially catching
        // exceptions unrelated to this transform.
        if (!exceptionWasThrown) {
          r.get(outputTag).output(result);
        }
      }

      @Override
      public void populateDisplayData(DisplayData.Builder builder) {
        // Surface the enclosing MapWithFailures transform's display data on the DoFn.
        builder.delegate(MapWithFailures.this);
      }

      @Override
      public TypeDescriptor<InputT> getInputTypeDescriptor() {
        return inputType;
      }

      @Override
      public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
        // outputType is transient and therefore null after deserialization; coder inference
        // must happen at pipeline-construction time only.
        checkState(
            outputType != null,
            "%s output type descriptor was null; "
                + "this probably means that getOutputTypeDescriptor() was called after "
                + "serialization/deserialization, but it is only available prior to "
                + "serialization, for constructing a pipeline and inferring coders",
            MapWithFailures.class.getSimpleName());
        return outputType;
      }
    }
  }
}