// blob: 1876b37e5981440c513cc2f6a0b03dc300ea7c12 [file] [log] [blame]
package org.example;
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.checkerframework.checker.nullness.qual.NonNull;
/**
 * Skeleton {@link SchemaTransformProvider} registered through {@link AutoService}.
 *
 * <p>The provider is typed on {@link Configuration}, which currently declares no properties, and
 * always hands back an {@link Identity} transform regardless of the configuration it receives.
 */
@AutoService(SchemaTransformProvider.class)
public class SkeletonSchemaProvider
    extends TypedSchemaTransformProvider<SkeletonSchemaProvider.Configuration> {

  /** Tag under which the transform looks up its input rows. */
  private static final String INPUT_ROWS_TAG = "input";

  /** Tag under which the transform publishes its output rows. */
  private static final String OUTPUT_ROWS_TAG = "output";

  /**
   * Configuration type for this provider.
   *
   * <p>No properties are declared; instances are created through {@link #builder()}.
   */
  @DefaultSchema(AutoValueSchema.class)
  @AutoValue
  public abstract static class Configuration implements Serializable {

    /** Builder for {@link Configuration}; exposes only {@link #build()}. */
    @AutoValue.Builder
    public abstract static class Builder {
      /** Creates the {@link Configuration} instance. */
      public abstract Configuration build();
    }

    /** Returns a new builder backed by the AutoValue-generated implementation. */
    public static Builder builder() {
      return new AutoValue_SkeletonSchemaProvider_Configuration.Builder();
    }
  }

  /**
   * Transform that reads the rows tagged {@code "input"}, passes each one through {@link
   * IdentityDoFn}, and returns the result tagged {@code "output"}.
   */
  protected static class Identity extends SchemaTransform {

    /**
     * Reads the input collection, applies the per-row {@link IdentityDoFn}, and assigns the input
     * collection's schema to the result.
     *
     * @param input tuple expected to contain a collection under the {@code "input"} tag
     * @return a tuple holding the processed rows under the {@code "output"} tag
     */
    @Override
    public @NonNull PCollectionRowTuple expand(PCollectionRowTuple input) {
      PCollection<Row> source = input.get(INPUT_ROWS_TAG);

      // Run the per-row DoFn first, then attach the source schema to the resulting collection.
      PCollection<Row> processed = source.apply("Identity", ParDo.of(new IdentityDoFn()));
      processed.setRowSchema(source.getSchema());

      return PCollectionRowTuple.of(OUTPUT_ROWS_TAG, processed);
    }

    /** Emits, for every incoming row, a row rebuilt from it with no field changes applied. */
    private static class IdentityDoFn extends DoFn<Row, Row> {
      @ProcessElement
      public void processElement(@Element Row row, OutputReceiver<Row> receiver) {
        Row rebuilt = Row.fromRow(row).build();
        receiver.output(rebuilt);
      }
    }
  }

  /** Returns the configuration class this provider is typed on. */
  @Override
  protected @NonNull Class<Configuration> configurationClass() {
    return Configuration.class;
  }

  /**
   * Builds the transform for the given configuration.
   *
   * @param configuration the provider configuration; not consulted by this skeleton
   * @return a new {@link Identity} transform
   */
  @Override
  protected @NonNull SchemaTransform from(Configuration configuration) {
    return new Identity();
  }

  /** Returns the identifier string of this transform. */
  @Override
  public @NonNull String identifier() {
    return "some:urn:transform_name:v1";
  }

  /** Returns the human-readable description of this transform. */
  @Override
  public @NonNull String description() {
    return "An example transform description.";
  }

  /** Returns the single input collection name, {@code "input"}. */
  @Override
  public @NonNull List<String> inputCollectionNames() {
    return Collections.singletonList(INPUT_ROWS_TAG);
  }

  /** Returns the single output collection name, {@code "output"}. */
  @Override
  public @NonNull List<String> outputCollectionNames() {
    return Collections.singletonList(OUTPUT_ROWS_TAG);
  }
}