| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io; |
| |
| import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment; |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; |
| |
| import com.google.auto.value.AutoValue; |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.nio.channels.Channels; |
| import java.nio.channels.WritableByteChannel; |
| import java.util.Map; |
| import javax.annotation.Nullable; |
| import org.apache.avro.Schema; |
| import org.apache.avro.file.CodecFactory; |
| import org.apache.avro.file.DataFileWriter; |
| import org.apache.avro.generic.GenericDatumWriter; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.avro.reflect.ReflectData; |
| import org.apache.avro.reflect.ReflectDatumWriter; |
| import org.apache.beam.sdk.annotations.Experimental; |
| import org.apache.beam.sdk.annotations.Experimental.Kind; |
| import org.apache.beam.sdk.coders.AvroCoder; |
| import org.apache.beam.sdk.coders.CannotProvideCoderException; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.CoderRegistry; |
| import org.apache.beam.sdk.coders.StringUtf8Coder; |
| import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; |
| import org.apache.beam.sdk.io.FileIO.MatchConfiguration; |
| import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; |
| import org.apache.beam.sdk.io.fs.ResourceId; |
| import org.apache.beam.sdk.options.ValueProvider; |
| import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; |
| import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; |
| import org.apache.beam.sdk.transforms.Create; |
| import org.apache.beam.sdk.transforms.PTransform; |
| import org.apache.beam.sdk.transforms.SerializableFunction; |
| import org.apache.beam.sdk.transforms.SerializableFunctions; |
| import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; |
| import org.apache.beam.sdk.transforms.display.DisplayData; |
| import org.apache.beam.sdk.values.PBegin; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.PDone; |
| import org.apache.beam.sdk.values.TypeDescriptors; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps; |
| import org.joda.time.Duration; |
| |
| /** |
| * {@link PTransform}s for reading and writing Avro files. |
| * |
| * <h2>Reading Avro files</h2> |
| * |
| * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at |
| * pipeline construction time, use {@link #read}, using {@link AvroIO.Read#from} to specify the |
| * filename or filepattern to read from. If the filepatterns to be read are themselves in a {@link |
| * PCollection}, you can use {@link FileIO} to match them and {@link AvroIO#readFiles} to read them. |
| * If the schema is unknown at pipeline construction time, use {@link #parseGenericRecords} or |
| * {@link #parseFilesGenericRecords}. |
| * |
| * <p>Many configuration options below apply to several or all of these transforms. |
| * |
| * <p>See {@link FileSystems} for information on supported file systems and filepatterns. |
| * |
| * <h3>Filepattern expansion and watching</h3> |
| * |
| * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} or the |
| * combination of {@link FileIO.Match#continuously(Duration, TerminationCondition)} and {@link |
| * AvroIO#readFiles(Class)} allow streaming of new files matching the filepattern(s). |
| * |
| * <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link |
| * AvroIO#readFiles(Class)} allows them if the filepattern contains a glob wildcard character. |
| * Use {@link Read#withEmptyMatchTreatment} or {@link |
| * FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link AvroIO#readFiles(Class)} |
| * to configure this behavior. |
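| * |
| * <p>For example, a sketch that allows an empty match instead of failing the pipeline ({@code |
| * schema} is assumed to be defined as in the examples below): |
| * |
| * <pre>{@code |
| * PCollection<GenericRecord> records = |
| * p.apply(AvroIO.readGenericRecords(schema) |
| * .from("gs://my_bucket/path/to/records-*.avro") |
| * .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)); |
| * }</pre> |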
| * |
| * <h3>Reading records of a known schema</h3> |
| * |
| * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read |
| * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a |
| * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a |
| * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified |
| * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link FileIO} matching |
| * plus {@link #readFilesGenericRecords}. |
| * |
| * <p>For example: |
| * |
| * <pre>{@code |
| * Pipeline p = ...; |
| * |
| * // Read Avro-generated classes from files on GCS |
| * PCollection<AvroAutoGenClass> records = |
| * p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro")); |
| * |
| * // Read GenericRecord's of the given schema from files on GCS |
| * Schema schema = new Schema.Parser().parse(new File("schema.avsc")); |
| * PCollection<GenericRecord> records = |
| * p.apply(AvroIO.readGenericRecords(schema) |
| * .from("gs://my_bucket/path/to/records-*.avro")); |
| * }</pre> |
| * |
| * <h3>Reading records of an unknown schema</h3> |
| * |
| * <p>To read records from files whose schema is unknown at pipeline construction time or differs |
| * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a |
| * parsing function for converting each {@link GenericRecord} into a value of your custom type. |
| * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link FileIO} |
| * matching plus {@link #parseFilesGenericRecords(SerializableFunction)}. |
| * |
| * <p>For example: |
| * |
| * <pre>{@code |
| * Pipeline p = ...; |
| * |
| * PCollection<Foo> records = |
| * p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() { |
| * public Foo apply(GenericRecord record) { |
| * // If needed, access the schema of the record using record.getSchema() |
| * return ...; |
| * } |
| * })); |
| * }</pre> |
| * |
| * <h3>Reading from a {@link PCollection} of filepatterns</h3> |
| * |
| * <pre>{@code |
| * Pipeline p = ...; |
| * |
| * PCollection<String> filepatterns = p.apply(...); |
| * PCollection<AvroAutoGenClass> records = |
| * filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class)); |
| * PCollection<AvroAutoGenClass> records = |
| * filepatterns |
| * .apply(FileIO.matchAll()) |
| * .apply(FileIO.readMatches()) |
| * .apply(AvroIO.readFiles(AvroAutoGenClass.class)); |
| * PCollection<GenericRecord> genericRecords = |
| * filepatterns.apply(AvroIO.readAllGenericRecords(schema)); |
| * PCollection<Foo> records = |
| * filepatterns |
| * .apply(FileIO.matchAll()) |
| * .apply(FileIO.readMatches()) |
| * .apply(AvroIO.parseFilesGenericRecords(new SerializableFunction...); |
| * }</pre> |
| * |
| * <h3>Streaming new files matching a filepattern</h3> |
| * |
| * <pre>{@code |
| * Pipeline p = ...; |
| * |
| * PCollection<AvroAutoGenClass> lines = p.apply(AvroIO |
| * .read(AvroAutoGenClass.class) |
| * .from("gs://my_bucket/path/to/records-*.avro") |
| * .watchForNewFiles( |
| * // Check for new files every minute |
| * Duration.standardMinutes(1), |
| * // Stop watching the filepattern if no new files appear within an hour |
| * Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1)))); |
| * }</pre> |
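| * |
| * <p>Equivalently, a sketch of the {@link FileIO.Match#continuously(Duration, |
| * TerminationCondition)} plus {@link AvroIO#readFiles(Class)} combination mentioned above: |
| * |
| * <pre>{@code |
| * PCollection<AvroAutoGenClass> records = p |
| * .apply(FileIO.match() |
| * .filepattern("gs://my_bucket/path/to/records-*.avro") |
| * .continuously( |
| * Duration.standardMinutes(1), |
| * Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1)))) |
| * .apply(FileIO.readMatches()) |
| * .apply(AvroIO.readFiles(AvroAutoGenClass.class)); |
| * }</pre> |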
| * |
| * <h3>Reading a very large number of files</h3> |
| * |
| * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of |
| * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and |
| * scalability. Note that it may decrease performance if the filepattern matches only a small number |
| * of files. |
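| * |
| * <p>For example, a minimal sketch: |
| * |
| * <pre>{@code |
| * PCollection<AvroAutoGenClass> records = |
| * p.apply(AvroIO.read(AvroAutoGenClass.class) |
| * .from("gs://my_bucket/path/to/records-*.avro") |
| * .withHintMatchesManyFiles()); |
| * }</pre> |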
| * |
| * <h2>Writing Avro files</h2> |
| * |
| * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using |
| * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default {@link |
| * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set |
| * via {@link Write#withShardNameTemplate(String)}) and optional filename suffix (set via {@link |
| * Write#withSuffix(String)}), to generate output filenames in a sharded way. You can override this |
| * default write filename policy using {@link Write#to(FileBasedSink.FilenamePolicy)} to specify a |
| * custom file naming policy. |
| * |
| * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link |
| * CodecFactory#snappyCodec()}. This default can be changed or overridden |
| * using {@link AvroIO.Write#withCodec}. |
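| * |
| * <p>For example, a sketch that switches the codec to deflate with compression level 6 (the |
| * codec factories are standard Avro API): |
| * |
| * <pre>{@code |
| * records.apply(AvroIO.write(AvroAutoGenClass.class) |
| * .to("gs://my_bucket/path/to/records") |
| * .withSuffix(".avro") |
| * .withCodec(CodecFactory.deflateCodec(6))); |
| * }</pre> |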
| * |
| * <h3>Writing specific or generic records</h3> |
| * |
| * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write |
| * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes |
| * a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema in a |
| * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified |
| * schema. |
| * |
| * <p>For example: |
| * |
| * <pre>{@code |
| * // A simple Write to a local file (only runs locally): |
| * PCollection<AvroAutoGenClass> records = ...; |
| * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro")); |
| * |
| * // A Write to a sharded GCS file (runs locally and using remote execution): |
| * Schema schema = new Schema.Parser().parse(new File("schema.avsc")); |
| * PCollection<GenericRecord> records = ...; |
| * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema) |
| * .to("gs://my_bucket/path/to/numbers") |
| * .withSuffix(".avro")); |
| * }</pre> |
| * |
| * <h3>Writing windowed or unbounded data</h3> |
| * |
| * <p>By default, all input is put into the global window before writing. If per-window writes are |
| * desired - for example, when using a streaming runner - {@link AvroIO.Write#withWindowedWrites()} |
| * will cause windowing and triggering to be preserved. When producing windowed writes with a |
| * streaming runner that supports triggers, the number of output shards must be set explicitly using |
| * {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen |
| * value, so you may not need to set it yourself. A {@link FileBasedSink.FilenamePolicy} must be set, |
| * and unique windows and triggers must produce unique filenames. |
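| * |
| * <p>For example, a sketch of a windowed write relying on the default {@link |
| * DefaultFilenamePolicy}, whose windowed filenames embed the window and pane and are therefore |
| * unique: |
| * |
| * <pre>{@code |
| * records |
| * .apply(Window.<AvroAutoGenClass>into(FixedWindows.of(Duration.standardMinutes(5)))) |
| * .apply(AvroIO.write(AvroAutoGenClass.class) |
| * .to("gs://my_bucket/path/to/events") |
| * .withSuffix(".avro") |
| * .withWindowedWrites() |
| * .withNumShards(3)); |
| * }</pre> |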
| * |
| * <h3>Writing data to multiple destinations</h3> |
| * |
| * <p>The following shows a more complex example of AvroIO.Write usage, generating dynamic file |
| * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user |
| * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id |
| * as an integer field. We want events for each user to go into a specific directory for that user, |
| * and each user's data should be written with a specific schema for that user; a side input is |
| * used, so the schema can be calculated in a different stage. |
| * |
| * <pre>{@code |
| * // This is the user class that controls dynamic destinations for this avro write. The input to |
| * // AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order |
| * // to have dynamic schemas). Everything is per userid, so we define a dynamic destination type |
| * // of Integer. |
| * class UserDynamicAvroDestinations |
| * extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> { |
| * private final PCollectionView<Map<Integer, String>> userToSchemaMap; |
| * public UserDynamicAvroDestinations(PCollectionView<Map<Integer, String>> userToSchemaMap) { |
| * this.userToSchemaMap = userToSchemaMap; |
| * } |
| * public GenericRecord formatRecord(UserEvent record) { |
| * return formatUserRecord(record, getSchema(record.getUserId())); |
| * } |
| * public Schema getSchema(Integer userId) { |
| * return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId)); |
| * } |
| * public Integer getDestination(UserEvent record) { |
| * return record.getUserId(); |
| * } |
| * public Integer getDefaultDestination() { |
| * return 0; |
| * } |
| * public FilenamePolicy getFilenamePolicy(Integer userId) { |
| * return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-" |
| * + userId + "/events")); |
| * } |
| * public List<PCollectionView<?>> getSideInputs() { |
| * return ImmutableList.<PCollectionView<?>>of(userToSchemaMap); |
| * } |
| * } |
| * PCollection<UserEvent> events = ...; |
| * PCollectionView<Map<Integer, String>> userToSchemaMap = events.apply( |
| * "ComputePerUserSchemas", new ComputePerUserSchemas()); |
| * events.apply("WriteAvros", AvroIO.<Integer>writeCustomTypeToGenericRecords() |
| * .to(new UserDynamicAvroDestinations(userToSchemaMap))); |
| * }</pre> |
| */ |
| public class AvroIO { |
| /** |
| * Reads records of the given type from an Avro file (or multiple Avro files matching a pattern). |
| * |
| * <p>The schema is derived from the given record class using Avro reflection ({@link ReflectData}). |
| */ |
| public static <T> Read<T> read(Class<T> recordClass) { |
| return new AutoValue_AvroIO_Read.Builder<T>() |
| .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) |
| .setRecordClass(recordClass) |
| .setSchema(ReflectData.get().getSchema(recordClass)) |
| .setInferBeamSchema(false) |
| .setHintMatchesManyFiles(false) |
| .build(); |
| } |
| |
| /** |
| * Like {@link #read}, but reads each file in a {@link PCollection} of {@link |
| * FileIO.ReadableFile}, returned by {@link FileIO#readMatches}. |
| * |
| * <p>You can read {@link GenericRecord GenericRecords} by using {@link |
| * #readFilesGenericRecords(Schema)}, or {@link #readFilesGenericRecords(String)} if the schema |
| * is available as a JSON-encoded string. |
| */ |
| public static <T> ReadFiles<T> readFiles(Class<T> recordClass) { |
| return new AutoValue_AvroIO_ReadFiles.Builder<T>() |
| .setRecordClass(recordClass) |
| .setSchema(ReflectData.get().getSchema(recordClass)) |
| .setInferBeamSchema(false) |
| .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES) |
| .build(); |
| } |
| |
| /** |
| * Like {@link #read}, but reads each filepattern in the input {@link PCollection}. |
| * |
| * @deprecated You can achieve the functionality of {@link #readAll} using {@link FileIO} matching |
| * plus {@link #readFiles(Class)}. This is the preferred method to make composition explicit. |
| * {@link ReadAll} will not receive upgrades and will be removed in a future version of Beam. |
| */ |
| @Deprecated |
| public static <T> ReadAll<T> readAll(Class<T> recordClass) { |
| return new AutoValue_AvroIO_ReadAll.Builder<T>() |
| .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) |
| .setRecordClass(recordClass) |
| .setSchema(ReflectData.get().getSchema(recordClass)) |
| .setInferBeamSchema(false) |
| .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES) |
| .build(); |
| } |
| |
| /** Reads Avro file(s) containing records of the specified schema. */ |
| public static Read<GenericRecord> readGenericRecords(Schema schema) { |
| return new AutoValue_AvroIO_Read.Builder<GenericRecord>() |
| .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) |
| .setRecordClass(GenericRecord.class) |
| .setSchema(schema) |
| .setInferBeamSchema(false) |
| .setHintMatchesManyFiles(false) |
| .build(); |
| } |
| |
| /** |
| * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link |
| * FileIO.ReadableFile}, for example, as returned by {@link FileIO#readMatches}. |
| */ |
| public static ReadFiles<GenericRecord> readFilesGenericRecords(Schema schema) { |
| return new AutoValue_AvroIO_ReadFiles.Builder<GenericRecord>() |
| .setRecordClass(GenericRecord.class) |
| .setSchema(schema) |
| .setInferBeamSchema(false) |
| .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES) |
| .build(); |
| } |
| |
| /** |
| * Like {@link #readGenericRecords(Schema)}, but reads each filepattern in the input {@link |
| * PCollection}. |
| * |
| * @deprecated You can achieve the functionality of {@link #readAllGenericRecords(Schema)} using |
| * {@link FileIO} matching plus {@link #readFilesGenericRecords(Schema)}. This is the preferred |
| * method to make composition explicit. {@link ReadAll} will not receive upgrades and will be |
| * removed in a future version of Beam. |
| */ |
| @Deprecated |
| public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) { |
| return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>() |
| .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) |
| .setRecordClass(GenericRecord.class) |
| .setSchema(schema) |
| .setInferBeamSchema(false) |
| .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES) |
| .build(); |
| } |
| |
| /** |
| * Reads Avro file(s) containing records of the specified schema. The schema is specified as a |
| * JSON-encoded string. |
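| * |
| * <p>For example, a sketch with a trivial inline schema: |
| * |
| * <pre>{@code |
| * PCollection<GenericRecord> records = |
| * p.apply(AvroIO.readGenericRecords( |
| * "{\"type\": \"record\", \"name\": \"TestRecord\", \"fields\": [" |
| * + "{\"name\": \"name\", \"type\": \"string\"}]}") |
| * .from("gs://my_bucket/path/to/records-*.avro")); |
| * }</pre> |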
| */ |
| public static Read<GenericRecord> readGenericRecords(String schema) { |
| return readGenericRecords(new Schema.Parser().parse(schema)); |
| } |
| |
| /** Like {@link #readGenericRecords(String)}, but for {@link FileIO.ReadableFile} collections. */ |
| public static ReadFiles<GenericRecord> readFilesGenericRecords(String schema) { |
| return readFilesGenericRecords(new Schema.Parser().parse(schema)); |
| } |
| |
| /** |
| * Like {@link #readGenericRecords(String)}, but reads each filepattern in the input {@link |
| * PCollection}. |
| * |
| * @deprecated You can achieve the functionality of {@link #readAllGenericRecords(String)} using |
| * {@link FileIO} matching plus {@link #readFilesGenericRecords(String)}. This is the preferred |
| * method to make composition explicit. {@link ReadAll} will not receive upgrades and will be |
| * removed in a future version of Beam. |
| */ |
| @Deprecated |
| public static ReadAll<GenericRecord> readAllGenericRecords(String schema) { |
| return readAllGenericRecords(new Schema.Parser().parse(schema)); |
| } |
| |
| /** |
| * Reads Avro file(s) containing records of an unspecified schema, converting each record to a |
| * custom type. |
| */ |
| public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) { |
| return new AutoValue_AvroIO_Parse.Builder<T>() |
| .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) |
| .setParseFn(parseFn) |
| .setHintMatchesManyFiles(false) |
| .build(); |
| } |
| |
| /** |
| * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each {@link |
| * FileIO.ReadableFile} in the input {@link PCollection}. |
| */ |
| public static <T> ParseFiles<T> parseFilesGenericRecords( |
| SerializableFunction<GenericRecord, T> parseFn) { |
| return new AutoValue_AvroIO_ParseFiles.Builder<T>() |
| .setParseFn(parseFn) |
| .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES) |
| .build(); |
| } |
| |
| /** |
| * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each filepattern in the |
| * input {@link PCollection}. |
| * |
| * @deprecated You can achieve the functionality of {@link |
| * #parseAllGenericRecords(SerializableFunction)} using {@link FileIO} matching plus {@link |
| * #parseFilesGenericRecords(SerializableFunction)}. This is the preferred method to make |
| * composition explicit. {@link ParseAll} will not receive upgrades and will be removed in a |
| * future version of Beam. |
| */ |
| @Deprecated |
| public static <T> ParseAll<T> parseAllGenericRecords( |
| SerializableFunction<GenericRecord, T> parseFn) { |
| return new AutoValue_AvroIO_ParseAll.Builder<T>() |
| .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) |
| .setParseFn(parseFn) |
| .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES) |
| .build(); |
| } |
| |
| /** |
| * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding |
| * pattern). |
| */ |
| public static <T> Write<T> write(Class<T> recordClass) { |
| return new Write<>( |
| AvroIO.<T, T>defaultWriteBuilder() |
| .setGenericRecords(false) |
| .setSchema(ReflectData.get().getSchema(recordClass)) |
| .build()); |
| } |
| |
| /** Writes Avro records of the specified schema. */ |
| public static Write<GenericRecord> writeGenericRecords(Schema schema) { |
| return new Write<>( |
| AvroIO.<GenericRecord, GenericRecord>defaultWriteBuilder() |
| .setGenericRecords(true) |
| .setSchema(schema) |
| .build()); |
| } |
| |
| /** |
| * A {@link PTransform} that writes a {@link PCollection} to an Avro file (or multiple Avro files |
| * matching a sharding pattern), with each element of the input collection encoded into its own |
| * record of type OutputT. |
| * |
| * <p>This version allows you to apply {@link AvroIO} writes to a PCollection of a custom type |
| * {@code UserT}. A format mechanism that converts the input type {@code UserT} to the output type |
| * that will be written to the file must be specified. If using a custom {@link |
| * DynamicAvroDestinations} object this is done using {@link |
| * DynamicAvroDestinations#formatRecord}, otherwise the {@link |
| * AvroIO.TypedWrite#withFormatFunction} can be used to specify a format function. |
| * |
| * <p>The advantage of using a custom type is that it allows a user-provided {@link |
| * DynamicAvroDestinations} object, set via {@link AvroIO.Write#to(DynamicAvroDestinations)}, to |
| * examine the custom type when choosing a destination. |
| * |
| * <p>If the output type is {@link GenericRecord}, use {@link #writeCustomTypeToGenericRecords()} |
| * instead. |
| */ |
| public static <UserT, OutputT> TypedWrite<UserT, Void, OutputT> writeCustomType() { |
| return AvroIO.<UserT, OutputT>defaultWriteBuilder().setGenericRecords(false).build(); |
| } |
| |
| /** |
| * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is |
| * {@link GenericRecord}. A schema must be specified either in {@link |
| * DynamicAvroDestinations#getSchema} or, if not using dynamic destinations, by using {@link |
| * TypedWrite#withSchema(Schema)}. |
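| * |
| * <p>A sketch without dynamic destinations ({@code UserEvent}, {@code formatFn}, and {@code |
| * schema} are placeholders): |
| * |
| * <pre>{@code |
| * events.apply(AvroIO.<UserEvent>writeCustomTypeToGenericRecords() |
| * .withFormatFunction(formatFn) |
| * .withSchema(schema) |
| * .to("gs://my_bucket/path/to/events") |
| * .withSuffix(".avro")); |
| * }</pre> |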
| */ |
| public static <UserT> TypedWrite<UserT, Void, GenericRecord> writeCustomTypeToGenericRecords() { |
| return AvroIO.<UserT, GenericRecord>defaultWriteBuilder().setGenericRecords(true).build(); |
| } |
| |
| /** |
| * Writes Avro records of the specified schema. The schema is specified as a JSON-encoded string. |
| */ |
| public static Write<GenericRecord> writeGenericRecords(String schema) { |
| return writeGenericRecords(new Schema.Parser().parse(schema)); |
| } |
| |
| private static <UserT, OutputT> TypedWrite.Builder<UserT, Void, OutputT> defaultWriteBuilder() { |
| return new AutoValue_AvroIO_TypedWrite.Builder<UserT, Void, OutputT>() |
| .setFilenameSuffix(null) |
| .setShardTemplate(null) |
| .setNumShards(0) |
| .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC) |
| .setMetadata(ImmutableMap.of()) |
| .setWindowedWrites(false) |
| .setNoSpilling(false); |
| } |
| |
| private static <T> PCollection<T> setBeamSchema( |
| PCollection<T> pc, Class<T> clazz, @Nullable Schema schema) { |
| org.apache.beam.sdk.schemas.Schema beamSchema = |
| org.apache.beam.sdk.schemas.utils.AvroUtils.getSchema(clazz, schema); |
| if (beamSchema != null) { |
| pc.setSchema( |
| beamSchema, |
| org.apache.beam.sdk.schemas.utils.AvroUtils.getToRowFunction(clazz, schema), |
| org.apache.beam.sdk.schemas.utils.AvroUtils.getFromRowFunction(clazz)); |
| } |
| return pc; |
| } |
| |
| /** |
| * 64MB is a reasonable value: large enough to amortize the cost of opening files, but not so |
| * large as to exhaust a typical runner's maximum amount of output per ProcessElement call. |
| */ |
| private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L; |
| |
| /** Implementation of {@link #read} and {@link #readGenericRecords}. */ |
| @AutoValue |
| public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> { |
| @Nullable |
| abstract ValueProvider<String> getFilepattern(); |
| |
| abstract MatchConfiguration getMatchConfiguration(); |
| |
| @Nullable |
| abstract Class<T> getRecordClass(); |
| |
| @Nullable |
| abstract Schema getSchema(); |
| |
| abstract boolean getInferBeamSchema(); |
| |
| abstract boolean getHintMatchesManyFiles(); |
| |
| abstract Builder<T> toBuilder(); |
| |
| @AutoValue.Builder |
| abstract static class Builder<T> { |
| abstract Builder<T> setFilepattern(ValueProvider<String> filepattern); |
| |
| abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration); |
| |
| abstract Builder<T> setRecordClass(Class<T> recordClass); |
| |
| abstract Builder<T> setSchema(Schema schema); |
| |
| abstract Builder<T> setInferBeamSchema(boolean infer); |
| |
| abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles); |
| |
| abstract Read<T> build(); |
| } |
| |
| /** |
| * Reads from the given filename or filepattern. |
| * |
| * <p>If it is known that the filepattern will match a very large number of files (at least tens |
| * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability. |
| */ |
| public Read<T> from(ValueProvider<String> filepattern) { |
| return toBuilder().setFilepattern(filepattern).build(); |
| } |
| |
| /** Like {@link #from(ValueProvider)}. */ |
| public Read<T> from(String filepattern) { |
| return from(StaticValueProvider.of(filepattern)); |
| } |
| |
| /** Sets the {@link MatchConfiguration}. */ |
| public Read<T> withMatchConfiguration(MatchConfiguration matchConfiguration) { |
| return toBuilder().setMatchConfiguration(matchConfiguration).build(); |
| } |
| |
| /** Configures whether or not a filepattern matching no files is allowed. */ |
| public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) { |
| return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); |
| } |
| |
| /** |
| * Continuously watches for new files matching the filepattern, polling it at the given |
| * interval, until the given termination condition is reached. The returned {@link PCollection} |
| * is unbounded. |
| * |
| * <p>This works only in runners supporting {@link Kind#SPLITTABLE_DO_FN}. |
| */ |
| @Experimental(Kind.SPLITTABLE_DO_FN) |
| public Read<T> watchForNewFiles( |
| Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { |
| return withMatchConfiguration( |
| getMatchConfiguration().continuously(pollInterval, terminationCondition)); |
| } |
| |
| /** |
| * Hints that the filepattern specified in {@link #from(String)} matches a very large number of |
| * files. |
| * |
| * <p>This hint may cause a runner to execute the transform differently, in a way that improves |
| * performance for this case, but it may worsen performance if the filepattern matches only a |
| * small number of files (e.g., in a runner that supports dynamic work rebalancing, it will |
| * happen less efficiently within individual files). |
| */ |
| public Read<T> withHintMatchesManyFiles() { |
| return toBuilder().setHintMatchesManyFiles(true).build(); |
| } |
| |
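| /** |
| * If set to true, a Beam schema will be inferred from the Avro schema, as for {@link |
| * ReadFiles#withBeamSchemas(boolean)}. This allows the output to be used by SQL and by the |
| * schema-transform library. |
| */ |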
| @Experimental(Kind.SCHEMAS) |
| public Read<T> withBeamSchemas(boolean withBeamSchemas) { |
| return toBuilder().setInferBeamSchema(withBeamSchemas).build(); |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public PCollection<T> expand(PBegin input) { |
| checkNotNull(getFilepattern(), "filepattern"); |
| checkNotNull(getSchema(), "schema"); |
| |
| if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) { |
| PCollection<T> read = |
| input.apply( |
| "Read", |
| org.apache.beam.sdk.io.Read.from( |
| createSource( |
| getFilepattern(), |
| getMatchConfiguration().getEmptyMatchTreatment(), |
| getRecordClass(), |
| getSchema()))); |
| return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read; |
| } |
| |
| // All other cases go through FileIO + ReadFiles |
| ReadFiles<T> readFiles = |
| (getRecordClass() == GenericRecord.class) |
| ? (ReadFiles<T>) readFilesGenericRecords(getSchema()) |
| : readFiles(getRecordClass()); |
| return input |
| .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of())) |
| .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration())) |
| .apply( |
| "Read Matches", |
| FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) |
| .apply("Via ReadFiles", readFiles); |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| builder |
| .add( |
| DisplayData.item("inferBeamSchema", getInferBeamSchema()) |
| .withLabel("Infer Beam Schema")) |
| .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema()))) |
| .addIfNotNull(DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class")) |
| .addIfNotNull( |
| DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern")) |
| .include("matchConfiguration", getMatchConfiguration()); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private static <T> AvroSource<T> createSource( |
| ValueProvider<String> filepattern, |
| EmptyMatchTreatment emptyMatchTreatment, |
| Class<T> recordClass, |
| Schema schema) { |
| AvroSource<?> source = |
| AvroSource.from(filepattern).withEmptyMatchTreatment(emptyMatchTreatment); |
| return recordClass == GenericRecord.class |
| ? (AvroSource<T>) source.withSchema(schema) |
| : source.withSchema(recordClass); |
| } |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| |
| /** Implementation of {@link #readFiles}. */ |
| @AutoValue |
| public abstract static class ReadFiles<T> |
| extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> { |
| @Nullable |
| abstract Class<T> getRecordClass(); |
| |
| @Nullable |
| abstract Schema getSchema(); |
| |
| abstract long getDesiredBundleSizeBytes(); |
| |
| abstract boolean getInferBeamSchema(); |
| |
| abstract Builder<T> toBuilder(); |
| |
| @AutoValue.Builder |
| abstract static class Builder<T> { |
| abstract Builder<T> setRecordClass(Class<T> recordClass); |
| |
| abstract Builder<T> setSchema(Schema schema); |
| |
| abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes); |
| |
| abstract Builder<T> setInferBeamSchema(boolean infer); |
| |
| abstract ReadFiles<T> build(); |
| } |
| |
| @VisibleForTesting |
| ReadFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { |
| return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); |
| } |
| |
| /** |
| * If set to true, a Beam schema will be inferred from the Avro schema. This allows the output |
| * to be used by SQL and by the schema-transform library. |
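| * |
| * <p>For example, a sketch ({@code readableFiles} stands for a {@code |
| * PCollection<FileIO.ReadableFile>}): |
| * |
| * <pre>{@code |
| * PCollection<GenericRecord> records = readableFiles.apply( |
| * AvroIO.readFilesGenericRecords(schema).withBeamSchemas(true)); |
| * }</pre> |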
| */ |
| @Experimental(Kind.SCHEMAS) |
| public ReadFiles<T> withBeamSchemas(boolean withBeamSchemas) { |
| return toBuilder().setInferBeamSchema(withBeamSchemas).build(); |
| } |
| |
| @Override |
| public PCollection<T> expand(PCollection<FileIO.ReadableFile> input) { |
| checkNotNull(getSchema(), "schema"); |
| PCollection<T> read = |
| input.apply( |
| "Read all via FileBasedSource", |
| new ReadAllViaFileBasedSource<>( |
| getDesiredBundleSizeBytes(), |
| new CreateSourceFn<>(getRecordClass(), getSchema().toString()), |
| AvroCoder.of(getRecordClass(), getSchema()))); |
| return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read; |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| builder |
| .add( |
| DisplayData.item("inferBeamSchema", getInferBeamSchema()) |
| .withLabel("Infer Beam Schema")) |
| .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema()))) |
| .addIfNotNull( |
| DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class")); |
| } |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * Implementation of {@link #readAll}. |
| * |
| * @deprecated See {@link #readAll(Class)} for details. |
| */ |
| @Deprecated |
| @AutoValue |
| public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> { |
| abstract MatchConfiguration getMatchConfiguration(); |
| |
| @Nullable |
| abstract Class<T> getRecordClass(); |
| |
| @Nullable |
| abstract Schema getSchema(); |
| |
| abstract long getDesiredBundleSizeBytes(); |
| |
| abstract boolean getInferBeamSchema(); |
| |
| abstract Builder<T> toBuilder(); |
| |
| @AutoValue.Builder |
| abstract static class Builder<T> { |
| abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration); |
| |
| abstract Builder<T> setRecordClass(Class<T> recordClass); |
| |
| abstract Builder<T> setSchema(Schema schema); |
| |
| abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes); |
| |
| abstract Builder<T> setInferBeamSchema(boolean infer); |
| |
| abstract ReadAll<T> build(); |
| } |
| |
| /** Sets the {@link MatchConfiguration}. */ |
| public ReadAll<T> withMatchConfiguration(MatchConfiguration configuration) { |
| return toBuilder().setMatchConfiguration(configuration).build(); |
| } |
| |
| /** Like {@link Read#withEmptyMatchTreatment}. */ |
| public ReadAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) { |
| return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); |
| } |
| |
| /** Like {@link Read#watchForNewFiles}. */ |
| @Experimental(Kind.SPLITTABLE_DO_FN) |
| public ReadAll<T> watchForNewFiles( |
| Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { |
| return withMatchConfiguration( |
| getMatchConfiguration().continuously(pollInterval, terminationCondition)); |
| } |
| |
| @VisibleForTesting |
| ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { |
| return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); |
| } |
| |
| /** |
| * If set to true, a Beam schema will be inferred from the Avro schema. This allows the output |
| * to be used by SQL and by the schema-transform library. |
| */ |
| @Experimental(Kind.SCHEMAS) |
| public ReadAll<T> withBeamSchemas(boolean withBeamSchemas) { |
| return toBuilder().setInferBeamSchema(withBeamSchemas).build(); |
| } |
| |
| @Override |
| public PCollection<T> expand(PCollection<String> input) { |
| checkNotNull(getSchema(), "schema"); |
| PCollection<T> read = |
| input |
| .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration())) |
| .apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) |
| .apply(readFiles(getRecordClass())); |
| return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read; |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| builder |
| .add( |
| DisplayData.item("inferBeamSchema", getInferBeamSchema()) |
| .withLabel("Infer Beam Schema")) |
| .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema()))) |
| .addIfNotNull(DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class")) |
| .include("matchConfiguration", getMatchConfiguration()); |
| } |
| } |
| |
| private static class CreateSourceFn<T> |
| implements SerializableFunction<String, FileBasedSource<T>> { |
| private final Class<T> recordClass; |
| private final Supplier<Schema> schemaSupplier; |
| |
| CreateSourceFn(Class<T> recordClass, String jsonSchema) { |
| this.recordClass = recordClass; |
| this.schemaSupplier = AvroUtils.serializableSchemaSupplier(jsonSchema); |
| } |
| |
| @Override |
| public FileBasedSource<T> apply(String input) { |
| return Read.createSource( |
| StaticValueProvider.of(input), |
| EmptyMatchTreatment.DISALLOW, |
| recordClass, |
| schemaSupplier.get()); |
| } |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| |
| /** Implementation of {@link #parseGenericRecords}. */ |
| @AutoValue |
| public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> { |
| @Nullable |
| abstract ValueProvider<String> getFilepattern(); |
| |
| abstract MatchConfiguration getMatchConfiguration(); |
| |
| abstract SerializableFunction<GenericRecord, T> getParseFn(); |
| |
| @Nullable |
| abstract Coder<T> getCoder(); |
| |
| abstract boolean getHintMatchesManyFiles(); |
| |
| abstract Builder<T> toBuilder(); |
| |
| @AutoValue.Builder |
| abstract static class Builder<T> { |
| abstract Builder<T> setFilepattern(ValueProvider<String> filepattern); |
| |
| abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration); |
| |
| abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn); |
| |
| abstract Builder<T> setCoder(Coder<T> coder); |
| |
| abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles); |
| |
| abstract Parse<T> build(); |
| } |
| |
| /** Reads from the given filename or filepattern. */ |
| public Parse<T> from(String filepattern) { |
| return from(StaticValueProvider.of(filepattern)); |
| } |
| |
| /** Like {@link #from(String)}. */ |
| public Parse<T> from(ValueProvider<String> filepattern) { |
| return toBuilder().setFilepattern(filepattern).build(); |
| } |
| |
| /** Sets the {@link MatchConfiguration}. */ |
| public Parse<T> withMatchConfiguration(MatchConfiguration configuration) { |
| return toBuilder().setMatchConfiguration(configuration).build(); |
| } |
| |
| /** Like {@link Read#withEmptyMatchTreatment}. */ |
| public Parse<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) { |
| return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); |
| } |
| |
| /** Like {@link Read#watchForNewFiles}. */ |
| @Experimental(Kind.SPLITTABLE_DO_FN) |
| public Parse<T> watchForNewFiles( |
| Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { |
| return withMatchConfiguration( |
| getMatchConfiguration().continuously(pollInterval, terminationCondition)); |
| } |
| |
| /** Sets a coder for the result of the parse function. */ |
| public Parse<T> withCoder(Coder<T> coder) { |
| return toBuilder().setCoder(coder).build(); |
| } |
| |
| /** Like {@link Read#withHintMatchesManyFiles()}. */ |
| public Parse<T> withHintMatchesManyFiles() { |
| return toBuilder().setHintMatchesManyFiles(true).build(); |
| } |
| |
| @Override |
| public PCollection<T> expand(PBegin input) { |
| checkNotNull(getFilepattern(), "filepattern"); |
| Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry()); |
| |
| if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) { |
| return input.apply( |
| org.apache.beam.sdk.io.Read.from( |
| AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder))); |
| } |
| |
| // All other cases go through FileIO + ParseFilesGenericRecords. |
| return input |
| .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of())) |
| .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration())) |
| .apply( |
| "Read Matches", |
| FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) |
| .apply("Via ParseFiles", parseFilesGenericRecords(getParseFn()).withCoder(coder)); |
| } |
| |
| private static <T> Coder<T> inferCoder( |
| @Nullable Coder<T> explicitCoder, |
| SerializableFunction<GenericRecord, T> parseFn, |
| CoderRegistry coderRegistry) { |
| if (explicitCoder != null) { |
| return explicitCoder; |
| } |
| // If a coder was not specified explicitly, infer it from parse fn. |
| try { |
| return coderRegistry.getCoder(TypeDescriptors.outputOf(parseFn)); |
| } catch (CannotProvideCoderException e) { |
| throw new IllegalArgumentException( |
| "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().", |
| e); |
| } |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| builder |
| .addIfNotNull( |
| DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern")) |
| .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function")) |
| .include("matchConfiguration", getMatchConfiguration()); |
| } |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| |
| /** Implementation of {@link #parseFilesGenericRecords}. */ |
| @AutoValue |
| public abstract static class ParseFiles<T> |
| extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> { |
| abstract SerializableFunction<GenericRecord, T> getParseFn(); |
| |
| @Nullable |
| abstract Coder<T> getCoder(); |
| |
| abstract long getDesiredBundleSizeBytes(); |
| |
| abstract Builder<T> toBuilder(); |
| |
| @AutoValue.Builder |
| abstract static class Builder<T> { |
| abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn); |
| |
| abstract Builder<T> setCoder(Coder<T> coder); |
| |
| abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes); |
| |
| abstract ParseFiles<T> build(); |
| } |
| |
| /** Specifies the coder for the result of the {@code parseFn}. */ |
| public ParseFiles<T> withCoder(Coder<T> coder) { |
| return toBuilder().setCoder(coder).build(); |
| } |
| |
| @VisibleForTesting |
| ParseFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { |
| return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); |
| } |
| |
| @Override |
| public PCollection<T> expand(PCollection<FileIO.ReadableFile> input) { |
| final Coder<T> coder = |
| Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry()); |
| final SerializableFunction<GenericRecord, T> parseFn = getParseFn(); |
| final SerializableFunction<String, FileBasedSource<T>> createSource = |
| new CreateParseSourceFn<>(parseFn, coder); |
| return input.apply( |
| "Parse Files via FileBasedSource", |
| new ReadAllViaFileBasedSource<>(getDesiredBundleSizeBytes(), createSource, coder)); |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| builder.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function")); |
| } |
| |
| private static class CreateParseSourceFn<T> |
| implements SerializableFunction<String, FileBasedSource<T>> { |
| private final SerializableFunction<GenericRecord, T> parseFn; |
| private final Coder<T> coder; |
| |
| CreateParseSourceFn(SerializableFunction<GenericRecord, T> parseFn, Coder<T> coder) { |
| this.parseFn = parseFn; |
| this.coder = coder; |
| } |
| |
| @Override |
| public FileBasedSource<T> apply(String input) { |
| return AvroSource.from(input).withParseFn(parseFn, coder); |
| } |
| } |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * Implementation of {@link #parseAllGenericRecords}. |
| * |
| * @deprecated See {@link #parseAllGenericRecords(SerializableFunction)} for details. |
| */ |
| @Deprecated |
| @AutoValue |
| public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> { |
| abstract MatchConfiguration getMatchConfiguration(); |
| |
| abstract SerializableFunction<GenericRecord, T> getParseFn(); |
| |
| @Nullable |
| abstract Coder<T> getCoder(); |
| |
| abstract long getDesiredBundleSizeBytes(); |
| |
| abstract Builder<T> toBuilder(); |
| |
| @AutoValue.Builder |
| abstract static class Builder<T> { |
| abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration); |
| |
| abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn); |
| |
| abstract Builder<T> setCoder(Coder<T> coder); |
| |
| abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes); |
| |
| abstract ParseAll<T> build(); |
| } |
| |
| /** Sets the {@link MatchConfiguration}. */ |
| public ParseAll<T> withMatchConfiguration(MatchConfiguration configuration) { |
| return toBuilder().setMatchConfiguration(configuration).build(); |
| } |
| |
| /** Like {@link Read#withEmptyMatchTreatment}. */ |
| public ParseAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) { |
| return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); |
| } |
| |
| /** Like {@link Read#watchForNewFiles}. */ |
| @Experimental(Kind.SPLITTABLE_DO_FN) |
| public ParseAll<T> watchForNewFiles( |
| Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { |
| return withMatchConfiguration( |
| getMatchConfiguration().continuously(pollInterval, terminationCondition)); |
| } |
| |
| /** Specifies the coder for the result of the {@code parseFn}. */ |
| public ParseAll<T> withCoder(Coder<T> coder) { |
| return toBuilder().setCoder(coder).build(); |
| } |
| |
| @VisibleForTesting |
| ParseAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { |
| return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); |
| } |
| |
| @Override |
| public PCollection<T> expand(PCollection<String> input) { |
| return input |
| .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration())) |
| .apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) |
| .apply( |
| "Parse all via FileBasedSource", |
| parseFilesGenericRecords(getParseFn()).withCoder(getCoder())); |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| builder |
| .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function")) |
| .include("matchConfiguration", getMatchConfiguration()); |
| } |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| |
| /** Implementation of {@link #write}. */ |
| @AutoValue |
| public abstract static class TypedWrite<UserT, DestinationT, OutputT> |
| extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> { |
| static final CodecFactory DEFAULT_CODEC = CodecFactory.snappyCodec(); |
| static final SerializableAvroCodecFactory DEFAULT_SERIALIZABLE_CODEC = |
| new SerializableAvroCodecFactory(DEFAULT_CODEC); |
| |
| @Nullable |
| abstract SerializableFunction<UserT, OutputT> getFormatFunction(); |
| |
| @Nullable |
| abstract ValueProvider<ResourceId> getFilenamePrefix(); |
| |
| @Nullable |
| abstract String getShardTemplate(); |
| |
| @Nullable |
| abstract String getFilenameSuffix(); |
| |
| @Nullable |
| abstract ValueProvider<ResourceId> getTempDirectory(); |
| |
| abstract int getNumShards(); |
| |
| abstract boolean getGenericRecords(); |
| |
| @Nullable |
| abstract Schema getSchema(); |
| |
| abstract boolean getWindowedWrites(); |
| |
| abstract boolean getNoSpilling(); |
| |
| @Nullable |
| abstract FilenamePolicy getFilenamePolicy(); |
| |
| @Nullable |
| abstract DynamicAvroDestinations<UserT, DestinationT, OutputT> getDynamicDestinations(); |
| |
| /** |
| * The codec used to encode the blocks in the Avro file. Available codecs are listed in |
| * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html |
| */ |
| abstract SerializableAvroCodecFactory getCodec(); |
| /** Avro file metadata. */ |
| abstract ImmutableMap<String, Object> getMetadata(); |
| |
| abstract Builder<UserT, DestinationT, OutputT> toBuilder(); |
| |
| @AutoValue.Builder |
| abstract static class Builder<UserT, DestinationT, OutputT> { |
| abstract Builder<UserT, DestinationT, OutputT> setFormatFunction( |
| @Nullable SerializableFunction<UserT, OutputT> formatFunction); |
| |
| abstract Builder<UserT, DestinationT, OutputT> setFilenamePrefix( |
| ValueProvider<ResourceId> filenamePrefix); |
| |
| abstract Builder<UserT, DestinationT, OutputT> setFilenameSuffix( |
| @Nullable String filenameSuffix); |
| |
| abstract Builder<UserT, DestinationT, OutputT> setTempDirectory( |
| ValueProvider<ResourceId> tempDirectory); |
| |
| abstract Builder<UserT, DestinationT, OutputT> setNumShards(int numShards); |
| |
| abstract Builder<UserT, DestinationT, OutputT> setShardTemplate( |
| @Nullable String shardTemplate); |
| |
| abstract Builder<UserT, DestinationT, OutputT> setGenericRecords(boolean genericRecords); |
| |
| abstract Builder<UserT, DestinationT, OutputT> setSchema(Schema schema); |
| |
| abstract Builder<UserT, DestinationT, OutputT> setWindowedWrites(boolean windowedWrites); |
| |
| abstract Builder<UserT, DestinationT, OutputT> setNoSpilling(boolean noSpilling); |
| |
| abstract Builder<UserT, DestinationT, OutputT> setFilenamePolicy( |
| FilenamePolicy filenamePolicy); |
| |
| abstract Builder<UserT, DestinationT, OutputT> setCodec(SerializableAvroCodecFactory codec); |
| |
| abstract Builder<UserT, DestinationT, OutputT> setMetadata( |
| ImmutableMap<String, Object> metadata); |
| |
| abstract Builder<UserT, DestinationT, OutputT> setDynamicDestinations( |
| DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations); |
| |
| abstract TypedWrite<UserT, DestinationT, OutputT> build(); |
| } |
| |
| /** |
| * Writes to file(s) with the given output prefix. See {@link FileSystems} for information on |
| * supported file systems. |
| * |
| * <p>The name of the output files will be determined by the {@link FilenamePolicy} used. |
| * |
| * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the |
| * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}), and a |
| * common suffix (if supplied using {@link #withSuffix(String)}). This default can be overridden |
| * using {@link #to(FilenamePolicy)}. |
| */ |
| public TypedWrite<UserT, DestinationT, OutputT> to(String outputPrefix) { |
| return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix)); |
| } |
| |
| /** |
| * Writes to file(s) with the given output prefix. See {@link FileSystems} for information on |
| * supported file systems. This prefix is used by the {@link DefaultFilenamePolicy} to generate |
| * filenames. |
| * |
| * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the |
| * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}), and a |
| * common suffix (if supplied using {@link #withSuffix(String)}). |
| * |
| * <p>This default policy can be overridden using {@link #to(FilenamePolicy)}, in which case |
| * {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should not be set. |
| * Custom filename policies do not automatically see this prefix - you should explicitly pass |
| * the prefix into your {@link FilenamePolicy} object if you need this. |
| * |
| * <p>If {@link #withTempDirectory} has not been called, this filename prefix will be used to |
| * infer a directory for temporary files. |
| */ |
| @Experimental(Kind.FILESYSTEM) |
| public TypedWrite<UserT, DestinationT, OutputT> to(ResourceId outputPrefix) { |
| return toResource(StaticValueProvider.of(outputPrefix)); |
| } |
| |
| private static class OutputPrefixToResourceId |
| implements SerializableFunction<String, ResourceId> { |
| @Override |
| public ResourceId apply(String input) { |
| return FileBasedSink.convertToFileResourceIfPossible(input); |
| } |
| } |
| |
| /** Like {@link #to(String)}. */ |
| public TypedWrite<UserT, DestinationT, OutputT> to(ValueProvider<String> outputPrefix) { |
| return toResource( |
| NestedValueProvider.of( |
| outputPrefix, |
| // The function cannot be created as an anonymous class here since the enclosing class |
| // may contain unserializable members. |
| new OutputPrefixToResourceId())); |
| } |
| |
| /** Like {@link #to(ResourceId)}. */ |
| @Experimental(Kind.FILESYSTEM) |
| public TypedWrite<UserT, DestinationT, OutputT> toResource( |
| ValueProvider<ResourceId> outputPrefix) { |
| return toBuilder().setFilenamePrefix(outputPrefix).build(); |
| } |
| |
| /** |
| * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A |
| * directory for temporary files must be specified using {@link #withTempDirectory}. |
| */ |
| @Experimental(Kind.FILESYSTEM) |
| public TypedWrite<UserT, DestinationT, OutputT> to(FilenamePolicy filenamePolicy) { |
| return toBuilder().setFilenamePolicy(filenamePolicy).build(); |
| } |
| |
| /** |
| * Use a {@link DynamicAvroDestinations} object to vend {@link FilenamePolicy} objects. These |
| * objects can examine the input record when creating a {@link FilenamePolicy}. A directory for |
| * temporary files must be specified using {@link #withTempDirectory}. |
| * |
| * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} instead. |
| */ |
| @Experimental(Kind.FILESYSTEM) |
| @Deprecated |
| public <NewDestinationT> TypedWrite<UserT, NewDestinationT, OutputT> to( |
| DynamicAvroDestinations<UserT, NewDestinationT, OutputT> dynamicDestinations) { |
| return toBuilder() |
| .setDynamicDestinations((DynamicAvroDestinations) dynamicDestinations) |
| .build(); |
| } |
| |
| /** |
| * Sets the output schema. Can only be used when the output type is {@link GenericRecord} |
| * and when not using {@link #to(DynamicAvroDestinations)}. |
| */ |
| public TypedWrite<UserT, DestinationT, OutputT> withSchema(Schema schema) { |
| return toBuilder().setSchema(schema).build(); |
| } |
| |
| /** |
| * Specifies a format function to convert {@link UserT} to the output type. If {@link |
| * #to(DynamicAvroDestinations)} is used, {@link DynamicAvroDestinations#formatRecord} must be |
| * used instead. |
| */ |
| public TypedWrite<UserT, DestinationT, OutputT> withFormatFunction( |
| @Nullable SerializableFunction<UserT, OutputT> formatFunction) { |
| return toBuilder().setFormatFunction(formatFunction).build(); |
| } |
| |
| /** Set the base directory used to generate temporary files. */ |
| @Experimental(Kind.FILESYSTEM) |
| public TypedWrite<UserT, DestinationT, OutputT> withTempDirectory( |
| ValueProvider<ResourceId> tempDirectory) { |
| return toBuilder().setTempDirectory(tempDirectory).build(); |
| } |
| |
| /** Set the base directory used to generate temporary files. */ |
| @Experimental(Kind.FILESYSTEM) |
| public TypedWrite<UserT, DestinationT, OutputT> withTempDirectory(ResourceId tempDirectory) { |
| return withTempDirectory(StaticValueProvider.of(tempDirectory)); |
| } |
| |
| /** |
| * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be |
| * used with one of the default filename-prefix {@code to} overrides. |
| * |
| * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are |
| * used. |
| */ |
| public TypedWrite<UserT, DestinationT, OutputT> withShardNameTemplate(String shardTemplate) { |
| return toBuilder().setShardTemplate(shardTemplate).build(); |
| } |
| |
| /** |
| * Configures the filename suffix for written files. This option may only be used with one of |
| * the default filename-prefix {@code to} overrides. |
| * |
| * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are |
| * used. |
| */ |
| public TypedWrite<UserT, DestinationT, OutputT> withSuffix(String filenameSuffix) { |
| return toBuilder().setFilenameSuffix(filenameSuffix).build(); |
| } |
| |
| /** |
| * Configures the number of output shards produced overall (when using unwindowed writes) or |
| * per-window (when using windowed writes). |
| * |
| * <p>For unwindowed writes, constraining the number of shards is likely to reduce the |
| * performance of a pipeline. Setting this value is not recommended unless you require a |
| * specific number of output files. |
| * |
| * @param numShards the number of shards to use, or 0 to let the system decide. |
| */ |
| public TypedWrite<UserT, DestinationT, OutputT> withNumShards(int numShards) { |
| checkArgument(numShards >= 0); |
| return toBuilder().setNumShards(numShards).build(); |
| } |
| |
| /** |
| * Forces a single output file with an empty shard name template. This option is only |
| * compatible with unwindowed writes. |
| * |
| * <p>For unwindowed writes, constraining the number of shards is likely to reduce the |
| * performance of a pipeline. Setting this value is not recommended unless you require a |
| * specific number of output files. |
| * |
| * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}. |
| */ |
| public TypedWrite<UserT, DestinationT, OutputT> withoutSharding() { |
| return withNumShards(1).withShardNameTemplate(""); |
| } |
| |
| /** |
| * Preserves windowing of input elements and writes them to files based on the element's window. |
| * |
| * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}, filenames will be generated using |
| * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}. |
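| * |
| * <p>A sketch of usage with fixed windows ({@code MyRecord}, {@code myWindowedPolicy}, and |
| * {@code tempDir} are hypothetical names): |
| * |
| * <pre>{@code |
| * records |
| *     .apply(Window.<MyRecord>into(FixedWindows.of(Duration.standardMinutes(5)))) |
| *     .apply( |
| *         AvroIO.write(MyRecord.class) |
| *             .to(myWindowedPolicy) // a FilenamePolicy implementing windowedFilename |
| *             .withTempDirectory(tempDir) // a ValueProvider<ResourceId> |
| *             .withWindowedWrites() |
| *             .withNumShards(3)); |
| * }</pre> |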
| */ |
| public TypedWrite<UserT, DestinationT, OutputT> withWindowedWrites() { |
| return toBuilder().setWindowedWrites(true).build(); |
| } |
| |
| /** See {@link WriteFiles#withNoSpilling()}. */ |
| public TypedWrite<UserT, DestinationT, OutputT> withNoSpilling() { |
| return toBuilder().setNoSpilling(true).build(); |
| } |
| |
| /** Writes to Avro file(s) compressed using the specified codec. */ |
| public TypedWrite<UserT, DestinationT, OutputT> withCodec(CodecFactory codec) { |
| return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build(); |
| } |
| |
| /** |
| * Writes to Avro file(s) with the specified metadata. |
| * |
| * <p>Supported value types are String, Long, and byte[]. |
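| * |
| * <p>A sketch of usage ({@code write} stands for a {@code TypedWrite} under construction; |
| * keys and values are illustrative): |
| * |
| * <pre>{@code |
| * write.withMetadata( |
| *     ImmutableMap.<String, Object>of( |
| *         "origin", "events-pipeline", |
| *         "timestampMillis", 123456789L)); |
| * }</pre> |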
| */ |
| public TypedWrite<UserT, DestinationT, OutputT> withMetadata(Map<String, Object> metadata) { |
| Map<String, String> badKeys = Maps.newLinkedHashMap(); |
| for (Map.Entry<String, Object> entry : metadata.entrySet()) { |
| Object v = entry.getValue(); |
| if (!(v instanceof String || v instanceof Long || v instanceof byte[])) { |
| badKeys.put(entry.getKey(), v.getClass().getSimpleName()); |
| } |
| } |
| checkArgument( |
| badKeys.isEmpty(), |
| "Metadata value type must be one of String, Long, or byte[]. Found {}", |
| badKeys); |
| return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build(); |
| } |
| |
| DynamicAvroDestinations<UserT, DestinationT, OutputT> resolveDynamicDestinations() { |
| DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations = |
| getDynamicDestinations(); |
| if (dynamicDestinations == null) { |
| // In this case DestinationT is Void. |
| FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); |
| if (usedFilenamePolicy == null) { |
| usedFilenamePolicy = |
| DefaultFilenamePolicy.fromStandardParameters( |
| getFilenamePrefix(), |
| getShardTemplate(), |
| getFilenameSuffix(), |
| getWindowedWrites()); |
| } |
| dynamicDestinations = |
| (DynamicAvroDestinations<UserT, DestinationT, OutputT>) |
| constantDestinations( |
| usedFilenamePolicy, |
| getSchema(), |
| getMetadata(), |
| getCodec().getCodec(), |
| getFormatFunction()); |
| } |
| return dynamicDestinations; |
| } |
| |
| @Override |
| public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) { |
| checkArgument( |
| getFilenamePrefix() != null || getTempDirectory() != null, |
| "Need to set either the filename prefix or the tempDirectory of a AvroIO.Write " |
| + "transform."); |
| if (getFilenamePolicy() != null) { |
| checkArgument( |
| getShardTemplate() == null && getFilenameSuffix() == null, |
| "shardTemplate and filenameSuffix should only be used with the default " |
| + "filename policy"); |
| } |
| if (getDynamicDestinations() != null) { |
| checkArgument( |
| getFormatFunction() == null, |
| "A format function should not be specified " |
| + "with DynamicDestinations. Use DynamicDestinations.formatRecord instead"); |
| } else { |
| checkArgument( |
| getSchema() != null, "Unless using DynamicDestinations, .withSchema() is required."); |
| } |
| |
| ValueProvider<ResourceId> tempDirectory = getTempDirectory(); |
| if (tempDirectory == null) { |
| tempDirectory = getFilenamePrefix(); |
| } |
| WriteFiles<UserT, DestinationT, OutputT> write = |
| WriteFiles.to( |
| new AvroSink<>(tempDirectory, resolveDynamicDestinations(), getGenericRecords())); |
| if (getNumShards() > 0) { |
| write = write.withNumShards(getNumShards()); |
| } |
| if (getWindowedWrites()) { |
| write = write.withWindowedWrites(); |
| } |
| if (getNoSpilling()) { |
| write = write.withNoSpilling(); |
| } |
| return input.apply("Write", write); |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| resolveDynamicDestinations().populateDisplayData(builder); |
| builder |
| .addIfNotDefault( |
| DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0) |
| .addIfNotNull( |
| DisplayData.item("tempDirectory", getTempDirectory()) |
| .withLabel("Directory for temporary files")); |
| } |
| } |
| |
| /** |
| * This class is used as the default return value of {@link AvroIO#write}. |
| * |
| * <p>All methods in this class delegate to the appropriate method of {@link AvroIO.TypedWrite}. |
| * This class exists for backwards compatibility, and will be removed in Beam 3.0. |
| */ |
| public static class Write<T> extends PTransform<PCollection<T>, PDone> { |
| @VisibleForTesting final TypedWrite<T, ?, T> inner; |
| |
| Write(TypedWrite<T, ?, T> inner) { |
| this.inner = inner; |
| } |
| |
| /** See {@link TypedWrite#to(String)}. */ |
| public Write<T> to(String outputPrefix) { |
| return new Write<>( |
| inner |
| .to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix)) |
| .withFormatFunction(SerializableFunctions.identity())); |
| } |
| |
| /** See {@link TypedWrite#to(ResourceId)}. */ |
| @Experimental(Kind.FILESYSTEM) |
| public Write<T> to(ResourceId outputPrefix) { |
| return new Write<>( |
| inner.to(outputPrefix).withFormatFunction(SerializableFunctions.identity())); |
| } |
| |
| /** See {@link TypedWrite#to(ValueProvider)}. */ |
| public Write<T> to(ValueProvider<String> outputPrefix) { |
| return new Write<>( |
| inner.to(outputPrefix).withFormatFunction(SerializableFunctions.identity())); |
| } |
| |
| /** See {@link TypedWrite#toResource(ValueProvider)}. */ |
| @Experimental(Kind.FILESYSTEM) |
| public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) { |
| return new Write<>( |
| inner.toResource(outputPrefix).withFormatFunction(SerializableFunctions.identity())); |
| } |
| |
| /** See {@link TypedWrite#to(FilenamePolicy)}. */ |
| public Write<T> to(FilenamePolicy filenamePolicy) { |
| return new Write<>( |
| inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.identity())); |
| } |
| |
| /** |
| * See {@link TypedWrite#to(DynamicAvroDestinations)}. |
| * |
| * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} instead. |
| */ |
| @Deprecated |
| public Write<T> to(DynamicAvroDestinations<T, ?, T> dynamicDestinations) { |
| return new Write<>(inner.to(dynamicDestinations).withFormatFunction(null)); |
| } |
| |
| /** See {@link TypedWrite#withSchema}. */ |
| public Write<T> withSchema(Schema schema) { |
| return new Write<>(inner.withSchema(schema)); |
| } |
| |
| /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */ |
| @Experimental(Kind.FILESYSTEM) |
| public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) { |
| return new Write<>(inner.withTempDirectory(tempDirectory)); |
| } |
| |
| /** See {@link TypedWrite#withTempDirectory(ResourceId)}. */ |
| public Write<T> withTempDirectory(ResourceId tempDirectory) { |
| return new Write<>(inner.withTempDirectory(tempDirectory)); |
| } |
| |
| /** See {@link TypedWrite#withShardNameTemplate}. */ |
| public Write<T> withShardNameTemplate(String shardTemplate) { |
| return new Write<>(inner.withShardNameTemplate(shardTemplate)); |
| } |
| |
| /** See {@link TypedWrite#withSuffix}. */ |
| public Write<T> withSuffix(String filenameSuffix) { |
| return new Write<>(inner.withSuffix(filenameSuffix)); |
| } |
| |
| /** See {@link TypedWrite#withNumShards}. */ |
| public Write<T> withNumShards(int numShards) { |
| return new Write<>(inner.withNumShards(numShards)); |
| } |
| |
| /** See {@link TypedWrite#withoutSharding}. */ |
| public Write<T> withoutSharding() { |
| return new Write<>(inner.withoutSharding()); |
| } |
| |
| /** See {@link TypedWrite#withWindowedWrites}. */ |
| public Write<T> withWindowedWrites() { |
| return new Write<>(inner.withWindowedWrites()); |
| } |
| |
| /** See {@link TypedWrite#withCodec}. */ |
| public Write<T> withCodec(CodecFactory codec) { |
| return new Write<>(inner.withCodec(codec)); |
| } |
| |
| /** |
| * Specify that output filenames are wanted. |
| * |
| * <p>The nested {@link TypedWrite} transform always has access to output filenames; however, |
| * due to backwards-compatibility concerns, {@link Write} cannot return them. This method |
| * simply returns the inner {@link TypedWrite} transform, which has {@link WriteFilesResult} |
| * as its output type, allowing access to output files. |
| * |
| * <p>The supplied {@code DestinationT} type must be: the same as that supplied in {@link |
| * #to(DynamicAvroDestinations)} if that method was used, or {@code Void} otherwise. |
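| * |
| * <p>A sketch of usage ({@code MyRecord} and the output path are illustrative): |
| * |
| * <pre>{@code |
| * WriteFilesResult<Void> result = |
| *     records.apply( |
| *         AvroIO.write(MyRecord.class) |
| *             .to("gs://my-bucket/path/to/file") |
| *             .withOutputFilenames()); |
| * result.getPerDestinationOutputFilenames(); // PCollection<KV<Void, String>> |
| * }</pre> |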
| */ |
| public <DestinationT> TypedWrite<T, DestinationT, T> withOutputFilenames() { |
| return (TypedWrite) inner; |
| } |
| |
| /** See {@link TypedWrite#withMetadata}. */ |
| public Write<T> withMetadata(Map<String, Object> metadata) { |
| return new Write<>(inner.withMetadata(metadata)); |
| } |
| |
| @Override |
| public PDone expand(PCollection<T> input) { |
| input.apply(inner); |
| return PDone.in(input.getPipeline()); |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| inner.populateDisplayData(builder); |
| } |
| } |
| |
| /** |
| * Returns a {@link DynamicAvroDestinations} that always returns the same {@link FilenamePolicy}, |
| * schema, metadata, and codec. |
| */ |
| public static <UserT, OutputT> DynamicAvroDestinations<UserT, Void, OutputT> constantDestinations( |
| FilenamePolicy filenamePolicy, |
| Schema schema, |
| Map<String, Object> metadata, |
| CodecFactory codec, |
| SerializableFunction<UserT, OutputT> formatFunction) { |
| return new ConstantAvroDestination<>(filenamePolicy, schema, metadata, codec, formatFunction); |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * Formats an element of a user type into a record with the given schema. |
| * |
| * @deprecated Users can achieve the same result by applying this formatting in a {@link |
| *     org.apache.beam.sdk.transforms.ParDo} before writing with {@link #write(Class)}. |
| */ |
| @Deprecated |
| public interface RecordFormatter<ElementT> extends Serializable { |
| GenericRecord formatRecord(ElementT element, Schema schema); |
| } |
| |
| /** |
| * A {@link Sink} for use with {@link FileIO#write} and {@link FileIO#writeDynamic}, writing |
| * elements of the given generated class, like {@link #write(Class)}. |
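| * |
| * <p>A sketch of usage ({@code Event} and the output path are illustrative): |
| * |
| * <pre>{@code |
| * events.apply( |
| *     FileIO.<Event>write() |
| *         .via(AvroIO.sink(Event.class)) |
| *         .to("gs://my-bucket/events") |
| *         .withSuffix(".avro")); |
| * }</pre> |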
| */ |
| public static <ElementT> Sink<ElementT> sink(final Class<ElementT> clazz) { |
| return new AutoValue_AvroIO_Sink.Builder<ElementT>() |
| .setJsonSchema(ReflectData.get().getSchema(clazz).toString()) |
| .setMetadata(ImmutableMap.of()) |
| .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC) |
| .build(); |
| } |
| |
| /** |
| * A {@link Sink} for use with {@link FileIO#write} and {@link FileIO#writeDynamic}, writing |
| * elements by converting each one to a {@link GenericRecord} with a given (common) schema, like |
| * {@link #writeCustomTypeToGenericRecords()}. |
| * |
| * @deprecated RecordFormatter will be removed in future versions. |
| */ |
| @Deprecated |
| public static <ElementT> Sink<ElementT> sinkViaGenericRecords( |
| Schema schema, RecordFormatter<ElementT> formatter) { |
| return new AutoValue_AvroIO_Sink.Builder<ElementT>() |
| .setRecordFormatter(formatter) |
| .setJsonSchema(schema.toString()) |
| .setMetadata(ImmutableMap.of()) |
| .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC) |
| .build(); |
| } |
| |
| /** Implementation of {@link #sink} and {@link #sinkViaGenericRecords}. */ |
| @AutoValue |
| public abstract static class Sink<ElementT> implements FileIO.Sink<ElementT> { |
| /** @deprecated RecordFormatter will be removed in future versions. */ |
| @Nullable |
| @Deprecated |
| abstract RecordFormatter<ElementT> getRecordFormatter(); |
| |
| @Nullable |
| abstract String getJsonSchema(); |
| |
| abstract Map<String, Object> getMetadata(); |
| |
| abstract SerializableAvroCodecFactory getCodec(); |
| |
| abstract Builder<ElementT> toBuilder(); |
| |
| @AutoValue.Builder |
| abstract static class Builder<ElementT> { |
| /** @deprecated RecordFormatter will be removed in future versions. */ |
| @Deprecated |
| abstract Builder<ElementT> setRecordFormatter(RecordFormatter<ElementT> formatter); |
| |
| abstract Builder<ElementT> setJsonSchema(String jsonSchema); |
| |
| abstract Builder<ElementT> setMetadata(Map<String, Object> metadata); |
| |
| abstract Builder<ElementT> setCodec(SerializableAvroCodecFactory codec); |
| |
| abstract Sink<ElementT> build(); |
| } |
| |
| /** Specifies metadata to put into each generated file. By default, none. */ |
| public Sink<ElementT> withMetadata(Map<String, Object> metadata) { |
| return toBuilder().setMetadata(metadata).build(); |
| } |
| |
| /** |
| * Specifies the {@link CodecFactory} to use for each generated file. By default, {@code |
| * CodecFactory.snappyCodec()}. |
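| * |
| * <p>For example (a sketch; {@code Event} is an illustrative class): |
| * |
| * <pre>{@code |
| * AvroIO.sink(Event.class).withCodec(CodecFactory.deflateCodec(9)); |
| * }</pre> |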
| */ |
| public Sink<ElementT> withCodec(CodecFactory codec) { |
| return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build(); |
| } |
| |
| @Nullable private transient Schema schema; |
| @Nullable private transient DataFileWriter<ElementT> reflectWriter; |
| @Nullable private transient DataFileWriter<GenericRecord> genericWriter; |
| |
| @Override |
| public void open(WritableByteChannel channel) throws IOException { |
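| // The schema travels as a JSON string (see getJsonSchema()) because Avro's Schema class |
| // is not serializable; it is parsed once when the sink is opened. |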
| this.schema = new Schema.Parser().parse(getJsonSchema()); |
| DataFileWriter<?> writer; |
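| // Without a RecordFormatter, elements are written directly via Avro reflection; otherwise |
| // each element is first converted to a GenericRecord by the formatter. |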
| if (getRecordFormatter() == null) { |
| writer = reflectWriter = new DataFileWriter<>(new ReflectDatumWriter<>(schema)); |
| } else { |
| writer = genericWriter = new DataFileWriter<>(new GenericDatumWriter<>(schema)); |
| } |
| writer.setCodec(getCodec().getCodec()); |
| for (Map.Entry<String, Object> entry : getMetadata().entrySet()) { |
| Object v = entry.getValue(); |
| if (v instanceof String) { |
| writer.setMeta(entry.getKey(), (String) v); |
| } else if (v instanceof Long) { |
| writer.setMeta(entry.getKey(), (Long) v); |
| } else if (v instanceof byte[]) { |
| writer.setMeta(entry.getKey(), (byte[]) v); |
| } else { |
| throw new IllegalStateException( |
| "Metadata value type must be one of String, Long, or byte[]. Found " |
| + v.getClass().getSimpleName()); |
| } |
| } |
| writer.create(schema, Channels.newOutputStream(channel)); |
| } |
| |
| @Override |
| public void write(ElementT element) throws IOException { |
| if (getRecordFormatter() == null) { |
| reflectWriter.append(element); |
| } else { |
| genericWriter.append(getRecordFormatter().formatRecord(element, schema)); |
| } |
| } |
| |
| @Override |
| public void flush() throws IOException { |
| MoreObjects.firstNonNull(reflectWriter, genericWriter).flush(); |
| } |
| } |
| |
| /** Disallow construction of utility class. */ |
| private AvroIO() {} |
| } |