// blob: 408bcc5f0d351191280e589dd000f888d0b32dc9 [file] [log] [blame]
package org.example;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.checkerframework.checker.nullness.qual.NonNull;
/**
 * {@link SchemaTransformProvider} for a transform that upper-cases one String field of every
 * input {@link Row}.
 *
 * <p>The provider is registered through {@link AutoService} and is configured with a {@link
 * Configuration} naming the field to modify and, optionally, an error-handling output.
 */
@AutoService(SchemaTransformProvider.class)
public class ToUpperCaseTransformProvider
    extends TypedSchemaTransformProvider<ToUpperCaseTransformProvider.Configuration> {

  /** Tag under which the transform looks up its input rows. */
  protected static final String INPUT_ROWS_TAG = "input";

  /** Tag under which the transform emits successfully processed rows. */
  protected static final String OUTPUT_ROWS_TAG = "output";

  /** Returns the configuration class this provider is typed on. */
  @Override
  protected @NonNull Class<Configuration> configurationClass() {
    return Configuration.class;
  }

  /**
   * Builds the transform for the given configuration.
   *
   * @param configuration the transform configuration; it is validated by the transform's
   *     constructor
   * @return a new {@link ToUpperCaseTransform}
   * @throws IllegalArgumentException if {@link Configuration#validate()} rejects the configuration
   */
  @Override
  protected @NonNull SchemaTransform from(Configuration configuration) {
    return new ToUpperCaseTransform(configuration);
  }

  /** Returns the URN identifying this transform. */
  @Override
  public @NonNull String identifier() {
    return "some:urn:to_upper_case:v1";
  }

  /** Returns the human-readable description of this transform. */
  @Override
  public @NonNull String description() {
    // Each fragment that continues a sentence must end with a space; otherwise the words on
    // either side of the concatenation run together in the emitted text.
    return "Modifies a given field in an element by applying an uppercase function to the field."
        + "\n\n"
        + "This expects a single PCollection of Beam Rows with each row containing at least one "
        + "field which is named the same as the input field parameter."
        + "\n\n"
        + "The field cannot be defined as 'metadata'.";
  }

  /** Returns the single input collection name, {@link #INPUT_ROWS_TAG}. */
  @Override
  public @NonNull List<String> inputCollectionNames() {
    return Collections.singletonList(INPUT_ROWS_TAG);
  }

  /** Returns the single declared output collection name, {@link #OUTPUT_ROWS_TAG}. */
  @Override
  public @NonNull List<String> outputCollectionNames() {
    return Collections.singletonList(OUTPUT_ROWS_TAG);
  }

  /** Configuration of the upper-case transform: the target field and optional error handling. */
  @DefaultSchema(AutoValueSchema.class)
  @AutoValue
  public abstract static class Configuration implements Serializable {

    /** Returns the name of the field to upper-case. */
    @SchemaFieldDescription(
        "The field in the input collection to perform the uppercase operation on. "
            + "This field must be a String.")
    public abstract String getField();

    /** Returns the error-handling configuration, or {@code null} if none was specified. */
    @Nullable
    public abstract ErrorHandling getErrorHandling();

    /**
     * Checks that the configured field may be modified.
     *
     * @throws IllegalArgumentException if the configured field is named {@code "metadata"}
     */
    public void validate() {
      checkArgument(!"metadata".equals(getField()), "Cannot modify field 'metadata'");
    }

    /** Returns a new builder for {@link Configuration}. */
    public static Builder builder() {
      return new AutoValue_ToUpperCaseTransformProvider_Configuration.Builder();
    }

    /** Builder for {@link Configuration}. */
    @AutoValue.Builder
    public abstract static class Builder {
      /** Sets the name of the field to upper-case. */
      public abstract Builder setField(String field);

      /** Sets the error-handling configuration. */
      public abstract Builder setErrorHandling(ErrorHandling errorHandling);

      /** Builds the {@link Configuration}. */
      public abstract Configuration build();
    }
  }

  /**
   * The transform itself: applies a multi-output {@link ParDo} that upper-cases the configured
   * field and separates successful rows from failed ones.
   */
  protected static class ToUpperCaseTransform extends SchemaTransform {
    private static final TupleTag<Row> successValues = new TupleTag<Row>() {};
    private static final TupleTag<Row> errorValues = new TupleTag<Row>() {};

    private final Configuration configuration;

    /**
     * Creates the transform.
     *
     * @param configuration the transform configuration
     * @throws IllegalArgumentException if {@link Configuration#validate()} rejects the
     *     configuration
     */
    private ToUpperCaseTransform(Configuration configuration) {
      // Validate the transform config before expansion
      configuration.validate();
      this.configuration = configuration;
    }

    /**
     * Creates the {@link DoFn} that upper-cases {@code field} on each row.
     *
     * @param field name of the String field to upper-case
     * @param handleErrors whether failing rows are emitted as error records instead of rethrown
     * @param errorSchema schema used to build error records
     * @return a DoFn emitting to {@code successValues} and, when {@code handleErrors} is set, to
     *     {@code errorValues}
     */
    private static DoFn<Row, Row> createDoFn(
        String field, boolean handleErrors, Schema errorSchema) {
      return new DoFn<Row, Row>() {
        @ProcessElement
        public void processElement(@Element Row inputRow, MultiOutputReceiver out) {
          try {
            // A null field value is treated as a failure (NullPointerException) and takes the
            // same path as any other per-row error.
            String value = Objects.requireNonNull(inputRow.getString(field));
            // Locale.ROOT keeps the result independent of the JVM's default locale; the
            // no-argument toUpperCase() applies locale-specific rules (e.g. Turkish dotted i).
            Row output =
                Row.fromRow(inputRow)
                    .withFieldValue(field, value.toUpperCase(Locale.ROOT))
                    .build();
            // Tag successful records
            out.get(successValues).output(output);
          } catch (Exception e) {
            if (handleErrors) {
              // Catch any errors and tag with error tag if error_handling is specified
              out.get(errorValues).output(ErrorHandling.errorRecord(errorSchema, inputRow, e));
            } else {
              // Throw error if error_handling was not specified
              throw new RuntimeException(e);
            }
          }
        }
      };
    }

    /**
     * Expands the transform over the rows found under {@link #INPUT_ROWS_TAG}.
     *
     * @param input tuple containing the input rows under {@link #INPUT_ROWS_TAG}
     * @return a tuple with the upper-cased rows under {@link #OUTPUT_ROWS_TAG} and, when error
     *     handling has an output configured, the error records under that output's name
     */
    @Override
    public @NonNull PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
      // Get input rows
      PCollection<Row> inputRows = input.get(INPUT_ROWS_TAG);

      // Get input row schema and construct error schema from it
      Schema inputSchema = inputRows.getSchema();
      Schema errorSchema = ErrorHandling.errorSchema(inputSchema);

      // Determine if error_handling was specified
      boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());

      // Apply the PTransform
      PCollectionTuple output =
          inputRows.apply(
              "ToUpperCase",
              ParDo.of(createDoFn(configuration.getField(), handleErrors, errorSchema))
                  .withOutputTags(successValues, TupleTagList.of(errorValues)));

      // Set the schemas for successful records and error records.
      // This is needed so the runner can translate the element schema across SDKs.
      output.get(successValues).setRowSchema(inputSchema);
      output.get(errorValues).setRowSchema(errorSchema);

      // Construct output collection and tag successful records
      PCollectionRowTuple result =
          PCollectionRowTuple.of(OUTPUT_ROWS_TAG, output.get(successValues));
      if (handleErrors) {
        // Add tagged error records to output collection if error_handling was specified
        result =
            result.and(
                Objects.requireNonNull(configuration.getErrorHandling()).getOutput(),
                output.get(errorValues));
      }
      return result;
    }
  }
}