/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import static org.apache.commons.compress.utils.CharsetNames.UTF_8;
import com.google.auto.value.AutoValue;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.FileIO.MatchConfiguration;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
/**
* {@link PTransform}s for reading and writing text files.
*
* <h2>Reading text files</h2>
*
* <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to
* instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the
* file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link
* PCollection} you can use {@link FileIO} to match them and {@link TextIO#readFiles} to read them.
*
* <p>{@link #read} returns a {@link PCollection} of {@link String Strings}, each corresponding to
* one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', '\r\n', or a
* custom delimiter specified via {@link TextIO.Read#withDelimiter}).
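*
* <p>For example, a minimal sketch reading records separated by a '|' delimiter (the path is
* assumed):
*
* <pre>{@code
* PCollection<String> records = p.apply(TextIO.read()
*     .from("/path/to/records.txt")
*     .withDelimiter(new byte[] {'|'}));
* }</pre>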
*
* <h3>Filepattern expansion and watching</h3>
*
* <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} or the
* combination of {@link FileIO.Match#continuously(Duration, TerminationCondition)} and {@link
* #readFiles()} allow streaming of new files matching the filepattern(s).
*
* <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link #readFiles()}
* allows them if the filepattern contains a glob wildcard character. Use {@link
* Read#withEmptyMatchTreatment} or {@link
* FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link #readFiles()} to configure
* this behavior.
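*
* <p>For example, a sketch that tolerates an empty match:
*
* <pre>{@code
* PCollection<String> lines = p.apply(TextIO.read()
*     .from("/path/that/may/match/nothing/*")
*     .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW));
* }</pre>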
*
* <p>Example 1: reading a file or filepattern.
*
* <pre>{@code
* Pipeline p = ...;
*
* // A simple Read of a local file (only runs locally):
* PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt"));
* }</pre>
*
* <p>Example 2: reading a PCollection of filenames.
*
* <pre>{@code
* Pipeline p = ...;
*
* // E.g. the filenames might be computed from other data in the pipeline, or
* // read from a data source.
* PCollection<String> filenames = ...;
*
* // Read all files in the collection.
* PCollection<String> lines =
* filenames
* .apply(FileIO.matchAll())
* .apply(FileIO.readMatches())
* .apply(TextIO.readFiles());
* }</pre>
*
* <p>Example 3: streaming new files matching a filepattern.
*
* <pre>{@code
* Pipeline p = ...;
*
* PCollection<String> lines = p.apply(TextIO.read()
* .from("/local/path/to/files/*")
* .watchForNewFiles(
* // Check for new files every minute
* Duration.standardMinutes(1),
* // Stop watching the filepattern if no new files appear within an hour
* afterTimeSinceNewOutput(Duration.standardHours(1))));
* }</pre>
*
* <h3>Reading a very large number of files</h3>
*
* <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
* thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
* scalability. Note that it may decrease performance if the filepattern matches only a small number
* of files.
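*
* <p>For example:
*
* <pre>{@code
* PCollection<String> lines = p.apply(TextIO.read()
*     .from("gs://bucket/path/to/many/files/*")
*     .withHintMatchesManyFiles());
* }</pre>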
*
* <h2>Writing text files</h2>
*
* <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using
* {@link TextIO.Write#to(String)} to specify the output prefix of the files to write.
*
* <p>For example:
*
* <pre>{@code
* // A simple Write to a local file (only runs locally):
* PCollection<String> lines = ...;
* lines.apply(TextIO.write().to("/path/to/file.txt"));
*
* // Same as above, only with Gzip compression:
* PCollection<String> lines = ...;
* lines.apply(TextIO.write().to("/path/to/file")
*     .withSuffix(".txt")
*     .withCompression(Compression.GZIP));
* }</pre>
*
* <p>Any existing files with the same names as generated output files will be overwritten.
*
* <p>If you want better control over how filenames are generated than the default policy allows, a
* custom {@link FilenamePolicy} can also be set using {@link TextIO.Write#to(FilenamePolicy)}.
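*
* <p>For example, a sketch where {@code MyFilenamePolicy} is a hypothetical {@link FilenamePolicy}
* implementation and {@code tempDirectory} is assumed:
*
* <pre>{@code
* // MyFilenamePolicy and tempDirectory are hypothetical placeholders.
* lines.apply(TextIO.write()
*     .to(new MyFilenamePolicy())
*     .withTempDirectory(tempDirectory));
* }</pre>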
*
* <h3>Advanced features</h3>
*
* <p>{@link TextIO} supports all features of {@link FileIO#write} and {@link FileIO#writeDynamic},
* such as writing windowed/unbounded data, writing data to multiple destinations, and so on, by
* providing a {@link Sink} via {@link #sink()}.
*
* <p>For example, to write events of different type to different filenames:
*
* <pre>{@code
* PCollection<Event> events = ...;
* events.apply(FileIO.<EventType, Event>writeDynamic()
* .by(Event::getTypeName)
* .via(TextIO.sink(), Event::toString)
* .to(type -> nameFilesUsingWindowPaneAndShard(".../events/" + type + "/data", ".txt")));
* }</pre>
*
* <p>For backwards compatibility, {@link TextIO} also supports the legacy {@link
* DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}.
*/
public class TextIO {
private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;
/**
* A {@link PTransform} that reads from one or more text files and returns a bounded {@link
* PCollection} containing one element for each line of the input files.
*/
public static Read read() {
return new AutoValue_TextIO_Read.Builder()
.setCompression(Compression.AUTO)
.setHintMatchesManyFiles(false)
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
.build();
}
/**
* A {@link PTransform} that works like {@link #read}, but reads each file in a {@link
* PCollection} of filepatterns.
*
* <p>Can be applied to both bounded and unbounded {@link PCollection PCollections}, so this is
* suitable for reading a {@link PCollection} of filepatterns arriving as a stream. However, every
* filepattern is expanded once at the moment it is processed, rather than watched for new files
* matching the filepattern to appear. Likewise, every file is read once, rather than watched for
* new entries.
*
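* <p>A sketch of the preferred replacement using {@link FileIO} matching:
*
* <pre>{@code
* PCollection<String> filepatterns = ...;
* PCollection<String> lines =
*     filepatterns
*         .apply(FileIO.matchAll())
*         .apply(FileIO.readMatches())
*         .apply(TextIO.readFiles());
* }</pre>
*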
* @deprecated You can achieve the functionality of {@link #readAll()} using {@link FileIO}
* matching plus {@link #readFiles()}. This is the preferred method to make composition
* explicit. {@link ReadAll} will not receive upgrades and will be removed in a future version
* of Beam.
*/
@Deprecated
public static ReadAll readAll() {
return new AutoValue_TextIO_ReadAll.Builder()
.setCompression(Compression.AUTO)
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
.build();
}
/**
* Like {@link #read}, but reads each file in a {@link PCollection} of {@link
* FileIO.ReadableFile}, returned by {@link FileIO#readMatches}.
*/
public static ReadFiles readFiles() {
return new AutoValue_TextIO_ReadFiles.Builder()
// 64MB is a reasonable value that amortizes the cost of opening files,
// but is not so large as to exhaust a typical runner's maximum amount of output per
// ProcessElement call.
.setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
.build();
}
/**
* A {@link PTransform} that writes a {@link PCollection} to a text file (or multiple text files
* matching a sharding pattern), with each element of the input collection encoded into its own
* line.
*/
public static Write write() {
return new TextIO.Write();
}
/**
* A {@link PTransform} that writes a {@link PCollection} to a text file (or multiple text files
* matching a sharding pattern), with each element of the input collection encoded into its own
* line.
*
* <p>This version allows you to apply {@link TextIO} writes to a {@link PCollection} of a custom
* type {@code UserT}. A format function that converts the input type {@code UserT} to the String
* that will be written to the file must be specified. If using a custom {@link
* DynamicDestinations} object, this is done using {@link DynamicDestinations#formatRecord};
* otherwise, {@link TypedWrite#withFormatFunction} can be used to specify a format function.
*
* <p>The advantage of using a custom type is that it allows a user-provided {@link
* DynamicDestinations} object, set via {@link Write#to(DynamicDestinations)}, to examine the
* custom type when choosing a destination.
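*
* <p>A minimal sketch, assuming a hypothetical user type {@code Event} whose {@code toString()}
* produces the desired output line:
*
* <pre>{@code
* // Event is a hypothetical user type.
* PCollection<Event> events = ...;
* events.apply(TextIO.<Event>writeCustomType()
*     .withFormatFunction(Event::toString)
*     .to("/path/to/output"));
* }</pre>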
*/
public static <UserT> TypedWrite<UserT, Void> writeCustomType() {
return new AutoValue_TextIO_TypedWrite.Builder<UserT, Void>()
.setFilenamePrefix(null)
.setTempDirectory(null)
.setShardTemplate(null)
.setFilenameSuffix(null)
.setFilenamePolicy(null)
.setDynamicDestinations(null)
.setDelimiter(new char[] {'\n'})
.setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
.setWindowedWrites(false)
.setNumShards(0)
.setNoSpilling(false)
.build();
}
/** Implementation of {@link #read}. */
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
@Nullable
abstract ValueProvider<String> getFilepattern();
abstract MatchConfiguration getMatchConfiguration();
abstract boolean getHintMatchesManyFiles();
abstract Compression getCompression();
@SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
@Nullable
abstract byte[] getDelimiter();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setFilepattern(ValueProvider<String> filepattern);
abstract Builder setMatchConfiguration(MatchConfiguration matchConfiguration);
abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
abstract Builder setCompression(Compression compression);
abstract Builder setDelimiter(byte[] delimiter);
abstract Read build();
}
/**
* Reads text from the file(s) with the given filename or filename pattern.
*
* <p>This can be a local path (if running locally), or a Google Cloud Storage filename or
* filename pattern of the form {@code "gs://<bucket>/<filepath>"} (if running locally or using a
* remote execution service).
*
* <p>Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html" >Java
* Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
*
* <p>If it is known that the filepattern will match a very large number of files (at least tens
* of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability.
*/
public Read from(String filepattern) {
checkArgument(filepattern != null, "filepattern can not be null");
return from(StaticValueProvider.of(filepattern));
}
/** Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. */
public Read from(ValueProvider<String> filepattern) {
checkArgument(filepattern != null, "filepattern can not be null");
return toBuilder().setFilepattern(filepattern).build();
}
/** Sets the {@link MatchConfiguration}. */
public Read withMatchConfiguration(MatchConfiguration matchConfiguration) {
return toBuilder().setMatchConfiguration(matchConfiguration).build();
}
/** @deprecated Use {@link #withCompression}. */
@Deprecated
public Read withCompressionType(TextIO.CompressionType compressionType) {
return withCompression(compressionType.canonical);
}
/**
* Reads from input sources using the specified compression type.
*
* <p>If no compression type is specified, the default is {@link Compression#AUTO}.
*/
public Read withCompression(Compression compression) {
return toBuilder().setCompression(compression).build();
}
/**
* See {@link MatchConfiguration#continuously}.
*
* <p>This works only in runners supporting {@link Kind#SPLITTABLE_DO_FN}.
*/
@Experimental(Kind.SPLITTABLE_DO_FN)
public Read watchForNewFiles(
Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
return withMatchConfiguration(
getMatchConfiguration().continuously(pollInterval, terminationCondition));
}
/**
* Hints that the filepattern specified in {@link #from(String)} matches a very large number of
* files.
*
* <p>This hint may cause a runner to execute the transform differently, in a way that improves
* performance for this case, but it may worsen performance if the filepattern matches only a
* small number of files (e.g., in a runner that supports dynamic work rebalancing, it will
* happen less efficiently within individual files).
*/
public Read withHintMatchesManyFiles() {
return toBuilder().setHintMatchesManyFiles(true).build();
}
/** See {@link MatchConfiguration#withEmptyMatchTreatment}. */
public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
}
/** Set the custom delimiter to be used in place of the default ones ('\r', '\n' or '\r\n'). */
public Read withDelimiter(byte[] delimiter) {
checkArgument(delimiter != null, "delimiter can not be null");
checkArgument(!isSelfOverlapping(delimiter), "delimiter must not self-overlap");
return toBuilder().setDelimiter(delimiter).build();
}
static boolean isSelfOverlapping(byte[] s) {
// s self-overlaps if there exists a non-empty v such that s = vu = wv, with u and w non-empty;
// equivalently, some non-empty proper prefix of s is also a suffix of s.
for (int i = 1; i < s.length - 1; ++i) {
if (ByteBuffer.wrap(s, 0, i).equals(ByteBuffer.wrap(s, s.length - i, i))) {
return true;
}
}
return false;
}
@Override
public PCollection<String> expand(PBegin input) {
checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
}
// All other cases go through FileIO + ReadFiles
return input
.apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
.apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
.apply(
"Read Matches",
FileIO.readMatches()
.withCompression(getCompression())
.withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
.apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
}
// Helper to create a source specific to the requested compression type.
protected FileBasedSource<String> getSource() {
return CompressedSource.from(
new TextSource(
getFilepattern(),
getMatchConfiguration().getEmptyMatchTreatment(),
getDelimiter()))
.withCompression(getCompression());
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.add(
DisplayData.item("compressionType", getCompression().toString())
.withLabel("Compression Type"))
.addIfNotNull(DisplayData.item("filePattern", getFilepattern()).withLabel("File Pattern"))
.include("matchConfiguration", getMatchConfiguration())
.addIfNotNull(
DisplayData.item("delimiter", Arrays.toString(getDelimiter()))
.withLabel("Custom delimiter to split records"));
}
}
/////////////////////////////////////////////////////////////////////////////
/**
* Implementation of {@link #readAll}.
*
* @deprecated See {@link #readAll()} for details.
*/
@Deprecated
@AutoValue
public abstract static class ReadAll
extends PTransform<PCollection<String>, PCollection<String>> {
abstract MatchConfiguration getMatchConfiguration();
abstract Compression getCompression();
@SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
@Nullable
abstract byte[] getDelimiter();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setMatchConfiguration(MatchConfiguration matchConfiguration);
abstract Builder setCompression(Compression compression);
abstract Builder setDelimiter(byte[] delimiter);
abstract ReadAll build();
}
/** Sets the {@link MatchConfiguration}. */
public ReadAll withMatchConfiguration(MatchConfiguration configuration) {
return toBuilder().setMatchConfiguration(configuration).build();
}
/** @deprecated Use {@link #withCompression}. */
@Deprecated
public ReadAll withCompressionType(TextIO.CompressionType compressionType) {
return withCompression(compressionType.canonical);
}
/**
* Reads from input sources using the specified compression type.
*
* <p>If no compression type is specified, the default is {@link Compression#AUTO}.
*/
public ReadAll withCompression(Compression compression) {
return toBuilder().setCompression(compression).build();
}
/** Same as {@link Read#withEmptyMatchTreatment}. */
public ReadAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
}
/** Same as {@link Read#watchForNewFiles(Duration, TerminationCondition)}. */
@Experimental(Kind.SPLITTABLE_DO_FN)
public ReadAll watchForNewFiles(
Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
return withMatchConfiguration(
getMatchConfiguration().continuously(pollInterval, terminationCondition));
}
ReadAll withDelimiter(byte[] delimiter) {
return toBuilder().setDelimiter(delimiter).build();
}
@Override
public PCollection<String> expand(PCollection<String> input) {
return input
.apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
.apply(
FileIO.readMatches()
.withCompression(getCompression())
.withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
.apply(readFiles().withDelimiter(getDelimiter()));
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.add(
DisplayData.item("compressionType", getCompression().toString())
.withLabel("Compression Type"))
.addIfNotNull(
DisplayData.item("delimiter", Arrays.toString(getDelimiter()))
.withLabel("Custom delimiter to split records"))
.include("matchConfiguration", getMatchConfiguration());
}
}
/** Implementation of {@link #readFiles}. */
@AutoValue
public abstract static class ReadFiles
extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<String>> {
abstract long getDesiredBundleSizeBytes();
@SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
@Nullable
abstract byte[] getDelimiter();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
abstract Builder setDelimiter(byte[] delimiter);
abstract ReadFiles build();
}
@VisibleForTesting
ReadFiles withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
}
/** Like {@link Read#withDelimiter}. */
public ReadFiles withDelimiter(byte[] delimiter) {
return toBuilder().setDelimiter(delimiter).build();
}
@Override
public PCollection<String> expand(PCollection<FileIO.ReadableFile> input) {
return input.apply(
"Read all via FileBasedSource",
new ReadAllViaFileBasedSource<>(
getDesiredBundleSizeBytes(),
new CreateTextSourceFn(getDelimiter()),
StringUtf8Coder.of()));
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.addIfNotNull(
DisplayData.item("delimiter", Arrays.toString(getDelimiter()))
.withLabel("Custom delimiter to split records"));
}
private static class CreateTextSourceFn
implements SerializableFunction<String, FileBasedSource<String>> {
private byte[] delimiter;
private CreateTextSourceFn(byte[] delimiter) {
this.delimiter = delimiter;
}
@Override
public FileBasedSource<String> apply(String input) {
return new TextSource(
StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter);
}
}
}
// ///////////////////////////////////////////////////////////////////////////
/** Implementation of {@link #write}. */
@AutoValue
public abstract static class TypedWrite<UserT, DestinationT>
extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> {
/** The prefix of each file written, combined with suffix and shardTemplate. */
@Nullable
abstract ValueProvider<ResourceId> getFilenamePrefix();
/** The suffix of each file written, combined with prefix and shardTemplate. */
@Nullable
abstract String getFilenameSuffix();
/** The base directory used for generating temporary files. */
@Nullable
abstract ValueProvider<ResourceId> getTempDirectory();
/** The delimiter between string records. */
@SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
abstract char[] getDelimiter();
/** An optional header to add to each file. */
@Nullable
abstract String getHeader();
/** An optional footer to add to each file. */
@Nullable
abstract String getFooter();
/** Requested number of shards. 0 for automatic. */
abstract int getNumShards();
/** The shard template of each file written, combined with prefix and suffix. */
@Nullable
abstract String getShardTemplate();
/** A policy for naming output files. */
@Nullable
abstract FilenamePolicy getFilenamePolicy();
/** Allows for value-dependent {@link DynamicDestinations} to be vended. */
@Nullable
abstract DynamicDestinations<UserT, DestinationT, String> getDynamicDestinations();
/** A destination function for using {@link DefaultFilenamePolicy}. */
@Nullable
abstract SerializableFunction<UserT, Params> getDestinationFunction();
/** A default destination for empty PCollections. */
@Nullable
abstract Params getEmptyDestination();
/** A function that converts UserT to a String, for writing to the file. */
@Nullable
abstract SerializableFunction<UserT, String> getFormatFunction();
/** Whether to write windowed output files. */
abstract boolean getWindowedWrites();
/** Whether to skip the spilling of data caused by having maxNumWritersPerBundle. */
abstract boolean getNoSpilling();
/**
* The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
* {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
*/
abstract WritableByteChannelFactory getWritableByteChannelFactory();
abstract Builder<UserT, DestinationT> toBuilder();
@AutoValue.Builder
abstract static class Builder<UserT, DestinationT> {
abstract Builder<UserT, DestinationT> setFilenamePrefix(
@Nullable ValueProvider<ResourceId> filenamePrefix);
abstract Builder<UserT, DestinationT> setTempDirectory(
@Nullable ValueProvider<ResourceId> tempDirectory);
abstract Builder<UserT, DestinationT> setShardTemplate(@Nullable String shardTemplate);
abstract Builder<UserT, DestinationT> setFilenameSuffix(@Nullable String filenameSuffix);
abstract Builder<UserT, DestinationT> setHeader(@Nullable String header);
abstract Builder<UserT, DestinationT> setFooter(@Nullable String footer);
abstract Builder<UserT, DestinationT> setDelimiter(char[] delimiter);
abstract Builder<UserT, DestinationT> setFilenamePolicy(
@Nullable FilenamePolicy filenamePolicy);
abstract Builder<UserT, DestinationT> setDynamicDestinations(
@Nullable DynamicDestinations<UserT, DestinationT, String> dynamicDestinations);
abstract Builder<UserT, DestinationT> setDestinationFunction(
@Nullable SerializableFunction<UserT, Params> destinationFunction);
abstract Builder<UserT, DestinationT> setEmptyDestination(Params emptyDestination);
abstract Builder<UserT, DestinationT> setFormatFunction(
@Nullable SerializableFunction<UserT, String> formatFunction);
abstract Builder<UserT, DestinationT> setNumShards(int numShards);
abstract Builder<UserT, DestinationT> setWindowedWrites(boolean windowedWrites);
abstract Builder<UserT, DestinationT> setNoSpilling(boolean noSpilling);
abstract Builder<UserT, DestinationT> setWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory);
abstract TypedWrite<UserT, DestinationT> build();
}
/**
* Writes to text files with the given prefix. The given {@code prefix} can reference any {@link
* FileSystem} on the classpath. This prefix is used by the {@link DefaultFilenamePolicy} to
* generate filenames.
*
* <p>By default, a {@link DefaultFilenamePolicy} will be used, built using the specified prefix
* to define the base output directory and file prefix, a shard identifier (see {@link
* #withNumShards(int)}), and a common suffix (if supplied using {@link #withSuffix(String)}).
*
* <p>This default policy can be overridden using {@link #to(FilenamePolicy)}, in which case
* {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should not be set.
* Custom filename policies do not automatically see this prefix; you must explicitly pass the
* prefix into your {@link FilenamePolicy} object if you need it.
*
* <p>If {@link #withTempDirectory} has not been called, this filename prefix will be used to
* infer a directory for temporary files.
*/
public TypedWrite<UserT, DestinationT> to(String filenamePrefix) {
return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix));
}
/** Like {@link #to(String)}. */
@Experimental(Kind.FILESYSTEM)
public TypedWrite<UserT, DestinationT> to(ResourceId filenamePrefix) {
return toResource(StaticValueProvider.of(filenamePrefix));
}
/** Like {@link #to(String)}. */
public TypedWrite<UserT, DestinationT> to(ValueProvider<String> outputPrefix) {
return toResource(
NestedValueProvider.of(outputPrefix, FileBasedSink::convertToFileResourceIfPossible));
}
/**
* Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A
* directory for temporary files must be specified using {@link #withTempDirectory}.
*/
public TypedWrite<UserT, DestinationT> to(FilenamePolicy filenamePolicy) {
return toBuilder().setFilenamePolicy(filenamePolicy).build();
}
/**
* Use a {@link DynamicDestinations} object to vend {@link FilenamePolicy} objects. These
* objects can examine the input record when creating a {@link FilenamePolicy}. A directory for
* temporary files must be specified using {@link #withTempDirectory}.
*
* @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()}
* instead.
*/
@Deprecated
public <NewDestinationT> TypedWrite<UserT, NewDestinationT> to(
DynamicDestinations<UserT, NewDestinationT, String> dynamicDestinations) {
return (TypedWrite)
toBuilder().setDynamicDestinations((DynamicDestinations) dynamicDestinations).build();
}
/**
* Write to dynamic destinations using the default filename policy. The destinationFunction maps
* the input record to a {@link DefaultFilenamePolicy.Params} object that specifies where the
* records should be written (base filename, file suffix, and shard template). The
* emptyDestination parameter specifies where empty files should be written when the written
* {@link PCollection} is empty.
*
* @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()}
* instead.
*/
@Deprecated
public TypedWrite<UserT, Params> to(
SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
return (TypedWrite)
toBuilder()
.setDestinationFunction(destinationFunction)
.setEmptyDestination(emptyDestination)
.build();
}
/** Like {@link #to(ResourceId)}. */
@Experimental(Kind.FILESYSTEM)
public TypedWrite<UserT, DestinationT> toResource(ValueProvider<ResourceId> filenamePrefix) {
return toBuilder().setFilenamePrefix(filenamePrefix).build();
}
/**
* Specifies a format function to convert {@link UserT} to the output type. If {@link
* #to(DynamicDestinations)} is used, {@link DynamicDestinations#formatRecord(Object)} must be
* used instead.
*
* @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()}
* instead.
*/
@Deprecated
public TypedWrite<UserT, DestinationT> withFormatFunction(
@Nullable SerializableFunction<UserT, String> formatFunction) {
return toBuilder().setFormatFunction(formatFunction).build();
}
/** Set the base directory used to generate temporary files. */
@Experimental(Kind.FILESYSTEM)
public TypedWrite<UserT, DestinationT> withTempDirectory(
ValueProvider<ResourceId> tempDirectory) {
return toBuilder().setTempDirectory(tempDirectory).build();
}
/** Set the base directory used to generate temporary files. */
@Experimental(Kind.FILESYSTEM)
public TypedWrite<UserT, DestinationT> withTempDirectory(ResourceId tempDirectory) {
return withTempDirectory(StaticValueProvider.of(tempDirectory));
}
/**
* Uses the given {@link ShardNameTemplate} for naming output files. This option may only be
* used with one of the default filename-prefix {@code to()} overrides, i.e. not when using
* either {@link #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}.
*
* <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
* used.
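*
* <p>For example, a sketch using a custom template ('S' for the shard number, 'N' for the shard
* count):
*
* <pre>{@code
* lines.apply(TextIO.write().to("/path/to/output").withShardNameTemplate("-SS-of-NN"));
* }</pre>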
*/
public TypedWrite<UserT, DestinationT> withShardNameTemplate(String shardTemplate) {
return toBuilder().setShardTemplate(shardTemplate).build();
}
/**
* Configures the filename suffix for written files. This option may only be used with one of
* the default filename-prefix {@code to()} overrides, i.e. not when using either {@link
* #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}.
*
* <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
* used.
*/
public TypedWrite<UserT, DestinationT> withSuffix(String filenameSuffix) {
return toBuilder().setFilenameSuffix(filenameSuffix).build();
}
/**
* Configures the number of output shards produced overall (when using unwindowed writes) or
* per-window (when using windowed writes).
*
* <p>For unwindowed writes, constraining the number of shards is likely to reduce the
* performance of a pipeline. Setting this value is not recommended unless you require a
* specific number of output files.
*
* @param numShards the number of shards to use, or 0 to let the system decide.
*/
public TypedWrite<UserT, DestinationT> withNumShards(int numShards) {
checkArgument(numShards >= 0);
return toBuilder().setNumShards(numShards).build();
}
/**
* Forces a single output file and an empty shard name template.
*
* <p>For unwindowed writes, constraining the number of shards is likely to reduce the
* performance of a pipeline. Setting this value is not recommended unless you require a
* specific number of output files.
*
* <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}
*/
public TypedWrite<UserT, DestinationT> withoutSharding() {
return withNumShards(1).withShardNameTemplate("");
}
/**
* Specifies the delimiter after each string written.
*
* <p>Defaults to '\n'.
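*
* <p>For example, a sketch using Windows-style line endings:
*
* <pre>{@code
* lines.apply(TextIO.write().to("/path/to/output").withDelimiter(new char[] {'\r', '\n'}));
* }</pre>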
*/
public TypedWrite<UserT, DestinationT> withDelimiter(char[] delimiter) {
return toBuilder().setDelimiter(delimiter).build();
}
/**
* Adds a header string to each file. A newline after the header is added automatically.
*
* <p>A {@code null} value will clear any previously configured header.
*/
public TypedWrite<UserT, DestinationT> withHeader(@Nullable String header) {
return toBuilder().setHeader(header).build();
}
/**
* Adds a footer string to each file. A newline after the footer is added automatically.
*
* <p>A {@code null} value will clear any previously configured footer.
*/
public TypedWrite<UserT, DestinationT> withFooter(@Nullable String footer) {
return toBuilder().setFooter(footer).build();
}
/**
* Returns a transform for writing to text files like this one but that has the given {@link
* WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The
* default value is {@link Compression#UNCOMPRESSED}.
*
* <p>A {@code null} value will reset the value to the default value mentioned above.
*/
public TypedWrite<UserT, DestinationT> withWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory) {
return toBuilder().setWritableByteChannelFactory(writableByteChannelFactory).build();
}
/**
* Returns a transform for writing to text files like this one but that compresses output using
* the given {@link Compression}. The default value is {@link Compression#UNCOMPRESSED}.
*/
public TypedWrite<UserT, DestinationT> withCompression(Compression compression) {
checkArgument(compression != null, "compression can not be null");
return withWritableByteChannelFactory(
FileBasedSink.CompressionType.fromCanonical(compression));
}
/**
* Preserves windowing of input elements and writes them to files based on the element's window.
*
* <p>If using {@link #to(FileBasedSink.FilenamePolicy)}, filenames will be generated using
* {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}.
*/
public TypedWrite<UserT, DestinationT> withWindowedWrites() {
return toBuilder().setWindowedWrites(true).build();
}
/** See {@link WriteFiles#withNoSpilling()}. */
public TypedWrite<UserT, DestinationT> withNoSpilling() {
return toBuilder().setNoSpilling(true).build();
}
private DynamicDestinations<UserT, DestinationT, String> resolveDynamicDestinations() {
DynamicDestinations<UserT, DestinationT, String> dynamicDestinations =
getDynamicDestinations();
if (dynamicDestinations == null) {
if (getDestinationFunction() != null) {
// In this case, DestinationT == Params
dynamicDestinations =
(DynamicDestinations)
DynamicFileDestinations.toDefaultPolicies(
getDestinationFunction(), getEmptyDestination(), getFormatFunction());
} else {
// In this case, DestinationT == Void
FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
if (usedFilenamePolicy == null) {
usedFilenamePolicy =
DefaultFilenamePolicy.fromStandardParameters(
getFilenamePrefix(),
getShardTemplate(),
getFilenameSuffix(),
getWindowedWrites());
}
dynamicDestinations =
(DynamicDestinations)
DynamicFileDestinations.constant(usedFilenamePolicy, getFormatFunction());
}
}
return dynamicDestinations;
}
@Override
public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
checkState(
getFilenamePrefix() != null || getTempDirectory() != null,
"Need to set either the filename prefix or the tempDirectory of a TextIO.Write "
+ "transform.");
List<?> allToArgs =
Lists.newArrayList(
getFilenamePolicy(),
getDynamicDestinations(),
getFilenamePrefix(),
getDestinationFunction());
checkArgument(
1
== Iterables.size(
allToArgs.stream()
.filter(Predicates.notNull()::apply)
.collect(Collectors.toList())),
"Exactly one of filename policy, dynamic destinations, filename prefix, or destination "
+ "function must be set");
if (getDynamicDestinations() != null) {
checkArgument(
getFormatFunction() == null,
"A format function should not be specified "
+ "with DynamicDestinations. Use DynamicDestinations.formatRecord instead");
}
if (getFilenamePolicy() != null || getDynamicDestinations() != null) {
checkState(
getShardTemplate() == null && getFilenameSuffix() == null,
"shardTemplate and filenameSuffix should only be used with the default "
+ "filename policy");
}
ValueProvider<ResourceId> tempDirectory = getTempDirectory();
if (tempDirectory == null) {
tempDirectory = getFilenamePrefix();
}
WriteFiles<UserT, DestinationT, String> write =
WriteFiles.to(
new TextSink<>(
tempDirectory,
resolveDynamicDestinations(),
getDelimiter(),
getHeader(),
getFooter(),
getWritableByteChannelFactory()));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
if (getWindowedWrites()) {
write = write.withWindowedWrites();
}
if (getNoSpilling()) {
write = write.withNoSpilling();
}
return input.apply("WriteFiles", write);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
resolveDynamicDestinations().populateDisplayData(builder);
builder
.addIfNotDefault(
DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
.addIfNotNull(
DisplayData.item("tempDirectory", getTempDirectory())
.withLabel("Directory for temporary files"))
.addIfNotNull(DisplayData.item("fileHeader", getHeader()).withLabel("File Header"))
.addIfNotNull(DisplayData.item("fileFooter", getFooter()).withLabel("File Footer"))
.add(
DisplayData.item(
"writableByteChannelFactory", getWritableByteChannelFactory().toString())
.withLabel("Compression/Transformation Type"));
}
}
/**
* This class is used as the default return value of {@link TextIO#write()}.
*
* <p>All methods in this class delegate to the appropriate method of {@link TextIO.TypedWrite}.
* This class exists for backwards compatibility, and will be removed in Beam 3.0.
*/
public static class Write extends PTransform<PCollection<String>, PDone> {
@VisibleForTesting TypedWrite<String, ?> inner;
Write() {
this(TextIO.writeCustomType());
}
Write(TypedWrite<String, ?> inner) {
this.inner = inner;
}
/** See {@link TypedWrite#to(String)}. */
public Write to(String filenamePrefix) {
return new Write(
inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.identity()));
}
/** See {@link TypedWrite#to(ResourceId)}. */
@Experimental(Kind.FILESYSTEM)
public Write to(ResourceId filenamePrefix) {
return new Write(
inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.identity()));
}
/** See {@link TypedWrite#to(ValueProvider)}. */
public Write to(ValueProvider<String> outputPrefix) {
return new Write(inner.to(outputPrefix).withFormatFunction(SerializableFunctions.identity()));
}
/** See {@link TypedWrite#toResource(ValueProvider)}. */
@Experimental(Kind.FILESYSTEM)
public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
return new Write(
inner.toResource(filenamePrefix).withFormatFunction(SerializableFunctions.identity()));
}
/** See {@link TypedWrite#to(FilenamePolicy)}. */
@Experimental(Kind.FILESYSTEM)
public Write to(FilenamePolicy filenamePolicy) {
return new Write(
inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.identity()));
}
/**
* See {@link TypedWrite#to(DynamicDestinations)}.
*
* @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link
* #sink()} instead.
*/
@Experimental(Kind.FILESYSTEM)
@Deprecated
public Write to(DynamicDestinations<String, ?, String> dynamicDestinations) {
return new Write(
inner.to((DynamicDestinations) dynamicDestinations).withFormatFunction(null));
}
/**
* See {@link TypedWrite#to(SerializableFunction, Params)}.
*
* @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link
* #sink()} instead.
*/
@Experimental(Kind.FILESYSTEM)
@Deprecated
public Write to(
SerializableFunction<String, Params> destinationFunction, Params emptyDestination) {
return new Write(
inner
.to(destinationFunction, emptyDestination)
.withFormatFunction(SerializableFunctions.identity()));
}
/** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */
@Experimental(Kind.FILESYSTEM)
public Write withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
return new Write(inner.withTempDirectory(tempDirectory));
}
/** See {@link TypedWrite#withTempDirectory(ResourceId)}. */
@Experimental(Kind.FILESYSTEM)
public Write withTempDirectory(ResourceId tempDirectory) {
return new Write(inner.withTempDirectory(tempDirectory));
}
/** See {@link TypedWrite#withShardNameTemplate(String)}. */
public Write withShardNameTemplate(String shardTemplate) {
return new Write(inner.withShardNameTemplate(shardTemplate));
}
/** See {@link TypedWrite#withSuffix(String)}. */
public Write withSuffix(String filenameSuffix) {
return new Write(inner.withSuffix(filenameSuffix));
}
/** See {@link TypedWrite#withNumShards(int)}. */
public Write withNumShards(int numShards) {
return new Write(inner.withNumShards(numShards));
}
/** See {@link TypedWrite#withoutSharding()}. */
public Write withoutSharding() {
return new Write(inner.withoutSharding());
}
/** See {@link TypedWrite#withDelimiter(char[])}. */
public Write withDelimiter(char[] delimiter) {
return new Write(inner.withDelimiter(delimiter));
}
/** See {@link TypedWrite#withHeader(String)}. */
public Write withHeader(@Nullable String header) {
return new Write(inner.withHeader(header));
}
/** See {@link TypedWrite#withFooter(String)}. */
public Write withFooter(@Nullable String footer) {
return new Write(inner.withFooter(footer));
}
/** See {@link TypedWrite#withWritableByteChannelFactory(WritableByteChannelFactory)}. */
public Write withWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory) {
return new Write(inner.withWritableByteChannelFactory(writableByteChannelFactory));
}
/** See {@link TypedWrite#withCompression(Compression)}. */
public Write withCompression(Compression compression) {
return new Write(inner.withCompression(compression));
}
/** See {@link TypedWrite#withWindowedWrites}. */
public Write withWindowedWrites() {
return new Write(inner.withWindowedWrites());
}
/** See {@link TypedWrite#withNoSpilling}. */
public Write withNoSpilling() {
return new Write(inner.withNoSpilling());
}
/**
* Specify that output filenames are wanted.
*
* <p>The nested {@link TypedWrite} transform always has access to output filenames; however, due
* to backwards-compatibility concerns, {@link Write} cannot return them. This method simply
* returns the inner {@link TypedWrite} transform, which has {@link WriteFilesResult} as its
* output type, allowing access to output files.
*
* <p>The supplied {@code DestinationT} type must be: the same as that supplied in {@link
* #to(DynamicDestinations)} if that method was used; {@link Params} if {@link
* #to(SerializableFunction, Params)} was used, or {@code Void} otherwise.
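*
* <p>For example, a sketch capturing the write result (here {@code DestinationT} is {@code Void}
* because the default destination is used):
*
* <pre>{@code
* WriteFilesResult<Void> result =
*     lines.apply(TextIO.write().to("/path/to/output").withOutputFilenames());
* }</pre>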
*/
public <DestinationT> TypedWrite<String, DestinationT> withOutputFilenames() {
return (TypedWrite) inner;
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
inner.populateDisplayData(builder);
}
@Override
public PDone expand(PCollection<String> input) {
inner.expand(input);
return PDone.in(input.getPipeline());
}
}
/** @deprecated Use {@link Compression}. */
@Deprecated
public enum CompressionType {
/** @see Compression#AUTO */
AUTO(Compression.AUTO),
/** @see Compression#UNCOMPRESSED */
UNCOMPRESSED(Compression.UNCOMPRESSED),
/** @see Compression#GZIP */
GZIP(Compression.GZIP),
/** @see Compression#BZIP2 */
BZIP2(Compression.BZIP2),
/** @see Compression#ZIP */
ZIP(Compression.ZIP),
/** @see Compression#ZSTD */
ZSTD(Compression.ZSTD),
/** @see Compression#DEFLATE */
DEFLATE(Compression.DEFLATE);
private final Compression canonical;
CompressionType(Compression canonical) {
this.canonical = canonical;
}
/** @see Compression#matches */
public boolean matches(String filename) {
return canonical.matches(filename);
}
}
//////////////////////////////////////////////////////////////////////////////
/**
* Creates a {@link Sink} that writes newline-delimited strings in UTF-8, for use with {@link
* FileIO#write}.
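*
* <p>For example, a minimal sketch used with {@link FileIO#write}:
*
* <pre>{@code
* PCollection<String> lines = ...;
* lines.apply(FileIO.<String>write().via(TextIO.sink()).to("/path/to/output"));
* }</pre>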
*/
public static Sink sink() {
return new AutoValue_TextIO_Sink.Builder().build();
}
/** Implementation of {@link #sink}. */
@AutoValue
public abstract static class Sink implements FileIO.Sink<String> {
@Nullable
abstract String getHeader();
@Nullable
abstract String getFooter();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setHeader(String header);
abstract Builder setFooter(String footer);
abstract Sink build();
}
public Sink withHeader(String header) {
checkArgument(header != null, "header can not be null");
return toBuilder().setHeader(header).build();
}
public Sink withFooter(String footer) {
checkArgument(footer != null, "footer can not be null");
return toBuilder().setFooter(footer).build();
}
@Nullable private transient PrintWriter writer;
@Override
public void open(WritableByteChannel channel) throws IOException {
writer =
new PrintWriter(
new BufferedWriter(new OutputStreamWriter(Channels.newOutputStream(channel), UTF_8)));
if (getHeader() != null) {
writer.println(getHeader());
}
}
@Override
public void write(String element) throws IOException {
writer.println(element);
}
@Override
public void flush() throws IOException {
if (getFooter() != null) {
writer.println(getFooter());
}
// BEAM-7813: don't close writer here
writer.flush();
}
}
/** Disable construction of utility class. */
private TextIO() {}
}