blob: 34e240dbfd88b94151e6f57daa37dd27b54a0c33 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects.firstNonNull;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Objects;
/**
* A default {@link FilenamePolicy} for windowed and unwindowed files. This policy is constructed
* using three parameters that together define the output name of a sharded file, in conjunction
* with the number of shards, index of the particular file, current window and pane information,
* using {@link #constructName}.
*
* <p>Most users will use this {@link DefaultFilenamePolicy}. For more advanced uses in generating
* different files for each window and other sharding controls, see the {@code
* WriteOneFilePerWindow} example pipeline.
*/
public final class DefaultFilenamePolicy extends FilenamePolicy {
/** The default sharding name template. */
public static final String DEFAULT_UNWINDOWED_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
/**
* The default windowed sharding name template used when writing windowed files. This is used as
* default in cases when user did not specify shard template to be used and there is a need to
* write windowed files. In cases when user does specify shard template to be used then provided
* template will be used for both windowed and non-windowed file names.
*/
public static final String DEFAULT_WINDOWED_SHARD_TEMPLATE =
"W-P" + DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
/*
* pattern for both windowed and non-windowed file names.
*/
private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+|W|P)");
/**
* Encapsulates constructor parameters to {@link DefaultFilenamePolicy}.
*
* <p>This is used as the {@code DestinationT} argument to allow {@link DefaultFilenamePolicy}
* objects to be dynamically generated.
*/
public static class Params implements Serializable {
@Nullable private final ValueProvider<ResourceId> baseFilename;
private final String shardTemplate;
private final boolean explicitTemplate;
private final String suffix;
/**
* Construct a default Params object. The shard template will be set to the default {@link
* #DEFAULT_UNWINDOWED_SHARD_TEMPLATE} value.
*/
public Params() {
this.baseFilename = null;
this.shardTemplate = DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
this.suffix = "";
this.explicitTemplate = false;
}
private Params(
ValueProvider<ResourceId> baseFilename,
String shardTemplate,
String suffix,
boolean explicitTemplate) {
this.baseFilename = baseFilename;
this.shardTemplate = shardTemplate;
this.suffix = suffix;
this.explicitTemplate = explicitTemplate;
}
/**
* Specify that writes are windowed. This affects the default shard template, changing it to
* {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE}.
*/
public Params withWindowedWrites() {
String template = this.shardTemplate;
if (!explicitTemplate) {
template = DEFAULT_WINDOWED_SHARD_TEMPLATE;
}
return new Params(baseFilename, template, suffix, explicitTemplate);
}
/** Sets the base filename. */
public Params withBaseFilename(ResourceId baseFilename) {
return withBaseFilename(StaticValueProvider.of(baseFilename));
}
/** Like {@link #withBaseFilename(ResourceId)}, but takes in a {@link ValueProvider}. */
public Params withBaseFilename(ValueProvider<ResourceId> baseFilename) {
return new Params(baseFilename, shardTemplate, suffix, explicitTemplate);
}
/** Sets the shard template. */
public Params withShardTemplate(String shardTemplate) {
return new Params(baseFilename, shardTemplate, suffix, true);
}
/** Sets the suffix. */
public Params withSuffix(String suffix) {
return new Params(baseFilename, shardTemplate, suffix, explicitTemplate);
}
@Override
public int hashCode() {
return Objects.hashCode(baseFilename.get(), shardTemplate, suffix);
}
@Override
public boolean equals(Object o) {
if (!(o instanceof Params)) {
return false;
}
Params other = (Params) o;
return baseFilename.get().equals(other.baseFilename.get())
&& shardTemplate.equals(other.shardTemplate)
&& suffix.equals(other.suffix);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("baseFilename", baseFilename)
.add("shardTemplate", shardTemplate)
.add("suffix", suffix)
.toString();
}
}
/** A Coder for {@link Params}. */
public static class ParamsCoder extends AtomicCoder<Params> {
private static final ParamsCoder INSTANCE = new ParamsCoder();
private Coder<String> stringCoder = StringUtf8Coder.of();
public static ParamsCoder of() {
return INSTANCE;
}
@Override
public void encode(Params value, OutputStream outStream) throws IOException {
if (value == null) {
throw new CoderException("cannot encode a null value");
}
stringCoder.encode(value.baseFilename.get().toString(), outStream);
stringCoder.encode(value.shardTemplate, outStream);
stringCoder.encode(value.suffix, outStream);
}
@Override
public Params decode(InputStream inStream) throws IOException {
ResourceId prefix =
FileBasedSink.convertToFileResourceIfPossible(stringCoder.decode(inStream));
String shardTemplate = stringCoder.decode(inStream);
String suffix = stringCoder.decode(inStream);
return new Params()
.withBaseFilename(prefix)
.withShardTemplate(shardTemplate)
.withSuffix(suffix);
}
}
private final Params params;
/**
* Constructs a new {@link DefaultFilenamePolicy}.
*
* @see DefaultFilenamePolicy for more information on the arguments to this function.
*/
@VisibleForTesting
DefaultFilenamePolicy(Params params) {
this.params = params;
}
/**
* Construct a {@link DefaultFilenamePolicy}.
*
* <p>This is a shortcut for:
*
* <pre>{@code
* DefaultFilenamePolicy.fromParams(new Params()
* .withBaseFilename(baseFilename)
* .withShardTemplate(shardTemplate)
* .withSuffix(filenameSuffix)
* .withWindowedWrites())
* }</pre>
*
* <p>Where the respective {@code with} methods are invoked only if the value is non-null or true.
*/
public static DefaultFilenamePolicy fromStandardParameters(
ValueProvider<ResourceId> baseFilename,
@Nullable String shardTemplate,
@Nullable String filenameSuffix,
boolean windowedWrites) {
Params params = new Params().withBaseFilename(baseFilename);
if (shardTemplate != null) {
params = params.withShardTemplate(shardTemplate);
}
if (filenameSuffix != null) {
params = params.withSuffix(filenameSuffix);
}
if (windowedWrites) {
params = params.withWindowedWrites();
}
return fromParams(params);
}
/** Construct a {@link DefaultFilenamePolicy} from a {@link Params} object. */
public static DefaultFilenamePolicy fromParams(Params params) {
return new DefaultFilenamePolicy(params);
}
/**
* Constructs a fully qualified name from components.
*
* <p>The name is built from a base filename, shard template (with shard numbers applied), and a
* suffix. All components are required, but may be empty strings.
*
* <p>Within a shard template, repeating sequences of the letters "S" or "N" are replaced with the
* shard number, or number of shards respectively. "P" is replaced with by stringification of
* current pane. "W" is replaced by stringification of current window.
*
* <p>The numbers are formatted with leading zeros to match the length of the repeated sequence of
* letters.
*
* <p>For example, if baseFilename = "path/to/output", shardTemplate = "-SSS-of-NNN", and suffix =
* ".txt", with shardNum = 1 and numShards = 100, the following is produced:
* "path/to/output-001-of-100.txt".
*/
static ResourceId constructName(
ResourceId baseFilename,
String shardTemplate,
String suffix,
int shardNum,
int numShards,
@Nullable String paneStr,
@Nullable String windowStr) {
String prefix = extractFilename(baseFilename);
// Matcher API works with StringBuffer, rather than StringBuilder.
StringBuffer sb = new StringBuffer();
sb.append(prefix);
Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate);
while (m.find()) {
boolean isCurrentShardNum = (m.group(1).charAt(0) == 'S');
boolean isNumberOfShards = (m.group(1).charAt(0) == 'N');
boolean isPane = (m.group(1).charAt(0) == 'P') && paneStr != null;
boolean isWindow = (m.group(1).charAt(0) == 'W') && windowStr != null;
char[] zeros = new char[m.end() - m.start()];
Arrays.fill(zeros, '0');
DecimalFormat df = new DecimalFormat(String.valueOf(zeros));
if (isCurrentShardNum) {
String formatted = df.format(shardNum);
m.appendReplacement(sb, formatted);
} else if (isNumberOfShards) {
String formatted = df.format(numShards);
m.appendReplacement(sb, formatted);
} else if (isPane) {
m.appendReplacement(sb, paneStr);
} else if (isWindow) {
m.appendReplacement(sb, windowStr);
}
}
m.appendTail(sb);
sb.append(suffix);
return baseFilename
.getCurrentDirectory()
.resolve(sb.toString(), StandardResolveOptions.RESOLVE_FILE);
}
@Override
@Nullable
public ResourceId unwindowedFilename(
int shardNumber, int numShards, OutputFileHints outputFileHints) {
return constructName(
params.baseFilename.get(),
params.shardTemplate,
params.suffix + outputFileHints.getSuggestedFilenameSuffix(),
shardNumber,
numShards,
null,
null);
}
@Override
public ResourceId windowedFilename(
int shardNumber,
int numShards,
BoundedWindow window,
PaneInfo paneInfo,
OutputFileHints outputFileHints) {
String paneStr = paneInfoToString(paneInfo);
String windowStr = windowToString(window);
return constructName(
params.baseFilename.get(),
params.shardTemplate,
params.suffix + outputFileHints.getSuggestedFilenameSuffix(),
shardNumber,
numShards,
paneStr,
windowStr);
}
/*
* Since not all windows have toString() that is nice or is compatible to be a part of file name.
*/
private String windowToString(BoundedWindow window) {
if (window instanceof GlobalWindow) {
return "GlobalWindow";
}
if (window instanceof IntervalWindow) {
IntervalWindow iw = (IntervalWindow) window;
return String.format("%s-%s", iw.start().toString(), iw.end().toString());
}
return window.toString();
}
private String paneInfoToString(PaneInfo paneInfo) {
String paneString = String.format("pane-%d", paneInfo.getIndex());
if (paneInfo.getTiming() == Timing.LATE) {
paneString = String.format("%s-late", paneString);
}
if (paneInfo.isLast()) {
paneString = String.format("%s-last", paneString);
}
return paneString;
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
String displayBaseFilename =
params.baseFilename.isAccessible()
? params.baseFilename.get().toString()
: ("(" + params.baseFilename + ")");
builder.add(
DisplayData.item(
"filenamePattern",
String.format("%s%s%s", displayBaseFilename, params.shardTemplate, params.suffix))
.withLabel("Filename pattern"));
builder.add(
DisplayData.item("filePrefix", params.baseFilename).withLabel("Output File Prefix"));
builder.add(
DisplayData.item("shardNameTemplate", params.shardTemplate)
.withLabel("Output Shard Name Template"));
builder.add(DisplayData.item("fileSuffix", params.suffix).withLabel("Output file Suffix"));
}
private static String extractFilename(ResourceId input) {
if (input.isDirectory()) {
return "";
} else {
return firstNonNull(input.getFilename(), "");
}
}
}